Add UserFunctionCache to cache UDF's library (#453)

* Add UserFunctionCache to cache UDF's library

This patch replaces LibCache with UserFunctionCache. LibCache uses the HDFS
URL to identify a UDF's library, so when the BE process restarts, every
downloaded library has to be downloaded and loaded again. UserFunctionCache
instead uses the function id to identify a library, so when the process
restarts, all previously downloaded libraries can be loaded without
downloading them again.

* update
This commit is contained in:
ZHAO Chun
2018-12-21 22:07:21 +08:00
committed by chenhao
parent 0341ffde67
commit 90d71508ff
38 changed files with 858 additions and 699 deletions

View File

@ -121,6 +121,7 @@ namespace config {
// log dir
CONF_String(sys_log_dir, "${DORIS_HOME}/log");
CONF_String(user_function_dir, "${DORIS_HOME}/lib/usr");
// INFO, WARNING, ERROR, FATAL
CONF_String(sys_log_level, "INFO");
// TIME-DAY, TIME-HOUR, SIZE-MB-nnn

View File

@ -32,7 +32,7 @@
#include "runtime/bufferpool/buffer_pool.h"
#include "runtime/exec_env.h"
#include "runtime/mem_tracker.h"
#include "runtime/lib_cache.h"
#include "runtime/user_function_cache.h"
#include "exprs/operators.h"
#include "exprs/is_null_predicate.h"
#include "exprs/like_predicate.h"
@ -171,7 +171,7 @@ void init_daemon(int argc, char** argv, const std::vector<StorePath>& paths) {
CpuInfo::init();
DiskInfo::init();
MemInfo::init();
LibCache::init();
UserFunctionCache::instance()->init(config::user_function_dir);
Operators::init();
IsNullPredicate::init();
LikePredicate::init();

View File

@ -20,7 +20,7 @@
#include "codegen/llvm_codegen.h"
#include "exprs/anyval_util.h"
#include "runtime/descriptors.h"
#include "runtime/lib_cache.h"
#include "runtime/user_function_cache.h"
#include "runtime/runtime_state.h"
#include "common/names.h"
@ -91,32 +91,39 @@ Status AggFn::Init(const RowDescriptor& row_desc, RuntimeState* state) {
return Status(ss.str());
}
RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location,
aggregate_fn.init_fn_symbol, &init_fn_, &_cache_entry));
RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location,
aggregate_fn.update_fn_symbol, &update_fn_, &_cache_entry));
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
_fn.id, aggregate_fn.init_fn_symbol, _fn.hdfs_location, "", &init_fn_, &_cache_entry));
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
_fn.id, aggregate_fn.update_fn_symbol, _fn.hdfs_location, "", &update_fn_, &_cache_entry));
// Merge() is not defined for purely analytic function.
if (!aggregate_fn.is_analytic_only_fn) {
RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location,
aggregate_fn.merge_fn_symbol, &merge_fn_, &_cache_entry));
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
_fn.id, aggregate_fn.merge_fn_symbol, _fn.hdfs_location, "", &merge_fn_, &_cache_entry));
}
// Serialize(), GetValue(), Remove() and Finalize() are optional
if (!aggregate_fn.serialize_fn_symbol.empty()) {
RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location,
aggregate_fn.serialize_fn_symbol, &serialize_fn_, &_cache_entry));
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
_fn.id, aggregate_fn.serialize_fn_symbol,
_fn.hdfs_location, "",
&serialize_fn_, &_cache_entry));
}
if (!aggregate_fn.get_value_fn_symbol.empty()) {
RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location,
aggregate_fn.get_value_fn_symbol, &get_value_fn_, &_cache_entry));
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
_fn.id, aggregate_fn.get_value_fn_symbol, _fn.hdfs_location, "",
&get_value_fn_, &_cache_entry));
}
if (!aggregate_fn.remove_fn_symbol.empty()) {
RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location,
aggregate_fn.remove_fn_symbol, &remove_fn_, &_cache_entry));
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
_fn.id, aggregate_fn.remove_fn_symbol,
_fn.hdfs_location, "",
&remove_fn_, &_cache_entry));
}
if (!aggregate_fn.finalize_fn_symbol.empty()) {
RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location,
_fn.aggregate_fn.finalize_fn_symbol, &finalize_fn_, &_cache_entry));
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
_fn.id, _fn.aggregate_fn.finalize_fn_symbol,
_fn.hdfs_location, "",
&finalize_fn_, &_cache_entry));
}
return Status::OK;
}

View File

@ -24,7 +24,7 @@
#include "exec/aggregation_node.h"
#include "exprs/aggregate_functions.h"
#include "exprs/anyval_util.h"
//#include "runtime/lib_cache.h"
#include "runtime/user_function_cache.h"
#include "udf/udf_internal.h"
#include "util/debug_util.h"
#include "runtime/datetime_value.h"
@ -207,37 +207,38 @@ Status AggFnEvaluator::prepare(
}
// Load the function pointers.
RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(
_hdfs_location, _fn.aggregate_fn.init_fn_symbol, &_init_fn, NULL, true));
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
_fn.id, _fn.aggregate_fn.init_fn_symbol, _hdfs_location, "", &_init_fn, NULL));
RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(
_hdfs_location, _fn.aggregate_fn.update_fn_symbol, &_update_fn, NULL, true));
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
_fn.id, _fn.aggregate_fn.update_fn_symbol, _hdfs_location, "", &_update_fn, NULL));
// Merge() is not loaded if evaluating the agg fn as an analytic function.
if (!_is_analytic_fn) {
RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(
_hdfs_location, _fn.aggregate_fn.merge_fn_symbol, &_merge_fn, NULL, true));
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
_fn.id, _fn.aggregate_fn.merge_fn_symbol, _hdfs_location, "", &_merge_fn, NULL));
}
// Serialize and Finalize are optional
if (!_fn.aggregate_fn.serialize_fn_symbol.empty()) {
RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(
_hdfs_location, _fn.aggregate_fn.serialize_fn_symbol, &_serialize_fn, NULL, true));
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
_fn.id, _fn.aggregate_fn.serialize_fn_symbol, _hdfs_location,
"", &_serialize_fn, NULL));
}
if (!_fn.aggregate_fn.finalize_fn_symbol.empty()) {
RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(
_hdfs_location, _fn.aggregate_fn.finalize_fn_symbol, &_finalize_fn, NULL, true));
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
_fn.id, _fn.aggregate_fn.finalize_fn_symbol, _hdfs_location, "", &_finalize_fn, NULL));
}
if (!_fn.aggregate_fn.get_value_fn_symbol.empty()) {
RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(
_hdfs_location, _fn.aggregate_fn.get_value_fn_symbol, &_get_value_fn,
NULL, true));
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
_fn.id, _fn.aggregate_fn.get_value_fn_symbol, _hdfs_location, "", &_get_value_fn,
NULL));
}
if (!_fn.aggregate_fn.remove_fn_symbol.empty()) {
RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(
_hdfs_location, _fn.aggregate_fn.remove_fn_symbol, &_remove_fn,
NULL, true));
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
_fn.id, _fn.aggregate_fn.remove_fn_symbol, _hdfs_location, "", &_remove_fn,
NULL));
}
vector<FunctionContext::TypeDesc> arg_types;

View File

@ -18,6 +18,7 @@
#ifndef DORIS_BE_SRC_EXPRS_ARITHMETIC_EXPR_H
#define DORIS_BE_SRC_EXPRS_ARITHMETIC_EXPR_H
#include "common/object_pool.h"
#include "exprs/expr.h"
namespace doris {

View File

@ -20,7 +20,10 @@
#include <string>
#include <iostream>
#include <llvm/IR/InstrTypes.h>
#include "common/object_pool.h"
#include "exprs/predicate.h"
#include "gen_cpp/Exprs_types.h"

View File

@ -20,6 +20,7 @@
#include <string>
#include "expr.h"
#include "common/object_pool.h"
namespace llvm {
class Function;

View File

@ -18,6 +18,7 @@
#ifndef DORIS_BE_SRC_EXPRS_CAST_EXPR_H
#define DORIS_BE_SRC_EXPRS_CAST_EXPR_H
#include "common/object_pool.h"
#include "exprs/expr.h"
namespace doris {

View File

@ -19,6 +19,8 @@
#define DORIS_BE_SRC_QUERY_EXPRS_COMPOUND_PREDICATE_H
#include <string>
#include "common/object_pool.h"
#include "exprs/predicate.h"
#include "gen_cpp/Exprs_types.h"

View File

@ -19,6 +19,7 @@
#define DORIS_BE_SRC_QUERY_EXPRS_CONDITIONAL_FUNCTIONS_H
#include <stdint.h>
#include "common/object_pool.h"
#include "exprs/expr.h"
#include "udf/udf.h"

View File

@ -48,6 +48,7 @@
#include "gen_cpp/Data_types.h"
#include "runtime/runtime_state.h"
#include "runtime/raw_value.h"
#include "runtime/user_function_cache.h"
#include "util/debug_util.h"
#include "gen_cpp/Exprs_types.h"
@ -1077,11 +1078,15 @@ Status Expr::create_tree_internal(const vector<TExprNode>& nodes, ObjectPool* po
// TODO chenhao
void Expr::close() {
for (Expr* child : _children) child->close();
/*if (_cache_entry != nullptr) {
LibCache::instance()->decrement_use_count(_cache_entry);
_cache_entry = nullptr;
}*/
for (Expr* child : _children) child->close();
/*if (_cache_entry != nullptr) {
LibCache::instance()->decrement_use_count(_cache_entry);
_cache_entry = nullptr;
}*/
if (_cache_entry != nullptr) {
UserFunctionCache::instance()->release_entry(_cache_entry);
_cache_entry = nullptr;
}
}
void Expr::close(const vector<Expr*>& exprs) {

View File

@ -34,7 +34,6 @@
#include "runtime/datetime_value.h"
#include "runtime/decimal_value.h"
#include "udf/udf.h"
#include "runtime/lib_cache.h"
#include "runtime/types.h"
//#include <boost/scoped_ptr.hpp>
//
@ -65,6 +64,7 @@ class TupleIsNullPredicate;
class VectorizedRowBatch;
class Literal;
class MemTracker;
class UserFunctionCacheEntry;
// This is the superclass of all expr evaluation nodes.
class Expr {
@ -396,7 +396,7 @@ protected:
ExprContext* ctx, RuntimeState* state, int varargs_buffer_size);
/// Cache entry for the library implementing this function.
LibCache::LibCacheEntry* _cache_entry;
UserFunctionCacheEntry* _cache_entry;
// function opcode

View File

@ -20,6 +20,7 @@
#include <string>
#include <iostream>
#include "common/object_pool.h"
#include "exprs/expr.h"
#include "gen_cpp/Exprs_types.h"

View File

@ -18,6 +18,7 @@
#ifndef DORIS_BE_SRC_QUERY_EXPRS_LITERAL_H
#define DORIS_BE_SRC_QUERY_EXPRS_LITERAL_H
#include "common/object_pool.h"
#include "exprs/expr.h"
namespace doris {

View File

@ -28,7 +28,6 @@
#include "exprs/expr.h"
#include "exprs/scalar_fn_call.h"
#include "gutil/strings/substitute.h"
#include "runtime/lib_cache.h"
#include "runtime/mem_tracker.h"
#include "runtime/raw_value.h"
#include "runtime/runtime_state.h"

View File

@ -28,7 +28,6 @@
#include "exprs/agg_fn.h"
#include "exprs/hybird_map.h"
#include "runtime/descriptors.h"
#include "runtime/lib_cache.h"
#include "runtime/tuple_row.h"
#include "runtime/types.h"
#include "udf/udf.h"

View File

@ -18,6 +18,7 @@
#ifndef DORIS_BE_SRC_QUERY_EXPRS_NULL_LITERAL_H
#define DORIS_BE_SRC_QUERY_EXPRS_NULL_LITERAL_H
#include "common/object_pool.h"
#include "exprs/expr.h"
namespace llvm {

View File

@ -25,7 +25,7 @@
#include "codegen/llvm_codegen.h"
#include "exprs/anyval_util.h"
#include "exprs/expr_context.h"
#include "runtime/lib_cache.h"
#include "runtime/user_function_cache.h"
#include "runtime/runtime_state.h"
#include "udf/udf_internal.h"
#include "util/debug_util.h"
@ -89,8 +89,8 @@ Status ScalarFnCall::prepare(
Status status = Status::OK;
if (_scalar_fn == NULL) {
if (SymbolsUtil::is_mangled(_fn.scalar_fn.symbol)) {
status = LibCache::instance()->get_so_function_ptr(
_fn.hdfs_location, _fn.scalar_fn.symbol, &_scalar_fn, &_cache_entry, true);
status = UserFunctionCache::instance()->get_function_ptr(
_fn.id, _fn.scalar_fn.symbol, _fn.hdfs_location, "", &_scalar_fn, &_cache_entry);
} else {
std::vector<TypeDescriptor> arg_types;
for (auto& t_type : _fn.arg_types) {
@ -100,8 +100,8 @@ Status ScalarFnCall::prepare(
// ret_type = ColumnType(thrift_to_type(_fn.ret_type));
std::string symbol = SymbolsUtil::mangle_user_function(
_fn.scalar_fn.symbol, arg_types, _fn.has_var_args, NULL);
status = LibCache::instance()->get_so_function_ptr(
_fn.hdfs_location, symbol, &_scalar_fn, &_cache_entry, true);
status = UserFunctionCache::instance()->get_function_ptr(
_fn.id, symbol, _fn.hdfs_location, "", &_scalar_fn, &_cache_entry);
}
}
#if 0
@ -119,7 +119,7 @@ Status ScalarFnCall::prepare(
if (char_arg) {
DCHECK(num_fixed_args() <= 8 && _fn.binary_type == TFunctionBinaryType::BUILTIN);
}
Status status = LibCache::instance()->GetSoFunctionPtr(
Status status = UserFunctionCache::instance()->GetSoFunctionPtr(
_fn.hdfs_location, _fn.scalar_fn.symbol, &_scalar_fn, &cache_entry_);
if (!status.ok()) {
if (_fn.binary_type == TFunctionBinaryType::BUILTIN) {
@ -141,8 +141,8 @@ Status ScalarFnCall::prepare(
if (_fn.binary_type == TFunctionBinaryType::IR) {
std::string local_path;
RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
_fn.hdfs_location, LibCache::TYPE_IR, &local_path));
RETURN_IF_ERROR(UserFunctionCache::instance()->GetLocalLibPath(
_fn.hdfs_location, UserFunctionCache::TYPE_IR, &local_path));
// Link the UDF module into this query's main module (essentially copy the UDF
// module into the main module) so the UDF's functions are available in the main
// module.
@ -425,8 +425,8 @@ Status ScalarFnCall::get_udf(RuntimeState* state, Function** udf) {
// This can either be a UDF implemented in a .so or a builtin using the UDF
// interface with the code in impalad.
void* fn_ptr = NULL;
Status status = LibCache::instance()->get_so_function_ptr(
_fn.hdfs_location, _fn.scalar_fn.symbol, &fn_ptr, &_cache_entry);
Status status = UserFunctionCache::instance()->get_function_ptr(
_fn.id, _fn.scalar_fn.symbol, _fn.hdfs_location, "", &fn_ptr, &_cache_entry);
if (!status.ok() && _fn.binary_type == TFunctionBinaryType::BUILTIN) {
// Builtins symbols should exist unless there is a version mismatch.
// TODO(zc )
@ -541,8 +541,8 @@ Status ScalarFnCall::get_udf(RuntimeState* state, Function** udf) {
Status ScalarFnCall::get_function(RuntimeState* state, const std::string& symbol, void** fn) {
if (_fn.binary_type == TFunctionBinaryType::NATIVE
|| _fn.binary_type == TFunctionBinaryType::BUILTIN) {
return LibCache::instance()->get_so_function_ptr(
_fn.hdfs_location, symbol, fn, &_cache_entry, true);
return UserFunctionCache::instance()->get_function_ptr(
_fn.id, symbol, _fn.hdfs_location, "", fn, &_cache_entry);
} else {
#if 0
DCHECK_EQ(_fn.binary_type, TFunctionBinaryType::IR);

View File

@ -20,6 +20,7 @@
#include <string>
#include "common/object_pool.h"
#include "exprs/expr.h"
#include "udf/udf.h"

View File

@ -18,6 +18,7 @@
#ifndef DORIS_BE_SRC_QUERY_EXPRS_SLOT_REF_H
#define DORIS_BE_SRC_QUERY_EXPRS_SLOT_REF_H
#include "common/object_pool.h"
#include "exprs/expr.h"
namespace doris {

View File

@ -18,6 +18,7 @@
#ifndef DORIS_BE_SRC_QUERY_EXPRS_TUPLE_IS_NULL_PREDICATE_H
#define DORIS_BE_SRC_QUERY_EXPRS_TUPLE_IS_NULL_PREDICATE_H
#include "common/object_pool.h"
#include "exprs/predicate.h"
namespace doris {

View File

@ -159,16 +159,19 @@ Status HttpClient::download(const std::string& local_path) {
LOG(WARNING) << "open file failed, file=" << local_path;
return Status("open file failed");
}
auto callback = [&fp, &local_path] (const void* data, size_t length) {
Status status;
auto callback = [&status, &fp, &local_path] (const void* data, size_t length) {
auto res = fwrite(data, length, 1, fp.get());
if (res != 1) {
LOG(WARNING) << "fail to write data to file, file=" << local_path
<< ", error=" << ferror(fp.get());
status = Status("fail to write data when download");
return false;
}
return true;
};
return execute(callback);
RETURN_IF_ERROR(execute(callback));
return status;
}
Status HttpClient::execute(std::string* response) {

View File

@ -37,7 +37,7 @@ add_library(Runtime STATIC
descriptors.cpp
exec_env.cpp
exec_env_init.cpp
lib_cache.cpp
user_function_cache.cpp
mem_pool.cpp
plan_fragment_executor.cpp
primitive_type.cpp

View File

@ -1,437 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/lib_cache.h"
#include <boost/filesystem.hpp>
#include <boost/foreach.hpp>
#include <boost/thread/locks.hpp>
#include "codegen/llvm_codegen.h"
#include "runtime/runtime_state.h"
#include "util/dynamic_util.h"
#include "util/hash_util.hpp"
#include "util/path_builder.h"
namespace doris {
boost::scoped_ptr<LibCache> LibCache::_s_instance;
// Bookkeeping for one cached library. All fields are guarded by 'lock'.
struct LibCache::LibCacheEntry {
    // Lock protecting all fields in this entry.
    boost::mutex lock;

    // The number of users that are using this cache entry. If this is
    // a .so, we can't dlclose unless the use_count goes to 0.
    int use_count;

    // If true, this cache entry should be removed from _lib_cache when
    // the use_count goes to 0.
    bool should_remove;

    // If true, we need to check if there is a newer version of the cached library in HDFS
    // on next access. Should hold _lock to read/write.
    bool check_needs_refresh;

    // The type of this file.
    LibType type;

    // The path on the local file system for this library.
    std::string local_path;

    // Status returned from copying this file from HDFS.
    Status copy_file_status;

    // The last modification time of the HDFS file in seconds.
    time_t last_mod_time;

    // Handle from dlopen.
    void* shared_object_handle;

    // Mapping from symbol => address of loaded symbol.
    // Only used if the type is TYPE_SO.
    typedef boost::unordered_map<std::string, void*> SymbolMap;
    SymbolMap symbol_cache;

    // Set of symbols in this entry. This is populated once on load and read
    // only. This is only used if it is a llvm module.
    // TODO: it would be nice to be able to do this for .so's as well but it's
    // not trivial to walk an .so for the symbol table.
    boost::unordered_set<std::string> symbols;

    // Set if an error occurs loading the cache entry before the cache entry
    // can be evicted. This allows other threads that attempt to use the entry
    // before it is removed to return the same error.
    Status loading_status;

    // Every scalar member gets a defined initial value. 'type' and
    // 'last_mod_time' were previously left indeterminate, so any read before the
    // loader assigned them (e.g. a DCHECK_EQ on 'type') was undefined behavior.
    // Initializers are listed in declaration order.
    LibCacheEntry()
            : use_count(0),
              should_remove(false),
              check_needs_refresh(false),
              type(TYPE_SO),
              last_mod_time(0),
              shared_object_handle(NULL) {}

    ~LibCacheEntry();
};
// Creates an empty cache. The current-process handle starts out unset;
// init_internal() is what opens it.
LibCache::LibCache() : _current_process_handle(NULL) {}
// Evicts every cached library first, then closes the handle to the current
// process if one was opened.
LibCache::~LibCache() {
    drop_cache();
    if (_current_process_handle == NULL) {
        return;
    }
    dynamic_close(_current_process_handle);
}
// Creates the process-wide LibCache singleton and initializes it.
// Must not be called when a singleton already exists (checked in debug builds).
// Returns the status of init_internal().
Status LibCache::init() {
    DCHECK(LibCache::_s_instance.get() == NULL);
    LibCache* const cache = new LibCache();
    // The scoped_ptr takes ownership; 'cache' is only kept as a shorthand.
    LibCache::_s_instance.reset(cache);
    return cache->init_internal();
}
// Opens a dynamic-loading handle for the current process and stores it in
// _current_process_handle. Returns the error from dynamic_open() on failure.
Status LibCache::init_internal() {
    // A former FE-test variant, which explicitly opened
    // "service/libfesupport.so" instead of the current process, is disabled;
    // the handle is always obtained by passing NULL to dynamic_open().
    RETURN_IF_ERROR(dynamic_open(NULL, &_current_process_handle));

    DCHECK(_current_process_handle != NULL)
            << "We should always be able to get current process handle.";
    return Status::OK;
}
// Closes the shared-object handle (if any) and unlinks the local copy of the
// library. In debug builds, an entry that still holds a handle must have no
// remaining users and must already be marked for removal.
LibCache::LibCacheEntry::~LibCacheEntry() {
    void* const handle = shared_object_handle;
    if (handle != NULL) {
        DCHECK_EQ(use_count, 0);
        DCHECK(should_remove);
        dynamic_close(handle);
    }
    // Runs unconditionally, whether or not a handle was open.
    unlink(local_path.c_str());
}
// Resolves 'origin_symbol' (after get_real_symbol() translation) to a function
// pointer and stores it in '*fn_ptr'.
//
// - If 'hdfs_lib_file' is empty, the symbol is looked up in the current process.
// - Otherwise the symbol is looked up in the cached shared object for
//   'hdfs_lib_file'; resolved addresses are memoized in the entry's symbol_cache.
//
// 'ent' (optional): if '*ent' is non-NULL it is reused as the cache entry and
// its use count is left untouched. If '*ent' is NULL it is set to the entry
// that was used and that entry's use count is incremented.
//
// 'quiet' is accepted for interface compatibility; this function does not read it.
//
// Returns a non-OK status if the cache entry cannot be obtained or the symbol
// lookup fails.
Status LibCache::get_so_function_ptr(
        const std::string& hdfs_lib_file, const std::string& origin_symbol,
        void** fn_ptr, LibCacheEntry** ent, bool quiet) {
    const std::string symbol = get_real_symbol(origin_symbol);

    if (hdfs_lib_file.empty()) {
        // Just loading a function ptr in the current process. No need to take any locks.
        DCHECK(_current_process_handle != NULL);
        RETURN_IF_ERROR(dynamic_lookup(_current_process_handle, symbol.c_str(), fn_ptr));
        return Status::OK;
    }

    LibCacheEntry* entry = NULL;
    boost::unique_lock<boost::mutex> lock;
    if (ent != NULL && *ent != NULL) {
        // Reuse already-cached entry provided by user
        entry = *ent;
        boost::unique_lock<boost::mutex> l(entry->lock);
        lock.swap(l);
    } else {
        RETURN_IF_ERROR(get_chache_entry(hdfs_lib_file, TYPE_SO, &lock, &entry));
    }

    // get_chache_entry() can return OK without producing an entry (the loader
    // body is compiled out). The old DCHECK disappears in release builds, so
    // fail explicitly here instead of dereferencing a NULL entry below.
    if (entry == NULL) {
        return Status("Failed to load library: " + hdfs_lib_file);
    }
    DCHECK_EQ(entry->type, TYPE_SO);

    LibCacheEntry::SymbolMap::iterator it = entry->symbol_cache.find(symbol);
    if (it != entry->symbol_cache.end()) {
        *fn_ptr = it->second;
    } else {
        RETURN_IF_ERROR(
                dynamic_lookup(entry->shared_object_handle, symbol.c_str(), fn_ptr));
        entry->symbol_cache[symbol] = *fn_ptr;
    }

    DCHECK(*fn_ptr != NULL);
    if (ent != NULL && *ent == NULL) {
        // Only set and increment user's entry if it wasn't already cached
        *ent = entry;
        ++(*ent)->use_count;
    }
    return Status::OK;
}
// Drops one reference to 'entry'. A NULL entry is ignored. If this was the
// last reference and the entry has already been marked for removal, the entry
// is destroyed.
void LibCache::decrement_use_count(LibCacheEntry* entry) {
    if (entry == NULL) {
        return;
    }

    bool last_user_of_removed_entry = false;
    {
        boost::unique_lock<boost::mutex> guard(entry->lock);
        const int remaining = --entry->use_count;
        last_user_of_removed_entry = (remaining == 0 && entry->should_remove);
    }

    // The entry's own mutex is released above, before the entry is destroyed.
    if (last_user_of_removed_entry) {
        delete entry;
    }
}
// Looks up (or creates) the cache entry for 'hdfs_lib_file' and copies its
// local file system path into '*local_path'.
// Returns a non-OK status if the entry cannot be obtained.
Status LibCache::get_local_lib_path(
        const std::string& hdfs_lib_file, LibType type, std::string* local_path) {
    boost::unique_lock<boost::mutex> lock;
    LibCacheEntry* entry = NULL;
    RETURN_IF_ERROR(get_chache_entry(hdfs_lib_file, type, &lock, &entry));

    // get_chache_entry() can return OK without producing an entry (the loader
    // body is compiled out). The old DCHECK disappears in release builds, so
    // fail explicitly here instead of dereferencing a NULL entry below.
    if (entry == NULL) {
        return Status("Failed to load library: " + hdfs_lib_file);
    }
    DCHECK_EQ(entry->type, type);

    *local_path = entry->local_path;
    return Status::OK;
}
// Checks whether 'origin_symbol' (after get_real_symbol() translation) exists
// in the library 'hdfs_lib_file' of the given 'type'.
//
// - TYPE_SO:  delegates to get_so_function_ptr() and returns its status.
// - TYPE_IR:  looks the symbol up in the entry's pre-populated symbol set.
// - TYPE_JAR: only obtains the cache entry; jar contents are not inspected.
//
// 'quiet' is forwarded to get_so_function_ptr() for TYPE_SO and is otherwise
// not read. Returns OK if the symbol is considered present, non-OK otherwise.
Status LibCache::check_symbol_exists(
        const std::string& hdfs_lib_file, LibType type,
        const std::string& origin_symbol, bool quiet) {
    const std::string symbol = get_real_symbol(origin_symbol);

    if (type == TYPE_SO) {
        void* dummy_ptr = NULL;
        return get_so_function_ptr(hdfs_lib_file, symbol, &dummy_ptr, NULL, quiet);
    } else if (type == TYPE_IR) {
        boost::unique_lock<boost::mutex> lock;
        LibCacheEntry* entry = NULL;
        RETURN_IF_ERROR(get_chache_entry(hdfs_lib_file, type, &lock, &entry));

        // get_chache_entry() can return OK without producing an entry (the
        // loader body is compiled out). The old DCHECK disappears in release
        // builds, so fail explicitly instead of dereferencing NULL below.
        if (entry == NULL) {
            return Status("Failed to load library: " + hdfs_lib_file);
        }
        DCHECK_EQ(entry->type, TYPE_IR);

        if (entry->symbols.find(symbol) == entry->symbols.end()) {
            std::stringstream ss;
            ss << "Symbol '" << symbol << "' does not exist in module: " << hdfs_lib_file
               << " (local path: " << entry->local_path << ")";
            // return quiet ? Status::Expected(ss.str()) : Status(ss.str());
            return Status(ss.str());
        }
        return Status::OK;
    } else if (type == TYPE_JAR) {
        // TODO: figure out how to inspect contents of jars
        boost::unique_lock<boost::mutex> lock;
        LibCacheEntry* dummy_entry = NULL;
        return get_chache_entry(hdfs_lib_file, type, &lock, &dummy_entry);
    } else {
        DCHECK(false);
        return Status("Shouldn't get here.");
    }
}
void LibCache::set_needs_refresh(const std::string& hdfs_lib_file) {
boost::unique_lock<boost::mutex> lib_cache_lock(_lock);
LibMap::iterator it = _lib_cache.find(hdfs_lib_file);
if (it == _lib_cache.end()) {
return;
}
LibCacheEntry* entry = it->second;
boost::unique_lock<boost::mutex> entry_lock(entry->lock);
// Need to hold _lock before setting check_needs_refresh.
entry->check_needs_refresh = true;
}
void LibCache::remove_entry(const std::string& hdfs_lib_file) {
boost::unique_lock<boost::mutex> lib_cache_lock(_lock);
LibMap::iterator it = _lib_cache.find(hdfs_lib_file);
if (it == _lib_cache.end()) {
return;
}
remove_entry_internal(hdfs_lib_file, it);
}
// Removes the entry referenced by 'entry_iter' from _lib_cache and marks it
// for removal. The entry is destroyed right away when nobody is using it;
// otherwise destruction is left to the last decrement_use_count() call.
// The caller (see remove_entry()) holds the global cache lock.
void LibCache::remove_entry_internal(const std::string& hdfs_lib_file,
        const LibMap::iterator& entry_iter) {
    LibCacheEntry* const victim = entry_iter->second;
    VLOG(1) << "Removing lib cache entry: " << hdfs_lib_file
            << ", local path: " << victim->local_path;

    bool unused = false;
    {
        boost::unique_lock<boost::mutex> victim_guard(victim->lock);
        // With both locks held, no other thread can be updating _lib_cache or
        // trying to get this entry.
        _lib_cache.erase(entry_iter);
        victim->should_remove = true;
        DCHECK_GE(victim->use_count, 0);
        unused = (victim->use_count == 0);
        // Once the entry is out of the map no future thread can find it, so
        // releasing the entry lock at the end of this scope is safe.
    }

    // The entry lock is released; destroy the entry if no one is using it.
    if (unused) {
        delete victim;
    }
}
void LibCache::drop_cache() {
boost::unique_lock<boost::mutex> lib_cache_lock(_lock);
BOOST_FOREACH(LibMap::value_type& v, _lib_cache) {
bool can_delete = false;
{
// Lock to wait for any threads currently processing the entry.
boost::unique_lock<boost::mutex> entry_lock(v.second->lock);
v.second->should_remove = true;
DCHECK_GE(v.second->use_count, 0);
can_delete = v.second->use_count == 0;
}
VLOG(1) << "Removed lib cache entry: " << v.first;
if (can_delete) delete v.second;
}
_lib_cache.clear();
}
// Obtains the cache entry for 'hdfs_lib_file' via get_chache_entry_internal().
//
// On success, the entry lock acquired by the internal call is handed to the
// caller through '*entry_lock' and '*entry' is whatever the internal call
// produced.
// On failure with '*entry' == NULL, the error is returned unchanged.
// On failure with a non-NULL '*entry', the error is recorded in the entry's
// loading_status, the entry lock is released, and the entry is removed from
// the cache before the error is returned.
//
// NOTE(review): callers should not assume '*entry' is non-NULL on an OK
// return; that depends on get_chache_entry_internal().
Status LibCache::get_chache_entry(
const std::string& hdfs_lib_file, LibType type,
boost::unique_lock<boost::mutex>* entry_lock, LibCacheEntry** entry) {
Status status;
{
// If an error occurs, local_entry_lock is released before calling remove_entry()
// below because it takes the global _lock which must be acquired before taking entry
// locks.
boost::unique_lock<boost::mutex> local_entry_lock;
status = get_chache_entry_internal(hdfs_lib_file, type, &local_entry_lock, entry);
if (status.ok()) {
// Transfer lock ownership to the caller; local_entry_lock is left empty.
entry_lock->swap(local_entry_lock);
return status;
}
// Failure without an entry: there is nothing to mark or remove.
if (*entry == NULL) return status;
// Set loading_status on the entry so that if another thread calls
// get_chache_entry() for this lib before this thread is able to acquire _lock in
// remove_entry(), it is able to return the same error.
(*entry)->loading_status = status;
}
// Takes _lock
remove_entry(hdfs_lib_file);
return status;
}
// Looks up, or creates and loads, the cache entry for 'hdfs_lib_file'.
//
// NOTE: everything between '#if 0' and '#endif' below is compiled out. As
// built, this function only checks (in debug builds) that 'hdfs_lib_file' is
// non-empty, sets '*entry' to NULL, and returns Status::OK; 'type' and
// 'entry_lock' are not touched.
//
// The disabled body documents the intended behavior: return a cached entry
// (refreshing it if the HDFS file changed), or create a new entry, copy the
// file from HDFS to a local path, and load it according to 'type', handing
// the entry lock back through '*entry_lock'.
Status LibCache::get_chache_entry_internal(
const std::string& hdfs_lib_file, LibType type,
boost::unique_lock<boost::mutex>* entry_lock, LibCacheEntry** entry) {
DCHECK(!hdfs_lib_file.empty());
*entry = NULL;
#if 0
// Check if this file is already cached or an error occurred on another thread while
// loading the library.
boost::unique_lock<boost::mutex> lib_cache_lock(_lock);
LibMap::iterator it = _lib_cache.find(hdfs_lib_file);
if (it != _lib_cache.end()) {
{
boost::unique_lock<boost::mutex> local_entry_lock((it->second)->lock);
if (!(it->second)->loading_status.ok()) {
// If loading_status is already set, the returned *entry should be NULL.
DCHECK(*entry == NULL);
return (it->second)->loading_status;
}
}
*entry = it->second;
if ((*entry)->check_needs_refresh) {
// Check if file has been modified since loading the cached copy. If so, remove the
// cached entry and create a new one.
(*entry)->check_needs_refresh = false;
time_t last_mod_time;
hdfsFS hdfs_conn;
Status status = HdfsFsCache::instance()->GetConnection(hdfs_lib_file, &hdfs_conn);
if (!status.ok()) {
remove_entry_internal(hdfs_lib_file, it);
*entry = NULL;
return status;
}
status = GetLastModificationTime(hdfs_conn, hdfs_lib_file.c_str(), &last_mod_time);
if (!status.ok() || (*entry)->last_mod_time < last_mod_time) {
remove_entry_internal(hdfs_lib_file, it);
*entry = NULL;
}
RETURN_IF_ERROR(status);
}
}
if (*entry != NULL) {
// Release the _lib_cache lock. This guarantees other threads looking at other
// libs can continue.
lib_cache_lock.unlock();
boost::unique_lock<boost::mutex> local_entry_lock((*entry)->lock);
entry_lock->swap(local_entry_lock);
RETURN_IF_ERROR((*entry)->copy_file_status);
DCHECK_EQ((*entry)->type, type);
DCHECK(!(*entry)->local_path.empty());
return Status::OK;
}
// Entry didn't exist. Add the entry then release _lock (so other libraries
// can be accessed).
*entry = new LibCacheEntry();
// Grab the entry lock before adding it to _lib_cache. We still need to do more
// work to initialize *entry and we don't want another thread to pick up
// the uninitialized entry.
boost::unique_lock<boost::mutex> local_entry_lock((*entry)->lock);
entry_lock->swap(local_entry_lock);
_lib_cache[hdfs_lib_file] = *entry;
lib_cache_lock.unlock();
// At this point we have the entry lock but not the lib cache lock.
DCHECK(*entry != NULL);
(*entry)->type = type;
// Copy the file
(*entry)->local_path = make_local_path(hdfs_lib_file, FLAGS_local_library_dir);
VLOG(1) << "Adding lib cache entry: " << hdfs_lib_file
<< ", local path: " << (*entry)->local_path;
hdfsFS hdfs_conn, local_conn;
RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(hdfs_lib_file, &hdfs_conn));
RETURN_IF_ERROR(HdfsFsCache::instance()->GetLocalConnection(&local_conn));
// Note: the file can be updated between getting last_mod_time and copying the file to
// local_path. This can only result in the file unnecessarily being refreshed, and does
// not affect correctness.
(*entry)->copy_file_status = GetLastModificationTime(
hdfs_conn, hdfs_lib_file.c_str(), &(*entry)->last_mod_time);
RETURN_IF_ERROR((*entry)->copy_file_status);
(*entry)->copy_file_status = CopyHdfsFile(
hdfs_conn, hdfs_lib_file, local_conn, (*entry)->local_path);
RETURN_IF_ERROR((*entry)->copy_file_status);
if (type == TYPE_SO) {
// dlopen the local library
RETURN_IF_ERROR(
DynamicOpen((*entry)->local_path.c_str(), &(*entry)->shared_object_handle));
} else if (type == TYPE_IR) {
// Load the module and populate all symbols.
ObjectPool pool;
scoped_ptr<LlvmCodeGen> codegen;
std::string module_id = boost::filesystem::path((*entry)->local_path).stem().string();
RETURN_IF_ERROR(LlvmCodeGen::LoadFromFile(
&pool, (*entry)->local_path, module_id, &codegen));
codegen->GetSymbols(&(*entry)->symbols);
} else {
DCHECK_EQ(type, TYPE_JAR);
// Nothing to do.
}
#endif
// With the body above disabled, this is reached with *entry still NULL.
return Status::OK;
}
std::string LibCache::make_local_path(
const std::string& hdfs_path, const std::string& local_dir) {
// Append the pid and library number to the local directory.
boost::filesystem::path src(hdfs_path);
std::stringstream dst;
dst << local_dir << "/" << src.stem().native() << "." << getpid() << "."
<< (__sync_fetch_and_add(&_num_libs_copied, 1)) << src.extension().native();
return dst.str();
}
}

View File

@ -1,189 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef DORIS_BE_RUNTIME_LIB_CACHE_H
#define DORIS_BE_RUNTIME_LIB_CACHE_H
#include <string>
#include <regex>
#include <boost/scoped_ptr.hpp>
#include <boost/unordered_map.hpp>
#include <boost/unordered_set.hpp>
#include <boost/thread/mutex.hpp>
#include "common/object_pool.h"
#include "common/status.h"
namespace doris {
class RuntimeState;
/// Process-wide cache of dynamically-linked libraries loaded from HDFS.
/// These libraries can either be shared objects, llvm modules or jars. For
/// shared objects, when we load the shared object, we dlopen() it and keep
/// it in our process. For modules, we store the symbols in the module to
/// service symbol lookups. We can't cache the module since it (i.e. the external
/// module) is consumed when it is linked with the query codegen module.
//
/// Locking strategy: We don't want to grab a big lock across all operations since
/// one of the operations is copying a file from HDFS. With one lock that would
/// prevent any UDFs from running on the system. Instead, we have a global lock
/// that is taken when doing the cache lookup, but is not held during any blocking calls.
/// During the blocking calls, we take the per-lib lock.
//
/// Entry lifetime management: We cannot delete the entry while a query is
/// using the library. When the caller requests a ptr into the library, they
/// are given the entry handle and must decrement the ref count when they
/// are done.
//
/// TODO:
/// - refresh libraries
/// - better cached module management.
class LibCache {
public:
struct LibCacheEntry;
enum LibType {
TYPE_SO, // Shared object
TYPE_IR, // IR intermediate
TYPE_JAR, // Java jar file. We don't care about the contents in the BE.
};
static LibCache* instance() {
return LibCache::_s_instance.get();
}
/// Calls dlclose on all cached handles.
~LibCache();
/// Initializes the libcache. Must be called before any other APIs.
static Status init();
/// Gets the local file system path for the library at 'hdfs_lib_file'. If
/// this file is not already on the local fs, it copies it and caches the
/// result. Returns an error if 'hdfs_lib_file' cannot be copied to the local fs.
Status get_local_lib_path(const std::string& hdfs_lib_file, LibType type,
std::string* local_path);
/// Returns status.ok() if the symbol exists in 'hdfs_lib_file', non-ok otherwise.
/// If 'quiet' is true, the error status for non-Java unfound symbols will not be logged.
Status check_symbol_exists(
const std::string& hdfs_lib_file, LibType type,
const std::string& symbol, bool quiet);
/// Returns a pointer to the function for the given library and symbol.
/// If 'hdfs_lib_file' is empty, the symbol is looked up in the impalad process.
/// Otherwise, 'hdfs_lib_file' should be the HDFS path to a shared library (.so) file.
/// dlopen handles and symbols are cached.
/// Only usable if 'hdfs_lib_file' refers to a shared object.
//
/// If entry is non-null and *entry is null, *entry will be set to the cached entry. If
/// entry is non-null and *entry is non-null, *entry will be reused (i.e., the use count
/// is not increased). The caller must call decrement_use_count(*entry) when it is done
/// using fn_ptr and it is no longer valid to use fn_ptr.
//
/// If 'quiet' is true, returned error statuses will not be logged.
Status get_so_function_ptr(
const std::string& hdfs_lib_file, const std::string& symbol,
void** fn_ptr, LibCacheEntry** entry, bool quiet);
Status get_so_function_ptr(
const std::string& hdfs_lib_file, const std::string& symbol,
void** fn_ptr, LibCacheEntry** entry) {
return get_so_function_ptr(hdfs_lib_file, symbol, fn_ptr, entry, true);
}
/// Marks the entry for 'hdfs_lib_file' as needing to be refreshed if the file in HDFS is
/// newer than the local cached copied. The refresh will occur the next time the entry is
/// accessed.
void set_needs_refresh(const std::string& hdfs_lib_file);
/// See comment in get_so_function_ptr().
void decrement_use_count(LibCacheEntry* entry);
/// Removes the cache entry for 'hdfs_lib_file'
void remove_entry(const std::string& hdfs_lib_file);
/// Removes all cached entries.
void drop_cache();
private:
/// Singleton instance. Instantiated in Init().
static boost::scoped_ptr<LibCache> _s_instance;
/// dlopen() handle for the current process (i.e. impalad).
void* _current_process_handle;
/// The number of libs that have been copied from HDFS to the local FS.
/// This is appended to the local fs path to remove collisions.
int64_t _num_libs_copied;
/// Protects _lib_cache. For lock ordering, this lock must always be taken before
/// the per entry lock.
boost::mutex _lock;
/// Maps HDFS library path => cache entry.
/// Entries in the cache need to be explicitly deleted.
typedef boost::unordered_map<std::string, LibCacheEntry*> LibMap;
LibMap _lib_cache;
LibCache();
LibCache(LibCache const& l); // disable copy ctor
LibCache& operator=(LibCache const& l); // disable assignment
Status init_internal();
/// Returns the cache entry for 'hdfs_lib_file'. If this library has not been
/// copied locally, it will copy it and add a new LibCacheEntry to '_lib_cache'.
/// Result is returned in *entry.
/// No locks should be take before calling this. On return the entry's lock is
/// taken and returned in *entry_lock.
/// If an error is returned, there will be no entry in _lib_cache and *entry is NULL.
Status get_chache_entry(
const std::string& hdfs_lib_file, LibType type,
boost::unique_lock<boost::mutex>* entry_lock, LibCacheEntry** entry);
/// Implementation to get the cache entry for 'hdfs_lib_file'. Errors are returned
/// without evicting the cache entry if the status is not OK and *entry is not NULL.
Status get_chache_entry_internal(
const std::string& hdfs_lib_file, LibType type,
boost::unique_lock<boost::mutex>* entry_lock, LibCacheEntry** entry);
// map "palo" to "doris" in symbol, only for grayscale upgrading
std::string get_real_symbol(const std::string& symbol) {
static std::regex rx1("8palo_udf");
std::string str1 = std::regex_replace(symbol, rx1, "9doris_udf");
static std::regex rx2("4palo");
std::string str2 = std::regex_replace(str1, rx2, "5doris");
return str2;
}
/// Utility function for generating a filename unique to this process and
/// 'hdfs_path'. This is to prevent multiple impalad processes or different library files
/// with the same name from clobbering each other. 'hdfs_path' should be the full path
/// (including the filename) of the file we're going to copy to the local FS, and
/// 'local_dir' is the local directory prefix of the returned path.
std::string make_local_path(const std::string& hdfs_path, const std::string& local_dir);
/// Implementation to remove an entry from the cache.
/// _lock must be held. The entry's lock should not be held.
void remove_entry_internal(const std::string& hdfs_lib_file,
const LibMap::iterator& entry_iterator);
};
}
#endif

View File

@ -46,6 +46,30 @@
namespace doris {
// Constructs a RuntimeState directly from a fragment instance id, query
// options, a 'now' string and an ExecEnv. All counters start at zero, the
// state starts as running and not cancelled, and two fresh ObjectPools plus a
// ReservationTracker are allocated. The remaining setup is delegated to init().
RuntimeState::RuntimeState(
const TUniqueId& fragment_instance_id,
const TQueryOptions& query_options,
const std::string& now, ExecEnv* exec_env) :
_obj_pool(new ObjectPool()),
_data_stream_recvrs_pool(new ObjectPool()),
_unreported_error_idx(0),
_profile(_obj_pool.get(), "Fragment " + print_id(fragment_instance_id)),
_fragment_mem_tracker(NULL),
_is_cancelled(false),
_per_fragment_instance_idx(0),
_root_node_id(-1),
_num_rows_load_success(0),
_num_rows_load_filtered(0),
_num_print_error_rows(0),
_normal_row_number(0),
_error_row_number(0),
_error_log_file(nullptr),
_is_running(true),
_instance_buffer_reservation(new ReservationTracker) {
// A constructor cannot return a Status, so a failing init() is only caught by
// the DCHECK below. NOTE(review): DCHECK is presumably compiled out of release
// builds, in which case the failure would be ignored -- confirm.
Status status = init(fragment_instance_id, query_options, now, exec_env);
DCHECK(status.ok());
}
RuntimeState::RuntimeState(
const TExecPlanFragmentParams& fragment_params,
const TQueryOptions& query_options,

View File

@ -65,6 +65,11 @@ class RowDescriptor;
// query and shared across all execution nodes of that query.
class RuntimeState {
public:
// NOTE: only used in unit tests
RuntimeState(
const TUniqueId& fragment_instance_id,
const TQueryOptions& query_options,
const std::string& now, ExecEnv* exec_env);
RuntimeState(
const TExecPlanFragmentParams& fragment_params,

View File

@ -0,0 +1,370 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/user_function_cache.h"

#include <cerrno>
#include <cstdlib>
#include <vector>

#include <boost/algorithm/string/split.hpp> // boost::split
#include <boost/algorithm/string/predicate.hpp> // boost::algorithm::ends_with
#include <boost/algorithm/string/classification.hpp> // boost::is_any_of

#include "http/http_client.h"
#include "util/dynamic_util.h"
#include "util/file_utils.h"
#include "util/md5.h"
#include "util/spinlock.h"
namespace doris {
// Number of shard sub-directories ("0" .. "127") kept under the library
// directory. The library of function id N lives in sub-directory
// (N % kLibShardNum).
static const int kLibShardNum = 128;
// Function cache entry: stores the information needed to download, load and
// look up symbols in the library of one user function. Lifetime is managed by
// a manual reference count (ref()/unref()).
struct UserFunctionCacheEntry {
// The reference count starts at 0; every holder must call ref() itself.
UserFunctionCacheEntry(int64_t fid_, const std::string& checksum_,
const std::string& lib_file_)
: function_id(fid_), checksum(checksum_), lib_file(lib_file_) { }
// Closes lib_handle if it was opened and, when should_delete_library is set,
// removes lib_file from disk.
~UserFunctionCacheEntry();
void ref() { _refs.fetch_add(1); }
// If unref() returns true, the last reference was dropped and the caller
// must delete this object.
bool unref() { return _refs.fetch_sub(1) == 1; }
int64_t function_id = 0;
// used to check if this library is valid.
std::string checksum;
// library file
std::string lib_file;
// Set once the library has been opened. Atomic so that it can be checked
// without holding load_lock.
std::atomic<bool> is_loaded{false};
// Set to true when this library is not needed any more; the destructor then
// unlinks lib_file (set when loading the entry failed).
std::atomic<bool> should_delete_library{false};
// lock to make sure only one thread downloads/loads this entry at a time
std::mutex load_lock;
// To reduce cache lock held time, cache entry is
// added to cache map before library is downloaded.
// And this is used to indicate whether library is downloaded.
// Not atomic: read and written with load_lock held (or during init()).
bool is_downloaded = false;
// handle of the opened library, used to lookup a symbol
void* lib_handle = nullptr;
// protects fptr_map
SpinLock map_lock;
// from symbol_name to function pointer
std::unordered_map<std::string, void*> fptr_map;
private:
std::atomic<int> _refs{0};
};
UserFunctionCacheEntry::~UserFunctionCacheEntry() {
    // Release the library handle first, if the library was ever opened.
    if (lib_handle) {
        dynamic_close(lib_handle);
        lib_handle = nullptr;
    }

    // Entries flagged as no longer needed also remove their file from disk.
    const bool remove_file = should_delete_library.load();
    if (remove_file) {
        unlink(lib_file.c_str());
    }
}
// Nothing to set up here; the cache becomes usable after init().
UserFunctionCache::UserFunctionCache() = default;
UserFunctionCache::~UserFunctionCache() {
    std::lock_guard<std::mutex> guard(_cache_lock);
    // Give back the reference the map holds on each entry; an entry is freed
    // here only when nobody else still references it.
    for (auto& kv : _entry_map) {
        UserFunctionCacheEntry* cached = kv.second;
        if (cached->unref()) {
            delete cached;
        }
    }
    _entry_map.clear();
}
// Returns the process-wide cache, created on first use.
UserFunctionCache* UserFunctionCache::instance() {
    static UserFunctionCache process_cache;
    return &process_cache;
}
// Remembers the library directory, opens a handle on the current process and
// registers every library already present under 'lib_dir'.
// Must be called once, before any other method.
Status UserFunctionCache::init(const std::string& lib_dir) {
    DCHECK(_lib_dir.empty());
    _lib_dir = lib_dir;

    // Handle used to resolve symbols that live in this process (fid == 0).
    RETURN_IF_ERROR(dynamic_open(nullptr, &_current_process_handle));

    // Pick up the libraries that were downloaded by a previous run.
    return _load_cached_lib();
}
// Registers a library file found on disk as an already-downloaded cache entry.
//
// 'file' must be named "<function_id>.<checksum>.so"; anything else is rejected
// with an error Status and never reaches the cache. 'dir' is the directory
// containing 'file'. The new entry holds one reference on behalf of the map.
//
// This function does not take _cache_lock; in this file it is only reached
// from _load_cached_lib() during init().
Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std::string& file) {
    if (!boost::algorithm::ends_with(file, ".so")) {
        return Status("unknown library file format");
    }

    std::vector<std::string> split_parts;
    boost::split(split_parts, file, boost::is_any_of("."));
    if (split_parts.size() != 3) {
        return Status("user function's name should be function_id.checksum.so");
    }

    // Parse the id with strtoll instead of std::stol: std::stol throws on a
    // non-numeric or out-of-range prefix (a stray file would abort start-up)
    // and silently accepts trailing junk such as "12abc".
    const std::string& id_str = split_parts[0];
    char* parse_end = nullptr;
    errno = 0;
    const long long parsed_id = strtoll(id_str.c_str(), &parse_end, 10);
    if (id_str.empty() || errno != 0 || *parse_end != '\0') {
        LOG(WARNING) << "invalid function id in user function library name, file=" << file;
        return Status("user function's name should be function_id.checksum.so");
    }
    const int64_t function_id = parsed_id;
    const std::string& checksum = split_parts[1];

    auto it = _entry_map.find(function_id);
    if (it != _entry_map.end()) {
        LOG(WARNING) << "meet a same function id user function library, function_id=" << function_id
            << ", one_checksum=" << checksum << ", other_checksum=" << it->second->checksum;
        return Status("duplicate function id");
    }

    // create a cache entry and put it into entry map
    UserFunctionCacheEntry* entry = new UserFunctionCacheEntry(
            function_id, checksum, dir + "/" + file);
    // The file is already on disk, so no download is needed later.
    entry->is_downloaded = true;
    // Reference owned by _entry_map.
    entry->ref();
    _entry_map[function_id] = entry;
    return Status::OK;
}
// Makes sure the library directory and all of its shard sub-directories exist,
// then registers every library file found in them.
Status UserFunctionCache::_load_cached_lib() {
    RETURN_IF_ERROR(FileUtils::create_dir(_lib_dir));

    // A file that cannot be registered is logged and skipped; returning true
    // keeps the scan going with the next file.
    auto on_file = [this] (const std::string& dir, const std::string& file) {
        const Status load_status = _load_entry_from_lib(dir, file);
        if (!load_status.ok()) {
            LOG(WARNING) << "load a library failed, dir=" << dir << ", file=" << file;
        }
        return true;
    };

    int shard = 0;
    while (shard < kLibShardNum) {
        const std::string shard_dir = _lib_dir + "/" + std::to_string(shard);
        RETURN_IF_ERROR(FileUtils::create_dir(shard_dir));
        RETURN_IF_ERROR(FileUtils::scan_dir(shard_dir, on_file));
        ++shard;
    }
    return Status::OK;
}
// Resolves 'symbol' to a function pointer and stores it in *fn_ptr.
//
// fid == 0: the symbol is looked up in the current process and no cache entry
// is involved. Otherwise the symbol is looked up in the library of function
// 'fid', which is downloaded from 'url' and verified against 'checksum' when
// it is not cached yet.
//
// output_entry: when it points to a non-null entry, that entry is searched
// directly. When it points to a null entry, it receives the entry on success
// and the caller then owns one reference (see release_entry()). When
// output_entry itself is null, the reference taken here is dropped before
// returning.
Status UserFunctionCache::get_function_ptr(
        int64_t fid,
        const std::string& symbol,
        const std::string& url,
        const std::string& checksum,
        void** fn_ptr,
        UserFunctionCacheEntry** output_entry) {
    if (fid == 0) {
        // Just loading a function ptr in the current process. No need to take any locks.
        RETURN_IF_ERROR(dynamic_lookup(_current_process_handle, symbol.c_str(), fn_ptr));
        return Status::OK;
    }

    // Either reuse the entry handed in by the caller or fetch (and reference)
    // the entry of this function.
    const bool caller_supplied_entry = (output_entry != nullptr && *output_entry != nullptr);
    UserFunctionCacheEntry* lib = nullptr;
    if (caller_supplied_entry) {
        lib = *output_entry;
    } else {
        RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, &lib));
    }
    // True while this call holds a reference it has not handed to the caller.
    bool owns_ref = !caller_supplied_entry;

    Status lookup_status;
    {
        // The symbol map of the entry is guarded by its own lock.
        std::lock_guard<SpinLock> guard(lib->map_lock);
        auto cached = lib->fptr_map.find(symbol);
        if (cached == lib->fptr_map.end()) {
            lookup_status = dynamic_lookup(lib->lib_handle, symbol.c_str(), fn_ptr);
            if (!lookup_status.ok()) {
                LOG(WARNING) << "fail to lookup symbol in library, symbol=" << symbol
                    << ", file=" << lib->lib_file;
            } else {
                // Remember the address so later lookups skip dynamic_lookup.
                lib->fptr_map.emplace(symbol, *fn_ptr);
            }
        } else {
            *fn_ptr = cached->second;
        }
    }

    // On success pass our reference on to the caller, if it asked for the entry.
    if (lookup_status.ok() && output_entry != nullptr && *output_entry == nullptr) {
        *output_entry = lib;
        owns_ref = false;
    }
    if (owns_ref && lib->unref()) {
        delete lib;
    }
    return lookup_status;
}
// Finds or creates the cache entry of function 'fid' and makes sure its
// library is downloaded and loaded. On success *output_entry is set and carries
// one reference owned by the caller. On failure the entry is removed from the
// cache and *output_entry is left untouched.
Status UserFunctionCache::_get_cache_entry(
        int64_t fid, const std::string& url,
        const std::string& checksum, UserFunctionCacheEntry** output_entry) {
    UserFunctionCacheEntry* found = nullptr;
    {
        // Only the map lookup/insert happens under the cache lock; download
        // and load run afterwards without it.
        std::lock_guard<std::mutex> guard(_cache_lock);
        auto pos = _entry_map.find(fid);
        if (pos == _entry_map.end()) {
            found = new UserFunctionCacheEntry(
                    fid, checksum, _make_lib_file(fid, checksum));
            found->ref();   // reference owned by the map
            _entry_map.emplace(fid, found);
        } else {
            found = pos->second;
        }
        found->ref();       // reference owned by this call / the caller
    }

    const Status load_status = _load_cache_entry(url, found);
    if (load_status.ok()) {
        *output_entry = found;
        return Status::OK;
    }

    LOG(WARNING) << "fail to load cache entry, fid=" << fid;
    // A failed load evicts the entry, even if it had been valid before.
    _destroy_cache_entry(found);
    return load_status;
}
// Evicts 'entry' from the cache after a failed load and drops the caller's
// reference to it. The caller must hold exactly one reference on 'entry' and
// must not touch it afterwards.
void UserFunctionCache::_destroy_cache_entry(UserFunctionCacheEntry* entry) {
    // 1. Remove the entry from the map, but only if the map still refers to
    //    this very object. Several threads can fail on the same entry, and by
    //    the time the later ones get here the id may already be mapped to a
    //    newer entry. Erasing by id alone would evict that newer entry and
    //    drop a map reference on 'entry' that no longer exists.
    bool removed_from_map = false;
    {
        std::lock_guard<std::mutex> l(_cache_lock);
        auto it = _entry_map.find(entry->function_id);
        if (it != _entry_map.end() && it->second == entry) {
            _entry_map.erase(it);
            removed_from_map = true;
        }
    }
    if (removed_from_map) {
        // Give back the map's reference. This cannot be the last one, because
        // the caller's reference is released only below.
        entry->unref();
    }

    // 2. Have the destructor remove the library file from disk.
    entry->should_delete_library.store(true);

    // 3. Drop the caller's reference; free the entry if it was the last one.
    if (entry->unref()) {
        delete entry;
    }
}
// Makes sure the library of 'entry' is on disk and opened. Downloads it from
// 'url' first when necessary. Safe to call from several threads for the same
// entry: the work is serialized by entry->load_lock.
Status UserFunctionCache::_load_cache_entry(
        const std::string& url, UserFunctionCacheEntry* entry) {
    // Fast path without taking the lock.
    if (entry->is_loaded.load()) {
        return Status::OK;
    }

    std::unique_lock<std::mutex> l(entry->load_lock);
    // Check again now that we own the lock: another thread may have finished
    // loading while we were waiting. Without this re-check the library would
    // be opened a second time and lib_handle overwritten.
    if (entry->is_loaded.load()) {
        return Status::OK;
    }

    if (!entry->is_downloaded) {
        RETURN_IF_ERROR(_download_lib(url, entry));
    }
    RETURN_IF_ERROR(_load_cache_entry_internal(entry));
    return Status::OK;
}
// Downloads the library of 'entry' from 'url' into "<lib_file>.tmp", verifies
// its MD5 against entry->checksum (case-insensitive) and renames it to
// entry->lib_file. Sets entry->is_downloaded on success. On any failure an
// error Status is returned and the temporary file is removed.
// entry's lock must be held
Status UserFunctionCache::_download_lib(
        const std::string& url, UserFunctionCacheEntry* entry) {
    DCHECK(!entry->is_downloaded);

    // get local path to save library
    const std::string tmp_file = entry->lib_file + ".tmp";

    // Removes the temporary file on every early return. Declared before 'fp'
    // so that the file is closed before it is unlinked.
    struct TmpFileGuard {
        explicit TmpFileGuard(const std::string& path_) : path(path_) { }
        ~TmpFileGuard() {
            if (armed) {
                unlink(path.c_str());
            }
        }
        const std::string& path;
        bool armed = true;
    };
    TmpFileGuard tmp_guard(tmp_file);

    auto fp_closer = [] (FILE* fp) { fclose(fp); };
    std::unique_ptr<FILE, decltype(fp_closer)> fp(fopen(tmp_file.c_str(), "w"), fp_closer);
    if (fp == nullptr) {
        // Report errno here: there is no stream to pass to ferror().
        const int open_errno = errno;
        // We did not create anything, so there is nothing to clean up.
        tmp_guard.armed = false;
        char buf[64];
        LOG(WARNING) << "fail to open file, file=" << tmp_file
            << ", errno=" << open_errno << ", errmsg=" << strerror_r(open_errno, buf, 64);
        return Status("fail to open file");
    }

    Md5Digest digest;
    HttpClient client;
    RETURN_IF_ERROR(client.init(url));
    Status status;
    auto download_cb = [&status, &tmp_file, &fp, &digest] (const void* data, size_t length) {
        // fwrite(data, 0, 1, fp) returns 0, which must not be mistaken for a
        // write error. NOTE(review): whether HttpClient ever delivers an empty
        // chunk depends on its implementation -- confirm.
        if (length == 0) {
            return true;
        }
        digest.update(data, length);
        auto res = fwrite(data, length, 1, fp.get());
        if (res != 1) {
            LOG(WARNING) << "fail to write data to file, file=" << tmp_file
                << ", error=" << ferror(fp.get());
            status = Status("fail to write data when download");
            return false;
        }
        return true;
    };
    RETURN_IF_ERROR(client.execute(download_cb));
    // An error raised inside the callback is reported through 'status'.
    RETURN_IF_ERROR(status);

    digest.digest();
    if (!boost::iequals(digest.hex(), entry->checksum)) {
        LOG(WARNING) << "UDF's checksum is not equal, one=" << digest.hex()
            << ", other=" << entry->checksum;
        return Status("UDF's library checksum is not match");
    }

    // Close the file explicitly: buffered data is flushed here, so a failure
    // means the file on disk may be incomplete.
    if (fclose(fp.release()) != 0) {
        const int close_errno = errno;
        char buf[64];
        LOG(WARNING) << "fail to close file, file=" << tmp_file
            << ", errno=" << close_errno << ", errmsg=" << strerror_r(close_errno, buf, 64);
        return Status("fail to close file");
    }

    // rename temporary file to library file
    auto ret = rename(tmp_file.c_str(), entry->lib_file.c_str());
    if (ret != 0) {
        const int rename_errno = errno;
        char buf[64];
        LOG(WARNING) << "fail to rename file from=" << tmp_file << ", to=" << entry->lib_file
            << ", errno=" << rename_errno << ", errmsg=" << strerror_r(rename_errno, buf, 64);
        return Status("fail to rename file");
    }

    // The temporary file no longer exists under its old name.
    tmp_guard.armed = false;
    entry->is_downloaded = true;
    return Status::OK;
}
// Opens the library file of 'entry' and marks the entry as loaded.
// entry's lock must be held
Status UserFunctionCache::_load_cache_entry_internal(UserFunctionCacheEntry* entry) {
    const char* lib_path = entry->lib_file.c_str();
    RETURN_IF_ERROR(dynamic_open(lib_path, &entry->lib_handle));
    // Set the flag only after lib_handle has been filled in.
    entry->is_loaded.store(true);
    return Status::OK;
}
// Builds the local path of a library:
//   <lib_dir>/<function_id % kLibShardNum>/<function_id>.<checksum>.so
std::string UserFunctionCache::_make_lib_file(int64_t function_id, const std::string& checksum) {
    const int shard = function_id % kLibShardNum;
    return _lib_dir + '/' + std::to_string(shard) + '/'
        + std::to_string(function_id) + '.' + checksum + ".so";
}
// Drops one reference on 'entry' and frees it when that was the last one.
// A null 'entry' is accepted and ignored.
void UserFunctionCache::release_entry(UserFunctionCacheEntry* entry) {
    if (entry != nullptr && entry->unref()) {
        delete entry;
    }
}
}

View File

@ -0,0 +1,93 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <mutex>
#include <string>
#include <unordered_map>
#include "common/status.h"
namespace doris {
struct UserFunctionCacheEntry;
// Used to cache user functions. These functions include
// UDF (User Defined Function) and UDAF (User Defined Aggregate
// Function), and may include UDTF (User Defined Table
// Function) in the future. A user defined function may be split
// into several functions; for example, a UDAF is split into
// InitFn, MergeFn, FinalizeFn...
// In Doris, we call UDF/UDAF/UDTF a UserFunction, and we call
// an implementing function a Function.
// A UserFunction has a function id, and we can find its library with
// this id. When we add a user function into the cache, we need to
// download it from a URL and check its checksum. So if we find a function
// by id, its library is valid. And when a user wants to
// change its implementation (URL), Doris will generate a new function
// id.
class UserFunctionCache {
public:
// The directory which contains the cached libraries is passed to init().
UserFunctionCache();
~UserFunctionCache();
// initialize this cache, call this function before others
// local_path is the directory which contains the cached libraries.
Status init(const std::string& local_path);
// Returns the process-wide instance.
static UserFunctionCache* instance();
// Return function pointer for given fid and symbol.
// If fid is 0, lookup symbol from this doris-be process.
// Otherwise find symbol in UserFunction's library.
// Found function pointer is returned in fn_ptr, and cache entry
// is returned by entry. Client must call release_entry to release
// the cache entry when it is no longer needed.
// If *entry is not null, the symbol is looked up in that
// entry.
Status get_function_ptr(int64_t fid,
const std::string& symbol,
const std::string& url,
const std::string& checksum,
void** fn_ptr,
UserFunctionCacheEntry** entry);
// Releases an entry returned by get_function_ptr.
void release_entry(UserFunctionCacheEntry* entry);
private:
Status _load_cached_lib();
Status _load_entry_from_lib(const std::string& dir, const std::string& file);
Status _get_cache_entry(
int64_t fid, const std::string& url,
const std::string& checksum, UserFunctionCacheEntry** output_entry);
Status _load_cache_entry(const std::string& url, UserFunctionCacheEntry* entry);
Status _download_lib(
const std::string& url, UserFunctionCacheEntry* entry);
Status _load_cache_entry_internal(UserFunctionCacheEntry* entry);
std::string _make_lib_file(int64_t function_id, const std::string& checksum);
void _destroy_cache_entry(UserFunctionCacheEntry* entry);
private:
// directory which contains the cached libraries
std::string _lib_dir;
// handle of the current process, used when fid is 0
void* _current_process_handle = nullptr;
// protects _entry_map
std::mutex _cache_lock;
// from function id to cache entry
std::unordered_map<int64_t, UserFunctionCacheEntry*> _entry_map;
};
}

View File

@ -94,19 +94,10 @@ Status FileUtils::scan_dir(
}
DeferOp close_dir(std::bind<void>(&closedir, dir));
struct dirent entry;
struct dirent* result = nullptr;
int64_t count = 0;
while (true) {
int ret = readdir_r(dir, &entry, &result);
if (ret != 0) {
char buf[64];
std::stringstream ss;
ss << "readdir(" << dir_path << ") failed, because: " << strerror_r(errno, buf, 64);
return Status(ss.str());
}
auto result = readdir(dir.get());
if (result == nullptr) {
// Over
break;
}
std::string file_name = result->d_name;
@ -127,6 +118,36 @@ Status FileUtils::scan_dir(
return Status::OK;
}
// Scans 'dir_path' and calls 'callback(dir_path, file_name)' for every entry
// except "." and "..". The scan stops early, still returning OK, as soon as
// the callback returns false.
// Returns an error Status when the directory cannot be opened or when reading
// it fails part-way.
Status FileUtils::scan_dir(
        const std::string& dir_path,
        const std::function<bool(const std::string&, const std::string&)>& callback) {
    auto dir_closer = [] (DIR* dir) { closedir(dir); };
    std::unique_ptr<DIR, decltype(dir_closer)> dir(opendir(dir_path.c_str()), dir_closer);
    if (dir == nullptr) {
        char buf[64];
        LOG(WARNING) << "fail to open dir, dir=" << dir_path << ", errmsg=" << strerror_r(errno, buf, 64);
        return Status("fail to opendir");
    }

    while (true) {
        // readdir() returns NULL both at end-of-directory and on error; only
        // errno tells them apart, so it has to be cleared before each call.
        errno = 0;
        struct dirent* result = readdir(dir.get());
        if (result == nullptr) {
            if (errno != 0) {
                const int read_errno = errno;
                char buf[64];
                LOG(WARNING) << "fail to read dir, dir=" << dir_path
                    << ", errmsg=" << strerror_r(read_errno, buf, 64);
                return Status("fail to readdir");
            }
            // End of directory.
            break;
        }

        std::string file_name = result->d_name;
        if (file_name == "." || file_name == "..") {
            continue;
        }
        auto is_continue = callback(dir_path, file_name);
        if (!is_continue) {
            break;
        }
    }
    return Status::OK;
}
bool FileUtils::is_dir(const std::string& path) {
struct stat path_stat;
if (stat(path.c_str(), &path_stat) != 0) {

View File

@ -19,6 +19,7 @@
#define DORIS_BE_UTIL_FILE_UTILS_H
#include <string>
#include <functional>
#include "common/status.h"
@ -44,6 +45,9 @@ public:
static Status scan_dir(
const std::string& dir_path, std::vector<std::string>* files,
int64_t* file_count = nullptr);
// Scans 'dir_path' and calls 'callback(dir_path, file_name)' for each entry
// other than "." and "..". The scan stops when the callback returns false.
static Status scan_dir(
const std::string& dir_path,
const std::function<bool(const std::string&, const std::string&)>& callback);
// If the file_path is not exist, or is not a dir, return false.
static bool is_dir(const std::string& file_path);

View File

@ -30,7 +30,7 @@
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "runtime/row_batch.h"
#include "runtime/lib_cache.h"
#include "runtime/user_function_cache.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PlanNodes_types.h"
@ -44,7 +44,7 @@ public:
}
void init();
static void SetUpTestCase() {
LibCache::instance()->init();
UserFunctionCache::instance()->init("./be/test/runtime/test_data/user_function_cache/normal");
CastFunctions::init();
}

View File

@ -29,7 +29,7 @@
#include "runtime/descriptors.h"
#include "runtime/mem_tracker.h"
#include "runtime/runtime_state.h"
#include "runtime/lib_cache.h"
#include "runtime/user_function_cache.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "exprs/cast_functions.h"
@ -46,7 +46,7 @@ public:
void init();
static void SetUpTestCase() {
LibCache::instance()->init();
UserFunctionCache::instance()->init("./be/test/runtime/test_data/user_function_cache/normal");
CastFunctions::init();
}

View File

@ -54,3 +54,4 @@ ADD_BE_TEST(stream_load_pipe_test)
ADD_BE_TEST(tablet_writer_mgr_test)
#ADD_BE_TEST(export_task_mgr_test)
ADD_BE_TEST(snapshot_loader_test)
ADD_BE_TEST(user_function_cache_test)

View File

@ -0,0 +1,4 @@
// Empty test symbol; only its presence in the compiled library matters.
void my_add() {}
// Empty test symbol; only its presence in the compiled library matters.
void my_del() {}

View File

@ -0,0 +1,231 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/user_function_cache.h"
#include <gtest/gtest.h>
#include <cstdio>
#include <cstdlib>
#include "common/logging.h"
#include "http/ev_http_server.h"
#include "http/http_channel.h"
#include "http/http_handler.h"
#include "http/http_request.h"
#include "util/md5.h"
#include "util/file_utils.h"
// Forward declaration so that the tests below can take the address of main.
int main(int argc, char* argv[]);
namespace doris {
// Set to true by the test HTTP handler once it has served a file; reset to
// false before every test.
bool k_is_downloaded = false;
// HTTP handler used by the tests: replies with the content of the file named
// by the "FILE" request parameter, read from the test library directory.
class UserFunctionTestHandler : public HttpHandler {
public:
    void handle(HttpRequest* req) override {
        const std::string lib_dir = "./be/test/runtime/test_data/user_function_cache/lib";
        auto& file_name = req->param("FILE");
        const auto lib_file = lib_dir + "/" + file_name;

        FILE* fp = fopen(lib_file.c_str(), "r");
        if (fp == nullptr) {
            // Unknown file: answer with an error and do not mark a download.
            HttpChannel::send_error(req, INTERNAL_SERVER_ERROR);
            return;
        }

        // Read the whole file; a short read marks the end.
        std::string response;
        char buf[1024];
        size_t size = 0;
        do {
            size = fread(buf, 1, 1024, fp);
            response.append(buf, size);
        } while (size >= 1024);

        HttpChannel::send_reply(req, response);
        k_is_downloaded = true;
        fclose(fp);
    }
};
// Handler registered on the test HTTP server.
static UserFunctionTestHandler s_test_handler = UserFunctionTestHandler();
// Test HTTP server; created in SetUpTestCase and deleted in TearDownTestCase.
static EvHttpServer* s_server = nullptr;
// MD5 of the compiled my_add.so, computed in SetUpTestCase.
std::string my_add_md5sum;
static std::string compute_md5(const std::string& file) {
FILE* fp = fopen(file.c_str(), "r");
Md5Digest md5;
char buf[1024];
while (true) {
auto size = fread(buf, 1, 1024, fp);
md5.update(buf, size);
if (size < 1024) {
break;
}
}
fclose(fp);
md5.digest();
return md5.hex();
}
class UserFunctionCacheTest : public testing::Test {
public:
UserFunctionCacheTest() { }
virtual ~UserFunctionCacheTest() { }
static void SetUpTestCase() {
s_server = new EvHttpServer(29386);
s_server->register_handler(GET, "/{FILE}", &s_test_handler);
s_server->start();
// compile code to so
system("g++ -shared ./be/test/runtime/test_data/user_function_cache/lib/my_add.cc -o ./be/test/runtime/test_data/user_function_cache/lib/my_add.so");
my_add_md5sum = compute_md5("./be/test/runtime/test_data/user_function_cache/lib/my_add.so");
}
static void TearDownTestCase() {
delete s_server;
system("rm -rf ./be/test/runtime/test_data/user_function_cache/lib/my_add.so");
}
void SetUp() override {
k_is_downloaded = false;
}
};
// With function id 0 the symbol is resolved in the test process itself and no
// cache entry is handed out.
TEST_F(UserFunctionCacheTest, process_symbol) {
    const std::string lib_dir = "./be/test/runtime/test_data/user_function_cache/normal";
    UserFunctionCache cache;
    auto status = cache.init(lib_dir);
    ASSERT_TRUE(status.ok());

    void* resolved = nullptr;
    UserFunctionCacheEntry* handle = nullptr;
    status = cache.get_function_ptr(0, "main", "", "", &resolved, &handle);
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(&main, resolved);
    ASSERT_EQ(nullptr, handle);

    // Releasing a null entry is allowed.
    cache.release_entry(handle);
}
// With an empty cache directory the library has to be downloaded; afterwards
// existing symbols resolve through the returned entry and a missing one fails.
TEST_F(UserFunctionCacheTest, download_normal) {
    const std::string lib_dir = "./be/test/runtime/test_data/user_function_cache/download";
    FileUtils::remove_all(lib_dir);

    UserFunctionCache cache;
    auto status = cache.init(lib_dir);
    ASSERT_TRUE(status.ok());

    void* resolved = nullptr;
    UserFunctionCacheEntry* handle = nullptr;

    // get my_add
    status = cache.get_function_ptr(1,
                                    "_Z6my_addv",
                                    "http://127.0.0.1:29386/my_add.so",
                                    my_add_md5sum, &resolved, &handle);
    ASSERT_TRUE(status.ok());
    ASSERT_TRUE(k_is_downloaded);
    ASSERT_NE(nullptr, resolved);
    ASSERT_NE(nullptr, handle);

    // get my_del: the entry obtained above is passed in again
    status = cache.get_function_ptr(1,
                                    "_Z6my_delv",
                                    "http://127.0.0.1:29386/my_add.so",
                                    my_add_md5sum, &resolved, &handle);
    ASSERT_TRUE(status.ok());
    ASSERT_NE(nullptr, resolved);
    ASSERT_NE(nullptr, handle);

    // get my_mul: not defined in the library, so the lookup must fail
    status = cache.get_function_ptr(1,
                                    "_Z6my_mulv",
                                    "http://127.0.0.1:29386/my_add.so",
                                    my_add_md5sum, &resolved, &handle);
    ASSERT_FALSE(status.ok());

    cache.release_entry(handle);
}
// A fresh cache on a directory that already holds the library must resolve the
// symbol without contacting the HTTP server. Relies on the library left on
// disk by download_normal.
TEST_F(UserFunctionCacheTest, load_normal) {
    const std::string lib_dir = "./be/test/runtime/test_data/user_function_cache/download";
    UserFunctionCache cache;
    auto status = cache.init(lib_dir);
    ASSERT_TRUE(status.ok());

    void* resolved = nullptr;
    UserFunctionCacheEntry* handle = nullptr;
    status = cache.get_function_ptr(1,
                                    "_Z6my_addv",
                                    "http://127.0.0.1:29386/my_add.so",
                                    my_add_md5sum, &resolved, &handle);
    ASSERT_TRUE(status.ok());
    // No download may have happened.
    ASSERT_FALSE(k_is_downloaded);
    ASSERT_NE(nullptr, resolved);
    ASSERT_NE(nullptr, handle);

    cache.release_entry(handle);
}
// Requesting a library the server cannot provide must fail.
TEST_F(UserFunctionCacheTest, download_fail) {
    const std::string lib_dir = "./be/test/runtime/test_data/user_function_cache/download";
    UserFunctionCache cache;
    auto status = cache.init(lib_dir);
    ASSERT_TRUE(status.ok());

    void* resolved = nullptr;
    UserFunctionCacheEntry* handle = nullptr;
    // my_del.so does not exist in the served directory.
    status = cache.get_function_ptr(2,
                                    "_Z6my_delv",
                                    "http://127.0.0.1:29386/my_del.so",
                                    my_add_md5sum, &resolved, &handle);
    ASSERT_FALSE(status.ok());
}
// A downloaded library whose MD5 differs from the expected checksum must be
// rejected.
TEST_F(UserFunctionCacheTest, md5_fail) {
    const std::string lib_dir = "./be/test/runtime/test_data/user_function_cache/download";
    // Start from an empty directory so that the library is downloaded again.
    FileUtils::remove_all(lib_dir);

    UserFunctionCache cache;
    auto status = cache.init(lib_dir);
    ASSERT_TRUE(status.ok());

    void* resolved = nullptr;
    UserFunctionCacheEntry* handle = nullptr;
    // "1234" is not the checksum of my_add.so.
    status = cache.get_function_ptr(1,
                                    "_Z6my_addv",
                                    "http://127.0.0.1:29386/my_add.so",
                                    "1234", &resolved, &handle);
    ASSERT_FALSE(status.ok());
}
// A cached file that is named like a library but is not a valid shared object
// must make the lookup fail.
TEST_F(UserFunctionCacheTest, bad_so) {
    UserFunctionCache cache;
    std::string lib_dir = "./be/test/runtime/test_data/user_function_cache/bad";

    // Plant a bogus library for function id 2. Check every step so that a
    // set-up problem is reported as a test failure instead of a crash on a
    // null stream.
    ASSERT_TRUE(FileUtils::create_dir(lib_dir + "/2").ok());
    auto so_file = lib_dir + "/2/2.abc.so";
    FILE* fp = fopen(so_file.c_str(), "w");
    ASSERT_NE(nullptr, fp);
    // Fixed content keeps the test deterministic; it only has to be something
    // that is not a shared object.
    const char garbage[] = "this is not a shared object";
    const size_t written = fwrite(garbage, sizeof(garbage), 1, fp);
    fclose(fp);
    ASSERT_EQ(1u, written);

    auto st = cache.init(lib_dir);
    ASSERT_TRUE(st.ok());
    void* fn_ptr = nullptr;
    UserFunctionCacheEntry* entry = nullptr;
    st = cache.get_function_ptr(2,
                                "_Z6my_addv",
                                "http://127.0.0.1:29386/my_add.so",
                                "abc", &fn_ptr, &entry);
    ASSERT_FALSE(st.ok());
}
}
// Test entry point: lets gtest consume its command line flags, then runs every
// registered test and returns the result as the exit code.
int main(int argc, char* argv[]) {
    ::testing::InitGoogleTest(&argc, argv);
    const int exit_code = RUN_ALL_TESTS();
    return exit_code;
}

View File

@ -285,6 +285,7 @@ struct TFunction {
10: optional TAggregateFunction aggregate_fn
11: optional i64 id
12: optional string checksum
}
enum TLoadJobState {

View File

@ -175,8 +175,9 @@ ${DORIS_TEST_BINARY_DIR}/runtime/free_list_test
${DORIS_TEST_BINARY_DIR}/runtime/string_buffer_test
${DORIS_TEST_BINARY_DIR}/runtime/stream_load_pipe_test
${DORIS_TEST_BINARY_DIR}/runtime/tablet_writer_mgr_test
## Running expr Unittest
${DORIS_TEST_BINARY_DIR}/runtime/snapshot_loader_test
${DORIS_TEST_BINARY_DIR}/runtime/user_function_cache_test
## Running expr Unittest
# Running http
${DORIS_TEST_BINARY_DIR}/http/metrics_action_test