[Vectorized](udaf) fix java-udaf couldn't get jar core dump (#14393)
fix java-udaf couldn't get jar core dump
This commit is contained in:
@ -22,6 +22,7 @@
|
||||
#include <vector>
|
||||
|
||||
#include "common/config.h"
|
||||
#include "common/status.h"
|
||||
#include "env/env.h"
|
||||
#include "gutil/strings/split.h"
|
||||
#include "http/http_client.h"
|
||||
@ -432,4 +433,41 @@ Status UserFunctionCache::get_jarpath(int64_t fid, const std::string& url,
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Checks that the jar backing Java function `fid` can be obtained, so that a
// missing or unreachable jar is reported as an error Status instead of being
// discovered later.
//
// Looks up the cache entry for `fid` in _entry_map (creating and registering a
// JAR entry if none exists) and, unless the entry is already loaded or already
// downloaded, downloads the jar from `url`.
//
// @param fid       function id; key into _entry_map.
// @param url       location handed to _download_lib().
// @param checksum  checksum stored on a newly created entry and used to build
//                  its lib file name.
// @return Status::OK() when the entry is already loaded or the download step
//         did not fail; otherwise Status::InternalError carrying the
//         underlying error message.
Status UserFunctionCache::check_jar(int64_t fid, const std::string& url,
                                    const std::string& checksum) {
    UserFunctionCacheEntry* entry = nullptr;
    Status st = Status::OK();
    {
        std::lock_guard<std::mutex> l(_cache_lock);
        auto it = _entry_map.find(fid);
        if (it != _entry_map.end()) {
            entry = it->second;
        } else {
            entry = new UserFunctionCacheEntry(
                    fid, checksum, _make_lib_file(fid, checksum, LibType::JAR), LibType::JAR);
            // Presumably the reference held on behalf of _entry_map -- confirm
            // against UserFunctionCacheEntry's ref-counting contract.
            entry->ref();
            _entry_map.emplace(fid, entry);
        }
        // NOTE(review): this reference is not released on the success paths of
        // this function; confirm whether that is intended.
        entry->ref();
    }
    if (entry->is_loaded.load()) {
        return st;
    }

    {
        // Hold load_lock only for the download itself. It must be released
        // before the failure path below: _destroy_cache_entry is documented
        // here as deleting the entry (its definition is not shown in this
        // view), and unlocking a mutex after the object that owns it has been
        // destroyed is undefined behaviour.
        std::lock_guard<std::mutex> load_guard(entry->load_lock);
        if (!entry->is_downloaded) {
            st = _download_lib(url, entry);
        }
    }
    if (!st.ok()) {
        // if we load a cache entry failed, I think we should delete this entry cache
        // even if this cache was valid before.
        _destroy_cache_entry(entry);
        return Status::InternalError(
                "Java UDAF has error, maybe you should check the path about java impl jar, because "
                "{}",
                st.get_error_msg());
    }
    return Status::OK();
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -69,6 +69,7 @@ public:
|
||||
|
||||
Status get_jarpath(int64_t fid, const std::string& url, const std::string& checksum,
|
||||
std::string* libpath);
|
||||
// Checks that the jar for function `fid` is available, downloading it from `url` when its cache
// entry has not been downloaded yet. Returns an error Status if the download fails.
Status check_jar(int64_t fid, const std::string& url, const std::string& checksum);
|
||||
|
||||
private:
|
||||
Status _load_cached_lib();
|
||||
|
||||
@ -313,6 +313,12 @@ public:
|
||||
const Array& parameters, const DataTypePtr& return_type) {
|
||||
return std::make_shared<AggregateJavaUdaf>(fn, argument_types, parameters, return_type);
|
||||
}
|
||||
//Note: The condition is added because maybe the BE can't find java-udaf impl jar
|
||||
//So need to check as soon as possible, before call Data function
|
||||
Status check_udaf(const TFunction& fn) {
|
||||
auto function_cache = UserFunctionCache::instance();
|
||||
return function_cache->check_jar(fn.id, fn.hdfs_location, fn.checksum);
|
||||
}
|
||||
|
||||
void create(AggregateDataPtr __restrict place) const override {
|
||||
if (_first_created) {
|
||||
@ -336,14 +342,14 @@ public:
|
||||
|
||||
// Returns the stored `_return_type` of this aggregate function.
DataTypePtr get_return_type() const override { return _return_type; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn** columns, size_t row_num,
|
||||
void add(AggregateDataPtr __restrict /*place*/, const IColumn** /*columns*/, size_t /*row_num*/,
|
||||
Arena*) const override {
|
||||
LOG(WARNING) << " shouldn't going add function, there maybe some error about function "
|
||||
<< _fn.name.function_name;
|
||||
}
|
||||
|
||||
void add_batch(size_t batch_size, AggregateDataPtr* places, size_t place_offset,
|
||||
const IColumn** columns, Arena* arena, bool /*agg_many*/) const override {
|
||||
void add_batch(size_t batch_size, AggregateDataPtr* places, size_t /*place_offset*/,
|
||||
const IColumn** columns, Arena* /*arena*/, bool /*agg_many*/) const override {
|
||||
int64_t places_address[batch_size];
|
||||
for (size_t i = 0; i < batch_size; ++i) {
|
||||
places_address[i] = reinterpret_cast<int64_t>(places[i]);
|
||||
@ -354,14 +360,14 @@ public:
|
||||
// TODO: Here we calling method by jni, And if we get a thrown from FE,
|
||||
// But can't let user known the error, only return directly and output error to log file.
|
||||
void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns,
|
||||
Arena* arena) const override {
|
||||
Arena* /*arena*/) const override {
|
||||
int64_t places_address[1];
|
||||
places_address[0] = reinterpret_cast<int64_t>(place);
|
||||
this->data(_exec_place).add(places_address, true, columns, 0, batch_size, argument_types);
|
||||
}
|
||||
|
||||
// TODO: reset function should be implement also in struct data
|
||||
void reset(AggregateDataPtr place) const override {
|
||||
void reset(AggregateDataPtr /*place*/) const override {
|
||||
LOG(WARNING) << " shouldn't going reset function, there maybe some error about function "
|
||||
<< _fn.name.function_name;
|
||||
}
|
||||
|
||||
@ -116,10 +116,11 @@ Status AggFnEvaluator::prepare(RuntimeState* state, const RowDescriptor& desc, M
|
||||
if (_fn.binary_type == TFunctionBinaryType::JAVA_UDF) {
|
||||
if (config::enable_java_support) {
|
||||
_function = AggregateJavaUdaf::create(_fn, argument_types, {}, _data_type);
|
||||
RETURN_IF_ERROR(static_cast<AggregateJavaUdaf*>(_function.get())->check_udaf(_fn));
|
||||
} else {
|
||||
return Status::InternalError(
|
||||
"Java UDF is not enabled, you can change be config enable_java_support to true "
|
||||
"and restart be.");
|
||||
"Java UDAF is not enabled, you can change be config enable_java_support to "
|
||||
"true and restart be.");
|
||||
}
|
||||
} else if (_fn.binary_type == TFunctionBinaryType::RPC) {
|
||||
_function = AggregateRpcUdaf::create(_fn, argument_types, {}, _data_type);
|
||||
|
||||
@ -163,6 +163,8 @@ CREATE AGGREGATE FUNCTION simple_sum(INT) RETURNS INT PROPERTIES (
|
||||
"type"="JAVA_UDF"
|
||||
);
|
||||
```
|
||||
* The implemented jar package can be stored locally or on a remote server and downloaded via HTTP, but every BE node must be able to obtain it; otherwise the error status message "Couldn't open file..." is returned.
|
||||
|
||||
Currently, UDTF are not supported.
|
||||
|
||||
|
||||
@ -162,6 +162,8 @@ CREATE AGGREGATE FUNCTION simple_sum(int) RETURNS int PROPERTIES (
|
||||
"type"="JAVA_UDF"
|
||||
);
|
||||
```
|
||||
* 实现的jar包可以放在本地也可以存放在远程服务端通过http下载,但必须让每个BE节点都能获取到jar包;
|
||||
否则将会返回错误状态信息"Couldn't open file ......".
|
||||
|
||||
目前还暂不支持UDTF
|
||||
|
||||
|
||||
@ -136,8 +136,8 @@ suite("test_array_functions") {
|
||||
"storage_format" = "V2"
|
||||
);
|
||||
"""
|
||||
sql """ insert into ${tableName3} values (10005,'四年级三班',[10005,null,null]) """
|
||||
sql """ insert into ${tableName3} values (10006,'六年级一班',[60002,60002,60003,null,60005]) """
|
||||
sql """ insert into ${tableName3} values (10005,'aaaaa',[10005,null,null]) """
|
||||
sql """ insert into ${tableName3} values (10006,'bbbbb',[60002,60002,60003,null,60005]) """
|
||||
|
||||
qt_select_union "select class_id, student_ids, array_union(student_ids,[1,2,3]) from ${tableName3} order by class_id;"
|
||||
qt_select_except "select class_id, student_ids, array_except(student_ids,[1,2,3]) from ${tableName3} order by class_id;"
|
||||
|
||||
Reference in New Issue
Block a user