In Coordinator, a shuffle map is built from the `recvrId` of each instance.
For example, if a cluster has 3 BEs, then for a shuffled hash join we get
3 maps, one for the fragment sent to each BE:
BE0: {0:0, 1:1}
BE1: {2:0, 3:1}
BE2: {4:0, 5:1}
In this example, the parallelism is 2. The keys of a shuffle map are global
shuffle ids, and the values are instance ids within the current BE. In
Coordinator, `recvrId` is the global shuffle id of each instance, so a wrong
`recvrId` yields a wrong shuffle map and can produce wrong query results.
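As a minimal, standalone illustration of the layout above (this is not the Doris implementation; `num_bes`, `parallelism`, and `global_id` are made-up names, while `shuffle_idx_to_instance_idx` mirrors the BE-side map in the diff below), consecutive global shuffle ids can be turned into one map per BE and then used to route a partition:

#include <cstdint>
#include <iostream>
#include <map>
#include <vector>

int main() {
    // 3 BEs, parallelism 2 => 6 global shuffle ids (the recvrId values).
    constexpr int num_bes = 3;
    constexpr uint32_t parallelism = 2;

    // One {global shuffle id -> local instance id} map per BE, reproducing
    // BE0: {0:0, 1:1}, BE1: {2:0, 3:1}, BE2: {4:0, 5:1}.
    std::vector<std::map<uint32_t, uint32_t>> shuffle_idx_to_instance_idx(num_bes);
    uint32_t global_id = 0; // what the Coordinator assigns as recvrId
    for (int be = 0; be < num_bes; ++be) {
        for (uint32_t inst = 0; inst < parallelism; ++inst) {
            shuffle_idx_to_instance_idx[be][global_id++] = inst;
        }
    }

    // A row hashed to global partition 3 must land on BE1, local instance 1.
    const uint32_t partition = 3;
    for (int be = 0; be < num_bes; ++be) {
        auto it = shuffle_idx_to_instance_idx[be].find(partition);
        if (it != shuffle_idx_to_instance_idx[be].end()) {
            std::cout << "partition " << partition << " -> BE" << be
                      << ", instance " << it->second << "\n";
        }
    }
    return 0;
}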
This bug is caused by `recvrId` being overwritten by the BHJ (broadcast hash
join) destination-building path. If a fragment contains both a BHJ and an
SHJ, `recvrId` should be set by the SHJ destinations and the BHJ should be
ignored.
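A hypothetical illustration of the failure mode (standalone C++; `good_map`/`bad_map` and the exact wrong values are assumptions, not taken from Doris): if `recvrId` is overwritten by the BHJ path, the map a BE receives no longer contains the global shuffle ids its rows actually hash to, so those rows find no entry:

#include <cstdint>
#include <iostream>
#include <map>

int main() {
    // Correct map for BE1 in the example above: global shuffle ids 2 and 3.
    std::map<uint32_t, uint32_t> good_map = {{2, 0}, {3, 1}};

    // Hypothetical wrong map: recvrId was overwritten using the BHJ destination
    // list, so the global ids no longer match the partitions routed to BE1.
    std::map<uint32_t, uint32_t> bad_map = {{0, 0}, {1, 1}};

    for (uint32_t partition : {2u, 3u}) {
        std::cout << "partition " << partition << ": "
                  << (good_map.count(partition) ? "found in correct map, " : "missing in correct map, ")
                  << (bad_map.count(partition) ? "found in wrong map\n"
                                               : "missing in wrong map -> rows lost\n");
    }
    return 0;
}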
@@ -157,12 +157,14 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
         const auto& map = local_state._parent->cast<LocalExchangeSinkOperatorX>()
                                   ._shuffle_idx_to_instance_idx;
         new_block_wrapper->ref(map.size());
+        uint32_t enqueue_rows = 0;
         for (const auto& it : map) {
             DCHECK(it.second >= 0 && it.second < _num_partitions)
                     << it.first << " : " << it.second << " " << _num_partitions;
             uint32_t start = local_state._partition_rows_histogram[it.first];
             uint32_t size = local_state._partition_rows_histogram[it.first + 1] - start;
             if (size > 0) {
+                enqueue_rows += size;
                 local_state._shared_state->add_mem_usage(
                         it.second, new_block_wrapper->data_block.allocated_bytes(), false);
@@ -176,6 +178,18 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
                 new_block_wrapper->unref(local_state._shared_state);
             }
         }
+        if (enqueue_rows != rows) [[unlikely]] {
+            fmt::memory_buffer debug_string_buffer;
+            fmt::format_to(debug_string_buffer, "Type: {}, Local Exchange Id: {}, Shuffled Map: ",
+                           get_exchange_type_name(get_type()), local_state.parent()->node_id());
+            for (const auto& it : map) {
+                fmt::format_to(debug_string_buffer, "[{}:{}], ", it.first, it.second);
+            }
+            return Status::InternalError(
+                    "Rows mismatched! Data may be lost. [Expected enqueue rows={}, Real enqueue "
+                    "rows={}, Detail: {}]",
+                    rows, enqueue_rows, fmt::to_string(debug_string_buffer));
+        }
     } else if (_num_senders != _num_sources || _ignore_source_data_distribution) {
         // In this branch, data just should be distributed equally into all instances.
         new_block_wrapper->ref(_num_partitions);
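The added check can be reproduced in isolation: build a prefix-sum histogram of rows per global partition, enqueue only the partitions present in the shuffle map, and compare the enqueued total with the block's row count. The sketch below mirrors the logic of the hunk above but uses only standard C++ (the names `hist` and `row_partitions` are made up and stand in for `_partition_rows_histogram` and the partitioner output):

#include <cstdint>
#include <iostream>
#include <map>
#include <vector>

int main() {
    // Partition id of each row in the block, as produced by the hash partitioner.
    std::vector<uint32_t> row_partitions = {2, 3, 2, 2, 3};
    const uint32_t num_global_partitions = 6;

    // Prefix-sum histogram: partition p owns rows [hist[p], hist[p + 1]).
    std::vector<uint32_t> hist(num_global_partitions + 1, 0);
    for (uint32_t p : row_partitions) {
        ++hist[p + 1];
    }
    for (uint32_t p = 0; p < num_global_partitions; ++p) {
        hist[p + 1] += hist[p];
    }

    // Shuffle map of this BE; swap in a wrong map such as {{0, 0}, {1, 1}}
    // to see the mismatch reported.
    std::map<uint32_t, uint32_t> shuffle_idx_to_instance_idx = {{2, 0}, {3, 1}};

    uint32_t enqueue_rows = 0;
    for (const auto& [global_id, instance_id] : shuffle_idx_to_instance_idx) {
        uint32_t start = hist[global_id];
        uint32_t size = hist[global_id + 1] - start;
        if (size > 0) {
            std::cout << "enqueue rows [" << start << ", " << start + size
                      << ") to local instance " << instance_id << "\n";
            enqueue_rows += size;
        }
    }

    if (enqueue_rows != row_partitions.size()) {
        std::cout << "Rows mismatched! Data may be lost. expected=" << row_partitions.size()
                  << " enqueued=" << enqueue_rows << "\n";
    } else {
        std::cout << "all " << enqueue_rows << " rows enqueued\n";
    }
    return 0;
}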
@@ -1745,7 +1745,6 @@ public class Coordinator implements CoordInterface {
                 destHosts.put(param.host, param);
                 param.buildHashTableForBroadcastJoin = true;
                 TPlanFragmentDestination dest = new TPlanFragmentDestination();
-                param.recvrId = params.destinations.size();
                 dest.fragment_instance_id = param.instanceId;
                 try {
                     dest.server = toRpcHost(param.host);
@@ -1870,7 +1869,6 @@ public class Coordinator implements CoordInterface {
                 param.buildHashTableForBroadcastJoin = true;
                 TPlanFragmentDestination dest = new TPlanFragmentDestination();
                 dest.fragment_instance_id = param.instanceId;
-                param.recvrId = params.destinations.size();
                 try {
                     dest.server = toRpcHost(param.host);
                     dest.setBrpcServer(toBrpcHost(param.host));