From 06788bc2d0b0f8572f4ee84df454d50d3839a3e0 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 10 Feb 2023 15:57:26 +0800 Subject: [PATCH] [Bug](pipeline) Fix projection on streaming operator (#16592) --- be/src/exec/exec_node.cpp | 7 +++++-- be/src/exec/exec_node.h | 14 +++++++++++++- be/src/pipeline/exec/operator.h | 8 ++++++-- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 08b29bdb2a..53e5cc7327 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -570,9 +570,12 @@ Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Blo Status ExecNode::get_next_after_projects( RuntimeState* state, vectorized::Block* block, bool* eos, - const std::function& func) { + const std::function& func, + bool clear_data) { if (_output_row_descriptor) { - _origin_block.clear_column_data(_row_descriptor.num_materialized_slots()); + if (clear_data) { + clear_origin_block(); + } auto status = func(state, &_origin_block, eos); if (UNLIKELY(!status.ok())) return status; return do_projections(&_origin_block, block); diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 0905f1788d..d5a38890ab 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -108,7 +108,19 @@ public: // new interface to compatible new optimizers in FE Status get_next_after_projects( RuntimeState* state, vectorized::Block* block, bool* eos, - const std::function& fn); + const std::function& fn, + bool clear_data = true); + + // Used by pipeline streaming operators. + vectorized::Block* get_clear_input_block() { + clear_origin_block(); + return &_origin_block; + } + bool has_output_row_descriptor() const { return _output_row_descriptor != nullptr; } + // If use projection, we should clear `_origin_block`. + void clear_origin_block() { + _origin_block.clear_column_data(_row_descriptor.num_materialized_slots()); + } // Emit data, both need impl with method: sink // Eg: Aggregation, Sort, Scan diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index e23bb78e63..de02d4c73e 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -324,6 +324,7 @@ public: std::make_unique(get_name() + ": " + _runtime_profile->name(), _runtime_profile.get(), nullptr, "PeakMemoryUsage"); _node->increase_ref(); + _use_projection = _node->has_output_row_descriptor(); return Status::OK(); } @@ -355,12 +356,14 @@ public: SourceState& source_state) override { SCOPED_TIMER(_runtime_profile->total_time_counter()); DCHECK(_child); - RETURN_IF_ERROR(_child->get_block(state, block, source_state)); + auto input_block = _use_projection ? _node->get_clear_input_block() : block; + RETURN_IF_ERROR(_child->get_block(state, input_block, source_state)); bool eos = false; RETURN_IF_ERROR(_node->get_next_after_projects( state, block, &eos, std::bind(&ExecNode::pull, _node, std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3))); + std::placeholders::_3), + false)); return Status::OK(); } @@ -375,6 +378,7 @@ protected: } NodeType* _node; + bool _use_projection; }; template