[Feature](iceberg-writer) Implements iceberg partition transform. (#37692)
## Proposed changes Cherry-pick iceberg partition transform functionality. #36289 #36889 --------- Co-authored-by: kang <35803862+ghkang98@users.noreply.github.com> Co-authored-by: lik40 <lik40@chinatelecom.cn> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: Mingyu Chen <morningman@163.com>
This commit is contained in:
@ -0,0 +1,66 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
// This file is copied from
|
||||
// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/AnalysisException.java
|
||||
// and modified by Doris
|
||||
|
||||
package org.apache.doris.common.info;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public class SimpleTableInfo {
|
||||
|
||||
private final String dbName;
|
||||
private final String tbName;
|
||||
|
||||
public SimpleTableInfo(String dbName, String tbName) {
|
||||
this.dbName = dbName;
|
||||
this.tbName = tbName;
|
||||
}
|
||||
|
||||
public String getDbName() {
|
||||
return dbName;
|
||||
}
|
||||
|
||||
public String getTbName() {
|
||||
return tbName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(dbName, tbName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (this == other) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (other == null || getClass() != other.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SimpleTableInfo that = (SimpleTableInfo) other;
|
||||
return Objects.equals(dbName, that.dbName) && Objects.equals(tbName, that.tbName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("%s.%s", dbName, tbName);
|
||||
}
|
||||
}
|
||||
@ -33,6 +33,7 @@ import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.iceberg.ManifestFiles;
|
||||
import org.apache.iceberg.SerializableTable;
|
||||
import org.apache.iceberg.Snapshot;
|
||||
import org.apache.iceberg.Table;
|
||||
import org.apache.iceberg.catalog.Catalog;
|
||||
@ -85,6 +86,20 @@ public class IcebergMetadataCache {
|
||||
return tableCache.get(key);
|
||||
}
|
||||
|
||||
public Table getAndCloneTable(CatalogIf catalog, String dbName, String tbName) {
|
||||
Table restTable;
|
||||
synchronized (this) {
|
||||
Table table = getIcebergTable(catalog, dbName, tbName);
|
||||
restTable = SerializableTable.copyOf(table);
|
||||
}
|
||||
return restTable;
|
||||
}
|
||||
|
||||
public Table getRemoteTable(CatalogIf catalog, String dbName, String tbName) {
|
||||
IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog, dbName, tbName);
|
||||
return loadTable(key);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private List<Snapshot> loadSnapshots(IcebergMetadataCacheKey key) {
|
||||
Table icebergTable = getIcebergTable(key.catalog, key.dbName, key.tableName);
|
||||
@ -116,7 +131,7 @@ public class IcebergMetadataCache {
|
||||
public void invalidateCatalogCache(long catalogId) {
|
||||
snapshotListCache.asMap().keySet().stream()
|
||||
.filter(key -> key.catalog.getId() == catalogId)
|
||||
.forEach(snapshotListCache::invalidate);
|
||||
.forEach(snapshotListCache::invalidate);
|
||||
|
||||
tableCache.asMap().entrySet().stream()
|
||||
.filter(entry -> entry.getKey().catalog.getId() == catalogId)
|
||||
@ -130,7 +145,7 @@ public class IcebergMetadataCache {
|
||||
snapshotListCache.asMap().keySet().stream()
|
||||
.filter(key -> key.catalog.getId() == catalogId && key.dbName.equals(dbName) && key.tableName.equals(
|
||||
tblName))
|
||||
.forEach(snapshotListCache::invalidate);
|
||||
.forEach(snapshotListCache::invalidate);
|
||||
|
||||
tableCache.asMap().entrySet().stream()
|
||||
.filter(entry -> {
|
||||
|
||||
@ -64,6 +64,10 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
|
||||
return catalog;
|
||||
}
|
||||
|
||||
public IcebergExternalCatalog getExternalCatalog() {
|
||||
return dorisCatalog;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
@ -21,31 +21,39 @@
|
||||
package org.apache.doris.datasource.iceberg;
|
||||
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.thrift.TFileContent;
|
||||
import org.apache.doris.common.info.SimpleTableInfo;
|
||||
import org.apache.doris.datasource.iceberg.helper.IcebergWriterHelper;
|
||||
import org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertCommandContext;
|
||||
import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
|
||||
import org.apache.doris.thrift.TIcebergCommitData;
|
||||
import org.apache.doris.thrift.TUpdateMode;
|
||||
import org.apache.doris.transaction.Transaction;
|
||||
|
||||
import com.google.common.base.VerifyException;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.iceberg.AppendFiles;
|
||||
import org.apache.iceberg.DataFiles;
|
||||
import org.apache.iceberg.FileContent;
|
||||
import org.apache.iceberg.Metrics;
|
||||
import org.apache.iceberg.FileFormat;
|
||||
import org.apache.iceberg.PartitionSpec;
|
||||
import org.apache.iceberg.ReplacePartitions;
|
||||
import org.apache.iceberg.Table;
|
||||
import org.apache.iceberg.catalog.TableIdentifier;
|
||||
import org.apache.iceberg.io.WriteResult;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class IcebergTransaction implements Transaction {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(IcebergTransaction.class);
|
||||
|
||||
private final IcebergMetadataOps ops;
|
||||
private SimpleTableInfo tableInfo;
|
||||
private Table table;
|
||||
|
||||
|
||||
private org.apache.iceberg.Transaction transaction;
|
||||
private final List<TIcebergCommitData> commitDataList = Lists.newArrayList();
|
||||
|
||||
@ -59,140 +67,123 @@ public class IcebergTransaction implements Transaction {
|
||||
}
|
||||
}
|
||||
|
||||
public void beginInsert(String dbName, String tbName) {
|
||||
Table icebergTable = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbName));
|
||||
transaction = icebergTable.newTransaction();
|
||||
public void beginInsert(SimpleTableInfo tableInfo) {
|
||||
this.tableInfo = tableInfo;
|
||||
this.table = getNativeTable(tableInfo);
|
||||
this.transaction = table.newTransaction();
|
||||
}
|
||||
|
||||
public void finishInsert() {
|
||||
Table icebergTable = transaction.table();
|
||||
AppendFiles appendFiles = transaction.newAppend();
|
||||
public void finishInsert(SimpleTableInfo tableInfo, Optional<InsertCommandContext> insertCtx) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.info("iceberg table {} insert table finished!", tableInfo);
|
||||
}
|
||||
|
||||
for (CommitTaskData task : convertToCommitTaskData()) {
|
||||
DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
|
||||
.withPath(task.getPath())
|
||||
.withFileSizeInBytes(task.getFileSizeInBytes())
|
||||
.withFormat(IcebergUtils.getFileFormat(icebergTable))
|
||||
.withMetrics(task.getMetrics());
|
||||
//create and start the iceberg transaction
|
||||
TUpdateMode updateMode = TUpdateMode.APPEND;
|
||||
if (insertCtx.isPresent()) {
|
||||
updateMode = ((BaseExternalTableInsertCommandContext) insertCtx.get()).isOverwrite() ? TUpdateMode.OVERWRITE
|
||||
: TUpdateMode.APPEND;
|
||||
}
|
||||
updateManifestAfterInsert(updateMode);
|
||||
}
|
||||
|
||||
if (icebergTable.spec().isPartitioned()) {
|
||||
List<String> partitionValues = task.getPartitionValues()
|
||||
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
|
||||
builder.withPartitionValues(partitionValues);
|
||||
private void updateManifestAfterInsert(TUpdateMode updateMode) {
|
||||
PartitionSpec spec = table.spec();
|
||||
FileFormat fileFormat = IcebergUtils.getFileFormat(table);
|
||||
|
||||
//convert commitDataList to writeResult
|
||||
WriteResult writeResult = IcebergWriterHelper
|
||||
.convertToWriterResult(fileFormat, spec, commitDataList);
|
||||
List<WriteResult> pendingResults = Lists.newArrayList(writeResult);
|
||||
|
||||
if (spec.isPartitioned()) {
|
||||
partitionManifestUpdate(updateMode, table, pendingResults);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.info("{} {} table partition manifest successful and writeResult : {}..", tableInfo, updateMode,
|
||||
writeResult);
|
||||
}
|
||||
appendFiles.appendFile(builder.build());
|
||||
}
|
||||
|
||||
// in appendFiles.commit, it will generate metadata(manifest and snapshot)
|
||||
// after appendFiles.commit, in current transaction, you can already see the new snapshot
|
||||
appendFiles.commit();
|
||||
}
|
||||
|
||||
public List<CommitTaskData> convertToCommitTaskData() {
|
||||
List<CommitTaskData> commitTaskData = new ArrayList<>();
|
||||
for (TIcebergCommitData data : this.commitDataList) {
|
||||
commitTaskData.add(new CommitTaskData(
|
||||
data.getFilePath(),
|
||||
data.getFileSize(),
|
||||
new Metrics(
|
||||
data.getRowCount(),
|
||||
Collections.EMPTY_MAP,
|
||||
Collections.EMPTY_MAP,
|
||||
Collections.EMPTY_MAP,
|
||||
Collections.EMPTY_MAP
|
||||
),
|
||||
data.isSetPartitionValues() ? Optional.of(data.getPartitionValues()) : Optional.empty(),
|
||||
convertToFileContent(data.getFileContent()),
|
||||
data.isSetReferencedDataFiles() ? Optional.of(data.getReferencedDataFiles()) : Optional.empty()
|
||||
));
|
||||
}
|
||||
return commitTaskData;
|
||||
}
|
||||
|
||||
private FileContent convertToFileContent(TFileContent content) {
|
||||
if (content.equals(TFileContent.DATA)) {
|
||||
return FileContent.DATA;
|
||||
} else if (content.equals(TFileContent.POSITION_DELETES)) {
|
||||
return FileContent.POSITION_DELETES;
|
||||
} else {
|
||||
return FileContent.EQUALITY_DELETES;
|
||||
tableManifestUpdate(updateMode, table, pendingResults);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.info("{} {} table manifest successful and writeResult : {}..", tableInfo, updateMode,
|
||||
writeResult);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commit() throws UserException {
|
||||
// Externally readable
|
||||
// Manipulate the relevant data so that others can also see the latest table, such as:
|
||||
// 1. hadoop: it will change the version number information in 'version-hint.text'
|
||||
// 2. hive: it will change the table properties, the most important thing is to revise 'metadata_location'
|
||||
// 3. and so on ...
|
||||
// commit the iceberg transaction
|
||||
transaction.commitTransaction();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rollback() {
|
||||
|
||||
//do nothing
|
||||
}
|
||||
|
||||
public long getUpdateCnt() {
|
||||
return commitDataList.stream().mapToLong(TIcebergCommitData::getRowCount).sum();
|
||||
}
|
||||
|
||||
public static class CommitTaskData {
|
||||
private final String path;
|
||||
private final long fileSizeInBytes;
|
||||
private final Metrics metrics;
|
||||
private final Optional<List<String>> partitionValues;
|
||||
private final FileContent content;
|
||||
private final Optional<List<String>> referencedDataFiles;
|
||||
|
||||
public CommitTaskData(String path,
|
||||
long fileSizeInBytes,
|
||||
Metrics metrics,
|
||||
Optional<List<String>> partitionValues,
|
||||
FileContent content,
|
||||
Optional<List<String>> referencedDataFiles) {
|
||||
this.path = path;
|
||||
this.fileSizeInBytes = fileSizeInBytes;
|
||||
this.metrics = metrics;
|
||||
this.partitionValues = convertPartitionValuesForNull(partitionValues);
|
||||
this.content = content;
|
||||
this.referencedDataFiles = referencedDataFiles;
|
||||
private synchronized Table getNativeTable(SimpleTableInfo tableInfo) {
|
||||
Objects.requireNonNull(tableInfo);
|
||||
IcebergExternalCatalog externalCatalog = ops.getExternalCatalog();
|
||||
return IcebergUtils.getRemoteTable(externalCatalog, tableInfo);
|
||||
}
|
||||
|
||||
private void partitionManifestUpdate(TUpdateMode updateMode, Table table, List<WriteResult> pendingResults) {
|
||||
if (Objects.isNull(pendingResults) || pendingResults.isEmpty()) {
|
||||
LOG.warn("{} partitionManifestUp method call but pendingResults is null or empty!", table.name());
|
||||
return;
|
||||
}
|
||||
|
||||
private Optional<List<String>> convertPartitionValuesForNull(Optional<List<String>> partitionValues) {
|
||||
if (!partitionValues.isPresent()) {
|
||||
return partitionValues;
|
||||
// Commit the appendPartitionOperator transaction.
|
||||
if (updateMode == TUpdateMode.APPEND) {
|
||||
commitAppendTxn(table, pendingResults);
|
||||
} else {
|
||||
ReplacePartitions appendPartitionOp = table.newReplacePartitions();
|
||||
for (WriteResult result : pendingResults) {
|
||||
Preconditions.checkState(result.referencedDataFiles().length == 0,
|
||||
"Should have no referenced data files.");
|
||||
Arrays.stream(result.dataFiles()).forEach(appendPartitionOp::addFile);
|
||||
}
|
||||
List<String> values = partitionValues.get();
|
||||
if (!values.contains("null")) {
|
||||
return partitionValues;
|
||||
}
|
||||
return Optional.of(values.stream().map(s -> s.equals("null") ? null : s).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
public String getPath() {
|
||||
return path;
|
||||
}
|
||||
|
||||
public long getFileSizeInBytes() {
|
||||
return fileSizeInBytes;
|
||||
}
|
||||
|
||||
public Metrics getMetrics() {
|
||||
return metrics;
|
||||
}
|
||||
|
||||
public Optional<List<String>> getPartitionValues() {
|
||||
return partitionValues;
|
||||
}
|
||||
|
||||
public FileContent getContent() {
|
||||
return content;
|
||||
}
|
||||
|
||||
public Optional<List<String>> getReferencedDataFiles() {
|
||||
return referencedDataFiles;
|
||||
appendPartitionOp.commit();
|
||||
}
|
||||
}
|
||||
|
||||
private void tableManifestUpdate(TUpdateMode updateMode, Table table, List<WriteResult> pendingResults) {
|
||||
if (Objects.isNull(pendingResults) || pendingResults.isEmpty()) {
|
||||
LOG.warn("{} tableManifestUp method call but pendingResults is null or empty!", table.name());
|
||||
return;
|
||||
}
|
||||
// Commit the appendPartitionOperator transaction.
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.info("{} tableManifestUp method call ", table.name());
|
||||
}
|
||||
if (updateMode == TUpdateMode.APPEND) {
|
||||
commitAppendTxn(table, pendingResults);
|
||||
} else {
|
||||
ReplacePartitions appendPartitionOp = table.newReplacePartitions();
|
||||
for (WriteResult result : pendingResults) {
|
||||
Preconditions.checkState(result.referencedDataFiles().length == 0,
|
||||
"Should have no referenced data files.");
|
||||
Arrays.stream(result.dataFiles()).forEach(appendPartitionOp::addFile);
|
||||
}
|
||||
appendPartitionOp.commit();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void commitAppendTxn(Table table, List<WriteResult> pendingResults) {
|
||||
// To be compatible with iceberg format V1.
|
||||
AppendFiles appendFiles = table.newAppend();
|
||||
for (WriteResult result : pendingResults) {
|
||||
Preconditions.checkState(result.referencedDataFiles().length == 0,
|
||||
"Should have no referenced data files for append.");
|
||||
Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
|
||||
}
|
||||
appendFiles.commit();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -43,6 +43,7 @@ import org.apache.doris.catalog.StructField;
|
||||
import org.apache.doris.catalog.StructType;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.info.SimpleTableInfo;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.datasource.ExternalCatalog;
|
||||
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
|
||||
@ -50,6 +51,7 @@ import org.apache.doris.nereids.exceptions.NotSupportedException;
|
||||
import org.apache.doris.thrift.TExprOpcode;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.iceberg.FileFormat;
|
||||
import org.apache.iceberg.PartitionSpec;
|
||||
import org.apache.iceberg.Schema;
|
||||
import org.apache.iceberg.Snapshot;
|
||||
@ -87,6 +89,8 @@ public class IcebergUtils {
|
||||
// https://iceberg.apache.org/spec/#schemas-and-data-types
|
||||
// All time and timestamp values are stored with microsecond precision
|
||||
private static final int ICEBERG_DATETIME_SCALE_MS = 6;
|
||||
private static final String PARQUET_NAME = "parquet";
|
||||
private static final String ORC_NAME = "orc";
|
||||
|
||||
public static final String TOTAL_RECORDS = "total-records";
|
||||
public static final String TOTAL_POSITION_DELETES = "total-position-deletes";
|
||||
@ -522,8 +526,8 @@ public class IcebergUtils {
|
||||
case MAP:
|
||||
Types.MapType map = (Types.MapType) type;
|
||||
return new MapType(
|
||||
icebergTypeToDorisType(map.keyType()),
|
||||
icebergTypeToDorisType(map.valueType())
|
||||
icebergTypeToDorisType(map.keyType()),
|
||||
icebergTypeToDorisType(map.valueType())
|
||||
);
|
||||
case STRUCT:
|
||||
Types.StructType struct = (Types.StructType) type;
|
||||
@ -536,11 +540,30 @@ public class IcebergUtils {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static org.apache.iceberg.Table getIcebergTable(ExternalCatalog catalog, String dbName, String tblName) {
|
||||
return getIcebergTableInternal(catalog, dbName, tblName, false);
|
||||
}
|
||||
|
||||
public static org.apache.iceberg.Table getAndCloneTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) {
|
||||
return getIcebergTableInternal(catalog, tableInfo.getDbName(), tableInfo.getTbName(), true);
|
||||
}
|
||||
|
||||
public static org.apache.iceberg.Table getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) {
|
||||
return Env.getCurrentEnv()
|
||||
.getExtMetaCacheMgr()
|
||||
.getIcebergMetadataCache()
|
||||
.getIcebergTable(catalog, dbName, tblName);
|
||||
.getRemoteTable(catalog, tableInfo.getDbName(), tableInfo.getTbName());
|
||||
}
|
||||
|
||||
private static org.apache.iceberg.Table getIcebergTableInternal(ExternalCatalog catalog, String dbName,
|
||||
String tblName,
|
||||
boolean isClone) {
|
||||
IcebergMetadataCache metadataCache = Env.getCurrentEnv()
|
||||
.getExtMetaCacheMgr()
|
||||
.getIcebergMetadataCache();
|
||||
return isClone ? metadataCache.getAndCloneTable(catalog, dbName, tblName)
|
||||
: metadataCache.getIcebergTable(catalog, dbName, tblName);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -587,17 +610,27 @@ public class IcebergUtils {
|
||||
return -1;
|
||||
}
|
||||
|
||||
public static String getFileFormat(Table table) {
|
||||
Map<String, String> properties = table.properties();
|
||||
|
||||
public static FileFormat getFileFormat(Table icebergTable) {
|
||||
Map<String, String> properties = icebergTable.properties();
|
||||
String fileFormatName;
|
||||
if (properties.containsKey(WRITE_FORMAT)) {
|
||||
return properties.get(WRITE_FORMAT);
|
||||
fileFormatName = properties.get(WRITE_FORMAT);
|
||||
} else {
|
||||
fileFormatName = properties.getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, PARQUET_NAME);
|
||||
}
|
||||
if (properties.containsKey(TableProperties.DEFAULT_FILE_FORMAT)) {
|
||||
return properties.get(TableProperties.DEFAULT_FILE_FORMAT);
|
||||
FileFormat fileFormat;
|
||||
if (fileFormatName.toLowerCase().contains(ORC_NAME)) {
|
||||
fileFormat = FileFormat.ORC;
|
||||
} else if (fileFormatName.toLowerCase().contains(PARQUET_NAME)) {
|
||||
fileFormat = FileFormat.PARQUET;
|
||||
} else {
|
||||
throw new RuntimeException("Unsupported input format type: " + fileFormatName);
|
||||
}
|
||||
return TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
|
||||
return fileFormat;
|
||||
}
|
||||
|
||||
|
||||
public static String getFileCompress(Table table) {
|
||||
Map<String, String> properties = table.properties();
|
||||
if (properties.containsKey(COMPRESSION_CODEC)) {
|
||||
@ -605,11 +638,11 @@ public class IcebergUtils {
|
||||
} else if (properties.containsKey(SPARK_SQL_COMPRESSION_CODEC)) {
|
||||
return properties.get(SPARK_SQL_COMPRESSION_CODEC);
|
||||
}
|
||||
String fileFormat = getFileFormat(table);
|
||||
if (fileFormat.equalsIgnoreCase("parquet")) {
|
||||
FileFormat fileFormat = getFileFormat(table);
|
||||
if (fileFormat == FileFormat.PARQUET) {
|
||||
return properties.getOrDefault(
|
||||
TableProperties.PARQUET_COMPRESSION, TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);
|
||||
} else if (fileFormat.equalsIgnoreCase("orc")) {
|
||||
} else if (fileFormat == FileFormat.ORC) {
|
||||
return properties.getOrDefault(
|
||||
TableProperties.ORC_COMPRESSION, TableProperties.ORC_COMPRESSION_DEFAULT);
|
||||
}
|
||||
@ -620,9 +653,10 @@ public class IcebergUtils {
|
||||
Map<String, String> properties = table.properties();
|
||||
if (properties.containsKey(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)) {
|
||||
throw new NotSupportedException(
|
||||
"Table " + table.name() + " specifies " + properties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)
|
||||
+ " as a location provider. "
|
||||
+ "Writing to Iceberg tables with custom location provider is not supported.");
|
||||
"Table " + table.name() + " specifies " + properties
|
||||
.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)
|
||||
+ " as a location provider. "
|
||||
+ "Writing to Iceberg tables with custom location provider is not supported.");
|
||||
}
|
||||
String dataLocation = properties.get(TableProperties.WRITE_DATA_LOCATION);
|
||||
if (dataLocation == null) {
|
||||
|
||||
@ -0,0 +1,91 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.datasource.iceberg.helper;
|
||||
|
||||
import org.apache.doris.datasource.statistics.CommonStatistics;
|
||||
import org.apache.doris.thrift.TIcebergCommitData;
|
||||
|
||||
import com.google.common.base.VerifyException;
|
||||
import org.apache.iceberg.DataFile;
|
||||
import org.apache.iceberg.DataFiles;
|
||||
import org.apache.iceberg.FileFormat;
|
||||
import org.apache.iceberg.PartitionSpec;
|
||||
import org.apache.iceberg.io.WriteResult;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class IcebergWriterHelper {
|
||||
|
||||
private static final int DEFAULT_FILE_COUNT = 1;
|
||||
|
||||
public static WriteResult convertToWriterResult(
|
||||
FileFormat format,
|
||||
PartitionSpec spec,
|
||||
List<TIcebergCommitData> commitDataList) {
|
||||
List<DataFile> dataFiles = new ArrayList<>();
|
||||
for (TIcebergCommitData commitData : commitDataList) {
|
||||
//get the files path
|
||||
String location = commitData.getFilePath();
|
||||
|
||||
//get the commit file statistics
|
||||
long fileSize = commitData.getFileSize();
|
||||
long recordCount = commitData.getRowCount();
|
||||
CommonStatistics stat = new CommonStatistics(recordCount, DEFAULT_FILE_COUNT, fileSize);
|
||||
|
||||
Optional<List<String>> partValues = Optional.empty();
|
||||
//get and check partitionValues when table is partitionedTable
|
||||
if (spec.isPartitioned()) {
|
||||
List<String> partitionValues = commitData.getPartitionValues();
|
||||
if (Objects.isNull(partitionValues) || partitionValues.isEmpty()) {
|
||||
throw new VerifyException("No partition data for partitioned table");
|
||||
}
|
||||
partitionValues = partitionValues.stream().map(s -> s.equals("null") ? null : s)
|
||||
.collect(Collectors.toList());
|
||||
partValues = Optional.of(partitionValues);
|
||||
}
|
||||
DataFile dataFile = genDataFile(format, location, spec, partValues, stat);
|
||||
dataFiles.add(dataFile);
|
||||
}
|
||||
return WriteResult.builder()
|
||||
.addDataFiles(dataFiles)
|
||||
.build();
|
||||
|
||||
}
|
||||
|
||||
public static DataFile genDataFile(
|
||||
FileFormat format,
|
||||
String location,
|
||||
PartitionSpec spec,
|
||||
Optional<List<String>> partValues,
|
||||
CommonStatistics statistics) {
|
||||
|
||||
DataFiles.Builder builder = DataFiles.builder(spec)
|
||||
.withPath(location)
|
||||
.withFileSizeInBytes(statistics.getTotalFileBytes())
|
||||
.withRecordCount(statistics.getRowCount())
|
||||
.withFormat(format);
|
||||
|
||||
partValues.ifPresent(builder::withPartitionValues);
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
}
|
||||
@ -61,7 +61,7 @@ public class IcebergApiSource implements IcebergSource {
|
||||
|
||||
@Override
|
||||
public String getFileFormat() {
|
||||
return IcebergUtils.getFileFormat(originTable);
|
||||
return IcebergUtils.getFileFormat(originTable).name();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -41,7 +41,7 @@ public class IcebergHMSSource implements IcebergSource {
|
||||
private final org.apache.iceberg.Table icebergTable;
|
||||
|
||||
public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc,
|
||||
Map<String, ColumnRange> columnNameToRange) {
|
||||
Map<String, ColumnRange> columnNameToRange) {
|
||||
this.hmsTable = hmsTable;
|
||||
this.desc = desc;
|
||||
this.columnNameToRange = columnNameToRange;
|
||||
@ -58,7 +58,7 @@ public class IcebergHMSSource implements IcebergSource {
|
||||
|
||||
@Override
|
||||
public String getFileFormat() throws DdlException, MetaNotFoundException {
|
||||
return IcebergUtils.getFileFormat(icebergTable);
|
||||
return IcebergUtils.getFileFormat(icebergTable).name();
|
||||
}
|
||||
|
||||
public org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException {
|
||||
|
||||
@ -0,0 +1,81 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.datasource.statistics;
|
||||
|
||||
public class CommonStatistics {
|
||||
|
||||
public static final CommonStatistics EMPTY = new CommonStatistics(0L, 0L, 0L);
|
||||
|
||||
private final long rowCount;
|
||||
private final long fileCount;
|
||||
private final long totalFileBytes;
|
||||
|
||||
public CommonStatistics(long rowCount, long fileCount, long totalFileBytes) {
|
||||
this.fileCount = fileCount;
|
||||
this.rowCount = rowCount;
|
||||
this.totalFileBytes = totalFileBytes;
|
||||
}
|
||||
|
||||
public long getRowCount() {
|
||||
return rowCount;
|
||||
}
|
||||
|
||||
public long getFileCount() {
|
||||
return fileCount;
|
||||
}
|
||||
|
||||
public long getTotalFileBytes() {
|
||||
return totalFileBytes;
|
||||
}
|
||||
|
||||
public static CommonStatistics reduce(
|
||||
CommonStatistics current,
|
||||
CommonStatistics update,
|
||||
ReduceOperator operator) {
|
||||
return new CommonStatistics(
|
||||
reduce(current.getRowCount(), update.getRowCount(), operator),
|
||||
reduce(current.getFileCount(), update.getFileCount(), operator),
|
||||
reduce(current.getTotalFileBytes(), update.getTotalFileBytes(), operator));
|
||||
}
|
||||
|
||||
public static long reduce(long current, long update, ReduceOperator operator) {
|
||||
if (current >= 0 && update >= 0) {
|
||||
switch (operator) {
|
||||
case ADD:
|
||||
return current + update;
|
||||
case SUBTRACT:
|
||||
return current - update;
|
||||
case MAX:
|
||||
return Math.max(current, update);
|
||||
case MIN:
|
||||
return Math.min(current, update);
|
||||
default:
|
||||
throw new IllegalArgumentException("Unexpected operator: " + operator);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
public enum ReduceOperator {
|
||||
ADD,
|
||||
SUBTRACT,
|
||||
MIN,
|
||||
MAX,
|
||||
}
|
||||
}
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.nereids.trees.plans.commands.insert;
|
||||
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.info.SimpleTableInfo;
|
||||
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
|
||||
import org.apache.doris.datasource.iceberg.IcebergTransaction;
|
||||
import org.apache.doris.nereids.NereidsPlanner;
|
||||
@ -39,9 +40,9 @@ public class IcebergInsertExecutor extends BaseExternalTableInsertExecutor {
|
||||
* constructor
|
||||
*/
|
||||
public IcebergInsertExecutor(ConnectContext ctx, IcebergExternalTable table,
|
||||
String labelName, NereidsPlanner planner,
|
||||
Optional<InsertCommandContext> insertCtx,
|
||||
boolean emptyInsert) {
|
||||
String labelName, NereidsPlanner planner,
|
||||
Optional<InsertCommandContext> insertCtx,
|
||||
boolean emptyInsert) {
|
||||
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
|
||||
}
|
||||
|
||||
@ -52,10 +53,22 @@ public class IcebergInsertExecutor extends BaseExternalTableInsertExecutor {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doBeforeCommit() throws UserException {
|
||||
protected void beforeExec() {
|
||||
String dbName = ((IcebergExternalTable) table).getDbName();
|
||||
String tbName = table.getName();
|
||||
SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbName);
|
||||
IcebergTransaction transaction = (IcebergTransaction) transactionManager.getTransaction(txnId);
|
||||
loadedRows = transaction.getUpdateCnt();
|
||||
transaction.finishInsert();
|
||||
transaction.beginInsert(tableInfo);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doBeforeCommit() throws UserException {
|
||||
String dbName = ((IcebergExternalTable) table).getDbName();
|
||||
String tbName = table.getName();
|
||||
SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbName);
|
||||
IcebergTransaction transaction = (IcebergTransaction) transactionManager.getTransaction(txnId);
|
||||
this.loadedRows = transaction.getUpdateCnt();
|
||||
transaction.finishInsert(tableInfo, insertCtx);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -63,9 +76,4 @@ public class IcebergInsertExecutor extends BaseExternalTableInsertExecutor {
|
||||
return TransactionType.ICEBERG;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void beforeExec() {
|
||||
IcebergTransaction transaction = (IcebergTransaction) transactionManager.getTransaction(txnId);
|
||||
transaction.beginInsert(((IcebergExternalTable) table).getDbName(), table.getName());
|
||||
}
|
||||
}
|
||||
|
||||
@ -121,7 +121,7 @@ public class IcebergTableSink extends BaseExternalTableDataSink {
|
||||
}
|
||||
|
||||
// file info
|
||||
tSink.setFileFormat(getTFileFormatType(IcebergUtils.getFileFormat(icebergTable)));
|
||||
tSink.setFileFormat(getTFileFormatType(IcebergUtils.getFileFormat(icebergTable).name()));
|
||||
tSink.setCompressionType(getTFileCompressType(IcebergUtils.getFileCompress(icebergTable)));
|
||||
|
||||
// hadoop config
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.transaction;
|
||||
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
|
||||
@ -58,12 +59,12 @@ public class IcebergTransactionManager implements TransactionManager {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Transaction getTransaction(long id) {
|
||||
public IcebergTransaction getTransaction(long id) {
|
||||
return getTransactionWithException(id);
|
||||
}
|
||||
|
||||
public Transaction getTransactionWithException(long id) {
|
||||
Transaction icebergTransaction = transactions.get(id);
|
||||
public IcebergTransaction getTransactionWithException(long id) {
|
||||
IcebergTransaction icebergTransaction = transactions.get(id);
|
||||
if (icebergTransaction == null) {
|
||||
throw new RuntimeException("Can't find transaction for " + id);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user