optimize parallem minor dag scheduling

This commit is contained in:
obdev 2023-03-17 03:43:51 +00:00 committed by ob-robot
parent 5510d6c791
commit 0c661df5f4
4 changed files with 44 additions and 33 deletions

View File

@ -1125,9 +1125,10 @@ int ObPartitionMergePolicy::generate_parallel_minor_interval(
* 1. If compact_trigger is small, minor merge should be easier to schedule, we should lower the threshold;
* 2. If compact_trigger is big, we should upper the threshold to prevent the creation of dag frequently.
*/
int64_t table_count_threshold = minor_range_mgr.exe_range_array_.empty()
int64_t exist_dag_cnt = minor_range_mgr.exe_range_array_.count();
int64_t table_count_threshold = (0 == exist_dag_cnt)
? minor_compact_trigger
: MIN(OB_MINOR_PARALLEL_SSTABLE_CNT_IN_DAG / 2, minor_compact_trigger * 2);
: OB_MINOR_PARALLEL_SSTABLE_CNT_IN_DAG + (OB_MINOR_PARALLEL_SSTABLE_CNT_IN_DAG / 2) * (exist_dag_cnt - 1);
for (int64_t i = 0; OB_SUCC(ret) && i < input_result_array.count(); ++i) {
if (OB_FAIL(split_parallel_minor_range(table_count_threshold, input_result_array.at(i), parallel_result))) {
LOG_WARN("failed to split parallel minor range", K(ret), K(input_result_array.at(i)));

View File

@ -253,7 +253,7 @@ int ObBasicTabletMergeDag::get_tablet_and_compat_mode()
if (OB_SUCC(ret) && inc_sstable_cnt >= MAX_SSTABLE_CNT_IN_STORAGE) {
ret = OB_EAGAIN;
LOG_WARN("Too many sstables in tablet, cannot schdule mini compaction, retry later",
K(ret), K(inc_sstable_cnt), K(tmp_tablet_handle.get_obj()));
K(ret), K_(ls_id), K_(tablet_id), K(inc_sstable_cnt), K(tmp_tablet_handle.get_obj()));
}
}

View File

@ -425,21 +425,26 @@ int ObTabletTableStore::update_memtables()
{
int ret = OB_SUCCESS;
ObTableHandleArray inc_memtables;
ObTimeGuard time_guard("ObTabletTableStore::update_memtables", 5 * 1000);
if (OB_ISNULL(tablet_ptr_) || OB_ISNULL(tablet_ptr_->get_memtable_mgr())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected tablet_ptr or memtable_mgr", K(ret), KP(tablet_ptr_));
} else if (!tablet_ptr_->get_memtable_mgr()->has_memtable()) {
LOG_INFO("no memtable in memtable mgr", K(ret));
} else if (FALSE_IT(time_guard.click("has_memtable"))) {
} else if (OB_FAIL(tablet_ptr_->get_memtable_mgr()->get_all_memtables(inc_memtables))) {
LOG_WARN("failed to get all memtables from memtable_mgr", K(ret));
} else if (FALSE_IT(time_guard.click("get_all_memtable"))) {
} else if (OB_FAIL(memtables_.rebuild(inc_memtables))) {
LOG_ERROR("failed to rebuild table store memtables", K(ret), K(inc_memtables), KPC(this));
} else if (FALSE_IT(time_guard.click("memtables_.rebuild"))) {
} else {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = init_read_cache())) {
LOG_WARN("failed to rebuild read cache", K(tmp_ret));
}
time_guard.click("init_read_cache");
}
return ret;
}

View File

@ -35,13 +35,12 @@ public:
void SetUp()
{
ObTenantMetaMemMgr *t3m = OB_NEW(ObTenantMetaMemMgr, ObModIds::TEST, 500);
tenant_base_.set(t3m);
ObTenantMetaMemMgr *t3m = OB_NEW(ObTenantMetaMemMgr, ObModIds::TEST, 500);
ASSERT_EQ(OB_SUCCESS, t3m->init());
ObTenantEnv::set_tenant(&tenant_base_);
ASSERT_EQ(OB_SUCCESS, tenant_base_.init());
ASSERT_EQ(OB_SUCCESS, t3m->init());
tenant_base_.set(t3m);
ObTenantEnv::set_tenant(&tenant_base_);
ASSERT_EQ(OB_SUCCESS, tenant_base_.init());
}
share::SCN get_start_log_ts(const int64_t idx);
@ -226,54 +225,54 @@ TEST_F(TestParallelMinorDag, test_parallel_with_range_mgr)
ObArray<ObGetMergeTablesResult> result_array;
ObMinorExecuteRangeMgr minor_range_mgr;
minor_range_mgr.exe_range_array_.push_back(construct_scn_range(11, 21));
minor_range_mgr.exe_range_array_.push_back(construct_scn_range(31, 41));
minor_range_mgr.exe_range_array_.push_back(construct_scn_range(16, 21));
minor_range_mgr.exe_range_array_.push_back(construct_scn_range(37, 41));
ASSERT_EQ(OB_SUCCESS, prepare_merge_result(sstable_cnt, result));
ASSERT_EQ(OB_SUCCESS, ObPartitionMergePolicy::generate_parallel_minor_interval(2, result, minor_range_mgr, result_array));
ASSERT_EQ(result_array.count(), 2);
ASSERT_EQ(result_array.at(0).scn_range_.start_scn_.get_val_for_tx(), 1);
ASSERT_EQ(result_array.at(0).scn_range_.end_scn_.get_val_for_tx(), 11);
ASSERT_EQ(result_array.at(0).scn_range_.end_scn_.get_val_for_tx(), 16);
ASSERT_EQ(result_array.at(1).scn_range_.start_scn_.get_val_for_tx(), 21);
ASSERT_EQ(result_array.at(1).scn_range_.end_scn_.get_val_for_tx(), 31);
ASSERT_EQ(result_array.at(1).scn_range_.end_scn_.get_val_for_tx(), 37);
result_array.reset();
minor_range_mgr.reset();
minor_range_mgr.exe_range_array_.push_back(construct_scn_range(15, 19));
minor_range_mgr.exe_range_array_.push_back(construct_scn_range(13, 19));
minor_range_mgr.exe_range_array_.push_back(construct_scn_range(37, 39));
ASSERT_EQ(OB_SUCCESS, ObPartitionMergePolicy::generate_parallel_minor_interval(2, result, minor_range_mgr, result_array));
COMMON_LOG(INFO, "generate_parallel_minor_interval", K(result_array));
ASSERT_EQ(result_array.count(), 2);
ASSERT_EQ(result_array.count(), 1);
ASSERT_EQ(result_array.at(0).scn_range_.start_scn_.get_val_for_tx(), 1);
ASSERT_EQ(result_array.at(0).scn_range_.end_scn_.get_val_for_tx(), 15);
ASSERT_EQ(result_array.at(0).scn_range_.start_scn_.get_val_for_tx(), 19);
ASSERT_EQ(result_array.at(0).scn_range_.end_scn_.get_val_for_tx(), 37);
ASSERT_EQ(result_array.at(1).scn_range_.start_scn_.get_val_for_tx(), 19);
ASSERT_EQ(result_array.at(1).scn_range_.end_scn_.get_val_for_tx(), 37);
result_array.reset();
minor_range_mgr.reset();
minor_range_mgr.exe_range_array_.push_back(construct_scn_range(1, 17));
minor_range_mgr.exe_range_array_.push_back(construct_scn_range(18, 34));
ASSERT_EQ(OB_SUCCESS, ObPartitionMergePolicy::generate_parallel_minor_interval(2, result, minor_range_mgr, result_array));
ASSERT_EQ(result_array.count(), 1);
ASSERT_EQ(result_array.at(0).scn_range_.start_scn_.get_val_for_tx(), 34);
ASSERT_EQ(result_array.at(0).scn_range_.end_scn_.get_val_for_tx(), 41);
ASSERT_EQ(result_array.count(), 0);
result_array.reset();
minor_range_mgr.reset();
minor_range_mgr.exe_range_array_.push_back(construct_scn_range(1, 17));
minor_range_mgr.exe_range_array_.push_back(construct_scn_range(17, 37));
minor_range_mgr.exe_range_array_.push_back(construct_scn_range(1, 5));
minor_range_mgr.exe_range_array_.push_back(construct_scn_range(26, 37));
minor_range_mgr.exe_range_array_.push_back(construct_scn_range(39, 40));
ASSERT_EQ(OB_SUCCESS, ObPartitionMergePolicy::generate_parallel_minor_interval(2, result, minor_range_mgr, result_array));
ASSERT_EQ(result_array.count(), 1);
ASSERT_EQ(result_array.at(0).scn_range_.start_scn_.get_val_for_tx(), 37);
ASSERT_EQ(result_array.at(0).scn_range_.end_scn_.get_val_for_tx(), 41);
ASSERT_EQ(result_array.count(), 2);
ASSERT_EQ(result_array.at(0).scn_range_.start_scn_.get_val_for_tx(), 5);
ASSERT_EQ(result_array.at(0).scn_range_.end_scn_.get_val_for_tx(), 15);
ASSERT_EQ(result_array.at(1).scn_range_.start_scn_.get_val_for_tx(), 15);
ASSERT_EQ(result_array.at(1).scn_range_.end_scn_.get_val_for_tx(), 26);
result_array.reset();
@ -281,19 +280,25 @@ TEST_F(TestParallelMinorDag, test_parallel_with_range_mgr)
minor_range_mgr.exe_range_array_.push_back(construct_scn_range(1, 34));
ASSERT_EQ(OB_SUCCESS, ObPartitionMergePolicy::generate_parallel_minor_interval(2, result, minor_range_mgr, result_array));
COMMON_LOG(INFO, "generate_parallel_minor_interval", K(result_array));
ASSERT_EQ(result_array.count(), 0);
result_array.reset();
minor_range_mgr.reset();
minor_range_mgr.exe_range_array_.push_back(construct_scn_range(1, 30));
ASSERT_EQ(OB_SUCCESS, ObPartitionMergePolicy::generate_parallel_minor_interval(2, result, minor_range_mgr, result_array));
COMMON_LOG(INFO, "generate_parallel_minor_interval", K(result_array));
ASSERT_EQ(result_array.count(), 1);
ASSERT_EQ(result_array.at(0).scn_range_.start_scn_.get_val_for_tx(), 34);
ASSERT_EQ(result_array.at(0).scn_range_.start_scn_.get_val_for_tx(), 30);
ASSERT_EQ(result_array.at(0).scn_range_.end_scn_.get_val_for_tx(), 41);
result_array.reset();
minor_range_mgr.reset();
minor_range_mgr.exe_range_array_.push_back(construct_scn_range(5, 25));
minor_range_mgr.exe_range_array_.push_back(construct_scn_range(31, 41));
minor_range_mgr.exe_range_array_.push_back(construct_scn_range(35, 41));
ASSERT_EQ(OB_SUCCESS, ObPartitionMergePolicy::generate_parallel_minor_interval(4, result, minor_range_mgr, result_array));
ASSERT_EQ(result_array.count(), 1);
ASSERT_EQ(result_array.at(0).scn_range_.start_scn_.get_val_for_tx(), 25);
ASSERT_EQ(result_array.at(0).scn_range_.end_scn_.get_val_for_tx(), 31);
ASSERT_EQ(result_array.count(), 0);
result_array.reset();