From ef46b580d09f181d1eedd728cc52d75316f86ecf Mon Sep 17 00:00:00 2001 From: zhangstar333 <87313068+zhangstar333@users.noreply.github.com> Date: Sat, 10 Dec 2022 19:32:11 +0800 Subject: [PATCH] [Vectorized](operator) support analytic eval operator (#14774) --- be/src/pipeline/CMakeLists.txt | 2 + .../pipeline/exec/analytic_sink_operator.cpp | 25 ++ be/src/pipeline/exec/analytic_sink_operator.h | 47 ++++ .../exec/analytic_source_operator.cpp | 24 ++ .../pipeline/exec/analytic_source_operator.h | 48 ++++ be/src/pipeline/pipeline_fragment_context.cpp | 15 ++ be/src/vec/exec/vanalytic_eval_node.cpp | 249 ++++++++++-------- be/src/vec/exec/vanalytic_eval_node.h | 36 ++- 8 files changed, 324 insertions(+), 122 deletions(-) create mode 100644 be/src/pipeline/exec/analytic_sink_operator.cpp create mode 100644 be/src/pipeline/exec/analytic_sink_operator.h create mode 100644 be/src/pipeline/exec/analytic_source_operator.cpp create mode 100644 be/src/pipeline/exec/analytic_source_operator.h diff --git a/be/src/pipeline/CMakeLists.txt b/be/src/pipeline/CMakeLists.txt index 01cd4fbe34..bca5816c2b 100644 --- a/be/src/pipeline/CMakeLists.txt +++ b/be/src/pipeline/CMakeLists.txt @@ -40,6 +40,8 @@ set(PIPELINE_FILES exec/aggregation_source_operator.cpp exec/hashjoin_build_sink.cpp exec/hashjoin_probe_operator.cpp + exec/analytic_sink_operator.cpp + exec/analytic_source_operator.cpp exec/streaming_aggregation_source_operator.cpp exec/streaming_aggregation_sink_operator.cpp exec/agg_context.cpp diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp new file mode 100644 index 0000000000..0e06e517d9 --- /dev/null +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -0,0 +1,25 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "analytic_sink_operator.h" + +namespace doris::pipeline { + +OPERATOR_CODE_GENERATOR(AnalyticSinkOperator, Operator) + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h new file mode 100644 index 0000000000..b330b37637 --- /dev/null +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -0,0 +1,47 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "operator.h" +#include "vec/exec/vanalytic_eval_node.h" + +namespace doris { +namespace vectorized { +class VAnalyticEvalNode; +} // namespace vectorized + +namespace pipeline { +class AnalyticSinkOperatorBuilder final : public OperatorBuilder { +public: + AnalyticSinkOperatorBuilder(int32_t, ExecNode*); + + OperatorPtr build_operator() override; + + bool is_sink() const override { return true; }; +}; + +class AnalyticSinkOperator final : public Operator { +public: + AnalyticSinkOperator(OperatorBuilderBase* operator_builder, ExecNode* node); + + bool can_write() override { return _node->can_write(); }; +}; + +} // namespace pipeline +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp new file mode 100644 index 0000000000..7f0194a62f --- /dev/null +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "analytic_source_operator.h" + +namespace doris::pipeline { + +OPERATOR_CODE_GENERATOR(AnalyticSourceOperator, Operator) + +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h new file mode 100644 index 0000000000..0d25bb850b --- /dev/null +++ b/be/src/pipeline/exec/analytic_source_operator.h @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "operator.h" +#include "vec/exec/vanalytic_eval_node.h" + +namespace doris { + +namespace vectorized { +class VAnalyticEvalNode; +} + +namespace pipeline { + +class AnalyticSourceOperatorBuilder final : public OperatorBuilder { +public: + AnalyticSourceOperatorBuilder(int32_t, ExecNode*); + + bool is_source() const override { return true; } + + OperatorPtr build_operator() override; +}; + +class AnalyticSourceOperator final : public Operator { +public: + AnalyticSourceOperator(OperatorBuilderBase*, ExecNode*); + + Status open(RuntimeState*) override { return Status::OK(); } +}; + +} // namespace pipeline +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index ccd277db42..0a6a76e3a9 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -22,6 +22,8 @@ #include "exec/agg_context.h" #include "exec/aggregation_sink_operator.h" #include "exec/aggregation_source_operator.h" +#include "exec/analytic_sink_operator.h" +#include "exec/analytic_source_operator.h" #include "exec/data_sink.h" #include "exec/datagen_operator.h" #include "exec/empty_set_operator.h" @@ -384,6 +386,19 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur RETURN_IF_ERROR(cur_pipe->add_operator(sort_source)); break; } + case TPlanNodeType::ANALYTIC_EVAL_NODE: { + auto new_pipeline = add_pipeline(); + RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline)); + + OperatorBuilderPtr analytic_sink = + std::make_shared<AnalyticSinkOperatorBuilder>(next_operator_builder_id(), node); + RETURN_IF_ERROR(new_pipeline->set_sink(analytic_sink)); + + OperatorBuilderPtr analytic_source = + std::make_shared<AnalyticSourceOperatorBuilder>(next_operator_builder_id(), node); + RETURN_IF_ERROR(cur_pipe->add_operator(analytic_source)); + break; + } case TPlanNodeType::REPEAT_NODE: { RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe)); 
OperatorBuilderPtr builder = diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index 5fa25cb1c6..eabcaa88ca 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -17,13 +17,10 @@ #include "vec/exec/vanalytic_eval_node.h" -#include "exprs/agg_fn_evaluator.h" -#include "exprs/anyval_util.h" #include "runtime/descriptors.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" -#include "udf/udf_internal.h" -#include "vec/utils/util.hpp" +#include "vec/exprs/vexpr.h" namespace doris::vectorized { @@ -41,8 +38,7 @@ VAnalyticEvalNode::VAnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode, if (!tnode.analytic_node.__isset .window) { //haven't set window, Unbounded: [unbounded preceding,unbounded following] _executor.get_next = std::bind(&VAnalyticEvalNode::_get_next_for_partition, this, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3); + std::placeholders::_1); } else if (tnode.analytic_node.window.type == TAnalyticWindowType::RANGE) { DCHECK(!_window.__isset.window_start) << "RANGE windows must have UNBOUNDED PRECEDING"; @@ -53,21 +49,20 @@ VAnalyticEvalNode::VAnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode, if (!_window.__isset .window_end) { //haven't set end, so same as PARTITION, [unbounded preceding, unbounded following] _executor.get_next = std::bind(&VAnalyticEvalNode::_get_next_for_partition, - this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3); + this, std::placeholders::_1); + } else { _fn_scope = AnalyticFnScope::RANGE; //range: [unbounded preceding,current row] _executor.get_next = std::bind(&VAnalyticEvalNode::_get_next_for_range, this, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3); + std::placeholders::_1); } } else { if (!_window.__isset.window_start && !_window.__isset.window_end) { //haven't set start and end, same as PARTITION _executor.get_next = 
std::bind(&VAnalyticEvalNode::_get_next_for_partition, - this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3); + this, std::placeholders::_1); + } else { if (_window.__isset.window_start) { //calculate start boundary TAnalyticWindowBoundary b = _window.window_start; @@ -97,8 +92,7 @@ VAnalyticEvalNode::VAnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode, _fn_scope = AnalyticFnScope::ROWS; _executor.get_next = std::bind(&VAnalyticEvalNode::_get_next_for_rows, this, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3); + std::placeholders::_1); } } _agg_arena_pool = std::make_unique(); @@ -216,15 +210,28 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) { } Status VAnalyticEvalNode::open(RuntimeState* state) { + RETURN_IF_ERROR(alloc_resource(state)); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); + RETURN_IF_ERROR(child(0)->open(state)); + return Status::OK(); +} + +Status VAnalyticEvalNode::close(RuntimeState* state) { + if (is_closed()) { + return Status::OK(); + } + release_resource(state); + return ExecNode::close(state); +} + +Status VAnalyticEvalNode::alloc_resource(RuntimeState* state) { { SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); START_AND_SCOPE_SPAN(state->get_tracer(), span, "VAnalyticEvalNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_ERROR(ExecNode::open(state)); + RETURN_IF_ERROR(ExecNode::alloc_resource(state)); RETURN_IF_CANCELLED(state); } - RETURN_IF_ERROR(child(0)->open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(VExpr::open(_partition_by_eq_expr_ctxs, state)); RETURN_IF_ERROR(VExpr::open(_order_by_eq_expr_ctxs, state)); @@ -237,20 +244,65 @@ Status VAnalyticEvalNode::open(RuntimeState* state) { return Status::OK(); } -Status VAnalyticEvalNode::close(RuntimeState* state) { - if (is_closed()) { +Status VAnalyticEvalNode::pull(doris::RuntimeState* /*state*/, vectorized::Block* output_block, + bool* eos) { + if 
(_input_eos && (_output_block_index == _input_blocks.size() || _input_total_rows == 0)) { + *eos = true; return Status::OK(); } + + while (!_input_eos || _output_block_index < _input_blocks.size()) { + _found_partition_end = _get_partition_by_end(); + _need_more_input = whether_need_next_partition(_found_partition_end); + if (_need_more_input) { + return Status::OK(); + } + _next_partition = _init_next_partition(_found_partition_end); + _init_result_columns(); + size_t current_block_rows = _input_blocks[_output_block_index].rows(); + _executor.get_next(current_block_rows); + if (_window_end_position == current_block_rows) { + break; + } + } + RETURN_IF_ERROR(_output_current_block(output_block)); + RETURN_IF_ERROR( + VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, output_block->columns())); + reached_limit(output_block, eos); + return Status::OK(); +} + +void VAnalyticEvalNode::release_resource(RuntimeState* state) { + if (is_closed()) { + return; + } START_AND_SCOPE_SPAN(state->get_tracer(), span, "VAnalyticEvalNode::close"); VExpr::close(_partition_by_eq_expr_ctxs, state); VExpr::close(_order_by_eq_expr_ctxs, state); - for (size_t i = 0; i < _agg_functions_size; ++i) VExpr::close(_agg_expr_ctxs[i], state); - for (auto* agg_function : _agg_functions) agg_function->close(state); + for (size_t i = 0; i < _agg_functions_size; ++i) { + VExpr::close(_agg_expr_ctxs[i], state); + } + for (auto* agg_function : _agg_functions) { + agg_function->close(state); + } _destroy_agg_status(); _release_mem(); - return ExecNode::close(state); + return ExecNode::release_resource(state); +} + +//TODO: maybe could have better strategy, not only when need data to sink data +//even could get some resources in advance as soon as possible +bool VAnalyticEvalNode::can_write() { + return _need_more_input; +} + +bool VAnalyticEvalNode::can_read() { + if (_need_more_input) { + return false; + } + return true; } Status VAnalyticEvalNode::get_next(RuntimeState* state, RowBatch* 
row_batch, bool* eos) { @@ -268,99 +320,70 @@ Status VAnalyticEvalNode::get_next(RuntimeState* state, vectorized::Block* block return Status::OK(); } - RETURN_IF_ERROR(_executor.get_next(state, block, eos)); - + while (!_input_eos || _output_block_index < _input_blocks.size()) { + RETURN_IF_ERROR(_consumed_block_and_init_partition(state, &_next_partition, eos)); + if (*eos) { + return Status::OK(); + } + size_t current_block_rows = _input_blocks[_output_block_index].rows(); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); + RETURN_IF_ERROR(_executor.get_next(current_block_rows)); + if (_window_end_position == current_block_rows) { + break; + } + } + RETURN_IF_ERROR(_output_current_block(block)); SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block, block->columns())); reached_limit(block, eos); return Status::OK(); } -Status VAnalyticEvalNode::_get_next_for_partition(RuntimeState* state, Block* block, bool* eos) { - while (!_input_eos || _output_block_index < _input_blocks.size()) { - bool next_partition = false; - RETURN_IF_ERROR(_consumed_block_and_init_partition(state, &next_partition, eos)); - if (*eos) { - break; - } +Status VAnalyticEvalNode::_get_next_for_partition(size_t current_block_rows) { + if (_next_partition) { + _executor.execute(_partition_by_start.pos, _partition_by_end.pos, _partition_by_start.pos, + _partition_by_end.pos); + } + _executor.insert_result(current_block_rows); + return Status::OK(); +} - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); - size_t current_block_rows = _input_blocks[_output_block_index].rows(); - if (next_partition) { - _executor.execute(_partition_by_start.pos, _partition_by_end.pos, - _partition_by_start.pos, _partition_by_end.pos); +Status VAnalyticEvalNode::_get_next_for_range(size_t current_block_rows) { + while (_current_row_position < _partition_by_end.pos && + _window_end_position < current_block_rows) { + if (_current_row_position >= 
_order_by_end.pos) { + _update_order_by_range(); + _executor.execute(_order_by_start.pos, _order_by_end.pos, _order_by_start.pos, + _order_by_end.pos); } _executor.insert_result(current_block_rows); - if (_window_end_position == current_block_rows) { - return _output_current_block(block); - } } return Status::OK(); } -Status VAnalyticEvalNode::_get_next_for_range(RuntimeState* state, Block* block, bool* eos) { - while (!_input_eos || _output_block_index < _input_blocks.size()) { - bool next_partition = false; - RETURN_IF_ERROR(_consumed_block_and_init_partition(state, &next_partition, eos)); - if (*eos) { - break; - } - - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); - size_t current_block_rows = _input_blocks[_output_block_index].rows(); - while (_current_row_position < _partition_by_end.pos && - _window_end_position < current_block_rows) { - if (_current_row_position >= _order_by_end.pos) { - _update_order_by_range(); - _executor.execute(_order_by_start.pos, _order_by_end.pos, _order_by_start.pos, - _order_by_end.pos); - } - _executor.insert_result(current_block_rows); - } - if (_window_end_position == current_block_rows) { - return _output_current_block(block); - } - } - return Status::OK(); -} - -Status VAnalyticEvalNode::_get_next_for_rows(RuntimeState* state, Block* block, bool* eos) { - while (!_input_eos || _output_block_index < _input_blocks.size()) { - bool next_partition = false; - RETURN_IF_ERROR(_consumed_block_and_init_partition(state, &next_partition, eos)); - if (*eos) { - break; - } - - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); - size_t current_block_rows = _input_blocks[_output_block_index].rows(); - while (_current_row_position < _partition_by_end.pos && - _window_end_position < current_block_rows) { - int64_t range_start, range_end; - if (!_window.__isset.window_start && - _window.window_end.type == - TAnalyticWindowBoundaryType:: - CURRENT_ROW) { //[preceding, current_row],[current_row, following] - range_start = 
_current_row_position; - range_end = _current_row_position + - 1; //going on calculate,add up data, no need to reset state +Status VAnalyticEvalNode::_get_next_for_rows(size_t current_block_rows) { + while (_current_row_position < _partition_by_end.pos && + _window_end_position < current_block_rows) { + int64_t range_start, range_end; + if (!_window.__isset.window_start && + _window.window_end.type == + TAnalyticWindowBoundaryType:: + CURRENT_ROW) { //[preceding, current_row],[current_row, following] + range_start = _current_row_position; + range_end = _current_row_position + + 1; //going on calculate,add up data, no need to reset state + } else { + _reset_agg_status(); + if (!_window.__isset + .window_start) { //[preceding, offset] --unbound: [preceding, following] + range_start = _partition_by_start.pos; } else { - _reset_agg_status(); - if (!_window.__isset - .window_start) { //[preceding, offset] --unbound: [preceding, following] - range_start = _partition_by_start.pos; - } else { - range_start = _current_row_position + _rows_start_offset; - } - range_end = _current_row_position + _rows_end_offset + 1; + range_start = _current_row_position + _rows_start_offset; } - _executor.execute(_partition_by_start.pos, _partition_by_end.pos, range_start, - range_end); - _executor.insert_result(current_block_rows); - } - if (_window_end_position == current_block_rows) { - return _output_current_block(block); + range_end = _current_row_position + _rows_end_offset + 1; } + _executor.execute(_partition_by_start.pos, _partition_by_end.pos, range_start, range_end); + _executor.insert_result(current_block_rows); } return Status::OK(); } @@ -492,14 +515,22 @@ Status VAnalyticEvalNode::_fetch_next_block_data(RuntimeState* state) { _children[0]->get_next_span(), _input_eos); } while (!_input_eos && block.rows() == 0); - if (_input_eos && block.rows() == 0) { + RETURN_IF_ERROR(sink(state, &block, _input_eos)); + return Status::OK(); +} + +Status 
VAnalyticEvalNode::sink(doris::RuntimeState* /*state*/, vectorized::Block* input_block, + bool eos) { + _input_eos = eos; + if (_input_eos && input_block->rows() == 0) { + _need_more_input = false; return Status::OK(); } SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); input_block_first_row_positions.emplace_back(_input_total_rows); - size_t block_rows = block.rows(); + size_t block_rows = input_block->rows(); _input_total_rows += block_rows; _all_block_end.block_num = _input_blocks.size(); _all_block_end.row_num = block_rows; @@ -507,7 +538,7 @@ Status VAnalyticEvalNode::_fetch_next_block_data(RuntimeState* state) { if (_origin_cols .empty()) { //record origin columns, maybe be after this, could cast some column but no need to save - for (int c = 0; c < block.columns(); ++c) { + for (int c = 0; c < input_block->columns(); ++c) { _origin_cols.emplace_back(c); } } @@ -515,31 +546,33 @@ Status VAnalyticEvalNode::_fetch_next_block_data(RuntimeState* state) { for (size_t i = 0; i < _agg_functions_size; ++i) { //insert _agg_intput_columns, execute calculate for its for (size_t j = 0; j < _agg_expr_ctxs[i].size(); ++j) { - RETURN_IF_ERROR(_insert_range_column(&block, _agg_expr_ctxs[i][j], + RETURN_IF_ERROR(_insert_range_column(input_block, _agg_expr_ctxs[i][j], _agg_intput_columns[i][j].get(), block_rows)); } } //record column idx in block for (size_t i = 0; i < _partition_by_eq_expr_ctxs.size(); ++i) { int result_col_id = -1; - RETURN_IF_ERROR(_partition_by_eq_expr_ctxs[i]->execute(&block, &result_col_id)); + RETURN_IF_ERROR(_partition_by_eq_expr_ctxs[i]->execute(input_block, &result_col_id)); DCHECK_GE(result_col_id, 0); _partition_by_column_idxs[i] = result_col_id; } for (size_t i = 0; i < _order_by_eq_expr_ctxs.size(); ++i) { int result_col_id = -1; - RETURN_IF_ERROR(_order_by_eq_expr_ctxs[i]->execute(&block, &result_col_id)); + RETURN_IF_ERROR(_order_by_eq_expr_ctxs[i]->execute(input_block, &result_col_id)); DCHECK_GE(result_col_id, 0); _ordey_by_column_idxs[i] = 
result_col_id; } - mem_tracker_held()->consume(block.allocated_bytes()); - _blocks_memory_usage->add(block.allocated_bytes()); + mem_tracker_held()->consume(input_block->allocated_bytes()); + _blocks_memory_usage->add(input_block->allocated_bytes()); //TODO: if need improvement, the is a tips to maintain a free queue, //so the memory could reuse, no need to new/delete again; - _input_blocks.emplace_back(std::move(block)); + _input_blocks.emplace_back(std::move(*input_block)); + _found_partition_end = _get_partition_by_end(); + _need_more_input = whether_need_next_partition(_found_partition_end); return Status::OK(); } diff --git a/be/src/vec/exec/vanalytic_eval_node.h b/be/src/vec/exec/vanalytic_eval_node.h index a6824a108a..9957221193 100644 --- a/be/src/vec/exec/vanalytic_eval_node.h +++ b/be/src/vec/exec/vanalytic_eval_node.h @@ -19,13 +19,12 @@ #include +#include + #include "exec/exec_node.h" -#include "exprs/expr.h" -#include "runtime/tuple.h" #include "vec/common/arena.h" #include "vec/core/block.h" #include "vec/exprs/vectorized_agg_fn.h" -#include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" namespace doris::vectorized { @@ -39,24 +38,30 @@ struct BlockRowPos { class AggFnEvaluator; class VAnalyticEvalNode : public ExecNode { public: - ~VAnalyticEvalNode() {} + ~VAnalyticEvalNode() override = default; VAnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr); - virtual Status prepare(RuntimeState* state); - virtual Status open(RuntimeState* state); - virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos); - virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos); - virtual Status close(RuntimeState* state); + Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; + Status prepare(RuntimeState* state) override; + Status open(RuntimeState* state) override; + Status 
get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override; + Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override; + Status close(RuntimeState* state) override; + Status alloc_resource(RuntimeState* state) override; + void release_resource(RuntimeState* state) override; + Status sink(doris::RuntimeState* state, vectorized::Block* input_block, bool eos) override; + Status pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) override; + bool can_read(); + bool can_write(); protected: using ExecNode::debug_string; virtual std::string debug_string(); private: - Status _get_next_for_rows(RuntimeState* state, Block* block, bool* eos); - Status _get_next_for_range(RuntimeState* state, Block* block, bool* eos); - Status _get_next_for_partition(RuntimeState* state, Block* block, bool* eos); + Status _get_next_for_rows(size_t rows); + Status _get_next_for_range(size_t rows); + Status _get_next_for_partition(size_t rows); void _execute_for_win_func(int64_t partition_start, int64_t partition_end, int64_t frame_start, int64_t frame_end); @@ -82,7 +87,7 @@ private: std::string debug_window_bound_string(TAnalyticWindowBoundary b); using vectorized_execute = std::function; - using vectorized_get_next = std::function; + using vectorized_get_next = std::function; using vectorized_get_result = std::function; using vectorized_closer = std::function; @@ -117,6 +122,9 @@ private: std::vector _partition_by_column_idxs; bool _input_eos = false; + bool _next_partition = false; + std::atomic_bool _need_more_input = true; + BlockRowPos _found_partition_end; int64_t _input_total_rows = 0; int64_t _output_block_index = 0; int64_t _window_end_position = 0;