From fe184e322a90e680ebbbb951f0ba5a8299407c1a Mon Sep 17 00:00:00 2001 From: Mryange <59914473+Mryange@users.noreply.github.com> Date: Wed, 20 Dec 2023 11:45:06 +0800 Subject: [PATCH] [code](pipelineX) refine some pipelineX code (#28570) --- .../pipeline/exec/exchange_sink_operator.cpp | 14 ++--- be/src/pipeline/exec/exchange_sink_operator.h | 16 +++-- .../exec/exchange_source_operator.cpp | 14 ++--- .../pipeline/exec/exchange_source_operator.h | 7 ++- .../exec/multi_cast_data_stream_source.cpp | 10 +++- .../exec/multi_cast_data_stream_source.h | 3 + be/src/pipeline/exec/scan_operator.cpp | 14 ++++- be/src/pipeline/exec/scan_operator.h | 9 +++ be/src/pipeline/pipeline_x/operator.cpp | 60 ++++++++++--------- be/src/pipeline/pipeline_x/operator.h | 23 ++++--- .../pipeline_x_fragment_context.cpp | 7 ++- .../pipeline/pipeline_x/pipeline_x_task.cpp | 31 ++++++---- be/src/pipeline/pipeline_x/pipeline_x_task.h | 3 +- be/src/runtime/fragment_mgr.cpp | 1 + 14 files changed, 130 insertions(+), 82 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index e70be36b77..0dd1dda415 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -100,7 +100,7 @@ bool ExchangeSinkLocalState::transfer_large_data_by_brpc() const { } Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { - RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info)); + RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); _sender_id = info.sender_id; @@ -174,9 +174,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf id, p._dest_node_id, _sender_id, _state->be_number(), state->get_query_ctx()); register_channels(_sink_buffer.get()); - - _exchange_sink_dependency = AndDependency::create_shared( - _parent->operator_id(), _parent->node_id(), state->get_query_ctx()); + auto* _exchange_sink_dependency = _dependency; _queue_dependency = ExchangeSinkQueueDependency::create_shared( _parent->operator_id(), _parent->node_id(), state->get_query_ctx()); _sink_buffer->set_dependency(_queue_dependency, _finish_dependency); @@ -237,7 +235,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf } Status ExchangeSinkLocalState::open(RuntimeState* state) { - RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state)); + RETURN_IF_ERROR(Base::open(state)); auto& p = _parent->cast(); if (p._part_type == TPartitionType::HASH_PARTITIONED || p._part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { @@ -522,8 +520,7 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState* state, Status exec_status) std::string ExchangeSinkLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; - fmt::format_to(debug_string_buffer, "{}", - PipelineXSinkLocalState<>::debug_string(indentation_level)); + fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level)); fmt::format_to(debug_string_buffer, ", Sink Buffer: (_should_stop = {}, _busy_channels = {})", _sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load()); return fmt::to_string(debug_string_buffer); @@ -536,6 +533,7 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_close_timer); COUNTER_UPDATE(_wait_queue_timer, _queue_dependency->watcher_elapse_time()); + COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time()); if (_broadcast_dependency) { COUNTER_UPDATE(_wait_broadcast_buffer_timer, _broadcast_dependency->watcher_elapse_time()); } @@ -545,7 +543,7 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { } _sink_buffer->update_profile(profile()); _sink_buffer->close(); - return PipelineXSinkLocalState<>::close(state, exec_status); + return Base::close(state, exec_status); } } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index bc91e5dc19..766f43dc80 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -144,20 +144,25 @@ public: // TODO(gabriel): blocked by memory }; -class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { +class ExchangeSinkLocalState final : public PipelineXSinkLocalState { ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState); + using Base = PipelineXSinkLocalState; public: ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) - : PipelineXSinkLocalState<>(parent, state), + : Base(parent, state), current_channel_idx(0), only_local_exchange(false), - _serializer(this) {} + _serializer(this) { + _finish_dependency = std::make_shared( + parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY", + state->get_query_ctx()); + } Status init(RuntimeState* state, LocalSinkStateInfo& info) override; Status open(RuntimeState* state) override; Status close(RuntimeState* state, Status exec_status) override; - Dependency* dependency() override { return _exchange_sink_dependency.get(); } + Dependency* finishdependency() override { return _finish_dependency.get(); } Status serialize_block(vectorized::Block* src, PBlock* dest, int num_receivers = 1); void register_channels(pipeline::ExchangeSinkBuffer* buffer); Status get_next_available_buffer(vectorized::BroadcastPBlockHolder** holder); @@ -231,11 +236,12 @@ private: vectorized::BlockSerializer _serializer; std::shared_ptr _queue_dependency; - std::shared_ptr _exchange_sink_dependency; std::shared_ptr _broadcast_dependency; std::vector> _local_channels_dependency; std::unique_ptr _partitioner; int _partition_count; + + std::shared_ptr _finish_dependency; }; class ExchangeSinkOperatorX final : public DataSinkOperatorX { diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 41e57fbde7..99c9e8e9fc 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -41,12 +41,11 @@ bool ExchangeSourceOperator::is_pending_finish() const { } ExchangeLocalState::ExchangeLocalState(RuntimeState* state, OperatorXBase* parent) - : PipelineXLocalState<>(state, parent), num_rows_skipped(0), is_ready(false) {} + : Base(state, parent), num_rows_skipped(0), is_ready(false) {} std::string ExchangeLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; - fmt::format_to(debug_string_buffer, "{}", - PipelineXLocalState<>::debug_string(indentation_level)); + fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level)); fmt::format_to(debug_string_buffer, ", Queues: ("); const auto& queues = stream_recvr->sender_queues(); for (size_t i = 0; i < queues.size(); i++) { @@ -68,15 +67,14 @@ std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const { } Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { - RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info)); + RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); stream_recvr = state->exec_env()->vstream_mgr()->create_recvr( state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(), profile(), p.is_merging(), p.sub_plan_query_statistics_recvr()); - source_dependency = AndDependency::create_shared(_parent->operator_id(), _parent->node_id(), - state->get_query_ctx()); + auto* source_dependency = _dependency; const auto& queues = stream_recvr->sender_queues(); deps.resize(queues.size()); metrics.resize(queues.size()); @@ -101,7 +99,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { Status ExchangeLocalState::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); - RETURN_IF_ERROR(PipelineXLocalState<>::open(state)); + RETURN_IF_ERROR(Base::open(state)); return Status::OK(); } @@ -215,7 +213,7 @@ Status ExchangeLocalState::close(RuntimeState* state) { if (_parent->cast()._is_merging) { vsort_exec_exprs.close(state); } - return PipelineXLocalState<>::close(state); + return Base::close(state); } Status ExchangeSourceOperatorX::close(RuntimeState* state) { diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index 4921132188..a9848be23c 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -71,21 +71,22 @@ private: }; class ExchangeSourceOperatorX; -class ExchangeLocalState final : public PipelineXLocalState<> { +class ExchangeLocalState final : public PipelineXLocalState { ENABLE_FACTORY_CREATOR(ExchangeLocalState); + +public: + using Base = PipelineXLocalState; ExchangeLocalState(RuntimeState* state, OperatorXBase* parent); Status init(RuntimeState* state, LocalStateInfo& info) override; Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; - Dependency* dependency() override { return source_dependency.get(); } std::string debug_string(int indentation_level) const override; std::shared_ptr stream_recvr; doris::vectorized::VSortExecExprs vsort_exec_exprs; int64_t num_rows_skipped; bool is_ready; - std::shared_ptr source_dependency; std::vector> deps; std::vector metrics; diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index 4c73f6ecb2..a4f3ff55a5 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -128,9 +128,13 @@ RuntimeProfile* MultiCastDataStreamerSourceOperator::get_runtime_profile() const MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent), - vectorized::RuntimeFilterConsumer( - static_cast(parent)->dest_id_from_sink(), parent->runtime_filter_descs(), - static_cast(parent)->_row_desc(), _conjuncts) {}; + vectorized::RuntimeFilterConsumer(static_cast(parent)->dest_id_from_sink(), + parent->runtime_filter_descs(), + static_cast(parent)->_row_desc(), _conjuncts) { + _filter_dependency = std::make_shared( + parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY", + state->get_query_ctx()); +}; Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 86034a76ce..baeca2ca7b 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -120,8 +120,11 @@ public: friend class MultiCastDataStreamerSourceOperatorX; + RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); } + private: vectorized::VExprContextSPtrs _output_expr_contexts; + std::shared_ptr _filter_dependency; }; class MultiCastDataStreamerSourceOperatorX final diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index d8d3958a77..398765c8cc 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -100,7 +100,14 @@ std::string ScanOperator::debug_string() const { template ScanLocalState::ScanLocalState(RuntimeState* state, OperatorXBase* parent) - : ScanLocalStateBase(state, parent) {} + : ScanLocalStateBase(state, parent) { + _finish_dependency = std::make_shared( + parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY", + state->get_query_ctx()); + _filter_dependency = std::make_shared( + parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY", + state->get_query_ctx()); +} template bool ScanLocalState::ready_to_read() { @@ -1311,6 +1318,9 @@ Status ScanLocalState::_init_profile() { _max_scanner_thread_num = ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT); + _wait_for_finish_dependency_timer = + ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency"); + return Status::OK(); } @@ -1442,7 +1452,7 @@ Status ScanLocalState::close(RuntimeState* state) { _scanner_ctx->clear_and_join(reinterpret_cast(this), state); } COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time()); - + COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time()); return PipelineXLocalState<>::close(state); } diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 603f3804aa..78cb399427 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -171,6 +171,8 @@ protected: RuntimeProfile::Counter* _wait_for_scanner_done_timer = nullptr; // time of prefilter input block from scanner RuntimeProfile::Counter* _wait_for_eos_timer = nullptr; + + RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr; }; template @@ -211,6 +213,9 @@ class ScanLocalState : public ScanLocalStateBase { Dependency* dependency() override { return _scan_dependency.get(); } + RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); }; + Dependency* finishdependency() override { return _finish_dependency.get(); } + protected: template friend class ScanOperatorX; @@ -405,6 +410,10 @@ protected: std::atomic _eos = false; std::mutex _block_lock; + + std::shared_ptr _filter_dependency; + + std::shared_ptr _finish_dependency; }; template diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index a466b8b3b6..55fa103e19 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -308,25 +308,14 @@ Status OperatorX::setup_local_state(RuntimeState* state, LocalSt PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase* parent, RuntimeState* state) - : _parent(parent), - _state(state), - _finish_dependency(new FinishDependency(parent->operator_id(), parent->node_id(), - parent->get_name() + "_FINISH_DEPENDENCY", - state->get_query_ctx())) {} + : _parent(parent), _state(state) {} PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent) : _num_rows_returned(0), _rows_returned_counter(nullptr), _peak_memory_usage_counter(nullptr), _parent(parent), - _state(state), - _finish_dependency(new FinishDependency(parent->operator_id(), parent->node_id(), - parent->get_name() + "_FINISH_DEPENDENCY", - state->get_query_ctx())) { - _filter_dependency = std::make_shared( - parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY", - state->get_query_ctx()); -} + _state(state) {} template Status PipelineXLocalState::init(RuntimeState* state, LocalStateInfo& info) { @@ -334,22 +323,30 @@ Status PipelineXLocalState::init(RuntimeState* state, LocalState _runtime_profile->set_metadata(_parent->node_id()); _runtime_profile->set_is_sink(false); info.parent_profile->add_child(_runtime_profile.get(), true, nullptr); - _wait_for_finish_dependency_timer = - ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency"); + constexpr auto is_fake_shared = + std::is_same_v; _dependency = (DependencyType*)info.dependency.get(); if constexpr (!std::is_same_v) { + _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( + _runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1); auto& deps = info.upstream_dependencies; if constexpr (std::is_same_v) { _dependency->set_shared_state(info.le_state_map[_parent->operator_id()].first); - } else { + _shared_state = + (typename DependencyType::SharedState*)_dependency->shared_state().get(); + _shared_state->ref(); + + _shared_state->source_dep = _dependency; + _shared_state->sink_dep = deps.front().get(); + } else if constexpr (!is_fake_shared) { _dependency->set_shared_state(deps.front()->shared_state()); + _shared_state = + (typename DependencyType::SharedState*)_dependency->shared_state().get(); + _shared_state->ref(); + + _shared_state->source_dep = _dependency; + _shared_state->sink_dep = deps.front().get(); } - _shared_state = (typename DependencyType::SharedState*)_dependency->shared_state().get(); - _shared_state->ref(); - _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( - _runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1); - _shared_state->source_dep = _dependency; - _shared_state->sink_dep = deps.front().get(); } _conjuncts.resize(_parent->_conjuncts.size()); @@ -386,7 +383,6 @@ Status PipelineXLocalState::close(RuntimeState* state) { if constexpr (!std::is_same_v) { COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); } - COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time()); if (_rows_returned_counter != nullptr) { COUNTER_SET(_rows_returned_counter, _num_rows_returned); } @@ -405,6 +401,8 @@ Status PipelineXSinkLocalState::init(RuntimeState* state, _profile->set_metadata(_parent->node_id()); _profile->set_is_sink(true); _wait_for_finish_dependency_timer = ADD_TIMER(_profile, "PendingFinishDependency"); + constexpr auto is_fake_shared = + std::is_same_v; if constexpr (!std::is_same_v) { auto& deps = info.dependencys; _dependency = (DependencyType*)deps.front().get(); @@ -412,12 +410,18 @@ Status PipelineXSinkLocalState::init(RuntimeState* state, _dependency = info.le_state_map[_parent->dests_id().front()].second.get(); } if (_dependency) { - _shared_state = - (typename DependencyType::SharedState*)_dependency->shared_state().get(); + if constexpr (!is_fake_shared) { + _shared_state = + (typename DependencyType::SharedState*)_dependency->shared_state().get(); + } + _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( _profile, "WaitForDependency[" + _dependency->name() + "]Time", 1); } - _shared_state->ref(); + if constexpr (!is_fake_shared) { + _shared_state->ref(); + } + } else { auto& deps = info.dependencys; deps.front() = std::make_shared(0, 0, state->get_query_ctx()); @@ -446,7 +450,6 @@ Status PipelineXSinkLocalState::close(RuntimeState* state, Statu if constexpr (!std::is_same_v) { COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); } - COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time()); if (_peak_memory_usage_counter) { _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); } @@ -536,6 +539,7 @@ Status AsyncWriterSink::close(RuntimeState* state, Status exec_s return Status::OK(); } COUNTER_SET(_wait_for_dependency_timer, _async_writer_dependency->watcher_elapse_time()); + COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time()); // if the init failed, the _writer may be nullptr. so here need check if (_writer) { if (_writer->need_normal_close()) { @@ -630,6 +634,7 @@ template class PipelineXSinkLocalState; template class PipelineXSinkLocalState; template class PipelineXSinkLocalState; template class PipelineXSinkLocalState; +template class PipelineXSinkLocalState; template class PipelineXLocalState; template class PipelineXLocalState; @@ -642,6 +647,7 @@ template class PipelineXLocalState; template class PipelineXLocalState; template class PipelineXLocalState; template class PipelineXLocalState; +template class PipelineXLocalState; template class AsyncWriterSink; template class AsyncWriterSink; diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 783b15ac7e..d697970311 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -102,8 +102,10 @@ public: virtual Dependency* dependency() { return nullptr; } - Dependency* finishdependency() { return _finish_dependency.get(); } - RuntimeFilterDependency* filterdependency() { return _filter_dependency.get(); } + // override in Scan + virtual Dependency* finishdependency() { return nullptr; } + // override in Scan MultiCastSink + virtual RuntimeFilterDependency* filterdependency() { return nullptr; } protected: friend class OperatorXBase; @@ -121,7 +123,6 @@ protected: RuntimeProfile::Counter* _blocks_returned_counter = nullptr; RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr; RuntimeProfile::Counter* _memory_used_counter = nullptr; - RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr; RuntimeProfile::Counter* _projection_timer = nullptr; RuntimeProfile::Counter* _exec_timer = nullptr; // Account for peak memory used by this node @@ -135,8 +136,6 @@ protected: vectorized::VExprContextSPtrs _projections; bool _closed = false; vectorized::Block _origin_block; - std::shared_ptr _finish_dependency; - std::shared_ptr _filter_dependency; }; class OperatorXBase : public OperatorBase { @@ -397,7 +396,8 @@ public: RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; } virtual Dependency* dependency() { return nullptr; } - Dependency* finishdependency() { return _finish_dependency.get(); } + // override in exchange sink , AsyncWriterSink + virtual Dependency* finishdependency() { return nullptr; } protected: DataSinkOperatorXBase* _parent = nullptr; @@ -424,7 +424,6 @@ protected: RuntimeProfile::Counter* _exec_timer = nullptr; RuntimeProfile::Counter* _memory_used_counter = nullptr; RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr; - std::shared_ptr _finish_dependency; }; class DataSinkOperatorXBase : public OperatorBase { @@ -659,7 +658,11 @@ class AsyncWriterSink : public PipelineXSinkLocalState { public: using Base = PipelineXSinkLocalState; AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state) - : Base(parent, state), _async_writer_dependency(nullptr) {} + : Base(parent, state), _async_writer_dependency(nullptr) { + _finish_dependency = std::make_shared( + parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY", + state->get_query_ctx()); + } Status init(RuntimeState* state, LocalSinkStateInfo& info) override; @@ -672,11 +675,15 @@ public: Status try_close(RuntimeState* state, Status exec_status) override; + Dependency* finishdependency() override { return _finish_dependency.get(); } + protected: vectorized::VExprContextSPtrs _output_vexpr_ctxs; std::unique_ptr _writer; std::shared_ptr _async_writer_dependency; + + std::shared_ptr _finish_dependency; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 31ba7b0b88..6cb666da41 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -601,8 +601,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( auto prepare_and_set_parent_profile = [&](PipelineXTask* task, size_t pip_idx) { DCHECK(pipeline_id_to_profile[pip_idx]); - RETURN_IF_ERROR(task->prepare(get_task_runtime_state(task->task_id()), local_params, - request.fragment.output_sink)); + RETURN_IF_ERROR(task->prepare(local_params, request.fragment.output_sink)); return Status::OK(); }; @@ -828,7 +827,7 @@ Status PipelineXFragmentContext::_add_local_exchange( OperatorXPtr source_op; source_op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id)); RETURN_IF_ERROR(source_op->init(exchange_type)); - if (operator_xs.size() > 0) { + if (!operator_xs.empty()) { RETURN_IF_ERROR(operator_xs.front()->set_child(source_op)); } operator_xs.insert(operator_xs.begin(), source_op); @@ -878,6 +877,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN const DescriptorTbl& descs, OperatorXPtr& op, PipelinePtr& cur_pipe, int parent_idx, int child_idx) { + // We directly construct the operator from Thrift because the given array is in the order of preorder traversal. + // Therefore, here we need to use a stack-like structure. _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx); std::stringstream error_msg; switch (tnode.node_type) { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 283b7851e4..be62fcac21 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -56,7 +56,7 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeSta _source(_operators.front()), _root(_operators.back()), _sink(pipeline->sink_shared_pointer()), - _le_state_map(le_state_map), + _le_state_map(std::move(le_state_map)), _task_idx(task_idx), _execution_dep(state->get_query_ctx()->get_execution_dependency()) { _pipeline_task_watcher.start(); @@ -67,15 +67,13 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeSta pipeline->incr_created_tasks(); } -Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams& local_params, - const TDataSink& tsink) { +Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink) { DCHECK(_sink); DCHECK(_cur_state == PipelineTaskState::NOT_READY) << get_state_name(_cur_state); _init_profile(); SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_CPU_TIMER(_task_cpu_timer); SCOPED_TIMER(_prepare_timer); - DCHECK_EQ(state, _state); { // set sink local state @@ -85,20 +83,20 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams get_downstream_dependency(), _le_state_map, tsink}; - RETURN_IF_ERROR(_sink->setup_local_state(state, info)); + RETURN_IF_ERROR(_sink->setup_local_state(_state, info)); } std::vector no_scan_ranges; auto scan_ranges = find_with_default(local_params.per_node_scan_ranges, _operators.front()->node_id(), no_scan_ranges); - auto* parent_profile = state->get_sink_local_state(_sink->operator_id())->profile(); + auto* parent_profile = _state->get_sink_local_state(_sink->operator_id())->profile(); for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) { auto& op = _operators[op_idx]; auto& deps = get_upstream_dependency(op->operator_id()); LocalStateInfo info {parent_profile, scan_ranges, deps, _le_state_map, _task_idx, _source_dependency[op->operator_id()]}; - RETURN_IF_ERROR(op->setup_local_state(state, info)); - parent_profile = state->get_local_state(op->operator_id())->profile(); + RETURN_IF_ERROR(op->setup_local_state(_state, info)); + parent_profile = _state->get_local_state(op->operator_id())->profile(); } _block = doris::vectorized::Block::create_unique(); @@ -120,7 +118,9 @@ Status PipelineXTask::_extract_dependencies() { DCHECK(dep != nullptr); _read_dependencies.push_back(dep); auto* fin_dep = local_state->finishdependency(); - _finish_dependencies.push_back(fin_dep); + if (fin_dep) { + _finish_dependencies.push_back(fin_dep); + } } { auto result = _state->get_sink_local_state_result(_sink->operator_id()); @@ -132,7 +132,9 @@ Status PipelineXTask::_extract_dependencies() { DCHECK(dep != nullptr); _write_dependencies = dep; auto* fin_dep = local_state->finishdependency(); - _finish_dependencies.push_back(fin_dep); + if (fin_dep) { + _finish_dependencies.push_back(fin_dep); + } } { auto result = _state->get_local_state_result(_source->operator_id()); @@ -193,6 +195,7 @@ Status PipelineXTask::_open() { for (size_t i = 0; i < 2; i++) { auto st = local_state->open(_state); if (st.is()) { + DCHECK(_filter_dependency); _blocked_dep = _filter_dependency->is_blocked_by(this); if (_blocked_dep) { set_state(PipelineTaskState::BLOCKED_FOR_RF); @@ -377,9 +380,11 @@ std::string PipelineXTask::debug_string() { fmt::format_to(debug_string_buffer, "{}. {}\n", i, _write_dependencies->debug_string(1)); i++; - fmt::format_to(debug_string_buffer, "Runtime Filter Dependency Information: \n"); - fmt::format_to(debug_string_buffer, "{}. {}\n", i, _filter_dependency->debug_string(1)); - i++; + if (_filter_dependency) { + fmt::format_to(debug_string_buffer, "Runtime Filter Dependency Information: \n"); + fmt::format_to(debug_string_buffer, "{}. {}\n", i, _filter_dependency->debug_string(1)); + i++; + } fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n"); for (size_t j = 0; j < _finish_dependencies.size(); j++, i++) { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index fc633339c3..f7b996f40a 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -62,8 +62,7 @@ public: return Status::InternalError("Should not reach here!"); } - Status prepare(RuntimeState* state, const TPipelineInstanceParams& local_params, - const TDataSink& tsink); + Status prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink); Status execute(bool* eos) override; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 90ebbff37a..c54786e542 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -220,6 +220,7 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { DCHECK(req.runtime_state != nullptr); if (req.query_statistics) { + // use to report 'insert into select' TQueryStatistics queryStatistics; DCHECK(req.query_statistics->collect_dml_statistics()); req.query_statistics->to_thrift(&queryStatistics);