### What problem does this PR solve? pick #51878 to branch-2.1
This commit is contained in:
@ -52,6 +52,7 @@ void PriorityTaskQueue::close() {
|
||||
std::unique_lock<std::mutex> lock(_work_size_mutex);
|
||||
_closed = true;
|
||||
_wait_task.notify_all();
|
||||
DorisMetrics::instance()->pipeline_task_queue_size->increment(-_total_task_size);
|
||||
}
|
||||
|
||||
PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
|
||||
@ -77,6 +78,7 @@ PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
|
||||
if (task) {
|
||||
task->update_queue_level(level);
|
||||
_total_task_size--;
|
||||
DorisMetrics::instance()->pipeline_task_queue_size->increment(-1);
|
||||
}
|
||||
return task;
|
||||
}
|
||||
@ -126,6 +128,7 @@ Status PriorityTaskQueue::push(PipelineTask* task) {
|
||||
|
||||
_sub_queues[level].push_back(task);
|
||||
_total_task_size++;
|
||||
DorisMetrics::instance()->pipeline_task_queue_size->increment(1);
|
||||
_wait_task.notify_one();
|
||||
return Status::OK();
|
||||
}
|
||||
@ -205,4 +208,4 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) {
|
||||
}
|
||||
|
||||
} // namespace pipeline
|
||||
} // namespace doris
|
||||
} // namespace doris
|
||||
|
||||
@ -190,6 +190,7 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT);
|
||||
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT);
|
||||
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT);
|
||||
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_cnt, MetricUnit::NOUNIT);
|
||||
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(pipeline_task_queue_size, MetricUnit::NOUNIT);
|
||||
|
||||
const std::string DorisMetrics::_s_registry_name = "doris_be";
|
||||
const std::string DorisMetrics::_s_hook_name = "doris_metrics";
|
||||
@ -317,6 +318,8 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
|
||||
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt);
|
||||
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt);
|
||||
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_cnt);
|
||||
|
||||
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, pipeline_task_queue_size);
|
||||
}
|
||||
|
||||
void DorisMetrics::initialize(bool init_system_metrics, const std::set<std::string>& disk_devices,
|
||||
|
||||
@ -244,6 +244,7 @@ public:
|
||||
IntCounter* scanner_ctx_cnt = nullptr;
|
||||
IntCounter* scanner_cnt = nullptr;
|
||||
IntCounter* scanner_task_cnt = nullptr;
|
||||
IntCounter* pipeline_task_queue_size = nullptr;
|
||||
|
||||
static DorisMetrics* instance() {
|
||||
static DorisMetrics instance;
|
||||
|
||||
Reference in New Issue
Block a user