From 6be74d22eaec57df2ea58c957b052749a78c8b97 Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Thu, 16 Nov 2023 12:08:22 +0800 Subject: [PATCH] [fix](nereids)fix bug that query infomation_schema.rowsets fe send fragment to one of muilti be. (#27025) Fixed the bug of incomplete query results when querying information_schema.rowsets in the case of multiple BEs. The reason is that the schema scanner sends the scan fragment to one of multiple bes, and be queries the information of fe through rpc. Since the rowsets information requires information about all BEs, the scan fragment needs to be sent to all BEs. --- .../translator/PhysicalPlanTranslator.java | 18 +++++++++++++++--- .../BackendPartitionedSchemaScanNode.java | 7 +++++++ .../apache/doris/planner/SchemaScanNode.java | 2 +- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 53ffb36e1e..5d92125263 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -140,6 +140,7 @@ import org.apache.doris.nereids.util.Utils; import org.apache.doris.planner.AggregationNode; import org.apache.doris.planner.AnalyticEvalNode; import org.apache.doris.planner.AssertNumRowsNode; +import org.apache.doris.planner.BackendPartitionedSchemaScanNode; import org.apache.doris.planner.CTEScanNode; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.DataStreamSink; @@ -725,13 +726,24 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor slots = ImmutableList.copyOf(schemaScan.getOutput()); TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context); - SchemaScanNode scanNode = new SchemaScanNode(schemaScan.translatePlanNodeId(), tupleDescriptor); + + // For the information_schema.rowsets table, the scan fragment needs to be sent to all BEs. + // For other information_schema tables, the scan fragment only needs to be sent to one of the BEs. + SchemaScanNode scanNode = null; + if (BackendPartitionedSchemaScanNode.isBackendPartitionedSchemaTable( + table.getName())) { + scanNode = new BackendPartitionedSchemaScanNode(schemaScan.translatePlanNodeId(), tupleDescriptor); + } else { + scanNode = new SchemaScanNode(schemaScan.translatePlanNodeId(), tupleDescriptor); + } + SchemaScanNode finalScanNode = scanNode; context.getRuntimeTranslator().ifPresent( runtimeFilterGenerator -> runtimeFilterGenerator.getTargetOnScanNode(schemaScan.getRelationId()) - .forEach(expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, scanNode, context) + .forEach(expr -> runtimeFilterGenerator + .translateRuntimeFilterTarget(expr, finalScanNode, context) ) ); - scanNode.finalizeForNereids(); + Utils.execWithUncheckedException(scanNode::finalizeForNereids); context.addScanNode(scanNode); PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, schemaScan); context.addPlanFragment(planFragment); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java index 53f48c60e8..592fc3c96c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java @@ -88,6 +88,13 @@ public class BackendPartitionedSchemaScanNode extends SchemaScanNode { createScanRangeLocations(); } + @Override + public void finalizeForNereids() throws UserException { + computeColumnsFilter(); + computePartitionInfo(); + createScanRangeLocations(); + } + @Override public List getScanRangeLocations(long maxScanRangeLength) { return scanRangeLocations; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java index 595d09792f..4a8a488dfc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java @@ -85,7 +85,7 @@ public class SchemaScanNode extends ScanNode { } @Override - public void finalizeForNereids() { + public void finalizeForNereids() throws UserException { // Convert predicates to MySQL columns and filters. frontendIP = FrontendOptions.getLocalHostAddress(); frontendPort = Config.rpc_port;