diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 393caea0d0..c55ef9e305 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -35,6 +35,7 @@ void PipelineTask::_init_profile() { _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime"); _wait_schedule_timer = ADD_TIMER(_task_profile, "WaitScheduleTime"); _block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT); + _schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", TUnit::UNIT); _yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT); } @@ -178,6 +179,7 @@ Status PipelineTask::close() { } if (_opened) { COUNTER_UPDATE(_wait_source_timer, _wait_source_watcher.elapsed_time()); + COUNTER_UPDATE(_schedule_counts, _schedule_time); COUNTER_UPDATE(_wait_sink_timer, _wait_sink_watcher.elapsed_time()); COUNTER_UPDATE(_wait_worker_timer, _wait_worker_watcher.elapsed_time()); COUNTER_UPDATE(_wait_schedule_timer, _wait_schedule_watcher.elapsed_time()); diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index ecc7f884b6..f14669f41a 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -121,6 +121,8 @@ public: void start_schedule_watcher() { _wait_schedule_watcher.start(); } void stop_schedule_watcher() { _wait_schedule_watcher.stop(); } + int pipeline_id() const { return _pipeline->_pipeline_id; } + PipelineTaskState get_state() { return _cur_state; } void set_state(PipelineTaskState state); @@ -191,6 +193,7 @@ private: RuntimeProfile::Counter* _sink_timer; RuntimeProfile::Counter* _get_block_timer; RuntimeProfile::Counter* _block_counts; + RuntimeProfile::Counter* _schedule_counts; MonotonicStopWatch _wait_source_watcher; RuntimeProfile::Counter* _wait_source_timer; MonotonicStopWatch _wait_sink_watcher; diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp index b9ca3c9f2a..93f569ed31 100644 --- a/be/src/pipeline/task_queue.cpp +++ b/be/src/pipeline/task_queue.cpp @@ -166,7 +166,7 @@ PipelineTask* TaskQueue::steal_take(size_t core_id) { next_id = 0; } DCHECK(next_id < _core_size); - auto task = _async_queue[core_id].try_take(); + auto task = _async_queue[next_id].try_take(); if (task) { return task; } diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h index 0a6bf81554..6432001fbc 100644 --- a/be/src/pipeline/task_queue.h +++ b/be/src/pipeline/task_queue.h @@ -108,7 +108,7 @@ private: size_t _core_size; std::atomic _next_core = 0; std::atomic _closed; - static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 200; + static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100; }; } // namespace pipeline diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 915a2e8935..c5e5ee57c1 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -285,7 +285,6 @@ void TaskScheduler::_do_work(size_t index) { break; } } - LOG(INFO) << "Stop TaskScheduler worker " << index; } void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state) { diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index 9770c2800a..28497bfadb 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -228,7 +228,6 @@ template Status VSetOperationNode::get_next(RuntimeState* state, Block* output_block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VExceptNode::get_next"); - SCOPED_TIMER(_probe_timer); return pull(state, output_block, eos); } @@ -239,6 +238,7 @@ Status VSetOperationNode::prepare(RuntimeState* state) { SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime"); + _pull_timer = ADD_TIMER(runtime_profile(), "PullTime"); // Prepare result expr lists. for (int i = 0; i < _child_expr_lists.size(); ++i) { @@ -348,6 +348,7 @@ void VSetOperationNode::hash_table_init() { template Status VSetOperationNode::sink(RuntimeState*, Block* block, bool eos) { + SCOPED_TIMER(_build_timer); constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL; if (block->rows() != 0) { @@ -382,6 +383,7 @@ Status VSetOperationNode::sink(RuntimeState*, Block* block, bool e template Status VSetOperationNode::pull(RuntimeState* state, Block* output_block, bool* eos) { + SCOPED_TIMER(_pull_timer); create_mutable_cols(output_block); auto st = std::visit( [&](auto&& arg) -> Status { @@ -411,7 +413,6 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) { bool eos = false; while (!eos) { block.clear_column_data(); - SCOPED_TIMER(_build_timer); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR_AND_CHECK_SPAN( child(0)->get_next_after_projects( @@ -471,6 +472,7 @@ void VSetOperationNode::add_result_columns(RowRefListWithFlags& va template Status VSetOperationNode::sink_probe(RuntimeState* /*state*/, int child_id, Block* block, bool eos) { + SCOPED_TIMER(_probe_timer); CHECK(_build_finished) << "cannot sink probe data before build finished"; if (child_id > 1) { CHECK(_probe_finished_children_index[child_id - 1]) diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h index 1e8b5e1ebe..8b8d10e9e3 100644 --- a/be/src/vec/exec/vset_operation_node.h +++ b/be/src/vec/exec/vset_operation_node.h @@ -110,6 +110,7 @@ private: MutableBlock _mutable_block; RuntimeProfile::Counter* _build_timer; // time to build hash table RuntimeProfile::Counter* _probe_timer; // time to probe + RuntimeProfile::Counter* _pull_timer; // time to pull data template friend struct HashTableBuild;