Support 64 bit timestamp in from_unixtime (#3069)

Support 64 bit timestamp in from_unixtime
This commit is contained in:
HangyuanLiu
2020-03-17 17:30:42 +08:00
committed by GitHub
parent 33319d659b
commit d01b58bff6
5 changed files with 51 additions and 15 deletions

View File

@ -45,8 +45,7 @@ ParquetReaderWrap::ParquetReaderWrap(FileReader *file_reader, int32_t num_of_col
ParquetReaderWrap::~ParquetReaderWrap() {
close();
}
Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs) {
Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs, const std::string& timezone) {
try {
// new file reader for parquet file
auto st = parquet::arrow::FileReader::Make(arrow::default_memory_pool(),
@ -71,6 +70,8 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
// Get the Column Reader for the boolean column
_map_column.emplace(schemaDescriptor->Column(i)->name(), i);
}
_timezone = timezone;
if (_current_line_of_group == 0) {// the first read
RETURN_IF_ERROR(column_indices(tuple_slot_descs));
@ -188,30 +189,38 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>&
Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array, uint8_t *buf, int32_t *wbytes) {
const auto type = std::dynamic_pointer_cast<arrow::TimestampType>(ts_array->type());
// Doris only supports seconds
time_t timestamp = 0;
int64_t timestamp = 0;
switch (type->unit()) {
case arrow::TimeUnit::type::NANO: {// INT96
timestamp = (time_t)((int64_t)ts_array->Value(_current_line_of_batch) / 1000000000); // convert to Second
timestamp = ts_array->Value(_current_line_of_batch) / 1000000000L; // convert to Second
break;
}
case arrow::TimeUnit::type::SECOND: {
timestamp = (time_t)ts_array->Value(_current_line_of_batch);
timestamp = ts_array->Value(_current_line_of_batch);
break;
}
case arrow::TimeUnit::type::MILLI: {
timestamp = (time_t)((int64_t)ts_array->Value(_current_line_of_batch) / 1000); // convert to Second
timestamp = ts_array->Value(_current_line_of_batch) / 1000; // convert to Second
break;
}
case arrow::TimeUnit::type::MICRO: {
timestamp = (time_t)((int64_t)ts_array->Value(_current_line_of_batch) / 1000000); // convert to Second
timestamp = ts_array->Value(_current_line_of_batch) / 1000000; // convert to Second
break;
}
default:
return Status::InternalError("Invalid Time Type.");
}
struct tm local;
localtime_r(&timestamp, &local);
*wbytes = (uint32_t)strftime((char*)buf, 64, "%Y-%m-%d %H:%M:%S", &local);
DateTimeValue dtv;
if (!dtv.from_unixtime(timestamp, _timezone)) {
std::stringstream str_error;
str_error << "Parse timestamp (" + std::to_string(timestamp) + ") error";
LOG(WARNING) << str_error.str();
return Status::InternalError(str_error.str());
}
char* buf_end = (char*) buf;
buf_end= dtv.to_string((char*) buf_end);
*wbytes = buf_end - (char*) buf -1;
return Status::OK();
}

View File

@ -76,7 +76,7 @@ public:
Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& tuple_slot_descs, MemPool* mem_pool, bool* eof);
void close();
Status size(int64_t* size);
Status init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs);
Status init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs, const std::string& timezone);
private:
void fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value, int32_t len);
@ -104,6 +104,8 @@ private:
int _rows_of_group; // rows in a group.
int _current_line_of_group;
int _current_line_of_batch;
std::string _timezone;
};
}

View File

@ -149,7 +149,9 @@ Status ParquetScanner::open_next_reader() {
} else {
_cur_file_reader = new ParquetReaderWrap(file_reader.release(), _src_slot_descs.size());
}
Status status = _cur_file_reader->init_parquet_reader(_src_slot_descs);
Status status = _cur_file_reader->init_parquet_reader(_src_slot_descs, _state->timezone());
if (status.is_end_of_file()) {
continue;
} else {

View File

@ -1557,13 +1557,28 @@ bool DateTimeValue::from_unixtime(int64_t timestamp, const std::string& timezone
if (timestamp < 0 || timestamp > 253402271999L) {
return false;
}
boost::local_time::time_zone_ptr local_time_zone = TimezoneDatabase::find_timezone(timezone);
if (local_time_zone == nullptr) {
return false;
}
boost::local_time::local_date_time lt(boost::posix_time::from_time_t(timestamp), local_time_zone);
int64_t current_t = timestamp;
boost::posix_time::ptime time = boost::posix_time::ptime(boost::gregorian::date(1970,1,1));
while (current_t > 0) {
int32_t seconds_to_add = 0;
if(current_t >= std::numeric_limits<int32_t>::max()) {
seconds_to_add = std::numeric_limits<int32_t>::max();
} else {
seconds_to_add = static_cast<int32_t>(current_t);
}
current_t -= seconds_to_add;
time += boost::posix_time::seconds(seconds_to_add);
}
boost::local_time::local_date_time lt(time, local_time_zone);
boost::posix_time::ptime local_ptime = lt.local_time();
_neg = 0;
_type = TIME_DATETIME;
_year = local_ptime.date().year();
@ -1573,7 +1588,7 @@ bool DateTimeValue::from_unixtime(int64_t timestamp, const std::string& timezone
_minute = local_ptime.time_of_day().minutes();
_second = local_ptime.time_of_day().seconds();
_microsecond = 0;
return true;
}

View File

@ -298,6 +298,14 @@ TEST_F(DateTimeValueTest, from_unixtime) {
value.from_unixtime(570672000, TimezoneDatabase::default_time_zone);
value.to_string(str);
ASSERT_STREQ("1988-02-01 08:00:00", str);
value.from_unixtime(253402271999, TimezoneDatabase::default_time_zone);
value.to_string(str);
ASSERT_STREQ("9999-12-31 23:59:59", str);
value.from_unixtime(0, TimezoneDatabase::default_time_zone);
value.to_string(str);
ASSERT_STREQ("1970-01-01 08:00:00", str);
}
// Calculate format