[refactor][routineload] Remove unused client object from routine load (#9223)

This commit is contained in:
xy720
2022-04-29 10:40:07 +08:00
committed by GitHub
parent d330bc3806
commit 93a41b2625

View File

@ -18,14 +18,12 @@
package org.apache.doris.common.util;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.proto.InternalService;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
@ -47,9 +45,7 @@ public class KafkaUtil {
public static List<Integer> getAllKafkaPartitions(String brokerList, String topic,
Map<String, String> convertedCustomProperties) throws UserException {
BackendService.Client client = null;
TNetworkAddress address = null;
boolean ok = false;
try {
List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
if (backendIds.isEmpty()) {
@ -89,12 +85,6 @@ public class KafkaUtil {
LOG.warn("failed to get partitions.", e);
throw new LoadException(
"Failed to get all partitions of kafka topic: " + topic + ". error: " + e.getMessage());
} finally {
if (ok) {
ClientPool.backendPool.returnObject(address, client);
} else {
ClientPool.backendPool.invalidateObject(address, client);
}
}
}
@ -104,10 +94,8 @@ public class KafkaUtil {
public static List<Pair<Integer, Long>> getOffsetsForTimes(String brokerList, String topic,
Map<String, String> convertedCustomProperties,
List<Pair<Integer, Long>> timestampOffsets) throws LoadException {
BackendService.Client client = null;
TNetworkAddress address = null;
LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets);
boolean ok = false;
try {
List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
if (backendIds.isEmpty()) {
@ -159,23 +147,15 @@ public class KafkaUtil {
LOG.warn("failed to get offsets for times.", e);
throw new LoadException(
"Failed to get offsets for times of kafka topic: " + topic + ". error: " + e.getMessage());
} finally {
if (ok) {
ClientPool.backendPool.returnObject(address, client);
} else {
ClientPool.backendPool.invalidateObject(address, client);
}
}
}
public static List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID taskId, String brokerList, String topic,
Map<String, String> convertedCustomProperties,
List<Integer> partitionIds) throws LoadException {
BackendService.Client client = null;
TNetworkAddress address = null;
LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}",
partitionIds, topic, taskId, jobId);
boolean ok = false;
try {
List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
if (backendIds.isEmpty()) {
@ -226,12 +206,6 @@ public class KafkaUtil {
LOG.warn("failed to get latest offsets.", e);
throw new LoadException(
"Failed to get latest offsets of kafka topic: " + topic + ". error: " + e.getMessage());
} finally {
if (ok) {
ClientPool.backendPool.returnObject(address, client);
} else {
ClientPool.backendPool.invalidateObject(address, client);
}
}
}
}