[branch-2.1](routine-load) make get Kafka meta timeout configurable (#37399)

pick #36619
This commit is contained in:
hui lai
2024-07-08 10:39:17 +08:00
committed by GitHub
parent af7b69da48
commit dd18652861
3 changed files with 17 additions and 11 deletions

View File

@ -18,6 +18,7 @@
package org.apache.doris.datasource.kafka;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
@ -42,8 +43,6 @@ import java.util.stream.Collectors;
public class KafkaUtil {
private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60;
private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 10;
public static List<Integer> getAllKafkaPartitions(String brokerList, String topic,
Map<String, String> convertedCustomProperties) throws UserException {
@ -59,7 +58,8 @@ public class KafkaUtil {
)
)
).build();
return getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND).getKafkaMetaResult().getPartitionIdsList();
return getInfoRequest(request, Config.max_get_kafka_meta_timeout_second)
.getKafkaMetaResult().getPartitionIdsList();
} catch (Exception e) {
throw new LoadException(
"Failed to get all partitions of kafka topic: " + topic + " error: " + e.getMessage());
@ -96,8 +96,8 @@ public class KafkaUtil {
}
InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);
metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second);
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
@ -141,8 +141,8 @@ public class KafkaUtil {
metaRequestBuilder.addPartitionIdForLatestOffsets(partitionId);
}
InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);
metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second);
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
@ -201,8 +201,8 @@ public class KafkaUtil {
.setVal(pair.second).build());
}
InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);
metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second);
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
@ -236,7 +236,7 @@ public class KafkaUtil {
try {
future = BackendServiceProxy.getInstance().getInfo(address, request);
result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
result = future.get(Config.max_get_kafka_meta_timeout_second, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
retryTimes++;