[Fix](ScanNode) Move the finalize phase of ScanNode to after the end of the Physical Translate phase (#38604)

bp: #37565

Currently, Doris first obtains splits and then performs projection.
After column pruning, it calls `updateRequiredSlots` to update the
scanRange information. However, the Trino connector's column pruning
pushdown needs to be completed before obtaining splits.

Therefore, we move the finalize phase of `ScanNode` to after the end of
the `Physical Translate` phase, so that `createScanRangeLocations` can
use the final columns which have been pruning.

## Proposed changes

Issue Number: close #xxx

<!--Describe your changes.-->
This commit is contained in:
Tiewei Fang
2024-08-05 08:58:59 +08:00
committed by GitHub
parent 8fa0710cb3
commit 40767003c6
7 changed files with 17 additions and 85 deletions

View File

@ -19,7 +19,6 @@ package org.apache.doris.datasource;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.TableSample;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
@ -40,7 +39,6 @@ import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hive.source.HiveSplit;
import org.apache.doris.datasource.iceberg.source.IcebergSplit;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
@ -80,7 +78,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* FileQueryScanNode for querying the file access type of catalog, now only support
@ -182,16 +179,6 @@ public abstract class FileQueryScanNode extends FileScanNode {
params.setSrcTupleId(-1);
}
/**
* Reset required_slots in contexts. This is called after Nereids planner do the projection.
* In the projection process, some slots may be removed. So call this to update the slots info.
*/
@Override
public void updateRequiredSlots(PlanTranslatorContext planTranslatorContext,
Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
updateRequiredSlots();
}
private void updateRequiredSlots() throws UserException {
params.unsetRequiredSlots();
for (SlotDescriptor slot : desc.getSlots()) {

View File

@ -27,7 +27,6 @@ import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
@ -39,7 +38,6 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
@ -59,7 +57,6 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
public class JdbcScanNode extends ExternalScanNode {
private static final Logger LOG = LogManager.getLogger(JdbcScanNode.class);
@ -252,12 +249,6 @@ public class JdbcScanNode extends ExternalScanNode {
createScanRangeLocations();
}
@Override
public void updateRequiredSlots(PlanTranslatorContext context, Set<SlotId> requiredByProjectSlotIdSet)
throws UserException {
createJdbcColumns();
}
@Override
protected void createScanRangeLocations() throws UserException {
scanRangeLocations = Lists.newArrayList(createSingleScanRangeLocations(backendPolicy));

View File

@ -22,7 +22,6 @@ import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
@ -33,7 +32,6 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
@ -53,7 +51,6 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
/**
* Full scan of an ODBC table.
@ -117,12 +114,6 @@ public class OdbcScanNode extends ExternalScanNode {
createScanRangeLocations();
}
@Override
public void updateRequiredSlots(PlanTranslatorContext context, Set<SlotId> requiredByProjectSlotIdSet)
throws UserException {
createOdbcColumns();
}
@Override
protected void createScanRangeLocations() throws UserException {
scanRangeLocations = Lists.newArrayList(createSingleScanRangeLocations(backendPolicy));

View File

@ -17,7 +17,6 @@
package org.apache.doris.datasource.paimon.source;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
@ -29,7 +28,6 @@ import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
@ -40,7 +38,6 @@ import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TPaimonDeletionFileDesc;
import org.apache.doris.thrift.TPaimonFileDesc;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TTableFormatFileDesc;
import com.google.common.base.Preconditions;
@ -289,22 +286,6 @@ public class PaimonScanNode extends FileQueryScanNode {
}
}
//When calling 'setPaimonParams' and 'getSplits', the column trimming has not been performed yet,
// Therefore, paimon_column_names is temporarily reset here
@Override
public void updateRequiredSlots(PlanTranslatorContext planTranslatorContext,
Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
super.updateRequiredSlots(planTranslatorContext, requiredByProjectSlotIdSet);
String cols = desc.getSlots().stream().map(slot -> slot.getColumn().getName())
.collect(Collectors.joining(","));
for (TScanRangeLocations tScanRangeLocations : scanRangeLocations) {
List<TFileRangeDesc> ranges = tScanRangeLocations.scan_range.ext_scan_range.file_scan_range.ranges;
for (TFileRangeDesc tFileRangeDesc : ranges) {
tFileRangeDesc.table_format_params.paimon_params.setPaimonColumnNames(cols);
}
}
}
@Override
public TFileType getLocationType() throws DdlException, MetaNotFoundException {
return getLocationType(((FileStoreTable) source.getPaimonTable()).location().toString());

View File

@ -46,8 +46,6 @@ import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.es.source.EsScanNode;
@ -274,6 +272,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
throw new AnalysisException("tables with unknown column stats: " + builder);
}
}
for (ScanNode scanNode : context.getScanNodes()) {
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
}
return rootFragment;
}
@ -635,7 +636,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, esScanNode, context)
)
);
Utils.execWithUncheckedException(esScanNode::finalizeForNereids);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), esScanNode, dataPartition);
context.addPlanFragment(planFragment);
@ -687,7 +687,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode, context)
)
);
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
// Create PlanFragment
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = createPlanFragment(scanNode, dataPartition, fileScan);
@ -712,7 +711,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, jdbcScanNode, context)
)
);
Utils.execWithUncheckedException(jdbcScanNode::finalizeForNereids);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), jdbcScanNode, dataPartition);
context.addPlanFragment(planFragment);
@ -736,7 +734,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, odbcScanNode, context)
)
);
Utils.execWithUncheckedException(odbcScanNode::finalizeForNereids);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), odbcScanNode, dataPartition);
context.addPlanFragment(planFragment);
@ -817,8 +814,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
olapScanNode.setUseTopnOpt(true);
context.getTopnFilterContext().addLegacyTarget(olapScan, olapScanNode);
}
// TODO: we need to remove all finalizeForNereids
olapScanNode.finalizeForNereids();
// Create PlanFragment
// TODO: use a util function to convert distribution to DataPartition
DataPartition dataPartition = DataPartition.RANDOM;
@ -908,7 +903,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
.translateRuntimeFilterTarget(expr, finalScanNode, context)
)
);
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
context.addScanNode(scanNode);
PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, schemaScan);
context.addPlanFragment(planFragment);
@ -930,7 +924,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
.forEach(expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, scanNode, context)
)
);
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
context.addScanNode(scanNode);
// TODO: it is weird update label in this way
@ -1976,6 +1969,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
}
requiredSlotIdSet.add(lastSlot.getId());
}
((OlapScanNode) inputPlanNode).updateRequiredSlots(context, requiredByProjectSlotIdSet);
}
updateScanSlotsMaterialization((ScanNode) inputPlanNode, requiredSlotIdSet,
requiredByProjectSlotIdSet, slotIdsByOrder, context);
@ -2443,22 +2437,16 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
if (scanNode.getTupleDesc().getSlots().isEmpty()) {
scanNode.getTupleDesc().getSlots().add(smallest);
}
try {
if (context.getSessionVariable() != null
&& context.getSessionVariable().forbidUnknownColStats
&& !StatisticConstants.isSystemTable(scanNode.getTupleDesc().getTable())) {
for (SlotId slotId : requiredByProjectSlotIdSet) {
if (context.isColumnStatsUnknown(scanNode, slotId)) {
String colName = scanNode.getTupleDesc().getSlot(slotId.asInt()).getColumn().getName();
throw new AnalysisException("meet unknown column stats: " + colName);
}
if (context.getSessionVariable() != null
&& context.getSessionVariable().forbidUnknownColStats
&& !StatisticConstants.isSystemTable(scanNode.getTupleDesc().getTable())) {
for (SlotId slotId : requiredByProjectSlotIdSet) {
if (context.isColumnStatsUnknown(scanNode, slotId)) {
String colName = scanNode.getTupleDesc().getSlot(slotId.asInt()).getColumn().getName();
throw new AnalysisException("meet unknown column stats: " + colName);
}
context.removeScanFromStatsUnknownColumnsMap(scanNode);
}
scanNode.updateRequiredSlots(context, requiredByProjectSlotIdSet);
} catch (UserException e) {
Util.logAndThrowRuntimeException(LOG,
"User Exception while reset external file scan node contexts.", e);
context.removeScanFromStatsUnknownColumnsMap(scanNode);
}
}

View File

@ -1720,7 +1720,11 @@ public class OlapScanNode extends ScanNode {
: Sets.newTreeSet();
}
@Override
/**
* Update required_slots in scan node contexts. This is called after Nereids planner do the projection.
* In the projection process, some slots may be removed. So call this to update the slots info.
* Currently, it is only used by ExternalFileScanNode, add the interface here to keep the Nereids code clean.
*/
public void updateRequiredSlots(PlanTranslatorContext context,
Set<SlotId> requiredByProjectSlotIdSet) {
outputColumnUniqueIds.clear();

View File

@ -48,7 +48,6 @@ import org.apache.doris.datasource.FileScanNode;
import org.apache.doris.datasource.SplitAssignment;
import org.apache.doris.datasource.SplitGenerator;
import org.apache.doris.datasource.SplitSource;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.query.StatsDelta;
@ -169,15 +168,6 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
return false;
}
/**
* Update required_slots in scan node contexts. This is called after Nereids planner do the projection.
* In the projection process, some slots may be removed. So call this to update the slots info.
* Currently, it is only used by ExternalFileScanNode, add the interface here to keep the Nereids code clean.
*/
public void updateRequiredSlots(PlanTranslatorContext context,
Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
}
private void computeColumnFilter(Column column, SlotDescriptor slotDesc, PartitionInfo partitionsInfo) {
// Set `columnFilters` all the time because `DistributionPruner` also use this.
// Maybe we could use `columnNameToRange` for `DistributionPruner` and