From ece5f8e86c26cb8317402a38d239159f57ad7165 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Mon, 15 Jan 2024 21:46:31 +0800 Subject: [PATCH] [pipelineX](fix) Fix input data distribution for distinct streaming agg (#29980) --- ...inct_streaming_aggregation_sink_operator.h | 5 ++++ .../local_exchange_sink_operator.h | 24 ++++++++++-------- .../local_exchange/local_exchanger.cpp | 25 ++++++++++--------- be/src/pipeline/pipeline_x/operator.h | 3 ++- .../pipeline_x_fragment_context.cpp | 5 ++-- .../pipeline_x/pipeline_x_fragment_context.h | 1 + 6 files changed, 37 insertions(+), 26 deletions(-) diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h index 6607516d6c..900aa78663 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h @@ -112,6 +112,11 @@ public: SourceState source_state) override; DataDistribution required_data_distribution() const override { + if (_needs_finalize) { + return _is_colocate + ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) + : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + } return DataSinkOperatorX::required_data_distribution(); } }; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h index 7275e54520..7ed7d7d0e9 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h @@ -86,13 +86,11 @@ public: using Base = DataSinkOperatorX; LocalExchangeSinkOperatorX(int sink_id, int dest_id, int num_partitions, const std::vector& texprs, - const std::map& bucket_seq_to_instance_idx, - const std::map& shuffle_idx_to_instance_idx) + const std::map& bucket_seq_to_instance_idx) : Base(sink_id, dest_id, dest_id), _num_partitions(num_partitions), _texprs(texprs), - _bucket_seq_to_instance_idx(bucket_seq_to_instance_idx), - _shuffle_idx_to_instance_idx(shuffle_idx_to_instance_idx) {} + _bucket_seq_to_instance_idx(bucket_seq_to_instance_idx) {} Status init(const TPlanNode& tnode, RuntimeState* state) override { return Status::InternalError("{} should not init with TPlanNode", Base::_name); @@ -102,8 +100,8 @@ public: return Status::InternalError("{} should not init with TPlanNode", Base::_name); } - Status init(ExchangeType type, const int num_buckets, - const bool is_shuffled_hash_join) override { + Status init(ExchangeType type, const int num_buckets, const bool is_shuffled_hash_join, + const std::map& shuffle_idx_to_instance_idx) override { _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")"; _type = type; if (_type == ExchangeType::HASH_SHUFFLE) { @@ -111,10 +109,16 @@ public: // should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned, // we should use map shuffle idx to instance idx because all instances will be // distributed to all BEs. Otherwise, we should use shuffle idx directly. - if (!is_shuffled_hash_join) { - _shuffle_idx_to_instance_idx.clear(); + if (is_shuffled_hash_join) { + std::for_each(shuffle_idx_to_instance_idx.begin(), + shuffle_idx_to_instance_idx.end(), [&](const auto& item) { + DCHECK(item.first != -1); + _shuffle_idx_to_instance_idx.push_back({item.first, item.second}); + }); + } else { + _shuffle_idx_to_instance_idx.resize(_num_partitions); for (int i = 0; i < _num_partitions; i++) { - _shuffle_idx_to_instance_idx.insert({i, i}); + _shuffle_idx_to_instance_idx[i] = {i, i}; } } _partitioner.reset( @@ -156,7 +160,7 @@ private: const std::vector& _texprs; std::unique_ptr _partitioner; const std::map _bucket_seq_to_instance_idx; - std::map _shuffle_idx_to_instance_idx; + std::vector> _shuffle_idx_to_instance_idx; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp index 900e31e663..1f680fdd1d 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp @@ -116,26 +116,26 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest return Status::OK(); } local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes()); - new_block_wrapper->ref(_num_partitions); if (get_type() == ExchangeType::HASH_SHUFFLE) { - auto map = local_state._parent->cast() - ._shuffle_idx_to_instance_idx; - for (size_t i = 0; i < _num_partitions; i++) { - DCHECK(map.contains(i)) << " i: " << i << " _num_partitions: " << _num_partitions - << " map.size(): " << map.size(); - DCHECK(map[i] >= 0 && map[i] < _num_partitions) << map[i] << " " << _num_partitions; - size_t start = local_state._partition_rows_histogram[i]; - size_t size = local_state._partition_rows_histogram[i + 1] - start; + const auto& map = local_state._parent->cast() + ._shuffle_idx_to_instance_idx; + new_block_wrapper->ref(map.size()); + for (const auto& it : map) { + DCHECK(it.second >= 0 && it.second < _num_partitions) + << it.first << " : " << it.second << " " << _num_partitions; + size_t start = local_state._partition_rows_histogram[it.first]; + size_t size = local_state._partition_rows_histogram[it.first + 1] - start; if (size > 0) { local_state._shared_state->add_mem_usage( - map[i], new_block_wrapper->data_block.allocated_bytes(), false); - data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}}); - local_state._shared_state->set_ready_to_read(map[i]); + it.second, new_block_wrapper->data_block.allocated_bytes(), false); + data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}}); + local_state._shared_state->set_ready_to_read(it.second); } else { new_block_wrapper->unref(local_state._shared_state); } } } else if (_num_senders != _num_sources || _ignore_source_data_distribution) { + new_block_wrapper->ref(_num_partitions); for (size_t i = 0; i < _num_partitions; i++) { size_t start = local_state._partition_rows_histogram[i]; size_t size = local_state._partition_rows_histogram[i + 1] - start; @@ -149,6 +149,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest } } } else { + new_block_wrapper->ref(_num_partitions); auto map = local_state._parent->cast()._bucket_seq_to_instance_idx; for (size_t i = 0; i < _num_partitions; i++) { diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 6792ce35f3..d46dc859b0 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -470,7 +470,8 @@ public: Status init(const TDataSink& tsink) override; [[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets, - const bool is_shuffled_hash_join) { + const bool is_shuffled_hash_join, + const std::map& shuffle_idx_to_instance_idx) { return Status::InternalError("init() is only implemented in local exchange!"); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index e2f1d9742b..a44db66745 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -733,11 +733,10 @@ Status PipelineXFragmentContext::_add_local_exchange_impl( : cur_pipe->sink_x()->is_shuffled_hash_join(); sink.reset(new LocalExchangeSinkOperatorX( sink_id, local_exchange_id, is_shuffled_hash_join ? _total_instances : _num_instances, - data_distribution.partition_exprs, bucket_seq_to_instance_idx, - shuffle_idx_to_instance_idx)); + data_distribution.partition_exprs, bucket_seq_to_instance_idx)); RETURN_IF_ERROR(new_pip->set_sink(sink)); RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, num_buckets, - is_shuffled_hash_join)); + is_shuffled_hash_join, shuffle_idx_to_instance_idx)); // 2. Create and initialize LocalExchangeSharedState. auto shared_state = LocalExchangeSharedState::create_shared(); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 92178d359d..439b0072d7 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -244,6 +244,7 @@ private: std::vector> _runtime_filter_states; + // Total instance num running on all BEs int _total_instances = -1; };