From fe5c4e3b465ff4055d979ba2624c86526428fa45 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 23 Nov 2023 00:24:32 +0800 Subject: [PATCH] [pipelineX](api) Fix core dump for pipelineX API (#27437) --- be/src/pipeline/exec/operator.h | 10 ------- .../exec/partition_sort_sink_operator.cpp | 2 -- .../exec/partition_sort_sink_operator.h | 2 -- .../exec/partition_sort_source_operator.cpp | 2 -- .../exec/partition_sort_source_operator.h | 4 +-- be/src/pipeline/pipeline_task.cpp | 13 --------- be/src/pipeline/pipeline_task.h | 5 +--- be/src/pipeline/pipeline_x/operator.h | 4 --- .../pipeline_x_fragment_context.cpp | 2 +- .../pipeline/pipeline_x/pipeline_x_task.cpp | 25 +++++++++-------- be/src/pipeline/pipeline_x/pipeline_x_task.h | 13 ++++----- be/src/pipeline/task_scheduler.cpp | 28 +++++++------------ 12 files changed, 31 insertions(+), 79 deletions(-) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 5362be88e8..d733064875 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -241,12 +241,6 @@ public: virtual Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) = 0; - virtual Status finalize(RuntimeState* state) { - std::stringstream error_msg; - error_msg << " not a sink, can not finalize"; - return Status::NotSupported(error_msg.str()); - } - /** * pending_finish means we have called `close` and there are still some work to do before finishing. * Now it is a pull-based pipeline and operators pull data from its child by this method. @@ -323,8 +317,6 @@ public: return Status::OK(); } - Status finalize(RuntimeState* state) override { return Status::OK(); } - [[nodiscard]] RuntimeProfile* get_runtime_profile() const override { return _sink->profile(); } void set_query_statistics(std::shared_ptr statistics) override { _sink->set_query_statistics(statistics); @@ -391,8 +383,6 @@ public: return Status::OK(); } - Status finalize(RuntimeState* state) override { return Status::OK(); } - bool can_read() override { return _node->can_read(); } [[nodiscard]] RuntimeProfile* get_runtime_profile() const override { diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index d7a2bc4c92..327d186d77 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -42,8 +42,6 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _agg_arena_pool = std::make_unique(); _hash_table_size_counter = ADD_COUNTER(_profile, "HashTableSize", TUnit::UNIT); _build_timer = ADD_TIMER(_profile, "HashTableBuildTime"); - _partition_sort_timer = ADD_TIMER(_profile, "PartitionSortTime"); - _get_sorted_timer = ADD_TIMER(_profile, "GetSortedTime"); _selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime"); _emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime"); _init_hash_method(); diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index dbfb7ad5d0..62bf7c16b4 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -84,8 +84,6 @@ private: RuntimeProfile::Counter* _build_timer; RuntimeProfile::Counter* _emplace_key_timer; - RuntimeProfile::Counter* _partition_sort_timer; - RuntimeProfile::Counter* _get_sorted_timer; RuntimeProfile::Counter* _selector_block_timer; RuntimeProfile::Counter* _hash_table_size_counter; diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp index 0a2f28b19f..56db3852bc 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp @@ -33,7 +33,6 @@ Status PartitionSortSourceLocalState::init(RuntimeState* state, LocalStateInfo& RETURN_IF_ERROR(PipelineXLocalState::init(state, info)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); - _get_next_timer = ADD_TIMER(profile(), "GetResultTime"); _get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime"); _shared_state->previous_row = std::make_unique(); return Status::OK(); @@ -44,7 +43,6 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: RETURN_IF_CANCELLED(state); auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - SCOPED_TIMER(local_state._get_next_timer); output_block->clear_column_data(); { std::lock_guard lock(local_state._shared_state->buffer_mutex); diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h b/be/src/pipeline/exec/partition_sort_source_operator.h index df22bde636..719f6c1388 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.h +++ b/be/src/pipeline/exec/partition_sort_source_operator.h @@ -83,15 +83,13 @@ public: using Base = PipelineXLocalState; PartitionSortSourceLocalState(RuntimeState* state, OperatorXBase* parent) : PipelineXLocalState(state, parent), - _get_sorted_timer(nullptr), - _get_next_timer(nullptr) {} + _get_sorted_timer(nullptr) {} Status init(RuntimeState* state, LocalStateInfo& info) override; private: friend class PartitionSortSourceOperatorX; RuntimeProfile::Counter* _get_sorted_timer; - RuntimeProfile::Counter* _get_next_timer; std::atomic _sort_idx = 0; }; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 72552b4846..7d9900f637 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -113,7 +113,6 @@ void PipelineTask::_init_profile() { _get_block_timer = ADD_CHILD_TIMER(_task_profile, "GetBlockTime", exec_time); _get_block_counter = ADD_COUNTER(_task_profile, "GetBlockCounter", TUnit::UNIT); _sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time); - _finalize_timer = ADD_CHILD_TIMER(_task_profile, "FinalizeTime", exec_time); _close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time); _wait_source_timer = ADD_TIMER(_task_profile, "WaitSourceTime"); @@ -320,18 +319,6 @@ Status PipelineTask::execute(bool* eos) { return Status::OK(); } -Status PipelineTask::finalize() { - SCOPED_TIMER(_task_profile->total_time_counter()); - SCOPED_CPU_TIMER(_task_cpu_timer); - Defer defer {[&]() { - if (_task_queue) { - _task_queue->update_statistics(this, _finalize_timer->value()); - } - }}; - SCOPED_TIMER(_finalize_timer); - return _sink->finalize(_state); -} - Status PipelineTask::_collect_query_statistics() { // The execnode tree of a fragment will be split into multiple pipelines, we only need to collect the root pipeline. if (_pipeline->is_root_pipeline()) { diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 0b26c9d215..4e9c2f8cc9 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -172,7 +172,7 @@ public: virtual bool sink_can_write() { return _sink->can_write() || _pipeline->_always_can_write; } - virtual Status finalize(); + virtual void finalize() {} PipelineFragmentContext* fragment_context() { return _fragment_context; } @@ -193,8 +193,6 @@ public: _previous_schedule_id = id; } - virtual void release_dependency() {} - bool has_dependency(); OperatorPtr get_root() { return _root; } @@ -315,7 +313,6 @@ protected: RuntimeProfile::Counter* _get_block_timer; RuntimeProfile::Counter* _get_block_counter; RuntimeProfile::Counter* _sink_timer; - RuntimeProfile::Counter* _finalize_timer; RuntimeProfile::Counter* _close_timer; RuntimeProfile::Counter* _block_counts; RuntimeProfile::Counter* _block_by_source_counts; diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 943d8c4670..3f9548099a 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -182,8 +182,6 @@ public: Status open(RuntimeState* state) override; - Status finalize(RuntimeState* state) override { return Status::OK(); } - [[nodiscard]] bool can_terminate_early() override { return false; } [[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { return false; } @@ -518,8 +516,6 @@ public: [[nodiscard]] std::string get_name() const override { return _name; } - Status finalize(RuntimeState* state) override { return Status::OK(); } - virtual bool should_dry_run(RuntimeState* state) { return false; } protected: diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index e47b55e491..e832124bf8 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -1044,7 +1044,7 @@ std::string PipelineXFragmentContext::debug_string() { for (size_t j = 0; j < _tasks.size(); j++) { fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j); for (size_t i = 0; i < _tasks[j].size(); i++) { - if (_tasks[j][i]->get_state() == PipelineTaskState::FINISHED) { + if (_tasks[j][i]->is_finished()) { continue; } fmt::format_to(debug_string_buffer, "Task {}: {}\n", i, _tasks[j][i]->debug_string()); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 703e4862f7..110faba5ca 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -156,7 +156,6 @@ void PipelineXTask::_init_profile() { _get_block_timer = ADD_CHILD_TIMER(_task_profile, "GetBlockTime", exec_time); _get_block_counter = ADD_COUNTER(_task_profile, "GetBlockCounter", TUnit::UNIT); _sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time); - _finalize_timer = ADD_CHILD_TIMER(_task_profile, "FinalizeTime", exec_time); _close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time); _wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime"); @@ -289,16 +288,14 @@ Status PipelineXTask::execute(bool* eos) { return Status::OK(); } -Status PipelineXTask::finalize() { - SCOPED_TIMER(_task_profile->total_time_counter()); - SCOPED_CPU_TIMER(_task_cpu_timer); - Defer defer {[&]() { - if (_task_queue) { - _task_queue->update_statistics(this, _finalize_timer->value()); - } - }}; - SCOPED_TIMER(_finalize_timer); - return _sink->finalize(_state); +void PipelineXTask::finalize() { + PipelineTask::finalize(); + std::unique_lock lc(_release_lock); + _finished = true; + std::vector {}.swap(_downstream_dependency); + DependencyMap {}.swap(_upstream_dependency); + + _local_exchange_state = nullptr; } Status PipelineXTask::try_close(Status exec_status) { @@ -338,6 +335,10 @@ Status PipelineXTask::close(Status exec_status) { } std::string PipelineXTask::debug_string() { + std::unique_lock lc(_release_lock); + if (_finished) { + return "ALREADY FINISHED"; + } fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id())); @@ -374,7 +375,7 @@ std::string PipelineXTask::debug_string() { fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n"); for (size_t j = 0; j < _finish_dependencies.size(); j++, i++) { fmt::format_to(debug_string_buffer, "{}. {}\n", i, - _finish_dependencies[i]->debug_string(j + 1)); + _finish_dependencies[j]->debug_string(j + 1)); } return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 46d006d0da..621773b2ac 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -84,7 +84,9 @@ public: bool sink_can_write() override { return _write_blocked_dependency() == nullptr; } - Status finalize() override; + void finalize() override; + + bool is_finished() const { return _finished.load(); } std::string debug_string() override; @@ -103,13 +105,6 @@ public: } } - void release_dependency() override { - std::vector {}.swap(_downstream_dependency); - DependencyMap {}.swap(_upstream_dependency); - - _local_exchange_state = nullptr; - } - std::vector& get_upstream_dependency(int id) { if (_upstream_dependency.find(id) == _upstream_dependency.end()) { _upstream_dependency.insert({id, {DependencySPtr {}}}); @@ -212,6 +207,8 @@ private: Dependency* _blocked_dep {nullptr}; std::atomic _use_blocking_queue {true}; + std::atomic _finished {false}; + std::mutex _release_lock; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 5470ba2259..cc6e502fb5 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -313,23 +313,15 @@ void TaskScheduler::_do_work(size_t index) { task->set_eos_time(); // TODO: pipeline parallel need to wait the last task finish to call finalize // and find_p_dependency - status = task->finalize(); - if (!status.ok()) { - // execute failed,cancel all fragment - fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, - "finalize fail:" + status.msg()); - } else { - VLOG_DEBUG << fmt::format( - "Try close task: {}, fragment_ctx->is_canceled(): {}", - PrintInstanceStandardInfo( - task->query_context()->query_id(), - task->fragment_context()->get_fragment_instance_id()), - fragment_ctx->is_canceled()); - _try_close_task(task, - fragment_ctx->is_canceled() ? PipelineTaskState::CANCELED - : PipelineTaskState::FINISHED, - status); - } + VLOG_DEBUG << fmt::format( + "Try close task: {}, fragment_ctx->is_canceled(): {}", + PrintInstanceStandardInfo(task->query_context()->query_id(), + task->fragment_context()->get_fragment_instance_id()), + fragment_ctx->is_canceled()); + _try_close_task(task, + fragment_ctx->is_canceled() ? PipelineTaskState::CANCELED + : PipelineTaskState::FINISHED, + status); VLOG_DEBUG << fmt::format( "Task {} is eos, status {}.", PrintInstanceStandardInfo(task->query_context()->query_id(), @@ -388,7 +380,7 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state, } task->set_state(state); task->set_close_pipeline_time(); - task->release_dependency(); + task->finalize(); task->set_running(false); // close_a_pipeline may delete fragment context and will core in some defer // code, because the defer code will access fragment context it self.