[fix](nereids)fix bug that query infomation_schema.rowsets fe send fragment to one of muilti be. (#27025)
Fixed the bug of incomplete query results when querying information_schema.rowsets in the case of multiple BEs. The reason is that the schema scanner sends the scan fragment to one of multiple bes, and be queries the information of fe through rpc. Since the rowsets information requires information about all BEs, the scan fragment needs to be sent to all BEs.
This commit is contained in:
@ -140,6 +140,7 @@ import org.apache.doris.nereids.util.Utils;
|
||||
import org.apache.doris.planner.AggregationNode;
|
||||
import org.apache.doris.planner.AnalyticEvalNode;
|
||||
import org.apache.doris.planner.AssertNumRowsNode;
|
||||
import org.apache.doris.planner.BackendPartitionedSchemaScanNode;
|
||||
import org.apache.doris.planner.CTEScanNode;
|
||||
import org.apache.doris.planner.DataPartition;
|
||||
import org.apache.doris.planner.DataStreamSink;
|
||||
@ -725,13 +726,24 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
Table table = schemaScan.getTable();
|
||||
List<Slot> slots = ImmutableList.copyOf(schemaScan.getOutput());
|
||||
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context);
|
||||
SchemaScanNode scanNode = new SchemaScanNode(schemaScan.translatePlanNodeId(), tupleDescriptor);
|
||||
|
||||
// For the information_schema.rowsets table, the scan fragment needs to be sent to all BEs.
|
||||
// For other information_schema tables, the scan fragment only needs to be sent to one of the BEs.
|
||||
SchemaScanNode scanNode = null;
|
||||
if (BackendPartitionedSchemaScanNode.isBackendPartitionedSchemaTable(
|
||||
table.getName())) {
|
||||
scanNode = new BackendPartitionedSchemaScanNode(schemaScan.translatePlanNodeId(), tupleDescriptor);
|
||||
} else {
|
||||
scanNode = new SchemaScanNode(schemaScan.translatePlanNodeId(), tupleDescriptor);
|
||||
}
|
||||
SchemaScanNode finalScanNode = scanNode;
|
||||
context.getRuntimeTranslator().ifPresent(
|
||||
runtimeFilterGenerator -> runtimeFilterGenerator.getTargetOnScanNode(schemaScan.getRelationId())
|
||||
.forEach(expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, scanNode, context)
|
||||
.forEach(expr -> runtimeFilterGenerator
|
||||
.translateRuntimeFilterTarget(expr, finalScanNode, context)
|
||||
)
|
||||
);
|
||||
scanNode.finalizeForNereids();
|
||||
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
|
||||
context.addScanNode(scanNode);
|
||||
PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, schemaScan);
|
||||
context.addPlanFragment(planFragment);
|
||||
|
||||
@ -88,6 +88,13 @@ public class BackendPartitionedSchemaScanNode extends SchemaScanNode {
|
||||
createScanRangeLocations();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void finalizeForNereids() throws UserException {
|
||||
computeColumnsFilter();
|
||||
computePartitionInfo();
|
||||
createScanRangeLocations();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
|
||||
return scanRangeLocations;
|
||||
|
||||
@ -85,7 +85,7 @@ public class SchemaScanNode extends ScanNode {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void finalizeForNereids() {
|
||||
public void finalizeForNereids() throws UserException {
|
||||
// Convert predicates to MySQL columns and filters.
|
||||
frontendIP = FrontendOptions.getLocalHostAddress();
|
||||
frontendPort = Config.rpc_port;
|
||||
|
||||
Reference in New Issue
Block a user