diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 7f7be8f51b..e53bc2b2a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -99,6 +99,8 @@ public abstract class FileQueryScanNode extends FileScanNode { @Getter protected TableSample tableSample; + protected String brokerName; + /** * External file scan node for Query hms table * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv @@ -318,7 +320,6 @@ public abstract class FileQueryScanNode extends FileScanNode { for (Split split : inputSplits) { FileSplit fileSplit = (FileSplit) split; TFileType locationType = getLocationType(fileSplit.getPath().toString()); - setLocationPropertiesIfNecessary(locationType, locationProperties); TScanRangeLocations curLocations = newLocations(); // If fileSplit has partition values, use the values collected from hive partitions. @@ -378,6 +379,7 @@ public abstract class FileQueryScanNode extends FileScanNode { } else { selectedBackend = backendPolicy.getNextBe(); } + setLocationPropertiesIfNecessary(selectedBackend, locationType, locationProperties); location.setBackendId(selectedBackend.getId()); location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort())); curLocations.addToLocations(location); @@ -394,7 +396,7 @@ public abstract class FileQueryScanNode extends FileScanNode { scanRangeLocations.size(), (System.currentTimeMillis() - start)); } - private void setLocationPropertiesIfNecessary(TFileType locationType, + private void setLocationPropertiesIfNecessary(Backend selectedBackend, TFileType locationType, Map locationProperties) throws UserException { if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) { if (!params.isSetHdfsParams()) { @@ -407,7 +409,15 @@ public abstract class FileQueryScanNode extends FileScanNode { params.setProperties(locationProperties); if (!params.isSetBrokerAddresses()) { - FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker(); + FsBroker broker; + if (brokerName != null) { + broker = Env.getCurrentEnv().getBrokerMgr().getBroker(brokerName, selectedBackend.getHost()); + LOG.debug(String.format( + "Set location for broker [%s], selected BE host: [%s] selected broker host: [%s]", + brokerName, selectedBackend.getHost(), broker.host)); + } else { + broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker(); + } if (broker == null) { throw new UserException("No alive broker."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java index d414181048..943d30017e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java @@ -100,12 +100,14 @@ public class HiveScanNode extends FileQueryScanNode { public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { super(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv); hmsTable = (HMSExternalTable) desc.getTable(); + brokerName = hmsTable.getCatalog().bindBrokerName(); } public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) { super(id, desc, planNodeName, statisticalType, needCheckColumnPriv); hmsTable = (HMSExternalTable) desc.getTable(); + brokerName = hmsTable.getCatalog().bindBrokerName(); } @Override