[bugfix](hudi)add timetravel for nereids for 2.1 (#38324) (#38582)

## Proposed changes

bp #38324
This commit is contained in:
wuwenchi
2024-08-01 11:37:57 +08:00
committed by GitHub
parent 338fa32303
commit 057ee1905f
9 changed files with 451 additions and 21 deletions

View File

@ -97,7 +97,6 @@ public abstract class FileQueryScanNode extends FileScanNode {
protected String brokerName;
@Getter
protected TableSnapshot tableSnapshot;
/**
@ -595,4 +594,16 @@ public abstract class FileQueryScanNode extends FileScanNode {
}
}
}
public void setQueryTableSnapshot(TableSnapshot tableSnapshot) {
this.tableSnapshot = tableSnapshot;
}
public TableSnapshot getQueryTableSnapshot() {
TableSnapshot snapshot = desc.getRef().getTableSnapshot();
if (snapshot != null) {
return snapshot;
}
return this.tableSnapshot;
}
}

View File

@ -0,0 +1,188 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.hudi.source;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.LocalTaskContextSupplier;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.function.FunctionWrapper;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* This file is copied from org.apache.hudi.common.engine.HoodieLocalEngineContext.
* Because we need set ugi in thread pool
* A java based engine context, use this implementation on the query engine integrations if needed.
*/
public final class HudiLocalEngineContext extends HoodieEngineContext {
public HudiLocalEngineContext(Configuration conf) {
this(conf, new LocalTaskContextSupplier());
}
public HudiLocalEngineContext(Configuration conf, TaskContextSupplier taskContextSupplier) {
super(new SerializableConfiguration(conf), taskContextSupplier);
}
@Override
public HoodieAccumulator newAccumulator() {
return HoodieAtomicLongAccumulator.create();
}
@Override
public <T> HoodieData<T> emptyHoodieData() {
return HoodieListData.eager(Collections.emptyList());
}
@Override
public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
return HoodieListData.eager(data);
}
@Override
public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
return data.stream().parallel().map(v1 -> {
try {
return HiveMetaStoreClientHelper.ugiDoAs(getHadoopConf().get(), () -> func.apply(v1));
} catch (Exception e) {
throw new HoodieException("Error occurs when executing map", e);
}
}).collect(Collectors.toList());
}
@Override
public <I, K, V> List<V> mapToPairAndReduceByKey(
List<I> data,
SerializablePairFunction<I, K, V> mapToPairFunc,
SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
return data.stream().parallel().map(FunctionWrapper.throwingMapToPairWrapper(mapToPairFunc))
.collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
.map(list ->
list.stream()
.map(e -> e.getValue())
.reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).get())
.collect(Collectors.toList());
}
@Override
public <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey(
Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc,
SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
return FunctionWrapper.throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
.collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
.map(entry -> new ImmutablePair<>(entry.getKey(), entry.getValue().stream().map(
Pair::getValue).reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null)))
.filter(Objects::nonNull);
}
@Override
public <I, K, V> List<V> reduceByKey(
List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
return data.stream().parallel()
.collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
.map(list ->
list.stream()
.map(e -> e.getValue())
.reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
@Override
public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
return
data.stream().parallel().flatMap(FunctionWrapper.throwingFlatMapWrapper(func)).collect(Collectors.toList());
}
@Override
public <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism) {
data.stream().forEach(FunctionWrapper.throwingForeachWrapper(consumer));
}
@Override
public <I, K, V> Map<K, V> mapToPair(List<I> data, SerializablePairFunction<I, K, V> func, Integer parallelism) {
return data.stream().map(FunctionWrapper.throwingMapToPairWrapper(func)).collect(
Collectors.toMap(Pair::getLeft, Pair::getRight, (oldVal, newVal) -> newVal)
);
}
@Override
public void setProperty(EngineProperty key, String value) {
// no operation for now
}
@Override
public Option<String> getProperty(EngineProperty key) {
return Option.empty();
}
@Override
public void setJobStatus(String activeModule, String activityDescription) {
// no operation for now
}
@Override
public void putCachedDataIds(HoodieDataCacheKey cacheKey, int... ids) {
// no operation for now
}
@Override
public List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey) {
return Collections.emptyList();
}
@Override
public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
return Collections.emptyList();
}
@Override
public void cancelJob(String jobId) {
// no operation for now
}
@Override
public void cancelAllJobs() {
// no operation for now
}
}

View File

@ -18,7 +18,6 @@
package org.apache.doris.datasource.hudi.source;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
@ -50,7 +49,7 @@ public abstract class HudiPartitionProcessor {
.build();
HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create(
new HoodieLocalEngineContext(tableMetaClient.getHadoopConf()), metadataConfig,
new HudiLocalEngineContext(tableMetaClient.getHadoopConf()), metadataConfig,
tableMetaClient.getBasePathV2().toString(), true);
return newTableMetadata.getAllPartitionPaths();

View File

@ -18,6 +18,7 @@
package org.apache.doris.datasource.hudi.source;
import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PartitionItem;
@ -203,8 +204,12 @@ public class HudiScanNode extends HiveScanNode {
}
timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
if (desc.getRef().getTableSnapshot() != null) {
queryInstant = desc.getRef().getTableSnapshot().getTime();
TableSnapshot tableSnapshot = getQueryTableSnapshot();
if (tableSnapshot != null) {
if (tableSnapshot.getType() == TableSnapshot.VersionType.VERSION) {
throw new UserException("Hudi does not support `FOR VERSION AS OF`, please use `FOR TIME AS OF`");
}
queryInstant = tableSnapshot.getTime().replaceAll("[-: ]", "");
snapshotTimestamp = Option.of(queryInstant);
} else {
Option<HoodieInstant> snapshotInstant = timeline.lastInstant();

View File

@ -274,10 +274,7 @@ public class IcebergScanNode extends FileQueryScanNode {
}
public Long getSpecifiedSnapshot() throws UserException {
TableSnapshot tableSnapshot = source.getDesc().getRef().getTableSnapshot();
if (tableSnapshot == null) {
tableSnapshot = this.tableSnapshot;
}
TableSnapshot tableSnapshot = getQueryTableSnapshot();
if (tableSnapshot != null) {
TableSnapshot.VersionType type = tableSnapshot.getType();
try {
@ -440,8 +437,4 @@ public class IcebergScanNode extends FileQueryScanNode {
return super.getNodeExplainString(prefix, detailLevel)
+ String.format("%sicebergPredicatePushdown=\n%s\n", prefix, sb);
}
public void setTableSnapshot(TableSnapshot tableSnapshot) {
this.tableSnapshot = tableSnapshot;
}
}

View File

@ -49,6 +49,7 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.es.source.EsScanNode;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
@ -571,10 +572,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
switch (((HMSExternalTable) table).getDlaType()) {
case ICEBERG:
scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
IcebergScanNode icebergScanNode = (IcebergScanNode) scanNode;
if (fileScan.getTableSnapshot().isPresent()) {
icebergScanNode.setTableSnapshot(fileScan.getTableSnapshot().get());
}
break;
case HIVE:
scanNode = new HiveScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
@ -590,9 +587,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
}
} else if (table instanceof IcebergExternalTable) {
scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
if (fileScan.getTableSnapshot().isPresent()) {
((IcebergScanNode) scanNode).setTableSnapshot(fileScan.getTableSnapshot().get());
}
} else if (table instanceof PaimonExternalTable) {
scanNode = new PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
} else if (table instanceof MaxComputeExternalTable) {
@ -600,6 +594,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
} else {
throw new RuntimeException("do not support table type " + table.getType());
}
if (fileScan.getTableSnapshot().isPresent() && scanNode instanceof FileQueryScanNode) {
((FileQueryScanNode) scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get());
}
return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, table, tupleDescriptor);
}
@ -661,7 +658,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan;
ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(), tupleDescriptor, false,
hudiScan.getScanParams(), hudiScan.getIncrementalRelation());
if (fileScan.getTableSnapshot().isPresent()) {
((FileQueryScanNode) scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get());
}
return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, table, tupleDescriptor);
}

View File

@ -34,6 +34,7 @@ import org.apache.doris.analysis.PredicateUtils;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
@ -97,6 +98,8 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
// create a mapping between output slot's id and project expr
Map<SlotId, Expr> outputSlotToProjectExpr = new HashMap<>();
protected TableSnapshot tableSnapshot;
public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType) {
super(id, desc.getId().asList(), planNodeName, statisticalType);
this.desc = desc;