branch-2.1: [fix](mc)Fixed the issue that maxcompute catalog can only read part of the timestamp data #49600 (#49706)

Cherry-picked from #49600

Co-authored-by: daidai <changyuwei@selectdb.com>
This commit is contained in:
github-actions[bot]
2025-04-01 17:09:15 +08:00
committed by GitHub
parent a9939c09c1
commit e898dbbba0
4 changed files with 110 additions and 48 deletions

View File

@ -29,9 +29,9 @@ import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.TimeStampMilliTZVector;
import org.apache.arrow.vector.TimeStampNanoTZVector;
import org.apache.arrow.vector.TimeStampNanoVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VarBinaryVector;
@ -39,7 +39,7 @@ import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.holders.NullableTimeStampNanoHolder;
import org.apache.arrow.vector.holders.NullableTimeStampMicroHolder;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.log4j.Logger;
@ -246,48 +246,12 @@ public class MaxComputeColumnValue implements ColumnValue {
if (timestampType.getUnit() == org.apache.arrow.vector.types.TimeUnit.MILLISECOND) { //DATETIME
result = convertToLocalDateTime((TimeStampMilliTZVector) column, idx);
} else if (timestampType.getTimezone() == null) { // TIMESTAMP_NTZ
NullableTimeStampNanoHolder valueHoder = new NullableTimeStampNanoHolder();
((TimeStampNanoVector) column).get(idx, valueHoder);
long timestampNanos = valueHoder.value;
result = LocalDateTime.ofEpochSecond(timestampNanos / 1_000_000_000,
(int) (timestampNanos % 1_000_000_000), java.time.ZoneOffset.UTC);
NullableTimeStampMicroHolder valueHoder = new NullableTimeStampMicroHolder();
((TimeStampMicroVector) column).get(idx, valueHoder);
result = microsToInstant(valueHoder.value).atZone(java.time.ZoneOffset.UTC).toLocalDateTime();
} else { // TIMESTAMP
result = convertToLocalDateTime((TimeStampNanoTZVector) column, idx);
result = convertToLocalDateTime((TimeStampMicroTZVector) column, idx);
}
/*
timestampType.getUnit()
result = switch (timestampType.getUnit()) {
case MICROSECOND -> convertToLocalDateTime((TimeStampMicroTZVector) column, idx);
case SECOND -> convertToLocalDateTime((TimeStampSecTZVector) column, idx);
case MILLISECOND -> convertToLocalDateTime((TimeStampMilliTZVector) column, idx);
case NANOSECOND -> convertToLocalDateTime((TimeStampNanoTZVector) column, idx);
};
Because :
MaxCompute type => Doris Type
DATETIME => ScalarType.createDatetimeV2Type(3)
TIMESTAMP_NTZ => ScalarType.createDatetimeV2Type(6);
and
TableBatchReadSession
.withArrowOptions (
ArrowOptions.newBuilder()
.withDatetimeUnit(TimestampUnit.MILLI)
.withTimestampUnit(TimestampUnit.NANO)
.build()
)
,
TIMESTAMP_NTZ is NTZ => column is TimeStampNanoVector
So:
case SECOND -> convertToLocalDateTime((TimeStampSecTZVector) column, idx);
case MICROSECOND -> convertToLocalDateTime((TimeStampMicroTZVector) column, idx);
case NANOSECOND -> convertToLocalDateTime((TimeStampNanoTZVector) column, idx);
may never be used.
*/
return result;
}
@ -342,9 +306,14 @@ public class MaxComputeColumnValue implements ColumnValue {
return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestampMillis), timeZone);
}
public LocalDateTime convertToLocalDateTime(TimeStampNanoTZVector nanoTZVector, int index) {
long timestampNano = nanoTZVector.get(index);
return Instant.ofEpochSecond(timestampNano / 1_000_000_000, timestampNano % 1_000_000_000)
.atZone(timeZone).toLocalDateTime();
public LocalDateTime convertToLocalDateTime(TimeStampMicroTZVector nanoTZVector, int index) {
long timestampMicro = nanoTZVector.get(index);
return microsToInstant(timestampMicro).atZone(timeZone).toLocalDateTime();
}
private static Instant microsToInstant(long timestampMicro) {
long epochSecond = Math.floorDiv(timestampMicro, 1_000_000);
long microAdjustment = timestampMicro - epochSecond * 1_000_000;
return Instant.ofEpochSecond(epochSecond, microAdjustment * 1000);
}
}