[bugfix](iceberg)fix read NULL with date partition (#30478)

* fix date

* fix date

* add case
This commit is contained in:
wuwenchi
2024-01-30 10:30:57 +08:00
committed by yiguolei
parent 5731ed7aad
commit 4648902350
3 changed files with 71 additions and 2 deletions

View File

@ -69,6 +69,9 @@ import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.TableScanUtil;
import java.io.IOException;
@ -220,11 +223,23 @@ public class IcebergScanNode extends FileQueryScanNode {
List<String> partitionValues = new ArrayList<>();
if (isPartitionedTable) {
StructLike structLike = splitTask.file().partition();
List<PartitionField> fields = splitTask.spec().fields();
Types.StructType structType = icebergTable.schema().asStruct();
// set partitionValue for this IcebergSplit
for (int i = 0; i < structLike.size(); i++) {
String partition = String.valueOf(structLike.get(i, Object.class));
partitionValues.add(partition);
Object obj = structLike.get(i, Object.class);
String value = String.valueOf(obj);
PartitionField partitionField = fields.get(i);
if (partitionField.transform().isIdentity()) {
Type type = structType.fieldType(partitionField.name());
if (type != null && type.typeId().equals(Type.TypeID.DATE)) {
// iceberg use integer to store date,
// we need transform it to string
value = DateTimeUtil.daysToIsoDate((Integer) obj);
}
}
partitionValues.add(value);
}
// Counts the number of partitions read