[vectorized](udf) java udf support with return map type (#22300)
This commit is contained in:
@ -78,6 +78,8 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio
|
||||
jni_env->executor_cl, "copyBatchBasicResult", "(ZI[Ljava/lang/Object;JJJ)V");
|
||||
jni_env->executor_result_array_batch_id = env->GetMethodID(
|
||||
jni_env->executor_cl, "copyBatchArrayResult", "(ZI[Ljava/lang/Object;JJJJJ)V");
|
||||
jni_env->executor_result_map_batch_id = env->GetMethodID(
|
||||
jni_env->executor_cl, "copyBatchMapResult", "(ZI[Ljava/lang/Object;JJJJJJJJ)V");
|
||||
jni_env->executor_close_id =
|
||||
env->GetMethodID(jni_env->executor_cl, "close", EXECUTOR_CLOSE_SIGNATURE);
|
||||
RETURN_ERROR_IF_EXC(env);
|
||||
@ -150,7 +152,7 @@ Status JavaFunctionCall::execute(FunctionContext* context, Block& block,
|
||||
ColumnPtr null_cols[arg_size];
|
||||
jclass obj_class = env->FindClass("[Ljava/lang/Object;");
|
||||
jclass arraylist_class = env->FindClass("Ljava/util/ArrayList;");
|
||||
// jclass hashmap_class = env->FindClass("Ljava/util/HashMap;");
|
||||
jclass hashmap_class = env->FindClass("Ljava/util/HashMap;");
|
||||
jobjectArray arg_objects = env->NewObjectArray(arg_size, obj_class, nullptr);
|
||||
int64_t nullmap_address = 0;
|
||||
for (size_t arg_idx = 0; arg_idx < arg_size; ++arg_idx) {
|
||||
@ -352,6 +354,69 @@ Status JavaFunctionCall::execute(FunctionContext* context, Block& block,
|
||||
jni_ctx->executor, jni_env->executor_cl, jni_env->executor_result_array_batch_id,
|
||||
result_nullable, num_rows, result_obj, nullmap_address, offset_address,
|
||||
nested_nullmap_address, nested_data_address, nested_offset_address);
|
||||
} else if (res_col->is_column_map()) {
|
||||
ColumnMap* map_col = assert_cast<ColumnMap*>(res_col.get());
|
||||
auto& offset_column = map_col->get_offsets_column();
|
||||
auto offset_address = reinterpret_cast<int64_t>(offset_column.get_raw_data().data);
|
||||
ColumnNullable& map_key_column_nullable = assert_cast<ColumnNullable&>(map_col->get_keys());
|
||||
auto key_data_column_null_map = map_key_column_nullable.get_null_map_column_ptr();
|
||||
auto key_data_column = map_key_column_nullable.get_nested_column_ptr();
|
||||
auto& key_null_map_data =
|
||||
assert_cast<ColumnVector<UInt8>*>(key_data_column_null_map.get())->get_data();
|
||||
auto key_nested_nullmap_address = reinterpret_cast<int64_t>(key_null_map_data.data());
|
||||
int64_t key_nested_data_address = 0, key_nested_offset_address = 0;
|
||||
if (key_data_column->is_column_string()) {
|
||||
ColumnString* str_col = assert_cast<ColumnString*>(key_data_column.get());
|
||||
ColumnString::Chars& chars = assert_cast<ColumnString::Chars&>(str_col->get_chars());
|
||||
ColumnString::Offsets& offsets =
|
||||
assert_cast<ColumnString::Offsets&>(str_col->get_offsets());
|
||||
key_nested_data_address = reinterpret_cast<int64_t>(&chars);
|
||||
key_nested_offset_address = reinterpret_cast<int64_t>(offsets.data());
|
||||
} else {
|
||||
key_nested_data_address =
|
||||
reinterpret_cast<int64_t>(key_data_column->get_raw_data().data);
|
||||
}
|
||||
|
||||
ColumnNullable& map_value_column_nullable =
|
||||
assert_cast<ColumnNullable&>(map_col->get_values());
|
||||
auto value_data_column_null_map = map_value_column_nullable.get_null_map_column_ptr();
|
||||
auto value_data_column = map_value_column_nullable.get_nested_column_ptr();
|
||||
auto& value_null_map_data =
|
||||
assert_cast<ColumnVector<UInt8>*>(value_data_column_null_map.get())->get_data();
|
||||
auto value_nested_nullmap_address = reinterpret_cast<int64_t>(value_null_map_data.data());
|
||||
int64_t value_nested_data_address = 0, value_nested_offset_address = 0;
|
||||
// array type need pass address: [nullmap_address], offset_address, nested_nullmap_address, nested_data_address/nested_char_address,nested_offset_address
|
||||
if (value_data_column->is_column_string()) {
|
||||
ColumnString* str_col = assert_cast<ColumnString*>(value_data_column.get());
|
||||
ColumnString::Chars& chars = assert_cast<ColumnString::Chars&>(str_col->get_chars());
|
||||
ColumnString::Offsets& offsets =
|
||||
assert_cast<ColumnString::Offsets&>(str_col->get_offsets());
|
||||
value_nested_data_address = reinterpret_cast<int64_t>(&chars);
|
||||
value_nested_offset_address = reinterpret_cast<int64_t>(offsets.data());
|
||||
} else {
|
||||
value_nested_data_address =
|
||||
reinterpret_cast<int64_t>(value_data_column->get_raw_data().data);
|
||||
}
|
||||
jmethodID map_size = env->GetMethodID(hashmap_class, "size", "()I");
|
||||
int element_size = 0; // get all element size in num_rows of map column
|
||||
for (int i = 0; i < num_rows; ++i) {
|
||||
jobject obj = env->GetObjectArrayElement(result_obj, i);
|
||||
if (obj == nullptr) {
|
||||
continue;
|
||||
}
|
||||
element_size = element_size + env->CallIntMethod(obj, map_size);
|
||||
env->DeleteLocalRef(obj);
|
||||
}
|
||||
map_key_column_nullable.resize(element_size);
|
||||
memset(key_null_map_data.data(), 0, element_size);
|
||||
map_value_column_nullable.resize(element_size);
|
||||
memset(value_null_map_data.data(), 0, element_size);
|
||||
env->CallNonvirtualVoidMethod(jni_ctx->executor, jni_env->executor_cl,
|
||||
jni_env->executor_result_map_batch_id, result_nullable,
|
||||
num_rows, result_obj, nullmap_address, offset_address,
|
||||
key_nested_nullmap_address, key_nested_data_address,
|
||||
key_nested_offset_address, value_nested_nullmap_address,
|
||||
value_nested_data_address, value_nested_offset_address);
|
||||
} else {
|
||||
return Status::InvalidArgument(strings::Substitute(
|
||||
"Java UDF doesn't support return type $0 now !", return_type->get_name()));
|
||||
@ -359,6 +424,7 @@ Status JavaFunctionCall::execute(FunctionContext* context, Block& block,
|
||||
env->DeleteLocalRef(result_obj);
|
||||
env->DeleteLocalRef(obj_class);
|
||||
env->DeleteLocalRef(arraylist_class);
|
||||
env->DeleteLocalRef(hashmap_class);
|
||||
if (result_nullable) {
|
||||
block.replace_by_position(result,
|
||||
ColumnNullable::create(std::move(res_col), std::move(null_col)));
|
||||
|
||||
Reference in New Issue
Block a user