integrated test for participant failure notify scheduler

This commit is contained in:
chinaxing 2024-05-23 08:21:52 +00:00 committed by ob-robot
parent 42bf26214d
commit e98a77be79
4 changed files with 116 additions and 1 deletions

View File

@ -2464,6 +2464,59 @@ TEST_F(ObTestTx, rollback_with_branch_savepoint)
ASSERT_EQ(OB_ENTRY_NOT_EXIST, n1->read(tx, 206, val));
ROLLBACK_TX(n1, tx);
}
#define TEST_MARK_ABORT_AND_COMMIT(FLG) \
TEST_F(ObTestTx, commit_tx_sanity_check_flag_ ## FLG) \
{ \
START_ONE_TX_NODE(n1); \
PREPARE_TX(n1, tx); \
PREPARE_TX_PARAM(tx_param); \
CREATE_IMPLICIT_SAVEPOINT(n1, tx, tx_param, global_sp1); \
ASSERT_EQ(n1->write(tx, 1, 1), OB_SUCCESS); \
ASSERT_EQ(tx.state_, ObTxDesc::State::IMPLICIT_ACTIVE); \
tx.flags_.FLG = true; \
const int commit_ret = COMMIT_TX(n1, tx, 50000); \
EXPECT_EQ(commit_ret, OB_TRANS_ROLLBACKED); \
}
TEST_MARK_ABORT_AND_COMMIT(PART_ABORTED_)
TEST_MARK_ABORT_AND_COMMIT(PART_EPOCH_MISMATCH_)
TEST_MARK_ABORT_AND_COMMIT(PARTS_INCOMPLETE_)
#undef _MARK_ABORT_AND_COMMIT
TEST_F(ObTestTx, participant_abort_asynchronously)
{
START_TWO_TX_NODE(n1, n2);
PREPARE_TX(n1, tx);
PREPARE_TX_PARAM(tx_param);
CREATE_IMPLICIT_SAVEPOINT(n1, tx, tx_param, global_sp1);
ASSERT_EQ(n1->write(tx, 1, 1), OB_SUCCESS);
ASSERT_EQ(n2->write(tx, 2, 2), OB_SUCCESS);
// n1 switch to follower forcedly, then switch to leader
FLUSH_REDO(n1);
n1->wait_tx_log_synced();
SWITCH_TO_FOLLOWER_FORCEDLY(n1);
SWITCH_TO_LEADER(n1);
// check received participant aborted notify from n1
n1->wait_all_msg_consumed();
ASSERT_TRUE(tx.flags_.PART_ABORTED_);
ASSERT_EQ(tx.abort_cause_, ObTxAbortCause::PARTICIPANT_SWITCH_LEADER_DATA_INCOMPLETE);
share::ObLSArray extra_touched_ls;
extra_touched_ls.push_back(ObLSID(111));
ASSERT_EQ(ROLLBACK_TO_IMPLICIT_SAVEPOINT_X(n1, tx, global_sp1, 2000, &extra_touched_ls),
OB_TRANS_NEED_ROLLBACK);
ASSERT_EQ(tx.state_, ObTxDesc::State::ABORTED);
bool found_touched_ls_id_in_participant_set = false;
for (int i = 0; i< tx.parts_.count(); i++) {
if (tx.parts_[i].id_.id() == 111) {
found_touched_ls_id_in_participant_set = true;
}
}
ASSERT_TRUE(found_touched_ls_id_in_participant_set);
const int commit_ret = COMMIT_TX(n1, tx, 5000);
EXPECT_EQ(commit_ret, OB_TRANS_ROLLBACKED);
}
////
/// APPEND NEW TEST HERE, USE PRE DEFINED MACRO IN FILE `test_tx.dsl`
/// SEE EXAMPLE: TEST_F(ObTestTx, rollback_savepoint_timeout)

View File

@ -60,6 +60,9 @@
#define ROLLBACK_TO_IMPLICIT_SAVEPOINT(n1, tx, sp, timeout_us) \
n1->rollback_to_implicit_savepoint(tx, sp, n1->ts_after_us(timeout_us), nullptr)
#define ROLLBACK_TO_IMPLICIT_SAVEPOINT_X(n1, tx, sp, timeout_us, extra_touched_ls) \
n1->rollback_to_implicit_savepoint(tx, sp, n1->ts_after_us(timeout_us), extra_touched_ls)
#define INJECT_LINK_FAILURE(n1, n2) \
ASSERT_EQ(OB_SUCCESS, bus_.inject_link_failure(n1->addr_, n2->addr_)); \
LOG_INFO("##JINECT_LINK_FAILURE##", K(n1->addr_), K(n2->addr_));
@ -92,3 +95,39 @@
#define COMMIT_TX(n1, tx, timeout_us) \
n1->commit_tx(tx, n1->ts_after_us(timeout_us));
#define FLUSH_REDO(n1) \
do { \
ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL; \
ASSERT_EQ(n1->txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(n1->ls_id_, ls_tx_ctx_mgr), OB_SUCCESS); \
ObTransID fail_tx_id; \
ASSERT_EQ(ls_tx_ctx_mgr->traverse_tx_to_submit_redo_log(fail_tx_id, UINT32_MAX), OB_SUCCESS); \
} while(0)
#define SWITCH_TO_FOLLOWER_FORCEDLY(n1) \
do { \
ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL; \
ASSERT_EQ(n1->txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(n1->ls_id_, ls_tx_ctx_mgr), OB_SUCCESS); \
ASSERT_EQ(ls_tx_ctx_mgr->switch_to_follower_forcedly(), OB_SUCCESS); \
} while(0)
#define SWITCH_TO_FOLLOWER_GRACEFULLY(n1) \
do { \
ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL; \
ASSERT_EQ(n1->txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(n1->ls_id_, ls_tx_ctx_mgr), OB_SUCCESS); \
ASSERT_EQ(ls_tx_ctx_mgr->switch_to_follower_gracefully(), OB_SUCCESS); \
} while(0)
#define SWITCH_TO_LEADER(n1) \
do { \
ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL; \
ASSERT_EQ(n1->txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(n1->ls_id_, ls_tx_ctx_mgr), OB_SUCCESS); \
ASSERT_EQ(ls_tx_ctx_mgr->switch_to_leader(), OB_SUCCESS); \
/* wait state to LEADER */ \
while (!ls_tx_ctx_mgr->is_master()) { \
if (REACH_TIME_INTERVAL(500_ms)) { \
TRANS_LOG(INFO, "wait LS TxCtxMgr to be leader", KPC(ls_tx_ctx_mgr)); \
} \
usleep(10_ms); \
} \
} while(0)

View File

@ -269,6 +269,26 @@ void ObTxNode::dump_msg_queue_()
}
}
void ObTxNode::wait_all_msg_consumed()
{
while (msg_queue_.size() > 0 || !msg_consumer_.is_idle()) {
if (REACH_TIME_INTERVAL(200_ms)) {
TRANS_LOG(INFO, "wait msg_queue to be empty", K(msg_queue_.size()), KPC(this));
}
usleep(5_ms);
}
}
void ObTxNode::wait_tx_log_synced()
{
while(fake_tx_log_adapter_->get_inflight_cnt() > 0) {
if (REACH_TIME_INTERVAL(200_ms)) {
TRANS_LOG(INFO, "wait tx log synced...", K(fake_tx_log_adapter_->get_inflight_cnt()), KPC(this));
}
usleep(5_ms);
}
}
ObTxNode::~ObTxNode() __attribute__((optnone)) {
int ret = OB_SUCCESS;
TRANS_LOG(INFO, "destroy TxNode", KPC(this));

View File

@ -89,6 +89,7 @@ public:
}
void wakeup() { if (ATOMIC_BCAS(&is_sleeping_, true, false)) { cond_.signal(); } }
void set_name(ObString &name) { name_ = name; }
bool is_idle() { return ATOMIC_LOAD(&is_sleeping_); }
TO_STRING_KV(KP(this), K_(name), KP_(queue), K(queue_->size()), K_(stop));
private:
ObString name_;
@ -264,11 +265,11 @@ public:
// helpers
int64_t ts_after_us(int64_t d) const { return ObTimeUtility::current_time() + d; }
int64_t ts_after_ms(int64_t d) const { return ObTimeUtility::current_time() + d * 1000; }
private:
static void reset_localtion_adapter() {
get_location_adapter_().reset();
}
public:
void add_drop_msg_type(TX_MSG_TYPE type) {
drop_msg_type_set_.set_refactored(type);
@ -276,6 +277,8 @@ public:
void del_drop_msg_type(TX_MSG_TYPE type) {
drop_msg_type_set_.erase_refactored(type);
}
void wait_all_msg_consumed();
void wait_tx_log_synced();
public:
ObString name_; char name_buf_[32];
ObAddr addr_;