[Opt](Iceberg) Simplify the code of getting time travel snapshotId (#34299) (#38101)

bp #34299

Co-authored-by: Butao Zhang <zhangbutao@cmss.chinamobile.com>
This commit is contained in:
Mingyu Chen
2024-07-19 09:45:56 +08:00
committed by GitHub
parent bb2b7774df
commit 27d7461644

View File

@ -58,25 +58,23 @@ import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@ -287,8 +285,8 @@ public class IcebergScanNode extends FileQueryScanNode {
if (type == TableSnapshot.VersionType.VERSION) {
return tableSnapshot.getVersion();
} else {
long snapshotId = TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
return getSnapshotIdAsOfTime(icebergTable.history(), snapshotId);
long timestamp = TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
return SnapshotUtil.snapshotIdAsOfTime(icebergTable, timestamp);
}
} catch (IllegalArgumentException e) {
throw new UserException(e);
@ -297,27 +295,6 @@ public class IcebergScanNode extends FileQueryScanNode {
return null;
}
private long getSnapshotIdAsOfTime(List<HistoryEntry> historyEntries, long asOfTimestamp) {
// find history at or before asOfTimestamp
HistoryEntry latestHistory = null;
for (HistoryEntry entry : historyEntries) {
if (entry.timestampMillis() <= asOfTimestamp) {
if (latestHistory == null) {
latestHistory = entry;
continue;
}
if (entry.timestampMillis() > latestHistory.timestampMillis()) {
latestHistory = entry;
}
}
}
if (latestHistory == null) {
throw new NotFoundException("No version history at or before "
+ Instant.ofEpochMilli(asOfTimestamp));
}
return latestHistory.snapshotId();
}
private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask spitTask) {
List<IcebergDeleteFileFilter> filters = new ArrayList<>();
for (DeleteFile delete : spitTask.deletes()) {