[master][tx-route] do not abort tx if txn has explict savepoint or serializable snapshot
This commit is contained in:
@ -915,7 +915,7 @@ int ObVariableSetExecutor::process_session_autocommit_hook(ObExecContext &exec_c
|
|||||||
LOG_WARN("session is NULL", K(ret));
|
LOG_WARN("session is NULL", K(ret));
|
||||||
} else {
|
} else {
|
||||||
auto tx_desc = my_session->get_tx_desc();
|
auto tx_desc = my_session->get_tx_desc();
|
||||||
bool in_trans = OB_NOT_NULL(tx_desc) && tx_desc->in_tx_or_has_state();
|
bool in_trans = OB_NOT_NULL(tx_desc) && tx_desc->in_tx_or_has_extra_state();
|
||||||
if (OB_FAIL(my_session->get_autocommit(orig_ac))) {
|
if (OB_FAIL(my_session->get_autocommit(orig_ac))) {
|
||||||
LOG_WARN("fail to get autocommit", K(ret));
|
LOG_WARN("fail to get autocommit", K(ret));
|
||||||
} else if (OB_FAIL(val.get_int(autocommit))) {
|
} else if (OB_FAIL(val.get_int(autocommit))) {
|
||||||
|
|||||||
@ -539,7 +539,7 @@ int ObBasicSessionInfo::switch_tenant(uint64_t effective_tenant_id)
|
|||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid tenant_id", K(ret), K(effective_tenant_id));
|
LOG_WARN("invalid tenant_id", K(ret), K(effective_tenant_id));
|
||||||
} else if (OB_NOT_NULL(tx_desc_) && effective_tenant_id != effective_tenant_id_) {
|
} else if (OB_NOT_NULL(tx_desc_) && effective_tenant_id != effective_tenant_id_) {
|
||||||
if (tx_desc_->in_tx_or_has_state()) {
|
if (tx_desc_->in_tx_or_has_extra_state()) {
|
||||||
ret = OB_NOT_SUPPORTED;
|
ret = OB_NOT_SUPPORTED;
|
||||||
// only inner-SQL goes switch_tenant and may fall into such state
|
// only inner-SQL goes switch_tenant and may fall into such state
|
||||||
// print out error to easy trouble-shot
|
// print out error to easy trouble-shot
|
||||||
|
|||||||
@ -434,17 +434,19 @@ void ObTxDesc::print_trace()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ObTxDesc::in_tx_or_has_state()
|
bool ObTxDesc::in_tx_or_has_extra_state()
|
||||||
{
|
{
|
||||||
ObSpinLockGuard guard(lock_);
|
ObSpinLockGuard guard(lock_);
|
||||||
return in_tx_or_has_state_();
|
return in_tx_or_has_extra_state_();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ObTxDesc::in_tx_or_has_state_()
|
bool ObTxDesc::in_tx_or_has_extra_state_() const
|
||||||
{
|
{
|
||||||
if (is_in_tx()) {
|
return is_in_tx() || has_extra_state_();
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool ObTxDesc::has_extra_state_() const
|
||||||
|
{
|
||||||
if (snapshot_version_.is_valid()) {
|
if (snapshot_version_.is_valid()) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -466,7 +468,7 @@ bool ObTxDesc::in_tx_for_free_route()
|
|||||||
bool ObTxDesc::in_tx_for_free_route_()
|
bool ObTxDesc::in_tx_for_free_route_()
|
||||||
{
|
{
|
||||||
return (addr_.is_valid() && (addr_ != GCONF.self_addr_)) // txn free route temporary node
|
return (addr_.is_valid() && (addr_ != GCONF.self_addr_)) // txn free route temporary node
|
||||||
|| in_tx_or_has_state_();
|
|| in_tx_or_has_extra_state_();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ObTxDesc::contain_savepoint(const ObString &sp)
|
bool ObTxDesc::contain_savepoint(const ObString &sp)
|
||||||
|
|||||||
@ -481,7 +481,8 @@ private:
|
|||||||
int update_parts_(const ObTxPartList &list);
|
int update_parts_(const ObTxPartList &list);
|
||||||
void implicit_start_tx_();
|
void implicit_start_tx_();
|
||||||
bool acq_commit_cb_lock_if_need_();
|
bool acq_commit_cb_lock_if_need_();
|
||||||
bool in_tx_or_has_state_();
|
bool has_extra_state_() const;
|
||||||
|
bool in_tx_or_has_extra_state_() const;
|
||||||
bool in_tx_for_free_route_();
|
bool in_tx_for_free_route_();
|
||||||
void print_trace_() const;
|
void print_trace_() const;
|
||||||
public:
|
public:
|
||||||
@ -602,7 +603,7 @@ public:
|
|||||||
bool is_in_tx() const { return state_ > State::IDLE; }
|
bool is_in_tx() const { return state_ > State::IDLE; }
|
||||||
bool is_tx_active() const { return state_ >= State::ACTIVE && state_ < State::IN_TERMINATE; }
|
bool is_tx_active() const { return state_ >= State::ACTIVE && state_ < State::IN_TERMINATE; }
|
||||||
void print_trace();
|
void print_trace();
|
||||||
bool in_tx_or_has_state();
|
bool in_tx_or_has_extra_state();
|
||||||
bool in_tx_for_free_route();
|
bool in_tx_for_free_route();
|
||||||
const ObTransID &get_tx_id() const { return tx_id_; }
|
const ObTransID &get_tx_id() const { return tx_id_; }
|
||||||
ObITxCallback *get_end_tx_cb() { return commit_cb_; }
|
ObITxCallback *get_end_tx_cb() { return commit_cb_; }
|
||||||
|
|||||||
@ -1024,7 +1024,10 @@ int ObTransService::rollback_to_global_implicit_savepoint_(ObTxDesc &tx,
|
|||||||
break;
|
break;
|
||||||
case ObTxDesc::State::IMPLICIT_ACTIVE:
|
case ObTxDesc::State::IMPLICIT_ACTIVE:
|
||||||
tx.release_implicit_savepoint(savepoint);
|
tx.release_implicit_savepoint(savepoint);
|
||||||
if (!tx.flags_.REPLICA_ && !tx.has_implicit_savepoint() && tx.active_scn_ >= savepoint) {
|
if (!tx.flags_.REPLICA_ // on tx start node
|
||||||
|
&& !tx.has_implicit_savepoint() // to first savepoint
|
||||||
|
&& tx.active_scn_ >= savepoint // rollback all dirty state
|
||||||
|
&& !tx.has_extra_state_()) { // hasn't explicit savepoint or serializable snapshot
|
||||||
reset_tx = true;
|
reset_tx = true;
|
||||||
} else {
|
} else {
|
||||||
normal_rollback = true;
|
normal_rollback = true;
|
||||||
@ -1091,6 +1094,7 @@ int ObTransService::rollback_to_global_implicit_savepoint_(ObTxDesc &tx,
|
|||||||
OB_Y(ret), OB_Y(savepoint), OB_Y(expire_ts),
|
OB_Y(ret), OB_Y(savepoint), OB_Y(expire_ts),
|
||||||
OB_ID(time_used), elapsed_us,
|
OB_ID(time_used), elapsed_us,
|
||||||
OB_ID(arg), (void*)extra_touched_ls,
|
OB_ID(arg), (void*)extra_touched_ls,
|
||||||
|
OB_ID(tag1), reset_tx,
|
||||||
OB_ID(opid), tx.op_sn_,
|
OB_ID(opid), tx.op_sn_,
|
||||||
OB_ID(ref), tx.get_ref(),
|
OB_ID(ref), tx.get_ref(),
|
||||||
OB_ID(thread_id), GETTID());
|
OB_ID(thread_id), GETTID());
|
||||||
|
|||||||
Reference in New Issue
Block a user