[branch-2.1](routine-load) add retry when get Kafka meta info (#37371)
pick #35376
This commit is contained in:
@ -43,24 +43,11 @@ import java.util.stream.Collectors;
|
||||
public class KafkaUtil {
|
||||
private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
|
||||
private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60;
|
||||
private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 5;
|
||||
private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 10;
|
||||
|
||||
public static List<Integer> getAllKafkaPartitions(String brokerList, String topic,
|
||||
Map<String, String> convertedCustomProperties) throws UserException {
|
||||
TNetworkAddress address = null;
|
||||
Backend be = null;
|
||||
long beId = -1L;
|
||||
try {
|
||||
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
|
||||
if (backendIds.isEmpty()) {
|
||||
throw new LoadException("Failed to get all partitions. No alive backends");
|
||||
}
|
||||
Collections.shuffle(backendIds);
|
||||
beId = backendIds.get(0);
|
||||
be = Env.getCurrentSystemInfo().getBackend(beId);
|
||||
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
|
||||
|
||||
// create request
|
||||
InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
|
||||
InternalService.PKafkaMetaProxyRequest.newBuilder()
|
||||
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
|
||||
@ -72,21 +59,10 @@ public class KafkaUtil {
|
||||
)
|
||||
)
|
||||
).build();
|
||||
|
||||
// get info
|
||||
Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
|
||||
InternalService.PProxyResult result = future.get(MAX_KAFKA_PARTITION_TIMEOUT_SECOND, TimeUnit.SECONDS);
|
||||
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
|
||||
if (code != TStatusCode.OK) {
|
||||
throw new UserException("failed to get kafka partition info: " + result.getStatus().getErrorMsgsList());
|
||||
} else {
|
||||
return result.getKafkaMetaResult().getPartitionIdsList();
|
||||
}
|
||||
return getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND).getKafkaMetaResult().getPartitionIdsList();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("failed to get partitions from backend[{}].", beId, e);
|
||||
throw new LoadException(
|
||||
"Failed to get all partitions of kafka topic: " + topic + " from backend[" + beId
|
||||
+ "]. error: " + e.getMessage());
|
||||
"Failed to get all partitions of kafka topic: " + topic + " error: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@ -96,20 +72,10 @@ public class KafkaUtil {
|
||||
public static List<Pair<Integer, Long>> getOffsetsForTimes(String brokerList, String topic,
|
||||
Map<String, String> convertedCustomProperties, List<Pair<Integer, Long>> timestampOffsets)
|
||||
throws LoadException {
|
||||
TNetworkAddress address = null;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets);
|
||||
}
|
||||
try {
|
||||
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
|
||||
if (backendIds.isEmpty()) {
|
||||
throw new LoadException("Failed to get offset for times. No alive backends");
|
||||
}
|
||||
Collections.shuffle(backendIds);
|
||||
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
|
||||
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
|
||||
|
||||
// create request
|
||||
InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
|
||||
InternalService.PKafkaMetaProxyRequest.newBuilder()
|
||||
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
|
||||
@ -131,24 +97,17 @@ public class KafkaUtil {
|
||||
|
||||
InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
|
||||
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
|
||||
InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);
|
||||
|
||||
// get info
|
||||
Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
|
||||
InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
|
||||
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
|
||||
if (code != TStatusCode.OK) {
|
||||
throw new UserException("failed to get offsets for times: " + result.getStatus().getErrorMsgsList());
|
||||
} else {
|
||||
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
|
||||
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
|
||||
for (InternalService.PIntegerPair pair : pairs) {
|
||||
partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("finish to get offsets for times of topic: {}, {}", topic, partitionOffsets);
|
||||
}
|
||||
return partitionOffsets;
|
||||
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
|
||||
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
|
||||
for (InternalService.PIntegerPair pair : pairs) {
|
||||
partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("finish to get offsets for times of topic: {}, {}", topic, partitionOffsets);
|
||||
}
|
||||
return partitionOffsets;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("failed to get offsets for times.", e);
|
||||
throw new LoadException(
|
||||
@ -159,21 +118,11 @@ public class KafkaUtil {
|
||||
public static List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID taskId, String brokerList, String topic,
|
||||
Map<String, String> convertedCustomProperties,
|
||||
List<Integer> partitionIds) throws LoadException {
|
||||
TNetworkAddress address = null;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}",
|
||||
partitionIds, topic, taskId, jobId);
|
||||
}
|
||||
try {
|
||||
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
|
||||
if (backendIds.isEmpty()) {
|
||||
throw new LoadException("Failed to get latest offsets. No alive backends");
|
||||
}
|
||||
Collections.shuffle(backendIds);
|
||||
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
|
||||
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
|
||||
|
||||
// create request
|
||||
InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
|
||||
InternalService.PKafkaMetaProxyRequest.newBuilder()
|
||||
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
|
||||
@ -193,25 +142,18 @@ public class KafkaUtil {
|
||||
}
|
||||
InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
|
||||
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
|
||||
InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);
|
||||
|
||||
// get info
|
||||
Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
|
||||
InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
|
||||
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
|
||||
if (code != TStatusCode.OK) {
|
||||
throw new UserException("failed to get latest offsets: " + result.getStatus().getErrorMsgsList());
|
||||
} else {
|
||||
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
|
||||
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
|
||||
for (InternalService.PIntegerPair pair : pairs) {
|
||||
partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("finish to get latest offsets for partitions {} in topic: {}, task {}, job {}",
|
||||
partitionOffsets, topic, taskId, jobId);
|
||||
}
|
||||
return partitionOffsets;
|
||||
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
|
||||
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
|
||||
for (InternalService.PIntegerPair pair : pairs) {
|
||||
partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("finish to get latest offsets for partitions {} in topic: {}, task {}, job {}",
|
||||
partitionOffsets, topic, taskId, jobId);
|
||||
}
|
||||
return partitionOffsets;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("failed to get latest offsets.", e);
|
||||
throw new LoadException(
|
||||
@ -239,17 +181,7 @@ public class KafkaUtil {
|
||||
return offsets;
|
||||
}
|
||||
|
||||
TNetworkAddress address = null;
|
||||
try {
|
||||
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
|
||||
if (backendIds.isEmpty()) {
|
||||
throw new LoadException("Failed to get real offsets. No alive backends");
|
||||
}
|
||||
Collections.shuffle(backendIds);
|
||||
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
|
||||
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
|
||||
|
||||
// create request
|
||||
InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
|
||||
InternalService.PKafkaMetaProxyRequest.newBuilder()
|
||||
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
|
||||
@ -270,27 +202,56 @@ public class KafkaUtil {
|
||||
}
|
||||
InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
|
||||
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
|
||||
InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);
|
||||
|
||||
// get info
|
||||
Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
|
||||
InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
|
||||
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
|
||||
if (code != TStatusCode.OK) {
|
||||
throw new UserException("failed to get real offsets: " + result.getStatus().getErrorMsgsList());
|
||||
} else {
|
||||
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
|
||||
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
|
||||
for (InternalService.PIntegerPair pair : pairs) {
|
||||
partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
|
||||
}
|
||||
realOffsets.addAll(partitionOffsets);
|
||||
LOG.info("finish to get real offsets for partitions {} in topic: {}", realOffsets, topic);
|
||||
return realOffsets;
|
||||
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
|
||||
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
|
||||
for (InternalService.PIntegerPair pair : pairs) {
|
||||
partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
|
||||
}
|
||||
realOffsets.addAll(partitionOffsets);
|
||||
LOG.info("finish to get real offsets for partitions {} in topic: {}", realOffsets, topic);
|
||||
return realOffsets;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("failed to get real offsets.", e);
|
||||
throw new LoadException(
|
||||
"Failed to get real offsets of kafka topic: " + topic + ". error: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private static InternalService.PProxyResult getInfoRequest(InternalService.PProxyRequest request, int timeout)
|
||||
throws LoadException {
|
||||
int retryTimes = 0;
|
||||
TNetworkAddress address = null;
|
||||
Future<InternalService.PProxyResult> future = null;
|
||||
InternalService.PProxyResult result = null;
|
||||
while (retryTimes < 3) {
|
||||
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
|
||||
if (backendIds.isEmpty()) {
|
||||
throw new LoadException("Failed to get info. No alive backends");
|
||||
}
|
||||
Collections.shuffle(backendIds);
|
||||
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
|
||||
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
|
||||
|
||||
try {
|
||||
future = BackendServiceProxy.getInstance().getInfo(address, request);
|
||||
result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
|
||||
retryTimes++;
|
||||
continue;
|
||||
}
|
||||
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
|
||||
if (code != TStatusCode.OK) {
|
||||
LOG.warn("failed to get info request to "
|
||||
+ address + " err " + result.getStatus().getErrorMsgsList());
|
||||
retryTimes++;
|
||||
} else {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
throw new LoadException("Failed to get info");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user