From f8b61d3d8e816fc2f328ac6d7dc7c94667576977 Mon Sep 17 00:00:00 2001 From: DuRipeng <453243496@qq.com> Date: Fri, 17 Nov 2023 18:29:55 +0800 Subject: [PATCH] [Enhance](fe) select BE local broker to scan Hive table when 'broker.name' in hms catalog is specified (#27122) Since #24830 introduce `broker.name` in hms catalog, data scan will run on specified brokers. And [doris operator](https://github.com/selectdb/doris-operator) support BE and broker deployed in same pod, BE access local broker is the fastest approach to access data. In previous logic, every inputSplit will select one BE to execute, then randomly select one broker for actual data access, BE and related broker are always located on separate K8S pod. This pr optimizes the broker select strategy to prioritize BE-local broker when `broker.name` is specified in hms catalog. --- .../planner/external/FileQueryScanNode.java | 16 +++++++++++++--- .../doris/planner/external/HiveScanNode.java | 2 ++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 7f7be8f51b..e53bc2b2a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -99,6 +99,8 @@ public abstract class FileQueryScanNode extends FileScanNode { @Getter protected TableSample tableSample; + protected String brokerName; + /** * External file scan node for Query hms table * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv @@ -318,7 +320,6 @@ public abstract class FileQueryScanNode extends FileScanNode { for (Split split : inputSplits) { FileSplit fileSplit = (FileSplit) split; TFileType locationType = getLocationType(fileSplit.getPath().toString()); - setLocationPropertiesIfNecessary(locationType, locationProperties); TScanRangeLocations curLocations = newLocations(); // If fileSplit has partition values, use the values collected from hive partitions. @@ -378,6 +379,7 @@ public abstract class FileQueryScanNode extends FileScanNode { } else { selectedBackend = backendPolicy.getNextBe(); } + setLocationPropertiesIfNecessary(selectedBackend, locationType, locationProperties); location.setBackendId(selectedBackend.getId()); location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort())); curLocations.addToLocations(location); @@ -394,7 +396,7 @@ public abstract class FileQueryScanNode extends FileScanNode { scanRangeLocations.size(), (System.currentTimeMillis() - start)); } - private void setLocationPropertiesIfNecessary(TFileType locationType, + private void setLocationPropertiesIfNecessary(Backend selectedBackend, TFileType locationType, Map locationProperties) throws UserException { if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) { if (!params.isSetHdfsParams()) { @@ -407,7 +409,15 @@ public abstract class FileQueryScanNode extends FileScanNode { params.setProperties(locationProperties); if (!params.isSetBrokerAddresses()) { - FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker(); + FsBroker broker; + if (brokerName != null) { + broker = Env.getCurrentEnv().getBrokerMgr().getBroker(brokerName, selectedBackend.getHost()); + LOG.debug(String.format( + "Set location for broker [%s], selected BE host: [%s] selected broker host: [%s]", + brokerName, selectedBackend.getHost(), broker.host)); + } else { + broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker(); + } if (broker == null) { throw new UserException("No alive broker."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java index d414181048..943d30017e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java @@ -100,12 +100,14 @@ public class HiveScanNode extends FileQueryScanNode { public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { super(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv); hmsTable = (HMSExternalTable) desc.getTable(); + brokerName = hmsTable.getCatalog().bindBrokerName(); } public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) { super(id, desc, planNodeName, statisticalType, needCheckColumnPriv); hmsTable = (HMSExternalTable) desc.getTable(); + brokerName = hmsTable.getCatalog().bindBrokerName(); } @Override