[feature](multi catalog nereids)Add file scan node to nereids. (#15201)

Add file scan node to nereids, so that the new planner could support external hms table.
This commit is contained in:
Jibing-Li
2022-12-29 10:31:11 +08:00
committed by GitHub
parent ad3b5cbf94
commit 0e154feeb9
24 changed files with 856 additions and 61 deletions

View File

@ -127,6 +127,8 @@ public interface TableIf {
BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, AnalysisTaskInfo info);
long estimatedRowCount();
/**
* Doris table type.
*/

View File

@ -305,6 +305,11 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
throw new NotImplementedException();
}
@Override
public long estimatedRowCount() {
return 1;
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);

View File

@ -194,6 +194,7 @@ public class HMSExternalTable extends ExternalTable {
@Override
public boolean isView() {
makeSureInitialized();
return remoteTable.isSetViewOriginalText() || remoteTable.isSetViewExpandedText();
}

View File

@ -27,6 +27,7 @@ import org.apache.doris.nereids.properties.DistributionSpecReplicated;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
@ -115,6 +116,12 @@ public class CostCalculator {
costEstimate.getNetworkCost(), costEstimate.getPenalty());
}
@Override
public CostEstimate visitPhysicalFileScan(PhysicalFileScan physicalFileScan, PlanContext context) {
StatsDeriveResult statistics = context.getStatisticsWithCheck();
return CostEstimate.ofCpu(statistics.getRowCount());
}
@Override
public CostEstimate visitPhysicalProject(PhysicalProject<? extends Plan> physicalProject, PlanContext context) {
return CostEstimate.ofCpu(1);

View File

@ -34,6 +34,8 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.DistributionSpecAny;
@ -65,6 +67,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalExcept;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
@ -112,6 +115,7 @@ import org.apache.doris.planner.SetOperationNode;
import org.apache.doris.planner.SortNode;
import org.apache.doris.planner.TableFunctionNode;
import org.apache.doris.planner.UnionNode;
import org.apache.doris.planner.external.ExternalFileScanNode;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.thrift.TPushAggOp;
@ -514,6 +518,28 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
return planFragment;
}
@Override
public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTranslatorContext context) {
List<Slot> slotList = fileScan.getOutput();
ExternalTable table = fileScan.getTable();
TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, table, context);
tupleDescriptor.setTable(table);
ExternalFileScanNode fileScanNode = new ExternalFileScanNode(context.nextPlanNodeId(), tupleDescriptor);
TableName tableName = new TableName(null, "", "");
TableRef ref = new TableRef(tableName, null, null);
BaseTableRef tableRef = new BaseTableRef(ref, table, tableName);
tupleDescriptor.setRef(tableRef);
Utils.execWithUncheckedException(fileScanNode::init);
context.addScanNode(fileScanNode);
Utils.execWithUncheckedException(fileScanNode::finalizeForNerieds);
// Create PlanFragment
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), fileScanNode, dataPartition);
context.addPlanFragment(planFragment);
return planFragment;
}
@Override
public PlanFragment visitPhysicalTVFRelation(PhysicalTVFRelation tvfRelation, PlanTranslatorContext context) {
List<Slot> slots = tvfRelation.getLogicalProperties().getOutput();
@ -1406,7 +1432,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
}
}
private TupleDescriptor generateTupleDesc(List<Slot> slotList, Table table, PlanTranslatorContext context) {
private TupleDescriptor generateTupleDesc(List<Slot> slotList, TableIf table, PlanTranslatorContext context) {
TupleDescriptor tupleDescriptor = context.generateTupleDesc();
tupleDescriptor.setTable(table);
for (Slot slot : slotList) {

View File

@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFun
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
@ -233,6 +234,11 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
}
}
@Override
public PhysicalProperties visitPhysicalFileScan(PhysicalFileScan fileScan, PlanContext context) {
return PhysicalProperties.ANY;
}
@Override
public PhysicalProperties visitPhysicalStorageLayerAggregate(
PhysicalStorageLayerAggregate storageLayerAggregate, PlanContext context) {

View File

@ -29,6 +29,7 @@ import org.apache.doris.nereids.rules.exploration.join.SemiJoinSemiJoinTranspose
import org.apache.doris.nereids.rules.implementation.LogicalAssertNumRowsToPhysicalAssertNumRows;
import org.apache.doris.nereids.rules.implementation.LogicalEmptyRelationToPhysicalEmptyRelation;
import org.apache.doris.nereids.rules.implementation.LogicalExceptToPhysicalExcept;
import org.apache.doris.nereids.rules.implementation.LogicalFileScanToPhysicalFileScan;
import org.apache.doris.nereids.rules.implementation.LogicalFilterToPhysicalFilter;
import org.apache.doris.nereids.rules.implementation.LogicalGenerateToPhysicalGenerate;
import org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect;
@ -105,6 +106,7 @@ public class RuleSet {
.add(new LogicalJoinToNestedLoopJoin())
.add(new LogicalOlapScanToPhysicalOlapScan())
.add(new LogicalSchemaScanToPhysicalSchemaScan())
.add(new LogicalFileScanToPhysicalFileScan())
.add(new LogicalProjectToPhysicalProject())
.add(new LogicalLimitToPhysicalLimit())
.add(new LogicalSortToPhysicalQuickSort())

View File

@ -208,6 +208,7 @@ public enum RuleType {
LOGICAL_LIMIT_TO_PHYSICAL_LIMIT_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_OLAP_SCAN_TO_PHYSICAL_OLAP_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_SCHEMA_SCAN_TO_PHYSICAL_SCHEMA_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_FILE_SCAN_TO_PHYSICAL_FILE_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ASSERT_NUM_ROWS_TO_PHYSICAL_ASSERT_NUM_ROWS(RuleTypeClass.IMPLEMENTATION),
STORAGE_LAYER_AGGREGATE_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION),
STORAGE_LAYER_AGGREGATE_WITH_PROJECT(RuleTypeClass.IMPLEMENTATION),

View File

@ -17,10 +17,15 @@
package org.apache.doris.nereids.rules.analysis;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.exceptions.AnalysisException;
@ -29,6 +34,7 @@ import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSchemaScan;
@ -64,12 +70,32 @@ public class BindRelation extends OneAnalysisRuleFactory {
// Use database name from table name parts.
return bindWithDbNameFromNamePart(ctx.cascadesContext, ctx.root);
}
case 3: { // catalog.db.table
// Use catalog and database name from name parts.
return bindWithCatalogNameFromNamePart(ctx.cascadesContext, ctx.root);
}
default:
throw new IllegalStateException("Table name [" + ctx.root.getTableName() + "] is invalid.");
}
}).toRule(RuleType.BINDING_RELATION);
}
private TableIf getTable(String catalogName, String dbName, String tableName, Env env) {
CatalogIf catalog = env.getCatalogMgr().getCatalog(catalogName);
if (catalog == null) {
throw new RuntimeException(String.format("Catalog %s does not exist.", catalogName));
}
DatabaseIf<TableIf> db = null;
try {
db = (DatabaseIf<TableIf>) catalog.getDb(dbName)
.orElseThrow(() -> new RuntimeException("Database [" + dbName + "] does not exist."));
} catch (Throwable e) {
throw new RuntimeException(e);
}
return db.getTable(tableName).orElseThrow(() -> new RuntimeException(
"Table [" + tableName + "] does not exist in database [" + dbName + "]."));
}
private LogicalPlan bindWithCurrentDb(CascadesContext cascadesContext, UnboundRelation unboundRelation) {
String tableName = unboundRelation.getNameParts().get(0);
// check if it is a CTE's name
@ -83,52 +109,64 @@ public class BindRelation extends OneAnalysisRuleFactory {
}
return new LogicalSubQueryAlias<>(tableName, ctePlan);
}
String catalogName = cascadesContext.getConnectContext().getCurrentCatalog().getName();
String dbName = cascadesContext.getConnectContext().getDatabase();
Table table = cascadesContext.getTable(dbName, tableName, cascadesContext.getConnectContext().getEnv());
TableIf table = getTable(catalogName, dbName, tableName, cascadesContext.getConnectContext().getEnv());
// TODO: should generate different Scan sub class according to table's type
List<Long> partIds = getPartitionIds(table, unboundRelation);
if (table.getType() == TableType.OLAP) {
if (!CollectionUtils.isEmpty(partIds)) {
return new LogicalOlapScan(RelationUtil.newRelationId(),
(OlapTable) table, ImmutableList.of(dbName), partIds);
} else {
return new LogicalOlapScan(RelationUtil.newRelationId(),
(OlapTable) table, ImmutableList.of(dbName));
}
} else if (table.getType() == TableType.VIEW) {
Plan viewPlan = parseAndAnalyzeView(table.getDdlSql(), cascadesContext);
return new LogicalSubQueryAlias<>(table.getName(), viewPlan);
} else if (table.getType() == TableType.SCHEMA) {
return new LogicalSchemaScan(RelationUtil.newRelationId(), table, ImmutableList.of(dbName));
}
throw new AnalysisException("Unsupported tableType:" + table.getType());
return getLogicalPlan(table, unboundRelation, dbName, cascadesContext);
}
private LogicalPlan bindWithDbNameFromNamePart(CascadesContext cascadesContext, UnboundRelation unboundRelation) {
List<String> nameParts = unboundRelation.getNameParts();
ConnectContext connectContext = cascadesContext.getConnectContext();
String catalogName = cascadesContext.getConnectContext().getCurrentCatalog().getName();
// if the relation is view, nameParts.get(0) is dbName.
String dbName = nameParts.get(0);
if (!dbName.equals(connectContext.getDatabase())) {
dbName = connectContext.getClusterName() + ":" + dbName;
}
Table table = cascadesContext.getTable(dbName, nameParts.get(1), connectContext.getEnv());
List<Long> partIds = getPartitionIds(table, unboundRelation);
if (table.getType() == TableType.OLAP) {
if (!CollectionUtils.isEmpty(partIds)) {
return new LogicalOlapScan(RelationUtil.newRelationId(), (OlapTable) table,
ImmutableList.of(dbName), partIds);
} else {
return new LogicalOlapScan(RelationUtil.newRelationId(), (OlapTable) table, ImmutableList.of(dbName));
}
} else if (table.getType() == TableType.VIEW) {
Plan viewPlan = parseAndAnalyzeView(table.getDdlSql(), cascadesContext);
return new LogicalSubQueryAlias<>(table.getName(), viewPlan);
} else if (table.getType() == TableType.SCHEMA) {
return new LogicalSchemaScan(RelationUtil.newRelationId(), table, ImmutableList.of(dbName));
String tableName = nameParts.get(1);
TableIf table = getTable(catalogName, dbName, tableName, connectContext.getEnv());
return getLogicalPlan(table, unboundRelation, dbName, cascadesContext);
}
private LogicalPlan bindWithCatalogNameFromNamePart(CascadesContext cascadesContext,
UnboundRelation unboundRelation) {
List<String> nameParts = unboundRelation.getNameParts();
ConnectContext connectContext = cascadesContext.getConnectContext();
String catalogName = nameParts.get(0);
String dbName = nameParts.get(1);
if (!dbName.equals(connectContext.getDatabase())) {
dbName = connectContext.getClusterName() + ":" + dbName;
}
String tableName = nameParts.get(2);
TableIf table = getTable(catalogName, dbName, tableName, connectContext.getEnv());
return getLogicalPlan(table, unboundRelation, dbName, cascadesContext);
}
private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelation, String dbName,
CascadesContext cascadesContext) {
switch (table.getType()) {
case OLAP:
List<Long> partIds = getPartitionIds(table, unboundRelation);
if (!CollectionUtils.isEmpty(partIds)) {
return new LogicalOlapScan(RelationUtil.newRelationId(),
(OlapTable) table, ImmutableList.of(dbName), partIds);
} else {
return new LogicalOlapScan(RelationUtil.newRelationId(),
(OlapTable) table, ImmutableList.of(dbName));
}
case VIEW:
Plan viewPlan = parseAndAnalyzeView(((View) table).getDdlSql(), cascadesContext);
return new LogicalSubQueryAlias<>(table.getName(), viewPlan);
case HMS_EXTERNAL_TABLE:
return new LogicalFileScan(cascadesContext.getStatementContext().getNextRelationId(),
(HMSExternalTable) table, ImmutableList.of(dbName));
case SCHEMA:
return new LogicalSchemaScan(RelationUtil.newRelationId(), (Table) table, ImmutableList.of(dbName));
default:
throw new AnalysisException("Unsupported tableType:" + table.getType());
}
throw new AnalysisException("Unsupported tableType:" + table.getType());
}
private Plan parseAndAnalyzeView(String viewSql, CascadesContext parentContext) {
@ -141,12 +179,12 @@ public class BindRelation extends OneAnalysisRuleFactory {
return viewContext.getMemo().copyOut(false);
}
private List<Long> getPartitionIds(Table t, UnboundRelation unboundRelation) {
private List<Long> getPartitionIds(TableIf t, UnboundRelation unboundRelation) {
List<String> parts = unboundRelation.getPartNames();
if (CollectionUtils.isEmpty(parts)) {
return Collections.emptyList();
}
if (!t.getType().equals(TableType.OLAP)) {
if (!t.getType().equals(TableIf.TableType.OLAP)) {
throw new IllegalStateException(String.format(
"Only OLAP table is support select by partition for now,"
+ "Table: %s is not OLAP table", t.getName()));

View File

@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.implementation;
import org.apache.doris.nereids.properties.DistributionSpecAny;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
import java.util.Optional;
/**
* Implementation rule that convert logical FileScan to physical FileScan.
*/
public class LogicalFileScanToPhysicalFileScan extends OneImplementationRuleFactory {
@Override
public Rule build() {
return logicalFileScan().then(fileScan ->
new PhysicalFileScan(
fileScan.getId(),
fileScan.getTable(),
fileScan.getQualifier(),
DistributionSpecAny.INSTANCE,
Optional.empty(),
fileScan.getLogicalProperties())
).toRule(RuleType.LOGICAL_FILE_SCAN_TO_PHYSICAL_FILE_SCAN_RULE);
}
}

View File

@ -18,7 +18,7 @@
package org.apache.doris.nereids.stats;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Id;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.memo.GroupExpression;
@ -42,6 +42,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalGenerate;
import org.apache.doris.nereids.trees.plans.logical.LogicalIntersect;
@ -60,6 +61,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalExcept;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
@ -171,6 +173,12 @@ public class StatsCalculator extends DefaultPlanVisitor<StatsDeriveResult, Void>
return new StatsDeriveResult(1);
}
@Override
public StatsDeriveResult visitLogicalFileScan(LogicalFileScan fileScan, Void context) {
fileScan.getExpressions();
return computeScan(fileScan);
}
@Override
public StatsDeriveResult visitLogicalTVFRelation(LogicalTVFRelation tvfRelation, Void context) {
return tvfRelation.getFunction().computeStats(tvfRelation.getOutput());
@ -256,6 +264,11 @@ public class StatsCalculator extends DefaultPlanVisitor<StatsDeriveResult, Void>
return new StatsDeriveResult(1);
}
@Override
public StatsDeriveResult visitPhysicalFileScan(PhysicalFileScan fileScan, Void context) {
return computeScan(fileScan);
}
@Override
public StatsDeriveResult visitPhysicalStorageLayerAggregate(
PhysicalStorageLayerAggregate storageLayerAggregate, Void context) {
@ -359,7 +372,7 @@ public class StatsCalculator extends DefaultPlanVisitor<StatsDeriveResult, Void>
Set<SlotReference> slotSet = scan.getOutput().stream().filter(SlotReference.class::isInstance)
.map(s -> (SlotReference) s).collect(Collectors.toSet());
Map<Id, ColumnStatistic> columnStatisticMap = new HashMap<>();
Table table = scan.getTable();
TableIf table = scan.getTable();
double rowCount = scan.getTable().estimatedRowCount();
for (SlotReference slotReference : slotSet) {
String colName = slotReference.getName();
@ -370,7 +383,6 @@ public class StatsCalculator extends DefaultPlanVisitor<StatsDeriveResult, Void>
Env.getCurrentEnv().getStatisticsCache().getColumnStatistics(table.getId(), colName);
if (!colStats.isUnKnown) {
rowCount = colStats.count;
}
columnStatisticMap.put(slotReference.getExprId(), colStats);
}

View File

@ -43,6 +43,7 @@ public enum PlanType {
LOGICAL_LIMIT,
LOGICAL_OLAP_SCAN,
LOGICAL_SCHEMA_SCAN,
LOGICAL_FILE_SCAN,
LOGICAL_APPLY,
LOGICAL_SELECT_HINT,
LOGICAL_ASSERT_NUM_ROWS,
@ -59,6 +60,7 @@ public enum PlanType {
PHYSICAL_EMPTY_RELATION,
PHYSICAL_ONE_ROW_RELATION,
PHYSICAL_OLAP_SCAN,
PHYSICAL_FILE_SCAN,
PHYSICAL_TVF_RELATION,
PHYSICAL_SCHEMA_SCAN,
PHYSICAL_PROJECT,

View File

@ -17,12 +17,12 @@
package org.apache.doris.nereids.trees.plans.algebra;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.nereids.analyzer.Relation;
/**
* Common interface for logical/physical scan.
*/
public interface Scan extends Relation {
Table getTable();
TableIf getTable();
}

View File

@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.logical;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Optional;
/**
* Logical file scan for external catalog.
*/
public class LogicalFileScan extends LogicalRelation {
/**
* Constructor for LogicalFileScan.
*/
public LogicalFileScan(RelationId id, ExternalTable table, List<String> qualifier,
Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties) {
super(id, PlanType.LOGICAL_FILE_SCAN, table, qualifier,
groupExpression, logicalProperties);
}
public LogicalFileScan(RelationId id, ExternalTable table) {
this(id, table, ImmutableList.of());
}
public LogicalFileScan(RelationId id, ExternalTable table, List<String> qualifier) {
this(id, table, qualifier, Optional.empty(), Optional.empty());
}
@Override
public ExternalTable getTable() {
Preconditions.checkArgument(table instanceof ExternalTable);
return (ExternalTable) table;
}
@Override
public String toString() {
return Utils.toSqlString("LogicalFileScan",
"qualified", qualifiedName(),
"output", getOutput()
);
}
@Override
public LogicalFileScan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new LogicalFileScan(id, (ExternalTable) table, qualifier, groupExpression,
Optional.of(getLogicalProperties()));
}
@Override
public LogicalFileScan withLogicalProperties(Optional<LogicalProperties> logicalProperties) {
return new LogicalFileScan(id, (ExternalTable) table, qualifier, groupExpression,
logicalProperties);
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitLogicalFileScan(this, context);
}
}

View File

@ -27,7 +27,6 @@ import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
import org.apache.doris.nereids.trees.plans.RelationId;
@ -196,33 +195,33 @@ public class LogicalOlapScan extends LogicalRelation implements CatalogRelation,
}
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new LogicalOlapScan(id, table, qualifier, groupExpression, Optional.of(getLogicalProperties()),
public LogicalOlapScan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new LogicalOlapScan(id, (Table) table, qualifier, groupExpression, Optional.of(getLogicalProperties()),
selectedPartitionIds, partitionPruned, selectedTabletIds, tabletPruned,
selectedIndexId, indexSelected, preAggStatus, manuallySpecifiedPartitions);
}
@Override
public LogicalOlapScan withLogicalProperties(Optional<LogicalProperties> logicalProperties) {
return new LogicalOlapScan(id, table, qualifier, Optional.empty(), logicalProperties,
return new LogicalOlapScan(id, (Table) table, qualifier, Optional.empty(), logicalProperties,
selectedPartitionIds, partitionPruned, selectedTabletIds, tabletPruned,
selectedIndexId, indexSelected, preAggStatus, manuallySpecifiedPartitions);
}
public LogicalOlapScan withSelectedPartitionIds(List<Long> selectedPartitionIds) {
return new LogicalOlapScan(id, table, qualifier, Optional.empty(), Optional.of(getLogicalProperties()),
return new LogicalOlapScan(id, (Table) table, qualifier, Optional.empty(), Optional.of(getLogicalProperties()),
selectedPartitionIds, true, selectedTabletIds, tabletPruned,
selectedIndexId, indexSelected, preAggStatus, manuallySpecifiedPartitions);
}
public LogicalOlapScan withMaterializedIndexSelected(PreAggStatus preAgg, long indexId) {
return new LogicalOlapScan(id, table, qualifier, Optional.empty(), Optional.of(getLogicalProperties()),
return new LogicalOlapScan(id, (Table) table, qualifier, Optional.empty(), Optional.of(getLogicalProperties()),
selectedPartitionIds, partitionPruned, selectedTabletIds, tabletPruned,
indexId, true, preAgg, manuallySpecifiedPartitions);
}
public LogicalOlapScan withSelectedTabletIds(List<Long> selectedTabletIds) {
return new LogicalOlapScan(id, table, qualifier, Optional.empty(), Optional.of(getLogicalProperties()),
return new LogicalOlapScan(id, (Table) table, qualifier, Optional.empty(), Optional.of(getLogicalProperties()),
selectedPartitionIds, partitionPruned, selectedTabletIds, true,
selectedIndexId, indexSelected, preAggStatus, manuallySpecifiedPartitions);
}

View File

@ -18,7 +18,7 @@
package org.apache.doris.nereids.trees.plans.logical;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
@ -42,12 +42,12 @@ import java.util.Optional;
*/
public abstract class LogicalRelation extends LogicalLeaf implements Scan {
protected final Table table;
protected final TableIf table;
protected final ImmutableList<String> qualifier;
protected final RelationId id;
public LogicalRelation(RelationId id, PlanType type, Table table, List<String> qualifier) {
public LogicalRelation(RelationId id, PlanType type, TableIf table, List<String> qualifier) {
this(id, type, table, qualifier, Optional.empty(), Optional.empty());
}
@ -63,7 +63,7 @@ public abstract class LogicalRelation extends LogicalLeaf implements Scan {
* @param table Doris table
* @param qualifier qualified relation name
*/
public LogicalRelation(RelationId id, PlanType type, Table table, List<String> qualifier,
public LogicalRelation(RelationId id, PlanType type, TableIf table, List<String> qualifier,
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties) {
super(type, groupExpression, logicalProperties);
this.table = Objects.requireNonNull(table, "table can not be null");
@ -72,7 +72,7 @@ public abstract class LogicalRelation extends LogicalLeaf implements Scan {
}
@Override
public Table getTable() {
public TableIf getTable() {
return table;
}

View File

@ -18,7 +18,7 @@
package org.apache.doris.nereids.trees.plans.logical;
import org.apache.doris.catalog.SchemaTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.expressions.Slot;
@ -40,13 +40,13 @@ import java.util.Optional;
*/
public class LogicalSchemaScan extends LogicalRelation implements Scan {
public LogicalSchemaScan(RelationId id,
Table table,
TableIf table,
List<String> qualifier) {
super(id, PlanType.LOGICAL_SCHEMA_SCAN, table, qualifier);
}
public LogicalSchemaScan(RelationId id,
Table table,
TableIf table,
List<String> qualifier, Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties) {
super(id, PlanType.LOGICAL_SCHEMA_SCAN, table, qualifier, groupExpression, logicalProperties);

View File

@ -0,0 +1,119 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.DistributionSpec;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.statistics.StatsDeriveResult;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
* Physical file scan for external catalog.
*/
public class PhysicalFileScan extends PhysicalRelation {
private final ExternalTable table;
private final DistributionSpec distributionSpec;
/**
* Constructor for PhysicalFileScan.
*/
public PhysicalFileScan(RelationId id, ExternalTable table, List<String> qualifier,
DistributionSpec distributionSpec, Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties) {
super(id, PlanType.PHYSICAL_FILE_SCAN, qualifier, groupExpression, logicalProperties);
this.table = table;
this.distributionSpec = distributionSpec;
}
/**
* Constructor for PhysicalFileScan.
*/
public PhysicalFileScan(RelationId id, ExternalTable table, List<String> qualifier,
DistributionSpec distributionSpec, Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties, PhysicalProperties physicalProperties,
StatsDeriveResult statsDeriveResult) {
super(id, PlanType.PHYSICAL_FILE_SCAN, qualifier, groupExpression, logicalProperties,
physicalProperties, statsDeriveResult);
this.table = table;
this.distributionSpec = distributionSpec;
}
@Override
public String toString() {
return Utils.toSqlString("PhysicalFileScan",
"qualified", Utils.qualifiedName(qualifier, table.getName()),
"output", getOutput(),
"stats", statsDeriveResult
);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass() || !super.equals(o)) {
return false;
}
PhysicalFileScan that = ((PhysicalFileScan) o);
return Objects.equals(table, that.table);
}
@Override
public int hashCode() {
return Objects.hash(id, table);
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitPhysicalFileScan(this, context);
}
@Override
public PhysicalFileScan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new PhysicalFileScan(id, table, qualifier, distributionSpec, groupExpression, getLogicalProperties());
}
@Override
public PhysicalFileScan withLogicalProperties(Optional<LogicalProperties> logicalProperties) {
return new PhysicalFileScan(id, table, qualifier, distributionSpec, groupExpression, logicalProperties.get());
}
@Override
public ExternalTable getTable() {
return table;
}
@Override
public PhysicalFileScan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties,
StatsDeriveResult statsDeriveResult) {
return new PhysicalFileScan(id, table, qualifier, distributionSpec, groupExpression, getLogicalProperties(),
physicalProperties, statsDeriveResult);
}
}

View File

@ -69,7 +69,7 @@ public class PhysicalStorageLayerAggregate extends PhysicalRelation {
@Override
public Table getTable() {
return relation.getTable();
return (Table) relation.getTable();
}
@Override

View File

@ -32,6 +32,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalGenerate;
import org.apache.doris.nereids.trees.plans.logical.LogicalHaving;
@ -57,6 +58,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalExcept;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
@ -168,6 +170,10 @@ public abstract class PlanVisitor<R, C> {
return visitLogicalRelation(schemaScan, context);
}
public R visitLogicalFileScan(LogicalFileScan fileScan, C context) {
return visitLogicalRelation(fileScan, context);
}
public R visitLogicalTVFRelation(LogicalTVFRelation tvfRelation, C context) {
return visitLogicalRelation(tvfRelation, context);
}
@ -261,6 +267,10 @@ public abstract class PlanVisitor<R, C> {
return visitPhysicalScan(schemaScan, context);
}
public R visitPhysicalFileScan(PhysicalFileScan fileScan, C context) {
return visitPhysicalScan(fileScan, context);
}
public R visitPhysicalStorageLayerAggregate(PhysicalStorageLayerAggregate storageLayerAggregate, C context) {
return storageLayerAggregate.getRelation().accept(this, context);
}

View File

@ -192,6 +192,43 @@ public class ExternalFileScanNode extends ExternalScanNode {
initParamCreateContexts(analyzer);
}
/**
* Init ExternalFileScanNode, ONLY used for Nereids. Should NOT use this function in anywhere else.
*/
public void init() throws UserException {
if (!Config.enable_vectorized_load) {
throw new UserException(
"Please set 'enable_vectorized_load=true' in fe.conf to enable external file scan node");
}
switch (type) {
case QUERY:
// prepare for partition prune
// computeColumnFilter();
if (this.desc.getTable() instanceof HMSExternalTable) {
HMSExternalTable hmsTable = (HMSExternalTable) this.desc.getTable();
initHMSExternalTable(hmsTable);
} else if (this.desc.getTable() instanceof FunctionGenTable) {
FunctionGenTable table = (FunctionGenTable) this.desc.getTable();
initFunctionGenTable(table, (ExternalFileTableValuedFunction) table.getTvf());
}
break;
default:
throw new UserException("Unknown type: " + type);
}
backendPolicy.init();
numNodes = backendPolicy.numBackends();
for (FileScanProviderIf scanProvider : scanProviders) {
ParamCreateContext context = scanProvider.createContext(analyzer);
context.createDestSlotMap();
initAndSetPrecedingFilter(context.fileGroup.getPrecedingFilterExpr(), context.srcTupleDescriptor, analyzer);
initAndSetWhereExpr(context.fileGroup.getWhereExpr(), context.destTupleDescriptor, analyzer);
context.conjuncts = conjuncts;
this.contexts.add(context);
}
}
private void initHMSExternalTable(HMSExternalTable hmsTable) throws UserException {
Preconditions.checkNotNull(hmsTable);
@ -312,6 +349,25 @@ public class ExternalFileScanNode extends ExternalScanNode {
}
}
public void finalizeForNerieds() throws UserException {
Preconditions.checkState(contexts.size() == scanProviders.size(),
contexts.size() + " vs. " + scanProviders.size());
for (int i = 0; i < contexts.size(); ++i) {
ParamCreateContext context = contexts.get(i);
FileScanProviderIf scanProvider = scanProviders.get(i);
setDefaultValueExprs(scanProvider, context);
setColumnPositionMappingForTextFile(scanProvider, context);
finalizeParamsForLoad(context, analyzer);
createScanRangeLocations(context, scanProvider);
this.inputSplitsNum += scanProvider.getInputSplitNum();
this.totalFileSize += scanProvider.getInputFileSize();
if (scanProvider instanceof HiveScanProvider) {
this.totalPartitionNum = ((HiveScanProvider) scanProvider).getTotalPartitionNum();
this.readPartitionNum = ((HiveScanProvider) scanProvider).getReadPartitionNum();
}
}
}
private void setColumnPositionMappingForTextFile(FileScanProviderIf scanProvider, ParamCreateContext context)
throws UserException {
if (type != Type.QUERY) {

View File

@ -17,7 +17,7 @@
package org.apache.doris.nereids.jobs;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.memo.GroupExpression;
@ -103,11 +103,11 @@ public class RewriteTopDownJobTest {
private static class LogicalBoundRelation extends LogicalRelation {
public LogicalBoundRelation(Table table, List<String> qualifier) {
public LogicalBoundRelation(TableIf table, List<String> qualifier) {
super(RelationUtil.newRelationId(), PlanType.LOGICAL_BOUND_RELATION, table, qualifier);
}
public LogicalBoundRelation(Table table, List<String> qualifier, Optional<GroupExpression> groupExpression,
public LogicalBoundRelation(TableIf table, List<String> qualifier, Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties) {
super(RelationUtil.newRelationId(), PlanType.LOGICAL_BOUND_RELATION, table, qualifier,
groupExpression, logicalProperties);