diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp index bc8133472a..34af303a00 100644 --- a/be/src/exec/parquet_reader.cpp +++ b/be/src/exec/parquet_reader.cpp @@ -45,8 +45,7 @@ ParquetReaderWrap::ParquetReaderWrap(FileReader *file_reader, int32_t num_of_col ParquetReaderWrap::~ParquetReaderWrap() { close(); } - -Status ParquetReaderWrap::init_parquet_reader(const std::vector& tuple_slot_descs) { +Status ParquetReaderWrap::init_parquet_reader(const std::vector& tuple_slot_descs, const std::string& timezone) { try { // new file reader for parquet file auto st = parquet::arrow::FileReader::Make(arrow::default_memory_pool(), @@ -71,6 +70,8 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector // Get the Column Reader for the boolean column _map_column.emplace(schemaDescriptor->Column(i)->name(), i); } + + _timezone = timezone; if (_current_line_of_group == 0) {// the first read RETURN_IF_ERROR(column_indices(tuple_slot_descs)); @@ -188,30 +189,38 @@ Status ParquetReaderWrap::read_record_batch(const std::vector& Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr& ts_array, uint8_t *buf, int32_t *wbytes) { const auto type = std::dynamic_pointer_cast(ts_array->type()); // Doris only supports seconds - time_t timestamp = 0; + int64_t timestamp = 0; switch (type->unit()) { case arrow::TimeUnit::type::NANO: {// INT96 - timestamp = (time_t)((int64_t)ts_array->Value(_current_line_of_batch) / 1000000000); // convert to Second + timestamp = ts_array->Value(_current_line_of_batch) / 1000000000L; // convert to Second break; } case arrow::TimeUnit::type::SECOND: { - timestamp = (time_t)ts_array->Value(_current_line_of_batch); + timestamp = ts_array->Value(_current_line_of_batch); break; } case arrow::TimeUnit::type::MILLI: { - timestamp = (time_t)((int64_t)ts_array->Value(_current_line_of_batch) / 1000); // convert to Second + timestamp = ts_array->Value(_current_line_of_batch) / 1000; // convert to Second break; } case 
arrow::TimeUnit::type::MICRO: { - timestamp = (time_t)((int64_t)ts_array->Value(_current_line_of_batch) / 1000000); // convert to Second + timestamp = ts_array->Value(_current_line_of_batch) / 1000000; // convert to Second break; } default: return Status::InternalError("Invalid Time Type."); } - struct tm local; - localtime_r(&timestamp, &local); - *wbytes = (uint32_t)strftime((char*)buf, 64, "%Y-%m-%d %H:%M:%S", &local); + + DateTimeValue dtv; + if (!dtv.from_unixtime(timestamp, _timezone)) { + std::stringstream str_error; + str_error << "Parse timestamp (" + std::to_string(timestamp) + ") error"; + LOG(WARNING) << str_error.str(); + return Status::InternalError(str_error.str()); + } + char* buf_end = (char*) buf; + buf_end= dtv.to_string((char*) buf_end); + *wbytes = buf_end - (char*) buf -1; return Status::OK(); } diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h index d913b3c454..7a3fcc45ab 100644 --- a/be/src/exec/parquet_reader.h +++ b/be/src/exec/parquet_reader.h @@ -76,7 +76,7 @@ public: Status read(Tuple* tuple, const std::vector& tuple_slot_descs, MemPool* mem_pool, bool* eof); void close(); Status size(int64_t* size); - Status init_parquet_reader(const std::vector& tuple_slot_descs); + Status init_parquet_reader(const std::vector& tuple_slot_descs, const std::string& timezone); private: void fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value, int32_t len); @@ -104,6 +104,8 @@ private: int _rows_of_group; // rows in a group. 
int _current_line_of_group; int _current_line_of_batch; + + std::string _timezone; }; } diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index a9962673eb..e205369092 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -149,7 +149,9 @@ Status ParquetScanner::open_next_reader() { } else { _cur_file_reader = new ParquetReaderWrap(file_reader.release(), _src_slot_descs.size()); } - Status status = _cur_file_reader->init_parquet_reader(_src_slot_descs); + + Status status = _cur_file_reader->init_parquet_reader(_src_slot_descs, _state->timezone()); + if (status.is_end_of_file()) { continue; } else { diff --git a/be/src/runtime/datetime_value.cpp b/be/src/runtime/datetime_value.cpp index 49534903d3..0788cd63f8 100644 --- a/be/src/runtime/datetime_value.cpp +++ b/be/src/runtime/datetime_value.cpp @@ -1557,13 +1557,28 @@ bool DateTimeValue::from_unixtime(int64_t timestamp, const std::string& timezone if (timestamp < 0 || timestamp > 253402271999L) { return false; } + boost::local_time::time_zone_ptr local_time_zone = TimezoneDatabase::find_timezone(timezone); if (local_time_zone == nullptr) { return false; } - boost::local_time::local_date_time lt(boost::posix_time::from_time_t(timestamp), local_time_zone); + + int64_t current_t = timestamp; + boost::posix_time::ptime time = boost::posix_time::ptime(boost::gregorian::date(1970,1,1)); + + while (current_t > 0) { + int32_t seconds_to_add = 0; + if(current_t >= std::numeric_limits<int32_t>::max()) { + seconds_to_add = std::numeric_limits<int32_t>::max(); + } else { + seconds_to_add = static_cast<int32_t>(current_t); + } + current_t -= seconds_to_add; + time += boost::posix_time::seconds(seconds_to_add); + } + boost::local_time::local_date_time lt(time, local_time_zone); boost::posix_time::ptime local_ptime = lt.local_time(); - + _neg = 0; _type = TIME_DATETIME; _year = local_ptime.date().year(); @@ -1573,7 +1588,7 @@ bool DateTimeValue::from_unixtime(int64_t timestamp, const std::string& 
timezone _minute = local_ptime.time_of_day().minutes(); _second = local_ptime.time_of_day().seconds(); _microsecond = 0; - + return true; } diff --git a/be/test/runtime/datetime_value_test.cpp b/be/test/runtime/datetime_value_test.cpp index 9082572409..0f58bf95c2 100644 --- a/be/test/runtime/datetime_value_test.cpp +++ b/be/test/runtime/datetime_value_test.cpp @@ -298,6 +298,14 @@ TEST_F(DateTimeValueTest, from_unixtime) { value.from_unixtime(570672000, TimezoneDatabase::default_time_zone); value.to_string(str); ASSERT_STREQ("1988-02-01 08:00:00", str); + + value.from_unixtime(253402271999, TimezoneDatabase::default_time_zone); + value.to_string(str); + ASSERT_STREQ("9999-12-31 23:59:59", str); + + value.from_unixtime(0, TimezoneDatabase::default_time_zone); + value.to_string(str); + ASSERT_STREQ("1970-01-01 08:00:00", str); } // Calculate format