[refactor](functioncontext) remove function context impl class (#17715)
* [refactor](functioncontext) remove function context impl class Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --------- Co-authored-by: yiguolei <yiguolei@gmail.com> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
This commit is contained in:
@ -19,7 +19,6 @@
|
||||
#include <memory>
|
||||
|
||||
#include "udf/udf.h"
|
||||
#include "udf/udf_internal.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
|
||||
@ -35,60 +35,46 @@
|
||||
// on libhdfs.
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "runtime/types.h"
|
||||
#include "udf/udf_internal.h"
|
||||
#include "udf/udf.h"
|
||||
#include "util/debug_util.h"
|
||||
#include "vec/common/string_ref.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
FunctionContextImpl::FunctionContextImpl()
|
||||
: _state(nullptr),
|
||||
_num_warnings(0),
|
||||
_thread_local_fn_state(nullptr),
|
||||
_fragment_local_fn_state(nullptr) {}
|
||||
static const int MAX_WARNINGS = 1000;
|
||||
|
||||
void FunctionContextImpl::set_constant_cols(
|
||||
std::unique_ptr<doris::FunctionContext> FunctionContext::create_context(
|
||||
RuntimeState* state, const doris::TypeDescriptor& return_type,
|
||||
const std::vector<doris::TypeDescriptor>& arg_types) {
|
||||
auto ctx = std::unique_ptr<doris::FunctionContext>(new doris::FunctionContext());
|
||||
ctx->_state = state;
|
||||
ctx->_return_type = return_type;
|
||||
ctx->_arg_types = arg_types;
|
||||
ctx->_num_warnings = 0;
|
||||
ctx->_thread_local_fn_state = nullptr;
|
||||
ctx->_fragment_local_fn_state = nullptr;
|
||||
return ctx;
|
||||
}
|
||||
|
||||
void FunctionContext::set_constant_cols(
|
||||
const std::vector<std::shared_ptr<doris::ColumnPtrWrapper>>& constant_cols) {
|
||||
_constant_cols = constant_cols;
|
||||
}
|
||||
|
||||
std::unique_ptr<doris::FunctionContext> FunctionContextImpl::create_context(
|
||||
RuntimeState* state, const doris::TypeDescriptor& return_type,
|
||||
const std::vector<doris::TypeDescriptor>& arg_types) {
|
||||
auto ctx = std::unique_ptr<doris::FunctionContext>(new doris::FunctionContext());
|
||||
ctx->_impl->_state = state;
|
||||
ctx->_impl->_return_type = return_type;
|
||||
ctx->_impl->_arg_types = arg_types;
|
||||
return ctx;
|
||||
}
|
||||
|
||||
std::unique_ptr<FunctionContext> FunctionContextImpl::clone() {
|
||||
std::unique_ptr<FunctionContext> FunctionContext::clone() {
|
||||
auto new_context = create_context(_state, _return_type, _arg_types);
|
||||
new_context->_impl->_constant_cols = _constant_cols;
|
||||
new_context->_impl->_fragment_local_fn_state = _fragment_local_fn_state;
|
||||
new_context->_constant_cols = _constant_cols;
|
||||
new_context->_fragment_local_fn_state = _fragment_local_fn_state;
|
||||
return new_context;
|
||||
}
|
||||
|
||||
const doris::TypeDescriptor& FunctionContextImpl::get_return_type() const {
|
||||
return _return_type;
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
namespace doris {
|
||||
static const int MAX_WARNINGS = 1000;
|
||||
|
||||
FunctionContext::FunctionContext() {
|
||||
_impl = std::make_unique<doris::FunctionContextImpl>();
|
||||
}
|
||||
|
||||
void FunctionContext::set_function_state(FunctionStateScope scope, std::shared_ptr<void> ptr) {
|
||||
switch (scope) {
|
||||
case THREAD_LOCAL:
|
||||
_impl->_thread_local_fn_state = std::move(ptr);
|
||||
_thread_local_fn_state = std::move(ptr);
|
||||
break;
|
||||
case FRAGMENT_LOCAL:
|
||||
_impl->_fragment_local_fn_state = std::move(ptr);
|
||||
_fragment_local_fn_state = std::move(ptr);
|
||||
break;
|
||||
default:
|
||||
std::stringstream ss;
|
||||
@ -98,27 +84,27 @@ void FunctionContext::set_function_state(FunctionStateScope scope, std::shared_p
|
||||
}
|
||||
|
||||
void FunctionContext::set_error(const char* error_msg) {
|
||||
if (_impl->_error_msg.empty()) {
|
||||
_impl->_error_msg = error_msg;
|
||||
if (_error_msg.empty()) {
|
||||
_error_msg = error_msg;
|
||||
std::stringstream ss;
|
||||
ss << "UDF ERROR: " << error_msg;
|
||||
|
||||
if (_impl->_state != nullptr) {
|
||||
_impl->_state->set_process_status(ss.str());
|
||||
if (_state != nullptr) {
|
||||
_state->set_process_status(ss.str());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool FunctionContext::add_warning(const char* warning_msg) {
|
||||
if (_impl->_num_warnings++ >= MAX_WARNINGS) {
|
||||
if (_num_warnings++ >= MAX_WARNINGS) {
|
||||
return false;
|
||||
}
|
||||
|
||||
std::stringstream ss;
|
||||
ss << "UDF WARNING: " << warning_msg;
|
||||
|
||||
if (_impl->_state != nullptr) {
|
||||
return _impl->_state->log_error(ss.str());
|
||||
if (_state != nullptr) {
|
||||
return _state->log_error(ss.str());
|
||||
} else {
|
||||
std::cerr << ss.str() << std::endl;
|
||||
return true;
|
||||
@ -126,40 +112,40 @@ bool FunctionContext::add_warning(const char* warning_msg) {
|
||||
}
|
||||
|
||||
const doris::TypeDescriptor* FunctionContext::get_arg_type(int arg_idx) const {
|
||||
if (arg_idx < 0 || arg_idx >= _impl->_arg_types.size()) {
|
||||
if (arg_idx < 0 || arg_idx >= _arg_types.size()) {
|
||||
return nullptr;
|
||||
}
|
||||
return &_impl->_arg_types[arg_idx];
|
||||
return &_arg_types[arg_idx];
|
||||
}
|
||||
|
||||
bool FunctionContext::is_col_constant(int i) const {
|
||||
if (i < 0 || i >= _impl->_constant_cols.size()) {
|
||||
if (i < 0 || i >= _constant_cols.size()) {
|
||||
return false;
|
||||
}
|
||||
return _impl->_constant_cols[i] != nullptr;
|
||||
return _constant_cols[i] != nullptr;
|
||||
}
|
||||
|
||||
doris::ColumnPtrWrapper* FunctionContext::get_constant_col(int i) const {
|
||||
if (i < 0 || i >= _impl->_constant_cols.size()) {
|
||||
if (i < 0 || i >= _constant_cols.size()) {
|
||||
return nullptr;
|
||||
}
|
||||
return _impl->_constant_cols[i].get();
|
||||
return _constant_cols[i].get();
|
||||
}
|
||||
|
||||
int FunctionContext::get_num_args() const {
|
||||
return _impl->_arg_types.size();
|
||||
return _arg_types.size();
|
||||
}
|
||||
|
||||
const doris::TypeDescriptor& FunctionContext::get_return_type() const {
|
||||
return _impl->_return_type;
|
||||
return _return_type;
|
||||
}
|
||||
|
||||
void* FunctionContext::get_function_state(FunctionStateScope scope) const {
|
||||
switch (scope) {
|
||||
case THREAD_LOCAL:
|
||||
return _impl->_thread_local_fn_state.get();
|
||||
return _thread_local_fn_state.get();
|
||||
case FRAGMENT_LOCAL:
|
||||
return _impl->_fragment_local_fn_state.get();
|
||||
return _fragment_local_fn_state.get();
|
||||
default:
|
||||
// TODO: signal error somehow
|
||||
return nullptr;
|
||||
@ -167,8 +153,8 @@ void* FunctionContext::get_function_state(FunctionStateScope scope) const {
|
||||
}
|
||||
|
||||
StringRef FunctionContext::create_temp_string_val(int64_t len) {
|
||||
this->impl()->string_result().resize(len);
|
||||
return StringRef((uint8_t*)this->impl()->string_result().c_str(), len);
|
||||
_string_result.resize(len);
|
||||
return StringRef((uint8_t*)_string_result.c_str(), len);
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -27,9 +27,11 @@
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
|
||||
#include "runtime/types.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
class FunctionContextImpl;
|
||||
struct ColumnPtrWrapper;
|
||||
struct StringRef;
|
||||
class BitmapValue;
|
||||
@ -37,6 +39,8 @@ class DecimalV2Value;
|
||||
class DateTimeValue;
|
||||
class CollectionValue;
|
||||
struct TypeDescriptor;
|
||||
|
||||
class RuntimeState;
|
||||
// All input and output values will be one of the structs below. The struct is a simple
|
||||
// object containing a boolean to store if the value is nullptr and the value itself. The
|
||||
// value is unspecified if the nullptr boolean is set.
|
||||
@ -71,6 +75,27 @@ public:
|
||||
THREAD_LOCAL,
|
||||
};
|
||||
|
||||
static std::unique_ptr<doris::FunctionContext> create_context(
|
||||
RuntimeState* state, const doris::TypeDescriptor& return_type,
|
||||
const std::vector<doris::TypeDescriptor>& arg_types);
|
||||
|
||||
/// Returns a new FunctionContext with the same constant args, fragment-local state, and
|
||||
/// debug flag as this FunctionContext. The caller is responsible for calling delete on
|
||||
/// it.
|
||||
std::unique_ptr<doris::FunctionContext> clone();
|
||||
|
||||
void set_constant_cols(const std::vector<std::shared_ptr<doris::ColumnPtrWrapper>>& cols);
|
||||
|
||||
RuntimeState* state() { return _state; }
|
||||
|
||||
std::string& string_result() { return _string_result; }
|
||||
|
||||
bool check_overflow_for_decimal() const { return _check_overflow_for_decimal; }
|
||||
|
||||
bool set_check_overflow_for_decimal(bool check_overflow_for_decimal) {
|
||||
return _check_overflow_for_decimal = check_overflow_for_decimal;
|
||||
}
|
||||
|
||||
// Sets an error for this UDF. If this is called, this will trigger the
|
||||
// query to fail.
|
||||
// Note: when you set error for the UDFs used in Data Load, you should
|
||||
@ -83,18 +108,6 @@ public:
|
||||
// added and false if it was ignored due to the cap.
|
||||
bool add_warning(const char* warning_msg);
|
||||
|
||||
// TODO: Do we need to add arbitrary key/value metadata. This would be plumbed
|
||||
// through the query. E.g. "select UDA(col, 'sample=true') from tbl".
|
||||
// const char* GetMetadata(const char*) const;
|
||||
|
||||
// TODO: Add mechanism for UDAs to update stats similar to runtime profile counters
|
||||
|
||||
// TODO: Add mechanism to query for table/column stats
|
||||
|
||||
// Returns the underlying opaque implementation object. The UDF/UDA should not
|
||||
// use this. This is used internally.
|
||||
doris::FunctionContextImpl* impl() { return _impl.get(); }
|
||||
|
||||
/// Methods for maintaining state across UDF/UDA function calls. SetFunctionState() can
|
||||
/// be used to store a pointer that can then be retrieved via GetFunctionState(). If
|
||||
/// GetFunctionState() is called when no pointer is set, it will return
|
||||
@ -132,16 +145,38 @@ public:
|
||||
~FunctionContext() = default;
|
||||
|
||||
private:
|
||||
friend class doris::FunctionContextImpl;
|
||||
|
||||
FunctionContext();
|
||||
FunctionContext() = default;
|
||||
|
||||
// Disable copy ctor and assignment operator
|
||||
FunctionContext(const FunctionContext& other);
|
||||
|
||||
FunctionContext& operator=(const FunctionContext& other);
|
||||
|
||||
std::unique_ptr<doris::FunctionContextImpl> _impl; // Owned by this object.
|
||||
// We use the query's runtime state to report errors and warnings. nullptr for test
|
||||
// contexts.
|
||||
RuntimeState* _state;
|
||||
|
||||
// Empty if there's no error
|
||||
std::string _error_msg;
|
||||
|
||||
// The number of warnings reported.
|
||||
int64_t _num_warnings;
|
||||
|
||||
/// The function state accessed via FunctionContext::Get/SetFunctionState()
|
||||
std::shared_ptr<void> _thread_local_fn_state;
|
||||
std::shared_ptr<void> _fragment_local_fn_state;
|
||||
|
||||
// Type descriptor for the return type of the function.
|
||||
doris::TypeDescriptor _return_type;
|
||||
|
||||
// Type descriptors for each argument of the function.
|
||||
std::vector<doris::TypeDescriptor> _arg_types;
|
||||
|
||||
std::vector<std::shared_ptr<doris::ColumnPtrWrapper>> _constant_cols;
|
||||
|
||||
bool _check_overflow_for_decimal = false;
|
||||
|
||||
std::string _string_result;
|
||||
};
|
||||
|
||||
//----------------------------------------------------------------------------
|
||||
|
||||
@ -1,103 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
// This file is copied from
|
||||
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/udf/udf-internal.h
|
||||
// and modified by Doris
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <string.h>
|
||||
|
||||
#include <cstdint>
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "runtime/types.h"
|
||||
#include "udf/udf.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
class RuntimeState;
|
||||
struct ColumnPtrWrapper;
|
||||
struct TypeDescriptor;
|
||||
|
||||
// This class actually implements the interface of FunctionContext. This is split to
|
||||
// hide the details from the external header.
|
||||
// Note: The actual user code does not include this file.
|
||||
class FunctionContextImpl {
|
||||
public:
|
||||
/// Create a FunctionContext for a UDA. Identical to the UDF version except for the
|
||||
/// intermediate type. Caller is responsible for deleting it.
|
||||
static std::unique_ptr<doris::FunctionContext> create_context(
|
||||
RuntimeState* state, const doris::TypeDescriptor& return_type,
|
||||
const std::vector<doris::TypeDescriptor>& arg_types);
|
||||
|
||||
~FunctionContextImpl() {}
|
||||
|
||||
FunctionContextImpl();
|
||||
|
||||
/// Returns a new FunctionContext with the same constant args, fragment-local state, and
|
||||
/// debug flag as this FunctionContext. The caller is responsible for calling delete on
|
||||
/// it.
|
||||
std::unique_ptr<doris::FunctionContext> clone();
|
||||
|
||||
void set_constant_cols(const std::vector<std::shared_ptr<doris::ColumnPtrWrapper>>& cols);
|
||||
|
||||
RuntimeState* state() { return _state; }
|
||||
|
||||
std::string& string_result() { return _string_result; }
|
||||
|
||||
const doris::TypeDescriptor& get_return_type() const;
|
||||
|
||||
bool check_overflow_for_decimal() const { return _check_overflow_for_decimal; }
|
||||
|
||||
bool set_check_overflow_for_decimal(bool check_overflow_for_decimal) {
|
||||
return _check_overflow_for_decimal = check_overflow_for_decimal;
|
||||
}
|
||||
|
||||
private:
|
||||
friend class doris::FunctionContext;
|
||||
|
||||
// We use the query's runtime state to report errors and warnings. nullptr for test
|
||||
// contexts.
|
||||
RuntimeState* _state;
|
||||
|
||||
// Empty if there's no error
|
||||
std::string _error_msg;
|
||||
|
||||
// The number of warnings reported.
|
||||
int64_t _num_warnings;
|
||||
|
||||
/// The function state accessed via FunctionContext::Get/SetFunctionState()
|
||||
std::shared_ptr<void> _thread_local_fn_state;
|
||||
std::shared_ptr<void> _fragment_local_fn_state;
|
||||
|
||||
// Type descriptor for the return type of the function.
|
||||
doris::TypeDescriptor _return_type;
|
||||
|
||||
// Type descriptors for each argument of the function.
|
||||
std::vector<doris::TypeDescriptor> _arg_types;
|
||||
|
||||
std::vector<std::shared_ptr<doris::ColumnPtrWrapper>> _constant_cols;
|
||||
|
||||
bool _check_overflow_for_decimal = false;
|
||||
|
||||
std::string _string_result;
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
@ -23,7 +23,7 @@
|
||||
#include "common/status.h"
|
||||
#include "fmt/format.h"
|
||||
#include "fmt/ranges.h"
|
||||
#include "udf/udf_internal.h"
|
||||
#include "udf/udf.h"
|
||||
#include "vec/data_types/data_type_nullable.h"
|
||||
#include "vec/data_types/data_type_number.h"
|
||||
#include "vec/functions/function_java_udf.h"
|
||||
|
||||
@ -385,7 +385,7 @@ Status VExpr::init_function_context(VExprContext* context,
|
||||
RETURN_IF_ERROR(c->get_const_col(context, &const_col));
|
||||
constant_cols.push_back(const_col);
|
||||
}
|
||||
fn_ctx->impl()->set_constant_cols(constant_cols);
|
||||
fn_ctx->set_constant_cols(constant_cols);
|
||||
}
|
||||
|
||||
if (scope == FunctionContext::FRAGMENT_LOCAL) {
|
||||
|
||||
@ -25,7 +25,7 @@
|
||||
#include "exprs/hybrid_set.h"
|
||||
#include "gen_cpp/Exprs_types.h"
|
||||
#include "runtime/types.h"
|
||||
#include "udf/udf_internal.h"
|
||||
#include "udf/udf.h"
|
||||
#include "vec/data_types/data_type.h"
|
||||
#include "vec/exprs/vexpr_context.h"
|
||||
#include "vec/functions/function.h"
|
||||
|
||||
@ -17,7 +17,7 @@
|
||||
|
||||
#include "vec/exprs/vexpr_context.h"
|
||||
|
||||
#include "udf/udf_internal.h"
|
||||
#include "udf/udf.h"
|
||||
#include "util/stack_util.h"
|
||||
#include "vec/exprs/vexpr.h"
|
||||
|
||||
@ -76,7 +76,7 @@ doris::Status VExprContext::clone(RuntimeState* state, VExprContext** new_ctx) {
|
||||
|
||||
*new_ctx = state->obj_pool()->add(new VExprContext(_root));
|
||||
for (auto& _fn_context : _fn_contexts) {
|
||||
(*new_ctx)->_fn_contexts.push_back(_fn_context->impl()->clone());
|
||||
(*new_ctx)->_fn_contexts.push_back(_fn_context->clone());
|
||||
}
|
||||
|
||||
(*new_ctx)->_is_clone = true;
|
||||
@ -88,16 +88,15 @@ doris::Status VExprContext::clone(RuntimeState* state, VExprContext** new_ctx) {
|
||||
|
||||
void VExprContext::clone_fn_contexts(VExprContext* other) {
|
||||
for (auto& _fn_context : _fn_contexts) {
|
||||
other->_fn_contexts.push_back(_fn_context->impl()->clone());
|
||||
other->_fn_contexts.push_back(_fn_context->clone());
|
||||
}
|
||||
}
|
||||
|
||||
int VExprContext::register_function_context(RuntimeState* state,
|
||||
const doris::TypeDescriptor& return_type,
|
||||
const std::vector<doris::TypeDescriptor>& arg_types) {
|
||||
_fn_contexts.push_back(FunctionContextImpl::create_context(state, return_type, arg_types));
|
||||
_fn_contexts.back()->impl()->set_check_overflow_for_decimal(
|
||||
state->check_overflow_for_decimal());
|
||||
_fn_contexts.push_back(FunctionContext::create_context(state, return_type, arg_types));
|
||||
_fn_contexts.back()->set_check_overflow_for_decimal(state->check_overflow_for_decimal());
|
||||
return _fn_contexts.size() - 1;
|
||||
}
|
||||
|
||||
|
||||
@ -22,7 +22,7 @@
|
||||
|
||||
#include "common/status.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "udf/udf_internal.h"
|
||||
#include "udf/udf.h"
|
||||
#include "util/binary_cast.hpp"
|
||||
#include "util/type_traits.h"
|
||||
#include "vec/columns/column_nullable.h"
|
||||
@ -296,7 +296,7 @@ struct TransformerToStringTwoArgument {
|
||||
if constexpr (is_specialization_of_v<Transform, FromUnixTimeImpl>) {
|
||||
std::tie(new_offset, is_null) =
|
||||
Transform::execute(t, StringRef(format.c_str(), format.size()), res_data,
|
||||
offset, context->impl()->state()->timezone_obj());
|
||||
offset, context->state()->timezone_obj());
|
||||
} else {
|
||||
std::tie(new_offset, is_null) = Transform::execute(
|
||||
t, StringRef(format.c_str(), format.size()), res_data, offset);
|
||||
|
||||
@ -23,7 +23,7 @@
|
||||
#include <type_traits>
|
||||
|
||||
#include "runtime/decimalv2_value.h"
|
||||
#include "udf/udf_internal.h"
|
||||
#include "udf/udf.h"
|
||||
#include "vec/columns/column_const.h"
|
||||
#include "vec/columns/column_decimal.h"
|
||||
#include "vec/columns/column_nullable.h"
|
||||
@ -811,7 +811,7 @@ public:
|
||||
right_generic =
|
||||
static_cast<const DataTypeNullable*>(right_generic)->get_nested_type().get();
|
||||
}
|
||||
bool result_is_nullable = context->impl()->check_overflow_for_decimal();
|
||||
bool result_is_nullable = context->check_overflow_for_decimal();
|
||||
if (result_generic->is_nullable()) {
|
||||
result_generic =
|
||||
static_cast<const DataTypeNullable*>(result_generic)->get_nested_type().get();
|
||||
|
||||
@ -22,7 +22,7 @@
|
||||
|
||||
#include <fmt/format.h>
|
||||
|
||||
#include "udf/udf_internal.h"
|
||||
#include "udf/udf.h"
|
||||
#include "vec/columns/column_array.h"
|
||||
#include "vec/columns/column_const.h"
|
||||
#include "vec/columns/column_nullable.h"
|
||||
@ -895,14 +895,14 @@ public:
|
||||
|
||||
ret_status = ConvertImpl<LeftDataType, RightDataType, Name>::execute(
|
||||
block, arguments, result, input_rows_count,
|
||||
context->impl()->check_overflow_for_decimal(), scale);
|
||||
context->check_overflow_for_decimal(), scale);
|
||||
} else if constexpr (IsDataTypeDateTimeV2<RightDataType>) {
|
||||
const ColumnWithTypeAndName& scale_column = block.get_by_position(result);
|
||||
auto type =
|
||||
check_and_get_data_type<DataTypeDateTimeV2>(scale_column.type.get());
|
||||
ret_status = ConvertImpl<LeftDataType, RightDataType, Name>::execute(
|
||||
block, arguments, result, input_rows_count,
|
||||
context->impl()->check_overflow_for_decimal(), type->get_scale());
|
||||
context->check_overflow_for_decimal(), type->get_scale());
|
||||
} else {
|
||||
ret_status = ConvertImpl<LeftDataType, RightDataType, Name>::execute(
|
||||
block, arguments, result, input_rows_count);
|
||||
@ -1380,7 +1380,7 @@ private:
|
||||
|
||||
ConvertImpl<LeftDataType, RightDataType, NameCast>::execute(
|
||||
block, arguments, result, input_rows_count,
|
||||
context->impl()->check_overflow_for_decimal(), scale);
|
||||
context->check_overflow_for_decimal(), scale);
|
||||
return true;
|
||||
});
|
||||
|
||||
|
||||
@ -21,7 +21,7 @@
|
||||
#include "fmt/format.h"
|
||||
#include "runtime/datetime_value.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "udf/udf_internal.h"
|
||||
#include "udf/udf.h"
|
||||
#include "util/binary_cast.hpp"
|
||||
#include "vec/columns/column_const.h"
|
||||
#include "vec/columns/column_vector.h"
|
||||
@ -851,9 +851,9 @@ struct CurrentDateTimeImpl {
|
||||
if (const ColumnConst* const_column = check_and_get_column<ColumnConst>(
|
||||
block.get_by_position(arguments[0]).column)) {
|
||||
int scale = const_column->get_int(0);
|
||||
if (dtv.from_unixtime(context->impl()->state()->timestamp_ms() / 1000,
|
||||
context->impl()->state()->nano_seconds(),
|
||||
context->impl()->state()->timezone_obj(), scale)) {
|
||||
if (dtv.from_unixtime(context->state()->timestamp_ms() / 1000,
|
||||
context->state()->nano_seconds(),
|
||||
context->state()->timezone_obj(), scale)) {
|
||||
if constexpr (std::is_same_v<DateValueType, VecDateTimeValue>) {
|
||||
reinterpret_cast<DateValueType*>(&dtv)->set_type(TIME_DATETIME);
|
||||
}
|
||||
@ -872,11 +872,10 @@ struct CurrentDateTimeImpl {
|
||||
const auto& null_map = nullable_column->get_null_map_data();
|
||||
const auto& nested_column = nullable_column->get_nested_column_ptr();
|
||||
for (int i = 0; i < input_rows_count; i++) {
|
||||
if (!null_map[i] &&
|
||||
dtv.from_unixtime(context->impl()->state()->timestamp_ms() / 1000,
|
||||
context->impl()->state()->nano_seconds(),
|
||||
context->impl()->state()->timezone_obj(),
|
||||
nested_column->get64(i))) {
|
||||
if (!null_map[i] && dtv.from_unixtime(context->state()->timestamp_ms() / 1000,
|
||||
context->state()->nano_seconds(),
|
||||
context->state()->timezone_obj(),
|
||||
nested_column->get64(i))) {
|
||||
if constexpr (std::is_same_v<DateValueType, VecDateTimeValue>) {
|
||||
reinterpret_cast<DateValueType*>(&dtv)->set_type(TIME_DATETIME);
|
||||
}
|
||||
@ -894,10 +893,9 @@ struct CurrentDateTimeImpl {
|
||||
} else {
|
||||
auto& int_column = block.get_by_position(arguments[0]).column;
|
||||
for (int i = 0; i < input_rows_count; i++) {
|
||||
if (dtv.from_unixtime(context->impl()->state()->timestamp_ms() / 1000,
|
||||
context->impl()->state()->nano_seconds(),
|
||||
context->impl()->state()->timezone_obj(),
|
||||
int_column->get64(i))) {
|
||||
if (dtv.from_unixtime(context->state()->timestamp_ms() / 1000,
|
||||
context->state()->nano_seconds(),
|
||||
context->state()->timezone_obj(), int_column->get64(i))) {
|
||||
if constexpr (std::is_same_v<DateValueType, VecDateTimeValue>) {
|
||||
reinterpret_cast<DateValueType*>(&dtv)->set_type(TIME_DATETIME);
|
||||
}
|
||||
@ -914,8 +912,8 @@ struct CurrentDateTimeImpl {
|
||||
use_const = false;
|
||||
}
|
||||
} else {
|
||||
if (dtv.from_unixtime(context->impl()->state()->timestamp_ms() / 1000,
|
||||
context->impl()->state()->timezone_obj())) {
|
||||
if (dtv.from_unixtime(context->state()->timestamp_ms() / 1000,
|
||||
context->state()->timezone_obj())) {
|
||||
if constexpr (std::is_same_v<DateValueType, VecDateTimeValue>) {
|
||||
reinterpret_cast<DateValueType*>(&dtv)->set_type(TIME_DATETIME);
|
||||
}
|
||||
@ -949,8 +947,8 @@ struct CurrentDateImpl {
|
||||
auto col_to = ColumnVector<NativeType>::create();
|
||||
if constexpr (std::is_same_v<DateType, DataTypeDateV2>) {
|
||||
DateV2Value<DateV2ValueType> dtv;
|
||||
if (dtv.from_unixtime(context->impl()->state()->timestamp_ms() / 1000,
|
||||
context->impl()->state()->timezone_obj())) {
|
||||
if (dtv.from_unixtime(context->state()->timestamp_ms() / 1000,
|
||||
context->state()->timezone_obj())) {
|
||||
auto date_packed_int = binary_cast<DateV2Value<DateV2ValueType>, uint32_t>(
|
||||
*reinterpret_cast<DateV2Value<DateV2ValueType>*>(&dtv));
|
||||
col_to->insert_data(
|
||||
@ -962,8 +960,8 @@ struct CurrentDateImpl {
|
||||
}
|
||||
} else {
|
||||
VecDateTimeValue dtv;
|
||||
if (dtv.from_unixtime(context->impl()->state()->timestamp_ms() / 1000,
|
||||
context->impl()->state()->timezone_obj())) {
|
||||
if (dtv.from_unixtime(context->state()->timestamp_ms() / 1000,
|
||||
context->state()->timezone_obj())) {
|
||||
reinterpret_cast<VecDateTimeValue*>(&dtv)->set_type(TIME_DATE);
|
||||
auto date_packed_int = binary_cast<doris::vectorized::VecDateTimeValue, int64_t>(
|
||||
*reinterpret_cast<VecDateTimeValue*>(&dtv));
|
||||
@ -989,8 +987,8 @@ struct CurrentTimeImpl {
|
||||
size_t result, size_t input_rows_count) {
|
||||
auto col_to = ColumnVector<Float64>::create();
|
||||
VecDateTimeValue dtv;
|
||||
if (dtv.from_unixtime(context->impl()->state()->timestamp_ms() / 1000,
|
||||
context->impl()->state()->timezone_obj())) {
|
||||
if (dtv.from_unixtime(context->state()->timestamp_ms() / 1000,
|
||||
context->state()->timezone_obj())) {
|
||||
double time = dtv.hour() * 3600l + dtv.minute() * 60l + dtv.second();
|
||||
col_to->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&time)), 0);
|
||||
} else {
|
||||
@ -1025,7 +1023,7 @@ struct UtcTimestampImpl {
|
||||
size_t input_rows_count) {
|
||||
auto col_to = ColumnVector<Int64>::create();
|
||||
DateValueType dtv;
|
||||
if (dtv.from_unixtime(context->impl()->state()->timestamp_ms() / 1000, "+00:00")) {
|
||||
if (dtv.from_unixtime(context->state()->timestamp_ms() / 1000, "+00:00")) {
|
||||
if constexpr (std::is_same_v<DateValueType, VecDateTimeValue>) {
|
||||
reinterpret_cast<DateValueType*>(&dtv)->set_type(TIME_DATETIME);
|
||||
}
|
||||
|
||||
@ -1070,14 +1070,14 @@ public:
|
||||
if (auto* col2 = check_and_get_column<ColumnInt32>(*argument_ptr[1])) {
|
||||
vector_vector(col1->get_chars(), col1->get_offsets(), col2->get_data(),
|
||||
res->get_chars(), res->get_offsets(), null_map->get_data(),
|
||||
context->impl()->state()->repeat_max_num());
|
||||
context->state()->repeat_max_num());
|
||||
block.replace_by_position(
|
||||
result, ColumnNullable::create(std::move(res), std::move(null_map)));
|
||||
return Status::OK();
|
||||
} else if (auto* col2_const = check_and_get_column<ColumnConst>(*argument_ptr[1])) {
|
||||
DCHECK(check_and_get_column<ColumnInt32>(col2_const->get_data_column()));
|
||||
int repeat = std::min<int>(col2_const->get_int(0),
|
||||
context->impl()->state()->repeat_max_num());
|
||||
int repeat =
|
||||
std::min<int>(col2_const->get_int(0), context->state()->repeat_max_num());
|
||||
if (repeat <= 0) {
|
||||
null_map->get_data().resize_fill(input_rows_count, 0);
|
||||
res->insert_many_defaults(input_rows_count);
|
||||
|
||||
@ -16,7 +16,7 @@
|
||||
// under the License.
|
||||
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "udf/udf_internal.h"
|
||||
#include "udf/udf.h"
|
||||
#include "vec/columns/column_const.h"
|
||||
#include "vec/columns/column_nullable.h"
|
||||
#include "vec/columns/column_string.h"
|
||||
@ -117,8 +117,7 @@ struct StrToDate {
|
||||
null_map[i] = 1;
|
||||
}
|
||||
if constexpr (std::is_same_v<DateValueType, VecDateTimeValue>) {
|
||||
if (context->impl()->get_return_type().type ==
|
||||
doris::PrimitiveType::TYPE_DATETIME) {
|
||||
if (context->get_return_type().type == doris::PrimitiveType::TYPE_DATETIME) {
|
||||
ts_val.to_datetime();
|
||||
} else {
|
||||
ts_val.cast_to_date();
|
||||
@ -399,7 +398,7 @@ struct UnixTimeStampImpl {
|
||||
size_t input_rows_count) {
|
||||
auto col_result = ColumnVector<Int32>::create();
|
||||
col_result->resize(1);
|
||||
col_result->get_data()[0] = context->impl()->state()->timestamp_ms() / 1000;
|
||||
col_result->get_data()[0] = context->state()->timestamp_ms() / 1000;
|
||||
auto col_const = ColumnConst::create(std::move(col_result), input_rows_count);
|
||||
block.replace_by_position(result, std::move(col_const));
|
||||
return Status::OK();
|
||||
@ -438,8 +437,7 @@ struct UnixTimeStampDateImpl {
|
||||
const VecDateTimeValue& ts_value =
|
||||
reinterpret_cast<const VecDateTimeValue&>(*source.data);
|
||||
int64_t timestamp;
|
||||
if (!ts_value.unix_timestamp(×tamp,
|
||||
context->impl()->state()->timezone_obj())) {
|
||||
if (!ts_value.unix_timestamp(×tamp, context->state()->timezone_obj())) {
|
||||
null_map_data[i] = true;
|
||||
} else {
|
||||
null_map_data[i] = false;
|
||||
@ -464,8 +462,7 @@ struct UnixTimeStampDateImpl {
|
||||
const DateV2Value<DateV2ValueType>& ts_value =
|
||||
reinterpret_cast<const DateV2Value<DateV2ValueType>&>(*source.data);
|
||||
int64_t timestamp;
|
||||
if (!ts_value.unix_timestamp(×tamp,
|
||||
context->impl()->state()->timezone_obj())) {
|
||||
if (!ts_value.unix_timestamp(×tamp, context->state()->timezone_obj())) {
|
||||
null_map_data[i] = true;
|
||||
} else {
|
||||
null_map_data[i] = false;
|
||||
@ -481,8 +478,8 @@ struct UnixTimeStampDateImpl {
|
||||
const DateV2Value<DateV2ValueType>& ts_value =
|
||||
reinterpret_cast<const DateV2Value<DateV2ValueType>&>(*source.data);
|
||||
int64_t timestamp;
|
||||
const auto valid = ts_value.unix_timestamp(
|
||||
×tamp, context->impl()->state()->timezone_obj());
|
||||
const auto valid =
|
||||
ts_value.unix_timestamp(×tamp, context->state()->timezone_obj());
|
||||
DCHECK(valid);
|
||||
col_result_data[i] = UnixTimeStampImpl::trim_timestamp(timestamp);
|
||||
}
|
||||
@ -504,8 +501,7 @@ struct UnixTimeStampDateImpl {
|
||||
const DateV2Value<DateTimeV2ValueType>& ts_value =
|
||||
reinterpret_cast<const DateV2Value<DateTimeV2ValueType>&>(*source.data);
|
||||
int64_t timestamp;
|
||||
if (!ts_value.unix_timestamp(×tamp,
|
||||
context->impl()->state()->timezone_obj())) {
|
||||
if (!ts_value.unix_timestamp(×tamp, context->state()->timezone_obj())) {
|
||||
null_map_data[i] = true;
|
||||
} else {
|
||||
null_map_data[i] = false;
|
||||
@ -521,8 +517,8 @@ struct UnixTimeStampDateImpl {
|
||||
const DateV2Value<DateTimeV2ValueType>& ts_value =
|
||||
reinterpret_cast<const DateV2Value<DateTimeV2ValueType>&>(*source.data);
|
||||
int64_t timestamp;
|
||||
const auto valid = ts_value.unix_timestamp(
|
||||
×tamp, context->impl()->state()->timezone_obj());
|
||||
const auto valid =
|
||||
ts_value.unix_timestamp(×tamp, context->state()->timezone_obj());
|
||||
DCHECK(valid);
|
||||
col_result_data[i] = UnixTimeStampImpl::trim_timestamp(timestamp);
|
||||
}
|
||||
@ -579,7 +575,7 @@ struct UnixTimeStampStrImpl {
|
||||
}
|
||||
|
||||
int64_t timestamp;
|
||||
if (!ts_value.unix_timestamp(×tamp, context->impl()->state()->timezone_obj())) {
|
||||
if (!ts_value.unix_timestamp(×tamp, context->state()->timezone_obj())) {
|
||||
null_map_data[i] = true;
|
||||
} else {
|
||||
null_map_data[i] = false;
|
||||
|
||||
@ -21,7 +21,7 @@
|
||||
|
||||
#include "runtime/mem_pool.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "udf/udf_internal.h"
|
||||
#include "udf/udf.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
@ -33,7 +33,7 @@ FunctionUtils::FunctionUtils() {
|
||||
_state = new RuntimeState(globals);
|
||||
doris::TypeDescriptor return_type;
|
||||
std::vector<doris::TypeDescriptor> arg_types;
|
||||
_fn_ctx = FunctionContextImpl::create_context(_state, return_type, arg_types);
|
||||
_fn_ctx = FunctionContext::create_context(_state, return_type, arg_types);
|
||||
}
|
||||
|
||||
FunctionUtils::FunctionUtils(const doris::TypeDescriptor& return_type,
|
||||
@ -44,7 +44,7 @@ FunctionUtils::FunctionUtils(const doris::TypeDescriptor& return_type,
|
||||
globals.__set_timestamp_ms(1565026737805);
|
||||
globals.__set_time_zone("Asia/Shanghai");
|
||||
_state = new RuntimeState(globals);
|
||||
_fn_ctx = FunctionContextImpl::create_context(_state, return_type, arg_types);
|
||||
_fn_ctx = FunctionContext::create_context(_state, return_type, arg_types);
|
||||
}
|
||||
|
||||
FunctionUtils::~FunctionUtils() {
|
||||
|
||||
@ -19,7 +19,6 @@
|
||||
#include <vector>
|
||||
|
||||
#include "udf/udf.h"
|
||||
#include "udf/udf_internal.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
|
||||
@ -25,7 +25,6 @@
|
||||
#include "testutil/any_type.h"
|
||||
#include "testutil/function_utils.h"
|
||||
#include "udf/udf.h"
|
||||
#include "udf/udf_internal.h"
|
||||
#include "vec/columns/column.h"
|
||||
#include "vec/columns/column_const.h"
|
||||
#include "vec/core/columns_with_type_and_name.h"
|
||||
@ -249,7 +248,7 @@ Status check_function(const std::string& func_name, const InputTypeSet& input_ty
|
||||
|
||||
FunctionUtils fn_utils(fn_ctx_return, arg_types, 0);
|
||||
auto* fn_ctx = fn_utils.get_fn_ctx();
|
||||
fn_ctx->impl()->set_constant_cols(constant_cols);
|
||||
fn_ctx->set_constant_cols(constant_cols);
|
||||
func->open(fn_ctx, FunctionContext::FRAGMENT_LOCAL);
|
||||
func->open(fn_ctx, FunctionContext::THREAD_LOCAL);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user