[pipelineX](fix) Fix input data distribution for distinct streaming agg (#29980)

This commit is contained in:
Gabriel
2024-01-15 21:46:31 +08:00
committed by yiguolei
parent c7d3b833c5
commit ece5f8e86c
6 changed files with 37 additions and 26 deletions

View File

@ -112,6 +112,11 @@ public:
SourceState source_state) override;
DataDistribution required_data_distribution() const override {
if (_needs_finalize) {
return _is_colocate
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
return DataSinkOperatorX<DistinctStreamingAggSinkLocalState>::required_data_distribution();
}
};

View File

@ -86,13 +86,11 @@ public:
using Base = DataSinkOperatorX<LocalExchangeSinkLocalState>;
LocalExchangeSinkOperatorX(int sink_id, int dest_id, int num_partitions,
const std::vector<TExpr>& texprs,
const std::map<int, int>& bucket_seq_to_instance_idx,
const std::map<int, int>& shuffle_idx_to_instance_idx)
const std::map<int, int>& bucket_seq_to_instance_idx)
: Base(sink_id, dest_id, dest_id),
_num_partitions(num_partitions),
_texprs(texprs),
_bucket_seq_to_instance_idx(bucket_seq_to_instance_idx),
_shuffle_idx_to_instance_idx(shuffle_idx_to_instance_idx) {}
_bucket_seq_to_instance_idx(bucket_seq_to_instance_idx) {}
Status init(const TPlanNode& tnode, RuntimeState* state) override {
return Status::InternalError("{} should not init with TPlanNode", Base::_name);
@ -102,8 +100,8 @@ public:
return Status::InternalError("{} should not init with TPlanNode", Base::_name);
}
Status init(ExchangeType type, const int num_buckets,
const bool is_shuffled_hash_join) override {
Status init(ExchangeType type, const int num_buckets, const bool is_shuffled_hash_join,
const std::map<int, int>& shuffle_idx_to_instance_idx) override {
_name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")";
_type = type;
if (_type == ExchangeType::HASH_SHUFFLE) {
@ -111,10 +109,16 @@ public:
// should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned,
// we should use map shuffle idx to instance idx because all instances will be
// distributed to all BEs. Otherwise, we should use shuffle idx directly.
if (!is_shuffled_hash_join) {
_shuffle_idx_to_instance_idx.clear();
if (is_shuffled_hash_join) {
std::for_each(shuffle_idx_to_instance_idx.begin(),
shuffle_idx_to_instance_idx.end(), [&](const auto& item) {
DCHECK(item.first != -1);
_shuffle_idx_to_instance_idx.push_back({item.first, item.second});
});
} else {
_shuffle_idx_to_instance_idx.resize(_num_partitions);
for (int i = 0; i < _num_partitions; i++) {
_shuffle_idx_to_instance_idx.insert({i, i});
_shuffle_idx_to_instance_idx[i] = {i, i};
}
}
_partitioner.reset(
@ -156,7 +160,7 @@ private:
const std::vector<TExpr>& _texprs;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
const std::map<int, int> _bucket_seq_to_instance_idx;
std::map<int, int> _shuffle_idx_to_instance_idx;
std::vector<std::pair<int, int>> _shuffle_idx_to_instance_idx;
};
} // namespace doris::pipeline

View File

@ -116,26 +116,26 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
return Status::OK();
}
local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes());
new_block_wrapper->ref(_num_partitions);
if (get_type() == ExchangeType::HASH_SHUFFLE) {
auto map = local_state._parent->cast<LocalExchangeSinkOperatorX>()
._shuffle_idx_to_instance_idx;
for (size_t i = 0; i < _num_partitions; i++) {
DCHECK(map.contains(i)) << " i: " << i << " _num_partitions: " << _num_partitions
<< " map.size(): " << map.size();
DCHECK(map[i] >= 0 && map[i] < _num_partitions) << map[i] << " " << _num_partitions;
size_t start = local_state._partition_rows_histogram[i];
size_t size = local_state._partition_rows_histogram[i + 1] - start;
const auto& map = local_state._parent->cast<LocalExchangeSinkOperatorX>()
._shuffle_idx_to_instance_idx;
new_block_wrapper->ref(map.size());
for (const auto& it : map) {
DCHECK(it.second >= 0 && it.second < _num_partitions)
<< it.first << " : " << it.second << " " << _num_partitions;
size_t start = local_state._partition_rows_histogram[it.first];
size_t size = local_state._partition_rows_histogram[it.first + 1] - start;
if (size > 0) {
local_state._shared_state->add_mem_usage(
map[i], new_block_wrapper->data_block.allocated_bytes(), false);
data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}});
local_state._shared_state->set_ready_to_read(map[i]);
it.second, new_block_wrapper->data_block.allocated_bytes(), false);
data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}});
local_state._shared_state->set_ready_to_read(it.second);
} else {
new_block_wrapper->unref(local_state._shared_state);
}
}
} else if (_num_senders != _num_sources || _ignore_source_data_distribution) {
new_block_wrapper->ref(_num_partitions);
for (size_t i = 0; i < _num_partitions; i++) {
size_t start = local_state._partition_rows_histogram[i];
size_t size = local_state._partition_rows_histogram[i + 1] - start;
@ -149,6 +149,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
}
}
} else {
new_block_wrapper->ref(_num_partitions);
auto map =
local_state._parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx;
for (size_t i = 0; i < _num_partitions; i++) {

View File

@ -470,7 +470,8 @@ public:
Status init(const TDataSink& tsink) override;
[[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets,
const bool is_shuffled_hash_join) {
const bool is_shuffled_hash_join,
const std::map<int, int>& shuffle_idx_to_instance_idx) {
return Status::InternalError("init() is only implemented in local exchange!");
}

View File

@ -733,11 +733,10 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
: cur_pipe->sink_x()->is_shuffled_hash_join();
sink.reset(new LocalExchangeSinkOperatorX(
sink_id, local_exchange_id, is_shuffled_hash_join ? _total_instances : _num_instances,
data_distribution.partition_exprs, bucket_seq_to_instance_idx,
shuffle_idx_to_instance_idx));
data_distribution.partition_exprs, bucket_seq_to_instance_idx));
RETURN_IF_ERROR(new_pip->set_sink(sink));
RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, num_buckets,
is_shuffled_hash_join));
is_shuffled_hash_join, shuffle_idx_to_instance_idx));
// 2. Create and initialize LocalExchangeSharedState.
auto shared_state = LocalExchangeSharedState::create_shared();

View File

@ -244,6 +244,7 @@ private:
std::vector<std::unique_ptr<RuntimeFilterParamsContext>> _runtime_filter_states;
// Total instance num running on all BEs
int _total_instances = -1;
};