[FEAT MERGE] transfer without kill tx

Co-authored-by: Minionyh <minions.yh@gmail.com>
Co-authored-by: KyrielightWei <weixx1203@outlook.com>
This commit is contained in:
Handora
2023-12-08 14:12:30 +00:00
committed by ob-robot
parent 17d06c023b
commit ff0ec7e185
125 changed files with 10829 additions and 1129 deletions

View File

@ -59,7 +59,9 @@ int MockOb2pcCtx::init(ObMailBoxMgr<ObTwoPhaseCommitMsgType> *mgr)
tx_state_ = ObTxState::INIT;
log_queue_.clear();
participants_.clear();
intermediate_participants_.clear();
coordinator_ = -1;
sender_ = -1;
mailbox_mgr_ = mgr;
if (OB_FAIL(mailbox_mgr_->register_mailbox(addr_, mailbox_, this))) {
TRANS_LOG(ERROR, "mock ctx register mailbox failed");
@ -72,6 +74,7 @@ int MockOb2pcCtx::commit(const MockObParticipants& participants)
{
ObLockGuard<ObSpinLock> lock_guard(latch_);
participants_.assign(participants.begin(), participants.end());
coordinator_ = addr_;
return two_phase_commit();
}
@ -131,12 +134,13 @@ int MockOb2pcCtx::on_clear()
Ob2PCRole MockOb2pcCtx::get_2pc_role() const
{
Ob2PCRole role;
if (participants_.size()!=0) {
if (coordinator_ == -1) {
role = Ob2PCRole::UNKNOWN;
} else if (addr_ == coordinator_) {
role = Ob2PCRole::ROOT;
} else if (participants_.size()==0) {
} else if (0 == participants_.size()) {
// not root & downstream is empty
role = Ob2PCRole::LEAF;
} else {
@ -180,21 +184,33 @@ int MockOb2pcCtx::post_msg(const ObTwoPhaseCommitMsgType& msg_type,
from = mailbox_.addr_;
if (participant == OB_C2PC_UPSTREAM_ID) {
to = coordinator_;
} else if (participant == OB_C2PC_SENDER_ID) {
if (-1 != sender_) {
to = sender_;
} else if (-1 != coordinator_) {
to = coordinator_;
} else {
to = -1;
}
} else {
to = participants_[participant];
}
mail.init(from, to, sizeof(ObTwoPhaseCommitMsgType), msg_type);
if (-1 == coordinator_
if (-1 == to
&& participant == OB_C2PC_UPSTREAM_ID
&& ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_RESP == msg_type) {
TRANS_LOG(INFO, "self decide abort", K(ret), K(msg_type), K(participant),
K(mail));
} else if (-1 == coordinator_
} else if (-1 == to
&& participant == OB_C2PC_UPSTREAM_ID) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(ERROR, "invalid dst", K(ret), K(msg_type), K(participant),
K(mail));
} else if (-1 == to
&& participant == OB_C2PC_SENDER_ID) {
TRANS_LOG(INFO, "new transfer without sender", K(ret), K(msg_type), K(participant),
K(mail), K(to));
} else if (mail.from_ != mail.to_) {
if (OB_FAIL(mailbox_mgr_->send(mail, mail.to_))) {
TRANS_LOG(WARN, "send mailbox failed", K(ret), K(msg_type), K(participant));
@ -234,11 +250,14 @@ int MockOb2pcCtx::handle(const ObMail<ObTwoPhaseCommitMsgType> &mail)
ObTwoPhaseCommitMsgType type = *mail.mail_;
ObLockGuard<ObSpinLock> lock_guard(latch_);
sender_ = mail.from_;
if ((participant_id = find_participant_id(mail.from_)) == -1
&& is_2pc_response_msg(type)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(ERROR, "2pc request with wrong participant id", K(ret));
} else if (is_2pc_request_msg(type)
&& -1 == coordinator_
&& FALSE_IT(coordinator_ = mail.from_)) {
} else if (is_2pc_request_msg(type)
&& OB_FAIL(handle_2pc_req(type))) {
@ -250,6 +269,8 @@ int MockOb2pcCtx::handle(const ObMail<ObTwoPhaseCommitMsgType> &mail)
TRANS_LOG(INFO, "handle msg success", K(addr_), K(mail), K(*this));
}
sender_ = -1;
return ret;
}
@ -416,5 +437,103 @@ bool MockOb2pcCtx::is_sub2pc() const
return false;
}
int MockOb2pcCtx::merge_intermediate_participants()
{
int ret = OB_SUCCESS;
bool exist = false;
for (int64_t i = 0; i < intermediate_participants_.size(); i++) {
exist = false;
for (int64_t j = 0; !exist && j < participants_.size(); j++) {
if (participants_[j] == intermediate_participants_[i]) {
exist = true;
}
}
if (!exist) {
participants_.push_back(intermediate_participants_[i]);
}
}
intermediate_participants_.clear();
return ret;
}
void MockOb2pcCtx::add_intermediate_participants(const int64_t ls_id)
{
bool exist = false;
for (int64_t i = 0; !exist && i < intermediate_participants_.size(); i++) {
if (intermediate_participants_[i] == ls_id) {
exist = true;
}
}
if (!exist) {
intermediate_participants_.push_back(ls_id);
}
}
void MockOb2pcCtx::print_downstream()
{
TRANS_LOG(INFO, "[TREE_COMMIT_PRINT]", K(addr_));
for (int64_t i = 0; i < participants_.size(); i++) {
TRANS_LOG(INFO, "[TREE_COMMIT_PRINT] ", K(participants_[i]));
}
}
bool MockOb2pcCtx::is_real_upstream()
{
bool bret = false;
if (-1 == sender_) {
bret = true;
} else {
bret = sender_ == coordinator_;
}
return bret;
}
bool MockOb2pcCtx::need_to_advance()
{
Ob2PCRole role = get_2pc_role();
if (role == Ob2PCRole::ROOT) {
if (!all_downstream_collected_()) {
return true;
} else {
return false;
}
} else if (role == Ob2PCRole::INTERNAL) {
if (!all_downstream_collected_()) {
return true;
} else {
return false;
}
} else {
return false;
}
return false;
}
// bool MockOb2pcCtx::is_downstream_of(const int64_t ls_id)
// {
// for (int64_t i = 0; i < participants_.size(); i++) {
// if (participants_[i] == ls_id) {
// return true;
// }
// }
// for (int64_t i = 0; i < incremental_participants_.size(); i++) {
// if (intermediate_participants_[i] == ls_id) {
// return true;
// }
// }
// return false;
// }
} // end namespace transaction
} // end namespace oceanbase