[bug](udaf) fix memory leak in the java udaf (#32630)

fix memory leak in the java udaf
This commit is contained in:
zhangstar333
2024-03-22 19:43:14 +08:00
committed by yiguolei
parent c223d9e7d0
commit a55b5ea9ca

View File

@ -59,18 +59,28 @@ public:
AggregateJavaUdafData() = default;
AggregateJavaUdafData(int64_t num_args) { argument_size = num_args; }
~AggregateJavaUdafData() {
~AggregateJavaUdafData() = default;
Status close_and_delete_object() {
JNIEnv* env = nullptr;
if (!JniUtil::GetJNIEnv(&env).ok()) {
Defer defer {[&]() {
if (env != nullptr) {
env->DeleteGlobalRef(executor_cl);
env->DeleteGlobalRef(executor_obj);
}
}};
Status st = JniUtil::GetJNIEnv(&env);
if (!st.ok()) {
LOG(WARNING) << "Failed to get JNIEnv";
return st;
}
env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_close_id);
Status st = JniUtil::GetJniExceptionMsg(env);
st = JniUtil::GetJniExceptionMsg(env);
if (!st.ok()) {
LOG(WARNING) << "Failed to close JAVA UDAF: " << st.to_string();
return st;
}
env->DeleteGlobalRef(executor_cl);
env->DeleteGlobalRef(executor_obj);
return Status::OK();
}
Status init_udaf(const TFunction& fn, const std::string& local_location) {
@ -268,8 +278,8 @@ public:
}
void create(AggregateDataPtr __restrict place) const override {
new (place) Data(argument_types.size());
if (_first_created) {
new (place) Data(argument_types.size());
Status status = Status::OK();
SAFE_CREATE(RETURN_IF_STATUS_ERROR(status,
this->data(place).init_udaf(_fn, _local_location)),
@ -279,16 +289,24 @@ public:
});
_first_created = false;
_exec_place = place;
if (UNLIKELY(!status.ok())) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, status.to_string());
}
}
}
// To avoid multiple times JNI call, Here will destroy all data at once
void destroy(AggregateDataPtr __restrict place) const noexcept override {
if (place == _exec_place) {
static_cast<void>(this->data(_exec_place).destroy());
this->data(_exec_place).~Data();
Status status = Status::OK();
status = this->data(_exec_place).destroy();
status = this->data(_exec_place).close_and_delete_object();
_first_created = true;
if (UNLIKELY(!status.ok())) {
LOG(WARNING) << "Failed to destroy function: " << status.to_string();
}
}
this->data(place).~Data();
}
String get_name() const override { return _fn.name.function_name; }
@ -372,7 +390,6 @@ public:
// so it's can't call ~Data, only to change _destory_deserialize flag.
void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
Arena*) const override {
new (place) Data(argument_types.size());
this->data(place).read(buf);
}