[feature](iceberg) support iceberg equality delete (#34223) (#34327)

bp #34223

Co-authored-by: Ashin Gau <AshinGau@users.noreply.github.com>
This commit is contained in:
Mingyu Chen
2024-04-30 11:51:29 +08:00
committed by GitHub
parent 7d77fd0286
commit 35f8563a75
12 changed files with 546 additions and 22 deletions

View File

@ -61,8 +61,14 @@ public class IcebergApiSource implements IcebergSource {
@Override
public String getFileFormat() {
return originTable.properties()
.getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
Map<String, String> properties = originTable.properties();
if (properties.containsKey(TableProperties.DEFAULT_FILE_FORMAT)) {
return properties.get(TableProperties.DEFAULT_FILE_FORMAT);
}
if (properties.containsKey(FLINK_WRITE_FORMAT)) {
return properties.get(FLINK_WRITE_FORMAT);
}
return TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
}
@Override

View File

@ -59,8 +59,14 @@ public class IcebergHMSSource implements IcebergSource {
@Override
public String getFileFormat() throws DdlException, MetaNotFoundException {
return hmsTable.getRemoteTable().getParameters()
.getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
Map<String, String> properties = hmsTable.getRemoteTable().getParameters();
if (properties.containsKey(TableProperties.DEFAULT_FILE_FORMAT)) {
return properties.get(TableProperties.DEFAULT_FILE_FORMAT);
}
if (properties.containsKey(FLINK_WRITE_FORMAT)) {
return properties.get(FLINK_WRITE_FORMAT);
}
return TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
}
public org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException {

View File

@ -153,7 +153,6 @@ public class IcebergScanNode extends FileQueryScanNode {
Path splitDeletePath = locationPath.toStorageLocation();
deleteFileDesc.setPath(splitDeletePath.toString());
if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
fileDesc.setContent(FileContent.POSITION_DELETES.id());
IcebergDeleteFileFilter.PositionDelete positionDelete =
(IcebergDeleteFileFilter.PositionDelete) filter;
OptionalLong lowerBound = positionDelete.getPositionLowerBound();
@ -164,11 +163,12 @@ public class IcebergScanNode extends FileQueryScanNode {
if (upperBound.isPresent()) {
deleteFileDesc.setPositionUpperBound(upperBound.getAsLong());
}
deleteFileDesc.setContent(FileContent.POSITION_DELETES.id());
} else {
fileDesc.setContent(FileContent.EQUALITY_DELETES.id());
IcebergDeleteFileFilter.EqualityDelete equalityDelete =
(IcebergDeleteFileFilter.EqualityDelete) filter;
deleteFileDesc.setFieldIds(equalityDelete.getFieldIds());
deleteFileDesc.setContent(FileContent.EQUALITY_DELETES.id());
}
fileDesc.addToDeleteFiles(deleteFileDesc);
}
@ -327,8 +327,8 @@ public class IcebergScanNode extends FileQueryScanNode {
filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(),
positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L)));
} else if (delete.content() == FileContent.EQUALITY_DELETES) {
// todo: filters.add(IcebergDeleteFileFilter.createEqualityDelete(delete.path().toString(),
throw new IllegalStateException("Don't support equality delete file");
filters.add(IcebergDeleteFileFilter.createEqualityDelete(
delete.path().toString(), delete.equalityFieldIds()));
} else {
throw new IllegalStateException("Unknown delete content: " + delete.content());
}

View File

@ -27,6 +27,9 @@ import org.apache.doris.thrift.TFileAttributes;
public interface IcebergSource {
// compatible with flink, which is "write.format.default" in spark
String FLINK_WRITE_FORMAT = "write-format";
TupleDescriptor getDesc();
org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException;