[refactor](callonce) remove std::call_once usage in pipeline task to get a full stack trace on core dump (#27331)

---------

Co-authored-by: yiguolei <yiguolei@gmail.com>
This commit is contained in:
yiguolei
2023-11-21 19:42:39 +08:00
committed by GitHub
parent dea40e7095
commit ffd4face00
5 changed files with 22 additions and 8 deletions

View File

@ -713,7 +713,7 @@ Status PipelineFragmentContext::submit() {
if (!st.ok()) {
std::lock_guard<std::mutex> l(_task_mutex);
if (_closed_tasks == _total_tasks) {
std::call_once(_close_once_flag, [this] { _close_action(); });
_close_fragment_instance();
}
return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
BackendOptions::get_localhost());
@ -851,7 +851,13 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
return _root_pipeline->set_sink(sink_);
}
void PipelineFragmentContext::_close_action() {
// If all pipeline tasks bound to the fragment instance are finished, then we can
// close the fragment instance.
void PipelineFragmentContext::_close_fragment_instance() {
if (_is_fragment_instance_closed) {
return;
}
Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
_runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
static_cast<void>(send_report(true));
// all submitted tasks done
@ -862,7 +868,7 @@ void PipelineFragmentContext::close_a_pipeline() {
std::lock_guard<std::mutex> l(_task_mutex);
++_closed_tasks;
if (_closed_tasks == _total_tasks) {
std::call_once(_close_once_flag, [this] { _close_action(); });
_close_fragment_instance();
}
}

View File

@ -153,7 +153,7 @@ protected:
virtual Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request);
template <bool is_intersect>
Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr);
virtual void _close_action();
virtual void _close_fragment_instance();
void _init_next_report_time();
void _set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; }
@ -205,7 +205,7 @@ protected:
RuntimeProfile::Counter* _prepare_timer;
std::function<void(RuntimeState*, Status*)> _call_back;
std::once_flag _close_once_flag;
bool _is_fragment_instance_closed = false;
// If this is set to false, and '_is_report_success' is false as well,
// this executor will not report status to FE on being cancelled.

View File

@ -939,7 +939,7 @@ Status PipelineXFragmentContext::submit() {
if (!st.ok()) {
std::lock_guard<std::mutex> l(_task_mutex);
if (_closed_tasks == _total_tasks) {
std::call_once(_close_once_flag, [this] { _close_action(); });
_close_fragment_instance();
}
return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
BackendOptions::get_localhost());
@ -969,7 +969,11 @@ void PipelineXFragmentContext::close_if_prepare_failed() {
}
}
void PipelineXFragmentContext::_close_action() {
void PipelineXFragmentContext::_close_fragment_instance() {
if (_is_fragment_instance_closed) {
return;
}
Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
_runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
static_cast<void>(send_report(true));
// all submitted tasks done

View File

@ -118,7 +118,7 @@ public:
std::string debug_string() override;
private:
void _close_action() override;
void _close_fragment_instance() override;
Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override;
Status _add_local_exchange(ObjectPool* pool, OperatorXPtr& op, PipelinePtr& cur_pipe,
const std::vector<TExpr>& texprs);

View File

@ -390,6 +390,10 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
task->set_close_pipeline_time();
task->release_dependency();
task->set_running(false);
// close_a_pipeline may delete the fragment context, which would cause a core dump in
// deferred code, because that deferred code accesses the fragment context itself.
std::shared_ptr<PipelineFragmentContext> lock_for_context =
task->fragment_context()->shared_from_this();
task->fragment_context()->close_a_pipeline();
}