Support prepare/close in UDF (#1985)
The prepare/close step of scalar functions is already supported by the execution framework, so we only need to support it in the frontend's syntax and metadata. In addition, scalar functions with the 'Hive' binary type do NOT support the prepare/close step yet, so this change adds that support.
This commit is contained in:
@ -540,7 +540,8 @@ Status ScalarFnCall::get_udf(RuntimeState* state, Function** udf) {
|
||||
|
||||
Status ScalarFnCall::get_function(RuntimeState* state, const std::string& symbol, void** fn) {
|
||||
if (_fn.binary_type == TFunctionBinaryType::NATIVE
|
||||
|| _fn.binary_type == TFunctionBinaryType::BUILTIN) {
|
||||
|| _fn.binary_type == TFunctionBinaryType::BUILTIN
|
||||
|| _fn.binary_type == TFunctionBinaryType::HIVE) {
|
||||
return UserFunctionCache::instance()->get_function_ptr(
|
||||
_fn.id, symbol, _fn.hdfs_location, _fn.checksum, fn, &_cache_entry);
|
||||
} else {
|
||||
|
||||
@ -26,4 +26,9 @@ IntVal AddUdf(FunctionContext* context, const IntVal& arg1, const IntVal& arg2)
|
||||
return {arg1.val + arg2.val};
|
||||
}
|
||||
|
||||
/// --- Prepare / Close Functions ---
|
||||
/// ---------------------------------
|
||||
/// Prepare hook for the AddUdf sample; the body is intentionally empty.
void AddUdfPrepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) {
    // Nothing to set up: both arguments are deliberately ignored.
    static_cast<void>(context);
    static_cast<void>(scope);
}
|
||||
/// Close hook for the AddUdf sample; the body is intentionally empty.
void AddUdfClose(FunctionContext* context, FunctionContext::FunctionStateScope scope) {
    // Nothing to tear down: both arguments are deliberately ignored.
    static_cast<void>(context);
    static_cast<void>(scope);
}
|
||||
|
||||
}
|
||||
|
||||
@ -23,4 +23,15 @@ namespace doris_udf {
|
||||
|
||||
IntVal AddUdf(FunctionContext* context, const IntVal& arg1, const IntVal& arg2);
|
||||
|
||||
/// --- Prepare / Close Functions ---
|
||||
/// ---------------------------------
|
||||
|
||||
/// The UDF can optionally include a prepare function. The prepare function is called
|
||||
/// before any calls to the UDF to evaluate values.
|
||||
void AddUdfPrepare(FunctionContext* context, FunctionContext::FunctionStateScope scope);
|
||||
|
||||
/// The UDF can also optionally include a close function. The close function is called
|
||||
/// after all calls to the UDF have completed.
|
||||
void AddUdfClose(FunctionContext* context, FunctionContext::FunctionStateScope scope);
|
||||
|
||||
}
|
||||
|
||||
@ -39,6 +39,10 @@ CREATE [AGGREGATE] FUNCTION function_name
|
||||
> "finalize_fn": 聚合函数获取最后结果的函数签名。对于聚合函数是可选项,如果没有指定,将会使用默认的获取结果函数
|
||||
>
|
||||
> "md5": 函数动态链接库的MD5值,用于校验下载的内容是否正确。此选项是可选项
|
||||
>
|
||||
> "prepare_fn": 自定义函数的prepare函数的函数签名,用于从动态库里面找到prepare函数入口。此选项对于自定义函数是可选项
|
||||
>
|
||||
> "close_fn": 自定义函数的close函数的函数签名,用于从动态库里面找到close函数入口。此选项对于自定义函数是可选项
|
||||
|
||||
|
||||
此语句创建一个自定义函数。执行此命令需要用户拥有 `ADMIN` 权限。
|
||||
@ -49,24 +53,35 @@ CREATE [AGGREGATE] FUNCTION function_name
|
||||
|
||||
1. 创建一个自定义标量函数
|
||||
|
||||
```
|
||||
CREATE FUNCTION my_add(INT, INT) RETURNS INT PROPERTIES (
|
||||
"symbol" = "_ZN9doris_udf6AddUdfEPNS_15FunctionContextERKNS_6IntValES4_",
|
||||
"object_file" = "http://host:port/libmyadd.so"
|
||||
);
|
||||
```
|
||||
```
|
||||
CREATE FUNCTION my_add(INT, INT) RETURNS INT PROPERTIES (
|
||||
"symbol" = "_ZN9doris_udf6AddUdfEPNS_15FunctionContextERKNS_6IntValES4_",
|
||||
"object_file" = "http://host:port/libmyadd.so"
|
||||
);
|
||||
```
|
||||
|
||||
2. 创建一个有prepare/close函数的自定义标量函数
|
||||
|
||||
```
|
||||
CREATE FUNCTION my_add(INT, INT) RETURNS INT PROPERTIES (
|
||||
"symbol" = "_ZN9doris_udf6AddUdfEPNS_15FunctionContextERKNS_6IntValES4_",
|
||||
"prepare_fn" = "_ZN9doris_udf13AddUdfPrepareEPNS_15FunctionContextENS0_18FunctionStateScopeE",
|
||||
"close_fn" = "_ZN9doris_udf11AddUdfCloseEPNS_15FunctionContextENS0_18FunctionStateScopeE",
|
||||
"object_file" = "http://host:port/libmyadd.so"
|
||||
);
|
||||
```
|
||||
|
||||
3. 创建一个自定义聚合函数
|
||||
|
||||
```
|
||||
CREATE AGGREGATE FUNCTION my_count (BIGINT) RETURNS BIGINT PROPERTIES (
|
||||
"init_fn"="_ZN9doris_udf9CountInitEPNS_15FunctionContextEPNS_9BigIntValE",
|
||||
"update_fn"="_ZN9doris_udf11CountUpdateEPNS_15FunctionContextERKNS_6IntValEPNS_9BigIntValE",
|
||||
"merge_fn"="_ZN9doris_udf10CountMergeEPNS_15FunctionContextERKNS_9BigIntValEPS2_",
|
||||
"finalize_fn"="_ZN9doris_udf13CountFinalizeEPNS_15FunctionContextERKNS_9BigIntValE",
|
||||
"object_file"="http://host:port/libudasample.so"
|
||||
);
|
||||
```
|
||||
```
|
||||
CREATE AGGREGATE FUNCTION my_count (BIGINT) RETURNS BIGINT PROPERTIES (
|
||||
"init_fn"="_ZN9doris_udf9CountInitEPNS_15FunctionContextEPNS_9BigIntValE",
|
||||
"update_fn"="_ZN9doris_udf11CountUpdateEPNS_15FunctionContextERKNS_6IntValEPNS_9BigIntValE",
|
||||
"merge_fn"="_ZN9doris_udf10CountMergeEPNS_15FunctionContextERKNS_9BigIntValEPS2_",
|
||||
"finalize_fn"="_ZN9doris_udf13CountFinalizeEPNS_15FunctionContextERKNS_9BigIntValE",
|
||||
"object_file"="http://host:port/libudasample.so"
|
||||
);
|
||||
```
|
||||
|
||||
## keyword
|
||||
|
||||
|
||||
@ -4,25 +4,25 @@
|
||||
|
||||
```
|
||||
CREATE [AGGREGATE] FUNCTION function_name
|
||||
(arg_type [...])
|
||||
RETURNS ret_type
|
||||
[INTERMEDIATE inter_type]
|
||||
[PROPERTIES ("key" = "value" [, ...]) ]
|
||||
(arg_type [...])
|
||||
RETURNS ret_type
|
||||
[INTERMEDIATE inter_type]
|
||||
[PROPERTIES ("key" = "value" [, ...]) ]
|
||||
```
|
||||
|
||||
### Parameters
|
||||
|
||||
>` AGGREGATE `: If this is the case, it means that the created function is an aggregate function, otherwise it is a scalar function.
|
||||
>`AGGREGATE`: If this is the case, it means that the created function is an aggregate function, otherwise it is a scalar function.
|
||||
>
|
||||
>` Function_name': To create the name of the function, you can include the name of the database. For example: `db1.my_func'.
|
||||
>`function_name`: The name of the function to create. It may include the name of the database, for example: `db1.my_func`.
|
||||
>
|
||||
>` arg_type': The parameter type of the function is the same as the type defined at the time of table building. Variable-length parameters can be represented by `,...'. If it is a variable-length type, the type of the variable-length part of the parameters is the same as the last non-variable-length parameter type.
|
||||
>`arg_type`: The parameter type of the function, which is the same as the type defined at the time of table building. Variable-length parameters can be represented by `,...`. If it is a variable-length type, the type of the variable-length part of the parameters is the same as the last non-variable-length parameter type.
|
||||
>
|
||||
>` ret_type': Function return type.
|
||||
>`ret_type`: Function return type.
|
||||
>
|
||||
>` Inter_type': A data type used to represent the intermediate stage of an aggregate function.
|
||||
>`inter_type`: A data type used to represent the intermediate stage of an aggregate function.
|
||||
>
|
||||
>` properties `: Used to set properties related to this function. Properties that can be set include
|
||||
>`properties`: Used to set properties related to this function. Properties that can be set include
|
||||
>
|
||||
> "object_file": The URL path of the custom function's dynamic library. Currently only the HTTP/HTTPS protocols are supported, and this path needs to remain valid throughout the life cycle of the function. This option is mandatory
|
||||
>
|
||||
@ -39,33 +39,47 @@ RETURNS ret_type
|
||||
> "finalize_fn": A function signature that aggregates functions to obtain the final result. For aggregation functions, it is optional. If not specified, the default fetch result function will be used.
|
||||
>
|
||||
> "md5": The MD5 value of the function dynamic link library, which is used to verify that the downloaded content is correct. This option is optional
|
||||
>
|
||||
> "prepare_fn": Function signature of the prepare function for finding the entry from the dynamic library. This option is optional for custom functions
|
||||
>
|
||||
> "close_fn": Function signature of the close function for finding the entry from the dynamic library. This option is optional for custom functions
|
||||
|
||||
|
||||
This statement creates a custom function. Executing this command requires that the user have `ADMIN'privileges.
|
||||
This statement creates a custom function. Executing this command requires that the user have `ADMIN` privileges.
|
||||
|
||||
If the `function_name'contains the database name, the custom function will be created in the corresponding database, otherwise the function will be created in the database where the current session is located. The name and parameters of the new function cannot be the same as functions already existing in the current namespace, otherwise the creation will fail. But only with the same name and different parameters can the creation be successful.
|
||||
If the `function_name` contains the database name, the custom function will be created in the corresponding database, otherwise the function will be created in the database where the current session is located. The name and parameters of the new function cannot both be the same as those of a function already existing in the current namespace, otherwise the creation will fail. A function with the same name but different parameters can be created successfully.
|
||||
|
||||
## example
|
||||
|
||||
1. Create a custom scalar function
|
||||
|
||||
```
|
||||
CREATE FUNCTION my_add(INT, INT) RETURNS INT PROPERTIES (
|
||||
"Symbol"=""\\\\\\\\ zn9doris\\\ udf6addudfepns\\ FunctionContexterkns\\ INTVales 4\,
|
||||
"object file" ="http://host:port /libmyadd.so"
|
||||
);
|
||||
```
|
||||
```
|
||||
CREATE FUNCTION my_add(INT, INT) RETURNS INT PROPERTIES (
|
||||
"symbol" = "_ZN9doris_udf6AddUdfEPNS_15FunctionContextERKNS_6IntValES4_",
|
||||
"object_file" ="http://host:port/libmyadd.so"
|
||||
);
|
||||
```
|
||||
2. Create a custom scalar function with prepare/close functions
|
||||
|
||||
2. Create a custom aggregation function
|
||||
```
|
||||
CREATE FUNCTION my_add(INT, INT) RETURNS INT PROPERTIES (
|
||||
"symbol" = "_ZN9doris_udf6AddUdfEPNS_15FunctionContextERKNS_6IntValES4_",
|
||||
"prepare_fn" = "_ZN9doris_udf13AddUdfPrepareEPNS_15FunctionContextENS0_18FunctionStateScopeE",
|
||||
"close_fn" = "_ZN9doris_udf11AddUdfCloseEPNS_15FunctionContextENS0_18FunctionStateScopeE",
|
||||
"object_file" = "http://host:port/libmyadd.so"
|
||||
);
|
||||
```
|
||||
|
||||
```
|
||||
CREATE AGGREGATE FUNCTION my_count (BIGINT) RETURNS BIGINT PROPERTIES (
|
||||
"init u fn"= "ZN9doris, udf9CountInitEPNS -u 15FunctionContextEPNS, u 9BigIntValE",
|
||||
"Update fn" = " zn9doris \ udf11Countupdateepns \ \ FunctionContexterkns \ Intvalepns bigintvale",
|
||||
"Merge fn"="\ zn9doris\\ udf10CountMergeepns\ \ FunctionContexterkns\ Bigintvaleps2\\\\\\\\\\\\\
|
||||
"Finalize \ fn" = "\ zn9doris \ udf13Count Finalizepns \\ FunctionContexterkns \ Bigintvale",
|
||||
"object" file ="http://host:port /libudasample.so"
|
||||
);
|
||||
```
|
||||
3. Create a custom aggregation function
|
||||
|
||||
```
|
||||
CREATE AGGREGATE FUNCTION my_count (BIGINT) RETURNS BIGINT PROPERTIES (
|
||||
"init_fn" = "_ZN9doris_udf9CountInitEPNS_15FunctionContextEPNS_9BigIntValE",
|
||||
"update_fn" = "_ZN9doris_udf11CountUpdateEPNS_15FunctionContextERKNS_6IntValEPNS_9BigIntValE",
|
||||
"merge_fn" = "_ZN9doris_udf10CountMergeEPNS_15FunctionContextERKNS_9BigIntValEPS2_",
|
||||
"finalize_fn" = "_ZN9doris_udf13CountFinalizeEPNS_15FunctionContextERKNS_9BigIntValE",
|
||||
"object_file" = "http://host:port/libudasample.so"
|
||||
);
|
||||
```
|
||||
## keyword
|
||||
CREATE,FUNCTION
|
||||
|
||||
@ -43,6 +43,8 @@ import java.util.Map;
|
||||
public class CreateFunctionStmt extends DdlStmt {
|
||||
public static final String OBJECT_FILE_KEY = "object_file";
|
||||
public static final String SYMBOL_KEY = "symbol";
|
||||
public static final String PREPARE_SYMBOL_KEY = "prepare_fn";
|
||||
public static final String CLOSE_SYMBOL_KEY = "close_fn";
|
||||
public static final String MD5_CHECKSUM = "md5";
|
||||
public static final String INIT_KEY = "init_fn";
|
||||
public static final String UPDATE_KEY = "update_fn";
|
||||
@ -177,10 +179,12 @@ public class CreateFunctionStmt extends DdlStmt {
|
||||
if (Strings.isNullOrEmpty(symbol)) {
|
||||
throw new AnalysisException("No 'symbol' in properties");
|
||||
}
|
||||
String prepareFnSymbol = properties.get(PREPARE_SYMBOL_KEY);
|
||||
String closeFnSymbol = properties.get(CLOSE_SYMBOL_KEY);
|
||||
function = ScalarFunction.createUdf(
|
||||
functionName, argsDef.getArgTypes(),
|
||||
returnType.getType(), argsDef.isVariadic(),
|
||||
objectFile, symbol);
|
||||
objectFile, symbol, prepareFnSymbol, closeFnSymbol);
|
||||
function.setChecksum(checksum);
|
||||
}
|
||||
|
||||
|
||||
@ -233,11 +233,13 @@ public class ScalarFunction extends Function {
|
||||
public static ScalarFunction createUdf(
|
||||
FunctionName name, Type[] args,
|
||||
Type returnType, boolean isVariadic,
|
||||
String objectFile, String symbol) {
|
||||
String objectFile, String symbol, String prepareFnSymbol, String closeFnSymbol) {
|
||||
ScalarFunction fn = new ScalarFunction(name, args, returnType, isVariadic);
|
||||
fn.setBinaryType(TFunctionBinaryType.HIVE);
|
||||
fn.setUserVisible(true);
|
||||
fn.symbolName = symbol;
|
||||
fn.prepareFnSymbol = prepareFnSymbol;
|
||||
fn.closeFnSymbol = closeFnSymbol;
|
||||
fn.setLocation(new HdfsURI(objectFile));
|
||||
return fn;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user