[bugfix](memleak) UserFunctionCache may have memory leak during close (#18913)

* [bugfix](memleak) UserFunctionCache may have memory leak during close

* [bugfix](memleak) UserFunctionCache may have memory leak during close

---------

Co-authored-by: yiguolei <yiguolei@gmail.com>
This commit is contained in:
yiguolei
2023-04-22 10:15:51 +08:00
committed by GitHub
parent 04d18eec59
commit c80dc91a78
2 changed files with 28 additions and 121 deletions

View File

@ -32,6 +32,7 @@
#include <vector>
#include "common/config.h"
#include "common/factory_creator.h"
#include "common/status.h"
#include "gutil/strings/split.h"
#include "http/http_client.h"
@ -48,16 +49,12 @@ static const int kLibShardNum = 128;
// function cache entry, store information for
struct UserFunctionCacheEntry {
ENABLE_FACTORY_CREATOR(UserFunctionCacheEntry);
UserFunctionCacheEntry(int64_t fid_, const std::string& checksum_, const std::string& lib_file_,
LibType type)
: function_id(fid_), checksum(checksum_), lib_file(lib_file_), type(type) {}
~UserFunctionCacheEntry();
void ref() { _refs.fetch_add(1); }
// If unref() returns true, this object should be delete
bool unref() { return _refs.fetch_sub(1) == 1; }
int64_t function_id = 0;
// used to check if this library is valid.
std::string checksum;
@ -88,9 +85,6 @@ struct UserFunctionCacheEntry {
std::unordered_map<std::string, void*> fptr_map;
LibType type;
private:
std::atomic<int> _refs {0};
};
UserFunctionCacheEntry::~UserFunctionCacheEntry() {
@ -114,9 +108,6 @@ UserFunctionCache::~UserFunctionCache() {
while (it != _entry_map.end()) {
auto entry = it->second;
it = _entry_map.erase(it);
if (entry->unref()) {
delete entry;
}
}
}
@ -162,11 +153,9 @@ Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std
return Status::InternalError("duplicate function id");
}
// create a cache entry and put it into entry map
UserFunctionCacheEntry* entry =
new UserFunctionCacheEntry(function_id, checksum, dir + "/" + file, lib_type);
std::shared_ptr<UserFunctionCacheEntry> entry = UserFunctionCacheEntry::create_shared(
function_id, checksum, dir + "/" + file, lib_type);
entry->is_downloaded = true;
entry->ref();
_entry_map[function_id] = entry;
return Status::OK();
@ -204,64 +193,11 @@ std::string get_real_symbol(const std::string& symbol) {
return str2;
}
Status UserFunctionCache::get_function_ptr(int64_t fid, const std::string& orig_symbol,
const std::string& url, const std::string& checksum,
void** fn_ptr, UserFunctionCacheEntry** output_entry) {
auto symbol = get_real_symbol(orig_symbol);
if (fid == 0) {
// Just loading a function ptr in the current process. No need to take any locks.
RETURN_IF_ERROR(dynamic_lookup(_current_process_handle, symbol.c_str(), fn_ptr));
return Status::OK();
}
// if we need to unref entry
bool need_unref_entry = false;
UserFunctionCacheEntry* entry = nullptr;
// find the library entry for this function. If *output_entry is not null
// find symbol in it without to get other entry
if (output_entry != nullptr && *output_entry != nullptr) {
entry = *output_entry;
} else {
RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, &entry, LibType::SO));
need_unref_entry = true;
}
Status status;
{
std::lock_guard<SpinLock> l(entry->map_lock);
// now, we have the library entry, we need to lock it to find symbol
auto it = entry->fptr_map.find(symbol);
if (it != entry->fptr_map.end()) {
*fn_ptr = it->second;
} else {
status = dynamic_lookup(entry->lib_handle, symbol.c_str(), fn_ptr);
if (status.ok()) {
entry->fptr_map.emplace(symbol, *fn_ptr);
} else {
LOG(WARNING) << "fail to lookup symbol in library, symbol=" << symbol
<< ", file=" << entry->lib_file;
}
}
}
if (status.ok() && output_entry != nullptr && *output_entry == nullptr) {
*output_entry = entry;
need_unref_entry = false;
}
if (need_unref_entry) {
if (entry->unref()) {
delete entry;
}
}
return status;
}
Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url,
const std::string& checksum,
UserFunctionCacheEntry** output_entry, LibType type) {
UserFunctionCacheEntry* entry = nullptr;
std::shared_ptr<UserFunctionCacheEntry>& output_entry,
LibType type) {
std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
std::string file_name = _get_file_name_from_url(url);
{
std::lock_guard<std::mutex> l(_cache_lock);
@ -269,12 +205,10 @@ Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url,
if (it != _entry_map.end()) {
entry = it->second;
} else {
entry = new UserFunctionCacheEntry(
entry = UserFunctionCacheEntry::create_shared(
fid, checksum, _make_lib_file(fid, checksum, type, file_name), type);
entry->ref();
_entry_map.emplace(fid, entry);
}
entry->ref();
}
auto st = _load_cache_entry(url, entry);
if (!st.ok()) {
@ -285,28 +219,21 @@ Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url,
return st;
}
*output_entry = entry;
output_entry = entry;
return Status::OK();
}
void UserFunctionCache::_destroy_cache_entry(UserFunctionCacheEntry* entry) {
void UserFunctionCache::_destroy_cache_entry(std::shared_ptr<UserFunctionCacheEntry> entry) {
// 1. we remove cache entry from entry map
size_t num_removed = 0;
{
std::lock_guard<std::mutex> l(_cache_lock);
num_removed = _entry_map.erase(entry->function_id);
}
if (num_removed > 0) {
entry->unref();
}
std::lock_guard<std::mutex> l(_cache_lock);
// set should delete flag to true, so that the jar file will be removed when
// the entry is removed from map, and deconstruct method is called.
entry->should_delete_library.store(true);
// now we need to drop
if (entry->unref()) {
delete entry;
}
_entry_map.erase(entry->function_id);
}
Status UserFunctionCache::_load_cache_entry(const std::string& url, UserFunctionCacheEntry* entry) {
Status UserFunctionCache::_load_cache_entry(const std::string& url,
std::shared_ptr<UserFunctionCacheEntry> entry) {
if (entry->is_loaded.load()) {
return Status::OK();
}
@ -326,7 +253,8 @@ Status UserFunctionCache::_load_cache_entry(const std::string& url, UserFunction
}
// entry's lock must be held
Status UserFunctionCache::_download_lib(const std::string& url, UserFunctionCacheEntry* entry) {
Status UserFunctionCache::_download_lib(const std::string& url,
std::shared_ptr<UserFunctionCacheEntry> entry) {
DCHECK(!entry->is_downloaded);
// get local path to save library
@ -399,7 +327,8 @@ std::string UserFunctionCache::_get_file_name_from_url(const std::string& url) c
}
// entry's lock must be held
Status UserFunctionCache::_load_cache_entry_internal(UserFunctionCacheEntry* entry) {
Status UserFunctionCache::_load_cache_entry_internal(
std::shared_ptr<UserFunctionCacheEntry> entry) {
RETURN_IF_ERROR(dynamic_open(entry->lib_file.c_str(), &entry->lib_handle));
entry->is_loaded.store(true);
return Status::OK();
@ -418,19 +347,10 @@ std::string UserFunctionCache::_make_lib_file(int64_t function_id, const std::st
return ss.str();
}
void UserFunctionCache::release_entry(UserFunctionCacheEntry* entry) {
if (entry == nullptr) {
return;
}
if (entry->unref()) {
delete entry;
}
}
Status UserFunctionCache::get_jarpath(int64_t fid, const std::string& url,
const std::string& checksum, std::string* libpath) {
UserFunctionCacheEntry* entry = nullptr;
RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, &entry, LibType::JAR));
std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, entry, LibType::JAR));
*libpath = entry->lib_file;
return Status::OK();
}