[branch-2.1](routine-load) do not schedule task when there is no data (#34654)

This commit is contained in:
HHoflittlefish777
2024-05-11 11:01:18 +08:00
committed by GitHub
parent dd1b54cf62
commit 7ba66c5890
13 changed files with 489 additions and 14 deletions

View File

@@ -31,6 +31,7 @@ import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -217,4 +218,79 @@ public class KafkaUtil {
"Failed to get latest offsets of kafka topic: " + topic + ". error: " + e.getMessage());
}
}
/**
 * Resolves special offset flags to real Kafka offsets by querying a backend.
 *
 * <p>Offsets encoded as negative values (e.g. OFFSET_BEGINNING / OFFSET_END
 * flags) are sent to a randomly chosen alive BE, which asks Kafka for the
 * concrete offset each flag stands for. Offsets that are already non-negative
 * are passed through unchanged.
 *
 * @param brokerList                Kafka broker list of the topic
 * @param topic                     Kafka topic name
 * @param convertedCustomProperties custom Kafka client properties to forward
 * @param offsets                   per-partition (partitionId, offset) pairs;
 *                                  a negative offset is a flag to resolve
 * @return per-partition pairs with every flag replaced by a real offset; order
 *         is not guaranteed to match the input
 * @throws LoadException if no alive backend exists or the RPC fails/times out
 */
public static List<Pair<Integer, Long>> getRealOffsets(String brokerList, String topic,
        Map<String, String> convertedCustomProperties,
        List<Pair<Integer, Long>> offsets)
        throws LoadException {
    // Split the input: non-negative values are already real offsets; negative
    // values are flags (such as OFFSET_BEGINNING or OFFSET_END) to be resolved.
    List<Pair<Integer, Long>> offsetFlags = new ArrayList<>();
    List<Pair<Integer, Long>> realOffsets = new ArrayList<>();
    for (Pair<Integer, Long> pair : offsets) {
        if (pair.second < 0) {
            offsetFlags.add(pair);
        } else {
            realOffsets.add(pair);
        }
    }
    // Every offset is already concrete -- no RPC needed.
    if (offsetFlags.isEmpty()) {
        LOG.info("do not need update and directly return offsets for partitions {} in topic: {}", offsets, topic);
        return offsets;
    }
    TNetworkAddress address = null;
    try {
        // Pick a random alive backend to proxy the Kafka metadata request.
        List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
        if (backendIds.isEmpty()) {
            throw new LoadException("Failed to get real offsets. No alive backends");
        }
        Collections.shuffle(backendIds);
        Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
        address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
        // Build the proxy request carrying broker/topic info, the custom
        // properties, and only the flagged offsets to be resolved.
        InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
                InternalService.PKafkaMetaProxyRequest.newBuilder()
                        .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
                                .setBrokers(brokerList)
                                .setTopic(topic)
                                .addAllProperties(
                                        convertedCustomProperties.entrySet().stream().map(
                                                e -> InternalService.PStringPair.newBuilder()
                                                        .setKey(e.getKey())
                                                        .setVal(e.getValue())
                                                        .build()
                                        ).collect(Collectors.toList())
                                )
                        );
        for (Pair<Integer, Long> pair : offsetFlags) {
            metaRequestBuilder.addOffsetFlags(InternalService.PIntegerPair.newBuilder().setKey(pair.first)
                    .setVal(pair.second).build());
        }
        InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
                metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
        // Send the request and wait for the result with a bounded timeout.
        Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
        InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
        TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
        if (code != TStatusCode.OK) {
            throw new UserException("failed to get real offsets: " + result.getStatus().getErrorMsgsList());
        } else {
            // Merge the resolved offsets back with the already-real ones.
            List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
            List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
            for (InternalService.PIntegerPair pair : pairs) {
                partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
            }
            realOffsets.addAll(partitionOffsets);
            LOG.info("finish to get real offsets for partitions {} in topic: {}", realOffsets, topic);
            return realOffsets;
        }
    } catch (Exception e) {
        // Log with the full stack trace; the rethrown LoadException carries the
        // message only, matching the error handling of the sibling methods.
        LOG.warn("failed to get real offsets.", e);
        throw new LoadException(
                "Failed to get real offsets of kafka topic: " + topic + ". error: " + e.getMessage());
    }
}
}

View File

@@ -512,17 +512,20 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
for (Integer kafkaPartition : newPartitions) {
partitionOffsets.add(Pair.of(kafkaPartition, beginOffset));
}
if (isOffsetForTimes()) {
try {
try {
if (isOffsetForTimes()) {
partitionOffsets = KafkaUtil.getOffsetsForTimes(this.brokerList,
this.topic, convertedCustomProperties, partitionOffsets);
} catch (LoadException e) {
LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("partition:timestamp", Joiner.on(",").join(partitionOffsets))
.add("error_msg", "Job failed to fetch current offsets from times with error " + e.getMessage())
.build(), e);
throw new UserException(e);
} else {
partitionOffsets = KafkaUtil.getRealOffsets(this.brokerList,
this.topic, convertedCustomProperties, partitionOffsets);
}
} catch (LoadException e) {
LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("partition:", Joiner.on(",").join(partitionOffsets))
.add("error_msg", "Job failed to fetch current offsets with error " + e.getMessage())
.build(), e);
throw new UserException(e);
}
return partitionOffsets;
}
@@ -552,6 +555,10 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
kafkaPartitionOffsets = KafkaUtil.getOffsetsForTimes(kafkaDataSourceProperties.getBrokerList(),
kafkaDataSourceProperties.getTopic(),
convertedCustomProperties, kafkaDataSourceProperties.getKafkaPartitionOffsets());
} else {
kafkaPartitionOffsets = KafkaUtil.getRealOffsets(kafkaDataSourceProperties.getBrokerList(),
kafkaDataSourceProperties.getTopic(),
convertedCustomProperties, kafkaDataSourceProperties.getKafkaPartitionOffsets());
}
for (Pair<Integer, Long> partitionOffset : kafkaPartitionOffsets) {
@@ -638,9 +645,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
public void modifyProperties(AlterRoutineLoadStmt stmt) throws UserException {
Map<String, String> jobProperties = stmt.getAnalyzedJobProperties();
KafkaDataSourceProperties dataSourceProperties = (KafkaDataSourceProperties) stmt.getDataSourceProperties();
if (null != dataSourceProperties && dataSourceProperties.isOffsetsForTimes()) {
if (null != dataSourceProperties) {
// if the partition offset is set by timestamp, convert it to real offset
convertTimestampToOffset(dataSourceProperties);
convertOffset(dataSourceProperties);
}
writeLock();
@@ -659,13 +666,17 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
}
private void convertTimestampToOffset(KafkaDataSourceProperties dataSourceProperties) throws UserException {
private void convertOffset(KafkaDataSourceProperties dataSourceProperties) throws UserException {
List<Pair<Integer, Long>> partitionOffsets = dataSourceProperties.getKafkaPartitionOffsets();
if (partitionOffsets.isEmpty()) {
return;
}
List<Pair<Integer, Long>> newOffsets = KafkaUtil.getOffsetsForTimes(brokerList, topic,
convertedCustomProperties, partitionOffsets);
List<Pair<Integer, Long>> newOffsets;
if (dataSourceProperties.isOffsetsForTimes()) {
newOffsets = KafkaUtil.getOffsetsForTimes(brokerList, topic, convertedCustomProperties, partitionOffsets);
} else {
newOffsets = KafkaUtil.getRealOffsets(brokerList, topic, convertedCustomProperties, partitionOffsets);
}
dataSourceProperties.setKafkaPartitionOffsets(newOffsets);
}