[master] fix concurrent callback sql with tx-participant callback scheduler
This commit is contained in:
@ -710,6 +710,11 @@ inline bool ObTxDesc::acq_commit_cb_lock_if_need_()
|
|||||||
bool ObTxDesc::execute_commit_cb()
|
bool ObTxDesc::execute_commit_cb()
|
||||||
{
|
{
|
||||||
bool executed = false;
|
bool executed = false;
|
||||||
|
/*
|
||||||
|
* load_acquire state_ and commit_out_
|
||||||
|
* pair with ObTransService::handle_tx_commit_result_
|
||||||
|
*/
|
||||||
|
ATOMIC_LOAD_ACQ((int*)&state_);
|
||||||
if (is_tx_end() || is_xa_terminate_state_()) {
|
if (is_tx_end() || is_xa_terminate_state_()) {
|
||||||
auto tx_id = tx_id_;
|
auto tx_id = tx_id_;
|
||||||
auto cb = commit_cb_;
|
auto cb = commit_cb_;
|
||||||
|
|||||||
@ -344,7 +344,7 @@ protected:
|
|||||||
|
|
||||||
uint64_t op_sn_; // Tx level operation sequence No
|
uint64_t op_sn_; // Tx level operation sequence No
|
||||||
|
|
||||||
enum class State // State of Tx
|
enum class State : int // State of Tx
|
||||||
{
|
{
|
||||||
INVL,
|
INVL,
|
||||||
IDLE, // created
|
IDLE, // created
|
||||||
|
|||||||
@ -429,6 +429,7 @@ int ObTransService::handle_tx_commit_result_(ObTxDesc &tx,
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
bool commit_fin = true;
|
bool commit_fin = true;
|
||||||
|
ObTxDesc::State state = ObTxDesc::State::INVL;
|
||||||
int commit_out = OB_SUCCESS;
|
int commit_out = OB_SUCCESS;
|
||||||
switch (result) {
|
switch (result) {
|
||||||
case OB_EAGAIN:
|
case OB_EAGAIN:
|
||||||
@ -465,23 +466,23 @@ int ObTransService::handle_tx_commit_result_(ObTxDesc &tx,
|
|||||||
break;
|
break;
|
||||||
case OB_TRANS_COMMITED:
|
case OB_TRANS_COMMITED:
|
||||||
case OB_SUCCESS:
|
case OB_SUCCESS:
|
||||||
tx.state_ = ObTxDesc::State::COMMITTED;
|
state = ObTxDesc::State::COMMITTED;
|
||||||
tx.commit_version_ = commit_version;
|
tx.commit_version_ = commit_version;
|
||||||
commit_out = OB_SUCCESS;
|
commit_out = OB_SUCCESS;
|
||||||
break;
|
break;
|
||||||
case OB_TRANS_KILLED:
|
case OB_TRANS_KILLED:
|
||||||
case OB_TRANS_ROLLBACKED:
|
case OB_TRANS_ROLLBACKED:
|
||||||
tx.state_ = ObTxDesc::State::ROLLED_BACK;
|
state = ObTxDesc::State::ROLLED_BACK;
|
||||||
commit_out = result;
|
commit_out = result;
|
||||||
break;
|
break;
|
||||||
case OB_TRANS_TIMEOUT:
|
case OB_TRANS_TIMEOUT:
|
||||||
TX_STAT_TIMEOUT_INC
|
TX_STAT_TIMEOUT_INC
|
||||||
case OB_TRANS_STMT_TIMEOUT:
|
case OB_TRANS_STMT_TIMEOUT:
|
||||||
tx.state_ = ObTxDesc::State::COMMIT_TIMEOUT;
|
state = ObTxDesc::State::COMMIT_TIMEOUT;
|
||||||
commit_out = result;
|
commit_out = result;
|
||||||
break;
|
break;
|
||||||
case OB_TRANS_UNKNOWN:
|
case OB_TRANS_UNKNOWN:
|
||||||
tx.state_ = ObTxDesc::State::COMMIT_UNKNOWN;
|
state = ObTxDesc::State::COMMIT_UNKNOWN;
|
||||||
commit_out = result;
|
commit_out = result;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
@ -494,7 +495,12 @@ int ObTransService::handle_tx_commit_result_(ObTxDesc &tx,
|
|||||||
if (tx.finish_ts_ <= 0) { // maybe aborted early
|
if (tx.finish_ts_ <= 0) { // maybe aborted early
|
||||||
tx.finish_ts_ = ObClockGenerator::getClock();
|
tx.finish_ts_ = ObClockGenerator::getClock();
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
|
* store_release ObTxDesc::{commit_out_, state_}
|
||||||
|
* pair with ObTxDesc::execute_commit_cb
|
||||||
|
*/
|
||||||
tx.commit_out_ = commit_out;
|
tx.commit_out_ = commit_out;
|
||||||
|
ATOMIC_STORE_REL((int*)&tx.state_, (int)state);
|
||||||
if (tx.commit_task_.is_registered()) {
|
if (tx.commit_task_.is_registered()) {
|
||||||
if (OB_FAIL(unregister_commit_retry_task_(tx))) {
|
if (OB_FAIL(unregister_commit_retry_task_(tx))) {
|
||||||
TRANS_LOG(ERROR, "deregister timeout task fail", K(tx));
|
TRANS_LOG(ERROR, "deregister timeout task fail", K(tx));
|
||||||
|
|||||||
Reference in New Issue
Block a user