diff --git a/be/src/runtime/user_function_cache.cpp b/be/src/runtime/user_function_cache.cpp index 693a4672de..40381c0e5c 100644 --- a/be/src/runtime/user_function_cache.cpp +++ b/be/src/runtime/user_function_cache.cpp @@ -425,43 +425,4 @@ Status UserFunctionCache::get_jarpath(int64_t fid, const std::string& url, return Status::OK(); } -Status UserFunctionCache::check_jar(int64_t fid, const std::string& url, - const std::string& checksum) { - UserFunctionCacheEntry* entry = nullptr; - Status st = Status::OK(); - std::string file_name = _get_file_name_from_url(url); - { - std::lock_guard l(_cache_lock); - auto it = _entry_map.find(fid); - if (it != _entry_map.end()) { - entry = it->second; - } else { - entry = new UserFunctionCacheEntry( - fid, checksum, _make_lib_file(fid, checksum, LibType::JAR, file_name), - LibType::JAR); - entry->ref(); - _entry_map.emplace(fid, entry); - } - entry->ref(); - } - if (entry->is_loaded.load()) { - return st; - } - - std::unique_lock l(entry->load_lock); - if (!entry->is_downloaded) { - st = _download_lib(url, entry); - } - if (!st.ok()) { - // if we load a cache entry failed, I think we should delete this entry cache - // even if this cache was valid before. - _destroy_cache_entry(entry); - return Status::InternalError( - "Java UDAF has error, maybe you should check the path about java impl jar, because " - "{}", - st.to_string()); - } - return Status::OK(); -} - } // namespace doris diff --git a/be/src/runtime/user_function_cache.h b/be/src/runtime/user_function_cache.h index b12edab9e8..b3c4aa7e80 100644 --- a/be/src/runtime/user_function_cache.h +++ b/be/src/runtime/user_function_cache.h @@ -69,7 +69,6 @@ public: Status get_jarpath(int64_t fid, const std::string& url, const std::string& checksum, std::string* libpath); - Status check_jar(int64_t fid, const std::string& url, const std::string& checksum); private: Status _load_cached_lib(); diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h index 93ef2341cc..1295e54fb8 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h +++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h @@ -77,7 +77,7 @@ public: env->DeleteGlobalRef(executor_obj); } - Status init_udaf(const TFunction& fn) { + Status init_udaf(const TFunction& fn, const std::string& local_location) { JNIEnv* env = nullptr; RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf init_udaf function"); RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, UDAF_EXECUTOR_CLASS, &executor_cl)); @@ -87,10 +87,6 @@ public: // Add a scoped cleanup jni reference object. This cleans up local refs made below. JniLocalFrame jni_frame; { - std::string local_location; - auto function_cache = UserFunctionCache::instance(); - RETURN_IF_ERROR(function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum, - &local_location)); TJavaUdfExecutorCtorParams ctor_params; ctor_params.__set_fn(fn); ctor_params.__set_location(local_location); @@ -255,6 +251,14 @@ public: jboolean res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, \ executor_result_id, to.size() - 1, place); \ while (res != JNI_TRUE) { \ + /*Add this check is now, the agg function can't deal with the return status, */ \ + /*even we return a bad status, nobody could deal with it,*/ \ + /*so add this limit avoid std::bad_alloc, (1024<<10) is enough*/ \ + /*but this maybe get a mistake of result,when could handle exception need removethis*/ \ + if (increase_buffer_size == 10) { \ + return Status::MemoryAllocFailed("memory allocate failed, buffer:{},size:{}", \ + increase_buffer_size, buffer_size); \ + } \ increase_buffer_size++; \ buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ chars.resize(buffer_size); \ @@ -294,6 +298,10 @@ public: jboolean res = env->CallNonvirtualBooleanMethod( \ executor_obj, executor_cl, executor_result_id, to.size() - 1, place); \ while (res != JNI_TRUE) { \ + if (increase_buffer_size == 10) { \ + return Status::MemoryAllocFailed("memory allocate failed, buffer:{},size:{}", \ + increase_buffer_size, buffer_size); \ + } \ increase_buffer_size++; \ buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ null_map_data.resize(buffer_size); \ @@ -312,6 +320,10 @@ public: jboolean res = env->CallNonvirtualBooleanMethod( \ executor_obj, executor_cl, executor_result_id, to.size() - 1, place); \ while (res != JNI_TRUE) { \ + if (increase_buffer_size == 10) { \ + return Status::MemoryAllocFailed("memory allocate failed, buffer:{},size:{}", \ + increase_buffer_size, buffer_size); \ + } \ increase_buffer_size++; \ buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \ null_map_data.resize(buffer_size); \ @@ -414,14 +426,14 @@ public: //So need to check as soon as possible, before call Data function Status check_udaf(const TFunction& fn) { auto function_cache = UserFunctionCache::instance(); - return function_cache->check_jar(fn.id, fn.hdfs_location, fn.checksum); + return function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum, &_local_location); } void create(AggregateDataPtr __restrict place) const override { if (_first_created) { new (place) Data(argument_types.size()); Status status = Status::OK(); - RETURN_IF_STATUS_ERROR(status, this->data(place).init_udaf(_fn)); + RETURN_IF_STATUS_ERROR(status, this->data(place).init_udaf(_fn, _local_location)); _first_created = false; _exec_place = place; } @@ -499,6 +511,7 @@ private: DataTypePtr _return_type; mutable bool _first_created; mutable AggregateDataPtr _exec_place; + std::string _local_location; }; } // namespace doris::vectorized diff --git a/docs/en/docs/ecosystem/udf/java-user-defined-function.md b/docs/en/docs/ecosystem/udf/java-user-defined-function.md index 06c421f39c..08652bcffa 100644 --- a/docs/en/docs/ecosystem/udf/java-user-defined-function.md +++ b/docs/en/docs/ecosystem/udf/java-user-defined-function.md @@ -107,13 +107,11 @@ The following SimpleDemo will implement a simple function similar to sum, the in ```JAVA package org.apache.doris.udf.demo; -import org.apache.hadoop.hive.ql.exec.UDAF; - import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -public class SimpleDemo extends UDAF { +public class SimpleDemo { //Need an inner class to store data /*required*/ public static class State { @@ -134,7 +132,7 @@ public class SimpleDemo extends UDAF { /*required*/ //first argument is State, then other types your input - public void add(State state, Integer val) { + public void add(State state, Integer val) throws Exception { /* here doing update work when input data*/ if (val != null) { state.sum += val; @@ -142,36 +140,36 @@ public class SimpleDemo extends UDAF { } /*required*/ - public void serialize(State state, DataOutputStream out) { + public void serialize(State state, DataOutputStream out) throws Exception { /* serialize some data into buffer */ try { out.writeInt(state.sum); - } catch ( IOException e ) { - throw new RuntimeException (e); + } catch (IOException e) { + throw new RuntimeException(e); } } /*required*/ - public void deserialize(State state, DataInputStream in) { + public void deserialize(State state, DataInputStream in) throws Exception { /* deserialize get data from buffer before you put */ int val = 0; try { val = in.readInt(); - } catch ( IOException e ) { - throw new RuntimeException (e); + } catch (IOException e) { + throw new RuntimeException(e); } state.sum = val; } /*required*/ - public void merge(State state, State rhs) { + public void merge(State state, State rhs) throws Exception { /* merge data from state */ state.sum += rhs.sum; } /*required*/ //return Type you defined - public Integer getValue(State state) { + public Integer getValue(State state) throws Exception { /* return finally result */ return state.sum; } diff --git a/docs/zh-CN/docs/ecosystem/udf/java-user-defined-function.md b/docs/zh-CN/docs/ecosystem/udf/java-user-defined-function.md index fa2fd6ec56..4a5121d816 100644 --- a/docs/zh-CN/docs/ecosystem/udf/java-user-defined-function.md +++ b/docs/zh-CN/docs/ecosystem/udf/java-user-defined-function.md @@ -103,13 +103,11 @@ CREATE FUNCTION java_udf_add_one(int) RETURNS int PROPERTIES ( ```JAVA package org.apache.doris.udf.demo; -import org.apache.hadoop.hive.ql.exec.UDAF; - import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -public class SimpleDemo extends UDAF { +public class SimpleDemo { //Need an inner class to store data /*required*/ public static class State { @@ -130,7 +128,7 @@ public class SimpleDemo extends UDAF { /*required*/ //first argument is State, then other types your input - public void add(State state, Integer val) { + public void add(State state, Integer val) throws Exception { /* here doing update work when input data*/ if (val != null) { state.sum += val; @@ -138,36 +136,36 @@ public class SimpleDemo extends UDAF { } /*required*/ - public void serialize(State state, DataOutputStream out) { + public void serialize(State state, DataOutputStream out) throws Exception { /* serialize some data into buffer */ try { out.writeInt(state.sum); - } catch ( IOException e ) { - throw new RuntimeException (e); + } catch (IOException e) { + throw new RuntimeException(e); } } /*required*/ - public void deserialize(State state, DataInputStream in) { + public void deserialize(State state, DataInputStream in) throws Exception { /* deserialize get data from buffer before you put */ int val = 0; try { val = in.readInt(); - } catch ( IOException e ) { - throw new RuntimeException (e); + } catch (IOException e) { + throw new RuntimeException(e); } state.sum = val; } /*required*/ - public void merge(State state, State rhs) { + public void merge(State state, State rhs) throws Exception { /* merge data from state */ state.sum += rhs.sum; } /*required*/ //return Type you defined - public Integer getValue(State state) { + public Integer getValue(State state) throws Exception { /* return finally result */ return state.sum; } diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java index 7ce47b4837..1cba22a120 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java @@ -85,6 +85,7 @@ public class UdafExecutor extends BaseExecutor { } while (isSinglePlace && idx < rowEnd); } while (idx < rowEnd); } catch (Exception e) { + LOG.warn("invoke add function meet some error: " + e.getCause().toString()); throw new UdfRuntimeException("UDAF failed to add: ", e); } } @@ -96,6 +97,7 @@ public class UdafExecutor extends BaseExecutor { try { return allMethods.get(UDAF_CREATE_FUNCTION).invoke(udf, null); } catch (Exception e) { + LOG.warn("invoke createAggState function meet some error: " + e.getCause().toString()); throw new UdfRuntimeException("UDAF failed to create: ", e); } } @@ -110,6 +112,7 @@ public class UdafExecutor extends BaseExecutor { } stateObjMap.clear(); } catch (Exception e) { + LOG.warn("invoke destroy function meet some error: " + e.getCause().toString()); throw new UdfRuntimeException("UDAF failed to destroy: ", e); } } @@ -126,6 +129,7 @@ public class UdafExecutor extends BaseExecutor { allMethods.get(UDAF_SERIALIZE_FUNCTION).invoke(udf, args); return baos.toByteArray(); } catch (Exception e) { + LOG.warn("invoke serialize function meet some error: " + e.getCause().toString()); throw new UdfRuntimeException("UDAF failed to serialize: ", e); } } @@ -147,6 +151,7 @@ public class UdafExecutor extends BaseExecutor { args[0] = stateObjMap.get(curPlace); allMethods.get(UDAF_MERGE_FUNCTION).invoke(udf, args); } catch (Exception e) { + LOG.warn("invoke merge function meet some error: " + e.getCause().toString()); throw new UdfRuntimeException("UDAF failed to merge: ", e); } } @@ -156,9 +161,13 @@ public class UdafExecutor extends BaseExecutor { */ public boolean getValue(long row, long place) throws UdfRuntimeException { try { + if (stateObjMap.get(place) == null) { + stateObjMap.put(place, createAggState()); + } return storeUdfResult(allMethods.get(UDAF_RESULT_FUNCTION).invoke(udf, stateObjMap.get((Long) place)), row, retClass); } catch (Exception e) { + LOG.warn("invoke getValue function meet some error: " + e.getCause().toString()); throw new UdfRuntimeException("UDAF failed to result", e); } }