move batch freeze dag to the compaction low thread pool and limit its max concurrent task cnt

This commit is contained in:
Tsunaou 2024-11-20 11:46:08 +00:00 committed by ob-robot
parent 01fe394aba
commit 5c77619843
7 changed files with 105 additions and 7 deletions
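The change has two halves: ObIDag gains a max_concurrent_task_cnt_ limit (defaulting to INT64_MAX, i.e. unlimited) that get_next_ready_task() consults before handing another task to a worker, and DAG_TYPE_BATCH_FREEZE_TABLETS is re-registered under ObDagPrio::DAG_PRIO_COMPACTION_LOW instead of DAG_PRIO_COMPACTION_HIGH. The batch freeze dag then caps itself at MAX_CONCURRENT_FREEZE_TASK_CNT (2) through the new inner_init() hook. A minimal sketch of how a dag type opts in (MyDag is a hypothetical subclass; the real override is ObBatchFreezeTabletsDag::inner_init() below):

    int MyDag::inner_init()
    {
      int ret = OB_SUCCESS;
      // at most 2 tasks of this dag run concurrently; dag types that never
      // call the setter keep the INT64_MAX default and stay unthrottled
      (void) set_max_concurrent_task_cnt(2);
      return ret;
    }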

View File

@@ -106,6 +106,7 @@ public:
ObBatchExecDag(const share::ObDagType::ObDagTypeEnum type);
virtual ~ObBatchExecDag() {}
int init_by_param(const share::ObIDagInitParam *param);
virtual int inner_init() { return OB_SUCCESS; }
virtual int create_first_task() override;
virtual bool operator == (const ObIDag &other) const override;
virtual int64_t hash() const override { return param_.get_hash(); }
@@ -245,6 +246,8 @@ int ObBatchExecDag<TASK, PARAM>::init_by_param(
STORAGE_LOG(WARN, "failed to init param", KR(ret), KPC(init_param));
} else if (OB_FAIL(init_merge_history())) {
STORAGE_LOG(WARN, "failed to init merge history", KR(ret), KPC(init_param));
} else if (OB_FAIL(inner_init())) {
STORAGE_LOG(WARN, "failed to inner init", KR(ret), KPC(init_param));
} else {
is_inited_ = true;
}
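inner_init() is a new virtual hook on ObBatchExecDag: the base implementation is a no-op returning OB_SUCCESS, and init_by_param() calls it after the param and merge history are initialized, so a subclass can run type-specific setup before is_inited_ is set. ObBatchFreezeTabletsDag overrides it below to install its concurrency cap.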

View File

@@ -63,7 +63,7 @@ DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_MDS_MINI_MERGE, ObDagPrio::DAG_PRIO_COMPACTI
false, 3, {"ls_id", "tablet_id", "flush_scn"})
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_VERIFY_CKM, ObDagPrio::DAG_PRIO_COMPACTION_LOW, ObSysTaskType::SSTABLE_MAJOR_MERGE_TASK, "VERIFY_CKM", "COMPACTION",
false, 2, {"ls_id", "tablet_count"})
-DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_BATCH_FREEZE_TABLETS, ObDagPrio::DAG_PRIO_COMPACTION_HIGH, ObSysTaskType::BATCH_FREEZE_TABLET_TASK, "BATCH_FREEZE", "COMPACTION",
+DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_BATCH_FREEZE_TABLETS, ObDagPrio::DAG_PRIO_COMPACTION_LOW, ObSysTaskType::BATCH_FREEZE_TABLET_TASK, "BATCH_FREEZE", "COMPACTION",
false, 2, {"ls_id", "tablet_count"})
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_UPDATE_SKIP_MAJOR, ObDagPrio::DAG_PRIO_COMPACTION_LOW, ObSysTaskType::SSTABLE_MAJOR_MERGE_TASK, "UPDATE_SKIP_MAJOR", "COMPACTION",
false, 2, {"ls_id", "tablet_count"})
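Only the scheduling priority of DAG_TYPE_BATCH_FREEZE_TABLETS changes here, from DAG_PRIO_COMPACTION_HIGH to DAG_PRIO_COMPACTION_LOW: batch freeze dags now run on the compaction low worker threads alongside VERIFY_CKM and UPDATE_SKIP_MAJOR, and the rest of the registration (sys task type, labels, parameter list) is untouched.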

View File

@@ -488,6 +488,7 @@ ObIDag::ObIDag(const ObDagType::ObDagTypeEnum type)
priority_(OB_DAG_TYPES[type].init_dag_prio_),
dag_status_(ObIDag::DAG_STATUS_INITING),
running_task_cnt_(0),
max_concurrent_task_cnt_(INT64_MAX),
is_stop_(false),
max_retry_times_(0),
running_times_(0),
@@ -541,6 +542,7 @@ void ObIDag::clear_running_info()
start_time_ = 0;
consumer_group_id_ = USER_RESOURCE_OTHER_GROUP_ID;
running_task_cnt_ = 0;
max_concurrent_task_cnt_ = INT64_MAX;
dag_status_ = ObDagStatus::DAG_STATUS_INITING;
dag_ret_ = OB_SUCCESS;
error_location_.reset();
@@ -718,7 +720,12 @@ int ObIDag::get_next_ready_task(ObITask *&task)
bool found = false;
ObMutexGuard guard(lock_);
-if (!is_stop_ && ObIDag::DAG_STATUS_NODE_RUNNING == dag_status_) {
+if (is_stop_ || ObIDag::DAG_STATUS_NODE_RUNNING != dag_status_) {
+} else if (OB_UNLIKELY(max_concurrent_task_cnt_ >= 1 && running_task_cnt_ >= max_concurrent_task_cnt_)) {
+  if (REACH_TENANT_TIME_INTERVAL(DUMP_STATUS_INTERVAL)) {
+    COMMON_LOG(INFO, "delay scheduling since concurrent running task cnt reaches limit", KPC(this));
+  }
+} else {
ObITask *cur_task = task_list_.get_first();
const ObITask *head = task_list_.get_header();
while (!found && head != cur_task && nullptr != cur_task) {
@@ -855,7 +862,7 @@ int64_t ObIDag::to_string(char *buf, const int64_t buf_len) const
} else {
J_OBJ_START();
J_KV(KP(this), K_(is_inited), K_(type), "name", get_dag_type_str(type_), K_(id), KPC_(dag_net), K_(dag_ret), K_(dag_status),
-K_(add_time), K_(start_time), K_(running_task_cnt), K_(indegree), K_(consumer_group_id), "hash", hash(), K(task_list_.get_size()),
+K_(add_time), K_(start_time), K_(running_task_cnt), K_(max_concurrent_task_cnt), K_(indegree), K_(consumer_group_id), "hash", hash(), K(task_list_.get_size()),
K_(emergency));
J_OBJ_END();
}
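The reworked get_next_ready_task() adds one gate: when max_concurrent_task_cnt_ is at least 1 and running_task_cnt_ has reached it, the dag stays in DAG_STATUS_NODE_RUNNING but hands out no further task, logging at most one INFO line per DUMP_STATUS_INTERVAL per tenant. With the INT64_MAX default the gate can never fire, so existing dag types behave exactly as before; values below 1 are likewise treated as unlimited. clear_running_info() resets the field together with running_task_cnt_ so a reused dag starts from the default, and to_string() now prints it for diagnosis.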

View File

@@ -436,6 +436,8 @@ public:
lib::ObMutexGuard guard(lock_);
return task_list_.get_size();
}
void set_max_concurrent_task_cnt(int64_t max_concurrent_task_cnt) { max_concurrent_task_cnt_ = max_concurrent_task_cnt; }
int64_t get_max_concurrent_task_cnt() const { return max_concurrent_task_cnt_; }
virtual int gene_warning_info(ObDagWarningInfo &info, ObIAllocator &allocator);
virtual bool ignore_warning() { return false; }
virtual bool check_can_retry();
@@ -529,6 +531,7 @@ protected:
private:
typedef common::ObDList<ObITask> TaskList;
static const int64_t DEFAULT_TASK_NUM = 32;
static const int64_t DUMP_STATUS_INTERVAL = 30 * 60 * 1000L * 1000L /*30min*/;
private:
void reset();
void clear_task_list();
@@ -547,6 +550,7 @@
ObDagId id_;
ObDagStatus dag_status_;
int64_t running_task_cnt_;
int64_t max_concurrent_task_cnt_;
TaskList task_list_; // should protect by lock
bool is_stop_; // should protect by lock
uint32_t max_retry_times_; // should protect by lock
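In the header the limit gets a setter/getter pair, a DUMP_STATUS_INTERVAL constant of 30 minutes for the throttle log, and a field placed next to running_task_cnt_; it is compared against running_task_cnt_ under lock_ inside get_next_ready_task().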

View File

@@ -18,6 +18,22 @@ using namespace share;
using namespace storage;
namespace compaction
{
/*
* ----------------------------------------ObBatchFreezeTabletsDag--------------------------------------------
*/
int ObBatchFreezeTabletsDag::inner_init()
{
int ret = OB_SUCCESS;
const ObBatchFreezeTabletsParam &param = get_param();
if (!param.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid param", K(ret), K(param));
} else {
(void) set_max_concurrent_task_cnt(MAX_CONCURRENT_FREEZE_TASK_CNT);
}
return ret;
}
/*
* ----------------------------------------ObBatchFreezeTabletsTask--------------------------------------------
*/

View File

@@ -57,6 +57,9 @@ public:
: ObBatchExecDag(share::ObDagType::DAG_TYPE_BATCH_FREEZE_TABLETS)
{}
virtual ~ObBatchFreezeTabletsDag() = default;
virtual int inner_init();
public:
static constexpr int64_t MAX_CONCURRENT_FREEZE_TASK_CNT = 2;
private:
DISALLOW_COPY_AND_ASSIGN(ObBatchFreezeTabletsDag);
};
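The dag declaration adds the inner_init() override and fixes the cap at MAX_CONCURRENT_FREEZE_TASK_CNT = 2, so a single batch freeze dag occupies at most two compaction low workers no matter how many tablet freeze tasks it fans out.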

View File

@@ -1815,9 +1815,9 @@ TEST_F(TestDagScheduler, test_check_ls_compaction_dag_exist_with_cancel)
ASSERT_TRUE(nullptr != scheduler);
ASSERT_EQ(OB_SUCCESS, scheduler->init(MTL_ID(), time_slice, 64));
EXPECT_EQ(OB_SUCCESS, scheduler->set_thread_score(ObDagPrio::DAG_PRIO_COMPACTION_MID, 1));
-EXPECT_EQ(OB_SUCCESS, scheduler->set_thread_score(ObDagPrio::DAG_PRIO_COMPACTION_HIGH, 1));
+EXPECT_EQ(OB_SUCCESS, scheduler->set_thread_score(ObDagPrio::DAG_PRIO_COMPACTION_LOW, 1));
EXPECT_EQ(1, scheduler->prio_sche_[ObDagPrio::DAG_PRIO_COMPACTION_MID].limits_);
-EXPECT_EQ(1, scheduler->prio_sche_[ObDagPrio::DAG_PRIO_COMPACTION_HIGH].limits_);
+EXPECT_EQ(1, scheduler->prio_sche_[ObDagPrio::DAG_PRIO_COMPACTION_LOW].limits_);
LoopWaitTask *wait_task = nullptr;
LoopWaitTask *wait_task2 = nullptr;
@@ -1837,7 +1837,7 @@ TEST_F(TestDagScheduler, test_check_ls_compaction_dag_exist_with_cancel)
EXPECT_EQ(OB_SUCCESS, dag->add_task(*wait_task));
EXPECT_EQ(OB_SUCCESS, scheduler->add_dag(dag));
}
-// add 2 dag at prio = DAG_PRIO_COMPACTION_HIGH
+// add 2 dags at prio = DAG_PRIO_COMPACTION_LOW
for (int64_t i = 0; i < ls_cnt; ++i) {
ObBatchFreezeTabletsDag *batch_freeze_dag = NULL;
EXPECT_EQ(OB_SUCCESS, scheduler->alloc_dag(batch_freeze_dag));
@@ -1849,7 +1849,7 @@ TEST_F(TestDagScheduler, test_check_ls_compaction_dag_exist_with_cancel)
}
EXPECT_EQ(dag_cnt, scheduler->dag_cnts_[ObDagType::DAG_TYPE_MERGE_EXECUTE]);
CHECK_EQ_UTIL_TIMEOUT(1, scheduler->get_running_task_cnt(ObDagPrio::DAG_PRIO_COMPACTION_MID));
-CHECK_EQ_UTIL_TIMEOUT(1, scheduler->get_running_task_cnt(ObDagPrio::DAG_PRIO_COMPACTION_HIGH));
+CHECK_EQ_UTIL_TIMEOUT(1, scheduler->get_running_task_cnt(ObDagPrio::DAG_PRIO_COMPACTION_LOW));
// cancel waiting dag of ls_ids[0], all dag of ls_ids[1] will be destroyed when check_cancel
bool exist = false;
@@ -2018,6 +2018,71 @@ TEST_F(TestDagScheduler, test_maybe_cycle_tasks)
wait_scheduler();
}
TEST_F(TestDagScheduler, test_max_concurrent_task)
{
ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*);
ASSERT_TRUE(nullptr != scheduler);
ASSERT_EQ(OB_SUCCESS, scheduler->init(MTL_ID(), time_slice, 64));
EXPECT_EQ(OB_SUCCESS, scheduler->set_thread_score(ObDagPrio::DAG_PRIO_COMPACTION_MID, 7));
EXPECT_EQ(7, scheduler->prio_sche_[ObDagPrio::DAG_PRIO_COMPACTION_MID].limits_);
const int64_t dag_cnt = 3;
ObLSID ls_id(1001);
bool finish_flag[dag_cnt] = {false, false, false};
for (int64_t idx = 0; idx < dag_cnt; ++idx) {
TestCompMidCancelDag *dag = nullptr;
LoopWaitTask *wait_task = nullptr;
ObTabletID tablet_id(200001 + idx);
EXPECT_EQ(OB_SUCCESS, scheduler->alloc_dag(dag));
dag->max_concurrent_task_cnt_ = 2;
dag->ls_id_ = ls_id;
dag->tablet_id_ = tablet_id;
EXPECT_EQ(OB_SUCCESS, alloc_task(*dag, wait_task));
EXPECT_EQ(OB_SUCCESS, wait_task->init(1, 10 /*cnt*/, finish_flag[idx]));
EXPECT_EQ(OB_SUCCESS, dag->add_task(*wait_task));
EXPECT_EQ(OB_SUCCESS, scheduler->add_dag(dag));
}
CHECK_EQ_UTIL_TIMEOUT(6, scheduler->get_running_task_cnt(ObDagPrio::DAG_PRIO_COMPACTION_MID));
AtomicOperator op(0);
TestDag *dag1 = nullptr;
AtomicIncTask *inc_task = nullptr;
AtomicIncTask *inc_task1 = nullptr;
AtomicMulTask *mul_task = nullptr;
EXPECT_EQ(OB_SUCCESS, scheduler->alloc_dag(dag1));
EXPECT_EQ(OB_SUCCESS, dag1->init(1));
EXPECT_EQ(OB_SUCCESS, alloc_task(*dag1, inc_task));
EXPECT_EQ(OB_SUCCESS, inc_task->init(1, 10, op));
EXPECT_EQ(OB_SUCCESS, dag1->add_task(*inc_task));
EXPECT_EQ(OB_SUCCESS, alloc_task(*dag1, mul_task));
EXPECT_EQ(OB_SUCCESS, mul_task->init(1, 4, op));
EXPECT_EQ(OB_SUCCESS, mul_task->add_child(*inc_task));
EXPECT_EQ(OB_SUCCESS, dag1->add_task(*mul_task));
EXPECT_EQ(OB_SUCCESS, alloc_task(*dag1, inc_task1));
EXPECT_EQ(OB_SUCCESS, inc_task1->init(1, 10, op));
EXPECT_EQ(OB_SUCCESS, inc_task1->add_child(*mul_task));
EXPECT_EQ(OB_SUCCESS, dag1->add_task(*inc_task1));
EXPECT_EQ(OB_SUCCESS, scheduler->add_dag(dag1));
CHECK_EQ_UTIL_TIMEOUT(170, op.value());
int64_t start_time = oceanbase::common::ObTimeUtility::current_time();
while (oceanbase::common::ObTimeUtility::current_time() - start_time < CHECK_TIMEOUT) {
const int64_t cnt = scheduler->get_running_task_cnt(ObDagPrio::DAG_PRIO_COMPACTION_MID);
EXPECT_LE(cnt, 7);
if (cnt == 6) {
break;
} else {
usleep(10 * 1000 /*10 ms*/);
}
}
EXPECT_EQ(6, scheduler->get_running_task_cnt(ObDagPrio::DAG_PRIO_COMPACTION_MID));
finish_flag[0] = true;
finish_flag[1] = true;
finish_flag[2] = true;
wait_scheduler();
EXPECT_EQ(0, scheduler->get_running_task_cnt(ObDagPrio::DAG_PRIO_COMPACTION_MID));
}
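The expected numbers in test_max_concurrent_task follow from the setup: with a thread score of 7 on DAG_PRIO_COMPACTION_MID and three dags each capped at 2, the looping wait tasks occupy 3 x 2 = 6 workers, leaving the uncapped dag1 to run its serial chain on the remaining one. The 170 check matches the chain order inc_task1 -> mul_task -> inc_task, assuming AtomicIncTask adds 1 per round and AtomicMulTask doubles the counter per round, as the expected value implies: (0 + 10) * 2^4 + 10 = 170. While dag1 drains, the running count may touch 7 but never exceed it; it then settles back at 6, and setting the finish flags releases the wait tasks so the count drops to 0.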
/*
TEST_F(TestDagScheduler, test_large_thread_cnt)
{