Files
oceanbase/mittest/mtlenv/storage/test_memtable_v2.cpp

3165 lines
104 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#define private public
#define protected public
#include "storage/memtable/ob_memtable.h"
#include "share/rc/ob_tenant_base.h"
#include "mtlenv/mock_tenant_module_env.h"
#include "storage/tx/ob_mock_tx_ctx.h"
#include "storage/tx_table/ob_tx_table.h"
#include "storage/memtable/mvcc/ob_mvcc_row.h"
#include "storage/init_basic_struct.h"
#include "observer/ob_safe_destroy_thread.h"
namespace oceanbase
{
using namespace common;
using namespace share;
using namespace memtable;
using namespace storage;
using namespace transaction;
using namespace blocksstable;
namespace storage {
int ObTxTable::online()
{
ATOMIC_INC(&epoch_);
ATOMIC_STORE(&state_, TxTableState::ONLINE);
return OB_SUCCESS;
}
} // namespace storage
namespace memtable
{
int ObMvccWriteGuard::write_auth(storage::ObStoreCtx &) { return OB_SUCCESS; }
ObMvccWriteGuard::~ObMvccWriteGuard() {}
void *ObMemtableCtx::callback_alloc(const int64_t size)
{
void* ret = NULL;
if (OB_ISNULL(ret = std::malloc(size))) {
TRANS_LOG_RET(ERROR, OB_ALLOCATE_MEMORY_FAILED, "callback alloc error, no memory", K(size), K(*this));
} else {
ATOMIC_FAA(&callback_mem_used_, size);
ATOMIC_INC(&callback_alloc_count_);
}
return ret;
}
void ObMemtableCtx::callback_free(ObITransCallback *cb)
{
if (OB_ISNULL(cb)) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "cb is null, unexpected error", KP(cb), K(*this));
} else {
ATOMIC_INC(&callback_free_count_);
std::free(cb);
cb = NULL;
}
}
int ObMvccRow::check_double_insert_(const share::SCN ,
ObMvccTransNode &,
ObMvccTransNode *)
{
return OB_SUCCESS;
}
// int ObTxEndFunctor::operator()(ObITransCallback *callback)
// {
// int ret = OB_SUCCESS;
// if (NULL == callback) {
// ret = OB_ERR_UNEXPECTED;
// TRANS_LOG(ERROR, "unexpected callback", KP(callback));
// } else if (is_commit_
// && OB_FAIL(callback->trans_commit())) {
// TRANS_LOG(ERROR, "trans commit failed", KPC(callback));
// } else if (!is_commit_
// && OB_FAIL(callback->trans_abort())) {
// TRANS_LOG(ERROR, "trans abort failed", KPC(callback));
// } else {
// need_remove_callback_ = true;
// }
// return ret;
// }
} // namespace memtable
namespace unittest
{
int64_t TENANT_ID = 1;
int64_t LSID = 1001;
class TestMemtableV2 : public ::testing::Test
{
public:
TestMemtableV2()
: ls_id_(LSID),
tablet_id_(50001),
tenant_id_(TENANT_ID),
rowkey_cnt_(1),
value_cnt_(1),
iter_param_(),
columns_(),
allocator_(),
allocator2_(),
read_info_(),
trans_version_range_(),
query_flag_()
{
columns_.reset();
read_info_.reset();
query_flag_.use_row_cache_ = ObQueryFlag::DoNotUseCache;
query_flag_.set_not_use_fuse_row_cache();
}
private:
static ObTxTable tx_table_;
static bool is_sstable_contains_lock_;
public:
virtual void SetUp() override
{
// mock sequence no
ObClockGenerator::init();
// mock tx table
ObTxPalfParam palf_param((logservice::ObLogHandler *)(0x01));
EXPECT_EQ(OB_SUCCESS,
ls_tx_ctx_mgr_.init(tenant_id_, /*tenant_id*/
ls_id_,
&tx_table_,
(ObLockTable*)(0x01),
(ObITsMgr *)(0x01),
(ObTransService *)(0x01),
&palf_param,
nullptr));
EXPECT_EQ(OB_SUCCESS, tx_table_.tx_ctx_table_.init(ls_id_));
tx_table_.online();
tx_table_.is_inited_ = true;
tx_table_.ls_ = &ls_;
// mock columns
EXPECT_EQ(OB_SUCCESS, mock_col_desc());
// mock iterator parameter
EXPECT_EQ(OB_SUCCESS, mock_iter_param());
// mock trans version range
EXPECT_EQ(OB_SUCCESS, mock_trans_version_range());
// is_sstable_contain_lock
is_sstable_contains_lock_ = false;
TRANS_LOG(INFO, "setup success");
}
virtual void TearDown() override
{
// reset iterator parameter
reset_iter_param();
// reset columns
columns_.reset();
// reset tx table
ls_tx_ctx_mgr_.reset();
ls_tx_ctx_mgr_.ls_tx_ctx_map_.reset();
// reset sequence no
ObClockGenerator::destroy();
// reset trans version range
trans_version_range_.reset();
// reset allocator
allocator_.reset();
allocator2_.reset();
TRANS_LOG(INFO, "teardown success");
}
static void SetUpTestCase()
{
TRANS_LOG(INFO, "SetUpTestCase");
EXPECT_EQ(OB_SUCCESS, MockTenantModuleEnv::get_instance().init());
SAFE_DESTROY_INSTANCE.init();
SAFE_DESTROY_INSTANCE.start();
ObServerCheckpointSlogHandler::get_instance().is_started_ = true;
// create ls
ObCreateLSArg arg;
EXPECT_EQ(OB_SUCCESS, gen_create_ls_arg(TENANT_ID,
ObLSID(LSID),
arg));
ObLSService* ls_svr = MTL(ObLSService*);
EXPECT_EQ(OB_SUCCESS, ls_svr->create_ls(arg));
}
static void TearDownTestCase()
{
// remove ls
ObLSID ls_id(1001);
ASSERT_EQ(OB_SUCCESS, MTL(ObLSService*)->remove_ls(ls_id, false));
SAFE_DESTROY_INSTANCE.stop();
SAFE_DESTROY_INSTANCE.wait();
SAFE_DESTROY_INSTANCE.destroy();
MockTenantModuleEnv::get_instance().destroy();
TRANS_LOG(INFO, "TearDownTestCase");
}
public:
ObMemtable *create_memtable()
{
int ret = OB_SUCCESS;
ObITable::TableKey table_key;
table_key.table_type_ = ObITable::DATA_MEMTABLE;
table_key.tablet_id_ = ObTabletID(tablet_id_.id());
table_key.scn_range_.start_scn_.convert_for_tx(1);
table_key.scn_range_.end_scn_.set_max();
ObLSService* ls_svr = MTL(ObLSService*);
ObLSHandle ls_handle;
EXPECT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id_,
ls_handle,
ObLSGetMod::STORAGE_MOD));
ObMemtable *memtable = new ObMemtable();
ObFreezer *freezer = new ObFreezer;
freezer->ls_ = ls_handle.get_ls();
ObTabletMemtableMgr *memtable_mgr = new ObTabletMemtableMgr;
int64_t schema_version = 1;
uint32_t freeze_clock = 0;
EXPECT_EQ(OB_SUCCESS, memtable->init(table_key,
ls_handle,
freezer,
memtable_mgr,
schema_version,
freeze_clock));
return memtable;
}
void rollback_to_txn(ObStoreCtx *store_ctx,
const int64_t from,
const int64_t to)
{
ObUndoAction undo(from, to);
ObPartTransCtx *tx_ctx = store_ctx->mvcc_acc_ctx_.tx_ctx_;
EXPECT_EQ(OB_SUCCESS,
tx_ctx->ctx_tx_data_.add_undo_action(undo));
ObMemtableCtx *mt_ctx = store_ctx->mvcc_acc_ctx_.mem_ctx_;
ObTxCallbackList &cb_list = mt_ctx->trans_mgr_.callback_list_;
for (ObMvccRowCallback *iter = (ObMvccRowCallback *)(cb_list.get_guard()->get_next());
iter != (ObMvccRowCallback *)(cb_list.get_guard());
iter = (ObMvccRowCallback *)(iter->get_next())) {
if (iter->seq_no_ > to) {
iter->tnode_->set_delayed_cleanout(true);
}
}
}
void fast_commit_txn(ObStoreCtx *store_ctx)
{
ObMemtableCtx *mt_ctx = store_ctx->mvcc_acc_ctx_.mem_ctx_;
ObTxCallbackList &cb_list = mt_ctx->trans_mgr_.callback_list_;
ObMvccRowCallback *next = NULL;
for (ObMvccRowCallback *iter = (ObMvccRowCallback *)(cb_list.get_guard()->get_next());
iter != (ObMvccRowCallback *)(cb_list.get_guard()); iter = next) {
next = (ObMvccRowCallback *)(iter->get_next());
iter->tnode_->set_delayed_cleanout(true);
iter->remove();
}
}
void prepare_txn(ObStoreCtx *store_ctx,
const int64_t prepare_version)
{
share::SCN prepare_scn;
prepare_scn.convert_for_tx(prepare_version);
ObPartTransCtx *tx_ctx = store_ctx->mvcc_acc_ctx_.tx_ctx_;
ObMemtableCtx *mt_ctx = store_ctx->mvcc_acc_ctx_.mem_ctx_;
tx_ctx->exec_info_.state_ = ObTxState::PREPARE;
tx_ctx->exec_info_.prepare_version_ = prepare_scn;
mt_ctx->trans_version_ = prepare_scn;
}
void commit_txn(ObStoreCtx *store_ctx,
const int64_t commit_version,
const bool need_write_back = false)
{
share::SCN commit_scn;
commit_scn.convert_for_tx(commit_version);
ObPartTransCtx *tx_ctx = store_ctx->mvcc_acc_ctx_.tx_ctx_;
ObMemtableCtx *mt_ctx = store_ctx->mvcc_acc_ctx_.mem_ctx_;
tx_ctx->exec_info_.state_ = ObTxState::COMMIT;
tx_ctx->ctx_tx_data_.set_commit_version(commit_scn);
tx_ctx->ctx_tx_data_.set_state(ObTxData::COMMIT);
if (need_write_back) {
EXPECT_EQ(OB_SUCCESS, mt_ctx->trans_end(true, /*commit*/
commit_scn,
commit_scn /*commit log ts*/));
} else {
ObTxCallbackList &cb_list = mt_ctx->trans_mgr_.callback_list_;
for (ObMvccRowCallback *iter = (ObMvccRowCallback *)(cb_list.get_guard()->get_next());
iter != (ObMvccRowCallback *)(cb_list.get_guard());
iter = (ObMvccRowCallback *)(iter->get_next())) {
iter->tnode_->set_delayed_cleanout(true);
}
}
}
void abort_txn(ObStoreCtx *store_ctx,
const bool need_write_back = false)
{
ObPartTransCtx *tx_ctx = store_ctx->mvcc_acc_ctx_.tx_ctx_;
ObMemtableCtx *mt_ctx = store_ctx->mvcc_acc_ctx_.mem_ctx_;
tx_ctx->exec_info_.state_ = ObTxState::ABORT;
tx_ctx->ctx_tx_data_.set_state(ObTxData::ABORT);
if (need_write_back) {
EXPECT_EQ(OB_SUCCESS, mt_ctx->trans_end(false, /*commit*/
share::SCN::min_scn() /*commit version*/,
share::SCN::max_scn()));
} else {
ObTxCallbackList &cb_list = mt_ctx->trans_mgr_.callback_list_;
for (ObMvccRowCallback *iter = (ObMvccRowCallback *)(cb_list.get_guard()->get_next());
iter != (ObMvccRowCallback *)(cb_list.get_guard());
iter = (ObMvccRowCallback *)(iter->get_next())) {
iter->tnode_->set_delayed_cleanout(true);
}
}
}
ObStoreCtx *start_tx(const ObTransID &tx_id, const bool for_replay = false)
{
ObTxDesc *tx_desc = new ObTxDesc();
tx_desc->state_ = ObTxDesc::State::ACTIVE;
tx_desc->tx_id_ = tx_id;
tx_desc->isolation_ = ObTxIsolationLevel::RC; // used by write conflict error resolve
ObStoreCtx *store_ctx = new ObStoreCtx;
MockObTxCtx *tx_ctx = new MockObTxCtx;
ObTxData *tx_data = new ObTxData;
tx_data->reset();
tx_data->tx_id_ = tx_id;
tx_ctx->init(ls_id_,
tx_id,
&ls_tx_ctx_mgr_,
tx_data, // ObTxData
NULL); // mailbox_mgr
store_ctx->mvcc_acc_ctx_.abs_lock_timeout_ = 0; // nowait
store_ctx->mvcc_acc_ctx_.tx_desc_ = tx_desc;
store_ctx->mvcc_acc_ctx_.tx_id_ = tx_id;
store_ctx->mvcc_acc_ctx_.tx_ctx_ = tx_ctx;
store_ctx->mvcc_acc_ctx_.mem_ctx_ = &(tx_ctx->mt_ctx_);
store_ctx->mvcc_acc_ctx_.mem_ctx_->set_trans_ctx(tx_ctx);
store_ctx->mvcc_acc_ctx_.mem_ctx_->get_tx_table_guard()->init(&tx_table_);
tx_ctx->mt_ctx_.log_gen_.set(&(tx_ctx->mt_ctx_.trans_mgr_),
&(tx_ctx->mt_ctx_));
store_ctx->mvcc_acc_ctx_.snapshot_.tx_id_ = tx_id;
store_ctx->mvcc_acc_ctx_.tx_table_guard_.init(&tx_table_);
if (for_replay) {
store_ctx->mvcc_acc_ctx_.mem_ctx_->commit_to_replay();
}
if (ObTransID(READ_TX_ID) != tx_id) {
EXPECT_EQ(OB_SUCCESS, ls_tx_ctx_mgr_.ls_tx_ctx_map_.insert_and_get(tx_id, tx_ctx, NULL));
}
return store_ctx;
}
void start_stmt(ObStoreCtx *store_ctx,
const share::SCN snapshot_scn,
const int64_t expire_time = 10000000000)
{
ObSequence::inc();
store_ctx->mvcc_acc_ctx_.type_ = ObMvccAccessCtx::T::WRITE;
store_ctx->mvcc_acc_ctx_.snapshot_.tx_id_ = store_ctx->mvcc_acc_ctx_.tx_id_;
store_ctx->mvcc_acc_ctx_.snapshot_.version_ = snapshot_scn;
store_ctx->mvcc_acc_ctx_.snapshot_.scn_ = ObSequence::get_max_seq_no();
const int64_t abs_expire_time = expire_time + ::oceanbase::common::ObTimeUtility::current_time();
store_ctx->mvcc_acc_ctx_.abs_lock_timeout_ = abs_expire_time;
store_ctx->mvcc_acc_ctx_.tx_scn_ = ObSequence::inc_and_get_max_seq_no();
}
void start_pdml_stmt(ObStoreCtx *store_ctx,
const share::SCN snapshot_scn,
const int64_t read_seq_no,
const int64_t expire_time = 10000000000)
{
ObSequence::inc();
store_ctx->mvcc_acc_ctx_.type_ = ObMvccAccessCtx::T::WRITE;
store_ctx->mvcc_acc_ctx_.snapshot_.tx_id_ = store_ctx->mvcc_acc_ctx_.tx_id_;
store_ctx->mvcc_acc_ctx_.snapshot_.version_ = snapshot_scn;
store_ctx->mvcc_acc_ctx_.snapshot_.scn_ = read_seq_no;
const int64_t abs_expire_time = expire_time + ::oceanbase::common::ObTimeUtility::current_time();
store_ctx->mvcc_acc_ctx_.abs_lock_timeout_ = abs_expire_time;
store_ctx->mvcc_acc_ctx_.tx_scn_ = ObSequence::inc_and_get_max_seq_no();
}
void print_callback(ObStoreCtx *wtx)
{
TRANS_LOG(INFO, "========== START PRINT CALLBACK ===========", K(wtx->mvcc_acc_ctx_.tx_id_));
wtx->mvcc_acc_ctx_.mem_ctx_->print_callbacks();
TRANS_LOG(INFO, "=========== END PRINT CALLBACK ============", K(wtx->mvcc_acc_ctx_.tx_id_));
}
void write_tx(ObStoreCtx *wtx,
ObMemtable *memtable,
const int64_t snapshot,
const ObStoreRow &write_row,
const int expect_ret = OB_SUCCESS,
const int64_t expire_time = 10000000000)
{
int ret = OB_SUCCESS;
TRANS_LOG(INFO, "====================== start write tx =====================",
K(wtx->mvcc_acc_ctx_.tx_id_), K(*wtx), K(snapshot), K(expire_time), K(write_row));
share::SCN snapshot_scn;
snapshot_scn.convert_for_tx(snapshot);
start_stmt(wtx, snapshot_scn, expire_time);
EXPECT_EQ(expect_ret, (ret = memtable->set(*wtx,
tablet_id_.id(),
read_info_,
columns_,
write_row)));
TRANS_LOG(INFO, "======================= end write tx ======================",
K(ret), K(wtx->mvcc_acc_ctx_.tx_id_), K(*wtx), K(snapshot), K(expire_time), K(write_row));
}
void lock_tx(ObStoreCtx *ltx,
ObMemtable *memtable,
const int64_t snapshot,
const ObDatumRowkey &rowkey,
const int expect_ret = OB_SUCCESS,
const int64_t expire_time = 10000000000)
{
int ret = OB_SUCCESS;
TRANS_LOG(INFO, "====================== start lock tx =====================",
K(ltx->mvcc_acc_ctx_.tx_id_), K(*ltx), K(snapshot), K(expire_time), K(rowkey));
share::SCN snapshot_scn;
snapshot_scn.convert_for_tx(snapshot);
start_stmt(ltx, snapshot_scn, expire_time);
EXPECT_EQ(expect_ret, (ret = memtable->lock(*ltx,
tablet_id_.id(),
read_info_,
rowkey)));
TRANS_LOG(INFO, "======================= end lock tx ======================",
K(ret), K(ltx->mvcc_acc_ctx_.tx_id_), K(*ltx), K(snapshot), K(expire_time), K(rowkey));
}
void write_no_value_tx(ObStoreCtx *ltx,
ObMemtable *memtable,
const int64_t snapshot,
const ObDatumRowkey &rowkey,
const int expect_ret = OB_SUCCESS,
const int64_t expire_time = 10000000000)
{
int ret = OB_SUCCESS;
TRANS_LOG(INFO, "====================== start lock tx =====================",
K(ltx->mvcc_acc_ctx_.tx_id_), K(*ltx), K(snapshot), K(expire_time), K(rowkey));
share::SCN snapshot_scn;
snapshot_scn.convert_for_tx(snapshot);
start_stmt(ltx, snapshot_scn, expire_time);
EXPECT_EQ(expect_ret, (ret = memtable->lock(*ltx,
tablet_id_.id(),
read_info_,
rowkey)));
ObMvccTransNode *node = get_tx_last_tnode(ltx);
((ObMemtableDataHeader *)(node->buf_))->dml_flag_ = blocksstable::ObDmlFlag::DF_INSERT;
TRANS_LOG(INFO, "mock tnode with no value", KPC(node));
TRANS_LOG(INFO, "======================= end lock tx ======================",
K(ret), K(ltx->mvcc_acc_ctx_.tx_id_), K(*ltx), K(snapshot), K(expire_time), K(rowkey));
}
bool is_write_set_empty(ObStoreCtx *ctx)
{
ObMemtableCtx *mem_ctx = ctx->mvcc_acc_ctx_.mem_ctx_;
ObTxCallbackList &callback_list = mem_ctx->trans_mgr_.callback_list_;
return 0 == callback_list.length_;
}
ObMvccRowCallback *get_tx_last_cb(ObStoreCtx *store_ctx)
{
ObMemtableCtx *mem_ctx = store_ctx->mvcc_acc_ctx_.mem_ctx_;
ObTxCallbackList &callback_list = mem_ctx->trans_mgr_.callback_list_;
return (ObMvccRowCallback *)callback_list.get_tail();
}
ObMvccRowCallback *get_tx_first_cb(ObStoreCtx *store_ctx)
{
ObMemtableCtx *mem_ctx = store_ctx->mvcc_acc_ctx_.mem_ctx_;
ObTxCallbackList &callback_list = mem_ctx->trans_mgr_.callback_list_;
return (ObMvccRowCallback *)callback_list.get_guard()->next_;
}
ObMvccTransNode *get_tx_last_tnode(ObStoreCtx *store_ctx)
{
return get_tx_last_cb(store_ctx)->tnode_;
}
ObMvccTransNode *get_tx_first_tnode(ObStoreCtx *store_ctx)
{
return get_tx_first_cb(store_ctx)->tnode_;
}
ObMvccRow *get_tx_last_mvcc_row(ObStoreCtx *store_ctx)
{
return &(get_tx_last_cb(store_ctx)->value_);
}
ObMvccRow *get_tx_first_mvcc_row(ObStoreCtx *store_ctx)
{
return &(get_tx_first_cb(store_ctx)->value_);
}
void read_row(ObMemtable *memtable,
const ObDatumRowkey &rowkey,
const int64_t snapshot,
int64_t k,
int64_t v,
const bool exist = true,
const int expect_ret = OB_SUCCESS,
const int64_t expire_time = 10000000000)
{
ObTransID read_tx_id = ObTransID(READ_TX_ID);
ObStoreCtx *rtx = start_tx(read_tx_id);
read_row(rtx,
memtable,
rowkey,
snapshot,
k,
v,
exist,
expect_ret,
expire_time);
}
void read_row(ObStoreCtx *rtx,
ObMemtable *memtable,
const ObDatumRowkey &rowkey,
const int64_t snapshot,
int64_t k,
int64_t v,
const bool exist = true,
const int expect_ret = OB_SUCCESS,
const int64_t expire_time = 10000000000)
{
int ret = OB_SUCCESS;
ObDatumRow read_row;
ObTableAccessContext access_context;
TRANS_LOG(INFO, "====================== start read row =====================",
K(rtx->mvcc_acc_ctx_.tx_id_), K(*rtx), K(snapshot), K(expire_time));
share::SCN snapshot_scn;
snapshot_scn.convert_for_tx(snapshot);
start_stmt(rtx, snapshot_scn, expire_time);
EXPECT_EQ(OB_SUCCESS, access_context.init(query_flag_,
*rtx,
allocator_,
trans_version_range_));
EXPECT_EQ(expect_ret, (ret = memtable->get(iter_param_,
access_context,
rowkey,
read_row)));
if (OB_SUCC(ret)) {
if (!exist) {
EXPECT_EQ(true, read_row.row_flag_.is_not_exist());
} else {
EXPECT_EQ(true, read_row.row_flag_.is_exist());
ObStorageDatum *cells = read_row.storage_datums_;
int64_t count = read_row.count_;
EXPECT_EQ(rowkey_cnt_ + value_cnt_, count);
for (int64_t i = 0; i < count; i++) {
int64_t row_v;
row_v = cells[i].get_int();
if (i == 0) {
EXPECT_EQ(k, row_v);
} else if (i == 1) {
EXPECT_EQ(v, row_v);
} else {
ob_abort();
}
}
}
}
TRANS_LOG(INFO, "read row success", K(ret), K(read_row));
TRANS_LOG(INFO, "====================== end read row =====================",
K(rtx->mvcc_acc_ctx_.tx_id_), K(*rtx), K(snapshot), K(expire_time));
}
void compact_row(ObMvccRow *row,
ObMemtable *memtable,
int64_t snapshot_version,
const bool for_replay)
{
ASSERT_NE(NULL, (long)row);
TRANS_LOG(INFO, "====================== start compact row =====================",
K(*row), K(snapshot_version));
share::SCN snapshot_scn;
snapshot_scn.convert_for_tx(snapshot_version);
EXPECT_EQ(OB_SUCCESS, row->row_compact(memtable,
for_replay,
snapshot_scn,
&allocator2_));
TRANS_LOG(INFO, "====================== end compact row =====================",
K(*row), K(snapshot_version));
}
int mock_col_desc()
{
share::schema::ObColDesc col_desc;
col_desc.col_id_ = OB_APP_MIN_COLUMN_ID;
col_desc.col_type_.set_type(ObIntType);
col_desc.col_type_.set_collation_type(CS_TYPE_UTF8MB4_BIN);
columns_.push_back(col_desc);
share::schema::ObColDesc col_desc2;
col_desc2.col_id_ = OB_APP_MIN_COLUMN_ID + 1;
col_desc2.col_type_.set_type(ObIntType);
col_desc2.col_type_.set_collation_type(CS_TYPE_UTF8MB4_BIN);
columns_.push_back(col_desc2);
return OB_SUCCESS;
}
int mock_row(const int64_t key,
const int64_t value,
ObDatumRowkey &rowkey,
ObStoreRow &row)
{
rowkey_datums_[0].set_int(key);
rowkey_datums_[1].set_int(value);
rowkey.assign(rowkey_datums_, 1);
ObObj *obj = new ObObj[2];
obj[0].set_int(key);
obj[1].set_int(value);
row.row_val_.cells_ = obj;
row.row_val_.count_ = 2;
row.row_val_.projector_ = NULL;
row.flag_.set_flag(ObDmlFlag::DF_INSERT);
rowkey.store_rowkey_.assign(obj, 1);
return OB_SUCCESS;
}
int mock_delete(const int64_t key,
ObDatumRowkey &rowkey,
ObStoreRow &row)
{
rowkey_datums_[0].set_int(key);
rowkey.assign(rowkey_datums_, 1);
ObObj *obj = new ObObj[1];
obj[0].set_int(key);
row.row_val_.cells_ = obj;
row.row_val_.count_ = 2;
row.row_val_.projector_ = NULL;
row.flag_.set_flag(ObDmlFlag::DF_DELETE);
rowkey.store_rowkey_.assign(obj, 1);
return OB_SUCCESS;
}
int mock_row(const int64_t key,
const int64_t value,
ObStoreRowkey &rowkey,
ObStoreRow &row)
{
ObObj *obj = new ObObj[2];
obj[0].set_int(key);
obj[1].set_int(value);
rowkey.assign(obj, 1);
row.row_val_.cells_ = obj;
row.row_val_.count_ = 2;
row.row_val_.projector_ = NULL;
row.flag_.set_flag(ObDmlFlag::DF_INSERT);
return OB_SUCCESS;
}
void mock_replay_iterator(ObStoreCtx *store_ctx,
ObMemtableMutatorIterator &mmi)
{
mmi.reset();
int64_t serialize_pos = 0;
int64_t deserialize_pos = 0;
ObCLogEncryptInfo encrypt_info;
ObRedoLogSubmitHelper helper;
ObIMemtableCtx *mem_ctx = store_ctx->mvcc_acc_ctx_.mem_ctx_;
char *redo_log_buffer = new char[REDO_BUFFER_SIZE];
encrypt_info.init();
EXPECT_EQ(OB_SUCCESS, mem_ctx->fill_redo_log(redo_log_buffer,
REDO_BUFFER_SIZE,
serialize_pos,
helper));
EXPECT_EQ(OB_SUCCESS, mmi.deserialize(redo_log_buffer,
serialize_pos,
deserialize_pos,
encrypt_info));
}
void replay_tx(ObStoreCtx *store_ctx,
ObMemtable *memtable,
const int64_t replay_log_ts,
ObMemtableMutatorIterator &mmi)
{
int ret = OB_SUCCESS;
bool can_continue = true;
while (can_continue) {
if (OB_FAIL(mmi.iterate_next_row())) {
if (OB_ITER_END != ret) {
TRANS_LOG(ERROR, "get row head failed", K(ret));
}
can_continue = false;
} else {
ObEncryptRowBuf row_buf;
TRANS_LOG(INFO, "TEST_MEMTABLE V2: replay row",
K(*store_ctx));
share::SCN replay_scn;
replay_scn.convert_for_tx(replay_log_ts);
store_ctx->mvcc_acc_ctx_.mem_ctx_->set_redo_scn(replay_scn);
EXPECT_EQ(OB_SUCCESS, memtable->replay_row(*store_ctx,
&mmi,
row_buf));
}
}
}
int mock_iter_param()
{
// iter_param_.rowkey_cnt_ = rowkey_cnt_;
iter_param_.tablet_id_ = tablet_id_;
iter_param_.table_id_ = tablet_id_.id();
read_info_.init(allocator_, 16000, rowkey_cnt_, lib::is_oracle_mode(), columns_);
iter_param_.read_info_ = &read_info_;
return OB_SUCCESS;
}
void reset_iter_param()
{
iter_param_.reset();
read_info_.reset();
}
int mock_trans_version_range()
{
trans_version_range_.base_version_ = 0;
trans_version_range_.multi_version_start_ = 0;
trans_version_range_.snapshot_version_ = INT64_MAX - 2;
return OB_SUCCESS;
}
void verify_cb(ObMvccRowCallback *cb,
const ObMemtable *mt,
const int64_t seq_no,
const int64_t k,
const bool is_link = true,
const bool need_fill_redo = true,
const int64_t log_ts = INT64_MAX)
{
ASSERT_NE(NULL, (long)cb);
TRANS_LOG(INFO, "=============== VERIFY TRANS CALLBACK START ===============", K(*cb));
share::SCN scn;
scn.convert_for_tx(log_ts);
EXPECT_EQ(mt, cb->memtable_);
EXPECT_EQ(scn, cb->scn_);
EXPECT_EQ(is_link, cb->is_link_);
EXPECT_EQ(need_fill_redo, cb->need_fill_redo_);
EXPECT_EQ(need_fill_redo, cb->need_submit_log_);
EXPECT_EQ(seq_no, cb->seq_no_);
ObStoreRowkey *rowkey = cb->key_.rowkey_;
ObObj *key = rowkey->get_obj_ptr();
int64_t row_k;
key->get_int(row_k);
EXPECT_EQ(k, row_k);
TRANS_LOG(INFO, "=============== VERIFY TRANS CALLBACK END ===============", K(*cb));
}
void verify_tnode(const ObMvccTransNode *tnode,
const ObMvccTransNode *prev,
const ObMvccTransNode *next,
const ObMemtable *mt,
const ObTransID &tx_id,
const int64_t trans_version,
const int64_t seq_no,
const uint32_t modify_count,
const uint8_t tnode_flag,
const ObDmlFlag dml_flag,
const int64_t k,
const int64_t v,
const int64_t log_ts = INT64_MAX,
const uint8_t ndt_type = NDT_NORMAL)
{
ASSERT_NE(NULL, (long)tnode);
TRANS_LOG(INFO, "=============== VERIFY TRANS NODE START ===============", K(*tnode));
share::SCN scn;
scn.convert_for_tx(log_ts);
EXPECT_EQ(tx_id, tnode->tx_id_);
EXPECT_EQ(trans_version, tnode->trans_version_.get_val_for_tx());
EXPECT_EQ(scn, tnode->scn_);
EXPECT_EQ(seq_no, tnode->seq_no_);
EXPECT_EQ(prev, tnode->prev_);
EXPECT_EQ(next, tnode->next_);
EXPECT_EQ(modify_count, tnode->modify_count_);
// EXPECT_EQ(0, tnode->acc_checksum_);
EXPECT_EQ(mt->get_timestamp(), tnode->version_);
EXPECT_EQ(ndt_type, tnode->type_);
EXPECT_EQ(tnode_flag, tnode->flag_);
int ret = OB_SUCCESS;
const ObMemtableDataHeader *mtd = reinterpret_cast<const ObMemtableDataHeader *>(tnode->buf_);
ObArenaAllocator allocator;
ObDatumRow datum_row;
ObRowReader row_reader;
const blocksstable::ObRowHeader *row_header = nullptr;
if (OB_FAIL(row_reader.read_row_header(mtd->buf_, mtd->buf_len_, row_header))) {
CLOG_LOG(WARN, "Failed to read row header", K(ret));
} else if (OB_FAIL(datum_row.init(allocator, row_header->get_column_count()))) {
CLOG_LOG(WARN, "Failed to init datum row", K(ret));
} else if (OB_FAIL(row_reader.read_row(mtd->buf_, mtd->buf_len_, nullptr, datum_row))) {
CLOG_LOG(WARN, "Failed to read datum row", K(ret));
} else {
EXPECT_EQ(dml_flag, mtd->dml_flag_);
TRANS_LOG(INFO, "TEST_MEMTABLE_V2 row: ", K(*tnode), K(mtd));
}
for (int64_t i = 0; OB_SUCC(ret) && i < datum_row.get_column_count(); i++) {
int64_t row_v = datum_row.storage_datums_[i].get_int();
if (i == 0) {
EXPECT_EQ(k, row_v);
} else {
EXPECT_EQ(v, row_v);
}
TRANS_LOG(INFO, " TEST_MEMTABLE_V2 column: ", K(i), K(datum_row.storage_datums_[i]));
}
TRANS_LOG(INFO, "=============== VERIFY TRANS NODE END ===============", K(*tnode));
}
void verify_wtx(ObStoreCtx *wtx,
ObMemtable *wmt,
ObMvccTransNode *prev,
ObMvccTransNode *next,
int64_t seq_no,
uint32_t modify_count,
int64_t k,
int64_t v)
{
ObMvccRowCallback *cb = get_tx_last_cb(wtx);
ObMvccTransNode *tnode = cb->tnode_;
EXPECT_NE(NULL, (long)cb);
EXPECT_NE(NULL, (long)tnode);
TRANS_LOG(INFO, "=============== VERIFY TRANS CALLBACK ===============", K(*cb));
EXPECT_EQ(wmt, cb->memtable_);
EXPECT_EQ(share::SCN::max_scn(), cb->scn_);
EXPECT_EQ(true, cb->is_link_);
EXPECT_EQ(true, cb->need_fill_redo_);
EXPECT_EQ(true, cb->need_submit_log_);
EXPECT_EQ(seq_no, cb->seq_no_);
ObStoreRowkey *rowkey = cb->key_.rowkey_;
ObObj *key = rowkey->get_obj_ptr();
int64_t row_k;
key->get_int(row_k);
EXPECT_EQ(k, row_k);
TRANS_LOG(INFO, "=============== VERIFY TRANS CALLBACK ===============", K(*cb));
TRANS_LOG(INFO, "=============== VERIFY TRANS NODE START ===============", K(*tnode));
EXPECT_EQ(wtx->mvcc_acc_ctx_.tx_id_, tnode->tx_id_);
EXPECT_EQ(share::SCN::max_scn(), tnode->trans_version_);
EXPECT_EQ(share::SCN::max_scn(), tnode->scn_);
EXPECT_EQ(seq_no, tnode->seq_no_);
EXPECT_EQ(prev, tnode->prev_);
EXPECT_EQ(next, tnode->next_);
EXPECT_EQ(modify_count, tnode->modify_count_);
EXPECT_EQ(0, tnode->acc_checksum_);
EXPECT_EQ(wmt->get_timestamp(), tnode->version_);
EXPECT_EQ(NDT_NORMAL, tnode->type_);
EXPECT_EQ(ObMvccTransNode::F_INIT, tnode->flag_);
int ret = OB_SUCCESS;
const ObMemtableDataHeader *mtd = reinterpret_cast<const ObMemtableDataHeader *>(tnode->buf_);
ObArenaAllocator allocator;
ObDatumRow datum_row;
ObRowReader row_reader;
const blocksstable::ObRowHeader *row_header = nullptr;
if (OB_FAIL(row_reader.read_row_header(mtd->buf_, mtd->buf_len_, row_header))) {
CLOG_LOG(WARN, "Failed to read row header", K(ret));
} else if (OB_FAIL(datum_row.init(allocator, row_header->get_column_count()))) {
CLOG_LOG(WARN, "Failed to init datum row", K(ret));
} else if (OB_FAIL(row_reader.read_row(mtd->buf_, mtd->buf_len_, nullptr, datum_row))) {
CLOG_LOG(WARN, "Failed to read datum row", K(ret));
} else {
EXPECT_EQ(ObDmlFlag::DF_INSERT, mtd->dml_flag_);
TRANS_LOG(INFO, "TEST_MEMTABLE_V2 row: ", K(*tnode), K(mtd));
}
for (int64_t i = 0; OB_SUCC(ret) && i < datum_row.get_column_count(); i++) {
int64_t row_v = datum_row.storage_datums_[i].get_int();
if (i == 0) {
EXPECT_EQ(k, row_v);
} else {
EXPECT_EQ(v, row_v);
}
TRANS_LOG(INFO, " TEST_MEMTABLE_V2 column: ", K(i), K(datum_row.storage_datums_[i]));
}
TRANS_LOG(INFO, "=============== VERIFY TRANS NODE END ===============", K(*tnode));
}
void verify_mvcc_row(ObMvccRow *row,
const int8_t first_dml,
const int8_t last_dml,
const ObMvccTransNode *list_head,
const int64_t max_trans_version,
/*const int64_t max_elr_trans_version,*/
const int64_t total_trans_node_cnt,
const uint8_t flag = ObMvccRow::F_HASH_INDEX | ObMvccRow::F_BTREE_INDEX)
{
TRANS_LOG(INFO, "=============== VERIFY MVCC ROW START ===============", K(*row));
EXPECT_EQ(flag, row->flag_);
EXPECT_EQ(first_dml, row->first_dml_flag_);
EXPECT_EQ(last_dml, row->last_dml_flag_);
EXPECT_EQ(list_head, row->list_head_);
EXPECT_EQ(total_trans_node_cnt, row->total_trans_node_cnt_);
TRANS_LOG(INFO, "=============== VERIFY MVCC ROW END ===============", K(*row));
}
private:
static const int64_t READ_TX_ID = 987654321;
static const int64_t UNUSED_VALUE = -1;
static const int64_t REDO_BUFFER_SIZE = 2L * 1024L * 1024L;
private:
static ObLSTxCtxMgr ls_tx_ctx_mgr_;
static ObLS ls_;
const ObLSID ls_id_;
const ObTabletID tablet_id_;
const int64_t tenant_id_;
const int64_t rowkey_cnt_;
const int64_t value_cnt_;
ObTableIterParam iter_param_;
ObSEArray<share::schema::ObColDesc, 2> columns_;
ObStorageDatum rowkey_datums_[2];
ObArenaAllocator allocator_;
ObArenaAllocator allocator2_;
ObTableReadInfo read_info_;
ObVersionRange trans_version_range_;
ObQueryFlag query_flag_;
char redo_log_buffer_[REDO_BUFFER_SIZE];
};
ObLSTxCtxMgr TestMemtableV2::ls_tx_ctx_mgr_;
ObTxTable TestMemtableV2::tx_table_;
ObLS TestMemtableV2::ls_;
bool TestMemtableV2::is_sstable_contains_lock_;
TEST_F(TestMemtableV2, test_write_read_conflict)
{
ObMemtable *memtable = create_memtable();
TRANS_LOG(INFO, "######## CASE1: write row into memtable");
ObDatumRowkey rowkey;
ObStoreRow write_row;
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
2, /*value*/
rowkey,
write_row));
ObTransID write_tx_id = ObTransID(1);
ObStoreCtx *wtx = start_tx(write_tx_id);
write_tx(wtx,
memtable,
1000, /*snapshot version*/
write_row);
const int64_t wtx_seq_no = ObSequence::get_max_seq_no();
verify_cb(get_tx_last_cb(wtx),
memtable,
wtx_seq_no,
1, /*key*/
true /*is_link*/);
verify_tnode(get_tx_last_tnode(wtx),
NULL, /*prev tnode*/
NULL, /*next tnode*/
memtable,
wtx->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
wtx_seq_no,
0, /*modify_count*/
ObMvccTransNode::F_INIT,
DF_INSERT,
1, /*key*/
2 /*value*/);
verify_mvcc_row(get_tx_last_mvcc_row(wtx),
ObDmlFlag::DF_NOT_EXIST,
ObDmlFlag::DF_NOT_EXIST,
get_tx_last_tnode(wtx),
0, /*max_trans_version*/
1 /*total_trans_node_cnt*/);
read_row(wtx,
memtable,
rowkey,
1000, /*snapshot version*/
1, /*key*/
2 /*value*/);
TRANS_LOG(INFO, "######## CASE2: read row(running during txn) from memtable with empty result");
read_row(memtable,
rowkey,
1800, /*snapshot version*/
1, /*key*/
2, /*value*/
false /*exist*/);
TRANS_LOG(INFO, "######## CASE3: read row(prepare during txn) from memtable with lock for read skipping");
prepare_txn(wtx, 1500/*prepare_version*/);
read_row(memtable,
rowkey,
1200, /*snapshot version*/
1, /*key*/
2, /*value*/
false /*exist*/);
TRANS_LOG(INFO, "######## CASE4: read row(prepare during txn) from memtable with lock for read blocking");
read_row(memtable,
rowkey,
1800, /*snapshot version*/
1, /*key*/
2, /*value*/
false, /*exist*/
OB_ERR_SHARED_LOCK_CONFLICT,
1000000 /*expire_time*/);
TRANS_LOG(INFO, "######## CASE5: read row(commit during txn) from memtable with lock for read success");
commit_txn(wtx,
2000,/*commit_version*/
false/*need_write_back*/);
read_row(memtable,
rowkey,
3000, /*snapshot version*/
1, /*key*/
2 /*value*/);
verify_cb(get_tx_last_cb(wtx),
memtable,
wtx_seq_no,
1, /*key*/
true /*is_link*/);
verify_tnode(get_tx_last_tnode(wtx),
NULL, /*prev tnode*/
NULL, /*next tnode*/
memtable,
wtx->mvcc_acc_ctx_.tx_id_,
2000, /*trans_version*/
wtx_seq_no,
0, /*modify_count*/
ObMvccTransNode::F_COMMITTED | ObMvccTransNode::F_DELAYED_CLEANOUT,
ObDmlFlag::DF_INSERT,
1, /*key*/
2 /*value*/);
verify_mvcc_row(get_tx_last_mvcc_row(wtx),
ObDmlFlag::DF_INSERT,
ObDmlFlag::DF_INSERT,
get_tx_last_tnode(wtx),
2000, /*max_trans_version*/
1 /*total_trans_node_cnt*/);
TRANS_LOG(INFO, "######## CASE6: read row(WRRITTEN during txn) from memtable with lock for read success");
ObMvccTransNode *tmp_node = get_tx_last_tnode(wtx);
ObMvccRow *tmp_row = get_tx_last_mvcc_row(wtx);
commit_txn(wtx,
2000,/*commit_version*/
true /*need_write_back*/);
read_row(memtable,
rowkey,
3000, /*snapshot version*/
1, /*key*/
2 /*value*/);
verify_tnode(tmp_node,
NULL, /*prev tnode*/
NULL, /*next tnode*/
memtable,
wtx->mvcc_acc_ctx_.tx_id_,
2000, /*trans_version*/
wtx_seq_no,
0, /*modify_count*/
ObMvccTransNode::F_COMMITTED | ObMvccTransNode::F_DELAYED_CLEANOUT,
ObDmlFlag::DF_INSERT,
1, /*key*/
2 /*value*/);
verify_mvcc_row(tmp_row,
ObDmlFlag::DF_INSERT,
ObDmlFlag::DF_INSERT,
tmp_node,
2000, /*max_trans_version*/
1 /*total_trans_node_cnt*/);
memtable->destroy();
}
TEST_F(TestMemtableV2, test_tx_abort)
{
ObMemtable *memtable = create_memtable();
TRANS_LOG(INFO, "######## CASE1: write row into memtable");
ObDatumRowkey rowkey;
ObStoreRow write_row;
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
2, /*value*/
rowkey,
write_row));
ObTransID write_tx_id = ObTransID(1);
ObStoreCtx *wtx = start_tx(write_tx_id);
write_tx(wtx,
memtable,
1000, /*snapshot version*/
write_row);
const int64_t wtx_seq_no = ObSequence::get_max_seq_no();
verify_cb(get_tx_last_cb(wtx),
memtable,
wtx_seq_no,
1, /*key*/
true /*is_link*/);
verify_tnode(get_tx_last_tnode(wtx),
NULL, /*prev tnode*/
NULL, /*next tnode*/
memtable,
wtx->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
wtx_seq_no,
0, /*modify_count*/
ObMvccTransNode::F_INIT,
ObDmlFlag::DF_INSERT,
1, /*key*/
2 /*value*/);
verify_mvcc_row(get_tx_last_mvcc_row(wtx),
ObDmlFlag::DF_NOT_EXIST,
ObDmlFlag::DF_NOT_EXIST,
get_tx_last_tnode(wtx),
0, /*max_trans_version*/
1 /*total_trans_node_cnt*/);
read_row(wtx,
memtable,
rowkey,
1000, /*snapshot version*/
1, /*key*/
2 /*value*/);
TRANS_LOG(INFO, "######## CASE2: read row(abort during txn) from memtable with lock for read no data");
abort_txn(wtx, false/*need_write_back*/);
read_row(memtable,
rowkey,
3000, /*snapshot version*/
-1, /*key*/
-1, /*value*/
false /*exist*/);
verify_cb(get_tx_last_cb(wtx),
memtable,
wtx_seq_no,
1, /*key*/
true /*is_link*/);
verify_tnode(get_tx_last_tnode(wtx),
NULL, /*prev tnode*/
NULL, /*next tnode*/
memtable,
wtx->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
wtx_seq_no,
0, /*modify_count*/
ObMvccTransNode::F_ABORTED | ObMvccTransNode::F_DELAYED_CLEANOUT,
ObDmlFlag::DF_INSERT,
1, /*key*/
2 /*value*/);
verify_mvcc_row(get_tx_last_mvcc_row(wtx),
ObDmlFlag::DF_NOT_EXIST,
ObDmlFlag::DF_NOT_EXIST,
NULL,
0, /*max_trans_version*/
0 /*total_trans_node_cnt*/);
TRANS_LOG(INFO, "######## CASE3: read row(WRRITTEN during txn) from memtable with lock for read success");
ObMvccRow *tmp_row = get_tx_last_mvcc_row(wtx);
ObMvccTransNode *tmp_node = get_tx_last_tnode(wtx);
abort_txn(wtx, true /*need_write_back*/);
read_row(memtable,
rowkey,
3000, /*snapshot version*/
-1, /*key*/
-1, /*value*/
false /*exist*/);
verify_tnode(tmp_node,
NULL, /*prev tnode*/
NULL, /*next tnode*/
memtable,
wtx->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
wtx_seq_no,
0, /*modify_count*/
ObMvccTransNode::F_ABORTED | ObMvccTransNode::F_DELAYED_CLEANOUT,
ObDmlFlag::DF_INSERT,
1, /*key*/
2 /*value*/);
verify_mvcc_row(tmp_row,
ObDmlFlag::DF_NOT_EXIST,
ObDmlFlag::DF_NOT_EXIST,
NULL, /*list_head*/
0, /*max_trans_version*/
0 /*total_trans_node_cnt*/);
memtable->destroy();
}
TEST_F(TestMemtableV2, test_write_write_conflict)
{
ObMemtable *memtable = create_memtable();
TRANS_LOG(INFO, "######## CASE1: txn1 write row into memtable");
ObDatumRowkey rowkey;
ObStoreRow write_row;
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
2, /*value*/
rowkey,
write_row));
ObTransID write_tx_id = ObTransID(1);
ObStoreCtx *wtx = start_tx(write_tx_id);
write_tx(wtx,
memtable,
1000, /*snapshot version*/
write_row);
verify_cb(get_tx_last_cb(wtx),
memtable,
ObSequence::get_max_seq_no(),
1, /*key*/
true /*is_link*/);
verify_tnode(get_tx_last_tnode(wtx),
NULL, /*prev tnode*/
NULL, /*next tnode*/
memtable,
wtx->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
ObSequence::get_max_seq_no(),
0, /*modify_count*/
ObMvccTransNode::F_INIT,
ObDmlFlag::DF_INSERT,
1, /*key*/
2 /*value*/);
verify_mvcc_row(get_tx_last_mvcc_row(wtx),
ObDmlFlag::DF_NOT_EXIST,
ObDmlFlag::DF_NOT_EXIST,
get_tx_last_tnode(wtx),
0, /*max_trans_version*/
1 /*total_trans_node_cnt*/);
read_row(wtx,
memtable,
rowkey,
1000, /*snapshot version*/
1, /*key*/
2 /*value*/);
TRANS_LOG(INFO, "######## CASE2: txn2 write row into memtable, lock for write failed");
ObDatumRowkey rowkey2;
ObStoreRow write_row2;
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
3, /*value*/
rowkey2,
write_row2));
ObTransID write_tx_id2 = ObTransID(2);
ObStoreCtx *wtx2 = start_tx(write_tx_id2);
write_tx(wtx2,
memtable,
1200, /*snapshot version*/
write_row2,
OB_TRY_LOCK_ROW_CONFLICT);
read_row(wtx,
memtable,
rowkey,
1000, /*snapshot version*/
1, /*key*/
2 /*value*/);
TRANS_LOG(INFO, "######## CASE3: txn1 write row into memtable, lock for write succeed");
ObDatumRowkey rowkey3;
ObStoreRow write_row3;
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
4, /*value*/
rowkey3,
write_row3));
ObMvccTransNode *tmp_node = get_tx_last_tnode(wtx);
write_tx(wtx,
memtable,
1200, /*snapshot version*/
write_row3);
ObMvccTransNode *wtx_case3_tnode = get_tx_last_tnode(wtx);
verify_cb(get_tx_last_cb(wtx),
memtable,
ObSequence::get_max_seq_no(),
1, /*key*/
true /*is_link*/);
verify_tnode(get_tx_last_tnode(wtx),
tmp_node, /*prev tnode*/
NULL, /*next tnode*/
memtable,
wtx->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
ObSequence::get_max_seq_no(),
1, /*modify_count*/
ObMvccTransNode::F_INIT,
ObDmlFlag::DF_INSERT,
1, /*key*/
4 /*value*/);
verify_mvcc_row(get_tx_last_mvcc_row(wtx),
ObDmlFlag::DF_NOT_EXIST,
ObDmlFlag::DF_NOT_EXIST,
get_tx_last_tnode(wtx),
0, /*max_trans_version*/
2 /*total_trans_node_cnt*/);
read_row(wtx,
memtable,
rowkey,
1000, /*snapshot version*/
1, /*key*/
4 /*value*/);
TRANS_LOG(INFO, "######## CASE4: txn2 write row(prepare during txn) into memtable, lock for write failed");
prepare_txn(wtx, 1500/*prepare_version*/);
write_tx(wtx2,
memtable,
1200, /*snapshot version*/
write_row2,
OB_TRY_LOCK_ROW_CONFLICT);
read_row(wtx,
memtable,
rowkey,
1000, /*snapshot version*/
1, /*key*/
4 /*value*/);
TRANS_LOG(INFO, "######## CASE5: txn2 write row(commit during txn) into memtable, lock for write encounters tsc");
commit_txn(wtx,
2000,/*commit_version*/
false/*need_write_back*/);
write_tx(wtx2,
memtable,
1800, /*snapshot version*/
write_row2,
OB_TRANSACTION_SET_VIOLATION);
read_row(wtx,
memtable,
rowkey,
3000, /*snapshot version*/
1, /*key*/
4 /*value*/);
TRANS_LOG(INFO, "######## CASE6: txn2 write row(commit during txn) into memtable, lock for write succeed");
write_tx(wtx2,
memtable,
2100, /*snapshot version*/
write_row2);
const int64_t wtx2_seq_no = ObSequence::get_max_seq_no();
verify_cb(get_tx_last_cb(wtx2),
memtable,
wtx2_seq_no,
1, /*key*/
true /*is_link*/);
verify_tnode(get_tx_last_tnode(wtx2),
get_tx_last_cb(wtx)->tnode_, /*prev tnode*/
NULL, /*next tnode*/
memtable,
wtx2->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
wtx2_seq_no,
2, /*modify_count*/
ObMvccTransNode::F_INIT,
ObDmlFlag::DF_INSERT,
1, /*key*/
3 /*value*/);
verify_mvcc_row(get_tx_last_mvcc_row(wtx2),
ObDmlFlag::DF_INSERT,
ObDmlFlag::DF_INSERT,
get_tx_last_tnode(wtx2),
2000, /*max_trans_version*/
3 /*total_trans_node_cnt*/);
read_row(wtx2,
memtable,
rowkey,
1000, /*snapshot version*/
1, /*key*/
3 /*value*/);
ObMvccTransNode *wtx_last_tnode = get_tx_last_tnode(wtx);
commit_txn(wtx,
2000,/*commit_version*/
true/*need_write_back*/);
TRANS_LOG(INFO, "######## CASE7: txn2 abort, undo mvcc row");
abort_txn(wtx2, false/*need_write_back*/);
read_row(memtable,
rowkey,
3000, /*snapshot version*/
1, /*key*/
4 /*value*/);
verify_tnode(get_tx_last_tnode(wtx2),
wtx_last_tnode, /*prev tnode*/
NULL, /*next tnode*/
memtable,
wtx2->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
wtx2_seq_no,
2, /*modify_count*/
ObMvccTransNode::F_ABORTED | ObMvccTransNode::F_DELAYED_CLEANOUT,
ObDmlFlag::DF_INSERT,
1, /*key*/
3 /*value*/);
verify_mvcc_row(get_tx_last_mvcc_row(wtx2),
ObDmlFlag::DF_INSERT,
ObDmlFlag::DF_INSERT,
wtx_case3_tnode, /*list_head*/
2000, /*max_trans_version*/
2 /*total_trans_node_cnt*/);
memtable->destroy();
}
TEST_F(TestMemtableV2, test_lock)
{
ObMemtable *memtable = create_memtable();
TRANS_LOG(INFO, "######## CASE1: txn1 lock row in memtable");
ObDatumRowkey rowkey;
ObStoreRow tmp_row;
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
2, /*value*/
rowkey,
tmp_row));
ObTransID lock_tx_id = ObTransID(1);
ObStoreCtx *ltx = start_tx(lock_tx_id);
lock_tx(ltx,
memtable,
1000, /*snapshot version*/
rowkey);
const int64_t wtx_seq_no = ObSequence::get_max_seq_no();
verify_cb(get_tx_last_cb(ltx),
memtable,
wtx_seq_no,
1, /*key*/
true /*is_link*/);
verify_tnode(get_tx_last_tnode(ltx),
NULL, /*prev tnode*/
NULL, /*next tnode*/
memtable,
ltx->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
wtx_seq_no,
0, /*modify_count*/
ObMvccTransNode::F_INIT,
ObDmlFlag::DF_LOCK,
1, /*key*/
UNUSED_VALUE /*value*/);
verify_mvcc_row(get_tx_last_mvcc_row(ltx),
ObDmlFlag::DF_NOT_EXIST,
ObDmlFlag::DF_NOT_EXIST,
get_tx_last_tnode(ltx),
0, /*max_trans_version*/
1 /*total_trans_node_cnt*/);
read_row(ltx,
memtable,
rowkey,
1200, /*snapshot version*/
1, /*key*/
UNUSED_VALUE, /*value*/
false /*exist*/);
TRANS_LOG(INFO, "######## CASE2: other txn read row in memtable with no data");
read_row(memtable,
rowkey,
1200, /*snapshot version*/
-1, /*key*/
-1, /*value*/
false /*exist*/);
TRANS_LOG(INFO, "######## CASE3: txn2 write row in memtable with lock for write failed");
ObDatumRowkey rowkey2;
ObStoreRow write_row2;
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
3, /*value*/
rowkey2,
write_row2));
ObTransID write_tx_id2 = ObTransID(2);
ObStoreCtx *wtx2 = start_tx(write_tx_id2);
write_tx(wtx2,
memtable,
1200, /*snapshot version*/
write_row2,
OB_TRY_LOCK_ROW_CONFLICT);
TRANS_LOG(INFO, "######## CASE4: txn2 lock row in memtable with lock for write failed");
lock_tx(wtx2,
memtable,
1200, /*snapshot version*/
rowkey,
OB_TRY_LOCK_ROW_CONFLICT);
TRANS_LOG(INFO, "######## CASE5: txn1 lock row in memtable with no new lock");
lock_tx(ltx,
memtable,
1200, /*snapshot version*/
rowkey);
verify_cb(get_tx_last_cb(ltx),
memtable,
wtx_seq_no,
1, /*key*/
true /*is_link*/);
verify_tnode(get_tx_last_tnode(ltx),
NULL, /*prev tnode*/
NULL, /*next tnode*/
memtable,
ltx->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
wtx_seq_no,
0, /*modify_count*/
ObMvccTransNode::F_INIT,
ObDmlFlag::DF_LOCK,
1, /*key*/
UNUSED_VALUE /*value*/);
verify_mvcc_row(get_tx_last_mvcc_row(ltx),
ObDmlFlag::DF_NOT_EXIST,
ObDmlFlag::DF_NOT_EXIST,
get_tx_last_tnode(ltx),
0, /*max_trans_version*/
1 /*total_trans_node_cnt*/);
read_row(ltx,
memtable,
rowkey,
1200, /*snapshot version*/
1, /*key*/
UNUSED_VALUE, /*value*/
false /*exist*/);
TRANS_LOG(INFO, "######## CASE6: txn1 commit, and txn2 lock row in memtable succeed");
commit_txn(ltx,
2000,/*commit_version*/
false/*need_write_back*/);
lock_tx(wtx2,
memtable,
2500, /*snapshot version*/
rowkey);
const int64_t wtx2_seq_no = ObSequence::get_max_seq_no();
verify_cb(get_tx_last_cb(wtx2),
memtable,
wtx2_seq_no,
1, /*key*/
true /*is_link*/);
verify_tnode(get_tx_last_tnode(ltx),
NULL, /*prev tnode*/
NULL, /*next tnode*/
memtable,
ltx->mvcc_acc_ctx_.tx_id_,
2000, /*trans_version*/
wtx_seq_no,
0, /*modify_count*/
ObMvccTransNode::F_COMMITTED | ObMvccTransNode::F_DELAYED_CLEANOUT,
ObDmlFlag::DF_LOCK,
1, /*key*/
UNUSED_VALUE /*value*/);
verify_tnode(get_tx_last_tnode(wtx2),
NULL, /*prev tnode*/
NULL, /*next tnode*/
memtable,
wtx2->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
wtx2_seq_no,
0, /*modify_count*/
ObMvccTransNode::F_INIT,
ObDmlFlag::DF_LOCK,
1, /*key*/
UNUSED_VALUE /*value*/);
verify_mvcc_row(get_tx_last_mvcc_row(wtx2),
ObDmlFlag::DF_NOT_EXIST,
ObDmlFlag::DF_NOT_EXIST,
get_tx_last_tnode(wtx2),
2000, /*max_trans_version*/
1 /*total_trans_node_cnt*/);
read_row(wtx2,
memtable,
rowkey,
3000, /*snapshot version*/
1, /*key*/
UNUSED_VALUE, /*value*/
false /*exist*/);
read_row(memtable,
rowkey,
3000, /*snapshot version*/
1, /*key*/
UNUSED_VALUE, /*value*/
false /*exist*/);
TRANS_LOG(INFO, "######## CASE7: txn2 abort, and txn3 lock row in memtable succeed");
abort_txn(wtx2,
false/*need_write_back*/);
ObTransID write_tx_id3 = ObTransID(3);
ObStoreCtx *wtx3 = start_tx(write_tx_id3);
ObDatumRowkey rowkey3;
ObStoreRow write_row3;
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
4, /*value*/
rowkey3,
write_row3));
write_tx(wtx3,
memtable,
4500, /*snapshot version*/
write_row3);
const int64_t wtx3_seq_no = ObSequence::get_max_seq_no();
verify_cb(get_tx_last_cb(wtx3),
memtable,
wtx3_seq_no,
1, /*key*/
true /*is_link*/);
verify_tnode(get_tx_last_tnode(wtx2),
NULL, /*prev tnode*/
NULL, /*next tnode*/
memtable,
wtx2->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
wtx2_seq_no,
0, /*modify_count*/
ObMvccTransNode::F_ABORTED | ObMvccTransNode::F_DELAYED_CLEANOUT,
ObDmlFlag::DF_LOCK,
1, /*key*/
UNUSED_VALUE /*value*/);
verify_tnode(get_tx_last_tnode(wtx3),
NULL, /*prev tnode*/
NULL, /*next tnode*/
memtable,
wtx3->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
wtx3_seq_no,
0, /*modify_count*/
ObMvccTransNode::F_INIT,
ObDmlFlag::DF_INSERT,
1, /*key*/
4 /*value*/);
verify_mvcc_row(get_tx_last_mvcc_row(wtx3),
ObDmlFlag::DF_NOT_EXIST,
ObDmlFlag::DF_NOT_EXIST,
get_tx_last_tnode(wtx3),
2000, /*max_trans_version*/
1 /*total_trans_node_cnt*/);
read_row(wtx3,
memtable,
rowkey,
4500, /*snapshot version*/
1, /*key*/
4 /*value*/);
read_row(memtable,
rowkey,
3000, /*snapshot version*/
1, /*key*/
UNUSED_VALUE, /*value*/
false /*exist*/);
ObMvccRow *wtx3_last_row = get_tx_last_mvcc_row(wtx3);
ObMvccTransNode *wtx3_last_tnode = get_tx_last_tnode(wtx3);
commit_txn(wtx3,
5000,/*commit_version*/
true/*need_write_back*/);
verify_mvcc_row(wtx3_last_row,
ObDmlFlag::DF_INSERT,
ObDmlFlag::DF_INSERT,
wtx3_last_tnode,
5000, /*max_trans_version*/
1 /*total_trans_node_cnt*/);
read_row(memtable,
rowkey,
6000, /*snapshot version*/
1, /*key*/
4 /*value*/);
memtable->destroy();
}
TEST_F(TestMemtableV2, test_sstable_lock)
{
ObMemtable *memtable = create_memtable();
TRANS_LOG(INFO, "######## CASE1: write row into memtable failed because of sstable lock");
is_sstable_contains_lock_ = true;
ObDatumRowkey rowkey;
ObStoreRow write_row;
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
2, /*value*/
rowkey,
write_row));
ObTransID write_tx_id = ObTransID(1);
ObStoreCtx *wtx = start_tx(write_tx_id);
write_tx(wtx,
memtable,
1000, /*snapshot version*/
write_row,
OB_TRY_LOCK_ROW_CONFLICT);
EXPECT_EQ(true, is_write_set_empty(wtx));
read_row(wtx,
memtable,
rowkey,
1000, /*snapshot version*/
1, /*key*/
0, /*value*/
false /*exist*/);
is_sstable_contains_lock_ = false;
memtable->destroy();
}
TEST_F(TestMemtableV2, test_rollback_to)
{
ObMemtable *memtable = create_memtable();
TRANS_LOG(INFO, "######## CASE1: write row into memtable");
ObDatumRowkey rowkey;
ObStoreRow write_row;
ObStoreRow write_row2;
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
2, /*value*/
rowkey,
write_row));
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
3, /*value*/
rowkey,
write_row2));
ObTransID write_tx_id = ObTransID(1);
ObStoreCtx *wtx = start_tx(write_tx_id);
write_tx(wtx,
memtable,
1000, /*snapshot version*/
write_row);
const int64_t wtx_seq_no1 = ObSequence::get_max_seq_no();
write_tx(wtx,
memtable,
1000, /*snapshot version*/
write_row2);
const int64_t wtx_seq_no2 = ObSequence::get_max_seq_no();
print_callback(wtx);
verify_cb(get_tx_last_cb(wtx),
memtable,
wtx_seq_no2,
1, /*key*/
true /*is_link*/);
verify_tnode(get_tx_last_tnode(wtx),
get_tx_first_tnode(wtx), /*prev tnode*/
NULL, /*next tnode*/
memtable,
wtx->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
wtx_seq_no2,
1, /*modify_count*/
ObMvccTransNode::F_INIT,
ObDmlFlag::DF_INSERT,
1, /*key*/
3 /*value*/);
verify_mvcc_row(get_tx_last_mvcc_row(wtx),
ObDmlFlag::DF_NOT_EXIST,
ObDmlFlag::DF_NOT_EXIST,
get_tx_last_tnode(wtx),
0, /*max_trans_version*/
2 /*total_trans_node_cnt*/);
read_row(wtx,
memtable,
rowkey,
2000, /*snapshot version*/
1, /*key*/
3 /*value*/);
TRANS_LOG(INFO, "######## CASE2: rollback the last tnode, and write write conflict");
rollback_to_txn(wtx,
wtx_seq_no2, /*from*/
wtx_seq_no1 + 1 /*to*/);
ObDatumRowkey rowkey3;
ObStoreRow write_row3;
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
4, /*value*/
rowkey3,
write_row3));
ObTransID write_tx_id2 = ObTransID(2);
ObStoreCtx *wtx2 = start_tx(write_tx_id2);
write_tx(wtx2,
memtable,
3000, /*snapshot version*/
write_row3,
OB_TRY_LOCK_ROW_CONFLICT);
read_row(wtx,
memtable,
rowkey,
1000, /*snapshot version*/
1, /*key*/
2 /*value*/);
memtable->destroy();
}
TEST_F(TestMemtableV2, test_replay)
{
ObMemtable *lmemtable = create_memtable();
ObMemtable *fmemtable = create_memtable();
TRANS_LOG(INFO, "######## CASE1: txn1 and txn3 write row in lmemtable");
ObDatumRowkey rowkey;
ObStoreRow write_row;
ObDatumRowkey rowkey2;
ObStoreRow write_row2;
ObDatumRowkey rowkey3;
ObStoreRow write_row3;
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
2, /*value*/
rowkey,
write_row));
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
3, /*value*/
rowkey2,
write_row2));
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
4, /*value*/
rowkey3,
write_row3));
ObTransID write_tx_id3 = ObTransID(3);
ObStoreCtx *wtx3 = start_tx(write_tx_id3);
write_tx(wtx3,
lmemtable,
500, /*snapshot version*/
write_row3);
const int64_t wtx3_seq_no1 = ObSequence::get_max_seq_no();
commit_txn(wtx3,
800,/*commit_version*/
false/*need_write_back*/);
ObTransID write_tx_id = ObTransID(1);
ObStoreCtx *wtx = start_tx(write_tx_id);
write_tx(wtx,
lmemtable,
1000, /*snapshot version*/
write_row);
const int64_t wtx_seq_no1 = ObSequence::get_max_seq_no();
write_tx(wtx,
lmemtable,
1200, /*snapshot version*/
write_row2);
const int64_t wtx_seq_no2 = ObSequence::get_max_seq_no();
ObMemtableMutatorIterator mmi;
mock_replay_iterator(wtx, mmi);
commit_txn(wtx,
2000,/*commit_version*/
false/*need_write_back*/);
TRANS_LOG(INFO, "######## CASE2: txn2 replay row in fmemtable");
ObTransID replay_tx_id = ObTransID(2);
ObStoreCtx *ptx = start_tx(replay_tx_id, true);
replay_tx(ptx,
fmemtable,
1300, /*replay_scn*/
mmi);
read_row(ptx,
fmemtable,
rowkey,
1500, /*snapshot version*/
1, /*key*/
3 /*value*/);
ObMvccRowCallback *first_cb = (ObMvccRowCallback *)(get_tx_last_cb(ptx)->prev_);
verify_cb(get_tx_last_cb(ptx),
fmemtable,
wtx_seq_no2,
1, /*key*/
true, /*is_link*/
false,/*need_fill_redo*/
1300 /*scn*/);
verify_cb(first_cb,
fmemtable,
wtx_seq_no1,
1, /*key*/
true, /*is_link*/
false,/*need_fill_redo*/
1300 /*scn*/);
verify_tnode(get_tx_last_tnode(ptx),
first_cb->tnode_, /*prev tnode*/
NULL, /*next tnode*/
lmemtable,
ptx->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
wtx_seq_no2,
2, /*modify_count*/
ObMvccTransNode::F_INIT,
ObDmlFlag::DF_INSERT,
1, /*key*/
3, /*value*/
1300 /*scn*/);
verify_tnode(first_cb->tnode_,
NULL, /*prev tnode*/
get_tx_last_tnode(ptx), /*next tnode*/
lmemtable,
ptx->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
wtx_seq_no1,
1, /*modify_count*/
ObMvccTransNode::F_INIT,
ObDmlFlag::DF_INSERT,
1, /*key*/
2, /*value*/
1300 /*scn*/);
verify_mvcc_row(get_tx_last_mvcc_row(ptx),
ObDmlFlag::DF_NOT_EXIST,
ObDmlFlag::DF_NOT_EXIST,
get_tx_last_tnode(ptx),
0, /*max_trans_version*/
2 /*total_trans_node_cnt*/);
TRANS_LOG(INFO, "######## CASE3: txn4 replay row in fmemtable in reverse order");
ObMemtableMutatorIterator mmi3;
mock_replay_iterator(wtx3, mmi3);
ObTransID replay_tx_id4 = ObTransID(4);
ObStoreCtx *ptx4 = start_tx(replay_tx_id4, true/*for_replay*/);
replay_tx(ptx4,
fmemtable,
800, /*replay_scn*/
mmi3);
read_row(ptx,
fmemtable,
rowkey,
1500, /*snapshot version*/
1, /*key*/
3 /*value*/);
commit_txn(ptx,
2000,/*commit_version*/
false/*need_write_back*/);
commit_txn(ptx4,
800,/*commit_version*/
false/*need_write_back*/);
read_row(fmemtable,
rowkey,
3000, /*snapshot version*/
1, /*key*/
3 /*value*/);
read_row(fmemtable,
rowkey,
900, /*snapshot version*/
1, /*key*/
4 /*value*/);
verify_cb(get_tx_last_cb(ptx4),
fmemtable,
wtx3_seq_no1,
1, /*key*/
true, /*is_link*/
false,/*need_fill_redo*/
800 /*scn*/);
verify_tnode(get_tx_last_tnode(ptx4),
NULL, /*prev tnode*/
get_tx_first_tnode(ptx), /*next tnode*/
lmemtable,
ptx4->mvcc_acc_ctx_.tx_id_,
800, /*trans_version*/
wtx3_seq_no1,
0, /*modify_count*/
ObMvccTransNode::F_COMMITTED | ObMvccTransNode::F_DELAYED_CLEANOUT,
ObDmlFlag::DF_INSERT,
1, /*key*/
4, /*value*/
800 /*scn*/);
verify_mvcc_row(get_tx_last_mvcc_row(ptx4),
ObDmlFlag::DF_INSERT,
ObDmlFlag::DF_INSERT,
get_tx_last_tnode(ptx),
2000, /*max_trans_version*/
3 /*total_trans_node_cnt*/);
lmemtable->destroy();
fmemtable->destroy();
}
TEST_F(TestMemtableV2, test_compact)
{
ObMemtable *lmemtable = create_memtable();
ObMemtable *fmemtable = create_memtable();
TRANS_LOG(INFO, "######## CASE1: txn1 and txn3 write row in lmemtable");
ObDatumRowkey rowkey;
ObStoreRow write_row;
ObDatumRowkey rowkey2;
ObStoreRow write_row2;
ObDatumRowkey rowkey3;
ObStoreRow write_row3;
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
2, /*value*/
rowkey,
write_row));
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
3, /*value*/
rowkey2,
write_row2));
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
4, /*value*/
rowkey3,
write_row3));
ObTransID write_tx_id = ObTransID(1);
ObStoreCtx *wtx = start_tx(write_tx_id);
write_tx(wtx,
lmemtable,
1000, /*snapshot version*/
write_row);
const int64_t wtx_seq_no1 = ObSequence::get_max_seq_no();
write_tx(wtx,
lmemtable,
1200, /*snapshot version*/
write_row2);
const int64_t wtx_seq_no2 = ObSequence::get_max_seq_no();
ObMemtableMutatorIterator mmi;
mock_replay_iterator(wtx, mmi);
print_callback(wtx);
commit_txn(wtx,
2000,/*commit_version*/
false/*need_write_back*/);
ObTransID write_tx_id3 = ObTransID(3);
ObStoreCtx *wtx3 = start_tx(write_tx_id3);
write_tx(wtx3,
lmemtable,
2500, /*snapshot version*/
write_row3);
const int64_t wtx3_seq_no1 = ObSequence::get_max_seq_no();
commit_txn(wtx3,
3000,/*commit_version*/
false/*need_write_back*/);
ObMemtableMutatorIterator mmi3;
mock_replay_iterator(wtx3, mmi3);
TRANS_LOG(INFO, "######## CASE2: txn2 replay row in fmemtable");
ObTransID replay_tx_id = ObTransID(2);
ObStoreCtx *ptx = start_tx(replay_tx_id, true/*for_replay*/);
replay_tx(ptx,
fmemtable,
2000, /*replay_scn*/
mmi);
read_row(ptx,
fmemtable,
rowkey,
2400, /*snnapshot version*/
1, /*keny*/
3 /*value*/);
ObMvccRow *row = get_tx_last_mvcc_row(ptx);
ObMvccTransNode *ptx_first_tnode = get_tx_first_tnode(ptx);
ObMvccTransNode *ptx_last_tnode = get_tx_last_tnode(ptx);
commit_txn(ptx,
2000,/*commit_version*/
true/*need_write_back*/);
compact_row(row, fmemtable, 2400, true/*for_replay*/);
verify_tnode(ptx_last_tnode,
ptx_first_tnode, /*prev tnode*/
row->latest_compact_node_, /*next tnode*/
lmemtable,
ptx->mvcc_acc_ctx_.tx_id_,
2000, /*trans_version*/
wtx_seq_no2,
1, /*modify_count*/
ObMvccTransNode::F_COMMITTED,
ObDmlFlag::DF_INSERT,
1, /*key*/
3, /*value*/
2000 /*scn*/);
verify_tnode(row->latest_compact_node_,
ptx_last_tnode,/*prev tnode*/
NULL, /*next tnode*/
lmemtable,
ptx->mvcc_acc_ctx_.tx_id_,
2000, /*trans_version*/
wtx_seq_no2,
1, /*modify_count*/
ObMvccTransNode::F_COMMITTED,
ObDmlFlag::DF_INSERT,
1, /*key*/
3, /*value*/
2000, /*scn*/
NDT_COMPACT);
verify_mvcc_row(row,
ObDmlFlag::DF_INSERT,
ObDmlFlag::DF_INSERT,
row->latest_compact_node_, /*list_head*/
2000, /*max_trans_version*/
2 /*total_trans_node_cnt*/);
TRANS_LOG(INFO, "######## CASE2: txn4 replay uncommitted row in lmemtable and compacted");
ObTransID replay_tx_id4 = ObTransID(4);
ObStoreCtx *ptx4 = start_tx(replay_tx_id4, true/*for_replay*/);
replay_tx(ptx4,
fmemtable,
3000, /*replay_scn*/
mmi3);
read_row(fmemtable,
rowkey,
2900, /*snapshot version*/
1, /*key*/
3 /*value*/);
compact_row(row, fmemtable, 2500, true/*for_replay*/);
verify_cb(get_tx_last_cb(ptx4),
fmemtable,
wtx3_seq_no1,
1, /*key*/
true, /*is_link*/
false,/*need_fill_redo*/
3000 /*scn*/);
verify_tnode(get_tx_last_tnode(ptx4),
row->latest_compact_node_, /*prev tnode*/
NULL, /*next tnode*/
lmemtable,
ptx4->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
wtx3_seq_no1,
2, /*modify_count*/
ObMvccTransNode::F_INIT,
ObDmlFlag::DF_INSERT,
1, /*key*/
4, /*value*/
3000 /*scn*/);
verify_tnode(row->latest_compact_node_,
ptx_last_tnode, /*prev tnode*/
get_tx_last_tnode(ptx4), /*next tnode*/
lmemtable,
ptx->mvcc_acc_ctx_.tx_id_,
2000, /*trans_version*/
wtx_seq_no2,
1, /*modify_count*/
ObMvccTransNode::F_COMMITTED,
ObDmlFlag::DF_INSERT,
1, /*key*/
3, /*value*/
2000, /*scn*/
NDT_COMPACT);
verify_mvcc_row(get_tx_last_mvcc_row(ptx4),
ObDmlFlag::DF_INSERT,
ObDmlFlag::DF_INSERT,
get_tx_last_tnode(ptx4),
2000, /*max_trans_version*/
3 /*total_trans_node_cnt*/);
TRANS_LOG(INFO, "######## CASE3: commit txn4 in lmemtable and compacted");
ObMvccTransNode *prev_compact_node = row->latest_compact_node_;
ObMvccRowCallback *ptx4_cb = get_tx_last_cb(ptx4);
ObMvccTransNode *ptx4_tnode = get_tx_last_tnode(ptx4);
commit_txn(ptx4,
3000,/*commit_version*/
true/*need_write_back*/);
read_row(fmemtable,
rowkey,
3500, /*snapshot version*/
1, /*key*/
4 /*value*/);
compact_row(row, fmemtable, 4000, true/*for_replay*/);
verify_tnode(ptx4_tnode,
prev_compact_node, /*prev tnode*/
row->latest_compact_node_, /*next tnode*/
lmemtable,
ptx4->mvcc_acc_ctx_.tx_id_,
3000, /*trans_version*/
wtx3_seq_no1,
2, /*modify_count*/
ObMvccTransNode::F_COMMITTED,
ObDmlFlag::DF_INSERT,
1, /*key*/
4, /*value*/
3000 /*scn*/);
verify_tnode(row->latest_compact_node_,
ptx4_tnode, /*prev tnode*/
NULL, /*next tnode*/
lmemtable,
ptx4->mvcc_acc_ctx_.tx_id_,
3000, /*trans_version*/
wtx3_seq_no1,
2, /*modify_count*/
ObMvccTransNode::F_COMMITTED,
ObDmlFlag::DF_INSERT,
1, /*key*/
4, /*value*/
3000, /*scn*/
NDT_COMPACT);
verify_mvcc_row(row,
ObDmlFlag::DF_INSERT,
ObDmlFlag::DF_INSERT,
row->latest_compact_node_,
3000, /*max_trans_version*/
3 /*total_trans_node_cnt*/);
lmemtable->destroy();
fmemtable->destroy();
}
TEST_F(TestMemtableV2, test_compact_v2)
{
ObMemtable *memtable = create_memtable();
TRANS_LOG(INFO, "######## CASE1: txn1 write two rows and txn3 write a row in memtable");
ObDatumRowkey rowkey;
ObStoreRow write_row;
ObDatumRowkey rowkey2;
ObStoreRow write_row2;
ObDatumRowkey rowkey3;
ObStoreRow write_row3;
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
2, /*value*/
rowkey,
write_row));
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
3, /*value*/
rowkey2,
write_row2));
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
4, /*value*/
rowkey3,
write_row3));
ObTransID write_tx_id = ObTransID(1);
ObStoreCtx *wtx = start_tx(write_tx_id);
lock_tx(wtx,
memtable,
1000, /*snapshot version*/
rowkey);
const int64_t wtx_seq_no1 = ObSequence::get_max_seq_no();
ObMvccTransNode *wtx_first_tnode = get_tx_last_tnode(wtx);
ObMvccRow *row = get_tx_last_mvcc_row(wtx);
write_tx(wtx,
memtable,
1200, /*snapshot version*/
write_row);
const int64_t wtx_seq_no2 = ObSequence::get_max_seq_no();
ObMvccTransNode *wtx_second_tnode = get_tx_last_tnode(wtx);
write_tx(wtx,
memtable,
1300, /*snapshot version*/
write_row2);
const int64_t wtx_seq_no3 = ObSequence::get_max_seq_no();
ObMvccTransNode *wtx_third_tnode = get_tx_last_tnode(wtx);
print_callback(wtx);
commit_txn(wtx,
2000,/*commit_version*/
false/*need_write_back*/);
ObTransID write_tx_id3 = ObTransID(3);
ObStoreCtx *wtx3 = start_tx(write_tx_id3);
write_tx(wtx3,
memtable,
2500, /*snapshot version*/
write_row3);
const int64_t wtx3_seq_no1 = ObSequence::get_max_seq_no();
ObMvccTransNode *wtx3_first_tnode = get_tx_last_tnode(wtx3);
commit_txn(wtx3,
3000,/*commit_version*/
false/*need_write_back*/);
verify_tnode(wtx_first_tnode,
NULL, /*prev tnode*/
wtx_second_tnode, /*next tnode*/
memtable,
wtx->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
wtx_seq_no1,
0, /*modify_count*/
ObMvccTransNode::F_INIT | ObMvccTransNode::F_DELAYED_CLEANOUT,
ObDmlFlag::DF_LOCK,
1, /*key*/
UNUSED_VALUE /*value*/);
verify_tnode(wtx_second_tnode,
wtx_first_tnode, /*prev tnode*/
wtx_third_tnode, /*next tnode*/
memtable,
wtx->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
wtx_seq_no2,
1, /*modify_count*/
ObMvccTransNode::F_INIT | ObMvccTransNode::F_DELAYED_CLEANOUT,
ObDmlFlag::DF_INSERT,
1, /*key*/
2 /*value*/);
verify_mvcc_row(row,
ObDmlFlag::DF_INSERT,
ObDmlFlag::DF_INSERT,
wtx3_first_tnode,
2000, /*max_trans_version*/
4 /*total_trans_node_cnt*/);
compact_row(row, memtable, 2400, false/*for_replay*/);
verify_tnode(wtx_first_tnode,
NULL, /*prev tnode*/
wtx_second_tnode, /*next tnode*/
memtable,
wtx->mvcc_acc_ctx_.tx_id_,
2000, /*trans_version*/
wtx_seq_no1,
0, /*modify_count*/
ObMvccTransNode::F_COMMITTED | ObMvccTransNode::F_DELAYED_CLEANOUT,
ObDmlFlag::DF_LOCK,
1, /*key*/
UNUSED_VALUE /*value*/);
verify_tnode(wtx_second_tnode,
NULL, /*prev tnode*/
wtx_third_tnode, /*next tnode*/
memtable,
wtx->mvcc_acc_ctx_.tx_id_,
2000, /*trans_version*/
wtx_seq_no2,
1, /*modify_count*/
ObMvccTransNode::F_COMMITTED | ObMvccTransNode::F_DELAYED_CLEANOUT,
ObDmlFlag::DF_INSERT,
1, /*key*/
2 /*value*/);
verify_tnode(wtx_third_tnode,
wtx_second_tnode, /*prev tnode*/
row->latest_compact_node_, /*next tnode*/
memtable,
wtx->mvcc_acc_ctx_.tx_id_,
2000, /*trans_version*/
wtx_seq_no3,
2, /*modify_count*/
ObMvccTransNode::F_COMMITTED | ObMvccTransNode::F_DELAYED_CLEANOUT,
ObDmlFlag::DF_INSERT,
1, /*key*/
3 /*value*/);
verify_tnode(row->latest_compact_node_,
wtx_third_tnode, /*prev tnode*/
wtx3_first_tnode, /*next tnode*/
memtable,
wtx->mvcc_acc_ctx_.tx_id_,
2000, /*trans_version*/
wtx_seq_no3,
2, /*modify_count*/
ObMvccTransNode::F_COMMITTED | ObMvccTransNode::F_DELAYED_CLEANOUT,
ObDmlFlag::DF_INSERT,
1, /*key*/
3, /*value*/
INT64_MAX,
NDT_COMPACT);
verify_mvcc_row(row,
ObDmlFlag::DF_INSERT,
ObDmlFlag::DF_INSERT,
wtx3_first_tnode,
2000, /*max_trans_version*/
3 /*total_trans_node_cnt*/);
memtable->destroy();
}
TEST_F(TestMemtableV2, test_compact_v3)
{
ObMemtable *lmemtable = create_memtable();
ObMemtable *fmemtable = create_memtable();
TRANS_LOG(INFO, "######## CASE1: txn1 write two row and txn2 write row in lmemtable");
ObDatumRowkey rowkey;
ObStoreRow write_row;
ObDatumRowkey rowkey2;
ObStoreRow write_row2;
ObDatumRowkey rowkey3;
ObStoreRow write_row3;
ObDatumRowkey rowkey4;
ObStoreRow write_row4;
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
2, /*value*/
rowkey,
write_row));
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
3, /*value*/
rowkey2,
write_row2));
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
4, /*value*/
rowkey3,
write_row3));
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
5, /*value*/
rowkey4,
write_row4));
ObTransID write_tx_id = ObTransID(1);
ObStoreCtx *wtx = start_tx(write_tx_id);
write_tx(wtx,
lmemtable,
1000, /*snapshot version*/
write_row);
write_tx(wtx,
lmemtable,
1200, /*snapshot version*/
write_row2);
ObMemtableMutatorIterator mmi;
mock_replay_iterator(wtx, mmi);
abort_txn(wtx,
false/*need_write_back*/);
ObTransID write_tx_id2 = ObTransID(2);
ObStoreCtx *wtx2 = start_tx(write_tx_id2);
lock_tx(wtx2,
lmemtable,
2200, /*snapshot version*/
rowkey3);
print_callback(wtx2);
ObMemtableMutatorIterator mmi2;
mock_replay_iterator(wtx2, mmi2);
commit_txn(wtx2,
3000,/*commit_version*/
false/*need_write_back*/);
ObTransID write_tx_id5 = ObTransID(5);
ObStoreCtx *wtx5 = start_tx(write_tx_id5);
lock_tx(wtx5,
lmemtable,
3200, /*snapshot version*/
rowkey4);
ObMemtableMutatorIterator mmi3;
mock_replay_iterator(wtx5, mmi3);
TRANS_LOG(INFO, "######## CASE2: txn3, txn4 replay rows in fmemtable in reverse order");
ObTransID replay_tx_id3 = ObTransID(3);
ObStoreCtx *ptx3 = start_tx(replay_tx_id3, true/*for_replay*/);
ObTransID replay_tx_id4 = ObTransID(4);
ObStoreCtx *ptx4 = start_tx(replay_tx_id4, true/*for_replay*/);
ObTransID replay_tx_id6 = ObTransID(6);
ObStoreCtx *ptx6 = start_tx(replay_tx_id6, true/*for_replay*/);
replay_tx(ptx6,
fmemtable,
3500, /*replay_scn*/
mmi3);
replay_tx(ptx3,
fmemtable,
3000, /*replay_scn*/
mmi2);
replay_tx(ptx4,
fmemtable,
2000, /*replay_scn*/
mmi);
ObMvccTransNode *node = get_tx_last_tnode(ptx3);
((ObMemtableDataHeader *)(node->buf_))->dml_flag_ = blocksstable::ObDmlFlag::DF_INSERT;
ObMvccTransNode *node2 = get_tx_last_tnode(ptx6);
((ObMemtableDataHeader *)(node2->buf_))->dml_flag_ = blocksstable::ObDmlFlag::DF_INSERT;
ObMvccRow *row = get_tx_last_mvcc_row(ptx3);
row->print_row();
commit_txn(ptx3,
3000,/*commit_version*/
true/*need_write_back*/);
commit_txn(ptx6,
3600,/*commit_version*/
true/*need_write_back*/);
abort_txn(ptx4,
false/*need_write_back*/);
((ObMemtableDataHeader *)(node->buf_))->dml_flag_ = blocksstable::ObDmlFlag::DF_LOCK;
((ObMemtableDataHeader *)(node2->buf_))->dml_flag_ = blocksstable::ObDmlFlag::DF_LOCK;
row->print_row();
compact_row(row, fmemtable, 4000, true/*for_replay*/);
row->print_row();
ObMvccTransNode *compact_node = row->latest_compact_node_;
EXPECT_EQ(NULL, compact_node);
verify_mvcc_row(row,
ObDmlFlag::DF_INSERT,
ObDmlFlag::DF_INSERT,
node2,
3600, /*max_trans_version*/
2 /*total_trans_node_cnt*/);
read_row(fmemtable,
rowkey,
4500, /*snapshot version*/
1, /*key*/
4, /*value*/
false /*exist*/);
lmemtable->destroy();
fmemtable->destroy();
}
TEST_F(TestMemtableV2, test_dml_flag)
{
ObMemtable *lmemtable = create_memtable();
ObMemtable *fmemtable = create_memtable();
TRANS_LOG(INFO, "######## CASE1: txns write and row in lmemtable, test its dml flag");
ObDatumRowkey rowkey;
ObStoreRow write_row1;
ObDatumRowkey rowkey2;
ObStoreRow write_row2;
ObDatumRowkey rowkey3;
ObStoreRow write_row3;
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
2, /*value*/
rowkey,
write_row1));
EXPECT_EQ(OB_SUCCESS, mock_delete(1, /*key*/
rowkey2,
write_row2));
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
4, /*value*/
rowkey3,
write_row3));
ObTransID write_tx_id1 = ObTransID(1);
ObStoreCtx *wtx1 = start_tx(write_tx_id1);
write_tx(wtx1,
lmemtable,
400, /*snapshot version*/
write_row1);
const int64_t wtx1_seq_no1 = ObSequence::get_max_seq_no();
ObMvccRow *wtx1_row = get_tx_last_mvcc_row(wtx1);
ObMvccTransNode *wtx1_tnode1 = get_tx_last_tnode(wtx1);
verify_mvcc_row(wtx1_row,
ObDmlFlag::DF_NOT_EXIST,
ObDmlFlag::DF_NOT_EXIST,
wtx1_tnode1,
0, /*max_trans_version*/
1 /*total_trans_node_cnt*/);
ObMemtableMutatorIterator mmi1;
mock_replay_iterator(wtx1, mmi1);
commit_txn(wtx1,
800,/*commit_version*/
true/*need_write_back*/);
verify_mvcc_row(wtx1_row,
ObDmlFlag::DF_INSERT,
ObDmlFlag::DF_INSERT,
wtx1_tnode1,
800, /*max_trans_version*/
1 /*total_trans_node_cnt*/);
ObTransID write_tx_id2 = ObTransID(2);
ObStoreCtx *wtx2 = start_tx(write_tx_id2);
lock_tx(wtx2,
lmemtable,
1000, /*snapshot version*/
rowkey);
const int64_t wtx2_seq_no1 = ObSequence::get_max_seq_no();
ObMemtableMutatorIterator mmi2;
mock_replay_iterator(wtx2, mmi2);
commit_txn(wtx2,
1200,/*commit_version*/
true/*need_write_back*/);
verify_mvcc_row(wtx1_row,
ObDmlFlag::DF_INSERT,
ObDmlFlag::DF_INSERT,
wtx1_tnode1,
1200, /*max_trans_version*/
1 /*total_trans_node_cnt*/);
ObTransID write_tx_id3 = ObTransID(3);
ObStoreCtx *wtx3 = start_tx(write_tx_id3);
write_tx(wtx3,
lmemtable,
1400, /*snapshot version*/
write_row2);
const int64_t wtx3_seq_no1 = ObSequence::get_max_seq_no();
ObMvccTransNode *wtx3_tnode1 = get_tx_last_tnode(wtx3);
ObMemtableMutatorIterator mmi3;
mock_replay_iterator(wtx3, mmi3);
commit_txn(wtx3,
1600,/*commit_version*/
true/*need_write_back*/);
verify_mvcc_row(wtx1_row,
ObDmlFlag::DF_INSERT,
ObDmlFlag::DF_DELETE,
wtx3_tnode1,
1600, /*max_trans_version*/
2 /*total_trans_node_cnt*/);
TRANS_LOG(INFO, "######## CASE2: txns replay row in fmemtable and test dml flag");
ObTransID replay_tx_id1 = ObTransID(4);
ObStoreCtx *ptx1 = start_tx(replay_tx_id1, true);
ObTransID replay_tx_id2 = ObTransID(5);
ObStoreCtx *ptx2 = start_tx(replay_tx_id2, true);
ObTransID replay_tx_id3 = ObTransID(6);
ObStoreCtx *ptx3 = start_tx(replay_tx_id3, true);
replay_tx(ptx2,
fmemtable,
1200, /*replay_scn*/
mmi2);
ObMvccRow *ptx2_row = get_tx_last_mvcc_row(ptx2);
ObMvccTransNode *ptx2_tnode1 = get_tx_last_tnode(ptx2);
commit_txn(ptx2,
1200,/*commit_version*/
true/*need_write_back*/);
verify_mvcc_row(ptx2_row,
ObDmlFlag::DF_NOT_EXIST,
ObDmlFlag::DF_NOT_EXIST,
NULL,
1200, /*max_trans_version*/
0 /*total_trans_node_cnt*/);
replay_tx(ptx3,
fmemtable,
1600, /*replay_scn*/
mmi3);
ObMvccRow *ptx3_row = get_tx_last_mvcc_row(ptx3);
ObMvccTransNode *ptx3_tnode1 = get_tx_last_tnode(ptx3);
commit_txn(ptx3,
1600,/*commit_version*/
true/*need_write_back*/);
verify_mvcc_row(ptx3_row,
ObDmlFlag::DF_DELETE,
ObDmlFlag::DF_DELETE,
ptx3_tnode1,
1600, /*max_trans_version*/
1 /*total_trans_node_cnt*/);
replay_tx(ptx1,
fmemtable,
800, /*replay_scn*/
mmi1);
ObMvccRow *ptx1_row = get_tx_last_mvcc_row(ptx1);
ObMvccTransNode *ptx1_tnode1 = get_tx_last_tnode(ptx1);
commit_txn(ptx1,
800,/*commit_version*/
true/*need_write_back*/);
verify_mvcc_row(ptx1_row,
ObDmlFlag::DF_INSERT,
ObDmlFlag::DF_DELETE,
ptx3_tnode1,
1600, /*max_trans_version*/
2 /*total_trans_node_cnt*/);
lmemtable->destroy();
fmemtable->destroy();
}
TEST_F(TestMemtableV2, test_fast_commit)
{
ObMemtable *memtable = create_memtable();
TRANS_LOG(INFO, "######## CASE1: write row into memtable and fast commit, then check result is ok");
ObDatumRowkey rowkey;
ObStoreRow write_row;
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
2, /*value*/
rowkey,
write_row));
ObTransID write_tx_id = ObTransID(1);
ObTransID write_tx_id2 = ObTransID(2);
ObStoreCtx *wtx = start_tx(write_tx_id);
ObStoreCtx *wtx2 = start_tx(write_tx_id2);
write_tx(wtx,
memtable,
1000, /*snapshot version*/
write_row);
const int64_t wtx_seq_no = ObSequence::get_max_seq_no();
ObMvccRowCallback *wtx_cb = (ObMvccRowCallback *)(get_tx_last_cb(wtx));
ObMvccRow *wtx_row = get_tx_last_mvcc_row(wtx);
ObMvccTransNode *wtx_tnode = get_tx_last_tnode(wtx);
read_row(wtx,
memtable,
rowkey,
1000, /*snapshot version*/
1, /*key*/
2 /*value*/);
fast_commit_txn(wtx);
read_row(wtx,
memtable,
rowkey,
1000, /*snapshot version*/
1, /*key*/
2 /*value*/);
read_row(wtx2,
memtable,
rowkey,
1100, /*snapshot version*/
1, /*key*/
2, /*value*/
false /*exist*/);
prepare_txn(wtx, 1200 /*prepare_version*/);
read_row(memtable,
rowkey,
1800, /*snapshot version*/
1, /*key*/
2, /*value*/
false, /*exist*/
OB_ERR_SHARED_LOCK_CONFLICT,
1000000 /*expire_time*/);
verify_cb(wtx_cb,
memtable,
wtx_seq_no,
1, /*key*/
true /*is_link*/);
verify_tnode(wtx_tnode,
NULL, /*prev tnode*/
NULL, /*next tnode*/
memtable,
wtx->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
wtx_seq_no,
0, /*modify_count*/
ObMvccTransNode::F_DELAYED_CLEANOUT,
DF_INSERT,
1, /*key*/
2 /*value*/);
verify_mvcc_row(wtx_row,
ObDmlFlag::DF_NOT_EXIST,
ObDmlFlag::DF_NOT_EXIST,
wtx_tnode,
0, /*max_trans_version*/
1 /*total_trans_node_cnt*/);
commit_txn(wtx, 2000 /*commit_version*/);
read_row(wtx,
memtable,
rowkey,
3000, /*snapshot version*/
1, /*key*/
2 /*value*/);
verify_cb(wtx_cb,
memtable,
wtx_seq_no,
1, /*key*/
true /*is_link*/);
verify_tnode(wtx_tnode,
NULL, /*prev tnode*/
NULL, /*next tnode*/
memtable,
wtx->mvcc_acc_ctx_.tx_id_,
2000, /*trans_version*/
wtx_seq_no,
0, /*modify_count*/
ObMvccTransNode::F_COMMITTED | ObMvccTransNode::F_DELAYED_CLEANOUT,
DF_INSERT,
1, /*key*/
2 /*value*/);
verify_mvcc_row(wtx_row,
ObDmlFlag::DF_INSERT,
ObDmlFlag::DF_INSERT,
wtx_tnode,
2000, /*max_trans_version*/
1 /*total_trans_node_cnt*/);
memtable->destroy();
}
TEST_F(TestMemtableV2, test_fast_commit_with_no_delay_cleanout)
{
ObMemtable *memtable = create_memtable();
TRANS_LOG(INFO, "######## CASE1: write row into memtable and not fast commit, then check result is ok");
ObDatumRowkey rowkey;
ObStoreRow write_row;
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
2, /*value*/
rowkey,
write_row));
ObTransID write_tx_id = ObTransID(1);
ObTransID write_tx_id2 = ObTransID(2);
ObStoreCtx *wtx = start_tx(write_tx_id);
ObStoreCtx *wtx2 = start_tx(write_tx_id2);
write_tx(wtx,
memtable,
1000, /*snapshot version*/
write_row);
const int64_t wtx_seq_no = ObSequence::get_max_seq_no();
ObMvccRowCallback *wtx_cb = (ObMvccRowCallback *)(get_tx_last_cb(wtx));
ObMvccRow *wtx_row = get_tx_last_mvcc_row(wtx);
ObMvccTransNode *wtx_tnode = get_tx_last_tnode(wtx);
read_row(wtx,
memtable,
rowkey,
1000, /*snapshot version*/
1, /*key*/
2 /*value*/);
read_row(wtx,
memtable,
rowkey,
1000, /*snapshot version*/
1, /*key*/
2 /*value*/);
read_row(wtx2,
memtable,
rowkey,
1100, /*snapshot version*/
1, /*key*/
2, /*value*/
false /*exist*/);
prepare_txn(wtx, 1200 /*prepare_version*/);
read_row(memtable,
rowkey,
1800, /*snapshot version*/
1, /*key*/
2, /*value*/
false, /*exist*/
OB_ERR_SHARED_LOCK_CONFLICT,
1000000 /*expire_time*/);
verify_cb(wtx_cb,
memtable,
wtx_seq_no,
1, /*key*/
true /*is_link*/);
verify_tnode(wtx_tnode,
NULL, /*prev tnode*/
NULL, /*next tnode*/
memtable,
wtx->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
wtx_seq_no,
0, /*modify_count*/
ObMvccTransNode::F_INIT,
DF_INSERT,
1, /*key*/
2 /*value*/);
verify_mvcc_row(wtx_row,
ObDmlFlag::DF_NOT_EXIST,
ObDmlFlag::DF_NOT_EXIST,
wtx_tnode,
0, /*max_trans_version*/
1 /*total_trans_node_cnt*/);
// NB: we use the following code to mock the concurrency between tx_end and
// delay cleanout
ObPartTransCtx *tx_ctx = wtx->mvcc_acc_ctx_.tx_ctx_;
ObMemtableCtx *mt_ctx = wtx->mvcc_acc_ctx_.mem_ctx_;
tx_ctx->exec_info_.state_ = ObTxState::COMMIT;
share::SCN commit_scn;
commit_scn.convert_for_tx(2000);
tx_ctx->ctx_tx_data_.set_commit_version(commit_scn);
tx_ctx->ctx_tx_data_.set_state(ObTxData::COMMIT);
read_row(wtx,
memtable,
rowkey,
3000, /*snapshot version*/
1, /*key*/
2 /*value*/);
verify_cb(wtx_cb,
memtable,
wtx_seq_no,
1, /*key*/
true /*is_link*/);
verify_tnode(wtx_tnode,
NULL, /*prev tnode*/
NULL, /*next tnode*/
memtable,
wtx->mvcc_acc_ctx_.tx_id_,
INT64_MAX, /*trans_version*/
wtx_seq_no,
0, /*modify_count*/
ObMvccTransNode::F_INIT,
DF_INSERT,
1, /*key*/
2 /*value*/);
verify_mvcc_row(wtx_row,
ObDmlFlag::DF_NOT_EXIST,
ObDmlFlag::DF_NOT_EXIST,
wtx_tnode,
INT64_MAX, /*max_trans_version*/
1 /*total_trans_node_cnt*/);
memtable->destroy();
}
TEST_F(TestMemtableV2, test_seq_set_violation)
{
int ret = OB_SUCCESS;
ObMemtable *memtable = create_memtable();
TRANS_LOG(INFO, "######## CASE1: write row into memtable");
ObDatumRowkey rowkey;
ObStoreRow write_row;
ObStoreRow write_row2;
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
2, /*value*/
rowkey,
write_row));
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
3, /*value*/
rowkey,
write_row2));
ObTransID write_tx_id = ObTransID(1);
ObStoreCtx *wtx = start_tx(write_tx_id);
int64_t read_seq_no = ObSequence::get_max_seq_no();
share::SCN scn_3000;
scn_3000.convert_for_tx(3000);
start_pdml_stmt(wtx, scn_3000, read_seq_no, 1000000000/*expire_time*/);
EXPECT_EQ(OB_SUCCESS, (ret = memtable->set(*wtx,
tablet_id_.id(),
read_info_,
columns_,
write_row)));
start_pdml_stmt(wtx, scn_3000, read_seq_no, 1000000000/*expire_time*/);
EXPECT_EQ(OB_ERR_PRIMARY_KEY_DUPLICATE, (ret = memtable->set(*wtx,
tablet_id_.id(),
read_info_,
columns_,
write_row)));
memtable->destroy();
}
TEST_F(TestMemtableV2, test_parallel_lock_with_same_txn)
{
int ret = OB_SUCCESS;
ObMemtable *memtable = create_memtable();
TRANS_LOG(INFO, "######## CASE1: lock row into memtable parallelly");
ObDatumRowkey rowkey;
ObStoreRow write_row;
EXPECT_EQ(OB_SUCCESS, mock_row(1, /*key*/
2, /*value*/
rowkey,
write_row));
ObTransID write_tx_id = ObTransID(1);
ObStoreCtx *wtx = start_tx(write_tx_id);
share::SCN scn_1000;
scn_1000.convert_for_tx(1000);
// Step1: prepare the global sequence
ObSequence::inc();
int64_t read_seq_no = ObSequence::get_max_seq_no();
// Step2: init the mvcc acc ctx
wtx->mvcc_acc_ctx_.type_ = ObMvccAccessCtx::T::WRITE;
wtx->mvcc_acc_ctx_.snapshot_.tx_id_ = wtx->mvcc_acc_ctx_.tx_id_;
wtx->mvcc_acc_ctx_.snapshot_.version_ = scn_1000;
wtx->mvcc_acc_ctx_.snapshot_.scn_ = read_seq_no;
const int64_t abs_expire_time = 10000000000 + ::oceanbase::common::ObTimeUtility::current_time();
wtx->mvcc_acc_ctx_.abs_lock_timeout_ = abs_expire_time;
wtx->mvcc_acc_ctx_.tx_scn_ = ObSequence::inc_and_get_max_seq_no();
// Step3: lock for the first time
EXPECT_EQ(OB_SUCCESS, (ret = memtable->lock(*wtx,
tablet_id_.id(),
read_info_,
rowkey)));
// Step4: lock for the second time
wtx->mvcc_acc_ctx_.tx_scn_ = ObSequence::inc_and_get_max_seq_no();
EXPECT_EQ(OB_SUCCESS, (ret = memtable->lock(*wtx,
tablet_id_.id(),
read_info_,
rowkey)));
memtable->destroy();
}
} // namespace unittest
namespace storage
{
int ObTxDataTable::alloc_undo_status_node(ObUndoStatusNode *&undo_status_node)
{
int ret = OB_SUCCESS;
undo_status_node = new ObUndoStatusNode();
return ret;
}
int ObTxCtxTable::acquire_ref_(const ObLSID& ls_id)
{
int ret = OB_SUCCESS;
ls_tx_ctx_mgr_ = &unittest::TestMemtableV2::ls_tx_ctx_mgr_;
TRANS_LOG(INFO, "[TX_CTX_TABLE] tx ctx table acquire ref", K(ls_id), K(this));
return ret;
}
int ObTxCtxTable::release_ref_()
{
int ret = OB_SUCCESS;
ls_tx_ctx_mgr_ = NULL;
TRANS_LOG(INFO, "[TX_CTX_TABLE] tx ctx table release ref", K(this));
return ret;
}
} // namespace storage
namespace memtable
{
int ObMemtable::lock_row_on_frozen_stores_(ObStoreCtx &,
const ObTxNodeArg &,
const ObMemtableKey *,
ObMvccRow *,
const storage::ObTableReadInfo &read_info,
ObMvccWriteResult &)
{
if (unittest::TestMemtableV2::is_sstable_contains_lock_) {
return OB_TRY_LOCK_ROW_CONFLICT;
} else {
return OB_SUCCESS;
}
}
}
namespace transaction
{
int ObLSTxCtxMgr::init(const int64_t tenant_id,
const ObLSID &ls_id,
ObTxTable *tx_table,
ObLockTable *lock_table,
ObITsMgr *ts_mgr,
ObTransService *txs,
ObITxLogParam * param,
ObITxLogAdapter * log_adapter)
{
int ret = OB_SUCCESS;
UNUSED(log_adapter);
if (is_inited_) {
TRANS_LOG(WARN, "ObLSTxCtxMgr inited twice");
ret = OB_INIT_TWICE;
} else {
if (OB_FAIL(ls_tx_ctx_map_.init(lib::ObMemAttr(tenant_id, "LSTxCtxMgr")))) {
TRANS_LOG(WARN, "ls_tx_ctx_map_ init fail", KR(ret));
} else {
is_inited_ = true;
state_ = State::L_WORKING;
tenant_id_ = tenant_id;
ls_id_ = ls_id;
tx_table_ = tx_table;
lock_table_ = lock_table;
txs_ = txs;
ts_mgr_ = ts_mgr;
TRANS_LOG(INFO, "ObLSTxCtxMgr inited success", KP(this), K(ls_id));
}
}
return ret;
}
} // namespace transaction
} // namespace oceanbase
int main(int argc, char **argv)
{
system("rm -rf test_memtable.log*");
OB_LOGGER.set_file_name("test_memtable.log");
OB_LOGGER.set_log_level("INFO");
STORAGE_LOG(INFO, "begin unittest: test simple memtable");
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}