diff --git a/be/src/vec/functions/function_java_udf.cpp b/be/src/vec/functions/function_java_udf.cpp index ca69fbb0bf..751bbdbe55 100644 --- a/be/src/vec/functions/function_java_udf.cpp +++ b/be/src/vec/functions/function_java_udf.cpp @@ -17,31 +17,16 @@ #include "vec/functions/function_java_udf.h" -#include - -#include #include #include -#include #include -#include "gutil/strings/substitute.h" #include "jni.h" -#include "jni_md.h" #include "runtime/user_function_cache.h" #include "util/jni-util.h" #include "vec/columns/column.h" -#include "vec/columns/column_array.h" -#include "vec/columns/column_map.h" -#include "vec/columns/column_nullable.h" -#include "vec/columns/column_string.h" -#include "vec/columns/column_vector.h" -#include "vec/columns/columns_number.h" #include "vec/common/assert_cast.h" -#include "vec/common/string_ref.h" #include "vec/core/block.h" -#include "vec/data_types/data_type_array.h" -#include "vec/data_types/data_type_nullable.h" #include "vec/exec/jni_connector.h" const char* EXECUTOR_CLASS = "org/apache/doris/udf/UdfExecutor"; @@ -60,25 +45,9 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio if (env == nullptr) { return Status::InternalError("Failed to get/create JVM"); } - if (scope == FunctionContext::FunctionStateScope::FRAGMENT_LOCAL) { - std::shared_ptr jni_env = std::make_shared(); - RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &jni_env->executor_cl)); - jni_env->executor_ctor_id = - env->GetMethodID(jni_env->executor_cl, "", EXECUTOR_CTOR_SIGNATURE); - RETURN_ERROR_IF_EXC(env); - jni_env->executor_evaluate_id = - env->GetMethodID(jni_env->executor_cl, "evaluate", EXECUTOR_EVALUATE_SIGNATURE); - jni_env->executor_close_id = - env->GetMethodID(jni_env->executor_cl, "close", EXECUTOR_CLOSE_SIGNATURE); - RETURN_ERROR_IF_EXC(env); - context->set_function_state(FunctionContext::FRAGMENT_LOCAL, jni_env); - } if (scope == FunctionContext::FunctionStateScope::THREAD_LOCAL) { - JniEnv* jni_env = reinterpret_cast( - context->get_function_state(FunctionContext::FRAGMENT_LOCAL)); - std::shared_ptr jni_ctx = std::make_shared( - _argument_types.size(), jni_env->executor_cl, jni_env->executor_close_id); + std::shared_ptr jni_ctx = std::make_shared(); context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx); // Add a scoped cleanup jni reference object. This cleans up local refs made below. @@ -98,7 +67,14 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes)); - jni_ctx->executor = env->NewObject(jni_env->executor_cl, jni_env->executor_ctor_id, + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &jni_ctx->executor_cl)); + jni_ctx->executor_ctor_id = + env->GetMethodID(jni_ctx->executor_cl, "", EXECUTOR_CTOR_SIGNATURE); + jni_ctx->executor_evaluate_id = + env->GetMethodID(jni_ctx->executor_cl, "evaluate", EXECUTOR_EVALUATE_SIGNATURE); + jni_ctx->executor_close_id = + env->GetMethodID(jni_ctx->executor_cl, "close", EXECUTOR_CLOSE_SIGNATURE); + jni_ctx->executor = env->NewObject(jni_ctx->executor_cl, jni_ctx->executor_ctor_id, ctor_params_bytes); jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr); @@ -118,8 +94,6 @@ Status JavaFunctionCall::execute_impl(FunctionContext* context, Block& block, RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); JniContext* jni_ctx = reinterpret_cast( context->get_function_state(FunctionContext::THREAD_LOCAL)); - JniEnv* jni_env = - reinterpret_cast(context->get_function_state(FunctionContext::FRAGMENT_LOCAL)); std::unique_ptr input_table; RETURN_IF_ERROR(JniConnector::to_java_table(&block, num_rows, arguments, input_table)); @@ -136,7 +110,7 @@ Status JavaFunctionCall::execute_impl(FunctionContext* context, Block& block, {"required_fields", output_table_schema.first}, {"columns_types", output_table_schema.second}}; jobject output_map = JniUtil::convert_to_java_map(env, output_params); - long output_address = env->CallLongMethod(jni_ctx->executor, jni_env->executor_evaluate_id, + long output_address = env->CallLongMethod(jni_ctx->executor, jni_ctx->executor_evaluate_id, input_map, output_map); RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); env->DeleteLocalRef(input_map); diff --git a/be/src/vec/functions/function_java_udf.h b/be/src/vec/functions/function_java_udf.h index 425f05ccfd..a30a93b355 100644 --- a/be/src/vec/functions/function_java_udf.h +++ b/be/src/vec/functions/function_java_udf.h @@ -38,9 +38,8 @@ #include "vec/core/types.h" #include "vec/data_types/data_type.h" #include "vec/functions/function.h" -namespace doris { -namespace vectorized { +namespace doris::vectorized { class JavaUdfPreparedFunction : public PreparedFunctionImpl { public: @@ -114,26 +113,18 @@ private: const DataTypes _argument_types; const DataTypePtr _return_type; - struct JniEnv { - /// Global class reference to the UdfExecutor Java class and related method IDs. Set in - /// Init(). These have the lifetime of the process (i.e. 'executor_cl_' is never freed). - jclass executor_cl; - jmethodID executor_ctor_id; - jmethodID executor_evaluate_id; - jmethodID executor_close_id; - }; - struct JniContext { // Do not save parent directly, because parent is in VExpr, but jni context is in FunctionContext // The deconstruct sequence is not determined, it will core. // JniContext's lifecycle should same with function context, not related with expr - jclass executor_cl_; - jmethodID executor_close_id_; + jclass executor_cl; + jmethodID executor_ctor_id; + jmethodID executor_evaluate_id; + jmethodID executor_close_id; jobject executor = nullptr; bool is_closed = false; - JniContext(int64_t num_args, jclass executor_cl, jmethodID executor_close_id) - : executor_cl_(executor_cl), executor_close_id_(executor_close_id) {} + JniContext() = default; void close() { if (is_closed) { @@ -146,15 +137,16 @@ private: LOG(WARNING) << "errors while get jni env " << status; return; } - env->CallNonvirtualVoidMethodA(executor, executor_cl_, executor_close_id_, NULL); + env->CallNonvirtualVoidMethodA(executor, executor_cl, executor_close_id, nullptr); env->DeleteGlobalRef(executor); - env->DeleteGlobalRef(executor_cl_); + env->DeleteGlobalRef(executor_cl); Status s = JniUtil::GetJniExceptionMsg(env); - if (!s.ok()) LOG(WARNING) << s; + if (!s.ok()) { + LOG(WARNING) << s; + } is_closed = true; } }; }; -} // namespace vectorized -} // namespace doris +} // namespace doris::vectorized