From b04ec41c1da0c8f4fd671355ef7347ab1bfc111b Mon Sep 17 00:00:00 2001 From: zhangstar333 <87313068+zhangstar333@users.noreply.github.com> Date: Tue, 22 Nov 2022 20:49:02 +0800 Subject: [PATCH] [Vectorized](udaf) fix java-udaf couldn't get jar core dump (#14393) fix java-udaf couldn't get jar core dump --- be/src/runtime/user_function_cache.cpp | 38 +++++++++++++++++++ be/src/runtime/user_function_cache.h | 1 + .../aggregate_function_java_udaf.h | 16 +++++--- be/src/vec/exprs/vectorized_agg_fn.cpp | 5 ++- .../udf/java-user-defined-function.md | 2 + .../udf/java-user-defined-function.md | 2 + .../test_array_functions.groovy | 4 +- 7 files changed, 59 insertions(+), 9 deletions(-) diff --git a/be/src/runtime/user_function_cache.cpp b/be/src/runtime/user_function_cache.cpp index 475a8a52e8..6bd202f5d9 100644 --- a/be/src/runtime/user_function_cache.cpp +++ b/be/src/runtime/user_function_cache.cpp @@ -22,6 +22,7 @@ #include #include "common/config.h" +#include "common/status.h" #include "env/env.h" #include "gutil/strings/split.h" #include "http/http_client.h" @@ -432,4 +433,41 @@ Status UserFunctionCache::get_jarpath(int64_t fid, const std::string& url, return Status::OK(); } +Status UserFunctionCache::check_jar(int64_t fid, const std::string& url, + const std::string& checksum) { + UserFunctionCacheEntry* entry = nullptr; + Status st = Status::OK(); + { + std::lock_guard l(_cache_lock); + auto it = _entry_map.find(fid); + if (it != _entry_map.end()) { + entry = it->second; + } else { + entry = new UserFunctionCacheEntry( + fid, checksum, _make_lib_file(fid, checksum, LibType::JAR), LibType::JAR); + entry->ref(); + _entry_map.emplace(fid, entry); + } + entry->ref(); + } + if (entry->is_loaded.load()) { + return st; + } + + std::unique_lock l(entry->load_lock); + if (!entry->is_downloaded) { + st = _download_lib(url, entry); + } + if (!st.ok()) { + // if we load a cache entry failed, I think we should delete this entry cache + // even if this cache was valid before. + _destroy_cache_entry(entry); + return Status::InternalError( + "Java UDAF has error, maybe you should check the path about java impl jar, because " + "{}", + st.get_error_msg()); + } + return Status::OK(); +} + } // namespace doris diff --git a/be/src/runtime/user_function_cache.h b/be/src/runtime/user_function_cache.h index 256a13c8c0..1d1decf1cd 100644 --- a/be/src/runtime/user_function_cache.h +++ b/be/src/runtime/user_function_cache.h @@ -69,6 +69,7 @@ public: Status get_jarpath(int64_t fid, const std::string& url, const std::string& checksum, std::string* libpath); + Status check_jar(int64_t fid, const std::string& url, const std::string& checksum); private: Status _load_cached_lib(); diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h index f61d40443a..113277498c 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h +++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h @@ -313,6 +313,12 @@ public: const Array& parameters, const DataTypePtr& return_type) { return std::make_shared(fn, argument_types, parameters, return_type); } + //Note: The condition is added because maybe the BE can't find java-udaf impl jar + //So need to check as soon as possible, before call Data function + Status check_udaf(const TFunction& fn) { + auto function_cache = UserFunctionCache::instance(); + return function_cache->check_jar(fn.id, fn.hdfs_location, fn.checksum); + } void create(AggregateDataPtr __restrict place) const override { if (_first_created) { @@ -336,14 +342,14 @@ public: DataTypePtr get_return_type() const override { return _return_type; } - void add(AggregateDataPtr __restrict place, const IColumn** columns, size_t row_num, + void add(AggregateDataPtr __restrict /*place*/, const IColumn** /*columns*/, size_t /*row_num*/, Arena*) const override { LOG(WARNING) << " shouldn't going add function, there maybe some error about function " << _fn.name.function_name; } - void add_batch(size_t batch_size, AggregateDataPtr* places, size_t place_offset, - const IColumn** columns, Arena* arena, bool /*agg_many*/) const override { + void add_batch(size_t batch_size, AggregateDataPtr* places, size_t /*place_offset*/, + const IColumn** columns, Arena* /*arena*/, bool /*agg_many*/) const override { int64_t places_address[batch_size]; for (size_t i = 0; i < batch_size; ++i) { places_address[i] = reinterpret_cast(places[i]); @@ -354,14 +360,14 @@ public: // TODO: Here we calling method by jni, And if we get a thrown from FE, // But can't let user known the error, only return directly and output error to log file. void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns, - Arena* arena) const override { + Arena* /*arena*/) const override { int64_t places_address[1]; places_address[0] = reinterpret_cast(place); this->data(_exec_place).add(places_address, true, columns, 0, batch_size, argument_types); } // TODO: reset function should be implement also in struct data - void reset(AggregateDataPtr place) const override { + void reset(AggregateDataPtr /*place*/) const override { LOG(WARNING) << " shouldn't going reset function, there maybe some error about function " << _fn.name.function_name; } diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp b/be/src/vec/exprs/vectorized_agg_fn.cpp index badf925c3b..8f7527a15e 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.cpp +++ b/be/src/vec/exprs/vectorized_agg_fn.cpp @@ -116,10 +116,11 @@ Status AggFnEvaluator::prepare(RuntimeState* state, const RowDescriptor& desc, M if (_fn.binary_type == TFunctionBinaryType::JAVA_UDF) { if (config::enable_java_support) { _function = AggregateJavaUdaf::create(_fn, argument_types, {}, _data_type); + RETURN_IF_ERROR(static_cast(_function.get())->check_udaf(_fn)); } else { return Status::InternalError( - "Java UDF is not enabled, you can change be config enable_java_support to true " - "and restart be."); + "Java UDAF is not enabled, you can change be config enable_java_support to " + "true and restart be."); } } else if (_fn.binary_type == TFunctionBinaryType::RPC) { _function = AggregateRpcUdaf::create(_fn, argument_types, {}, _data_type); diff --git a/docs/en/docs/ecosystem/udf/java-user-defined-function.md b/docs/en/docs/ecosystem/udf/java-user-defined-function.md index 0de85207ad..17e540c968 100644 --- a/docs/en/docs/ecosystem/udf/java-user-defined-function.md +++ b/docs/en/docs/ecosystem/udf/java-user-defined-function.md @@ -163,6 +163,8 @@ CREATE AGGREGATE FUNCTION simple_sum(INT) RETURNS INT PROPERTIES ( "type"="JAVA_UDF" ); ``` +* The implemented jar package can be stored at local or in a remote server and downloaded via http, And each BE node must be able to obtain the jar package; +Otherwise, the error status message "Couldn't open file..." will be returned Currently, UDTF are not supported. diff --git a/docs/zh-CN/docs/ecosystem/udf/java-user-defined-function.md b/docs/zh-CN/docs/ecosystem/udf/java-user-defined-function.md index 70a63edb63..73b548640d 100644 --- a/docs/zh-CN/docs/ecosystem/udf/java-user-defined-function.md +++ b/docs/zh-CN/docs/ecosystem/udf/java-user-defined-function.md @@ -162,6 +162,8 @@ CREATE AGGREGATE FUNCTION simple_sum(int) RETURNS int PROPERTIES ( "type"="JAVA_UDF" ); ``` +* 实现的jar包可以放在本地也可以存放在远程服务端通过http下载,但必须让每个BE节点都能获取到jar包; +否则将会返回错误状态信息"Couldn't open file ......". 目前还暂不支持UDTF diff --git a/regression-test/suites/query_p0/sql_functions/array_functions/test_array_functions.groovy b/regression-test/suites/query_p0/sql_functions/array_functions/test_array_functions.groovy index 9f29b2c2fd..07ea61f759 100644 --- a/regression-test/suites/query_p0/sql_functions/array_functions/test_array_functions.groovy +++ b/regression-test/suites/query_p0/sql_functions/array_functions/test_array_functions.groovy @@ -136,8 +136,8 @@ suite("test_array_functions") { "storage_format" = "V2" ); """ - sql """ insert into ${tableName3} values (10005,'四年级三班',[10005,null,null]) """ - sql """ insert into ${tableName3} values (10006,'六年级一班',[60002,60002,60003,null,60005]) """ + sql """ insert into ${tableName3} values (10005,'aaaaa',[10005,null,null]) """ + sql """ insert into ${tableName3} values (10006,'bbbbb',[60002,60002,60003,null,60005]) """ qt_select_union "select class_id, student_ids, array_union(student_ids,[1,2,3]) from ${tableName3} order by class_id;" qt_select_except "select class_id, student_ids, array_except(student_ids,[1,2,3]) from ${tableName3} order by class_id;"