From 93a41b2625af74255487b3ac439ec293b9682ee0 Mon Sep 17 00:00:00 2001 From: xy720 <22125576+xy720@users.noreply.github.com> Date: Fri, 29 Apr 2022 10:40:07 +0800 Subject: [PATCH] [refactor][routineload] Remove unused client object from routine load (#9223) --- .../apache/doris/common/util/KafkaUtil.java | 26 ------------------- 1 file changed, 26 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java index ecbb897ab5..3739ab22cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java @@ -18,14 +18,12 @@ package org.apache.doris.common.util; import org.apache.doris.catalog.Catalog; -import org.apache.doris.common.ClientPool; import org.apache.doris.common.LoadException; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.proto.InternalService; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.system.Backend; -import org.apache.doris.thrift.BackendService; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStatusCode; @@ -47,9 +45,7 @@ public class KafkaUtil { public static List getAllKafkaPartitions(String brokerList, String topic, Map convertedCustomProperties) throws UserException { - BackendService.Client client = null; TNetworkAddress address = null; - boolean ok = false; try { List backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true); if (backendIds.isEmpty()) { @@ -89,12 +85,6 @@ public class KafkaUtil { LOG.warn("failed to get partitions.", e); throw new LoadException( "Failed to get all partitions of kafka topic: " + topic + ". error: " + e.getMessage()); - } finally { - if (ok) { - ClientPool.backendPool.returnObject(address, client); - } else { - ClientPool.backendPool.invalidateObject(address, client); - } } } @@ -104,10 +94,8 @@ public class KafkaUtil { public static List> getOffsetsForTimes(String brokerList, String topic, Map convertedCustomProperties, List> timestampOffsets) throws LoadException { - BackendService.Client client = null; TNetworkAddress address = null; LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets); - boolean ok = false; try { List backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true); if (backendIds.isEmpty()) { @@ -159,23 +147,15 @@ public class KafkaUtil { LOG.warn("failed to get offsets for times.", e); throw new LoadException( "Failed to get offsets for times of kafka topic: " + topic + ". error: " + e.getMessage()); - } finally { - if (ok) { - ClientPool.backendPool.returnObject(address, client); - } else { - ClientPool.backendPool.invalidateObject(address, client); - } } } public static List> getLatestOffsets(long jobId, UUID taskId, String brokerList, String topic, Map convertedCustomProperties, List partitionIds) throws LoadException { - BackendService.Client client = null; TNetworkAddress address = null; LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}", partitionIds, topic, taskId, jobId); - boolean ok = false; try { List backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true); if (backendIds.isEmpty()) { @@ -226,12 +206,6 @@ public class KafkaUtil { LOG.warn("failed to get latest offsets.", e); throw new LoadException( "Failed to get latest offsets of kafka topic: " + topic + ". error: " + e.getMessage()); - } finally { - if (ok) { - ClientPool.backendPool.returnObject(address, client); - } else { - ClientPool.backendPool.invalidateObject(address, client); - } } } }