@ -95,8 +95,9 @@ public:
|
||||
{
|
||||
TJavaUdfExecutorCtorParams ctor_params;
|
||||
ctor_params.__set_fn(fn);
|
||||
ctor_params.__set_location(local_location);
|
||||
|
||||
if (!fn.hdfs_location.empty() && !fn.checksum.empty()) {
|
||||
ctor_params.__set_location(local_location);
|
||||
}
|
||||
jbyteArray ctor_params_bytes;
|
||||
|
||||
// Pushed frame will be popped when jni_frame goes out-of-scope.
|
||||
@ -274,7 +275,13 @@ public:
|
||||
//So need to check as soon as possible, before call Data function
|
||||
Status check_udaf(const TFunction& fn) {
|
||||
auto function_cache = UserFunctionCache::instance();
|
||||
return function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum, &_local_location);
|
||||
// get jar path if both file path location and checksum are null
|
||||
if (!fn.hdfs_location.empty() && !fn.checksum.empty()) {
|
||||
return function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum,
|
||||
&_local_location);
|
||||
} else {
|
||||
return Status::OK();
|
||||
}
|
||||
}
|
||||
|
||||
void create(AggregateDataPtr __restrict place) const override {
|
||||
|
||||
@ -55,11 +55,15 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio
|
||||
{
|
||||
std::string local_location;
|
||||
auto function_cache = UserFunctionCache::instance();
|
||||
RETURN_IF_ERROR(function_cache->get_jarpath(fn_.id, fn_.hdfs_location, fn_.checksum,
|
||||
&local_location));
|
||||
TJavaUdfExecutorCtorParams ctor_params;
|
||||
ctor_params.__set_fn(fn_);
|
||||
ctor_params.__set_location(local_location);
|
||||
// get jar path if both file path location and checksum are null
|
||||
if (!fn_.hdfs_location.empty() && !fn_.checksum.empty()) {
|
||||
RETURN_IF_ERROR(function_cache->get_jarpath(fn_.id, fn_.hdfs_location, fn_.checksum,
|
||||
&local_location));
|
||||
ctor_params.__set_location(local_location);
|
||||
}
|
||||
|
||||
jbyteArray ctor_params_bytes;
|
||||
|
||||
// Pushed frame will be popped when jni_frame goes out-of-scope.
|
||||
|
||||
@ -221,7 +221,7 @@ done
|
||||
# add custom_libs to CLASSPATH
|
||||
if [[ -d "${DORIS_HOME}/custom_lib" ]]; then
|
||||
for f in "${DORIS_HOME}/custom_lib"/*.jar; do
|
||||
CLASSPATH="${f}:${CLASSPATH}"
|
||||
CLASSPATH="${CLASSPATH}:${f}"
|
||||
done
|
||||
fi
|
||||
|
||||
|
||||
@ -248,10 +248,7 @@ public class CreateFunctionStmt extends DdlStmt {
|
||||
}
|
||||
|
||||
userFile = properties.getOrDefault(FILE_KEY, properties.get(OBJECT_FILE_KEY));
|
||||
if (Strings.isNullOrEmpty(userFile)) {
|
||||
throw new AnalysisException("No 'file' or 'object_file' in properties");
|
||||
}
|
||||
if (binaryType != TFunctionBinaryType.RPC) {
|
||||
if (!Strings.isNullOrEmpty(userFile) && binaryType != TFunctionBinaryType.RPC) {
|
||||
try {
|
||||
computeObjectChecksum();
|
||||
} catch (IOException | NoSuchAlgorithmException e) {
|
||||
@ -304,10 +301,15 @@ public class CreateFunctionStmt extends DdlStmt {
|
||||
private void analyzeUda() throws AnalysisException {
|
||||
AggregateFunction.AggregateFunctionBuilder builder
|
||||
= AggregateFunction.AggregateFunctionBuilder.createUdfBuilder();
|
||||
|
||||
URI location;
|
||||
if (!Strings.isNullOrEmpty(userFile)) {
|
||||
location = URI.create(userFile);
|
||||
} else {
|
||||
location = null;
|
||||
}
|
||||
builder.name(functionName).argsType(argsDef.getArgTypes()).retType(returnType.getType())
|
||||
.hasVarArgs(argsDef.isVariadic()).intermediateType(intermediateType.getType())
|
||||
.location(URI.create(userFile));
|
||||
.location(location);
|
||||
String initFnSymbol = properties.get(INIT_KEY);
|
||||
if (initFnSymbol == null && !(binaryType == TFunctionBinaryType.JAVA_UDF
|
||||
|| binaryType == TFunctionBinaryType.RPC)) {
|
||||
@ -353,8 +355,6 @@ public class CreateFunctionStmt extends DdlStmt {
|
||||
function = builder.initFnSymbol(initFnSymbol).updateFnSymbol(updateFnSymbol).mergeFnSymbol(mergeFnSymbol)
|
||||
.serializeFnSymbol(serializeFnSymbol).finalizeFnSymbol(finalizeFnSymbol)
|
||||
.getValueFnSymbol(getValueFnSymbol).removeFnSymbol(removeFnSymbol).symbolName(symbol).build();
|
||||
|
||||
URI location = URI.create(userFile);
|
||||
function.setLocation(location);
|
||||
function.setBinaryType(binaryType);
|
||||
function.setChecksum(checksum);
|
||||
@ -378,7 +378,12 @@ public class CreateFunctionStmt extends DdlStmt {
|
||||
} else if (binaryType == TFunctionBinaryType.JAVA_UDF) {
|
||||
analyzeJavaUdf(symbol);
|
||||
}
|
||||
URI location = URI.create(userFile);
|
||||
URI location;
|
||||
if (!Strings.isNullOrEmpty(userFile)) {
|
||||
location = URI.create(userFile);
|
||||
} else {
|
||||
location = null;
|
||||
}
|
||||
function = ScalarFunction.createUdf(binaryType,
|
||||
functionName, argsDef.getArgTypes(),
|
||||
returnType.getType(), argsDef.isVariadic(),
|
||||
@ -391,94 +396,18 @@ public class CreateFunctionStmt extends DdlStmt {
|
||||
HashMap<String, Method> allMethods = new HashMap<>();
|
||||
|
||||
try {
|
||||
if (Strings.isNullOrEmpty(userFile)) {
|
||||
try {
|
||||
ClassLoader cl = this.getClass().getClassLoader();
|
||||
checkUdafClass(clazz, cl, allMethods);
|
||||
return;
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new AnalysisException("Class [" + clazz + "] not found in classpath");
|
||||
}
|
||||
}
|
||||
URL[] urls = {new URL("jar:" + userFile + "!/")};
|
||||
try (URLClassLoader cl = URLClassLoader.newInstance(urls)) {
|
||||
Class udfClass = cl.loadClass(clazz);
|
||||
String udfClassName = udfClass.getCanonicalName();
|
||||
String stateClassName = udfClassName + "$" + STATE_CLASS_NAME;
|
||||
Class stateClass = cl.loadClass(stateClassName);
|
||||
|
||||
for (Method m : udfClass.getMethods()) {
|
||||
if (!m.getDeclaringClass().equals(udfClass)) {
|
||||
continue;
|
||||
}
|
||||
String name = m.getName();
|
||||
if (allMethods.containsKey(name)) {
|
||||
throw new AnalysisException(
|
||||
String.format("UDF class '%s' has multiple methods with name '%s' ", udfClassName,
|
||||
name));
|
||||
}
|
||||
allMethods.put(name, m);
|
||||
}
|
||||
|
||||
if (allMethods.get(CREATE_METHOD_NAME) == null) {
|
||||
throw new AnalysisException(
|
||||
String.format("No method '%s' in class '%s'!", CREATE_METHOD_NAME, udfClassName));
|
||||
} else {
|
||||
checkMethodNonStaticAndPublic(CREATE_METHOD_NAME, allMethods.get(CREATE_METHOD_NAME), udfClassName);
|
||||
checkArgumentCount(allMethods.get(CREATE_METHOD_NAME), 0, udfClassName);
|
||||
checkReturnJavaType(udfClassName, allMethods.get(CREATE_METHOD_NAME), stateClass);
|
||||
}
|
||||
|
||||
if (allMethods.get(DESTROY_METHOD_NAME) == null) {
|
||||
throw new AnalysisException(
|
||||
String.format("No method '%s' in class '%s'!", DESTROY_METHOD_NAME, udfClassName));
|
||||
} else {
|
||||
checkMethodNonStaticAndPublic(DESTROY_METHOD_NAME, allMethods.get(DESTROY_METHOD_NAME),
|
||||
udfClassName);
|
||||
checkArgumentCount(allMethods.get(DESTROY_METHOD_NAME), 1, udfClassName);
|
||||
checkReturnJavaType(udfClassName, allMethods.get(DESTROY_METHOD_NAME), void.class);
|
||||
}
|
||||
|
||||
if (allMethods.get(ADD_METHOD_NAME) == null) {
|
||||
throw new AnalysisException(
|
||||
String.format("No method '%s' in class '%s'!", ADD_METHOD_NAME, udfClassName));
|
||||
} else {
|
||||
checkMethodNonStaticAndPublic(ADD_METHOD_NAME, allMethods.get(ADD_METHOD_NAME), udfClassName);
|
||||
checkArgumentCount(allMethods.get(ADD_METHOD_NAME), argsDef.getArgTypes().length + 1, udfClassName);
|
||||
checkReturnJavaType(udfClassName, allMethods.get(ADD_METHOD_NAME), void.class);
|
||||
for (int i = 0; i < argsDef.getArgTypes().length; i++) {
|
||||
Parameter p = allMethods.get(ADD_METHOD_NAME).getParameters()[i + 1];
|
||||
checkUdfType(udfClass, allMethods.get(ADD_METHOD_NAME), argsDef.getArgTypes()[i], p.getType(),
|
||||
p.getName());
|
||||
}
|
||||
}
|
||||
|
||||
if (allMethods.get(SERIALIZE_METHOD_NAME) == null) {
|
||||
throw new AnalysisException(
|
||||
String.format("No method '%s' in class '%s'!", SERIALIZE_METHOD_NAME, udfClassName));
|
||||
} else {
|
||||
checkMethodNonStaticAndPublic(SERIALIZE_METHOD_NAME, allMethods.get(SERIALIZE_METHOD_NAME),
|
||||
udfClassName);
|
||||
checkArgumentCount(allMethods.get(SERIALIZE_METHOD_NAME), 2, udfClassName);
|
||||
checkReturnJavaType(udfClassName, allMethods.get(SERIALIZE_METHOD_NAME), void.class);
|
||||
}
|
||||
|
||||
if (allMethods.get(MERGE_METHOD_NAME) == null) {
|
||||
throw new AnalysisException(
|
||||
String.format("No method '%s' in class '%s'!", MERGE_METHOD_NAME, udfClassName));
|
||||
} else {
|
||||
checkMethodNonStaticAndPublic(MERGE_METHOD_NAME, allMethods.get(MERGE_METHOD_NAME), udfClassName);
|
||||
checkArgumentCount(allMethods.get(MERGE_METHOD_NAME), 2, udfClassName);
|
||||
checkReturnJavaType(udfClassName, allMethods.get(MERGE_METHOD_NAME), void.class);
|
||||
}
|
||||
|
||||
if (allMethods.get(GETVALUE_METHOD_NAME) == null) {
|
||||
throw new AnalysisException(
|
||||
String.format("No method '%s' in class '%s'!", GETVALUE_METHOD_NAME, udfClassName));
|
||||
} else {
|
||||
checkMethodNonStaticAndPublic(GETVALUE_METHOD_NAME, allMethods.get(GETVALUE_METHOD_NAME),
|
||||
udfClassName);
|
||||
checkArgumentCount(allMethods.get(GETVALUE_METHOD_NAME), 1, udfClassName);
|
||||
checkReturnUdfType(udfClass, allMethods.get(GETVALUE_METHOD_NAME), returnType.getType());
|
||||
}
|
||||
|
||||
if (!Modifier.isPublic(stateClass.getModifiers()) || !Modifier.isStatic(stateClass.getModifiers())) {
|
||||
throw new AnalysisException(
|
||||
String.format(
|
||||
"UDAF '%s' should have one public & static 'State' class to Construction data ",
|
||||
udfClassName));
|
||||
}
|
||||
checkUdafClass(clazz, cl, allMethods);
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new AnalysisException(
|
||||
"Class [" + clazz + "] or inner class [State] not found in file :" + userFile);
|
||||
@ -490,6 +419,96 @@ public class CreateFunctionStmt extends DdlStmt {
|
||||
}
|
||||
}
|
||||
|
||||
private void checkUdafClass(String clazz, ClassLoader cl, HashMap<String, Method> allMethods)
|
||||
throws ClassNotFoundException, AnalysisException {
|
||||
Class udfClass = cl.loadClass(clazz);
|
||||
String udfClassName = udfClass.getCanonicalName();
|
||||
String stateClassName = udfClassName + "$" + STATE_CLASS_NAME;
|
||||
Class stateClass = cl.loadClass(stateClassName);
|
||||
|
||||
for (Method m : udfClass.getMethods()) {
|
||||
if (!m.getDeclaringClass().equals(udfClass)) {
|
||||
continue;
|
||||
}
|
||||
String name = m.getName();
|
||||
if (allMethods.containsKey(name)) {
|
||||
throw new AnalysisException(
|
||||
String.format("UDF class '%s' has multiple methods with name '%s' ", udfClassName,
|
||||
name));
|
||||
}
|
||||
allMethods.put(name, m);
|
||||
}
|
||||
|
||||
if (allMethods.get(CREATE_METHOD_NAME) == null) {
|
||||
throw new AnalysisException(
|
||||
String.format("No method '%s' in class '%s'!", CREATE_METHOD_NAME, udfClassName));
|
||||
} else {
|
||||
checkMethodNonStaticAndPublic(CREATE_METHOD_NAME, allMethods.get(CREATE_METHOD_NAME), udfClassName);
|
||||
checkArgumentCount(allMethods.get(CREATE_METHOD_NAME), 0, udfClassName);
|
||||
checkReturnJavaType(udfClassName, allMethods.get(CREATE_METHOD_NAME), stateClass);
|
||||
}
|
||||
|
||||
if (allMethods.get(DESTROY_METHOD_NAME) == null) {
|
||||
throw new AnalysisException(
|
||||
String.format("No method '%s' in class '%s'!", DESTROY_METHOD_NAME, udfClassName));
|
||||
} else {
|
||||
checkMethodNonStaticAndPublic(DESTROY_METHOD_NAME, allMethods.get(DESTROY_METHOD_NAME),
|
||||
udfClassName);
|
||||
checkArgumentCount(allMethods.get(DESTROY_METHOD_NAME), 1, udfClassName);
|
||||
checkReturnJavaType(udfClassName, allMethods.get(DESTROY_METHOD_NAME), void.class);
|
||||
}
|
||||
|
||||
if (allMethods.get(ADD_METHOD_NAME) == null) {
|
||||
throw new AnalysisException(
|
||||
String.format("No method '%s' in class '%s'!", ADD_METHOD_NAME, udfClassName));
|
||||
} else {
|
||||
checkMethodNonStaticAndPublic(ADD_METHOD_NAME, allMethods.get(ADD_METHOD_NAME), udfClassName);
|
||||
checkArgumentCount(allMethods.get(ADD_METHOD_NAME), argsDef.getArgTypes().length + 1, udfClassName);
|
||||
checkReturnJavaType(udfClassName, allMethods.get(ADD_METHOD_NAME), void.class);
|
||||
for (int i = 0; i < argsDef.getArgTypes().length; i++) {
|
||||
Parameter p = allMethods.get(ADD_METHOD_NAME).getParameters()[i + 1];
|
||||
checkUdfType(udfClass, allMethods.get(ADD_METHOD_NAME), argsDef.getArgTypes()[i], p.getType(),
|
||||
p.getName());
|
||||
}
|
||||
}
|
||||
|
||||
if (allMethods.get(SERIALIZE_METHOD_NAME) == null) {
|
||||
throw new AnalysisException(
|
||||
String.format("No method '%s' in class '%s'!", SERIALIZE_METHOD_NAME, udfClassName));
|
||||
} else {
|
||||
checkMethodNonStaticAndPublic(SERIALIZE_METHOD_NAME, allMethods.get(SERIALIZE_METHOD_NAME),
|
||||
udfClassName);
|
||||
checkArgumentCount(allMethods.get(SERIALIZE_METHOD_NAME), 2, udfClassName);
|
||||
checkReturnJavaType(udfClassName, allMethods.get(SERIALIZE_METHOD_NAME), void.class);
|
||||
}
|
||||
|
||||
if (allMethods.get(MERGE_METHOD_NAME) == null) {
|
||||
throw new AnalysisException(
|
||||
String.format("No method '%s' in class '%s'!", MERGE_METHOD_NAME, udfClassName));
|
||||
} else {
|
||||
checkMethodNonStaticAndPublic(MERGE_METHOD_NAME, allMethods.get(MERGE_METHOD_NAME), udfClassName);
|
||||
checkArgumentCount(allMethods.get(MERGE_METHOD_NAME), 2, udfClassName);
|
||||
checkReturnJavaType(udfClassName, allMethods.get(MERGE_METHOD_NAME), void.class);
|
||||
}
|
||||
|
||||
if (allMethods.get(GETVALUE_METHOD_NAME) == null) {
|
||||
throw new AnalysisException(
|
||||
String.format("No method '%s' in class '%s'!", GETVALUE_METHOD_NAME, udfClassName));
|
||||
} else {
|
||||
checkMethodNonStaticAndPublic(GETVALUE_METHOD_NAME, allMethods.get(GETVALUE_METHOD_NAME),
|
||||
udfClassName);
|
||||
checkArgumentCount(allMethods.get(GETVALUE_METHOD_NAME), 1, udfClassName);
|
||||
checkReturnUdfType(udfClass, allMethods.get(GETVALUE_METHOD_NAME), returnType.getType());
|
||||
}
|
||||
|
||||
if (!Modifier.isPublic(stateClass.getModifiers()) || !Modifier.isStatic(stateClass.getModifiers())) {
|
||||
throw new AnalysisException(
|
||||
String.format(
|
||||
"UDAF '%s' should have one public & static 'State' class to Construction data ",
|
||||
udfClassName));
|
||||
}
|
||||
}
|
||||
|
||||
private void checkMethodNonStaticAndPublic(String methoName, Method method, String udfClassName)
|
||||
throws AnalysisException {
|
||||
if (Modifier.isStatic(method.getModifiers())) {
|
||||
@ -529,60 +548,18 @@ public class CreateFunctionStmt extends DdlStmt {
|
||||
|
||||
private void analyzeJavaUdf(String clazz) throws AnalysisException {
|
||||
try {
|
||||
if (Strings.isNullOrEmpty(userFile)) {
|
||||
try {
|
||||
ClassLoader cl = this.getClass().getClassLoader();
|
||||
checkUdfClass(clazz, cl);
|
||||
return;
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new AnalysisException("Class [" + clazz + "] not found in classpath");
|
||||
}
|
||||
}
|
||||
URL[] urls = {new URL("jar:" + userFile + "!/")};
|
||||
try (URLClassLoader cl = URLClassLoader.newInstance(urls)) {
|
||||
Class udfClass = cl.loadClass(clazz);
|
||||
List<Method> evalList = Arrays.stream(udfClass.getMethods())
|
||||
.filter(m -> m.getDeclaringClass().equals(udfClass) && EVAL_METHOD_KEY.equals(m.getName()))
|
||||
.collect(Collectors.toList());
|
||||
if (evalList.size() == 0) {
|
||||
throw new AnalysisException(String.format(
|
||||
"No method '%s' in class '%s'!", EVAL_METHOD_KEY, udfClass.getCanonicalName()));
|
||||
}
|
||||
List<Method> evalNonStaticAndPublicList = evalList.stream()
|
||||
.filter(m -> !Modifier.isStatic(m.getModifiers()) && Modifier.isPublic(m.getModifiers()))
|
||||
.collect(Collectors.toList());
|
||||
if (evalNonStaticAndPublicList.size() == 0) {
|
||||
throw new AnalysisException(
|
||||
String.format("Method '%s' in class '%s' should be non-static and public", EVAL_METHOD_KEY,
|
||||
udfClass.getCanonicalName()));
|
||||
}
|
||||
List<Method> evalArgLengthMatchList = evalNonStaticAndPublicList.stream().filter(
|
||||
m -> m.getParameters().length == argsDef.getArgTypes().length).collect(Collectors.toList());
|
||||
if (evalArgLengthMatchList.size() == 0) {
|
||||
throw new AnalysisException(
|
||||
String.format("The number of parameters for method '%s' in class '%s' should be %d",
|
||||
EVAL_METHOD_KEY, udfClass.getCanonicalName(), argsDef.getArgTypes().length));
|
||||
} else if (evalArgLengthMatchList.size() == 1) {
|
||||
Method method = evalArgLengthMatchList.get(0);
|
||||
checkUdfType(udfClass, method, returnType.getType(), method.getReturnType(), "return");
|
||||
for (int i = 0; i < method.getParameters().length; i++) {
|
||||
Parameter p = method.getParameters()[i];
|
||||
checkUdfType(udfClass, method, argsDef.getArgTypes()[i], p.getType(), p.getName());
|
||||
}
|
||||
} else {
|
||||
// If multiple methods have the same parameters,
|
||||
// the error message returned cannot be as specific as a single method
|
||||
boolean hasError = false;
|
||||
for (Method method : evalArgLengthMatchList) {
|
||||
try {
|
||||
checkUdfType(udfClass, method, returnType.getType(), method.getReturnType(), "return");
|
||||
for (int i = 0; i < method.getParameters().length; i++) {
|
||||
Parameter p = method.getParameters()[i];
|
||||
checkUdfType(udfClass, method, argsDef.getArgTypes()[i], p.getType(), p.getName());
|
||||
}
|
||||
hasError = false;
|
||||
break;
|
||||
} catch (AnalysisException e) {
|
||||
hasError = true;
|
||||
}
|
||||
}
|
||||
if (hasError) {
|
||||
throw new AnalysisException(String.format(
|
||||
"Multi methods '%s' in class '%s' and no one passed parameter matching verification",
|
||||
EVAL_METHOD_KEY, udfClass.getCanonicalName()));
|
||||
}
|
||||
}
|
||||
checkUdfClass(clazz, cl);
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new AnalysisException("Class [" + clazz + "] not found in file :" + userFile);
|
||||
} catch (IOException e) {
|
||||
@ -593,6 +570,61 @@ public class CreateFunctionStmt extends DdlStmt {
|
||||
}
|
||||
}
|
||||
|
||||
private void checkUdfClass(String clazz, ClassLoader cl) throws ClassNotFoundException, AnalysisException {
|
||||
Class udfClass = cl.loadClass(clazz);
|
||||
List<Method> evalList = Arrays.stream(udfClass.getMethods())
|
||||
.filter(m -> m.getDeclaringClass().equals(udfClass) && EVAL_METHOD_KEY.equals(m.getName()))
|
||||
.collect(Collectors.toList());
|
||||
if (evalList.size() == 0) {
|
||||
throw new AnalysisException(String.format(
|
||||
"No method '%s' in class '%s'!", EVAL_METHOD_KEY, udfClass.getCanonicalName()));
|
||||
}
|
||||
List<Method> evalNonStaticAndPublicList = evalList.stream()
|
||||
.filter(m -> !Modifier.isStatic(m.getModifiers()) && Modifier.isPublic(m.getModifiers()))
|
||||
.collect(Collectors.toList());
|
||||
if (evalNonStaticAndPublicList.size() == 0) {
|
||||
throw new AnalysisException(
|
||||
String.format("Method '%s' in class '%s' should be non-static and public", EVAL_METHOD_KEY,
|
||||
udfClass.getCanonicalName()));
|
||||
}
|
||||
List<Method> evalArgLengthMatchList = evalNonStaticAndPublicList.stream().filter(
|
||||
m -> m.getParameters().length == argsDef.getArgTypes().length).collect(Collectors.toList());
|
||||
if (evalArgLengthMatchList.size() == 0) {
|
||||
throw new AnalysisException(
|
||||
String.format("The number of parameters for method '%s' in class '%s' should be %d",
|
||||
EVAL_METHOD_KEY, udfClass.getCanonicalName(), argsDef.getArgTypes().length));
|
||||
} else if (evalArgLengthMatchList.size() == 1) {
|
||||
Method method = evalArgLengthMatchList.get(0);
|
||||
checkUdfType(udfClass, method, returnType.getType(), method.getReturnType(), "return");
|
||||
for (int i = 0; i < method.getParameters().length; i++) {
|
||||
Parameter p = method.getParameters()[i];
|
||||
checkUdfType(udfClass, method, argsDef.getArgTypes()[i], p.getType(), p.getName());
|
||||
}
|
||||
} else {
|
||||
// If multiple methods have the same parameters,
|
||||
// the error message returned cannot be as specific as a single method
|
||||
boolean hasError = false;
|
||||
for (Method method : evalArgLengthMatchList) {
|
||||
try {
|
||||
checkUdfType(udfClass, method, returnType.getType(), method.getReturnType(), "return");
|
||||
for (int i = 0; i < method.getParameters().length; i++) {
|
||||
Parameter p = method.getParameters()[i];
|
||||
checkUdfType(udfClass, method, argsDef.getArgTypes()[i], p.getType(), p.getName());
|
||||
}
|
||||
hasError = false;
|
||||
break;
|
||||
} catch (AnalysisException e) {
|
||||
hasError = true;
|
||||
}
|
||||
}
|
||||
if (hasError) {
|
||||
throw new AnalysisException(String.format(
|
||||
"Multi methods '%s' in class '%s' and no one passed parameter matching verification",
|
||||
EVAL_METHOD_KEY, udfClass.getCanonicalName()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void checkUdfType(Class clazz, Method method, Type expType, Class pType, String pname)
|
||||
throws AnalysisException {
|
||||
Set<Class> javaTypes;
|
||||
|
||||
@ -151,7 +151,7 @@ public class JavaUdaf extends AggregateFunction implements ExplicitlyCastableSig
|
||||
JavaUdaf udaf = new JavaUdaf(fnName, aggregate.getId(), dbName, aggregate.getBinaryType(), sig,
|
||||
intermediateType,
|
||||
aggregate.getNullableMode(),
|
||||
aggregate.getLocation().getLocation(),
|
||||
aggregate.getLocation() == null ? null : aggregate.getLocation().getLocation(),
|
||||
aggregate.getSymbolName(),
|
||||
aggregate.getInitFnSymbol(),
|
||||
aggregate.getUpdateFnSymbol(),
|
||||
@ -182,7 +182,7 @@ public class JavaUdaf extends AggregateFunction implements ExplicitlyCastableSig
|
||||
signature.returnType.toCatalogDataType(),
|
||||
signature.hasVarArgs,
|
||||
intermediateType.toCatalogDataType(),
|
||||
URI.create(objectFile),
|
||||
objectFile == null ? null : URI.create(objectFile),
|
||||
initFn,
|
||||
updateFn,
|
||||
mergeFn,
|
||||
|
||||
@ -129,7 +129,7 @@ public class JavaUdf extends ScalarFunction implements ExplicitlyCastableSignatu
|
||||
|
||||
JavaUdf udf = new JavaUdf(fnName, scalar.getId(), dbName, scalar.getBinaryType(), sig,
|
||||
scalar.getNullableMode(),
|
||||
scalar.getLocation().getLocation(),
|
||||
scalar.getLocation() == null ? null : scalar.getLocation().getLocation(),
|
||||
scalar.getSymbolName(),
|
||||
scalar.getPrepareFnSymbol(),
|
||||
scalar.getCloseFnSymbol(),
|
||||
@ -154,7 +154,7 @@ public class JavaUdf extends ScalarFunction implements ExplicitlyCastableSignatu
|
||||
signature.argumentsTypes.stream().map(DataType::toCatalogDataType).toArray(Type[]::new),
|
||||
signature.returnType.toCatalogDataType(),
|
||||
signature.hasVarArgs,
|
||||
URI.create(objectFile),
|
||||
objectFile == null ? null : URI.create(objectFile),
|
||||
symbol,
|
||||
prepareFn,
|
||||
closeFn
|
||||
|
||||
@ -0,0 +1,20 @@
|
||||
-- This file is automatically generated. You should know what you did if you want to edit this
|
||||
-- !select_default --
|
||||
1 1 abcdefg1 poiuytre1abcdefg
|
||||
2 2 abcdefg2 poiuytre2abcdefg
|
||||
0 3 abcdefg3 poiuytre3abcdefg
|
||||
1 4 abcdefg4 poiuytre4abcdefg
|
||||
2 5 abcdefg5 poiuytre5abcdefg
|
||||
0 6 abcdefg6 poiuytre6abcdefg
|
||||
1 7 abcdefg7 poiuytre7abcdefg
|
||||
2 8 abcdefg8 poiuytre8abcdefg
|
||||
9 9 abcdefg9 poiuytre9abcdefg
|
||||
|
||||
-- !select1 --
|
||||
18
|
||||
|
||||
-- !select2 --
|
||||
0 0
|
||||
1 3
|
||||
2 6
|
||||
9 9
|
||||
@ -0,0 +1,12 @@
|
||||
-- This file is automatically generated. You should know what you did if you want to edit this
|
||||
-- !java_udf_all_types --
|
||||
1 1
|
||||
2 2
|
||||
3 3
|
||||
4 4
|
||||
5 5
|
||||
6 6
|
||||
7 7
|
||||
8 8
|
||||
9 9
|
||||
10 10
|
||||
@ -0,0 +1,64 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
import org.codehaus.groovy.runtime.IOGroovyMethods
|
||||
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Paths
|
||||
|
||||
suite("test_javaudaf_custom_lib_mysum_int") {
|
||||
def tableName = "test_javaudaf_custom_lib_mysum_int"
|
||||
try {
|
||||
sql """ DROP TABLE IF EXISTS ${tableName} """
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS ${tableName} (
|
||||
`user_id` INT NOT NULL COMMENT "用户id",
|
||||
`char_col` CHAR NOT NULL COMMENT "",
|
||||
`varchar_col` VARCHAR(10) NOT NULL COMMENT "",
|
||||
`string_col` STRING NOT NULL COMMENT ""
|
||||
)
|
||||
DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
|
||||
"""
|
||||
StringBuilder sb = new StringBuilder()
|
||||
int i = 1
|
||||
for (; i < 9; i ++) {
|
||||
sb.append("""
|
||||
(${i % 3}, '${i}','abcdefg${i}','poiuytre${i}abcdefg'),
|
||||
""")
|
||||
}
|
||||
sb.append("""
|
||||
(${i}, '${i}','abcdefg${i}','poiuytre${i}abcdefg')
|
||||
""")
|
||||
sql """ INSERT INTO ${tableName} VALUES
|
||||
${sb.toString()}
|
||||
"""
|
||||
qt_select_default """ SELECT * FROM ${tableName} t ORDER BY char_col; """
|
||||
|
||||
sql """ CREATE AGGREGATE FUNCTION c_udaf_my_sum_int(int) RETURNS BigInt PROPERTIES (
|
||||
"symbol"="org.apache.doris.udf.MySumInt",
|
||||
"always_nullable"="false",
|
||||
"type"="JAVA_UDF"
|
||||
); """
|
||||
|
||||
qt_select1 """ SELECT c_udaf_my_sum_int(user_id) result FROM ${tableName}; """
|
||||
qt_select2 """ select user_id, c_udaf_my_sum_int(user_id) from ${tableName} group by user_id order by user_id; """
|
||||
} finally {
|
||||
try_sql("DROP FUNCTION IF EXISTS c_udaf_my_sum_int(int);")
|
||||
try_sql("DROP TABLE IF EXISTS ${tableName}")
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,63 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
import org.codehaus.groovy.runtime.IOGroovyMethods
|
||||
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Paths
|
||||
|
||||
suite("test_javaudf_custom_lib_all_types") {
|
||||
def tableName = "test_javaudf_custom_lib_all_types"
|
||||
try {
|
||||
sql """ DROP TABLE IF EXISTS ${tableName} """
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS ${tableName} (
|
||||
int_col int,
|
||||
string_col string
|
||||
)
|
||||
DISTRIBUTED BY HASH(int_col) PROPERTIES("replication_num" = "1");
|
||||
"""
|
||||
StringBuilder sb = new StringBuilder()
|
||||
int i = 1
|
||||
for (; i < 10; i++) {
|
||||
sb.append("""
|
||||
(${i},${i%2}),
|
||||
""")
|
||||
}
|
||||
sb.append("""
|
||||
(${i},null)
|
||||
""")
|
||||
sql """ INSERT INTO ${tableName} VALUES
|
||||
${sb.toString()}
|
||||
"""
|
||||
|
||||
sql """DROP FUNCTION IF EXISTS c_echo_int(int);"""
|
||||
sql """CREATE FUNCTION c_echo_int(int) RETURNS int PROPERTIES (
|
||||
"symbol"="org.apache.doris.udf.Echo\$EchoInt",
|
||||
"type"="JAVA_UDF"
|
||||
);"""
|
||||
|
||||
qt_java_udf_all_types """select
|
||||
int_col,
|
||||
c_echo_int(int_col)
|
||||
from ${tableName} order by int_col;"""
|
||||
} finally {
|
||||
try_sql """DROP FUNCTION IF EXISTS c_echo_int(int);"""
|
||||
try_sql("""DROP TABLE IF EXISTS ${tableName};""")
|
||||
}
|
||||
}
|
||||
@ -50,7 +50,7 @@ Usage: $0 <shell_options> <framework_options>
|
||||
Eg.
|
||||
$0 build regression test framework and run all suite which in default group
|
||||
$0 --run test_select run a suite which named as test_select
|
||||
$0 --run 'test*' run all suite which named start with 'test', note that you must quota with ''
|
||||
$0 --compile only compile regression framework
|
||||
$0 --run -s test_select run a suite which named as test_select
|
||||
$0 --run test_select -genOut generate output file for test_select if not exist
|
||||
$0 --run -g default run all suite in the group which named as default
|
||||
@ -65,7 +65,7 @@ Default config file: \${DORIS_HOME}/regression-test/conf/regression-conf.groovy
|
||||
"
|
||||
exit 1
|
||||
}
|
||||
|
||||
ONLY_COMPILE=0
|
||||
CLEAN=0
|
||||
WRONG_CMD=0
|
||||
TEAMCITY=0
|
||||
@ -96,6 +96,11 @@ else
|
||||
shift
|
||||
shift
|
||||
;;
|
||||
--compile)
|
||||
RUN=1
|
||||
ONLY_COMPILE=1
|
||||
shift
|
||||
;;
|
||||
--run)
|
||||
RUN=1
|
||||
shift
|
||||
@ -178,6 +183,11 @@ if ! test -f ${RUN_JAR:+${RUN_JAR}}; then
|
||||
cd "${DORIS_HOME}"/regression-test/java-udf-src
|
||||
"${MVN_CMD}" package
|
||||
cp target/java-udf-case-jar-with-dependencies.jar "${DORIS_HOME}"/regression-test/suites/javaudf_p0/jars/
|
||||
# be and fe dir is compiled output
|
||||
mkdir -p "${DORIS_HOME}"/output/fe/custom_lib/
|
||||
mkdir -p "${DORIS_HOME}"/output/be/custom_lib/
|
||||
cp target/java-udf-case-jar-with-dependencies.jar "${DORIS_HOME}"/output/fe/custom_lib/
|
||||
cp target/java-udf-case-jar-with-dependencies.jar "${DORIS_HOME}"/output/be/custom_lib/
|
||||
cd "${DORIS_HOME}"
|
||||
fi
|
||||
|
||||
@ -205,11 +215,13 @@ if [[ "${TEAMCITY}" -eq 1 ]]; then
|
||||
JAVA_OPTS="${JAVA_OPTS} -DstdoutAppenderType=teamcity -Xmx2048m"
|
||||
fi
|
||||
|
||||
"${JAVA}" -DDORIS_HOME="${DORIS_HOME}" \
|
||||
-DLOG_PATH="${LOG_OUTPUT_FILE}" \
|
||||
-Dfile.encoding="UTF-8" \
|
||||
-Dlogback.configurationFile="${LOG_CONFIG_FILE}" \
|
||||
${JAVA_OPTS:+${JAVA_OPTS}} \
|
||||
-jar ${RUN_JAR:+${RUN_JAR}} \
|
||||
-cf "${CONFIG_FILE}" \
|
||||
${REGRESSION_OPTIONS_PREFIX:+${REGRESSION_OPTIONS_PREFIX}} "$@"
|
||||
if [[ "${ONLY_COMPILE}" -eq 0 ]]; then
|
||||
"${JAVA}" -DDORIS_HOME="${DORIS_HOME}" \
|
||||
-DLOG_PATH="${LOG_OUTPUT_FILE}" \
|
||||
-Dfile.encoding="UTF-8" \
|
||||
-Dlogback.configurationFile="${LOG_CONFIG_FILE}" \
|
||||
${JAVA_OPTS:+${JAVA_OPTS}} \
|
||||
-jar ${RUN_JAR:+${RUN_JAR}} \
|
||||
-cf "${CONFIG_FILE}" \
|
||||
${REGRESSION_OPTIONS_PREFIX:+${REGRESSION_OPTIONS_PREFIX}} "$@"
|
||||
fi
|
||||
|
||||
Reference in New Issue
Block a user