[fix](java udf) make executor class thread local (#25758)

This commit is contained in:
Ashin Gau
2023-10-23 16:55:15 +08:00
committed by GitHub
parent b5ee4a9dbb
commit 6a6e10c182
2 changed files with 22 additions and 56 deletions

View File

@ -17,31 +17,16 @@
#include "vec/functions/function_java_udf.h"
#include <glog/logging.h>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "gutil/strings/substitute.h"
#include "jni.h"
#include "jni_md.h"
#include "runtime/user_function_cache.h"
#include "util/jni-util.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_map.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/exec/jni_connector.h"
const char* EXECUTOR_CLASS = "org/apache/doris/udf/UdfExecutor";
@ -60,25 +45,9 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio
if (env == nullptr) {
return Status::InternalError("Failed to get/create JVM");
}
if (scope == FunctionContext::FunctionStateScope::FRAGMENT_LOCAL) {
std::shared_ptr<JniEnv> jni_env = std::make_shared<JniEnv>();
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &jni_env->executor_cl));
jni_env->executor_ctor_id =
env->GetMethodID(jni_env->executor_cl, "<init>", EXECUTOR_CTOR_SIGNATURE);
RETURN_ERROR_IF_EXC(env);
jni_env->executor_evaluate_id =
env->GetMethodID(jni_env->executor_cl, "evaluate", EXECUTOR_EVALUATE_SIGNATURE);
jni_env->executor_close_id =
env->GetMethodID(jni_env->executor_cl, "close", EXECUTOR_CLOSE_SIGNATURE);
RETURN_ERROR_IF_EXC(env);
context->set_function_state(FunctionContext::FRAGMENT_LOCAL, jni_env);
}
if (scope == FunctionContext::FunctionStateScope::THREAD_LOCAL) {
JniEnv* jni_env = reinterpret_cast<JniEnv*>(
context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
std::shared_ptr<JniContext> jni_ctx = std::make_shared<JniContext>(
_argument_types.size(), jni_env->executor_cl, jni_env->executor_close_id);
std::shared_ptr<JniContext> jni_ctx = std::make_shared<JniContext>();
context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx);
// Add a scoped cleanup jni reference object. This cleans up local refs made below.
@ -98,7 +67,14 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio
RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
jni_ctx->executor = env->NewObject(jni_env->executor_cl, jni_env->executor_ctor_id,
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &jni_ctx->executor_cl));
jni_ctx->executor_ctor_id =
env->GetMethodID(jni_ctx->executor_cl, "<init>", EXECUTOR_CTOR_SIGNATURE);
jni_ctx->executor_evaluate_id =
env->GetMethodID(jni_ctx->executor_cl, "evaluate", EXECUTOR_EVALUATE_SIGNATURE);
jni_ctx->executor_close_id =
env->GetMethodID(jni_ctx->executor_cl, "close", EXECUTOR_CLOSE_SIGNATURE);
jni_ctx->executor = env->NewObject(jni_ctx->executor_cl, jni_ctx->executor_ctor_id,
ctor_params_bytes);
jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr);
@ -118,8 +94,6 @@ Status JavaFunctionCall::execute_impl(FunctionContext* context, Block& block,
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
JniContext* jni_ctx = reinterpret_cast<JniContext*>(
context->get_function_state(FunctionContext::THREAD_LOCAL));
JniEnv* jni_env =
reinterpret_cast<JniEnv*>(context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
std::unique_ptr<long[]> input_table;
RETURN_IF_ERROR(JniConnector::to_java_table(&block, num_rows, arguments, input_table));
@ -136,7 +110,7 @@ Status JavaFunctionCall::execute_impl(FunctionContext* context, Block& block,
{"required_fields", output_table_schema.first},
{"columns_types", output_table_schema.second}};
jobject output_map = JniUtil::convert_to_java_map(env, output_params);
long output_address = env->CallLongMethod(jni_ctx->executor, jni_env->executor_evaluate_id,
long output_address = env->CallLongMethod(jni_ctx->executor, jni_ctx->executor_evaluate_id,
input_map, output_map);
RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
env->DeleteLocalRef(input_map);