[fix](npe) fix kafka be id npe (#33151)

This commit is contained in:
yujun
2024-04-08 08:49:09 +08:00
committed by yiguolei
parent 741d4ff97e
commit 159ebc76e7

View File

@ -47,13 +47,15 @@ public class KafkaUtil {
Map<String, String> convertedCustomProperties) throws UserException {
TNetworkAddress address = null;
Backend be = null;
long beId = -1L;
try {
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (backendIds.isEmpty()) {
throw new LoadException("Failed to get all partitions. No alive backends");
}
Collections.shuffle(backendIds);
be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
beId = backendIds.get(0);
be = Env.getCurrentSystemInfo().getBackend(beId);
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
// create request
@ -79,9 +81,9 @@ public class KafkaUtil {
return result.getKafkaMetaResult().getPartitionIdsList();
}
} catch (Exception e) {
LOG.warn("failed to get partitions from backend[{}].", be.getId(), e);
LOG.warn("failed to get partitions from backend[{}].", beId, e);
throw new LoadException(
"Failed to get all partitions of kafka topic: " + topic + " from backend[" + be.getId()
"Failed to get all partitions of kafka topic: " + topic + " from backend[" + beId
+ "]. error: " + e.getMessage());
}
}