[refactor](cache) Refactor preloaded timezone global cache (#24694)
Refactor preloaded timezone global cache
This commit is contained in:
@ -25,7 +25,6 @@
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <shared_mutex>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
@ -36,14 +35,12 @@
|
||||
#include "olap/options.h"
|
||||
#include "runtime/frontend_info.h" // TODO(zhiqiang): find a way to remove this include header
|
||||
#include "util/threadpool.h"
|
||||
#include "vec/common/hash_table/phmap_fwd_decl.h"
|
||||
|
||||
namespace doris {
|
||||
namespace vectorized {
|
||||
class VDataStreamMgr;
|
||||
class ScannerScheduler;
|
||||
class DeltaWriterV2Pool;
|
||||
using ZoneList = std::unordered_map<std::string, cctz::time_zone>;
|
||||
} // namespace vectorized
|
||||
namespace pipeline {
|
||||
class TaskScheduler;
|
||||
@ -235,9 +232,6 @@ public:
|
||||
}
|
||||
|
||||
#endif
|
||||
/// Mutable reference to the zone list held by `_global_zone_cache`.
/// The unique_ptr is dereferenced unconditionally, so it must already be set.
vectorized::ZoneList& global_zone_cache() {
    vectorized::ZoneList& zones = *_global_zone_cache;
    return zones;
}
|
||||
/// Accessor for the `_zone_cache_rw_lock` member (returned by reference).
std::shared_mutex& zone_cache_rw_lock() {
    std::shared_mutex& rw_lock = _zone_cache_rw_lock;
    return rw_lock;
}
|
||||
|
||||
/// Non-owning access to the pool held by `_load_stream_stub_pool`.
/// Returns nullptr if that unique_ptr is currently empty.
stream_load::LoadStreamStubPool* load_stream_stub_pool() {
    auto* pool = _load_stream_stub_pool.get();
    return pool;
}
|
||||
@ -347,9 +341,6 @@ private:
|
||||
std::unique_ptr<stream_load::LoadStreamStubPool> _load_stream_stub_pool;
|
||||
std::unique_ptr<vectorized::DeltaWriterV2Pool> _delta_writer_v2_pool;
|
||||
|
||||
std::unique_ptr<vectorized::ZoneList> _global_zone_cache;
|
||||
std::shared_mutex _zone_cache_rw_lock;
|
||||
|
||||
std::mutex _frontends_lock;
|
||||
std::map<TNetworkAddress, FrontendInfo> _frontends;
|
||||
GroupCommitMgr* _group_commit_mgr = nullptr;
|
||||
|
||||
@ -155,10 +155,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
|
||||
_broker_client_cache = new BrokerServiceClientCache(config::max_client_cache_size_per_host);
|
||||
|
||||
TimezoneUtils::load_timezone_names();
|
||||
|
||||
// _global_zone_cache is not owned by ExecEnv ... maybe should refactor.
|
||||
_global_zone_cache = std::make_unique<vectorized::ZoneList>();
|
||||
TimezoneUtils::load_timezones_to_cache(*_global_zone_cache);
|
||||
TimezoneUtils::load_timezones_to_cache();
|
||||
|
||||
ThreadPoolBuilder("SendBatchThreadPool")
|
||||
.set_min_threads(config::send_batch_thread_pool_thread_num)
|
||||
|
||||
@ -31,6 +31,9 @@
|
||||
#include <cctype>
|
||||
#include <exception>
|
||||
#include <filesystem>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <shared_mutex>
|
||||
#include <string>
|
||||
|
||||
#include "common/exception.h"
|
||||
@ -38,14 +41,23 @@
|
||||
|
||||
namespace doris {
|
||||
|
||||
namespace vectorized {
|
||||
using ZoneList = std::unordered_map<std::string, cctz::time_zone>;
|
||||
}
|
||||
|
||||
RE2 TimezoneUtils::time_zone_offset_format_reg("^[+-]{1}\\d{2}\\:\\d{2}$");
|
||||
|
||||
std::unordered_map<std::string, std::string> TimezoneUtils::timezone_names_map_;
|
||||
bool TimezoneUtils::inited_ = false;
|
||||
// For unit tests: created eagerly so that it is never nullptr.
|
||||
std::unique_ptr<vectorized::ZoneList> zone_cache = std::make_unique<vectorized::ZoneList>();
|
||||
std::shared_mutex zone_cache_rw_lock;
|
||||
|
||||
const std::string TimezoneUtils::default_time_zone = "+08:00";
|
||||
static const char* tzdir = "/usr/share/zoneinfo"; // default value, may change by TZDIR env var
|
||||
|
||||
void TimezoneUtils::clear_timezone_names() {
|
||||
void TimezoneUtils::clear_timezone_caches() {
|
||||
zone_cache->clear();
|
||||
timezone_names_map_.clear();
|
||||
inited_ = false;
|
||||
}
|
||||
@ -211,8 +223,8 @@ bool parse_load_timezone(vectorized::ZoneList& zone_list, int8_t* data, int len,
|
||||
|
||||
} // namespace
|
||||
|
||||
void TimezoneUtils::load_timezones_to_cache(vectorized::ZoneList& cache_list) {
|
||||
cache_list["CST"] = cctz::fixed_time_zone(cctz::seconds(8 * 3600));
|
||||
void TimezoneUtils::load_timezones_to_cache() {
|
||||
(*zone_cache)["CST"] = cctz::fixed_time_zone(cctz::seconds(8 * 3600));
|
||||
|
||||
std::string base_str;
|
||||
// try get from System
|
||||
@ -240,7 +252,7 @@ void TimezoneUtils::load_timezones_to_cache(vectorized::ZoneList& cache_list) {
|
||||
auto tz_path = dir_entry.path().string();
|
||||
auto [handle, length] = load_file_to_memory(tz_path);
|
||||
|
||||
parse_load_timezone(cache_list, handle, length);
|
||||
parse_load_timezone(*zone_cache, handle, length);
|
||||
|
||||
delete[] handle;
|
||||
} else if (dir_entry.is_directory() && ignore_paths.contains(dir_entry.path().filename())) {
|
||||
@ -248,11 +260,24 @@ void TimezoneUtils::load_timezones_to_cache(vectorized::ZoneList& cache_list) {
|
||||
}
|
||||
}
|
||||
|
||||
cache_list.erase("LMT"); // local mean time for every timezone
|
||||
LOG(INFO) << "Read " << cache_list.size() << " timezones.";
|
||||
zone_cache->erase("LMT"); // local mean time for every timezone
|
||||
LOG(INFO) << "Read " << zone_cache->size() << " timezones.";
|
||||
}
|
||||
|
||||
bool TimezoneUtils::find_cctz_time_zone(const std::string& timezone, cctz::time_zone& ctz) {
|
||||
zone_cache_rw_lock.lock_shared();
|
||||
if (auto it = zone_cache->find(timezone); it != nullptr) {
|
||||
ctz = it->second;
|
||||
zone_cache_rw_lock.unlock_shared();
|
||||
return true;
|
||||
}
|
||||
zone_cache_rw_lock.unlock_shared();
|
||||
return find_cctz_time_zone_impl(timezone, ctz);
|
||||
}
|
||||
|
||||
bool TimezoneUtils::find_cctz_time_zone_impl(const std::string& timezone, cctz::time_zone& ctz) {
|
||||
// now timezone is not in zone_cache
|
||||
|
||||
auto timezone_lower = boost::algorithm::to_lower_copy(timezone);
|
||||
re2::StringPiece value;
|
||||
// +08:00
|
||||
@ -273,6 +298,8 @@ bool TimezoneUtils::find_cctz_time_zone(const std::string& timezone, cctz::time_
|
||||
int offset = hour * 60 * 60 + minute * 60;
|
||||
offset *= positive ? 1 : -1;
|
||||
ctz = cctz::fixed_time_zone(cctz::seconds(offset));
|
||||
std::unique_lock<std::shared_mutex> l(zone_cache_rw_lock);
|
||||
zone_cache->emplace(timezone, ctz);
|
||||
return true;
|
||||
} else { // not only offset, GMT or GMT+8
|
||||
// split tz_name and offset
|
||||
@ -312,6 +339,8 @@ bool TimezoneUtils::find_cctz_time_zone(const std::string& timezone, cctz::time_
|
||||
}
|
||||
if (tz_parsed) {
|
||||
if (!have_both) { // GMT only
|
||||
std::unique_lock<std::shared_mutex> l(zone_cache_rw_lock);
|
||||
zone_cache->emplace(timezone, ctz);
|
||||
return true;
|
||||
}
|
||||
// GMT+8
|
||||
@ -320,6 +349,8 @@ bool TimezoneUtils::find_cctz_time_zone(const std::string& timezone, cctz::time_
|
||||
(cctz::convert(cctz::civil_second {}, offset) -
|
||||
cctz::time_point<cctz::seconds>());
|
||||
ctz = cctz::fixed_time_zone(std::chrono::duration_cast<std::chrono::seconds>(tz));
|
||||
std::unique_lock<std::shared_mutex> l(zone_cache_rw_lock);
|
||||
zone_cache->emplace(timezone, ctz);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@ -29,23 +29,22 @@ class time_zone;
|
||||
|
||||
namespace doris {
|
||||
|
||||
namespace vectorized {
|
||||
using ZoneList = std::unordered_map<std::string, cctz::time_zone>;
|
||||
}
|
||||
|
||||
class TimezoneUtils {
|
||||
public:
|
||||
static void load_timezone_names();
|
||||
static void load_timezones_to_cache(vectorized::ZoneList& cache_list);
|
||||
// Lower-case timezone names can be parsed only if the execution environment provides timezone files.
|
||||
static void load_timezones_to_cache();
|
||||
// When this function is used, the resolved timezone is saved in the cache.
|
||||
static bool find_cctz_time_zone(const std::string& timezone, cctz::time_zone& ctz);
|
||||
|
||||
// for ut only
|
||||
static void clear_timezone_names();
|
||||
|
||||
static const std::string default_time_zone;
|
||||
|
||||
private:
|
||||
// for ut only
|
||||
static void clear_timezone_caches();
|
||||
|
||||
static bool find_cctz_time_zone_impl(const std::string& timezone, cctz::time_zone& ctz);
|
||||
|
||||
static bool inited_;
|
||||
static std::unordered_map<std::string, std::string> timezone_names_map_;
|
||||
|
||||
|
||||
@ -43,7 +43,6 @@
|
||||
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
|
||||
#include "common/compiler_util.h" // IWYU pragma: keep
|
||||
#include "common/status.h"
|
||||
#include "runtime/exec_env.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "udf/udf.h"
|
||||
#include "util/jsonb_document.h"
|
||||
@ -130,14 +129,12 @@ struct TimeCast {
|
||||
// Some examples of conversions.
|
||||
// '300' -> 00:03:00 '20:23' -> 20:23:00 '20:23:24' -> 20:23:24
|
||||
template <typename T>
|
||||
static bool try_parse_time(char* s, size_t len, T& x, const cctz::time_zone& local_time_zone,
|
||||
ZoneList& time_zone_cache, std::shared_mutex& cache_lock) {
|
||||
static bool try_parse_time(char* s, size_t len, T& x, const cctz::time_zone& local_time_zone) {
|
||||
/// TODO: Maybe we can move Timecast to the io_helper.
|
||||
if (try_as_time(s, len, x, local_time_zone)) {
|
||||
return true;
|
||||
} else {
|
||||
if (VecDateTimeValue dv {};
|
||||
dv.from_date_str(s, len, local_time_zone, time_zone_cache, &cache_lock)) {
|
||||
if (VecDateTimeValue dv {}; dv.from_date_str(s, len, local_time_zone)) {
|
||||
// can be parse as a datetime
|
||||
x = dv.hour() * 3600 + dv.minute() * 60 + dv.second();
|
||||
return true;
|
||||
@ -862,25 +859,23 @@ struct NameToDateTime {
|
||||
|
||||
template <typename DataType, typename Additions = void*, typename FromDataType = void*>
|
||||
bool try_parse_impl(typename DataType::FieldType& x, ReadBuffer& rb,
|
||||
const cctz::time_zone& local_time_zone, ZoneList& time_zone_cache,
|
||||
std::shared_mutex& cache_lock,
|
||||
const cctz::time_zone& local_time_zone,
|
||||
Additions additions [[maybe_unused]] = Additions()) {
|
||||
if constexpr (IsDateTimeType<DataType>) {
|
||||
return try_read_datetime_text(x, rb, local_time_zone, time_zone_cache, cache_lock);
|
||||
return try_read_datetime_text(x, rb, local_time_zone);
|
||||
}
|
||||
|
||||
if constexpr (IsDateType<DataType>) {
|
||||
return try_read_date_text(x, rb, local_time_zone, time_zone_cache, cache_lock);
|
||||
return try_read_date_text(x, rb, local_time_zone);
|
||||
}
|
||||
|
||||
if constexpr (IsDateV2Type<DataType>) {
|
||||
return try_read_date_v2_text(x, rb, local_time_zone, time_zone_cache, cache_lock);
|
||||
return try_read_date_v2_text(x, rb, local_time_zone);
|
||||
}
|
||||
|
||||
if constexpr (IsDateTimeV2Type<DataType>) {
|
||||
UInt32 scale = additions;
|
||||
return try_read_datetime_v2_text(x, rb, local_time_zone, time_zone_cache, cache_lock,
|
||||
scale);
|
||||
return try_read_datetime_v2_text(x, rb, local_time_zone, scale);
|
||||
}
|
||||
|
||||
if constexpr (std::is_same_v<DataTypeString, FromDataType> &&
|
||||
@ -889,8 +884,7 @@ bool try_parse_impl(typename DataType::FieldType& x, ReadBuffer& rb,
|
||||
auto len = rb.count();
|
||||
auto s = rb.position();
|
||||
rb.position() = rb.end(); // make is_all_read = true
|
||||
auto ret =
|
||||
TimeCast::try_parse_time(s, len, x, local_time_zone, time_zone_cache, cache_lock);
|
||||
auto ret = TimeCast::try_parse_time(s, len, x, local_time_zone);
|
||||
x *= (1000 * 1000);
|
||||
return ret;
|
||||
}
|
||||
@ -911,8 +905,6 @@ bool try_parse_impl(typename DataType::FieldType& x, ReadBuffer& rb,
|
||||
template <typename DataType, typename Additions = void*>
|
||||
StringParser::ParseResult try_parse_decimal_impl(typename DataType::FieldType& x, ReadBuffer& rb,
|
||||
const cctz::time_zone& local_time_zone,
|
||||
ZoneList& time_zone_cache,
|
||||
std::shared_mutex& cache_lock,
|
||||
Additions additions
|
||||
[[maybe_unused]] = Additions()) {
|
||||
if constexpr (IsDataTypeDecimalV2<DataType>) {
|
||||
@ -1351,10 +1343,6 @@ struct ConvertThroughParsing {
|
||||
using ColVecTo = std::conditional_t<IsDecimalNumber<ToFieldType>,
|
||||
ColumnDecimal<ToFieldType>, ColumnVector<ToFieldType>>;
|
||||
|
||||
// For datelike type, only from FunctionConvertFromString. So we can use its' context。
|
||||
ZoneList& time_zone_cache = context->state()->exec_env()->global_zone_cache();
|
||||
std::shared_mutex& cache_lock = context->state()->exec_env()->zone_cache_rw_lock();
|
||||
|
||||
const IColumn* col_from = block.get_by_position(arguments[0]).column.get();
|
||||
const ColumnString* col_from_string = check_and_get_column<ColumnString>(col_from);
|
||||
|
||||
@ -1405,8 +1393,8 @@ struct ConvertThroughParsing {
|
||||
if constexpr (IsDataTypeDecimal<ToDataType>) {
|
||||
ToDataType::check_type_precision((PrecisionScaleArg(additions).precision));
|
||||
StringParser::ParseResult res = try_parse_decimal_impl<ToDataType>(
|
||||
vec_to[i], read_buffer, context->state()->timezone_obj(), time_zone_cache,
|
||||
cache_lock, PrecisionScaleArg(additions));
|
||||
vec_to[i], read_buffer, context->state()->timezone_obj(),
|
||||
PrecisionScaleArg(additions));
|
||||
parsed = (res == StringParser::PARSE_SUCCESS ||
|
||||
res == StringParser::PARSE_OVERFLOW ||
|
||||
res == StringParser::PARSE_UNDERFLOW);
|
||||
@ -1415,11 +1403,10 @@ struct ConvertThroughParsing {
|
||||
block.get_by_position(result).type.get());
|
||||
parsed = try_parse_impl<ToDataType>(vec_to[i], read_buffer,
|
||||
context->state()->timezone_obj(),
|
||||
time_zone_cache, cache_lock, type->get_scale());
|
||||
type->get_scale());
|
||||
} else {
|
||||
parsed = try_parse_impl<ToDataType, void*, FromDataType>(
|
||||
vec_to[i], read_buffer, context->state()->timezone_obj(), time_zone_cache,
|
||||
cache_lock);
|
||||
vec_to[i], read_buffer, context->state()->timezone_obj());
|
||||
}
|
||||
(*vec_null_map_to)[i] = !parsed || !is_all_read(read_buffer);
|
||||
current_offset = next_offset;
|
||||
|
||||
@ -23,8 +23,6 @@
|
||||
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <shared_mutex>
|
||||
#include <string>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
@ -69,8 +67,6 @@ class DateV2Value;
|
||||
|
||||
namespace doris::vectorized {
|
||||
|
||||
using ZoneList = std::unordered_map<std::string, cctz::time_zone>;
|
||||
|
||||
template <typename DateValueType, typename ArgType>
|
||||
struct ConvertTZImpl {
|
||||
using ColumnType = std::conditional_t<
|
||||
@ -92,8 +88,6 @@ struct ConvertTZImpl {
|
||||
const ColumnString* from_tz_column, const ColumnString* to_tz_column,
|
||||
ReturnColumnType* result_column, NullMap& result_null_map,
|
||||
size_t input_rows_count) {
|
||||
ZoneList& time_zone_cache = context->state()->exec_env()->global_zone_cache();
|
||||
std::shared_mutex& cache_lock = context->state()->exec_env()->zone_cache_rw_lock();
|
||||
for (size_t i = 0; i < input_rows_count; i++) {
|
||||
if (result_null_map[i]) {
|
||||
result_column->insert_default();
|
||||
@ -101,8 +95,7 @@ struct ConvertTZImpl {
|
||||
}
|
||||
auto from_tz = from_tz_column->get_data_at(i).to_string();
|
||||
auto to_tz = to_tz_column->get_data_at(i).to_string();
|
||||
execute_inner_loop(date_column, time_zone_cache, cache_lock, from_tz, to_tz,
|
||||
result_column, result_null_map, i);
|
||||
execute_inner_loop(date_column, from_tz, to_tz, result_column, result_null_map, i);
|
||||
}
|
||||
}
|
||||
|
||||
@ -110,9 +103,6 @@ struct ConvertTZImpl {
|
||||
const ColumnString* from_tz_column,
|
||||
const ColumnString* to_tz_column, ReturnColumnType* result_column,
|
||||
NullMap& result_null_map, size_t input_rows_count) {
|
||||
ZoneList& time_zone_cache = context->state()->exec_env()->global_zone_cache();
|
||||
std::shared_mutex& cache_lock = context->state()->exec_env()->zone_cache_rw_lock();
|
||||
|
||||
auto from_tz = from_tz_column->get_data_at(0).to_string();
|
||||
auto to_tz = to_tz_column->get_data_at(0).to_string();
|
||||
for (size_t i = 0; i < input_rows_count; i++) {
|
||||
@ -120,56 +110,38 @@ struct ConvertTZImpl {
|
||||
result_column->insert_default();
|
||||
continue;
|
||||
}
|
||||
execute_inner_loop(date_column, time_zone_cache, cache_lock, from_tz, to_tz,
|
||||
result_column, result_null_map, i);
|
||||
execute_inner_loop(date_column, from_tz, to_tz, result_column, result_null_map, i);
|
||||
}
|
||||
}
|
||||
|
||||
static void execute_inner_loop(const ColumnType* date_column, ZoneList& time_zone_cache,
|
||||
std::shared_mutex& cache_lock, const std::string& from_tz,
|
||||
const std::string& to_tz, ReturnColumnType* result_column,
|
||||
static void execute_inner_loop(const ColumnType* date_column, const std::string& from_tz_name,
|
||||
const std::string& to_tz_name, ReturnColumnType* result_column,
|
||||
NullMap& result_null_map, const size_t index_now) {
|
||||
DateValueType ts_value =
|
||||
binary_cast<NativeType, DateValueType>(date_column->get_element(index_now));
|
||||
int64_t timestamp;
|
||||
cctz::time_zone from_tz {}, to_tz {};
|
||||
|
||||
cache_lock.lock_shared();
|
||||
if (time_zone_cache.find(from_tz) == time_zone_cache.cend()) {
|
||||
cache_lock.unlock_shared();
|
||||
std::unique_lock<std::shared_mutex> lock_(cache_lock);
|
||||
//TODO: the lock upgrade could be done in find_... function only when we push value into the hashmap
|
||||
if (!TimezoneUtils::find_cctz_time_zone(from_tz, time_zone_cache[from_tz])) {
|
||||
time_zone_cache.erase(from_tz);
|
||||
result_null_map[index_now] = true;
|
||||
result_column->insert_default();
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
cache_lock.unlock_shared();
|
||||
if (!TimezoneUtils::find_cctz_time_zone(from_tz_name, from_tz)) {
|
||||
result_null_map[index_now] = true;
|
||||
result_column->insert_default();
|
||||
return;
|
||||
}
|
||||
|
||||
cache_lock.lock_shared();
|
||||
if (time_zone_cache.find(to_tz) == time_zone_cache.cend()) {
|
||||
cache_lock.unlock_shared();
|
||||
std::unique_lock<std::shared_mutex> lock_(cache_lock);
|
||||
if (!TimezoneUtils::find_cctz_time_zone(to_tz, time_zone_cache[to_tz])) {
|
||||
time_zone_cache.erase(to_tz);
|
||||
result_null_map[index_now] = true;
|
||||
result_column->insert_default();
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
cache_lock.unlock_shared();
|
||||
if (!TimezoneUtils::find_cctz_time_zone(to_tz_name, to_tz)) {
|
||||
result_null_map[index_now] = true;
|
||||
result_column->insert_default();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!ts_value.unix_timestamp(×tamp, time_zone_cache[from_tz])) {
|
||||
if (!ts_value.unix_timestamp(×tamp, from_tz)) {
|
||||
result_null_map[index_now] = true;
|
||||
result_column->insert_default();
|
||||
return;
|
||||
}
|
||||
|
||||
ReturnDateType ts_value2;
|
||||
if (!ts_value2.from_unixtime(timestamp, time_zone_cache[to_tz])) {
|
||||
if (!ts_value2.from_unixtime(timestamp, to_tz)) {
|
||||
result_null_map[index_now] = true;
|
||||
result_column->insert_default();
|
||||
return;
|
||||
|
||||
@ -21,7 +21,6 @@
|
||||
#include <snappy/snappy.h>
|
||||
|
||||
#include <iostream>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "common/exception.h"
|
||||
#include "util/binary_cast.hpp"
|
||||
@ -43,8 +42,6 @@ static constexpr size_t DEFAULT_MAX_STRING_SIZE = 1073741824; // 1GB
|
||||
static constexpr size_t DEFAULT_MAX_JSON_SIZE = 1073741824; // 1GB
|
||||
static constexpr auto WRITE_HELPERS_MAX_INT_WIDTH = 40U;
|
||||
|
||||
using ZoneList = std::unordered_map<std::string, cctz::time_zone>;
|
||||
|
||||
inline std::string int128_to_string(__int128_t value) {
|
||||
fmt::memory_buffer buffer;
|
||||
fmt::format_to(buffer, "{}", value);
|
||||
@ -283,12 +280,10 @@ bool read_date_text_impl(T& x, ReadBuffer& buf) {
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
bool read_date_text_impl(T& x, ReadBuffer& buf, const cctz::time_zone& local_time_zone,
|
||||
ZoneList& time_zone_cache, std::shared_mutex& cache_lock) {
|
||||
bool read_date_text_impl(T& x, ReadBuffer& buf, const cctz::time_zone& local_time_zone) {
|
||||
static_assert(std::is_same_v<Int64, T>);
|
||||
auto dv = binary_cast<Int64, VecDateTimeValue>(x);
|
||||
auto ans = dv.from_date_str(buf.position(), buf.count(), local_time_zone, time_zone_cache,
|
||||
&cache_lock);
|
||||
auto ans = dv.from_date_str(buf.position(), buf.count(), local_time_zone);
|
||||
dv.cast_to_date();
|
||||
|
||||
// only to match the is_all_read() check to prevent return null
|
||||
@ -311,12 +306,10 @@ bool read_datetime_text_impl(T& x, ReadBuffer& buf) {
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
bool read_datetime_text_impl(T& x, ReadBuffer& buf, const cctz::time_zone& local_time_zone,
|
||||
ZoneList& time_zone_cache, std::shared_mutex& cache_lock) {
|
||||
bool read_datetime_text_impl(T& x, ReadBuffer& buf, const cctz::time_zone& local_time_zone) {
|
||||
static_assert(std::is_same_v<Int64, T>);
|
||||
auto dv = binary_cast<Int64, VecDateTimeValue>(x);
|
||||
auto ans = dv.from_date_str(buf.position(), buf.count(), local_time_zone, time_zone_cache,
|
||||
&cache_lock);
|
||||
auto ans = dv.from_date_str(buf.position(), buf.count(), local_time_zone);
|
||||
dv.to_datetime();
|
||||
|
||||
// only to match the is_all_read() check to prevent return null
|
||||
@ -338,12 +331,10 @@ bool read_date_v2_text_impl(T& x, ReadBuffer& buf) {
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
bool read_date_v2_text_impl(T& x, ReadBuffer& buf, const cctz::time_zone& local_time_zone,
|
||||
ZoneList& time_zone_cache, std::shared_mutex& cache_lock) {
|
||||
bool read_date_v2_text_impl(T& x, ReadBuffer& buf, const cctz::time_zone& local_time_zone) {
|
||||
static_assert(std::is_same_v<UInt32, T>);
|
||||
auto dv = binary_cast<UInt32, DateV2Value<DateV2ValueType>>(x);
|
||||
auto ans = dv.from_date_str(buf.position(), buf.count(), local_time_zone, time_zone_cache,
|
||||
&cache_lock);
|
||||
auto ans = dv.from_date_str(buf.position(), buf.count(), local_time_zone);
|
||||
|
||||
// only to match the is_all_read() check to prevent return null
|
||||
buf.position() = buf.end();
|
||||
@ -365,12 +356,10 @@ bool read_datetime_v2_text_impl(T& x, ReadBuffer& buf, UInt32 scale = -1) {
|
||||
|
||||
template <typename T>
|
||||
bool read_datetime_v2_text_impl(T& x, ReadBuffer& buf, const cctz::time_zone& local_time_zone,
|
||||
ZoneList& time_zone_cache, std::shared_mutex& cache_lock,
|
||||
UInt32 scale = -1) {
|
||||
static_assert(std::is_same_v<UInt64, T>);
|
||||
auto dv = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(x);
|
||||
auto ans = dv.from_date_str(buf.position(), buf.count(), local_time_zone, time_zone_cache,
|
||||
&cache_lock, scale);
|
||||
auto ans = dv.from_date_str(buf.position(), buf.count(), local_time_zone, scale);
|
||||
|
||||
// only to match the is_all_read() check to prevent return null
|
||||
buf.position() = buf.end();
|
||||
@ -456,28 +445,23 @@ StringParser::ParseResult try_read_decimal_text(T& x, ReadBuffer& in, UInt32 pre
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
bool try_read_datetime_text(T& x, ReadBuffer& in, const cctz::time_zone& local_time_zone,
|
||||
ZoneList& time_zone_cache, std::shared_mutex& cache_lock) {
|
||||
return read_datetime_text_impl<T>(x, in, local_time_zone, time_zone_cache, cache_lock);
|
||||
bool try_read_datetime_text(T& x, ReadBuffer& in, const cctz::time_zone& local_time_zone) {
|
||||
return read_datetime_text_impl<T>(x, in, local_time_zone);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
bool try_read_date_text(T& x, ReadBuffer& in, const cctz::time_zone& local_time_zone,
|
||||
ZoneList& time_zone_cache, std::shared_mutex& cache_lock) {
|
||||
return read_date_text_impl<T>(x, in, local_time_zone, time_zone_cache, cache_lock);
|
||||
bool try_read_date_text(T& x, ReadBuffer& in, const cctz::time_zone& local_time_zone) {
|
||||
return read_date_text_impl<T>(x, in, local_time_zone);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
bool try_read_date_v2_text(T& x, ReadBuffer& in, const cctz::time_zone& local_time_zone,
|
||||
ZoneList& time_zone_cache, std::shared_mutex& cache_lock) {
|
||||
return read_date_v2_text_impl<T>(x, in, local_time_zone, time_zone_cache, cache_lock);
|
||||
bool try_read_date_v2_text(T& x, ReadBuffer& in, const cctz::time_zone& local_time_zone) {
|
||||
return read_date_v2_text_impl<T>(x, in, local_time_zone);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
bool try_read_datetime_v2_text(T& x, ReadBuffer& in, const cctz::time_zone& local_time_zone,
|
||||
ZoneList& time_zone_cache, std::shared_mutex& cache_lock,
|
||||
UInt32 scale) {
|
||||
return read_datetime_v2_text_impl<T>(x, in, local_time_zone, time_zone_cache, cache_lock,
|
||||
scale);
|
||||
return read_datetime_v2_text_impl<T>(x, in, local_time_zone, scale);
|
||||
}
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -85,19 +85,16 @@ bool VecDateTimeValue::check_date(uint32_t year, uint32_t month, uint32_t day) {
|
||||
// YYYY-MM-DD HH-MM-DD.FFFFFF AM in default format
|
||||
// 0 1 2 3 4 5 6 7
|
||||
bool VecDateTimeValue::from_date_str(const char* date_str, int len) {
|
||||
return from_date_str_base(date_str, len, nullptr, nullptr, nullptr);
|
||||
return from_date_str_base(date_str, len, nullptr);
|
||||
}
|
||||
//parse timezone to get offset
|
||||
bool VecDateTimeValue::from_date_str(const char* date_str, int len,
|
||||
const cctz::time_zone& local_time_zone,
|
||||
ZoneList& time_zone_cache, std::shared_mutex* cache_lock) {
|
||||
return from_date_str_base(date_str, len, &local_time_zone, &time_zone_cache, cache_lock);
|
||||
const cctz::time_zone& local_time_zone) {
|
||||
return from_date_str_base(date_str, len, &local_time_zone);
|
||||
}
|
||||
|
||||
bool VecDateTimeValue::from_date_str_base(const char* date_str, int len,
|
||||
const cctz::time_zone* local_time_zone,
|
||||
ZoneList* time_zone_cache,
|
||||
std::shared_mutex* cache_lock) {
|
||||
const cctz::time_zone* local_time_zone) {
|
||||
const char* ptr = date_str;
|
||||
const char* end = date_str + len;
|
||||
// ONLY 2, 6 can follow by a space
|
||||
@ -162,24 +159,16 @@ bool VecDateTimeValue::from_date_str_base(const char* date_str, int len,
|
||||
if (UNLIKELY((field_idx > 2 ||
|
||||
!has_bar) /*dont treat xxxx-xx-xx:xx:xx as xxxx-xx(-xx:xx:xx)*/
|
||||
&& time_zone_begins(ptr, end))) {
|
||||
if (local_time_zone == nullptr || time_zone_cache == nullptr) {
|
||||
if (local_time_zone == nullptr) {
|
||||
return false;
|
||||
}
|
||||
auto get_tz_offset = [&](const std::string& str_tz,
|
||||
const cctz::time_zone* local_time_zone) -> long {
|
||||
cache_lock->lock_shared();
|
||||
if (time_zone_cache->find(str_tz) == time_zone_cache->end()) { // not found
|
||||
cache_lock->unlock_shared();
|
||||
std::unique_lock<std::shared_mutex> lock_(*cache_lock);
|
||||
//TODO: the lock upgrade could be done in find_... function only when we push value into the hashmap
|
||||
if (!TimezoneUtils::find_cctz_time_zone(str_tz, (*time_zone_cache)[str_tz])) {
|
||||
time_zone_cache->erase(str_tz);
|
||||
throw Exception {ErrorCode::INVALID_ARGUMENT, ""};
|
||||
}
|
||||
} else {
|
||||
cache_lock->unlock_shared();
|
||||
cctz::time_zone given_tz {};
|
||||
if (!TimezoneUtils::find_cctz_time_zone(str_tz, given_tz)) {
|
||||
throw Exception {ErrorCode::INVALID_ARGUMENT, ""};
|
||||
}
|
||||
auto given = cctz::convert(cctz::civil_second {}, (*time_zone_cache)[str_tz]);
|
||||
auto given = cctz::convert(cctz::civil_second {}, given_tz);
|
||||
auto local = cctz::convert(cctz::civil_second {}, *local_time_zone);
|
||||
// these two values is absolute time. so they are negative. need to use (-local) - (-given)
|
||||
return std::chrono::duration_cast<std::chrono::seconds>(given - local).count();
|
||||
@ -1965,20 +1954,17 @@ void DateV2Value<T>::format_datetime(uint32_t* date_val, bool* carry_bits) const
|
||||
// 0 1 2 3 4 5 6 7
|
||||
template <typename T>
|
||||
bool DateV2Value<T>::from_date_str(const char* date_str, int len, int scale /* = -1*/) {
|
||||
return from_date_str_base(date_str, len, scale, nullptr, nullptr, nullptr);
|
||||
return from_date_str_base(date_str, len, scale, nullptr);
|
||||
}
|
||||
// when we parse
|
||||
template <typename T>
|
||||
bool DateV2Value<T>::from_date_str(const char* date_str, int len,
|
||||
const cctz::time_zone& local_time_zone,
|
||||
ZoneList& time_zone_cache, std::shared_mutex* cache_lock,
|
||||
int scale /* = -1*/) {
|
||||
return from_date_str_base(date_str, len, scale, &local_time_zone, &time_zone_cache, cache_lock);
|
||||
const cctz::time_zone& local_time_zone, int scale /* = -1*/) {
|
||||
return from_date_str_base(date_str, len, scale, &local_time_zone);
|
||||
}
|
||||
template <typename T>
|
||||
bool DateV2Value<T>::from_date_str_base(const char* date_str, int len, int scale,
|
||||
const cctz::time_zone* local_time_zone,
|
||||
ZoneList* time_zone_cache, std::shared_mutex* cache_lock) {
|
||||
const cctz::time_zone* local_time_zone) {
|
||||
const char* ptr = date_str;
|
||||
const char* end = date_str + len;
|
||||
// ONLY 2, 6 can follow by a space
|
||||
@ -2070,24 +2056,16 @@ bool DateV2Value<T>::from_date_str_base(const char* date_str, int len, int scale
|
||||
if (UNLIKELY((field_idx > 2 ||
|
||||
!has_bar) /*dont treat xxxx-xx-xx:xx:xx as xxxx-xx(-xx:xx:xx)*/
|
||||
&& time_zone_begins(ptr, end))) {
|
||||
if (local_time_zone == nullptr || time_zone_cache == nullptr) {
|
||||
if (local_time_zone == nullptr) {
|
||||
return false;
|
||||
}
|
||||
auto get_tz_offset = [&](const std::string& str_tz,
|
||||
const cctz::time_zone* local_time_zone) -> long {
|
||||
cache_lock->lock_shared();
|
||||
if (time_zone_cache->find(str_tz) == time_zone_cache->end()) { // not found
|
||||
cache_lock->unlock_shared();
|
||||
std::unique_lock<std::shared_mutex> lock_(*cache_lock);
|
||||
//TODO: the lock upgrade could be done in find_... function only when we push value into the hashmap
|
||||
if (!TimezoneUtils::find_cctz_time_zone(str_tz, (*time_zone_cache)[str_tz])) {
|
||||
time_zone_cache->erase(str_tz);
|
||||
throw Exception {ErrorCode::INVALID_ARGUMENT, ""};
|
||||
}
|
||||
} else {
|
||||
cache_lock->unlock_shared();
|
||||
cctz::time_zone given_tz {};
|
||||
if (!TimezoneUtils::find_cctz_time_zone(str_tz, given_tz)) {
|
||||
throw Exception {ErrorCode::INVALID_ARGUMENT, ""};
|
||||
}
|
||||
auto given = cctz::convert(cctz::civil_second {}, (*time_zone_cache)[str_tz]);
|
||||
auto given = cctz::convert(cctz::civil_second {}, given_tz);
|
||||
auto local = cctz::convert(cctz::civil_second {}, *local_time_zone);
|
||||
// these two values is absolute time. so they are negative. need to use (-local) - (-given)
|
||||
return std::chrono::duration_cast<std::chrono::seconds>(given - local).count();
|
||||
|
||||
@ -44,8 +44,6 @@ namespace doris {
|
||||
|
||||
namespace vectorized {
|
||||
|
||||
using ZoneList = std::unordered_map<std::string, cctz::time_zone>;
|
||||
|
||||
enum TimeUnit {
|
||||
MICROSECOND,
|
||||
MILLISECOND,
|
||||
@ -364,8 +362,7 @@ public:
|
||||
// 'YY-MM-DD', 'YYYY-MM-DD', 'YY-MM-DD HH.MM.SS'
|
||||
// 'YYYYMMDDTHHMMSS'
|
||||
bool from_date_str(const char* str, int len);
|
||||
bool from_date_str(const char* str, int len, const cctz::time_zone& local_time_zone,
|
||||
ZoneList& time_zone_cache, std::shared_mutex* cache_lock);
|
||||
bool from_date_str(const char* str, int len, const cctz::time_zone& local_time_zone);
|
||||
|
||||
// Construct Date/Datetime type value from int64_t value.
|
||||
// Return true if convert success. Otherwise return false.
|
||||
@ -703,8 +700,7 @@ private:
|
||||
char* to_date_buffer(char* to) const;
|
||||
char* to_time_buffer(char* to) const;
|
||||
|
||||
bool from_date_str_base(const char* date_str, int len, const cctz::time_zone* local_time_zone,
|
||||
ZoneList* time_zone_cache, std::shared_mutex* cache_lock);
|
||||
bool from_date_str_base(const char* date_str, int len, const cctz::time_zone* local_time_zone);
|
||||
|
||||
int64_t to_date_int64() const;
|
||||
int64_t to_time_int64() const;
|
||||
@ -826,7 +822,7 @@ public:
|
||||
// 'YYYYMMDDTHHMMSS'
|
||||
bool from_date_str(const char* str, int len, int scale = -1);
|
||||
bool from_date_str(const char* str, int len, const cctz::time_zone& local_time_zone,
|
||||
ZoneList& time_zone_cache, std::shared_mutex* cache_lock, int scale = -1);
|
||||
int scale = -1);
|
||||
|
||||
// Convert this value to string
|
||||
// this will check type to decide which format to convert
|
||||
@ -1190,8 +1186,7 @@ private:
|
||||
bool disable_lut = false);
|
||||
|
||||
bool from_date_str_base(const char* date_str, int len, int scale,
|
||||
const cctz::time_zone* local_time_zone, ZoneList* time_zone_cache,
|
||||
std::shared_mutex* cache_lock);
|
||||
const cctz::time_zone* local_time_zone);
|
||||
|
||||
// Used to construct from int value
|
||||
int64_t standardize_timevalue(int64_t value);
|
||||
|
||||
@ -39,17 +39,13 @@ FunctionUtils::FunctionUtils() {
|
||||
|
||||
FunctionUtils::FunctionUtils(const doris::TypeDescriptor& return_type,
|
||||
const std::vector<doris::TypeDescriptor>& arg_types,
|
||||
int varargs_buffer_size, RuntimeState* state = nullptr) {
|
||||
int varargs_buffer_size) {
|
||||
TQueryGlobals globals;
|
||||
globals.__set_now_string("2019-08-06 01:38:57");
|
||||
globals.__set_timestamp_ms(1565026737805);
|
||||
globals.__set_time_zone("Asia/Shanghai");
|
||||
if (state == nullptr) {
|
||||
_state = RuntimeState::create_unique(globals).release();
|
||||
_fn_ctx = FunctionContext::create_context(_state, return_type, arg_types);
|
||||
} else {
|
||||
_fn_ctx = FunctionContext::create_context(state, return_type, arg_types);
|
||||
}
|
||||
_state = RuntimeState::create_unique(globals).release();
|
||||
_fn_ctx = FunctionContext::create_context(_state, return_type, arg_types);
|
||||
}
|
||||
|
||||
FunctionUtils::~FunctionUtils() {
|
||||
|
||||
@ -29,8 +29,7 @@ class FunctionUtils {
|
||||
public:
|
||||
FunctionUtils();
|
||||
FunctionUtils(const doris::TypeDescriptor& return_type,
|
||||
const std::vector<doris::TypeDescriptor>& arg_types, int varargs_buffer_size,
|
||||
RuntimeState*);
|
||||
const std::vector<doris::TypeDescriptor>& arg_types, int varargs_buffer_size);
|
||||
~FunctionUtils();
|
||||
|
||||
doris::FunctionContext* get_fn_ctx() { return _fn_ctx.get(); }
|
||||
|
||||
@ -200,11 +200,9 @@ void check_vec_table_function(TableFunction* fn, const InputTypeSet& input_types
|
||||
// Null values are represented by Null()
|
||||
// The type of the constant column is represented as follows: Consted {TypeIndex::String}
|
||||
// A DataSet with a constant column can only have one row of data
|
||||
// If state != nullptr, the caller must set the query options it relies on itself.
|
||||
template <typename ReturnType, bool nullable = false>
|
||||
Status check_function(const std::string& func_name, const InputTypeSet& input_types,
|
||||
const DataSet& data_set, bool expect_fail = false,
|
||||
RuntimeState* state = nullptr) {
|
||||
const DataSet& data_set, bool expect_fail = false) {
|
||||
// 1.0 create data type
|
||||
ut_type::UTDataTypeDescs descs;
|
||||
EXPECT_TRUE(parse_ut_data_type(input_types, descs));
|
||||
@ -273,7 +271,7 @@ Status check_function(const std::string& func_name, const InputTypeSet& input_ty
|
||||
fn_ctx_return.type = doris::PrimitiveType::INVALID_TYPE;
|
||||
}
|
||||
|
||||
FunctionUtils fn_utils(fn_ctx_return, arg_types, 0, state);
|
||||
FunctionUtils fn_utils(fn_ctx_return, arg_types, 0);
|
||||
auto* fn_ctx = fn_utils.get_fn_ctx();
|
||||
fn_ctx->set_constant_cols(constant_cols);
|
||||
func->open(fn_ctx, FunctionContext::FRAGMENT_LOCAL);
|
||||
|
||||
@ -204,11 +204,7 @@ TEST(VTimestampFunctionsTest, timediff_test) {
|
||||
TEST(VTimestampFunctionsTest, convert_tz_test) {
|
||||
std::string func_name = "convert_tz";
|
||||
|
||||
ExecEnv* exec_env = ExecEnv::GetInstance();
|
||||
exec_env->_global_zone_cache = std::make_unique<vectorized::ZoneList>();
|
||||
auto test_state = RuntimeState::create_unique();
|
||||
test_state->set_exec_env(exec_env);
|
||||
TimezoneUtils::clear_timezone_names();
|
||||
TimezoneUtils::clear_timezone_caches();
|
||||
|
||||
InputTypeSet input_types = {TypeIndex::DateTimeV2, TypeIndex::String, TypeIndex::String};
|
||||
|
||||
@ -216,8 +212,7 @@ TEST(VTimestampFunctionsTest, convert_tz_test) {
|
||||
DataSet data_set = {{{std::string {"2019-08-01 02:18:27"}, std::string {"Asia/SHANGHAI"},
|
||||
std::string {"america/Los_angeles"}},
|
||||
Null()}};
|
||||
check_function<DataTypeDateTimeV2, true>(func_name, input_types, data_set, false,
|
||||
test_state.get());
|
||||
check_function<DataTypeDateTimeV2, true>(func_name, input_types, data_set, false);
|
||||
}
|
||||
|
||||
{
|
||||
@ -233,8 +228,7 @@ TEST(VTimestampFunctionsTest, convert_tz_test) {
|
||||
{{std::string {"2019-08-01 02:18:27"}, std::string {"Asia/SHANGHAI"},
|
||||
std::string {"america/Los_angeles"}},
|
||||
Null()}};
|
||||
check_function<DataTypeDateTimeV2, true>(func_name, input_types, data_set, false,
|
||||
test_state.get());
|
||||
check_function<DataTypeDateTimeV2, true>(func_name, input_types, data_set, false);
|
||||
}
|
||||
|
||||
{
|
||||
@ -251,9 +245,8 @@ TEST(VTimestampFunctionsTest, convert_tz_test) {
|
||||
std::string {"america/Los_angeles"}},
|
||||
str_to_datetime_v2("2019-07-31 11:18:27", "%Y-%m-%d %H:%i:%s.%f")}};
|
||||
TimezoneUtils::load_timezone_names();
|
||||
TimezoneUtils::load_timezones_to_cache(*exec_env->_global_zone_cache);
|
||||
check_function<DataTypeDateTimeV2, true>(func_name, input_types, data_set, false,
|
||||
test_state.get());
|
||||
TimezoneUtils::load_timezones_to_cache();
|
||||
check_function<DataTypeDateTimeV2, true>(func_name, input_types, data_set, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user