[fix](routine-load) fix get kafka offset timeout may be too long (#33502)
committed by yiguolei
parent 341cb40693 · commit e0ec2da29b
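Summary: the BE used to hardcode a 5000 ms timeout into each librdkafka call (offsetsForTimes, query_watermark_offsets). Because get_latest_offsets_for_partitions issues one blocking query per partition, a request covering many partitions could take far longer than the 5 s the FE is willing to wait on the RPC. This commit adds an optional timeout_secs field to PProxyRequest, threads the timeout from the FE through the BE's get_info handler into the Kafka consumer, and deducts elapsed time between partition queries so the whole loop shares one budget. Below is a minimal sketch of that shared-budget pattern, using std::chrono in place of Doris's MonotonicStopWatch; query_one_partition is a hypothetical stand-in for the blocking per-partition call:

    #include <algorithm>
    #include <chrono>
    #include <cstdint>
    #include <iostream>
    #include <thread>
    #include <vector>

    // Hypothetical stand-in for a blocking per-partition metadata call such as
    // librdkafka's query_watermark_offsets(); here it just sleeps a little.
    static bool query_one_partition(int32_t /*partition_id*/, int timeout_ms) {
        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(timeout_ms, 100)));
        return true;
    }

    // The shared-budget pattern this commit introduces: every iteration gets
    // only what remains of the caller's budget, not a fresh 5000 ms.
    static bool get_offsets_with_budget(const std::vector<int32_t>& partitions, int timeout_ms) {
        const auto start = std::chrono::steady_clock::now();
        for (int32_t partition_id : partitions) {
            const auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                            std::chrono::steady_clock::now() - start)
                                            .count();
            const int remaining_ms = timeout_ms - static_cast<int>(elapsed_ms);
            if (remaining_ms <= 0) {
                std::cerr << "get offsets timed out\n"; // budget exhausted: fail fast
                return false;
            }
            // Pass only the remaining budget down to the blocking call.
            if (!query_one_partition(partition_id, remaining_ms)) {
                return false;
            }
        }
        return true;
    }

    int main() {
        // Ten partitions share a single 500 ms budget; at ~100 ms per query the
        // loop fails fast after about five partitions instead of blocking 10x.
        const std::vector<int32_t> partitions{0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
        std::cout << (get_offsets_with_budget(partitions, 500) ? "ok" : "timeout") << "\n";
        return 0;
    }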
data_consumer.cpp

@@ -349,7 +349,7 @@ Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
 // corresponding partition.
 // See librdkafka/rdkafkacpp.h##offsetsForTimes()
 Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>& times,
-                                                std::vector<PIntegerPair>* offsets) {
+                                                std::vector<PIntegerPair>* offsets, int timeout) {
     // create topic partition
     std::vector<RdKafka::TopicPartition*> topic_partitions;
     for (const auto& entry : times) {
@@ -364,8 +364,8 @@ Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>&
     }};

     // get offsets for times
-    RdKafka::ErrorCode err = _k_consumer->offsetsForTimes(topic_partitions, 5000);
-    if (err != RdKafka::ERR_NO_ERROR) {
+    RdKafka::ErrorCode err = _k_consumer->offsetsForTimes(topic_partitions, timeout);
+    if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
        std::stringstream ss;
        ss << "failed to get offsets for times: " << RdKafka::err2str(err);
        LOG(WARNING) << ss.str();
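Besides swapping the hardcoded 5000 ms for the caller's timeout, this hunk wraps the error check in UNLIKELY, which hints to the compiler that the branch is cold. A common definition of that macro, shown for illustration rather than copied from Doris's compiler utility header:

    // Branch-prediction hint: tells GCC/Clang that the condition is expected
    // to be false, keeping the error path out of the hot code layout.
    #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)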
@@ -384,13 +384,21 @@ Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>&

 // get latest offsets for given partitions
 Status KafkaDataConsumer::get_latest_offsets_for_partitions(
-        const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets) {
+        const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets,
+        int timeout) {
+    MonotonicStopWatch watch;
+    watch.start();
     for (int32_t partition_id : partition_ids) {
         int64_t low = 0;
         int64_t high = 0;
+        auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() / 1000 / 1000);
+        if (UNLIKELY(timeout_ms <= 0)) {
+            return Status::InternalError("get kafka latest offsets for partitions timeout");
+        }
+
         RdKafka::ErrorCode err =
-                _k_consumer->query_watermark_offsets(_topic, partition_id, &low, &high, 5000);
-        if (err != RdKafka::ERR_NO_ERROR) {
+                _k_consumer->query_watermark_offsets(_topic, partition_id, &low, &high, timeout_ms);
+        if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
             std::stringstream ss;
             ss << "failed to get latest offset for partition: " << partition_id
                << ", err: " << RdKafka::err2str(err);
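The stopwatch is the heart of the fix. Before this change, every query_watermark_offsets call got its own fresh 5000 ms timeout, so against a slow or unreachable broker a topic with N partitions could block for roughly N × 5 s (20 partitions means up to 100 s) while the FE abandons the RPC after only 5 s. Now timeout_ms shrinks by the elapsed time on every iteration, so the loop as a whole can never exceed the single timeout budget, and it fails fast with InternalError once the budget is spent.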
data_consumer.h

@@ -151,10 +151,10 @@ public:
     Status get_partition_meta(std::vector<int32_t>* partition_ids);
     // get offsets for times
     Status get_offsets_for_times(const std::vector<PIntegerPair>& times,
-                                 std::vector<PIntegerPair>* offsets);
+                                 std::vector<PIntegerPair>* offsets, int timeout);
     // get latest offsets for partitions
     Status get_latest_offsets_for_partitions(const std::vector<int32_t>& partition_ids,
-                                             std::vector<PIntegerPair>* offsets);
+                                             std::vector<PIntegerPair>* offsets, int timeout);

 private:
     std::string _brokers;
routine_load_task_executor.cpp

@@ -130,7 +130,8 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRe
 }

 Status RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times(
-        const PKafkaMetaProxyRequest& request, std::vector<PIntegerPair>* partition_offsets) {
+        const PKafkaMetaProxyRequest& request, std::vector<PIntegerPair>* partition_offsets,
+        int timeout) {
     CHECK(request.has_kafka_info());

     // This context is meaningless, just for unifing the interface
@@ -142,7 +143,7 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times(

     Status st = std::static_pointer_cast<KafkaDataConsumer>(consumer)->get_offsets_for_times(
             std::vector<PIntegerPair>(request.offset_times().begin(), request.offset_times().end()),
-            partition_offsets);
+            partition_offsets, timeout);
     if (st.ok()) {
         _data_consumer_pool.return_consumer(consumer);
     }
@@ -150,7 +151,8 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times(
 }

 Status RoutineLoadTaskExecutor::get_kafka_latest_offsets_for_partitions(
-        const PKafkaMetaProxyRequest& request, std::vector<PIntegerPair>* partition_offsets) {
+        const PKafkaMetaProxyRequest& request, std::vector<PIntegerPair>* partition_offsets,
+        int timeout) {
     CHECK(request.has_kafka_info());

     // This context is meaningless, just for unifing the interface
@@ -165,7 +167,7 @@ Status RoutineLoadTaskExecutor::get_kafka_latest_offsets_for_partitions(
                     ->get_latest_offsets_for_partitions(
                             std::vector<int32_t>(request.partition_id_for_latest_offsets().begin(),
                                                  request.partition_id_for_latest_offsets().end()),
-                            partition_offsets);
+                            partition_offsets, timeout);
     if (st.ok()) {
         _data_consumer_pool.return_consumer(consumer);
     }
routine_load_task_executor.h

@@ -60,10 +60,12 @@ public:
                                     std::vector<int32_t>* partition_ids);

     Status get_kafka_partition_offsets_for_times(const PKafkaMetaProxyRequest& request,
-                                                 std::vector<PIntegerPair>* partition_offsets);
+                                                 std::vector<PIntegerPair>* partition_offsets,
+                                                 int timeout);

     Status get_kafka_latest_offsets_for_partitions(const PKafkaMetaProxyRequest& request,
-                                                   std::vector<PIntegerPair>* partition_offsets);
+                                                   std::vector<PIntegerPair>* partition_offsets,
+                                                   int timeout);

 private:
     // execute the task
internal_service.cpp

@@ -1090,6 +1090,7 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
     // Currently it supports 2 kinds of requests:
     // 1. get all kafka partition ids for given topic
     // 2. get all kafka partition offsets for given topic and timestamp.
+    int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 5 * 1000;
     if (request->has_kafka_meta_request()) {
         const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request();
         if (!kafka_request.partition_id_for_latest_offsets().empty()) {
@@ -1097,7 +1098,8 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
             std::vector<PIntegerPair> partition_offsets;
             Status st = _exec_env->routine_load_task_executor()
                                 ->get_kafka_latest_offsets_for_partitions(
-                                        request->kafka_meta_request(), &partition_offsets);
+                                        request->kafka_meta_request(), &partition_offsets,
+                                        timeout_ms);
             if (st.ok()) {
                 PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
                 for (const auto& entry : partition_offsets) {
@@ -1113,7 +1115,8 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
             std::vector<PIntegerPair> partition_offsets;
             Status st = _exec_env->routine_load_task_executor()
                                 ->get_kafka_partition_offsets_for_times(
-                                        request->kafka_meta_request(), &partition_offsets);
+                                        request->kafka_meta_request(), &partition_offsets,
+                                        timeout_ms);
             if (st.ok()) {
                 PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
                 for (const auto& entry : partition_offsets) {
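On the BE side, get_info derives one millisecond budget per proxy request: timeout_ms = timeout_secs × 1000 when the FE set the new field, otherwise the old default of 5 × 1000 = 5000 ms, so requests from older FEs that never set the field keep their previous behavior. The same timeout_ms is then passed to both the latest-offsets and offsets-for-times paths.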
KafkaUtil.java

@@ -42,6 +42,7 @@ import java.util.stream.Collectors;
 public class KafkaUtil {
     private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
     private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60;
+    private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 5;

     public static List<Integer> getAllKafkaPartitions(String brokerList, String topic,
             Map<String, String> convertedCustomProperties) throws UserException {
@@ -128,11 +129,11 @@ public class KafkaUtil {
             }

             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    metaRequestBuilder).build();
+                    metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();

             // get info
             Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(5, TimeUnit.SECONDS);
+            InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
             TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
             if (code != TStatusCode.OK) {
                 throw new UserException("failed to get offsets for times: " + result.getStatus().getErrorMsgsList());
@@ -190,11 +191,11 @@ public class KafkaUtil {
                 metaRequestBuilder.addPartitionIdForLatestOffsets(partitionId);
             }
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    metaRequestBuilder).build();
+                    metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();

             // get info
             Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(5, TimeUnit.SECONDS);
+            InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
             TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
             if (code != TStatusCode.OK) {
                 throw new UserException("failed to get latest offsets: " + result.getStatus().getErrorMsgsList());
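On the FE side, both call sites now stamp the request with MAX_GET_OFFSET_TIMEOUT_SECOND and wait on the returned future with the same constant, so the FE's 5 s RPC wait and the budget the BE enforces internally come from a single definition instead of two independent literals that could drift apart.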
internal_service.proto

@@ -395,6 +395,7 @@ message PKafkaMetaProxyRequest {

 message PProxyRequest {
     optional PKafkaMetaProxyRequest kafka_meta_request = 1;
+    optional int64 timeout_secs = 2;
 };

 message PKafkaMetaProxyResult {
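The new optional timeout_secs field is what carries the budget across the wire. For a proto2-style optional field, protobuf's generated C++ class exposes both has_timeout_secs() and timeout_secs() accessors, which is what the get_info hunk above relies on. A minimal sketch of the BE-side defaulting (derive_timeout_ms is a hypothetical helper name; it assumes the protobuf-generated header for PProxyRequest):

    // Sketch only: fall back to the old 5 s default for senders that predate
    // the field, keeping the RPC backward compatible.
    int derive_timeout_ms(const PProxyRequest& request) {
        return request.has_timeout_secs()
                       ? static_cast<int>(request.timeout_secs()) * 1000 // FE-supplied budget
                       : 5 * 1000;                                       // legacy 5 s default
    }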