From c9e2df607b19df9fd9d791dc659cc59c52df16f7 Mon Sep 17 00:00:00 2001 From: camby <104178625@qq.com> Date: Tue, 24 Jun 2025 09:27:13 +0800 Subject: [PATCH] [opt](metrics) add metrics pipeline_task_queue_size (#51878) (#52141) ### What problem does this PR solve? pick #51878 to branch-2.1 --- be/src/pipeline/task_queue.cpp | 5 ++++- be/src/util/doris_metrics.cpp | 3 +++ be/src/util/doris_metrics.h | 1 + 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp index ad89a7d56a..4ed2faf0ca 100644 --- a/be/src/pipeline/task_queue.cpp +++ b/be/src/pipeline/task_queue.cpp @@ -52,6 +52,7 @@ void PriorityTaskQueue::close() { std::unique_lock lock(_work_size_mutex); _closed = true; _wait_task.notify_all(); + DorisMetrics::instance()->pipeline_task_queue_size->increment(-_total_task_size); } PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) { @@ -77,6 +78,7 @@ PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) { if (task) { task->update_queue_level(level); _total_task_size--; + DorisMetrics::instance()->pipeline_task_queue_size->increment(-1); } return task; } @@ -126,6 +128,7 @@ Status PriorityTaskQueue::push(PipelineTask* task) { _sub_queues[level].push_back(task); _total_task_size++; + DorisMetrics::instance()->pipeline_task_queue_size->increment(1); _wait_task.notify_one(); return Status::OK(); } @@ -205,4 +208,4 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) { } } // namespace pipeline -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index 76f93fbb96..efd0333621 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -190,6 +190,7 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_cnt, MetricUnit::NOUNIT); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(pipeline_task_queue_size, MetricUnit::NOUNIT); const std::string DorisMetrics::_s_registry_name = "doris_be"; const std::string DorisMetrics::_s_hook_name = "doris_metrics"; @@ -317,6 +318,8 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_cnt); + + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, pipeline_task_queue_size); } void DorisMetrics::initialize(bool init_system_metrics, const std::set& disk_devices, diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 0b37882f77..432189d6c1 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -244,6 +244,7 @@ public: IntCounter* scanner_ctx_cnt = nullptr; IntCounter* scanner_cnt = nullptr; IntCounter* scanner_task_cnt = nullptr; + IntCounter* pipeline_task_queue_size = nullptr; static DorisMetrics* instance() { static DorisMetrics instance;