[fix](hudi) move wrong members in HMSExternalTable (#36187)
Previously, there are 2 members: TableScanParams and IncrementalRelation in HMSExternalTable. These 2 members are for Hudi's incremental query, so their lifecycle should be with query task, should not be saved in HMSExternalTable. This PR mainly changes: - Add LogicalHudiScan and PhysicalHudiScan, extends from LogicalFileScan and PhysicalFileScan. - Move TableScanParams and IncrementalRelation from HMSExternalTable to XXXHudiScan. - Add or modify related Nereids rules
This commit is contained in:
@ -17,7 +17,6 @@
|
||||
|
||||
package org.apache.doris.datasource.hive;
|
||||
|
||||
import org.apache.doris.analysis.TableScanParams;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.ListPartitionItem;
|
||||
@ -31,23 +30,12 @@ import org.apache.doris.common.Config;
|
||||
import org.apache.doris.datasource.ExternalTable;
|
||||
import org.apache.doris.datasource.SchemaCacheValue;
|
||||
import org.apache.doris.datasource.hudi.HudiUtils;
|
||||
import org.apache.doris.datasource.hudi.source.COWIncrementalRelation;
|
||||
import org.apache.doris.datasource.hudi.source.IncrementalRelation;
|
||||
import org.apache.doris.datasource.hudi.source.MORIncrementalRelation;
|
||||
import org.apache.doris.datasource.iceberg.IcebergUtils;
|
||||
import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
|
||||
import org.apache.doris.mtmv.MTMVRelatedTableIf;
|
||||
import org.apache.doris.mtmv.MTMVSnapshotIf;
|
||||
import org.apache.doris.mtmv.MTMVTimestampSnapshot;
|
||||
import org.apache.doris.nereids.exceptions.NotSupportedException;
|
||||
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.GreaterThan;
|
||||
import org.apache.doris.nereids.trees.expressions.GreaterThanEqual;
|
||||
import org.apache.doris.nereids.trees.expressions.LessThanEqual;
|
||||
import org.apache.doris.nereids.trees.expressions.Slot;
|
||||
import org.apache.doris.nereids.trees.expressions.SlotReference;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.StringLiteral;
|
||||
import org.apache.doris.qe.GlobalVariable;
|
||||
import org.apache.doris.statistics.AnalysisInfo;
|
||||
import org.apache.doris.statistics.BaseAnalysisTask;
|
||||
@ -60,7 +48,6 @@ import org.apache.doris.thrift.THiveTable;
|
||||
import org.apache.doris.thrift.TTableDescriptor;
|
||||
import org.apache.doris.thrift.TTableType;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
@ -76,7 +63,6 @@ import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
|
||||
import org.apache.hadoop.hive.metastore.api.Partition;
|
||||
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
|
||||
import org.apache.hadoop.hive.ql.io.AcidUtils;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@ -162,10 +148,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
|
||||
// record the event update time when enable hms event listener
|
||||
protected volatile long eventUpdateTime;
|
||||
|
||||
// for hudi incremental read
|
||||
private TableScanParams scanParams = null;
|
||||
private IncrementalRelation incrementalRelation = null;
|
||||
|
||||
public enum DLAType {
|
||||
UNKNOWN, HIVE, HUDI, ICEBERG
|
||||
}
|
||||
@ -305,82 +287,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
|
||||
.orElse(Collections.emptyList());
|
||||
}
|
||||
|
||||
public TableScanParams getScanParams() {
|
||||
return scanParams;
|
||||
}
|
||||
|
||||
public void setScanParams(TableScanParams scanParams) {
|
||||
if (scanParams != null && scanParams.incrementalRead()) {
|
||||
Map<String, String> optParams = getHadoopProperties();
|
||||
if (scanParams.getParams().containsKey("beginTime")) {
|
||||
optParams.put("hoodie.datasource.read.begin.instanttime", scanParams.getParams().get("beginTime"));
|
||||
}
|
||||
if (scanParams.getParams().containsKey("endTime")) {
|
||||
optParams.put("hoodie.datasource.read.end.instanttime", scanParams.getParams().get("endTime"));
|
||||
}
|
||||
scanParams.getParams().forEach((k, v) -> {
|
||||
if (k.startsWith("hoodie.")) {
|
||||
optParams.put(k, v);
|
||||
}
|
||||
});
|
||||
HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(this);
|
||||
try {
|
||||
boolean isCowOrRoTable = isHoodieCowTable();
|
||||
if (isCowOrRoTable) {
|
||||
Map<String, String> serd = remoteTable.getSd().getSerdeInfo().getParameters();
|
||||
if ("true".equals(serd.get("hoodie.query.as.ro.table"))
|
||||
&& remoteTable.getTableName().endsWith("_ro")) {
|
||||
// Incremental read RO table as RT table, I don't know why?
|
||||
isCowOrRoTable = false;
|
||||
LOG.warn("Execute incremental read on RO table");
|
||||
}
|
||||
}
|
||||
if (isCowOrRoTable) {
|
||||
incrementalRelation = new COWIncrementalRelation(
|
||||
optParams, HiveMetaStoreClientHelper.getConfiguration(this), hudiClient);
|
||||
} else {
|
||||
incrementalRelation = new MORIncrementalRelation(
|
||||
optParams, HiveMetaStoreClientHelper.getConfiguration(this), hudiClient);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to create incremental relation", e);
|
||||
}
|
||||
}
|
||||
this.scanParams = scanParams;
|
||||
}
|
||||
|
||||
public IncrementalRelation getIncrementalRelation() {
|
||||
return incrementalRelation;
|
||||
}
|
||||
|
||||
/**
|
||||
* replace incremental params as AND expression
|
||||
* incr('beginTime'='20240308110257169', 'endTime'='20240308110677278') =>
|
||||
* _hoodie_commit_time >= 20240308110257169 and _hoodie_commit_time <= '20240308110677278'
|
||||
*/
|
||||
public Set<Expression> generateIncrementalExpression(List<Slot> slots) {
|
||||
if (incrementalRelation == null) {
|
||||
return Collections.emptySet();
|
||||
}
|
||||
SlotReference timeField = null;
|
||||
for (Slot slot : slots) {
|
||||
if ("_hoodie_commit_time".equals(slot.getName())) {
|
||||
timeField = (SlotReference) slot;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (timeField == null) {
|
||||
return Collections.emptySet();
|
||||
}
|
||||
StringLiteral upperValue = new StringLiteral(incrementalRelation.getEndTs());
|
||||
StringLiteral lowerValue = new StringLiteral(incrementalRelation.getStartTs());
|
||||
ComparisonPredicate less = new LessThanEqual(timeField, upperValue);
|
||||
ComparisonPredicate great = incrementalRelation.isIncludeStartTime()
|
||||
? new GreaterThanEqual(timeField, lowerValue)
|
||||
: new GreaterThan(timeField, lowerValue);
|
||||
return ImmutableSet.of(great, less);
|
||||
}
|
||||
|
||||
public boolean isHiveTransactionalTable() {
|
||||
return dlaType == DLAType.HIVE && AcidUtils.isTransactionalTable(remoteTable)
|
||||
&& isSupportedTransactionalFileFormat();
|
||||
|
||||
@ -19,6 +19,7 @@ package org.apache.doris.datasource.hudi.source;
|
||||
|
||||
import org.apache.doris.common.CacheFactory;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.datasource.CacheException;
|
||||
import org.apache.doris.datasource.TablePartitionValues;
|
||||
import org.apache.doris.datasource.TablePartitionValues.TablePartitionKey;
|
||||
@ -163,7 +164,7 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to get hudi partitions", e);
|
||||
throw new CacheException("Failed to get hudi partitions", e);
|
||||
throw new CacheException("Failed to get hudi partitions: " + Util.getRootCauseMessage(e), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -97,9 +97,6 @@ public class HudiScanNode extends HiveScanNode {
|
||||
private List<String> columnNames;
|
||||
private List<String> columnTypes;
|
||||
|
||||
private boolean incrementalRead = false;
|
||||
private IncrementalRelation incrementalRelation;
|
||||
|
||||
private boolean partitionInit = false;
|
||||
private HoodieTimeline timeline;
|
||||
private Option<String> snapshotTimestamp;
|
||||
@ -108,25 +105,32 @@ public class HudiScanNode extends HiveScanNode {
|
||||
private Iterator<HivePartition> prunedPartitionsIter;
|
||||
private int numSplitsPerPartition = NUM_SPLITS_PER_PARTITION;
|
||||
|
||||
private boolean incrementalRead = false;
|
||||
private TableScanParams scanParams;
|
||||
private IncrementalRelation incrementalRelation;
|
||||
|
||||
/**
|
||||
* External file scan node for Query Hudi table
|
||||
* needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
|
||||
* eg: s3 tvf
|
||||
* These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check
|
||||
*/
|
||||
public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
|
||||
public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv,
|
||||
Optional<TableScanParams> scanParams, Optional<IncrementalRelation> incrementalRelation) {
|
||||
super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv);
|
||||
isCowOrRoTable = hmsTable.isHoodieCowTable();
|
||||
if (isCowOrRoTable) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Hudi table {} can read as cow/read optimize table", hmsTable.getName());
|
||||
}
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Hudi table {} is a mor table, and will use JNI to read data in BE", hmsTable.getName());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if (isCowOrRoTable) {
|
||||
LOG.debug("Hudi table {} can read as cow/read optimize table", hmsTable.getFullQualifiers());
|
||||
} else {
|
||||
LOG.debug("Hudi table {} is a mor table, and will use JNI to read data in BE",
|
||||
hmsTable.getFullQualifiers());
|
||||
}
|
||||
}
|
||||
useHiveSyncPartition = hmsTable.useHiveSyncPartition();
|
||||
this.scanParams = scanParams.orElse(null);
|
||||
this.incrementalRelation = incrementalRelation.orElse(null);
|
||||
this.incrementalRead = (this.scanParams != null && this.scanParams.incrementalRead());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -171,17 +175,9 @@ public class HudiScanNode extends HiveScanNode {
|
||||
columnTypes.add(columnType);
|
||||
}
|
||||
|
||||
TableScanParams scanParams = desc.getRef().getScanParams();
|
||||
if (scanParams != null) {
|
||||
throw new UserException("Incremental read should turn on nereids planner");
|
||||
}
|
||||
scanParams = hmsTable.getScanParams();
|
||||
if (scanParams != null) {
|
||||
if (scanParams.incrementalRead()) {
|
||||
incrementalRead = true;
|
||||
} else {
|
||||
throw new UserException("Not support function '" + scanParams.getParamType() + "' in hudi table");
|
||||
}
|
||||
if (scanParams != null && !scanParams.incrementalRead()) {
|
||||
// Only support incremental read
|
||||
throw new UserException("Not support function '" + scanParams.getParamType() + "' in hudi table");
|
||||
}
|
||||
if (incrementalRead) {
|
||||
if (isCowOrRoTable) {
|
||||
@ -191,18 +187,15 @@ public class HudiScanNode extends HiveScanNode {
|
||||
&& hmsTable.getRemoteTable().getTableName().endsWith("_ro")) {
|
||||
// Incremental read RO table as RT table, I don't know why?
|
||||
isCowOrRoTable = false;
|
||||
LOG.warn("Execute incremental read on RO table");
|
||||
LOG.warn("Execute incremental read on RO table: {}", hmsTable.getFullQualifiers());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
incrementalRelation = hmsTable.getIncrementalRelation();
|
||||
if (incrementalRelation == null) {
|
||||
throw new UserException("Failed to create incremental relation");
|
||||
}
|
||||
} else {
|
||||
incrementalRelation = null;
|
||||
}
|
||||
|
||||
timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
|
||||
|
||||
@ -51,6 +51,7 @@ import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.datasource.ExternalTable;
|
||||
import org.apache.doris.datasource.es.source.EsScanNode;
|
||||
import org.apache.doris.datasource.hive.HMSExternalTable;
|
||||
import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
|
||||
import org.apache.doris.datasource.hive.source.HiveScanNode;
|
||||
import org.apache.doris.datasource.hudi.source.HudiScanNode;
|
||||
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
|
||||
@ -121,6 +122,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalHudiScan;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
|
||||
@ -205,6 +207,7 @@ import com.google.common.collect.Sets;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
@ -566,9 +569,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
ScanNode scanNode;
|
||||
if (table instanceof HMSExternalTable) {
|
||||
switch (((HMSExternalTable) table).getDlaType()) {
|
||||
case HUDI:
|
||||
scanNode = new HudiScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
|
||||
break;
|
||||
case ICEBERG:
|
||||
scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
|
||||
IcebergScanNode icebergScanNode = (IcebergScanNode) scanNode;
|
||||
@ -600,32 +600,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
} else {
|
||||
throw new RuntimeException("do not support table type " + table.getType());
|
||||
}
|
||||
scanNode.setNereidsId(fileScan.getId());
|
||||
scanNode.addConjuncts(translateToLegacyConjuncts(fileScan.getConjuncts()));
|
||||
scanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(fileScan.getRelationId()));
|
||||
|
||||
TableName tableName = new TableName(null, "", "");
|
||||
TableRef ref = new TableRef(tableName, null, null);
|
||||
BaseTableRef tableRef = new BaseTableRef(ref, table, tableName);
|
||||
tupleDescriptor.setRef(tableRef);
|
||||
if (fileScan.getStats() != null) {
|
||||
scanNode.setCardinality((long) fileScan.getStats().getRowCount());
|
||||
}
|
||||
Utils.execWithUncheckedException(scanNode::init);
|
||||
context.addScanNode(scanNode);
|
||||
ScanNode finalScanNode = scanNode;
|
||||
context.getRuntimeTranslator().ifPresent(
|
||||
runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(fileScan).forEach(
|
||||
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode, context)
|
||||
)
|
||||
);
|
||||
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
|
||||
// Create PlanFragment
|
||||
DataPartition dataPartition = DataPartition.RANDOM;
|
||||
PlanFragment planFragment = createPlanFragment(scanNode, dataPartition, fileScan);
|
||||
context.addPlanFragment(planFragment);
|
||||
updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), fileScan);
|
||||
return planFragment;
|
||||
return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, table, tupleDescriptor);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -671,6 +646,57 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
return planFragment;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PlanFragment visitPhysicalHudiScan(PhysicalHudiScan fileScan, PlanTranslatorContext context) {
|
||||
List<Slot> slots = fileScan.getOutput();
|
||||
ExternalTable table = fileScan.getTable();
|
||||
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context);
|
||||
|
||||
if (!(table instanceof HMSExternalTable) || ((HMSExternalTable) table).getDlaType() != DLAType.HUDI) {
|
||||
throw new RuntimeException("Invalid table type for Hudi scan: " + table.getType());
|
||||
}
|
||||
Preconditions.checkState(fileScan instanceof PhysicalHudiScan,
|
||||
"Invalid physical scan: " + fileScan.getClass().getSimpleName()
|
||||
+ " for Hudi table");
|
||||
PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan;
|
||||
ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(), tupleDescriptor, false,
|
||||
hudiScan.getScanParams(), hudiScan.getIncrementalRelation());
|
||||
|
||||
return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, table, tupleDescriptor);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private PlanFragment getPlanFragmentForPhysicalFileScan(PhysicalFileScan fileScan, PlanTranslatorContext context,
|
||||
ScanNode scanNode,
|
||||
ExternalTable table, TupleDescriptor tupleDescriptor) {
|
||||
scanNode.setNereidsId(fileScan.getId());
|
||||
scanNode.addConjuncts(translateToLegacyConjuncts(fileScan.getConjuncts()));
|
||||
scanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(fileScan.getRelationId()));
|
||||
|
||||
TableName tableName = new TableName(null, "", "");
|
||||
TableRef ref = new TableRef(tableName, null, null);
|
||||
BaseTableRef tableRef = new BaseTableRef(ref, table, tableName);
|
||||
tupleDescriptor.setRef(tableRef);
|
||||
if (fileScan.getStats() != null) {
|
||||
scanNode.setCardinality((long) fileScan.getStats().getRowCount());
|
||||
}
|
||||
Utils.execWithUncheckedException(scanNode::init);
|
||||
context.addScanNode(scanNode);
|
||||
ScanNode finalScanNode = scanNode;
|
||||
context.getRuntimeTranslator().ifPresent(
|
||||
runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(fileScan).forEach(
|
||||
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode, context)
|
||||
)
|
||||
);
|
||||
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
|
||||
// Create PlanFragment
|
||||
DataPartition dataPartition = DataPartition.RANDOM;
|
||||
PlanFragment planFragment = createPlanFragment(scanNode, dataPartition, fileScan);
|
||||
context.addPlanFragment(planFragment);
|
||||
updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), fileScan);
|
||||
return planFragment;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PlanFragment visitPhysicalJdbcScan(PhysicalJdbcScan jdbcScan, PlanTranslatorContext context) {
|
||||
List<Slot> slots = jdbcScan.getOutput();
|
||||
|
||||
@ -71,6 +71,7 @@ import org.apache.doris.nereids.rules.implementation.LogicalFileSinkToPhysicalFi
|
||||
import org.apache.doris.nereids.rules.implementation.LogicalFilterToPhysicalFilter;
|
||||
import org.apache.doris.nereids.rules.implementation.LogicalGenerateToPhysicalGenerate;
|
||||
import org.apache.doris.nereids.rules.implementation.LogicalHiveTableSinkToPhysicalHiveTableSink;
|
||||
import org.apache.doris.nereids.rules.implementation.LogicalHudiScanToPhysicalHudiScan;
|
||||
import org.apache.doris.nereids.rules.implementation.LogicalIcebergTableSinkToPhysicalIcebergTableSink;
|
||||
import org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect;
|
||||
import org.apache.doris.nereids.rules.implementation.LogicalJdbcScanToPhysicalJdbcScan;
|
||||
@ -176,6 +177,7 @@ public class RuleSet {
|
||||
.add(new LogicalOlapScanToPhysicalOlapScan())
|
||||
.add(new LogicalDeferMaterializeOlapScanToPhysicalDeferMaterializeOlapScan())
|
||||
.add(new LogicalSchemaScanToPhysicalSchemaScan())
|
||||
.add(new LogicalHudiScanToPhysicalHudiScan())
|
||||
.add(new LogicalFileScanToPhysicalFileScan())
|
||||
.add(new LogicalJdbcScanToPhysicalJdbcScan())
|
||||
.add(new LogicalOdbcScanToPhysicalOdbcScan())
|
||||
|
||||
@ -419,6 +419,7 @@ public enum RuleType {
|
||||
LOGICAL_DEFER_MATERIALIZE_OLAP_SCAN_TO_PHYSICAL_DEFER_MATERIALIZE_OLAP_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
|
||||
LOGICAL_SCHEMA_SCAN_TO_PHYSICAL_SCHEMA_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
|
||||
LOGICAL_FILE_SCAN_TO_PHYSICAL_FILE_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
|
||||
LOGICAL_HUDI_SCAN_TO_PHYSICAL_HUDI_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
|
||||
LOGICAL_JDBC_SCAN_TO_PHYSICAL_JDBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
|
||||
LOGICAL_ODBC_SCAN_TO_PHYSICAL_ODBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
|
||||
LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
|
||||
|
||||
@ -29,6 +29,7 @@ import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.datasource.ExternalTable;
|
||||
import org.apache.doris.datasource.es.EsExternalTable;
|
||||
import org.apache.doris.datasource.hive.HMSExternalTable;
|
||||
import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
|
||||
import org.apache.doris.nereids.CTEContext;
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.SqlCacheContext;
|
||||
@ -55,6 +56,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalOdbcScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
|
||||
@ -267,10 +269,17 @@ public class BindRelation extends OneAnalysisRuleFactory {
|
||||
Plan hiveViewPlan = parseAndAnalyzeHiveView(hmsTable, hiveCatalog, ddlSql, cascadesContext);
|
||||
return new LogicalSubQueryAlias<>(tableQualifier, hiveViewPlan);
|
||||
}
|
||||
hmsTable.setScanParams(unboundRelation.getScanParams());
|
||||
return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table,
|
||||
qualifierWithoutTableName, unboundRelation.getTableSample(),
|
||||
unboundRelation.getTableSnapshot());
|
||||
if (hmsTable.getDlaType() == DLAType.HUDI) {
|
||||
LogicalHudiScan hudiScan = new LogicalHudiScan(unboundRelation.getRelationId(), hmsTable,
|
||||
qualifierWithoutTableName, unboundRelation.getTableSample(),
|
||||
unboundRelation.getTableSnapshot());
|
||||
hudiScan = hudiScan.withScanParams(hmsTable, unboundRelation.getScanParams());
|
||||
return hudiScan;
|
||||
} else {
|
||||
return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table,
|
||||
qualifierWithoutTableName, unboundRelation.getTableSample(),
|
||||
unboundRelation.getTableSnapshot());
|
||||
}
|
||||
case ICEBERG_EXTERNAL_TABLE:
|
||||
case PAIMON_EXTERNAL_TABLE:
|
||||
case MAX_COMPUTE_EXTERNAL_TABLE:
|
||||
|
||||
@ -25,8 +25,8 @@ import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy.RelatedPolicy;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
|
||||
import org.apache.doris.nereids.util.ExpressionUtils;
|
||||
@ -65,12 +65,11 @@ public class CheckPolicy implements AnalysisRuleFactory {
|
||||
Set<Expression> combineFilter = new LinkedHashSet<>();
|
||||
|
||||
// replace incremental params as AND expression
|
||||
if (relation instanceof LogicalFileScan) {
|
||||
LogicalFileScan fileScan = (LogicalFileScan) relation;
|
||||
if (fileScan.getTable() instanceof HMSExternalTable) {
|
||||
HMSExternalTable hmsTable = (HMSExternalTable) fileScan.getTable();
|
||||
combineFilter.addAll(hmsTable.generateIncrementalExpression(
|
||||
fileScan.getLogicalProperties().getOutput()));
|
||||
if (relation instanceof LogicalHudiScan) {
|
||||
LogicalHudiScan hudiScan = (LogicalHudiScan) relation;
|
||||
if (hudiScan.getTable() instanceof HMSExternalTable) {
|
||||
combineFilter.addAll(hudiScan.generateIncrementalExpression(
|
||||
hudiScan.getLogicalProperties().getOutput()));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -20,6 +20,7 @@ package org.apache.doris.nereids.rules.implementation;
|
||||
import org.apache.doris.nereids.properties.DistributionSpecAny;
|
||||
import org.apache.doris.nereids.rules.Rule;
|
||||
import org.apache.doris.nereids.rules.RuleType;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
|
||||
|
||||
import java.util.Optional;
|
||||
@ -30,7 +31,7 @@ import java.util.Optional;
|
||||
public class LogicalFileScanToPhysicalFileScan extends OneImplementationRuleFactory {
|
||||
@Override
|
||||
public Rule build() {
|
||||
return logicalFileScan().then(fileScan ->
|
||||
return logicalFileScan().when(plan -> !(plan instanceof LogicalHudiScan)).then(fileScan ->
|
||||
new PhysicalFileScan(
|
||||
fileScan.getRelationId(),
|
||||
fileScan.getTable(),
|
||||
|
||||
@ -0,0 +1,49 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.rules.implementation;
|
||||
|
||||
import org.apache.doris.nereids.properties.DistributionSpecAny;
|
||||
import org.apache.doris.nereids.rules.Rule;
|
||||
import org.apache.doris.nereids.rules.RuleType;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalHudiScan;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Implementation rule that convert logical HudiScan to physical HudiScan.
|
||||
*/
|
||||
public class LogicalHudiScanToPhysicalHudiScan extends OneImplementationRuleFactory {
|
||||
@Override
|
||||
public Rule build() {
|
||||
return logicalHudiScan().then(fileScan ->
|
||||
new PhysicalHudiScan(
|
||||
fileScan.getRelationId(),
|
||||
fileScan.getTable(),
|
||||
fileScan.getQualifier(),
|
||||
DistributionSpecAny.INSTANCE,
|
||||
Optional.empty(),
|
||||
fileScan.getLogicalProperties(),
|
||||
fileScan.getConjuncts(),
|
||||
fileScan.getSelectedPartitions(),
|
||||
fileScan.getTableSample(),
|
||||
fileScan.getTableSnapshot(),
|
||||
fileScan.getScanParams(),
|
||||
fileScan.getIncrementalRelation())
|
||||
).toRule(RuleType.LOGICAL_HUDI_SCAN_TO_PHYSICAL_HUDI_SCAN_RULE);
|
||||
}
|
||||
}
|
||||
@ -68,6 +68,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalGenerate;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalIntersect;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
|
||||
@ -317,6 +318,11 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
|
||||
return computeCatalogRelation(fileScan);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Statistics visitLogicalHudiScan(LogicalHudiScan fileScan, Void context) {
|
||||
return computeCatalogRelation(fileScan);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Statistics visitLogicalTVFRelation(LogicalTVFRelation tvfRelation, Void context) {
|
||||
return tvfRelation.getFunction().computeStats(tvfRelation.getOutput());
|
||||
|
||||
@ -90,6 +90,7 @@ public enum PlanType {
|
||||
PHYSICAL_EMPTY_RELATION,
|
||||
PHYSICAL_ES_SCAN,
|
||||
PHYSICAL_FILE_SCAN,
|
||||
PHYSICAL_HUDI_SCAN,
|
||||
PHYSICAL_JDBC_SCAN,
|
||||
PHYSICAL_ODBC_SCAN,
|
||||
PHYSICAL_ONE_ROW_RELATION,
|
||||
|
||||
@ -45,14 +45,14 @@ import java.util.Set;
|
||||
*/
|
||||
public class LogicalFileScan extends LogicalExternalRelation {
|
||||
|
||||
private final SelectedPartitions selectedPartitions;
|
||||
private final Optional<TableSample> tableSample;
|
||||
private final Optional<TableSnapshot> tableSnapshot;
|
||||
protected final SelectedPartitions selectedPartitions;
|
||||
protected final Optional<TableSample> tableSample;
|
||||
protected final Optional<TableSnapshot> tableSnapshot;
|
||||
|
||||
/**
|
||||
* Constructor for LogicalFileScan.
|
||||
*/
|
||||
public LogicalFileScan(RelationId id, ExternalTable table, List<String> qualifier,
|
||||
protected LogicalFileScan(RelationId id, ExternalTable table, List<String> qualifier,
|
||||
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
|
||||
Set<Expression> conjuncts, SelectedPartitions selectedPartitions, Optional<TableSample> tableSample,
|
||||
Optional<TableSnapshot> tableSnapshot) {
|
||||
|
||||
@ -0,0 +1,226 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.trees.plans.logical;
|
||||
|
||||
import org.apache.doris.analysis.TableScanParams;
|
||||
import org.apache.doris.analysis.TableSnapshot;
|
||||
import org.apache.doris.datasource.ExternalTable;
|
||||
import org.apache.doris.datasource.hive.HMSExternalTable;
|
||||
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
|
||||
import org.apache.doris.datasource.hudi.source.COWIncrementalRelation;
|
||||
import org.apache.doris.datasource.hudi.source.IncrementalRelation;
|
||||
import org.apache.doris.datasource.hudi.source.MORIncrementalRelation;
|
||||
import org.apache.doris.nereids.exceptions.AnalysisException;
|
||||
import org.apache.doris.nereids.memo.GroupExpression;
|
||||
import org.apache.doris.nereids.properties.LogicalProperties;
|
||||
import org.apache.doris.nereids.trees.TableSample;
|
||||
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.GreaterThan;
|
||||
import org.apache.doris.nereids.trees.expressions.GreaterThanEqual;
|
||||
import org.apache.doris.nereids.trees.expressions.LessThanEqual;
|
||||
import org.apache.doris.nereids.trees.expressions.Slot;
|
||||
import org.apache.doris.nereids.trees.expressions.SlotReference;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.StringLiteral;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.RelationId;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
|
||||
import org.apache.doris.nereids.util.Utils;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Logical Hudi scan for Hudi table
|
||||
*/
|
||||
public class LogicalHudiScan extends LogicalFileScan {
|
||||
private static final Logger LOG = LogManager.getLogger(LogicalHudiScan.class);
|
||||
|
||||
// for hudi incremental read
|
||||
private final Optional<TableScanParams> scanParams;
|
||||
private final Optional<IncrementalRelation> incrementalRelation;
|
||||
|
||||
/**
|
||||
* Constructor for LogicalHudiScan.
|
||||
*/
|
||||
protected LogicalHudiScan(RelationId id, ExternalTable table, List<String> qualifier,
|
||||
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
|
||||
Set<Expression> conjuncts, SelectedPartitions selectedPartitions, Optional<TableSample> tableSample,
|
||||
Optional<TableSnapshot> tableSnapshot,
|
||||
Optional<TableScanParams> scanParams, Optional<IncrementalRelation> incrementalRelation) {
|
||||
super(id, table, qualifier, groupExpression, logicalProperties, conjuncts,
|
||||
selectedPartitions, tableSample, tableSnapshot);
|
||||
Objects.requireNonNull(scanParams, "scanParams should not null");
|
||||
Objects.requireNonNull(incrementalRelation, "incrementalRelation should not null");
|
||||
this.scanParams = scanParams;
|
||||
this.incrementalRelation = incrementalRelation;
|
||||
}
|
||||
|
||||
public LogicalHudiScan(RelationId id, ExternalTable table, List<String> qualifier,
|
||||
Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot) {
|
||||
this(id, table, qualifier, Optional.empty(), Optional.empty(),
|
||||
Sets.newHashSet(), SelectedPartitions.NOT_PRUNED, tableSample, tableSnapshot,
|
||||
Optional.empty(), Optional.empty());
|
||||
}
|
||||
|
||||
public Optional<TableScanParams> getScanParams() {
|
||||
return scanParams;
|
||||
}
|
||||
|
||||
public Optional<IncrementalRelation> getIncrementalRelation() {
|
||||
return incrementalRelation;
|
||||
}
|
||||
|
||||
/**
|
||||
* replace incremental params as AND expression
|
||||
* incr('beginTime'='20240308110257169', 'endTime'='20240308110677278') =>
|
||||
* _hoodie_commit_time >= 20240308110257169 and _hoodie_commit_time <= '20240308110677278'
|
||||
*/
|
||||
public Set<Expression> generateIncrementalExpression(List<Slot> slots) {
|
||||
if (!incrementalRelation.isPresent()) {
|
||||
return Collections.emptySet();
|
||||
}
|
||||
SlotReference timeField = null;
|
||||
for (Slot slot : slots) {
|
||||
if ("_hoodie_commit_time".equals(slot.getName())) {
|
||||
timeField = (SlotReference) slot;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (timeField == null) {
|
||||
return Collections.emptySet();
|
||||
}
|
||||
StringLiteral upperValue = new StringLiteral(incrementalRelation.get().getEndTs());
|
||||
StringLiteral lowerValue = new StringLiteral(incrementalRelation.get().getStartTs());
|
||||
ComparisonPredicate less = new LessThanEqual(timeField, upperValue);
|
||||
ComparisonPredicate great = incrementalRelation.get().isIncludeStartTime()
|
||||
? new GreaterThanEqual(timeField, lowerValue)
|
||||
: new GreaterThan(timeField, lowerValue);
|
||||
return ImmutableSet.of(great, less);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Utils.toSqlString("LogicalHudiScan",
|
||||
"qualified", qualifiedName(),
|
||||
"output", getOutput()
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LogicalHudiScan withGroupExpression(Optional<GroupExpression> groupExpression) {
|
||||
return new LogicalHudiScan(relationId, (ExternalTable) table, qualifier, groupExpression,
|
||||
Optional.of(getLogicalProperties()), conjuncts, selectedPartitions, tableSample, tableSnapshot,
|
||||
scanParams, incrementalRelation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
|
||||
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
|
||||
return new LogicalHudiScan(relationId, (ExternalTable) table, qualifier,
|
||||
groupExpression, logicalProperties, conjuncts, selectedPartitions, tableSample, tableSnapshot,
|
||||
scanParams, incrementalRelation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LogicalHudiScan withConjuncts(Set<Expression> conjuncts) {
|
||||
return new LogicalHudiScan(relationId, (ExternalTable) table, qualifier, Optional.empty(),
|
||||
Optional.of(getLogicalProperties()), conjuncts, selectedPartitions, tableSample, tableSnapshot,
|
||||
scanParams, incrementalRelation);
|
||||
}
|
||||
|
||||
public LogicalHudiScan withSelectedPartitions(SelectedPartitions selectedPartitions) {
|
||||
return new LogicalHudiScan(relationId, (ExternalTable) table, qualifier, Optional.empty(),
|
||||
Optional.of(getLogicalProperties()), conjuncts, selectedPartitions, tableSample, tableSnapshot,
|
||||
scanParams, incrementalRelation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LogicalHudiScan withRelationId(RelationId relationId) {
|
||||
return new LogicalHudiScan(relationId, (ExternalTable) table, qualifier, Optional.empty(),
|
||||
Optional.empty(), conjuncts, selectedPartitions, tableSample, tableSnapshot,
|
||||
scanParams, incrementalRelation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
|
||||
return visitor.visitLogicalHudiScan(this, context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set scan params for incremental read
|
||||
*
|
||||
* @param table should be hudi table
|
||||
* @param scanParams including incremental read params
|
||||
*/
|
||||
public LogicalHudiScan withScanParams(HMSExternalTable table, TableScanParams scanParams) {
|
||||
Optional<IncrementalRelation> newIncrementalRelation = Optional.empty();
|
||||
Optional<TableScanParams> newScanParams = Optional.empty();
|
||||
if (scanParams != null && scanParams.incrementalRead()) {
|
||||
Map<String, String> optParams = table.getHadoopProperties();
|
||||
if (scanParams.getParams().containsKey("beginTime")) {
|
||||
optParams.put("hoodie.datasource.read.begin.instanttime", scanParams.getParams().get("beginTime"));
|
||||
}
|
||||
if (scanParams.getParams().containsKey("endTime")) {
|
||||
optParams.put("hoodie.datasource.read.end.instanttime", scanParams.getParams().get("endTime"));
|
||||
}
|
||||
scanParams.getParams().forEach((k, v) -> {
|
||||
if (k.startsWith("hoodie.")) {
|
||||
optParams.put(k, v);
|
||||
}
|
||||
});
|
||||
HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(table);
|
||||
try {
|
||||
boolean isCowOrRoTable = table.isHoodieCowTable();
|
||||
if (isCowOrRoTable) {
|
||||
Map<String, String> serd = table.getRemoteTable().getSd().getSerdeInfo().getParameters();
|
||||
if ("true".equals(serd.get("hoodie.query.as.ro.table"))
|
||||
&& table.getRemoteTable().getTableName().endsWith("_ro")) {
|
||||
// Incremental read RO table as RT table, I don't know why?
|
||||
isCowOrRoTable = false;
|
||||
LOG.warn("Execute incremental read on RO table: {}", table.getFullQualifiers());
|
||||
}
|
||||
}
|
||||
if (isCowOrRoTable) {
|
||||
newIncrementalRelation = Optional.of(new COWIncrementalRelation(
|
||||
optParams, HiveMetaStoreClientHelper.getConfiguration(table), hudiClient));
|
||||
} else {
|
||||
newIncrementalRelation = Optional.of(new MORIncrementalRelation(
|
||||
optParams, HiveMetaStoreClientHelper.getConfiguration(table), hudiClient));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new AnalysisException(
|
||||
"Failed to create incremental relation for table: " + table.getFullQualifiers(), e);
|
||||
}
|
||||
}
|
||||
newScanParams = Optional.ofNullable(scanParams);
|
||||
return new LogicalHudiScan(relationId, table, qualifier, Optional.empty(),
|
||||
Optional.empty(), conjuncts, selectedPartitions, tableSample, tableSnapshot,
|
||||
newScanParams, newIncrementalRelation);
|
||||
}
|
||||
}
|
||||
@ -42,11 +42,11 @@ import java.util.Set;
|
||||
*/
|
||||
public class PhysicalFileScan extends PhysicalCatalogRelation {
|
||||
|
||||
private final DistributionSpec distributionSpec;
|
||||
private final Set<Expression> conjuncts;
|
||||
private final SelectedPartitions selectedPartitions;
|
||||
private final Optional<TableSample> tableSample;
|
||||
private final Optional<TableSnapshot> tableSnapshot;
|
||||
protected final DistributionSpec distributionSpec;
|
||||
protected final Set<Expression> conjuncts;
|
||||
protected final SelectedPartitions selectedPartitions;
|
||||
protected final Optional<TableSample> tableSample;
|
||||
protected final Optional<TableSnapshot> tableSnapshot;
|
||||
|
||||
/**
|
||||
* Constructor for PhysicalFileScan.
|
||||
@ -56,12 +56,8 @@ public class PhysicalFileScan extends PhysicalCatalogRelation {
|
||||
LogicalProperties logicalProperties, Set<Expression> conjuncts,
|
||||
SelectedPartitions selectedPartitions, Optional<TableSample> tableSample,
|
||||
Optional<TableSnapshot> tableSnapshot) {
|
||||
super(id, PlanType.PHYSICAL_FILE_SCAN, table, qualifier, groupExpression, logicalProperties);
|
||||
this.distributionSpec = distributionSpec;
|
||||
this.conjuncts = conjuncts;
|
||||
this.selectedPartitions = selectedPartitions;
|
||||
this.tableSample = tableSample;
|
||||
this.tableSnapshot = tableSnapshot;
|
||||
this(id, PlanType.PHYSICAL_FILE_SCAN, table, qualifier, distributionSpec, groupExpression,
|
||||
logicalProperties, conjuncts, selectedPartitions, tableSample, tableSnapshot);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -72,7 +68,33 @@ public class PhysicalFileScan extends PhysicalCatalogRelation {
|
||||
LogicalProperties logicalProperties, PhysicalProperties physicalProperties,
|
||||
Statistics statistics, Set<Expression> conjuncts, SelectedPartitions selectedPartitions,
|
||||
Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot) {
|
||||
super(id, PlanType.PHYSICAL_FILE_SCAN, table, qualifier, groupExpression, logicalProperties,
|
||||
this(id, PlanType.PHYSICAL_FILE_SCAN, table, qualifier, distributionSpec, groupExpression,
|
||||
logicalProperties, physicalProperties, statistics, conjuncts, selectedPartitions, tableSample,
|
||||
tableSnapshot);
|
||||
}
|
||||
|
||||
/**
|
||||
* For hudi file scan to specified PlanTye
|
||||
*/
|
||||
protected PhysicalFileScan(RelationId id, PlanType type, ExternalTable table, List<String> qualifier,
|
||||
DistributionSpec distributionSpec, Optional<GroupExpression> groupExpression,
|
||||
LogicalProperties logicalProperties, Set<Expression> conjuncts,
|
||||
SelectedPartitions selectedPartitions, Optional<TableSample> tableSample,
|
||||
Optional<TableSnapshot> tableSnapshot) {
|
||||
super(id, type, table, qualifier, groupExpression, logicalProperties);
|
||||
this.distributionSpec = distributionSpec;
|
||||
this.conjuncts = conjuncts;
|
||||
this.selectedPartitions = selectedPartitions;
|
||||
this.tableSample = tableSample;
|
||||
this.tableSnapshot = tableSnapshot;
|
||||
}
|
||||
|
||||
protected PhysicalFileScan(RelationId id, PlanType type, ExternalTable table, List<String> qualifier,
|
||||
DistributionSpec distributionSpec, Optional<GroupExpression> groupExpression,
|
||||
LogicalProperties logicalProperties, PhysicalProperties physicalProperties,
|
||||
Statistics statistics, Set<Expression> conjuncts, SelectedPartitions selectedPartitions,
|
||||
Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot) {
|
||||
super(id, type, table, qualifier, groupExpression, logicalProperties,
|
||||
physicalProperties, statistics);
|
||||
this.distributionSpec = distributionSpec;
|
||||
this.conjuncts = conjuncts;
|
||||
|
||||
@ -0,0 +1,133 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.trees.plans.physical;
|
||||
|
||||
import org.apache.doris.analysis.TableScanParams;
|
||||
import org.apache.doris.analysis.TableSnapshot;
|
||||
import org.apache.doris.datasource.ExternalTable;
|
||||
import org.apache.doris.datasource.hudi.source.IncrementalRelation;
|
||||
import org.apache.doris.nereids.memo.GroupExpression;
|
||||
import org.apache.doris.nereids.properties.DistributionSpec;
|
||||
import org.apache.doris.nereids.properties.LogicalProperties;
|
||||
import org.apache.doris.nereids.properties.PhysicalProperties;
|
||||
import org.apache.doris.nereids.trees.TableSample;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.PlanType;
|
||||
import org.apache.doris.nereids.trees.plans.RelationId;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
|
||||
import org.apache.doris.nereids.util.Utils;
|
||||
import org.apache.doris.statistics.Statistics;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Physical Hudi scan for Hudi table.
|
||||
*/
|
||||
public class PhysicalHudiScan extends PhysicalFileScan {
|
||||
|
||||
// for hudi incremental read
|
||||
private final Optional<TableScanParams> scanParams;
|
||||
private final Optional<IncrementalRelation> incrementalRelation;
|
||||
|
||||
/**
|
||||
* Constructor for PhysicalHudiScan.
|
||||
*/
|
||||
public PhysicalHudiScan(RelationId id, ExternalTable table, List<String> qualifier,
|
||||
DistributionSpec distributionSpec, Optional<GroupExpression> groupExpression,
|
||||
LogicalProperties logicalProperties, Set<Expression> conjuncts,
|
||||
SelectedPartitions selectedPartitions, Optional<TableSample> tableSample,
|
||||
Optional<TableSnapshot> tableSnapshot,
|
||||
Optional<TableScanParams> scanParams, Optional<IncrementalRelation> incrementalRelation) {
|
||||
super(id, PlanType.PHYSICAL_HUDI_SCAN, table, qualifier, distributionSpec, groupExpression, logicalProperties,
|
||||
conjuncts, selectedPartitions, tableSample, tableSnapshot);
|
||||
Objects.requireNonNull(scanParams, "scanParams should not null");
|
||||
Objects.requireNonNull(incrementalRelation, "incrementalRelation should not null");
|
||||
this.scanParams = scanParams;
|
||||
this.incrementalRelation = incrementalRelation;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor for PhysicalHudiScan.
|
||||
*/
|
||||
public PhysicalHudiScan(RelationId id, ExternalTable table, List<String> qualifier,
|
||||
DistributionSpec distributionSpec, Optional<GroupExpression> groupExpression,
|
||||
LogicalProperties logicalProperties, PhysicalProperties physicalProperties,
|
||||
Statistics statistics, Set<Expression> conjuncts, SelectedPartitions selectedPartitions,
|
||||
Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot,
|
||||
Optional<TableScanParams> scanParams, Optional<IncrementalRelation> incrementalRelation) {
|
||||
super(id, PlanType.PHYSICAL_HUDI_SCAN, table, qualifier, distributionSpec, groupExpression, logicalProperties,
|
||||
physicalProperties, statistics, conjuncts, selectedPartitions, tableSample, tableSnapshot);
|
||||
this.scanParams = scanParams;
|
||||
this.incrementalRelation = incrementalRelation;
|
||||
}
|
||||
|
||||
public Optional<TableScanParams> getScanParams() {
|
||||
return scanParams;
|
||||
}
|
||||
|
||||
public Optional<IncrementalRelation> getIncrementalRelation() {
|
||||
return incrementalRelation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PhysicalHudiScan withGroupExpression(Optional<GroupExpression> groupExpression) {
|
||||
return new PhysicalHudiScan(relationId, getTable(), qualifier, distributionSpec,
|
||||
groupExpression, getLogicalProperties(), conjuncts, selectedPartitions, tableSample, tableSnapshot,
|
||||
scanParams, incrementalRelation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
|
||||
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
|
||||
return new PhysicalHudiScan(relationId, getTable(), qualifier, distributionSpec,
|
||||
groupExpression, logicalProperties.get(), conjuncts, selectedPartitions, tableSample, tableSnapshot,
|
||||
scanParams, incrementalRelation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PhysicalHudiScan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties,
|
||||
Statistics statistics) {
|
||||
return new PhysicalHudiScan(relationId, getTable(), qualifier, distributionSpec,
|
||||
groupExpression, getLogicalProperties(), physicalProperties, statistics, conjuncts,
|
||||
selectedPartitions, tableSample, tableSnapshot,
|
||||
scanParams, incrementalRelation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
|
||||
return visitor.visitPhysicalHudiScan(this, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Utils.toSqlString("PhysicalHudiScan",
|
||||
"qualified", Utils.qualifiedName(qualifier, table.getName()),
|
||||
"output", getOutput(),
|
||||
"stats", statistics,
|
||||
"conjuncts", conjuncts,
|
||||
"selected partitions num",
|
||||
selectedPartitions.isPruned ? selectedPartitions.selectedPartitions.size() : "unknown",
|
||||
"isIncremental", incrementalRelation.isPresent()
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -26,6 +26,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalExternalRelation;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalOdbcScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
|
||||
@ -40,6 +41,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOla
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalHudiScan;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
|
||||
@ -105,6 +107,10 @@ public interface RelationVisitor<R, C> {
|
||||
return visitLogicalExternalRelation(fileScan, context);
|
||||
}
|
||||
|
||||
default R visitLogicalHudiScan(LogicalHudiScan fileScan, C context) {
|
||||
return visitLogicalFileScan(fileScan, context);
|
||||
}
|
||||
|
||||
default R visitLogicalJdbcScan(LogicalJdbcScan jdbcScan, C context) {
|
||||
return visitLogicalExternalRelation(jdbcScan, context);
|
||||
}
|
||||
@ -154,6 +160,10 @@ public interface RelationVisitor<R, C> {
|
||||
return visitPhysicalCatalogRelation(fileScan, context);
|
||||
}
|
||||
|
||||
default R visitPhysicalHudiScan(PhysicalHudiScan hudiScan, C context) {
|
||||
return visitPhysicalFileScan(hudiScan, context);
|
||||
}
|
||||
|
||||
default R visitPhysicalJdbcScan(PhysicalJdbcScan jdbcScan, C context) {
|
||||
return visitPhysicalCatalogRelation(jdbcScan, context);
|
||||
}
|
||||
|
||||
@ -25,6 +25,7 @@ import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.nereids.exceptions.AnalysisException;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
import java.util.List;
|
||||
@ -42,13 +43,24 @@ public class RelationUtil {
|
||||
case 1: { // table
|
||||
// Use current database name from catalog.
|
||||
String tableName = nameParts.get(0);
|
||||
String catalogName = context.getCurrentCatalog().getName();
|
||||
CatalogIf catalogIf = context.getCurrentCatalog();
|
||||
if (catalogIf == null) {
|
||||
throw new IllegalStateException("Current catalog is not set.");
|
||||
}
|
||||
String catalogName = catalogIf.getName();
|
||||
String dbName = context.getDatabase();
|
||||
if (Strings.isNullOrEmpty(dbName)) {
|
||||
throw new IllegalStateException("Current database is not set.");
|
||||
}
|
||||
return ImmutableList.of(catalogName, dbName, tableName);
|
||||
}
|
||||
case 2: { // db.table
|
||||
// Use database name from table name parts.
|
||||
String catalogName = context.getCurrentCatalog().getName();
|
||||
CatalogIf catalogIf = context.getCurrentCatalog();
|
||||
if (catalogIf == null) {
|
||||
throw new IllegalStateException("Current catalog is not set.");
|
||||
}
|
||||
String catalogName = catalogIf.getName();
|
||||
// if the relation is view, nameParts.get(0) is dbName.
|
||||
String dbName = nameParts.get(0);
|
||||
String tableName = nameParts.get(1);
|
||||
|
||||
@ -96,6 +96,7 @@ import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -1962,7 +1963,14 @@ public class SingleNodePlanner {
|
||||
TableIf table = tblRef.getDesc().getTable();
|
||||
switch (((HMSExternalTable) table).getDlaType()) {
|
||||
case HUDI:
|
||||
scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
|
||||
// Old planner does not support hudi incremental read,
|
||||
// so just pass Optional.empty() to HudiScanNode
|
||||
if (tblRef.getScanParams() != null) {
|
||||
throw new UserException("Hudi incremental read is not supported, "
|
||||
+ "please set enable_nereids_planner = true to enable new optimizer");
|
||||
}
|
||||
scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true,
|
||||
Optional.empty(), Optional.empty());
|
||||
break;
|
||||
case ICEBERG:
|
||||
scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
|
||||
|
||||
@ -33,6 +33,7 @@ import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.datasource.hive.HMSExternalCatalog;
|
||||
import org.apache.doris.datasource.hive.HMSExternalDatabase;
|
||||
import org.apache.doris.datasource.hive.HMSExternalTable;
|
||||
import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
|
||||
import org.apache.doris.nereids.datasets.tpch.AnalyzeCheckTestBase;
|
||||
import org.apache.doris.qe.SessionVariable;
|
||||
|
||||
@ -133,6 +134,10 @@ public class HmsCatalogTest extends AnalyzeCheckTestBase {
|
||||
tbl.getDatabase();
|
||||
minTimes = 0;
|
||||
result = db;
|
||||
|
||||
tbl.getDlaType();
|
||||
minTimes = 0;
|
||||
result = DLAType.HIVE;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user