fix standby read blocked

This commit is contained in:
Minionyh
2024-08-21 04:59:57 +00:00
committed by ob-robot
parent 2266d37762
commit 2df6814ad8
4 changed files with 179 additions and 43 deletions

View File

@ -138,18 +138,58 @@ public :
return ret;
}
int handle_trans_ask_state(const SCN &snapshot, ObAskStateRespMsg &resp)
int handle_trans_ask_state(const SCN &snapshot, ObAskStateRespMsg &resp, const SCN &state_version)
{
int ret = OB_SUCCESS;
CtxLockGuard guard(lock_);
build_and_post_collect_state_msg(snapshot);
if (OB_FAIL(resp.state_info_array_.assign(state_info_array_))) {
TRANS_LOG(WARN, "build ObAskStateRespMsg fail", K(ret), K(snapshot), KPC(this));
ObStateInfo state_info;
state_info.ls_id_ = ls_id_;
state_info.state_ = exec_info_.state_;
state_info.snapshot_version_ = snapshot;
resp.state_info_array_.reset();
switch(exec_info_.state_) {
case ObTxState::UNKNOWN:
case ObTxState::INIT: {
state_info.version_ = state_version;
if (OB_FAIL(resp.state_info_array_.push_back(state_info))) {
TRANS_LOG(WARN, "push back state info to resp msg failed", K(ret), K(snapshot), KPC(this));
}
break;
}
case ObTxState::REDO_COMPLETE:
case ObTxState::PREPARE: {
if (is_root()) {
build_and_post_collect_state_msg(snapshot);
if (OB_FAIL(resp.state_info_array_.assign(state_info_array_))) {
TRANS_LOG(WARN, "build ObAskStateRespMsg fail", K(ret), K(snapshot), KPC(this));
}
} else {
// use coord handle_ask_state_func to test
}
break;
}
case ObTxState::PRE_COMMIT:
case ObTxState::COMMIT:
case ObTxState::CLEAR: {
state_info.version_ = state_version;
if (OB_FAIL(resp.state_info_array_.push_back(state_info))) {
TRANS_LOG(WARN, "push back state info to resp msg failed", K(ret), K(snapshot), KPC(this));
}
break;
}
case ObTxState::ABORT:
{
state_info.version_.set_min();
if (OB_FAIL(resp.state_info_array_.push_back(state_info))) {
TRANS_LOG(WARN, "push back state info to resp msg failed", K(ret), K(snapshot), KPC(this));
}
break;
}
default:
ret = OB_ERR_UNEXPECTED;
}
TRANS_LOG(INFO, "handle trans ask state", K(ret), K(resp), KPC(this));
TRANS_LOG(INFO, "handle trans ask state", K(ret), K(resp), K(state_info), KPC(this));
return ret;
}
@ -193,6 +233,7 @@ TEST_F(TestObStandbyReadTransfer, trans_check_for_standby_transfer)
ASSERT_EQ(OB_SUCCESS, parts.push_back(ObTxExecPart(part3_ls, part3.epoch_, 0)));
coord.set_2pc_participants_(parts);
coord.set_2pc_upstream_(coord_ls);
part1.set_2pc_upstream_(coord_ls);
part2.set_2pc_upstream_(coord_ls);
part3.set_2pc_upstream_(coord_ls);
@ -222,7 +263,7 @@ TEST_F(TestObStandbyReadTransfer, trans_check_for_standby_transfer)
transfer_part.exec_info_.prepare_version_.convert_for_tx(50);
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp, snapshot));
ObCollectStateMsg collect_state_req;
ObCollectStateRespMsg collect_state_resp;
@ -246,7 +287,7 @@ TEST_F(TestObStandbyReadTransfer, trans_check_for_standby_transfer)
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_collect_state_resp(collect_state_resp));
ASSERT_EQ(5, coord.state_info_array_.count());
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp, snapshot));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_ask_state_resp(resp));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
@ -262,7 +303,7 @@ TEST_F(TestObStandbyReadTransfer, trans_check_for_standby_transfer)
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_collect_state_resp(collect_state_resp));
ASSERT_EQ(5, coord.state_info_array_.count());
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp, snapshot));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_ask_state_resp(resp));
ASSERT_EQ(OB_SUCCESS, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(true, can_read);
@ -270,6 +311,81 @@ TEST_F(TestObStandbyReadTransfer, trans_check_for_standby_transfer)
ASSERT_EQ(compute_prepare_version, trans_version);
}
TEST_F(TestObStandbyReadTransfer, trans_init_state)
{
TRANS_LOG(INFO, "called", "func", test_info_->name());
SCN snapshot;
snapshot.convert_for_tx(100);
SCN compute_prepare_version;
SCN max_decided_scn;
bool can_read = false;
SCN trans_version = SCN::min_scn();
ObStateInfo state_info;
ObAskStateRespMsg resp;
share::ObLSID coord_ls = share::ObLSID(1);
share::ObLSID part1_ls = share::ObLSID(1001);
share::ObLSID part2_ls = share::ObLSID(1002);
share::ObLSID part3_ls = share::ObLSID(1003);
MockObPartTransCtx coord(coord_ls);
MockObPartTransCtx part1(part1_ls), part2(part2_ls), part3(part3_ls);
ObTxCommitParts parts;
ASSERT_EQ(OB_SUCCESS, parts.push_back(ObTxExecPart(coord_ls, coord.epoch_, 0)));
ASSERT_EQ(OB_SUCCESS, parts.push_back(ObTxExecPart(part1_ls, part1.epoch_, 0)));
ASSERT_EQ(OB_SUCCESS, parts.push_back(ObTxExecPart(part2_ls, part2.epoch_, 0)));
ASSERT_EQ(OB_SUCCESS, parts.push_back(ObTxExecPart(part3_ls, part3.epoch_, 0)));
coord.set_2pc_participants_(parts);
part1.set_2pc_upstream_(coord_ls);
part2.set_2pc_upstream_(coord_ls);
part3.set_2pc_upstream_(coord_ls);
share::ObLSID part_transfer_ls = share::ObLSID(1004);
MockObPartTransCtx transfer_part(part_transfer_ls);
transfer_part.set_2pc_upstream_(part1_ls);
ASSERT_EQ(OB_SUCCESS, transfer_part.exec_info_.transfer_parts_.push_back(ObTxExecPart(part1_ls, -1, 1)));
ObTxCommitParts transfer_parts;
ASSERT_EQ(OB_SUCCESS, transfer_parts.push_back(ObTxExecPart(part_transfer_ls, -1, 1)));
part1.set_2pc_participants_(transfer_parts);
TRANS_LOG(INFO, "test1:OB_ERR_SHARED_LOCK_CONFLICT with unknown prepare version");
state_info.snapshot_version_ = snapshot;
coord.set_downstream_state(ObTxState::PREPARE);
coord.exec_info_.prepare_version_.convert_for_tx(10);
part1.set_downstream_state(ObTxState::INIT);
// part1.exec_info_.prepare_version_.convert_for_tx(20);
part2.set_downstream_state(ObTxState::PREPARE);
part2.exec_info_.prepare_version_.convert_for_tx(30);
part3.set_downstream_state(ObTxState::PREPARE);
part3.exec_info_.prepare_version_.convert_for_tx(40);
transfer_part.set_downstream_state(ObTxState::PREPARE);
transfer_part.exec_info_.prepare_version_.convert_for_tx(50);
transfer_part.handle_trans_ask_state(snapshot, resp, snapshot);
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, transfer_part.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(false, can_read);
max_decided_scn.convert_for_tx(50);
part1.handle_trans_ask_state(snapshot, resp, max_decided_scn);
ASSERT_EQ(1, resp.state_info_array_.count());
transfer_part.handle_trans_ask_state_resp(resp);
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, transfer_part.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(false, can_read);
max_decided_scn.convert_for_tx(101);
part1.handle_trans_ask_state(snapshot, resp, max_decided_scn);
ASSERT_EQ(1, resp.state_info_array_.count());
transfer_part.handle_trans_ask_state_resp(resp);
ASSERT_EQ(OB_SUCCESS, transfer_part.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(false, can_read);
}
}//end of unittest
}//end of oceanbase