fix delay callback scheduler in clear phase

This commit is contained in:
chinaxing 2023-09-25 02:40:07 +00:00 committed by ob-robot
parent b8f2ed8666
commit 223cf90c93
5 changed files with 48 additions and 48 deletions

View File

@ -104,6 +104,7 @@ public:
cluster_version_(0), ls_tx_ctx_mgr_(NULL),
session_id_(UINT32_MAX),
stc_(0), part_trans_action_(ObPartTransAction::UNKNOWN),
callback_scheduler_on_clear_(false),
pending_callback_param_(common::OB_SUCCESS), p_mt_ctx_(NULL),
is_exiting_(false), for_replay_(false),
has_pending_callback_(false),
@ -260,6 +261,8 @@ protected:
// the variable is used to record the action of the current transaction in the stmt execution
int64_t part_trans_action_;
ObTxCommitCallback commit_cb_;
// [only used by mysqltest]: will callback scheduler when clear log is persistented
bool callback_scheduler_on_clear_;
ObTransNeedWaitWrap trans_need_wait_wrap_;
int pending_callback_param_;
// it is used to wake up the lock queue after submitting the log of elr trans

View File

@ -280,6 +280,7 @@ void ObPartTransCtx::default_init_()
// lock_.reset();
stc_.reset();
commit_cb_.reset();
callback_scheduler_on_clear_ = false;
pending_callback_param_ = OB_SUCCESS;
trans_need_wait_wrap_.reset();
is_exiting_ = false;

View File

@ -251,23 +251,19 @@ int ObPartTransCtx::do_commit()
int ObPartTransCtx::check_and_response_scheduler_(ObTxState next_phase, int result)
{
int ret = OB_SUCCESS;
ret = OB_E(EventTable::EN_EARLY_RESPONSE_SCHEDULER) OB_SUCCESS;
if (!is_sub2pc() && OB_FAIL(ret)) {
// when error inject, response scheduler delayed to CLEAR state
if (ObTxState::CLEAR == next_phase) {
if (REACH_TIME_INTERVAL(1000 * 1000)) {
TRANS_LOG(INFO, "response scheduler in clear state", K(ret), K(*this));
}
ret = OB_SUCCESS;
} else {
TRANS_LOG(INFO, "response scheduler in 2pc", K(ret), K(result), KPC(this));
return OB_SUCCESS;
}
} else {
// general path, won't response scheduler in CLEAR state
if (ObTxState::CLEAR == next_phase) {
return OB_SUCCESS;
}
// when error inject, response scheduler delayed to CLEAR state
int inject_err = OB_E(EventTable::EN_EARLY_RESPONSE_SCHEDULER) OB_SUCCESS;
if (!is_sub2pc()
&& inject_err != OB_SUCCESS
&& next_phase != ObTxState::CLEAR
&& !callback_scheduler_on_clear_) {
callback_scheduler_on_clear_ = true;
return OB_SUCCESS;
}
if (callback_scheduler_on_clear_ && ObTxState::CLEAR != next_phase) {
// delayed, skip other state
return OB_SUCCESS;
}
if (is_sub2pc()) {

View File

@ -559,7 +559,7 @@ namespace transaction
|| (20 <= msg_type && 22 >= msg_type)
|| (40 <= msg_type && 49 >= msg_type)
|| (50 <= msg_type && 53 >= msg_type)
|| (60 <= msg_type && 66 >= msg_type));
|| (60 <= msg_type && 67 >= msg_type));
}
static bool is_2pc_msg_type(const int16_t msg_type)

View File

@ -892,36 +892,36 @@ TEST_F(ObTestTxFreeRoute, upgrade_to_4_1)
EX_COMMIT_TX();
}
TEST_F(ObTestTxFreeRoute, twiddle_knob_on_the_fly)
{
TXFR_TEST_SETUP("127.0.0.1", "127.0.0.2", "127.0.0.3");
// previous is on
EX_START_TX(1);
EX_WRITE(102,2);
EX_WRITE(103,2);
EX_COMMIT_TX();
omt::the_ctrl_of_enable_transaction_free_route = false;
// off -> on
EX_START_TX(1);
A_T(proxy.in_txn_);
A_F(proxy.can_free_route_);
EX_WRITE(100, 1); // on server 2
omt::the_ctrl_of_enable_transaction_free_route = true;
A_T(proxy.in_txn_);
A_F(proxy.can_free_route_);
EX_WRITE(101, 1); // on server 2
EX_COMMIT_TX();
// on -> off
EX_START_TX(1);
A_T(proxy.in_txn_);
A_T(proxy.can_free_route_);
EX_WRITE(100, 1); // on server 1
omt::the_ctrl_of_enable_transaction_free_route = false;
A_T(proxy.in_txn_);
A_T(proxy.can_free_route_);
EX_WRITE(101, 1); // on server 2
EX_COMMIT_TX();
}
// TEST_F(ObTestTxFreeRoute, twiddle_knob_on_the_fly)
// {
// TXFR_TEST_SETUP("127.0.0.1", "127.0.0.2", "127.0.0.3");
// // previous is on
// EX_START_TX(1);
// EX_WRITE(102,2);
// EX_WRITE(103,2);
// EX_COMMIT_TX();
// omt::the_ctrl_of_enable_transaction_free_route = false;
// // off -> on
// EX_START_TX(1);
// A_T(proxy.in_txn_);
// A_F(proxy.can_free_route_);
// EX_WRITE(100, 1); // on server 2
// omt::the_ctrl_of_enable_transaction_free_route = true;
// A_T(proxy.in_txn_);
// A_F(proxy.can_free_route_);
// EX_WRITE(101, 1); // on server 2
// EX_COMMIT_TX();
// // on -> off
// EX_START_TX(1);
// A_T(proxy.in_txn_);
// A_T(proxy.can_free_route_);
// EX_WRITE(100, 1); // on server 1
// omt::the_ctrl_of_enable_transaction_free_route = false;
// A_T(proxy.in_txn_);
// A_T(proxy.can_free_route_);
// EX_WRITE(101, 1); // on server 2
// EX_COMMIT_TX();
// }
TEST_F(ObTestTxFreeRoute, sample)