diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index a83d8bebd3..5604d2b43d 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -19,6 +19,7 @@ #include +#include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h" #include "pipeline/exec/operator.h" #include "pipeline/exec/streaming_aggregation_sink_operator.h" #include "runtime/primitive_type.h" @@ -946,10 +947,12 @@ Status AggSinkLocalState::close(RuntimeState* state) { } class StreamingAggSinkLocalState; +class DistinctStreamingAggSinkLocalState; template class AggSinkOperatorX; template class AggSinkOperatorX; +template class AggSinkOperatorX; template class AggSinkLocalState; template class AggSinkLocalState; - +template class AggSinkLocalState; } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index dcb6b2e497..46cf5906a2 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -352,6 +352,7 @@ protected: template friend class AggSinkLocalState; friend class StreamingAggSinkLocalState; + friend class DistinctStreamingAggSinkLocalState; std::vector _aggregate_evaluators; bool _can_short_circuit = false; diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h index a5741e0a91..274424f1f9 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.h +++ b/be/src/pipeline/exec/aggregation_source_operator.h @@ -65,6 +65,8 @@ protected: friend class AggSourceOperatorX; friend class StreamingAggSourceOperatorX; friend class StreamingAggSinkOperatorX; + friend class DistinctStreamingAggSourceOperatorX; + friend class DistinctStreamingAggSinkOperatorX; void _close_without_key(); void _close_with_serialized_key(); diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp index 635817973c..a8e9f2b643 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp @@ -19,6 +19,7 @@ #include +#include #include #include "common/compiler_util.h" // IWYU pragma: keep @@ -94,4 +95,169 @@ OperatorPtr DistinctStreamingAggSinkOperatorBuilder::build_operator() { return std::make_shared(this, _node, _data_queue); } +DistinctStreamingAggSinkLocalState::DistinctStreamingAggSinkLocalState( + DataSinkOperatorXBase* parent, RuntimeState* state) + : AggSinkLocalState(parent, state), + dummy_mapped_data(std::make_shared('A')) {} + +Status DistinctStreamingAggSinkLocalState::_distinct_pre_agg_with_serialized_key( + doris::vectorized::Block* in_block, doris::vectorized::Block* out_block) { + SCOPED_TIMER(_build_timer); + DCHECK(!_shared_state->probe_expr_ctxs.empty()); + + size_t key_size = _shared_state->probe_expr_ctxs.size(); + vectorized::ColumnRawPtrs key_columns(key_size); + { + SCOPED_TIMER(_expr_timer); + for (size_t i = 0; i < key_size; ++i) { + int result_column_id = -1; + RETURN_IF_ERROR( + _shared_state->probe_expr_ctxs[i]->execute(in_block, &result_column_id)); + in_block->get_by_position(result_column_id).column = + in_block->get_by_position(result_column_id) + .column->convert_to_full_column_if_const(); + key_columns[i] = in_block->get_by_position(result_column_id).column.get(); + } + } + + int 
rows = in_block->rows(); + _distinct_row.clear(); + _distinct_row.reserve(rows); + + RETURN_IF_CATCH_EXCEPTION( + _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows)); + + bool mem_reuse = _dependency->make_nullable_keys().empty() && out_block->mem_reuse(); + if (mem_reuse) { + for (int i = 0; i < key_size; ++i) { + auto dst = out_block->get_by_position(i).column->assume_mutable(); + key_columns[i]->append_data_by_selector(dst, _distinct_row); + } + } else { + vectorized::ColumnsWithTypeAndName columns_with_schema; + for (int i = 0; i < key_size; ++i) { + auto distinct_column = key_columns[i]->clone_empty(); + key_columns[i]->append_data_by_selector(distinct_column, _distinct_row); + columns_with_schema.emplace_back( + std::move(distinct_column), + _shared_state->probe_expr_ctxs[i]->root()->data_type(), + _shared_state->probe_expr_ctxs[i]->root()->expr_name()); + } + out_block->swap(vectorized::Block(columns_with_schema)); + } + return Status::OK(); +} + +void DistinctStreamingAggSinkLocalState::_emplace_into_hash_table_to_distinct( + vectorized::IColumn::Selector& distinct_row, vectorized::ColumnRawPtrs& key_columns, + const size_t num_rows) { + std::visit( + [&](auto&& agg_method) -> void { + SCOPED_TIMER(_hash_table_compute_timer); + using HashMethodType = std::decay_t; + using HashTableType = std::decay_t; + using AggState = typename HashMethodType::State; + AggState state(key_columns, _shared_state->probe_key_sz, nullptr); + _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows); + + if constexpr (HashTableTraits::is_phmap) { + auto keys = state.get_keys(num_rows); + if (_hash_values.size() < num_rows) { + _hash_values.resize(num_rows); + } + + for (size_t i = 0; i < num_rows; ++i) { + _hash_values[i] = agg_method.data.hash(keys[i]); + } + SCOPED_TIMER(_hash_table_emplace_timer); + for (size_t i = 0; i < num_rows; ++i) { + if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) { + agg_method.data.prefetch_by_hash( + _hash_values[i + HASH_MAP_PREFETCH_DIST]); + } + auto result = state.emplace_with_key( + agg_method.data, state.pack_key_holder(keys[i], *_agg_arena_pool), + _hash_values[i], i); + if (result.is_inserted()) { + distinct_row.push_back(i); + } + } + } else { + SCOPED_TIMER(_hash_table_emplace_timer); + for (size_t i = 0; i < num_rows; ++i) { + auto result = state.emplace_key(agg_method.data, i, *_agg_arena_pool); + if (result.is_inserted()) { + result.set_mapped(dummy_mapped_data.get()); + distinct_row.push_back(i); + } + } + } + COUNTER_UPDATE(_hash_table_input_counter, num_rows); + }, + _agg_data->method_variant); +} + +DistinctStreamingAggSinkOperatorX::DistinctStreamingAggSinkOperatorX(ObjectPool* pool, + const TPlanNode& tnode, + const DescriptorTbl& descs) + : AggSinkOperatorX(pool, tnode, descs) {} + +Status DistinctStreamingAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(AggSinkOperatorX::init(tnode, state)); + _name = "DISTINCT_STREAMING_AGGREGATION_SINK_OPERATOR"; + return Status::OK(); +} + +Status DistinctStreamingAggSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) { + auto& local_state = + state->get_sink_local_state(id())->cast(); + SCOPED_TIMER(local_state.profile()->total_time_counter()); + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); + local_state._shared_state->input_num_rows += in_block->rows(); + Status ret = Status::OK(); + if (in_block && in_block->rows() > 0) { + if (local_state._output_block == nullptr) 
{ + local_state._output_block = local_state._shared_state->data_queue->get_free_block(); + } + RETURN_IF_ERROR(local_state._distinct_pre_agg_with_serialized_key( + in_block, local_state._output_block.get())); + + // enough rows accumulated or the limit has been reached, so push the block to the queue + if (_limit != -1 && + (local_state._output_block->rows() + local_state._output_distinct_rows) >= _limit) { + auto limit_rows = _limit - local_state._output_distinct_rows; + local_state._output_block->set_num_rows(limit_rows); + local_state._output_distinct_rows += limit_rows; + local_state._shared_state->data_queue->push_block(std::move(local_state._output_block)); + } else if (local_state._output_block->rows() >= state->batch_size()) { + local_state._output_distinct_rows += local_state._output_block->rows(); + local_state._shared_state->data_queue->push_block(std::move(local_state._output_block)); + } + } + + // limit reached or source finished + if ((UNLIKELY(source_state == SourceState::FINISHED)) || + (_limit != -1 && local_state._output_distinct_rows >= _limit)) { + if (local_state._output_block != nullptr) { // may be the last block before eos + local_state._output_distinct_rows += local_state._output_block->rows(); + local_state._shared_state->data_queue->push_block(std::move(local_state._output_block)); + } + local_state._shared_state->data_queue->set_finish(); + return Status::Error(""); // a finish signal needs to be given here + } + return Status::OK(); +} + +Status DistinctStreamingAggSinkLocalState::close(RuntimeState* state) { + if (_closed) { + return Status::OK(); + } + if (_shared_state->data_queue && !_shared_state->data_queue->is_finish()) { + // finish should already be set; reaching here without it indicates an error. + _shared_state->data_queue->set_canceled(); + } + return Base::close(state); +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h index 46b1dda008..2693ad2dfe 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h @@ -22,8 +22,10 @@ #include #include #include "common/status.h" #include "operator.h" +#include "pipeline/exec/aggregation_sink_operator.h" +#include "pipeline/exec/aggregation_source_operator.h" #include "util/runtime_profile.h" #include "vec/core/block.h" #include "vec/exec/distinct_vaggregation_node.h" @@ -72,5 +74,50 @@ private: std::unique_ptr<vectorized::Block> _output_block = vectorized::Block::create_unique(); }; +class DistinctStreamingAggSinkOperatorX; + +class DistinctStreamingAggSinkLocalState final + : public AggSinkLocalState { +public: + using Parent = DistinctStreamingAggSinkOperatorX; + using Base = AggSinkLocalState; + ENABLE_FACTORY_CREATOR(DistinctStreamingAggSinkLocalState); + DistinctStreamingAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state); + Status init(RuntimeState* state, LocalSinkStateInfo& info) override { + RETURN_IF_ERROR(Base::init(state, info)); + _shared_state->data_queue.reset(new DataQueue(1, _dependency)); + return Status::OK(); + } + + Status close(RuntimeState* state) override; + Status _distinct_pre_agg_with_serialized_key(vectorized::Block* in_block, + vectorized::Block* out_block); + +private: + friend class DistinctStreamingAggSinkOperatorX; + void _emplace_into_hash_table_to_distinct(vectorized::IColumn::Selector& distinct_row, + vectorized::ColumnRawPtrs& key_columns, + const size_t num_rows); + +
std::unique_ptr<vectorized::Block> _output_block = vectorized::Block::create_unique(); + std::shared_ptr<char> dummy_mapped_data = nullptr; + vectorized::IColumn::Selector _distinct_row; + int64_t _output_distinct_rows = 0; +}; + +class DistinctStreamingAggSinkOperatorX final + : public AggSinkOperatorX<DistinctStreamingAggSinkLocalState> { +public: + DistinctStreamingAggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs); + Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override; + + WriteDependency* wait_for_dependency(RuntimeState* state) override { + return state->get_local_state(id())->cast<DistinctStreamingAggSinkLocalState>()._dependency->write_blocked_by(); + } +}; + } // namespace pipeline -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp index fb653bdcbd..bed911d130 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp @@ -88,5 +88,53 @@ OperatorPtr DistinctStreamingAggSourceOperatorBuilder::build_operator() { return std::make_shared<DistinctStreamingAggSourceOperator>(this, _node, _data_queue); } +DistinctStreamingAggSourceOperatorX::DistinctStreamingAggSourceOperatorX(ObjectPool* pool, + const TPlanNode& tnode, + const DescriptorTbl& descs) + : Base(pool, tnode, descs) { + if (tnode.agg_node.__isset.use_streaming_preaggregation) { + _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation; + if (_is_streaming_preagg) { + DCHECK(!tnode.agg_node.grouping_exprs.empty()) << "Streaming preaggs do grouping"; + DCHECK(_limit == -1) << "Preaggs have no limits"; + } + } else { + _is_streaming_preagg = false; + } +} + +Status DistinctStreamingAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) { + auto& local_state = state->get_local_state(id())->cast(); + SCOPED_TIMER(local_state.profile()->total_time_counter()); + std::unique_ptr<vectorized::Block> agg_block; + RETURN_IF_ERROR(local_state._shared_state->data_queue->get_block_from_queue(&agg_block)); + if (agg_block != nullptr) { + block->swap(*agg_block); + agg_block->clear_column_data(block->columns()); + local_state._shared_state->data_queue->push_free_block(std::move(agg_block)); + } + + local_state._dependency->_make_nullable_output_key(block); + if (_is_streaming_preagg == false) { + // handle the having clause; it should not be executed in the pre-streaming agg + RETURN_IF_ERROR( + vectorized::VExprContext::filter_block(_conjuncts, block, block->columns())); + } + + if (UNLIKELY(local_state._shared_state->data_queue->data_exhausted())) { + source_state = SourceState::FINISHED; + } else { + source_state = SourceState::DEPEND_ON_SOURCE; + } + return Status::OK(); +} + +Status DistinctStreamingAggSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(Base::init(tnode, state)); + _op_name = "DISTINCT_STREAMING_AGGREGATION_OPERATOR"; + return Status::OK(); +} + } // namespace pipeline } // namespace doris diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h index 3534193bf8..78edf4815f 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h @@ -23,6 +23,7 @@ #include
"common/status.h" #include "operator.h" +#include "pipeline/exec/aggregation_source_operator.h" #include "vec/exec/distinct_vaggregation_node.h" #include "vec/exec/vaggregation_node.h" @@ -63,5 +64,19 @@ private: std::shared_ptr _data_queue; }; +class DistinctStreamingAggSourceOperatorX final : public AggSourceOperatorX { +public: + using Base = AggSourceOperatorX; + DistinctStreamingAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs); + ~DistinctStreamingAggSourceOperatorX() = default; + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + + Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override; + bool _is_streaming_preagg = false; +}; + } // namespace pipeline -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp new file mode 100644 index 0000000000..7da6f0c5cd --- /dev/null +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -0,0 +1,367 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "partition_sort_sink_operator.h" + +#include "common/status.h" + +namespace doris { + +namespace pipeline { + +OperatorPtr PartitionSortSinkOperatorBuilder::build_operator() { + return std::make_shared(this, _node); +} + +Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info)); + auto& p = _parent->cast(); + RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs)); + _partition_expr_ctxs.resize(p._partition_expr_ctxs.size()); + _partition_columns.resize(p._partition_expr_ctxs.size()); + for (size_t i = 0; i < p._partition_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._partition_expr_ctxs[i]->clone(state, _partition_expr_ctxs[i])); + } + _partition_exprs_num = p._partition_exprs_num; + _partitioned_data = std::make_unique(); + _agg_arena_pool = std::make_unique(); + _hash_table_size_counter = ADD_COUNTER(_profile, "HashTableSize", TUnit::UNIT); + _build_timer = ADD_TIMER(_profile, "HashTableBuildTime"); + _partition_sort_timer = ADD_TIMER(_profile, "PartitionSortTime"); + _get_sorted_timer = ADD_TIMER(_profile, "GetSortedTime"); + _selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime"); + _emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime"); + _init_hash_method(); + return Status::OK(); +} + +PartitionSortSinkOperatorX::PartitionSortSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : DataSinkOperatorX(tnode.node_id), + _pool(pool), + _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), + _limit(tnode.limit) {} + +Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); + + //order by key + if (tnode.partition_sort_node.__isset.sort_info) { + RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.partition_sort_node.sort_info, _pool)); + _is_asc_order = tnode.partition_sort_node.sort_info.is_asc_order; + _nulls_first = tnode.partition_sort_node.sort_info.nulls_first; + } + //partition by key + if (tnode.partition_sort_node.__isset.partition_exprs) { + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees( + tnode.partition_sort_node.partition_exprs, _partition_expr_ctxs)); + _partition_exprs_num = _partition_expr_ctxs.size(); + } + + _has_global_limit = tnode.partition_sort_node.has_global_limit; + _top_n_algorithm = tnode.partition_sort_node.top_n_algorithm; + _partition_inner_limit = tnode.partition_sort_node.partition_inner_limit; + return Status::OK(); +} + +Status PartitionSortSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child_x->row_desc(), _row_descriptor)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_partition_expr_ctxs, state, _child_x->row_desc())); + return Status::OK(); +} + +Status PartitionSortSinkOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); + RETURN_IF_ERROR(vectorized::VExpr::open(_partition_expr_ctxs, state)); + return Status::OK(); +} + +Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* input_block, + SourceState source_state) { + auto& local_state = state->get_sink_local_state(id())->cast(); + auto current_rows = input_block->rows(); + SCOPED_TIMER(local_state.profile()->total_time_counter()); + if (current_rows > 0) { + local_state.child_input_rows = local_state.child_input_rows + current_rows; + if (UNLIKELY(_partition_exprs_num == 0)) { + if (UNLIKELY(local_state._value_places.empty())) 
{ + local_state._value_places.push_back(_pool->add(new vectorized::PartitionBlocks())); + } + //no partition key + local_state._value_places[0]->append_whole_block(input_block, _child_x->row_desc()); + } else { + // use the partition count as a simple pass-through heuristic + if (local_state._num_partition > config::partition_topn_partition_threshold && + local_state.child_input_rows < 10000 * local_state._num_partition) { + { + std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex); + local_state._shared_state->blocks_buffer.push(std::move(*input_block)); + // the buffer has data now, so the source can read it. + local_state._dependency->set_ready_for_read(); + } + } else { + RETURN_IF_ERROR( + _split_block_by_partition(input_block, state->batch_size(), local_state)); + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR( + state->check_query_state("VPartitionSortNode, while splitting the input block.")); + input_block->clear_column_data(); + } + } + } + + if (source_state == SourceState::FINISHED) { + // the hash table is no longer needed, so its memory can be freed + local_state._agg_arena_pool.reset(nullptr); + local_state._partitioned_data.reset(nullptr); + for (int i = 0; i < local_state._value_places.size(); ++i) { + auto sorter = vectorized::PartitionSorter::create_unique( + _vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, _nulls_first, + _child_x->row_desc(), state, i == 0 ? local_state._profile : nullptr, + _has_global_limit, _partition_inner_limit, _top_n_algorithm, + local_state._shared_state->previous_row.get()); + + DCHECK(_child_x->row_desc().num_materialized_slots() == + local_state._value_places[i]->blocks.back()->columns()); + // feed every partition's blocks into its sorter. + for (const auto& block : local_state._value_places[i]->blocks) { + RETURN_IF_ERROR(sorter->append_block(block.get())); + } + sorter->init_profile(local_state._profile); + RETURN_IF_ERROR(sorter->prepare_for_read()); + local_state._shared_state->partition_sorts.push_back(std::move(sorter)); + } + + COUNTER_SET(local_state._hash_table_size_counter, int64_t(local_state._num_partition)); + // all data from the child has been sunk, so the source can read now + local_state._dependency->set_ready_for_read(); + } + + return Status::OK(); +} + +Status PartitionSortSinkOperatorX::_split_block_by_partition( + vectorized::Block* input_block, int batch_size, PartitionSortSinkLocalState& local_state) { + for (int i = 0; i < _partition_exprs_num; ++i) { + int result_column_id = -1; + RETURN_IF_ERROR(_partition_expr_ctxs[i]->execute(input_block, &result_column_id)); + DCHECK(result_column_id != -1); + local_state._partition_columns[i] = + input_block->get_by_position(result_column_id).column.get(); + } + _emplace_into_hash_table(local_state._partition_columns, input_block, batch_size, local_state); + return Status::OK(); +} + +void PartitionSortSinkOperatorX::_emplace_into_hash_table( + const vectorized::ColumnRawPtrs& key_columns, const vectorized::Block* input_block, + int batch_size, PartitionSortSinkLocalState& local_state) { + std::visit( + [&](auto&& agg_method) -> void { + SCOPED_TIMER(local_state._build_timer); + using HashMethodType = std::decay_t<decltype(agg_method)>; + using HashTableType = std::decay_t<decltype(agg_method.data)>; + using AggState = typename HashMethodType::State; + + AggState state(key_columns, local_state._partition_key_sz, nullptr); + size_t num_rows = input_block->rows(); + _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows); + + //PHHashMap + if constexpr (HashTableTraits<HashTableType>::is_phmap) { + if (local_state._hash_values.size() < num_rows) { + local_state._hash_values.resize(num_rows); + } + if
constexpr (vectorized::ColumnsHashing::IsPreSerializedKeysHashMethodTraits< + AggState>::value) { + for (size_t i = 0; i < num_rows; ++i) { + local_state._hash_values[i] = agg_method.data.hash(agg_method.keys[i]); + } + } else { + for (size_t i = 0; i < num_rows; ++i) { + local_state._hash_values[i] = agg_method.data.hash( + state.get_key_holder(i, *local_state._agg_arena_pool)); + } + } + } + + for (size_t row = 0; row < num_rows; ++row) { + SCOPED_TIMER(local_state._emplace_key_timer); + vectorized::PartitionDataPtr aggregate_data = nullptr; + auto emplace_result = [&]() { + if constexpr (HashTableTraits::is_phmap) { + if (LIKELY(row + HASH_MAP_PREFETCH_DIST < num_rows)) { + agg_method.data.prefetch_by_hash( + local_state._hash_values[row + HASH_MAP_PREFETCH_DIST]); + } + return state.emplace_key(agg_method.data, local_state._hash_values[row], + row, *local_state._agg_arena_pool); + } else { + return state.emplace_key(agg_method.data, row, + *local_state._agg_arena_pool); + } + }(); + + /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. + if (emplace_result.is_inserted()) { + /// exception-safety - if you can not allocate memory or create states, then destructors will not be called. + emplace_result.set_mapped(nullptr); + aggregate_data = _pool->add(new vectorized::PartitionBlocks()); + emplace_result.set_mapped(aggregate_data); + local_state._value_places.push_back(aggregate_data); + local_state._num_partition++; + } else { + aggregate_data = emplace_result.get_mapped(); + } + assert(aggregate_data != nullptr); + aggregate_data->add_row_idx(row); + } + for (auto place : local_state._value_places) { + SCOPED_TIMER(local_state._selector_block_timer); + place->append_block_by_selector(input_block, _child_x->row_desc(), + _has_global_limit, _partition_inner_limit, + batch_size); + } + }, + local_state._partitioned_data->method_variant); +} + +void PartitionSortSinkLocalState::_init_hash_method() { + if (_partition_exprs_num == 0) { + return; + } else if (_partition_exprs_num == 1) { + auto is_nullable = _partition_expr_ctxs[0]->root()->is_nullable(); + switch (_partition_expr_ctxs[0]->root()->result_type()) { + case TYPE_TINYINT: + case TYPE_BOOLEAN: + _partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::int8_key, + is_nullable); + return; + case TYPE_SMALLINT: + _partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::int16_key, + is_nullable); + return; + case TYPE_INT: + case TYPE_FLOAT: + case TYPE_DATEV2: + _partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::int32_key, + is_nullable); + return; + case TYPE_BIGINT: + case TYPE_DOUBLE: + case TYPE_DATE: + case TYPE_DATETIME: + case TYPE_DATETIMEV2: + _partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::int64_key, + is_nullable); + return; + case TYPE_LARGEINT: { + _partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::int128_key, + is_nullable); + return; + } + case TYPE_DECIMALV2: + case TYPE_DECIMAL32: + case TYPE_DECIMAL64: + case TYPE_DECIMAL128I: { + vectorized::DataTypePtr& type_ptr = _partition_expr_ctxs[0]->root()->data_type(); + vectorized::TypeIndex idx = + is_nullable ? 
assert_cast(*type_ptr) + .get_nested_type() + ->get_type_id() + : type_ptr->get_type_id(); + vectorized::WhichDataType which(idx); + if (which.is_decimal32()) { + _partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::int32_key, + is_nullable); + } else if (which.is_decimal64()) { + _partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::int64_key, + is_nullable); + } else { + _partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::int128_key, + is_nullable); + } + return; + } + case TYPE_CHAR: + case TYPE_VARCHAR: + case TYPE_STRING: { + _partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::string_key, + is_nullable); + break; + } + default: + _partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::serialized); + } + } else { + bool use_fixed_key = true; + bool has_null = false; + size_t key_byte_size = 0; + size_t bitmap_size = vectorized::get_bitmap_size(_partition_exprs_num); + + _partition_key_sz.resize(_partition_exprs_num); + for (int i = 0; i < _partition_exprs_num; ++i) { + const auto& data_type = _partition_expr_ctxs[i]->root()->data_type(); + + if (!data_type->have_maximum_size_of_value()) { + use_fixed_key = false; + break; + } + + auto is_null = data_type->is_nullable(); + has_null |= is_null; + _partition_key_sz[i] = + data_type->get_maximum_size_of_value_in_memory() - (is_null ? 1 : 0); + key_byte_size += _partition_key_sz[i]; + } + + if (bitmap_size + key_byte_size > sizeof(vectorized::UInt256)) { + use_fixed_key = false; + } + + if (use_fixed_key) { + if (has_null) { + if (bitmap_size + key_byte_size <= sizeof(vectorized::UInt64)) { + _partitioned_data->init( + vectorized::PartitionedHashMapVariants::Type::int64_keys, has_null); + } else if (bitmap_size + key_byte_size <= sizeof(vectorized::UInt128)) { + _partitioned_data->init( + vectorized::PartitionedHashMapVariants::Type::int128_keys, has_null); + } else { + _partitioned_data->init( + vectorized::PartitionedHashMapVariants::Type::int256_keys, has_null); + } + } else { + if (key_byte_size <= sizeof(vectorized::UInt64)) { + _partitioned_data->init( + vectorized::PartitionedHashMapVariants::Type::int64_keys, has_null); + } else if (key_byte_size <= sizeof(vectorized::UInt128)) { + _partitioned_data->init( + vectorized::PartitionedHashMapVariants::Type::int128_keys, has_null); + } else { + _partitioned_data->init( + vectorized::PartitionedHashMapVariants::Type::int256_keys, has_null); + } + } + } else { + _partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::serialized); + } + } +} + +} // namespace pipeline +} // namespace doris diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index ddcbebbb9d..7dbe616fd6 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -19,7 +19,11 @@ #include +#include + #include "operator.h" +#include "pipeline/pipeline_x/operator.h" +#include "vec/common/sort/partition_sorter.h" #include "vec/exec/vpartition_sort_node.h" namespace doris { @@ -46,9 +50,90 @@ public: bool can_write() override { return true; } }; -OperatorPtr PartitionSortSinkOperatorBuilder::build_operator() { - return std::make_shared(this, _node); -} +class PartitionSortSinkOperatorX; +class PartitionSortSinkLocalState : public PipelineXSinkLocalState { + ENABLE_FACTORY_CREATOR(PartitionSortSinkLocalState); + +public: + PartitionSortSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : 
PipelineXSinkLocalState(parent, state) {} + + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + +private: + friend class PartitionSortSinkOperatorX; + + // Expressions and parameters used to build _sort_description + vectorized::VSortExecExprs _vsort_exec_exprs; + vectorized::VExprContextSPtrs _partition_expr_ctxs; + int64_t child_input_rows = 0; + std::vector<vectorized::PartitionDataPtr> _value_places; + int _num_partition = 0; + std::vector<const vectorized::IColumn*> _partition_columns; + std::vector<size_t> _hash_values; + std::unique_ptr<vectorized::PartitionedHashMapVariants> _partitioned_data; + std::unique_ptr<vectorized::Arena> _agg_arena_pool; + std::vector<size_t> _partition_key_sz; + int _partition_exprs_num = 0; + + RuntimeProfile::Counter* _build_timer; + RuntimeProfile::Counter* _emplace_key_timer; + RuntimeProfile::Counter* _partition_sort_timer; + RuntimeProfile::Counter* _get_sorted_timer; + RuntimeProfile::Counter* _selector_block_timer; + + RuntimeProfile::Counter* _hash_table_size_counter; + void _init_hash_method(); +}; + +class PartitionSortSinkOperatorX final : public DataSinkOperatorX<PartitionSortSinkLocalState> { +public: + PartitionSortSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs); + Status init(const TDataSink& tsink) override { + return Status::InternalError("{} should not init with TDataSink", + DataSinkOperatorX<PartitionSortSinkLocalState>::_name); + } + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + + Status prepare(RuntimeState* state) override; + Status open(RuntimeState* state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override; + +private: + friend class PartitionSortSinkLocalState; + ObjectPool* _pool; + const RowDescriptor _row_descriptor; + int64_t _limit = -1; + int _partition_exprs_num = 0; + vectorized::VExprContextSPtrs _partition_expr_ctxs; + + // Expressions and parameters used to build _sort_description + vectorized::VSortExecExprs _vsort_exec_exprs; + std::vector<bool> _is_asc_order; + std::vector<bool> _nulls_first; + TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::ROW_NUMBER; + bool _has_global_limit = false; + int64_t _partition_inner_limit = 0; + + Status _split_block_by_partition(vectorized::Block* input_block, int batch_size, + PartitionSortSinkLocalState& local_state); + void _emplace_into_hash_table(const vectorized::ColumnRawPtrs& key_columns, + const vectorized::Block* input_block, int batch_size, + PartitionSortSinkLocalState& local_state); + template <typename AggState, typename AggMethod> + void _pre_serialize_key_if_need(AggState& state, AggMethod& agg_method, + const vectorized::ColumnRawPtrs& key_columns, + const size_t num_rows) { + if constexpr (vectorized::ColumnsHashing::IsPreSerializedKeysHashMethodTraits< + AggState>::value) { + agg_method.serialize_keys(key_columns, num_rows); + state.set_serialized_keys(agg_method.keys.data()); + } + } +}; } // namespace pipeline } // namespace doris diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp new file mode 100644 index 0000000000..b5370d2ca5 --- /dev/null +++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "partition_sort_source_operator.h" + +#include "pipeline/exec/operator.h" + +namespace doris { +class ExecNode; +class RuntimeState; + +namespace pipeline { + +OperatorPtr PartitionSortSourceOperatorBuilder::build_operator() { + return std::make_shared<PartitionSortSourceOperator>(this, _node); +} + +Status PartitionSortSourceLocalState::close(RuntimeState* state) { + if (_closed) { + return Status::OK(); + } + _shared_state->previous_row = nullptr; + _shared_state->partition_sorts.clear(); + return PipelineXLocalState::close(state); +} + +Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* output_block, + SourceState& source_state) { + RETURN_IF_CANCELLED(state); + auto& local_state = state->get_local_state(id())->cast<PartitionSortSourceLocalState>(); + output_block->clear_column_data(); + { + std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex); + if (local_state._shared_state->blocks_buffer.empty() == false) { + local_state._shared_state->blocks_buffer.front().swap(*output_block); + local_state._shared_state->blocks_buffer.pop(); + // if the buffer has no data left, block reading and wait for the signal again + if (local_state._shared_state->blocks_buffer.empty()) { + local_state._dependency->block_reading(); + } + return Status::OK(); + } + } + + // is_ready_for_read: set by the sink node via local_state._dependency->set_ready_for_read() + RETURN_IF_ERROR(get_sorted_block(state, output_block, local_state)); + { + std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex); + if (local_state._shared_state->blocks_buffer.empty() && + local_state._shared_state->sort_idx >= + local_state._shared_state->partition_sorts.size()) { + source_state = SourceState::FINISHED; + } + } + return Status::OK(); +} + +Dependency* PartitionSortSourceOperatorX::wait_for_dependency(RuntimeState* state) { + auto& local_state = state->get_local_state(id())->cast<PartitionSortSourceLocalState>(); + return local_state._dependency->read_blocked_by(); +} + +Status PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state, + vectorized::Block* output_block, + PartitionSortSourceLocalState& local_state) { + SCOPED_TIMER(local_state._get_sorted_timer); + // the sorters output their data one partition at a time + bool current_eos = false; + if (local_state._shared_state->sort_idx < local_state._shared_state->partition_sorts.size()) { + RETURN_IF_ERROR( + local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx] + ->get_next(state, output_block, &current_eos)); + } + if (current_eos) { + // the current sorter has reached eos, so advance to the next partition + local_state._shared_state->previous_row->reset(); + auto rows = local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx] + ->get_output_rows(); + local_state._num_rows_returned += rows; + local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx].reset( + nullptr); + local_state._shared_state->sort_idx++; + } + + return Status::OK(); +} + +} // namespace pipeline +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h b/be/src/pipeline/exec/partition_sort_source_operator.h index bd55c42e4b..859e2d8b58 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.h +++ b/be/src/pipeline/exec/partition_sort_source_operator.h @@ -21,6 +21,7 @@ #include "common/status.h" #include "operator.h" +#include "pipeline/pipeline_x/operator.h" #include "vec/exec/vpartition_sort_node.h" namespace doris { @@ -48,9 +49,53 @@ public: Status open(RuntimeState*) override { return Status::OK(); } }; -OperatorPtr PartitionSortSourceOperatorBuilder::build_operator() { - return std::make_shared(this, _node); -} +class PartitionSortSourceOperatorX; +class PartitionSortSourceLocalState final : public PipelineXLocalState { + ENABLE_FACTORY_CREATOR(PartitionSortSourceLocalState); + +public: + using Base = PipelineXLocalState; + PartitionSortSourceLocalState(RuntimeState* state, OperatorXBase* parent) + : PipelineXLocalState(state, parent), + _get_next_timer(nullptr) {} + + Status init(RuntimeState* state, LocalStateInfo& info) override { + RETURN_IF_ERROR(PipelineXLocalState::init(state, info)); + _get_next_timer = ADD_TIMER(profile(), "GetResultTime"); + _get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime"); + _shared_state->previous_row = std::make_unique(); + return Status::OK(); + } + + Status close(RuntimeState* state) override; + + int64_t _num_rows_returned = 0; + +private: + friend class PartitionSortSourceOperatorX; + RuntimeProfile::Counter* _get_sorted_timer; + RuntimeProfile::Counter* _get_next_timer = nullptr; +}; + +class PartitionSortSourceOperatorX final : public OperatorX { +public: + using Base = OperatorX; + PartitionSortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : OperatorX(pool, tnode, descs) {} + + Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override; + + Dependency* wait_for_dependency(RuntimeState* state) override; + + bool is_source() const override { return true; } + +private: + friend class PartitionSortSourceLocalState; + Status get_sorted_block(RuntimeState* state, vectorized::Block* output_block, + PartitionSortSourceLocalState& local_state); +}; } // namespace pipeline -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp index e52b995b1d..97563d56af 100644 --- a/be/src/pipeline/exec/sort_source_operator.cpp +++ b/be/src/pipeline/exec/sort_source_operator.cpp @@ -45,7 +45,7 @@ Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* bl auto& local_state = state->get_local_state(id())->cast(); SCOPED_TIMER(local_state.profile()->total_time_counter()); SCOPED_TIMER(local_state._get_next_timer); - bool eos; + bool eos = false; RETURN_IF_ERROR_OR_CATCH_EXCEPTION( local_state._shared_state->sorter->get_next(state, block, &eos)); if (eos) { diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp index ebc6368226..6b5237c7cc 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp @@ -157,21 +157,12 @@ Status StreamingAggSinkLocalState::do_pre_agg(vectorized::Block* input_block, // pre stream agg need use _num_row_return to decide whether to do pre stream agg _num_rows_returned += output_block->rows(); - _make_nullable_output_key(output_block); + _dependency->_make_nullable_output_key(output_block); // COUNTER_SET(_rows_returned_counter, _num_rows_returned); _executor.update_memusage(); 
return Status::OK(); } -void StreamingAggSinkLocalState::_make_nullable_output_key(vectorized::Block* block) { - if (block->rows() != 0) { - for (auto cid : _dependency->make_nullable_keys()) { - block->get_by_position(cid).column = make_nullable(block->get_by_position(cid).column); - block->get_by_position(cid).type = make_nullable(block->get_by_position(cid).type); - } - } -} - bool StreamingAggSinkLocalState::_should_expand_preagg_hash_tables() { if (!_should_expand_hash_table) { return false; diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h index a149a44c67..30164679a6 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h @@ -91,7 +91,6 @@ private: Status _pre_agg_with_serialized_key(doris::vectorized::Block* in_block, doris::vectorized::Block* out_block); - void _make_nullable_output_key(vectorized::Block* block); bool _should_expand_preagg_hash_tables(); vectorized::Block _preagg_block = vectorized::Block(); diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 965a547c22..50eb1e85b4 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -17,12 +17,16 @@ #pragma once +#include + #include "pipeline/exec/data_queue.h" +#include "vec/common/sort/partition_sorter.h" #include "vec/common/sort/sorter.h" #include "vec/exec/join/process_hash_table_probe.h" #include "vec/exec/join/vhash_join_node.h" #include "vec/exec/vaggregation_node.h" #include "vec/exec/vanalytic_eval_node.h" +#include "vec/exec/vpartition_sort_node.h" namespace doris { namespace pipeline { @@ -64,6 +68,8 @@ public: _ready_for_read = true; } + bool is_ready_for_read() { return _ready_for_read; } + // Notify downstream pipeline tasks this dependency is blocked. 
virtual void block_reading() { _ready_for_read = false; } @@ -321,7 +327,15 @@ public: void set_make_nullable_keys(std::vector& make_nullable_keys) { _make_nullable_keys = make_nullable_keys; } - + void _make_nullable_output_key(vectorized::Block* block) { + if (block->rows() != 0) { + for (auto cid : _make_nullable_keys) { + block->get_by_position(cid).column = + make_nullable(block->get_by_position(cid).column); + block->get_by_position(cid).type = make_nullable(block->get_by_position(cid).type); + } + } + } const std::vector& make_nullable_keys() { return _make_nullable_keys; } void release_tracker(); @@ -520,5 +534,27 @@ private: NestedLoopJoinSharedState _join_state; }; +struct PartitionSortNodeSharedState { +public: + std::queue blocks_buffer; + std::mutex buffer_mutex; + std::vector> partition_sorts; + std::unique_ptr previous_row = nullptr; + int sort_idx = 0; +}; + +class PartitionSortDependency final : public WriteDependency { +public: + using SharedState = PartitionSortNodeSharedState; + PartitionSortDependency(int id) : WriteDependency(id, "PartitionSortDependency") {} + ~PartitionSortDependency() override = default; + void* shared_state() override { return (void*)&_partition_sort_state; }; + void set_ready_for_write() override {} + void block_writing() override {} + +private: + PartitionSortNodeSharedState _partition_sort_state; +}; + } // namespace pipeline } // namespace doris diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 64e5fc5d46..e1eff50000 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -24,6 +24,7 @@ #include "pipeline/exec/analytic_sink_operator.h" #include "pipeline/exec/analytic_source_operator.h" #include "pipeline/exec/assert_num_rows_operator.h" +#include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h" #include "pipeline/exec/empty_set_operator.h" #include "pipeline/exec/exchange_sink_operator.h" #include "pipeline/exec/exchange_source_operator.h" @@ -32,6 +33,8 @@ #include "pipeline/exec/nested_loop_join_build_operator.h" #include "pipeline/exec/nested_loop_join_probe_operator.h" #include "pipeline/exec/olap_scan_operator.h" +#include "pipeline/exec/partition_sort_sink_operator.h" +#include "pipeline/exec/partition_sort_source_operator.h" #include "pipeline/exec/repeat_operator.h" #include "pipeline/exec/result_sink_operator.h" #include "pipeline/exec/select_operator.h" @@ -314,9 +317,11 @@ DECLARE_OPERATOR_X(AnalyticSinkLocalState) DECLARE_OPERATOR_X(SortSinkLocalState) DECLARE_OPERATOR_X(BlockingAggSinkLocalState) DECLARE_OPERATOR_X(StreamingAggSinkLocalState) +DECLARE_OPERATOR_X(DistinctStreamingAggSinkLocalState) DECLARE_OPERATOR_X(ExchangeSinkLocalState) DECLARE_OPERATOR_X(NestedLoopJoinBuildSinkLocalState) DECLARE_OPERATOR_X(UnionSinkLocalState) +DECLARE_OPERATOR_X(PartitionSortSinkLocalState) #undef DECLARE_OPERATOR_X @@ -332,6 +337,7 @@ DECLARE_OPERATOR_X(NestedLoopJoinProbeLocalState) DECLARE_OPERATOR_X(AssertNumRowsLocalState) DECLARE_OPERATOR_X(EmptySetLocalState) DECLARE_OPERATOR_X(UnionSourceLocalState) +DECLARE_OPERATOR_X(PartitionSortSourceLocalState) #undef DECLARE_OPERATOR_X @@ -349,6 +355,7 @@ template class PipelineXSinkLocalState; template class PipelineXSinkLocalState; template class PipelineXSinkLocalState; template class PipelineXSinkLocalState; +template class PipelineXSinkLocalState; template class PipelineXLocalState; template class PipelineXLocalState; @@ -357,5 +364,6 @@ template class PipelineXLocalState; 
template class PipelineXLocalState; template class PipelineXLocalState; template class PipelineXLocalState; +template class PipelineXLocalState; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 8c55d004a2..8af1419729 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -48,6 +48,8 @@ #include "pipeline/exec/assert_num_rows_operator.h" #include "pipeline/exec/data_queue.h" #include "pipeline/exec/datagen_operator.h" +#include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h" +#include "pipeline/exec/distinct_streaming_aggregation_source_operator.h" #include "pipeline/exec/empty_set_operator.h" #include "pipeline/exec/exchange_sink_operator.h" #include "pipeline/exec/exchange_source_operator.h" @@ -56,6 +58,8 @@ #include "pipeline/exec/nested_loop_join_build_operator.h" #include "pipeline/exec/nested_loop_join_probe_operator.h" #include "pipeline/exec/olap_scan_operator.h" +#include "pipeline/exec/partition_sort_sink_operator.h" +#include "pipeline/exec/partition_sort_source_operator.h" #include "pipeline/exec/repeat_operator.h" #include "pipeline/exec/result_sink_operator.h" #include "pipeline/exec/scan_operator.h" @@ -498,8 +502,22 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN break; } case TPlanNodeType::AGGREGATION_NODE: { - if (tnode.agg_node.__isset.use_streaming_preaggregation && - tnode.agg_node.use_streaming_preaggregation) { + if (tnode.agg_node.aggregate_functions.empty()) { + op.reset(new DistinctStreamingAggSourceOperatorX(pool, tnode, descs)); + RETURN_IF_ERROR(cur_pipe->add_operator(op)); + + const auto downstream_pipeline_id = cur_pipe->id(); + if (_dag.find(downstream_pipeline_id) == _dag.end()) { + _dag.insert({downstream_pipeline_id, {}}); + } + cur_pipe = add_pipeline(); + _dag[downstream_pipeline_id].push_back(cur_pipe->id()); + DataSinkOperatorXPtr sink; + sink.reset(new DistinctStreamingAggSinkOperatorX(pool, tnode, descs)); + RETURN_IF_ERROR(cur_pipe->set_sink(sink)); + RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); + } else if (tnode.agg_node.__isset.use_streaming_preaggregation && + tnode.agg_node.use_streaming_preaggregation) { op.reset(new StreamingAggSourceOperatorX(pool, tnode, descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); @@ -610,6 +628,23 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); break; } + case doris::TPlanNodeType::PARTITION_SORT_NODE: { + op.reset(new PartitionSortSourceOperatorX(pool, tnode, descs)); + RETURN_IF_ERROR(cur_pipe->add_operator(op)); + + const auto downstream_pipeline_id = cur_pipe->id(); + if (_dag.find(downstream_pipeline_id) == _dag.end()) { + _dag.insert({downstream_pipeline_id, {}}); + } + cur_pipe = add_pipeline(); + _dag[downstream_pipeline_id].push_back(cur_pipe->id()); + + DataSinkOperatorXPtr sink; + sink.reset(new PartitionSortSinkOperatorX(pool, tnode, descs)); + RETURN_IF_ERROR(cur_pipe->set_sink(sink)); + RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); + break; + } case TPlanNodeType::ANALYTIC_EVAL_NODE: { op.reset(new AnalyticSourceOperatorX(pool, tnode, descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); diff --git a/regression-test/data/pipelineX/test_distinct_streaming_agg_operator.out 
b/regression-test/data/pipelineX/test_distinct_streaming_agg_operator.out new file mode 100644 index 0000000000..7d0eb9fd4f --- /dev/null +++ b/regression-test/data/pipelineX/test_distinct_streaming_agg_operator.out @@ -0,0 +1,69 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !pipeline_1 -- +\N \N \N +1 1989 1001 +2 1986 1001 +3 1989 1002 +4 1991 3021 +5 1985 5014 +6 32767 3021 +7 -32767 1002 +8 255 2147483647 +9 1991 -2147483647 +10 1991 5014 +11 1989 25699 +12 32767 -2147483647 +13 -32767 2147483647 +14 255 103 +15 1992 3021 + +-- !pipeline_2 -- +\N +-2147483647 +103 +1001 +1002 +3021 +5014 +25699 +2147483647 + +-- !pipeline_3 -- +\N +false +true + +-- !pipelineX_1 -- +\N \N \N +1 1989 1001 +2 1986 1001 +3 1989 1002 +4 1991 3021 +5 1985 5014 +6 32767 3021 +7 -32767 1002 +8 255 2147483647 +9 1991 -2147483647 +10 1991 5014 +11 1989 25699 +12 32767 -2147483647 +13 -32767 2147483647 +14 255 103 +15 1992 3021 + +-- !pipelineX_2 -- +\N +-2147483647 +103 +1001 +1002 +3021 +5014 +25699 +2147483647 + +-- !pipelineX_3 -- +\N +false +true + diff --git a/regression-test/data/pipelineX/test_partition_sort_operator.out b/regression-test/data/pipelineX/test_partition_sort_operator.out new file mode 100644 index 0000000000..3771a563f9 --- /dev/null +++ b/regression-test/data/pipelineX/test_partition_sort_operator.out @@ -0,0 +1,71 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !pipeline_1 -- +\N \N 1 +false -32767 1 +false -32767 2 +false 255 3 +false 1986 4 +true 255 1 +true 1985 2 +true 1989 3 +true 1989 4 + +-- !pipeline_2 -- +\N \N 1 +false -32767 1 +false -32767 1 +false 255 3 +false 1986 4 +true 255 1 +true 1985 2 +true 1989 3 +true 1989 3 + +-- !pipeline_3 -- +\N \N 1 +false -32767 1 +false -32767 1 +false 255 2 +false 1986 3 +false 1989 4 +true 255 1 +true 1985 2 +true 1989 3 +true 1989 3 +true 1991 4 + +-- !pipelineX_1 -- +\N \N 1 +false -32767 1 +false -32767 2 +false 255 3 +false 1986 4 +true 255 1 +true 1985 2 +true 1989 3 +true 1989 4 + +-- !pipelineX_2 -- +\N \N 1 +false -32767 1 +false -32767 1 +false 255 3 +false 1986 4 +true 255 1 +true 1985 2 +true 1989 3 +true 1989 3 + +-- !pipelineX_3 -- +\N \N 1 +false -32767 1 +false -32767 1 +false 255 2 +false 1986 3 +false 1989 4 +true 255 1 +true 1985 2 +true 1989 3 +true 1989 3 +true 1991 4 + diff --git a/regression-test/suites/pipelineX/test_distinct_streaming_agg_operator.groovy b/regression-test/suites/pipelineX/test_distinct_streaming_agg_operator.groovy new file mode 100644 index 0000000000..f0337fcfc7 --- /dev/null +++ b/regression-test/suites/pipelineX/test_distinct_streaming_agg_operator.groovy @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_distinct_streaming_agg_operator") { + def dbName = "test_distinct_streaming_agg_operator" + sql "DROP DATABASE IF EXISTS ${dbName}" + sql "CREATE DATABASE ${dbName}" + sql "USE $dbName" + sql """ DROP TABLE IF EXISTS baseall """ + sql """ + CREATE TABLE IF NOT EXISTS baseall ( + `k0` boolean null comment "", + `k1` tinyint(4) null comment "", + `k2` smallint(6) null comment "", + `k3` int(11) null comment "", + `k4` bigint(20) null comment "", + `k5` decimal(9, 3) null comment "", + `k6` char(5) null comment "", + `k10` date null comment "", + `k11` datetime null comment "", + `k7` varchar(20) null comment "", + `k8` double max null comment "", + `k9` float sum null comment "", + `k12` string replace null comment "", + `k13` largeint(40) replace null comment "" + ) engine=olap + DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1") + """ + sql """ set forbid_unknown_col_stats = false """ + streamLoad { + table "baseall" + db dbName + set 'column_separator', ',' + file "../query_p0/baseall.txt" + } + + sql"""set enable_pipeline_engine = true; """ + + qt_pipeline_1 """ + select * from ( select k1,k2,k3 from baseall union select k1,k2,k3 from baseall) as t ORDER BY 1, 2,3; + """ + qt_pipeline_2 """ + select k3 from baseall group by k3 order by k3; + """ + + qt_pipeline_3 """ + select k6 from baseall group by k6 order by k6; + """ + + sql"""set experimental_enable_pipeline_x_engine=true; """ + + qt_pipelineX_1 """ + select * from ( select k1,k2,k3 from baseall union select k1,k2,k3 from baseall) as t ORDER BY 1, 2,3; + """ + qt_pipelineX_2 """ + select k3 from baseall group by k3 order by k3; + """ + qt_pipelineX_3 """ + select k6 from baseall group by k6 order by k6; + """ + + +} \ No newline at end of file diff --git a/regression-test/suites/pipelineX/test_partition_sort_operator.groovy b/regression-test/suites/pipelineX/test_partition_sort_operator.groovy new file mode 100644 index 0000000000..22fe5bc20c --- /dev/null +++ b/regression-test/suites/pipelineX/test_partition_sort_operator.groovy @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_partition_sort_operator") { + def dbName = "test_partition_sort_operator" + sql "DROP DATABASE IF EXISTS ${dbName}" + sql "CREATE DATABASE ${dbName}" + sql "USE $dbName" + sql """ DROP TABLE IF EXISTS baseall """ + sql """ + CREATE TABLE IF NOT EXISTS baseall ( + `k0` boolean null comment "", + `k1` tinyint(4) null comment "", + `k2` smallint(6) null comment "", + `k3` int(11) null comment "", + `k4` bigint(20) null comment "", + `k5` decimal(9, 3) null comment "", + `k6` char(5) null comment "", + `k10` date null comment "", + `k11` datetime null comment "", + `k7` varchar(20) null comment "", + `k8` double max null comment "", + `k9` float sum null comment "", + `k12` string replace null comment "", + `k13` largeint(40) replace null comment "" + ) engine=olap + DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1") + """ + sql """ set forbid_unknown_col_stats = false """ + streamLoad { + table "baseall" + db dbName + set 'column_separator', ',' + file "../query_p0/baseall.txt" + } + + sql"""set enable_pipeline_engine = true; """ + + qt_pipeline_1 """ + select * from (select k6,k2,row_number() over(partition by k6 order by k2) as num from baseall) as res where num < 5 + ORDER BY 1, 2,3; + """ + qt_pipeline_2 """ + select * from (select k6,k2,rank() over(partition by k6 order by k2) as num from baseall) as res where num < 5 + ORDER BY 1, 2,3; + """ + + qt_pipeline_3 """ + select * from (select k6,k2,dense_rank() over(partition by k6 order by k2) as num from baseall) as res where num < 5 + ORDER BY 1, 2,3; + """ + + sql"""set experimental_enable_pipeline_x_engine=true; """ + + qt_pipelineX_1 """ + select * from (select k6,k2,row_number() over(partition by k6 order by k2) as num from baseall) as res where num < 5 + ORDER BY 1, 2,3; + """ + qt_pipelineX_2 """ + select * from (select k6,k2,rank() over(partition by k6 order by k2) as num from baseall) as res where num < 5 + ORDER BY 1, 2,3; + """ + qt_pipelineX_3 """ + select * from (select k6,k2,dense_rank() over(partition by k6 order by k2) as num from baseall) as res where num < 5 + ORDER BY 1, 2,3; + """ + + +} \ No newline at end of file