diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java index aee2aadac2..12a32da5a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java @@ -47,13 +47,15 @@ public class KafkaUtil { Map convertedCustomProperties) throws UserException { TNetworkAddress address = null; Backend be = null; + long beId = -1L; try { List backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); if (backendIds.isEmpty()) { throw new LoadException("Failed to get all partitions. No alive backends"); } Collections.shuffle(backendIds); - be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); + beId = backendIds.get(0); + be = Env.getCurrentSystemInfo().getBackend(beId); address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); // create request @@ -79,9 +81,9 @@ public class KafkaUtil { return result.getKafkaMetaResult().getPartitionIdsList(); } } catch (Exception e) { - LOG.warn("failed to get partitions from backend[{}].", be.getId(), e); + LOG.warn("failed to get partitions from backend[{}].", beId, e); throw new LoadException( - "Failed to get all partitions of kafka topic: " + topic + " from backend[" + be.getId() + "Failed to get all partitions of kafka topic: " + topic + " from backend[" + beId + "]. error: " + e.getMessage()); } }