From 90d71508ff7c2eeba1af8718ecea76bf66371bfa Mon Sep 17 00:00:00 2001 From: ZHAO Chun Date: Fri, 21 Dec 2018 22:07:21 +0800 Subject: [PATCH] Add UserFunctionCache to cache UDF's library (#453) * Add UserFunctionCache to cache UDF's library This patch replace LibCache with UserFunctionCache. LibCache use HDFS URL to identify a UDF's Library, and when BE process restart all of downloaded library should be loaded another time. We use function id corresponding to a library, and when process restart, all downloaded libraries can be loaded without another downloading. * update --- be/src/common/config.h | 1 + be/src/common/daemon.cpp | 4 +- be/src/exprs/agg_fn.cc | 37 +- be/src/exprs/agg_fn_evaluator.cpp | 35 +- be/src/exprs/arithmetic_expr.h | 1 + be/src/exprs/binary_predicate.h | 3 + be/src/exprs/case_expr.h | 1 + be/src/exprs/cast_expr.h | 1 + be/src/exprs/compound_predicate.h | 2 + be/src/exprs/conditional_functions.h | 1 + be/src/exprs/expr.cpp | 15 +- be/src/exprs/expr.h | 4 +- be/src/exprs/info_func.h | 1 + be/src/exprs/literal.h | 1 + be/src/exprs/new_agg_fn_evaluator.cc | 1 - be/src/exprs/new_agg_fn_evaluator.h | 1 - be/src/exprs/null_literal.h | 1 + be/src/exprs/scalar_fn_call.cpp | 24 +- be/src/exprs/scalar_fn_call.h | 1 + be/src/exprs/slot_ref.h | 1 + be/src/exprs/tuple_is_null_predicate.h | 1 + be/src/http/http_client.cpp | 7 +- be/src/runtime/CMakeLists.txt | 2 +- be/src/runtime/lib_cache.cpp | 437 ------------------ be/src/runtime/lib_cache.h | 189 -------- be/src/runtime/runtime_state.cpp | 24 + be/src/runtime/runtime_state.h | 5 + be/src/runtime/user_function_cache.cpp | 370 +++++++++++++++ be/src/runtime/user_function_cache.h | 93 ++++ be/src/util/file_utils.cpp | 41 +- be/src/util/file_utils.h | 4 + be/test/exec/broker_scan_node_test.cpp | 4 +- be/test/exec/broker_scanner_test.cpp | 4 +- be/test/runtime/CMakeLists.txt | 1 + .../user_function_cache/lib/my_add.cc | 4 + be/test/runtime/user_function_cache_test.cpp | 231 +++++++++ gensrc/thrift/Types.thrift | 1 + run-ut.sh | 3 +- 38 files changed, 858 insertions(+), 699 deletions(-) delete mode 100644 be/src/runtime/lib_cache.cpp delete mode 100644 be/src/runtime/lib_cache.h create mode 100644 be/src/runtime/user_function_cache.cpp create mode 100644 be/src/runtime/user_function_cache.h create mode 100644 be/test/runtime/test_data/user_function_cache/lib/my_add.cc create mode 100644 be/test/runtime/user_function_cache_test.cpp diff --git a/be/src/common/config.h b/be/src/common/config.h index 0a04059445..0c9272d77b 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -121,6 +121,7 @@ namespace config { // log dir CONF_String(sys_log_dir, "${DORIS_HOME}/log"); + CONF_String(user_function_dir, "${DORIS_HOME}/lib/usr"); // INFO, WARNING, ERROR, FATAL CONF_String(sys_log_level, "INFO"); // TIME-DAY, TIME-HOUR, SIZE-MB-nnn diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 53a327bb54..450521ebc1 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -32,7 +32,7 @@ #include "runtime/bufferpool/buffer_pool.h" #include "runtime/exec_env.h" #include "runtime/mem_tracker.h" -#include "runtime/lib_cache.h" +#include "runtime/user_function_cache.h" #include "exprs/operators.h" #include "exprs/is_null_predicate.h" #include "exprs/like_predicate.h" @@ -171,7 +171,7 @@ void init_daemon(int argc, char** argv, const std::vector& paths) { CpuInfo::init(); DiskInfo::init(); MemInfo::init(); - LibCache::init(); + UserFunctionCache::instance()->init(config::user_function_dir); Operators::init(); IsNullPredicate::init(); LikePredicate::init(); diff --git a/be/src/exprs/agg_fn.cc b/be/src/exprs/agg_fn.cc index fd81204e1e..74b44d0cc8 100644 --- a/be/src/exprs/agg_fn.cc +++ b/be/src/exprs/agg_fn.cc @@ -20,7 +20,7 @@ #include "codegen/llvm_codegen.h" #include "exprs/anyval_util.h" #include "runtime/descriptors.h" -#include "runtime/lib_cache.h" +#include "runtime/user_function_cache.h" #include "runtime/runtime_state.h" #include "common/names.h" @@ -91,32 +91,39 @@ Status AggFn::Init(const RowDescriptor& row_desc, RuntimeState* state) { return Status(ss.str()); } - RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location, - aggregate_fn.init_fn_symbol, &init_fn_, &_cache_entry)); - RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location, - aggregate_fn.update_fn_symbol, &update_fn_, &_cache_entry)); + RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( + _fn.id, aggregate_fn.init_fn_symbol, _fn.hdfs_location, "", &init_fn_, &_cache_entry)); + RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( + _fn.id, aggregate_fn.update_fn_symbol, _fn.hdfs_location, "", &update_fn_, &_cache_entry)); // Merge() is not defined for purely analytic function. if (!aggregate_fn.is_analytic_only_fn) { - RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location, - aggregate_fn.merge_fn_symbol, &merge_fn_, &_cache_entry)); + RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( + _fn.id, aggregate_fn.merge_fn_symbol, _fn.hdfs_location, "", &merge_fn_, &_cache_entry)); } // Serialize(), GetValue(), Remove() and Finalize() are optional if (!aggregate_fn.serialize_fn_symbol.empty()) { - RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location, - aggregate_fn.serialize_fn_symbol, &serialize_fn_, &_cache_entry)); + RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( + _fn.id, aggregate_fn.serialize_fn_symbol, + _fn.hdfs_location, "", + &serialize_fn_, &_cache_entry)); } if (!aggregate_fn.get_value_fn_symbol.empty()) { - RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location, - aggregate_fn.get_value_fn_symbol, &get_value_fn_, &_cache_entry)); + RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( + _fn.id, aggregate_fn.get_value_fn_symbol, _fn.hdfs_location, "", + &get_value_fn_, &_cache_entry)); } if (!aggregate_fn.remove_fn_symbol.empty()) { - RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location, - aggregate_fn.remove_fn_symbol, &remove_fn_, &_cache_entry)); + RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( + _fn.id, aggregate_fn.remove_fn_symbol, + _fn.hdfs_location, "", + &remove_fn_, &_cache_entry)); } if (!aggregate_fn.finalize_fn_symbol.empty()) { - RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location, - _fn.aggregate_fn.finalize_fn_symbol, &finalize_fn_, &_cache_entry)); + RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( + _fn.id, _fn.aggregate_fn.finalize_fn_symbol, + _fn.hdfs_location, "", + &finalize_fn_, &_cache_entry)); } return Status::OK; } diff --git a/be/src/exprs/agg_fn_evaluator.cpp b/be/src/exprs/agg_fn_evaluator.cpp index 6653bc0abb..ec8c6c6dc7 100755 --- a/be/src/exprs/agg_fn_evaluator.cpp +++ b/be/src/exprs/agg_fn_evaluator.cpp @@ -24,7 +24,7 @@ #include "exec/aggregation_node.h" #include "exprs/aggregate_functions.h" #include "exprs/anyval_util.h" -//#include "runtime/lib_cache.h" +#include "runtime/user_function_cache.h" #include "udf/udf_internal.h" #include "util/debug_util.h" #include "runtime/datetime_value.h" @@ -207,37 +207,38 @@ Status AggFnEvaluator::prepare( } // Load the function pointers. - RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr( - _hdfs_location, _fn.aggregate_fn.init_fn_symbol, &_init_fn, NULL, true)); + RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( + _fn.id, _fn.aggregate_fn.init_fn_symbol, _hdfs_location, "", &_init_fn, NULL)); - RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr( - _hdfs_location, _fn.aggregate_fn.update_fn_symbol, &_update_fn, NULL, true)); + RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( + _fn.id, _fn.aggregate_fn.update_fn_symbol, _hdfs_location, "", &_update_fn, NULL)); // Merge() is not loaded if evaluating the agg fn as an analytic function. if (!_is_analytic_fn) { - RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr( - _hdfs_location, _fn.aggregate_fn.merge_fn_symbol, &_merge_fn, NULL, true)); + RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( + _fn.id, _fn.aggregate_fn.merge_fn_symbol, _hdfs_location, "", &_merge_fn, NULL)); } // Serialize and Finalize are optional if (!_fn.aggregate_fn.serialize_fn_symbol.empty()) { - RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr( - _hdfs_location, _fn.aggregate_fn.serialize_fn_symbol, &_serialize_fn, NULL, true)); + RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( + _fn.id, _fn.aggregate_fn.serialize_fn_symbol, _hdfs_location, + "", &_serialize_fn, NULL)); } if (!_fn.aggregate_fn.finalize_fn_symbol.empty()) { - RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr( - _hdfs_location, _fn.aggregate_fn.finalize_fn_symbol, &_finalize_fn, NULL, true)); + RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( + _fn.id, _fn.aggregate_fn.finalize_fn_symbol, _hdfs_location, "", &_finalize_fn, NULL)); } if (!_fn.aggregate_fn.get_value_fn_symbol.empty()) { - RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr( - _hdfs_location, _fn.aggregate_fn.get_value_fn_symbol, &_get_value_fn, - NULL, true)); + RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( + _fn.id, _fn.aggregate_fn.get_value_fn_symbol, _hdfs_location, "", &_get_value_fn, + NULL)); } if (!_fn.aggregate_fn.remove_fn_symbol.empty()) { - RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr( - _hdfs_location, _fn.aggregate_fn.remove_fn_symbol, &_remove_fn, - NULL, true)); + RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr( + _fn.id, _fn.aggregate_fn.remove_fn_symbol, _hdfs_location, "", &_remove_fn, + NULL)); } vector arg_types; diff --git a/be/src/exprs/arithmetic_expr.h b/be/src/exprs/arithmetic_expr.h index 7d06d635b7..b83f89c319 100644 --- a/be/src/exprs/arithmetic_expr.h +++ b/be/src/exprs/arithmetic_expr.h @@ -18,6 +18,7 @@ #ifndef DORIS_BE_SRC_EXPRS_ARITHMETIC_EXPR_H #define DORIS_BE_SRC_EXPRS_ARITHMETIC_EXPR_H +#include "common/object_pool.h" #include "exprs/expr.h" namespace doris { diff --git a/be/src/exprs/binary_predicate.h b/be/src/exprs/binary_predicate.h index 783e2dbde0..463ca9b92d 100644 --- a/be/src/exprs/binary_predicate.h +++ b/be/src/exprs/binary_predicate.h @@ -20,7 +20,10 @@ #include #include + #include + +#include "common/object_pool.h" #include "exprs/predicate.h" #include "gen_cpp/Exprs_types.h" diff --git a/be/src/exprs/case_expr.h b/be/src/exprs/case_expr.h index 02795af0b6..7b848a96f4 100644 --- a/be/src/exprs/case_expr.h +++ b/be/src/exprs/case_expr.h @@ -20,6 +20,7 @@ #include #include "expr.h" +#include "common/object_pool.h" namespace llvm { class Function; diff --git a/be/src/exprs/cast_expr.h b/be/src/exprs/cast_expr.h index 75de1c8177..e7e516afe8 100644 --- a/be/src/exprs/cast_expr.h +++ b/be/src/exprs/cast_expr.h @@ -18,6 +18,7 @@ #ifndef DORIS_BE_SRC_EXPRS_CAST_EXPR_H #define DORIS_BE_SRC_EXPRS_CAST_EXPR_H +#include "common/object_pool.h" #include "exprs/expr.h" namespace doris { diff --git a/be/src/exprs/compound_predicate.h b/be/src/exprs/compound_predicate.h index b5bf2bc525..fbecd28777 100644 --- a/be/src/exprs/compound_predicate.h +++ b/be/src/exprs/compound_predicate.h @@ -19,6 +19,8 @@ #define DORIS_BE_SRC_QUERY_EXPRS_COMPOUND_PREDICATE_H #include + +#include "common/object_pool.h" #include "exprs/predicate.h" #include "gen_cpp/Exprs_types.h" diff --git a/be/src/exprs/conditional_functions.h b/be/src/exprs/conditional_functions.h index 53502a3773..d14ba7b92a 100644 --- a/be/src/exprs/conditional_functions.h +++ b/be/src/exprs/conditional_functions.h @@ -19,6 +19,7 @@ #define DORIS_BE_SRC_QUERY_EXPRS_CONDITIONAL_FUNCTIONS_H #include +#include "common/object_pool.h" #include "exprs/expr.h" #include "udf/udf.h" diff --git a/be/src/exprs/expr.cpp b/be/src/exprs/expr.cpp index a65147c578..e971006751 100644 --- a/be/src/exprs/expr.cpp +++ b/be/src/exprs/expr.cpp @@ -48,6 +48,7 @@ #include "gen_cpp/Data_types.h" #include "runtime/runtime_state.h" #include "runtime/raw_value.h" +#include "runtime/user_function_cache.h" #include "util/debug_util.h" #include "gen_cpp/Exprs_types.h" @@ -1077,11 +1078,15 @@ Status Expr::create_tree_internal(const vector& nodes, ObjectPool* po // TODO chenhao void Expr::close() { - for (Expr* child : _children) child->close(); - /*if (_cache_entry != nullptr) { - LibCache::instance()->decrement_use_count(_cache_entry); - _cache_entry = nullptr; - }*/ + for (Expr* child : _children) child->close(); + /*if (_cache_entry != nullptr) { + LibCache::instance()->decrement_use_count(_cache_entry); + _cache_entry = nullptr; + }*/ + if (_cache_entry != nullptr) { + UserFunctionCache::instance()->release_entry(_cache_entry); + _cache_entry = nullptr; + } } void Expr::close(const vector& exprs) { diff --git a/be/src/exprs/expr.h b/be/src/exprs/expr.h index 1690992ceb..d947cf8e34 100644 --- a/be/src/exprs/expr.h +++ b/be/src/exprs/expr.h @@ -34,7 +34,6 @@ #include "runtime/datetime_value.h" #include "runtime/decimal_value.h" #include "udf/udf.h" -#include "runtime/lib_cache.h" #include "runtime/types.h" //#include // @@ -65,6 +64,7 @@ class TupleIsNullPredicate; class VectorizedRowBatch; class Literal; class MemTracker; +class UserFunctionCacheEntry; // This is the superclass of all expr evaluation nodes. class Expr { @@ -396,7 +396,7 @@ protected: ExprContext* ctx, RuntimeState* state, int varargs_buffer_size); /// Cache entry for the library implementing this function. - LibCache::LibCacheEntry* _cache_entry; + UserFunctionCacheEntry* _cache_entry; // function opcode diff --git a/be/src/exprs/info_func.h b/be/src/exprs/info_func.h index fd1279d614..f0c05bb80d 100644 --- a/be/src/exprs/info_func.h +++ b/be/src/exprs/info_func.h @@ -20,6 +20,7 @@ #include #include +#include "common/object_pool.h" #include "exprs/expr.h" #include "gen_cpp/Exprs_types.h" diff --git a/be/src/exprs/literal.h b/be/src/exprs/literal.h index 49b3d5181c..307775b89f 100644 --- a/be/src/exprs/literal.h +++ b/be/src/exprs/literal.h @@ -18,6 +18,7 @@ #ifndef DORIS_BE_SRC_QUERY_EXPRS_LITERAL_H #define DORIS_BE_SRC_QUERY_EXPRS_LITERAL_H +#include "common/object_pool.h" #include "exprs/expr.h" namespace doris { diff --git a/be/src/exprs/new_agg_fn_evaluator.cc b/be/src/exprs/new_agg_fn_evaluator.cc index f61a6afb5e..384b2b757c 100644 --- a/be/src/exprs/new_agg_fn_evaluator.cc +++ b/be/src/exprs/new_agg_fn_evaluator.cc @@ -28,7 +28,6 @@ #include "exprs/expr.h" #include "exprs/scalar_fn_call.h" #include "gutil/strings/substitute.h" -#include "runtime/lib_cache.h" #include "runtime/mem_tracker.h" #include "runtime/raw_value.h" #include "runtime/runtime_state.h" diff --git a/be/src/exprs/new_agg_fn_evaluator.h b/be/src/exprs/new_agg_fn_evaluator.h index bfbaae73de..7c9bd72f5c 100644 --- a/be/src/exprs/new_agg_fn_evaluator.h +++ b/be/src/exprs/new_agg_fn_evaluator.h @@ -28,7 +28,6 @@ #include "exprs/agg_fn.h" #include "exprs/hybird_map.h" #include "runtime/descriptors.h" -#include "runtime/lib_cache.h" #include "runtime/tuple_row.h" #include "runtime/types.h" #include "udf/udf.h" diff --git a/be/src/exprs/null_literal.h b/be/src/exprs/null_literal.h index 5edae780ef..857b5b483f 100644 --- a/be/src/exprs/null_literal.h +++ b/be/src/exprs/null_literal.h @@ -18,6 +18,7 @@ #ifndef DORIS_BE_SRC_QUERY_EXPRS_NULL_LITERAL_H #define DORIS_BE_SRC_QUERY_EXPRS_NULL_LITERAL_H +#include "common/object_pool.h" #include "exprs/expr.h" namespace llvm { diff --git a/be/src/exprs/scalar_fn_call.cpp b/be/src/exprs/scalar_fn_call.cpp index 10a2d80f3c..fd6ffeaf57 100644 --- a/be/src/exprs/scalar_fn_call.cpp +++ b/be/src/exprs/scalar_fn_call.cpp @@ -25,7 +25,7 @@ #include "codegen/llvm_codegen.h" #include "exprs/anyval_util.h" #include "exprs/expr_context.h" -#include "runtime/lib_cache.h" +#include "runtime/user_function_cache.h" #include "runtime/runtime_state.h" #include "udf/udf_internal.h" #include "util/debug_util.h" @@ -89,8 +89,8 @@ Status ScalarFnCall::prepare( Status status = Status::OK; if (_scalar_fn == NULL) { if (SymbolsUtil::is_mangled(_fn.scalar_fn.symbol)) { - status = LibCache::instance()->get_so_function_ptr( - _fn.hdfs_location, _fn.scalar_fn.symbol, &_scalar_fn, &_cache_entry, true); + status = UserFunctionCache::instance()->get_function_ptr( + _fn.id, _fn.scalar_fn.symbol, _fn.hdfs_location, "", &_scalar_fn, &_cache_entry); } else { std::vector arg_types; for (auto& t_type : _fn.arg_types) { @@ -100,8 +100,8 @@ Status ScalarFnCall::prepare( // ret_type = ColumnType(thrift_to_type(_fn.ret_type)); std::string symbol = SymbolsUtil::mangle_user_function( _fn.scalar_fn.symbol, arg_types, _fn.has_var_args, NULL); - status = LibCache::instance()->get_so_function_ptr( - _fn.hdfs_location, symbol, &_scalar_fn, &_cache_entry, true); + status = UserFunctionCache::instance()->get_function_ptr( + _fn.id, symbol, _fn.hdfs_location, "", &_scalar_fn, &_cache_entry); } } #if 0 @@ -119,7 +119,7 @@ Status ScalarFnCall::prepare( if (char_arg) { DCHECK(num_fixed_args() <= 8 && _fn.binary_type == TFunctionBinaryType::BUILTIN); } - Status status = LibCache::instance()->GetSoFunctionPtr( + Status status = UserFunctionCache::instance()->GetSoFunctionPtr( _fn.hdfs_location, _fn.scalar_fn.symbol, &_scalar_fn, &cache_entry_); if (!status.ok()) { if (_fn.binary_type == TFunctionBinaryType::BUILTIN) { @@ -141,8 +141,8 @@ Status ScalarFnCall::prepare( if (_fn.binary_type == TFunctionBinaryType::IR) { std::string local_path; - RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath( - _fn.hdfs_location, LibCache::TYPE_IR, &local_path)); + RETURN_IF_ERROR(UserFunctionCache::instance()->GetLocalLibPath( + _fn.hdfs_location, UserFunctionCache::TYPE_IR, &local_path)); // Link the UDF module into this query's main module (essentially copy the UDF // module into the main module) so the UDF's functions are available in the main // module. @@ -425,8 +425,8 @@ Status ScalarFnCall::get_udf(RuntimeState* state, Function** udf) { // This can either be a UDF implemented in a .so or a builtin using the UDF // interface with the code in impalad. void* fn_ptr = NULL; - Status status = LibCache::instance()->get_so_function_ptr( - _fn.hdfs_location, _fn.scalar_fn.symbol, &fn_ptr, &_cache_entry); + Status status = UserFunctionCache::instance()->get_function_ptr( + _fn.id, _fn.scalar_fn.symbol, _fn.hdfs_location, "", &fn_ptr, &_cache_entry); if (!status.ok() && _fn.binary_type == TFunctionBinaryType::BUILTIN) { // Builtins symbols should exist unless there is a version mismatch. // TODO(zc ) @@ -541,8 +541,8 @@ Status ScalarFnCall::get_udf(RuntimeState* state, Function** udf) { Status ScalarFnCall::get_function(RuntimeState* state, const std::string& symbol, void** fn) { if (_fn.binary_type == TFunctionBinaryType::NATIVE || _fn.binary_type == TFunctionBinaryType::BUILTIN) { - return LibCache::instance()->get_so_function_ptr( - _fn.hdfs_location, symbol, fn, &_cache_entry, true); + return UserFunctionCache::instance()->get_function_ptr( + _fn.id, symbol, _fn.hdfs_location, "", fn, &_cache_entry); } else { #if 0 DCHECK_EQ(_fn.binary_type, TFunctionBinaryType::IR); diff --git a/be/src/exprs/scalar_fn_call.h b/be/src/exprs/scalar_fn_call.h index 240faac6ce..dcd2ba782c 100644 --- a/be/src/exprs/scalar_fn_call.h +++ b/be/src/exprs/scalar_fn_call.h @@ -20,6 +20,7 @@ #include +#include "common/object_pool.h" #include "exprs/expr.h" #include "udf/udf.h" diff --git a/be/src/exprs/slot_ref.h b/be/src/exprs/slot_ref.h index 01202629fc..6d6f7ffb38 100644 --- a/be/src/exprs/slot_ref.h +++ b/be/src/exprs/slot_ref.h @@ -18,6 +18,7 @@ #ifndef DORIS_BE_SRC_QUERY_EXPRS_SLOT_REF_H #define DORIS_BE_SRC_QUERY_EXPRS_SLOT_REF_H +#include "common/object_pool.h" #include "exprs/expr.h" namespace doris { diff --git a/be/src/exprs/tuple_is_null_predicate.h b/be/src/exprs/tuple_is_null_predicate.h index 08c7a82317..f55b0e0271 100644 --- a/be/src/exprs/tuple_is_null_predicate.h +++ b/be/src/exprs/tuple_is_null_predicate.h @@ -18,6 +18,7 @@ #ifndef DORIS_BE_SRC_QUERY_EXPRS_TUPLE_IS_NULL_PREDICATE_H #define DORIS_BE_SRC_QUERY_EXPRS_TUPLE_IS_NULL_PREDICATE_H +#include "common/object_pool.h" #include "exprs/predicate.h" namespace doris { diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp index e8c4834b05..6e93a02ffd 100644 --- a/be/src/http/http_client.cpp +++ b/be/src/http/http_client.cpp @@ -159,16 +159,19 @@ Status HttpClient::download(const std::string& local_path) { LOG(WARNING) << "open file failed, file=" << local_path; return Status("open file failed"); } - auto callback = [&fp, &local_path] (const void* data, size_t length) { + Status status; + auto callback = [&status, &fp, &local_path] (const void* data, size_t length) { auto res = fwrite(data, length, 1, fp.get()); if (res != 1) { LOG(WARNING) << "fail to write data to file, file=" << local_path << ", error=" << ferror(fp.get()); + status = Status("fail to write data when download"); return false; } return true; }; - return execute(callback); + RETURN_IF_ERROR(execute(callback)); + return status; } Status HttpClient::execute(std::string* response) { diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 5dcf63bb40..fd911382e6 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -37,7 +37,7 @@ add_library(Runtime STATIC descriptors.cpp exec_env.cpp exec_env_init.cpp - lib_cache.cpp + user_function_cache.cpp mem_pool.cpp plan_fragment_executor.cpp primitive_type.cpp diff --git a/be/src/runtime/lib_cache.cpp b/be/src/runtime/lib_cache.cpp deleted file mode 100644 index 5bfaa1a330..0000000000 --- a/be/src/runtime/lib_cache.cpp +++ /dev/null @@ -1,437 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/lib_cache.h" - -#include -#include -#include - -#include "codegen/llvm_codegen.h" -#include "runtime/runtime_state.h" -#include "util/dynamic_util.h" -#include "util/hash_util.hpp" -#include "util/path_builder.h" - -namespace doris { - -boost::scoped_ptr LibCache::_s_instance; - -struct LibCache::LibCacheEntry { - // Lock protecting all fields in this entry - boost::mutex lock; - - // The number of users that are using this cache entry. If this is - // a .so, we can't dlclose unless the use_count goes to 0. - int use_count; - - // If true, this cache entry should be removed from _lib_cache when - // the use_count goes to 0. - bool should_remove; - - // If true, we need to check if there is a newer version of the cached library in HDFS - // on next access. Should hold _lock to read/write. - bool check_needs_refresh; - - // The type of this file. - LibType type; - - // The path on the local file system for this library. - std::string local_path; - - // Status returned from copying this file from HDFS. - Status copy_file_status; - - // The last modification time of the HDFS file in seconds. - time_t last_mod_time; - - // Handle from dlopen. - void* shared_object_handle; - - // mapping from symbol => address of loaded symbol. - // Only used if the type is TYPE_SO. - typedef boost::unordered_map SymbolMap; - SymbolMap symbol_cache; - - // Set of symbols in this entry. This is populated once on load and read - // only. This is only used if it is a llvm module. - // TODO: it would be nice to be able to do this for .so's as well but it's - // not trivial to walk an .so for the symbol table. - boost::unordered_set symbols; - - // Set if an error occurs loading the cache entry before the cache entry - // can be evicted. This allows other threads that attempt to use the entry - // before it is removed to return the same error. - Status loading_status; - - LibCacheEntry() : use_count(0), should_remove(false), check_needs_refresh(false), - shared_object_handle(NULL) {} - ~LibCacheEntry(); -}; - -LibCache::LibCache() : - _current_process_handle(NULL) { -} - -LibCache::~LibCache() { - drop_cache(); - if (_current_process_handle != NULL) { - dynamic_close(_current_process_handle); - } -} - -Status LibCache::init() { - DCHECK(LibCache::_s_instance.get() == NULL); - LibCache::_s_instance.reset(new LibCache()); - return LibCache::_s_instance->init_internal(); -} - -Status LibCache::init_internal() { - // if (TestInfo::is_fe_test()) { - // // In the FE tests, NULL gives the handle to the java process. - // // Explicitly load the fe-support shared object. - // std::string fe_support_path; - // PathBuilder::GetFullBuildPath("service/libfesupport.so", &fe_support_path); - // RETURN_IF_ERROR(dynamic_open(fe_support_path.c_str(), &_current_process_handle)); - // } else { - RETURN_IF_ERROR(dynamic_open(NULL, &_current_process_handle)); - DCHECK(_current_process_handle != NULL) - << "We should always be able to get current process handle."; - return Status::OK; -} - -LibCache::LibCacheEntry::~LibCacheEntry() { - if (shared_object_handle != NULL) { - DCHECK_EQ(use_count, 0); - DCHECK(should_remove); - dynamic_close(shared_object_handle); - } - unlink(local_path.c_str()); -} - -Status LibCache::get_so_function_ptr( - const std::string& hdfs_lib_file, const std::string& origin_symbol, - void** fn_ptr, LibCacheEntry** ent, bool quiet) { - const std::string symbol = get_real_symbol(origin_symbol); - if (hdfs_lib_file.empty()) { - // Just loading a function ptr in the current process. No need to take any locks. - DCHECK(_current_process_handle != NULL); - RETURN_IF_ERROR(dynamic_lookup(_current_process_handle, symbol.c_str(), fn_ptr)); - return Status::OK; - } - LibCacheEntry* entry = NULL; - boost::unique_lock lock; - if (ent != NULL && *ent != NULL) { - // Reuse already-cached entry provided by user - entry = *ent; - boost::unique_lock l(entry->lock); - lock.swap(l); - } else { - RETURN_IF_ERROR(get_chache_entry(hdfs_lib_file, TYPE_SO, &lock, &entry)); - } - DCHECK(entry != NULL); - DCHECK_EQ(entry->type, TYPE_SO); - - LibCacheEntry::SymbolMap::iterator it = entry->symbol_cache.find(symbol); - if (it != entry->symbol_cache.end()) { - *fn_ptr = it->second; - } else { - RETURN_IF_ERROR( - dynamic_lookup(entry->shared_object_handle, symbol.c_str(), fn_ptr)); - entry->symbol_cache[symbol] = *fn_ptr; - } - - DCHECK(*fn_ptr != NULL); - if (ent != NULL && *ent == NULL) { - // Only set and increment user's entry if it wasn't already cached - *ent = entry; - ++(*ent)->use_count; - } - return Status::OK; -} - -void LibCache::decrement_use_count(LibCacheEntry* entry) { - if (entry == NULL) { - return; - } - bool can_delete = false; - { - boost::unique_lock lock(entry->lock); - --entry->use_count; - can_delete = (entry->use_count == 0 && entry->should_remove); - } - if (can_delete) { - delete entry; - } -} - -Status LibCache::get_local_lib_path( - const std::string& hdfs_lib_file, LibType type, std::string* local_path) { - boost::unique_lock lock; - LibCacheEntry* entry = NULL; - RETURN_IF_ERROR(get_chache_entry(hdfs_lib_file, type, &lock, &entry)); - DCHECK(entry != NULL); - DCHECK_EQ(entry->type, type); - *local_path = entry->local_path; - return Status::OK; -} - -Status LibCache::check_symbol_exists( - const std::string& hdfs_lib_file, LibType type, - const std::string& origin_symbol, bool quiet) { - const std::string symbol = get_real_symbol(origin_symbol); - if (type == TYPE_SO) { - void* dummy_ptr = NULL; - return get_so_function_ptr(hdfs_lib_file, symbol, &dummy_ptr, NULL, quiet); - } else if (type == TYPE_IR) { - boost::unique_lock lock; - LibCacheEntry* entry = NULL; - RETURN_IF_ERROR(get_chache_entry(hdfs_lib_file, type, &lock, &entry)); - DCHECK(entry != NULL); - DCHECK_EQ(entry->type, TYPE_IR); - if (entry->symbols.find(symbol) == entry->symbols.end()) { - std::stringstream ss; - ss << "Symbol '" << symbol << "' does not exist in module: " << hdfs_lib_file - << " (local path: " << entry->local_path << ")"; - // return quiet ? Status::Expected(ss.str()) : Status(ss.str()); - return Status(ss.str()); - } - return Status::OK; - } else if (type == TYPE_JAR) { - // TODO: figure out how to inspect contents of jars - boost::unique_lock lock; - LibCacheEntry* dummy_entry = NULL; - return get_chache_entry(hdfs_lib_file, type, &lock, &dummy_entry); - } else { - DCHECK(false); - return Status("Shouldn't get here."); - } -} - -void LibCache::set_needs_refresh(const std::string& hdfs_lib_file) { - boost::unique_lock lib_cache_lock(_lock); - LibMap::iterator it = _lib_cache.find(hdfs_lib_file); - if (it == _lib_cache.end()) { - return; - } - LibCacheEntry* entry = it->second; - - boost::unique_lock entry_lock(entry->lock); - // Need to hold _lock before setting check_needs_refresh. - entry->check_needs_refresh = true; -} - -void LibCache::remove_entry(const std::string& hdfs_lib_file) { - boost::unique_lock lib_cache_lock(_lock); - LibMap::iterator it = _lib_cache.find(hdfs_lib_file); - if (it == _lib_cache.end()) { - return; - } - remove_entry_internal(hdfs_lib_file, it); -} - -void LibCache::remove_entry_internal(const std::string& hdfs_lib_file, - const LibMap::iterator& entry_iter) { - LibCacheEntry* entry = entry_iter->second; - VLOG(1) << "Removing lib cache entry: " << hdfs_lib_file - << ", local path: " << entry->local_path; - boost::unique_lock entry_lock(entry->lock); - - // We have both locks so no other thread can be updating _lib_cache or trying to get - // the entry. - _lib_cache.erase(entry_iter); - - entry->should_remove = true; - DCHECK_GE(entry->use_count, 0); - bool can_delete = entry->use_count == 0; - - // Now that the entry is removed from the map, it means no future threads - // can find it->second (the entry), so it is safe to unlock. - entry_lock.unlock(); - - // Now that we've unlocked, we can delete this entry if no one is using it. - if (can_delete) { - delete entry; - } -} - -void LibCache::drop_cache() { - boost::unique_lock lib_cache_lock(_lock); - BOOST_FOREACH(LibMap::value_type& v, _lib_cache) { - bool can_delete = false; - { - // Lock to wait for any threads currently processing the entry. - boost::unique_lock entry_lock(v.second->lock); - v.second->should_remove = true; - DCHECK_GE(v.second->use_count, 0); - can_delete = v.second->use_count == 0; - } - VLOG(1) << "Removed lib cache entry: " << v.first; - if (can_delete) delete v.second; - } - _lib_cache.clear(); -} - -Status LibCache::get_chache_entry( - const std::string& hdfs_lib_file, LibType type, - boost::unique_lock* entry_lock, LibCacheEntry** entry) { - Status status; - { - // If an error occurs, local_entry_lock is released before calling remove_entry() - // below because it takes the global _lock which must be acquired before taking entry - // locks. - boost::unique_lock local_entry_lock; - status = get_chache_entry_internal(hdfs_lib_file, type, &local_entry_lock, entry); - if (status.ok()) { - entry_lock->swap(local_entry_lock); - return status; - } - if (*entry == NULL) return status; - - // Set loading_status on the entry so that if another thread calls - // get_chache_entry() for this lib before this thread is able to acquire _lock in - // remove_entry(), it is able to return the same error. - (*entry)->loading_status = status; - } - // Takes _lock - remove_entry(hdfs_lib_file); - return status; -} - -Status LibCache::get_chache_entry_internal( - const std::string& hdfs_lib_file, LibType type, - boost::unique_lock* entry_lock, LibCacheEntry** entry) { - DCHECK(!hdfs_lib_file.empty()); - *entry = NULL; -#if 0 - // Check if this file is already cached or an error occured on another thread while - // loading the library. - boost::unique_lock lib_cache_lock(_lock); - LibMap::iterator it = _lib_cache.find(hdfs_lib_file); - if (it != _lib_cache.end()) { - { - boost::unique_lock local_entry_lock((it->second)->lock); - if (!(it->second)->loading_status.ok()) { - // If loading_status is already set, the returned *entry should be NULL. - DCHECK(*entry == NULL); - return (it->second)->loading_status; - } - } - - *entry = it->second; - if ((*entry)->check_needs_refresh) { - // Check if file has been modified since loading the cached copy. If so, remove the - // cached entry and create a new one. - (*entry)->check_needs_refresh = false; - time_t last_mod_time; - hdfsFS hdfs_conn; - Status status = HdfsFsCache::instance()->GetConnection(hdfs_lib_file, &hdfs_conn); - if (!status.ok()) { - remove_entry_internal(hdfs_lib_file, it); - *entry = NULL; - return status; - } - status = GetLastModificationTime(hdfs_conn, hdfs_lib_file.c_str(), &last_mod_time); - if (!status.ok() || (*entry)->last_mod_time < last_mod_time) { - remove_entry_internal(hdfs_lib_file, it); - *entry = NULL; - } - RETURN_IF_ERROR(status); - } - } - - if (*entry != NULL) { - // Release the _lib_cache lock. This guarantees other threads looking at other - // libs can continue. - lib_cache_lock.unlock(); - boost::unique_lock local_entry_lock((*entry)->lock); - entry_lock->swap(local_entry_lock); - - RETURN_IF_ERROR((*entry)->copy_file_status); - DCHECK_EQ((*entry)->type, type); - DCHECK(!(*entry)->local_path.empty()); - return Status::OK; - } - - // Entry didn't exist. Add the entry then release _lock (so other libraries - // can be accessed). - *entry = new LibCacheEntry(); - - // Grab the entry lock before adding it to _lib_cache. We still need to do more - // work to initialize *entry and we don't want another thread to pick up - // the uninitialized entry. - boost::unique_lock local_entry_lock((*entry)->lock); - entry_lock->swap(local_entry_lock); - _lib_cache[hdfs_lib_file] = *entry; - lib_cache_lock.unlock(); - - // At this point we have the entry lock but not the lib cache lock. - DCHECK(*entry != NULL); - (*entry)->type = type; - - // Copy the file - (*entry)->local_path = make_local_path(hdfs_lib_file, FLAGS_local_library_dir); - VLOG(1) << "Adding lib cache entry: " << hdfs_lib_file - << ", local path: " << (*entry)->local_path; - - hdfsFS hdfs_conn, local_conn; - RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(hdfs_lib_file, &hdfs_conn)); - RETURN_IF_ERROR(HdfsFsCache::instance()->GetLocalConnection(&local_conn)); - - // Note: the file can be updated between getting last_mod_time and copying the file to - // local_path. This can only result in the file unnecessarily being refreshed, and does - // not affect correctness. - (*entry)->copy_file_status = GetLastModificationTime( - hdfs_conn, hdfs_lib_file.c_str(), &(*entry)->last_mod_time); - RETURN_IF_ERROR((*entry)->copy_file_status); - - (*entry)->copy_file_status = CopyHdfsFile( - hdfs_conn, hdfs_lib_file, local_conn, (*entry)->local_path); - RETURN_IF_ERROR((*entry)->copy_file_status); - - if (type == TYPE_SO) { - // dlopen the local library - RETURN_IF_ERROR( - DynamicOpen((*entry)->local_path.c_str(), &(*entry)->shared_object_handle)); - } else if (type == TYPE_IR) { - // Load the module and populate all symbols. - ObjectPool pool; - scoped_ptr codegen; - std::string module_id = boost::filesystem::path((*entry)->local_path).stem().string(); - RETURN_IF_ERROR(LlvmCodeGen::LoadFromFile( - &pool, (*entry)->local_path, module_id, &codegen)); - codegen->GetSymbols(&(*entry)->symbols); - } else { - DCHECK_EQ(type, TYPE_JAR); - // Nothing to do. - } -#endif - return Status::OK; -} - -std::string LibCache::make_local_path( - const std::string& hdfs_path, const std::string& local_dir) { - // Append the pid and library number to the local directory. - boost::filesystem::path src(hdfs_path); - std::stringstream dst; - dst << local_dir << "/" << src.stem().native() << "." << getpid() << "." - << (__sync_fetch_and_add(&_num_libs_copied, 1)) << src.extension().native(); - return dst.str(); -} - -} - diff --git a/be/src/runtime/lib_cache.h b/be/src/runtime/lib_cache.h deleted file mode 100644 index 2083875a5f..0000000000 --- a/be/src/runtime/lib_cache.h +++ /dev/null @@ -1,189 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef DORIS_BE_RUNTIME_LIB_CACHE_H -#define DORIS_BE_RUNTIME_LIB_CACHE_H - -#include -#include -#include -#include -#include -#include -#include "common/object_pool.h" -#include "common/status.h" - -namespace doris { - -class RuntimeState; - -/// Process-wide cache of dynamically-linked libraries loaded from HDFS. -/// These libraries can either be shared objects, llvm modules or jars. For -/// shared objects, when we load the shared object, we dlopen() it and keep -/// it in our process. For modules, we store the symbols in the module to -/// service symbol lookups. We can't cache the module since it (i.e. the external -/// module) is consumed when it is linked with the query codegen module. -// -/// Locking strategy: We don't want to grab a big lock across all operations since -/// one of the operations is copying a file from HDFS. With one lock that would -/// prevent any UDFs from running on the system. Instead, we have a global lock -/// that is taken when doing the cache lookup, but is not taking during any blocking calls. -/// During the block calls, we take the per-lib lock. -// -/// Entry lifetime management: We cannot delete the entry while a query is -/// using the library. When the caller requests a ptr into the library, they -/// are given the entry handle and must decrement the ref count when they -/// are done. -// -/// TODO: -/// - refresh libraries -/// - better cached module management. -class LibCache { -public: - struct LibCacheEntry; - - enum LibType { - TYPE_SO, // Shared object - TYPE_IR, // IR intermediate - TYPE_JAR, // Java jar file. We don't care about the contents in the BE. - }; - - static LibCache* instance() { - return LibCache::_s_instance.get(); - } - - /// Calls dlclose on all cached handles. - ~LibCache(); - - /// Initializes the libcache. Must be called before any other APIs. - static Status init(); - - /// Gets the local file system path for the library at 'hdfs_lib_file'. If - /// this file is not already on the local fs, it copies it and caches the - /// result. Returns an error if 'hdfs_lib_file' cannot be copied to the local fs. - Status get_local_lib_path(const std::string& hdfs_lib_file, LibType type, - std::string* local_path); - - /// Returns status.ok() if the symbol exists in 'hdfs_lib_file', non-ok otherwise. - /// If 'quiet' is true, the error status for non-Java unfound symbols will not be logged. - Status check_symbol_exists( - const std::string& hdfs_lib_file, LibType type, - const std::string& symbol, bool quiet); - - /// Returns a pointer to the function for the given library and symbol. - /// If 'hdfs_lib_file' is empty, the symbol is looked up in the impalad process. - /// Otherwise, 'hdfs_lib_file' should be the HDFS path to a shared library (.so) file. - /// dlopen handles and symbols are cached. - /// Only usable if 'hdfs_lib_file' refers to a shared object. - // - /// If entry is non-null and *entry is null, *entry will be set to the cached entry. If - /// entry is non-null and *entry is non-null, *entry will be reused (i.e., the use count - /// is not increased). The caller must call decrement_use_count(*entry) when it is done - /// using fn_ptr and it is no longer valid to use fn_ptr. - // - /// If 'quiet' is true, returned error statuses will not be logged. - Status get_so_function_ptr( - const std::string& hdfs_lib_file, const std::string& symbol, - void** fn_ptr, LibCacheEntry** entry, bool quiet); - - Status get_so_function_ptr( - const std::string& hdfs_lib_file, const std::string& symbol, - void** fn_ptr, LibCacheEntry** entry) { - return get_so_function_ptr(hdfs_lib_file, symbol, fn_ptr, entry, true); - } - - /// Marks the entry for 'hdfs_lib_file' as needing to be refreshed if the file in HDFS is - /// newer than the local cached copied. The refresh will occur the next time the entry is - /// accessed. - void set_needs_refresh(const std::string& hdfs_lib_file); - - /// See comment in get_so_function_ptr(). - void decrement_use_count(LibCacheEntry* entry); - - /// Removes the cache entry for 'hdfs_lib_file' - void remove_entry(const std::string& hdfs_lib_file); - - /// Removes all cached entries. - void drop_cache(); - -private: - /// Singleton instance. Instantiated in Init(). - static boost::scoped_ptr _s_instance; - - /// dlopen() handle for the current process (i.e. impalad). - void* _current_process_handle; - - /// The number of libs that have been copied from HDFS to the local FS. - /// This is appended to the local fs path to remove collisions. - int64_t _num_libs_copied; - - /// Protects _lib_cache. For lock ordering, this lock must always be taken before - /// the per entry lock. - boost::mutex _lock; - - /// Maps HDFS library path => cache entry. - /// Entries in the cache need to be explicitly deleted. - typedef boost::unordered_map LibMap; - LibMap _lib_cache; - - LibCache(); - LibCache(LibCache const& l); // disable copy ctor - LibCache& operator=(LibCache const& l); // disable assignment - - Status init_internal(); - - /// Returns the cache entry for 'hdfs_lib_file'. If this library has not been - /// copied locally, it will copy it and add a new LibCacheEntry to '_lib_cache'. - /// Result is returned in *entry. - /// No locks should be take before calling this. On return the entry's lock is - /// taken and returned in *entry_lock. - /// If an error is returned, there will be no entry in _lib_cache and *entry is NULL. - Status get_chache_entry( - const std::string& hdfs_lib_file, LibType type, - boost::unique_lock* entry_lock, LibCacheEntry** entry); - - /// Implementation to get the cache entry for 'hdfs_lib_file'. Errors are returned - /// without evicting the cache entry if the status is not OK and *entry is not NULL. - Status get_chache_entry_internal( - const std::string& hdfs_lib_file, LibType type, - boost::unique_lock* entry_lock, LibCacheEntry** entry); - - // map "palo" to "doris" in symbol, only for grayscale upgrading - std::string get_real_symbol(const std::string& symbol) { - static std::regex rx1("8palo_udf"); - std::string str1 = std::regex_replace(symbol, rx1, "9doris_udf"); - static std::regex rx2("4palo"); - std::string str2 = std::regex_replace(str1, rx2, "5doris"); - return str2; - } - - /// Utility function for generating a filename unique to this process and - /// 'hdfs_path'. This is to prevent multiple impalad processes or different library files - /// with the same name from clobbering each other. 'hdfs_path' should be the full path - /// (including the filename) of the file we're going to copy to the local FS, and - /// 'local_dir' is the local directory prefix of the returned path. - std::string make_local_path(const std::string& hdfs_path, const std::string& local_dir); - - /// Implementation to remove an entry from the cache. - /// _lock must be held. The entry's lock should not be held. - void remove_entry_internal(const std::string& hdfs_lib_file, - const LibMap::iterator& entry_iterator); -}; - -} - -#endif diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 7e2ecf583f..a21ecf34ab 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -46,6 +46,30 @@ namespace doris { +RuntimeState::RuntimeState( + const TUniqueId& fragment_instance_id, + const TQueryOptions& query_options, + const std::string& now, ExecEnv* exec_env) : + _obj_pool(new ObjectPool()), + _data_stream_recvrs_pool(new ObjectPool()), + _unreported_error_idx(0), + _profile(_obj_pool.get(), "Fragment " + print_id(fragment_instance_id)), + _fragment_mem_tracker(NULL), + _is_cancelled(false), + _per_fragment_instance_idx(0), + _root_node_id(-1), + _num_rows_load_success(0), + _num_rows_load_filtered(0), + _num_print_error_rows(0), + _normal_row_number(0), + _error_row_number(0), + _error_log_file(nullptr), + _is_running(true), + _instance_buffer_reservation(new ReservationTracker) { + Status status = init(fragment_instance_id, query_options, now, exec_env); + DCHECK(status.ok()); +} + RuntimeState::RuntimeState( const TExecPlanFragmentParams& fragment_params, const TQueryOptions& query_options, diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 7a413db162..f8d46ee5c2 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -65,6 +65,11 @@ class RowDescriptor; // query and shared across all execution nodes of that query. class RuntimeState { public: + // NOTE: only used to unit test + RuntimeState( + const TUniqueId& fragment_instance_id, + const TQueryOptions& query_options, + const std::string& now, ExecEnv* exec_env); RuntimeState( const TExecPlanFragmentParams& fragment_params, diff --git a/be/src/runtime/user_function_cache.cpp b/be/src/runtime/user_function_cache.cpp new file mode 100644 index 0000000000..cc3e3bba08 --- /dev/null +++ b/be/src/runtime/user_function_cache.cpp @@ -0,0 +1,370 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/user_function_cache.h" + +#include + +#include // boost::split +#include // boost::algorithm::ends_with +#include // boost::is_any_of + +#include "http/http_client.h" +#include "util/dynamic_util.h" +#include "util/file_utils.h" +#include "util/md5.h" +#include "util/spinlock.h" + +namespace doris { + +static const int kLibShardNum = 128; + +// function cache entry, store information for +struct UserFunctionCacheEntry { + UserFunctionCacheEntry(int64_t fid_, const std::string& checksum_, + const std::string& lib_file_) + : function_id(fid_), checksum(checksum_), lib_file(lib_file_) { } + ~UserFunctionCacheEntry(); + + void ref() { _refs.fetch_add(1); } + + // If unref() returns true, this object should be delete + bool unref() { return _refs.fetch_sub(1) == 1; } + + int64_t function_id = 0; + // used to check if this library is valid. + std::string checksum; + + // library file + std::string lib_file; + + // make it atomic variable instead of holding a lock + std::atomic is_loaded{false}; + + // Set to true when this library is not needed. + // e.g. deleting some unused library to re + std::atomic should_delete_library{false}; + + // lock to make sure only one can load this cache + std::mutex load_lock; + + // To reduce cache lock held time, cache entry is + // added to cache map before library is downloaded. + // And this is used to indicate whether library is downloaded. + bool is_downloaded = false; + + // used to lookup a symbol + void* lib_handle = nullptr; + + SpinLock map_lock; + // from symbol_name to function pointer + std::unordered_map fptr_map; + +private: + std::atomic _refs{0}; +}; + +UserFunctionCacheEntry::~UserFunctionCacheEntry() { + // close lib_handle if it was opened + if (lib_handle != nullptr) { + dynamic_close(lib_handle); + lib_handle = nullptr; + } + + // delete library file if should_delete_library is set + if (should_delete_library.load()) { + unlink(lib_file.c_str()); + } +} + +UserFunctionCache::UserFunctionCache() { +} + +UserFunctionCache::~UserFunctionCache() { + std::lock_guard l(_cache_lock); + auto it = _entry_map.begin(); + while (it != _entry_map.end()) { + auto entry = it->second; + it = _entry_map.erase(it); + if (entry->unref()) { + delete entry; + } + } +} + +UserFunctionCache* UserFunctionCache::instance() { + static UserFunctionCache s_cache; + return &s_cache; +} + +Status UserFunctionCache::init(const std::string& lib_dir) { + DCHECK(_lib_dir.empty()); + _lib_dir = lib_dir; + // 1. dynamic open current process + RETURN_IF_ERROR(dynamic_open(nullptr, &_current_process_handle)); + // 2. load all cached + RETURN_IF_ERROR(_load_cached_lib()); + return Status::OK; +} + +Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std::string& file) { + if (!boost::algorithm::ends_with(file, ".so")) { + return Status("unknown library file format"); + } + + std::vector split_parts; + boost::split(split_parts, file, boost::is_any_of(".")); + if (split_parts.size() != 3) { + return Status("user function's name should be function_id.checksum.so"); + } + int64_t function_id = std::stol(split_parts[0]); + std::string checksum = split_parts[1]; + auto it = _entry_map.find(function_id); + if (it != _entry_map.end()) { + LOG(WARNING) << "meet a same function id user function library, function_id=" << function_id + << ", one_checksum=" << checksum << ", other_checksum=" << it->second->checksum; + return Status("duplicate function id"); + } + // create a cache entry and put it into entry map + UserFunctionCacheEntry* entry = new UserFunctionCacheEntry( + function_id, checksum, dir + "/" + file); + entry->is_downloaded = true; + + entry->ref(); + _entry_map[function_id] = entry; + + return Status::OK; +} + +Status UserFunctionCache::_load_cached_lib() { + // create library directory if not exist + RETURN_IF_ERROR(FileUtils::create_dir(_lib_dir)); + + auto scan_cb = [this] (const std::string& dir, const std::string& file) { + auto st = _load_entry_from_lib(dir, file); + if (!st.ok()) { + LOG(WARNING) << "load a library failed, dir=" << dir << ", file=" << file; + } + return true; + }; + for (int i = 0; i < kLibShardNum; ++i) { + std::string sub_dir = _lib_dir + "/" + std::to_string(i); + RETURN_IF_ERROR(FileUtils::create_dir(sub_dir)); + RETURN_IF_ERROR(FileUtils::scan_dir(sub_dir, scan_cb)); + } + return Status::OK; +} + +Status UserFunctionCache::get_function_ptr( + int64_t fid, + const std::string& symbol, + const std::string& url, + const std::string& checksum, + void** fn_ptr, + UserFunctionCacheEntry** output_entry) { + if (fid == 0) { + // Just loading a function ptr in the current process. No need to take any locks. + RETURN_IF_ERROR(dynamic_lookup(_current_process_handle, symbol.c_str(), fn_ptr)); + return Status::OK; + } + + // if we need to unref entry + bool need_unref_entry = false; + UserFunctionCacheEntry* entry = nullptr; + // find the library entry for this function. If *output_entry is not null + // find symbol in it without to get other entry + if (output_entry != nullptr && *output_entry != nullptr) { + entry = *output_entry; + } else { + RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, &entry)); + need_unref_entry = true; + } + + Status status; + { + std::lock_guard l(entry->map_lock); + // now, we have the library entry, we need to lock it to find symbol + auto it = entry->fptr_map.find(symbol); + if (it != entry->fptr_map.end()) { + *fn_ptr = it->second; + } else { + status = dynamic_lookup(entry->lib_handle, symbol.c_str(), fn_ptr); + if (status.ok()) { + entry->fptr_map.emplace(symbol, *fn_ptr); + } else { + LOG(WARNING) << "fail to lookup symbol in library, symbol=" << symbol + << ", file=" << entry->lib_file; + } + } + } + + if (status.ok() && output_entry != nullptr && *output_entry == nullptr) { + *output_entry = entry; + need_unref_entry = false; + } + + if (need_unref_entry) { + if (entry->unref()) { + delete entry; + } + } + + return status; +} + +Status UserFunctionCache::_get_cache_entry( + int64_t fid, const std::string& url, + const std::string& checksum, UserFunctionCacheEntry** output_entry) { + UserFunctionCacheEntry* entry = nullptr; + { + std::lock_guard l(_cache_lock); + auto it = _entry_map.find(fid); + if (it != _entry_map.end()) { + entry = it->second; + } else { + entry = new UserFunctionCacheEntry( + fid, checksum, _make_lib_file(fid, checksum)); + + entry->ref(); + _entry_map.emplace(fid, entry); + } + entry->ref(); + } + auto st = _load_cache_entry(url, entry); + if (!st.ok()) { + LOG(WARNING) << "fail to load cache entry, fid=" << fid; + // if we load a cache entry failed, I think we should delete this entry cache + // evenif this cache was valid before. + _destroy_cache_entry(entry); + return st; + } + + *output_entry = entry; + return Status::OK; +} + +void UserFunctionCache::_destroy_cache_entry(UserFunctionCacheEntry* entry) { + // 1. we remove cache entry from entry map + size_t num_removed = 0; + { + std::lock_guard l(_cache_lock); + num_removed = _entry_map.erase(entry->function_id); + } + if (num_removed > 0) { + entry->unref(); + } + entry->should_delete_library.store(true); + // now we need to drop + if (entry->unref()) { + delete entry; + } +} + +Status UserFunctionCache::_load_cache_entry( + const std::string& url, UserFunctionCacheEntry* entry) { + if (entry->is_loaded.load()) { + return Status::OK; + } + + std::unique_lock l(entry->load_lock); + if (!entry->is_downloaded) { + RETURN_IF_ERROR(_download_lib(url, entry)); + } + + RETURN_IF_ERROR(_load_cache_entry_internal(entry)); + return Status::OK; +} + +// entry's lock must be held +Status UserFunctionCache::_download_lib( + const std::string& url, UserFunctionCacheEntry* entry) { + DCHECK(!entry->is_downloaded); + + // get local path to save library + std::string tmp_file = entry->lib_file + ".tmp"; + auto fp_closer = [] (FILE*fp) { fclose(fp); }; + std::unique_ptr fp(fopen(tmp_file.c_str(), "w"), fp_closer); + if (fp == nullptr) { + LOG(WARNING) << "fail to open file, file=" << tmp_file + << ", error=" << ferror(fp.get()); + return Status("fail to open file"); + } + + Md5Digest digest; + HttpClient client; + RETURN_IF_ERROR(client.init(url)); + Status status; + auto download_cb = [&status, &tmp_file, &fp, &digest] (const void* data, size_t length) { + digest.update(data, length); + auto res = fwrite(data, length, 1, fp.get()); + if (res != 1) { + LOG(WARNING) << "fail to write data to file, file=" << tmp_file + << ", error=" << ferror(fp.get()); + status = Status("fail to write data when download"); + return false; + } + return true; + }; + RETURN_IF_ERROR(client.execute(download_cb)); + RETURN_IF_ERROR(status); + digest.digest(); + if (!boost::iequals(digest.hex(), entry->checksum)) { + LOG(WARNING) << "UDF's checksum is not equal, one=" << digest.hex() + << ", other=" << entry->checksum; + return Status("UDF's library checksum is not match"); + } + // close this file + fp.reset(); + + // rename temporary file to library file + auto ret = rename(tmp_file.c_str(), entry->lib_file.c_str()); + if (ret != 0) { + char buf[64]; + LOG(WARNING) << "fail to rename file from=" << tmp_file << ", to=" << entry->lib_file + << ", errno=" << errno << ", errmsg=" << strerror_r(errno, buf, 64); + return Status("fail to rename file"); + } + + // check download + entry->is_downloaded = true; + return Status::OK; +} + +// entry's lock must be held +Status UserFunctionCache::_load_cache_entry_internal(UserFunctionCacheEntry* entry) { + RETURN_IF_ERROR(dynamic_open(entry->lib_file.c_str(), &entry->lib_handle)); + entry->is_loaded.store(true); + return Status::OK; +} + +std::string UserFunctionCache::_make_lib_file(int64_t function_id, const std::string& checksum) { + int shard = function_id % kLibShardNum; + std::stringstream ss; + ss << _lib_dir << '/' << shard << '/' << function_id << '.' << checksum << ".so"; + return ss.str(); +} + +void UserFunctionCache::release_entry(UserFunctionCacheEntry* entry) { + if (entry == nullptr) { + return; + } + if (entry->unref()) { + delete entry; + } +} + +} diff --git a/be/src/runtime/user_function_cache.h b/be/src/runtime/user_function_cache.h new file mode 100644 index 0000000000..e53fe46fb9 --- /dev/null +++ b/be/src/runtime/user_function_cache.h @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "common/status.h" + +namespace doris { + +struct UserFunctionCacheEntry; + +// Used to cache a user function. Theses functions inlcude +// UDF(User Definfed Function) and UDAF(User Defined Aggregate +// Function), and maybe inlucde UDTF(User Defined Table +// Function) in future. A user defined function may be splitted +// into several functions, for example, UDAF is splitted into +// InitFn, MergeFn, FinalizeFn... +// In Doris, we call UDF/UDAF/UDTF UserFunction, and we call +// implement function Fucntion. +// An UserFunction have a function id, we can find library with +// this id. When we add user function into cache, we need to +// download from URL and check its checksum. So if we find a function +// with id, this function library is valid. And when user wants to +// change its implementation(URL), Doris will generate a new function +// id. +class UserFunctionCache { +public: + // local_dir is the directory which contain cached library. + UserFunctionCache(); + ~UserFunctionCache(); + + // initialize this cache, call this function before others + Status init(const std::string& local_path); + + static UserFunctionCache* instance(); + + // Return function pointer for given fid and symbol. + // If fid is 0, lookup symbol from this doris-be process. + // Otherwise find symbol in UserFunction's library. + // Found function pointer is returned in fn_ptr, and cache entry + // is returned by entry. Client must call release_entry to release + // cache entry if didn't need it. + // If *entry is not true means that we should find symbol in this + // entry. + Status get_function_ptr(int64_t fid, + const std::string& symbol, + const std::string& url, + const std::string& checksum, + void** fn_ptr, + UserFunctionCacheEntry** entry); + void release_entry(UserFunctionCacheEntry* entry); + +private: + Status _load_cached_lib(); + Status _load_entry_from_lib(const std::string& dir, const std::string& file); + Status _get_cache_entry( + int64_t fid, const std::string& url, + const std::string& checksum, UserFunctionCacheEntry** output_entry); + Status _load_cache_entry(const std::string& url, UserFunctionCacheEntry* entry); + Status _download_lib( + const std::string& url, UserFunctionCacheEntry* entry); + Status _load_cache_entry_internal(UserFunctionCacheEntry* entry); + + std::string _make_lib_file(int64_t function_id, const std::string& checksum); + void _destroy_cache_entry(UserFunctionCacheEntry* entry); + +private: + std::string _lib_dir; + void* _current_process_handle = nullptr; + + std::mutex _cache_lock; + std::unordered_map _entry_map; +}; + +} diff --git a/be/src/util/file_utils.cpp b/be/src/util/file_utils.cpp index b1bc645b2a..a0cd5dca00 100644 --- a/be/src/util/file_utils.cpp +++ b/be/src/util/file_utils.cpp @@ -94,19 +94,10 @@ Status FileUtils::scan_dir( } DeferOp close_dir(std::bind(&closedir, dir)); - struct dirent entry; - struct dirent* result = nullptr; int64_t count = 0; while (true) { - int ret = readdir_r(dir, &entry, &result); - if (ret != 0) { - char buf[64]; - std::stringstream ss; - ss << "readdir(" << dir_path << ") failed, because: " << strerror_r(errno, buf, 64); - return Status(ss.str()); - } + auto result = readdir(dir.get()); if (result == nullptr) { - // Over break; } std::string file_name = result->d_name; @@ -127,6 +118,36 @@ Status FileUtils::scan_dir( return Status::OK; } +Status FileUtils::scan_dir( + const std::string& dir_path, + const std::function& callback) { + auto dir_closer = [] (DIR* dir) { closedir(dir); }; + std::unique_ptr dir(opendir(dir_path.c_str()), dir_closer); + if (dir == nullptr) { + char buf[64]; + LOG(WARNING) << "fail to open dir, dir=" << dir_path << ", errmsg=" << strerror_r(errno, buf, 64); + return Status("fail to opendir"); + } + + struct dirent* result = nullptr; + while (true) { + auto result = readdir(dir.get()); + if (result == nullptr) { + break; + } + std::string file_name = result->d_name; + if (file_name == "." || file_name == "..") { + continue; + } + auto is_continue = callback(dir_path, file_name); + if (!is_continue) { + break; + } + } + + return Status::OK; +} + bool FileUtils::is_dir(const std::string& path) { struct stat path_stat; if (stat(path.c_str(), &path_stat) != 0) { diff --git a/be/src/util/file_utils.h b/be/src/util/file_utils.h index b879ccf090..fca04f41b8 100644 --- a/be/src/util/file_utils.h +++ b/be/src/util/file_utils.h @@ -19,6 +19,7 @@ #define DORIS_BE_UTIL_FILE_UTILS_H #include +#include #include "common/status.h" @@ -44,6 +45,9 @@ public: static Status scan_dir( const std::string& dir_path, std::vector* files, int64_t* file_count = nullptr); + static Status scan_dir( + const std::string& dir_path, + const std::function& callback); // If the file_path is not exist, or is not a dir, return false. static bool is_dir(const std::string& file_path); diff --git a/be/test/exec/broker_scan_node_test.cpp b/be/test/exec/broker_scan_node_test.cpp index 3485762e49..066f468dfa 100644 --- a/be/test/exec/broker_scan_node_test.cpp +++ b/be/test/exec/broker_scan_node_test.cpp @@ -30,7 +30,7 @@ #include "runtime/descriptors.h" #include "runtime/runtime_state.h" #include "runtime/row_batch.h" -#include "runtime/lib_cache.h" +#include "runtime/user_function_cache.h" #include "gen_cpp/Descriptors_types.h" #include "gen_cpp/PlanNodes_types.h" @@ -44,7 +44,7 @@ public: } void init(); static void SetUpTestCase() { - LibCache::instance()->init(); + UserFunctionCache::instance()->init("./be/test/runtime/test_data/user_function_cache/normal"); CastFunctions::init(); } diff --git a/be/test/exec/broker_scanner_test.cpp b/be/test/exec/broker_scanner_test.cpp index b1d4813d79..5182cbbbda 100644 --- a/be/test/exec/broker_scanner_test.cpp +++ b/be/test/exec/broker_scanner_test.cpp @@ -29,7 +29,7 @@ #include "runtime/descriptors.h" #include "runtime/mem_tracker.h" #include "runtime/runtime_state.h" -#include "runtime/lib_cache.h" +#include "runtime/user_function_cache.h" #include "gen_cpp/Descriptors_types.h" #include "gen_cpp/PlanNodes_types.h" #include "exprs/cast_functions.h" @@ -46,7 +46,7 @@ public: void init(); static void SetUpTestCase() { - LibCache::instance()->init(); + UserFunctionCache::instance()->init("./be/test/runtime/test_data/user_function_cache/normal"); CastFunctions::init(); } diff --git a/be/test/runtime/CMakeLists.txt b/be/test/runtime/CMakeLists.txt index 5c86ad309e..a42f23cbd0 100644 --- a/be/test/runtime/CMakeLists.txt +++ b/be/test/runtime/CMakeLists.txt @@ -54,3 +54,4 @@ ADD_BE_TEST(stream_load_pipe_test) ADD_BE_TEST(tablet_writer_mgr_test) #ADD_BE_TEST(export_task_mgr_test) ADD_BE_TEST(snapshot_loader_test) +ADD_BE_TEST(user_function_cache_test) diff --git a/be/test/runtime/test_data/user_function_cache/lib/my_add.cc b/be/test/runtime/test_data/user_function_cache/lib/my_add.cc new file mode 100644 index 0000000000..683135f579 --- /dev/null +++ b/be/test/runtime/test_data/user_function_cache/lib/my_add.cc @@ -0,0 +1,4 @@ +void my_add() { +} +void my_del() { +} diff --git a/be/test/runtime/user_function_cache_test.cpp b/be/test/runtime/user_function_cache_test.cpp new file mode 100644 index 0000000000..a5f5b23df9 --- /dev/null +++ b/be/test/runtime/user_function_cache_test.cpp @@ -0,0 +1,231 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/user_function_cache.h" + +#include + +#include +#include + +#include "common/logging.h" +#include "http/ev_http_server.h" +#include "http/http_channel.h" +#include "http/http_handler.h" +#include "http/http_request.h" +#include "util/md5.h" +#include "util/file_utils.h" + +int main(int argc, char* argv[]); + +namespace doris { + +bool k_is_downloaded = false; +class UserFunctionTestHandler : public HttpHandler { +public: + void handle(HttpRequest* req) override { + auto& file_name = req->param("FILE"); + std::string lib_dir = "./be/test/runtime/test_data/user_function_cache/lib"; + auto lib_file = lib_dir + "/" + file_name; + FILE* fp = fopen(lib_file.c_str(), "r"); + if (fp == nullptr) { + HttpChannel::send_error(req, INTERNAL_SERVER_ERROR); + return; + } + std::string response; + char buf[1024]; + while (true) { + auto size = fread(buf, 1, 1024, fp); + response.append(buf, size); + if (size < 1024) { + break; + } + } + HttpChannel::send_reply(req, response); + k_is_downloaded = true; + fclose(fp); + } +}; + +static UserFunctionTestHandler s_test_handler = UserFunctionTestHandler(); +static EvHttpServer* s_server = nullptr; + +std::string my_add_md5sum; + +static std::string compute_md5(const std::string& file) { + FILE* fp = fopen(file.c_str(), "r"); + Md5Digest md5; + char buf[1024]; + while (true) { + auto size = fread(buf, 1, 1024, fp); + md5.update(buf, size); + if (size < 1024) { + break; + } + } + fclose(fp); + md5.digest(); + return md5.hex(); +} +class UserFunctionCacheTest : public testing::Test { +public: + UserFunctionCacheTest() { } + virtual ~UserFunctionCacheTest() { } + static void SetUpTestCase() { + s_server = new EvHttpServer(29386); + s_server->register_handler(GET, "/{FILE}", &s_test_handler); + s_server->start(); + + // compile code to so + system("g++ -shared ./be/test/runtime/test_data/user_function_cache/lib/my_add.cc -o ./be/test/runtime/test_data/user_function_cache/lib/my_add.so"); + + my_add_md5sum = compute_md5("./be/test/runtime/test_data/user_function_cache/lib/my_add.so"); + } + static void TearDownTestCase() { + delete s_server; + system("rm -rf ./be/test/runtime/test_data/user_function_cache/lib/my_add.so"); + } + void SetUp() override { + k_is_downloaded = false; + } +}; + +TEST_F(UserFunctionCacheTest, process_symbol) { + UserFunctionCache cache; + std::string lib_dir = "./be/test/runtime/test_data/user_function_cache/normal"; + auto st = cache.init(lib_dir); + ASSERT_TRUE(st.ok()); + void* fn_ptr = nullptr; + UserFunctionCacheEntry* entry = nullptr; + st = cache.get_function_ptr(0, "main", "", "", &fn_ptr, &entry); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(&main, fn_ptr); + ASSERT_EQ(nullptr, entry); + cache.release_entry(entry); +} + +TEST_F(UserFunctionCacheTest, download_normal) { + UserFunctionCache cache; + std::string lib_dir = "./be/test/runtime/test_data/user_function_cache/download"; + FileUtils::remove_all(lib_dir); + + auto st = cache.init(lib_dir); + ASSERT_TRUE(st.ok()); + void* fn_ptr = nullptr; + UserFunctionCacheEntry* entry = nullptr; + // get my_add + st = cache.get_function_ptr(1, + "_Z6my_addv", + "http://127.0.0.1:29386/my_add.so", + my_add_md5sum, &fn_ptr, &entry); + ASSERT_TRUE(st.ok()); + ASSERT_TRUE(k_is_downloaded); + ASSERT_NE(nullptr, fn_ptr); + ASSERT_NE(nullptr, entry); + + // get my_del + st = cache.get_function_ptr(1, + "_Z6my_delv", + "http://127.0.0.1:29386/my_add.so", + my_add_md5sum, &fn_ptr, &entry); + ASSERT_TRUE(st.ok()); + ASSERT_NE(nullptr, fn_ptr); + ASSERT_NE(nullptr, entry); + + // get my_mul + st = cache.get_function_ptr(1, + "_Z6my_mulv", + "http://127.0.0.1:29386/my_add.so", + my_add_md5sum, &fn_ptr, &entry); + ASSERT_FALSE(st.ok()); + + cache.release_entry(entry); +} + +TEST_F(UserFunctionCacheTest, load_normal) { + UserFunctionCache cache; + std::string lib_dir = "./be/test/runtime/test_data/user_function_cache/download"; + auto st = cache.init(lib_dir); + ASSERT_TRUE(st.ok()); + void* fn_ptr = nullptr; + UserFunctionCacheEntry* entry = nullptr; + st = cache.get_function_ptr(1, + "_Z6my_addv", + "http://127.0.0.1:29386/my_add.so", + my_add_md5sum, &fn_ptr, &entry); + ASSERT_TRUE(st.ok()); + ASSERT_FALSE(k_is_downloaded); + ASSERT_NE(nullptr, fn_ptr); + ASSERT_NE(nullptr, entry); + cache.release_entry(entry); +} + +TEST_F(UserFunctionCacheTest, download_fail) { + UserFunctionCache cache; + std::string lib_dir = "./be/test/runtime/test_data/user_function_cache/download"; + auto st = cache.init(lib_dir); + ASSERT_TRUE(st.ok()); + void* fn_ptr = nullptr; + UserFunctionCacheEntry* entry = nullptr; + st = cache.get_function_ptr(2, + "_Z6my_delv", + "http://127.0.0.1:29386/my_del.so", + my_add_md5sum, &fn_ptr, &entry); + ASSERT_FALSE(st.ok()); +} + +TEST_F(UserFunctionCacheTest, md5_fail) { + UserFunctionCache cache; + std::string lib_dir = "./be/test/runtime/test_data/user_function_cache/download"; + FileUtils::remove_all(lib_dir); + + auto st = cache.init(lib_dir); + ASSERT_TRUE(st.ok()); + void* fn_ptr = nullptr; + UserFunctionCacheEntry* entry = nullptr; + st = cache.get_function_ptr(1, + "_Z6my_addv", + "http://127.0.0.1:29386/my_add.so", + "1234", &fn_ptr, &entry); + ASSERT_FALSE(st.ok()); +} + +TEST_F(UserFunctionCacheTest, bad_so) { + UserFunctionCache cache; + std::string lib_dir = "./be/test/runtime/test_data/user_function_cache/bad"; + FileUtils::create_dir(lib_dir + "/2"); + auto so_file = lib_dir + "/2/2.abc.so"; + FILE* fp = fopen(so_file.c_str(), "w"); + fwrite(&fp, sizeof(FILE*), 1, fp); + fclose(fp); + auto st = cache.init(lib_dir); + ASSERT_TRUE(st.ok()); + void* fn_ptr = nullptr; + UserFunctionCacheEntry* entry = nullptr; + st = cache.get_function_ptr(2, + "_Z6my_addv", + "http://127.0.0.1:29386/my_add.so", + "abc", &fn_ptr, &entry); + ASSERT_FALSE(st.ok()); +} + +} + +int main(int argc, char* argv[]) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 15271c3997..c9934ff74a 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -285,6 +285,7 @@ struct TFunction { 10: optional TAggregateFunction aggregate_fn 11: optional i64 id + 12: optional string checksum } enum TLoadJobState { diff --git a/run-ut.sh b/run-ut.sh index cc1707078b..c3e710751f 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -175,8 +175,9 @@ ${DORIS_TEST_BINARY_DIR}/runtime/free_list_test ${DORIS_TEST_BINARY_DIR}/runtime/string_buffer_test ${DORIS_TEST_BINARY_DIR}/runtime/stream_load_pipe_test ${DORIS_TEST_BINARY_DIR}/runtime/tablet_writer_mgr_test -## Running expr Unittest ${DORIS_TEST_BINARY_DIR}/runtime/snapshot_loader_test +${DORIS_TEST_BINARY_DIR}/runtime/user_function_cache_test +## Running expr Unittest # Running http ${DORIS_TEST_BINARY_DIR}/http/metrics_action_test