diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp index 016b35b0f2..33312201f4 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp @@ -157,12 +157,14 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest const auto& map = local_state._parent->cast() ._shuffle_idx_to_instance_idx; new_block_wrapper->ref(map.size()); + uint32_t enqueue_rows = 0; for (const auto& it : map) { DCHECK(it.second >= 0 && it.second < _num_partitions) << it.first << " : " << it.second << " " << _num_partitions; uint32_t start = local_state._partition_rows_histogram[it.first]; uint32_t size = local_state._partition_rows_histogram[it.first + 1] - start; if (size > 0) { + enqueue_rows += size; local_state._shared_state->add_mem_usage( it.second, new_block_wrapper->data_block.allocated_bytes(), false); @@ -176,6 +178,18 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest new_block_wrapper->unref(local_state._shared_state); } } + if (enqueue_rows != rows) [[unlikely]] { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "Type: {}, Local Exchange Id: {}, Shuffled Map: ", + get_exchange_type_name(get_type()), local_state.parent()->node_id()); + for (const auto& it : map) { + fmt::format_to(debug_string_buffer, "[{}:{}], ", it.first, it.second); + } + return Status::InternalError( + "Rows mismatched! Data may be lost. [Expected enqueue rows={}, Real enqueue " + "rows={}, Detail: {}]", + rows, enqueue_rows, fmt::to_string(debug_string_buffer)); + } } else if (_num_senders != _num_sources || _ignore_source_data_distribution) { // In this branch, data just should be distributed equally into all instances. new_block_wrapper->ref(_num_partitions); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index df6081626c..82d24ef3fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1745,7 +1745,6 @@ public class Coordinator implements CoordInterface { destHosts.put(param.host, param); param.buildHashTableForBroadcastJoin = true; TPlanFragmentDestination dest = new TPlanFragmentDestination(); - param.recvrId = params.destinations.size(); dest.fragment_instance_id = param.instanceId; try { dest.server = toRpcHost(param.host); @@ -1870,7 +1869,6 @@ public class Coordinator implements CoordInterface { param.buildHashTableForBroadcastJoin = true; TPlanFragmentDestination dest = new TPlanFragmentDestination(); dest.fragment_instance_id = param.instanceId; - param.recvrId = params.destinations.size(); try { dest.server = toRpcHost(param.host); dest.setBrpcServer(toBrpcHost(param.host));