From 0c661df5f462359958e21d26661051bd9acf9649 Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 17 Mar 2023 03:43:51 +0000 Subject: [PATCH] optimize parallem minor dag scheduling --- .../compaction/ob_partition_merge_policy.cpp | 5 +- .../compaction/ob_tablet_merge_task.cpp | 2 +- src/storage/tablet/ob_tablet_table_store.cpp | 5 ++ unittest/storage/test_parallel_minor_dag.cpp | 65 ++++++++++--------- 4 files changed, 44 insertions(+), 33 deletions(-) diff --git a/src/storage/compaction/ob_partition_merge_policy.cpp b/src/storage/compaction/ob_partition_merge_policy.cpp index ea7c9c8e7..34b1f89b7 100644 --- a/src/storage/compaction/ob_partition_merge_policy.cpp +++ b/src/storage/compaction/ob_partition_merge_policy.cpp @@ -1125,9 +1125,10 @@ int ObPartitionMergePolicy::generate_parallel_minor_interval( * 1. If compact_trigger is small, minor merge should be easier to schedule, we should lower the threshold; * 2. If compact_trigger is big, we should upper the threshold to prevent the creation of dag frequently. */ - int64_t table_count_threshold = minor_range_mgr.exe_range_array_.empty() + int64_t exist_dag_cnt = minor_range_mgr.exe_range_array_.count(); + int64_t table_count_threshold = (0 == exist_dag_cnt) ? minor_compact_trigger - : MIN(OB_MINOR_PARALLEL_SSTABLE_CNT_IN_DAG / 2, minor_compact_trigger * 2); + : OB_MINOR_PARALLEL_SSTABLE_CNT_IN_DAG + (OB_MINOR_PARALLEL_SSTABLE_CNT_IN_DAG / 2) * (exist_dag_cnt - 1); for (int64_t i = 0; OB_SUCC(ret) && i < input_result_array.count(); ++i) { if (OB_FAIL(split_parallel_minor_range(table_count_threshold, input_result_array.at(i), parallel_result))) { LOG_WARN("failed to split parallel minor range", K(ret), K(input_result_array.at(i))); diff --git a/src/storage/compaction/ob_tablet_merge_task.cpp b/src/storage/compaction/ob_tablet_merge_task.cpp index 6f68dcd7f..9cd84b792 100644 --- a/src/storage/compaction/ob_tablet_merge_task.cpp +++ b/src/storage/compaction/ob_tablet_merge_task.cpp @@ -253,7 +253,7 @@ int ObBasicTabletMergeDag::get_tablet_and_compat_mode() if (OB_SUCC(ret) && inc_sstable_cnt >= MAX_SSTABLE_CNT_IN_STORAGE) { ret = OB_EAGAIN; LOG_WARN("Too many sstables in tablet, cannot schdule mini compaction, retry later", - K(ret), K(inc_sstable_cnt), K(tmp_tablet_handle.get_obj())); + K(ret), K_(ls_id), K_(tablet_id), K(inc_sstable_cnt), K(tmp_tablet_handle.get_obj())); } } diff --git a/src/storage/tablet/ob_tablet_table_store.cpp b/src/storage/tablet/ob_tablet_table_store.cpp index 9afe3a6d2..3bd8b66df 100644 --- a/src/storage/tablet/ob_tablet_table_store.cpp +++ b/src/storage/tablet/ob_tablet_table_store.cpp @@ -425,21 +425,26 @@ int ObTabletTableStore::update_memtables() { int ret = OB_SUCCESS; ObTableHandleArray inc_memtables; + ObTimeGuard time_guard("ObTabletTableStore::update_memtables", 5 * 1000); if (OB_ISNULL(tablet_ptr_) || OB_ISNULL(tablet_ptr_->get_memtable_mgr())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected tablet_ptr or memtable_mgr", K(ret), KP(tablet_ptr_)); } else if (!tablet_ptr_->get_memtable_mgr()->has_memtable()) { LOG_INFO("no memtable in memtable mgr", K(ret)); + } else if (FALSE_IT(time_guard.click("has_memtable"))) { } else if (OB_FAIL(tablet_ptr_->get_memtable_mgr()->get_all_memtables(inc_memtables))) { LOG_WARN("failed to get all memtables from memtable_mgr", K(ret)); + } else if (FALSE_IT(time_guard.click("get_all_memtable"))) { } else if (OB_FAIL(memtables_.rebuild(inc_memtables))) { LOG_ERROR("failed to rebuild table store memtables", K(ret), K(inc_memtables), KPC(this)); + } else if (FALSE_IT(time_guard.click("memtables_.rebuild"))) { } else { int tmp_ret = OB_SUCCESS; if (OB_SUCCESS != (tmp_ret = init_read_cache())) { LOG_WARN("failed to rebuild read cache", K(tmp_ret)); } + time_guard.click("init_read_cache"); } return ret; } diff --git a/unittest/storage/test_parallel_minor_dag.cpp b/unittest/storage/test_parallel_minor_dag.cpp index 3d9cdc1af..dfdffcfe1 100644 --- a/unittest/storage/test_parallel_minor_dag.cpp +++ b/unittest/storage/test_parallel_minor_dag.cpp @@ -35,13 +35,12 @@ public: void SetUp() { - ObTenantMetaMemMgr *t3m = OB_NEW(ObTenantMetaMemMgr, ObModIds::TEST, 500); - tenant_base_.set(t3m); + ObTenantMetaMemMgr *t3m = OB_NEW(ObTenantMetaMemMgr, ObModIds::TEST, 500); + ASSERT_EQ(OB_SUCCESS, t3m->init()); - ObTenantEnv::set_tenant(&tenant_base_); - ASSERT_EQ(OB_SUCCESS, tenant_base_.init()); - - ASSERT_EQ(OB_SUCCESS, t3m->init()); + tenant_base_.set(t3m); + ObTenantEnv::set_tenant(&tenant_base_); + ASSERT_EQ(OB_SUCCESS, tenant_base_.init()); } share::SCN get_start_log_ts(const int64_t idx); @@ -226,54 +225,54 @@ TEST_F(TestParallelMinorDag, test_parallel_with_range_mgr) ObArray result_array; ObMinorExecuteRangeMgr minor_range_mgr; - minor_range_mgr.exe_range_array_.push_back(construct_scn_range(11, 21)); - minor_range_mgr.exe_range_array_.push_back(construct_scn_range(31, 41)); + minor_range_mgr.exe_range_array_.push_back(construct_scn_range(16, 21)); + minor_range_mgr.exe_range_array_.push_back(construct_scn_range(37, 41)); ASSERT_EQ(OB_SUCCESS, prepare_merge_result(sstable_cnt, result)); ASSERT_EQ(OB_SUCCESS, ObPartitionMergePolicy::generate_parallel_minor_interval(2, result, minor_range_mgr, result_array)); ASSERT_EQ(result_array.count(), 2); ASSERT_EQ(result_array.at(0).scn_range_.start_scn_.get_val_for_tx(), 1); - ASSERT_EQ(result_array.at(0).scn_range_.end_scn_.get_val_for_tx(), 11); + ASSERT_EQ(result_array.at(0).scn_range_.end_scn_.get_val_for_tx(), 16); ASSERT_EQ(result_array.at(1).scn_range_.start_scn_.get_val_for_tx(), 21); - ASSERT_EQ(result_array.at(1).scn_range_.end_scn_.get_val_for_tx(), 31); + ASSERT_EQ(result_array.at(1).scn_range_.end_scn_.get_val_for_tx(), 37); result_array.reset(); minor_range_mgr.reset(); - minor_range_mgr.exe_range_array_.push_back(construct_scn_range(15, 19)); + minor_range_mgr.exe_range_array_.push_back(construct_scn_range(13, 19)); minor_range_mgr.exe_range_array_.push_back(construct_scn_range(37, 39)); ASSERT_EQ(OB_SUCCESS, ObPartitionMergePolicy::generate_parallel_minor_interval(2, result, minor_range_mgr, result_array)); COMMON_LOG(INFO, "generate_parallel_minor_interval", K(result_array)); - ASSERT_EQ(result_array.count(), 2); + ASSERT_EQ(result_array.count(), 1); - ASSERT_EQ(result_array.at(0).scn_range_.start_scn_.get_val_for_tx(), 1); - ASSERT_EQ(result_array.at(0).scn_range_.end_scn_.get_val_for_tx(), 15); + ASSERT_EQ(result_array.at(0).scn_range_.start_scn_.get_val_for_tx(), 19); + ASSERT_EQ(result_array.at(0).scn_range_.end_scn_.get_val_for_tx(), 37); - ASSERT_EQ(result_array.at(1).scn_range_.start_scn_.get_val_for_tx(), 19); - ASSERT_EQ(result_array.at(1).scn_range_.end_scn_.get_val_for_tx(), 37); result_array.reset(); minor_range_mgr.reset(); minor_range_mgr.exe_range_array_.push_back(construct_scn_range(1, 17)); minor_range_mgr.exe_range_array_.push_back(construct_scn_range(18, 34)); ASSERT_EQ(OB_SUCCESS, ObPartitionMergePolicy::generate_parallel_minor_interval(2, result, minor_range_mgr, result_array)); - ASSERT_EQ(result_array.count(), 1); - - ASSERT_EQ(result_array.at(0).scn_range_.start_scn_.get_val_for_tx(), 34); - ASSERT_EQ(result_array.at(0).scn_range_.end_scn_.get_val_for_tx(), 41); + ASSERT_EQ(result_array.count(), 0); result_array.reset(); minor_range_mgr.reset(); - minor_range_mgr.exe_range_array_.push_back(construct_scn_range(1, 17)); - minor_range_mgr.exe_range_array_.push_back(construct_scn_range(17, 37)); + minor_range_mgr.exe_range_array_.push_back(construct_scn_range(1, 5)); + minor_range_mgr.exe_range_array_.push_back(construct_scn_range(26, 37)); + minor_range_mgr.exe_range_array_.push_back(construct_scn_range(39, 40)); ASSERT_EQ(OB_SUCCESS, ObPartitionMergePolicy::generate_parallel_minor_interval(2, result, minor_range_mgr, result_array)); - ASSERT_EQ(result_array.count(), 1); - ASSERT_EQ(result_array.at(0).scn_range_.start_scn_.get_val_for_tx(), 37); - ASSERT_EQ(result_array.at(0).scn_range_.end_scn_.get_val_for_tx(), 41); + ASSERT_EQ(result_array.count(), 2); + + ASSERT_EQ(result_array.at(0).scn_range_.start_scn_.get_val_for_tx(), 5); + ASSERT_EQ(result_array.at(0).scn_range_.end_scn_.get_val_for_tx(), 15); + + ASSERT_EQ(result_array.at(1).scn_range_.start_scn_.get_val_for_tx(), 15); + ASSERT_EQ(result_array.at(1).scn_range_.end_scn_.get_val_for_tx(), 26); result_array.reset(); @@ -281,19 +280,25 @@ TEST_F(TestParallelMinorDag, test_parallel_with_range_mgr) minor_range_mgr.exe_range_array_.push_back(construct_scn_range(1, 34)); ASSERT_EQ(OB_SUCCESS, ObPartitionMergePolicy::generate_parallel_minor_interval(2, result, minor_range_mgr, result_array)); COMMON_LOG(INFO, "generate_parallel_minor_interval", K(result_array)); + ASSERT_EQ(result_array.count(), 0); + + result_array.reset(); + minor_range_mgr.reset(); + minor_range_mgr.exe_range_array_.push_back(construct_scn_range(1, 30)); + ASSERT_EQ(OB_SUCCESS, ObPartitionMergePolicy::generate_parallel_minor_interval(2, result, minor_range_mgr, result_array)); + COMMON_LOG(INFO, "generate_parallel_minor_interval", K(result_array)); ASSERT_EQ(result_array.count(), 1); - ASSERT_EQ(result_array.at(0).scn_range_.start_scn_.get_val_for_tx(), 34); + + ASSERT_EQ(result_array.at(0).scn_range_.start_scn_.get_val_for_tx(), 30); ASSERT_EQ(result_array.at(0).scn_range_.end_scn_.get_val_for_tx(), 41); result_array.reset(); minor_range_mgr.reset(); minor_range_mgr.exe_range_array_.push_back(construct_scn_range(5, 25)); - minor_range_mgr.exe_range_array_.push_back(construct_scn_range(31, 41)); + minor_range_mgr.exe_range_array_.push_back(construct_scn_range(35, 41)); ASSERT_EQ(OB_SUCCESS, ObPartitionMergePolicy::generate_parallel_minor_interval(4, result, minor_range_mgr, result_array)); - ASSERT_EQ(result_array.count(), 1); - ASSERT_EQ(result_array.at(0).scn_range_.start_scn_.get_val_for_tx(), 25); - ASSERT_EQ(result_array.at(0).scn_range_.end_scn_.get_val_for_tx(), 31); + ASSERT_EQ(result_array.count(), 0); result_array.reset();