[pipelineX](api) Fix core dump for pipelineX API (#27437)
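The core dump appears to come from tearing a task down while something else was still reading it: releasing the dependency containers could race with a concurrent `debug_string()` that walks them. The patch drops the `finalize(RuntimeState*)` hook from the operator API (the overrides visible in this diff are all `return Status::OK();` stubs), turns `PipelineTask::finalize()` into an empty `void` hook, and folds dependency release into `PipelineXTask::finalize()`, guarded by a mutex and a finished flag that `debug_string()` checks first. A condensed sketch of the new shape, simplified from the hunks below (only the members relevant here are shown):

    // Condensed illustration of the new shape; not the full class definitions.
    class PipelineXTask : public PipelineTask {
    public:
        // finalize() no longer returns Status and no longer drives _sink->finalize();
        // it is now the single place where dependency containers are torn down.
        void finalize() override {
            PipelineTask::finalize();                       // base hook is now an empty no-op
            std::unique_lock<std::mutex> lc(_release_lock); // serialize with debug_string()
            _finished = true;
            std::vector<DependencySPtr> {}.swap(_downstream_dependency);
            DependencyMap {}.swap(_upstream_dependency);
            _local_exchange_state = nullptr;
        }

        std::string debug_string() override {
            std::unique_lock<std::mutex> lc(_release_lock);
            if (_finished) {
                return "ALREADY FINISHED"; // never touch dependencies finalize() already released
            }
            // ... format dependency information as before ...
            return "...";
        }

    private:
        std::atomic<bool> _finished {false};
        std::mutex _release_lock;
    };

With that in place, the scheduler calls `task->finalize()` from `_try_close_task()` instead of `task->release_dependency()`, so teardown and introspection are serialized on the same lock.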
@@ -241,12 +241,6 @@ public:
     virtual Status sink(RuntimeState* state, vectorized::Block* block,
                         SourceState source_state) = 0;
 
-    virtual Status finalize(RuntimeState* state) {
-        std::stringstream error_msg;
-        error_msg << " not a sink, can not finalize";
-        return Status::NotSupported(error_msg.str());
-    }
-
     /**
      * pending_finish means we have called `close` and there are still some work to do before finishing.
      * Now it is a pull-based pipeline and operators pull data from its child by this method.
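As an aside, the pull-based contract the comment describes can be pictured with a small driver sketch. Only `sink()`'s signature comes from the hunk above; the source-side call and the enum values are assumptions for illustration and not part of this patch:

    // Hypothetical driver step: pull a block from the source-side operator and
    // push it into the sink.
    Status drive_once(RuntimeState* state, OperatorPtr source, OperatorPtr sink_op) {
        vectorized::Block block;
        SourceState source_state = SourceState::DEPEND_ON_SOURCE;           // assumed initial state
        RETURN_IF_ERROR(source->get_block(state, &block, source_state));     // assumed source-side signature
        return sink_op->sink(state, &block, source_state);                   // signature shown above
    }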
@@ -323,8 +317,6 @@ public:
         return Status::OK();
     }
 
-    Status finalize(RuntimeState* state) override { return Status::OK(); }
-
     [[nodiscard]] RuntimeProfile* get_runtime_profile() const override { return _sink->profile(); }
     void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) override {
         _sink->set_query_statistics(statistics);
@@ -391,8 +383,6 @@ public:
         return Status::OK();
     }
 
-    Status finalize(RuntimeState* state) override { return Status::OK(); }
-
     bool can_read() override { return _node->can_read(); }
 
     [[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
@@ -42,8 +42,6 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
    _agg_arena_pool = std::make_unique<vectorized::Arena>();
    _hash_table_size_counter = ADD_COUNTER(_profile, "HashTableSize", TUnit::UNIT);
    _build_timer = ADD_TIMER(_profile, "HashTableBuildTime");
    _partition_sort_timer = ADD_TIMER(_profile, "PartitionSortTime");
    _get_sorted_timer = ADD_TIMER(_profile, "GetSortedTime");
    _selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime");
    _emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime");
    _init_hash_method();
@@ -84,8 +84,6 @@ private:

    RuntimeProfile::Counter* _build_timer;
    RuntimeProfile::Counter* _emplace_key_timer;
    RuntimeProfile::Counter* _partition_sort_timer;
    RuntimeProfile::Counter* _get_sorted_timer;
    RuntimeProfile::Counter* _selector_block_timer;

    RuntimeProfile::Counter* _hash_table_size_counter;
@@ -33,7 +33,6 @@ Status PartitionSortSourceLocalState::init(RuntimeState* state, LocalStateInfo&
     RETURN_IF_ERROR(PipelineXLocalState<PartitionSortSourceDependency>::init(state, info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
-    _get_next_timer = ADD_TIMER(profile(), "GetResultTime");
     _get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime");
     _shared_state->previous_row = std::make_unique<vectorized::SortCursorCmp>();
     return Status::OK();
@@ -44,7 +43,6 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
     RETURN_IF_CANCELLED(state);
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    SCOPED_TIMER(local_state._get_next_timer);
     output_block->clear_column_data();
     {
         std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex);
@@ -83,15 +83,13 @@ public:
     using Base = PipelineXLocalState<PartitionSortSourceDependency>;
     PartitionSortSourceLocalState(RuntimeState* state, OperatorXBase* parent)
             : PipelineXLocalState<PartitionSortSourceDependency>(state, parent),
-              _get_sorted_timer(nullptr),
-              _get_next_timer(nullptr) {}
+              _get_sorted_timer(nullptr) {}
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
 
 private:
     friend class PartitionSortSourceOperatorX;
     RuntimeProfile::Counter* _get_sorted_timer;
-    RuntimeProfile::Counter* _get_next_timer;
     std::atomic<int> _sort_idx = 0;
 };
 
@@ -113,7 +113,6 @@ void PipelineTask::_init_profile() {
     _get_block_timer = ADD_CHILD_TIMER(_task_profile, "GetBlockTime", exec_time);
     _get_block_counter = ADD_COUNTER(_task_profile, "GetBlockCounter", TUnit::UNIT);
     _sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time);
-    _finalize_timer = ADD_CHILD_TIMER(_task_profile, "FinalizeTime", exec_time);
     _close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time);
 
     _wait_source_timer = ADD_TIMER(_task_profile, "WaitSourceTime");
@@ -320,18 +319,6 @@ Status PipelineTask::execute(bool* eos) {
     return Status::OK();
 }
 
-Status PipelineTask::finalize() {
-    SCOPED_TIMER(_task_profile->total_time_counter());
-    SCOPED_CPU_TIMER(_task_cpu_timer);
-    Defer defer {[&]() {
-        if (_task_queue) {
-            _task_queue->update_statistics(this, _finalize_timer->value());
-        }
-    }};
-    SCOPED_TIMER(_finalize_timer);
-    return _sink->finalize(_state);
-}
-
 Status PipelineTask::_collect_query_statistics() {
     // The execnode tree of a fragment will be split into multiple pipelines, we only need to collect the root pipeline.
     if (_pipeline->is_root_pipeline()) {
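The comment above is the whole invariant: a fragment's ExecNode tree is split across several pipelines, and only the root pipeline reports statistics so nothing is counted twice. Roughly (a sketch with the body elided, not the full implementation):

    Status PipelineTask::_collect_query_statistics() {
        // Only the root pipeline of the fragment reports, so the shared node
        // tree is not merged once per pipeline.
        if (_pipeline->is_root_pipeline()) {
            // ... merge operator/sink counters into the query statistics ...
        }
        return Status::OK();
    }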
@@ -172,7 +172,7 @@ public:
 
     virtual bool sink_can_write() { return _sink->can_write() || _pipeline->_always_can_write; }
 
-    virtual Status finalize();
+    virtual void finalize() {}
 
     PipelineFragmentContext* fragment_context() { return _fragment_context; }
 
@@ -193,8 +193,6 @@ public:
         _previous_schedule_id = id;
     }
 
-    virtual void release_dependency() {}
-
     bool has_dependency();
 
     OperatorPtr get_root() { return _root; }
@@ -315,7 +313,6 @@ protected:
     RuntimeProfile::Counter* _get_block_timer;
     RuntimeProfile::Counter* _get_block_counter;
     RuntimeProfile::Counter* _sink_timer;
-    RuntimeProfile::Counter* _finalize_timer;
     RuntimeProfile::Counter* _close_timer;
     RuntimeProfile::Counter* _block_counts;
     RuntimeProfile::Counter* _block_by_source_counts;
@@ -182,8 +182,6 @@ public:
 
     Status open(RuntimeState* state) override;
 
-    Status finalize(RuntimeState* state) override { return Status::OK(); }
-
     [[nodiscard]] bool can_terminate_early() override { return false; }
 
     [[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { return false; }
@@ -518,8 +516,6 @@ public:
 
     [[nodiscard]] std::string get_name() const override { return _name; }
 
-    Status finalize(RuntimeState* state) override { return Status::OK(); }
-
     virtual bool should_dry_run(RuntimeState* state) { return false; }
 
 protected:
@@ -1044,7 +1044,7 @@ std::string PipelineXFragmentContext::debug_string() {
     for (size_t j = 0; j < _tasks.size(); j++) {
         fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
         for (size_t i = 0; i < _tasks[j].size(); i++) {
-            if (_tasks[j][i]->get_state() == PipelineTaskState::FINISHED) {
+            if (_tasks[j][i]->is_finished()) {
                 continue;
             }
             fmt::format_to(debug_string_buffer, "Task {}: {}\n", i, _tasks[j][i]->debug_string());
@@ -156,7 +156,6 @@ void PipelineXTask::_init_profile() {
     _get_block_timer = ADD_CHILD_TIMER(_task_profile, "GetBlockTime", exec_time);
     _get_block_counter = ADD_COUNTER(_task_profile, "GetBlockCounter", TUnit::UNIT);
     _sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time);
-    _finalize_timer = ADD_CHILD_TIMER(_task_profile, "FinalizeTime", exec_time);
     _close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time);
 
     _wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime");
@@ -289,16 +288,14 @@ Status PipelineXTask::execute(bool* eos) {
     return Status::OK();
 }
 
-Status PipelineXTask::finalize() {
-    SCOPED_TIMER(_task_profile->total_time_counter());
-    SCOPED_CPU_TIMER(_task_cpu_timer);
-    Defer defer {[&]() {
-        if (_task_queue) {
-            _task_queue->update_statistics(this, _finalize_timer->value());
-        }
-    }};
-    SCOPED_TIMER(_finalize_timer);
-    return _sink->finalize(_state);
+void PipelineXTask::finalize() {
+    PipelineTask::finalize();
+    std::unique_lock<std::mutex> lc(_release_lock);
+    _finished = true;
+    std::vector<DependencySPtr> {}.swap(_downstream_dependency);
+    DependencyMap {}.swap(_upstream_dependency);
+
+    _local_exchange_state = nullptr;
 }
 
 Status PipelineXTask::try_close(Status exec_status) {
@@ -338,6 +335,10 @@ Status PipelineXTask::close(Status exec_status) {
 }
 
 std::string PipelineXTask::debug_string() {
+    std::unique_lock<std::mutex> lc(_release_lock);
+    if (_finished) {
+        return "ALREADY FINISHED";
+    }
     fmt::memory_buffer debug_string_buffer;
 
     fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id()));
@@ -374,7 +375,7 @@ std::string PipelineXTask::debug_string() {
     fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n");
     for (size_t j = 0; j < _finish_dependencies.size(); j++, i++) {
         fmt::format_to(debug_string_buffer, "{}. {}\n", i,
-                       _finish_dependencies[i]->debug_string(j + 1));
+                       _finish_dependencies[j]->debug_string(j + 1));
     }
     return fmt::to_string(debug_string_buffer);
 }
@@ -84,7 +84,9 @@ public:
 
     bool sink_can_write() override { return _write_blocked_dependency() == nullptr; }
 
-    Status finalize() override;
+    void finalize() override;
+
+    bool is_finished() const { return _finished.load(); }
 
     std::string debug_string() override;
 
@@ -103,13 +105,6 @@ public:
         }
     }
 
-    void release_dependency() override {
-        std::vector<DependencySPtr> {}.swap(_downstream_dependency);
-        DependencyMap {}.swap(_upstream_dependency);
-
-        _local_exchange_state = nullptr;
-    }
-
     std::vector<DependencySPtr>& get_upstream_dependency(int id) {
         if (_upstream_dependency.find(id) == _upstream_dependency.end()) {
             _upstream_dependency.insert({id, {DependencySPtr {}}});
@@ -212,6 +207,8 @@ private:
     Dependency* _blocked_dep {nullptr};
 
     std::atomic<bool> _use_blocking_queue {true};
+    std::atomic<bool> _finished {false};
+    std::mutex _release_lock;
 };
 
 } // namespace doris::pipeline
@@ -313,23 +313,15 @@ void TaskScheduler::_do_work(size_t index) {
                 task->set_eos_time();
                 // TODO: pipeline parallel need to wait the last task finish to call finalize
                 // and find_p_dependency
-                status = task->finalize();
-                if (!status.ok()) {
-                    // execute failed,cancel all fragment
-                    fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
-                                         "finalize fail:" + status.msg());
-                } else {
-                    VLOG_DEBUG << fmt::format(
-                            "Try close task: {}, fragment_ctx->is_canceled(): {}",
-                            PrintInstanceStandardInfo(
-                                    task->query_context()->query_id(),
-                                    task->fragment_context()->get_fragment_instance_id()),
-                            fragment_ctx->is_canceled());
-                    _try_close_task(task,
-                                    fragment_ctx->is_canceled() ? PipelineTaskState::CANCELED
-                                                                : PipelineTaskState::FINISHED,
-                                    status);
-                }
+                VLOG_DEBUG << fmt::format(
+                        "Try close task: {}, fragment_ctx->is_canceled(): {}",
+                        PrintInstanceStandardInfo(task->query_context()->query_id(),
+                                                  task->fragment_context()->get_fragment_instance_id()),
+                        fragment_ctx->is_canceled());
+                _try_close_task(task,
+                                fragment_ctx->is_canceled() ? PipelineTaskState::CANCELED
+                                                            : PipelineTaskState::FINISHED,
+                                status);
                 VLOG_DEBUG << fmt::format(
                         "Task {} is eos, status {}.",
                         PrintInstanceStandardInfo(task->query_context()->query_id(),
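The TODO above ("pipeline parallel need to wait the last task finish to call finalize") is about pipeline parallelism: one pipeline runs as several tasks, and work that is shared per pipeline should only run once, when the last of them reaches eos. A hypothetical illustration of that idea, not part of this patch (the helper, its names, and the counter are made up):

    #include <atomic>
    #include <functional>

    // Hypothetical helper: run shared per-pipeline teardown exactly once, when
    // the last parallel task of the pipeline reports eos.
    struct LastTaskGate {
        std::atomic<int> remaining; // initialized to the pipeline's task parallelism

        void on_task_eos(const std::function<void()>& shared_teardown) {
            if (remaining.fetch_sub(1) == 1) { // this was the last task
                shared_teardown();             // e.g. finalize-like work, find_p_dependency
            }
        }
    };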
@@ -388,7 +380,7 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
     }
     task->set_state(state);
     task->set_close_pipeline_time();
-    task->release_dependency();
+    task->finalize();
     task->set_running(false);
     // close_a_pipeline may delete fragment context and will core in some defer
     // code, because the defer code will access fragment context it self.
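Combined with the comment above, the ordering in `_try_close_task` is the point of the change: `task->finalize()` (which, for PipelineXTask, sets `_finished` and releases the dependency containers under `_release_lock`) now runs before `set_running(false)` and before close_a_pipeline can delete the fragment context, so a late `debug_string()` hits the finished guard instead of freed state. Condensed, the tail of the function now reads (surrounding code and arguments elided, taken from the hunk above):

    task->set_state(state);
    task->set_close_pipeline_time();
    task->finalize();          // PipelineXTask: mark _finished, drop dependencies under _release_lock
    task->set_running(false);
    // close_a_pipeline may delete the fragment context after this point, so deferred
    // code (including debug_string()) must not rely on it or on the dependencies.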