[CP] [FIX] handle 4018 when parallel dump tx data memtable
This commit is contained in:
parent
b6e830cb4f
commit
3baaf656a8
@ -48,7 +48,7 @@ using namespace palf;
|
||||
|
||||
namespace storage
|
||||
{
|
||||
class TestTabletFreezeForDirectLoad;
|
||||
class TestTxDataTable;
|
||||
class MockTxDataTable;
|
||||
class MockTxTable;
|
||||
static const uint64_t TEST_TENANT_ID = 1;
|
||||
@ -111,7 +111,7 @@ public:
|
||||
|
||||
class MockTxDataTable : public ObTxDataTable
|
||||
{
|
||||
friend TestTabletFreezeForDirectLoad;
|
||||
friend TestTxDataTable;
|
||||
|
||||
public:
|
||||
MockTxDataTable() : ObTxDataTable() {}
|
||||
@ -154,11 +154,11 @@ public:
|
||||
MockTxTable(MockTxDataTable *tx_data_table) : ObTxTable(*tx_data_table) {}
|
||||
};
|
||||
|
||||
class TestTabletFreezeForDirectLoad : public ::testing::Test
|
||||
class TestTxDataTable : public ::testing::Test
|
||||
{
|
||||
public:
|
||||
TestTabletFreezeForDirectLoad() : tx_table_(&tx_data_table_) {}
|
||||
virtual ~TestTabletFreezeForDirectLoad() {}
|
||||
TestTxDataTable() : tx_table_(&tx_data_table_) {}
|
||||
virtual ~TestTxDataTable() {}
|
||||
|
||||
static void SetUpTestCase()
|
||||
{
|
||||
@ -221,7 +221,7 @@ public:
|
||||
ObArenaAllocator allocator_;
|
||||
};
|
||||
|
||||
void TestTabletFreezeForDirectLoad::make_freezing_to_frozen_(ObTxDataMemtableMgr *memtable_mgr)
|
||||
void TestTxDataTable::make_freezing_to_frozen_(ObTxDataMemtableMgr *memtable_mgr)
|
||||
{
|
||||
ObArray<ObTableHandleV2> memtables;
|
||||
ObTxDataMemtable *memtable = nullptr;
|
||||
@ -234,7 +234,7 @@ void TestTabletFreezeForDirectLoad::make_freezing_to_frozen_(ObTxDataMemtableMgr
|
||||
}
|
||||
}
|
||||
|
||||
void TestTabletFreezeForDirectLoad::insert_tx_data_()
|
||||
void TestTxDataTable::insert_tx_data_()
|
||||
{
|
||||
insert_start_scn.convert_for_logservice(ObTimeUtil::current_time_ns());
|
||||
ObTxData *tx_data = nullptr;
|
||||
@ -273,7 +273,7 @@ void TestTabletFreezeForDirectLoad::insert_tx_data_()
|
||||
}
|
||||
}
|
||||
|
||||
void TestTabletFreezeForDirectLoad::insert_rollback_tx_data_()
|
||||
void TestTxDataTable::insert_rollback_tx_data_()
|
||||
{
|
||||
auto tx_id = transaction::ObTransID(INT64_MAX-2);
|
||||
share::SCN max_end_scn = share::SCN::min_scn();
|
||||
@ -306,7 +306,7 @@ void TestTabletFreezeForDirectLoad::insert_rollback_tx_data_()
|
||||
}
|
||||
}
|
||||
|
||||
void TestTabletFreezeForDirectLoad::insert_abort_tx_data_()
|
||||
void TestTxDataTable::insert_abort_tx_data_()
|
||||
{
|
||||
insert_start_scn.convert_for_logservice(ObTimeUtil::current_time_ns());
|
||||
ObTxData *tx_data = nullptr;
|
||||
@ -328,7 +328,7 @@ void TestTabletFreezeForDirectLoad::insert_abort_tx_data_()
|
||||
ASSERT_EQ(OB_SUCCESS, tx_data_table_.insert(tx_data));
|
||||
}
|
||||
|
||||
void TestTabletFreezeForDirectLoad::generate_past_commit_version_(ObCommitVersionsArray &past_commit_versions)
|
||||
void TestTxDataTable::generate_past_commit_version_(ObCommitVersionsArray &past_commit_versions)
|
||||
{
|
||||
share::SCN start_scn = share::SCN::minus(insert_start_scn, 300LL * ONE_SEC_NS);
|
||||
share::SCN commit_version = share::SCN::plus(start_scn, 2LL * ONE_SEC_NS);
|
||||
@ -339,7 +339,7 @@ void TestTabletFreezeForDirectLoad::generate_past_commit_version_(ObCommitVersio
|
||||
}
|
||||
}
|
||||
|
||||
void TestTabletFreezeForDirectLoad::set_freezer_()
|
||||
void TestTxDataTable::set_freezer_()
|
||||
{
|
||||
ObLSID ls_id(1);
|
||||
ObLSWRSHandler ls_loop_worker;
|
||||
@ -351,7 +351,7 @@ void TestTabletFreezeForDirectLoad::set_freezer_()
|
||||
tx_data_table_.freezer_.init(&ls_);
|
||||
}
|
||||
|
||||
void TestTabletFreezeForDirectLoad::init_memtable_mgr_(ObTxDataMemtableMgr *memtable_mgr)
|
||||
void TestTxDataTable::init_memtable_mgr_(ObTxDataMemtableMgr *memtable_mgr)
|
||||
{
|
||||
ASSERT_NE(nullptr, memtable_mgr);
|
||||
memtable_mgr->set_freezer(&tx_data_table_.freezer_);
|
||||
@ -359,7 +359,7 @@ void TestTabletFreezeForDirectLoad::init_memtable_mgr_(ObTxDataMemtableMgr *memt
|
||||
ASSERT_EQ(1, memtable_mgr->get_memtable_count_());
|
||||
}
|
||||
|
||||
void TestTabletFreezeForDirectLoad::check_freeze_(ObTxDataMemtableMgr *memtable_mgr,
|
||||
void TestTxDataTable::check_freeze_(ObTxDataMemtableMgr *memtable_mgr,
|
||||
ObTxDataMemtable *&freezing_memtable,
|
||||
ObTxDataMemtable *&active_memtable)
|
||||
{
|
||||
@ -383,7 +383,7 @@ void TestTabletFreezeForDirectLoad::check_freeze_(ObTxDataMemtableMgr *memtable_
|
||||
freezing_memtable->set_state(ObTxDataMemtable::State::FROZEN);
|
||||
}
|
||||
|
||||
void TestTabletFreezeForDirectLoad::do_basic_test()
|
||||
void TestTxDataTable::do_basic_test()
|
||||
{
|
||||
// init tx data table
|
||||
ASSERT_EQ(OB_SUCCESS, tx_data_table_.init(ls_));
|
||||
@ -457,7 +457,7 @@ void TestTabletFreezeForDirectLoad::do_basic_test()
|
||||
memtable_mgr->offline();
|
||||
}
|
||||
|
||||
void TestTabletFreezeForDirectLoad::do_undo_status_test()
|
||||
void TestTxDataTable::do_undo_status_test()
|
||||
{
|
||||
// init tx data table
|
||||
ASSERT_EQ(OB_SUCCESS, tx_data_table_.init(ls_));
|
||||
@ -516,7 +516,7 @@ void TestTabletFreezeForDirectLoad::do_undo_status_test()
|
||||
}
|
||||
}
|
||||
|
||||
void TestTabletFreezeForDirectLoad::test_serialize_with_action_cnt_(int cnt)
|
||||
void TestTxDataTable::test_serialize_with_action_cnt_(int cnt)
|
||||
{
|
||||
ObTxData *tx_data = nullptr;
|
||||
ObTxDataGuard tx_data_guard;
|
||||
@ -562,7 +562,7 @@ void TestTabletFreezeForDirectLoad::test_serialize_with_action_cnt_(int cnt)
|
||||
}
|
||||
|
||||
|
||||
void TestTabletFreezeForDirectLoad::do_tx_data_serialize_test()
|
||||
void TestTxDataTable::do_tx_data_serialize_test()
|
||||
{
|
||||
ASSERT_EQ(OB_SUCCESS, tx_data_table_.init(ls_));
|
||||
ObTxDataMemtableMgr *memtable_mgr = tx_data_table_.get_memtable_mgr_();
|
||||
@ -578,7 +578,7 @@ void TestTabletFreezeForDirectLoad::do_tx_data_serialize_test()
|
||||
memtable_mgr->offline();
|
||||
}
|
||||
|
||||
void TestTabletFreezeForDirectLoad::test_commit_versions_serialize_()
|
||||
void TestTxDataTable::test_commit_versions_serialize_()
|
||||
{
|
||||
ObCommitVersionsArray cur_array;
|
||||
ObCommitVersionsArray past_array;
|
||||
@ -637,7 +637,7 @@ void TestTabletFreezeForDirectLoad::test_commit_versions_serialize_()
|
||||
ASSERT_EQ(s_pos, d_pos);
|
||||
}
|
||||
|
||||
void TestTabletFreezeForDirectLoad::do_repeat_insert_test() {
|
||||
void TestTxDataTable::do_repeat_insert_test() {
|
||||
ASSERT_EQ(OB_SUCCESS, tx_data_table_.init(ls_));
|
||||
|
||||
set_freezer_();
|
||||
@ -650,10 +650,10 @@ void TestTabletFreezeForDirectLoad::do_repeat_insert_test() {
|
||||
insert_start_scn.convert_for_logservice(ObTimeUtil::current_time_ns());
|
||||
|
||||
ObTxData *tx_data = nullptr;
|
||||
int64_t int_tx_id = 0;
|
||||
transaction::ObTransID tx_id;
|
||||
|
||||
for (int i = 1; i <= 100; i++) {
|
||||
// 插入10000个tx data,但都是同一个txid的rollback to,所以最后只会保留一个
|
||||
for (int i = 1; i <= 10000; i++) {
|
||||
tx_id = transaction::ObTransID(269381);
|
||||
tx_data = nullptr;
|
||||
ObTxDataGuard tx_data_guard;
|
||||
@ -669,8 +669,8 @@ void TestTabletFreezeForDirectLoad::do_repeat_insert_test() {
|
||||
tx_data->state_ = ObTxData::RUNNING;
|
||||
int undo_act_num = (rand() & 31) + 1;
|
||||
for (int j = 0; j < undo_act_num; j++) {
|
||||
int64_t from = ObRandom::rand(1, 200000);
|
||||
auto to = (from > 100000) ? (from - 100000) : 1;
|
||||
int64_t from = ObRandom::rand(1, 2000000);
|
||||
auto to = (from > 1000000) ? (from - 1000000) : 1;
|
||||
transaction::ObUndoAction undo_action(ObTxSEQ(from, 0), ObTxSEQ(to, 0));
|
||||
ASSERT_EQ(OB_SUCCESS, tx_data->add_undo_action(&tx_table_, undo_action));
|
||||
}
|
||||
@ -678,11 +678,48 @@ void TestTabletFreezeForDirectLoad::do_repeat_insert_test() {
|
||||
ASSERT_EQ(OB_SUCCESS, tx_data_table_.insert(tx_data));
|
||||
}
|
||||
|
||||
memtable_mgr->destroy();
|
||||
// 插入了4个tx data
|
||||
for (int i = 1; i <= 4; i++) {
|
||||
tx_id = transaction::ObTransID(269381 + i);
|
||||
tx_data = nullptr;
|
||||
ObTxDataGuard tx_data_guard;
|
||||
ASSERT_EQ(OB_SUCCESS, tx_data_table_.alloc_tx_data(tx_data_guard));
|
||||
ASSERT_NE(nullptr, tx_data = tx_data_guard.tx_data());
|
||||
|
||||
tx_data->tx_id_ = tx_id;
|
||||
tx_data->start_scn_ = insert_start_scn;
|
||||
tx_data->end_scn_ = share::SCN::plus(insert_start_scn, i);
|
||||
tx_data->commit_version_ = tx_data->end_scn_;
|
||||
tx_data->state_ = ObTxData::COMMIT;
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, tx_data_table_.insert(tx_data));
|
||||
}
|
||||
|
||||
|
||||
ObTxDataMemtable *frozen_memtable = nullptr;
|
||||
ObTxDataMemtable *active_memtable = nullptr;
|
||||
check_freeze_(memtable_mgr, frozen_memtable, active_memtable);
|
||||
// 预处理会产生一个特殊行的tx data
|
||||
ASSERT_EQ(OB_SUCCESS, frozen_memtable->pre_process_for_merge());
|
||||
|
||||
// 总计有6个合法的tx data待转储。此时如果试图切7个区间,函数不会报错,但是返回结果里只有一个range
|
||||
ObStoreRange input_range;
|
||||
input_range.set_whole_range();
|
||||
ObSEArray<common::ObStoreRange, 7> range_array;
|
||||
ASSERT_EQ(OB_SUCCESS, frozen_memtable->get_split_ranges(input_range, 7, range_array));
|
||||
ASSERT_EQ(1, range_array.count());
|
||||
STORAGE_LOG(INFO, "output range", K(range_array));
|
||||
|
||||
// 如果试图切6个区间,能顺利切出6个range
|
||||
range_array.reuse();
|
||||
ASSERT_EQ(OB_SUCCESS, frozen_memtable->get_split_ranges(input_range, 6, range_array));
|
||||
ASSERT_EQ(6, range_array.count());
|
||||
STORAGE_LOG(INFO, "output range", K(range_array));
|
||||
|
||||
memtable_mgr->destroy();
|
||||
}
|
||||
|
||||
void TestTabletFreezeForDirectLoad::fake_ls_(ObLS &ls)
|
||||
void TestTxDataTable::fake_ls_(ObLS &ls)
|
||||
{
|
||||
ls.ls_meta_.tenant_id_ = 1;
|
||||
ls.ls_meta_.ls_id_.id_ = 1001;
|
||||
@ -692,7 +729,7 @@ void TestTabletFreezeForDirectLoad::fake_ls_(ObLS &ls)
|
||||
ls.ls_meta_.rebuild_seq_ = 0;
|
||||
}
|
||||
|
||||
void TestTabletFreezeForDirectLoad::do_print_leak_slice_test()
|
||||
void TestTxDataTable::do_print_leak_slice_test()
|
||||
{
|
||||
const int32_t CONCURRENCY = 4;
|
||||
ObMemAttr attr;
|
||||
@ -727,17 +764,19 @@ void TestTabletFreezeForDirectLoad::do_print_leak_slice_test()
|
||||
slice_allocator.destroy();
|
||||
}
|
||||
|
||||
TEST_F(TestTabletFreezeForDirectLoad, basic_test)
|
||||
TEST_F(TestTxDataTable, basic_test)
|
||||
{
|
||||
tx_data_num = const_data_num;
|
||||
do_basic_test();
|
||||
}
|
||||
|
||||
TEST_F(TestTabletFreezeForDirectLoad, repeat_insert_test) { do_repeat_insert_test(); }
|
||||
TEST_F(TestTxDataTable, repeat_insert_test) { do_repeat_insert_test(); }
|
||||
|
||||
TEST_F(TestTabletFreezeForDirectLoad, undo_status_test) { do_undo_status_test(); }
|
||||
TEST_F(TestTxDataTable, undo_status_test) { do_undo_status_test(); }
|
||||
|
||||
TEST_F(TestTabletFreezeForDirectLoad, serialize_test) { do_tx_data_serialize_test(); }
|
||||
TEST_F(TestTxDataTable, serialize_test) { do_tx_data_serialize_test(); }
|
||||
|
||||
TEST_F(TestTxDataTable, split_range_test) { do_tx_data_serialize_test(); }
|
||||
|
||||
// TEST_F(TestTxDataTable, print_leak_slice) { do_print_leak_slice_test(); }
|
||||
|
||||
@ -752,7 +791,7 @@ int main(int argc, char **argv)
|
||||
system("rm -fr run_*");
|
||||
ObLogger &logger = ObLogger::get_logger();
|
||||
logger.set_file_name("test_tx_data_table.log", true);
|
||||
logger.set_log_level(OB_LOG_LEVEL_INFO);
|
||||
logger.set_log_level(OB_LOG_LEVEL_DEBUG);
|
||||
TRANS_LOG(WARN, "init memory pool error!");
|
||||
// OB_LOGGER.set_enable_async_log(false);
|
||||
|
||||
|
@ -690,7 +690,15 @@ int ObTxDataMemtable::get_split_ranges(const ObStoreRange &input_range,
|
||||
}
|
||||
}
|
||||
|
||||
STORAGE_LOG(INFO, "generate range bounds for parallel dump tx data memtable:", K(row_key_array_), K(tx_id_2_range_));
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
// reset ret code and use input_range as output result
|
||||
ret = OB_SUCCESS;
|
||||
if (OB_FAIL(range_array.push_back(input_range))) {
|
||||
STORAGE_LOG(WARN, "Failed to push back the merge range to array", KR(ret), K(input_range));
|
||||
}
|
||||
}
|
||||
|
||||
STORAGE_LOG(INFO, "generate range bounds for parallel dump tx data memtable:", K(ret), K(row_key_array_), K(tx_id_2_range_));
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -962,6 +962,14 @@ bool ObTxDataTable::skip_this_sstable_end_scn_(const SCN &sstable_end_scn)
|
||||
K(max_decided_scn),
|
||||
K(calc_upper_info_),
|
||||
K(min_start_scn_in_tx_data_memtable));
|
||||
} else {
|
||||
STORAGE_LOG(TRACE,
|
||||
"do calculate upper trans version.",
|
||||
K(need_skip),
|
||||
K(sstable_end_scn),
|
||||
K(max_decided_scn),
|
||||
K(calc_upper_info_),
|
||||
K(min_start_scn_in_tx_data_memtable));
|
||||
}
|
||||
|
||||
return need_skip;
|
||||
|
Loading…
x
Reference in New Issue
Block a user