[Bug](java-udf) fix java-udf memory leak (#25151)
This commit is contained in:
@ -85,39 +85,41 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio
|
||||
RETURN_ERROR_IF_EXC(env);
|
||||
context->set_function_state(FunctionContext::FRAGMENT_LOCAL, jni_env);
|
||||
}
|
||||
JniEnv* jni_env =
|
||||
reinterpret_cast<JniEnv*>(context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
|
||||
std::shared_ptr<JniContext> jni_ctx = std::make_shared<JniContext>(
|
||||
_argument_types.size(), jni_env->executor_cl, jni_env->executor_close_id);
|
||||
context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx);
|
||||
|
||||
// Add a scoped cleanup jni reference object. This cleans up local refs made below.
|
||||
JniLocalFrame jni_frame;
|
||||
{
|
||||
std::string local_location;
|
||||
auto function_cache = UserFunctionCache::instance();
|
||||
RETURN_IF_ERROR(function_cache->get_jarpath(fn_.id, fn_.hdfs_location, fn_.checksum,
|
||||
&local_location));
|
||||
TJavaUdfExecutorCtorParams ctor_params;
|
||||
ctor_params.__set_fn(fn_);
|
||||
ctor_params.__set_location(local_location);
|
||||
jbyteArray ctor_params_bytes;
|
||||
if (scope == FunctionContext::FunctionStateScope::THREAD_LOCAL) {
|
||||
JniEnv* jni_env = reinterpret_cast<JniEnv*>(
|
||||
context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
|
||||
std::shared_ptr<JniContext> jni_ctx = std::make_shared<JniContext>(
|
||||
_argument_types.size(), jni_env->executor_cl, jni_env->executor_close_id);
|
||||
context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx);
|
||||
|
||||
// Pushed frame will be popped when jni_frame goes out-of-scope.
|
||||
RETURN_IF_ERROR(jni_frame.push(env));
|
||||
// Add a scoped cleanup jni reference object. This cleans up local refs made below.
|
||||
JniLocalFrame jni_frame;
|
||||
{
|
||||
std::string local_location;
|
||||
auto function_cache = UserFunctionCache::instance();
|
||||
RETURN_IF_ERROR(function_cache->get_jarpath(fn_.id, fn_.hdfs_location, fn_.checksum,
|
||||
&local_location));
|
||||
TJavaUdfExecutorCtorParams ctor_params;
|
||||
ctor_params.__set_fn(fn_);
|
||||
ctor_params.__set_location(local_location);
|
||||
jbyteArray ctor_params_bytes;
|
||||
|
||||
RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
|
||||
// Pushed frame will be popped when jni_frame goes out-of-scope.
|
||||
RETURN_IF_ERROR(jni_frame.push(env));
|
||||
|
||||
jni_ctx->executor =
|
||||
env->NewObject(jni_env->executor_cl, jni_env->executor_ctor_id, ctor_params_bytes);
|
||||
RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
|
||||
|
||||
jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr);
|
||||
env->ReleaseByteArrayElements(ctor_params_bytes, pBytes, JNI_ABORT);
|
||||
env->DeleteLocalRef(ctor_params_bytes);
|
||||
jni_ctx->executor = env->NewObject(jni_env->executor_cl, jni_env->executor_ctor_id,
|
||||
ctor_params_bytes);
|
||||
|
||||
jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr);
|
||||
env->ReleaseByteArrayElements(ctor_params_bytes, pBytes, JNI_ABORT);
|
||||
env->DeleteLocalRef(ctor_params_bytes);
|
||||
}
|
||||
RETURN_ERROR_IF_EXC(env);
|
||||
RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_ctx->executor, &jni_ctx->executor));
|
||||
}
|
||||
RETURN_ERROR_IF_EXC(env);
|
||||
RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_ctx->executor, &jni_ctx->executor));
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user