fix fuse row cache problem with uncommited row across micro block
This commit is contained in:
@ -809,6 +809,7 @@ void ObMultiVersionMicroBlockRowScanner::reuse_prev_micro_row()
|
|||||||
}
|
}
|
||||||
prev_micro_row_.flag_ = ObActionFlag::OP_ROW_DOES_NOT_EXIST;
|
prev_micro_row_.flag_ = ObActionFlag::OP_ROW_DOES_NOT_EXIST;
|
||||||
prev_micro_row_.from_base_ = false;
|
prev_micro_row_.from_base_ = false;
|
||||||
|
prev_micro_row_.snapshot_version_ = 0;
|
||||||
cell_allocator_.reuse();
|
cell_allocator_.reuse();
|
||||||
|
|
||||||
reserved_pos_ = ObIMicroBlockReader::INVALID_ROW_INDEX;
|
reserved_pos_ = ObIMicroBlockReader::INVALID_ROW_INDEX;
|
||||||
@ -1089,6 +1090,7 @@ int ObMultiVersionMicroBlockRowScanner::cache_cur_micro_row(const bool found_fir
|
|||||||
prev_micro_row_.flag_ = cur_micro_row_.flag_;
|
prev_micro_row_.flag_ = cur_micro_row_.flag_;
|
||||||
prev_micro_row_.from_base_ = cur_micro_row_.from_base_;
|
prev_micro_row_.from_base_ = cur_micro_row_.from_base_;
|
||||||
prev_micro_row_.row_val_.count_ = cur_micro_row_.row_val_.count_;
|
prev_micro_row_.row_val_.count_ = cur_micro_row_.row_val_.count_;
|
||||||
|
prev_micro_row_.snapshot_version_ = cur_micro_row_.snapshot_version_;
|
||||||
}
|
}
|
||||||
// The positive scan scans from new to old (trans_version is negative), so cur_row is older than prev_row
|
// The positive scan scans from new to old (trans_version is negative), so cur_row is older than prev_row
|
||||||
// So just add the nop column (column for which the value has not been decided)
|
// So just add the nop column (column for which the value has not been decided)
|
||||||
|
@ -14,12 +14,16 @@
|
|||||||
#define private public
|
#define private public
|
||||||
#define protected public
|
#define protected public
|
||||||
#include "ob_multi_version_sstable_test.h"
|
#include "ob_multi_version_sstable_test.h"
|
||||||
|
#include "ob_uncommitted_trans_test.h"
|
||||||
|
#include "storage/memtable/ob_memtable.h"
|
||||||
|
#include "storage/memtable/ob_memtable_context.h"
|
||||||
|
|
||||||
namespace oceanbase {
|
namespace oceanbase {
|
||||||
using namespace common;
|
using namespace common;
|
||||||
using namespace blocksstable;
|
using namespace blocksstable;
|
||||||
using namespace storage;
|
using namespace storage;
|
||||||
using namespace share::schema;
|
using namespace share::schema;
|
||||||
|
using namespace oceanbase::memtable;
|
||||||
|
|
||||||
namespace unittest {
|
namespace unittest {
|
||||||
|
|
||||||
@ -40,6 +44,7 @@ public:
|
|||||||
part = pkey;
|
part = pkey;
|
||||||
trans_service_.part_trans_ctx_mgr_.mgr_cache_.set(pkey.hash(), &ctx_mgr_);
|
trans_service_.part_trans_ctx_mgr_.mgr_cache_.set(pkey.hash(), &ctx_mgr_);
|
||||||
ObPartitionService::get_instance().txs_ = &trans_service_;
|
ObPartitionService::get_instance().txs_ = &trans_service_;
|
||||||
|
store_ctx_.trans_table_guard_ = new transaction::ObTransStateTableGuard();
|
||||||
}
|
}
|
||||||
virtual void TearDown()
|
virtual void TearDown()
|
||||||
{
|
{
|
||||||
@ -63,7 +68,10 @@ public:
|
|||||||
ObStoreCtx store_ctx_;
|
ObStoreCtx store_ctx_;
|
||||||
transaction::ObTransService trans_service_;
|
transaction::ObTransService trans_service_;
|
||||||
transaction::ObPartitionTransCtxMgr ctx_mgr_;
|
transaction::ObPartitionTransCtxMgr ctx_mgr_;
|
||||||
|
TestUncommittedMinorMergeScan test_trans_part_ctx_;
|
||||||
|
static ObMemtableCtxFactory fty_;
|
||||||
};
|
};
|
||||||
|
ObMemtableCtxFactory TestMultiVersionSSTableSingleGet::fty_;
|
||||||
|
|
||||||
void TestMultiVersionSSTableSingleGet::prepare_query_param(
|
void TestMultiVersionSSTableSingleGet::prepare_query_param(
|
||||||
const ObVersionRange& version_range, const bool is_reverse_scan)
|
const ObVersionRange& version_range, const bool is_reverse_scan)
|
||||||
@ -112,6 +120,10 @@ void TestMultiVersionSSTableSingleGet::prepare_query_param(
|
|||||||
context_.block_cache_ws_ = &block_cache_ws_;
|
context_.block_cache_ws_ = &block_cache_ws_;
|
||||||
context_.trans_version_range_ = version_range;
|
context_.trans_version_range_ = version_range;
|
||||||
context_.is_inited_ = true;
|
context_.is_inited_ = true;
|
||||||
|
store_ctx_.trans_table_guard_->set_trans_state_table(&test_trans_part_ctx_);
|
||||||
|
store_ctx_.snapshot_info_.snapshot_version_ = 10000;
|
||||||
|
store_ctx_.mem_ctx_ = fty_.alloc();
|
||||||
|
store_ctx_.mem_ctx_->trans_begin();
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(TestMultiVersionSSTableSingleGet, exist)
|
TEST_F(TestMultiVersionSSTableSingleGet, exist)
|
||||||
@ -606,6 +618,90 @@ TEST_F(TestMultiVersionSSTableSingleGet, empty_get)
|
|||||||
// ASSERT_EQ(1, meta->empty_read_cnt_[2]);
|
// ASSERT_EQ(1, meta->empty_read_cnt_[2]);
|
||||||
//}
|
//}
|
||||||
|
|
||||||
|
TEST_F(TestMultiVersionSSTableSingleGet, across_micro_uncommit)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
const int64_t rowkey_cnt = 4;
|
||||||
|
const char *micro_data[2];
|
||||||
|
micro_data[0] = "bigint var bigint bigint bigint bigint flag multi_version_row_flag\n"
|
||||||
|
"2 var3 MIN -10 NOP 12 EXIST U\n"
|
||||||
|
"2 var3 MIN -9 NOP 11 EXIST LU\n"
|
||||||
|
"3 var3 MIN -10 NOP 12 EXIST U\n"
|
||||||
|
"3 var3 MIN -9 NOP 11 EXIST U\n"
|
||||||
|
"3 var3 MIN -8 NOP 9 EXIST U\n";
|
||||||
|
micro_data[1] = "bigint var bigint bigint bigint bigint flag multi_version_row_flag\n"
|
||||||
|
"3 var3 MIN -7 NOP 8 EXIST U\n"
|
||||||
|
"3 var3 MIN -6 6 10 EXIST U\n"
|
||||||
|
"3 var3 MIN -5 5 9 EXIST LU\n";
|
||||||
|
prepare_data_start(micro_data, rowkey_cnt, 18, "none", FLAT_ROW_STORE, 0);
|
||||||
|
prepare_one_macro(micro_data, 2);
|
||||||
|
prepare_data_end();
|
||||||
|
|
||||||
|
test_trans_part_ctx_.clear_all();
|
||||||
|
if (OB_FAIL(
|
||||||
|
test_trans_part_ctx_.set_all_transaction_status(transaction::ObTransTableStatusType::RUNNING, INT64_MAX))) {
|
||||||
|
STORAGE_LOG(ERROR, "add transaction status failed", K(ret));
|
||||||
|
}
|
||||||
|
|
||||||
|
ObMockIterator res_iter;
|
||||||
|
ObMockIterator rowkey_iter;
|
||||||
|
ObStoreRowIterator *getter = NULL;
|
||||||
|
const ObStoreRow *row = NULL;
|
||||||
|
ObExtStoreRowkey ext_rowkey;
|
||||||
|
|
||||||
|
ObVersionRange version_range;
|
||||||
|
version_range.snapshot_version_ = 20;
|
||||||
|
version_range.base_version_ = 0;
|
||||||
|
version_range.multi_version_start_ = 0;
|
||||||
|
prepare_query_param(version_range);
|
||||||
|
context_.query_flag_.read_latest_ = common::ObQueryFlag::OBSF_MASK_READ_LATEST;
|
||||||
|
|
||||||
|
const char *rowkey2 = "bigint var flag multi_version_row_flag\n"
|
||||||
|
"2 var3 EXIST N\n";
|
||||||
|
const char *result2 = "bigint var bigint bigint flag multi_version_row_flag\n"
|
||||||
|
"2 var3 NOP 12 EXIST N\n";
|
||||||
|
|
||||||
|
ext_rowkey.reset();
|
||||||
|
rowkey_iter.reset();
|
||||||
|
OK(rowkey_iter.from(rowkey2));
|
||||||
|
OK(rowkey_iter.get_row(0, row));
|
||||||
|
ASSERT_TRUE(NULL != row);
|
||||||
|
ObSSTableTest::convert_rowkey(ObStoreRowkey(row->row_val_.cells_, rowkey_cnt - 2), ext_rowkey, allocator_);
|
||||||
|
OK(sstable_.get(param_, context_, ext_rowkey, getter));
|
||||||
|
res_iter.reset();
|
||||||
|
OK(res_iter.from(result2));
|
||||||
|
ASSERT_TRUE(res_iter.equals(*getter));
|
||||||
|
getter->~ObStoreRowIterator();
|
||||||
|
OK(sstable_.get(param_, context_, ext_rowkey, getter));
|
||||||
|
OK(getter->get_next_row(row));
|
||||||
|
STORAGE_LOG(INFO, "print row2", KPC(row));
|
||||||
|
ASSERT_TRUE(INT64_MAX == row->snapshot_version_);
|
||||||
|
getter->~ObStoreRowIterator();
|
||||||
|
|
||||||
|
const char *rowkey1 = "bigint var flag multi_version_row_flag\n"
|
||||||
|
"3 var3 EXIST N\n";
|
||||||
|
const char *result1 = "bigint var bigint bigint flag multi_version_row_flag\n"
|
||||||
|
"3 var3 6 12 EXIST N\n";
|
||||||
|
|
||||||
|
ext_rowkey.reset();
|
||||||
|
rowkey_iter.reset();
|
||||||
|
OK(rowkey_iter.from(rowkey1));
|
||||||
|
OK(rowkey_iter.get_row(0, row));
|
||||||
|
ASSERT_TRUE(NULL != row);
|
||||||
|
ObSSTableTest::convert_rowkey(ObStoreRowkey(row->row_val_.cells_, rowkey_cnt - 2), ext_rowkey, allocator_);
|
||||||
|
context_.query_flag_.read_latest_ = common::ObQueryFlag::OBSF_MASK_READ_LATEST;
|
||||||
|
OK(sstable_.get(param_, context_, ext_rowkey, getter));
|
||||||
|
res_iter.reset();
|
||||||
|
OK(res_iter.from(result1));
|
||||||
|
ASSERT_TRUE(res_iter.equals(*getter));
|
||||||
|
getter->~ObStoreRowIterator();
|
||||||
|
OK(sstable_.get(param_, context_, ext_rowkey, getter));
|
||||||
|
OK(getter->get_next_row(row));
|
||||||
|
STORAGE_LOG(INFO, "print row", KPC(row));
|
||||||
|
ASSERT_TRUE(INT64_MAX == row->snapshot_version_);
|
||||||
|
getter->~ObStoreRowIterator();
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace unittest
|
} // namespace unittest
|
||||||
} // namespace oceanbase
|
} // namespace oceanbase
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user