[pipelineX](fix) Fix incorrect partition number (#29963)

This commit is contained in:
Gabriel
2024-01-15 11:49:45 +08:00
committed by yiguolei
parent 9282ff8e56
commit ffc6f58e85

View File

@ -728,21 +728,24 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
// 1. Create a new pipeline with local exchange sink.
DataSinkOperatorXPtr sink;
auto sink_id = next_sink_operator_id();
const bool is_shuffled_hash_join = operator_xs.size() > idx
? operator_xs[idx]->is_shuffled_hash_join()
: cur_pipe->sink_x()->is_shuffled_hash_join();
sink.reset(new LocalExchangeSinkOperatorX(
sink_id, local_exchange_id, _total_instances, data_distribution.partition_exprs,
bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
sink_id, local_exchange_id, is_shuffled_hash_join ? _total_instances : _num_instances,
data_distribution.partition_exprs, bucket_seq_to_instance_idx,
shuffle_idx_to_instance_idx));
RETURN_IF_ERROR(new_pip->set_sink(sink));
RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, num_buckets,
operator_xs.size() > idx
? operator_xs[idx]->is_shuffled_hash_join()
: cur_pipe->sink_x()->is_shuffled_hash_join()));
is_shuffled_hash_join));
// 2. Create and initialize LocalExchangeSharedState.
auto shared_state = LocalExchangeSharedState::create_shared();
switch (data_distribution.distribution_type) {
case ExchangeType::HASH_SHUFFLE:
shared_state->exchanger = ShuffleExchanger::create_unique(
std::max(cur_pipe->num_tasks(), _num_instances), _total_instances);
std::max(cur_pipe->num_tasks(), _num_instances),
is_shuffled_hash_join ? _total_instances : _num_instances);
break;
case ExchangeType::BUCKET_HASH_SHUFFLE:
shared_state->exchanger = BucketShuffleExchanger::create_unique(