[4.1] donot lock when lock on the same row twice
This commit is contained in:
@ -3016,6 +3016,52 @@ TEST_F(TestMemtableV2, test_seq_set_violation)
|
||||
memtable->destroy();
|
||||
}
|
||||
|
||||
TEST_F(TestMemtableV2, test_parallel_lock_with_same_txn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObMemtable *memtable = create_memtable();
|
||||
|
||||
TRANS_LOG(INFO, "######## CASE1: lock row into memtable parallelly");
|
||||
ObDatumRowkey rowkey;
|
||||
ObStoreRow write_row;
|
||||
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
|
||||
2, /*value*/
|
||||
rowkey,
|
||||
write_row));
|
||||
|
||||
ObTransID write_tx_id = ObTransID(1);
|
||||
ObStoreCtx *wtx = start_tx(write_tx_id);
|
||||
share::SCN scn_1000;
|
||||
scn_1000.convert_for_tx(1000);
|
||||
|
||||
// Step1: prepare the global sequence
|
||||
ObSequence::inc();
|
||||
int64_t read_seq_no = ObSequence::get_max_seq_no();
|
||||
|
||||
// Step2: init the mvcc acc ctx
|
||||
wtx->mvcc_acc_ctx_.type_ = ObMvccAccessCtx::T::WRITE;
|
||||
wtx->mvcc_acc_ctx_.snapshot_.tx_id_ = wtx->mvcc_acc_ctx_.tx_id_;
|
||||
wtx->mvcc_acc_ctx_.snapshot_.version_ = scn_1000;
|
||||
wtx->mvcc_acc_ctx_.snapshot_.scn_ = read_seq_no;
|
||||
const int64_t abs_expire_time = 10000000000 + ::oceanbase::common::ObTimeUtility::current_time();
|
||||
wtx->mvcc_acc_ctx_.abs_lock_timeout_ = abs_expire_time;
|
||||
wtx->mvcc_acc_ctx_.tx_scn_ = ObSequence::inc_and_get_max_seq_no();
|
||||
|
||||
// Step3: lock for the first time
|
||||
EXPECT_EQ(OB_SUCCESS, (ret = memtable->lock(*wtx,
|
||||
tablet_id_.id(),
|
||||
read_info_,
|
||||
rowkey)));
|
||||
|
||||
// Step4: lock for the second time
|
||||
wtx->mvcc_acc_ctx_.tx_scn_ = ObSequence::inc_and_get_max_seq_no();
|
||||
EXPECT_EQ(OB_SUCCESS, (ret = memtable->lock(*wtx,
|
||||
tablet_id_.id(),
|
||||
read_info_,
|
||||
rowkey)));
|
||||
memtable->destroy();
|
||||
}
|
||||
|
||||
|
||||
} // namespace unittest
|
||||
|
||||
|
||||
@ -564,7 +564,7 @@ int ObMemtable::check_row_locked_by_myself(
|
||||
const ObIArray<ObITable *> *stores = nullptr;
|
||||
common::ObSEArray<ObITable *, 4> iter_tables;
|
||||
ctx.table_iter_->resume();
|
||||
auto my_tx_id = ctx.mvcc_acc_ctx_.get_tx_id();
|
||||
ObTransID my_tx_id = ctx.mvcc_acc_ctx_.get_tx_id();
|
||||
while (OB_SUCC(ret)) {
|
||||
ObITable *table_ptr = nullptr;
|
||||
if (OB_FAIL(ctx.table_iter_->get_next(table_ptr))) {
|
||||
@ -1247,7 +1247,26 @@ int ObMemtable::lock_row_on_frozen_stores_(ObStoreCtx &ctx,
|
||||
value->set_lower_lock_scaned();
|
||||
TRANS_LOG(DEBUG, "lower lock check finish", K(*value), K(*stores));
|
||||
} else {
|
||||
// there is the lock on frozen stores.
|
||||
// There is the lock on frozen stores by my self
|
||||
|
||||
// If the lock is locked by myself and the locker's tnode is DF_LOCK, it
|
||||
// means the row is under my control and new lock is unnecessary for the
|
||||
// semantic of the LOCK dml. So we remove the lock tnode here and report
|
||||
// the success of the mvcc_write .
|
||||
//
|
||||
// NB: You need pay attention to the requirement of the parallel das
|
||||
// update. It may insert two locks on the same row in the same sql. So
|
||||
// it will cause two upside down lock(which means smaller lock tnode
|
||||
// lies ahead the bigger one). So the optimization here is essential.
|
||||
if (res.has_insert()
|
||||
&& lock_state.lock_trans_id_ == my_tx_id
|
||||
&& blocksstable::ObDmlFlag::DF_LOCK == arg.data_->dml_flag_) {
|
||||
(void)mvcc_engine_.mvcc_undo(value);
|
||||
res.need_insert_ = false;
|
||||
}
|
||||
|
||||
// We need check whether the same row is operated by same txn
|
||||
// concurrently to prevent undesirable resuly.
|
||||
if (res.has_insert() && // we only need check when the node is exactly inserted
|
||||
OB_FAIL(concurrent_control::check_sequence_set_violation(ctx.mvcc_acc_ctx_.write_flag_,
|
||||
reader_seq_no,
|
||||
@ -2593,8 +2612,24 @@ int ObMemtable::mvcc_write_(storage::ObStoreCtx &ctx,
|
||||
value,
|
||||
read_info,
|
||||
res))) {
|
||||
if (OB_UNLIKELY(!res.is_new_locked_) && OB_TRY_LOCK_ROW_CONFLICT == ret) {
|
||||
TRANS_LOG(ERROR, "double lock detected", K(*key), K(*value), K(ctx));
|
||||
// Double lock detection is used to prevent that the row who has been
|
||||
// operated by the same txn before will be unexpectedly conflicted with
|
||||
// other writes in sstable. So we report the error when conflict is
|
||||
// discovered with the data operation is not the first time.
|
||||
//
|
||||
// TIP: While we need notice that only the tnode which has been operated
|
||||
// successfully this time need to be checked with double lock detection.
|
||||
// Because under the case of parallel lock(the same row may be locked by the
|
||||
// different threads under the same txn parallelly. You can understand the
|
||||
// behavior through das for update), two lock operation may be inserted
|
||||
// parallelly for the same row in the memtable, while both lock may fail
|
||||
// with conflicts even the second lock operate successfully(the lock will be
|
||||
// pretended to insert successfully at the beginning of mvcc_write and fail
|
||||
// to pass the sstable row lock check for performance issue).
|
||||
if (OB_UNLIKELY(!res.is_new_locked_)
|
||||
&& res.has_insert()
|
||||
&& OB_TRY_LOCK_ROW_CONFLICT == ret) {
|
||||
TRANS_LOG(ERROR, "double lock detected", K(*key), K(*value), K(ctx), K(res));
|
||||
}
|
||||
if (!res.has_insert()) {
|
||||
TRANS_LOG(ERROR, "sstable conflict will occurred when already inserted", K(ctx), KPC(this));
|
||||
|
||||
Reference in New Issue
Block a user