[Bug](javaudf) fix BE crash if javaudf is pushed down (#21139)
This commit is contained in:
@ -54,16 +54,24 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio
|
||||
if (env == nullptr) {
|
||||
return Status::InternalError("Failed to get/create JVM");
|
||||
}
|
||||
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &executor_cl_));
|
||||
executor_ctor_id_ = env->GetMethodID(executor_cl_, "<init>", EXECUTOR_CTOR_SIGNATURE);
|
||||
RETURN_ERROR_IF_EXC(env);
|
||||
executor_evaluate_id_ = env->GetMethodID(executor_cl_, "evaluate", EXECUTOR_EVALUATE_SIGNATURE);
|
||||
RETURN_ERROR_IF_EXC(env);
|
||||
executor_close_id_ = env->GetMethodID(executor_cl_, "close", EXECUTOR_CLOSE_SIGNATURE);
|
||||
RETURN_ERROR_IF_EXC(env);
|
||||
|
||||
std::shared_ptr<JniContext> jni_ctx =
|
||||
std::make_shared<JniContext>(_argument_types.size(), this);
|
||||
if (scope == FunctionContext::FunctionStateScope::FRAGMENT_LOCAL) {
|
||||
std::shared_ptr<JniEnv> jni_env = std::make_shared<JniEnv>();
|
||||
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &jni_env->executor_cl));
|
||||
jni_env->executor_ctor_id =
|
||||
env->GetMethodID(jni_env->executor_cl, "<init>", EXECUTOR_CTOR_SIGNATURE);
|
||||
RETURN_ERROR_IF_EXC(env);
|
||||
jni_env->executor_evaluate_id =
|
||||
env->GetMethodID(jni_env->executor_cl, "evaluate", EXECUTOR_EVALUATE_SIGNATURE);
|
||||
RETURN_ERROR_IF_EXC(env);
|
||||
jni_env->executor_close_id =
|
||||
env->GetMethodID(jni_env->executor_cl, "close", EXECUTOR_CLOSE_SIGNATURE);
|
||||
RETURN_ERROR_IF_EXC(env);
|
||||
context->set_function_state(FunctionContext::FRAGMENT_LOCAL, jni_env);
|
||||
}
|
||||
JniEnv* jni_env =
|
||||
reinterpret_cast<JniEnv*>(context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
|
||||
std::shared_ptr<JniContext> jni_ctx = std::make_shared<JniContext>(
|
||||
_argument_types.size(), jni_env->executor_cl, jni_env->executor_close_id);
|
||||
context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx);
|
||||
|
||||
// Add a scoped cleanup jni reference object. This cleans up local refs made below.
|
||||
@ -99,7 +107,9 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio
|
||||
RETURN_IF_ERROR(jni_frame.push(env));
|
||||
|
||||
RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
|
||||
jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes);
|
||||
|
||||
jni_ctx->executor =
|
||||
env->NewObject(jni_env->executor_cl, jni_env->executor_ctor_id, ctor_params_bytes);
|
||||
|
||||
jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr);
|
||||
env->ReleaseByteArrayElements(ctor_params_bytes, pBytes, JNI_ABORT);
|
||||
@ -118,6 +128,8 @@ Status JavaFunctionCall::execute(FunctionContext* context, Block& block,
|
||||
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
|
||||
JniContext* jni_ctx = reinterpret_cast<JniContext*>(
|
||||
context->get_function_state(FunctionContext::THREAD_LOCAL));
|
||||
JniEnv* jni_env =
|
||||
reinterpret_cast<JniEnv*>(context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
|
||||
int arg_idx = 0;
|
||||
ColumnPtr data_cols[arguments.size()];
|
||||
ColumnPtr null_cols[arguments.size()];
|
||||
@ -192,105 +204,105 @@ Status JavaFunctionCall::execute(FunctionContext* context, Block& block,
|
||||
|
||||
*(jni_ctx->output_null_value) = reinterpret_cast<int64_t>(null_col->get_data().data());
|
||||
#ifndef EVALUATE_JAVA_UDF
|
||||
#define EVALUATE_JAVA_UDF \
|
||||
if (data_col->is_column_string()) { \
|
||||
const ColumnString* str_col = assert_cast<const ColumnString*>(data_col.get()); \
|
||||
ColumnString::Chars& chars = const_cast<ColumnString::Chars&>(str_col->get_chars()); \
|
||||
ColumnString::Offsets& offsets = \
|
||||
const_cast<ColumnString::Offsets&>(str_col->get_offsets()); \
|
||||
int increase_buffer_size = 0; \
|
||||
int64_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
|
||||
chars.resize(buffer_size); \
|
||||
offsets.resize(num_rows); \
|
||||
*(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \
|
||||
*(jni_ctx->output_offsets_ptr) = reinterpret_cast<int64_t>(offsets.data()); \
|
||||
jni_ctx->output_intermediate_state_ptr->row_idx = 0; \
|
||||
jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \
|
||||
env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, executor_evaluate_id_, \
|
||||
nullptr); \
|
||||
while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) { \
|
||||
increase_buffer_size++; \
|
||||
buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
|
||||
chars.resize(buffer_size); \
|
||||
*(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \
|
||||
jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \
|
||||
env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, executor_evaluate_id_, \
|
||||
nullptr); \
|
||||
} \
|
||||
} else if (data_col->is_numeric() || data_col->is_column_decimal()) { \
|
||||
data_col->resize(num_rows); \
|
||||
*(jni_ctx->output_value_buffer) = \
|
||||
reinterpret_cast<int64_t>(data_col->get_raw_data().data); \
|
||||
env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, executor_evaluate_id_, \
|
||||
nullptr); \
|
||||
} else if (data_col->is_column_array()) { \
|
||||
ColumnArray* array_col = assert_cast<ColumnArray*>(data_col.get()); \
|
||||
ColumnNullable& array_nested_nullable = \
|
||||
assert_cast<ColumnNullable&>(array_col->get_data()); \
|
||||
auto data_column_null_map = array_nested_nullable.get_null_map_column_ptr(); \
|
||||
auto data_column = array_nested_nullable.get_nested_column_ptr(); \
|
||||
auto& offset_column = array_col->get_offsets_column(); \
|
||||
int increase_buffer_size = 0; \
|
||||
int64_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
|
||||
offset_column.resize(num_rows); \
|
||||
*(jni_ctx->output_offsets_ptr) = \
|
||||
reinterpret_cast<int64_t>(offset_column.get_raw_data().data); \
|
||||
data_column_null_map->resize(buffer_size); \
|
||||
auto& null_map_data = \
|
||||
assert_cast<ColumnVector<UInt8>*>(data_column_null_map.get())->get_data(); \
|
||||
*(jni_ctx->output_array_null_ptr) = reinterpret_cast<int64_t>(null_map_data.data()); \
|
||||
jni_ctx->output_intermediate_state_ptr->row_idx = 0; \
|
||||
jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \
|
||||
if (data_column->is_column_string()) { \
|
||||
ColumnString* str_col = assert_cast<ColumnString*>(data_column.get()); \
|
||||
ColumnString::Chars& chars = assert_cast<ColumnString::Chars&>(str_col->get_chars()); \
|
||||
ColumnString::Offsets& offsets = \
|
||||
assert_cast<ColumnString::Offsets&>(str_col->get_offsets()); \
|
||||
chars.resize(buffer_size); \
|
||||
offsets.resize(buffer_size); \
|
||||
*(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \
|
||||
*(jni_ctx->output_array_string_offsets_ptr) = \
|
||||
reinterpret_cast<int64_t>(offsets.data()); \
|
||||
env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, executor_evaluate_id_, \
|
||||
nullptr); \
|
||||
while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) { \
|
||||
increase_buffer_size++; \
|
||||
buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
|
||||
null_map_data.resize(buffer_size); \
|
||||
chars.resize(buffer_size); \
|
||||
offsets.resize(buffer_size); \
|
||||
*(jni_ctx->output_array_null_ptr) = \
|
||||
reinterpret_cast<int64_t>(null_map_data.data()); \
|
||||
*(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \
|
||||
*(jni_ctx->output_array_string_offsets_ptr) = \
|
||||
reinterpret_cast<int64_t>(offsets.data()); \
|
||||
jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \
|
||||
env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, \
|
||||
executor_evaluate_id_, nullptr); \
|
||||
} \
|
||||
} else { \
|
||||
data_column->resize(buffer_size); \
|
||||
*(jni_ctx->output_value_buffer) = \
|
||||
reinterpret_cast<int64_t>(data_column->get_raw_data().data); \
|
||||
env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, executor_evaluate_id_, \
|
||||
nullptr); \
|
||||
while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) { \
|
||||
increase_buffer_size++; \
|
||||
buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
|
||||
null_map_data.resize(buffer_size); \
|
||||
data_column->resize(buffer_size); \
|
||||
*(jni_ctx->output_array_null_ptr) = \
|
||||
reinterpret_cast<int64_t>(null_map_data.data()); \
|
||||
*(jni_ctx->output_value_buffer) = \
|
||||
reinterpret_cast<int64_t>(data_column->get_raw_data().data); \
|
||||
jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \
|
||||
env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, \
|
||||
executor_evaluate_id_, nullptr); \
|
||||
} \
|
||||
} \
|
||||
} else { \
|
||||
return Status::InvalidArgument(strings::Substitute( \
|
||||
"Java UDF doesn't support return type $0 now !", return_type->get_name())); \
|
||||
#define EVALUATE_JAVA_UDF \
|
||||
if (data_col->is_column_string()) { \
|
||||
const ColumnString* str_col = assert_cast<const ColumnString*>(data_col.get()); \
|
||||
ColumnString::Chars& chars = const_cast<ColumnString::Chars&>(str_col->get_chars()); \
|
||||
ColumnString::Offsets& offsets = \
|
||||
const_cast<ColumnString::Offsets&>(str_col->get_offsets()); \
|
||||
int increase_buffer_size = 0; \
|
||||
int64_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
|
||||
chars.resize(buffer_size); \
|
||||
offsets.resize(num_rows); \
|
||||
*(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \
|
||||
*(jni_ctx->output_offsets_ptr) = reinterpret_cast<int64_t>(offsets.data()); \
|
||||
jni_ctx->output_intermediate_state_ptr->row_idx = 0; \
|
||||
jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \
|
||||
env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \
|
||||
jni_env->executor_evaluate_id, nullptr); \
|
||||
while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) { \
|
||||
increase_buffer_size++; \
|
||||
buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
|
||||
chars.resize(buffer_size); \
|
||||
*(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \
|
||||
jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \
|
||||
env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \
|
||||
jni_env->executor_evaluate_id, nullptr); \
|
||||
} \
|
||||
} else if (data_col->is_numeric() || data_col->is_column_decimal()) { \
|
||||
data_col->resize(num_rows); \
|
||||
*(jni_ctx->output_value_buffer) = \
|
||||
reinterpret_cast<int64_t>(data_col->get_raw_data().data); \
|
||||
env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \
|
||||
jni_env->executor_evaluate_id, nullptr); \
|
||||
} else if (data_col->is_column_array()) { \
|
||||
ColumnArray* array_col = assert_cast<ColumnArray*>(data_col.get()); \
|
||||
ColumnNullable& array_nested_nullable = \
|
||||
assert_cast<ColumnNullable&>(array_col->get_data()); \
|
||||
auto data_column_null_map = array_nested_nullable.get_null_map_column_ptr(); \
|
||||
auto data_column = array_nested_nullable.get_nested_column_ptr(); \
|
||||
auto& offset_column = array_col->get_offsets_column(); \
|
||||
int increase_buffer_size = 0; \
|
||||
int64_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
|
||||
offset_column.resize(num_rows); \
|
||||
*(jni_ctx->output_offsets_ptr) = \
|
||||
reinterpret_cast<int64_t>(offset_column.get_raw_data().data); \
|
||||
data_column_null_map->resize(buffer_size); \
|
||||
auto& null_map_data = \
|
||||
assert_cast<ColumnVector<UInt8>*>(data_column_null_map.get())->get_data(); \
|
||||
*(jni_ctx->output_array_null_ptr) = reinterpret_cast<int64_t>(null_map_data.data()); \
|
||||
jni_ctx->output_intermediate_state_ptr->row_idx = 0; \
|
||||
jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \
|
||||
if (data_column->is_column_string()) { \
|
||||
ColumnString* str_col = assert_cast<ColumnString*>(data_column.get()); \
|
||||
ColumnString::Chars& chars = assert_cast<ColumnString::Chars&>(str_col->get_chars()); \
|
||||
ColumnString::Offsets& offsets = \
|
||||
assert_cast<ColumnString::Offsets&>(str_col->get_offsets()); \
|
||||
chars.resize(buffer_size); \
|
||||
offsets.resize(buffer_size); \
|
||||
*(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \
|
||||
*(jni_ctx->output_array_string_offsets_ptr) = \
|
||||
reinterpret_cast<int64_t>(offsets.data()); \
|
||||
env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \
|
||||
jni_env->executor_evaluate_id, nullptr); \
|
||||
while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) { \
|
||||
increase_buffer_size++; \
|
||||
buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
|
||||
null_map_data.resize(buffer_size); \
|
||||
chars.resize(buffer_size); \
|
||||
offsets.resize(buffer_size); \
|
||||
*(jni_ctx->output_array_null_ptr) = \
|
||||
reinterpret_cast<int64_t>(null_map_data.data()); \
|
||||
*(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \
|
||||
*(jni_ctx->output_array_string_offsets_ptr) = \
|
||||
reinterpret_cast<int64_t>(offsets.data()); \
|
||||
jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \
|
||||
env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \
|
||||
jni_env->executor_evaluate_id, nullptr); \
|
||||
} \
|
||||
} else { \
|
||||
data_column->resize(buffer_size); \
|
||||
*(jni_ctx->output_value_buffer) = \
|
||||
reinterpret_cast<int64_t>(data_column->get_raw_data().data); \
|
||||
env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \
|
||||
jni_env->executor_evaluate_id, nullptr); \
|
||||
while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) { \
|
||||
increase_buffer_size++; \
|
||||
buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
|
||||
null_map_data.resize(buffer_size); \
|
||||
data_column->resize(buffer_size); \
|
||||
*(jni_ctx->output_array_null_ptr) = \
|
||||
reinterpret_cast<int64_t>(null_map_data.data()); \
|
||||
*(jni_ctx->output_value_buffer) = \
|
||||
reinterpret_cast<int64_t>(data_column->get_raw_data().data); \
|
||||
jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \
|
||||
env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \
|
||||
jni_env->executor_evaluate_id, nullptr); \
|
||||
} \
|
||||
} \
|
||||
} else { \
|
||||
return Status::InvalidArgument(strings::Substitute( \
|
||||
"Java UDF doesn't support return type $0 now !", return_type->get_name())); \
|
||||
}
|
||||
#endif
|
||||
EVALUATE_JAVA_UDF;
|
||||
|
||||
Reference in New Issue
Block a user