[TABLELOCK] fix several bugs about timeout in tablelock

This commit is contained in:
obdev 2024-02-10 05:34:25 +00:00 committed by ob-robot
parent 0b4699b3e9
commit 0c56e628e0
18 changed files with 220 additions and 161 deletions

View File

@ -406,7 +406,7 @@ public:
&ls_tx_ctx_mgr_,
tx_data, // ObTxData
NULL); // mailbox_mgr
store_ctx->mvcc_acc_ctx_.abs_lock_timeout_ = 0; // nowait
store_ctx->mvcc_acc_ctx_.abs_lock_timeout_ts_ = 0; // nowait
store_ctx->mvcc_acc_ctx_.tx_desc_ = tx_desc;
store_ctx->mvcc_acc_ctx_.tx_id_ = tx_id;
store_ctx->mvcc_acc_ctx_.tx_ctx_ = tx_ctx;
@ -438,7 +438,7 @@ public:
store_ctx->mvcc_acc_ctx_.snapshot_.version_ = snapshot_scn;
store_ctx->mvcc_acc_ctx_.snapshot_.scn_ = ObTxSEQ(ObSequence::get_max_seq_no());
const int64_t abs_expire_time = expire_time + ::oceanbase::common::ObTimeUtility::current_time();
store_ctx->mvcc_acc_ctx_.abs_lock_timeout_ = abs_expire_time;
store_ctx->mvcc_acc_ctx_.abs_lock_timeout_ts_ = abs_expire_time;
store_ctx->mvcc_acc_ctx_.tx_scn_ = ObTxSEQ(ObSequence::inc_and_get_max_seq_no());
}
void start_pdml_stmt(ObStoreCtx *store_ctx,
@ -452,7 +452,7 @@ public:
store_ctx->mvcc_acc_ctx_.snapshot_.version_ = snapshot_scn;
store_ctx->mvcc_acc_ctx_.snapshot_.scn_ = read_seq_no;
const int64_t abs_expire_time = expire_time + ::oceanbase::common::ObTimeUtility::current_time();
store_ctx->mvcc_acc_ctx_.abs_lock_timeout_ = abs_expire_time;
store_ctx->mvcc_acc_ctx_.abs_lock_timeout_ts_ = abs_expire_time;
store_ctx->mvcc_acc_ctx_.tx_scn_ = ObTxSEQ(ObSequence::inc_and_get_max_seq_no());
}
void print_callback(ObStoreCtx *wtx)
@ -3444,7 +3444,7 @@ TEST_F(TestMemtableV2, test_parallel_lock_with_same_txn)
wtx->mvcc_acc_ctx_.snapshot_.version_ = scn_1000;
wtx->mvcc_acc_ctx_.snapshot_.scn_ = read_seq_no;
const int64_t abs_expire_time = 10000000000 + ::oceanbase::common::ObTimeUtility::current_time();
wtx->mvcc_acc_ctx_.abs_lock_timeout_ = abs_expire_time;
wtx->mvcc_acc_ctx_.abs_lock_timeout_ts_ = abs_expire_time;
wtx->mvcc_acc_ctx_.tx_scn_ = ObTxSEQ(ObSequence::inc_and_get_max_seq_no());
ObTableAccessContext context;

View File

@ -650,7 +650,7 @@ TEST_F(TestLockMemtable, lock_twice_out)
ret = memtable_.lock(param,
store_ctx2,
lock_second);
ASSERT_EQ(OB_TRY_LOCK_ROW_CONFLICT, ret);
ASSERT_EQ(OB_ERR_EXCLUSIVE_LOCK_CONFLICT, ret);
// 2.1 update to complete
LOG_INFO("TestLockMemtable::lock_twice_out 2.1");
@ -687,7 +687,7 @@ TEST_F(TestLockMemtable, lock_twice_out)
ret = memtable_.lock(param,
store_ctx2,
lock_second);
ASSERT_EQ(OB_TRY_LOCK_ROW_CONFLICT, ret);
ASSERT_EQ(OB_ERR_EXCLUSIVE_LOCK_CONFLICT, ret);
// clean: unlock complete.
LOG_INFO("TestLockMemtable::lock_twice_out clean");
share::SCN min_commited_scn;

View File

@ -422,6 +422,7 @@ TEST_F(ObTableLockServiceTest, lock_part)
lock_arg.timeout_us_ = 0;
lock_arg.table_id_ = table_id;
lock_arg.part_object_id_ = part_ids[0];
lock_arg.is_from_sql_ = true;
ret = MTL(ObTableLockService*)->lock_partition(*tx_desc,
tx_param,
@ -433,7 +434,7 @@ TEST_F(ObTableLockServiceTest, lock_part)
ret = MTL(ObTableLockService*)->lock_table(table_id,
lock_mode,
OWNER_TWO);
ASSERT_EQ(OB_TRY_LOCK_ROW_CONFLICT, ret);
ASSERT_EQ(OB_EAGAIN, ret);
// 2. COMMIT
LOG_INFO("ObTableLockServiceTest::lock_part 2");
@ -449,7 +450,7 @@ TEST_F(ObTableLockServiceTest, lock_part)
ret = MTL(ObTableLockService*)->lock_table(table_id,
lock_mode,
OWNER_TWO);
ASSERT_EQ(OB_TRY_LOCK_ROW_CONFLICT, ret);
ASSERT_EQ(OB_EAGAIN, ret);
// 4. UNLOCK
LOG_INFO("ObTableLockServiceTest::unlock_part 4");
@ -638,6 +639,7 @@ TEST_F(ObTableLockServiceTest, in_trans_lock_table)
lock_arg.lock_mode_ = lock_mode;
lock_arg.op_type_ = IN_TRANS_COMMON_LOCK;
lock_arg.timeout_us_ = 0;
lock_arg.is_from_sql_ = true;
ret = MTL(ObTableLockService*)->lock_table(*tx_desc,
tx_param,
lock_arg);
@ -648,7 +650,7 @@ TEST_F(ObTableLockServiceTest, in_trans_lock_table)
ret = MTL(ObTableLockService*)->lock_table(table_id,
lock_mode,
OWNER_ONE);
ASSERT_EQ(OB_TRY_LOCK_ROW_CONFLICT, ret);
ASSERT_EQ(OB_EAGAIN, ret);
// 2. LOCK MULTI PART TABLE
// 2.1 lock multi part table
// lock upgrade
@ -660,6 +662,7 @@ TEST_F(ObTableLockServiceTest, in_trans_lock_table)
lock_arg.lock_mode_ = lock_mode;
lock_arg.op_type_ = IN_TRANS_COMMON_LOCK;
lock_arg.timeout_us_ = 0;
lock_arg.is_from_sql_ = true;
ret = MTL(ObTableLockService*)->lock_table(*tx_desc,
tx_param,
lock_arg);
@ -670,7 +673,7 @@ TEST_F(ObTableLockServiceTest, in_trans_lock_table)
ret = MTL(ObTableLockService*)->lock_table(table_id,
lock_mode,
OWNER_ONE);
ASSERT_EQ(OB_TRY_LOCK_ROW_CONFLICT, ret);
ASSERT_EQ(OB_EAGAIN, ret);
// 3. CLEAN
LOG_INFO("ObTableLockServiceTest::in_trans_lock_table 3");
const int64_t stmt_timeout_ts = ObTimeUtility::current_time() + 1000 * 1000;
@ -720,6 +723,7 @@ TEST_F(ObTableLockServiceTest, lock_out_trans_after_in_trans)
lock_arg.lock_mode_ = lock_mode;
lock_arg.op_type_ = IN_TRANS_COMMON_LOCK;
lock_arg.timeout_us_ = 0;
lock_arg.is_from_sql_ = true;
ret = MTL(ObTableLockService*)->lock_table(*tx_desc,
tx_param,
lock_arg);
@ -730,7 +734,7 @@ TEST_F(ObTableLockServiceTest, lock_out_trans_after_in_trans)
ret = MTL(ObTableLockService*)->lock_table(table_id,
lock_mode,
OWNER_ONE);
ASSERT_EQ(OB_TRY_LOCK_ROW_CONFLICT, ret);
ASSERT_EQ(OB_EAGAIN, ret);
// 1.3. commit lock
LOG_INFO("ObTableLockServiceTest::lock_out_trans_after_in_trans 1.3");
@ -767,6 +771,7 @@ TEST_F(ObTableLockServiceTest, lock_out_trans_after_in_trans)
lock_arg.lock_mode_ = lock_mode;
lock_arg.op_type_ = IN_TRANS_COMMON_LOCK;
lock_arg.timeout_us_ = 0;
lock_arg.is_from_sql_ = true;
ret = MTL(ObTableLockService*)->lock_table(*tx_desc,
tx_param,
lock_arg);
@ -780,6 +785,7 @@ TEST_F(ObTableLockServiceTest, lock_out_trans_after_in_trans)
lock_arg.lock_mode_ = lock_mode;
lock_arg.op_type_ = OUT_TRANS_LOCK;
lock_arg.timeout_us_ = 0;
lock_arg.is_from_sql_ = true;
ret = MTL(ObTableLockService*)->lock_table(*tx_desc,
tx_param,
lock_arg);
@ -791,7 +797,7 @@ TEST_F(ObTableLockServiceTest, lock_out_trans_after_in_trans)
ret = MTL(ObTableLockService*)->lock_table(table_id,
lock_mode,
OWNER_ONE);
ASSERT_EQ(OB_TRY_LOCK_ROW_CONFLICT, ret);
ASSERT_EQ(OB_EAGAIN, ret);
// 2.4 commit lock
LOG_INFO("ObTableLockServiceTest::lock_out_trans_after_in_trans 2.4");
@ -807,7 +813,7 @@ TEST_F(ObTableLockServiceTest, lock_out_trans_after_in_trans)
ret = MTL(ObTableLockService*)->lock_table(table_id,
lock_mode,
OWNER_ONE);
ASSERT_EQ(OB_TRY_LOCK_ROW_CONFLICT, ret);
ASSERT_EQ(OB_EAGAIN, ret);
// 2.6 unlock out_trans lock
lock_mode = ROW_EXCLUSIVE;
@ -868,6 +874,7 @@ TEST_F(ObTableLockServiceTest, in_trans_lock_obj)
lock_arg.timeout_us_ = 0;
lock_arg.obj_type_ = ObLockOBJType::OBJ_TYPE_COMMON_OBJ;
lock_arg.obj_id_ = obj_id1;
lock_arg.is_from_sql_ = true;
ret = MTL(ObTableLockService*)->lock_obj(*tx_desc1,
tx_param,
@ -882,6 +889,7 @@ TEST_F(ObTableLockServiceTest, in_trans_lock_obj)
lock_arg.timeout_us_ = 0;
lock_arg.obj_type_ = ObLockOBJType::OBJ_TYPE_COMMON_OBJ;
lock_arg.obj_id_ = obj_id1;
lock_arg.is_from_sql_ = true;
ret = MTL(ObTableLockService*)->lock_obj(*tx_desc2,
tx_param,
@ -897,11 +905,12 @@ TEST_F(ObTableLockServiceTest, in_trans_lock_obj)
lock_arg.timeout_us_ = 0;
lock_arg.obj_type_ = ObLockOBJType::OBJ_TYPE_COMMON_OBJ;
lock_arg.obj_id_ = obj_id1;
lock_arg.is_from_sql_ = true;
ret = MTL(ObTableLockService*)->lock_obj(*tx_desc3,
tx_param,
lock_arg);
ASSERT_EQ(OB_TRY_LOCK_ROW_CONFLICT, ret);
ASSERT_EQ(OB_ERR_EXCLUSIVE_LOCK_CONFLICT, ret);
LOG_INFO("ObTableLockServiceTest::in_trans_lock_obj 1.4");
const int64_t stmt_timeout_ts = ObTimeUtility::current_time() + 1000 * 1000;

View File

@ -6098,7 +6098,6 @@ int ObDDLService::lock_tablets(ObMySQLTransaction &trans,
timeout,
conn))) {
LOG_WARN("lock dest table failed", KR(ret), K(table_id), K(tenant_id));
ret = ObDDLUtil::is_table_lock_retry_ret_code(ret) ? OB_EAGAIN : ret;
}
}
}
@ -6132,7 +6131,6 @@ int ObDDLService::lock_table(ObMySQLTransaction &trans,
timeout,
conn))) {
LOG_WARN("lock dest table failed", KR(ret), K(table_schema));
ret = ObDDLUtil::is_table_lock_retry_ret_code(ret) ? OB_EAGAIN : ret;
}
}
return ret;
@ -6169,7 +6167,6 @@ int ObDDLService::lock_mview(ObMySQLTransaction &trans, const ObSimpleTableSchem
lock_arg.timeout_us_ = 0;
if (OB_FAIL(ObInnerConnectionLockUtil::lock_obj(tenant_id, lock_arg, conn))) {
LOG_WARN("fail to lock mview obj", KR(ret), K(tenant_id), K(lock_arg), KPC(conn));
ret = ObDDLUtil::is_table_lock_retry_ret_code(ret) ? OB_EAGAIN : ret;
}
}
return ret;

View File

@ -466,11 +466,6 @@ public:
static int clear_ddl_checksum(sql::ObPhysicalPlan *phy_plan);
static bool is_table_lock_retry_ret_code(int ret)
{
return OB_TRY_LOCK_ROW_CONFLICT == ret || OB_NOT_MASTER == ret || OB_TIMEOUT == ret
|| OB_EAGAIN == ret || OB_LS_LOCATION_LEADER_NOT_EXIST == ret || OB_TRANS_CTX_NOT_EXIST == ret;
}
static bool need_remote_write(const int ret_code);
static int check_can_convert_character(const ObObjMeta &obj_meta)

View File

@ -74,7 +74,6 @@ int ObDDLLock::lock_for_add_drop_index_in_trans(
LOG_WARN("failed to lock index table", K(ret));
}
}
ret = share::ObDDLUtil::is_table_lock_retry_ret_code(ret) ? OB_EAGAIN : ret;
return ret;
}
@ -132,7 +131,6 @@ int ObDDLLock::lock_for_add_drop_index(
}
}
}
ret = share::ObDDLUtil::is_table_lock_retry_ret_code(ret) ? OB_EAGAIN : ret;
return ret;
}
@ -187,7 +185,6 @@ int ObDDLLock::lock_for_add_lob_in_trans(
} else if (OB_FAIL(ObOnlineDDLLock::lock_tablets_in_trans(tenant_id, data_tablet_ids, ROW_EXCLUSIVE, timeout_us, trans))) {
LOG_WARN("failed to lock data table tablets", K(ret));
}
ret = share::ObDDLUtil::is_table_lock_retry_ret_code(ret) ? OB_EAGAIN : ret;
return ret;
}
@ -212,7 +209,6 @@ int ObDDLLock::lock_for_add_partition_in_trans(
} else {
LOG_INFO("skip ddl lock", K(ret), K(table_id));
}
ret = share::ObDDLUtil::is_table_lock_retry_ret_code(ret) ? OB_EAGAIN : ret;
return ret;
}
@ -242,7 +238,6 @@ int ObDDLLock::lock_for_drop_partition_in_trans(
} else {
LOG_INFO("skip ddl lock", K(ret), K(table_id));
}
ret = share::ObDDLUtil::is_table_lock_retry_ret_code(ret) ? OB_EAGAIN : ret;
return ret;
}
@ -262,7 +257,6 @@ int ObDDLLock::lock_for_common_ddl_in_trans(const ObTableSchema &table_schema, O
} else if (OB_FAIL(ObOnlineDDLLock::lock_table_in_trans(tenant_id, table_id, ROW_SHARE, timeout_us, trans))) {
LOG_WARN("failed to lock ddl table", K(ret));
}
ret = share::ObDDLUtil::is_table_lock_retry_ret_code(ret) ? OB_EAGAIN : ret;
return ret;
}
@ -282,7 +276,6 @@ int ObDDLLock::lock_for_common_ddl(
} else if (OB_FAIL(ObOnlineDDLLock::lock_table(tenant_id, table_id, ROW_SHARE, lock_owner, timeout_us, trans))) {
LOG_WARN("failed to lock ddl table", K(ret));
}
ret = share::ObDDLUtil::is_table_lock_retry_ret_code(ret) ? OB_EAGAIN : ret;
return ret;
}
@ -325,7 +318,6 @@ int ObDDLLock::lock_for_offline_ddl(
LOG_WARN("failed to check tablet in same ls", K(ret));
}
}
ret = share::ObDDLUtil::is_table_lock_retry_ret_code(ret) ? OB_EAGAIN : ret;
return ret;
}

View File

@ -3212,7 +3212,6 @@ int ObLSTabletService::lock_rows(
ObTabletHandle &tablet_handle,
ObStoreCtx &ctx,
const ObDMLBaseParam &dml_param,
const int64_t abs_lock_timeout,
const ObLockFlag lock_flag,
const bool is_sfu,
ObNewRowIterator *row_iter,
@ -3255,8 +3254,6 @@ int ObLSTabletService::lock_rows(
} else {
timeguard.click("GetIds");
run_ctx.column_ids_ = &column_ids;
ctx.mvcc_acc_ctx_.abs_lock_timeout_ =
ObTablet::get_lock_wait_timeout(abs_lock_timeout, dml_param.timeout_);
ObTabletHandle tmp_handle;
while (OB_SUCCESS == ret && OB_SUCC(row_iter->get_next_row(row))) {
// Let ObStorageTableGuard refresh retired memtable, should not hold origin tablet handle
@ -3300,7 +3297,6 @@ int ObLSTabletService::lock_row(
ObTabletHandle &tablet_handle,
ObStoreCtx &ctx,
const ObDMLBaseParam &dml_param,
const int64_t abs_lock_timeout,
const ObNewRow &row,
const ObLockFlag lock_flag,
const bool is_sfu)
@ -3329,8 +3325,6 @@ int ObLSTabletService::lock_row(
} else if (OB_FAIL(run_ctx.relative_table_.get_rowkey_column_ids(col_desc))) {
LOG_WARN("Fail to get column desc", K(ret));
} else {
ctx.mvcc_acc_ctx_.abs_lock_timeout_ =
ObTablet::get_lock_wait_timeout(abs_lock_timeout, dml_param.timeout_);
if (ObTimeUtility::current_time() > dml_param.timeout_) {
ret = OB_TIMEOUT;
int64_t cur_time = ObClockGenerator::getClock();

View File

@ -368,7 +368,6 @@ public:
ObTabletHandle &tablet_handle,
ObStoreCtx &ctx,
const ObDMLBaseParam &dml_param,
const int64_t abs_lock_timeout,
const ObLockFlag lock_flag,
const bool is_sfu,
ObNewRowIterator *row_iter,
@ -377,7 +376,6 @@ public:
ObTabletHandle &tablet_handle,
ObStoreCtx &ctx,
const ObDMLBaseParam &dml_param,
const int64_t abs_lock_timeout,
const ObNewRow &row,
const ObLockFlag lock_flag,
const bool is_sfu);

View File

@ -43,8 +43,8 @@ class ObMvccAccessCtx
public:
ObMvccAccessCtx()
: type_(T::INVL),
abs_lock_timeout_(-1),
tx_lock_timeout_(-1),
abs_lock_timeout_ts_(-1),
tx_lock_timeout_us_(-1),
snapshot_(),
tx_table_guards_(),
tx_id_(),
@ -59,8 +59,8 @@ public:
{}
~ObMvccAccessCtx() {
type_ = T::INVL;
abs_lock_timeout_ = -1;
tx_lock_timeout_ = -1;
abs_lock_timeout_ts_ = -1;
tx_lock_timeout_us_ = -1;
tx_id_.reset();
tx_desc_ = NULL;
tx_ctx_ = NULL;
@ -75,8 +75,8 @@ public:
warn_tx_ctx_leaky_();
}
type_ = T::INVL;
abs_lock_timeout_ = -1;
tx_lock_timeout_ = -1;
abs_lock_timeout_ts_ = -1;
tx_lock_timeout_us_ = -1;
snapshot_.reset();
tx_table_guards_.reset();
tx_id_.reset();
@ -98,7 +98,7 @@ public:
}
}
bool is_write_valid__() const {
return abs_lock_timeout_ >= 0
return abs_lock_timeout_ts_ >= 0
&& snapshot_.is_valid()
&& tx_ctx_
&& mem_ctx_
@ -112,7 +112,7 @@ public:
&& tx_id_.is_valid();
}
bool is_read_valid__() const {
return abs_lock_timeout_ >= 0
return abs_lock_timeout_ts_ >= 0
&& snapshot_.is_valid()
&& tx_table_guards_.is_valid()
&& (!tx_ctx_ || mem_ctx_);
@ -131,8 +131,8 @@ public:
mem_ctx_ = mem_ctx;
tx_table_guards_.tx_table_guard_ = tx_table_guard;
snapshot_ = snapshot;
abs_lock_timeout_ = abs_lock_timeout;
tx_lock_timeout_ = tx_lock_timeout;
abs_lock_timeout_ts_ = abs_lock_timeout;
tx_lock_timeout_us_ = tx_lock_timeout;
}
// light read, used by storage background merge/compaction routine
void init_read(const storage::ObTxTableGuard &tx_table_guard,
@ -164,8 +164,8 @@ public:
tx_desc_ = &tx_desc;
tx_table_guards_.tx_table_guard_ = tx_table_guard;
snapshot_ = snapshot;
abs_lock_timeout_ = abs_lock_timeout;
tx_lock_timeout_ = tx_lock_timeout;
abs_lock_timeout_ts_ = abs_lock_timeout;
tx_lock_timeout_us_ = tx_lock_timeout;
write_flag_ = write_flag;
}
@ -208,26 +208,23 @@ public:
bool is_replay() const { return type_ == T::REPLAY; }
int64_t eval_lock_expire_ts(int64_t lock_wait_start_ts = 0) const {
int64_t expire_ts = OB_INVALID_TIMESTAMP;
if (tx_lock_timeout_ >= 0) {
// Case 1: When tx_lock_timeout is bigger than 0, we use the minimum of
// the tx_lock_timeout plus remaining time(defined from system
// variable) and abs_lock_timeout(calcualted from select-for-update
// timeout).
// Case 2: When tx_lock_timeout is euqal to 0, we use the remaining time
// as timeout(And it must trigger timeout when write-write conflict)
lock_wait_start_ts = lock_wait_start_ts > 0 ?
lock_wait_start_ts : ObTimeUtility::current_time();
expire_ts = MIN(lock_wait_start_ts + tx_lock_timeout_, abs_lock_timeout_);
if (tx_lock_timeout_us_ >= 0) {
// Case 1: When tx_lock_timeout_us is not less than 0, we need to calculate the timeout timestamp for waiting for
// the lock (by adding tx_lock_timeout_us to the timestamp of when we start waiting for the lock, i.e.
// lock_wait_start_ts), and take the minimum value between this timeout timestamp and abs_lock_timeout_ts (which
// is calcualted from select-for-update timeout and ob_query_timeout) as the value for absolute timeout timestamp.
lock_wait_start_ts = lock_wait_start_ts > 0 ? lock_wait_start_ts : ObTimeUtility::current_time();
expire_ts = MIN(lock_wait_start_ts + tx_lock_timeout_us_, abs_lock_timeout_ts_);
} else {
// Case 2: When tx_lock_timeout is smaller than 0, we use abs_lock_timeout
// as timeout(calcualted from select-for-update timeout).
expire_ts = abs_lock_timeout_;
// Case 2: When tx_lock_timeout_us is less than 0, we use abs_lock_timeout_ts (which is calcualted from
// select-for-update timeout and ob_query_timeout) as absolute timeout timestamp .
expire_ts = abs_lock_timeout_ts_;
}
return expire_ts;
}
TO_STRING_KV(K_(type),
K_(abs_lock_timeout),
K_(tx_lock_timeout),
K_(abs_lock_timeout_ts),
K_(tx_lock_timeout_us),
K_(snapshot),
K_(tx_table_guards),
K_(tx_id),
@ -242,10 +239,11 @@ private:
void warn_tx_ctx_leaky_();
public: // NOTE: those field should only be accessed by txn relative routine
enum class T { INVL, STRONG_READ, WEAK_READ, WRITE, REPLAY } type_;
// abs_lock_timeout is calculated from the minimum of the wait time of the
// select_for_update and timeout in dml_param / scan_param
int64_t abs_lock_timeout_;
// tx_lock_timeout is defined as a system variable `ob_trx_lock_timeout`,
// abs_lock_timeout_ts is the minimum value between the timeout timestamp of
// the 'select for update' SQL statement and the timeout timestamp of the
// dml_param / scan_param (which is calculated from ob_query_timeout).
int64_t abs_lock_timeout_ts_;
// tx_lock_timeout_us is defined as a system variable `ob_trx_lock_timeout`,
// as the timeout of waiting on the WW conflict. it timeout reached
// return OB_ERR_EXCLUSIVE_LOCK_CONFLICT error to SQL
// SQL will stop retry, otherwise return OB_TRY_LOCK_ROW_CONFLICT, SQL will
@ -255,7 +253,7 @@ public: // NOTE: those field should only be accessed by txn relative routine
// - When ob_trx_lock_timeout is bigger than 0, the timeout is equal to the
// minimum between ob_query_timeout and ob_trx_lock_timeout
// - When ob_trx_lock_timeout is equal to 0, it means never wait
int64_t tx_lock_timeout_;
int64_t tx_lock_timeout_us_;
transaction::ObTxSnapshot snapshot_;
storage::ObTxTableGuards tx_table_guards_; // for transfer query
// specials for MvccWrite

View File

@ -151,17 +151,13 @@ int ObLockMemtable::lock_(
do {
// retry if there is lock conflict at part trans ctx.
need_retry = false;
{
succ_step = STEP_BEGIN;
lock_exist = false;
lock_mode_in_same_trans = 0x0;
conflict_tx_set.reset();
ObMvccWriteGuard guard;
if (ObClockGenerator::getClock() >= param.expired_time_) {
ret = (ret == OB_TRY_LOCK_ROW_CONFLICT ? OB_ERR_EXCLUSIVE_LOCK_CONFLICT : OB_TIMEOUT);
LOG_WARN("lock timeout", K(ret), K(lock_op), K(param));
} else if (OB_FAIL(guard.write_auth(ctx))) {
if (OB_FAIL(guard.write_auth(ctx))) {
LOG_WARN("not allow lock table.", K(ret), K(ctx));
} else if (OB_FAIL(check_tablet_write_allow_(lock_op))) {
LOG_WARN("check tablet write allow failed", K(ret), K(lock_op));
@ -201,26 +197,41 @@ int ObLockMemtable::lock_(
if (OB_FAIL(ret) && succ_step == STEP_IN_LOCK_MGR) {
obj_lock_map_.remove_lock_record(lock_op);
}
if (!need_retry &&
ret == OB_TRY_LOCK_ROW_CONFLICT) {
if (param.is_try_lock_) {
} else if (ctx.mvcc_acc_ctx_.tx_ctx_->is_table_lock_killed()) {
// trans is killed by deadlock detect or abort because of
// something else.
ret = OB_TRANS_KILLED;
} else if (lock_op.is_dml_lock_op() /* only dml lock will wait at lock wait mgr */) {
// wait at lock wait mgr but not retry at here.
} else {
// register to deadlock detector.
need_retry = true;
if (!lock_op.is_dml_lock_op() && !register_to_deadlock) {
if (OB_TMP_FAIL(register_into_deadlock_detector_(ctx, lock_op))) {
LOG_WARN("register to deadlock detector failed", K(ret), K(lock_op));
} else {
register_to_deadlock = true;
if (OB_TRY_LOCK_ROW_CONFLICT == ret) {
if (OB_TMP_FAIL(check_and_set_tx_lock_timeout_(ctx.mvcc_acc_ctx_))) {
ret = tmp_ret;
LOG_WARN("tx lock timeout", K(ret));
break;
} else if (!need_retry) {
if (param.is_try_lock_) {
ret = OB_ERR_EXCLUSIVE_LOCK_CONFLICT;
} else if (ctx.mvcc_acc_ctx_.tx_ctx_->is_table_lock_killed()) {
// trans is killed by deadlock detect or abort because of
// something else.
ret = OB_TRANS_KILLED;
} else if (lock_op.is_dml_lock_op() /* only dml lock will wait at lock wait mgr */) {
// wait at lock wait mgr but not retry at here.
} else {
// register to deadlock detector.
need_retry = true;
if (!lock_op.is_dml_lock_op() && !register_to_deadlock) {
if (OB_TMP_FAIL(register_into_deadlock_detector_(ctx, lock_op))) {
LOG_WARN("register to deadlock detector failed", K(ret), K(lock_op));
} else {
register_to_deadlock = true;
}
}
}
}
} else if (OB_SUCCESS == ret) {
// lock successfully, reset lock_wait_start_ts
ctx.mvcc_acc_ctx_.set_lock_wait_start_ts(0);
}
if (ObClockGenerator::getClock() >= param.expired_time_) {
ret = (ret == OB_TRY_LOCK_ROW_CONFLICT ? OB_ERR_EXCLUSIVE_LOCK_CONFLICT : OB_TIMEOUT);
LOG_WARN("lock timeout", K(ret), K(lock_op), K(param));
break;
}
}
if (need_retry) {
@ -262,12 +273,16 @@ int ObLockMemtable::lock_(
if (!recheck_f.is_valid()) {
tmp_ret = OB_ERR_UNEXPECTED;
LOG_WARN("recheck function construct failed", K(tmp_ret));
} else if (OB_SUCCESS != (tmp_ret = post_obj_lock_conflict_(ctx.mvcc_acc_ctx_,
lock_op.lock_id_,
lock_op.lock_mode_,
*(conflict_tx_set.begin()),
recheck_f))) {
LOG_WARN("post obj lock conflict failed", K(tmp_ret));
} else if (OB_TMP_FAIL(post_obj_lock_conflict_(ctx.mvcc_acc_ctx_,
lock_op.lock_id_,
lock_op.lock_mode_,
*(conflict_tx_set.begin()),
recheck_f))) {
if (OB_ERR_EXCLUSIVE_LOCK_CONFLICT == tmp_ret) {
ret = tmp_ret;
} else {
LOG_WARN("post obj lock conflict failed", K(tmp_ret), K(lock_op));
}
} else {
// do nothing
}
@ -459,12 +474,7 @@ int ObLockMemtable::post_obj_lock_conflict_(ObMvccAccessCtx &acc_ctx,
? mem_ctx->get_lock_wait_start_ts()
: current_ts;
int64_t lock_wait_expire_ts = acc_ctx.eval_lock_expire_ts(lock_wait_start_ts);
if (current_ts >= lock_wait_expire_ts) {
ret = OB_ERR_EXCLUSIVE_LOCK_CONFLICT;
LOG_WARN("exclusive lock conflict", K(ret), K(lock_id),
K(conflict_tx_id), K(acc_ctx), K(lock_wait_expire_ts));
} else if (OB_ISNULL(lock_wait_mgr = MTL_WITH_CHECK_TENANT(ObLockWaitMgr*,
mem_ctx->get_tenant_id()))) {
if (OB_ISNULL(lock_wait_mgr = MTL_WITH_CHECK_TENANT(ObLockWaitMgr *, mem_ctx->get_tenant_id()))) {
LOG_WARN("can not get tenant lock_wait_mgr MTL", K(mem_ctx->get_tenant_id()));
} else {
int tmp_ret = OB_SUCCESS;
@ -473,7 +483,7 @@ int ObLockMemtable::post_obj_lock_conflict_(ObMvccAccessCtx &acc_ctx,
bool remote_tx = tx_ctx->get_scheduler() != tx_ctx->get_addr();
// TODO: one thread only can wait at one lock now.
// this may be not enough.
tmp_ret = lock_wait_mgr->post_lock(OB_TRY_LOCK_ROW_CONFLICT,
if (OB_TMP_FAIL(lock_wait_mgr->post_lock(OB_TRY_LOCK_ROW_CONFLICT,
LS_LOCK_TABLET,
lock_id,
lock_wait_expire_ts,
@ -483,15 +493,15 @@ int ObLockMemtable::post_obj_lock_conflict_(ObMvccAccessCtx &acc_ctx,
tx_id,
conflict_tx_id,
lock_mode,
recheck_f);
if (OB_SUCCESS != tmp_ret) {
recheck_f))) {
LOG_WARN("post_lock after tx conflict failed",
K(tmp_ret), K(tx_id), K(conflict_tx_id));
} else if (mem_ctx->get_lock_wait_start_ts() <= 0) {
mem_ctx->set_lock_wait_start_ts(lock_wait_start_ts);
}
}
LOG_DEBUG("ObLockMemtable::post_obj_lock_conflict_", K(ret), K(lock_id), K(conflict_tx_id));
LOG_DEBUG("ObLockMemtable::post_obj_lock_conflict_",
K(ret),
K(lock_id),
K(conflict_tx_id));
return ret;
}
@ -1102,6 +1112,25 @@ int ObLockMemtable::unregister_from_deadlock_detector_(const ObTableLockOp &lock
return ret;
}
int ObLockMemtable::check_and_set_tx_lock_timeout_(const ObMvccAccessCtx &acc_ctx)
{
int ret = OB_SUCCESS;
ObMemtableCtx *mem_ctx = acc_ctx.get_mem_ctx();
int64_t current_ts = common::ObClockGenerator::getClock();
if (mem_ctx->get_lock_wait_start_ts() <= 0) {
mem_ctx->set_lock_wait_start_ts(current_ts);
} else {
int64_t lock_wait_start_ts = mem_ctx->get_lock_wait_start_ts();
int64_t lock_wait_expire_ts = acc_ctx.eval_lock_expire_ts(lock_wait_start_ts);
if (current_ts >= lock_wait_expire_ts) {
ret = OB_ERR_EXCLUSIVE_LOCK_CONFLICT;
LOG_WARN(
"exclusive lock conflict", K(ret), K(acc_ctx), K(lock_wait_start_ts), K(lock_wait_expire_ts), K(current_ts));
}
}
return ret;
}
} // tablelock
} // transaction
} // oceanbase

View File

@ -203,6 +203,8 @@ private:
int unregister_from_deadlock_detector_(const ObTableLockOp &lock_op);
int check_tablet_write_allow_(const ObTableLockOp &lock_op);
int get_lock_wait_expire_ts_(const int64_t lock_wait_start_ts);
int check_and_set_tx_lock_timeout_(const memtable::ObMvccAccessCtx &acc_ctx);
private:
typedef common::SpinRWLock RWLock;
typedef common::SpinRLockGuard RLockGuard;

View File

@ -55,6 +55,7 @@ ObTableLockService::ObTableLockCtx::ObTableLockCtx(const ObTableLockTaskType tas
schema_version_(-1),
tx_is_killed_(false),
is_from_sql_(false),
ret_code_before_end_stmt_or_tx_(OB_SUCCESS),
stmt_savepoint_()
{
abs_timeout_ts_ = (0 == timeout_us)
@ -426,6 +427,7 @@ int ObTableLockService::lock_table(const uint64_t table_id,
LOG_INFO("ObTableLockService::lock_table",
K(table_id), K(lock_mode), K(lock_owner), K(timeout_us));
int ret = OB_SUCCESS;
int ret_code_before_end_stmt_or_tx = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("lock service is not inited", K(ret), K(table_id), K(lock_mode),
@ -451,9 +453,10 @@ int ObTableLockService::lock_table(const uint64_t table_id,
ctx.lock_op_type_ = OUT_TRANS_LOCK;
ret = process_lock_task_(ctx, lock_mode, lock_owner);
need_retry = need_retry_trans_(ctx, ret);
ret_code_before_end_stmt_or_tx = ctx.ret_code_before_end_stmt_or_tx_;
} while (need_retry);
}
ret = rewrite_return_code_(ret);
ret = rewrite_return_code_(ret, ret_code_before_end_stmt_or_tx, false /*is_from_sql*/);
return ret;
}
@ -502,6 +505,7 @@ int ObTableLockService::lock_tablet(const uint64_t table_id,
const int64_t timeout_us)
{
int ret = OB_SUCCESS;
int ret_code_before_end_stmt_or_tx = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("lock service is not inited", K(ret), K(table_id), K(tablet_id),
@ -533,9 +537,10 @@ int ObTableLockService::lock_tablet(const uint64_t table_id,
LOG_WARN("process lock task failed", K(ret), K(tablet_id));
}
need_retry = need_retry_trans_(ctx, ret);
ret_code_before_end_stmt_or_tx = ctx.ret_code_before_end_stmt_or_tx_;
} while (need_retry);
}
ret = rewrite_return_code_(ret);
ret = rewrite_return_code_(ret, ret_code_before_end_stmt_or_tx, false /*is_from_sql*/);
return ret;
}
@ -611,7 +616,7 @@ int ObTableLockService::lock_table(ObTxDesc &tx_desc,
ctx.lock_op_type_ = arg.op_type_;
ctx.is_from_sql_ = arg.is_from_sql_;
ret = process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_);
ret = rewrite_return_code_(ret);
ret = rewrite_return_code_(ret, ctx.ret_code_before_end_stmt_or_tx_, ctx.is_from_sql_);
}
return ret;
}
@ -639,8 +644,9 @@ int ObTableLockService::unlock_table(ObTxDesc &tx_desc,
ctx.tx_desc_ = &tx_desc;
ctx.tx_param_ = tx_param;
ctx.lock_op_type_ = arg.op_type_;
ctx.is_from_sql_ = arg.is_from_sql_;
ret = process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_);
ret = rewrite_return_code_(ret);
ret = rewrite_return_code_(ret, ctx.is_from_sql_);
}
return ret;
}
@ -668,12 +674,13 @@ int ObTableLockService::lock_tablet(ObTxDesc &tx_desc,
ctx.tx_desc_ = &tx_desc;
ctx.tx_param_ = tx_param;
ctx.lock_op_type_ = arg.op_type_;
ctx.is_from_sql_ = arg.is_from_sql_;
if (OB_FAIL(ctx.set_tablet_id(arg.tablet_id_))) {
LOG_WARN("set tablet id failed", K(ret), K(arg));
} else if (OB_FAIL(process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_))) {
LOG_WARN("process lock task failed", K(ret), K(arg));
}
ret = rewrite_return_code_(ret);
ret = rewrite_return_code_(ret, ctx.ret_code_before_end_stmt_or_tx_, ctx.is_from_sql_);
}
return ret;
}
@ -703,12 +710,13 @@ int ObTableLockService::lock_tablet(ObTxDesc &tx_desc,
ctx.tx_desc_ = &tx_desc;
ctx.tx_param_ = tx_param;
ctx.lock_op_type_ = arg.op_type_;
ctx.is_from_sql_ = arg.is_from_sql_;
if (OB_FAIL(ctx.set_tablet_id(arg.tablet_ids_))) {
LOG_WARN("set tablet id failed", K(ret), K(arg));
} else if (OB_FAIL(process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_))) {
LOG_WARN("process lock task failed", K(ret), K(arg));
}
ret = rewrite_return_code_(ret);
ret = rewrite_return_code_(ret, ctx.ret_code_before_end_stmt_or_tx_, ctx.is_from_sql_);
}
return ret;
}
@ -737,12 +745,13 @@ int ObTableLockService::unlock_tablet(ObTxDesc &tx_desc,
ctx.tx_desc_ = &tx_desc;
ctx.tx_param_ = tx_param;
ctx.lock_op_type_ = arg.op_type_;
ctx.is_from_sql_ = arg.is_from_sql_;
if (OB_FAIL(ctx.set_tablet_id(arg.tablet_id_))) {
LOG_WARN("set tablet id failed", K(ret), K(arg));
} else if (OB_FAIL(process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_))) {
LOG_WARN("process lock task failed", K(ret), K(arg));
}
ret = rewrite_return_code_(ret);
ret = rewrite_return_code_(ret, ctx.is_from_sql_);
}
return ret;
}
@ -772,12 +781,13 @@ int ObTableLockService::unlock_tablet(ObTxDesc &tx_desc,
ctx.tx_desc_ = &tx_desc;
ctx.tx_param_ = tx_param;
ctx.lock_op_type_ = arg.op_type_;
ctx.is_from_sql_ = arg.is_from_sql_;
if (OB_FAIL(ctx.set_tablet_id(arg.tablet_ids_))) {
LOG_WARN("set tablet id failed", K(ret), K(arg));
} else if (OB_FAIL(process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_))) {
LOG_WARN("process lock task failed", K(ret), K(arg));
}
ret = rewrite_return_code_(ret);
ret = rewrite_return_code_(ret, ctx.is_from_sql_);
}
return ret;
}
@ -807,12 +817,13 @@ int ObTableLockService::lock_tablet(ObTxDesc &tx_desc,
ctx.tx_desc_ = &tx_desc;
ctx.tx_param_ = tx_param;
ctx.lock_op_type_ = arg.op_type_;
ctx.is_from_sql_ = arg.is_from_sql_;
if (OB_FAIL(ctx.set_tablet_id(arg.tablet_ids_))) {
LOG_WARN("set tablet id failed", K(ret), K(arg));
} else if (OB_FAIL(process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_))) {
LOG_WARN("process lock task failed", K(ret), K(arg));
}
ret = rewrite_return_code_(ret);
ret = rewrite_return_code_(ret, ctx.ret_code_before_end_stmt_or_tx_, ctx.is_from_sql_);
}
return ret;
}
@ -843,12 +854,13 @@ int ObTableLockService::unlock_tablet(ObTxDesc &tx_desc,
ctx.tx_desc_ = &tx_desc;
ctx.tx_param_ = tx_param;
ctx.lock_op_type_ = arg.op_type_;
ctx.is_from_sql_ = arg.is_from_sql_;
if (OB_FAIL(ctx.set_tablet_id(arg.tablet_ids_))) {
LOG_WARN("set tablet id failed", K(ret), K(arg));
} else if (OB_FAIL(process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_))) {
LOG_WARN("process lock task failed", K(ret), K(arg));
}
ret = rewrite_return_code_(ret);
ret = rewrite_return_code_(ret, ctx.is_from_sql_);
}
return ret;
}
@ -916,7 +928,7 @@ int ObTableLockService::lock_partition(ObTxDesc &tx_desc,
ctx.lock_op_type_ = arg.op_type_;
ctx.is_from_sql_ = arg.is_from_sql_;
ret = process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_);
ret = rewrite_return_code_(ret);
ret = rewrite_return_code_(ret, ctx.ret_code_before_end_stmt_or_tx_, ctx.is_from_sql_);
}
return ret;
}
@ -947,8 +959,9 @@ int ObTableLockService::unlock_partition(ObTxDesc &tx_desc,
ctx.tx_desc_ = &tx_desc;
ctx.tx_param_ = tx_param;
ctx.lock_op_type_ = arg.op_type_;
ctx.is_from_sql_ = arg.is_from_sql_;
ret = process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_);
ret = rewrite_return_code_(ret);
ret = rewrite_return_code_(ret, ctx.is_from_sql_);
}
return ret;
}
@ -980,7 +993,7 @@ int ObTableLockService::lock_subpartition(ObTxDesc &tx_desc,
ctx.lock_op_type_ = arg.op_type_;
ctx.is_from_sql_ = arg.is_from_sql_;
ret = process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_);
ret = rewrite_return_code_(ret);
ret = rewrite_return_code_(ret, ctx.ret_code_before_end_stmt_or_tx_, ctx.is_from_sql_);
}
return ret;
}
@ -1011,8 +1024,9 @@ int ObTableLockService::unlock_subpartition(ObTxDesc &tx_desc,
ctx.tx_desc_ = &tx_desc;
ctx.tx_param_ = tx_param;
ctx.lock_op_type_ = arg.op_type_;
ctx.is_from_sql_ = arg.is_from_sql_;
ret = process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_);
ret = rewrite_return_code_(ret);
ret = rewrite_return_code_(ret, ctx.is_from_sql_);
}
return ret;
}
@ -1041,12 +1055,13 @@ int ObTableLockService::lock_obj(ObTxDesc &tx_desc,
ctx.tx_desc_ = &tx_desc;
ctx.tx_param_ = tx_param;
ctx.lock_op_type_ = arg.op_type_;
ctx.is_from_sql_ = arg.is_from_sql_;
if (OB_FAIL(ctx.set_lock_id(arg.obj_type_, arg.obj_id_))) {
LOG_WARN("set lock id failed", K(ret), K(arg));
} else {
ret = process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_);
}
ret = rewrite_return_code_(ret);
ret = rewrite_return_code_(ret, ctx.ret_code_before_end_stmt_or_tx_, ctx.is_from_sql_);
}
return ret;
}
@ -1076,12 +1091,13 @@ int ObTableLockService::unlock_obj(ObTxDesc &tx_desc,
ctx.tx_desc_ = &tx_desc;
ctx.tx_param_ = tx_param;
ctx.lock_op_type_ = arg.op_type_;
ctx.is_from_sql_ = arg.is_from_sql_;
if (OB_FAIL(ctx.set_lock_id(arg.obj_type_, arg.obj_id_))) {
LOG_WARN("set lock id failed", K(ret), K(arg));
} else {
ret = process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_);
}
ret = rewrite_return_code_(ret);
ret = rewrite_return_code_(ret, ctx.is_from_sql_);
}
return ret;
}
@ -1112,12 +1128,13 @@ int ObTableLockService::lock_obj(ObTxDesc &tx_desc,
ctx.tx_desc_ = &tx_desc;
ctx.tx_param_ = tx_param;
ctx.lock_op_type_ = arg.op_type_;
ctx.is_from_sql_ = arg.is_from_sql_;
if (OB_FAIL(ctx.set_lock_id(arg.objs_))) {
LOG_WARN("set lock id failed", K(ret), K(arg));
} else {
ret = process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_);
}
ret = rewrite_return_code_(ret);
ret = rewrite_return_code_(ret, ctx.ret_code_before_end_stmt_or_tx_, ctx.is_from_sql_);
}
return ret;
}
@ -1149,12 +1166,13 @@ int ObTableLockService::unlock_obj(ObTxDesc &tx_desc,
ctx.tx_desc_ = &tx_desc;
ctx.tx_param_ = tx_param;
ctx.lock_op_type_ = arg.op_type_;
ctx.is_from_sql_ = arg.is_from_sql_;
if (OB_FAIL(ctx.set_lock_id(arg.objs_))) {
LOG_WARN("set lock id failed", K(ret), K(arg));
} else {
ret = process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_);
}
ret = rewrite_return_code_(ret);
ret = rewrite_return_code_(ret, ctx.is_from_sql_);
}
return ret;
}
@ -1209,7 +1227,7 @@ int ObTableLockService::process_lock_task_(ObTableLockCtx &ctx,
LOG_WARN("process table lock task failed", K(ret), K(ctx), K(lock_mode), K(lock_owner));
}
}
ctx.ret_code_before_end_stmt_or_tx_ = ret;
if (ctx.is_in_trans_ && OB_UNLIKELY(OB_SUCCESS != (tmp_ret = end_stmt_(ctx, OB_SUCCESS != ret)))) {
LOG_WARN("failed to end stmt", K(ret), K(tmp_ret), K(ctx));
// end stmt failed need rollback the whole trans.
@ -1573,7 +1591,7 @@ int ObTableLockService::parallel_batch_rpc_handle_(RpcProxy &proxy_batch,
const ObLSLockMap *in_map = nullptr;
ObLSLockMap *retry_map = nullptr;
bool can_retry = true; // whether the whole rpc task can retry.
int64_t retry_times = 0;
int64_t retry_times = 1;
for (int64_t i = 0; i < MAP_NUM && OB_SUCC(ret); i++) {
if (OB_FAIL(maps[i].create(10, lib::ObLabel("LSLockMap")))) {
LOG_WARN("ls lock map create failed", KR(ret), K(i));
@ -1748,7 +1766,8 @@ int ObTableLockService::batch_pre_check_lock_(ObTableLockCtx &ctx,
}
if (ret == OB_TRY_LOCK_ROW_CONFLICT) {
if (ctx.is_try_lock()) {
// do nothing
ret = OB_ERR_EXCLUSIVE_LOCK_CONFLICT;
LOG_INFO("try lock and meet conflict", K(ret), K(ctx));
} else if (OB_UNLIKELY(ctx.is_timeout())) {
ret = OB_ERR_EXCLUSIVE_LOCK_CONFLICT;
LOG_WARN("lock table timeout", K(ret), K(ctx));
@ -2024,7 +2043,7 @@ int ObTableLockService::batch_rpc_handle_(RpcProxy &proxy_batch,
const ObLSLockMap *in_map = nullptr;
ObLSLockMap *retry_map = nullptr;
bool can_retry = true; // whether the whole rpc task can retry.
int64_t retry_times = 0;
int64_t retry_times = 1;
for (int64_t i = 0; i < MAP_NUM && OB_SUCC(ret); i++) {
if (OB_FAIL(maps[i].create(10, lib::ObLabel("LSLockMap")))) {
LOG_WARN("ls lock map create failed", KR(ret), K(i));
@ -2815,21 +2834,36 @@ bool ObTableLockService::need_renew_location_(const int64_t ret) const
return (OB_LS_LOCATION_NOT_EXIST == ret || OB_LS_LOCATION_LEADER_NOT_EXIST == ret);
}
int ObTableLockService::rewrite_return_code_(const int ret) const
int ObTableLockService::rewrite_return_code_(const int ret, const int ret_code_before_end_stmt_or_tx, const bool is_from_sql) const
{
int rewrite_rcode = ret;
// rewrite to OB_EAGAIN, to make sure the ddl process will retry again.
if (OB_TRANS_KILLED == ret ||
OB_OBJ_UNLOCK_CONFLICT == ret ||
OB_OBJ_LOCK_NOT_COMPLETED == ret) {
if (is_from_sql) {
if (is_lock_conflict_ret_code_(ret_code_before_end_stmt_or_tx) && is_timeout_ret_code_(ret)) {
rewrite_rcode = OB_ERR_EXCLUSIVE_LOCK_CONFLICT;
}
} else if (is_can_retry_err_(ret)) {
// rewrite to OB_EAGAIN, to make sure the ddl process will retry again.
rewrite_rcode = OB_EAGAIN;
}
return rewrite_rcode;
}
int ObTableLockService::is_timeout_ret_code_(const int ret) const
bool ObTableLockService::is_lock_conflict_ret_code_(const int ret) const
{
return (OB_TIMEOUT == ret || OB_GET_LOCATION_TIME_OUT == ret);
return (OB_TRY_LOCK_ROW_CONFLICT == ret || OB_ERR_EXCLUSIVE_LOCK_CONFLICT == ret);
}
bool ObTableLockService::is_timeout_ret_code_(const int ret) const
{
return (OB_TIMEOUT == ret || OB_TRANS_TIMEOUT == ret ||
OB_TRANS_STMT_TIMEOUT == ret || OB_GET_LOCATION_TIME_OUT == ret);
}
bool ObTableLockService::is_can_retry_err_(const int ret) const
{
return (OB_TRANS_KILLED == ret || OB_OBJ_UNLOCK_CONFLICT == ret || OB_OBJ_LOCK_NOT_COMPLETED == ret
|| OB_TRY_LOCK_ROW_CONFLICT == ret || OB_ERR_EXCLUSIVE_LOCK_CONFLICT == ret || OB_NOT_MASTER == ret
|| OB_TIMEOUT == ret || OB_LS_LOCATION_LEADER_NOT_EXIST == ret || OB_TRANS_CTX_NOT_EXIST == ret);
}
int ObTableLockService::get_ls_leader_(

View File

@ -143,8 +143,9 @@ private:
// TODO: yanyuan.cxf we need better performance.
// share::ObLSArray ls_list_; // related ls list
int64_t schema_version_; // the schema version of the table to be locked
bool tx_is_killed_; // used to kill a trans.
bool tx_is_killed_; // used to kill a trans.
bool is_from_sql_;
int ret_code_before_end_stmt_or_tx_; // used to mark this lock is still conflict while lock request exiting
// use to kill the whole lock table stmt.
transaction::ObTxSEQ stmt_savepoint_;
@ -155,7 +156,7 @@ private:
K(abs_timeout_ts_), KPC(tx_desc_), K(tx_param_),
K(current_savepoint_), K(need_rollback_ls_),
K(schema_version_), K(tx_is_killed_),
K(is_from_sql_), K(stmt_savepoint_));
K(is_from_sql_), K(ret_code_before_end_stmt_or_tx_), K(stmt_savepoint_));
};
class ObRetryCtx
{
@ -345,8 +346,10 @@ private:
bool need_retry_part_rpc_task_(const int ret,
const ObTableLockTaskResult *result) const;
bool need_renew_location_(const int64_t ret) const;
int rewrite_return_code_(const int ret) const;
int is_timeout_ret_code_(const int ret) const;
int rewrite_return_code_(const int ret, const int ret_code_before_end_stmt_or_tx = OB_SUCCESS, const bool is_from_sql = false) const;
bool is_lock_conflict_ret_code_(const int ret) const;
bool is_timeout_ret_code_(const int ret) const;
bool is_can_retry_err_(const int ret) const;
int process_lock_task_(ObTableLockCtx &ctx,
const ObTableLockMode lock_mode,
const ObTableLockOwnerID lock_owner);

View File

@ -412,9 +412,6 @@ public:
const int64_t len,
share::ObLSID &ls_id,
common::ObTabletID &tablet_id);
static int64_t get_lock_wait_timeout(
const int64_t abs_lock_timeout,
const int64_t stmt_timeout);
static int check_transfer_seq_equal(const ObTablet &tablet, const int64_t transfer_seq);
int rowkey_exists(
ObRelativeTable &relative_table,
@ -951,14 +948,6 @@ inline int64_t ObTablet::dec_ref()
return cnt;
}
inline int64_t ObTablet::get_lock_wait_timeout(
const int64_t abs_lock_timeout,
const int64_t stmt_timeout)
{
return (abs_lock_timeout < 0 ? stmt_timeout :
(abs_lock_timeout > stmt_timeout ? stmt_timeout : abs_lock_timeout));
}
#ifdef OB_BUILD_TDE_SECURITY
inline void ObTablet::get_encrypt_meta(
const uint64_t table_id,

View File

@ -576,6 +576,7 @@ int ObAccessService::check_write_allowed_(
const common::ObTabletID &tablet_id,
const ObStoreAccessType access_type,
const ObDMLBaseParam &dml_param,
const int64_t lock_wait_timeout_ts,
transaction::ObTxDesc &tx_desc,
ObTabletHandle &tablet_handle,
ObStoreCtxGuard &ctx_guard)
@ -588,15 +589,16 @@ int ObAccessService::check_write_allowed_(
const ObTableLockMode lock_mode = ROW_EXCLUSIVE;
const ObTableLockOpType lock_op_type = IN_TRANS_DML_LOCK;
const ObTableLockOwnerID lock_owner(0);
const bool is_try_lock = false;
const bool is_deadlock_avoid_enabled = false;
bool is_try_lock = lock_wait_timeout_ts <= 0;
int64_t abs_timeout_ts = MIN(lock_wait_timeout_ts, tx_desc.get_expire_ts());
if (OB_FAIL(check_tenant_out_of_memstore_limit_(is_out_of_mem))) {
LOG_WARN("fail to check tenant out of mem limit", K(ret), K_(tenant_id));
} else if (is_out_of_mem && !tablet_id.is_inner_tablet()) {
ret = OB_TENANT_OUT_OF_MEM;
LOG_WARN("this tenant is already out of memstore limit", K(ret), K_(tenant_id));
} else if (OB_FAIL(get_write_store_ctx_guard_(ls_id,
dml_param.timeout_,
abs_timeout_ts,
tx_desc,
dml_param.snapshot_,
dml_param.branch_id_,
@ -609,7 +611,6 @@ int ObAccessService::check_write_allowed_(
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("ls should not be null", K(ret), K(ls_id), K_(tenant_id));
} else {
// TODO: this may confuse user, because of txn timeout won't notify user proactively
int64_t lock_expired_ts = MIN(dml_param.timeout_, tx_desc.get_expire_ts());
if (OB_FAIL(get_lock_id(tablet_id, lock_id))) {
LOG_WARN("get lock id failed", K(ret), K(tablet_id));
@ -620,6 +621,11 @@ int ObAccessService::check_write_allowed_(
dml_param.schema_version_,
is_deadlock_avoid_enabled,
is_try_lock,
// we can not use abs_timeout_ts here,
// because we may meet select-for-update nowait,
// and abs_timeout_ts is 0. We will judge
// timeout before meet lock conflict in tablelock,
// so it will lead to incorrect error
lock_expired_ts))) {
LOG_WARN("get lock param failed", K(ret), K(lock_id));
} // When locking the table, the tablet is not detected to be deleted.
@ -674,6 +680,7 @@ int ObAccessService::delete_rows(
tablet_id,
ObStoreAccessType::MODIFY,
dml_param,
dml_param.timeout_,
tx_desc,
tablet_handle,
ctx_guard))) {
@ -733,6 +740,7 @@ int ObAccessService::put_rows(
tablet_id,
ObStoreAccessType::MODIFY,
dml_param,
dml_param.timeout_,
tx_desc,
tablet_handle,
ctx_guard))) {
@ -792,6 +800,7 @@ int ObAccessService::insert_rows(
tablet_id,
ObStoreAccessType::MODIFY,
dml_param,
dml_param.timeout_,
tx_desc,
tablet_handle,
ctx_guard))) {
@ -855,6 +864,7 @@ int ObAccessService::insert_row(
tablet_id,
ObStoreAccessType::MODIFY,
dml_param,
dml_param.timeout_,
tx_desc,
tablet_handle,
ctx_guard))) {
@ -928,6 +938,7 @@ int ObAccessService::update_rows(
tablet_id,
ObStoreAccessType::MODIFY,
dml_param,
dml_param.timeout_,
tx_desc,
tablet_handle,
ctx_guard))) {
@ -973,6 +984,7 @@ int ObAccessService::lock_rows(
ObLSTabletService *tablet_service = nullptr;
// Attention!!! This handle is only used for ObLSTabletService, will be reset inside ObLSTabletService.
ObTabletHandle tablet_handle;
int64_t lock_wait_timeout_ts = get_lock_wait_timeout_(abs_lock_timeout, dml_param.timeout_);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ob access service is not running.", K(ret));
@ -988,6 +1000,7 @@ int ObAccessService::lock_rows(
tablet_id,
ObStoreAccessType::ROW_LOCK,
dml_param,
lock_wait_timeout_ts,
tx_desc,
tablet_handle,
ctx_guard))) {
@ -1002,7 +1015,6 @@ int ObAccessService::lock_rows(
ret = tablet_service->lock_rows(tablet_handle,
ctx_guard.get_store_ctx(),
dml_param,
abs_lock_timeout,
lock_flag,
false,
row_iter,
@ -1027,6 +1039,7 @@ int ObAccessService::lock_row(
ObLSTabletService *tablet_service = nullptr;
// Attention!!! This handle is only used for ObLSTabletService, will be reset inside ObLSTabletService.
ObTabletHandle tablet_handle;
int64_t lock_wait_timeout_ts = get_lock_wait_timeout_(abs_lock_timeout, dml_param.timeout_);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ob access service is not running.", K(ret));
@ -1042,6 +1055,7 @@ int ObAccessService::lock_row(
tablet_id,
ObStoreAccessType::ROW_LOCK,
dml_param,
lock_wait_timeout_ts,
tx_desc,
tablet_handle,
ctx_guard))) {
@ -1056,7 +1070,6 @@ int ObAccessService::lock_row(
ret = tablet_service->lock_row(tablet_handle,
ctx_guard.get_store_ctx(),
dml_param,
abs_lock_timeout,
row,
lock_flag,
false);
@ -1327,6 +1340,5 @@ int ObAccessService::audit_tablet_opt_dml_stat(
}
return ret;
}
}
}

View File

@ -225,6 +225,7 @@ protected:
const common::ObTabletID &tablet_id,
const ObStoreAccessType access_type,
const ObDMLBaseParam &dml_param,
const int64_t lock_wait_timeout_ts,
transaction::ObTxDesc &tx_desc,
ObTabletHandle &tablet_handle,
ObStoreCtxGuard &ctx_guard);
@ -243,6 +244,11 @@ protected:
const share::SCN &snapshot,
ObTabletHandle &tablet_handle,
ObStoreCtxGuard &ctx_guard);
static OB_INLINE int64_t get_lock_wait_timeout_(const int64_t abs_lock_timeout, const int64_t stmt_timeout)
{
return (abs_lock_timeout < 0 ? stmt_timeout : (abs_lock_timeout > stmt_timeout ? stmt_timeout : abs_lock_timeout));
}
private:
bool is_inited_;
uint64_t tenant_id_;

View File

@ -56,6 +56,7 @@ int MockObAccessService::insert_rows(
tablet_id,
ObStoreAccessType::MODIFY,
dml_param,
dml_param.timeout_,
tx_desc,
tablet_handle,
ctx_guard))) {

View File

@ -526,7 +526,7 @@ int ObTxNode::read(const ObTxReadSnapshot &snapshot,
OZ(txs_.get_read_store_ctx(snapshot, false, 5000ll * 1000, read_store_ctx));
// HACK, refine: mock LS's each member in some way
read_store_ctx.mvcc_acc_ctx_.tx_table_guards_.tx_table_guard_.init(&fake_tx_table_);
read_store_ctx.mvcc_acc_ctx_.abs_lock_timeout_ = ObTimeUtility::current_time() + 5000ll * 1000;
read_store_ctx.mvcc_acc_ctx_.abs_lock_timeout_ts_ = ObTimeUtility::current_time() + 5000ll * 1000;
blocksstable::ObDatumRow row;
{
ObTableIterParam iter_param;