diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 82951632e4..2bd76f6fb4 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -222,6 +222,9 @@ public: int64_t rows_returned() const { return _num_rows_returned; } int64_t limit() const { return _limit; } bool reached_limit() const { return _limit != -1 && _num_rows_returned >= _limit; } + /// Only use in vectorized exec engine to check whether reach limit and cut num row for block + // and add block rows for profile + void reached_limit(vectorized::Block* block, bool* eos); const std::vector& get_tuple_ids() const { return _tuple_ids; } RuntimeProfile* runtime_profile() const { return _runtime_profile.get(); } @@ -259,10 +262,6 @@ protected: // 2. delete and release the column which create by function all and other reason void release_block_memory(vectorized::Block& block, uint16_t child_idx = 0); - /// Only use in vectorized exec engine to check whether reach limit and cut num row for block - // and add block rows for profile - void reached_limit(vectorized::Block* block, bool* eos); - /// Only use in vectorized exec engine try to do projections to trans _row_desc -> _output_row_desc Status do_projections(vectorized::Block* origin_block, vectorized::Block* output_block); diff --git a/be/src/pipeline/CMakeLists.txt b/be/src/pipeline/CMakeLists.txt index 31531fcbc5..c0982d4085 100644 --- a/be/src/pipeline/CMakeLists.txt +++ b/be/src/pipeline/CMakeLists.txt @@ -44,7 +44,6 @@ set(PIPELINE_FILES exec/analytic_source_operator.cpp exec/streaming_aggregation_source_operator.cpp exec/streaming_aggregation_sink_operator.cpp - exec/agg_context.cpp exec/sort_source_operator.cpp exec/sort_sink_operator.cpp exec/repeat_operator.cpp @@ -54,6 +53,9 @@ set(PIPELINE_FILES exec/set_sink_operator.cpp exec/set_source_operator.cpp exec/set_probe_sink_operator.cpp + exec/union_sink_operator.cpp + exec/union_source_operator.cpp + exec/data_queue.cpp exec/select_operator.cpp) add_library(Pipeline STATIC diff --git a/be/src/pipeline/exec/agg_context.cpp b/be/src/pipeline/exec/agg_context.cpp deleted file mode 100644 index a62e0f132a..0000000000 --- a/be/src/pipeline/exec/agg_context.cpp +++ /dev/null @@ -1,118 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "agg_context.h" - -#include "runtime/descriptors.h" -#include "vec/columns/column_nullable.h" -#include "vec/core/block.h" -#include "vec/data_types/data_type_nullable.h" - -namespace doris { -namespace pipeline { - -AggContext::AggContext() - : _is_finished(false), - _is_canceled(false), - _cur_bytes_in_queue(0), - _cur_blocks_in_queue(0) {} - -AggContext::~AggContext() { - DCHECK(_is_finished); -} - -std::unique_ptr AggContext::get_free_block() { - { - std::lock_guard l(_free_blocks_lock); - if (!_free_blocks.empty()) { - auto block = std::move(_free_blocks.back()); - _free_blocks.pop_back(); - return block; - } - } - - return std::make_unique(); -} - -void AggContext::return_free_block(std::unique_ptr block) { - DCHECK(block->rows() == 0); - std::lock_guard l(_free_blocks_lock); - _free_blocks.emplace_back(std::move(block)); -} - -bool AggContext::has_data_or_finished() { - return _cur_blocks_in_queue > 0 || _is_finished; -} - -Status AggContext::get_block(std::unique_ptr* block) { - if (_is_canceled) { - return Status::InternalError("AggContext canceled"); - } - if (_cur_blocks_in_queue > 0) { - int block_size_t; - { - std::unique_lock l(_transfer_lock); - auto [block_ptr, block_size] = std::move(_blocks_queue.front()); - block_size_t = block_size; - *block = std::move(block_ptr); - _blocks_queue.pop_front(); - } - _cur_bytes_in_queue -= block_size_t; - _cur_blocks_in_queue -= 1; - } else { - if (_is_finished) { - _data_exhausted = true; - } - } - return Status::OK(); -} - -bool AggContext::has_enough_space_to_push() { - return _cur_bytes_in_queue.load() < MAX_BYTE_OF_QUEUE / 2; -} - -void AggContext::push_block(std::unique_ptr block) { - if (!block) { - return; - } - auto block_size = block->allocated_bytes(); - _cur_bytes_in_queue += block_size; - { - std::unique_lock l(_transfer_lock); - _blocks_queue.emplace_back(std::move(block), block_size); - _max_bytes_in_queue = std::max(_max_bytes_in_queue, _cur_bytes_in_queue.load()); - _max_size_of_queue = std::max(_max_size_of_queue, (int64)_blocks_queue.size()); - } - _cur_blocks_in_queue += 1; -} - -void AggContext::set_finish() { - _is_finished = true; -} - -void AggContext::set_canceled() { - DCHECK(!_is_finished); - _is_canceled = true; - _is_finished = true; -} - -bool AggContext::is_finish() { - return _is_finished; -} - -} // namespace pipeline -} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/agg_context.h b/be/src/pipeline/exec/agg_context.h deleted file mode 100644 index c7ddbdeb48..0000000000 --- a/be/src/pipeline/exec/agg_context.h +++ /dev/null @@ -1,76 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
-#pragma once - -#include -#include - -#include "common/status.h" - -namespace doris { -class TupleDescriptor; -namespace vectorized { -class Block; -} -namespace pipeline { - -class AggContext { -public: - AggContext(); - ~AggContext(); - - std::unique_ptr get_free_block(); - - void return_free_block(std::unique_ptr); - - bool has_data_or_finished(); - Status get_block(std::unique_ptr* block); - - bool has_enough_space_to_push(); - void push_block(std::unique_ptr); - - void set_finish(); - void set_canceled(); // should set before finish - bool is_finish(); - - bool data_exhausted() const { return _data_exhausted; } - - int64_t max_bytes_in_queue() const { return _max_bytes_in_queue; } - - int64_t max_size_of_queue() const { return _max_size_of_queue; } - -private: - std::mutex _free_blocks_lock; - std::vector> _free_blocks; - - std::mutex _transfer_lock; - std::list, size_t>> _blocks_queue; - - bool _data_exhausted = false; // only used by streaming agg source operator - std::atomic _is_finished; - std::atomic _is_canceled; - - // int64_t just for counter of profile - std::atomic _cur_bytes_in_queue; - std::atomic _cur_blocks_in_queue; - int64_t _max_bytes_in_queue = 0; - int64_t _max_size_of_queue = 0; - static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10; -}; - -} // namespace pipeline -} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 7b5c4ee2f6..cc48337c25 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -17,7 +17,6 @@ #pragma once -#include "agg_context.h" #include "operator.h" namespace doris { diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp new file mode 100644 index 0000000000..a6f2c43939 --- /dev/null +++ b/be/src/pipeline/exec/data_queue.cpp @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "data_queue.h" + +#include + +#include "vec/core/block.h" + +namespace doris { +namespace vectorized { +class Block; +} +namespace pipeline { + +DataQueue::DataQueue(int child_count) { + _child_count = child_count; + _flag_queue_idx = 0; + _queue_blocks.resize(child_count); + _free_blocks.resize(child_count); + _queue_blocks_lock.resize(child_count); + _free_blocks_lock.resize(child_count); + _is_finished.resize(child_count); + _is_canceled.resize(child_count); + _cur_bytes_in_queue.resize(child_count); + _cur_blocks_nums_in_queue.resize(child_count); + for (int i = 0; i < child_count; ++i) { + _queue_blocks_lock[i].reset(new std::mutex()); + _free_blocks_lock[i].reset(new std::mutex()); + _is_finished[i] = false; + _is_canceled[i] = false; + _cur_bytes_in_queue[i] = 0; + _cur_blocks_nums_in_queue[i] = 0; + } +} + +std::unique_ptr DataQueue::get_free_block(int child_idx) { + { + std::lock_guard l(*_free_blocks_lock[child_idx]); + if (!_free_blocks[child_idx].empty()) { + auto block = std::move(_free_blocks[child_idx].front()); + _free_blocks[child_idx].pop_front(); + return block; + } + } + + return std::make_unique(); +} + +void DataQueue::push_free_block(std::unique_ptr block, int child_idx) { + DCHECK(block->rows() == 0); + std::lock_guard l(*_free_blocks_lock[child_idx]); + _free_blocks[child_idx].emplace_back(std::move(block)); +} + +//use sink to check can_write +bool DataQueue::has_enough_space_to_push(int child_idx) { + return _cur_bytes_in_queue[child_idx].load() < MAX_BYTE_OF_QUEUE / 2; +} + +//use source to check can_read +bool DataQueue::has_data_or_finished(int child_idx) { + return remaining_has_data() || _is_finished[child_idx]; +} + +//check which queue have data, and save the idx in _flag_queue_idx, +//so next loop, will check the record idx + 1 first +//maybe it's useful with many queue, others maybe always 0 +bool DataQueue::remaining_has_data() { + int count = _child_count - 1; + while (count >= 0) { + _flag_queue_idx = (_flag_queue_idx + 1) % _child_count; + if (_cur_blocks_nums_in_queue[_flag_queue_idx] > 0) { + return true; + } + count--; + } + return false; +} + +//the _flag_queue_idx indicate which queue has data, and in check can_read +//will be set idx in remaining_has_data function +Status DataQueue::get_block_from_queue(std::unique_ptr* output_block, + int* child_idx) { + if (_is_canceled[_flag_queue_idx]) { + return Status::InternalError("Current queue of idx {} have beed canceled: ", + _flag_queue_idx); + } + + { + std::lock_guard l(*_queue_blocks_lock[_flag_queue_idx]); + if (_cur_blocks_nums_in_queue[_flag_queue_idx] > 0) { + *output_block = std::move(_queue_blocks[_flag_queue_idx].front()); + _queue_blocks[_flag_queue_idx].pop_front(); + if (child_idx) { + *child_idx = _flag_queue_idx; + } + _cur_bytes_in_queue[_flag_queue_idx] -= (*output_block)->allocated_bytes(); + _cur_blocks_nums_in_queue[_flag_queue_idx] -= 1; + } else { + if (_is_finished[_flag_queue_idx]) { + _data_exhausted = true; + } + } + } + return Status::OK(); +} + +void DataQueue::push_block(std::unique_ptr block, int child_idx) { + if (!block) { + return; + } + { + std::lock_guard l(*_queue_blocks_lock[child_idx]); + _cur_bytes_in_queue[child_idx] += block->allocated_bytes(); + _queue_blocks[child_idx].emplace_back(std::move(block)); + _cur_blocks_nums_in_queue[child_idx] += 1; + //this only use to record the queue[0] for profile + _max_bytes_in_queue = std::max(_max_bytes_in_queue, _cur_bytes_in_queue[0].load()); + _max_size_of_queue = std::max(_max_size_of_queue, 
+                                      (int64)_queue_blocks[0].size());
+    }
+}
+
+void DataQueue::set_finish(int child_idx) {
+    _is_finished[child_idx] = true;
+}
+
+void DataQueue::set_canceled(int child_idx) {
+    DCHECK(!_is_finished[child_idx]);
+    _is_canceled[child_idx] = true;
+    _is_finished[child_idx] = true;
+}
+
+bool DataQueue::is_finish(int child_idx) {
+    return _is_finished[child_idx];
+}
+
+bool DataQueue::is_all_finish() {
+    for (int i = 0; i < _child_count; ++i) {
+        if (!_is_finished[i]) {
+            return false;
+        }
+    }
+    return true;
+}
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h
new file mode 100644
index 0000000000..978fe6e629
--- /dev/null
+++ b/be/src/pipeline/exec/data_queue.h
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <atomic>
+#include <deque>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "vec/core/block.h"
+namespace doris {
+namespace vectorized {
+class Block;
+}
+namespace pipeline {
+
+class DataQueue {
+public:
+    // one child is usually enough, but a union node has multiple children
+    DataQueue(int child_count = 1);
+    ~DataQueue() = default;
+
+    Status get_block_from_queue(std::unique_ptr<vectorized::Block>* block,
+                                int* child_idx = nullptr);
+
+    void push_block(std::unique_ptr<vectorized::Block> block, int child_idx = 0);
+
+    std::unique_ptr<vectorized::Block> get_free_block(int child_idx = 0);
+
+    void push_free_block(std::unique_ptr<vectorized::Block> output_block, int child_idx = 0);
+
+    void set_finish(int child_idx = 0);
+    void set_canceled(int child_idx = 0); // should be set before finish
+    bool is_finish(int child_idx = 0);
+    bool is_all_finish();
+
+    bool has_enough_space_to_push(int child_idx = 0);
+    bool has_data_or_finished(int child_idx = 0);
+    bool remaining_has_data();
+
+    int64_t max_bytes_in_queue() const { return _max_bytes_in_queue; }
+    int64_t max_size_of_queue() const { return _max_size_of_queue; }
+
+    bool data_exhausted() const { return _data_exhausted; }
+
+private:
+    std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock;
+    std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _queue_blocks;
+
+    std::vector<std::unique_ptr<std::mutex>> _free_blocks_lock;
+    std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _free_blocks;
+
+    // number of queues to initialize; usually one
+    int _child_count = 0;
+    std::deque<std::atomic<bool>> _is_finished;
+    std::deque<std::atomic<bool>> _is_canceled;
+    // int64_t so the values can feed the profile counters
+    std::deque<std::atomic<int64_t>> _cur_bytes_in_queue;
+    std::deque<std::atomic<int64_t>> _cur_blocks_nums_in_queue;
+
+    // indicates which queue currently has data; useful when there are many queues
+    std::atomic<int> _flag_queue_idx = 0;
+    // only used by the streaming agg source operator
+    bool _data_exhausted = false;
+
+    // only records queue[0], for the profile counters
+    int64_t _max_bytes_in_queue = 0;
+    int64_t _max_size_of_queue = 0;
+    static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10;
+};
+} // namespace pipeline
+} //
namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp index d4049476ac..ae271ebb14 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp @@ -23,8 +23,8 @@ namespace doris::pipeline { StreamingAggSinkOperator::StreamingAggSinkOperator(OperatorBuilderBase* operator_builder, ExecNode* agg_node, - std::shared_ptr agg_context) - : StreamingOperator(operator_builder, agg_node), _agg_context(std::move(agg_context)) {} + std::shared_ptr queue) + : StreamingOperator(operator_builder, agg_node), _data_queue(std::move(queue)) {} Status StreamingAggSinkOperator::prepare(RuntimeState* state) { RETURN_IF_ERROR(StreamingOperator::prepare(state)); @@ -36,7 +36,7 @@ Status StreamingAggSinkOperator::prepare(RuntimeState* state) { bool StreamingAggSinkOperator::can_write() { // sink and source in diff threads - return _agg_context->has_enough_space_to_push(); + return _data_queue->has_enough_space_to_push(); } Status StreamingAggSinkOperator::sink(RuntimeState* state, vectorized::Block* in_block, @@ -44,39 +44,38 @@ Status StreamingAggSinkOperator::sink(RuntimeState* state, vectorized::Block* in SCOPED_TIMER(_runtime_profile->total_time_counter()); Status ret = Status::OK(); if (in_block && in_block->rows() > 0) { - auto block_from_ctx = _agg_context->get_free_block(); + auto block_from_ctx = _data_queue->get_free_block(); RETURN_IF_ERROR(_node->do_pre_agg(in_block, block_from_ctx.get())); if (block_from_ctx->rows() == 0) { - _agg_context->return_free_block(std::move(block_from_ctx)); + _data_queue->push_free_block(std::move(block_from_ctx)); } else { - _agg_context->push_block(std::move(block_from_ctx)); + _data_queue->push_block(std::move(block_from_ctx)); } } if (UNLIKELY(source_state == SourceState::FINISHED)) { - _agg_context->set_finish(); + _data_queue->set_finish(); } return Status::OK(); } Status StreamingAggSinkOperator::close(RuntimeState* state) { - if (_agg_context && !_agg_context->is_finish()) { + if (_data_queue && !_data_queue->is_finish()) { // finish should be set, if not set here means error. 
- _agg_context->set_canceled(); + _data_queue->set_canceled(); } - COUNTER_SET(_queue_size_counter, _agg_context->max_size_of_queue()); - COUNTER_SET(_queue_byte_size_counter, _agg_context->max_bytes_in_queue()); + COUNTER_SET(_queue_size_counter, _data_queue->max_size_of_queue()); + COUNTER_SET(_queue_byte_size_counter, _data_queue->max_bytes_in_queue()); return StreamingOperator::close(state); - ; } -StreamingAggSinkOperatorBuilder::StreamingAggSinkOperatorBuilder( - int32_t id, ExecNode* exec_node, std::shared_ptr agg_context) +StreamingAggSinkOperatorBuilder::StreamingAggSinkOperatorBuilder(int32_t id, ExecNode* exec_node, + std::shared_ptr queue) : OperatorBuilder(id, "StreamingAggSinkOperator", exec_node), - _agg_context(std::move(agg_context)) {} + _data_queue(std::move(queue)) {} OperatorPtr StreamingAggSinkOperatorBuilder::build_operator() { - return std::make_shared(this, _node, _agg_context); + return std::make_shared(this, _node, _data_queue); } } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h index 26e080c2a3..55c7c33db6 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h @@ -17,8 +17,8 @@ #pragma once -#include "agg_context.h" #include "operator.h" +#include "pipeline/exec/data_queue.h" namespace doris { namespace vectorized { @@ -31,7 +31,7 @@ namespace pipeline { class StreamingAggSinkOperatorBuilder final : public OperatorBuilder { public: - StreamingAggSinkOperatorBuilder(int32_t, ExecNode*, std::shared_ptr); + StreamingAggSinkOperatorBuilder(int32_t, ExecNode*, std::shared_ptr); OperatorPtr build_operator() override; @@ -39,13 +39,13 @@ public: bool is_source() const override { return false; }; private: - std::shared_ptr _agg_context; + std::shared_ptr _data_queue; }; class StreamingAggSinkOperator final : public StreamingOperator { public: StreamingAggSinkOperator(OperatorBuilderBase* operator_builder, ExecNode*, - std::shared_ptr); + std::shared_ptr); Status prepare(RuntimeState*) override; @@ -61,7 +61,7 @@ private: RuntimeProfile::Counter* _queue_byte_size_counter; RuntimeProfile::Counter* _queue_size_counter; - std::shared_ptr _agg_context; + std::shared_ptr _data_queue; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp index 5451ae86b9..f048cd4b62 100644 --- a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp @@ -22,27 +22,27 @@ namespace doris { namespace pipeline { StreamingAggSourceOperator::StreamingAggSourceOperator(OperatorBuilderBase* templ, ExecNode* node, - std::shared_ptr agg_context) - : SourceOperator(templ, node), _agg_context(std::move(agg_context)) {} + std::shared_ptr queue) + : SourceOperator(templ, node), _data_queue(std::move(queue)) {} bool StreamingAggSourceOperator::can_read() { - return _agg_context->has_data_or_finished(); + return _data_queue->has_data_or_finished(); } Status StreamingAggSourceOperator::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); bool eos = false; - if (!_agg_context->data_exhausted()) { + if (!_data_queue->data_exhausted()) { std::unique_ptr agg_block; - RETURN_IF_ERROR(_agg_context->get_block(&agg_block)); + 
RETURN_IF_ERROR(_data_queue->get_block_from_queue(&agg_block)); - if (_agg_context->data_exhausted()) { + if (_data_queue->data_exhausted()) { RETURN_IF_ERROR(_node->pull(state, block, &eos)); } else { block->swap(*agg_block); agg_block->clear_column_data(_node->row_desc().num_materialized_slots()); - _agg_context->return_free_block(std::move(agg_block)); + _data_queue->push_free_block(std::move(agg_block)); } } else { RETURN_IF_ERROR(_node->pull(state, block, &eos)); @@ -54,12 +54,12 @@ Status StreamingAggSourceOperator::get_block(RuntimeState* state, vectorized::Bl } StreamingAggSourceOperatorBuilder::StreamingAggSourceOperatorBuilder( - int32_t id, ExecNode* exec_node, std::shared_ptr agg_context) + int32_t id, ExecNode* exec_node, std::shared_ptr queue) : OperatorBuilder(id, "StreamingAggSourceOperator", exec_node), - _agg_context(std::move(agg_context)) {} + _data_queue(std::move(queue)) {} OperatorPtr StreamingAggSourceOperatorBuilder::build_operator() { - return std::make_shared(this, _node, _agg_context); + return std::make_shared(this, _node, _data_queue); } } // namespace pipeline diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.h b/be/src/pipeline/exec/streaming_aggregation_source_operator.h index 751d0b8e92..74b4be5420 100644 --- a/be/src/pipeline/exec/streaming_aggregation_source_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.h @@ -18,8 +18,8 @@ #include -#include "agg_context.h" #include "operator.h" +#include "pipeline/exec/data_queue.h" namespace doris { namespace vectorized { @@ -30,25 +30,25 @@ namespace pipeline { class StreamingAggSourceOperatorBuilder final : public OperatorBuilder { public: - StreamingAggSourceOperatorBuilder(int32_t, ExecNode*, std::shared_ptr); + StreamingAggSourceOperatorBuilder(int32_t, ExecNode*, std::shared_ptr); bool is_source() const override { return true; } OperatorPtr build_operator() override; private: - std::shared_ptr _agg_context; + std::shared_ptr _data_queue; }; class StreamingAggSourceOperator final : public SourceOperator { public: - StreamingAggSourceOperator(OperatorBuilderBase*, ExecNode*, std::shared_ptr); + StreamingAggSourceOperator(OperatorBuilderBase*, ExecNode*, std::shared_ptr); bool can_read() override; Status get_block(RuntimeState*, vectorized::Block*, SourceState& source_state) override; Status open(RuntimeState*) override { return Status::OK(); } private: - std::shared_ptr _agg_context; + std::shared_ptr _data_queue; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp new file mode 100644 index 0000000000..8e02877cdd --- /dev/null +++ b/be/src/pipeline/exec/union_sink_operator.cpp @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "union_sink_operator.h"
+
+#include "common/status.h"
+
+namespace doris::pipeline {
+
+UnionSinkOperatorBuilder::UnionSinkOperatorBuilder(int32_t id, int child_id, ExecNode* node,
+                                                   std::shared_ptr<DataQueue> queue)
+        : OperatorBuilder(id, "UnionSinkOperatorBuilder", node),
+          _cur_child_id(child_id),
+          _data_queue(queue) {};
+
+UnionSinkOperator::UnionSinkOperator(OperatorBuilderBase* operator_builder, int child_id,
+                                     ExecNode* node, std::shared_ptr<DataQueue> queue)
+        : StreamingOperator(operator_builder, node), _cur_child_id(child_id), _data_queue(queue) {};
+
+OperatorPtr UnionSinkOperatorBuilder::build_operator() {
+    return std::make_shared<UnionSinkOperator>(this, _cur_child_id, _node, _data_queue);
+}
+
+Status UnionSinkOperator::sink(RuntimeState* state, vectorized::Block* in_block,
+                               SourceState source_state) {
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    if (_output_block == nullptr) {
+        _output_block = _data_queue->get_free_block(_cur_child_id);
+    }
+
+    if (_cur_child_id < _node->get_first_materialized_child_idx()) { // pass-through child
+        if (in_block->rows() > 0) {
+            _output_block->swap(*in_block);
+            _data_queue->push_block(std::move(_output_block), _cur_child_id);
+        }
+    } else if (_node->get_first_materialized_child_idx() != _node->children_count() &&
+               _cur_child_id < _node->children_count()) { // needs materialization
+        RETURN_IF_ERROR(this->_node->materialize_child_block(state, _cur_child_id, in_block,
+                                                             _output_block.get()));
+    } else {
+        return Status::InternalError("Should not reach here while executing const exprs: {}, {}, {}",
+                                     _cur_child_id, _node->get_first_materialized_child_idx(),
+                                     _node->children_count());
+    }
+
+    if (UNLIKELY(source_state == SourceState::FINISHED)) {
+        // When this child reaches eos, push the pending block even if it has zero rows:
+        // if the sink is at eos while the queue stays empty and nothing is pushed, the
+        // source can never become readable again and would never see the finished state.
+        if (_output_block) {
+            _data_queue->push_block(std::move(_output_block), _cur_child_id);
+        }
+        _data_queue->set_finish(_cur_child_id);
+        return Status::OK();
+    }
+    // not eos, but the block already holds enough rows to output, so push it
+    if (_output_block && (_output_block->rows() >= state->batch_size())) {
+        _data_queue->push_block(std::move(_output_block), _cur_child_id);
+    }
+    return Status::OK();
+}
+
+Status UnionSinkOperator::close(RuntimeState* state) {
+    if (_data_queue && !_data_queue->is_finish(_cur_child_id)) {
+        // finish should already be set; reaching here without it indicates an error.
+        _data_queue->set_canceled(_cur_child_id);
+    }
+    return StreamingOperator::close(state);
+}
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h
new file mode 100644
index 0000000000..7bad62a4c6
--- /dev/null
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "operator.h" +#include "pipeline/exec/data_queue.h" +#include "vec/core/block.h" +namespace doris { +namespace vectorized { +class VUnionNode; +class Block; +} // namespace vectorized + +namespace pipeline { + +class UnionSinkOperatorBuilder final : public OperatorBuilder { +public: + UnionSinkOperatorBuilder(int32_t id, int child_id, ExecNode* node, + std::shared_ptr queue); + + OperatorPtr build_operator() override; + + bool is_sink() const override { return true; }; + +private: + int _cur_child_id; + std::shared_ptr _data_queue; +}; + +class UnionSinkOperator final : public StreamingOperator { +public: + UnionSinkOperator(OperatorBuilderBase* operator_builder, int child_id, ExecNode* node, + std::shared_ptr queue); + + bool can_write() override { return true; }; + + Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override; + // this operator in sink open directly return, do this work in source + Status open(RuntimeState* /*state*/) override { return Status::OK(); } + + Status close(RuntimeState* state) override; + +private: + int _cur_child_id; + std::shared_ptr _data_queue; + std::unique_ptr _output_block; +}; +} // namespace pipeline +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp new file mode 100644 index 0000000000..08855b849c --- /dev/null +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "pipeline/exec/union_source_operator.h" + +#include + +#include "common/status.h" +#include "pipeline/exec/data_queue.h" + +namespace doris { +namespace vectorized { +class Block; +} + +namespace pipeline { + +UnionSourceOperatorBuilder::UnionSourceOperatorBuilder(int32_t id, ExecNode* node, + std::shared_ptr queue) + : OperatorBuilder(id, "UnionSourceOperatorBuilder", node), _data_queue(queue) {}; + +OperatorPtr UnionSourceOperatorBuilder::build_operator() { + return std::make_shared(this, _node, _data_queue); +} + +UnionSourceOperator::UnionSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* node, + std::shared_ptr queue) + : SourceOperator(operator_builder, node), + _data_queue(queue), + _need_read_for_const_expr(true) {}; + +// we assumed it can read to process const expr, Although we don't know whether there is +// ,and queue have data, could read also +bool UnionSourceOperator::can_read() { + return _need_read_for_const_expr || _data_queue->remaining_has_data(); +} + +Status UnionSourceOperator::get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) { + // here we precess const expr firstly + if (_need_read_for_const_expr) { + if (this->_node->has_more_const(state)) { + this->_node->get_next_const(state, block); + } + _need_read_for_const_expr = this->_node->has_more_const(state); + } else { + std::unique_ptr output_block; + int child_idx = 0; + _data_queue->get_block_from_queue(&output_block, &child_idx); + block->swap(*output_block); + output_block->clear_column_data(_node->row_desc().num_materialized_slots()); + _data_queue->push_free_block(std::move(output_block), child_idx); + } + + bool reached_limit = false; + this->_node->reached_limit(block, &reached_limit); + //have exectue const expr, queue have no data any more, and child could be colsed + source_state = ((!_need_read_for_const_expr && !_data_queue->remaining_has_data() && + _data_queue->is_all_finish()) || + reached_limit) + ? SourceState::FINISHED + : SourceState::DEPEND_ON_SOURCE; + return Status::OK(); +} +} // namespace pipeline +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h new file mode 100644 index 0000000000..53e92e9ab7 --- /dev/null +++ b/be/src/pipeline/exec/union_source_operator.h @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#pragma once
+
+#include "operator.h"
+#include "pipeline/exec/data_queue.h"
+
+namespace doris {
+namespace vectorized {
+class VUnionNode;
+}
+
+namespace pipeline {
+
+class UnionSourceOperatorBuilder final : public OperatorBuilder<vectorized::VUnionNode> {
+public:
+    UnionSourceOperatorBuilder(int32_t id, ExecNode* node, std::shared_ptr<DataQueue>);
+
+    bool is_source() const override { return true; }
+
+    OperatorPtr build_operator() override;
+
+private:
+    std::shared_ptr<DataQueue> _data_queue;
+};
+
+class UnionSourceOperator final : public SourceOperator<UnionSourceOperatorBuilder> {
+public:
+    UnionSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* node,
+                        std::shared_ptr<DataQueue>);
+
+    Status get_block(RuntimeState* state, vectorized::Block* block,
+                     SourceState& source_state) override;
+    bool can_read() override;
+
+private:
+    std::shared_ptr<DataQueue> _data_queue;
+    bool _need_read_for_const_expr;
+};
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index d87afb8321..36bf365a46 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -20,7 +20,6 @@
 #include
 #include
-#include "exec/agg_context.h"
 #include "exec/aggregation_sink_operator.h"
 #include "exec/aggregation_source_operator.h"
 #include "exec/analytic_sink_operator.h"
@@ -52,11 +51,14 @@
 #include "pipeline/exec/assert_num_rows_operator.h"
 #include "pipeline/exec/broker_scan_operator.h"
 #include "pipeline/exec/const_value_operator.h"
+#include "pipeline/exec/data_queue.h"
 #include "pipeline/exec/nested_loop_join_build_operator.h"
 #include "pipeline/exec/nested_loop_join_probe_operator.h"
 #include "pipeline/exec/olap_table_sink_operator.h"
 #include "pipeline/exec/operator.h"
 #include "pipeline/exec/table_function_operator.h"
+#include "pipeline/exec/union_sink_operator.h"
+#include "pipeline/exec/union_source_operator.h"
 #include "pipeline_task.h"
 #include "runtime/client_cache.h"
 #include "runtime/fragment_mgr.h"
@@ -346,14 +348,24 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
     }
     case TPlanNodeType::UNION_NODE: {
         auto* union_node = assert_cast<vectorized::VUnionNode*>(node);
-        if (union_node->children_count() == 0) {
+        if (union_node->children_count() == 0 &&
+            union_node->get_first_materialized_child_idx() == 0) { // only has const exprs
             OperatorBuilderPtr builder =
                     std::make_shared<ConstValueOperatorBuilder>(next_operator_builder_id(), node);
             RETURN_IF_ERROR(cur_pipe->add_operator(builder));
         } else {
-            return Status::InternalError(
-                    "Unsupported exec type in pipeline: {}, later will be support.",
-                    print_plan_node_type(node_type));
+            int child_count = union_node->children_count();
+            auto data_queue = std::make_shared<DataQueue>(child_count);
+            for (int child_id = 0; child_id < child_count; ++child_id) {
+                auto new_child_pipeline = add_pipeline();
+                RETURN_IF_ERROR(_build_pipelines(union_node->child(child_id), new_child_pipeline));
+                OperatorBuilderPtr child_sink_builder = std::make_shared<UnionSinkOperatorBuilder>(
+                        next_operator_builder_id(), child_id, union_node, data_queue);
+                RETURN_IF_ERROR(new_child_pipeline->set_sink(child_sink_builder));
+            }
+            OperatorBuilderPtr source_builder = std::make_shared<UnionSourceOperatorBuilder>(
+                    next_operator_builder_id(), union_node, data_queue);
+            RETURN_IF_ERROR(cur_pipe->add_operator(source_builder));
         }
         break;
     }
@@ -362,13 +374,13 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
         auto new_pipe = add_pipeline();
         RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipe));
         if (agg_node->is_streaming_preagg()) {
- auto agg_ctx = std::make_shared(); + auto data_queue = std::make_shared(1); OperatorBuilderPtr pre_agg_sink = std::make_shared( - next_operator_builder_id(), agg_node, agg_ctx); + next_operator_builder_id(), agg_node, data_queue); RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink)); OperatorBuilderPtr pre_agg_source = std::make_shared( - next_operator_builder_id(), agg_node, agg_ctx); + next_operator_builder_id(), agg_node, data_queue); RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source)); } else { OperatorBuilderPtr agg_sink = diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp index a9662214f1..7b8381bc15 100644 --- a/be/src/vec/exec/vunion_node.cpp +++ b/be/src/vec/exec/vunion_node.cpp @@ -17,6 +17,8 @@ #include "vec/exec/vunion_node.h" +#include + #include "gen_cpp/PlanNodes_types.h" #include "runtime/runtime_state.h" #include "util/runtime_profile.h" @@ -157,7 +159,7 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) { child(_child_idx)->get_next_span(), _child_eos); SCOPED_TIMER(_materialize_exprs_evaluate_timer); if (child_block.rows() > 0) { - mblock.merge(materialize_block(&child_block)); + mblock.merge(materialize_block(&child_block, _child_idx)); } // It shouldn't be the case that we reached the limit because we shouldn't have // incremented '_num_rows_returned' yet. @@ -190,7 +192,8 @@ Status VUnionNode::get_next_const(RuntimeState* state, Block* block) { mem_reuse ? MutableBlock::build_mutable_block(block) : MutableBlock(Block(VectorizedUtils::create_columns_with_type_and_name( _row_descriptor))); - for (; _const_expr_list_idx < _const_expr_lists.size(); ++_const_expr_list_idx) { + for (; _const_expr_list_idx < _const_expr_lists.size() && mblock.rows() <= state->batch_size(); + ++_const_expr_list_idx) { Block tmp_block; tmp_block.insert({vectorized::ColumnUInt8::create(1), std::make_shared(), ""}); @@ -203,6 +206,7 @@ Status VUnionNode::get_next_const(RuntimeState* state, Block* block) { tmp_block.erase_not_in(result_list); if (tmp_block.rows() > 0) { mblock.merge(tmp_block); + tmp_block.clear(); } } @@ -220,6 +224,27 @@ Status VUnionNode::get_next_const(RuntimeState* state, Block* block) { return Status::OK(); } +//for pipeline operator +Status VUnionNode::materialize_child_block(RuntimeState* state, int child_id, + vectorized::Block* input_block, + vectorized::Block* output_block) { + DCHECK_LT(child_id, _children.size()); + DCHECK(!is_child_passthrough(child_id)); + bool mem_reuse = output_block->mem_reuse(); + MutableBlock mblock = + mem_reuse ? 
MutableBlock::build_mutable_block(output_block) + : MutableBlock(Block(VectorizedUtils::create_columns_with_type_and_name( + _row_descriptor))); + + if (input_block->rows() > 0) { + mblock.merge(materialize_block(input_block, child_id)); + if (!mem_reuse) { + output_block->swap(mblock.to_block()); + } + } + return Status::OK(); +} + Status VUnionNode::get_next(RuntimeState* state, Block* block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VUnionNode::get_next"); SCOPED_TIMER(_runtime_profile->total_time_counter()); @@ -286,8 +311,8 @@ void VUnionNode::debug_string(int indentation_level, std::stringstream* out) con *out << ")" << std::endl; } -Block VUnionNode::materialize_block(Block* src_block) { - const std::vector& child_exprs = _child_expr_lists[_child_idx]; +Block VUnionNode::materialize_block(Block* src_block, int child_idx) { + const std::vector& child_exprs = _child_expr_lists[child_idx]; ColumnsWithTypeAndName colunms; for (size_t i = 0; i < child_exprs.size(); ++i) { int result_column_id = -1; diff --git a/be/src/vec/exec/vunion_node.h b/be/src/vec/exec/vunion_node.h index 5433e2b27f..ccc683452b 100644 --- a/be/src/vec/exec/vunion_node.h +++ b/be/src/vec/exec/vunion_node.h @@ -36,9 +36,22 @@ public: Status alloc_resource(RuntimeState* state) override; void release_resource(RuntimeState* state) override; + Status materialize_child_block(RuntimeState* state, int child_id, + vectorized::Block* input_block, vectorized::Block* output_block); size_t children_count() const { return _children.size(); } + int get_first_materialized_child_idx() const { return _first_materialized_child_idx; } + + /// Returns true if there are still rows to be returned from constant expressions. + bool has_more_const(const RuntimeState* state) const { + return state->per_fragment_instance_idx() == 0 && + _const_expr_list_idx < _const_expr_lists.size(); + } + + /// GetNext() for the constant expression case. + Status get_next_const(RuntimeState* state, Block* block); + private: /// Const exprs materialized by this node. These exprs don't refer to any children. /// Only materialized by the first fragment instance to avoid duplication. @@ -76,13 +89,10 @@ private: /// non-passthrough child. Status get_next_materialized(RuntimeState* state, Block* block); - /// GetNext() for the constant expression case. - Status get_next_const(RuntimeState* state, Block* block); - /// Evaluates exprs for the current child and materializes the results into 'tuple_buf', /// which is attached to 'dst_block'. Runs until 'dst_block' is at capacity, or all rows /// have been consumed from the current child block. Updates '_child_row_idx'. - Block materialize_block(Block* dst_block); + Block materialize_block(Block* dst_block, int child_idx); Status get_error_msg(const std::vector& exprs); @@ -101,12 +111,6 @@ private: return _first_materialized_child_idx != _children.size() && _child_idx < _children.size(); } - /// Returns true if there are still rows to be returned from constant expressions. - bool has_more_const(const RuntimeState* state) const { - return state->per_fragment_instance_idx() == 0 && - _const_expr_list_idx < _const_expr_lists.size(); - } - void debug_string(int indentation_level, std::stringstream* out) const override; };
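For readers skimming the patch, the following is a minimal, self-contained C++ sketch of the handoff pattern that DataQueue generalizes from the old AggContext: each union child ("sink") pushes blocks into its own slot and marks itself finished, while one consumer ("source") drains the slots round-robin until every child is done. All names here (MiniDataQueue, pop_block, and so on) are illustrative stand-ins under that assumption, not the PR's actual API; the real DataQueue additionally recycles free blocks and tracks byte counters for the profile.

// Illustrative sketch only; not Doris code.
#include <deque>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

class MiniDataQueue {
public:
    explicit MiniDataQueue(int child_count)
            : _queues(child_count), _finished(child_count, false) {}

    void push_block(std::string block, int child_idx) {
        std::lock_guard<std::mutex> l(_mutex);
        _queues[child_idx].push_back(std::move(block));
    }

    void set_finish(int child_idx) {
        std::lock_guard<std::mutex> l(_mutex);
        _finished[child_idx] = true;
    }

    // Round-robin probe, in the spirit of DataQueue::remaining_has_data(); sets
    // *all_finished only when every queue is empty and every child has finished.
    bool pop_block(std::string* out, bool* all_finished) {
        std::lock_guard<std::mutex> l(_mutex);
        for (size_t i = 0; i < _queues.size(); ++i) {
            size_t idx = (_last_idx + 1 + i) % _queues.size();
            if (!_queues[idx].empty()) {
                *out = std::move(_queues[idx].front());
                _queues[idx].pop_front();
                _last_idx = idx;
                return true;
            }
        }
        *all_finished = true;
        for (bool f : _finished) {
            *all_finished = *all_finished && f;
        }
        return false;
    }

private:
    std::mutex _mutex;
    std::vector<std::deque<std::string>> _queues;
    std::vector<bool> _finished;
    size_t _last_idx = 0;
};

int main() {
    MiniDataQueue queue(2);
    auto sink = [&queue](int child_idx) {
        for (int i = 0; i < 3; ++i) {
            queue.push_block("child" + std::to_string(child_idx) + "-block" + std::to_string(i),
                             child_idx);
        }
        queue.set_finish(child_idx); // like UnionSinkOperator on SourceState::FINISHED
    };
    std::thread producer0(sink, 0);
    std::thread producer1(sink, 1);

    // The "source" keeps reading until no data remains and all children are finished,
    // mirroring how UnionSourceOperator decides SourceState::FINISHED.
    int consumed = 0;
    while (true) {
        std::string block;
        bool all_finished = false;
        if (queue.pop_block(&block, &all_finished)) {
            ++consumed;
        } else if (all_finished) {
            break;
        }
    }
    producer0.join();
    producer1.join();
    std::cout << "consumed " << consumed << " blocks" << std::endl;
    return 0;
}

The sketch deliberately uses a single mutex; the patch instead keeps one lock per child queue plus per-queue atomics so that sinks running in different pipeline tasks do not contend with each other.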