[master]fix interrupt rollback stmt hang
This commit is contained in:
@ -527,6 +527,7 @@ int ObSqlTransControl::start_stmt(ObExecContext &exec_ctx)
|
|||||||
OX (session_id = session->get_sessid());
|
OX (session_id = session->get_sessid());
|
||||||
OX (tx_desc = session->get_tx_desc());
|
OX (tx_desc = session->get_tx_desc());
|
||||||
OX (is_plain_select = plan->is_plain_select());
|
OX (is_plain_select = plan->is_plain_select());
|
||||||
|
OX (tx_desc->clear_interrupt());
|
||||||
if (OB_SUCC(ret) && !is_plain_select) {
|
if (OB_SUCC(ret) && !is_plain_select) {
|
||||||
OZ (stmt_setup_savepoint_(session, das_ctx, plan_ctx, txs, nested_level), session_id, *tx_desc);
|
OZ (stmt_setup_savepoint_(session, das_ctx, plan_ctx, txs, nested_level), session_id, *tx_desc);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -660,6 +660,7 @@ LST_DO(DEF_FREE_ROUTE_DECODE, (;), static, dynamic, parts, extra);
|
|||||||
bool is_parts_changed() { return state_change_flags_.PARTS_CHANGED_; };
|
bool is_parts_changed() { return state_change_flags_.PARTS_CHANGED_; };
|
||||||
bool is_extra_changed() { return state_change_flags_.EXTRA_CHANGED_; };
|
bool is_extra_changed() { return state_change_flags_.EXTRA_CHANGED_; };
|
||||||
void set_explicit() { flags_.EXPLICIT_ = true; }
|
void set_explicit() { flags_.EXPLICIT_ = true; }
|
||||||
|
void clear_interrupt() { flags_.INTERRUPTED_ = false; }
|
||||||
};
|
};
|
||||||
|
|
||||||
// Is used to store and travserse all TxScheduler's Stat information;
|
// Is used to store and travserse all TxScheduler's Stat information;
|
||||||
|
|||||||
@ -819,8 +819,8 @@ int ObTransService::interrupt(ObTxDesc &tx, int cause)
|
|||||||
bool busy_wait = false;
|
bool busy_wait = false;
|
||||||
{
|
{
|
||||||
ObSpinLockGuard guard(tx.lock_);
|
ObSpinLockGuard guard(tx.lock_);
|
||||||
|
tx.flags_.INTERRUPTED_ = true;
|
||||||
if (tx.flags_.BLOCK_) {
|
if (tx.flags_.BLOCK_) {
|
||||||
tx.flags_.INTERRUPTED_ = true;
|
|
||||||
TRANS_LOG(INFO, "will busy wait tx quit from block state", K(tx));
|
TRANS_LOG(INFO, "will busy wait tx quit from block state", K(tx));
|
||||||
busy_wait = true;
|
busy_wait = true;
|
||||||
}
|
}
|
||||||
@ -1545,6 +1545,7 @@ int ObTransService::sync_acquire_global_snapshot_(ObTxDesc &tx,
|
|||||||
[&]() -> bool { return tx.flags_.INTERRUPTED_; });
|
[&]() -> bool { return tx.flags_.INTERRUPTED_; });
|
||||||
tx.lock_.lock();
|
tx.lock_.lock();
|
||||||
bool interrupted = tx.flags_.INTERRUPTED_;
|
bool interrupted = tx.flags_.INTERRUPTED_;
|
||||||
|
tx.clear_interrupt();
|
||||||
tx.flags_.BLOCK_ = false;
|
tx.flags_.BLOCK_ = false;
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
if (op_sn != tx.op_sn_) {
|
if (op_sn != tx.op_sn_) {
|
||||||
|
|||||||
@ -1511,6 +1511,11 @@ inline int ObTransService::rollback_savepoint_slowpath_(ObTxDesc &tx,
|
|||||||
}
|
}
|
||||||
int64_t start_ts = ObTimeUtility::current_time();
|
int64_t start_ts = ObTimeUtility::current_time();
|
||||||
int retries = 0;
|
int retries = 0;
|
||||||
|
if (OB_UNLIKELY(tx.flags_.INTERRUPTED_)) {
|
||||||
|
ret = OB_ERR_INTERRUPTED;
|
||||||
|
tx.clear_interrupt();
|
||||||
|
TRANS_LOG(WARN, "interrupted", K(ret), K(tx));
|
||||||
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
// setup state before release lock
|
// setup state before release lock
|
||||||
auto save_state = tx.state_;
|
auto save_state = tx.state_;
|
||||||
@ -1529,7 +1534,7 @@ inline int ObTransService::rollback_savepoint_slowpath_(ObTxDesc &tx,
|
|||||||
// mask_set need clear
|
// mask_set need clear
|
||||||
tx.brpc_mask_set_.reset();
|
tx.brpc_mask_set_.reset();
|
||||||
// clear interrupt flag
|
// clear interrupt flag
|
||||||
tx.flags_.INTERRUPTED_ = false;
|
tx.clear_interrupt();
|
||||||
}
|
}
|
||||||
if (OB_NOT_NULL(tmp_tx_desc)) {
|
if (OB_NOT_NULL(tmp_tx_desc)) {
|
||||||
msg.tx_ptr_ = NULL;
|
msg.tx_ptr_ = NULL;
|
||||||
|
|||||||
Reference in New Issue
Block a user