[Bug](iceberg)fix read partitioned iceberg without partition path (#25503)

Iceberg does not require partition values to exist on file paths, so we should get the partition value from `PartitionScanTask.partition`.
This commit is contained in:
wuwenchi
2023-10-31 18:09:53 +08:00
committed by GitHub
parent b137f03921
commit b98744ae90
6 changed files with 44 additions and 13 deletions

View File

@ -296,7 +296,8 @@ public class SlotDescriptor {
public TSlotDescriptor toThrift() {
// Non-nullable slots will have 0 for the byte offset and -1 for the bit mask
TSlotDescriptor tSlotDescriptor = new TSlotDescriptor(id.asInt(), parent.getId().asInt(), type.toThrift(), -1,
byteOffset, 0, getIsNullable() ? 0 : -1, ((column != null) ? column.getNonShadowName() : ""), slotIdx,
byteOffset, 0, getIsNullable() ? 0 : -1,
((column != null) ? column.getNonShadowName() : ""), slotIdx,
isMaterialized);
tSlotDescriptor.setNeedMaterialize(needMaterialize);
tSlotDescriptor.setIsAutoIncrement(isAutoInc);

View File

@ -62,8 +62,8 @@ import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.expressions.Expression;
@ -86,7 +86,6 @@ import java.util.stream.Collectors;
public class IcebergScanNode extends FileQueryScanNode {
public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
public static final String DEFAULT_DATA_PATH = "/data/";
private static final String TOTAL_RECORDS = "total-records";
private static final String TOTAL_POSITION_DELETES = "total-position-deletes";
private static final String TOTAL_EQUALITY_DELETES = "total-equality-deletes";
@ -201,8 +200,6 @@ public class IcebergScanNode extends FileQueryScanNode {
// Min split size is DEFAULT_SPLIT_SIZE(128MB).
long splitSize = Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(), DEFAULT_SPLIT_SIZE);
HashSet<String> partitionPathSet = new HashSet<>();
String dataPath = normalizeLocation(icebergTable.location()) + icebergTable.properties()
.getOrDefault(TableProperties.WRITE_DATA_LOCATION, DEFAULT_DATA_PATH);
boolean isPartitionedTable = icebergTable.spec().isPartitioned();
CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize);
@ -211,12 +208,18 @@ public class IcebergScanNode extends FileQueryScanNode {
combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> {
String dataFilePath = normalizeLocation(splitTask.file().path().toString());
// Counts the number of partitions read
List<String> partitionValues = new ArrayList<>();
if (isPartitionedTable) {
int last = dataFilePath.lastIndexOf("/");
if (last > 0) {
partitionPathSet.add(dataFilePath.substring(dataPath.length(), last));
StructLike structLike = splitTask.file().partition();
// set partitionValue for this IcebergSplit
for (int i = 0; i < structLike.size(); i++) {
String partition = String.valueOf(structLike.get(i, Object.class));
partitionValues.add(partition);
}
// Counts the number of partitions read
partitionPathSet.add(structLike.toString());
}
Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath, source.getCatalog().getProperties());
@ -227,7 +230,8 @@ public class IcebergScanNode extends FileQueryScanNode {
splitTask.file().fileSizeInBytes(),
new String[0],
formatVersion,
source.getCatalog().getProperties());
source.getCatalog().getProperties(),
partitionValues);
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
}

View File

@ -30,8 +30,9 @@ public class IcebergSplit extends FileSplit {
// File path will be changed if the file is modified, so there's no need to get modification time.
public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts,
Integer formatVersion, Map<String, String> config) {
super(file, start, length, fileLength, hosts, null);
Integer formatVersion, Map<String, String> config,
List<String> partitionList) {
super(file, start, length, fileLength, hosts, partitionList);
this.formatVersion = formatVersion;
this.config = config;
}