From 5c776198436ba0c868bed2f949d8a95e737fc0ea Mon Sep 17 00:00:00 2001 From: Tsunaou <895254752@qq.com> Date: Wed, 20 Nov 2024 11:46:08 +0000 Subject: [PATCH] move batch freeze dag to compaction low thread and limit max concurrent task cnt --- src/share/compaction/ob_batch_exec_dag.h | 3 + src/share/scheduler/ob_dag_scheduler_config.h | 2 +- .../scheduler/ob_tenant_dag_scheduler.cpp | 11 ++- src/share/scheduler/ob_tenant_dag_scheduler.h | 4 + .../ob_batch_freeze_tablets_dag.cpp | 16 ++++ .../compaction/ob_batch_freeze_tablets_dag.h | 3 + .../share/scheduler/test_dag_scheduler.cpp | 73 ++++++++++++++++++- 7 files changed, 105 insertions(+), 7 deletions(-) diff --git a/src/share/compaction/ob_batch_exec_dag.h b/src/share/compaction/ob_batch_exec_dag.h index 2295f973fd..eb4e154f90 100644 --- a/src/share/compaction/ob_batch_exec_dag.h +++ b/src/share/compaction/ob_batch_exec_dag.h @@ -106,6 +106,7 @@ public: ObBatchExecDag(const share::ObDagType::ObDagTypeEnum type); virtual ~ObBatchExecDag() {} int init_by_param(const share::ObIDagInitParam *param); + virtual int inner_init() { return OB_SUCCESS; } virtual int create_first_task() override; virtual bool operator == (const ObIDag &other) const override; virtual int64_t hash() const override { return param_.get_hash(); } @@ -245,6 +246,8 @@ int ObBatchExecDag::init_by_param( STORAGE_LOG(WARN, "failed to init param", KR(ret), KPC(init_param)); } else if (OB_FAIL(init_merge_history())) { STORAGE_LOG(WARN, "failed to init merge history", KR(ret), KPC(init_param)); + } else if (OB_FAIL(inner_init())) { + STORAGE_LOG(WARN, "failed to inner init", KR(ret), KPC(init_param)); } else { is_inited_ = true; } diff --git a/src/share/scheduler/ob_dag_scheduler_config.h b/src/share/scheduler/ob_dag_scheduler_config.h index 3700f2d2a3..cbcb52c95a 100644 --- a/src/share/scheduler/ob_dag_scheduler_config.h +++ b/src/share/scheduler/ob_dag_scheduler_config.h @@ -63,7 +63,7 @@ DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_MDS_MINI_MERGE, 
ObDagPrio::DAG_PRIO_COMPACTI false, 3, {"ls_id", "tablet_id", "flush_scn"}) DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_VERIFY_CKM, ObDagPrio::DAG_PRIO_COMPACTION_LOW, ObSysTaskType::SSTABLE_MAJOR_MERGE_TASK, "VERIFY_CKM", "COMPACTION", false, 2, {"ls_id", "tablet_count"}) -DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_BATCH_FREEZE_TABLETS, ObDagPrio::DAG_PRIO_COMPACTION_HIGH, ObSysTaskType::BATCH_FREEZE_TABLET_TASK, "BATCH_FREEZE", "COMPACTION", +DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_BATCH_FREEZE_TABLETS, ObDagPrio::DAG_PRIO_COMPACTION_LOW, ObSysTaskType::BATCH_FREEZE_TABLET_TASK, "BATCH_FREEZE", "COMPACTION", false, 2, {"ls_id", "tablet_count"}) DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_UPDATE_SKIP_MAJOR, ObDagPrio::DAG_PRIO_COMPACTION_LOW, ObSysTaskType::SSTABLE_MAJOR_MERGE_TASK, "UPDATE_SKIP_MAJOR", "COMPACTION", false, 2, {"ls_id", "tablet_count"}) diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.cpp b/src/share/scheduler/ob_tenant_dag_scheduler.cpp index cc87e6ced3..6c5cd318d7 100644 --- a/src/share/scheduler/ob_tenant_dag_scheduler.cpp +++ b/src/share/scheduler/ob_tenant_dag_scheduler.cpp @@ -488,6 +488,7 @@ ObIDag::ObIDag(const ObDagType::ObDagTypeEnum type) priority_(OB_DAG_TYPES[type].init_dag_prio_), dag_status_(ObIDag::DAG_STATUS_INITING), running_task_cnt_(0), + max_concurrent_task_cnt_(INT64_MAX), is_stop_(false), max_retry_times_(0), running_times_(0), @@ -541,6 +542,7 @@ void ObIDag::clear_running_info() start_time_ = 0; consumer_group_id_ = USER_RESOURCE_OTHER_GROUP_ID; running_task_cnt_ = 0; + max_concurrent_task_cnt_ = INT64_MAX; dag_status_ = ObDagStatus::DAG_STATUS_INITING; dag_ret_ = OB_SUCCESS; error_location_.reset(); @@ -718,7 +720,12 @@ int ObIDag::get_next_ready_task(ObITask *&task) bool found = false; ObMutexGuard guard(lock_); - if (!is_stop_ && ObIDag::DAG_STATUS_NODE_RUNNING == dag_status_) { + if (is_stop_ || ObIDag::DAG_STATUS_NODE_RUNNING != dag_status_) { + } else if (OB_UNLIKELY(max_concurrent_task_cnt_ >= 1 && running_task_cnt_ >= 
max_concurrent_task_cnt_)) { + if (REACH_TENANT_TIME_INTERVAL(DUMP_STATUS_INTERVAL)) { + COMMON_LOG(INFO, "delay scheduling since concurrent running task cnts reach limit", KPC(this)); + } + } else { ObITask *cur_task = task_list_.get_first(); const ObITask *head = task_list_.get_header(); while (!found && head != cur_task && nullptr != cur_task) { @@ -855,7 +862,7 @@ int64_t ObIDag::to_string(char *buf, const int64_t buf_len) const } else { J_OBJ_START(); J_KV(KP(this), K_(is_inited), K_(type), "name", get_dag_type_str(type_), K_(id), KPC_(dag_net), K_(dag_ret), K_(dag_status), - K_(add_time), K_(start_time), K_(running_task_cnt), K_(indegree), K_(consumer_group_id), "hash", hash(), K(task_list_.get_size()), + K_(add_time), K_(start_time), K_(running_task_cnt), K_(max_concurrent_task_cnt), K_(indegree), K_(consumer_group_id), "hash", hash(), K(task_list_.get_size()), K_(emergency)); J_OBJ_END(); } diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.h b/src/share/scheduler/ob_tenant_dag_scheduler.h index b2f4d08464..87909971fe 100644 --- a/src/share/scheduler/ob_tenant_dag_scheduler.h +++ b/src/share/scheduler/ob_tenant_dag_scheduler.h @@ -436,6 +436,8 @@ public: lib::ObMutexGuard guard(lock_); return task_list_.get_size(); } + void set_max_concurrent_task_cnt(int64_t max_concurrent_task_cnt) { max_concurrent_task_cnt_ = max_concurrent_task_cnt; } + int64_t get_max_concurrent_task_cnt() const { return max_concurrent_task_cnt_;} virtual int gene_warning_info(ObDagWarningInfo &info, ObIAllocator &allocator); virtual bool ignore_warning() { return false; } virtual bool check_can_retry(); @@ -529,6 +531,7 @@ protected: private: typedef common::ObDList<ObITask> TaskList; static const int64_t DEFAULT_TASK_NUM = 32; + static const int64_t DUMP_STATUS_INTERVAL = 30 * 60 * 1000L * 1000L /*30min*/; private: void reset(); void clear_task_list(); @@ -547,6 +550,7 @@ private: ObDagId id_; ObDagStatus dag_status_; int64_t running_task_cnt_; + int64_t max_concurrent_task_cnt_; 
TaskList task_list_; // should protect by lock bool is_stop_; // should protect by lock uint32_t max_retry_times_; // should protect by lock diff --git a/src/storage/compaction/ob_batch_freeze_tablets_dag.cpp b/src/storage/compaction/ob_batch_freeze_tablets_dag.cpp index 787c407a02..322732466c 100644 --- a/src/storage/compaction/ob_batch_freeze_tablets_dag.cpp +++ b/src/storage/compaction/ob_batch_freeze_tablets_dag.cpp @@ -18,6 +18,22 @@ using namespace share; using namespace storage; namespace compaction { +/* + * ----------------------------------------ObBatchFreezeTabletsDag-------------------------------------------- + */ +int ObBatchFreezeTabletsDag::inner_init() +{ + int ret = OB_SUCCESS; + const ObBatchFreezeTabletsParam &param = get_param(); + if (!param.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid param", K(ret), K(param)); + } else { + (void) set_max_concurrent_task_cnt(MAX_CONCURRENT_FREEZE_TASK_CNT); + } + return ret; +} + /* * ----------------------------------------ObBatchFreezeTabletsTask-------------------------------------------- */ diff --git a/src/storage/compaction/ob_batch_freeze_tablets_dag.h b/src/storage/compaction/ob_batch_freeze_tablets_dag.h index 14dfc0957e..6f4ef4b58a 100644 --- a/src/storage/compaction/ob_batch_freeze_tablets_dag.h +++ b/src/storage/compaction/ob_batch_freeze_tablets_dag.h @@ -57,6 +57,9 @@ public: : ObBatchExecDag(share::ObDagType::DAG_TYPE_BATCH_FREEZE_TABLETS) {} virtual ~ObBatchFreezeTabletsDag() = default; + virtual int inner_init(); +public: + static constexpr int64_t MAX_CONCURRENT_FREEZE_TASK_CNT = 2; private: DISALLOW_COPY_AND_ASSIGN(ObBatchFreezeTabletsDag); }; diff --git a/unittest/share/scheduler/test_dag_scheduler.cpp b/unittest/share/scheduler/test_dag_scheduler.cpp index 6a157b3e91..253ac0124f 100644 --- a/unittest/share/scheduler/test_dag_scheduler.cpp +++ b/unittest/share/scheduler/test_dag_scheduler.cpp @@ -1815,9 +1815,9 @@ TEST_F(TestDagScheduler, 
test_check_ls_compaction_dag_exist_with_cancel) ASSERT_TRUE(nullptr != scheduler); ASSERT_EQ(OB_SUCCESS, scheduler->init(MTL_ID(), time_slice, 64)); EXPECT_EQ(OB_SUCCESS, scheduler->set_thread_score(ObDagPrio::DAG_PRIO_COMPACTION_MID, 1)); - EXPECT_EQ(OB_SUCCESS, scheduler->set_thread_score(ObDagPrio::DAG_PRIO_COMPACTION_HIGH, 1)); + EXPECT_EQ(OB_SUCCESS, scheduler->set_thread_score(ObDagPrio::DAG_PRIO_COMPACTION_LOW, 1)); EXPECT_EQ(1, scheduler->prio_sche_[ObDagPrio::DAG_PRIO_COMPACTION_MID].limits_); - EXPECT_EQ(1, scheduler->prio_sche_[ObDagPrio::DAG_PRIO_COMPACTION_HIGH].limits_); + EXPECT_EQ(1, scheduler->prio_sche_[ObDagPrio::DAG_PRIO_COMPACTION_LOW].limits_); LoopWaitTask *wait_task = nullptr; LoopWaitTask *wait_task2 = nullptr; @@ -1837,7 +1837,7 @@ TEST_F(TestDagScheduler, test_check_ls_compaction_dag_exist_with_cancel) EXPECT_EQ(OB_SUCCESS, dag->add_task(*wait_task)); EXPECT_EQ(OB_SUCCESS, scheduler->add_dag(dag)); } - // add 2 dag at prio = DAG_PRIO_COMPACTION_HIGH + // add 2 dag at prio = DAG_PRIO_COMPACTION_LOW for (int64_t i = 0; i < ls_cnt; ++i) { ObBatchFreezeTabletsDag *batch_freeze_dag = NULL; EXPECT_EQ(OB_SUCCESS, scheduler->alloc_dag(batch_freeze_dag)); @@ -1849,7 +1849,7 @@ TEST_F(TestDagScheduler, test_check_ls_compaction_dag_exist_with_cancel) } EXPECT_EQ(dag_cnt, scheduler->dag_cnts_[ObDagType::DAG_TYPE_MERGE_EXECUTE]); CHECK_EQ_UTIL_TIMEOUT(1, scheduler->get_running_task_cnt(ObDagPrio::DAG_PRIO_COMPACTION_MID)); - CHECK_EQ_UTIL_TIMEOUT(1, scheduler->get_running_task_cnt(ObDagPrio::DAG_PRIO_COMPACTION_HIGH)); + CHECK_EQ_UTIL_TIMEOUT(1, scheduler->get_running_task_cnt(ObDagPrio::DAG_PRIO_COMPACTION_LOW)); // cancel waiting dag of ls_ids[0], all dag of ls_ids[1] will be destroyed when check_cancel bool exist = false; @@ -2018,6 +2018,71 @@ TEST_F(TestDagScheduler, test_maybe_cycle_tasks) wait_scheduler(); } +TEST_F(TestDagScheduler, test_max_concurrent_task) +{ + ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*); + 
ASSERT_TRUE(nullptr != scheduler); + ASSERT_EQ(OB_SUCCESS, scheduler->init(MTL_ID(), time_slice, 64)); + EXPECT_EQ(OB_SUCCESS, scheduler->set_thread_score(ObDagPrio::DAG_PRIO_COMPACTION_MID, 7)); + EXPECT_EQ(7, scheduler->prio_sche_[ObDagPrio::DAG_PRIO_COMPACTION_MID].limits_); + + const int64_t dag_cnt = 3; + ObLSID ls_id(1001); + bool finish_flag[dag_cnt] = {false, false, false}; + for (int64_t idx = 0; idx < dag_cnt; ++idx) { + TestCompMidCancelDag *dag = nullptr; + LoopWaitTask *wait_task = nullptr; + ObTabletID tablet_id(200001 + idx); + EXPECT_EQ(OB_SUCCESS, scheduler->alloc_dag(dag)); + dag->max_concurrent_task_cnt_ = 2; + dag->ls_id_ = ls_id; + dag->tablet_id_ = tablet_id; + EXPECT_EQ(OB_SUCCESS, alloc_task(*dag, wait_task)); + EXPECT_EQ(OB_SUCCESS, wait_task->init(1, 10 /*cnt*/, finish_flag[idx])); + EXPECT_EQ(OB_SUCCESS, dag->add_task(*wait_task)); + EXPECT_EQ(OB_SUCCESS, scheduler->add_dag(dag)); + } + CHECK_EQ_UTIL_TIMEOUT(6, scheduler->get_running_task_cnt(ObDagPrio::DAG_PRIO_COMPACTION_MID)); + + AtomicOperator op(0); + TestDag *dag1 = nullptr; + AtomicIncTask *inc_task = nullptr; + AtomicIncTask *inc_task1 = nullptr; + AtomicMulTask *mul_task = nullptr; + EXPECT_EQ(OB_SUCCESS, scheduler->alloc_dag(dag1)); + EXPECT_EQ(OB_SUCCESS, dag1->init(1)); + EXPECT_EQ(OB_SUCCESS, alloc_task(*dag1, inc_task)); + EXPECT_EQ(OB_SUCCESS, inc_task->init(1, 10, op)); + EXPECT_EQ(OB_SUCCESS, dag1->add_task(*inc_task)); + EXPECT_EQ(OB_SUCCESS, alloc_task(*dag1, mul_task)); + EXPECT_EQ(OB_SUCCESS, mul_task->init(1, 4, op)); + EXPECT_EQ(OB_SUCCESS, mul_task->add_child(*inc_task)); + EXPECT_EQ(OB_SUCCESS, dag1->add_task(*mul_task)); + EXPECT_EQ(OB_SUCCESS, alloc_task(*dag1, inc_task1)); + EXPECT_EQ(OB_SUCCESS, inc_task1->init(1, 10, op)); + EXPECT_EQ(OB_SUCCESS, inc_task1->add_child(*mul_task)); + EXPECT_EQ(OB_SUCCESS, dag1->add_task(*inc_task1)); + EXPECT_EQ(OB_SUCCESS, scheduler->add_dag(dag1)); + CHECK_EQ_UTIL_TIMEOUT(170, op.value()); + + int64_t start_time = 
oceanbase::common::ObTimeUtility::current_time(); + while (oceanbase::common::ObTimeUtility::current_time() - start_time < CHECK_TIMEOUT) { + const int64_t cnt = scheduler->get_running_task_cnt(ObDagPrio::DAG_PRIO_COMPACTION_MID); + EXPECT_LE(cnt, 7); + if (cnt == 6) { + break; + } else { + usleep(10 * 1000 /*10 ms*/); + } + } + EXPECT_EQ(6, scheduler->get_running_task_cnt(ObDagPrio::DAG_PRIO_COMPACTION_MID)); + finish_flag[0] = true; + finish_flag[1] = true; + finish_flag[2] = true; + wait_scheduler(); + EXPECT_EQ(0, scheduler->get_running_task_cnt(ObDagPrio::DAG_PRIO_COMPACTION_MID)); +} + /* TEST_F(TestDagScheduler, test_large_thread_cnt) {