[enhancement] Function(create/drop) support the global operation (#16973) (#17608)

Support create/drop global function. 
     When you create a custom function, it can only be used within in one database. It cannot be used in other database/catalog. When there are many databases/catalog, it needs to create function one by one.

## Problem summary

Describe your changes.
1、 When a function is created or deleted, add the global keyword.

CREATE [GLOBAL] [AGGREGATE] [ALIAS] FUNCTION function_name (arg_type [, ...]) [RETURNS ret_type] [INTERMEDIATE inter_type] [WITH PARAMETER(param [,...]) AS origin_function] [PROPERTIES ("key" = "value" [, ...]) ]

DROP [GLOBAL] FUNCTION function_name (arg_type [, ...])

2、A completely global global function is set, and the global function metadata is stored in the image. The function lookup strategy is to look in the database first, and if it can't be found, it looks in the global function.
Co-authored-by: lexluo <lexluo@tencent.com>
This commit is contained in:
lexluo09
2023-03-18 22:06:48 +08:00
committed by GitHub
parent 49a053b3da
commit c95eb8a67f
23 changed files with 677 additions and 167 deletions

View File

@ -39,7 +39,7 @@ If `function_name` contains the database name, then the custom function will be
grammar:
```sql
CREATE [AGGREGATE] [ALIAS] FUNCTION function_name
CREATE [GLOBAL] [AGGREGATE] [ALIAS] FUNCTION function_name
(arg_type [, ...])
[RETURNS ret_type]
[INTERMEDIATE inter_type]
@ -49,6 +49,8 @@ CREATE [AGGREGATE] [ALIAS] FUNCTION function_name
Parameter Description:
- `GLOBAL`: If there is this item, it means that the created function is a global function.
- `AGGREGATE`: If there is this item, it means that the created function is an aggregate function.
@ -147,6 +149,21 @@ Parameter Description:
CREATE ALIAS FUNCTION id_masking(INT) WITH PARAMETER(id) AS CONCAT(LEFT(id, 3), '****', RIGHT(id, 4));
````
6. Create a global custom scalar function
```sql
CREATE GLOBAL FUNCTION my_add(INT, INT) RETURNS INT PROPERTIES (
"symbol" = "_ZN9doris_udf6AddUdfEPNS_15FunctionContextERKNS_6IntValES4_",
"object_file" = "http://host:port/libmyadd.so"
);
````
7. Create a global custom alias function
```sql
CREATE GLOBAL ALIAS FUNCTION id_masking(INT) WITH PARAMETER(id) AS CONCAT(LEFT(id, 3), '****', RIGHT(id, 4));
````
### Keywords
CREATE, FUNCTION

View File

@ -37,7 +37,7 @@ Delete a custom function. Function names and parameter types are exactly the sam
grammar:
```sql
DROP FUNCTION function_name
DROP [GLOBAL] FUNCTION function_name
(arg_type [, ...])
````
@ -53,6 +53,11 @@ Parameter Description:
```sql
DROP FUNCTION my_add(INT, INT)
````
2. Delete a global function
```sql
DROP GLOBAL FUNCTION my_add(INT, INT)
````
### Keywords

View File

@ -39,7 +39,7 @@ CREATE FUNCTION
语法:
```sql
CREATE [AGGREGATE] [ALIAS] FUNCTION function_name
CREATE [GLOBAL] [AGGREGATE] [ALIAS] FUNCTION function_name
(arg_type [, ...])
[RETURNS ret_type]
[INTERMEDIATE inter_type]
@ -49,6 +49,8 @@ CREATE [AGGREGATE] [ALIAS] FUNCTION function_name
参数说明:
- `GLOBAL`: 如果有此项,表示的是创建的函数是全局范围内生效。
- `AGGREGATE`: 如果有此项,表示的是创建的函数是一个聚合函数。
@ -147,6 +149,21 @@ CREATE [AGGREGATE] [ALIAS] FUNCTION function_name
CREATE ALIAS FUNCTION id_masking(INT) WITH PARAMETER(id) AS CONCAT(LEFT(id, 3), '****', RIGHT(id, 4));
```
6. 创建一个全局自定义标量函数
```sql
CREATE GLOBAL FUNCTION my_add(INT, INT) RETURNS INT PROPERTIES (
"symbol" = "_ZN9doris_udf6AddUdfEPNS_15FunctionContextERKNS_6IntValES4_",
"object_file" = "http://host:port/libmyadd.so"
);
````
7. 创建一个全局自定义别名函数
```sql
CREATE GLOBAL ALIAS FUNCTION id_masking(INT) WITH PARAMETER(id) AS CONCAT(LEFT(id, 3), '****', RIGHT(id, 4));
````
### Keywords
CREATE, FUNCTION

View File

@ -37,7 +37,7 @@ DROP FUNCTION
语法:
```sql
DROP FUNCTION function_name
DROP [GLOBAL] FUNCTION function_name
(arg_type [, ...])
```
@ -53,6 +53,11 @@ DROP FUNCTION function_name
```sql
DROP FUNCTION my_add(INT, INT)
```
2. 删除掉一个全局函数
```sql
DROP GLOBAL FUNCTION my_add(INT, INT)
````
### Keywords

View File

@ -29,8 +29,8 @@ import java.io.IOException;
* Class A implements Writable {
* @Override
* public void write(DataOutput out) throws IOException {
* in.write(x);
* in.write(y);
* out.write(x);
* out.write(y);
* ...
* }
*

View File

@ -1795,15 +1795,15 @@ create_stmt ::=
RESULT = new CreateClusterStmt(name, properties, password);
:}*/
/* Function */
| KW_CREATE opt_aggregate:isAggregate KW_FUNCTION opt_if_not_exists:ifNotExists function_name:functionName LPAREN func_args_def:args RPAREN
| KW_CREATE opt_var_type:type opt_aggregate:isAggregate KW_FUNCTION opt_if_not_exists:ifNotExists function_name:functionName LPAREN func_args_def:args RPAREN
KW_RETURNS type_def:returnType opt_intermediate_type:intermediateType opt_properties:properties
{:
RESULT = new CreateFunctionStmt(ifNotExists, isAggregate, functionName, args, returnType, intermediateType, properties);
RESULT = new CreateFunctionStmt(type, ifNotExists, isAggregate, functionName, args, returnType, intermediateType, properties);
:}
| KW_CREATE KW_ALIAS KW_FUNCTION opt_if_not_exists:ifNotExists function_name:functionName LPAREN func_args_def:args RPAREN
| KW_CREATE opt_var_type:type KW_ALIAS KW_FUNCTION opt_if_not_exists:ifNotExists function_name:functionName LPAREN func_args_def:args RPAREN
KW_WITH KW_PARAMETER LPAREN ident_list:parameters RPAREN KW_AS expr:func
{:
RESULT = new CreateFunctionStmt(ifNotExists, functionName, args, parameters, func);
RESULT = new CreateFunctionStmt(type, ifNotExists, functionName, args, parameters, func);
:}
/* Table */
| KW_CREATE opt_external:isExternal KW_TABLE opt_if_not_exists:ifNotExists table_name:name KW_LIKE table_name:existed_name KW_WITH KW_ROLLUP LPAREN ident_list:rollupNames RPAREN
@ -2904,9 +2904,9 @@ drop_stmt ::=
RESULT = new DropClusterStmt(ifExists, cluster);
:}
/* Function */
| KW_DROP KW_FUNCTION opt_if_exists:ifExists function_name:functionName LPAREN func_args_def:args RPAREN
| KW_DROP opt_var_type:type KW_FUNCTION opt_if_exists:ifExists function_name:functionName LPAREN func_args_def:args RPAREN
{:
RESULT = new DropFunctionStmt(ifExists, functionName, args);
RESULT = new DropFunctionStmt(type, ifExists, functionName, args);
:}
/* Table */
| KW_DROP KW_TABLE opt_if_exists:ifExists table_name:name opt_force:force

View File

@ -95,6 +95,7 @@ public class CreateFunctionStmt extends DdlStmt {
public static final String IS_RETURN_NULL = "always_nullable";
private static final Logger LOG = LogManager.getLogger(CreateFunctionStmt.class);
private SetType type = SetType.DEFAULT;
private final boolean ifNotExists;
private final FunctionName functionName;
private final boolean isAggregate;
@ -119,9 +120,10 @@ public class CreateFunctionStmt extends DdlStmt {
// timeout for both connection and read. 10 seconds is long enough.
private static final int HTTP_TIMEOUT_MS = 10000;
public CreateFunctionStmt(boolean ifNotExists, boolean isAggregate, FunctionName functionName,
FunctionArgsDef argsDef,
TypeDef returnType, TypeDef intermediateType, Map<String, String> properties) {
public CreateFunctionStmt(SetType type, boolean ifNotExists, boolean isAggregate, FunctionName functionName,
FunctionArgsDef argsDef,
TypeDef returnType, TypeDef intermediateType, Map<String, String> properties) {
this.type = type;
this.ifNotExists = ifNotExists;
this.functionName = functionName;
this.isAggregate = isAggregate;
@ -138,8 +140,9 @@ public class CreateFunctionStmt extends DdlStmt {
this.originFunction = null;
}
public CreateFunctionStmt(boolean ifNotExists, FunctionName functionName, FunctionArgsDef argsDef,
public CreateFunctionStmt(SetType type, boolean ifNotExists, FunctionName functionName, FunctionArgsDef argsDef,
List<String> parameters, Expr originFunction) {
this.type = type;
this.ifNotExists = ifNotExists;
this.functionName = functionName;
this.isAlias = true;
@ -155,6 +158,10 @@ public class CreateFunctionStmt extends DdlStmt {
this.properties = ImmutableSortedMap.of();
}
public SetType getType() {
return type;
}
public boolean isIfNotExists() {
return ifNotExists;
}
@ -205,7 +212,7 @@ public class CreateFunctionStmt extends DdlStmt {
private void analyzeCommon(Analyzer analyzer) throws AnalysisException {
// check function name
functionName.analyze(analyzer);
functionName.analyze(analyzer, this.type);
// check operation privilege
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {

View File

@ -29,16 +29,22 @@ public class DropFunctionStmt extends DdlStmt {
private final boolean ifExists;
private final FunctionName functionName;
private final FunctionArgsDef argsDef;
private SetType type = SetType.DEFAULT;
// set after analyzed
private FunctionSearchDesc function;
public DropFunctionStmt(boolean ifExists, FunctionName functionName, FunctionArgsDef argsDef) {
public DropFunctionStmt(SetType type, boolean ifExists, FunctionName functionName, FunctionArgsDef argsDef) {
this.type = type;
this.ifExists = ifExists;
this.functionName = functionName;
this.argsDef = argsDef;
}
public SetType getType() {
return type;
}
public boolean isIfExists() {
return ifExists;
}
@ -56,7 +62,7 @@ public class DropFunctionStmt extends DdlStmt {
super.analyze(analyzer);
// analyze function name
functionName.analyze(analyzer);
functionName.analyze(analyzer, this.type);
// check operation privilege
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {

View File

@ -1327,6 +1327,13 @@ public class FunctionCallExpr extends Expr {
Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
}
}
// find from the internal database first, if not, then from the global functions
if (fn == null) {
Function searchDesc =
new Function(fnName, Arrays.asList(collectChildReturnTypes()), Type.INVALID, false);
fn = Env.getCurrentEnv().getGlobalFunctionMgr().getFunction(searchDesc,
Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
}
}
}
}

View File

@ -135,7 +135,7 @@ public class FunctionName implements Writable {
return db;
}
public void analyze(Analyzer analyzer) throws AnalysisException {
public void analyze(Analyzer analyzer, SetType type) throws AnalysisException {
if (fn.length() == 0) {
throw new AnalysisException("Function name can not be empty.");
}
@ -150,7 +150,7 @@ public class FunctionName implements Writable {
}
if (db == null) {
db = analyzer.getDefaultDb();
if (Strings.isNullOrEmpty(db)) {
if (Strings.isNullOrEmpty(db) && type != SetType.GLOBAL) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
}
} else {

View File

@ -76,7 +76,7 @@ public class ShowCreateFunctionStmt extends ShowStmt {
}
// analyze function name
functionName.analyze(analyzer);
functionName.analyze(analyzer, SetType.DEFAULT);
// check operation privilege
if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), dbName, PrivPredicate.SHOW)) {

View File

@ -51,7 +51,8 @@ public class ShowFunctionsStmt extends ShowStmt {
private Expr expr;
public ShowFunctionsStmt(String dbName, boolean isBuiltin, boolean isVerbose, String wild, Expr expr) {
public ShowFunctionsStmt(String dbName, boolean isBuiltin, boolean isVerbose, String wild,
Expr expr) {
this.dbName = dbName;
this.isBuiltin = isBuiltin;
this.isVerbose = isVerbose;

View File

@ -567,14 +567,7 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
Text.writeString(out, attachDbName);
// write functions
out.writeInt(name2Function.size());
for (Entry<String, ImmutableList<Function>> entry : name2Function.entrySet()) {
Text.writeString(out, entry.getKey());
out.writeInt(entry.getValue().size());
for (Function function : entry.getValue()) {
function.write(out);
}
}
FunctionUtil.write(out, name2Function);
// write encryptKeys
dbEncryptKey.write(out);
@ -606,17 +599,7 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
dbState = DbState.valueOf(Text.readString(in));
attachDbName = Text.readString(in);
int numEntries = in.readInt();
for (int i = 0; i < numEntries; ++i) {
String name = Text.readString(in);
ImmutableList.Builder<Function> builder = ImmutableList.builder();
int numFunctions = in.readInt();
for (int j = 0; j < numFunctions; ++j) {
builder.add(Function.read(in));
}
name2Function.put(name, builder.build());
}
FunctionUtil.readFields(in, name2Function);
// read encryptKeys
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_102) {
@ -695,140 +678,43 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
public synchronized void addFunction(Function function, boolean ifNotExists) throws UserException {
function.checkWritable();
if (addFunctionImpl(function, ifNotExists, false)) {
if (FunctionUtil.addFunctionImpl(function, ifNotExists, false, name2Function)) {
Env.getCurrentEnv().getEditLog().logAddFunction(function);
}
}
public synchronized void replayAddFunction(Function function) {
try {
addFunctionImpl(function, false, true);
FunctionUtil.addFunctionImpl(function, false, true, name2Function);
} catch (UserException e) {
throw new RuntimeException(e);
}
}
/**
* @param function
* @param ifNotExists
* @param isReplay
* @return return true if we do add the function, otherwise, return false.
* @throws UserException
*/
private boolean addFunctionImpl(Function function, boolean ifNotExists, boolean isReplay) throws UserException {
String functionName = function.getFunctionName().getFunction();
List<Function> existFuncs = name2Function.get(functionName);
if (!isReplay) {
if (existFuncs != null) {
for (Function existFunc : existFuncs) {
if (function.compare(existFunc, Function.CompareMode.IS_IDENTICAL)) {
if (ifNotExists) {
LOG.debug("function already exists");
return false;
}
throw new UserException("function already exists");
}
}
}
// Get function id for this UDF, use CatalogIdGenerator. Only get function id
// when isReplay is false
long functionId = Env.getCurrentEnv().getNextId();
function.setId(functionId);
}
ImmutableList.Builder<Function> builder = ImmutableList.builder();
if (existFuncs != null) {
builder.addAll(existFuncs);
}
builder.add(function);
name2Function.put(functionName, builder.build());
return true;
}
public synchronized void dropFunction(FunctionSearchDesc function, boolean ifExists) throws UserException {
if (dropFunctionImpl(function, ifExists)) {
if (FunctionUtil.dropFunctionImpl(function, ifExists, name2Function)) {
Env.getCurrentEnv().getEditLog().logDropFunction(function);
}
}
public synchronized void replayDropFunction(FunctionSearchDesc functionSearchDesc) {
try {
dropFunctionImpl(functionSearchDesc, false);
FunctionUtil.dropFunctionImpl(functionSearchDesc, false, name2Function);
} catch (UserException e) {
throw new RuntimeException(e);
}
}
/**
* @param function
* @param ifExists
* @return return true if we do drop the function, otherwise, return false.
* @throws UserException
*/
private boolean dropFunctionImpl(FunctionSearchDesc function, boolean ifExists) throws UserException {
String functionName = function.getName().getFunction();
List<Function> existFuncs = name2Function.get(functionName);
if (existFuncs == null) {
if (ifExists) {
LOG.debug("function name does not exist: " + functionName);
return false;
}
throw new UserException("function name does not exist: " + functionName);
}
boolean isFound = false;
ImmutableList.Builder<Function> builder = ImmutableList.builder();
for (Function existFunc : existFuncs) {
if (function.isIdentical(existFunc)) {
isFound = true;
} else {
builder.add(existFunc);
}
}
if (!isFound) {
if (ifExists) {
LOG.debug("function does not exist: " + function);
return false;
}
throw new UserException("function does not exist: " + function);
}
ImmutableList<Function> newFunctions = builder.build();
if (newFunctions.isEmpty()) {
name2Function.remove(functionName);
} else {
name2Function.put(functionName, newFunctions);
}
return true;
}
public synchronized Function getFunction(Function desc, Function.CompareMode mode) {
List<Function> fns = name2Function.get(desc.getFunctionName().getFunction());
if (fns == null) {
return null;
}
return Function.getFunction(fns, desc, mode);
return FunctionUtil.getFunction(desc, mode, name2Function);
}
public synchronized Function getFunction(FunctionSearchDesc function) throws AnalysisException {
String functionName = function.getName().getFunction();
List<Function> existFuncs = name2Function.get(functionName);
if (existFuncs == null) {
throw new AnalysisException("Unknown function, function=" + function.toString());
}
for (Function existFunc : existFuncs) {
if (function.isIdentical(existFunc)) {
return existFunc;
}
}
throw new AnalysisException("Unknown function, function=" + function.toString());
return FunctionUtil.getFunction(function, name2Function);
}
public synchronized List<Function> getFunctions() {
List<Function> functions = Lists.newArrayList();
for (Map.Entry<String, ImmutableList<Function>> entry : name2Function.entrySet()) {
functions.addAll(entry.getValue());
}
return functions;
return FunctionUtil.getFunctions(name2Function);
}
public boolean isInfoSchemaDb() {

View File

@ -75,6 +75,7 @@ import org.apache.doris.analysis.RefreshMaterializedViewStmt;
import org.apache.doris.analysis.ReplacePartitionClause;
import org.apache.doris.analysis.RestoreStmt;
import org.apache.doris.analysis.RollupRenameClause;
import org.apache.doris.analysis.SetType;
import org.apache.doris.analysis.ShowAlterStmt.AlterType;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TableRenameClause;
@ -306,6 +307,7 @@ public class Env {
private QueryableReentrantLock lock;
private CatalogMgr catalogMgr;
private GlobalFunctionMgr globalFunctionMgr;
private Load load;
private LoadManager loadManager;
private StreamLoadRecordMgr streamLoadRecordMgr;
@ -646,6 +648,7 @@ public class Env {
if (!isCheckpointCatalog) {
this.analysisManager = new AnalysisManager();
}
this.globalFunctionMgr = new GlobalFunctionMgr();
}
public static void destroyCheckpoint() {
@ -1921,6 +1924,15 @@ public class Env {
return checksum;
}
/**
* Load global function.
**/
public long loadGlobalFunction(DataInputStream in, long checksum) throws IOException {
this.globalFunctionMgr = GlobalFunctionMgr.read(in);
LOG.info("finished replay global function from image");
return checksum;
}
// Only called by checkpoint thread
// return the latest image file's absolute path
public String saveImage() throws IOException {
@ -2158,6 +2170,12 @@ public class Env {
return checksum;
}
public long saveGlobalFunction(CountingDataOutputStream out, long checksum) throws IOException {
this.globalFunctionMgr.write(out);
LOG.info("Save global function to image");
return checksum;
}
public void createLabelCleaner() {
labelCleaner = new MasterDaemon("LoadLabelCleaner", Config.label_clean_interval_second * 1000L) {
@Override
@ -4807,9 +4825,12 @@ public class Env {
}
public void createFunction(CreateFunctionStmt stmt) throws UserException {
FunctionName name = stmt.getFunctionName();
Database db = getInternalCatalog().getDbOrDdlException(name.getDb());
db.addFunction(stmt.getFunction(), stmt.isIfNotExists());
if (SetType.GLOBAL.equals(stmt.getType())) {
globalFunctionMgr.addFunction(stmt.getFunction(), stmt.isIfNotExists());
} else {
Database db = getInternalCatalog().getDbOrDdlException(stmt.getFunctionName().getDb());
db.addFunction(stmt.getFunction(), stmt.isIfNotExists());
}
}
public void replayCreateFunction(Function function) throws MetaNotFoundException {
@ -4818,10 +4839,18 @@ public class Env {
db.replayAddFunction(function);
}
public void replayCreateGlobalFunction(Function function) {
globalFunctionMgr.replayAddFunction(function);
}
public void dropFunction(DropFunctionStmt stmt) throws UserException {
FunctionName name = stmt.getFunctionName();
Database db = getInternalCatalog().getDbOrDdlException(name.getDb());
db.dropFunction(stmt.getFunction(), stmt.isIfExists());
if (SetType.GLOBAL.equals(stmt.getType())) {
globalFunctionMgr.dropFunction(stmt.getFunction(), stmt.isIfExists());
} else {
Database db = getInternalCatalog().getDbOrDdlException(name.getDb());
db.dropFunction(stmt.getFunction(), stmt.isIfExists());
}
}
public void replayDropFunction(FunctionSearchDesc functionSearchDesc) throws MetaNotFoundException {
@ -4830,6 +4859,10 @@ public class Env {
db.replayDropFunction(functionSearchDesc);
}
public void replayDropGlobalFunction(FunctionSearchDesc functionSearchDesc) {
globalFunctionMgr.replayDropFunction(functionSearchDesc);
}
public void setConfig(AdminSetConfigStmt stmt) throws DdlException {
Map<String, String> configs = stmt.getConfigs();
Preconditions.checkState(configs.size() == 1);
@ -5287,4 +5320,8 @@ public class Env {
return analysisManager;
}
public GlobalFunctionMgr getGlobalFunctionMgr() {
return globalFunctionMgr;
}
}

View File

@ -0,0 +1,185 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
/**
* function util.
*/
public class FunctionUtil {
private static final Logger LOG = LogManager.getLogger(FunctionUtil.class);
/**
* @param function
* @param ifExists
* @param name2Function
* @return return true if we do drop the function, otherwise, return false.
* @throws UserException
*/
public static boolean dropFunctionImpl(FunctionSearchDesc function, boolean ifExists,
ConcurrentMap<String, ImmutableList<Function>> name2Function) throws UserException {
String functionName = function.getName().getFunction();
List<Function> existFuncs = name2Function.get(functionName);
if (existFuncs == null) {
if (ifExists) {
LOG.debug("function name does not exist: " + functionName);
return false;
}
throw new UserException("function name does not exist: " + functionName);
}
boolean isFound = false;
ImmutableList.Builder<Function> builder = ImmutableList.builder();
for (Function existFunc : existFuncs) {
if (function.isIdentical(existFunc)) {
isFound = true;
} else {
builder.add(existFunc);
}
}
if (!isFound) {
if (ifExists) {
LOG.debug("function does not exist: " + function);
return false;
}
throw new UserException("function does not exist: " + function);
}
ImmutableList<Function> newFunctions = builder.build();
if (newFunctions.isEmpty()) {
name2Function.remove(functionName);
} else {
name2Function.put(functionName, newFunctions);
}
return true;
}
/**
* @param function
* @param ifNotExists
* @param isReplay
* @param name2Function
* @return return true if we do add the function, otherwise, return false.
* @throws UserException
*/
public static boolean addFunctionImpl(Function function, boolean ifNotExists, boolean isReplay,
ConcurrentMap<String, ImmutableList<Function>> name2Function) throws UserException {
String functionName = function.getFunctionName().getFunction();
List<Function> existFuncs = name2Function.get(functionName);
if (!isReplay) {
if (existFuncs != null) {
for (Function existFunc : existFuncs) {
if (function.compare(existFunc, Function.CompareMode.IS_IDENTICAL)) {
if (ifNotExists) {
LOG.debug("function already exists");
return false;
}
throw new UserException("function already exists");
}
}
}
// Get function id for this UDF, use CatalogIdGenerator. Only get function id
// when isReplay is false
long functionId = Env.getCurrentEnv().getNextId();
function.setId(functionId);
}
ImmutableList.Builder<Function> builder = ImmutableList.builder();
if (existFuncs != null) {
builder.addAll(existFuncs);
}
builder.add(function);
name2Function.put(functionName, builder.build());
return true;
}
public static Function getFunction(FunctionSearchDesc function,
ConcurrentMap<String, ImmutableList<Function>> name2Function) throws AnalysisException {
String functionName = function.getName().getFunction();
List<Function> existFuncs = name2Function.get(functionName);
if (existFuncs == null) {
throw new AnalysisException("Unknown function, function=" + function.toString());
}
for (Function existFunc : existFuncs) {
if (function.isIdentical(existFunc)) {
return existFunc;
}
}
throw new AnalysisException("Unknown function, function=" + function.toString());
}
public static List<Function> getFunctions(ConcurrentMap<String, ImmutableList<Function>> name2Function) {
List<Function> functions = Lists.newArrayList();
for (Map.Entry<String, ImmutableList<Function>> entry : name2Function.entrySet()) {
functions.addAll(entry.getValue());
}
return functions;
}
public static Function getFunction(Function desc, Function.CompareMode mode,
ConcurrentMap<String, ImmutableList<Function>> name2Function) {
List<Function> fns = name2Function.get(desc.getFunctionName().getFunction());
if (fns == null) {
return null;
}
return Function.getFunction(fns, desc, mode);
}
public static void write(DataOutput out, ConcurrentMap<String, ImmutableList<Function>> name2Function)
throws IOException {
// write functions
out.writeInt(name2Function.size());
for (Entry<String, ImmutableList<Function>> entry : name2Function.entrySet()) {
Text.writeString(out, entry.getKey());
out.writeInt(entry.getValue().size());
for (Function function : entry.getValue()) {
function.write(out);
}
}
}
public static void readFields(DataInput in, ConcurrentMap<String, ImmutableList<Function>> name2Function)
throws IOException {
int numEntries = in.readInt();
for (int i = 0; i < numEntries; ++i) {
String name = Text.readString(in);
ImmutableList.Builder<Function> builder = ImmutableList.builder();
int numFunctions = in.readInt();
for (int j = 0; j < numFunctions; ++j) {
builder.add(Function.read(in));
}
name2Function.put(name, builder.build());
}
}
}

View File

@ -0,0 +1,103 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
/**
* GlobalFunctionMgr will load all global functions at FE startup.
* Provides management of global functions such as add, drop and other operations
*/
public class GlobalFunctionMgr extends MetaObject {
// user define function
private ConcurrentMap<String, ImmutableList<Function>> name2Function = Maps.newConcurrentMap();
public static GlobalFunctionMgr read(DataInput in) throws IOException {
GlobalFunctionMgr globalFunctionMgr = new GlobalFunctionMgr();
globalFunctionMgr.readFields(in);
return globalFunctionMgr;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
// write functions
FunctionUtil.write(out, name2Function);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
FunctionUtil.readFields(in, name2Function);
}
public synchronized void addFunction(Function function, boolean ifNotExists) throws UserException {
function.checkWritable();
if (FunctionUtil.addFunctionImpl(function, ifNotExists, false, name2Function)) {
Env.getCurrentEnv().getEditLog().logAddGlobalFunction(function);
}
}
public synchronized void replayAddFunction(Function function) {
try {
FunctionUtil.addFunctionImpl(function, false, true, name2Function);
} catch (UserException e) {
throw new RuntimeException(e);
}
}
public synchronized void dropFunction(FunctionSearchDesc function, boolean ifExists) throws UserException {
if (FunctionUtil.dropFunctionImpl(function, ifExists, name2Function)) {
Env.getCurrentEnv().getEditLog().logDropGlobalFunction(function);
}
}
public synchronized void replayDropFunction(FunctionSearchDesc functionSearchDesc) {
try {
FunctionUtil.dropFunctionImpl(functionSearchDesc, false, name2Function);
} catch (UserException e) {
throw new RuntimeException(e);
}
}
public synchronized Function getFunction(Function desc, Function.CompareMode mode) {
return FunctionUtil.getFunction(desc, mode, name2Function);
}
public synchronized Function getFunction(FunctionSearchDesc function) throws AnalysisException {
return FunctionUtil.getFunction(function, name2Function);
}
public synchronized List<Function> getFunctions() {
return FunctionUtil.getFunctions(name2Function);
}
}

View File

@ -505,6 +505,16 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
case OperationType.OP_ADD_GLOBAL_FUNCTION: {
data = Function.read(in);
isRead = true;
break;
}
case OperationType.OP_DROP_GLOBAL_FUNCTION: {
data = FunctionSearchDesc.read(in);
isRead = true;
break;
}
case OperationType.OP_CREATE_ENCRYPTKEY: {
data = EncryptKey.read(in);
isRead = true;

View File

@ -628,6 +628,16 @@ public class EditLog {
Env.getCurrentEnv().replayDropFunction(function);
break;
}
case OperationType.OP_ADD_GLOBAL_FUNCTION: {
final Function function = (Function) journal.getData();
Env.getCurrentEnv().replayCreateGlobalFunction(function);
break;
}
case OperationType.OP_DROP_GLOBAL_FUNCTION: {
FunctionSearchDesc function = (FunctionSearchDesc) journal.getData();
Env.getCurrentEnv().replayDropGlobalFunction(function);
break;
}
case OperationType.OP_CREATE_ENCRYPTKEY: {
final EncryptKey encryptKey = (EncryptKey) journal.getData();
EncryptKeyHelper.replayCreateEncryptKey(encryptKey);
@ -1461,10 +1471,18 @@ public class EditLog {
logEdit(OperationType.OP_ADD_FUNCTION, function);
}
public void logAddGlobalFunction(Function function) {
logEdit(OperationType.OP_ADD_GLOBAL_FUNCTION, function);
}
public void logDropFunction(FunctionSearchDesc function) {
logEdit(OperationType.OP_DROP_FUNCTION, function);
}
public void logDropGlobalFunction(FunctionSearchDesc function) {
logEdit(OperationType.OP_DROP_GLOBAL_FUNCTION, function);
}
public void logAddEncryptKey(EncryptKey encryptKey) {
logEdit(OperationType.OP_CREATE_ENCRYPTKEY, encryptKey);
}

View File

@ -180,6 +180,8 @@ public class OperationType {
// UDF 130-140
public static final short OP_ADD_FUNCTION = 130;
public static final short OP_DROP_FUNCTION = 131;
public static final short OP_ADD_GLOBAL_FUNCTION = 132;
public static final short OP_DROP_GLOBAL_FUNCTION = 133;
// routine load 200
public static final short OP_CREATE_ROUTINE_LOAD_JOB = 200;

View File

@ -203,6 +203,12 @@ public class MetaPersistMethod {
metaPersistMethod.writeMethod = Env.class.getDeclaredMethod("saveMTMVJobManager",
CountingDataOutputStream.class, long.class);
break;
case "globalFunction":
metaPersistMethod.readMethod = Env.class.getDeclaredMethod("loadGlobalFunction", DataInputStream.class,
long.class);
metaPersistMethod.writeMethod = Env.class.getDeclaredMethod("saveGlobalFunction",
CountingDataOutputStream.class, long.class);
break;
default:
break;
}

View File

@ -38,7 +38,7 @@ public class PersistMetaModules {
"masterInfo", "frontends", "backends", "datasource", "db", "alterJob", "recycleBin",
"globalVariable", "cluster", "broker", "resources", "exportJob", "syncJob", "backupHandler",
"paloAuth", "transactionState", "colocateTableIndex", "routineLoadJobs", "loadJobV2", "smallFiles",
"plugins", "deleteHandler", "sqlBlockRule", "policy", "mtmvJobManager");
"plugins", "deleteHandler", "sqlBlockRule", "policy", "mtmvJobManager", "globalFunction");
static {
MODULES_MAP = Maps.newHashMap();

View File

@ -74,15 +74,10 @@ public class CreateFunctionTest {
ConnectContext ctx = UtFrameUtils.createDefaultCtx();
// create database db1
String createDbStmtStr = "create database db1;";
CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, ctx);
Env.getCurrentEnv().createDb(createDbStmt);
System.out.println(Env.getCurrentInternalCatalog().getDbNames());
createDatabase(ctx, "create database db1;");
String createTblStmtStr = "create table db1.tbl1(k1 int, k2 bigint, k3 varchar(10), k4 char(5)) duplicate key(k1) "
+ "distributed by hash(k2) buckets 1 properties('replication_num' = '1');";
CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr, connectContext);
Env.getCurrentEnv().createTable(createTableStmt);
createTable("create table db1.tbl1(k1 int, k2 bigint, k3 varchar(10), k4 char(5)) duplicate key(k1) "
+ "distributed by hash(k2) buckets 1 properties('replication_num' = '1');");
dorisAssert = new DorisAssert();
dorisAssert.useDatabase("db1");
@ -91,8 +86,10 @@ public class CreateFunctionTest {
Assert.assertNotNull(db);
// create alias function
String createFuncStr = "create alias function db1.id_masking(bigint) with parameter(id) as concat(left(id,3),'****',right(id,4));";
CreateFunctionStmt createFunctionStmt = (CreateFunctionStmt) UtFrameUtils.parseAndAnalyzeStmt(createFuncStr, ctx);
String createFuncStr
= "create alias function db1.id_masking(bigint) with parameter(id) as concat(left(id,3),'****',right(id,4));";
CreateFunctionStmt createFunctionStmt = (CreateFunctionStmt) UtFrameUtils.parseAndAnalyzeStmt(createFuncStr,
ctx);
Env.getCurrentEnv().createFunction(createFunctionStmt);
List<Function> functions = db.getFunctions();
@ -107,14 +104,15 @@ public class CreateFunctionTest {
Assert.assertEquals(1, planner.getFragments().size());
PlanFragment fragment = planner.getFragments().get(0);
Assert.assertTrue(fragment.getPlanRoot() instanceof UnionNode);
UnionNode unionNode = (UnionNode) fragment.getPlanRoot();
UnionNode unionNode = (UnionNode) fragment.getPlanRoot();
List<List<Expr>> constExprLists = Deencapsulation.getField(unionNode, "constExprLists");
Assert.assertEquals(1, constExprLists.size());
Assert.assertEquals(1, constExprLists.get(0).size());
Assert.assertTrue(constExprLists.get(0).get(0) instanceof FunctionCallExpr);
queryStr = "select db1.id_masking(k1) from db1.tbl1";
Assert.assertTrue(dorisAssert.query(queryStr).explainQuery().contains("concat(left(`k1`, 3), '****', right(`k1`, 4))"));
Assert.assertTrue(
dorisAssert.query(queryStr).explainQuery().contains("concat(left(`k1`, 3), '****', right(`k1`, 4))"));
// create alias function with cast
// cast any type to decimal with specific precision and scale
@ -135,7 +133,7 @@ public class CreateFunctionTest {
Assert.assertEquals(1, planner.getFragments().size());
fragment = planner.getFragments().get(0);
Assert.assertTrue(fragment.getPlanRoot() instanceof UnionNode);
unionNode = (UnionNode) fragment.getPlanRoot();
unionNode = (UnionNode) fragment.getPlanRoot();
constExprLists = Deencapsulation.getField(unionNode, "constExprLists");
System.out.println(constExprLists.get(0).get(0));
Assert.assertTrue(constExprLists.get(0).get(0) instanceof StringLiteral);
@ -165,7 +163,7 @@ public class CreateFunctionTest {
Assert.assertEquals(1, planner.getFragments().size());
fragment = planner.getFragments().get(0);
Assert.assertTrue(fragment.getPlanRoot() instanceof UnionNode);
unionNode = (UnionNode) fragment.getPlanRoot();
unionNode = (UnionNode) fragment.getPlanRoot();
constExprLists = Deencapsulation.getField(unionNode, "constExprLists");
Assert.assertEquals(1, constExprLists.size());
Assert.assertEquals(1, constExprLists.get(0).size());
@ -192,7 +190,7 @@ public class CreateFunctionTest {
Assert.assertEquals(1, planner.getFragments().size());
fragment = planner.getFragments().get(0);
Assert.assertTrue(fragment.getPlanRoot() instanceof UnionNode);
unionNode = (UnionNode) fragment.getPlanRoot();
unionNode = (UnionNode) fragment.getPlanRoot();
constExprLists = Deencapsulation.getField(unionNode, "constExprLists");
Assert.assertEquals(1, constExprLists.size());
Assert.assertEquals(1, constExprLists.get(0).size());
@ -201,4 +199,121 @@ public class CreateFunctionTest {
queryStr = "select db1.char(k1, 4) from db1.tbl1;";
Assert.assertTrue(dorisAssert.query(queryStr).explainQuery().contains("CAST(`k1` AS CHARACTER)"));
}
@Test
public void testCreateGlobalFunction() throws Exception {
ConnectContext ctx = UtFrameUtils.createDefaultCtx();
// 1. create database db2
createDatabase(ctx, "create database db2;");
createTable("create table db2.tbl1(k1 int, k2 bigint, k3 varchar(10), k4 char(5)) duplicate key(k1) "
+ "distributed by hash(k2) buckets 1 properties('replication_num' = '1');");
dorisAssert = new DorisAssert();
dorisAssert.useDatabase("db2");
Database db = Env.getCurrentInternalCatalog().getDbNullable("default_cluster:db2");
Assert.assertNotNull(db);
// 2. create global function
String createFuncStr
= "create global alias function id_masking(bigint) with parameter(id) as concat(left(id,3),'****',right(id,4));";
CreateFunctionStmt createFunctionStmt = (CreateFunctionStmt) UtFrameUtils.parseAndAnalyzeStmt(createFuncStr,
ctx);
Env.getCurrentEnv().createFunction(createFunctionStmt);
List<Function> functions = Env.getCurrentEnv().getGlobalFunctionMgr().getFunctions();
Assert.assertEquals(1, functions.size());
String queryStr = "select id_masking(13888888888);";
testFunctionQuery(ctx, queryStr, false);
queryStr = "select id_masking(k1) from db2.tbl1";
Assert.assertTrue(
dorisAssert.query(queryStr).explainQuery().contains("concat(left(`k1`, 3), '****', right(`k1`, 4))"));
// 4. create alias function with cast
// cast any type to decimal with specific precision and scale
createFuncStr = "create global alias function decimal(all, int, int) with parameter(col, precision, scale)"
+ " as cast(col as decimal(precision, scale));";
createFunctionStmt = (CreateFunctionStmt) UtFrameUtils.parseAndAnalyzeStmt(createFuncStr, ctx);
Env.getCurrentEnv().createFunction(createFunctionStmt);
functions = Env.getCurrentEnv().getGlobalFunctionMgr().getFunctions();
Assert.assertEquals(2, functions.size());
queryStr = "select decimal(333, 4, 1);";
testFunctionQuery(ctx, queryStr, true);
queryStr = "select decimal(k3, 4, 1) from db2.tbl1;";
if (Config.enable_decimal_conversion) {
Assert.assertTrue(dorisAssert.query(queryStr).explainQuery().contains("CAST(`k3` AS DECIMALV3(4,1))"));
} else {
Assert.assertTrue(dorisAssert.query(queryStr).explainQuery().contains("CAST(`k3` AS DECIMAL(4,1))"));
}
// 5. cast any type to varchar with fixed length
createFuncStr = "create global alias function db2.varchar(all, int) with parameter(text, length) as "
+ "cast(text as varchar(length));";
createFunctionStmt = (CreateFunctionStmt) UtFrameUtils.parseAndAnalyzeStmt(createFuncStr, ctx);
Env.getCurrentEnv().createFunction(createFunctionStmt);
functions = Env.getCurrentEnv().getGlobalFunctionMgr().getFunctions();
Assert.assertEquals(3, functions.size());
queryStr = "select varchar(333, 4);";
testFunctionQuery(ctx, queryStr, true);
queryStr = "select varchar(k1, 4) from db2.tbl1;";
Assert.assertTrue(dorisAssert.query(queryStr).explainQuery().contains("CAST(`k1` AS CHARACTER)"));
// 6. cast any type to char with fixed length
createFuncStr = "create global alias function db2.char(all, int) with parameter(text, length) as "
+ "cast(text as char(length));";
createFunctionStmt = (CreateFunctionStmt) UtFrameUtils.parseAndAnalyzeStmt(createFuncStr, ctx);
Env.getCurrentEnv().createFunction(createFunctionStmt);
functions = Env.getCurrentEnv().getGlobalFunctionMgr().getFunctions();
Assert.assertEquals(4, functions.size());
queryStr = "select char(333, 4);";
testFunctionQuery(ctx, queryStr, true);
queryStr = "select char(k1, 4) from db2.tbl1;";
Assert.assertTrue(dorisAssert.query(queryStr).explainQuery().contains("CAST(`k1` AS CHARACTER)"));
}
private void testFunctionQuery(ConnectContext ctx, String queryStr, Boolean isStringLiteral) throws Exception {
ctx.getState().reset();
StmtExecutor stmtExecutor = new StmtExecutor(ctx, queryStr);
stmtExecutor.execute();
Assert.assertNotEquals(QueryState.MysqlStateType.ERR, ctx.getState().getStateType());
Planner planner = stmtExecutor.planner();
Assert.assertEquals(1, planner.getFragments().size());
PlanFragment fragment = planner.getFragments().get(0);
Assert.assertTrue(fragment.getPlanRoot() instanceof UnionNode);
UnionNode unionNode = (UnionNode) fragment.getPlanRoot();
List<List<Expr>> constExprLists = Deencapsulation.getField(unionNode, "constExprLists");
Assert.assertEquals(1, constExprLists.size());
Assert.assertEquals(1, constExprLists.get(0).size());
if (isStringLiteral) {
Assert.assertTrue(constExprLists.get(0).get(0) instanceof StringLiteral);
} else {
Assert.assertTrue(constExprLists.get(0).get(0) instanceof FunctionCallExpr);
}
}
private void createTable(String createTblStmtStr) throws Exception {
CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr,
connectContext);
Env.getCurrentEnv().createTable(createTableStmt);
}
private void createDatabase(ConnectContext ctx, String createDbStmtStr) throws Exception {
CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, ctx);
Env.getCurrentEnv().createDb(createDbStmt);
System.out.println(Env.getCurrentInternalCatalog().getDbNames());
}
}

View File

@ -0,0 +1,83 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateFunctionStmt;
import org.apache.doris.analysis.DropFunctionStmt;
import org.apache.doris.common.FeConstants;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.utframe.DorisAssert;
import org.apache.doris.utframe.UtFrameUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.util.List;
import java.util.UUID;
public class DropFunctionTest {
private static String runningDir = "fe/mocked/DropFunctionTest/" + UUID.randomUUID().toString() + "/";
private static ConnectContext connectContext;
private static DorisAssert dorisAssert;
@BeforeClass
public static void setup() throws Exception {
UtFrameUtils.createDorisCluster(runningDir);
FeConstants.runningUnitTest = true;
// create connect context
connectContext = UtFrameUtils.createDefaultCtx();
}
@AfterClass
public static void teardown() {
File file = new File("fe/mocked/DropFunctionTest/");
file.delete();
}
@Test
public void testDropGlobalFunction() throws Exception {
ConnectContext ctx = UtFrameUtils.createDefaultCtx();
// 1. create database db1
CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt("create database db1;", ctx);
Env.getCurrentEnv().createDb(createDbStmt);
String createFuncStr
= "create global alias function id_masking(bigint) with parameter(id) as concat(left(id,3),'****',right(id,4));";
CreateFunctionStmt createFunctionStmt = (CreateFunctionStmt) UtFrameUtils.parseAndAnalyzeStmt(createFuncStr,
ctx);
Env.getCurrentEnv().createFunction(createFunctionStmt);
List<Function> functions = Env.getCurrentEnv().getGlobalFunctionMgr().getFunctions();
Assert.assertEquals(1, functions.size());
// drop global function
String dropFuncStr = "drop global function id_masking(bigint)";
DropFunctionStmt dropFunctionStmt = (DropFunctionStmt) UtFrameUtils.parseAndAnalyzeStmt(dropFuncStr, ctx);
Env.getCurrentEnv().dropFunction(dropFunctionStmt);
functions = Env.getCurrentEnv().getGlobalFunctionMgr().getFunctions();
Assert.assertEquals(0, functions.size());
}
}