From 0d05e4cce019e88d5c5cdd99d5435643f9f8f71d Mon Sep 17 00:00:00 2001
From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com>
Date: Sun, 12 Mar 2023 20:11:08 +0800
Subject: [PATCH] [Improvement](multi-catalog) The interface of external Splitter. WIP (#17390)

This PR introduces a Splitter interface for external tables. The Splitter
interface contains one method, getSplits, which QueryScanProvider uses to get
the splits of an external table. For Hive/Iceberg/TVF, a split is a file
block; for ES, it is a shard.
This PR also moves the getSplits logic from FileScanProviderIf to the new
Splitter interface. In the future, we may unify internal tables as well.
---
 .../datasource/hive/HiveMetaStoreCache.java   |   4 +-
 .../apache/doris/planner/OlapSplitter.java    |  31 ++
 .../{external/HiveSplit.java => Split.java}   |  16 +-
 .../org/apache/doris/planner/Splitter.java    |  27 +++
 .../planner/external/FileScanProviderIf.java  |   7 -
 .../doris/planner/external/FileSplit.java     |  48 +++++
 .../planner/external/FileSplitStrategy.java   |   2 -
 .../planner/external/HiveScanProvider.java    |  93 +---------
 .../doris/planner/external/HiveSplitter.java  | 155 ++++++++++++++++
 .../planner/external/IcebergSplitter.java     | 154 ++++++++++++++++
 .../planner/external/LoadScanProvider.java    |   8 -
 .../planner/external/QueryScanProvider.java   | 169 ++++++++----------
 .../planner/external/TVFScanProvider.java     |  19 +-
 .../doris/planner/external/TVFSplitter.java   |  56 ++++++
 .../planner/external/TableFormatType.java     |   1 +
 .../external/iceberg/IcebergScanProvider.java | 118 +-----------
 .../external/iceberg/IcebergSplit.java        |   4 +-
 17 files changed, 571 insertions(+), 341 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/OlapSplitter.java
 rename fe/fe-core/src/main/java/org/apache/doris/planner/{external/HiveSplit.java => Split.java} (70%)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplitter.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 31b8d2f3b4..c2313eb885 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -38,6 +38,7 @@ import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;

 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -59,7 +60,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.apache.parquet.Strings;

 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
@@ -250,6 +250,8 @@ public class HiveMetaStoreCache {
             InputFormat inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
             InputSplit[] splits;
             String remoteUser =
jobConf.get(HdfsResource.HADOOP_USER_NAME); + + // TODO: Implement getSplits logic by ourselves, don't call inputFormat.getSplits anymore. if (!Strings.isNullOrEmpty(remoteUser)) { UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser); splits = ugi.doAs( diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapSplitter.java new file mode 100644 index 0000000000..f2d06a3aef --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapSplitter.java @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.common.UserException; + +import java.util.List; + +public class OlapSplitter implements Splitter { + + @Override + public List getSplits(List exprs) throws UserException { + return null; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Split.java similarity index 70% rename from fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java rename to fe/fe-core/src/main/java/org/apache/doris/planner/Split.java index 6c8f916a5e..63b837aacc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Split.java @@ -15,19 +15,17 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.planner.external; +package org.apache.doris.planner; import lombok.Data; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.FileSplit; @Data -public class HiveSplit extends FileSplit { - public HiveSplit() {} +public abstract class Split { + protected String[] hosts; - public HiveSplit(Path file, long start, long length, String[] hosts) { - super(file, start, length, hosts); + public Split() {} + + public Split(String[] hosts) { + this.hosts = hosts; } - - protected TableFormatType tableFormatType; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java new file mode 100644 index 0000000000..07952bff87 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.common.UserException; + +import java.util.List; + +public interface Splitter { + List getSplits(List exprs) throws UserException; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java index 8ae7952169..f962f4d827 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java @@ -18,7 +18,6 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.Expr; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; @@ -28,9 +27,6 @@ import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TScanRangeLocations; -import org.apache.hadoop.mapred.InputSplit; - -import java.io.IOException; import java.util.List; import java.util.Map; @@ -41,9 +37,6 @@ public interface FileScanProviderIf { // Return S3/HDSF, etc. TFileType getLocationType() throws DdlException, MetaNotFoundException; - // Return file list - List getSplits(List exprs) throws IOException, UserException; - // return properties for S3/HDFS, etc. Map getLocationProperties() throws MetaNotFoundException, DdlException; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java new file mode 100644 index 0000000000..a4e7bfae2f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.planner.external; + +import org.apache.doris.planner.Split; + +import lombok.Data; +import org.apache.hadoop.fs.Path; + +@Data +public class FileSplit extends Split { + protected Path path; + protected long start; + protected long length; + protected TableFormatType tableFormatType; + + public FileSplit() {} + + public FileSplit(Path path, long start, long length, String[] hosts) { + this.path = path; + this.start = start; + this.length = length; + this.hosts = hosts; + } + + public String[] getHosts() { + if (this.hosts == null) { + return new String[]{}; + } else { + return this.hosts; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java index 83a6d3d49c..e574aeb9d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java @@ -19,8 +19,6 @@ package org.apache.doris.planner.external; import org.apache.doris.common.Config; -import org.apache.hadoop.mapred.FileSplit; - public class FileSplitStrategy { private long totalSplitSize; private int splitNum; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java index f538bf3c3e..5b0baf93ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java @@ -18,29 +18,18 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HiveMetaStoreClientHelper; -import org.apache.doris.catalog.ListPartitionItem; -import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.TableIf; -import org.apache.doris.catalog.Type; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; -import org.apache.doris.common.util.Util; -import org.apache.doris.datasource.HMSExternalCatalog; -import org.apache.doris.datasource.hive.HiveMetaStoreCache; -import org.apache.doris.datasource.hive.HiveMetaStoreCache.HivePartitionValues; -import org.apache.doris.datasource.hive.HivePartition; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.planner.ColumnRange; -import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; import org.apache.doris.thrift.TFileAttributes; import org.apache.doris.thrift.TFileFormatType; @@ -49,17 +38,12 @@ import org.apache.doris.thrift.TFileScanSlotInfo; import org.apache.doris.thrift.TFileTextScanRangeParams; import org.apache.doris.thrift.TFileType; -import com.google.common.base.Joiner; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.mapred.FileSplit; -import 
org.apache.hadoop.mapred.InputSplit; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -86,6 +70,7 @@ public class HiveScanProvider extends HMSTableScanProvider { this.hmsTable = hmsTable; this.desc = desc; this.columnNameToRange = columnNameToRange; + this.splitter = new HiveSplitter(hmsTable, columnNameToRange); } @Override @@ -138,84 +123,12 @@ public class HiveScanProvider extends HMSTableScanProvider { return hmsTable.getMetastoreUri(); } - @Override - public List getSplits(List exprs) throws UserException { - long start = System.currentTimeMillis(); - try { - HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() - .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog()); - // 1. get ListPartitionItems from cache - HivePartitionValues hivePartitionValues = null; - List partitionColumnTypes = hmsTable.getPartitionColumnTypes(); - if (!partitionColumnTypes.isEmpty()) { - hivePartitionValues = cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(), - partitionColumnTypes); - } - - List allFiles = Lists.newArrayList(); - if (hivePartitionValues != null) { - // 2. prune partitions by expr - Map idToPartitionItem = hivePartitionValues.getIdToPartitionItem(); - this.totalPartitionNum = idToPartitionItem.size(); - ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(idToPartitionItem, - hmsTable.getPartitionColumns(), columnNameToRange, - hivePartitionValues.getUidToPartitionRange(), - hivePartitionValues.getRangeToId(), - hivePartitionValues.getSingleColumnRangeMap(), - true); - Collection filteredPartitionIds = pruner.prune(); - this.readPartitionNum = filteredPartitionIds.size(); - LOG.debug("hive partition fetch and prune for table {}.{} cost: {} ms", - hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start)); - - // 3. get partitions from cache - List> partitionValuesList = Lists.newArrayListWithCapacity(filteredPartitionIds.size()); - for (Long id : filteredPartitionIds) { - ListPartitionItem listPartitionItem = (ListPartitionItem) idToPartitionItem.get(id); - partitionValuesList.add(listPartitionItem.getItems().get(0).getPartitionValuesAsStringList()); - } - List partitions = cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(), - partitionValuesList); - // 4. get all files of partitions - getFileSplitByPartitions(cache, partitions, allFiles); - } else { - // unpartitioned table, create a dummy partition to save location and inputformat, - // so that we can unify the interface. 
- HivePartition dummyPartition = new HivePartition(hmsTable.getRemoteTable().getSd().getInputFormat(), - hmsTable.getRemoteTable().getSd().getLocation(), null); - getFileSplitByPartitions(cache, Lists.newArrayList(dummyPartition), allFiles); - this.totalPartitionNum = 1; - this.readPartitionNum = 1; - } - LOG.debug("get #{} files for table: {}.{}, cost: {} ms", - allFiles.size(), hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start)); - return allFiles; - } catch (Throwable t) { - LOG.warn("get file split failed for table: {}", hmsTable.getName(), t); - throw new UserException( - "get file split failed for table: " + hmsTable.getName() + ", err: " + Util.getRootCauseMessage(t), - t); - } - } - - private void getFileSplitByPartitions(HiveMetaStoreCache cache, List partitions, - List allFiles) { - List files = cache.getFilesByPartitions(partitions); - if (LOG.isDebugEnabled()) { - LOG.debug("get #{} files from #{} partitions: {}", files.size(), partitions.size(), - Joiner.on(",") - .join(files.stream().limit(10).map(f -> ((FileSplit) f).getPath()) - .collect(Collectors.toList()))); - } - allFiles.addAll(files); - } - public int getTotalPartitionNum() { - return totalPartitionNum; + return ((HiveSplitter) splitter).getTotalPartitionNum(); } public int getReadPartitionNum() { - return readPartitionNum; + return ((HiveSplitter) splitter).getReadPartitionNum(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java new file mode 100644 index 0000000000..a49935b9ee --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java @@ -0,0 +1,155 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.ListPartitionItem; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.Type; +import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.HMSExternalCatalog; +import org.apache.doris.datasource.hive.HiveMetaStoreCache; +import org.apache.doris.datasource.hive.HivePartition; +import org.apache.doris.planner.ColumnRange; +import org.apache.doris.planner.ListPartitionPrunerV2; +import org.apache.doris.planner.Split; +import org.apache.doris.planner.Splitter; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.ql.io.orc.OrcSplit; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class HiveSplitter implements Splitter { + + private static final Logger LOG = LogManager.getLogger(HiveSplitter.class); + + private HMSExternalTable hmsTable; + private Map columnNameToRange; + private int totalPartitionNum = 0; + private int readPartitionNum = 0; + + public HiveSplitter(HMSExternalTable hmsTable, Map columnNameToRange) { + this.hmsTable = hmsTable; + this.columnNameToRange = columnNameToRange; + } + + @Override + public List getSplits(List exprs) throws UserException { + long start = System.currentTimeMillis(); + try { + HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() + .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog()); + // 1. get ListPartitionItems from cache + HiveMetaStoreCache.HivePartitionValues hivePartitionValues = null; + List partitionColumnTypes = hmsTable.getPartitionColumnTypes(); + if (!partitionColumnTypes.isEmpty()) { + hivePartitionValues = cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(), + partitionColumnTypes); + } + + List allFiles = Lists.newArrayList(); + if (hivePartitionValues != null) { + // 2. prune partitions by expr + Map idToPartitionItem = hivePartitionValues.getIdToPartitionItem(); + this.totalPartitionNum = idToPartitionItem.size(); + ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(idToPartitionItem, + hmsTable.getPartitionColumns(), columnNameToRange, + hivePartitionValues.getUidToPartitionRange(), + hivePartitionValues.getRangeToId(), + hivePartitionValues.getSingleColumnRangeMap(), + true); + Collection filteredPartitionIds = pruner.prune(); + this.readPartitionNum = filteredPartitionIds.size(); + LOG.debug("hive partition fetch and prune for table {}.{} cost: {} ms", + hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start)); + + // 3. get partitions from cache + List> partitionValuesList = Lists.newArrayListWithCapacity(filteredPartitionIds.size()); + for (Long id : filteredPartitionIds) { + ListPartitionItem listPartitionItem = (ListPartitionItem) idToPartitionItem.get(id); + partitionValuesList.add(listPartitionItem.getItems().get(0).getPartitionValuesAsStringList()); + } + List partitions = cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(), + partitionValuesList); + // 4. 
get all files of partitions + getFileSplitByPartitions(cache, partitions, allFiles); + } else { + // unpartitioned table, create a dummy partition to save location and inputformat, + // so that we can unify the interface. + HivePartition dummyPartition = new HivePartition(hmsTable.getRemoteTable().getSd().getInputFormat(), + hmsTable.getRemoteTable().getSd().getLocation(), null); + getFileSplitByPartitions(cache, Lists.newArrayList(dummyPartition), allFiles); + this.totalPartitionNum = 1; + this.readPartitionNum = 1; + } + LOG.debug("get #{} files for table: {}.{}, cost: {} ms", + allFiles.size(), hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start)); + return allFiles; + } catch (Throwable t) { + LOG.warn("get file split failed for table: {}", hmsTable.getName(), t); + throw new UserException( + "get file split failed for table: " + hmsTable.getName() + ", err: " + Util.getRootCauseMessage(t), + t); + } + } + + private void getFileSplitByPartitions(HiveMetaStoreCache cache, List partitions, + List allFiles) { + List files = cache.getFilesByPartitions(partitions); + if (LOG.isDebugEnabled()) { + LOG.debug("get #{} files from #{} partitions: {}", files.size(), partitions.size(), + Joiner.on(",") + .join(files.stream().limit(10).map(f -> ((FileSplit) f).getPath()) + .collect(Collectors.toList()))); + } + allFiles.addAll(files.stream().map(file -> { + FileSplit fs = (FileSplit) file; + org.apache.doris.planner.external.FileSplit split = new org.apache.doris.planner.external.FileSplit(); + split.setPath(fs.getPath()); + split.setStart(fs.getStart()); + // file size of orc files is not correct get by FileSplit.getLength(), + // broker reader needs correct file size + if (fs instanceof OrcSplit) { + split.setLength(((OrcSplit) fs).getFileLength()); + } else { + split.setLength(fs.getLength()); + } + return split; + }).collect(Collectors.toList())); + } + + public int getTotalPartitionNum() { + return totalPartitionNum; + } + + public int getReadPartitionNum() { + return readPartitionNum; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplitter.java new file mode 100644 index 0000000000..b595e95cc3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplitter.java @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.TableSnapshot; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.external.iceberg.util.IcebergUtils; +import org.apache.doris.planner.Split; +import org.apache.doris.planner.Splitter; +import org.apache.doris.planner.external.iceberg.IcebergDeleteFileFilter; +import org.apache.doris.planner.external.iceberg.IcebergScanProvider; +import org.apache.doris.planner.external.iceberg.IcebergSource; +import org.apache.doris.planner.external.iceberg.IcebergSplit; + +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.HistoryEntry; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.types.Conversions; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +public class IcebergSplitter implements Splitter { + private static final Logger LOG = LogManager.getLogger(IcebergSplitter.class); + + private final IcebergSource icebergSource; + private final Analyzer analyzer; + + public IcebergSplitter(IcebergSource icebergSource, Analyzer analyzer) { + this.icebergSource = icebergSource; + this.analyzer = analyzer; + } + + @Override + public List getSplits(List exprs) throws UserException { + List expressions = new ArrayList<>(); + org.apache.iceberg.Table table = icebergSource.getIcebergTable(); + for (Expr conjunct : exprs) { + Expression expression = IcebergUtils.convertToIcebergExpr(conjunct, table.schema()); + if (expression != null) { + expressions.add(expression); + } + } + TableScan scan = table.newScan(); + TableSnapshot tableSnapshot = icebergSource.getDesc().getRef().getTableSnapshot(); + if (tableSnapshot != null) { + TableSnapshot.VersionType type = tableSnapshot.getType(); + try { + if (type == TableSnapshot.VersionType.VERSION) { + scan = scan.useSnapshot(tableSnapshot.getVersion()); + } else { + long snapshotId = TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone()); + scan = scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId)); + } + } catch (IllegalArgumentException e) { + throw new UserException(e); + } + } + for (Expression predicate : expressions) { + scan = scan.filter(predicate); + } + List splits = new ArrayList<>(); + int formatVersion = ((BaseTable) table).operations().current().formatVersion(); + for (FileScanTask task : scan.planFiles()) { + for (FileScanTask splitTask : task.split(128 * 1024 * 1024)) { + String dataFilePath = splitTask.file().path().toString(); + IcebergSplit split = new IcebergSplit(new Path(dataFilePath), splitTask.start(), + splitTask.length(), new String[0]); + split.setFormatVersion(formatVersion); + if (formatVersion >= IcebergScanProvider.MIN_DELETE_FILE_SUPPORT_VERSION) { + split.setDeleteFileFilters(getDeleteFileFilters(splitTask)); + } + split.setTableFormatType(TableFormatType.ICEBERG); + split.setAnalyzer(analyzer); + splits.add(split); + } + } + return splits; + } + + public static long 
getSnapshotIdAsOfTime(List historyEntries, long asOfTimestamp) { + // find history at or before asOfTimestamp + HistoryEntry latestHistory = null; + for (HistoryEntry entry : historyEntries) { + if (entry.timestampMillis() <= asOfTimestamp) { + if (latestHistory == null) { + latestHistory = entry; + continue; + } + if (entry.timestampMillis() > latestHistory.timestampMillis()) { + latestHistory = entry; + } + } + } + if (latestHistory == null) { + throw new NotFoundException("No version history at or before " + + Instant.ofEpochMilli(asOfTimestamp)); + } + return latestHistory.snapshotId(); + } + + private List getDeleteFileFilters(FileScanTask spitTask) { + List filters = new ArrayList<>(); + for (DeleteFile delete : spitTask.deletes()) { + if (delete.content() == FileContent.POSITION_DELETES) { + ByteBuffer lowerBoundBytes = delete.lowerBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId()); + Optional positionLowerBound = Optional.ofNullable(lowerBoundBytes) + .map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes)); + ByteBuffer upperBoundBytes = delete.upperBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId()); + Optional positionUpperBound = Optional.ofNullable(upperBoundBytes) + .map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes)); + filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(), + positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L))); + } else if (delete.content() == FileContent.EQUALITY_DELETES) { + // todo: filters.add(IcebergDeleteFileFilter.createEqualityDelete(delete.path().toString(), + // delete.equalityFieldIds())); + throw new IllegalStateException("Don't support equality delete file"); + } else { + throw new IllegalStateException("Unknown delete content: " + delete.content()); + } + } + return filters; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java index d8e644e277..086191e94d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java @@ -18,7 +18,6 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ImportColumnDesc; import org.apache.doris.analysis.IntLiteral; import org.apache.doris.analysis.SlotRef; @@ -51,9 +50,7 @@ import org.apache.doris.thrift.TScanRangeLocations; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.hadoop.mapred.InputSplit; -import java.io.IOException; import java.util.List; import java.util.Map; @@ -77,11 +74,6 @@ public class LoadScanProvider implements FileScanProviderIf { return null; } - @Override - public List getSplits(List exprs) throws IOException, UserException { - return null; - } - @Override public Map getLocationProperties() throws MetaNotFoundException, DdlException { return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java index 1d25a3cb31..45e48c4ff5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java @@ 
-24,6 +24,8 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.planner.Split; +import org.apache.doris.planner.Splitter; import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; import org.apache.doris.planner.external.iceberg.IcebergScanProvider; import org.apache.doris.planner.external.iceberg.IcebergSplit; @@ -42,13 +44,9 @@ import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; import com.google.common.base.Joiner; -import org.apache.hadoop.hive.ql.io.orc.OrcSplit; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputSplit; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.IOException; import java.util.List; import java.util.Map; @@ -56,6 +54,7 @@ public abstract class QueryScanProvider implements FileScanProviderIf { public static final Logger LOG = LogManager.getLogger(QueryScanProvider.class); private int inputSplitNum = 0; private long inputFileSize = 0; + protected Splitter splitter; public abstract TFileAttributes getFileAttributes() throws UserException; @@ -63,93 +62,83 @@ public abstract class QueryScanProvider implements FileScanProviderIf { public void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy, List scanRangeLocations) throws UserException { long start = System.currentTimeMillis(); - try { - List inputSplits = getSplits(context.conjuncts); - this.inputSplitNum = inputSplits.size(); - if (inputSplits.isEmpty()) { - return; - } - InputSplit inputSplit = inputSplits.get(0); - TFileType locationType = getLocationType(); - context.params.setFileType(locationType); - TFileFormatType fileFormatType = getFileFormatType(); - context.params.setFormatType(getFileFormatType()); - if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN || fileFormatType == TFileFormatType.FORMAT_JSON) { - context.params.setFileAttributes(getFileAttributes()); - } - - // set hdfs params for hdfs file type. 
- Map locationProperties = getLocationProperties(); - if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) { - String fsName = ""; - if (this instanceof TVFScanProvider) { - fsName = ((TVFScanProvider) this).getFsName(); - } else { - String fullPath = ((FileSplit) inputSplit).getPath().toUri().toString(); - String filePath = ((FileSplit) inputSplit).getPath().toUri().getPath(); - // eg: - // hdfs://namenode - // s3://buckets - fsName = fullPath.replace(filePath, ""); - } - THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties); - tHdfsParams.setFsName(fsName); - context.params.setHdfsParams(tHdfsParams); - - if (locationType == TFileType.FILE_BROKER) { - FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker(); - if (broker == null) { - throw new UserException("No alive broker."); - } - context.params.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port)); - } - } else if (locationType == TFileType.FILE_S3) { - context.params.setProperties(locationProperties); - } - TScanRangeLocations curLocations = newLocations(context.params, backendPolicy); - - FileSplitStrategy fileSplitStrategy = new FileSplitStrategy(); - - for (InputSplit split : inputSplits) { - FileSplit fileSplit = (FileSplit) split; - List pathPartitionKeys = getPathPartitionKeys(); - List partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), - pathPartitionKeys, false); - - TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys); - // external data lake table - if (split instanceof IcebergSplit) { - IcebergScanProvider.setIcebergParams(rangeDesc, (IcebergSplit) split); - } - - // file size of orc files is not correct get by FileSplit.getLength(), - // broker reader needs correct file size - if (locationType == TFileType.FILE_BROKER && fileFormatType == TFileFormatType.FORMAT_ORC) { - rangeDesc.setFileSize(((OrcSplit) fileSplit).getFileLength()); - } - - curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); - LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}", - curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(), - fileSplit.getLength(), Joiner.on("|").join(split.getLocations())); - - fileSplitStrategy.update(fileSplit); - // Add a new location when it's can be split - if (fileSplitStrategy.hasNext()) { - scanRangeLocations.add(curLocations); - curLocations = newLocations(context.params, backendPolicy); - fileSplitStrategy.next(); - } - this.inputFileSize += fileSplit.getLength(); - } - if (curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize() > 0) { - scanRangeLocations.add(curLocations); - } - LOG.debug("create #{} ScanRangeLocations cost: {} ms", - scanRangeLocations.size(), (System.currentTimeMillis() - start)); - } catch (IOException e) { - throw new UserException(e); + List inputSplits = splitter.getSplits(context.conjuncts); + this.inputSplitNum = inputSplits.size(); + if (inputSplits.isEmpty()) { + return; } + FileSplit inputSplit = (FileSplit) inputSplits.get(0); + TFileType locationType = getLocationType(); + context.params.setFileType(locationType); + TFileFormatType fileFormatType = getFileFormatType(); + context.params.setFormatType(getFileFormatType()); + if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN || fileFormatType == TFileFormatType.FORMAT_JSON) { + context.params.setFileAttributes(getFileAttributes()); + } + + // 
set hdfs params for hdfs file type. + Map locationProperties = getLocationProperties(); + if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) { + String fsName = ""; + if (this instanceof TVFScanProvider) { + fsName = ((TVFScanProvider) this).getFsName(); + } else { + String fullPath = inputSplit.getPath().toUri().toString(); + String filePath = inputSplit.getPath().toUri().getPath(); + // eg: + // hdfs://namenode + // s3://buckets + fsName = fullPath.replace(filePath, ""); + } + THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties); + tHdfsParams.setFsName(fsName); + context.params.setHdfsParams(tHdfsParams); + + if (locationType == TFileType.FILE_BROKER) { + FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker(); + if (broker == null) { + throw new UserException("No alive broker."); + } + context.params.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port)); + } + } else if (locationType == TFileType.FILE_S3) { + context.params.setProperties(locationProperties); + } + TScanRangeLocations curLocations = newLocations(context.params, backendPolicy); + + FileSplitStrategy fileSplitStrategy = new FileSplitStrategy(); + + for (Split split : inputSplits) { + FileSplit fileSplit = (FileSplit) split; + List pathPartitionKeys = getPathPartitionKeys(); + List partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), + pathPartitionKeys, false); + + TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys); + // external data lake table + if (fileSplit instanceof IcebergSplit) { + IcebergScanProvider.setIcebergParams(rangeDesc, (IcebergSplit) fileSplit); + } + + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); + LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}", + curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(), + fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts())); + + fileSplitStrategy.update(fileSplit); + // Add a new location when it's can be split + if (fileSplitStrategy.hasNext()) { + scanRangeLocations.add(curLocations); + curLocations = newLocations(context.params, backendPolicy); + fileSplitStrategy.next(); + } + this.inputFileSize += fileSplit.getLength(); + } + if (curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize() > 0) { + scanRangeLocations.add(curLocations); + } + LOG.debug("create #{} ScanRangeLocations cost: {} ms", + scanRangeLocations.size(), (System.currentTimeMillis() - start)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java index 954d271a94..48365a7656 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java @@ -18,7 +18,6 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; @@ -30,7 +29,6 @@ import org.apache.doris.common.UserException; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; import 
org.apache.doris.tablefunction.ExternalFileTableValuedFunction; -import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TFileAttributes; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileScanRangeParams; @@ -38,11 +36,7 @@ import org.apache.doris.thrift.TFileScanSlotInfo; import org.apache.doris.thrift.TFileType; import com.google.common.collect.Lists; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputSplit; -import java.io.IOException; import java.util.List; import java.util.Map; @@ -56,6 +50,7 @@ public class TVFScanProvider extends QueryScanProvider { this.tvfTable = tvfTable; this.desc = desc; this.tableValuedFunction = tableValuedFunction; + this.splitter = new TVFSplitter(tableValuedFunction); } public String getFsName() { @@ -80,18 +75,6 @@ public class TVFScanProvider extends QueryScanProvider { return tableValuedFunction.getTFileType(); } - @Override - public List getSplits(List exprs) throws IOException, UserException { - List splits = Lists.newArrayList(); - List fileStatuses = tableValuedFunction.getFileStatuses(); - for (TBrokerFileStatus fileStatus : fileStatuses) { - Path path = new Path(fileStatus.getPath()); - FileSplit fileSplit = new FileSplit(path, 0, fileStatus.getSize(), new String[0]); - splits.add(fileSplit); - } - return splits; - } - @Override public Map getLocationProperties() throws MetaNotFoundException, DdlException { return tableValuedFunction.getLocationProperties(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java new file mode 100644 index 0000000000..d3234c977f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.common.UserException; +import org.apache.doris.planner.Split; +import org.apache.doris.planner.Splitter; +import org.apache.doris.tablefunction.ExternalFileTableValuedFunction; +import org.apache.doris.thrift.TBrokerFileStatus; + +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.Path; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; + +public class TVFSplitter implements Splitter { + + private static final Logger LOG = LogManager.getLogger(TVFSplitter.class); + + private ExternalFileTableValuedFunction tableValuedFunction; + + public TVFSplitter(ExternalFileTableValuedFunction tableValuedFunction) { + this.tableValuedFunction = tableValuedFunction; + } + + @Override + public List getSplits(List exprs) throws UserException { + List splits = Lists.newArrayList(); + List fileStatuses = tableValuedFunction.getFileStatuses(); + for (TBrokerFileStatus fileStatus : fileStatuses) { + Path path = new Path(fileStatus.getPath()); + Split split = new FileSplit(path, 0, fileStatus.getSize(), new String[0]); + splits.add(split); + } + return splits; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java index 794283de80..6fc5d69544 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java @@ -18,6 +18,7 @@ package org.apache.doris.planner.external; public enum TableFormatType { + HIVE("hive"), ICEBERG("iceberg"), HUDI("hudi"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java index 483432b798..eb565638e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java @@ -18,18 +18,14 @@ package org.apache.doris.planner.external.iceberg; import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; -import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.external.iceberg.util.IcebergUtils; import org.apache.doris.planner.external.ExternalFileScanNode; +import org.apache.doris.planner.external.IcebergSplitter; import org.apache.doris.planner.external.QueryScanProvider; -import org.apache.doris.planner.external.TableFormatType; import org.apache.doris.thrift.TFileAttributes; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileRangeDesc; @@ -38,26 +34,11 @@ import org.apache.doris.thrift.TIcebergDeleteFileDesc; import org.apache.doris.thrift.TIcebergFileDesc; import org.apache.doris.thrift.TTableFormatFileDesc; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.iceberg.BaseTable; -import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; -import org.apache.iceberg.FileScanTask; 
-import org.apache.iceberg.HistoryEntry; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.PartitionField; -import org.apache.iceberg.TableScan; -import org.apache.iceberg.exceptions.NotFoundException; -import org.apache.iceberg.expressions.Expression; -import org.apache.iceberg.types.Conversions; -import java.nio.ByteBuffer; -import java.time.Instant; -import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.OptionalLong; import java.util.stream.Collectors; @@ -66,17 +47,17 @@ import java.util.stream.Collectors; */ public class IcebergScanProvider extends QueryScanProvider { - private static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2; + public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2; private final Analyzer analyzer; private final IcebergSource icebergSource; public IcebergScanProvider(IcebergSource icebergSource, Analyzer analyzer) { this.icebergSource = icebergSource; this.analyzer = analyzer; + this.splitter = new IcebergSplitter(icebergSource, analyzer); } - public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) - throws UserException { + public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) { TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value()); TIcebergFileDesc fileDesc = new TIcebergFileDesc(); @@ -139,97 +120,6 @@ public class IcebergScanProvider extends QueryScanProvider { + " for hms table " + icebergSource.getIcebergTable().name()); } - @Override - public List getSplits(List exprs) throws UserException { - List expressions = new ArrayList<>(); - org.apache.iceberg.Table table = icebergSource.getIcebergTable(); - for (Expr conjunct : exprs) { - Expression expression = IcebergUtils.convertToIcebergExpr(conjunct, table.schema()); - if (expression != null) { - expressions.add(expression); - } - } - TableScan scan = table.newScan(); - TableSnapshot tableSnapshot = icebergSource.getDesc().getRef().getTableSnapshot(); - if (tableSnapshot != null) { - TableSnapshot.VersionType type = tableSnapshot.getType(); - try { - if (type == TableSnapshot.VersionType.VERSION) { - scan = scan.useSnapshot(tableSnapshot.getVersion()); - } else { - long snapshotId = TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone()); - scan = scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId)); - } - } catch (IllegalArgumentException e) { - throw new UserException(e); - } - } - for (Expression predicate : expressions) { - scan = scan.filter(predicate); - } - List splits = new ArrayList<>(); - int formatVersion = ((BaseTable) table).operations().current().formatVersion(); - for (FileScanTask task : scan.planFiles()) { - for (FileScanTask splitTask : task.split(128 * 1024 * 1024)) { - String dataFilePath = splitTask.file().path().toString(); - IcebergSplit split = new IcebergSplit(new Path(dataFilePath), splitTask.start(), - splitTask.length(), new String[0]); - split.setFormatVersion(formatVersion); - if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) { - split.setDeleteFileFilters(getDeleteFileFilters(splitTask)); - } - split.setTableFormatType(TableFormatType.ICEBERG); - split.setAnalyzer(analyzer); - splits.add(split); - } - } - return splits; - } - - public static long getSnapshotIdAsOfTime(List historyEntries, long asOfTimestamp) { - // find history at or before asOfTimestamp - HistoryEntry latestHistory = 
null; - for (HistoryEntry entry : historyEntries) { - if (entry.timestampMillis() <= asOfTimestamp) { - if (latestHistory == null) { - latestHistory = entry; - continue; - } - if (entry.timestampMillis() > latestHistory.timestampMillis()) { - latestHistory = entry; - } - } - } - if (latestHistory == null) { - throw new NotFoundException("No version history at or before " - + Instant.ofEpochMilli(asOfTimestamp)); - } - return latestHistory.snapshotId(); - } - - private List getDeleteFileFilters(FileScanTask spitTask) { - List filters = new ArrayList<>(); - for (DeleteFile delete : spitTask.deletes()) { - if (delete.content() == FileContent.POSITION_DELETES) { - ByteBuffer lowerBoundBytes = delete.lowerBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId()); - Optional positionLowerBound = Optional.ofNullable(lowerBoundBytes) - .map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes)); - ByteBuffer upperBoundBytes = delete.upperBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId()); - Optional positionUpperBound = Optional.ofNullable(upperBoundBytes) - .map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes)); - filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(), - positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L))); - } else if (delete.content() == FileContent.EQUALITY_DELETES) { - // todo: filters.add(IcebergDeleteFileFilter.createEqualityDelete(delete.path().toString(), - // delete.equalityFieldIds())); - throw new IllegalStateException("Don't support equality delete file"); - } else { - throw new IllegalStateException("Unknown delete content: " + delete.content()); - } - } - return filters; - } - @Override public List getPathPartitionKeys() throws DdlException, MetaNotFoundException { return icebergSource.getIcebergTable().spec().fields().stream().map(PartitionField::name) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java index a82c99b04a..431652b894 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java @@ -18,7 +18,7 @@ package org.apache.doris.planner.external.iceberg; import org.apache.doris.analysis.Analyzer; -import org.apache.doris.planner.external.HiveSplit; +import org.apache.doris.planner.external.FileSplit; import lombok.Data; import org.apache.hadoop.fs.Path; @@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path; import java.util.List; @Data -public class IcebergSplit extends HiveSplit { +public class IcebergSplit extends FileSplit { public IcebergSplit(Path file, long start, long length, String[] hosts) { super(file, start, length, hosts); }
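
Reviewer note (not part of the patch): the sketch below shows how a new data source could plug into the Splitter interface introduced here, mirroring the way TVFScanProvider wires in TVFSplitter above. The class name SingleFileSplitter and its constructor arguments are hypothetical illustrations; only Splitter, Split, FileSplit, Expr, and UserException come from this patch or the existing code base.

// Hypothetical example, not included in this patch: a Splitter that exposes a
// single file as one FileSplit. Conjuncts could be used for pruning (as
// HiveSplitter and IcebergSplitter do); this sketch simply ignores them.
package org.apache.doris.planner.external;

import org.apache.doris.analysis.Expr;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.Split;
import org.apache.doris.planner.Splitter;

import com.google.common.collect.Lists;
import org.apache.hadoop.fs.Path;

import java.util.List;

public class SingleFileSplitter implements Splitter {

    private final Path path;
    private final long fileSize;

    public SingleFileSplitter(Path path, long fileSize) {
        this.path = path;
        this.fileSize = fileSize;
    }

    @Override
    public List<Split> getSplits(List<Expr> exprs) throws UserException {
        // One split covering the whole file; hosts are unknown, so pass an empty array.
        List<Split> splits = Lists.newArrayList();
        splits.add(new FileSplit(path, 0, fileSize, new String[0]));
        return splits;
    }
}

With this in place, a QueryScanProvider subclass would only assign the splitter in its constructor (for example, this.splitter = new SingleFileSplitter(path, size);), and createScanRangeLocations() drives everything else through splitter.getSplits(context.conjuncts), which is how TVFScanProvider, HiveScanProvider, and IcebergScanProvider are changed in this patch.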