// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "vec/exprs/vectorized_fn_call.h" #include #include // IWYU pragma: keep #include #include #include #include #include #include #include "common/config.h" #include "common/consts.h" #include "common/status.h" #include "runtime/runtime_state.h" #include "udf/udf.h" #include "vec/aggregate_functions/aggregate_function_simple_factory.h" #include "vec/columns/column.h" #include "vec/core/block.h" #include "vec/core/column_with_type_and_name.h" #include "vec/core/columns_with_type_and_name.h" #include "vec/data_types/data_type.h" #include "vec/data_types/data_type_agg_state.h" #include "vec/exprs/vexpr_context.h" #include "vec/functions/function_agg_state.h" #include "vec/functions/function_java_udf.h" #include "vec/functions/function_rpc.h" #include "vec/functions/simple_function_factory.h" #include "vec/utils/util.hpp" namespace doris { class RowDescriptor; class RuntimeState; class TExprNode; } // namespace doris namespace doris::vectorized { const std::string AGG_STATE_SUFFIX = "_state"; VectorizedFnCall::VectorizedFnCall(const TExprNode& node) : VExpr(node) {} Status VectorizedFnCall::prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) { RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context)); ColumnsWithTypeAndName argument_template; argument_template.reserve(_children.size()); for (auto child : _children) { argument_template.emplace_back(nullptr, child->data_type(), child->expr_name()); } _expr_name = fmt::format("VectorizedFnCall[{}](arguments={},return={})", _fn.name.function_name, get_child_names(), _data_type->get_name()); if (_fn.binary_type == TFunctionBinaryType::RPC) { _function = FunctionRPC::create(_fn, argument_template, _data_type); } else if (_fn.binary_type == TFunctionBinaryType::JAVA_UDF) { if (config::enable_java_support) { _function = JavaFunctionCall::create(_fn, argument_template, _data_type); } else { return Status::InternalError( "Java UDF is not enabled, you can change be config enable_java_support to true " "and restart be."); } } else if (_fn.binary_type == TFunctionBinaryType::AGG_STATE) { DataTypes argument_types; for (auto column : argument_template) { argument_types.emplace_back(column.type); } if (match_suffix(_fn.name.function_name, AGG_STATE_SUFFIX)) { if (_data_type->is_nullable()) { return Status::InternalError("State function's return type must be not nullable"); } if (_data_type->get_type_as_type_descriptor().type != PrimitiveType::TYPE_AGG_STATE) { return Status::InternalError( "State function's return type must be agg_state but get {}", _data_type->get_family_name()); } _function = FunctionAggState::create( argument_types, _data_type, assert_cast(_data_type.get())->get_nested_function()); } else { return Status::InternalError("Function {} is not endwith '_state'", _fn.signature); } } else { // get the function. won't prepare function. _function = SimpleFunctionFactory::instance().get_function( _fn.name.function_name, argument_template, _data_type, state->be_exec_version()); } if (_function == nullptr) { return Status::InternalError( "Function {} get failed, expr is {} " "and return type is {}.", _fn.name.function_name, _expr_name, _data_type->get_name()); } VExpr::register_function_context(state, context); _function_name = _fn.name.function_name; _can_fast_execute = _function->can_fast_execute(); return Status::OK(); } Status VectorizedFnCall::open(RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) { for (int i = 0; i < _children.size(); ++i) { RETURN_IF_ERROR(_children[i]->open(state, context, scope)); } RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function)); if (scope == FunctionContext::FRAGMENT_LOCAL) { RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr)); } return Status::OK(); } void VectorizedFnCall::close(VExprContext* context, FunctionContext::FunctionStateScope scope) { VExpr::close_function_context(context, scope, _function); VExpr::close(context, scope); } Status VectorizedFnCall::execute(VExprContext* context, vectorized::Block* block, int* result_column_id) { // TODO: not execute const expr again, but use the const column in function context vectorized::ColumnNumbers arguments(_children.size()); for (int i = 0; i < _children.size(); ++i) { int column_id = -1; RETURN_IF_ERROR(_children[i]->execute(context, block, &column_id)); arguments[i] = column_id; } RETURN_IF_ERROR(check_constant(*block, arguments)); // call function size_t num_columns_without_result = block->columns(); // prepare a column to save result block->insert({nullptr, _data_type, _expr_name}); if (_can_fast_execute) { // if not find fast execute result column, means do not need check fast execute again _can_fast_execute = fast_execute(context->fn_context(_fn_context_index), *block, arguments, num_columns_without_result, block->rows()); if (_can_fast_execute) { *result_column_id = num_columns_without_result; return Status::OK(); } } RETURN_IF_ERROR(_function->execute(context->fn_context(_fn_context_index), *block, arguments, num_columns_without_result, block->rows(), false)); *result_column_id = num_columns_without_result; return Status::OK(); } // fast_execute can direct copy expr filter result which build by apply index in segment_iterator bool VectorizedFnCall::fast_execute(FunctionContext* context, Block& block, const ColumnNumbers& arguments, size_t result, size_t input_rows_count) { auto query_value = block.get_by_position(arguments[1]).to_string(0); std::string column_name = block.get_by_position(arguments[0]).name; auto result_column_name = BeConsts::BLOCK_TEMP_COLUMN_PREFIX + column_name + "_" + _function->get_name() + "_" + query_value; if (!block.has(result_column_name)) { return false; } auto result_column = block.get_by_name(result_column_name).column->convert_to_full_column_if_const(); auto& result_info = block.get_by_position(result); if (result_info.type->is_nullable()) { block.replace_by_position(result, ColumnNullable::create(std::move(result_column), ColumnUInt8::create(input_rows_count, 0))); } else { block.replace_by_position(result, std::move(result_column)); } return true; } const std::string& VectorizedFnCall::expr_name() const { return _expr_name; } std::string VectorizedFnCall::debug_string() const { std::stringstream out; out << "VectorizedFn["; out << _expr_name; out << "]{"; bool first = true; for (auto& input_expr : children()) { if (first) { first = false; } else { out << ","; } out << "\n" << input_expr->debug_string(); } out << "}"; return out.str(); } std::string VectorizedFnCall::debug_string(const std::vector& agg_fns) { std::stringstream out; out << "["; for (int i = 0; i < agg_fns.size(); ++i) { out << (i == 0 ? "" : " ") << agg_fns[i]->debug_string(); } out << "]"; return out.str(); } } // namespace doris::vectorized