diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 021b05fa5d..3966c8d952 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -62,9 +62,7 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env, _collect_query_statistics_with_every_batch(false) {} PlanFragmentExecutor::~PlanFragmentExecutor() { - // if (_prepared) { close(); - // } // at this point, the report thread should have been stopped DCHECK(!_report_thread_active); } @@ -198,7 +196,6 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, RETURN_IF_ERROR(_sink->prepare(runtime_state())); RuntimeProfile* sink_profile = _sink->profile(); - if (sink_profile != nullptr) { profile()->add_child(sink_profile, true, nullptr); } @@ -279,8 +276,10 @@ Status PlanFragmentExecutor::open_vectorized_internal() { SCOPED_CPU_TIMER(_fragment_cpu_timer); RETURN_IF_ERROR(_sink->open(runtime_state())); } - doris::vectorized::Block* block = nullptr; + while (true) { + doris::vectorized::Block* block; + { SCOPED_CPU_TIMER(_fragment_cpu_timer); RETURN_IF_ERROR(get_vectorized_internal(&block)); @@ -331,11 +330,10 @@ Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block* return Status::OK(); } - auto vexec_node = static_cast(_plan); while (!_done) { - _block->clear_column_data(vexec_node->row_desc().num_materialized_slots()); + _block->clear_column_data(_plan->row_desc().num_materialized_slots()); SCOPED_TIMER(profile()->total_time_counter()); - RETURN_IF_ERROR(vexec_node->get_next(_runtime_state.get(), _block.get(), &_done)); + RETURN_IF_ERROR(_plan->get_next(_runtime_state.get(), _block.get(), &_done)); if (_block->rows() > 0) { COUNTER_UPDATE(_rows_produced_counter, _block->rows()); @@ -473,8 +471,7 @@ void PlanFragmentExecutor::report_profile() { // two cases (e.g. there is a race here where the wait timed out but before grabbing // the lock, the condition variable was signaled). Instead, we will use an external // flag, _report_thread_active, to coordinate this. - _stop_report_thread_cv.wait_for(l, - std::chrono::seconds(config::status_report_interval)); + _stop_report_thread_cv.wait_for(l, std::chrono::seconds(config::status_report_interval)); } else { LOG(WARNING) << "config::status_report_interval is equal to or less than zero, exiting " "reporting thread."; @@ -617,11 +614,13 @@ void PlanFragmentExecutor::cancel() { _runtime_state->set_is_cancelled(true); // must close stream_mgr to avoid dead lock in Exchange Node + auto env = _runtime_state->exec_env(); + auto id = _runtime_state->fragment_instance_id(); if (_runtime_state->enable_vectorized_exec()) { - _runtime_state->exec_env()->vstream_mgr()->cancel(_runtime_state->fragment_instance_id()); + env->vstream_mgr()->cancel(id); } else { - _runtime_state->exec_env()->stream_mgr()->cancel(_runtime_state->fragment_instance_id()); - _runtime_state->exec_env()->result_mgr()->cancel(_runtime_state->fragment_instance_id()); + env->stream_mgr()->cancel(id); + env->result_mgr()->cancel(id); } } diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index 3cdb6bb249..874daa64c2 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -138,7 +138,7 @@ public: const Status& status() const { return _status; } - DataSink* get_sink() { return _sink.get(); } + DataSink* get_sink() const { return _sink.get(); } void set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; } @@ -245,7 +245,7 @@ private: // Idempotent. void stop_report_thread(); - const DescriptorTbl& desc_tbl() { return _runtime_state->desc_tbl(); } + const DescriptorTbl& desc_tbl() const { return _runtime_state->desc_tbl(); } void _collect_query_statistics();