/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #include #define private public #define protected public #include "storage/memtable/ob_memtable.h" #include "share/rc/ob_tenant_base.h" #include "mtlenv/mock_tenant_module_env.h" #include "storage/tx/ob_mock_tx_ctx.h" #include "storage/tx_table/ob_tx_table.h" #include "storage/memtable/mvcc/ob_mvcc_row.h" #include "storage/init_basic_struct.h" #include "observer/ob_safe_destroy_thread.h" namespace oceanbase { using namespace common; using namespace share; using namespace memtable; using namespace storage; using namespace transaction; using namespace blocksstable; namespace storage { int ObTxTable::online() { ATOMIC_INC(&epoch_); ATOMIC_STORE(&state_, TxTableState::ONLINE); return OB_SUCCESS; } } // namespace storage namespace memtable { int ObMvccWriteGuard::write_auth(storage::ObStoreCtx &) { return OB_SUCCESS; } ObMvccWriteGuard::~ObMvccWriteGuard() {} void *ObMemtableCtx::callback_alloc(const int64_t size) { void* ret = NULL; if (OB_ISNULL(ret = std::malloc(size))) { TRANS_LOG_RET(ERROR, OB_ALLOCATE_MEMORY_FAILED, "callback alloc error, no memory", K(size), K(*this)); } else { ATOMIC_FAA(&callback_mem_used_, size); ATOMIC_INC(&callback_alloc_count_); } return ret; } void ObMemtableCtx::callback_free(ObITransCallback *cb) { if (OB_ISNULL(cb)) { TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "cb is null, unexpected error", KP(cb), K(*this)); } else { ATOMIC_INC(&callback_free_count_); std::free(cb); cb = NULL; } } int ObMvccRow::check_double_insert_(const share::SCN , 
ObMvccTransNode &, ObMvccTransNode *) { return OB_SUCCESS; } // int ObTxEndFunctor::operator()(ObITransCallback *callback) // { // int ret = OB_SUCCESS; // if (NULL == callback) { // ret = OB_ERR_UNEXPECTED; // TRANS_LOG(ERROR, "unexpected callback", KP(callback)); // } else if (is_commit_ // && OB_FAIL(callback->trans_commit())) { // TRANS_LOG(ERROR, "trans commit failed", KPC(callback)); // } else if (!is_commit_ // && OB_FAIL(callback->trans_abort())) { // TRANS_LOG(ERROR, "trans abort failed", KPC(callback)); // } else { // need_remove_callback_ = true; // } // return ret; // } } // namespace memtable namespace unittest { int64_t TENANT_ID = 1; int64_t LSID = 1001; class TestMemtableV2 : public ::testing::Test { public: TestMemtableV2() : ls_id_(LSID), tablet_id_(50001), tenant_id_(TENANT_ID), rowkey_cnt_(1), value_cnt_(1), iter_param_(), columns_(), allocator_(), allocator2_(), read_info_(), trans_version_range_(), query_flag_() { columns_.reset(); read_info_.reset(); query_flag_.use_row_cache_ = ObQueryFlag::DoNotUseCache; query_flag_.set_not_use_fuse_row_cache(); } private: static ObTxTable tx_table_; static bool is_sstable_contains_lock_; public: virtual void SetUp() override { // mock sequence no ObClockGenerator::init(); // mock tx table ObTxPalfParam palf_param((logservice::ObLogHandler *)(0x01)); EXPECT_EQ(OB_SUCCESS, ls_tx_ctx_mgr_.init(tenant_id_, /*tenant_id*/ ls_id_, &tx_table_, (ObLockTable*)(0x01), (ObITsMgr *)(0x01), (ObTransService *)(0x01), &palf_param, nullptr)); EXPECT_EQ(OB_SUCCESS, tx_table_.tx_ctx_table_.init(ls_id_)); tx_table_.online(); tx_table_.is_inited_ = true; tx_table_.ls_ = &ls_; // mock columns EXPECT_EQ(OB_SUCCESS, mock_col_desc()); // mock iterator parameter EXPECT_EQ(OB_SUCCESS, mock_iter_param()); // mock trans version range EXPECT_EQ(OB_SUCCESS, mock_trans_version_range()); // is_sstable_contain_lock is_sstable_contains_lock_ = false; TRANS_LOG(INFO, "setup success"); } virtual void TearDown() override { // reset iterator 
parameter reset_iter_param(); // reset columns columns_.reset(); // reset tx table ls_tx_ctx_mgr_.reset(); ls_tx_ctx_mgr_.ls_tx_ctx_map_.reset(); // reset sequence no ObClockGenerator::destroy(); // reset trans version range trans_version_range_.reset(); // reset allocator allocator_.reset(); allocator2_.reset(); TRANS_LOG(INFO, "teardown success"); } static void SetUpTestCase() { TRANS_LOG(INFO, "SetUpTestCase"); EXPECT_EQ(OB_SUCCESS, MockTenantModuleEnv::get_instance().init()); SAFE_DESTROY_INSTANCE.init(); SAFE_DESTROY_INSTANCE.start(); ObServerCheckpointSlogHandler::get_instance().is_started_ = true; // create ls ObCreateLSArg arg; EXPECT_EQ(OB_SUCCESS, gen_create_ls_arg(TENANT_ID, ObLSID(LSID), arg)); ObLSService* ls_svr = MTL(ObLSService*); EXPECT_EQ(OB_SUCCESS, ls_svr->create_ls(arg)); } static void TearDownTestCase() { // remove ls ObLSID ls_id(1001); ASSERT_EQ(OB_SUCCESS, MTL(ObLSService*)->remove_ls(ls_id, false)); SAFE_DESTROY_INSTANCE.stop(); SAFE_DESTROY_INSTANCE.wait(); SAFE_DESTROY_INSTANCE.destroy(); MockTenantModuleEnv::get_instance().destroy(); TRANS_LOG(INFO, "TearDownTestCase"); } public: ObMemtable *create_memtable() { int ret = OB_SUCCESS; ObITable::TableKey table_key; table_key.table_type_ = ObITable::DATA_MEMTABLE; table_key.tablet_id_ = ObTabletID(tablet_id_.id()); table_key.scn_range_.start_scn_.convert_for_tx(1); table_key.scn_range_.end_scn_.set_max(); ObLSService* ls_svr = MTL(ObLSService*); ObLSHandle ls_handle; EXPECT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id_, ls_handle, ObLSGetMod::STORAGE_MOD)); ObMemtable *memtable = new ObMemtable(); ObFreezer *freezer = new ObFreezer; freezer->ls_ = ls_handle.get_ls(); ObTabletMemtableMgr *memtable_mgr = new ObTabletMemtableMgr; int64_t schema_version = 1; uint32_t freeze_clock = 0; EXPECT_EQ(OB_SUCCESS, memtable->init(table_key, ls_handle, freezer, memtable_mgr, schema_version, freeze_clock)); return memtable; } void rollback_to_txn(ObStoreCtx *store_ctx, const int64_t from, const int64_t to) { 
ObUndoAction undo(from, to); ObPartTransCtx *tx_ctx = store_ctx->mvcc_acc_ctx_.tx_ctx_; EXPECT_EQ(OB_SUCCESS, tx_ctx->ctx_tx_data_.add_undo_action(undo)); ObMemtableCtx *mt_ctx = store_ctx->mvcc_acc_ctx_.mem_ctx_; ObTxCallbackList &cb_list = mt_ctx->trans_mgr_.callback_list_; for (ObMvccRowCallback *iter = (ObMvccRowCallback *)(cb_list.get_guard()->get_next()); iter != (ObMvccRowCallback *)(cb_list.get_guard()); iter = (ObMvccRowCallback *)(iter->get_next())) { if (iter->seq_no_ > to) { iter->tnode_->set_delayed_cleanout(true); } } } void fast_commit_txn(ObStoreCtx *store_ctx) { ObMemtableCtx *mt_ctx = store_ctx->mvcc_acc_ctx_.mem_ctx_; ObTxCallbackList &cb_list = mt_ctx->trans_mgr_.callback_list_; ObMvccRowCallback *next = NULL; for (ObMvccRowCallback *iter = (ObMvccRowCallback *)(cb_list.get_guard()->get_next()); iter != (ObMvccRowCallback *)(cb_list.get_guard()); iter = next) { next = (ObMvccRowCallback *)(iter->get_next()); iter->tnode_->set_delayed_cleanout(true); iter->remove(); } } void prepare_txn(ObStoreCtx *store_ctx, const int64_t prepare_version) { share::SCN prepare_scn; prepare_scn.convert_for_tx(prepare_version); ObPartTransCtx *tx_ctx = store_ctx->mvcc_acc_ctx_.tx_ctx_; ObMemtableCtx *mt_ctx = store_ctx->mvcc_acc_ctx_.mem_ctx_; tx_ctx->exec_info_.state_ = ObTxState::PREPARE; tx_ctx->exec_info_.prepare_version_ = prepare_scn; mt_ctx->trans_version_ = prepare_scn; } void commit_txn(ObStoreCtx *store_ctx, const int64_t commit_version, const bool need_write_back = false) { share::SCN commit_scn; commit_scn.convert_for_tx(commit_version); ObPartTransCtx *tx_ctx = store_ctx->mvcc_acc_ctx_.tx_ctx_; ObMemtableCtx *mt_ctx = store_ctx->mvcc_acc_ctx_.mem_ctx_; tx_ctx->exec_info_.state_ = ObTxState::COMMIT; tx_ctx->ctx_tx_data_.set_commit_version(commit_scn); tx_ctx->ctx_tx_data_.set_state(ObTxData::COMMIT); if (need_write_back) { EXPECT_EQ(OB_SUCCESS, mt_ctx->trans_end(true, /*commit*/ commit_scn, commit_scn /*commit log ts*/)); } else { ObTxCallbackList 
&cb_list = mt_ctx->trans_mgr_.callback_list_; for (ObMvccRowCallback *iter = (ObMvccRowCallback *)(cb_list.get_guard()->get_next()); iter != (ObMvccRowCallback *)(cb_list.get_guard()); iter = (ObMvccRowCallback *)(iter->get_next())) { iter->tnode_->set_delayed_cleanout(true); } } } void abort_txn(ObStoreCtx *store_ctx, const bool need_write_back = false) { ObPartTransCtx *tx_ctx = store_ctx->mvcc_acc_ctx_.tx_ctx_; ObMemtableCtx *mt_ctx = store_ctx->mvcc_acc_ctx_.mem_ctx_; tx_ctx->exec_info_.state_ = ObTxState::ABORT; tx_ctx->ctx_tx_data_.set_state(ObTxData::ABORT); if (need_write_back) { EXPECT_EQ(OB_SUCCESS, mt_ctx->trans_end(false, /*commit*/ share::SCN::min_scn() /*commit version*/, share::SCN::max_scn())); } else { ObTxCallbackList &cb_list = mt_ctx->trans_mgr_.callback_list_; for (ObMvccRowCallback *iter = (ObMvccRowCallback *)(cb_list.get_guard()->get_next()); iter != (ObMvccRowCallback *)(cb_list.get_guard()); iter = (ObMvccRowCallback *)(iter->get_next())) { iter->tnode_->set_delayed_cleanout(true); } } } ObStoreCtx *start_tx(const ObTransID &tx_id, const bool for_replay = false) { ObTxDesc *tx_desc = new ObTxDesc(); tx_desc->state_ = ObTxDesc::State::ACTIVE; tx_desc->tx_id_ = tx_id; tx_desc->isolation_ = ObTxIsolationLevel::RC; // used by write conflict error resolve ObStoreCtx *store_ctx = new ObStoreCtx; MockObTxCtx *tx_ctx = new MockObTxCtx; ObTxData *tx_data = new ObTxData; tx_data->reset(); tx_data->tx_id_ = tx_id; tx_ctx->init(ls_id_, tx_id, &ls_tx_ctx_mgr_, tx_data, // ObTxData NULL); // mailbox_mgr store_ctx->mvcc_acc_ctx_.abs_lock_timeout_ = 0; // nowait store_ctx->mvcc_acc_ctx_.tx_desc_ = tx_desc; store_ctx->mvcc_acc_ctx_.tx_id_ = tx_id; store_ctx->mvcc_acc_ctx_.tx_ctx_ = tx_ctx; store_ctx->mvcc_acc_ctx_.mem_ctx_ = &(tx_ctx->mt_ctx_); store_ctx->mvcc_acc_ctx_.mem_ctx_->set_trans_ctx(tx_ctx); store_ctx->mvcc_acc_ctx_.mem_ctx_->get_tx_table_guard()->init(&tx_table_); tx_ctx->mt_ctx_.log_gen_.set(&(tx_ctx->mt_ctx_.trans_mgr_), &(tx_ctx->mt_ctx_)); 
store_ctx->mvcc_acc_ctx_.snapshot_.tx_id_ = tx_id; store_ctx->mvcc_acc_ctx_.tx_table_guard_.init(&tx_table_); if (for_replay) { store_ctx->mvcc_acc_ctx_.mem_ctx_->commit_to_replay(); } if (ObTransID(READ_TX_ID) != tx_id) { EXPECT_EQ(OB_SUCCESS, ls_tx_ctx_mgr_.ls_tx_ctx_map_.insert_and_get(tx_id, tx_ctx, NULL)); } return store_ctx; } void start_stmt(ObStoreCtx *store_ctx, const share::SCN snapshot_scn, const int64_t expire_time = 10000000000) { ObSequence::inc(); store_ctx->mvcc_acc_ctx_.type_ = ObMvccAccessCtx::T::WRITE; store_ctx->mvcc_acc_ctx_.snapshot_.tx_id_ = store_ctx->mvcc_acc_ctx_.tx_id_; store_ctx->mvcc_acc_ctx_.snapshot_.version_ = snapshot_scn; store_ctx->mvcc_acc_ctx_.snapshot_.scn_ = ObSequence::get_max_seq_no(); const int64_t abs_expire_time = expire_time + ::oceanbase::common::ObTimeUtility::current_time(); store_ctx->mvcc_acc_ctx_.abs_lock_timeout_ = abs_expire_time; store_ctx->mvcc_acc_ctx_.tx_scn_ = ObSequence::inc_and_get_max_seq_no(); } void start_pdml_stmt(ObStoreCtx *store_ctx, const share::SCN snapshot_scn, const int64_t read_seq_no, const int64_t expire_time = 10000000000) { ObSequence::inc(); store_ctx->mvcc_acc_ctx_.type_ = ObMvccAccessCtx::T::WRITE; store_ctx->mvcc_acc_ctx_.snapshot_.tx_id_ = store_ctx->mvcc_acc_ctx_.tx_id_; store_ctx->mvcc_acc_ctx_.snapshot_.version_ = snapshot_scn; store_ctx->mvcc_acc_ctx_.snapshot_.scn_ = read_seq_no; const int64_t abs_expire_time = expire_time + ::oceanbase::common::ObTimeUtility::current_time(); store_ctx->mvcc_acc_ctx_.abs_lock_timeout_ = abs_expire_time; store_ctx->mvcc_acc_ctx_.tx_scn_ = ObSequence::inc_and_get_max_seq_no(); } void print_callback(ObStoreCtx *wtx) { TRANS_LOG(INFO, "========== START PRINT CALLBACK ===========", K(wtx->mvcc_acc_ctx_.tx_id_)); wtx->mvcc_acc_ctx_.mem_ctx_->print_callbacks(); TRANS_LOG(INFO, "=========== END PRINT CALLBACK ============", K(wtx->mvcc_acc_ctx_.tx_id_)); } void write_tx(ObStoreCtx *wtx, ObMemtable *memtable, const int64_t snapshot, const ObStoreRow 
&write_row, const int expect_ret = OB_SUCCESS, const int64_t expire_time = 10000000000) { int ret = OB_SUCCESS; TRANS_LOG(INFO, "====================== start write tx =====================", K(wtx->mvcc_acc_ctx_.tx_id_), K(*wtx), K(snapshot), K(expire_time), K(write_row)); share::SCN snapshot_scn; snapshot_scn.convert_for_tx(snapshot); start_stmt(wtx, snapshot_scn, expire_time); EXPECT_EQ(expect_ret, (ret = memtable->set(*wtx, tablet_id_.id(), read_info_, columns_, write_row))); TRANS_LOG(INFO, "======================= end write tx ======================", K(ret), K(wtx->mvcc_acc_ctx_.tx_id_), K(*wtx), K(snapshot), K(expire_time), K(write_row)); } void lock_tx(ObStoreCtx *ltx, ObMemtable *memtable, const int64_t snapshot, const ObDatumRowkey &rowkey, const int expect_ret = OB_SUCCESS, const int64_t expire_time = 10000000000) { int ret = OB_SUCCESS; TRANS_LOG(INFO, "====================== start lock tx =====================", K(ltx->mvcc_acc_ctx_.tx_id_), K(*ltx), K(snapshot), K(expire_time), K(rowkey)); share::SCN snapshot_scn; snapshot_scn.convert_for_tx(snapshot); start_stmt(ltx, snapshot_scn, expire_time); EXPECT_EQ(expect_ret, (ret = memtable->lock(*ltx, tablet_id_.id(), read_info_, rowkey))); TRANS_LOG(INFO, "======================= end lock tx ======================", K(ret), K(ltx->mvcc_acc_ctx_.tx_id_), K(*ltx), K(snapshot), K(expire_time), K(rowkey)); } void write_no_value_tx(ObStoreCtx *ltx, ObMemtable *memtable, const int64_t snapshot, const ObDatumRowkey &rowkey, const int expect_ret = OB_SUCCESS, const int64_t expire_time = 10000000000) { int ret = OB_SUCCESS; TRANS_LOG(INFO, "====================== start lock tx =====================", K(ltx->mvcc_acc_ctx_.tx_id_), K(*ltx), K(snapshot), K(expire_time), K(rowkey)); share::SCN snapshot_scn; snapshot_scn.convert_for_tx(snapshot); start_stmt(ltx, snapshot_scn, expire_time); EXPECT_EQ(expect_ret, (ret = memtable->lock(*ltx, tablet_id_.id(), read_info_, rowkey))); ObMvccTransNode *node = 
get_tx_last_tnode(ltx); ((ObMemtableDataHeader *)(node->buf_))->dml_flag_ = blocksstable::ObDmlFlag::DF_INSERT; TRANS_LOG(INFO, "mock tnode with no value", KPC(node)); TRANS_LOG(INFO, "======================= end lock tx ======================", K(ret), K(ltx->mvcc_acc_ctx_.tx_id_), K(*ltx), K(snapshot), K(expire_time), K(rowkey)); } bool is_write_set_empty(ObStoreCtx *ctx) { ObMemtableCtx *mem_ctx = ctx->mvcc_acc_ctx_.mem_ctx_; ObTxCallbackList &callback_list = mem_ctx->trans_mgr_.callback_list_; return 0 == callback_list.length_; } ObMvccRowCallback *get_tx_last_cb(ObStoreCtx *store_ctx) { ObMemtableCtx *mem_ctx = store_ctx->mvcc_acc_ctx_.mem_ctx_; ObTxCallbackList &callback_list = mem_ctx->trans_mgr_.callback_list_; return (ObMvccRowCallback *)callback_list.get_tail(); } ObMvccRowCallback *get_tx_first_cb(ObStoreCtx *store_ctx) { ObMemtableCtx *mem_ctx = store_ctx->mvcc_acc_ctx_.mem_ctx_; ObTxCallbackList &callback_list = mem_ctx->trans_mgr_.callback_list_; return (ObMvccRowCallback *)callback_list.get_guard()->next_; } ObMvccTransNode *get_tx_last_tnode(ObStoreCtx *store_ctx) { return get_tx_last_cb(store_ctx)->tnode_; } ObMvccTransNode *get_tx_first_tnode(ObStoreCtx *store_ctx) { return get_tx_first_cb(store_ctx)->tnode_; } ObMvccRow *get_tx_last_mvcc_row(ObStoreCtx *store_ctx) { return &(get_tx_last_cb(store_ctx)->value_); } ObMvccRow *get_tx_first_mvcc_row(ObStoreCtx *store_ctx) { return &(get_tx_first_cb(store_ctx)->value_); } void read_row(ObMemtable *memtable, const ObDatumRowkey &rowkey, const int64_t snapshot, int64_t k, int64_t v, const bool exist = true, const int expect_ret = OB_SUCCESS, const int64_t expire_time = 10000000000) { ObTransID read_tx_id = ObTransID(READ_TX_ID); ObStoreCtx *rtx = start_tx(read_tx_id); read_row(rtx, memtable, rowkey, snapshot, k, v, exist, expect_ret, expire_time); } void read_row(ObStoreCtx *rtx, ObMemtable *memtable, const ObDatumRowkey &rowkey, const int64_t snapshot, int64_t k, int64_t v, const bool exist = true, 
const int expect_ret = OB_SUCCESS, const int64_t expire_time = 10000000000) { int ret = OB_SUCCESS; ObDatumRow read_row; ObTableAccessContext access_context; TRANS_LOG(INFO, "====================== start read row =====================", K(rtx->mvcc_acc_ctx_.tx_id_), K(*rtx), K(snapshot), K(expire_time)); share::SCN snapshot_scn; snapshot_scn.convert_for_tx(snapshot); start_stmt(rtx, snapshot_scn, expire_time); EXPECT_EQ(OB_SUCCESS, access_context.init(query_flag_, *rtx, allocator_, trans_version_range_)); EXPECT_EQ(expect_ret, (ret = memtable->get(iter_param_, access_context, rowkey, read_row))); if (OB_SUCC(ret)) { if (!exist) { EXPECT_EQ(true, read_row.row_flag_.is_not_exist()); } else { EXPECT_EQ(true, read_row.row_flag_.is_exist()); ObStorageDatum *cells = read_row.storage_datums_; int64_t count = read_row.count_; EXPECT_EQ(rowkey_cnt_ + value_cnt_, count); for (int64_t i = 0; i < count; i++) { int64_t row_v; row_v = cells[i].get_int(); if (i == 0) { EXPECT_EQ(k, row_v); } else if (i == 1) { EXPECT_EQ(v, row_v); } else { ob_abort(); } } } } TRANS_LOG(INFO, "read row success", K(ret), K(read_row)); TRANS_LOG(INFO, "====================== end read row =====================", K(rtx->mvcc_acc_ctx_.tx_id_), K(*rtx), K(snapshot), K(expire_time)); } void compact_row(ObMvccRow *row, ObMemtable *memtable, int64_t snapshot_version, const bool for_replay) { ASSERT_NE(NULL, (long)row); TRANS_LOG(INFO, "====================== start compact row =====================", K(*row), K(snapshot_version)); share::SCN snapshot_scn; snapshot_scn.convert_for_tx(snapshot_version); EXPECT_EQ(OB_SUCCESS, row->row_compact(memtable, for_replay, snapshot_scn, &allocator2_)); TRANS_LOG(INFO, "====================== end compact row =====================", K(*row), K(snapshot_version)); } int mock_col_desc() { share::schema::ObColDesc col_desc; col_desc.col_id_ = OB_APP_MIN_COLUMN_ID; col_desc.col_type_.set_type(ObIntType); col_desc.col_type_.set_collation_type(CS_TYPE_UTF8MB4_BIN); 
columns_.push_back(col_desc); share::schema::ObColDesc col_desc2; col_desc2.col_id_ = OB_APP_MIN_COLUMN_ID + 1; col_desc2.col_type_.set_type(ObIntType); col_desc2.col_type_.set_collation_type(CS_TYPE_UTF8MB4_BIN); columns_.push_back(col_desc2); return OB_SUCCESS; } int mock_row(const int64_t key, const int64_t value, ObDatumRowkey &rowkey, ObStoreRow &row) { rowkey_datums_[0].set_int(key); rowkey_datums_[1].set_int(value); rowkey.assign(rowkey_datums_, 1); ObObj *obj = new ObObj[2]; obj[0].set_int(key); obj[1].set_int(value); row.row_val_.cells_ = obj; row.row_val_.count_ = 2; row.row_val_.projector_ = NULL; row.flag_.set_flag(ObDmlFlag::DF_INSERT); rowkey.store_rowkey_.assign(obj, 1); return OB_SUCCESS; } int mock_delete(const int64_t key, ObDatumRowkey &rowkey, ObStoreRow &row) { rowkey_datums_[0].set_int(key); rowkey.assign(rowkey_datums_, 1); ObObj *obj = new ObObj[1]; obj[0].set_int(key); row.row_val_.cells_ = obj; row.row_val_.count_ = 2; row.row_val_.projector_ = NULL; row.flag_.set_flag(ObDmlFlag::DF_DELETE); rowkey.store_rowkey_.assign(obj, 1); return OB_SUCCESS; } int mock_row(const int64_t key, const int64_t value, ObStoreRowkey &rowkey, ObStoreRow &row) { ObObj *obj = new ObObj[2]; obj[0].set_int(key); obj[1].set_int(value); rowkey.assign(obj, 1); row.row_val_.cells_ = obj; row.row_val_.count_ = 2; row.row_val_.projector_ = NULL; row.flag_.set_flag(ObDmlFlag::DF_INSERT); return OB_SUCCESS; } void mock_replay_iterator(ObStoreCtx *store_ctx, ObMemtableMutatorIterator &mmi) { mmi.reset(); int64_t serialize_pos = 0; int64_t deserialize_pos = 0; ObCLogEncryptInfo encrypt_info; ObRedoLogSubmitHelper helper; ObIMemtableCtx *mem_ctx = store_ctx->mvcc_acc_ctx_.mem_ctx_; char *redo_log_buffer = new char[REDO_BUFFER_SIZE]; encrypt_info.init(); EXPECT_EQ(OB_SUCCESS, mem_ctx->fill_redo_log(redo_log_buffer, REDO_BUFFER_SIZE, serialize_pos, helper)); EXPECT_EQ(OB_SUCCESS, mmi.deserialize(redo_log_buffer, serialize_pos, deserialize_pos, encrypt_info)); } void 
replay_tx(ObStoreCtx *store_ctx, ObMemtable *memtable, const int64_t replay_log_ts, ObMemtableMutatorIterator &mmi) { int ret = OB_SUCCESS; bool can_continue = true; while (can_continue) { if (OB_FAIL(mmi.iterate_next_row())) { if (OB_ITER_END != ret) { TRANS_LOG(ERROR, "get row head failed", K(ret)); } can_continue = false; } else { ObEncryptRowBuf row_buf; TRANS_LOG(INFO, "TEST_MEMTABLE V2: replay row", K(*store_ctx)); share::SCN replay_scn; replay_scn.convert_for_tx(replay_log_ts); store_ctx->mvcc_acc_ctx_.mem_ctx_->set_redo_scn(replay_scn); EXPECT_EQ(OB_SUCCESS, memtable->replay_row(*store_ctx, &mmi, row_buf)); } } } int mock_iter_param() { // iter_param_.rowkey_cnt_ = rowkey_cnt_; iter_param_.tablet_id_ = tablet_id_; iter_param_.table_id_ = tablet_id_.id(); read_info_.init(allocator_, 16000, rowkey_cnt_, lib::is_oracle_mode(), columns_); iter_param_.read_info_ = &read_info_; return OB_SUCCESS; } void reset_iter_param() { iter_param_.reset(); read_info_.reset(); } int mock_trans_version_range() { trans_version_range_.base_version_ = 0; trans_version_range_.multi_version_start_ = 0; trans_version_range_.snapshot_version_ = INT64_MAX - 2; return OB_SUCCESS; } void verify_cb(ObMvccRowCallback *cb, const ObMemtable *mt, const int64_t seq_no, const int64_t k, const bool is_link = true, const bool need_fill_redo = true, const int64_t log_ts = INT64_MAX) { ASSERT_NE(NULL, (long)cb); TRANS_LOG(INFO, "=============== VERIFY TRANS CALLBACK START ===============", K(*cb)); share::SCN scn; scn.convert_for_tx(log_ts); EXPECT_EQ(mt, cb->memtable_); EXPECT_EQ(scn, cb->scn_); EXPECT_EQ(is_link, cb->is_link_); EXPECT_EQ(need_fill_redo, cb->need_fill_redo_); EXPECT_EQ(need_fill_redo, cb->need_submit_log_); EXPECT_EQ(seq_no, cb->seq_no_); ObStoreRowkey *rowkey = cb->key_.rowkey_; ObObj *key = rowkey->get_obj_ptr(); int64_t row_k; key->get_int(row_k); EXPECT_EQ(k, row_k); TRANS_LOG(INFO, "=============== VERIFY TRANS CALLBACK END ===============", K(*cb)); } void 
verify_tnode(const ObMvccTransNode *tnode, const ObMvccTransNode *prev, const ObMvccTransNode *next, const ObMemtable *mt, const ObTransID &tx_id, const int64_t trans_version, const int64_t seq_no, const uint32_t modify_count, const uint8_t tnode_flag, const ObDmlFlag dml_flag, const int64_t k, const int64_t v, const int64_t log_ts = INT64_MAX, const uint8_t ndt_type = NDT_NORMAL) { ASSERT_NE(NULL, (long)tnode); TRANS_LOG(INFO, "=============== VERIFY TRANS NODE START ===============", K(*tnode)); share::SCN scn; scn.convert_for_tx(log_ts); EXPECT_EQ(tx_id, tnode->tx_id_); EXPECT_EQ(trans_version, tnode->trans_version_.get_val_for_tx()); EXPECT_EQ(scn, tnode->scn_); EXPECT_EQ(seq_no, tnode->seq_no_); EXPECT_EQ(prev, tnode->prev_); EXPECT_EQ(next, tnode->next_); EXPECT_EQ(modify_count, tnode->modify_count_); // EXPECT_EQ(0, tnode->acc_checksum_); EXPECT_EQ(mt->get_timestamp(), tnode->version_); EXPECT_EQ(ndt_type, tnode->type_); EXPECT_EQ(tnode_flag, tnode->flag_); int ret = OB_SUCCESS; const ObMemtableDataHeader *mtd = reinterpret_cast(tnode->buf_); ObArenaAllocator allocator; ObDatumRow datum_row; ObRowReader row_reader; const blocksstable::ObRowHeader *row_header = nullptr; if (OB_FAIL(row_reader.read_row_header(mtd->buf_, mtd->buf_len_, row_header))) { CLOG_LOG(WARN, "Failed to read row header", K(ret)); } else if (OB_FAIL(datum_row.init(allocator, row_header->get_column_count()))) { CLOG_LOG(WARN, "Failed to init datum row", K(ret)); } else if (OB_FAIL(row_reader.read_row(mtd->buf_, mtd->buf_len_, nullptr, datum_row))) { CLOG_LOG(WARN, "Failed to read datum row", K(ret)); } else { EXPECT_EQ(dml_flag, mtd->dml_flag_); TRANS_LOG(INFO, "TEST_MEMTABLE_V2 row: ", K(*tnode), K(mtd)); } for (int64_t i = 0; OB_SUCC(ret) && i < datum_row.get_column_count(); i++) { int64_t row_v = datum_row.storage_datums_[i].get_int(); if (i == 0) { EXPECT_EQ(k, row_v); } else { EXPECT_EQ(v, row_v); } TRANS_LOG(INFO, " TEST_MEMTABLE_V2 column: ", K(i), K(datum_row.storage_datums_[i])); 
} TRANS_LOG(INFO, "=============== VERIFY TRANS NODE END ===============", K(*tnode)); } void verify_wtx(ObStoreCtx *wtx, ObMemtable *wmt, ObMvccTransNode *prev, ObMvccTransNode *next, int64_t seq_no, uint32_t modify_count, int64_t k, int64_t v) { ObMvccRowCallback *cb = get_tx_last_cb(wtx); ObMvccTransNode *tnode = cb->tnode_; EXPECT_NE(NULL, (long)cb); EXPECT_NE(NULL, (long)tnode); TRANS_LOG(INFO, "=============== VERIFY TRANS CALLBACK ===============", K(*cb)); EXPECT_EQ(wmt, cb->memtable_); EXPECT_EQ(share::SCN::max_scn(), cb->scn_); EXPECT_EQ(true, cb->is_link_); EXPECT_EQ(true, cb->need_fill_redo_); EXPECT_EQ(true, cb->need_submit_log_); EXPECT_EQ(seq_no, cb->seq_no_); ObStoreRowkey *rowkey = cb->key_.rowkey_; ObObj *key = rowkey->get_obj_ptr(); int64_t row_k; key->get_int(row_k); EXPECT_EQ(k, row_k); TRANS_LOG(INFO, "=============== VERIFY TRANS CALLBACK ===============", K(*cb)); TRANS_LOG(INFO, "=============== VERIFY TRANS NODE START ===============", K(*tnode)); EXPECT_EQ(wtx->mvcc_acc_ctx_.tx_id_, tnode->tx_id_); EXPECT_EQ(share::SCN::max_scn(), tnode->trans_version_); EXPECT_EQ(share::SCN::max_scn(), tnode->scn_); EXPECT_EQ(seq_no, tnode->seq_no_); EXPECT_EQ(prev, tnode->prev_); EXPECT_EQ(next, tnode->next_); EXPECT_EQ(modify_count, tnode->modify_count_); EXPECT_EQ(0, tnode->acc_checksum_); EXPECT_EQ(wmt->get_timestamp(), tnode->version_); EXPECT_EQ(NDT_NORMAL, tnode->type_); EXPECT_EQ(ObMvccTransNode::F_INIT, tnode->flag_); int ret = OB_SUCCESS; const ObMemtableDataHeader *mtd = reinterpret_cast(tnode->buf_); ObArenaAllocator allocator; ObDatumRow datum_row; ObRowReader row_reader; const blocksstable::ObRowHeader *row_header = nullptr; if (OB_FAIL(row_reader.read_row_header(mtd->buf_, mtd->buf_len_, row_header))) { CLOG_LOG(WARN, "Failed to read row header", K(ret)); } else if (OB_FAIL(datum_row.init(allocator, row_header->get_column_count()))) { CLOG_LOG(WARN, "Failed to init datum row", K(ret)); } else if (OB_FAIL(row_reader.read_row(mtd->buf_, 
mtd->buf_len_, nullptr, datum_row))) { CLOG_LOG(WARN, "Failed to read datum row", K(ret)); } else { EXPECT_EQ(ObDmlFlag::DF_INSERT, mtd->dml_flag_); TRANS_LOG(INFO, "TEST_MEMTABLE_V2 row: ", K(*tnode), K(mtd)); } for (int64_t i = 0; OB_SUCC(ret) && i < datum_row.get_column_count(); i++) { int64_t row_v = datum_row.storage_datums_[i].get_int(); if (i == 0) { EXPECT_EQ(k, row_v); } else { EXPECT_EQ(v, row_v); } TRANS_LOG(INFO, " TEST_MEMTABLE_V2 column: ", K(i), K(datum_row.storage_datums_[i])); } TRANS_LOG(INFO, "=============== VERIFY TRANS NODE END ===============", K(*tnode)); } void verify_mvcc_row(ObMvccRow *row, const int8_t first_dml, const int8_t last_dml, const ObMvccTransNode *list_head, const int64_t max_trans_version, /*const int64_t max_elr_trans_version,*/ const int64_t total_trans_node_cnt, const uint8_t flag = ObMvccRow::F_HASH_INDEX | ObMvccRow::F_BTREE_INDEX) { TRANS_LOG(INFO, "=============== VERIFY MVCC ROW START ===============", K(*row)); EXPECT_EQ(flag, row->flag_); EXPECT_EQ(first_dml, row->first_dml_flag_); EXPECT_EQ(last_dml, row->last_dml_flag_); EXPECT_EQ(list_head, row->list_head_); EXPECT_EQ(total_trans_node_cnt, row->total_trans_node_cnt_); TRANS_LOG(INFO, "=============== VERIFY MVCC ROW END ===============", K(*row)); } private: static const int64_t READ_TX_ID = 987654321; static const int64_t UNUSED_VALUE = -1; static const int64_t REDO_BUFFER_SIZE = 2L * 1024L * 1024L; private: static ObLSTxCtxMgr ls_tx_ctx_mgr_; static ObLS ls_; const ObLSID ls_id_; const ObTabletID tablet_id_; const int64_t tenant_id_; const int64_t rowkey_cnt_; const int64_t value_cnt_; ObTableIterParam iter_param_; ObSEArray columns_; ObStorageDatum rowkey_datums_[2]; ObArenaAllocator allocator_; ObArenaAllocator allocator2_; ObTableReadInfo read_info_; ObVersionRange trans_version_range_; ObQueryFlag query_flag_; char redo_log_buffer_[REDO_BUFFER_SIZE]; }; ObLSTxCtxMgr TestMemtableV2::ls_tx_ctx_mgr_; ObTxTable TestMemtableV2::tx_table_; ObLS 
TestMemtableV2::ls_;
bool TestMemtableV2::is_sstable_contains_lock_;

// Walks one writer (tx 1) through running -> prepare -> commit and checks what
// readers at different snapshot versions observe at each stage.
TEST_F(TestMemtableV2, test_write_read_conflict)
{
  ObMemtable *memtable = create_memtable();

  // CASE1: the writer inserts (1, 2) and can read its own write.
  TRANS_LOG(INFO, "######## CASE1: write row into memtable");
  ObDatumRowkey rowkey;
  ObStoreRow write_row;
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
                                 2, /*value*/
                                 rowkey,
                                 write_row));

  ObTransID write_tx_id = ObTransID(1);
  ObStoreCtx *wtx = start_tx(write_tx_id);
  write_tx(wtx,
           memtable,
           1000, /*snapshot version*/
           write_row);
  // Sequence number of the write statement, reused by the checks below.
  const int64_t wtx_seq_no = ObSequence::get_max_seq_no();

  verify_cb(get_tx_last_cb(wtx),
            memtable,
            wtx_seq_no,
            1, /*key*/
            true /*is_link*/);
  verify_tnode(get_tx_last_tnode(wtx),
               NULL, /*prev tnode*/
               NULL, /*next tnode*/
               memtable,
               wtx->mvcc_acc_ctx_.tx_id_,
               INT64_MAX, /*trans_version*/
               wtx_seq_no,
               0, /*modify_count*/
               ObMvccTransNode::F_INIT,
               DF_INSERT,
               1, /*key*/
               2  /*value*/);
  verify_mvcc_row(get_tx_last_mvcc_row(wtx),
                  ObDmlFlag::DF_NOT_EXIST,
                  ObDmlFlag::DF_NOT_EXIST,
                  get_tx_last_tnode(wtx),
                  0, /*max_trans_version*/
                  1  /*total_trans_node_cnt*/);
  read_row(wtx,
           memtable,
           rowkey,
           1000, /*snapshot version*/
           1, /*key*/
           2  /*value*/);

  // CASE2: another reader does not see the running writer's row.
  TRANS_LOG(INFO, "######## CASE2: read row(running during txn) from memtable with empty result");
  read_row(memtable,
           rowkey,
           1800, /*snapshot version*/
           1, /*key*/
           2, /*value*/
           false /*exist*/);

  // CASE3: writer prepared at 1500; a reader at snapshot 1200 gets no row.
  TRANS_LOG(INFO, "######## CASE3: read row(prepare during txn) from memtable with lock for read skipping");
  prepare_txn(wtx, 1500/*prepare_version*/);
  read_row(memtable,
           rowkey,
           1200, /*snapshot version*/
           1, /*key*/
           2, /*value*/
           false /*exist*/);

  // CASE4: a reader at snapshot 1800 is expected to fail with
  // OB_ERR_SHARED_LOCK_CONFLICT (expire_time 1000000).
  TRANS_LOG(INFO, "######## CASE4: read row(prepare during txn) from memtable with lock for read blocking");
  read_row(memtable,
           rowkey,
           1800, /*snapshot version*/
           1, /*key*/
           2, /*value*/
           false, /*exist*/
           OB_ERR_SHARED_LOCK_CONFLICT,
           1000000 /*expire_time*/);

  // CASE5: writer committed at 2000 without write back; a reader at 3000 sees
  // the row and the node ends up COMMITTED | DELAYED_CLEANOUT.
  TRANS_LOG(INFO, "######## CASE5: read row(commit during txn) from memtable with lock for read success");
  commit_txn(wtx,
             2000,/*commit_version*/
             false/*need_write_back*/);
  read_row(memtable,
           rowkey,
           3000, /*snapshot version*/
           1, /*key*/
           2  /*value*/);
  verify_cb(get_tx_last_cb(wtx),
            memtable,
            wtx_seq_no,
            1, /*key*/
            true /*is_link*/);
  verify_tnode(get_tx_last_tnode(wtx),
               NULL, /*prev tnode*/
               NULL, /*next tnode*/
               memtable,
               wtx->mvcc_acc_ctx_.tx_id_,
               2000, /*trans_version*/
               wtx_seq_no,
               0, /*modify_count*/
               ObMvccTransNode::F_COMMITTED | ObMvccTransNode::F_DELAYED_CLEANOUT,
               ObDmlFlag::DF_INSERT,
               1, /*key*/
               2  /*value*/);
  verify_mvcc_row(get_tx_last_mvcc_row(wtx),
                  ObDmlFlag::DF_INSERT,
                  ObDmlFlag::DF_INSERT,
                  get_tx_last_tnode(wtx),
                  2000, /*max_trans_version*/
                  1 /*total_trans_node_cnt*/);

  // CASE6: commit again with write back. The node and row pointers are saved
  // first and verified through those saved pointers afterwards.
  TRANS_LOG(INFO, "######## CASE6: read row(WRRITTEN during txn) from memtable with lock for read success");
  ObMvccTransNode *tmp_node = get_tx_last_tnode(wtx);
  ObMvccRow *tmp_row = get_tx_last_mvcc_row(wtx);
  commit_txn(wtx,
             2000,/*commit_version*/
             true /*need_write_back*/);
  read_row(memtable,
           rowkey,
           3000, /*snapshot version*/
           1, /*key*/
           2  /*value*/);
  verify_tnode(tmp_node,
               NULL, /*prev tnode*/
               NULL, /*next tnode*/
               memtable,
               wtx->mvcc_acc_ctx_.tx_id_,
               2000, /*trans_version*/
               wtx_seq_no,
               0, /*modify_count*/
               ObMvccTransNode::F_COMMITTED | ObMvccTransNode::F_DELAYED_CLEANOUT,
               ObDmlFlag::DF_INSERT,
               1, /*key*/
               2  /*value*/);
  verify_mvcc_row(tmp_row,
                  ObDmlFlag::DF_INSERT,
                  ObDmlFlag::DF_INSERT,
                  tmp_node,
                  2000, /*max_trans_version*/
                  1 /*total_trans_node_cnt*/);
  memtable->destroy();
}

// Writes one row, aborts the writer (first without, then with write back) and
// checks that readers see no row and that the node is marked aborted.
TEST_F(TestMemtableV2, test_tx_abort)
{
  ObMemtable *memtable = create_memtable();

  // CASE1: the writer inserts (1, 2) and can read its own write.
  TRANS_LOG(INFO, "######## CASE1: write row into memtable");
  ObDatumRowkey rowkey;
  ObStoreRow write_row;
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
                                 2, /*value*/
                                 rowkey,
                                 write_row));

  ObTransID write_tx_id = ObTransID(1);
  ObStoreCtx *wtx = start_tx(write_tx_id);
  write_tx(wtx,
           memtable,
           1000, /*snapshot version*/
           write_row);
  // Sequence number of the write statement, reused by the checks below.
  const int64_t wtx_seq_no = ObSequence::get_max_seq_no();

  verify_cb(get_tx_last_cb(wtx),
            memtable,
            wtx_seq_no,
            1, /*key*/
            true /*is_link*/);
  verify_tnode(get_tx_last_tnode(wtx),
               NULL, /*prev tnode*/
               NULL, /*next tnode*/
               memtable,
               wtx->mvcc_acc_ctx_.tx_id_,
               INT64_MAX, /*trans_version*/
               wtx_seq_no,
               0, /*modify_count*/
               ObMvccTransNode::F_INIT,
               ObDmlFlag::DF_INSERT,
               1, /*key*/
               2  /*value*/);
  verify_mvcc_row(get_tx_last_mvcc_row(wtx),
                  ObDmlFlag::DF_NOT_EXIST,
                  ObDmlFlag::DF_NOT_EXIST,
                  get_tx_last_tnode(wtx),
                  0, /*max_trans_version*/
                  1  /*total_trans_node_cnt*/);
  read_row(wtx,
           memtable,
           rowkey,
           1000, /*snapshot version*/
           1, /*key*/
           2  /*value*/);

  // CASE2: abort without write back; a reader at 3000 finds no row, the node
  // ends up ABORTED | DELAYED_CLEANOUT and the row has no list head any more.
  TRANS_LOG(INFO, "######## CASE2: read row(abort during txn) from memtable with lock for read no data");
  abort_txn(wtx,
            false/*need_write_back*/);
  read_row(memtable,
           rowkey,
           3000, /*snapshot version*/
           -1, /*key*/
           -1, /*value*/
           false /*exist*/);
  verify_cb(get_tx_last_cb(wtx),
            memtable,
            wtx_seq_no,
            1, /*key*/
            true /*is_link*/);
  verify_tnode(get_tx_last_tnode(wtx),
               NULL, /*prev tnode*/
               NULL, /*next tnode*/
               memtable,
               wtx->mvcc_acc_ctx_.tx_id_,
               INT64_MAX, /*trans_version*/
               wtx_seq_no,
               0, /*modify_count*/
               ObMvccTransNode::F_ABORTED | ObMvccTransNode::F_DELAYED_CLEANOUT,
               ObDmlFlag::DF_INSERT,
               1, /*key*/
               2  /*value*/);
  verify_mvcc_row(get_tx_last_mvcc_row(wtx),
                  ObDmlFlag::DF_NOT_EXIST,
                  ObDmlFlag::DF_NOT_EXIST,
                  NULL,
                  0, /*max_trans_version*/
                  0  /*total_trans_node_cnt*/);

  // CASE3: abort again with write back. The row and node pointers are saved
  // first and verified through those saved pointers afterwards.
  TRANS_LOG(INFO, "######## CASE3: read row(WRRITTEN during txn) from memtable with lock for read success");
  ObMvccRow *tmp_row = get_tx_last_mvcc_row(wtx);
  ObMvccTransNode *tmp_node = get_tx_last_tnode(wtx);
  abort_txn(wtx,
            true /*need_write_back*/);
  read_row(memtable,
           rowkey,
           3000, /*snapshot version*/
           -1, /*key*/
           -1, /*value*/
           false /*exist*/);
  verify_tnode(tmp_node,
               NULL, /*prev tnode*/
               NULL, /*next tnode*/
               memtable,
               wtx->mvcc_acc_ctx_.tx_id_,
               INT64_MAX, /*trans_version*/
               wtx_seq_no,
               0, /*modify_count*/
               ObMvccTransNode::F_ABORTED | ObMvccTransNode::F_DELAYED_CLEANOUT,
               ObDmlFlag::DF_INSERT,
               1, /*key*/
               2  /*value*/);
  verify_mvcc_row(tmp_row,
                  ObDmlFlag::DF_NOT_EXIST,
                  ObDmlFlag::DF_NOT_EXIST,
                  NULL, /*list_head*/
                  0, /*max_trans_version*/
                  0  /*total_trans_node_cnt*/);
memtable->destroy();
}

// test_write_write_conflict: two transactions write the same key (1) in one
// memtable.
//  - While txn1's write is uncommitted, and again after txn1 is prepared at
//    1500, txn2's write_tx is called with OB_TRY_LOCK_ROW_CONFLICT as its
//    trailing argument (presumably the expected return code - confirm against
//    the write_tx helper).
//  - After txn1 commits at 2000, txn2 writing with snapshot 1800 is called
//    with OB_TRANSACTION_SET_VIOLATION; with snapshot 2100 no error code is
//    passed and the new tnode is verified.
//  - Finally txn2 aborts; the row's list head is expected to be txn1's second
//    tnode again and a snapshot read at 3000 expects value 4.
TEST_F(TestMemtableV2, test_write_write_conflict)
{
  ObMemtable *memtable = create_memtable();
  TRANS_LOG(INFO, "######## CASE1: txn1 write row into memtable");
  ObDatumRowkey rowkey;
  ObStoreRow write_row;
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 2, /*value*/ rowkey, write_row));
  ObTransID write_tx_id = ObTransID(1);
  ObStoreCtx *wtx = start_tx(write_tx_id);
  write_tx(wtx, memtable, 1000, /*snapshot version*/ write_row);
  verify_cb(get_tx_last_cb(wtx), memtable, ObSequence::get_max_seq_no(), 1, /*key*/ true /*is_link*/);
  verify_tnode(get_tx_last_tnode(wtx), NULL, /*prev tnode*/ NULL, /*next tnode*/ memtable, wtx->mvcc_acc_ctx_.tx_id_, INT64_MAX, /*trans_version*/ ObSequence::get_max_seq_no(), 0, /*modify_count*/ ObMvccTransNode::F_INIT, ObDmlFlag::DF_INSERT, 1, /*key*/ 2 /*value*/);
  verify_mvcc_row(get_tx_last_mvcc_row(wtx), ObDmlFlag::DF_NOT_EXIST, ObDmlFlag::DF_NOT_EXIST, get_tx_last_tnode(wtx), 0, /*max_trans_version*/ 1 /*total_trans_node_cnt*/);
  read_row(wtx, memtable, rowkey, 1000, /*snapshot version*/ 1, /*key*/ 2 /*value*/);
  TRANS_LOG(INFO, "######## CASE2: txn2 write row into memtable, lock for write failed");
  ObDatumRowkey rowkey2;
  ObStoreRow write_row2;
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 3, /*value*/ rowkey2, write_row2));
  ObTransID write_tx_id2 = ObTransID(2);
  ObStoreCtx *wtx2 = start_tx(write_tx_id2);
  write_tx(wtx2, memtable, 1200, /*snapshot version*/ write_row2, OB_TRY_LOCK_ROW_CONFLICT);
  // txn1 still reads its own value 2 after txn2's attempt.
  read_row(wtx, memtable, rowkey, 1000, /*snapshot version*/ 1, /*key*/ 2 /*value*/);
  TRANS_LOG(INFO, "######## CASE3: txn1 write row into memtable, lock for write succeed");
  ObDatumRowkey rowkey3;
  ObStoreRow write_row3;
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 4, /*value*/ rowkey3, write_row3));
  // tmp_node is txn1's first tnode; it is checked below as the prev of the
  // tnode created by the second write.
  ObMvccTransNode *tmp_node = get_tx_last_tnode(wtx);
  write_tx(wtx, memtable, 1200, /*snapshot version*/ write_row3);
  // Saved for the final check after txn2 aborts.
  ObMvccTransNode *wtx_case3_tnode = get_tx_last_tnode(wtx);
  verify_cb(get_tx_last_cb(wtx), memtable, ObSequence::get_max_seq_no(), 1, /*key*/ true /*is_link*/);
  verify_tnode(get_tx_last_tnode(wtx), tmp_node, /*prev tnode*/ NULL, /*next tnode*/ memtable, wtx->mvcc_acc_ctx_.tx_id_, INT64_MAX, /*trans_version*/ ObSequence::get_max_seq_no(), 1, /*modify_count*/ ObMvccTransNode::F_INIT, ObDmlFlag::DF_INSERT, 1, /*key*/ 4 /*value*/);
  verify_mvcc_row(get_tx_last_mvcc_row(wtx), ObDmlFlag::DF_NOT_EXIST, ObDmlFlag::DF_NOT_EXIST, get_tx_last_tnode(wtx), 0, /*max_trans_version*/ 2 /*total_trans_node_cnt*/);
  read_row(wtx, memtable, rowkey, 1000, /*snapshot version*/ 1, /*key*/ 4 /*value*/);
  TRANS_LOG(INFO, "######## CASE4: txn2 write row(prepare during txn) into memtable, lock for write failed");
  prepare_txn(wtx, 1500/*prepare_version*/);
  write_tx(wtx2, memtable, 1200, /*snapshot version*/ write_row2, OB_TRY_LOCK_ROW_CONFLICT);
  read_row(wtx, memtable, rowkey, 1000, /*snapshot version*/ 1, /*key*/ 4 /*value*/);
  TRANS_LOG(INFO, "######## CASE5: txn2 write row(commit during txn) into memtable, lock for write encounters tsc");
  commit_txn(wtx, 2000,/*commit_version*/ false/*need_write_back*/);
  // Snapshot 1800 is below txn1's commit version 2000.
  write_tx(wtx2, memtable, 1800, /*snapshot version*/ write_row2, OB_TRANSACTION_SET_VIOLATION);
  read_row(wtx, memtable, rowkey, 3000, /*snapshot version*/ 1, /*key*/ 4 /*value*/);
  TRANS_LOG(INFO, "######## CASE6: txn2 write row(commit during txn) into memtable, lock for write succeed");
  // Snapshot 2100 is above txn1's commit version 2000.
  write_tx(wtx2, memtable, 2100, /*snapshot version*/ write_row2);
  const int64_t wtx2_seq_no = ObSequence::get_max_seq_no();
  verify_cb(get_tx_last_cb(wtx2), memtable, wtx2_seq_no, 1, /*key*/ true /*is_link*/);
  verify_tnode(get_tx_last_tnode(wtx2), get_tx_last_cb(wtx)->tnode_, /*prev tnode*/ NULL, /*next tnode*/ memtable, wtx2->mvcc_acc_ctx_.tx_id_, INT64_MAX, /*trans_version*/ wtx2_seq_no, 2, /*modify_count*/ ObMvccTransNode::F_INIT, ObDmlFlag::DF_INSERT, 1, /*key*/ 3 /*value*/);
  verify_mvcc_row(get_tx_last_mvcc_row(wtx2), ObDmlFlag::DF_INSERT, ObDmlFlag::DF_INSERT, get_tx_last_tnode(wtx2), 2000, /*max_trans_version*/ 3
/*total_trans_node_cnt*/);
  read_row(wtx2, memtable, rowkey, 1000, /*snapshot version*/ 1, /*key*/ 3 /*value*/);
  ObMvccTransNode *wtx_last_tnode = get_tx_last_tnode(wtx);
  commit_txn(wtx, 2000,/*commit_version*/ true/*need_write_back*/);
  TRANS_LOG(INFO, "######## CASE7: txn2 abort, undo mvcc row");
  abort_txn(wtx2, false/*need_write_back*/);
  read_row(memtable, rowkey, 3000, /*snapshot version*/ 1, /*key*/ 4 /*value*/);
  verify_tnode(get_tx_last_tnode(wtx2), wtx_last_tnode, /*prev tnode*/ NULL, /*next tnode*/ memtable, wtx2->mvcc_acc_ctx_.tx_id_, INT64_MAX, /*trans_version*/ wtx2_seq_no, 2, /*modify_count*/ ObMvccTransNode::F_ABORTED | ObMvccTransNode::F_DELAYED_CLEANOUT, ObDmlFlag::DF_INSERT, 1, /*key*/ 3 /*value*/);
  verify_mvcc_row(get_tx_last_mvcc_row(wtx2), ObDmlFlag::DF_INSERT, ObDmlFlag::DF_INSERT, wtx_case3_tnode, /*list_head*/ 2000, /*max_trans_version*/ 2 /*total_trans_node_cnt*/);
  memtable->destroy();
}

// test_lock: row-lock (DF_LOCK) behaviour on a single key (1).
//  - txn1 locks the row; both txn1's own read and a plain snapshot read are
//    issued with exist = false.
//  - While txn1 holds the lock, txn2's write_tx and lock_tx are both called
//    with OB_TRY_LOCK_ROW_CONFLICT.
//  - txn1 locking the row a second time is expected to add nothing: the last
//    callback/tnode keep the first seq no and the node count stays at 1.
//  - After txn1 commits at 2000, txn2 locks at snapshot 2500; txn2 then
//    aborts, txn3 inserts value 4 and commits at 5000, and a read at 6000
//    expects value 4.
TEST_F(TestMemtableV2, test_lock)
{
  ObMemtable *memtable = create_memtable();
  TRANS_LOG(INFO, "######## CASE1: txn1 lock row in memtable");
  ObDatumRowkey rowkey;
  // tmp_row is only needed so mock_row can fill rowkey; lock_tx below takes
  // the rowkey alone.
  ObStoreRow tmp_row;
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 2, /*value*/ rowkey, tmp_row));
  ObTransID lock_tx_id = ObTransID(1);
  ObStoreCtx *ltx = start_tx(lock_tx_id);
  lock_tx(ltx, memtable, 1000, /*snapshot version*/ rowkey);
  const int64_t wtx_seq_no = ObSequence::get_max_seq_no();
  verify_cb(get_tx_last_cb(ltx), memtable, wtx_seq_no, 1, /*key*/ true /*is_link*/);
  verify_tnode(get_tx_last_tnode(ltx), NULL, /*prev tnode*/ NULL, /*next tnode*/ memtable, ltx->mvcc_acc_ctx_.tx_id_, INT64_MAX, /*trans_version*/ wtx_seq_no, 0, /*modify_count*/ ObMvccTransNode::F_INIT, ObDmlFlag::DF_LOCK, 1, /*key*/ UNUSED_VALUE /*value*/);
  verify_mvcc_row(get_tx_last_mvcc_row(ltx), ObDmlFlag::DF_NOT_EXIST, ObDmlFlag::DF_NOT_EXIST, get_tx_last_tnode(ltx), 0, /*max_trans_version*/ 1 /*total_trans_node_cnt*/);
  read_row(ltx, memtable, rowkey, 1200, /*snapshot version*/ 1, /*key*/ UNUSED_VALUE, /*value*/ false /*exist*/);
  TRANS_LOG(INFO, "######## CASE2: other txn read row in memtable with no data");
  read_row(memtable, rowkey, 1200, /*snapshot version*/ -1, /*key*/ -1, /*value*/ false /*exist*/);
  TRANS_LOG(INFO, "######## CASE3: txn2 write row in memtable with lock for write failed");
  ObDatumRowkey rowkey2;
  ObStoreRow write_row2;
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 3, /*value*/ rowkey2, write_row2));
  ObTransID write_tx_id2 = ObTransID(2);
  ObStoreCtx *wtx2 = start_tx(write_tx_id2);
  write_tx(wtx2, memtable, 1200, /*snapshot version*/ write_row2, OB_TRY_LOCK_ROW_CONFLICT);
  TRANS_LOG(INFO, "######## CASE4: txn2 lock row in memtable with lock for write failed");
  lock_tx(wtx2, memtable, 1200, /*snapshot version*/ rowkey, OB_TRY_LOCK_ROW_CONFLICT);
  TRANS_LOG(INFO, "######## CASE5: txn1 lock row in memtable with no new lock");
  lock_tx(ltx, memtable, 1200, /*snapshot version*/ rowkey);
  // The expectations below are identical to CASE1 (same seq no, same node
  // count), i.e. the second lock_tx is expected to create nothing new.
  verify_cb(get_tx_last_cb(ltx), memtable, wtx_seq_no, 1, /*key*/ true /*is_link*/);
  verify_tnode(get_tx_last_tnode(ltx), NULL, /*prev tnode*/ NULL, /*next tnode*/ memtable, ltx->mvcc_acc_ctx_.tx_id_, INT64_MAX, /*trans_version*/ wtx_seq_no, 0, /*modify_count*/ ObMvccTransNode::F_INIT, ObDmlFlag::DF_LOCK, 1, /*key*/ UNUSED_VALUE /*value*/);
  verify_mvcc_row(get_tx_last_mvcc_row(ltx), ObDmlFlag::DF_NOT_EXIST, ObDmlFlag::DF_NOT_EXIST, get_tx_last_tnode(ltx), 0, /*max_trans_version*/ 1 /*total_trans_node_cnt*/);
  read_row(ltx, memtable, rowkey, 1200, /*snapshot version*/ 1, /*key*/ UNUSED_VALUE, /*value*/ false /*exist*/);
  TRANS_LOG(INFO, "######## CASE6: txn1 commit, and txn2 lock row in memtable succeed");
  commit_txn(ltx, 2000,/*commit_version*/ false/*need_write_back*/);
  lock_tx(wtx2, memtable, 2500, /*snapshot version*/ rowkey);
  const int64_t wtx2_seq_no = ObSequence::get_max_seq_no();
  verify_cb(get_tx_last_cb(wtx2), memtable, wtx2_seq_no, 1, /*key*/ true /*is_link*/);
  verify_tnode(get_tx_last_tnode(ltx), NULL, /*prev tnode*/ NULL, /*next tnode*/ memtable, ltx->mvcc_acc_ctx_.tx_id_, 2000, /*trans_version*/ wtx_seq_no, 0, /*modify_count*/ ObMvccTransNode::F_COMMITTED | ObMvccTransNode::F_DELAYED_CLEANOUT, ObDmlFlag::DF_LOCK, 1, /*key*/ UNUSED_VALUE /*value*/);
  verify_tnode(get_tx_last_tnode(wtx2), NULL, /*prev tnode*/ NULL, /*next tnode*/ memtable, wtx2->mvcc_acc_ctx_.tx_id_, INT64_MAX, /*trans_version*/ wtx2_seq_no, 0, /*modify_count*/ ObMvccTransNode::F_INIT, ObDmlFlag::DF_LOCK, 1, /*key*/ UNUSED_VALUE /*value*/);
  verify_mvcc_row(get_tx_last_mvcc_row(wtx2), ObDmlFlag::DF_NOT_EXIST, ObDmlFlag::DF_NOT_EXIST, get_tx_last_tnode(wtx2), 2000, /*max_trans_version*/ 1 /*total_trans_node_cnt*/);
  read_row(wtx2, memtable, rowkey, 3000, /*snapshot version*/ 1, /*key*/ UNUSED_VALUE, /*value*/ false /*exist*/);
  read_row(memtable, rowkey, 3000, /*snapshot version*/ 1, /*key*/ UNUSED_VALUE, /*value*/ false /*exist*/);
  TRANS_LOG(INFO, "######## CASE7: txn2 abort, and txn3 lock row in memtable succeed");
  abort_txn(wtx2, false/*need_write_back*/);
  ObTransID write_tx_id3 = ObTransID(3);
  ObStoreCtx *wtx3 = start_tx(write_tx_id3);
  ObDatumRowkey rowkey3;
  ObStoreRow write_row3;
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 4, /*value*/ rowkey3, write_row3));
  // Despite the CASE7 log text saying "lock", txn3 issues a write_tx (insert
  // of value 4), not a lock_tx.
  write_tx(wtx3, memtable, 4500, /*snapshot version*/ write_row3);
  const int64_t wtx3_seq_no = ObSequence::get_max_seq_no();
  verify_cb(get_tx_last_cb(wtx3), memtable, wtx3_seq_no, 1, /*key*/ true /*is_link*/);
  verify_tnode(get_tx_last_tnode(wtx2), NULL, /*prev tnode*/ NULL, /*next tnode*/ memtable, wtx2->mvcc_acc_ctx_.tx_id_, INT64_MAX, /*trans_version*/ wtx2_seq_no, 0, /*modify_count*/ ObMvccTransNode::F_ABORTED | ObMvccTransNode::F_DELAYED_CLEANOUT, ObDmlFlag::DF_LOCK, 1, /*key*/ UNUSED_VALUE /*value*/);
  verify_tnode(get_tx_last_tnode(wtx3), NULL, /*prev tnode*/ NULL, /*next tnode*/ memtable, wtx3->mvcc_acc_ctx_.tx_id_, INT64_MAX, /*trans_version*/ wtx3_seq_no, 0, /*modify_count*/ ObMvccTransNode::F_INIT, ObDmlFlag::DF_INSERT, 1, /*key*/ 4 /*value*/);
  verify_mvcc_row(get_tx_last_mvcc_row(wtx3), ObDmlFlag::DF_NOT_EXIST,
ObDmlFlag::DF_NOT_EXIST, get_tx_last_tnode(wtx3), 2000, /*max_trans_version*/ 1 /*total_trans_node_cnt*/);
  read_row(wtx3, memtable, rowkey, 4500, /*snapshot version*/ 1, /*key*/ 4 /*value*/);
  read_row(memtable, rowkey, 3000, /*snapshot version*/ 1, /*key*/ UNUSED_VALUE, /*value*/ false /*exist*/);
  ObMvccRow *wtx3_last_row = get_tx_last_mvcc_row(wtx3);
  ObMvccTransNode *wtx3_last_tnode = get_tx_last_tnode(wtx3);
  commit_txn(wtx3, 5000,/*commit_version*/ true/*need_write_back*/);
  verify_mvcc_row(wtx3_last_row, ObDmlFlag::DF_INSERT, ObDmlFlag::DF_INSERT, wtx3_last_tnode, 5000, /*max_trans_version*/ 1 /*total_trans_node_cnt*/);
  read_row(memtable, rowkey, 6000, /*snapshot version*/ 1, /*key*/ 4 /*value*/);
  memtable->destroy();
}

// test_sstable_lock: with the is_sstable_contains_lock_ flag set to true,
// write_tx is called with OB_TRY_LOCK_ROW_CONFLICT. Afterwards the test
// expects the transaction's write set to be empty and issues a read with
// exist = false. The flag is reset to false before the memtable is destroyed.
// NOTE(review): where is_sstable_contains_lock_ is declared and consumed is
// not visible in this part of the file.
TEST_F(TestMemtableV2, test_sstable_lock)
{
  ObMemtable *memtable = create_memtable();
  TRANS_LOG(INFO, "######## CASE1: write row into memtable failed because of sstable lock");
  is_sstable_contains_lock_ = true;
  ObDatumRowkey rowkey;
  ObStoreRow write_row;
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 2, /*value*/ rowkey, write_row));
  ObTransID write_tx_id = ObTransID(1);
  ObStoreCtx *wtx = start_tx(write_tx_id);
  write_tx(wtx, memtable, 1000, /*snapshot version*/ write_row, OB_TRY_LOCK_ROW_CONFLICT);
  EXPECT_EQ(true, is_write_set_empty(wtx));
  read_row(wtx, memtable, rowkey, 1000, /*snapshot version*/ 1, /*key*/ 0, /*value*/ false /*exist*/);
  // Restore the flag so it does not stay set after this test.
  is_sstable_contains_lock_ = false;
  memtable->destroy();
}

// test_rollback_to: txn1 writes value 2 and then value 3 to key 1, rolls back
// from wtx_seq_no2 to wtx_seq_no1 + 1, and is then expected to read value 2
// again. A write from txn2 on the same key after the rollback is still called
// with OB_TRY_LOCK_ROW_CONFLICT.
TEST_F(TestMemtableV2, test_rollback_to)
{
  ObMemtable *memtable = create_memtable();
  TRANS_LOG(INFO, "######## CASE1: write row into memtable");
  ObDatumRowkey rowkey;
  ObStoreRow write_row;
  ObStoreRow write_row2;
  // Both mock_row calls fill the same rowkey object (key 1); only the value
  // differs (2 vs 3).
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 2, /*value*/ rowkey, write_row));
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 3, /*value*/ rowkey, write_row2));
  ObTransID write_tx_id = ObTransID(1);
  ObStoreCtx *wtx = start_tx(write_tx_id);
  write_tx(wtx, memtable, 1000, /*snapshot version*/ write_row);
  const int64_t wtx_seq_no1 = ObSequence::get_max_seq_no();
write_tx(wtx, memtable, 1000, /*snapshot version*/ write_row2);
  const int64_t wtx_seq_no2 = ObSequence::get_max_seq_no();
  print_callback(wtx);
  verify_cb(get_tx_last_cb(wtx), memtable, wtx_seq_no2, 1, /*key*/ true /*is_link*/);
  verify_tnode(get_tx_last_tnode(wtx), get_tx_first_tnode(wtx), /*prev tnode*/ NULL, /*next tnode*/ memtable, wtx->mvcc_acc_ctx_.tx_id_, INT64_MAX, /*trans_version*/ wtx_seq_no2, 1, /*modify_count*/ ObMvccTransNode::F_INIT, ObDmlFlag::DF_INSERT, 1, /*key*/ 3 /*value*/);
  verify_mvcc_row(get_tx_last_mvcc_row(wtx), ObDmlFlag::DF_NOT_EXIST, ObDmlFlag::DF_NOT_EXIST, get_tx_last_tnode(wtx), 0, /*max_trans_version*/ 2 /*total_trans_node_cnt*/);
  read_row(wtx, memtable, rowkey, 2000, /*snapshot version*/ 1, /*key*/ 3 /*value*/);
  TRANS_LOG(INFO, "######## CASE2: rollback the last tnode, and write write conflict");
  // Rolls back the second write only: the range ends at wtx_seq_no1 + 1, so
  // the first write (seq no wtx_seq_no1) is outside it.
  rollback_to_txn(wtx, wtx_seq_no2, /*from*/ wtx_seq_no1 + 1 /*to*/);
  ObDatumRowkey rowkey3;
  ObStoreRow write_row3;
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 4, /*value*/ rowkey3, write_row3));
  ObTransID write_tx_id2 = ObTransID(2);
  ObStoreCtx *wtx2 = start_tx(write_tx_id2);
  write_tx(wtx2, memtable, 3000, /*snapshot version*/ write_row3, OB_TRY_LOCK_ROW_CONFLICT);
  read_row(wtx, memtable, rowkey, 1000, /*snapshot version*/ 1, /*key*/ 2 /*value*/);
  memtable->destroy();
}

// test_replay: writes are made on lmemtable, captured with
// mock_replay_iterator, and replayed into fmemtable by transactions started
// with start_tx(..., true).
//  - txn1's two writes (values 2 and 3) are replayed by txn2 at scn 1300; the
//    replayed callbacks/tnodes are expected to carry the original seq nos and
//    scn 1300.
//  - txn3's write (value 4, committed at 800 on lmemtable) is replayed
//    afterwards by txn4 at scn 800. Its tnode is expected to have prev = NULL
//    and next = txn2's first tnode, i.e. to sit before the nodes replayed
//    earlier.
// NOTE(review): the verify_tnode calls on fmemtable nodes pass lmemtable as
// the memtable argument; whether verify_tnode uses that argument is not
// visible here.
TEST_F(TestMemtableV2, test_replay)
{
  ObMemtable *lmemtable = create_memtable();
  ObMemtable *fmemtable = create_memtable();
  TRANS_LOG(INFO, "######## CASE1: txn1 and txn3 write row in lmemtable");
  ObDatumRowkey rowkey;
  ObStoreRow write_row;
  ObDatumRowkey rowkey2;
  ObStoreRow write_row2;
  ObDatumRowkey rowkey3;
  ObStoreRow write_row3;
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 2, /*value*/ rowkey, write_row));
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 3, /*value*/ rowkey2, write_row2));
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 4, /*value*/ rowkey3, write_row3));
  ObTransID write_tx_id3 = ObTransID(3);
  ObStoreCtx *wtx3 = start_tx(write_tx_id3);
  write_tx(wtx3, lmemtable, 500, /*snapshot version*/ write_row3);
  const int64_t wtx3_seq_no1 = ObSequence::get_max_seq_no();
  commit_txn(wtx3, 800,/*commit_version*/ false/*need_write_back*/);
  ObTransID write_tx_id = ObTransID(1);
  ObStoreCtx *wtx = start_tx(write_tx_id);
  write_tx(wtx, lmemtable, 1000, /*snapshot version*/ write_row);
  const int64_t wtx_seq_no1 = ObSequence::get_max_seq_no();
  write_tx(wtx, lmemtable, 1200, /*snapshot version*/ write_row2);
  const int64_t wtx_seq_no2 = ObSequence::get_max_seq_no();
  // The iterator is built from txn1 before it is committed.
  ObMemtableMutatorIterator mmi;
  mock_replay_iterator(wtx, mmi);
  commit_txn(wtx, 2000,/*commit_version*/ false/*need_write_back*/);
  TRANS_LOG(INFO, "######## CASE2: txn2 replay row in fmemtable");
  ObTransID replay_tx_id = ObTransID(2);
  ObStoreCtx *ptx = start_tx(replay_tx_id, true);
  replay_tx(ptx, fmemtable, 1300, /*replay_scn*/ mmi);
  read_row(ptx, fmemtable, rowkey, 1500, /*snapshot version*/ 1, /*key*/ 3 /*value*/);
  // The callback preceding the last one, i.e. the one for the first replayed
  // write.
  ObMvccRowCallback *first_cb = (ObMvccRowCallback *)(get_tx_last_cb(ptx)->prev_);
  verify_cb(get_tx_last_cb(ptx), fmemtable, wtx_seq_no2, 1, /*key*/ true, /*is_link*/ false,/*need_fill_redo*/ 1300 /*scn*/);
  verify_cb(first_cb, fmemtable, wtx_seq_no1, 1, /*key*/ true, /*is_link*/ false,/*need_fill_redo*/ 1300 /*scn*/);
  verify_tnode(get_tx_last_tnode(ptx), first_cb->tnode_, /*prev tnode*/ NULL, /*next tnode*/ lmemtable, ptx->mvcc_acc_ctx_.tx_id_, INT64_MAX, /*trans_version*/ wtx_seq_no2, 2, /*modify_count*/ ObMvccTransNode::F_INIT, ObDmlFlag::DF_INSERT, 1, /*key*/ 3, /*value*/ 1300 /*scn*/);
  verify_tnode(first_cb->tnode_, NULL, /*prev tnode*/ get_tx_last_tnode(ptx), /*next tnode*/ lmemtable, ptx->mvcc_acc_ctx_.tx_id_, INT64_MAX, /*trans_version*/ wtx_seq_no1, 1, /*modify_count*/ ObMvccTransNode::F_INIT, ObDmlFlag::DF_INSERT, 1, /*key*/ 2, /*value*/ 1300 /*scn*/);
  verify_mvcc_row(get_tx_last_mvcc_row(ptx), ObDmlFlag::DF_NOT_EXIST, ObDmlFlag::DF_NOT_EXIST, get_tx_last_tnode(ptx), 0, /*max_trans_version*/ 2 /*total_trans_node_cnt*/);
  TRANS_LOG(INFO, "######## CASE3: txn4 replay row in fmemtable in reverse order");
  ObMemtableMutatorIterator mmi3;
  mock_replay_iterator(wtx3, mmi3);
  ObTransID replay_tx_id4 = ObTransID(4);
  ObStoreCtx *ptx4 = start_tx(replay_tx_id4, true/*for_replay*/);
  // scn 800 is lower than the scn 1300 already replayed above.
  replay_tx(ptx4, fmemtable, 800, /*replay_scn*/ mmi3);
  read_row(ptx, fmemtable, rowkey, 1500, /*snapshot version*/ 1, /*key*/ 3 /*value*/);
  commit_txn(ptx, 2000,/*commit_version*/ false/*need_write_back*/);
  commit_txn(ptx4, 800,/*commit_version*/ false/*need_write_back*/);
  // Snapshot 3000 expects txn2's latest value (3); snapshot 900 expects only
  // txn4's value (4).
  read_row(fmemtable, rowkey, 3000, /*snapshot version*/ 1, /*key*/ 3 /*value*/);
  read_row(fmemtable, rowkey, 900, /*snapshot version*/ 1, /*key*/ 4 /*value*/);
  verify_cb(get_tx_last_cb(ptx4), fmemtable, wtx3_seq_no1, 1, /*key*/ true, /*is_link*/ false,/*need_fill_redo*/ 800 /*scn*/);
  verify_tnode(get_tx_last_tnode(ptx4), NULL, /*prev tnode*/ get_tx_first_tnode(ptx), /*next tnode*/ lmemtable, ptx4->mvcc_acc_ctx_.tx_id_, 800, /*trans_version*/ wtx3_seq_no1, 0, /*modify_count*/ ObMvccTransNode::F_COMMITTED | ObMvccTransNode::F_DELAYED_CLEANOUT, ObDmlFlag::DF_INSERT, 1, /*key*/ 4, /*value*/ 800 /*scn*/);
  verify_mvcc_row(get_tx_last_mvcc_row(ptx4), ObDmlFlag::DF_INSERT, ObDmlFlag::DF_INSERT, get_tx_last_tnode(ptx), 2000, /*max_trans_version*/ 3 /*total_trans_node_cnt*/);
  lmemtable->destroy();
  fmemtable->destroy();
}

// test_compact: replays data into fmemtable and checks compact_row.
//  - After txn2's replayed writes commit at 2000 and the row is compacted at
//    2400, row->latest_compact_node_ is expected to be an NDT_COMPACT node
//    linked after txn2's last tnode and carrying value 3.
//  - txn4's uncommitted write is then replayed and the row compacted again at
//    2500; the same compact node is expected, now with txn4's tnode as its
//    next.
//  - After txn4 commits at 3000 and the row is compacted at 4000, the latest
//    compact node is expected to follow txn4's tnode and carry value 4.
TEST_F(TestMemtableV2, test_compact)
{
  ObMemtable *lmemtable = create_memtable();
  ObMemtable *fmemtable = create_memtable();
  TRANS_LOG(INFO, "######## CASE1: txn1 and txn3 write row in lmemtable");
  ObDatumRowkey rowkey;
  ObStoreRow write_row;
  ObDatumRowkey rowkey2;
  ObStoreRow write_row2;
  ObDatumRowkey rowkey3;
  ObStoreRow write_row3;
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 2, /*value*/ rowkey, write_row));
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 3, /*value*/ rowkey2, write_row2));
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 4, /*value*/ rowkey3, write_row3));
  ObTransID write_tx_id = ObTransID(1);
  ObStoreCtx *wtx = start_tx(write_tx_id);
  write_tx(wtx, lmemtable, 1000, /*snapshot version*/ write_row);
  const
int64_t wtx_seq_no1 = ObSequence::get_max_seq_no();
  write_tx(wtx, lmemtable, 1200, /*snapshot version*/ write_row2);
  const int64_t wtx_seq_no2 = ObSequence::get_max_seq_no();
  ObMemtableMutatorIterator mmi;
  mock_replay_iterator(wtx, mmi);
  print_callback(wtx);
  commit_txn(wtx, 2000,/*commit_version*/ false/*need_write_back*/);
  ObTransID write_tx_id3 = ObTransID(3);
  ObStoreCtx *wtx3 = start_tx(write_tx_id3);
  write_tx(wtx3, lmemtable, 2500, /*snapshot version*/ write_row3);
  const int64_t wtx3_seq_no1 = ObSequence::get_max_seq_no();
  commit_txn(wtx3, 3000,/*commit_version*/ false/*need_write_back*/);
  // Unlike mmi above, this iterator is built after txn3 has been committed.
  ObMemtableMutatorIterator mmi3;
  mock_replay_iterator(wtx3, mmi3);
  TRANS_LOG(INFO, "######## CASE2: txn2 replay row in fmemtable");
  ObTransID replay_tx_id = ObTransID(2);
  ObStoreCtx *ptx = start_tx(replay_tx_id, true/*for_replay*/);
  replay_tx(ptx, fmemtable, 2000, /*replay_scn*/ mmi);
  read_row(ptx, fmemtable, rowkey, 2400, /*snapshot version*/ 1, /*key*/ 3 /*value*/);
  // Saved before commit; all later checks in this test go through these
  // pointers.
  ObMvccRow *row = get_tx_last_mvcc_row(ptx);
  ObMvccTransNode *ptx_first_tnode = get_tx_first_tnode(ptx);
  ObMvccTransNode *ptx_last_tnode = get_tx_last_tnode(ptx);
  commit_txn(ptx, 2000,/*commit_version*/ true/*need_write_back*/);
  compact_row(row, fmemtable, 2400, true/*for_replay*/);
  // Expected chain after the first compaction:
  //   ptx_first_tnode <- ptx_last_tnode <- latest_compact_node_ (list head).
  verify_tnode(ptx_last_tnode, ptx_first_tnode, /*prev tnode*/ row->latest_compact_node_, /*next tnode*/ lmemtable, ptx->mvcc_acc_ctx_.tx_id_, 2000, /*trans_version*/ wtx_seq_no2, 1, /*modify_count*/ ObMvccTransNode::F_COMMITTED, ObDmlFlag::DF_INSERT, 1, /*key*/ 3, /*value*/ 2000 /*scn*/);
  verify_tnode(row->latest_compact_node_, ptx_last_tnode,/*prev tnode*/ NULL, /*next tnode*/ lmemtable, ptx->mvcc_acc_ctx_.tx_id_, 2000, /*trans_version*/ wtx_seq_no2, 1, /*modify_count*/ ObMvccTransNode::F_COMMITTED, ObDmlFlag::DF_INSERT, 1, /*key*/ 3, /*value*/ 2000, /*scn*/ NDT_COMPACT);
  verify_mvcc_row(row, ObDmlFlag::DF_INSERT, ObDmlFlag::DF_INSERT, row->latest_compact_node_, /*list_head*/ 2000, /*max_trans_version*/ 2 /*total_trans_node_cnt*/);
  // NOTE(review): the log label below repeats "CASE2" (the previous section
  // already used it); the runtime string is left unchanged.
  TRANS_LOG(INFO, "######## CASE2: txn4 replay uncommitted row in lmemtable and compacted");
  ObTransID replay_tx_id4 = ObTransID(4);
  ObStoreCtx *ptx4 = start_tx(replay_tx_id4, true/*for_replay*/);
  replay_tx(ptx4, fmemtable, 3000, /*replay_scn*/ mmi3);
  read_row(fmemtable, rowkey, 2900, /*snapshot version*/ 1, /*key*/ 3 /*value*/);
  compact_row(row, fmemtable, 2500, true/*for_replay*/);
  verify_cb(get_tx_last_cb(ptx4), fmemtable, wtx3_seq_no1, 1, /*key*/ true, /*is_link*/ false,/*need_fill_redo*/ 3000 /*scn*/);
  // txn4's uncommitted tnode is expected to be linked after the existing
  // compact node, whose own expectations are otherwise unchanged.
  verify_tnode(get_tx_last_tnode(ptx4), row->latest_compact_node_, /*prev tnode*/ NULL, /*next tnode*/ lmemtable, ptx4->mvcc_acc_ctx_.tx_id_, INT64_MAX, /*trans_version*/ wtx3_seq_no1, 2, /*modify_count*/ ObMvccTransNode::F_INIT, ObDmlFlag::DF_INSERT, 1, /*key*/ 4, /*value*/ 3000 /*scn*/);
  verify_tnode(row->latest_compact_node_, ptx_last_tnode, /*prev tnode*/ get_tx_last_tnode(ptx4), /*next tnode*/ lmemtable, ptx->mvcc_acc_ctx_.tx_id_, 2000, /*trans_version*/ wtx_seq_no2, 1, /*modify_count*/ ObMvccTransNode::F_COMMITTED, ObDmlFlag::DF_INSERT, 1, /*key*/ 3, /*value*/ 2000, /*scn*/ NDT_COMPACT);
  verify_mvcc_row(get_tx_last_mvcc_row(ptx4), ObDmlFlag::DF_INSERT, ObDmlFlag::DF_INSERT, get_tx_last_tnode(ptx4), 2000, /*max_trans_version*/ 3 /*total_trans_node_cnt*/);
  TRANS_LOG(INFO, "######## CASE3: commit txn4 in lmemtable and compacted");
  // Remember the current compact node so the next compaction can be shown to
  // link behind it.
  ObMvccTransNode *prev_compact_node = row->latest_compact_node_;
  // NOTE(review): ptx4_cb is not referenced again in the rest of this test.
  ObMvccRowCallback *ptx4_cb = get_tx_last_cb(ptx4);
  ObMvccTransNode *ptx4_tnode = get_tx_last_tnode(ptx4);
  commit_txn(ptx4, 3000,/*commit_version*/ true/*need_write_back*/);
  read_row(fmemtable, rowkey, 3500, /*snapshot version*/ 1, /*key*/ 4 /*value*/);
  compact_row(row, fmemtable, 4000, true/*for_replay*/);
  verify_tnode(ptx4_tnode, prev_compact_node, /*prev tnode*/ row->latest_compact_node_, /*next tnode*/ lmemtable, ptx4->mvcc_acc_ctx_.tx_id_, 3000, /*trans_version*/ wtx3_seq_no1, 2, /*modify_count*/ ObMvccTransNode::F_COMMITTED, ObDmlFlag::DF_INSERT, 1, /*key*/ 4,
/*value*/ 3000 /*scn*/);
  verify_tnode(row->latest_compact_node_, ptx4_tnode, /*prev tnode*/ NULL, /*next tnode*/ lmemtable, ptx4->mvcc_acc_ctx_.tx_id_, 3000, /*trans_version*/ wtx3_seq_no1, 2, /*modify_count*/ ObMvccTransNode::F_COMMITTED, ObDmlFlag::DF_INSERT, 1, /*key*/ 4, /*value*/ 3000, /*scn*/ NDT_COMPACT);
  verify_mvcc_row(row, ObDmlFlag::DF_INSERT, ObDmlFlag::DF_INSERT, row->latest_compact_node_, 3000, /*max_trans_version*/ 3 /*total_trans_node_cnt*/);
  lmemtable->destroy();
  fmemtable->destroy();
}

// test_compact_v2: compact_row invoked with for_replay = false on a single
// memtable.
// txn1 locks the row, then writes values 2 and 3; txn3 writes value 4. Both
// commit with need_write_back = false.
//  - Before compaction txn1's first two tnodes are still expected to show
//    trans_version INT64_MAX with F_INIT | F_DELAYED_CLEANOUT.
//  - After compact_row at snapshot 2400 txn1's three tnodes are expected to be
//    F_COMMITTED | F_DELAYED_CLEANOUT at version 2000, and an NDT_COMPACT node
//    carrying value 3 is expected between txn1's third tnode and txn3's tnode.
//  - The expected total_trans_node_cnt goes from 4 to 3, and the second tnode
//    is expected to have prev = NULL afterwards.
TEST_F(TestMemtableV2, test_compact_v2)
{
  ObMemtable *memtable = create_memtable();
  TRANS_LOG(INFO, "######## CASE1: txn1 write two rows and txn3 write a row in memtable");
  ObDatumRowkey rowkey;
  ObStoreRow write_row;
  ObDatumRowkey rowkey2;
  ObStoreRow write_row2;
  ObDatumRowkey rowkey3;
  ObStoreRow write_row3;
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 2, /*value*/ rowkey, write_row));
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 3, /*value*/ rowkey2, write_row2));
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 4, /*value*/ rowkey3, write_row3));
  ObTransID write_tx_id = ObTransID(1);
  ObStoreCtx *wtx = start_tx(write_tx_id);
  // After each operation the current max seq no and the transaction's last
  // tnode are saved for the later verify_tnode calls.
  lock_tx(wtx, memtable, 1000, /*snapshot version*/ rowkey);
  const int64_t wtx_seq_no1 = ObSequence::get_max_seq_no();
  ObMvccTransNode *wtx_first_tnode = get_tx_last_tnode(wtx);
  ObMvccRow *row = get_tx_last_mvcc_row(wtx);
  write_tx(wtx, memtable, 1200, /*snapshot version*/ write_row);
  const int64_t wtx_seq_no2 = ObSequence::get_max_seq_no();
  ObMvccTransNode *wtx_second_tnode = get_tx_last_tnode(wtx);
  write_tx(wtx, memtable, 1300, /*snapshot version*/ write_row2);
  const int64_t wtx_seq_no3 = ObSequence::get_max_seq_no();
  ObMvccTransNode *wtx_third_tnode = get_tx_last_tnode(wtx);
  print_callback(wtx);
  commit_txn(wtx, 2000,/*commit_version*/ false/*need_write_back*/);
  ObTransID write_tx_id3 = ObTransID(3);
  ObStoreCtx *wtx3 = start_tx(write_tx_id3);
  write_tx(wtx3, memtable, 2500, /*snapshot version*/ write_row3);
  const int64_t wtx3_seq_no1 = ObSequence::get_max_seq_no();
  ObMvccTransNode *wtx3_first_tnode = get_tx_last_tnode(wtx3);
  commit_txn(wtx3, 3000,/*commit_version*/ false/*need_write_back*/);
  // State before compaction.
  verify_tnode(wtx_first_tnode, NULL, /*prev tnode*/ wtx_second_tnode, /*next tnode*/ memtable, wtx->mvcc_acc_ctx_.tx_id_, INT64_MAX, /*trans_version*/ wtx_seq_no1, 0, /*modify_count*/ ObMvccTransNode::F_INIT | ObMvccTransNode::F_DELAYED_CLEANOUT, ObDmlFlag::DF_LOCK, 1, /*key*/ UNUSED_VALUE /*value*/);
  verify_tnode(wtx_second_tnode, wtx_first_tnode, /*prev tnode*/ wtx_third_tnode, /*next tnode*/ memtable, wtx->mvcc_acc_ctx_.tx_id_, INT64_MAX, /*trans_version*/ wtx_seq_no2, 1, /*modify_count*/ ObMvccTransNode::F_INIT | ObMvccTransNode::F_DELAYED_CLEANOUT, ObDmlFlag::DF_INSERT, 1, /*key*/ 2 /*value*/);
  verify_mvcc_row(row, ObDmlFlag::DF_INSERT, ObDmlFlag::DF_INSERT, wtx3_first_tnode, 2000, /*max_trans_version*/ 4 /*total_trans_node_cnt*/);
  // Snapshot 2400 lies between txn1's commit (2000) and txn3's commit (3000).
  compact_row(row, memtable, 2400, false/*for_replay*/);
  // State after compaction.
  verify_tnode(wtx_first_tnode, NULL, /*prev tnode*/ wtx_second_tnode, /*next tnode*/ memtable, wtx->mvcc_acc_ctx_.tx_id_, 2000, /*trans_version*/ wtx_seq_no1, 0, /*modify_count*/ ObMvccTransNode::F_COMMITTED | ObMvccTransNode::F_DELAYED_CLEANOUT, ObDmlFlag::DF_LOCK, 1, /*key*/ UNUSED_VALUE /*value*/);
  verify_tnode(wtx_second_tnode, NULL, /*prev tnode*/ wtx_third_tnode, /*next tnode*/ memtable, wtx->mvcc_acc_ctx_.tx_id_, 2000, /*trans_version*/ wtx_seq_no2, 1, /*modify_count*/ ObMvccTransNode::F_COMMITTED | ObMvccTransNode::F_DELAYED_CLEANOUT, ObDmlFlag::DF_INSERT, 1, /*key*/ 2 /*value*/);
  verify_tnode(wtx_third_tnode, wtx_second_tnode, /*prev tnode*/ row->latest_compact_node_, /*next tnode*/ memtable, wtx->mvcc_acc_ctx_.tx_id_, 2000, /*trans_version*/ wtx_seq_no3, 2, /*modify_count*/ ObMvccTransNode::F_COMMITTED | ObMvccTransNode::F_DELAYED_CLEANOUT, ObDmlFlag::DF_INSERT, 1, /*key*/ 3 /*value*/);
  verify_tnode(row->latest_compact_node_, wtx_third_tnode, /*prev tnode*/ wtx3_first_tnode, /*next tnode*/ memtable,
wtx->mvcc_acc_ctx_.tx_id_, 2000, /*trans_version*/ wtx_seq_no3, 2, /*modify_count*/ ObMvccTransNode::F_COMMITTED | ObMvccTransNode::F_DELAYED_CLEANOUT, ObDmlFlag::DF_INSERT, 1, /*key*/ 3, /*value*/ INT64_MAX, NDT_COMPACT);
  verify_mvcc_row(row, ObDmlFlag::DF_INSERT, ObDmlFlag::DF_INSERT, wtx3_first_tnode, 2000, /*max_trans_version*/ 3 /*total_trans_node_cnt*/);
  memtable->destroy();
}

// test_compact_v3: compaction of a row whose committed nodes are row locks.
// On lmemtable txn1 writes twice and aborts, txn2 locks and commits at 3000,
// and txn5 locks (not committed in this test). Their mutators are replayed
// into fmemtable in the order txn6 (scn 3500), txn3 (scn 3000), txn4
// (scn 2000).
// The replayed lock nodes have dml_flag_ overwritten to DF_INSERT before the
// commits and restored to DF_LOCK before compact_row.
// NOTE(review): the reason for this temporary flag rewrite is not stated in
// the code - confirm with the helper implementations.
// After compact_row at 4000 the test expects no compact node
// (latest_compact_node_ == NULL) and issues a read at 4500 with exist = false.
TEST_F(TestMemtableV2, test_compact_v3)
{
  ObMemtable *lmemtable = create_memtable();
  ObMemtable *fmemtable = create_memtable();
  TRANS_LOG(INFO, "######## CASE1: txn1 write two row and txn2 write row in lmemtable");
  ObDatumRowkey rowkey;
  ObStoreRow write_row;
  ObDatumRowkey rowkey2;
  ObStoreRow write_row2;
  ObDatumRowkey rowkey3;
  ObStoreRow write_row3;
  ObDatumRowkey rowkey4;
  ObStoreRow write_row4;
  // All four mock rows use key 1; only the value differs.
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 2, /*value*/ rowkey, write_row));
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 3, /*value*/ rowkey2, write_row2));
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 4, /*value*/ rowkey3, write_row3));
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 5, /*value*/ rowkey4, write_row4));
  ObTransID write_tx_id = ObTransID(1);
  ObStoreCtx *wtx = start_tx(write_tx_id);
  write_tx(wtx, lmemtable, 1000, /*snapshot version*/ write_row);
  write_tx(wtx, lmemtable, 1200, /*snapshot version*/ write_row2);
  ObMemtableMutatorIterator mmi;
  mock_replay_iterator(wtx, mmi);
  abort_txn(wtx, false/*need_write_back*/);
  ObTransID write_tx_id2 = ObTransID(2);
  ObStoreCtx *wtx2 = start_tx(write_tx_id2);
  lock_tx(wtx2, lmemtable, 2200, /*snapshot version*/ rowkey3);
  print_callback(wtx2);
  ObMemtableMutatorIterator mmi2;
  mock_replay_iterator(wtx2, mmi2);
  commit_txn(wtx2, 3000,/*commit_version*/ false/*need_write_back*/);
  ObTransID write_tx_id5 = ObTransID(5);
  ObStoreCtx *wtx5 = start_tx(write_tx_id5);
  lock_tx(wtx5, lmemtable, 3200, /*snapshot version*/ rowkey4);
  ObMemtableMutatorIterator mmi3;
  mock_replay_iterator(wtx5, mmi3);
  TRANS_LOG(INFO, "######## CASE2: txn3, txn4 replay rows in fmemtable in reverse order");
  ObTransID replay_tx_id3 = ObTransID(3);
  ObStoreCtx *ptx3 = start_tx(replay_tx_id3, true/*for_replay*/);
  ObTransID replay_tx_id4 = ObTransID(4);
  ObStoreCtx *ptx4 = start_tx(replay_tx_id4, true/*for_replay*/);
  ObTransID replay_tx_id6 = ObTransID(6);
  ObStoreCtx *ptx6 = start_tx(replay_tx_id6, true/*for_replay*/);
  // Replay order is highest scn first: 3500, then 3000, then 2000.
  replay_tx(ptx6, fmemtable, 3500, /*replay_scn*/ mmi3);
  replay_tx(ptx3, fmemtable, 3000, /*replay_scn*/ mmi2);
  replay_tx(ptx4, fmemtable, 2000, /*replay_scn*/ mmi);
  // Overwrite the dml flag stored in the two replayed lock nodes with
  // DF_INSERT for the duration of the commits below.
  ObMvccTransNode *node = get_tx_last_tnode(ptx3);
  ((ObMemtableDataHeader *)(node->buf_))->dml_flag_ = blocksstable::ObDmlFlag::DF_INSERT;
  ObMvccTransNode *node2 = get_tx_last_tnode(ptx6);
  ((ObMemtableDataHeader *)(node2->buf_))->dml_flag_ = blocksstable::ObDmlFlag::DF_INSERT;
  ObMvccRow *row = get_tx_last_mvcc_row(ptx3);
  row->print_row();
  commit_txn(ptx3, 3000,/*commit_version*/ true/*need_write_back*/);
  commit_txn(ptx6, 3600,/*commit_version*/ true/*need_write_back*/);
  abort_txn(ptx4, false/*need_write_back*/);
  // Put the original DF_LOCK flag back before compacting.
  ((ObMemtableDataHeader *)(node->buf_))->dml_flag_ = blocksstable::ObDmlFlag::DF_LOCK;
  ((ObMemtableDataHeader *)(node2->buf_))->dml_flag_ = blocksstable::ObDmlFlag::DF_LOCK;
  row->print_row();
  compact_row(row, fmemtable, 4000, true/*for_replay*/);
  row->print_row();
  ObMvccTransNode *compact_node = row->latest_compact_node_;
  EXPECT_EQ(NULL, compact_node);
  verify_mvcc_row(row, ObDmlFlag::DF_INSERT, ObDmlFlag::DF_INSERT, node2, 3600, /*max_trans_version*/ 2 /*total_trans_node_cnt*/);
  read_row(fmemtable, rowkey, 4500, /*snapshot version*/ 1, /*key*/ 4, /*value*/ false /*exist*/);
  lmemtable->destroy();
  fmemtable->destroy();
}

// test_dml_flag: checks the two ObDmlFlag values that verify_mvcc_row
// compares on the mvcc row (presumably the row's first and last dml flags -
// confirm against the verify_mvcc_row helper).
//  - On lmemtable: insert (commit 800), lock (commit 1200), delete
//    (commit 1600). The expected pair goes
//    (NOT_EXIST, NOT_EXIST) -> (INSERT, INSERT) -> unchanged by the lock ->
//    (INSERT, DELETE).
//  - On fmemtable the same mutators are replayed out of order (lock, delete,
//    insert); the final expected pair is again (INSERT, DELETE).
TEST_F(TestMemtableV2, test_dml_flag)
{
  ObMemtable *lmemtable = create_memtable();
  ObMemtable *fmemtable = create_memtable();
  TRANS_LOG(INFO, "######## CASE1: txns write and row in lmemtable, test its dml flag");
  ObDatumRowkey rowkey;
  ObStoreRow write_row1;
  ObDatumRowkey rowkey2;
  ObStoreRow write_row2;
  ObDatumRowkey rowkey3;
  ObStoreRow write_row3;
  EXPECT_EQ(OB_SUCCESS,
mock_row(1, /*key*/ 2, /*value*/ rowkey, write_row1));
  // write_row2 is a delete of key 1, built with mock_delete.
  EXPECT_EQ(OB_SUCCESS, mock_delete(1, /*key*/ rowkey2, write_row2));
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 4, /*value*/ rowkey3, write_row3));
  // NOTE(review): wtx1_seq_no1, wtx2_seq_no1, wtx3_seq_no1, ptx2_tnode1 and
  // ptx1_tnode1 are assigned below but not referenced again in this test.
  ObTransID write_tx_id1 = ObTransID(1);
  ObStoreCtx *wtx1 = start_tx(write_tx_id1);
  write_tx(wtx1, lmemtable, 400, /*snapshot version*/ write_row1);
  const int64_t wtx1_seq_no1 = ObSequence::get_max_seq_no();
  ObMvccRow *wtx1_row = get_tx_last_mvcc_row(wtx1);
  ObMvccTransNode *wtx1_tnode1 = get_tx_last_tnode(wtx1);
  // Uncommitted insert: both flags are still expected to be DF_NOT_EXIST.
  verify_mvcc_row(wtx1_row, ObDmlFlag::DF_NOT_EXIST, ObDmlFlag::DF_NOT_EXIST, wtx1_tnode1, 0, /*max_trans_version*/ 1 /*total_trans_node_cnt*/);
  ObMemtableMutatorIterator mmi1;
  mock_replay_iterator(wtx1, mmi1);
  commit_txn(wtx1, 800,/*commit_version*/ true/*need_write_back*/);
  verify_mvcc_row(wtx1_row, ObDmlFlag::DF_INSERT, ObDmlFlag::DF_INSERT, wtx1_tnode1, 800, /*max_trans_version*/ 1 /*total_trans_node_cnt*/);
  ObTransID write_tx_id2 = ObTransID(2);
  ObStoreCtx *wtx2 = start_tx(write_tx_id2);
  lock_tx(wtx2, lmemtable, 1000, /*snapshot version*/ rowkey);
  const int64_t wtx2_seq_no1 = ObSequence::get_max_seq_no();
  ObMemtableMutatorIterator mmi2;
  mock_replay_iterator(wtx2, mmi2);
  commit_txn(wtx2, 1200,/*commit_version*/ true/*need_write_back*/);
  // After the lock commits only max_trans_version is expected to change
  // (800 -> 1200); flags, list head and node count are expected as before.
  verify_mvcc_row(wtx1_row, ObDmlFlag::DF_INSERT, ObDmlFlag::DF_INSERT, wtx1_tnode1, 1200, /*max_trans_version*/ 1 /*total_trans_node_cnt*/);
  ObTransID write_tx_id3 = ObTransID(3);
  ObStoreCtx *wtx3 = start_tx(write_tx_id3);
  write_tx(wtx3, lmemtable, 1400, /*snapshot version*/ write_row2);
  const int64_t wtx3_seq_no1 = ObSequence::get_max_seq_no();
  ObMvccTransNode *wtx3_tnode1 = get_tx_last_tnode(wtx3);
  ObMemtableMutatorIterator mmi3;
  mock_replay_iterator(wtx3, mmi3);
  commit_txn(wtx3, 1600,/*commit_version*/ true/*need_write_back*/);
  verify_mvcc_row(wtx1_row, ObDmlFlag::DF_INSERT, ObDmlFlag::DF_DELETE, wtx3_tnode1, 1600, /*max_trans_version*/ 2 /*total_trans_node_cnt*/);
  TRANS_LOG(INFO, "######## CASE2: txns replay row in fmemtable and test dml flag");
  ObTransID replay_tx_id1 = ObTransID(4);
  ObStoreCtx *ptx1 = start_tx(replay_tx_id1, true);
  ObTransID replay_tx_id2 = ObTransID(5);
  ObStoreCtx *ptx2 = start_tx(replay_tx_id2, true);
  ObTransID replay_tx_id3 = ObTransID(6);
  ObStoreCtx *ptx3 = start_tx(replay_tx_id3, true);
  // Replay the lock first (scn 1200).
  replay_tx(ptx2, fmemtable, 1200, /*replay_scn*/ mmi2);
  ObMvccRow *ptx2_row = get_tx_last_mvcc_row(ptx2);
  ObMvccTransNode *ptx2_tnode1 = get_tx_last_tnode(ptx2);
  commit_txn(ptx2, 1200,/*commit_version*/ true/*need_write_back*/);
  // With only the lock replayed and committed, the row is expected to have a
  // NULL list head and a node count of 0.
  verify_mvcc_row(ptx2_row, ObDmlFlag::DF_NOT_EXIST, ObDmlFlag::DF_NOT_EXIST, NULL, 1200, /*max_trans_version*/ 0 /*total_trans_node_cnt*/);
  // Then the delete (scn 1600).
  replay_tx(ptx3, fmemtable, 1600, /*replay_scn*/ mmi3);
  ObMvccRow *ptx3_row = get_tx_last_mvcc_row(ptx3);
  ObMvccTransNode *ptx3_tnode1 = get_tx_last_tnode(ptx3);
  commit_txn(ptx3, 1600,/*commit_version*/ true/*need_write_back*/);
  verify_mvcc_row(ptx3_row, ObDmlFlag::DF_DELETE, ObDmlFlag::DF_DELETE, ptx3_tnode1, 1600, /*max_trans_version*/ 1 /*total_trans_node_cnt*/);
  // Finally the oldest change, the insert (scn 800). The list head is
  // expected to remain the delete node.
  replay_tx(ptx1, fmemtable, 800, /*replay_scn*/ mmi1);
  ObMvccRow *ptx1_row = get_tx_last_mvcc_row(ptx1);
  ObMvccTransNode *ptx1_tnode1 = get_tx_last_tnode(ptx1);
  commit_txn(ptx1, 800,/*commit_version*/ true/*need_write_back*/);
  verify_mvcc_row(ptx1_row, ObDmlFlag::DF_INSERT, ObDmlFlag::DF_DELETE, ptx3_tnode1, 1600, /*max_trans_version*/ 2 /*total_trans_node_cnt*/);
  lmemtable->destroy();
  fmemtable->destroy();
}

// test_fast_commit: txn1 writes a row and fast_commit_txn is called on it
// before prepare/commit.
//  - txn1 is still expected to read its own value 2; txn2's read at snapshot
//    1100 is issued with exist = false.
//  - Once txn1 is prepared at 1200, a plain read at snapshot 1800 is called
//    with OB_ERR_SHARED_LOCK_CONFLICT and an expire time of 1000000.
//  - Until commit the tnode is expected to carry only F_DELAYED_CLEANOUT with
//    trans_version INT64_MAX; after commit_txn(2000) it is expected to be
//    F_COMMITTED | F_DELAYED_CLEANOUT at version 2000.
TEST_F(TestMemtableV2, test_fast_commit)
{
  ObMemtable *memtable = create_memtable();
  TRANS_LOG(INFO, "######## CASE1: write row into memtable and fast commit, then check result is ok");
  ObDatumRowkey rowkey;
  ObStoreRow write_row;
  EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 2, /*value*/ rowkey, write_row));
  ObTransID write_tx_id = ObTransID(1);
  ObTransID write_tx_id2 = ObTransID(2);
  ObStoreCtx *wtx = start_tx(write_tx_id);
  ObStoreCtx *wtx2 = start_tx(write_tx_id2);
  write_tx(wtx, memtable, 1000, /*snapshot version*/ write_row);
  const int64_t wtx_seq_no =
ObSequence::get_max_seq_no(); ObMvccRowCallback *wtx_cb = (ObMvccRowCallback *)(get_tx_last_cb(wtx)); ObMvccRow *wtx_row = get_tx_last_mvcc_row(wtx); ObMvccTransNode *wtx_tnode = get_tx_last_tnode(wtx); read_row(wtx, memtable, rowkey, 1000, /*snapshot version*/ 1, /*key*/ 2 /*value*/); fast_commit_txn(wtx); read_row(wtx, memtable, rowkey, 1000, /*snapshot version*/ 1, /*key*/ 2 /*value*/); read_row(wtx2, memtable, rowkey, 1100, /*snapshot version*/ 1, /*key*/ 2, /*value*/ false /*exist*/); prepare_txn(wtx, 1200 /*prepare_version*/); read_row(memtable, rowkey, 1800, /*snapshot version*/ 1, /*key*/ 2, /*value*/ false, /*exist*/ OB_ERR_SHARED_LOCK_CONFLICT, 1000000 /*expire_time*/); verify_cb(wtx_cb, memtable, wtx_seq_no, 1, /*key*/ true /*is_link*/); verify_tnode(wtx_tnode, NULL, /*prev tnode*/ NULL, /*next tnode*/ memtable, wtx->mvcc_acc_ctx_.tx_id_, INT64_MAX, /*trans_version*/ wtx_seq_no, 0, /*modify_count*/ ObMvccTransNode::F_DELAYED_CLEANOUT, DF_INSERT, 1, /*key*/ 2 /*value*/); verify_mvcc_row(wtx_row, ObDmlFlag::DF_NOT_EXIST, ObDmlFlag::DF_NOT_EXIST, wtx_tnode, 0, /*max_trans_version*/ 1 /*total_trans_node_cnt*/); commit_txn(wtx, 2000 /*commit_version*/); read_row(wtx, memtable, rowkey, 3000, /*snapshot version*/ 1, /*key*/ 2 /*value*/); verify_cb(wtx_cb, memtable, wtx_seq_no, 1, /*key*/ true /*is_link*/); verify_tnode(wtx_tnode, NULL, /*prev tnode*/ NULL, /*next tnode*/ memtable, wtx->mvcc_acc_ctx_.tx_id_, 2000, /*trans_version*/ wtx_seq_no, 0, /*modify_count*/ ObMvccTransNode::F_COMMITTED | ObMvccTransNode::F_DELAYED_CLEANOUT, DF_INSERT, 1, /*key*/ 2 /*value*/); verify_mvcc_row(wtx_row, ObDmlFlag::DF_INSERT, ObDmlFlag::DF_INSERT, wtx_tnode, 2000, /*max_trans_version*/ 1 /*total_trans_node_cnt*/); memtable->destroy(); } TEST_F(TestMemtableV2, test_fast_commit_with_no_delay_cleanout) { ObMemtable *memtable = create_memtable(); TRANS_LOG(INFO, "######## CASE1: write row into memtable and not fast commit, then check result is ok"); ObDatumRowkey rowkey; 
ObStoreRow write_row; EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 2, /*value*/ rowkey, write_row)); ObTransID write_tx_id = ObTransID(1); ObTransID write_tx_id2 = ObTransID(2); ObStoreCtx *wtx = start_tx(write_tx_id); ObStoreCtx *wtx2 = start_tx(write_tx_id2); write_tx(wtx, memtable, 1000, /*snapshot version*/ write_row); const int64_t wtx_seq_no = ObSequence::get_max_seq_no(); ObMvccRowCallback *wtx_cb = (ObMvccRowCallback *)(get_tx_last_cb(wtx)); ObMvccRow *wtx_row = get_tx_last_mvcc_row(wtx); ObMvccTransNode *wtx_tnode = get_tx_last_tnode(wtx); read_row(wtx, memtable, rowkey, 1000, /*snapshot version*/ 1, /*key*/ 2 /*value*/); read_row(wtx, memtable, rowkey, 1000, /*snapshot version*/ 1, /*key*/ 2 /*value*/); read_row(wtx2, memtable, rowkey, 1100, /*snapshot version*/ 1, /*key*/ 2, /*value*/ false /*exist*/); prepare_txn(wtx, 1200 /*prepare_version*/); read_row(memtable, rowkey, 1800, /*snapshot version*/ 1, /*key*/ 2, /*value*/ false, /*exist*/ OB_ERR_SHARED_LOCK_CONFLICT, 1000000 /*expire_time*/); verify_cb(wtx_cb, memtable, wtx_seq_no, 1, /*key*/ true /*is_link*/); verify_tnode(wtx_tnode, NULL, /*prev tnode*/ NULL, /*next tnode*/ memtable, wtx->mvcc_acc_ctx_.tx_id_, INT64_MAX, /*trans_version*/ wtx_seq_no, 0, /*modify_count*/ ObMvccTransNode::F_INIT, DF_INSERT, 1, /*key*/ 2 /*value*/); verify_mvcc_row(wtx_row, ObDmlFlag::DF_NOT_EXIST, ObDmlFlag::DF_NOT_EXIST, wtx_tnode, 0, /*max_trans_version*/ 1 /*total_trans_node_cnt*/); // NB: we use the following code to mock the concurrency between tx_end and // delay cleanout ObPartTransCtx *tx_ctx = wtx->mvcc_acc_ctx_.tx_ctx_; ObMemtableCtx *mt_ctx = wtx->mvcc_acc_ctx_.mem_ctx_; tx_ctx->exec_info_.state_ = ObTxState::COMMIT; share::SCN commit_scn; commit_scn.convert_for_tx(2000); tx_ctx->ctx_tx_data_.set_commit_version(commit_scn); tx_ctx->ctx_tx_data_.set_state(ObTxData::COMMIT); read_row(wtx, memtable, rowkey, 3000, /*snapshot version*/ 1, /*key*/ 2 /*value*/); verify_cb(wtx_cb, memtable, wtx_seq_no, 1, /*key*/ true 
/*is_link*/); verify_tnode(wtx_tnode, NULL, /*prev tnode*/ NULL, /*next tnode*/ memtable, wtx->mvcc_acc_ctx_.tx_id_, INT64_MAX, /*trans_version*/ wtx_seq_no, 0, /*modify_count*/ ObMvccTransNode::F_INIT, DF_INSERT, 1, /*key*/ 2 /*value*/); verify_mvcc_row(wtx_row, ObDmlFlag::DF_NOT_EXIST, ObDmlFlag::DF_NOT_EXIST, wtx_tnode, INT64_MAX, /*max_trans_version*/ 1 /*total_trans_node_cnt*/); memtable->destroy(); } TEST_F(TestMemtableV2, test_seq_set_violation) { int ret = OB_SUCCESS; ObMemtable *memtable = create_memtable(); TRANS_LOG(INFO, "######## CASE1: write row into memtable"); ObDatumRowkey rowkey; ObStoreRow write_row; ObStoreRow write_row2; EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 2, /*value*/ rowkey, write_row)); EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 3, /*value*/ rowkey, write_row2)); ObTransID write_tx_id = ObTransID(1); ObStoreCtx *wtx = start_tx(write_tx_id); int64_t read_seq_no = ObSequence::get_max_seq_no(); share::SCN scn_3000; scn_3000.convert_for_tx(3000); start_pdml_stmt(wtx, scn_3000, read_seq_no, 1000000000/*expire_time*/); EXPECT_EQ(OB_SUCCESS, (ret = memtable->set(*wtx, tablet_id_.id(), read_info_, columns_, write_row))); start_pdml_stmt(wtx, scn_3000, read_seq_no, 1000000000/*expire_time*/); EXPECT_EQ(OB_ERR_PRIMARY_KEY_DUPLICATE, (ret = memtable->set(*wtx, tablet_id_.id(), read_info_, columns_, write_row))); memtable->destroy(); } TEST_F(TestMemtableV2, test_parallel_lock_with_same_txn) { int ret = OB_SUCCESS; ObMemtable *memtable = create_memtable(); TRANS_LOG(INFO, "######## CASE1: lock row into memtable parallelly"); ObDatumRowkey rowkey; ObStoreRow write_row; EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/ 2, /*value*/ rowkey, write_row)); ObTransID write_tx_id = ObTransID(1); ObStoreCtx *wtx = start_tx(write_tx_id); share::SCN scn_1000; scn_1000.convert_for_tx(1000); // Step1: prepare the global sequence ObSequence::inc(); int64_t read_seq_no = ObSequence::get_max_seq_no(); // Step2: init the mvcc acc ctx wtx->mvcc_acc_ctx_.type_ = 
ObMvccAccessCtx::T::WRITE; wtx->mvcc_acc_ctx_.snapshot_.tx_id_ = wtx->mvcc_acc_ctx_.tx_id_; wtx->mvcc_acc_ctx_.snapshot_.version_ = scn_1000; wtx->mvcc_acc_ctx_.snapshot_.scn_ = read_seq_no; const int64_t abs_expire_time = 10000000000 + ::oceanbase::common::ObTimeUtility::current_time(); wtx->mvcc_acc_ctx_.abs_lock_timeout_ = abs_expire_time; wtx->mvcc_acc_ctx_.tx_scn_ = ObSequence::inc_and_get_max_seq_no(); // Step3: lock for the first time EXPECT_EQ(OB_SUCCESS, (ret = memtable->lock(*wtx, tablet_id_.id(), read_info_, rowkey))); // Step4: lock for the second time wtx->mvcc_acc_ctx_.tx_scn_ = ObSequence::inc_and_get_max_seq_no(); EXPECT_EQ(OB_SUCCESS, (ret = memtable->lock(*wtx, tablet_id_.id(), read_info_, rowkey))); memtable->destroy(); } } // namespace unittest namespace storage { int ObTxDataTable::alloc_undo_status_node(ObUndoStatusNode *&undo_status_node) { int ret = OB_SUCCESS; undo_status_node = new ObUndoStatusNode(); return ret; } int ObTxCtxTable::acquire_ref_(const ObLSID& ls_id) { int ret = OB_SUCCESS; ls_tx_ctx_mgr_ = &unittest::TestMemtableV2::ls_tx_ctx_mgr_; TRANS_LOG(INFO, "[TX_CTX_TABLE] tx ctx table acquire ref", K(ls_id), K(this)); return ret; } int ObTxCtxTable::release_ref_() { int ret = OB_SUCCESS; ls_tx_ctx_mgr_ = NULL; TRANS_LOG(INFO, "[TX_CTX_TABLE] tx ctx table release ref", K(this)); return ret; } } // namespace storage namespace memtable { int ObMemtable::lock_row_on_frozen_stores_(ObStoreCtx &, const ObTxNodeArg &, const ObMemtableKey *, ObMvccRow *, const storage::ObTableReadInfo &read_info, ObMvccWriteResult &) { if (unittest::TestMemtableV2::is_sstable_contains_lock_) { return OB_TRY_LOCK_ROW_CONFLICT; } else { return OB_SUCCESS; } } } namespace transaction { int ObLSTxCtxMgr::init(const int64_t tenant_id, const ObLSID &ls_id, ObTxTable *tx_table, ObLockTable *lock_table, ObITsMgr *ts_mgr, ObTransService *txs, ObITxLogParam * param, ObITxLogAdapter * log_adapter) { int ret = OB_SUCCESS; UNUSED(log_adapter); if (is_inited_) { 
TRANS_LOG(WARN, "ObLSTxCtxMgr inited twice"); ret = OB_INIT_TWICE; } else { if (OB_FAIL(ls_tx_ctx_map_.init(lib::ObMemAttr(tenant_id, "LSTxCtxMgr")))) { TRANS_LOG(WARN, "ls_tx_ctx_map_ init fail", KR(ret)); } else { is_inited_ = true; state_ = State::L_WORKING; tenant_id_ = tenant_id; ls_id_ = ls_id; tx_table_ = tx_table; lock_table_ = lock_table; txs_ = txs; ts_mgr_ = ts_mgr; TRANS_LOG(INFO, "ObLSTxCtxMgr inited success", KP(this), K(ls_id)); } } return ret; } } // namespace transaction } // namespace oceanbase int main(int argc, char **argv) { system("rm -rf test_memtable.log*"); OB_LOGGER.set_file_name("test_memtable.log"); OB_LOGGER.set_log_level("INFO"); STORAGE_LOG(INFO, "begin unittest: test simple memtable"); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }