From d7b6fe57df395a788b63063843f638947839e2d2 Mon Sep 17 00:00:00 2001 From: zhangstar333 <87313068+zhangstar333@users.noreply.github.com> Date: Mon, 9 Oct 2023 15:10:56 +0800 Subject: [PATCH] [Bug](java-udf) fix java-udf memory leak (#25151) --- be/src/vec/functions/function_java_udf.cpp | 56 +++++++++++----------- be/src/vec/functions/function_java_udf.h | 4 -- 2 files changed, 29 insertions(+), 31 deletions(-) diff --git a/be/src/vec/functions/function_java_udf.cpp b/be/src/vec/functions/function_java_udf.cpp index fa9c8517c8..331bb1bcac 100644 --- a/be/src/vec/functions/function_java_udf.cpp +++ b/be/src/vec/functions/function_java_udf.cpp @@ -85,39 +85,41 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio RETURN_ERROR_IF_EXC(env); context->set_function_state(FunctionContext::FRAGMENT_LOCAL, jni_env); } - JniEnv* jni_env = - reinterpret_cast(context->get_function_state(FunctionContext::FRAGMENT_LOCAL)); - std::shared_ptr jni_ctx = std::make_shared( - _argument_types.size(), jni_env->executor_cl, jni_env->executor_close_id); - context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx); - // Add a scoped cleanup jni reference object. This cleans up local refs made below. - JniLocalFrame jni_frame; - { - std::string local_location; - auto function_cache = UserFunctionCache::instance(); - RETURN_IF_ERROR(function_cache->get_jarpath(fn_.id, fn_.hdfs_location, fn_.checksum, - &local_location)); - TJavaUdfExecutorCtorParams ctor_params; - ctor_params.__set_fn(fn_); - ctor_params.__set_location(local_location); - jbyteArray ctor_params_bytes; + if (scope == FunctionContext::FunctionStateScope::THREAD_LOCAL) { + JniEnv* jni_env = reinterpret_cast( + context->get_function_state(FunctionContext::FRAGMENT_LOCAL)); + std::shared_ptr jni_ctx = std::make_shared( + _argument_types.size(), jni_env->executor_cl, jni_env->executor_close_id); + context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx); - // Pushed frame will be popped when jni_frame goes out-of-scope. - RETURN_IF_ERROR(jni_frame.push(env)); + // Add a scoped cleanup jni reference object. This cleans up local refs made below. + JniLocalFrame jni_frame; + { + std::string local_location; + auto function_cache = UserFunctionCache::instance(); + RETURN_IF_ERROR(function_cache->get_jarpath(fn_.id, fn_.hdfs_location, fn_.checksum, + &local_location)); + TJavaUdfExecutorCtorParams ctor_params; + ctor_params.__set_fn(fn_); + ctor_params.__set_location(local_location); + jbyteArray ctor_params_bytes; - RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes)); + // Pushed frame will be popped when jni_frame goes out-of-scope. + RETURN_IF_ERROR(jni_frame.push(env)); - jni_ctx->executor = - env->NewObject(jni_env->executor_cl, jni_env->executor_ctor_id, ctor_params_bytes); + RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes)); - jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr); - env->ReleaseByteArrayElements(ctor_params_bytes, pBytes, JNI_ABORT); - env->DeleteLocalRef(ctor_params_bytes); + jni_ctx->executor = env->NewObject(jni_env->executor_cl, jni_env->executor_ctor_id, + ctor_params_bytes); + + jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr); + env->ReleaseByteArrayElements(ctor_params_bytes, pBytes, JNI_ABORT); + env->DeleteLocalRef(ctor_params_bytes); + } + RETURN_ERROR_IF_EXC(env); + RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_ctx->executor, &jni_ctx->executor)); } - RETURN_ERROR_IF_EXC(env); - RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_ctx->executor, &jni_ctx->executor)); - return Status::OK(); } diff --git a/be/src/vec/functions/function_java_udf.h b/be/src/vec/functions/function_java_udf.h index 410c65e817..c26a60faaa 100644 --- a/be/src/vec/functions/function_java_udf.h +++ b/be/src/vec/functions/function_java_udf.h @@ -165,10 +165,6 @@ private: env->DeleteGlobalRef(executor); is_closed = true; } - - /// These functions are cross-compiled to IR and used by codegen. - static void SetInputNullsBufferElement(JniContext* jni_ctx, int index, uint8_t value); - static uint8_t* GetInputValuesBufferAtOffset(JniContext* jni_ctx, int offset); }; };