[Improvement](multi-catalog) The interface of external Splitter. WIP (#17390)

This PR introduces a Splitter interface for external tables.
The Splitter interface contains a single method, getSplits, which QueryScanProvider uses to obtain the external file splits.
For Hive/Iceberg/TVF, a split is a file block; for ES, it is a shard.
This PR also moves the getSplits logic from FileScanProviderIf into the new Splitter interface.
In the future, we may unify internal tables under the same interface as well.
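To make the shape of the new interface concrete, here is a minimal, self-contained sketch of the pattern: a Splitter produces Split objects, and the scan provider delegates to whichever Splitter it was constructed with. The Doris types (Expr, UserException, the real Split/FileSplit classes) are replaced by trivial stand-ins, so every name below is illustrative only, not the actual Doris API.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-ins for the Doris types used by the real interface.
class Expr {}                 // a predicate pushed down to the splitter
class Split {                 // one unit of scan work (file block, shard, ...)
    final String path;
    final long start;
    final long length;
    Split(String path, long start, long length) {
        this.path = path;
        this.start = start;
        this.length = length;
    }
}

// The pattern introduced by this PR: one method that returns the splits to scan.
interface Splitter {
    List<Split> getSplits(List<Expr> exprs) throws Exception;
}

// A toy splitter that cuts a single "file" into fixed-size blocks, roughly what
// a Hive/Iceberg/TVF splitter does with real file metadata.
class FixedSizeFileSplitter implements Splitter {
    private final String path;
    private final long fileSize;
    private final long blockSize;
    FixedSizeFileSplitter(String path, long fileSize, long blockSize) {
        this.path = path;
        this.fileSize = fileSize;
        this.blockSize = blockSize;
    }
    @Override
    public List<Split> getSplits(List<Expr> exprs) {
        List<Split> splits = new ArrayList<>();
        for (long start = 0; start < fileSize; start += blockSize) {
            splits.add(new Split(path, start, Math.min(blockSize, fileSize - start)));
        }
        return splits;
    }
}

// The scan provider no longer lists files itself; it only consumes splits.
class ToyScanProvider {
    private final Splitter splitter;
    ToyScanProvider(Splitter splitter) {
        this.splitter = splitter;
    }
    void createScanRanges(List<Expr> conjuncts) throws Exception {
        for (Split s : splitter.getSplits(conjuncts)) {
            System.out.println(s.path + " [" + s.start + ", " + (s.start + s.length) + ")");
        }
    }
}

public class SplitterSketch {
    public static void main(String[] args) throws Exception {
        Splitter splitter = new FixedSizeFileSplitter("hdfs://nn/warehouse/t/part-0.orc",
                300L * 1024 * 1024, 128L * 1024 * 1024);
        new ToyScanProvider(splitter).createScanRanges(new ArrayList<>());
    }
}
```

In the diff below, this is the role HiveSplitter, TVFSplitter, and IcebergSplitter play: each scan provider constructs its own Splitter in its constructor, and QueryScanProvider.createScanRangeLocations calls splitter.getSplits(context.conjuncts) instead of implementing getSplits itself.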
Author: Jibing-Li, 2023-03-12 20:11:08 +08:00 (committed by GitHub)
Commit: 0d05e4cce0, parent: 455c800405
17 changed files with 571 additions and 341 deletions


@ -38,6 +38,7 @@ import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@ -59,7 +60,6 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.parquet.Strings;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
@ -250,6 +250,8 @@ public class HiveMetaStoreCache {
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
InputSplit[] splits;
String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
// TODO: Implement getSplits logic by ourselves, don't call inputFormat.getSplits anymore.
if (!Strings.isNullOrEmpty(remoteUser)) {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
splits = ugi.doAs(


@ -0,0 +1,31 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.analysis.Expr;
import org.apache.doris.common.UserException;
import java.util.List;
public class OlapSplitter implements Splitter {
@Override
public List<Split> getSplits(List<Expr> exprs) throws UserException {
return null;
}
}


@ -15,19 +15,17 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner.external;
package org.apache.doris.planner;
import lombok.Data;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
@Data
public class HiveSplit extends FileSplit {
public HiveSplit() {}
public abstract class Split {
protected String[] hosts;
public HiveSplit(Path file, long start, long length, String[] hosts) {
super(file, start, length, hosts);
public Split() {}
public Split(String[] hosts) {
this.hosts = hosts;
}
protected TableFormatType tableFormatType;
}


@ -0,0 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.analysis.Expr;
import org.apache.doris.common.UserException;
import java.util.List;
public interface Splitter {
List<Split> getSplits(List<Expr> exprs) throws UserException;
}


@ -18,7 +18,6 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
@ -28,9 +27,6 @@ import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.hadoop.mapred.InputSplit;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@ -41,9 +37,6 @@ public interface FileScanProviderIf {
// Return S3/HDFS, etc.
TFileType getLocationType() throws DdlException, MetaNotFoundException;
// Return file list
List<InputSplit> getSplits(List<Expr> exprs) throws IOException, UserException;
// return properties for S3/HDFS, etc.
Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException;


@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner.external;
import org.apache.doris.planner.Split;
import lombok.Data;
import org.apache.hadoop.fs.Path;
@Data
public class FileSplit extends Split {
protected Path path;
protected long start;
protected long length;
protected TableFormatType tableFormatType;
public FileSplit() {}
public FileSplit(Path path, long start, long length, String[] hosts) {
this.path = path;
this.start = start;
this.length = length;
this.hosts = hosts;
}
public String[] getHosts() {
if (this.hosts == null) {
return new String[]{};
} else {
return this.hosts;
}
}
}


@ -19,8 +19,6 @@ package org.apache.doris.planner.external;
import org.apache.doris.common.Config;
import org.apache.hadoop.mapred.FileSplit;
public class FileSplitStrategy {
private long totalSplitSize;
private int splitNum;


@ -18,29 +18,18 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HiveMetaStoreCache.HivePartitionValues;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.planner.ColumnRange;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
@ -49,17 +38,12 @@ import org.apache.doris.thrift.TFileScanSlotInfo;
import org.apache.doris.thrift.TFileTextScanRangeParams;
import org.apache.doris.thrift.TFileType;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@ -86,6 +70,7 @@ public class HiveScanProvider extends HMSTableScanProvider {
this.hmsTable = hmsTable;
this.desc = desc;
this.columnNameToRange = columnNameToRange;
this.splitter = new HiveSplitter(hmsTable, columnNameToRange);
}
@Override
@ -138,84 +123,12 @@ public class HiveScanProvider extends HMSTableScanProvider {
return hmsTable.getMetastoreUri();
}
@Override
public List<InputSplit> getSplits(List<Expr> exprs) throws UserException {
long start = System.currentTimeMillis();
try {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
// 1. get ListPartitionItems from cache
HivePartitionValues hivePartitionValues = null;
List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
if (!partitionColumnTypes.isEmpty()) {
hivePartitionValues = cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(),
partitionColumnTypes);
}
List<InputSplit> allFiles = Lists.newArrayList();
if (hivePartitionValues != null) {
// 2. prune partitions by expr
Map<Long, PartitionItem> idToPartitionItem = hivePartitionValues.getIdToPartitionItem();
this.totalPartitionNum = idToPartitionItem.size();
ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(idToPartitionItem,
hmsTable.getPartitionColumns(), columnNameToRange,
hivePartitionValues.getUidToPartitionRange(),
hivePartitionValues.getRangeToId(),
hivePartitionValues.getSingleColumnRangeMap(),
true);
Collection<Long> filteredPartitionIds = pruner.prune();
this.readPartitionNum = filteredPartitionIds.size();
LOG.debug("hive partition fetch and prune for table {}.{} cost: {} ms",
hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start));
// 3. get partitions from cache
List<List<String>> partitionValuesList = Lists.newArrayListWithCapacity(filteredPartitionIds.size());
for (Long id : filteredPartitionIds) {
ListPartitionItem listPartitionItem = (ListPartitionItem) idToPartitionItem.get(id);
partitionValuesList.add(listPartitionItem.getItems().get(0).getPartitionValuesAsStringList());
}
List<HivePartition> partitions = cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(),
partitionValuesList);
// 4. get all files of partitions
getFileSplitByPartitions(cache, partitions, allFiles);
} else {
// unpartitioned table, create a dummy partition to save location and inputformat,
// so that we can unify the interface.
HivePartition dummyPartition = new HivePartition(hmsTable.getRemoteTable().getSd().getInputFormat(),
hmsTable.getRemoteTable().getSd().getLocation(), null);
getFileSplitByPartitions(cache, Lists.newArrayList(dummyPartition), allFiles);
this.totalPartitionNum = 1;
this.readPartitionNum = 1;
}
LOG.debug("get #{} files for table: {}.{}, cost: {} ms",
allFiles.size(), hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start));
return allFiles;
} catch (Throwable t) {
LOG.warn("get file split failed for table: {}", hmsTable.getName(), t);
throw new UserException(
"get file split failed for table: " + hmsTable.getName() + ", err: " + Util.getRootCauseMessage(t),
t);
}
}
private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
List<InputSplit> allFiles) {
List<InputSplit> files = cache.getFilesByPartitions(partitions);
if (LOG.isDebugEnabled()) {
LOG.debug("get #{} files from #{} partitions: {}", files.size(), partitions.size(),
Joiner.on(",")
.join(files.stream().limit(10).map(f -> ((FileSplit) f).getPath())
.collect(Collectors.toList())));
}
allFiles.addAll(files);
}
public int getTotalPartitionNum() {
return totalPartitionNum;
return ((HiveSplitter) splitter).getTotalPartitionNum();
}
public int getReadPartitionNum() {
return readPartitionNum;
return ((HiveSplitter) splitter).getReadPartitionNum();
}
@Override


@ -0,0 +1,155 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Expr;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.planner.ColumnRange;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.Split;
import org.apache.doris.planner.Splitter;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class HiveSplitter implements Splitter {
private static final Logger LOG = LogManager.getLogger(HiveSplitter.class);
private HMSExternalTable hmsTable;
private Map<String, ColumnRange> columnNameToRange;
private int totalPartitionNum = 0;
private int readPartitionNum = 0;
public HiveSplitter(HMSExternalTable hmsTable, Map<String, ColumnRange> columnNameToRange) {
this.hmsTable = hmsTable;
this.columnNameToRange = columnNameToRange;
}
@Override
public List<Split> getSplits(List<Expr> exprs) throws UserException {
long start = System.currentTimeMillis();
try {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
// 1. get ListPartitionItems from cache
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = null;
List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
if (!partitionColumnTypes.isEmpty()) {
hivePartitionValues = cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(),
partitionColumnTypes);
}
List<Split> allFiles = Lists.newArrayList();
if (hivePartitionValues != null) {
// 2. prune partitions by expr
Map<Long, PartitionItem> idToPartitionItem = hivePartitionValues.getIdToPartitionItem();
this.totalPartitionNum = idToPartitionItem.size();
ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(idToPartitionItem,
hmsTable.getPartitionColumns(), columnNameToRange,
hivePartitionValues.getUidToPartitionRange(),
hivePartitionValues.getRangeToId(),
hivePartitionValues.getSingleColumnRangeMap(),
true);
Collection<Long> filteredPartitionIds = pruner.prune();
this.readPartitionNum = filteredPartitionIds.size();
LOG.debug("hive partition fetch and prune for table {}.{} cost: {} ms",
hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start));
// 3. get partitions from cache
List<List<String>> partitionValuesList = Lists.newArrayListWithCapacity(filteredPartitionIds.size());
for (Long id : filteredPartitionIds) {
ListPartitionItem listPartitionItem = (ListPartitionItem) idToPartitionItem.get(id);
partitionValuesList.add(listPartitionItem.getItems().get(0).getPartitionValuesAsStringList());
}
List<HivePartition> partitions = cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(),
partitionValuesList);
// 4. get all files of partitions
getFileSplitByPartitions(cache, partitions, allFiles);
} else {
// For an unpartitioned table, create a dummy partition to hold the location and input format,
// so that we can unify the interface.
HivePartition dummyPartition = new HivePartition(hmsTable.getRemoteTable().getSd().getInputFormat(),
hmsTable.getRemoteTable().getSd().getLocation(), null);
getFileSplitByPartitions(cache, Lists.newArrayList(dummyPartition), allFiles);
this.totalPartitionNum = 1;
this.readPartitionNum = 1;
}
LOG.debug("get #{} files for table: {}.{}, cost: {} ms",
allFiles.size(), hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start));
return allFiles;
} catch (Throwable t) {
LOG.warn("get file split failed for table: {}", hmsTable.getName(), t);
throw new UserException(
"get file split failed for table: " + hmsTable.getName() + ", err: " + Util.getRootCauseMessage(t),
t);
}
}
private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
List<Split> allFiles) {
List<InputSplit> files = cache.getFilesByPartitions(partitions);
if (LOG.isDebugEnabled()) {
LOG.debug("get #{} files from #{} partitions: {}", files.size(), partitions.size(),
Joiner.on(",")
.join(files.stream().limit(10).map(f -> ((FileSplit) f).getPath())
.collect(Collectors.toList())));
}
allFiles.addAll(files.stream().map(file -> {
FileSplit fs = (FileSplit) file;
org.apache.doris.planner.external.FileSplit split = new org.apache.doris.planner.external.FileSplit();
split.setPath(fs.getPath());
split.setStart(fs.getStart());
// The file size of ORC files reported by FileSplit.getLength() is not correct;
// the broker reader needs the correct file size.
if (fs instanceof OrcSplit) {
split.setLength(((OrcSplit) fs).getFileLength());
} else {
split.setLength(fs.getLength());
}
return split;
}).collect(Collectors.toList()));
}
public int getTotalPartitionNum() {
return totalPartitionNum;
}
public int getReadPartitionNum() {
return readPartitionNum;
}
}


@ -0,0 +1,154 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.external.iceberg.util.IcebergUtils;
import org.apache.doris.planner.Split;
import org.apache.doris.planner.Splitter;
import org.apache.doris.planner.external.iceberg.IcebergDeleteFileFilter;
import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
import org.apache.doris.planner.external.iceberg.IcebergSource;
import org.apache.doris.planner.external.iceberg.IcebergSplit;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.types.Conversions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
public class IcebergSplitter implements Splitter {
private static final Logger LOG = LogManager.getLogger(IcebergSplitter.class);
private final IcebergSource icebergSource;
private final Analyzer analyzer;
public IcebergSplitter(IcebergSource icebergSource, Analyzer analyzer) {
this.icebergSource = icebergSource;
this.analyzer = analyzer;
}
@Override
public List<Split> getSplits(List<Expr> exprs) throws UserException {
List<Expression> expressions = new ArrayList<>();
org.apache.iceberg.Table table = icebergSource.getIcebergTable();
for (Expr conjunct : exprs) {
Expression expression = IcebergUtils.convertToIcebergExpr(conjunct, table.schema());
if (expression != null) {
expressions.add(expression);
}
}
TableScan scan = table.newScan();
TableSnapshot tableSnapshot = icebergSource.getDesc().getRef().getTableSnapshot();
if (tableSnapshot != null) {
TableSnapshot.VersionType type = tableSnapshot.getType();
try {
if (type == TableSnapshot.VersionType.VERSION) {
scan = scan.useSnapshot(tableSnapshot.getVersion());
} else {
long snapshotId = TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
scan = scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId));
}
} catch (IllegalArgumentException e) {
throw new UserException(e);
}
}
for (Expression predicate : expressions) {
scan = scan.filter(predicate);
}
List<Split> splits = new ArrayList<>();
int formatVersion = ((BaseTable) table).operations().current().formatVersion();
for (FileScanTask task : scan.planFiles()) {
for (FileScanTask splitTask : task.split(128 * 1024 * 1024)) {
String dataFilePath = splitTask.file().path().toString();
IcebergSplit split = new IcebergSplit(new Path(dataFilePath), splitTask.start(),
splitTask.length(), new String[0]);
split.setFormatVersion(formatVersion);
if (formatVersion >= IcebergScanProvider.MIN_DELETE_FILE_SUPPORT_VERSION) {
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
}
split.setTableFormatType(TableFormatType.ICEBERG);
split.setAnalyzer(analyzer);
splits.add(split);
}
}
return splits;
}
public static long getSnapshotIdAsOfTime(List<HistoryEntry> historyEntries, long asOfTimestamp) {
// find history at or before asOfTimestamp
HistoryEntry latestHistory = null;
for (HistoryEntry entry : historyEntries) {
if (entry.timestampMillis() <= asOfTimestamp) {
if (latestHistory == null) {
latestHistory = entry;
continue;
}
if (entry.timestampMillis() > latestHistory.timestampMillis()) {
latestHistory = entry;
}
}
}
if (latestHistory == null) {
throw new NotFoundException("No version history at or before "
+ Instant.ofEpochMilli(asOfTimestamp));
}
return latestHistory.snapshotId();
}
private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask splitTask) {
List<IcebergDeleteFileFilter> filters = new ArrayList<>();
for (DeleteFile delete : splitTask.deletes()) {
if (delete.content() == FileContent.POSITION_DELETES) {
ByteBuffer lowerBoundBytes = delete.lowerBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
Optional<Long> positionLowerBound = Optional.ofNullable(lowerBoundBytes)
.map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
ByteBuffer upperBoundBytes = delete.upperBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
Optional<Long> positionUpperBound = Optional.ofNullable(upperBoundBytes)
.map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(),
positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L)));
} else if (delete.content() == FileContent.EQUALITY_DELETES) {
// todo: filters.add(IcebergDeleteFileFilter.createEqualityDelete(delete.path().toString(),
// delete.equalityFieldIds()));
throw new IllegalStateException("Don't support equality delete file");
} else {
throw new IllegalStateException("Unknown delete content: " + delete.content());
}
}
return filters;
}
}


@ -18,7 +18,6 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.SlotRef;
@ -51,9 +50,7 @@ import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.mapred.InputSplit;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@ -77,11 +74,6 @@ public class LoadScanProvider implements FileScanProviderIf {
return null;
}
@Override
public List<InputSplit> getSplits(List<Expr> exprs) throws IOException, UserException {
return null;
}
@Override
public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException {
return null;


@ -24,6 +24,8 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.planner.Split;
import org.apache.doris.planner.Splitter;
import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
import org.apache.doris.planner.external.iceberg.IcebergSplit;
@ -42,13 +44,9 @@ import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.base.Joiner;
import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@ -56,6 +54,7 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
public static final Logger LOG = LogManager.getLogger(QueryScanProvider.class);
private int inputSplitNum = 0;
private long inputFileSize = 0;
protected Splitter splitter;
public abstract TFileAttributes getFileAttributes() throws UserException;
@ -63,93 +62,83 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
public void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy,
List<TScanRangeLocations> scanRangeLocations) throws UserException {
long start = System.currentTimeMillis();
try {
List<InputSplit> inputSplits = getSplits(context.conjuncts);
this.inputSplitNum = inputSplits.size();
if (inputSplits.isEmpty()) {
return;
}
InputSplit inputSplit = inputSplits.get(0);
TFileType locationType = getLocationType();
context.params.setFileType(locationType);
TFileFormatType fileFormatType = getFileFormatType();
context.params.setFormatType(getFileFormatType());
if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN || fileFormatType == TFileFormatType.FORMAT_JSON) {
context.params.setFileAttributes(getFileAttributes());
}
// set hdfs params for hdfs file type.
Map<String, String> locationProperties = getLocationProperties();
if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
String fsName = "";
if (this instanceof TVFScanProvider) {
fsName = ((TVFScanProvider) this).getFsName();
} else {
String fullPath = ((FileSplit) inputSplit).getPath().toUri().toString();
String filePath = ((FileSplit) inputSplit).getPath().toUri().getPath();
// eg:
// hdfs://namenode
// s3://buckets
fsName = fullPath.replace(filePath, "");
}
THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
tHdfsParams.setFsName(fsName);
context.params.setHdfsParams(tHdfsParams);
if (locationType == TFileType.FILE_BROKER) {
FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
if (broker == null) {
throw new UserException("No alive broker.");
}
context.params.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port));
}
} else if (locationType == TFileType.FILE_S3) {
context.params.setProperties(locationProperties);
}
TScanRangeLocations curLocations = newLocations(context.params, backendPolicy);
FileSplitStrategy fileSplitStrategy = new FileSplitStrategy();
for (InputSplit split : inputSplits) {
FileSplit fileSplit = (FileSplit) split;
List<String> pathPartitionKeys = getPathPartitionKeys();
List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
pathPartitionKeys, false);
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
// external data lake table
if (split instanceof IcebergSplit) {
IcebergScanProvider.setIcebergParams(rangeDesc, (IcebergSplit) split);
}
// file size of orc files is not correct get by FileSplit.getLength(),
// broker reader needs correct file size
if (locationType == TFileType.FILE_BROKER && fileFormatType == TFileFormatType.FORMAT_ORC) {
rangeDesc.setFileSize(((OrcSplit) fileSplit).getFileLength());
}
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}",
curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(),
fileSplit.getLength(), Joiner.on("|").join(split.getLocations()));
fileSplitStrategy.update(fileSplit);
// Add a new location when it's can be split
if (fileSplitStrategy.hasNext()) {
scanRangeLocations.add(curLocations);
curLocations = newLocations(context.params, backendPolicy);
fileSplitStrategy.next();
}
this.inputFileSize += fileSplit.getLength();
}
if (curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize() > 0) {
scanRangeLocations.add(curLocations);
}
LOG.debug("create #{} ScanRangeLocations cost: {} ms",
scanRangeLocations.size(), (System.currentTimeMillis() - start));
} catch (IOException e) {
throw new UserException(e);
List<Split> inputSplits = splitter.getSplits(context.conjuncts);
this.inputSplitNum = inputSplits.size();
if (inputSplits.isEmpty()) {
return;
}
FileSplit inputSplit = (FileSplit) inputSplits.get(0);
TFileType locationType = getLocationType();
context.params.setFileType(locationType);
TFileFormatType fileFormatType = getFileFormatType();
context.params.setFormatType(getFileFormatType());
if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN || fileFormatType == TFileFormatType.FORMAT_JSON) {
context.params.setFileAttributes(getFileAttributes());
}
// set hdfs params for hdfs file type.
Map<String, String> locationProperties = getLocationProperties();
if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
String fsName = "";
if (this instanceof TVFScanProvider) {
fsName = ((TVFScanProvider) this).getFsName();
} else {
String fullPath = inputSplit.getPath().toUri().toString();
String filePath = inputSplit.getPath().toUri().getPath();
// eg:
// hdfs://namenode
// s3://buckets
fsName = fullPath.replace(filePath, "");
}
THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
tHdfsParams.setFsName(fsName);
context.params.setHdfsParams(tHdfsParams);
if (locationType == TFileType.FILE_BROKER) {
FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
if (broker == null) {
throw new UserException("No alive broker.");
}
context.params.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port));
}
} else if (locationType == TFileType.FILE_S3) {
context.params.setProperties(locationProperties);
}
TScanRangeLocations curLocations = newLocations(context.params, backendPolicy);
FileSplitStrategy fileSplitStrategy = new FileSplitStrategy();
for (Split split : inputSplits) {
FileSplit fileSplit = (FileSplit) split;
List<String> pathPartitionKeys = getPathPartitionKeys();
List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
pathPartitionKeys, false);
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
// external data lake table
if (fileSplit instanceof IcebergSplit) {
IcebergScanProvider.setIcebergParams(rangeDesc, (IcebergSplit) fileSplit);
}
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}",
curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(),
fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts()));
fileSplitStrategy.update(fileSplit);
// Add a new location when it can be split
if (fileSplitStrategy.hasNext()) {
scanRangeLocations.add(curLocations);
curLocations = newLocations(context.params, backendPolicy);
fileSplitStrategy.next();
}
this.inputFileSize += fileSplit.getLength();
}
if (curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize() > 0) {
scanRangeLocations.add(curLocations);
}
LOG.debug("create #{} ScanRangeLocations cost: {} ms",
scanRangeLocations.size(), (System.currentTimeMillis() - start));
}
@Override


@ -18,7 +18,6 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
@ -30,7 +29,6 @@ import org.apache.doris.common.UserException;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileScanRangeParams;
@ -38,11 +36,7 @@ import org.apache.doris.thrift.TFileScanSlotInfo;
import org.apache.doris.thrift.TFileType;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@ -56,6 +50,7 @@ public class TVFScanProvider extends QueryScanProvider {
this.tvfTable = tvfTable;
this.desc = desc;
this.tableValuedFunction = tableValuedFunction;
this.splitter = new TVFSplitter(tableValuedFunction);
}
public String getFsName() {
@ -80,18 +75,6 @@ public class TVFScanProvider extends QueryScanProvider {
return tableValuedFunction.getTFileType();
}
@Override
public List<InputSplit> getSplits(List<Expr> exprs) throws IOException, UserException {
List<InputSplit> splits = Lists.newArrayList();
List<TBrokerFileStatus> fileStatuses = tableValuedFunction.getFileStatuses();
for (TBrokerFileStatus fileStatus : fileStatuses) {
Path path = new Path(fileStatus.getPath());
FileSplit fileSplit = new FileSplit(path, 0, fileStatus.getSize(), new String[0]);
splits.add(fileSplit);
}
return splits;
}
@Override
public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException {
return tableValuedFunction.getLocationProperties();


@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Expr;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.Split;
import org.apache.doris.planner.Splitter;
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
import org.apache.doris.thrift.TBrokerFileStatus;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
public class TVFSplitter implements Splitter {
private static final Logger LOG = LogManager.getLogger(TVFSplitter.class);
private ExternalFileTableValuedFunction tableValuedFunction;
public TVFSplitter(ExternalFileTableValuedFunction tableValuedFunction) {
this.tableValuedFunction = tableValuedFunction;
}
@Override
public List<Split> getSplits(List<Expr> exprs) throws UserException {
List<Split> splits = Lists.newArrayList();
List<TBrokerFileStatus> fileStatuses = tableValuedFunction.getFileStatuses();
for (TBrokerFileStatus fileStatus : fileStatuses) {
Path path = new Path(fileStatus.getPath());
Split split = new FileSplit(path, 0, fileStatus.getSize(), new String[0]);
splits.add(split);
}
return splits;
}
}


@ -18,6 +18,7 @@
package org.apache.doris.planner.external;
public enum TableFormatType {
HIVE("hive"),
ICEBERG("iceberg"),
HUDI("hudi");


@ -18,18 +18,14 @@
package org.apache.doris.planner.external.iceberg;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.external.iceberg.util.IcebergUtils;
import org.apache.doris.planner.external.ExternalFileScanNode;
import org.apache.doris.planner.external.IcebergSplitter;
import org.apache.doris.planner.external.QueryScanProvider;
import org.apache.doris.planner.external.TableFormatType;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
@ -38,26 +34,11 @@ import org.apache.doris.thrift.TIcebergDeleteFileDesc;
import org.apache.doris.thrift.TIcebergFileDesc;
import org.apache.doris.thrift.TTableFormatFileDesc;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.types.Conversions;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;
@ -66,17 +47,17 @@ import java.util.stream.Collectors;
*/
public class IcebergScanProvider extends QueryScanProvider {
private static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
private final Analyzer analyzer;
private final IcebergSource icebergSource;
public IcebergScanProvider(IcebergSource icebergSource, Analyzer analyzer) {
this.icebergSource = icebergSource;
this.analyzer = analyzer;
this.splitter = new IcebergSplitter(icebergSource, analyzer);
}
public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit)
throws UserException {
public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
TIcebergFileDesc fileDesc = new TIcebergFileDesc();
@ -139,97 +120,6 @@ public class IcebergScanProvider extends QueryScanProvider {
+ " for hms table " + icebergSource.getIcebergTable().name());
}
@Override
public List<InputSplit> getSplits(List<Expr> exprs) throws UserException {
List<Expression> expressions = new ArrayList<>();
org.apache.iceberg.Table table = icebergSource.getIcebergTable();
for (Expr conjunct : exprs) {
Expression expression = IcebergUtils.convertToIcebergExpr(conjunct, table.schema());
if (expression != null) {
expressions.add(expression);
}
}
TableScan scan = table.newScan();
TableSnapshot tableSnapshot = icebergSource.getDesc().getRef().getTableSnapshot();
if (tableSnapshot != null) {
TableSnapshot.VersionType type = tableSnapshot.getType();
try {
if (type == TableSnapshot.VersionType.VERSION) {
scan = scan.useSnapshot(tableSnapshot.getVersion());
} else {
long snapshotId = TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
scan = scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId));
}
} catch (IllegalArgumentException e) {
throw new UserException(e);
}
}
for (Expression predicate : expressions) {
scan = scan.filter(predicate);
}
List<InputSplit> splits = new ArrayList<>();
int formatVersion = ((BaseTable) table).operations().current().formatVersion();
for (FileScanTask task : scan.planFiles()) {
for (FileScanTask splitTask : task.split(128 * 1024 * 1024)) {
String dataFilePath = splitTask.file().path().toString();
IcebergSplit split = new IcebergSplit(new Path(dataFilePath), splitTask.start(),
splitTask.length(), new String[0]);
split.setFormatVersion(formatVersion);
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
}
split.setTableFormatType(TableFormatType.ICEBERG);
split.setAnalyzer(analyzer);
splits.add(split);
}
}
return splits;
}
public static long getSnapshotIdAsOfTime(List<HistoryEntry> historyEntries, long asOfTimestamp) {
// find history at or before asOfTimestamp
HistoryEntry latestHistory = null;
for (HistoryEntry entry : historyEntries) {
if (entry.timestampMillis() <= asOfTimestamp) {
if (latestHistory == null) {
latestHistory = entry;
continue;
}
if (entry.timestampMillis() > latestHistory.timestampMillis()) {
latestHistory = entry;
}
}
}
if (latestHistory == null) {
throw new NotFoundException("No version history at or before "
+ Instant.ofEpochMilli(asOfTimestamp));
}
return latestHistory.snapshotId();
}
private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask spitTask) {
List<IcebergDeleteFileFilter> filters = new ArrayList<>();
for (DeleteFile delete : spitTask.deletes()) {
if (delete.content() == FileContent.POSITION_DELETES) {
ByteBuffer lowerBoundBytes = delete.lowerBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
Optional<Long> positionLowerBound = Optional.ofNullable(lowerBoundBytes)
.map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
ByteBuffer upperBoundBytes = delete.upperBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
Optional<Long> positionUpperBound = Optional.ofNullable(upperBoundBytes)
.map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(),
positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L)));
} else if (delete.content() == FileContent.EQUALITY_DELETES) {
// todo: filters.add(IcebergDeleteFileFilter.createEqualityDelete(delete.path().toString(),
// delete.equalityFieldIds()));
throw new IllegalStateException("Don't support equality delete file");
} else {
throw new IllegalStateException("Unknown delete content: " + delete.content());
}
}
return filters;
}
@Override
public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
return icebergSource.getIcebergTable().spec().fields().stream().map(PartitionField::name)


@ -18,7 +18,7 @@
package org.apache.doris.planner.external.iceberg;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.planner.external.HiveSplit;
import org.apache.doris.planner.external.FileSplit;
import lombok.Data;
import org.apache.hadoop.fs.Path;
@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path;
import java.util.List;
@Data
public class IcebergSplit extends HiveSplit {
public class IcebergSplit extends FileSplit {
public IcebergSplit(Path file, long start, long length, String[] hosts) {
super(file, start, length, hosts);
}