From 7380483394bc96976164650d5dcf0ccc063eae06 Mon Sep 17 00:00:00 2001 From: ZHAO Chun Date: Sat, 29 Dec 2018 09:13:04 +0800 Subject: [PATCH] Support UDF (#468) Now, user can create UDF with CREATE FUNCTION statement. Doris only support UDF in this version, it will support UDAF/UDTF later. --- be/CMakeLists.txt | 2 +- be/src/exprs/agg_fn.cc | 17 +- be/src/exprs/agg_fn_evaluator.cpp | 22 +- be/src/exprs/scalar_fn_call.cpp | 8 +- be/src/udf_samples/CMakeLists.txt | 24 ++ be/src/udf_samples/udf_sample.cpp | 29 +++ be/src/udf_samples/udf_sample.h | 26 ++ .../help/Contents/Data Definition/ddl_stmt.md | 39 +++ fe/src/main/cup/sql_parser.cup | 66 +++-- .../doris/analysis/CreateFunctionStmt.java | 151 +++++++++--- .../doris/analysis/DropFunctionStmt.java | 24 +- .../doris/analysis/FunctionArgsDef.java | 74 ++++++ .../doris/analysis/FunctionCallExpr.java | 17 +- .../apache/doris/analysis/FunctionName.java | 29 +++ .../doris/catalog/AggregateFunction.java | 70 +++++- .../org/apache/doris/catalog/Catalog.java | 39 +++ .../org/apache/doris/catalog/Database.java | 123 +++++++++- .../org/apache/doris/catalog/Function.java | 228 +++++++++++++++++- .../doris/catalog/FunctionSearchDesc.java | 114 +++++++++ .../apache/doris/catalog/ScalarFunction.java | 64 ++++- .../java/org/apache/doris/catalog/Uda.java | 122 ---------- .../java/org/apache/doris/catalog/Udf.java | 65 ----- .../org/apache/doris/common/FeConstants.java | 2 +- .../apache/doris/common/FeMetaVersion.java | 2 + .../org/apache/doris/common/io/IOUtils.java | 17 ++ .../apache/doris/journal/JournalEntity.java | 12 + .../org/apache/doris/persist/EditLog.java | 20 ++ .../apache/doris/persist/OperationType.java | 4 + .../java/org/apache/doris/qe/DdlExecutor.java | 6 + .../org/apache/doris/qe/StmtExecutor.java | 2 +- fe/src/main/jflex/sql_scanner.flex | 4 + 31 files changed, 1122 insertions(+), 300 deletions(-) create mode 100644 be/src/udf_samples/CMakeLists.txt create mode 100644 be/src/udf_samples/udf_sample.cpp create mode 100644 be/src/udf_samples/udf_sample.h create mode 100644 fe/src/main/java/org/apache/doris/analysis/FunctionArgsDef.java create mode 100644 fe/src/main/java/org/apache/doris/catalog/FunctionSearchDesc.java delete mode 100644 fe/src/main/java/org/apache/doris/catalog/Uda.java delete mode 100644 fe/src/main/java/org/apache/doris/catalog/Udf.java diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index a598d56480..a829efe14d 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -559,7 +559,7 @@ add_subdirectory(${SRC_DIR}/udf) add_subdirectory(${SRC_DIR}/runtime) add_subdirectory(${SRC_DIR}/testutil) add_subdirectory(${SRC_DIR}/tools) -# add_subdirectory(${SRC_DIR}/udf_samples) +add_subdirectory(${SRC_DIR}/udf_samples) # Utility CMake function to make specifying tests and benchmarks less verbose FUNCTION(ADD_BE_TEST TEST_NAME) diff --git a/be/src/exprs/agg_fn.cc b/be/src/exprs/agg_fn.cc index 74b44d0cc8..8b448bb729 100644 --- a/be/src/exprs/agg_fn.cc +++ b/be/src/exprs/agg_fn.cc @@ -92,37 +92,40 @@ Status AggFn::Init(const RowDescriptor& row_desc, RuntimeState* state) { } RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( - _fn.id, aggregate_fn.init_fn_symbol, _fn.hdfs_location, "", &init_fn_, &_cache_entry)); + _fn.id, aggregate_fn.init_fn_symbol, + _fn.hdfs_location, _fn.checksum, &init_fn_, &_cache_entry)); RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( - _fn.id, aggregate_fn.update_fn_symbol, _fn.hdfs_location, "", &update_fn_, &_cache_entry)); + _fn.id, aggregate_fn.update_fn_symbol, + _fn.hdfs_location, _fn.checksum, &update_fn_, &_cache_entry)); // Merge() is not defined for purely analytic function. if (!aggregate_fn.is_analytic_only_fn) { RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( - _fn.id, aggregate_fn.merge_fn_symbol, _fn.hdfs_location, "", &merge_fn_, &_cache_entry)); + _fn.id, aggregate_fn.merge_fn_symbol, + _fn.hdfs_location, _fn.checksum, &merge_fn_, &_cache_entry)); } // Serialize(), GetValue(), Remove() and Finalize() are optional if (!aggregate_fn.serialize_fn_symbol.empty()) { RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( _fn.id, aggregate_fn.serialize_fn_symbol, - _fn.hdfs_location, "", + _fn.hdfs_location, _fn.checksum, &serialize_fn_, &_cache_entry)); } if (!aggregate_fn.get_value_fn_symbol.empty()) { RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( - _fn.id, aggregate_fn.get_value_fn_symbol, _fn.hdfs_location, "", + _fn.id, aggregate_fn.get_value_fn_symbol, _fn.hdfs_location, _fn.checksum, &get_value_fn_, &_cache_entry)); } if (!aggregate_fn.remove_fn_symbol.empty()) { RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( _fn.id, aggregate_fn.remove_fn_symbol, - _fn.hdfs_location, "", + _fn.hdfs_location, _fn.checksum, &remove_fn_, &_cache_entry)); } if (!aggregate_fn.finalize_fn_symbol.empty()) { RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( _fn.id, _fn.aggregate_fn.finalize_fn_symbol, - _fn.hdfs_location, "", + _fn.hdfs_location, _fn.checksum, &finalize_fn_, &_cache_entry)); } return Status::OK; diff --git a/be/src/exprs/agg_fn_evaluator.cpp b/be/src/exprs/agg_fn_evaluator.cpp index ec8c6c6dc7..ac71d6e3a3 100755 --- a/be/src/exprs/agg_fn_evaluator.cpp +++ b/be/src/exprs/agg_fn_evaluator.cpp @@ -208,36 +208,42 @@ Status AggFnEvaluator::prepare( // Load the function pointers. RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( - _fn.id, _fn.aggregate_fn.init_fn_symbol, _hdfs_location, "", &_init_fn, NULL)); + _fn.id, _fn.aggregate_fn.init_fn_symbol, + _hdfs_location, _fn.checksum, &_init_fn, NULL)); RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( - _fn.id, _fn.aggregate_fn.update_fn_symbol, _hdfs_location, "", &_update_fn, NULL)); + _fn.id, _fn.aggregate_fn.update_fn_symbol, + _hdfs_location, _fn.checksum, &_update_fn, NULL)); // Merge() is not loaded if evaluating the agg fn as an analytic function. if (!_is_analytic_fn) { RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( - _fn.id, _fn.aggregate_fn.merge_fn_symbol, _hdfs_location, "", &_merge_fn, NULL)); + _fn.id, _fn.aggregate_fn.merge_fn_symbol, + _hdfs_location, _fn.checksum, &_merge_fn, NULL)); } // Serialize and Finalize are optional if (!_fn.aggregate_fn.serialize_fn_symbol.empty()) { RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( - _fn.id, _fn.aggregate_fn.serialize_fn_symbol, _hdfs_location, - "", &_serialize_fn, NULL)); + _fn.id, _fn.aggregate_fn.serialize_fn_symbol, + _hdfs_location, _fn.checksum, &_serialize_fn, NULL)); } if (!_fn.aggregate_fn.finalize_fn_symbol.empty()) { RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( - _fn.id, _fn.aggregate_fn.finalize_fn_symbol, _hdfs_location, "", &_finalize_fn, NULL)); + _fn.id, _fn.aggregate_fn.finalize_fn_symbol, + _hdfs_location, _fn.checksum, &_finalize_fn, NULL)); } if (!_fn.aggregate_fn.get_value_fn_symbol.empty()) { RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( - _fn.id, _fn.aggregate_fn.get_value_fn_symbol, _hdfs_location, "", &_get_value_fn, + _fn.id, _fn.aggregate_fn.get_value_fn_symbol, + _hdfs_location, _fn.checksum, &_get_value_fn, NULL)); } if (!_fn.aggregate_fn.remove_fn_symbol.empty()) { RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( - _fn.id, _fn.aggregate_fn.remove_fn_symbol, _hdfs_location, "", &_remove_fn, + _fn.id, _fn.aggregate_fn.remove_fn_symbol, + _hdfs_location, _fn.checksum, &_remove_fn, NULL)); } diff --git a/be/src/exprs/scalar_fn_call.cpp b/be/src/exprs/scalar_fn_call.cpp index fd6ffeaf57..5ff95a5325 100644 --- a/be/src/exprs/scalar_fn_call.cpp +++ b/be/src/exprs/scalar_fn_call.cpp @@ -90,7 +90,7 @@ Status ScalarFnCall::prepare( if (_scalar_fn == NULL) { if (SymbolsUtil::is_mangled(_fn.scalar_fn.symbol)) { status = UserFunctionCache::instance()->get_function_ptr( - _fn.id, _fn.scalar_fn.symbol, _fn.hdfs_location, "", &_scalar_fn, &_cache_entry); + _fn.id, _fn.scalar_fn.symbol, _fn.hdfs_location, _fn.checksum, &_scalar_fn, &_cache_entry); } else { std::vector arg_types; for (auto& t_type : _fn.arg_types) { @@ -101,7 +101,7 @@ Status ScalarFnCall::prepare( std::string symbol = SymbolsUtil::mangle_user_function( _fn.scalar_fn.symbol, arg_types, _fn.has_var_args, NULL); status = UserFunctionCache::instance()->get_function_ptr( - _fn.id, symbol, _fn.hdfs_location, "", &_scalar_fn, &_cache_entry); + _fn.id, symbol, _fn.hdfs_location, _fn.checksum, &_scalar_fn, &_cache_entry); } } #if 0 @@ -426,7 +426,7 @@ Status ScalarFnCall::get_udf(RuntimeState* state, Function** udf) { // interface with the code in impalad. void* fn_ptr = NULL; Status status = UserFunctionCache::instance()->get_function_ptr( - _fn.id, _fn.scalar_fn.symbol, _fn.hdfs_location, "", &fn_ptr, &_cache_entry); + _fn.id, _fn.scalar_fn.symbol, _fn.hdfs_location, _fn.checksum, &fn_ptr, &_cache_entry); if (!status.ok() && _fn.binary_type == TFunctionBinaryType::BUILTIN) { // Builtins symbols should exist unless there is a version mismatch. // TODO(zc ) @@ -542,7 +542,7 @@ Status ScalarFnCall::get_function(RuntimeState* state, const std::string& symbol if (_fn.binary_type == TFunctionBinaryType::NATIVE || _fn.binary_type == TFunctionBinaryType::BUILTIN) { return UserFunctionCache::instance()->get_function_ptr( - _fn.id, symbol, _fn.hdfs_location, "", fn, &_cache_entry); + _fn.id, symbol, _fn.hdfs_location, _fn.checksum, fn, &_cache_entry); } else { #if 0 DCHECK_EQ(_fn.binary_type, TFunctionBinaryType::IR); diff --git a/be/src/udf_samples/CMakeLists.txt b/be/src/udf_samples/CMakeLists.txt new file mode 100644 index 0000000000..2e81b17de9 --- /dev/null +++ b/be/src/udf_samples/CMakeLists.txt @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# where to put generated libraries +set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/udf_samples") + +# where to put generated binaries +set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/udf_samples") + +add_library(udfsample SHARED udf_sample.cpp) diff --git a/be/src/udf_samples/udf_sample.cpp b/be/src/udf_samples/udf_sample.cpp new file mode 100644 index 0000000000..faa7280fb7 --- /dev/null +++ b/be/src/udf_samples/udf_sample.cpp @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "udf_samples/udf_sample.h" + +namespace doris_udf { + +IntVal AddUdf(FunctionContext* context, const IntVal& arg1, const IntVal& arg2) { + if (arg1.is_null || arg2.is_null) { + return IntVal::null(); + } + return {arg1.val + arg2.val}; +} + +} diff --git a/be/src/udf_samples/udf_sample.h b/be/src/udf_samples/udf_sample.h new file mode 100644 index 0000000000..cf123feaa0 --- /dev/null +++ b/be/src/udf_samples/udf_sample.h @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "udf/udf.h" + +namespace doris_udf { + +IntVal AddUdf(FunctionContext* context, const IntVal& arg1, const IntVal& arg2); + +} diff --git a/docs/help/Contents/Data Definition/ddl_stmt.md b/docs/help/Contents/Data Definition/ddl_stmt.md index 18d0e08a76..b52a634470 100644 --- a/docs/help/Contents/Data Definition/ddl_stmt.md +++ b/docs/help/Contents/Data Definition/ddl_stmt.md @@ -1094,3 +1094,42 @@ COLOCATE, JOIN, CREATE TABLE +# CREATE FUNCTION +## description + Used to create a UDF/UDAF/UDTF + Syntax: + CREATE [AGGREGATE] FUNCTION funcName (argType [, ...]) + RETURNS retType + PROPERTIES ( + k1=v1 [, k2=v2] + ) + + valid PROPERTIES: + "symbol": UDF's symbol, which Doris call this symbol's function to execute. MUST BE SET + "object_file": UDF library's URL, Doris use it to download library. MUST BE SET + +## example + 1. create a function "my_func", receive two int and return one int + CREATE FUNCTION my_func (int, int) RETURNS int + PROPERTIES ("symbol"="my_func_symbol", "object_file"="http://127.0.0.1/my_func.so") + 2. create a variadic function "my_func" + CREATE FUNCTION my_func (int, ...) RETURNS int + PROPERTIES ("symbol"="my_func_symbol", "object_file"="http://127.0.0.1/my_func.so") + +## keyword + CREATE, FUNCTION + +# DROP FUNCTION +## description + Used to drop a UDF/UDAF/UDTF + Syntax: + DROP FUNCTION funcName (argType [, ...]) + +## example + 1. drop a UDF whose name is my_func + DROP FUNCTION my_func (int, int) + 2. drop a variadic function + DROP FUNCTION my_func (int, ...) + +## keyword + DROP, FUNCTION diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup index 0577bce38a..fae9b60d56 100644 --- a/fe/src/main/cup/sql_parser.cup +++ b/fe/src/main/cup/sql_parser.cup @@ -215,7 +215,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_A KW_PROC, KW_PROCEDURE, KW_PROCESSLIST, KW_PROPERTIES, KW_PROPERTY, KW_QUERY, KW_QUOTA, KW_RANDOM, KW_RANGE, KW_READ, KW_RECOVER, KW_REGEXP, KW_RELEASE, KW_RENAME, - KW_REPEATABLE, KW_REPOSITORY, KW_REPOSITORIES, KW_REPLACE, KW_REPLICA, KW_RESOURCE, KW_RESTORE, KW_REVOKE, + KW_REPEATABLE, KW_REPOSITORY, KW_REPOSITORIES, KW_REPLACE, KW_REPLICA, KW_RESOURCE, KW_RESTORE, KW_RETURNS, KW_REVOKE, KW_RIGHT, KW_ROLE, KW_ROLES, KW_ROLLBACK, KW_ROLLUP, KW_ROW, KW_ROWS, KW_SCHEMAS, KW_SELECT, KW_SEMI, KW_SERIALIZABLE, KW_SESSION, KW_SET, KW_SHOW, KW_SMALLINT, KW_SNAPSHOT, KW_SONAME, KW_SPLIT, KW_START, KW_STATUS, KW_STORAGE, KW_STRING, @@ -226,7 +226,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_A KW_VALUES, KW_VARCHAR, KW_VARIABLES, KW_VIEW, KW_WARNINGS, KW_WHEN, KW_WHITELIST, KW_WHERE, KW_WITH, KW_WORK, KW_WRITE; -terminal COMMA, DOT, AT, STAR, LPAREN, RPAREN, SEMICOLON, LBRACKET, RBRACKET, DIVIDE, MOD, ADD, SUBTRACT; +terminal COMMA, DOT, DOTDOTDOT, AT, STAR, LPAREN, RPAREN, SEMICOLON, LBRACKET, RBRACKET, DIVIDE, MOD, ADD, SUBTRACT; terminal BITAND, BITOR, BITXOR, BITNOT; terminal EQUAL, NOT, LESSTHAN, GREATERTHAN, SET_VAR; terminal String IDENT; @@ -304,6 +304,8 @@ nonterminal Boolean opt_order_param; nonterminal Boolean opt_nulls_order_param; nonterminal LimitElement limit_clause; nonterminal TypeDef type_def; +nonterminal List type_def_list; +nonterminal FunctionArgsDef func_args_def; nonterminal Type type; nonterminal Expr cast_expr, case_else_clause, analytic_expr; nonterminal LiteralExpr literal; @@ -379,7 +381,7 @@ nonterminal TablePattern tbl_pattern; nonterminal String ident_or_star; // Boolean -nonterminal Boolean opt_negative, opt_super_user, opt_is_allow_null, opt_is_key, opt_read_only; +nonterminal Boolean opt_negative, opt_super_user, opt_is_allow_null, opt_is_key, opt_read_only, opt_aggregate; nonterminal String opt_from_rollup, opt_to_rollup; nonterminal ColumnPosition opt_col_pos; @@ -830,20 +832,11 @@ create_stmt ::= RESULT = new CreateClusterStmt(name, properties, password); :}*/ /* Function */ - /* - | KW_CREATE KW_FUNCTION function_name:functionName LPAREN column_type_list:arguments RPAREN - column_type:retrunType KW_SONAME STRING_LITERAL:soPath - opt_properties:properties + | KW_CREATE opt_aggregate:isAggregate KW_FUNCTION function_name:functionName LPAREN func_args_def:args RPAREN + KW_RETURNS type_def:retrunType opt_properties:properties {: - RESULT = new CreateFunctionStmt(functionName, arguments, retrunType, soPath, properties, false); + RESULT = new CreateFunctionStmt(isAggregate, functionName, args, retrunType, properties); :} - | KW_CREATE KW_AGGREGATE KW_FUNCTION function_name:functionName LPAREN column_type_list:arguments RPAREN - column_type:retrunType KW_SONAME STRING_LITERAL:soPath - opt_properties:properties - {: - RESULT = new CreateFunctionStmt(functionName, arguments, retrunType, soPath, properties, true); - :} - */ /* Table */ | KW_CREATE opt_external:isExternal KW_TABLE opt_if_not_exists:ifNotExists table_name:name LPAREN column_definition_list:columns RPAREN opt_engine:engineName @@ -882,6 +875,16 @@ create_stmt ::= :} ; +opt_aggregate ::= + {: + RESULT = false; + :} + | KW_AGGREGATE + {: + RESULT = true; + :} + ; + opt_read_only ::= {: RESULT = false; @@ -1162,9 +1165,9 @@ drop_stmt ::= RESULT = new DropClusterStmt(ifExists, cluster); :} /* Function */ - | KW_DROP KW_FUNCTION function_name:functionName + | KW_DROP KW_FUNCTION function_name:functionName LPAREN func_args_def:args RPAREN {: - RESULT = new DropFunctionStmt(functionName); + RESULT = new DropFunctionStmt(functionName, args); :} /* Table */ | KW_DROP KW_TABLE opt_if_exists:ifExists table_name:name @@ -2989,6 +2992,33 @@ type_def ::= {: RESULT = new TypeDef(t); :} ; +type_def_list ::= + type_def:typeDef + {: + RESULT = Lists.newArrayList(typeDef); + :} + | type_def_list:types COMMA type_def:typeDef + {: + types.add(typeDef); + RESULT = types; + :} + ; + +func_args_def ::= + type_def_list:argTypes + {: + RESULT = new FunctionArgsDef(argTypes, false); + :} + | DOTDOTDOT + {: + RESULT = new FunctionArgsDef(Lists.newArrayList(), true); + :} + | type_def_list:argTypes COMMA DOTDOTDOT + {: + RESULT = new FunctionArgsDef(argTypes, true); + :} + ; + cast_expr ::= KW_CAST LPAREN expr:e KW_AS type_def:targetType RPAREN {: RESULT = new CastExpr(targetType, e); :} @@ -3704,6 +3734,8 @@ keyword ::= {: RESULT = id; :} | KW_RESTORE:id {: RESULT = id; :} + | KW_RETURNS:id + {: RESULT = id; :} | KW_ROLLBACK:id {: RESULT = id; :} | KW_ROLLUP:id diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java index 91acc49fa0..51b906af6d 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java @@ -17,38 +17,129 @@ package org.apache.doris.analysis; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSortedMap; +import org.apache.commons.codec.binary.Hex; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Function; +import org.apache.doris.catalog.ScalarFunction; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; -import java.util.List; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.net.URLConnection; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.Map; -/** - * Created by zhaochun on 14-7-30. - */ -public class CreateFunctionStmt extends StatementBase { - private final FunctionName functionName; - private final List argumentType; - private final org.apache.doris.catalog.ColumnType returnType; - private final String soFilePath; - private final Map properties; - private final boolean isAggregate; +// create a user define function +public class CreateFunctionStmt extends DdlStmt { - public CreateFunctionStmt(FunctionName functionName, - List argumentType, - org.apache.doris.catalog.ColumnType returnType, String soFilePath, - Map properties, boolean isAggregate) { + private final FunctionName functionName; + private final boolean isAggregate; + private final FunctionArgsDef argsDef; + private final TypeDef returnType; + private final Map properties; + + // needed item set after analyzed + private String objectFile; + private Function function; + private String checksum; + + public CreateFunctionStmt(boolean isAggregate, FunctionName functionName, FunctionArgsDef argsDef, + TypeDef returnType, Map properties) { this.functionName = functionName; - this.argumentType = argumentType; - this.returnType = returnType; - this.soFilePath = soFilePath; - this.properties = properties; this.isAggregate = isAggregate; + this.argsDef = argsDef; + this.returnType = returnType; + if (properties == null) { + this.properties = ImmutableSortedMap.of(); + } else { + this.properties = ImmutableSortedMap.copyOf(properties, String.CASE_INSENSITIVE_ORDER); + } } + public FunctionName getFunctionName() { return functionName; } + public Function getFunction() { return function; } + @Override - public void analyze(Analyzer analyzer) throws AnalysisException, UserException { + public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); + + analyzeCommon(analyzer); + // check + if (isAggregate) { + analyzeUda(); + } else { + analyzeUdf(); + } + } + + private void analyzeCommon(Analyzer analyzer) throws AnalysisException { + // check function name + functionName.analyze(analyzer); + + // check operation privilege + if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + } + // check argument + argsDef.analyze(analyzer); + + returnType.analyze(analyzer); + + String OBJECT_FILE_KEY = "object_file"; + objectFile = properties.get(OBJECT_FILE_KEY); + if (Strings.isNullOrEmpty(objectFile)) { + throw new AnalysisException("No 'object_file' in properties"); + } + try { + computeObjectChecksum(); + } catch (IOException | NoSuchAlgorithmException e) { + throw new AnalysisException("cannot to compute object's checksum"); + } + } + + private void computeObjectChecksum() throws IOException, NoSuchAlgorithmException { + URL url = new URL(objectFile); + URLConnection urlConnection = url.openConnection(); + InputStream inputStream = urlConnection.getInputStream(); + + MessageDigest digest = MessageDigest.getInstance("MD5"); + byte[] buf = new byte[4096]; + int bytesRead = 0; + do { + bytesRead = inputStream.read(buf); + if (bytesRead < 0) { + break; + } + digest.update(buf, 0, bytesRead); + } while (true); + + checksum = Hex.encodeHexString(digest.digest()); + } + + private void analyzeUda() throws AnalysisException { + throw new AnalysisException("Not support aggregate function now."); + } + + private void analyzeUdf() throws AnalysisException { + final String SYMBOL_KEY = "symbol"; + String symbol = properties.get(SYMBOL_KEY); + if (Strings.isNullOrEmpty(symbol)) { + throw new AnalysisException("No 'symbol' in properties"); + } + function = ScalarFunction.createUdf( + functionName, argsDef.getArgTypes(), + returnType.getType(), argsDef.isVariadic(), + objectFile, symbol); + function.setChecksum(checksum); } @Override @@ -60,29 +151,19 @@ public class CreateFunctionStmt extends StatementBase { } stringBuilder.append("FUNCTION "); stringBuilder.append(functionName.toString()); - stringBuilder.append("("); - int i = 0; - for (org.apache.doris.catalog.ColumnType type : argumentType) { - if (i != 0) { - stringBuilder.append(", "); - } - stringBuilder.append(type.toString()); - i++; - } - stringBuilder.append(") RETURNS "); + stringBuilder.append(argsDef.toSql()); + stringBuilder.append(" RETURNS "); stringBuilder.append(returnType.toString()); - stringBuilder.append(" SONAME "); - stringBuilder.append(soFilePath); if (properties.size() > 0) { stringBuilder.append(" PROPERTIES ("); - i = 0; + int i = 0; for (Map.Entry entry : properties.entrySet()) { if (i != 0) { stringBuilder.append(", "); } - stringBuilder.append(entry.getKey()); + stringBuilder.append('"').append(entry.getKey()).append('"'); stringBuilder.append("="); - stringBuilder.append(entry.getValue()); + stringBuilder.append('"').append(entry.getValue()).append('"'); i++; } stringBuilder.append(")"); diff --git a/fe/src/main/java/org/apache/doris/analysis/DropFunctionStmt.java b/fe/src/main/java/org/apache/doris/analysis/DropFunctionStmt.java index 4ffc949064..8f8d166858 100644 --- a/fe/src/main/java/org/apache/doris/analysis/DropFunctionStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/DropFunctionStmt.java @@ -17,22 +17,32 @@ package org.apache.doris.analysis; -import org.apache.doris.common.AnalysisException; +import org.apache.doris.catalog.FunctionSearchDesc; import org.apache.doris.common.UserException; -/** - * Created by zhaochun on 14-7-30. - */ -public class DropFunctionStmt extends StatementBase { +public class DropFunctionStmt extends DdlStmt { private final FunctionName functionName; + private final FunctionArgsDef argsDef; - public DropFunctionStmt(FunctionName functionName) { + // set after analyzed + private FunctionSearchDesc function; + + public DropFunctionStmt(FunctionName functionName, FunctionArgsDef argsDef) { this.functionName = functionName; + this.argsDef = argsDef; } + public FunctionName getFunctionName() { return functionName; } + public FunctionSearchDesc getFunction() { return function; } + @Override - public void analyze(Analyzer analyzer) throws AnalysisException, UserException { + public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); + // analyze function name + functionName.analyze(analyzer); + // analyze arguments + argsDef.analyze(analyzer); + function = new FunctionSearchDesc(functionName, argsDef.getArgTypes(), argsDef.isVariadic()); } @Override diff --git a/fe/src/main/java/org/apache/doris/analysis/FunctionArgsDef.java b/fe/src/main/java/org/apache/doris/analysis/FunctionArgsDef.java new file mode 100644 index 0000000000..fbdc9a4fea --- /dev/null +++ b/fe/src/main/java/org/apache/doris/analysis/FunctionArgsDef.java @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import com.google.common.collect.Lists; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; + +import java.util.ArrayList; +import java.util.List; + +public class FunctionArgsDef { + private final List argTypeDefs; + private final boolean isVariadic; + + // set after analyze + private Type[] argTypes; + + public FunctionArgsDef(List argTypeDefs, boolean isVariadic) { + this.argTypeDefs = argTypeDefs; + this.isVariadic = isVariadic; + } + + public Type[] getArgTypes() { return argTypes; } + public boolean isVariadic() { return isVariadic; } + + public void analyze(Analyzer analyzer) throws AnalysisException { + argTypes = new Type[argTypeDefs.size()]; + int i = 0; + for (TypeDef typeDef : argTypeDefs) { + typeDef.analyze(analyzer); + argTypes[i++] = typeDef.getType(); + } + } + + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append("("); + int i = 0; + for (TypeDef typeDef : argTypeDefs) { + if (i != 0) { + sb.append(", "); + } + sb.append(typeDef.toString()); + i++; + } + if (isVariadic) { + if (i != 0) { + sb.append(", "); + } + sb.append("..."); + } + sb.append(")"); + return sb.toString(); + } + + @Override + public String toString() { return toSql(); } +} diff --git a/fe/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java b/fe/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java index 4150b0da74..f93a47c204 100644 --- a/fe/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java +++ b/fe/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java @@ -17,7 +17,10 @@ package org.apache.doris.analysis; +import com.google.common.base.Strings; import org.apache.doris.catalog.AggregateFunction; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Function; import org.apache.doris.catalog.ScalarFunction; import org.apache.doris.catalog.Type; @@ -479,9 +482,21 @@ public class FunctionCallExpr extends Expr { fn = getBuiltinFunction(analyzer, fnName.getFunction(), new Type[]{compatibleType}, Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF); - } else { + } else { + // now first find function in builtin functions fn = getBuiltinFunction(analyzer, fnName.getFunction(), collectChildReturnTypes(), Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF); + if (fn == null) { + String dbName = fnName.analyzeDb(analyzer); + if (!Strings.isNullOrEmpty(dbName)) { + Database db = Catalog.getInstance().getDb(dbName); + if (db != null) { + Function searchDesc = new Function( + fnName, collectChildReturnTypes(), Type.INVALID, false); + fn = db.getFunction(searchDesc, Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF); + } + } + } } if (fn == null) { diff --git a/fe/src/main/java/org/apache/doris/analysis/FunctionName.java b/fe/src/main/java/org/apache/doris/analysis/FunctionName.java index b159648223..e61fa3f002 100644 --- a/fe/src/main/java/org/apache/doris/analysis/FunctionName.java +++ b/fe/src/main/java/org/apache/doris/analysis/FunctionName.java @@ -17,7 +17,11 @@ package org.apache.doris.analysis; +import com.google.common.base.Strings; +import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.load.PullLoadSourceInfo; @@ -117,6 +121,20 @@ public class FunctionName implements Writable { return db_ + "." + fn_; } + // used to analyze db element in function name, add cluster + public String analyzeDb(Analyzer analyzer) throws AnalysisException { + String db = db_; + if (db == null) { + db = analyzer.getDefaultDb(); + } else { + if (Strings.isNullOrEmpty(analyzer.getClusterName())) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_CLUSTER_NAME_NULL); + } + db = ClusterNamespace.getFullName(analyzer.getClusterName(), db); + } + return db; + } + public void analyze(Analyzer analyzer) throws AnalysisException { if (fn_.length() == 0) { throw new AnalysisException("Function name can not be empty."); @@ -131,6 +149,17 @@ public class FunctionName implements Writable { if (Character.isDigit(fn_.charAt(0))) { throw new AnalysisException("Function cannot start with a digit: " + fn_); } + if (db_ == null) { + db_ = analyzer.getDefaultDb(); + if (Strings.isNullOrEmpty(db_)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR); + } + } else { + if (Strings.isNullOrEmpty(analyzer.getClusterName())) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_CLUSTER_NAME_NULL); + } + db_ = ClusterNamespace.getFullName(analyzer.getClusterName(), db_); + } // If the function name is not fully qualified, it must not be the same as a builtin // if (!isFullyQualified() && OpcodeRegistry.instance().getFunctionOperator( diff --git a/fe/src/main/java/org/apache/doris/catalog/AggregateFunction.java b/fe/src/main/java/org/apache/doris/catalog/AggregateFunction.java index 5926aabaeb..8a3504038c 100644 --- a/fe/src/main/java/org/apache/doris/catalog/AggregateFunction.java +++ b/fe/src/main/java/org/apache/doris/catalog/AggregateFunction.java @@ -17,18 +17,26 @@ package org.apache.doris.catalog; -import java.util.ArrayList; -import java.util.List; +import static org.apache.doris.common.io.IOUtils.readOptionStringOrNull; +import static org.apache.doris.common.io.IOUtils.writeOptionString; + +import org.apache.doris.common.io.IOUtils; import org.apache.doris.analysis.FunctionName; -// import org.apache.doris.analysis.String; +import org.apache.doris.analysis.HdfsURI; import org.apache.doris.thrift.TAggregateFunction; import org.apache.doris.thrift.TFunction; import org.apache.doris.thrift.TFunctionBinaryType; -import org.apache.doris.analysis.HdfsURI; - -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +// import org.apache.doris.analysis.String; /** * Internal representation of an aggregate function. @@ -75,6 +83,10 @@ public class AggregateFunction extends Function { // empty input in BE). private boolean returnsNonNullOnEmpty; + // only used for serialization + protected AggregateFunction() { + } + public AggregateFunction(FunctionName fnName, ArrayList argTypes, Type retType, boolean hasVarArgs) { super(fnName, argTypes, retType, hasVarArgs); @@ -239,5 +251,51 @@ public class AggregateFunction extends Function { fn.setAggregate_fn(aggFn); return fn; } + + @Override + public void write(DataOutput output) throws IOException { + // 1. type + FunctionType.AGGREGATE.write(output); + // 2. parent + super.write(output); + // 3. self's member + boolean hasInterType = intermediateType != null; + output.writeBoolean(hasInterType); + if (hasInterType) { + ColumnType.write(output, intermediateType); + } + writeOptionString(output, updateFnSymbol); + writeOptionString(output, initFnSymbol); + writeOptionString(output, serializeFnSymbol); + writeOptionString(output, mergeFnSymbol); + writeOptionString(output, getValueFnSymbol); + writeOptionString(output, removeFnSymbol); + writeOptionString(output, finalizeFnSymbol); + + output.writeBoolean(ignoresDistinct); + output.writeBoolean(isAnalyticFn); + output.writeBoolean(isAggregateFn); + output.writeBoolean(returnsNonNullOnEmpty); + } + + @Override + public void readFields(DataInput input) throws IOException { + super.readFields(input); + + if (input.readBoolean()) { + intermediateType = ColumnType.read(input); + } + updateFnSymbol = readOptionStringOrNull(input); + initFnSymbol = readOptionStringOrNull(input); + serializeFnSymbol = readOptionStringOrNull(input); + mergeFnSymbol = readOptionStringOrNull(input); + getValueFnSymbol = readOptionStringOrNull(input); + removeFnSymbol = readOptionStringOrNull(input); + finalizeFnSymbol = readOptionStringOrNull(input); + ignoresDistinct = input.readBoolean(); + isAnalyticFn = input.readBoolean(); + isAggregateFn = input.readBoolean(); + returnsNonNullOnEmpty = input.readBoolean(); + } } diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index fbb3245dd2..4ad35d87a0 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -37,6 +37,7 @@ import org.apache.doris.analysis.CancelBackupStmt; import org.apache.doris.analysis.ColumnRenameClause; import org.apache.doris.analysis.CreateClusterStmt; import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreateFunctionStmt; import org.apache.doris.analysis.CreateTableStmt; import org.apache.doris.analysis.CreateUserStmt; import org.apache.doris.analysis.CreateViewStmt; @@ -44,8 +45,10 @@ import org.apache.doris.analysis.DecommissionBackendClause; import org.apache.doris.analysis.DistributionDesc; import org.apache.doris.analysis.DropClusterStmt; import org.apache.doris.analysis.DropDbStmt; +import org.apache.doris.analysis.DropFunctionStmt; import org.apache.doris.analysis.DropPartitionClause; import org.apache.doris.analysis.DropTableStmt; +import org.apache.doris.analysis.FunctionName; import org.apache.doris.analysis.HashDistributionDesc; import org.apache.doris.analysis.KeysDesc; import org.apache.doris.analysis.LinkDbStmt; @@ -5927,5 +5930,41 @@ public class Catalog { db.writeUnlock(); } } + + public void createFunction(CreateFunctionStmt stmt) throws UserException { + FunctionName name = stmt.getFunctionName(); + Database db = getDb(name.getDb()); + if (db == null) { + ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, name.getDb()); + } + db.addFunction(stmt.getFunction()); + } + + public void replayCreateFunction(Function function) { + String dbName = function.getFunctionName().getDb(); + Database db = getDb(dbName); + if (db == null) { + throw new Error("unknown database when replay log, db=" + dbName); + } + db.replayAddFunction(function); + } + + public void dropFunction(DropFunctionStmt stmt) throws UserException { + FunctionName name = stmt.getFunctionName(); + Database db = getDb(name.getDb()); + if (db == null) { + ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, name.getDb()); + } + db.dropFunction(stmt.getFunction()); + } + + public void replayDropFunction(FunctionSearchDesc functionSearchDesc) { + String dbName = functionSearchDesc.getName().getDb(); + Database db = getDb(dbName); + if (db == null) { + throw new Error("unknown database when replay log, db=" + dbName); + } + db.replayDropFunction(functionSearchDesc); + } } diff --git a/fe/src/main/java/org/apache/doris/catalog/Database.java b/fe/src/main/java/org/apache/doris/catalog/Database.java index 46535ab386..a57be8c5a6 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Database.java +++ b/fe/src/main/java/org/apache/doris/catalog/Database.java @@ -17,6 +17,9 @@ package org.apache.doris.catalog; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; import org.apache.doris.catalog.MaterializedIndex.IndexState; import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.catalog.Table.TableType; @@ -25,15 +28,13 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.persist.CreateTableInfo; +import org.apache.doris.persist.EditLog; import org.apache.doris.system.SystemInfoService; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -48,6 +49,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.zip.Adler32; @@ -84,6 +86,9 @@ public class Database extends MetaObject implements Writable { private Map idToTable; private Map nameToTable; + // user define function + private ConcurrentMap> name2Function = Maps.newConcurrentMap(); + private long dataQuotaBytes; public enum DbState { @@ -389,6 +394,16 @@ public class Database extends MetaObject implements Writable { Text.writeString(out, clusterName); Text.writeString(out, dbState.name()); Text.writeString(out, attachDbName); + + // write functions + out.writeInt(name2Function.size()); + for (Entry> entry : name2Function.entrySet()) { + Text.writeString(out, entry.getKey()); + out.writeInt(entry.getValue().size()); + for (Function function : entry.getValue()) { + function.write(out); + } + } } @Override @@ -418,6 +433,20 @@ public class Database extends MetaObject implements Writable { dbState = DbState.valueOf(Text.readString(in)); attachDbName = Text.readString(in); } + + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_47) { + int numEntries = in.readInt(); + for (int i = 0; i < numEntries; ++i) { + String name = Text.readString(in); + ImmutableList.Builder builder = ImmutableList.builder(); + int numFunctions = in.readInt(); + for (int j = 0; j < numFunctions; ++j) { + builder.add(Function.read(in)); + } + + name2Function.put(name, builder.build()); + } + } } public boolean equals(Object obj) { @@ -479,4 +508,90 @@ public class Database extends MetaObject implements Writable { public void setName(String name) { this.fullQualifiedName = name; } + + public synchronized void addFunction(Function function) throws UserException { + addFunctionImpl(function, false); + Catalog.getInstance().getEditLog().logAddFunction(function); + } + + public synchronized void replayAddFunction(Function function) { + try { + addFunctionImpl(function, true); + } catch (UserException e) { + Preconditions.checkArgument(false); + } + } + + // return true if add success, false + private void addFunctionImpl(Function function, boolean isReplay) throws UserException { + String functionName = function.getFunctionName().getFunction(); + List existFuncs = name2Function.get(functionName); + if (!isReplay) { + if (existFuncs != null) { + for (Function existFunc : existFuncs) { + if (function.compare(existFunc, Function.CompareMode.IS_IDENTICAL)) { + throw new UserException("function already exists"); + } + } + } + // Get function id for this UDF, use CatalogIdGenerator. Only get function id + // when isReplay is false + long functionId = Catalog.getInstance().getNextId(); + function.setId(functionId); + } + + ImmutableList.Builder builder = ImmutableList.builder(); + if (existFuncs != null) { + builder.addAll(existFuncs); + } + builder.add(function); + name2Function.put(functionName, builder.build()); + } + + public synchronized void dropFunction(FunctionSearchDesc function) throws UserException { + dropFunctionImpl(function); + Catalog.getInstance().getEditLog().logDropFunction(function); + } + + public synchronized void replayDropFunction(FunctionSearchDesc functionSearchDesc) { + try { + dropFunctionImpl(functionSearchDesc); + } catch (UserException e) { + Preconditions.checkArgument(false); + } + } + + private void dropFunctionImpl(FunctionSearchDesc function) throws UserException { + String functionName = function.getName().getFunction(); + List existFuncs = name2Function.get(functionName); + if (existFuncs == null) { + throw new UserException("Unknown function, function=" + function.toString()); + } + boolean isFound = false; + ImmutableList.Builder builder = ImmutableList.builder(); + for (Function existFunc : existFuncs) { + if (function.isIdentical(existFunc)) { + isFound = true; + } else { + builder.add(existFunc); + } + } + if (!isFound) { + throw new UserException("Unknown function, function=" + function.toString()); + } + ImmutableList newFunctions = builder.build(); + if (newFunctions.isEmpty()) { + name2Function.remove(functionName); + } else { + name2Function.put(functionName, newFunctions); + } + } + + public synchronized Function getFunction(Function desc, Function.CompareMode mode) { + List fns = name2Function.get(desc.getFunctionName().getFunction()); + if (fns == null) { + return null; + } + return Function.getFunction(fns, desc, mode); + } } diff --git a/fe/src/main/java/org/apache/doris/catalog/Function.java b/fe/src/main/java/org/apache/doris/catalog/Function.java index 40c28a89f6..18875245ce 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Function.java +++ b/fe/src/main/java/org/apache/doris/catalog/Function.java @@ -17,24 +17,29 @@ package org.apache.doris.catalog; -import org.apache.doris.analysis.FunctionName; -import org.apache.doris.analysis.HdfsURI; -import org.apache.doris.thrift.TFunction; -import org.apache.doris.thrift.TFunctionBinaryType; +import static org.apache.doris.common.io.IOUtils.writeOptionString; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; - +import org.apache.doris.analysis.FunctionName; +import org.apache.doris.analysis.HdfsURI; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.thrift.TFunction; +import org.apache.doris.thrift.TFunctionBinaryType; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.util.List; /** * Base class for all functions. */ -public class Function { +public class Function implements Writable { private static final Logger LOG = LogManager.getLogger(Function.class); // Enum for how to compare function signatures. @@ -67,12 +72,19 @@ public class Function { // Nonstrict supertypes broaden the definition of supertype to accept implicit casts // of arguments that may result in loss of precision - e.g. decimal to float. IS_NONSTRICT_SUPERTYPE_OF, + + // Used to drop UDF. User can drop function through name or name and arguments. + // If X is matchable with Y, this will only check X's element is identical with Y's. + // e.g. fn is matchable with fn(int), fn(float) and fn(int) is only matchable with fn(int). + IS_MATCHABLE } public static final long UNIQUE_FUNCTION_ID = 0; + // Function id, every function has a unique id. Now all built-in functions' id is 0 + private long id = 0; // User specified function name e.g. "Add" private FunctionName name; - private final Type retType; + private Type retType; // Array of parameter types. empty array if this function does not have parameters. private Type[] argTypes; // If true, this function has variable arguments. @@ -89,9 +101,25 @@ public class Function { private HdfsURI location; private TFunctionBinaryType binaryType; + // library's checksum to make sure all backends use one library to serve user's request + private String checksum = ""; + + // Only used for serialization + protected Function() { + } + public Function(FunctionName name, Type[] argTypes, Type retType, boolean varArgs) { + this(0, name, argTypes, retType, varArgs); + } + + public Function(FunctionName name, List args, Type retType, boolean varArgs) { + this(0, name, args, retType, varArgs); + } + + public Function(long id, FunctionName name, Type[] argTypes, Type retType, boolean hasVarArgs) { + this.id = id; this.name = name; - this.hasVarArgs = varArgs; + this.hasVarArgs = hasVarArgs; if (argTypes == null) { this.argTypes = new Type[0]; } else { @@ -100,12 +128,12 @@ public class Function { this.retType = retType; } - public Function(FunctionName name, List args, Type retType, boolean varArgs) { - this(name, (Type[]) null, retType, varArgs); - if (args.size() > 0) { - argTypes = args.toArray(new Type[args.size()]); + public Function(long id, FunctionName name, List argTypes, Type retType, boolean hasVarArgs) { + this(id, name, (Type[]) null, retType, hasVarArgs); + if (argTypes.size() > 0) { + this.argTypes = argTypes.toArray(new Type[argTypes.size()]); } else { - argTypes = new Type[0]; + this.argTypes = new Type[0]; } } @@ -174,6 +202,11 @@ public class Function { hasVarArgs = v; } + public void setId(long functionId) { this.id = functionId; } + public long getId() { return id; } + public void setChecksum(String checksum) { this.checksum = checksum; } + public String getChecksum() { return checksum; } + // Returns a string with the signature in human readable format: // FnName(argtype1, argtyp2). e.g. Add(int, int) public String signatureString() { @@ -197,6 +230,8 @@ public class Function { return isSubtype(other); case IS_NONSTRICT_SUPERTYPE_OF: return isAssignCompatible(other); + case IS_MATCHABLE: + return isMatchable(other); default: Preconditions.checkState(false); return false; @@ -257,6 +292,27 @@ public class Function { return true; } + private boolean isMatchable(Function o) { + if (!o.name.equals(name)) { + return false; + } + if (argTypes != null) { + if (o.argTypes.length != this.argTypes.length) { + return false; + } + if (o.hasVarArgs != this.hasVarArgs) { + return false; + } + for (int i = 0; i < this.argTypes.length; ++i) { + if (!o.argTypes[i].matchesType(this.argTypes[i])) { + return false; + } + } + } + return true; + + } + private boolean isIdentical(Function o) { if (!o.name.equals(name)) { return false; @@ -364,6 +420,10 @@ public class Function { fn.setHas_var_args(hasVarArgs); // TODO: Comment field is missing? // fn.setComment(comment) + fn.setId(id); + if (!checksum.isEmpty()) { + fn.setChecksum(checksum); + } return fn; } @@ -439,4 +499,146 @@ public class Function { return ""; } } + + public static Function getFunction(List fns, Function desc, CompareMode mode) { + if (fns == null) { + return null; + } + // First check for identical + for (Function f : fns) { + if (f.compare(desc, Function.CompareMode.IS_IDENTICAL)) { + return f; + } + } + if (mode == Function.CompareMode.IS_IDENTICAL) { + return null; + } + + // Next check for indistinguishable + for (Function f : fns) { + if (f.compare(desc, Function.CompareMode.IS_INDISTINGUISHABLE)) { + return f; + } + } + if (mode == Function.CompareMode.IS_INDISTINGUISHABLE) { + return null; + } + + // Next check for strict supertypes + for (Function f : fns) { + if (f.compare(desc, Function.CompareMode.IS_SUPERTYPE_OF)) { + return f; + } + } + if (mode == Function.CompareMode.IS_SUPERTYPE_OF) { + return null; + } + // Finally check for non-strict supertypes + for (Function f : fns) { + if (f.compare(desc, Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF)) { + return f; + } + } + return null; + } + + enum FunctionType { + ORIGIN(0), + SCALAR(1), + AGGREGATE(2); + + private int code; + + FunctionType(int code) { + this.code = code; + } + public int getCode() { + return code; + } + + public static FunctionType fromCode(int code) { + switch (code) { + case 0: + return ORIGIN; + case 1: + return SCALAR; + case 2: + return AGGREGATE; + } + return null; + } + + public void write(DataOutput output) throws IOException { + output.writeInt(code); + } + public static FunctionType read(DataInput input) throws IOException { + return fromCode(input.readInt()); + } + }; + + protected void writeFields(DataOutput output) throws IOException { + output.writeLong(id); + name.write(output); + ColumnType.write(output, retType); + output.writeInt(argTypes.length); + for (Type type : argTypes) { + ColumnType.write(output, type); + } + output.writeBoolean(hasVarArgs); + output.writeBoolean(userVisible); + output.writeInt(binaryType.getValue()); + // write library URL + String libUrl = ""; + if (location != null) { + libUrl = location.toString(); + } + writeOptionString(output, libUrl); + writeOptionString(output, checksum); + } + + @Override + public void write(DataOutput output) throws IOException { + throw new Error("Origin function cannot be serialized"); + } + + @Override + public void readFields(DataInput input) throws IOException { + id = input.readLong(); + name = FunctionName.read(input); + retType = ColumnType.read(input); + int numArgs = input.readInt(); + argTypes = new Type[numArgs]; + for (int i = 0; i < numArgs; ++i) { + argTypes[i] = ColumnType.read(input); + } + hasVarArgs = input.readBoolean(); + userVisible = input.readBoolean(); + binaryType = TFunctionBinaryType.findByValue(input.readInt()); + + boolean hasLocation = input.readBoolean(); + if (hasLocation) { + location = new HdfsURI(Text.readString(input)); + } + boolean hasChecksum = input.readBoolean(); + if (hasChecksum) { + checksum = Text.readString(input); + } + } + + public static Function read(DataInput input) throws IOException { + Function function; + FunctionType functionType = FunctionType.read(input); + switch (functionType) { + case SCALAR: + function = new ScalarFunction(); + break; + case AGGREGATE: + function = new AggregateFunction(); + break; + default: + throw new Error("Unsupported function type, type=" + functionType); + } + function.readFields(input); + return function; + } } diff --git a/fe/src/main/java/org/apache/doris/catalog/FunctionSearchDesc.java b/fe/src/main/java/org/apache/doris/catalog/FunctionSearchDesc.java new file mode 100644 index 0000000000..95d53a1f79 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/catalog/FunctionSearchDesc.java @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import org.apache.doris.analysis.FunctionName; +import org.apache.doris.common.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +// Used to search a function +public class FunctionSearchDesc implements Writable { + private FunctionName name; + private Type[] argTypes; + private boolean isVariadic; + + private FunctionSearchDesc() {} + + public FunctionSearchDesc(FunctionName name, Type[] argTypes, boolean isVariadic) { + this.name = name; + this.argTypes = argTypes; + this.isVariadic = isVariadic; + } + + public FunctionName getName() { return name; } + public Type[] getArgTypes() { return argTypes; } + public boolean isVariadic() { return isVariadic; } + + public boolean isIdentical(Function function) { + if (!name.equals(function.getFunctionName())) { + return false; + } + + if (argTypes.length != function.getArgs().length) { + return false; + } + if (isVariadic != function.hasVarArgs()) { + return false; + } + for (int i = 0; i < argTypes.length; ++i) { + if (!argTypes[i].matchesType(function.getArgs()[i])) { + return false; + } + } + return true; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(name.toString()).append("("); + int i = 0; + for (Type type : argTypes) { + if (i != 0) { + sb.append(", "); + } + sb.append(type.toString()); + i++; + } + if (isVariadic) { + if (i != 0) { + sb.append(", "); + } + sb.append("..."); + } + sb.append(")"); + return sb.toString(); + } + + @Override + public void write(DataOutput out) throws IOException { + name.write(out); + // write args + out.writeShort(argTypes.length); + for (Type type : argTypes) { + ColumnType.write(out, type); + } + out.writeBoolean(isVariadic); + } + + @Override + public void readFields(DataInput in) throws IOException { + name = FunctionName.read(in); + // read args + argTypes = new Type[in.readShort()]; + for (int i = 0; i < argTypes.length; ++i) { + argTypes[i] = ColumnType.read(in); + } + // read variadic + isVariadic = in.readBoolean(); + } + + public static FunctionSearchDesc read(DataInput input) throws IOException { + FunctionSearchDesc function = new FunctionSearchDesc(); + function.readFields(input); + return function; + } +} diff --git a/fe/src/main/java/org/apache/doris/catalog/ScalarFunction.java b/fe/src/main/java/org/apache/doris/catalog/ScalarFunction.java index c6f54c1efa..dc54d50516 100644 --- a/fe/src/main/java/org/apache/doris/catalog/ScalarFunction.java +++ b/fe/src/main/java/org/apache/doris/catalog/ScalarFunction.java @@ -17,21 +17,24 @@ package org.apache.doris.catalog; -import java.util.ArrayList; -import java.util.List; +import static org.apache.doris.common.io.IOUtils.writeOptionString; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.doris.analysis.FunctionName; import org.apache.doris.analysis.HdfsURI; -import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.io.Text; import org.apache.doris.thrift.TFunction; import org.apache.doris.thrift.TFunctionBinaryType; import org.apache.doris.thrift.TScalarFunction; -// import org.apache.doris.thrift.TSymbolType; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +// import org.apache.doris.thrift.TSymbolType; /** * Internal representation of a scalar function. @@ -43,6 +46,15 @@ public class ScalarFunction extends Function { private String prepareFnSymbol; private String closeFnSymbol; + // Only used for serialization + protected ScalarFunction() { + } + + public ScalarFunction( + FunctionName fnName, Type[] argTypes, Type retType, boolean hasVarArgs) { + super(fnName, argTypes, retType, hasVarArgs); + } + public ScalarFunction( FunctionName fnName, ArrayList argTypes, Type retType, boolean hasVarArgs) { super(fnName, argTypes, retType, hasVarArgs); @@ -204,6 +216,18 @@ public class ScalarFunction extends Function { return fn; } + public static ScalarFunction createUdf( + FunctionName name, Type[] args, + Type returnType, boolean isVariadic, + String objectFile, String symbol) { + ScalarFunction fn = new ScalarFunction(name, args, returnType, isVariadic); + fn.setBinaryType(TFunctionBinaryType.HIVE); + fn.setUserVisible(true); + fn.symbolName = symbol; + fn.setLocation(new HdfsURI(objectFile)); + return fn; + } + public void setSymbolName(String s) { symbolName = s; } public void setPrepareFnSymbol(String s) { prepareFnSymbol = s; } public void setCloseFnSymbol(String s) { closeFnSymbol = s; } @@ -236,4 +260,28 @@ public class ScalarFunction extends Function { } return fn; } + + @Override + public void write(DataOutput output) throws IOException { + // 1. type + FunctionType.SCALAR.write(output); + // 2. parent + super.writeFields(output); + // 3.symbols + Text.writeString(output, symbolName); + writeOptionString(output, prepareFnSymbol); + writeOptionString(output, closeFnSymbol); + } + + @Override + public void readFields(DataInput input) throws IOException { + super.readFields(input); + symbolName = Text.readString(input); + if (input.readBoolean()) { + prepareFnSymbol = Text.readString(input); + } + if (input.readBoolean()) { + closeFnSymbol = Text.readString(input); + } + } } diff --git a/fe/src/main/java/org/apache/doris/catalog/Uda.java b/fe/src/main/java/org/apache/doris/catalog/Uda.java deleted file mode 100644 index 1dc44a9734..0000000000 --- a/fe/src/main/java/org/apache/doris/catalog/Uda.java +++ /dev/null @@ -1,122 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.catalog; - -import org.apache.doris.analysis.FunctionArgs; -import org.apache.doris.analysis.FunctionName; -import org.apache.doris.analysis.HdfsURI; -import org.apache.doris.thrift.TAggregateFunction; -import org.apache.doris.thrift.TFunction; - -import java.util.List; - -/** - * Internal representation of a UDA. - */ -public class Uda extends Function { - private Type intermediateType_; - - // The symbol inside the binary at location_ that contains this particular. - // They can be null if it is not required. - private String updateFnSymbol_; - private String initFnSymbol_; - private String serializeFnSymbol_; - private String mergeFnSymbol_; - private String finalizeFnSymbol_; - - public Uda(long id, FunctionName fnName, FunctionArgs args, Type retType) { - super(fnName, args.argTypes, retType, args.hasVarArgs); - } - - public Uda(long id, FunctionName fnName, List argTypes, Type retType, - Type intermediateType, HdfsURI location, String updateFnSymbol, String initFnSymbol, - String serializeFnSymbol, String mergeFnSymbol, String finalizeFnSymbol) { - super(fnName, argTypes, retType, false); - setLocation(location); - intermediateType_ = intermediateType; - updateFnSymbol_ = updateFnSymbol; - initFnSymbol_ = initFnSymbol; - serializeFnSymbol_ = serializeFnSymbol; - mergeFnSymbol_ = mergeFnSymbol; - finalizeFnSymbol_ = finalizeFnSymbol; - } - - public String getUpdateFnSymbol() { - return updateFnSymbol_; - } - - public void setUpdateFnSymbol(String fn) { - updateFnSymbol_ = fn; - } - - public String getInitFnSymbol() { - return initFnSymbol_; - } - - public void setInitFnSymbol(String fn) { - initFnSymbol_ = fn; - } - - public String getSerializeFnSymbol() { - return serializeFnSymbol_; - } - - public void setSerializeFnSymbol(String fn) { - serializeFnSymbol_ = fn; - } - - public String getMergeFnSymbol() { - return mergeFnSymbol_; - } - - public void setMergeFnSymbol(String fn) { - mergeFnSymbol_ = fn; - } - - public String getFinalizeFnSymbol() { - return finalizeFnSymbol_; - } - - public void setFinalizeFnSymbol(String fn) { - finalizeFnSymbol_ = fn; - } - - public Type getIntermediateType() { - return intermediateType_; - } - - public void setIntermediateType(Type t) { - intermediateType_ = t; - } - - @Override - public TFunction toThrift() { - TFunction fn = super.toThrift(); - TAggregateFunction uda = new TAggregateFunction(); - uda.setUpdate_fn_symbol(updateFnSymbol_); - uda.setInit_fn_symbol(initFnSymbol_); - if (serializeFnSymbol_ == null) { - uda.setSerialize_fn_symbol(serializeFnSymbol_); - } - uda.setMerge_fn_symbol(mergeFnSymbol_); - uda.setFinalize_fn_symbol(finalizeFnSymbol_); - uda.setIntermediate_type(intermediateType_.toThrift()); - fn.setAggregate_fn(uda); - return fn; - } -} diff --git a/fe/src/main/java/org/apache/doris/catalog/Udf.java b/fe/src/main/java/org/apache/doris/catalog/Udf.java deleted file mode 100644 index 741c978286..0000000000 --- a/fe/src/main/java/org/apache/doris/catalog/Udf.java +++ /dev/null @@ -1,65 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.catalog; - -import org.apache.doris.analysis.FunctionArgs; -import org.apache.doris.analysis.FunctionName; -import org.apache.doris.analysis.HdfsURI; -import org.apache.doris.thrift.TFunction; -import org.apache.doris.thrift.TScalarFunction; - -import java.util.List; - - -/** - * Internal representation of a UDF. - * TODO: unify this with builtins. - */ - -public class Udf extends Function { - // The name inside the binary at location_ that contains this particular - // UDF. e.g. org.example.MyUdf.class. - private String symbolName_; - - public Udf(long id, FunctionName fnName, FunctionArgs args, Type retType) { - super(fnName, args.argTypes, retType, args.hasVarArgs); - } - - public Udf(long id, FunctionName fnName, List argTypes, Type retType, - HdfsURI location, String symbolName) { - super(fnName, argTypes, retType, false); - setLocation(location); - setSymbolName(symbolName); - } - - public String getSymbolName() { - return symbolName_; - } - - public void setSymbolName(String s) { - symbolName_ = s; - } - - @Override - public TFunction toThrift() { - TFunction fn = super.toThrift(); - fn.setScalar_fn(new TScalarFunction()); - fn.getScalar_fn().setSymbol(symbolName_); - return fn; - } -} diff --git a/fe/src/main/java/org/apache/doris/common/FeConstants.java b/fe/src/main/java/org/apache/doris/common/FeConstants.java index 06079ab7ff..02c8c70d3f 100644 --- a/fe/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/src/main/java/org/apache/doris/common/FeConstants.java @@ -35,5 +35,5 @@ public class FeConstants { // general model // Current meta data version. Use this version to write journals and image - public static int meta_version = FeMetaVersion.VERSION_46; + public static int meta_version = FeMetaVersion.VERSION_47; } diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java index 2579ab3b8c..bc269031a6 100644 --- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -101,5 +101,7 @@ public final class FeMetaVersion { // colocate join public static final int VERSION_46 = 46; + // UDF + public static final int VERSION_47 = 47; // TODO(ml):VERSION_ROUTINE_LOAD add extra in transaction state for routine load } diff --git a/fe/src/main/java/org/apache/doris/common/io/IOUtils.java b/fe/src/main/java/org/apache/doris/common/io/IOUtils.java index 5dadf85793..e0d9b32285 100644 --- a/fe/src/main/java/org/apache/doris/common/io/IOUtils.java +++ b/fe/src/main/java/org/apache/doris/common/io/IOUtils.java @@ -17,8 +17,11 @@ package org.apache.doris.common.io; +import com.google.common.base.Strings; import org.apache.logging.log4j.Logger; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -250,4 +253,18 @@ public class IOUtils { } } } + + public static void writeOptionString(DataOutput output, String value) throws IOException { + boolean hasValue = !Strings.isNullOrEmpty(value); + output.writeBoolean(hasValue); + if (hasValue) { + Text.writeString(output, value); + } + } + public static String readOptionStringOrNull(DataInput input) throws IOException { + if (input.readBoolean()) { + return Text.readString(input); + } + return null; + } } diff --git a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java index 79782e950c..62e2859017 100644 --- a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -26,6 +26,8 @@ import org.apache.doris.backup.RestoreJob; import org.apache.doris.backup.RestoreJob_D; import org.apache.doris.catalog.BrokerMgr; import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Function; +import org.apache.doris.catalog.FunctionSearchDesc; import org.apache.doris.cluster.BaseParam; import org.apache.doris.cluster.Cluster; import org.apache.doris.common.io.Text; @@ -394,6 +396,16 @@ public class JournalEntity implements Writable { needRead = false; break; } + case OperationType.OP_ADD_FUNCTION: { + data = Function.read(in); + needRead = false; + break; + } + case OperationType.OP_DROP_FUNCTION: { + data = FunctionSearchDesc.read(in); + needRead = false; + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java b/fe/src/main/java/org/apache/doris/persist/EditLog.java index 1b6c14d880..b59143d9c0 100644 --- a/fe/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java @@ -29,6 +29,8 @@ import org.apache.doris.backup.RestoreJob_D; import org.apache.doris.catalog.BrokerMgr; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Function; +import org.apache.doris.catalog.FunctionSearchDesc; import org.apache.doris.cluster.BaseParam; import org.apache.doris.cluster.Cluster; import org.apache.doris.common.Config; @@ -642,6 +644,16 @@ public class EditLog { Catalog.getCurrentHeartbeatMgr().replayHearbeat(hbPackage); break; } + case OperationType.OP_ADD_FUNCTION: { + final Function function = (Function) journal.getData(); + Catalog.getCurrentCatalog().replayCreateFunction(function); + break; + } + case OperationType.OP_DROP_FUNCTION: { + FunctionSearchDesc function = (FunctionSearchDesc) journal.getData(); + Catalog.getCurrentCatalog().replayDropFunction(function); + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); @@ -1128,4 +1140,12 @@ public class EditLog { public void logHeartbeat(HbPackage hbPackage) { logEdit(OperationType.OP_HEARTBEAT, hbPackage); } + + public void logAddFunction(Function function) { + logEdit(OperationType.OP_ADD_FUNCTION, function); + } + + public void logDropFunction(FunctionSearchDesc function) { + logEdit(OperationType.OP_DROP_FUNCTION, function); + } } diff --git a/fe/src/main/java/org/apache/doris/persist/OperationType.java b/fe/src/main/java/org/apache/doris/persist/OperationType.java index b7637542b8..00d42f0404 100644 --- a/fe/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/src/main/java/org/apache/doris/persist/OperationType.java @@ -150,4 +150,8 @@ public class OperationType { // routine load 110~120 public static final short OP_ROUTINE_LOAD_JOB = 110; + // UDF 130-140 + public static final short OP_ADD_FUNCTION = 130; + public static final short OP_DROP_FUNCTION = 131; + } diff --git a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java index 778e05ef53..8f93b71cbd 100644 --- a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -29,6 +29,7 @@ import org.apache.doris.analysis.CancelBackupStmt; import org.apache.doris.analysis.CancelLoadStmt; import org.apache.doris.analysis.CreateClusterStmt; import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreateFunctionStmt; import org.apache.doris.analysis.CreateRepositoryStmt; import org.apache.doris.analysis.CreateRoleStmt; import org.apache.doris.analysis.CreateTableStmt; @@ -38,6 +39,7 @@ import org.apache.doris.analysis.DdlStmt; import org.apache.doris.analysis.DeleteStmt; import org.apache.doris.analysis.DropClusterStmt; import org.apache.doris.analysis.DropDbStmt; +import org.apache.doris.analysis.DropFunctionStmt; import org.apache.doris.analysis.DropRepositoryStmt; import org.apache.doris.analysis.DropRoleStmt; import org.apache.doris.analysis.DropTableStmt; @@ -79,6 +81,10 @@ public class DdlExecutor { catalog.createDb((CreateDbStmt) ddlStmt); } else if (ddlStmt instanceof DropDbStmt) { catalog.dropDb((DropDbStmt) ddlStmt); + } else if (ddlStmt instanceof CreateFunctionStmt) { + catalog.createFunction((CreateFunctionStmt) ddlStmt); + } else if (ddlStmt instanceof DropFunctionStmt) { + catalog.dropFunction((DropFunctionStmt) ddlStmt); } else if (ddlStmt instanceof CreateTableStmt) { catalog.createTable((CreateTableStmt) ddlStmt); } else if (ddlStmt instanceof DropTableStmt) { diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java index dcf27cb4b4..5f9af92ae0 100644 --- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -752,7 +752,7 @@ public class StmtExecutor { try { DdlExecutor.execute(context.getCatalog(), (DdlStmt) parsedStmt); context.getState().setOk(); - } catch (DdlException e) { + } catch (UserException e) { // Return message to info client what happened. context.getState().setError(e.getMessage()); } catch (Exception e) { diff --git a/fe/src/main/jflex/sql_scanner.flex b/fe/src/main/jflex/sql_scanner.flex index 701b2db439..6c988af34f 100644 --- a/fe/src/main/jflex/sql_scanner.flex +++ b/fe/src/main/jflex/sql_scanner.flex @@ -244,6 +244,7 @@ import org.apache.doris.common.util.SqlUtils; keywordMap.put("repositories", new Integer(SqlParserSymbols.KW_REPOSITORIES)); keywordMap.put("resource", new Integer(SqlParserSymbols.KW_RESOURCE)); keywordMap.put("restore", new Integer(SqlParserSymbols.KW_RESTORE)); + keywordMap.put("returns", new Integer(SqlParserSymbols.KW_RETURNS)); keywordMap.put("revoke", new Integer(SqlParserSymbols.KW_REVOKE)); keywordMap.put("right", new Integer(SqlParserSymbols.KW_RIGHT)); keywordMap.put("rlike", new Integer(SqlParserSymbols.KW_REGEXP)); @@ -345,6 +346,7 @@ import org.apache.doris.common.util.SqlUtils; tokenIdMap.put(new Integer(SqlParserSymbols.STAR), "*"); tokenIdMap.put(new Integer(SqlParserSymbols.AT), "@"); tokenIdMap.put(new Integer(SqlParserSymbols.BITOR), "|"); + tokenIdMap.put(new Integer(SqlParserSymbols.DOTDOTDOT), "..."); tokenIdMap.put(new Integer(SqlParserSymbols.DOT), "."); tokenIdMap.put(new Integer(SqlParserSymbols.STRING_LITERAL), "STRING LITERAL"); tokenIdMap.put(new Integer(SqlParserSymbols.EOF), "EOF"); @@ -441,6 +443,8 @@ EndOfLineComment = "--" {NonTerminator}* {LineTerminator}? %% +"..." { return newToken(SqlParserSymbols.DOTDOTDOT, null); } + // single-character tokens "," { return newToken(SqlParserSymbols.COMMA, null); } "." { return newToken(SqlParserSymbols.DOT, null); }