From 1fb9022d07396d79bcc9028afb2b7225d8f5a545 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 27 Sep 2023 20:34:47 +0800 Subject: [PATCH] [pipelineX](bug) Fix meta scan operator (#24963) --- be/src/pipeline/exec/meta_scan_operator.cpp | 4 + be/src/pipeline/exec/meta_scan_operator.h | 1 + be/src/pipeline/exec/scan_operator.cpp | 13 +- be/src/pipeline/exec/scan_operator.h | 7 +- be/src/pipeline/pipeline_x/operator.cpp | 156 ++++++++++++++++++ be/src/pipeline/pipeline_x/operator.h | 139 ++-------------- .../org/apache/doris/qe/SessionVariable.java | 2 +- 7 files changed, 182 insertions(+), 140 deletions(-) diff --git a/be/src/pipeline/exec/meta_scan_operator.cpp b/be/src/pipeline/exec/meta_scan_operator.cpp index 4f639eb9f2..87f4e2187a 100644 --- a/be/src/pipeline/exec/meta_scan_operator.cpp +++ b/be/src/pipeline/exec/meta_scan_operator.cpp @@ -43,6 +43,10 @@ void MetaScanLocalState::set_scan_ranges(const std::vector& sc _scan_ranges = scan_ranges; } +Status MetaScanLocalState::_process_conjuncts() { + return Status::OK(); +} + MetaScanOperatorX::MetaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ScanOperatorX(pool, tnode, descs), diff --git a/be/src/pipeline/exec/meta_scan_operator.h b/be/src/pipeline/exec/meta_scan_operator.h index 0e9c8db791..bbe67ba974 100644 --- a/be/src/pipeline/exec/meta_scan_operator.h +++ b/be/src/pipeline/exec/meta_scan_operator.h @@ -50,6 +50,7 @@ private: void set_scan_ranges(const std::vector& scan_ranges) override; Status _init_scanners(std::list* scanners) override; + Status _process_conjuncts() override; std::vector _scan_ranges; }; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index b10911528a..ddc966edb6 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -172,9 +172,8 @@ Status ScanLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(_acquire_runtime_filter()); RETURN_IF_ERROR(_process_conjuncts()); - auto status = _eos_dependency->read_blocked_by() == nullptr - ? Status::OK() - : _prepare_scanners(state->query_parallel_instance_num()); + auto status = + _eos_dependency->read_blocked_by() == nullptr ? Status::OK() : _prepare_scanners(); if (_scanner_ctx) { DCHECK(_eos_dependency->read_blocked_by() != nullptr && _num_scanners->value() > 0); RETURN_IF_ERROR(_scanner_ctx->init()); @@ -1163,21 +1162,21 @@ Status ScanLocalState::_normalize_match_predicate( } template -Status ScanLocalState::_prepare_scanners(const int query_parallel_instance_num) { +Status ScanLocalState::_prepare_scanners() { std::list scanners; RETURN_IF_ERROR(_init_scanners(&scanners)); if (scanners.empty()) { _eos_dependency->set_ready_for_read(); } else { COUNTER_SET(_num_scanners, static_cast(scanners.size())); - RETURN_IF_ERROR(_start_scanners(scanners, query_parallel_instance_num)); + RETURN_IF_ERROR(_start_scanners(scanners)); } return Status::OK(); } template -Status ScanLocalState::_start_scanners(const std::list& scanners, - const int query_parallel_instance_num) { +Status ScanLocalState::_start_scanners( + const std::list& scanners) { auto& p = _parent->cast(); _scanner_ctx = PipScannerContext::create_shared(state(), this, p._output_tuple_desc, scanners, p.limit(), state()->scan_queue_mem_limit(), diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index ef9f54bff7..ba9cee464d 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -134,7 +134,7 @@ public: [[nodiscard]] virtual int runtime_filter_num() const = 0; - Status virtual clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) = 0; + virtual Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) = 0; virtual void set_scan_ranges(const std::vector& scan_ranges) = 0; virtual TPushAggOp::type get_push_down_agg_type() = 0; @@ -351,11 +351,10 @@ protected: const ChangeFixedValueRangeFunc& func, const std::string& fn_name, int slot_ref_child = -1); - Status _prepare_scanners(const int query_parallel_instance_num); + Status _prepare_scanners(); // Submit the scanner to the thread pool and start execution - Status _start_scanners(const std::list& scanners, - const int query_parallel_instance_num); + Status _start_scanners(const std::list& scanners); // Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in this vector // so that it will be destroyed uniformly at the end of the query. diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 9244a43585..672e585db7 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -337,6 +337,96 @@ Status OperatorX::setup_local_states(RuntimeState* state, return Status::OK(); } +template +Status PipelineXLocalState::init(RuntimeState* state, LocalStateInfo& info) { + _runtime_profile.reset(new RuntimeProfile(_parent->get_name() + + " (id=" + std::to_string(_parent->id()) + ")")); + _runtime_profile->set_metadata(_parent->id()); + info.parent_profile->add_child(_runtime_profile.get(), true, nullptr); + if constexpr (!std::is_same_v) { + _dependency = (DependencyType*)info.dependency; + if (_dependency) { + _shared_state = (typename DependencyType::SharedState*)_dependency->shared_state(); + _wait_for_dependency_timer = ADD_TIMER( + _runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time"); + } + } + + _conjuncts.resize(_parent->_conjuncts.size()); + _projections.resize(_parent->_projections.size()); + for (size_t i = 0; i < _conjuncts.size(); i++) { + RETURN_IF_ERROR(_parent->_conjuncts[i]->clone(state, _conjuncts[i])); + } + for (size_t i = 0; i < _projections.size(); i++) { + RETURN_IF_ERROR(_parent->_projections[i]->clone(state, _projections[i])); + } + _rows_returned_counter = ADD_COUNTER(_runtime_profile, "RowsReturned", TUnit::UNIT); + _blocks_returned_counter = ADD_COUNTER(_runtime_profile, "BlocksReturned", TUnit::UNIT); + _projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime"); + _open_timer = ADD_TIMER(_runtime_profile, "OpenTime"); + _close_timer = ADD_TIMER(_runtime_profile, "CloseTime"); + _rows_returned_rate = profile()->add_derived_counter( + doris::ExecNode::ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND, + std::bind(&RuntimeProfile::units_per_second, _rows_returned_counter, + profile()->total_time_counter()), + ""); + _mem_tracker = std::make_unique("PipelineXLocalState:" + _runtime_profile->name()); + _memory_used_counter = ADD_LABEL_COUNTER(_runtime_profile, "MemoryUsage"); + _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter( + "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage"); + return Status::OK(); +} + +template +Status PipelineXLocalState::close(RuntimeState* state) { + if (_closed) { + return Status::OK(); + } + if (_dependency) { + COUNTER_SET(_wait_for_dependency_timer, _dependency->read_watcher_elapse_time()); + } + if (_rows_returned_counter != nullptr) { + COUNTER_SET(_rows_returned_counter, _num_rows_returned); + } + profile()->add_to_span(_span); + _closed = true; + return Status::OK(); +} + +template +Status PipelineXSinkLocalState::init(RuntimeState* state, + LocalSinkStateInfo& info) { + // create profile + _profile = state->obj_pool()->add(new RuntimeProfile( + _parent->get_name() + " (id=" + std::to_string(_parent->id()) + ")")); + if constexpr (!std::is_same_v) { + _dependency = (DependencyType*)info.dependency; + if (_dependency) { + _shared_state = (typename DependencyType::SharedState*)_dependency->shared_state(); + _wait_for_dependency_timer = + ADD_TIMER(_profile, "WaitForDependency[" + _dependency->name() + "]Time"); + } + } + _rows_input_counter = ADD_COUNTER(_profile, "InputRows", TUnit::UNIT); + _open_timer = ADD_TIMER(_profile, "OpenTime"); + _close_timer = ADD_TIMER(_profile, "CloseTime"); + info.parent_profile->add_child(_profile, true, nullptr); + _mem_tracker = std::make_unique(_parent->get_name()); + return Status::OK(); +} + +template +Status PipelineXSinkLocalState::close(RuntimeState* state, Status exec_status) { + if (_closed) { + return Status::OK(); + } + if (_dependency) { + COUNTER_SET(_wait_for_dependency_timer, _dependency->write_watcher_elapse_time()); + } + _closed = true; + return Status::OK(); +} + template Status StreamingOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { @@ -377,6 +467,70 @@ Status StatefulOperatorX::get_block(RuntimeState* state, vectori return Status::OK(); } +template +Status AsyncWriterSink::init(RuntimeState* state, LocalSinkStateInfo& info) { + RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info)); + _output_vexpr_ctxs.resize(_parent->cast()._output_vexpr_ctxs.size()); + for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { + RETURN_IF_ERROR( + _parent->cast()._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); + } + + _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs)); + _async_writer_dependency = AsyncWriterDependency::create_shared(_parent->id()); + _writer->set_dependency(_async_writer_dependency.get()); + + _wait_for_dependency_timer = + ADD_TIMER(_profile, "WaitForDependency[" + _async_writer_dependency->name() + "]Time"); + return Status::OK(); +} + +template +Status AsyncWriterSink::open(RuntimeState* state) { + RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state)); + _writer->start_writer(state, _profile); + return Status::OK(); +} + +template +Status AsyncWriterSink::sink(RuntimeState* state, vectorized::Block* block, + SourceState source_state) { + return _writer->sink(block, source_state == SourceState::FINISHED); +} + +template +WriteDependency* AsyncWriterSink::write_blocked_by() { + return _writer->write_blocked_by(); +} + +template +Status AsyncWriterSink::close(RuntimeState* state, Status exec_status) { + if (_closed) { + return Status::OK(); + } + COUNTER_SET(_wait_for_dependency_timer, _async_writer_dependency->write_watcher_elapse_time()); + if (_writer->need_normal_close()) { + if (exec_status.ok() && !state->is_cancelled()) { + RETURN_IF_ERROR(_writer->commit_trans()); + } + RETURN_IF_ERROR(_writer->close(exec_status)); + } + return PipelineXSinkLocalState<>::close(state, exec_status); +} + +template +Status AsyncWriterSink::try_close(RuntimeState* state, Status exec_status) { + if (state->is_cancelled() || !exec_status.ok()) { + _writer->force_close(!exec_status.ok() ? exec_status : Status::Cancelled("Cancelled")); + } + return Status::OK(); +} + +template +bool AsyncWriterSink::is_pending_finish() { + return _writer->is_pending_finish(); +} + #define DECLARE_OPERATOR_X(LOCAL_STATE) template class DataSinkOperatorX; DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState) DECLARE_OPERATOR_X(ResultSinkLocalState) @@ -445,4 +599,6 @@ template class PipelineXLocalState; template class PipelineXSinkLocalState; template class PipelineXLocalState; +template class AsyncWriterSink; + } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index cb6c8f1e3a..53a294412a 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -319,60 +319,9 @@ public: : PipelineXLocalStateBase(state, parent) {} ~PipelineXLocalState() override = default; - Status init(RuntimeState* state, LocalStateInfo& info) override { - _runtime_profile.reset(new RuntimeProfile(_parent->get_name() + - " (id=" + std::to_string(_parent->id()) + ")")); - _runtime_profile->set_metadata(_parent->id()); - info.parent_profile->add_child(_runtime_profile.get(), true, nullptr); - if constexpr (!std::is_same_v) { - _dependency = (DependencyType*)info.dependency; - if (_dependency) { - _shared_state = (typename DependencyType::SharedState*)_dependency->shared_state(); - _wait_for_dependency_timer = ADD_TIMER( - _runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time"); - } - } + Status init(RuntimeState* state, LocalStateInfo& info) override; - _conjuncts.resize(_parent->_conjuncts.size()); - _projections.resize(_parent->_projections.size()); - for (size_t i = 0; i < _conjuncts.size(); i++) { - RETURN_IF_ERROR(_parent->_conjuncts[i]->clone(state, _conjuncts[i])); - } - for (size_t i = 0; i < _projections.size(); i++) { - RETURN_IF_ERROR(_parent->_projections[i]->clone(state, _projections[i])); - } - _rows_returned_counter = ADD_COUNTER(_runtime_profile, "RowsReturned", TUnit::UNIT); - _blocks_returned_counter = ADD_COUNTER(_runtime_profile, "BlocksReturned", TUnit::UNIT); - _projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime"); - _open_timer = ADD_TIMER(_runtime_profile, "OpenTime"); - _close_timer = ADD_TIMER(_runtime_profile, "CloseTime"); - _rows_returned_rate = profile()->add_derived_counter( - doris::ExecNode::ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND, - std::bind(&RuntimeProfile::units_per_second, _rows_returned_counter, - profile()->total_time_counter()), - ""); - _mem_tracker = - std::make_unique("PipelineXLocalState:" + _runtime_profile->name()); - _memory_used_counter = ADD_LABEL_COUNTER(_runtime_profile, "MemoryUsage"); - _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter( - "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage"); - return Status::OK(); - } - - Status close(RuntimeState* state) override { - if (_closed) { - return Status::OK(); - } - if (_dependency) { - COUNTER_SET(_wait_for_dependency_timer, _dependency->read_watcher_elapse_time()); - } - if (_rows_returned_counter != nullptr) { - COUNTER_SET(_rows_returned_counter, _num_rows_returned); - } - profile()->add_to_span(_span); - _closed = true; - return Status::OK(); - } + Status close(RuntimeState* state) override; [[nodiscard]] std::string debug_string(int indentation_level = 0) const override; @@ -596,40 +545,13 @@ public: : PipelineXSinkLocalStateBase(parent, state) {} ~PipelineXSinkLocalState() override = default; - Status init(RuntimeState* state, LocalSinkStateInfo& info) override { - // create profile - _profile = state->obj_pool()->add(new RuntimeProfile( - _parent->get_name() + " (id=" + std::to_string(_parent->id()) + ")")); - if constexpr (!std::is_same_v) { - _dependency = (DependencyType*)info.dependency; - if (_dependency) { - _shared_state = (typename DependencyType::SharedState*)_dependency->shared_state(); - _wait_for_dependency_timer = - ADD_TIMER(_profile, "WaitForDependency[" + _dependency->name() + "]Time"); - } - } - _rows_input_counter = ADD_COUNTER(_profile, "InputRows", TUnit::UNIT); - _open_timer = ADD_TIMER(_profile, "OpenTime"); - _close_timer = ADD_TIMER(_profile, "CloseTime"); - info.parent_profile->add_child(_profile, true, nullptr); - _mem_tracker = std::make_unique(_parent->get_name()); - return Status::OK(); - } + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; Status open(RuntimeState* state) override { return Status::OK(); } Status try_close(RuntimeState* state, Status exec_status) override { return Status::OK(); } - Status close(RuntimeState* state, Status exec_status) override { - if (_closed) { - return Status::OK(); - } - if (_dependency) { - COUNTER_SET(_wait_for_dependency_timer, _dependency->write_watcher_elapse_time()); - } - _closed = true; - return Status::OK(); - } + Status close(RuntimeState* state, Status exec_status) override; [[nodiscard]] std::string debug_string(int indentation_level) const override; typename DependencyType::SharedState*& get_shared_state() { return _shared_state; } @@ -687,58 +609,19 @@ public: AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state) : PipelineXSinkLocalState<>(parent, state), _async_writer_dependency(nullptr) {} - Status init(RuntimeState* state, LocalSinkStateInfo& info) override { - RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info)); - _output_vexpr_ctxs.resize(_parent->cast()._output_vexpr_ctxs.size()); - for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { - RETURN_IF_ERROR(_parent->cast()._output_vexpr_ctxs[i]->clone( - state, _output_vexpr_ctxs[i])); - } + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; - _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs)); - _async_writer_dependency = AsyncWriterDependency::create_shared(_parent->id()); - _writer->set_dependency(_async_writer_dependency.get()); + Status open(RuntimeState* state) override; - _wait_for_dependency_timer = ADD_TIMER( - _profile, "WaitForDependency[" + _async_writer_dependency->name() + "]Time"); - return Status::OK(); - } + Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state); - Status open(RuntimeState* state) override { - RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state)); - _writer->start_writer(state, _profile); - return Status::OK(); - } + WriteDependency* write_blocked_by(); - Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) { - return _writer->sink(block, source_state == SourceState::FINISHED); - } + Status close(RuntimeState* state, Status exec_status) override; - WriteDependency* write_blocked_by() { return _writer->write_blocked_by(); } + Status try_close(RuntimeState* state, Status exec_status) override; - Status close(RuntimeState* state, Status exec_status) override { - if (_closed) { - return Status::OK(); - } - COUNTER_SET(_wait_for_dependency_timer, - _async_writer_dependency->write_watcher_elapse_time()); - if (_writer->need_normal_close()) { - if (exec_status.ok() && !state->is_cancelled()) { - RETURN_IF_ERROR(_writer->commit_trans()); - } - RETURN_IF_ERROR(_writer->close(exec_status)); - } - return PipelineXSinkLocalState<>::close(state, exec_status); - } - - Status try_close(RuntimeState* state, Status exec_status) override { - if (state->is_cancelled() || !exec_status.ok()) { - _writer->force_close(!exec_status.ok() ? exec_status : Status::Cancelled("Cancelled")); - } - return Status::OK(); - } - - bool is_pending_finish() { return _writer->is_pending_finish(); } + bool is_pending_finish(); protected: vectorized::VExprContextSPtrs _output_vexpr_ctxs; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 8a138aeda4..fbea68d0e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -1654,7 +1654,7 @@ public class SessionVariable implements Serializable, Writable { int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize(); int autoInstance = (size + 1) / 2; return Math.min(autoInstance, maxInstanceNum); - } else if (enablePipelineEngine) { + } else if (getEnablePipelineEngine()) { return parallelPipelineTaskNum; } else { return parallelExecInstanceNum;