[UDF](java udf) using config to enable java udf instead of macro at compile time (#14062)
* [UDF](java udf) useing config to enable java udf instead of macro at compile time
This commit is contained in:
@ -600,17 +600,13 @@ include_directories(
|
||||
${GENSRC_DIR}/
|
||||
${THIRDPARTY_DIR}/include
|
||||
${GPERFTOOLS_HOME}/include
|
||||
)
|
||||
|
||||
if (BUILD_JAVA_UDF)
|
||||
include_directories($ENV{JAVA_HOME}/include)
|
||||
if (NOT OS_MACOSX)
|
||||
include_directories($ENV{JAVA_HOME}/include/linux)
|
||||
else()
|
||||
include_directories($ENV{JAVA_HOME}/include/darwin)
|
||||
endif()
|
||||
add_definitions("-DLIBJVM")
|
||||
endif()
|
||||
)
|
||||
|
||||
if (NOT OS_MACOSX)
|
||||
set(WL_START_GROUP "-Wl,--start-group")
|
||||
|
||||
@ -850,6 +850,9 @@ CONF_Int32(segcompaction_small_threshold, "1048576");
|
||||
|
||||
CONF_String(jvm_max_heap_size, "1024M");
|
||||
|
||||
// enable java udf and jdbc scannode
|
||||
CONF_Bool(enable_java_support, "true");
|
||||
|
||||
#ifdef BE_TEST
|
||||
// test s3
|
||||
CONF_String(test_s3_resource, "resource");
|
||||
|
||||
@ -171,11 +171,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
|
||||
return Status::InternalError("Missing data jdbc sink.");
|
||||
}
|
||||
if (is_vec) {
|
||||
#ifdef LIBJVM
|
||||
sink->reset(new vectorized::VJdbcTableSink(pool, row_desc, output_exprs));
|
||||
#else
|
||||
return Status::InternalError("Jdbc table sink is disabled since no libjvm is found!");
|
||||
#endif
|
||||
if (config::enable_java_support) {
|
||||
sink->reset(new vectorized::VJdbcTableSink(pool, row_desc, output_exprs));
|
||||
} else {
|
||||
return Status::InternalError(
|
||||
"Jdbc table sink is not enabled, you can change be config "
|
||||
"enable_java_support to true and restart be.");
|
||||
}
|
||||
} else {
|
||||
return Status::InternalError("only support jdbc sink in vectorized engine.");
|
||||
}
|
||||
|
||||
@ -450,11 +450,13 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
|
||||
|
||||
case TPlanNodeType::JDBC_SCAN_NODE:
|
||||
if (state->enable_vectorized_exec()) {
|
||||
#ifdef LIBJVM
|
||||
*node = pool->add(new vectorized::NewJdbcScanNode(pool, tnode, descs));
|
||||
#else
|
||||
return Status::InternalError("Jdbc scan node is disabled since no libjvm is found!");
|
||||
#endif
|
||||
if (config::enable_java_support) {
|
||||
*node = pool->add(new vectorized::NewJdbcScanNode(pool, tnode, descs));
|
||||
} else {
|
||||
return Status::InternalError(
|
||||
"Jdbc scan node is disabled, you can change be config enable_java_support "
|
||||
"to true and restart be.");
|
||||
}
|
||||
} else {
|
||||
return Status::InternalError("Jdbc scan node only support vectorized engine.");
|
||||
}
|
||||
@ -722,11 +724,8 @@ void ExecNode::try_do_aggregate_serde_improve() {
|
||||
if (typeid(*child0) == typeid(vectorized::NewOlapScanNode) ||
|
||||
typeid(*child0) == typeid(vectorized::NewFileScanNode) ||
|
||||
typeid(*child0) == typeid(vectorized::NewOdbcScanNode) ||
|
||||
typeid(*child0) == typeid(vectorized::NewEsScanNode)
|
||||
#ifdef LIBJVM
|
||||
|| typeid(*child0) == typeid(vectorized::NewJdbcScanNode)
|
||||
#endif
|
||||
) {
|
||||
typeid(*child0) == typeid(vectorized::NewEsScanNode) ||
|
||||
typeid(*child0) == typeid(vectorized::NewJdbcScanNode)) {
|
||||
vectorized::VScanNode* scan_node =
|
||||
static_cast<vectorized::VScanNode*>(agg_node[0]->_children[0]);
|
||||
scan_node->set_no_agg_finalize();
|
||||
|
||||
@ -170,11 +170,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
|
||||
if (typeid(*node) == typeid(vectorized::NewOlapScanNode) ||
|
||||
typeid(*node) == typeid(vectorized::NewFileScanNode) ||
|
||||
typeid(*node) == typeid(vectorized::NewOdbcScanNode) ||
|
||||
typeid(*node) == typeid(vectorized::NewEsScanNode)
|
||||
#ifdef LIBJVM
|
||||
|| typeid(*node) == typeid(vectorized::NewJdbcScanNode)
|
||||
#endif
|
||||
) {
|
||||
typeid(*node) == typeid(vectorized::NewEsScanNode) ||
|
||||
typeid(*node) == typeid(vectorized::NewJdbcScanNode)) {
|
||||
vectorized::VScanNode* scan_node = static_cast<vectorized::VScanNode*>(scan_nodes[i]);
|
||||
const std::vector<TScanRangeParams>& scan_ranges =
|
||||
find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
|
||||
|
||||
@ -21,17 +21,16 @@
|
||||
#include <regex>
|
||||
#include <vector>
|
||||
|
||||
#include "common/config.h"
|
||||
#include "env/env.h"
|
||||
#include "gutil/strings/split.h"
|
||||
#include "http/http_client.h"
|
||||
#include "util/dynamic_util.h"
|
||||
#include "util/file_utils.h"
|
||||
#include "util/string_util.h"
|
||||
#ifdef LIBJVM
|
||||
#include "util/jni-util.h"
|
||||
#endif
|
||||
#include "util/md5.h"
|
||||
#include "util/spinlock.h"
|
||||
#include "util/string_util.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
@ -377,28 +376,30 @@ Status UserFunctionCache::_load_cache_entry_internal(UserFunctionCacheEntry* ent
|
||||
}
|
||||
|
||||
Status UserFunctionCache::_add_to_classpath(UserFunctionCacheEntry* entry) {
|
||||
#ifdef LIBJVM
|
||||
const std::string path = "file://" + entry->lib_file;
|
||||
LOG(INFO) << "Add jar " << path << " to classpath";
|
||||
JNIEnv* env;
|
||||
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
|
||||
jclass class_class_loader = env->FindClass("java/lang/ClassLoader");
|
||||
jmethodID method_get_system_class_loader = env->GetStaticMethodID(
|
||||
class_class_loader, "getSystemClassLoader", "()Ljava/lang/ClassLoader;");
|
||||
jobject class_loader =
|
||||
env->CallStaticObjectMethod(class_class_loader, method_get_system_class_loader);
|
||||
jclass class_url_class_loader = env->FindClass("java/net/URLClassLoader");
|
||||
jmethodID method_add_url =
|
||||
env->GetMethodID(class_url_class_loader, "addURL", "(Ljava/net/URL;)V");
|
||||
jclass class_url = env->FindClass("java/net/URL");
|
||||
jmethodID url_ctor = env->GetMethodID(class_url, "<init>", "(Ljava/lang/String;)V");
|
||||
jobject urlInstance = env->NewObject(class_url, url_ctor, env->NewStringUTF(path.c_str()));
|
||||
env->CallVoidMethod(class_loader, method_add_url, urlInstance);
|
||||
entry->is_loaded.store(true);
|
||||
return Status::OK();
|
||||
#else
|
||||
return Status::InternalError("No libjvm is found!");
|
||||
#endif
|
||||
if (config::enable_java_support) {
|
||||
const std::string path = "file://" + entry->lib_file;
|
||||
LOG(INFO) << "Add jar " << path << " to classpath";
|
||||
JNIEnv* env;
|
||||
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
|
||||
jclass class_class_loader = env->FindClass("java/lang/ClassLoader");
|
||||
jmethodID method_get_system_class_loader = env->GetStaticMethodID(
|
||||
class_class_loader, "getSystemClassLoader", "()Ljava/lang/ClassLoader;");
|
||||
jobject class_loader =
|
||||
env->CallStaticObjectMethod(class_class_loader, method_get_system_class_loader);
|
||||
jclass class_url_class_loader = env->FindClass("java/net/URLClassLoader");
|
||||
jmethodID method_add_url =
|
||||
env->GetMethodID(class_url_class_loader, "addURL", "(Ljava/net/URL;)V");
|
||||
jclass class_url = env->FindClass("java/net/URL");
|
||||
jmethodID url_ctor = env->GetMethodID(class_url, "<init>", "(Ljava/lang/String;)V");
|
||||
jobject urlInstance = env->NewObject(class_url, url_ctor, env->NewStringUTF(path.c_str()));
|
||||
env->CallVoidMethod(class_loader, method_add_url, urlInstance);
|
||||
entry->is_loaded.store(true);
|
||||
return Status::OK();
|
||||
} else {
|
||||
return Status::InternalError(
|
||||
"Java UDF is not enabled, you can change be config enable_java_support to true and "
|
||||
"restart be.");
|
||||
}
|
||||
}
|
||||
|
||||
std::string UserFunctionCache::_make_lib_file(int64_t function_id, const std::string& checksum,
|
||||
|
||||
@ -375,14 +375,14 @@ int main(int argc, char** argv) {
|
||||
apache::thrift::GlobalOutput.setOutputFunction(doris::thrift_output);
|
||||
|
||||
Status status = Status::OK();
|
||||
#ifdef LIBJVM
|
||||
// Init jni
|
||||
status = doris::JniUtil::Init();
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "Failed to initialize JNI: " << status.get_error_msg();
|
||||
exit(1);
|
||||
if (doris::config::enable_java_support) {
|
||||
// Init jni
|
||||
status = doris::JniUtil::Init();
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "Failed to initialize JNI: " << status.get_error_msg();
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
doris::Daemon daemon;
|
||||
daemon.init(argc, argv, paths);
|
||||
|
||||
@ -114,6 +114,7 @@ set(UTIL_FILES
|
||||
quantile_state.cpp
|
||||
jni-util.cpp
|
||||
exception.cpp
|
||||
libjvm_loader.cpp
|
||||
)
|
||||
|
||||
if (WITH_MYSQL)
|
||||
@ -127,10 +128,6 @@ if (OS_MACOSX)
|
||||
list(APPEND UTIL_FILES perf_counters_mac.cpp disk_info_mac.cpp)
|
||||
endif()
|
||||
|
||||
if (BUILD_JAVA_UDF)
|
||||
list(APPEND UTIL_FILES libjvm_loader.cpp)
|
||||
endif()
|
||||
|
||||
add_library(Util STATIC
|
||||
${UTIL_FILES}
|
||||
)
|
||||
|
||||
@ -17,7 +17,6 @@
|
||||
|
||||
#include "util/jni-util.h"
|
||||
|
||||
#ifdef LIBJVM
|
||||
#include <jni.h>
|
||||
#include <jni_md.h>
|
||||
#include <stdlib.h>
|
||||
@ -290,4 +289,3 @@ Status JniUtil::Init() {
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
#endif
|
||||
|
||||
@ -17,7 +17,6 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifdef LIBJVM
|
||||
#include <jni.h>
|
||||
|
||||
#include "common/status.h"
|
||||
@ -155,5 +154,3 @@ Status SerializeThriftMsg(JNIEnv* env, T* msg, jbyteArray* serialized_msg) {
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
#endif
|
||||
|
||||
@ -17,8 +17,6 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifdef LIBJVM
|
||||
|
||||
#include <jni.h>
|
||||
#include <unistd.h>
|
||||
|
||||
@ -401,4 +399,3 @@ private:
|
||||
};
|
||||
|
||||
} // namespace doris::vectorized
|
||||
#endif
|
||||
|
||||
@ -16,7 +16,6 @@
|
||||
// under the License.
|
||||
|
||||
#include "vec/exec/scan/new_jdbc_scan_node.h"
|
||||
#ifdef LIBJVM
|
||||
|
||||
#include "vec/exec/scan/new_jdbc_scanner.h"
|
||||
#include "vec/exec/scan/vscanner.h"
|
||||
@ -58,4 +57,3 @@ Status NewJdbcScanNode::_init_scanners(std::list<VScanner*>* scanners) {
|
||||
return Status::OK();
|
||||
}
|
||||
} // namespace doris::vectorized
|
||||
#endif
|
||||
|
||||
@ -16,7 +16,6 @@
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
#ifdef LIBJVM
|
||||
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "vec/exec/scan/vscan_node.h"
|
||||
@ -41,4 +40,3 @@ private:
|
||||
};
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
#endif
|
||||
|
||||
@ -17,8 +17,6 @@
|
||||
|
||||
#include "new_jdbc_scanner.h"
|
||||
|
||||
#ifdef LIBJVM
|
||||
|
||||
namespace doris::vectorized {
|
||||
NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit,
|
||||
TupleId tuple_id, std::string query_string)
|
||||
@ -151,4 +149,3 @@ Status NewJdbcScanner::close(RuntimeState* state) {
|
||||
return Status::OK();
|
||||
}
|
||||
} // namespace doris::vectorized
|
||||
#endif
|
||||
|
||||
@ -16,7 +16,6 @@
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
#ifdef LIBJVM
|
||||
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "vec/exec/scan/new_jdbc_scan_node.h"
|
||||
@ -55,4 +54,3 @@ private:
|
||||
};
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
#endif
|
||||
|
||||
@ -16,7 +16,7 @@
|
||||
// under the License.
|
||||
|
||||
#include "vec/exec/vjdbc_connector.h"
|
||||
#ifdef LIBJVM
|
||||
|
||||
#include "common/status.h"
|
||||
#include "exec/table_connector.h"
|
||||
#include "gen_cpp/Types_types.h"
|
||||
@ -409,5 +409,3 @@ FUNC_IMPL_TO_CONVERT_DATA(double, double, D, Double)
|
||||
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
|
||||
#endif
|
||||
|
||||
@ -16,10 +16,10 @@
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
#ifdef LIBJVM
|
||||
|
||||
#include <jni.h>
|
||||
|
||||
#include "exec/table_connector.h"
|
||||
#include "jni.h"
|
||||
|
||||
namespace doris {
|
||||
namespace vectorized {
|
||||
@ -104,5 +104,3 @@ private:
|
||||
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
|
||||
#endif
|
||||
|
||||
@ -114,11 +114,13 @@ Status AggFnEvaluator::prepare(RuntimeState* state, const RowDescriptor& desc, M
|
||||
_real_argument_types.empty() ? tmp_argument_types : _real_argument_types;
|
||||
|
||||
if (_fn.binary_type == TFunctionBinaryType::JAVA_UDF) {
|
||||
#ifdef LIBJVM
|
||||
_function = AggregateJavaUdaf::create(_fn, argument_types, {}, _data_type);
|
||||
#else
|
||||
return Status::InternalError("Java UDAF is disabled since no libjvm is found!");
|
||||
#endif
|
||||
if (config::enable_java_support) {
|
||||
_function = AggregateJavaUdaf::create(_fn, argument_types, {}, _data_type);
|
||||
} else {
|
||||
return Status::InternalError(
|
||||
"Java UDF is not enabled, you can change be config enable_java_support to true "
|
||||
"and restart be.");
|
||||
}
|
||||
} else if (_fn.binary_type == TFunctionBinaryType::RPC) {
|
||||
_function = AggregateRpcUdaf::create(_fn, argument_types, {}, _data_type);
|
||||
} else {
|
||||
|
||||
@ -58,11 +58,13 @@ doris::Status VectorizedFnCall::prepare(doris::RuntimeState* state,
|
||||
if (_fn.binary_type == TFunctionBinaryType::RPC) {
|
||||
_function = FunctionRPC::create(_fn, argument_template, _data_type);
|
||||
} else if (_fn.binary_type == TFunctionBinaryType::JAVA_UDF) {
|
||||
#ifdef LIBJVM
|
||||
_function = JavaFunctionCall::create(_fn, argument_template, _data_type);
|
||||
#else
|
||||
return Status::InternalError("Java UDF is disabled since no libjvm is found!");
|
||||
#endif
|
||||
if (config::enable_java_support) {
|
||||
_function = JavaFunctionCall::create(_fn, argument_template, _data_type);
|
||||
} else {
|
||||
return Status::InternalError(
|
||||
"Java UDF is not enabled, you can change be config enable_java_support to true "
|
||||
"and restart be.");
|
||||
}
|
||||
} else {
|
||||
_function = SimpleFunctionFactory::instance().get_function(_fn.name.function_name,
|
||||
argument_template, _data_type);
|
||||
|
||||
@ -17,7 +17,6 @@
|
||||
|
||||
#include "vec/functions/function_java_udf.h"
|
||||
|
||||
#ifdef LIBJVM
|
||||
#include <fmt/format.h>
|
||||
|
||||
#include <memory>
|
||||
@ -223,4 +222,3 @@ Status JavaFunctionCall::close(FunctionContext* context,
|
||||
return Status::OK();
|
||||
}
|
||||
} // namespace doris::vectorized
|
||||
#endif
|
||||
|
||||
@ -17,7 +17,6 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifdef LIBJVM
|
||||
#include <jni.h>
|
||||
|
||||
#include "gen_cpp/Exprs_types.h"
|
||||
@ -128,4 +127,3 @@ private:
|
||||
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
#endif
|
||||
|
||||
@ -17,7 +17,6 @@
|
||||
|
||||
#include "vec/sink/vjdbc_table_sink.h"
|
||||
|
||||
#ifdef LIBJVM
|
||||
#include <gen_cpp/DataSinks_types.h>
|
||||
|
||||
#include <sstream>
|
||||
@ -101,4 +100,3 @@ Status VJdbcTableSink::close(RuntimeState* state, Status exec_status) {
|
||||
}
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
#endif
|
||||
@ -15,7 +15,6 @@
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
#pragma once
|
||||
#ifdef LIBJVM
|
||||
#include "common/status.h"
|
||||
#include "vec/exec/vjdbc_connector.h"
|
||||
#include "vec/sink/vtable_sink.h"
|
||||
@ -45,4 +44,3 @@ private:
|
||||
};
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
#endif
|
||||
1
build.sh
1
build.sh
@ -406,7 +406,6 @@ if [[ "${BUILD_BE}" -eq 1 ]]; then
|
||||
-DWITH_LZO="${WITH_LZO}" \
|
||||
-DUSE_LIBCPP="${USE_LIBCPP}" \
|
||||
-DBUILD_META_TOOL="${BUILD_META_TOOL}" \
|
||||
-DBUILD_JAVA_UDF="${BUILD_JAVA_UDF}" \
|
||||
-DSTRIP_DEBUG_INFO="${STRIP_DEBUG_INFO}" \
|
||||
-DUSE_DWARF="${USE_DWARF}" \
|
||||
-DUSE_MEM_TRACKER="${USE_MEM_TRACKER}" \
|
||||
|
||||
Reference in New Issue
Block a user