[Enhancement](function) support unix_timestamp with float (#26827)
--------- Co-authored-by: YangWithU <plzw8@outlook.com>
This commit is contained in:
@ -17,24 +17,29 @@
|
||||
|
||||
#include <glog/logging.h>
|
||||
#include <limits.h>
|
||||
#include <parquet/column_writer.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <boost/iterator/iterator_facade.hpp>
|
||||
#include <cstdint>
|
||||
#include <cstring>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <tuple>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "common/status.h"
|
||||
#include "runtime/decimalv2_value.h"
|
||||
#include "runtime/define_primitive_type.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "runtime/types.h"
|
||||
#include "udf/udf.h"
|
||||
#include "util/binary_cast.hpp"
|
||||
#include "util/datetype_cast.hpp"
|
||||
#include "util/time.h"
|
||||
#include "util/time_lut.h"
|
||||
#include "vec/aggregate_functions/aggregate_function.h"
|
||||
#include "vec/columns/column.h"
|
||||
@ -54,6 +59,7 @@
|
||||
#include "vec/data_types/data_type.h"
|
||||
#include "vec/data_types/data_type_date.h"
|
||||
#include "vec/data_types/data_type_date_time.h"
|
||||
#include "vec/data_types/data_type_decimal.h"
|
||||
#include "vec/data_types/data_type_nullable.h"
|
||||
#include "vec/data_types/data_type_number.h"
|
||||
#include "vec/data_types/data_type_string.h"
|
||||
@ -574,6 +580,103 @@ template <typename DateType>
|
||||
struct UnixTimeStampDateImpl {
|
||||
static DataTypes get_variadic_argument_types() { return {std::make_shared<DateType>()}; }
|
||||
|
||||
static DataTypePtr get_return_type_impl(const ColumnsWithTypeAndName& arguments) {
|
||||
if constexpr (std::is_same_v<DateType, DataTypeDateTimeV2>) {
|
||||
if (arguments[0].type->is_nullable()) {
|
||||
UInt32 scale = static_cast<const DataTypeNullable*>(arguments[0].type.get())
|
||||
->get_nested_type()
|
||||
->get_scale();
|
||||
return make_nullable(
|
||||
std::make_shared<DataTypeDecimal<Decimal64>>(10 + scale, scale));
|
||||
}
|
||||
UInt32 scale = arguments[0].type->get_scale();
|
||||
return std::make_shared<DataTypeDecimal<Decimal64>>(10 + scale, scale);
|
||||
} else {
|
||||
if (arguments[0].type->is_nullable()) {
|
||||
return make_nullable(std::make_shared<DataTypeInt32>());
|
||||
}
|
||||
return std::make_shared<DataTypeInt32>();
|
||||
}
|
||||
}
|
||||
|
||||
static Status execute_impl(FunctionContext* context, Block& block,
|
||||
const ColumnNumbers& arguments, size_t result,
|
||||
size_t input_rows_count) {
|
||||
const ColumnPtr& col = block.get_by_position(arguments[0]).column;
|
||||
DCHECK(!col->is_nullable());
|
||||
|
||||
if constexpr (std::is_same_v<DateType, DataTypeDate> ||
|
||||
std::is_same_v<DateType, DataTypeDateTime>) {
|
||||
const auto* col_source = assert_cast<const ColumnDate*>(col.get());
|
||||
auto col_result = ColumnVector<Int32>::create();
|
||||
auto& col_result_data = col_result->get_data();
|
||||
col_result->resize(input_rows_count);
|
||||
|
||||
for (int i = 0; i < input_rows_count; i++) {
|
||||
StringRef source = col_source->get_data_at(i);
|
||||
const auto& ts_value = reinterpret_cast<const VecDateTimeValue&>(*source.data);
|
||||
int64_t timestamp {};
|
||||
ts_value.unix_timestamp(×tamp, context->state()->timezone_obj());
|
||||
col_result_data[i] = UnixTimeStampImpl::trim_timestamp(timestamp);
|
||||
}
|
||||
block.replace_by_position(result, std::move(col_result));
|
||||
} else if constexpr (std::is_same_v<DateType, DataTypeDateV2>) {
|
||||
const auto* col_source = assert_cast<const ColumnDateV2*>(col.get());
|
||||
auto col_result = ColumnVector<Int32>::create();
|
||||
auto& col_result_data = col_result->get_data();
|
||||
col_result->resize(input_rows_count);
|
||||
|
||||
for (int i = 0; i < input_rows_count; i++) {
|
||||
StringRef source = col_source->get_data_at(i);
|
||||
const auto& ts_value =
|
||||
reinterpret_cast<const DateV2Value<DateV2ValueType>&>(*source.data);
|
||||
int64_t timestamp {};
|
||||
const auto valid =
|
||||
ts_value.unix_timestamp(×tamp, context->state()->timezone_obj());
|
||||
DCHECK(valid);
|
||||
col_result_data[i] = UnixTimeStampImpl::trim_timestamp(timestamp);
|
||||
}
|
||||
block.replace_by_position(result, std::move(col_result));
|
||||
} else { // DatetimeV2
|
||||
const auto* col_source = assert_cast<const ColumnDateTimeV2*>(col.get());
|
||||
UInt32 scale = block.get_by_position(arguments[0]).type->get_scale();
|
||||
auto col_result = ColumnDecimal<Decimal64>::create(input_rows_count, scale);
|
||||
auto& col_result_data = col_result->get_data();
|
||||
col_result->resize(input_rows_count);
|
||||
|
||||
for (int i = 0; i < input_rows_count; i++) {
|
||||
StringRef source = col_source->get_data_at(i);
|
||||
const auto& ts_value =
|
||||
reinterpret_cast<const DateV2Value<DateTimeV2ValueType>&>(*source.data);
|
||||
std::pair<int64_t, int64_t> timestamp {};
|
||||
const auto valid =
|
||||
ts_value.unix_timestamp(×tamp, context->state()->timezone_obj());
|
||||
DCHECK(valid);
|
||||
|
||||
auto& [sec, ms] = timestamp;
|
||||
sec = UnixTimeStampImpl::trim_timestamp(sec);
|
||||
auto ms_str = std::to_string(ms).substr(0, scale);
|
||||
if (ms_str.empty()) {
|
||||
ms_str = "0";
|
||||
}
|
||||
col_result_data[i] = Decimal64::from_int_frac(sec, std::stoll(ms_str), scale).value;
|
||||
}
|
||||
block.replace_by_position(result, std::move(col_result));
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
};
|
||||
|
||||
template <typename DateType>
|
||||
struct UnixTimeStampDatetimeImpl : public UnixTimeStampDateImpl<DateType> {
|
||||
static DataTypes get_variadic_argument_types() { return {std::make_shared<DateType>()}; }
|
||||
};
|
||||
|
||||
template <typename DateType>
|
||||
struct UnixTimeStampDateImplOld {
|
||||
static DataTypes get_variadic_argument_types() { return {std::make_shared<DateType>()}; }
|
||||
|
||||
static DataTypePtr get_return_type_impl(const ColumnsWithTypeAndName& arguments) {
|
||||
RETURN_REAL_TYPE_FOR_DATEV2_FUNCTION(DataTypeInt32);
|
||||
}
|
||||
@ -601,7 +704,7 @@ struct UnixTimeStampDateImpl {
|
||||
StringRef source = col_source->get_data_at(i);
|
||||
const VecDateTimeValue& ts_value =
|
||||
reinterpret_cast<const VecDateTimeValue&>(*source.data);
|
||||
int64_t timestamp;
|
||||
int64_t timestamp {};
|
||||
if (!ts_value.unix_timestamp(&timestamp, context->state()->timezone_obj())) {
|
||||
null_map_data[i] = true;
|
||||
} else {
|
||||
@ -626,7 +729,7 @@ struct UnixTimeStampDateImpl {
|
||||
StringRef source = col_source->get_data_at(i);
|
||||
const DateV2Value<DateV2ValueType>& ts_value =
|
||||
reinterpret_cast<const DateV2Value<DateV2ValueType>&>(*source.data);
|
||||
int64_t timestamp;
|
||||
int64_t timestamp {};
|
||||
if (!ts_value.unix_timestamp(&timestamp, context->state()->timezone_obj())) {
|
||||
null_map_data[i] = true;
|
||||
} else {
|
||||
@ -642,7 +745,7 @@ struct UnixTimeStampDateImpl {
|
||||
StringRef source = col_source->get_data_at(i);
|
||||
const DateV2Value<DateV2ValueType>& ts_value =
|
||||
reinterpret_cast<const DateV2Value<DateV2ValueType>&>(*source.data);
|
||||
int64_t timestamp;
|
||||
int64_t timestamp {};
|
||||
const auto valid =
|
||||
ts_value.unix_timestamp(&timestamp, context->state()->timezone_obj());
|
||||
DCHECK(valid);
|
||||
@ -665,7 +768,7 @@ struct UnixTimeStampDateImpl {
|
||||
StringRef source = col_source->get_data_at(i);
|
||||
const DateV2Value<DateTimeV2ValueType>& ts_value =
|
||||
reinterpret_cast<const DateV2Value<DateTimeV2ValueType>&>(*source.data);
|
||||
int64_t timestamp;
|
||||
int64_t timestamp {};
|
||||
if (!ts_value.unix_timestamp(&timestamp, context->state()->timezone_obj())) {
|
||||
null_map_data[i] = true;
|
||||
} else {
|
||||
@ -681,7 +784,7 @@ struct UnixTimeStampDateImpl {
|
||||
StringRef source = col_source->get_data_at(i);
|
||||
const DateV2Value<DateTimeV2ValueType>& ts_value =
|
||||
reinterpret_cast<const DateV2Value<DateTimeV2ValueType>&>(*source.data);
|
||||
int64_t timestamp;
|
||||
int64_t timestamp {};
|
||||
const auto valid =
|
||||
ts_value.unix_timestamp(&timestamp, context->state()->timezone_obj());
|
||||
DCHECK(valid);
|
||||
@ -696,15 +799,78 @@ struct UnixTimeStampDateImpl {
|
||||
};
|
||||
|
||||
template <typename DateType>
|
||||
struct UnixTimeStampDatetimeImpl : public UnixTimeStampDateImpl<DateType> {
|
||||
struct UnixTimeStampDatetimeImplOld : public UnixTimeStampDateImplOld<DateType> {
|
||||
static DataTypes get_variadic_argument_types() { return {std::make_shared<DateType>()}; }
|
||||
};
|
||||
|
||||
// This impl doesn't use default impl to deal null value.
|
||||
struct UnixTimeStampStrImpl {
|
||||
static DataTypes get_variadic_argument_types() {
|
||||
return {std::make_shared<DataTypeString>(), std::make_shared<DataTypeString>()};
|
||||
}
|
||||
|
||||
static DataTypePtr get_return_type_impl(const ColumnsWithTypeAndName& arguments) {
|
||||
return make_nullable(std::make_shared<DataTypeDecimal<Decimal64>>(16, 6));
|
||||
}
|
||||
|
||||
static Status execute_impl(FunctionContext* context, Block& block,
|
||||
const ColumnNumbers& arguments, size_t result,
|
||||
size_t input_rows_count) {
|
||||
ColumnPtr col_left = nullptr, col_right = nullptr;
|
||||
bool source_const = false, format_const = false;
|
||||
std::tie(col_left, source_const) =
|
||||
unpack_if_const(block.get_by_position(arguments[0]).column);
|
||||
std::tie(col_right, format_const) =
|
||||
unpack_if_const(block.get_by_position(arguments[1]).column);
|
||||
|
||||
auto col_result = ColumnDecimal<Decimal64>::create(input_rows_count, 0);
|
||||
auto null_map = ColumnVector<UInt8>::create(input_rows_count);
|
||||
auto& col_result_data = col_result->get_data();
|
||||
auto& null_map_data = null_map->get_data();
|
||||
|
||||
check_set_nullable(col_left, null_map, source_const);
|
||||
check_set_nullable(col_right, null_map, format_const);
|
||||
|
||||
const auto* col_source = assert_cast<const ColumnString*>(col_left.get());
|
||||
const auto* col_format = assert_cast<const ColumnString*>(col_right.get());
|
||||
for (int i = 0; i < input_rows_count; i++) {
|
||||
StringRef source = col_source->get_data_at(i);
|
||||
StringRef fmt = col_format->get_data_at(i);
|
||||
|
||||
DateV2Value<DateTimeV2ValueType> ts_value;
|
||||
if (!ts_value.from_date_format_str(fmt.data, fmt.size, source.data, source.size)) {
|
||||
null_map_data[i] = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
std::pair<int64_t, int64_t> timestamp {};
|
||||
if (!ts_value.unix_timestamp(×tamp, context->state()->timezone_obj())) {
|
||||
null_map_data[i] = true;
|
||||
} else {
|
||||
null_map_data[i] = false;
|
||||
|
||||
auto& [sec, ms] = timestamp;
|
||||
sec = UnixTimeStampImpl::trim_timestamp(sec);
|
||||
auto ms_str = std::to_string(ms).substr(0, 6);
|
||||
if (ms_str.empty()) {
|
||||
ms_str = "0";
|
||||
}
|
||||
col_result_data[i] = Decimal64::from_int_frac(sec, std::stoll(ms_str), 6).value;
|
||||
}
|
||||
}
|
||||
|
||||
block.replace_by_position(
|
||||
result, ColumnNullable::create(std::move(col_result), std::move(null_map)));
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
};
|
||||
|
||||
struct UnixTimeStampStrImplOld {
|
||||
static DataTypes get_variadic_argument_types() {
|
||||
return {std::make_shared<DataTypeString>(), std::make_shared<DataTypeString>()};
|
||||
}
|
||||
|
||||
static DataTypePtr get_return_type_impl(const ColumnsWithTypeAndName& arguments) {
|
||||
return make_nullable(std::make_shared<DataTypeInt32>());
|
||||
}
|
||||
@ -739,7 +905,7 @@ struct UnixTimeStampStrImpl {
|
||||
continue;
|
||||
}
|
||||
|
||||
int64_t timestamp;
|
||||
int64_t timestamp {};
|
||||
if (!ts_value.unix_timestamp(&timestamp, context->state()->timezone_obj())) {
|
||||
null_map_data[i] = true;
|
||||
} else {
|
||||
@ -763,8 +929,6 @@ public:
|
||||
|
||||
// Returns `name` as this function's name.
String get_name() const override { return name; }
|
||||
|
||||
bool use_default_implementation_for_nulls() const override { return false; }
|
||||
|
||||
size_t get_number_of_arguments() const override {
|
||||
return get_variadic_argument_types_impl().size();
|
||||
}
|
||||
@ -777,6 +941,10 @@ public:
|
||||
return Impl::get_variadic_argument_types();
|
||||
}
|
||||
|
||||
bool use_default_implementation_for_nulls() const override {
|
||||
return !static_cast<bool>(std::is_same_v<Impl, UnixTimeStampStrImpl>);
|
||||
}
|
||||
|
||||
Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
|
||||
size_t result, size_t input_rows_count) const override {
|
||||
return Impl::execute_impl(context, block, arguments, result, input_rows_count);
|
||||
@ -1212,11 +1380,8 @@ void register_function_timestamp(SimpleFunctionFactory& factory) {
|
||||
factory.register_function<FunctionUnixTimestamp<UnixTimeStampImpl>>();
|
||||
factory.register_function<FunctionUnixTimestamp<UnixTimeStampDateImpl<DataTypeDate>>>();
|
||||
factory.register_function<FunctionUnixTimestamp<UnixTimeStampDateImpl<DataTypeDateV2>>>();
|
||||
factory.register_function<FunctionUnixTimestamp<UnixTimeStampDateImpl<DataTypeDateTime>>>();
|
||||
factory.register_function<FunctionUnixTimestamp<UnixTimeStampDateImpl<DataTypeDateTimeV2>>>();
|
||||
factory.register_function<FunctionUnixTimestamp<UnixTimeStampDatetimeImpl<DataTypeDate>>>();
|
||||
factory.register_function<FunctionUnixTimestamp<UnixTimeStampDatetimeImpl<DataTypeDateV2>>>();
|
||||
factory.register_function<
|
||||
FunctionUnixTimestamp<UnixTimeStampDatetimeImpl<DataTypeDateTimeV2>>>();
|
||||
factory.register_function<FunctionUnixTimestamp<UnixTimeStampStrImpl>>();
|
||||
factory.register_function<FunctionDateOrDateTimeToDate<LastDayImpl, DataTypeDateTime>>();
|
||||
factory.register_function<FunctionDateOrDateTimeToDate<LastDayImpl, DataTypeDate>>();
|
||||
@ -1230,6 +1395,15 @@ void register_function_timestamp(SimpleFunctionFactory& factory) {
|
||||
factory.register_function<DateTimeToTimestamp<MicroSec>>();
|
||||
factory.register_function<DateTimeToTimestamp<MilliSec>>();
|
||||
factory.register_function<DateTimeToTimestamp<Sec>>();
|
||||
|
||||
/// @TEMPORARY: for be_exec_version=3
|
||||
factory.register_alternative_function<
|
||||
FunctionUnixTimestamp<UnixTimeStampDateImplOld<DataTypeDate>>>();
|
||||
factory.register_alternative_function<
|
||||
FunctionUnixTimestamp<UnixTimeStampDateImplOld<DataTypeDateV2>>>();
|
||||
factory.register_alternative_function<
|
||||
FunctionUnixTimestamp<UnixTimeStampDateImplOld<DataTypeDateTimeV2>>>();
|
||||
factory.register_alternative_function<FunctionUnixTimestamp<UnixTimeStampStrImplOld>>();
|
||||
}
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
Reference in New Issue
Block a user