[refactor](runtime) tidy up the plan_fragment_executor code (#8110)

Co-authored-by: zuochunwei <zuochunwei@meituan.com>
Author: zuochunwei
Date: 2022-02-22 09:20:27 +08:00
Committed by: GitHub
parent 0d5b297cad
commit d0ee101c2f
2 changed files with 13 additions and 14 deletions


@@ -62,9 +62,7 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
_collect_query_statistics_with_every_batch(false) {}
PlanFragmentExecutor::~PlanFragmentExecutor() {
// if (_prepared) {
close();
// }
// at this point, the report thread should have been stopped
DCHECK(!_report_thread_active);
}
@@ -198,7 +196,6 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
RETURN_IF_ERROR(_sink->prepare(runtime_state()));
RuntimeProfile* sink_profile = _sink->profile();
if (sink_profile != nullptr) {
profile()->add_child(sink_profile, true, nullptr);
}
@@ -279,8 +276,10 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
SCOPED_CPU_TIMER(_fragment_cpu_timer);
RETURN_IF_ERROR(_sink->open(runtime_state()));
}
doris::vectorized::Block* block = nullptr;
while (true) {
doris::vectorized::Block* block;
{
SCOPED_CPU_TIMER(_fragment_cpu_timer);
RETURN_IF_ERROR(get_vectorized_internal(&block));
@@ -331,11 +330,10 @@ Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block*
return Status::OK();
}
auto vexec_node = static_cast<doris::ExecNode*>(_plan);
while (!_done) {
_block->clear_column_data(vexec_node->row_desc().num_materialized_slots());
_block->clear_column_data(_plan->row_desc().num_materialized_slots());
SCOPED_TIMER(profile()->total_time_counter());
RETURN_IF_ERROR(vexec_node->get_next(_runtime_state.get(), _block.get(), &_done));
RETURN_IF_ERROR(_plan->get_next(_runtime_state.get(), _block.get(), &_done));
if (_block->rows() > 0) {
COUNTER_UPDATE(_rows_produced_counter, _block->rows());
@@ -473,8 +471,7 @@ void PlanFragmentExecutor::report_profile() {
// two cases (e.g. there is a race here where the wait timed out but before grabbing
// the lock, the condition variable was signaled). Instead, we will use an external
// flag, _report_thread_active, to coordinate this.
_stop_report_thread_cv.wait_for(l,
std::chrono::seconds(config::status_report_interval));
_stop_report_thread_cv.wait_for(l, std::chrono::seconds(config::status_report_interval));
} else {
LOG(WARNING) << "config::status_report_interval is equal to or less than zero, exiting "
"reporting thread.";
@@ -617,11 +614,13 @@ void PlanFragmentExecutor::cancel() {
_runtime_state->set_is_cancelled(true);
// must close stream_mgr to avoid dead lock in Exchange Node
auto env = _runtime_state->exec_env();
auto id = _runtime_state->fragment_instance_id();
if (_runtime_state->enable_vectorized_exec()) {
_runtime_state->exec_env()->vstream_mgr()->cancel(_runtime_state->fragment_instance_id());
env->vstream_mgr()->cancel(id);
} else {
_runtime_state->exec_env()->stream_mgr()->cancel(_runtime_state->fragment_instance_id());
_runtime_state->exec_env()->result_mgr()->cancel(_runtime_state->fragment_instance_id());
env->stream_mgr()->cancel(id);
env->result_mgr()->cancel(id);
}
}
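
Regarding the "avoid dead lock in Exchange Node" comment: an exchange receiver typically blocks waiting for incoming batches, so cancellation has to wake that waiter in addition to setting the cancelled flag. A simplified, hypothetical sketch of that shape (not the Doris stream manager API):

    #include <condition_variable>
    #include <mutex>

    // Sketch only: a blocking receiver whose cancel() wakes any thread
    // stuck in get_batch(); without the notify, a cancelled fragment
    // could wait forever for data that will never arrive.
    class StreamReceiver {
    public:
        // Returns true if data arrived, false if the stream was cancelled.
        bool get_batch() {
            std::unique_lock<std::mutex> l(_mu);
            _cv.wait(l, [&] { return _has_data || _cancelled; });
            return !_cancelled;
        }
        void add_batch() {
            {
                std::lock_guard<std::mutex> g(_mu);
                _has_data = true;
            }
            _cv.notify_all();
        }
        void cancel() {
            {
                std::lock_guard<std::mutex> g(_mu);
                _cancelled = true;
            }
            _cv.notify_all();
        }
    private:
        std::mutex _mu;
        std::condition_variable _cv;
        bool _has_data = false;
        bool _cancelled = false;
    };
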


@@ -138,7 +138,7 @@ public:
const Status& status() const { return _status; }
DataSink* get_sink() { return _sink.get(); }
DataSink* get_sink() const { return _sink.get(); }
void set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; }
@@ -245,7 +245,7 @@ private:
// Idempotent.
void stop_report_thread();
const DescriptorTbl& desc_tbl() { return _runtime_state->desc_tbl(); }
const DescriptorTbl& desc_tbl() const { return _runtime_state->desc_tbl(); }
void _collect_query_statistics();