diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index cd55f8fd56..0f836f4c0d 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -477,6 +477,22 @@ include_directories( ${THIRDPARTY_DIR}/include/breakpad/ ) +execute_process(COMMAND sh ${BASE_DIR}/../tools/find_libjvm.sh OUTPUT_VARIABLE LIBJVM_PATH OUTPUT_STRIP_TRAILING_WHITESPACE) +FILE(GLOB_RECURSE LIB_JVM ${LIBJVM_PATH}) +if(${LIB_JVM} STREQUAL "") + message(STATUS "Disable JAVA UDF because there is no libjvm found!") +else() + set(DORIS_DEPENDENCIES + ${DORIS_DEPENDENCIES} + jvm + ) + add_library(jvm SHARED IMPORTED) + set_target_properties(jvm PROPERTIES IMPORTED_LOCATION ${LIB_JVM}) + include_directories($ENV{JAVA_HOME}/include) + include_directories($ENV{JAVA_HOME}/include/linux) + add_definitions("-DLIBJVM") +endif() + set(WL_START_GROUP "-Wl,--start-group") set(WL_END_GROUP "-Wl,--end-group") diff --git a/be/src/runtime/user_function_cache.cpp b/be/src/runtime/user_function_cache.cpp index aa1086e8f7..49fd4af546 100644 --- a/be/src/runtime/user_function_cache.cpp +++ b/be/src/runtime/user_function_cache.cpp @@ -28,6 +28,7 @@ #include "http/http_client.h" #include "util/dynamic_util.h" #include "util/file_utils.h" +#include "util/jni-util.h" #include "util/md5.h" #include "util/spinlock.h" @@ -37,8 +38,9 @@ static const int kLibShardNum = 128; // function cache entry, store information for struct UserFunctionCacheEntry { - UserFunctionCacheEntry(int64_t fid_, const std::string& checksum_, const std::string& lib_file_) - : function_id(fid_), checksum(checksum_), lib_file(lib_file_) {} + UserFunctionCacheEntry(int64_t fid_, const std::string& checksum_, const std::string& lib_file_, + LibType type) + : function_id(fid_), checksum(checksum_), lib_file(lib_file_), type(type) {} ~UserFunctionCacheEntry(); void ref() { _refs.fetch_add(1); } @@ -75,6 +77,8 @@ struct UserFunctionCacheEntry { // from symbol_name to function pointer std::unordered_map fptr_map; + LibType type; + private: std::atomic _refs{0}; }; @@ -141,7 +145,7 @@ Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std } // create a cache entry and put it into entry map UserFunctionCacheEntry* entry = - new UserFunctionCacheEntry(function_id, checksum, dir + "/" + file); + new UserFunctionCacheEntry(function_id, checksum, dir + "/" + file, LibType::SO); entry->is_downloaded = true; entry->ref(); @@ -199,7 +203,7 @@ Status UserFunctionCache::get_function_ptr(int64_t fid, const std::string& orig_ if (output_entry != nullptr && *output_entry != nullptr) { entry = *output_entry; } else { - RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, &entry)); + RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, &entry, LibType::SO)); need_unref_entry = true; } @@ -237,7 +241,8 @@ Status UserFunctionCache::get_function_ptr(int64_t fid, const std::string& orig_ Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url, const std::string& checksum, - UserFunctionCacheEntry** output_entry) { + UserFunctionCacheEntry** output_entry, + LibType type) { UserFunctionCacheEntry* entry = nullptr; { std::lock_guard l(_cache_lock); @@ -245,7 +250,7 @@ Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url, if (it != _entry_map.end()) { entry = it->second; } else { - entry = new UserFunctionCacheEntry(fid, checksum, _make_lib_file(fid, checksum)); + entry = new UserFunctionCacheEntry(fid, checksum, _make_lib_file(fid, checksum, type), type); entry->ref(); _entry_map.emplace(fid, entry); @@ -292,7 +297,14 @@ Status UserFunctionCache::_load_cache_entry(const std::string& url, UserFunction RETURN_IF_ERROR(_download_lib(url, entry)); } - RETURN_IF_ERROR(_load_cache_entry_internal(entry)); + if (entry->type == LibType::SO) { + RETURN_IF_ERROR(_load_cache_entry_internal(entry)); + } else if (entry->type == LibType::JAR) { + RETURN_IF_ERROR(_add_to_classpath(entry)); + } else { + return Status::InvalidArgument( + "Unsupported lib type! Make sure your lib type is one of 'so' and 'jar'!"); + } return Status::OK(); } @@ -356,10 +368,38 @@ Status UserFunctionCache::_load_cache_entry_internal(UserFunctionCacheEntry* ent return Status::OK(); } -std::string UserFunctionCache::_make_lib_file(int64_t function_id, const std::string& checksum) { +Status UserFunctionCache::_add_to_classpath(UserFunctionCacheEntry* entry) { +#ifdef LIBJVM + const std::string path = "file://" + entry->lib_file; + LOG(INFO) << "Add jar " << path << " to classpath"; + JNIEnv* env; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + jclass class_class_loader = env->FindClass("java/lang/ClassLoader"); + jmethodID method_get_system_class_loader = + env->GetStaticMethodID(class_class_loader, "getSystemClassLoader", "()Ljava/lang/ClassLoader;"); + jobject class_loader = env->CallStaticObjectMethod(class_class_loader, method_get_system_class_loader); + jclass class_url_class_loader = env->FindClass("java/net/URLClassLoader"); + jmethodID method_add_url = env->GetMethodID(class_url_class_loader, "addURL", "(Ljava/net/URL;)V"); + jclass class_url = env->FindClass("java/net/URL"); + jmethodID url_ctor = env->GetMethodID(class_url, "", "(Ljava/lang/String;)V"); + jobject urlInstance = env->NewObject(class_url, url_ctor, env->NewStringUTF(path.c_str())); + env->CallVoidMethod(class_loader, method_add_url, urlInstance); + return Status::OK(); +#else + return Status::InternalError("No libjvm is found!"); +#endif +} + +std::string UserFunctionCache::_make_lib_file(int64_t function_id, const std::string& checksum, + LibType type) { int shard = function_id % kLibShardNum; std::stringstream ss; - ss << _lib_dir << '/' << shard << '/' << function_id << '.' << checksum << ".so"; + ss << _lib_dir << '/' << shard << '/' << function_id << '.' << checksum; + if (type == LibType::JAR) { + ss << ".jar"; + } else { + ss << ".so"; + } return ss.str(); } @@ -372,4 +412,12 @@ void UserFunctionCache::release_entry(UserFunctionCacheEntry* entry) { } } +Status UserFunctionCache::get_jarpath(int64_t fid, const std::string& url, const std::string& checksum, + std::string* libpath) { + UserFunctionCacheEntry* entry = nullptr; + RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, &entry, LibType::JAR)); + *libpath = entry->lib_file; + return Status::OK(); +} + } // namespace doris diff --git a/be/src/runtime/user_function_cache.h b/be/src/runtime/user_function_cache.h index e357324555..a9b68e5e8b 100644 --- a/be/src/runtime/user_function_cache.h +++ b/be/src/runtime/user_function_cache.h @@ -41,6 +41,11 @@ struct UserFunctionCacheEntry; // with id, this function library is valid. And when user wants to // change its implementation(URL), Doris will generate a new function // id. +enum class LibType { + JAR, + SO +}; + class UserFunctionCache { public: // local_dir is the directory which contain cached library. @@ -65,16 +70,20 @@ public: UserFunctionCacheEntry** entry); void release_entry(UserFunctionCacheEntry* entry); + Status get_jarpath(int64_t fid, const std::string& url, const std::string& checksum, std::string* libpath); + private: Status _load_cached_lib(); Status _load_entry_from_lib(const std::string& dir, const std::string& file); Status _get_cache_entry(int64_t fid, const std::string& url, const std::string& checksum, - UserFunctionCacheEntry** output_entry); + UserFunctionCacheEntry** output_entry, LibType type); Status _load_cache_entry(const std::string& url, UserFunctionCacheEntry* entry); Status _download_lib(const std::string& url, UserFunctionCacheEntry* entry); Status _load_cache_entry_internal(UserFunctionCacheEntry* entry); - std::string _make_lib_file(int64_t function_id, const std::string& checksum); + Status _add_to_classpath(UserFunctionCacheEntry* entry); + + std::string _make_lib_file(int64_t function_id, const std::string& checksum, LibType type); void _destroy_cache_entry(UserFunctionCacheEntry* entry); private: diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 6e9bea3739..2b06779986 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -28,6 +28,8 @@ #include #include +#include "util/jni-util.h" + #if defined(LEAK_SANITIZER) #include #endif @@ -482,6 +484,16 @@ int main(int argc, char** argv) { exit(1); } +#ifdef LIBJVM + // 6. init jni + status = doris::JniUtil::Init(); + if (!status.ok()) { + LOG(WARNING) << "Failed to initialize JNI: " << status.get_error_msg(); + doris::shutdown_logging(); + exit(1); + } +#endif + while (!doris::k_doris_exit) { #if defined(LEAK_SANITIZER) __lsan_do_leak_check(); diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 0582c57b80..2385b80ee6 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -108,6 +108,7 @@ set(UTIL_FILES s3_util.cpp topn_counter.cpp tuple_row_zorder_compare.cpp + jni-util.cpp ) if (WITH_MYSQL) diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp new file mode 100644 index 0000000000..cad68db617 --- /dev/null +++ b/be/src/util/jni-util.cpp @@ -0,0 +1,247 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/jni-util.h" +#ifdef LIBJVM +#include +#include + +#include "gutil/once.h" +#include "gutil/strings/substitute.h" + +using std::string; + +namespace doris { + +namespace { +JavaVM* g_vm; +GoogleOnceType g_vm_once = GOOGLE_ONCE_INIT; + +void FindOrCreateJavaVM() { + int num_vms; + int rv = JNI_GetCreatedJavaVMs(&g_vm, 1, &num_vms); + if (rv == 0) { + JNIEnv *env; + JavaVMInitArgs vm_args; + JavaVMOption options[1]; + char* str = getenv("DORIS_JNI_CLASSPATH_PARAMETER"); + options[0].optionString = str; + vm_args.version = JNI_VERSION_1_8; + vm_args.options = options; + vm_args.nOptions = 1; + vm_args.ignoreUnrecognized = JNI_TRUE; + + int res = JNI_CreateJavaVM(&g_vm, (void **)&env, &vm_args); + DCHECK_LT(res, 0) << "Failed tp create JVM, code= " << res; + } else { + CHECK_EQ(rv, 0) << "Could not find any created Java VM"; + CHECK_EQ(num_vms, 1) << "No VMs returned"; + } +} + +} // anonymous namespace + +bool JniUtil::jvm_inited_ = false; +__thread JNIEnv* JniUtil::tls_env_ = nullptr; +jclass JniUtil::internal_exc_cl_ = NULL; +jclass JniUtil::jni_util_cl_ = NULL; +jmethodID JniUtil::throwable_to_string_id_ = NULL; +jmethodID JniUtil::throwable_to_stack_trace_id_ = NULL; +jmethodID JniUtil::get_jvm_metrics_id_ = NULL; +jmethodID JniUtil::get_jvm_threads_id_ = NULL; +jmethodID JniUtil::get_jmx_json_ = NULL; + +Status JniUtfCharGuard::create(JNIEnv* env, jstring jstr, JniUtfCharGuard* out) { + DCHECK(jstr != nullptr); + DCHECK(!env->ExceptionCheck()); + jboolean is_copy; + const char* utf_chars = env->GetStringUTFChars(jstr, &is_copy); + bool exception_check = static_cast(env->ExceptionCheck()); + if (utf_chars == nullptr || exception_check) { + if (exception_check) env->ExceptionClear(); + if (utf_chars != nullptr) env->ReleaseStringUTFChars(jstr, utf_chars); + auto fail_message = "GetStringUTFChars failed. Probable OOM on JVM side"; + LOG(WARNING) << fail_message; + return Status::InternalError(fail_message); + } + out->env = env; + out->jstr = jstr; + out->utf_chars = utf_chars; + return Status::OK(); +} + +Status JniLocalFrame::push(JNIEnv* env, int max_local_ref) { + DCHECK(env_ == NULL); + DCHECK_GT(max_local_ref, 0); + if (env->PushLocalFrame(max_local_ref) < 0) { + env->ExceptionClear(); + return Status::InternalError("failed to push frame"); + } + env_ = env; + return Status::OK(); +} + +Status JniUtil::GetJNIEnvSlowPath(JNIEnv** env) { + DCHECK(!tls_env_) << "Call GetJNIEnv() fast path"; + + GoogleOnceInit(&g_vm_once, &FindOrCreateJavaVM); + int rc = g_vm->GetEnv(reinterpret_cast(&tls_env_), JNI_VERSION_1_8); + if (rc == JNI_EDETACHED) { + rc = g_vm->AttachCurrentThread((void **) &tls_env_, nullptr); + } + if (rc != 0 || tls_env_ == nullptr) { + return Status::InternalError("Unable to get JVM!"); + } + *env = tls_env_; + return Status::OK(); +} + +Status JniUtil::GetJniExceptionMsg(JNIEnv* env, bool log_stack, const string& prefix) { + jthrowable exc = env->ExceptionOccurred(); + if (exc == nullptr) { + return Status::OK(); + } + env->ExceptionClear(); + DCHECK(throwable_to_string_id() != nullptr); + const char* oom_msg_template = "$0 threw an unchecked exception. The JVM is likely out " + "of memory (OOM)."; + jstring msg = static_cast(env->CallStaticObjectMethod(jni_util_class(), + throwable_to_string_id(), exc)); + if (env->ExceptionOccurred()) { + env->ExceptionClear(); + string oom_msg = strings::Substitute(oom_msg_template, "throwableToString"); + LOG(WARNING) << oom_msg; + return Status::InternalError(oom_msg); + } + JniUtfCharGuard msg_str_guard; + RETURN_IF_ERROR(JniUtfCharGuard::create(env, msg, &msg_str_guard)); + if (log_stack) { + jstring stack = static_cast(env->CallStaticObjectMethod(jni_util_class(), + throwable_to_stack_trace_id(), exc)); + if (env->ExceptionOccurred()) { + env->ExceptionClear(); + string oom_msg = strings::Substitute(oom_msg_template, "throwableToStackTrace"); + LOG(WARNING) << oom_msg; + return Status::InternalError(oom_msg); + } + JniUtfCharGuard c_stack_guard; + RETURN_IF_ERROR(JniUtfCharGuard::create(env, stack, &c_stack_guard)); + LOG(WARNING) << c_stack_guard.get(); + } + + env->DeleteLocalRef(exc); + return Status::InternalError(strings::Substitute("$0$1", prefix, msg_str_guard.get())); +} + +Status JniUtil::GetGlobalClassRef(JNIEnv* env, const char* class_str, jclass* class_ref) { + *class_ref = NULL; + jclass local_cl = env->FindClass(class_str); + RETURN_ERROR_IF_EXC(env); + RETURN_IF_ERROR(LocalToGlobalRef(env, local_cl, reinterpret_cast(class_ref))); + env->DeleteLocalRef(local_cl); + RETURN_ERROR_IF_EXC(env); + return Status::OK(); +} + +Status JniUtil::LocalToGlobalRef(JNIEnv* env, jobject local_ref, jobject* global_ref) { + *global_ref = env->NewGlobalRef(local_ref); + RETURN_ERROR_IF_EXC(env); + return Status::OK(); +} + +Status JniUtil::Init() { + // Get the JNIEnv* corresponding to current thread. + JNIEnv* env; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + if (env == NULL) return Status::InternalError("Failed to get/create JVM"); + // Find JniUtil class and create a global ref. + jclass local_jni_util_cl = env->FindClass("org/apache/doris/udf/JniUtil"); + if (local_jni_util_cl == NULL) { + if (env->ExceptionOccurred()) env->ExceptionDescribe(); + return Status::InternalError("Failed to find JniUtil class."); + } + jni_util_cl_ = reinterpret_cast(env->NewGlobalRef(local_jni_util_cl)); + if (jni_util_cl_ == NULL) { + if (env->ExceptionOccurred()) env->ExceptionDescribe(); + return Status::InternalError("Failed to create global reference to JniUtil class."); + } + env->DeleteLocalRef(local_jni_util_cl); + if (env->ExceptionOccurred()) { + return Status::InternalError("Failed to delete local reference to JniUtil class."); + } + + // Find InternalException class and create a global ref. + jclass local_internal_exc_cl = + env->FindClass("org/apache/doris/udf/InternalException"); + if (local_internal_exc_cl == NULL) { + if (env->ExceptionOccurred()) env->ExceptionDescribe(); + return Status::InternalError("Failed to find JniUtil class."); + } + internal_exc_cl_ = reinterpret_cast(env->NewGlobalRef(local_internal_exc_cl)); + if (internal_exc_cl_ == NULL) { + if (env->ExceptionOccurred()) env->ExceptionDescribe(); + return Status::InternalError("Failed to create global reference to JniUtil class."); + } + env->DeleteLocalRef(local_internal_exc_cl); + if (env->ExceptionOccurred()) { + return Status::InternalError("Failed to delete local reference to JniUtil class."); + } + + // Throwable toString() + throwable_to_string_id_ = + env->GetStaticMethodID(jni_util_cl_, "throwableToString", + "(Ljava/lang/Throwable;)Ljava/lang/String;"); + if (throwable_to_string_id_ == NULL) { + if (env->ExceptionOccurred()) env->ExceptionDescribe(); + return Status::InternalError("Failed to find JniUtil.throwableToString method."); + } + + // throwableToStackTrace() + throwable_to_stack_trace_id_ = + env->GetStaticMethodID(jni_util_cl_, "throwableToStackTrace", + "(Ljava/lang/Throwable;)Ljava/lang/String;"); + if (throwable_to_stack_trace_id_ == NULL) { + if (env->ExceptionOccurred()) env->ExceptionDescribe(); + return Status::InternalError("Failed to find JniUtil.throwableToFullStackTrace method."); + } + + get_jvm_metrics_id_ = + env->GetStaticMethodID(jni_util_cl_, "getJvmMemoryMetrics", "()[B"); + if (get_jvm_metrics_id_ == NULL) { + if (env->ExceptionOccurred()) env->ExceptionDescribe(); + return Status::InternalError("Failed to find JniUtil.getJvmMemoryMetrics method."); + } + + get_jvm_threads_id_ = + env->GetStaticMethodID(jni_util_cl_, "getJvmThreadsInfo", "([B)[B"); + if (get_jvm_threads_id_ == NULL) { + if (env->ExceptionOccurred()) env->ExceptionDescribe(); + return Status::InternalError("Failed to find JniUtil.getJvmThreadsInfo method."); + } + + get_jmx_json_ = + env->GetStaticMethodID(jni_util_cl_, "getJMXJson", "()[B"); + if (get_jmx_json_ == NULL) { + if (env->ExceptionOccurred()) env->ExceptionDescribe(); + return Status::InternalError("Failed to find JniUtil.getJMXJson method."); + } + jvm_inited_ = true; + return Status::OK(); +} + +} // namespace doris +#endif diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h new file mode 100644 index 0000000000..d83e1fdbb5 --- /dev/null +++ b/be/src/util/jni-util.h @@ -0,0 +1,155 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#ifdef LIBJVM +#include +#include + +#include "common/status.h" +#include "gutil/macros.h" +#include "gutil/strings/substitute.h" +#include "util//thrift_util.h" + +namespace doris { + +#define RETURN_ERROR_IF_EXC(env) \ + do { \ + jthrowable exc = (env)->ExceptionOccurred(); \ + if (exc != nullptr) return JniUtil::GetJniExceptionMsg(env);\ + } while (false) + +class JniUtil { +public: + static Status Init() WARN_UNUSED_RESULT; + + static jmethodID throwable_to_string_id() { return throwable_to_string_id_; } + + static Status GetJNIEnv(JNIEnv** env) { + if (tls_env_) { + *env = tls_env_; + return Status::OK(); + } + return GetJNIEnvSlowPath(env); + } + + static Status GetGlobalClassRef( + JNIEnv* env, const char* class_str, jclass* class_ref) WARN_UNUSED_RESULT; + + static Status LocalToGlobalRef(JNIEnv* env, jobject local_ref, + jobject* global_ref) WARN_UNUSED_RESULT; + + static Status GetJniExceptionMsg(JNIEnv* env, bool log_stack = true, + const std::string& prefix = "") WARN_UNUSED_RESULT; + + static jclass jni_util_class() { return jni_util_cl_; } + static jmethodID throwable_to_stack_trace_id() { return throwable_to_stack_trace_id_; } + +private: + static Status GetJNIEnvSlowPath(JNIEnv** env); + + static bool jvm_inited_; + static jclass internal_exc_cl_; + static jclass jni_util_cl_; + static jmethodID throwable_to_string_id_; + static jmethodID throwable_to_stack_trace_id_; + static jmethodID get_jvm_metrics_id_; + static jmethodID get_jvm_threads_id_; + static jmethodID get_jmx_json_; + + // Thread-local cache of the JNIEnv for this thread. + static __thread JNIEnv* tls_env_; +}; + +/// Helper class for lifetime management of chars from JNI, releasing JNI chars when +/// destructed +class JniUtfCharGuard { +public: + /// Construct a JniUtfCharGuards holding nothing + JniUtfCharGuard() : utf_chars(nullptr) {} + + /// Release the held char sequence if there is one. + ~JniUtfCharGuard() { + if (utf_chars != nullptr) env->ReleaseStringUTFChars(jstr, utf_chars); + } + + /// Try to get chars from jstr. If error is returned, utf_chars and get() remain + /// to be nullptr, otherwise they point to a valid char sequence. The char sequence + /// lives as long as this guard. jstr should not be null. + static Status create(JNIEnv* env, jstring jstr, JniUtfCharGuard* out); + + /// Get the char sequence. Returns nullptr if the guard does hold a char sequence. + const char* get() { return utf_chars; } +private: + JNIEnv* env; + jstring jstr; + const char* utf_chars; + DISALLOW_COPY_AND_ASSIGN(JniUtfCharGuard); +}; + +class JniLocalFrame { +public: + JniLocalFrame(): env_(nullptr) {} + ~JniLocalFrame() { if (env_ != nullptr) env_->PopLocalFrame(nullptr); } + + JniLocalFrame(JniLocalFrame&& other) noexcept + : env_(other.env_) { + other.env_ = nullptr; + } + + /// Pushes a new JNI local frame. The frame can support max_local_ref local references. + /// The number of local references created inside the frame might exceed max_local_ref, + /// but there is no guarantee that memory will be available. + /// Push should be called at most once. + Status push(JNIEnv* env, int max_local_ref = 10) WARN_UNUSED_RESULT; + +private: + DISALLOW_COPY_AND_ASSIGN(JniLocalFrame); + + JNIEnv* env_; +}; + +template +Status SerializeThriftMsg(JNIEnv* env, T* msg, jbyteArray* serialized_msg) { + int buffer_size = 100 * 1024; // start out with 100KB + ThriftSerializer serializer(false, buffer_size); + + uint8_t* buffer = NULL; + uint32_t size = 0; + RETURN_IF_ERROR(serializer.serialize(msg, &size, &buffer)); + + // Make sure that 'size' is within the limit of INT_MAX as the use of + // 'size' below takes int. + if (size > INT_MAX) { + return Status::InternalError(strings::Substitute( + "The length of the serialization buffer ($0 bytes) exceeds the limit of $1 bytes", + size, INT_MAX)); + } + + /// create jbyteArray given buffer + *serialized_msg = env->NewByteArray(size); + RETURN_ERROR_IF_EXC(env); + if (*serialized_msg == NULL) return Status::InternalError("couldn't construct jbyteArray"); + env->SetByteArrayRegion(*serialized_msg, 0, size, reinterpret_cast(buffer)); + RETURN_ERROR_IF_EXC(env); + return Status::OK(); +} + +} // namespace doris + +#endif diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 20705a60fc..be9e788280 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -161,6 +161,7 @@ set(VEC_FILES functions/function_date_or_datetime_to_string.cpp functions/function_datetime_string_to_string.cpp functions/function_grouping.cpp + functions/function_java_udf.cpp functions/function_rpc.cpp functions/function_convert_tz.cpp functions/least_greast.cpp diff --git a/be/src/vec/exprs/vectorized_fn_call.cpp b/be/src/vec/exprs/vectorized_fn_call.cpp index 6f01a126d6..379b892f03 100644 --- a/be/src/vec/exprs/vectorized_fn_call.cpp +++ b/be/src/vec/exprs/vectorized_fn_call.cpp @@ -25,6 +25,7 @@ #include "udf/udf_internal.h" #include "vec/data_types/data_type_nullable.h" #include "vec/data_types/data_type_number.h" +#include "vec/functions/function_java_udf.h" #include "vec/functions/function_rpc.h" #include "vec/functions/simple_function_factory.h" @@ -46,6 +47,12 @@ doris::Status VectorizedFnCall::prepare(doris::RuntimeState* state, if (_fn.binary_type == TFunctionBinaryType::RPC) { _function = RPCFnCall::create(_fn.name.function_name, _fn.hdfs_location, argument_template, _data_type); + } else if (_fn.binary_type == TFunctionBinaryType::JAVA_UDF) { +#ifdef LIBJVM + _function = JavaFunctionCall::create(_fn, argument_template, _data_type); +#else + return Status::InternalError("Java UDF is disabled since no libjvm is found!"); +#endif } else { _function = SimpleFunctionFactory::instance().get_function(_fn.name.function_name, argument_template, _data_type); diff --git a/be/src/vec/functions/function_java_udf.cpp b/be/src/vec/functions/function_java_udf.cpp new file mode 100644 index 0000000000..5a47a5c0c2 --- /dev/null +++ b/be/src/vec/functions/function_java_udf.cpp @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/functions/function_java_udf.h" + +#ifdef LIBJVM +#include + +#include +#include + +#include "gen_cpp/Exprs_types.h" +#include "runtime/exec_env.h" +#include "runtime/user_function_cache.h" +#include "util/jni-util.h" +#include "vec/columns/column_vector.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_bitmap.h" +#include "vec/data_types/data_type_date.h" +#include "vec/data_types/data_type_date_time.h" +#include "vec/data_types/data_type_decimal.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" + +const char* EXECUTOR_CLASS = "org/apache/doris/udf/UdfExecutor"; +const char* EXECUTOR_CTOR_SIGNATURE ="([B)V"; +const char* EXECUTOR_EVALUATE_SIGNATURE = "()V"; +const char* EXECUTOR_CLOSE_SIGNATURE = "()V"; + +namespace doris::vectorized { +JavaFunctionCall::JavaFunctionCall(const TFunction& fn, + const DataTypes& argument_types, const DataTypePtr& return_type) + : fn_(fn), + _argument_types(argument_types), + _return_type(return_type) {} + +Status JavaFunctionCall::prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) { + DCHECK(executor_cl_ == NULL) << "Init() already called!"; + JNIEnv* env; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + if (env == NULL) return Status::InternalError("Failed to get/create JVM"); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &executor_cl_)); + executor_ctor_id_ = env->GetMethodID( + executor_cl_, "", EXECUTOR_CTOR_SIGNATURE); + RETURN_ERROR_IF_EXC(env); + executor_evaluate_id_ = env->GetMethodID( + executor_cl_, "evaluate", EXECUTOR_EVALUATE_SIGNATURE); + RETURN_ERROR_IF_EXC(env); + executor_close_id_ = env->GetMethodID( + executor_cl_, "close", EXECUTOR_CLOSE_SIGNATURE); + RETURN_ERROR_IF_EXC(env); + + JniContext* jni_ctx = new JniContext(_argument_types.size(), this); + context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx); + + // Add a scoped cleanup jni reference object. This cleans up local refs made below. + JniLocalFrame jni_frame; + { + std::string local_location; + auto function_cache = UserFunctionCache::instance(); + RETURN_IF_ERROR(function_cache->get_jarpath(fn_.id, fn_.hdfs_location, fn_.checksum, &local_location)); + TJavaUdfExecutorCtorParams ctor_params; + ctor_params.__set_fn(fn_); + ctor_params.__set_location(local_location); + ctor_params.__set_input_byte_offsets(jni_ctx->input_byte_offsets_ptr); + ctor_params.__set_input_buffer_ptrs(jni_ctx->input_values_buffer_ptr); + ctor_params.__set_input_nulls_ptrs(jni_ctx->input_nulls_buffer_ptr); + ctor_params.__set_output_buffer_ptr(jni_ctx->output_value_buffer); + ctor_params.__set_output_null_ptr(jni_ctx->output_null_value); + ctor_params.__set_batch_size_ptr(jni_ctx->batch_size_ptr); + + jbyteArray ctor_params_bytes; + + // Pushed frame will be popped when jni_frame goes out-of-scope. + RETURN_IF_ERROR(jni_frame.push(env)); + + RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes)); + jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes); + } + RETURN_ERROR_IF_EXC(env); + RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_ctx->executor, &jni_ctx->executor)); + + return Status::OK(); +} + +Status JavaFunctionCall::execute(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + size_t result, size_t num_rows, bool dry_run) { + auto return_type = block.get_data_type(result); + if (!return_type->have_maximum_size_of_value()) { + return Status::InvalidArgument(strings::Substitute( + "Java UDF doesn't support return type $0 now !", return_type->get_name())); + } + JNIEnv* env; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + JniContext* jni_ctx = reinterpret_cast( + context->get_function_state(FunctionContext::THREAD_LOCAL)); + int arg_idx = 0; + for (size_t col_idx : arguments) { + ColumnWithTypeAndName& column = block.get_by_position(col_idx); + auto col = column.column->convert_to_full_column_if_const(); + if (!_argument_types[arg_idx]->equals(*column.type)) { + return Status::InvalidArgument(strings::Substitute( + "$0-th input column's type $1 does not equal to required type $2", + arg_idx, column.type->get_name(), + _argument_types[arg_idx]->get_name())); + } + if (!column.type->have_maximum_size_of_value()) { + return Status::InvalidArgument(strings::Substitute( + "Java UDF doesn't support input type $0 now !", return_type->get_name())); + } + auto data_col = col; + if (auto* nullable = check_and_get_column(*col)) { + data_col = nullable->get_nested_column_ptr(); + auto null_col = + check_and_get_column>(nullable->get_null_map_column_ptr()); + ((int64_t*) jni_ctx->input_nulls_buffer_ptr)[arg_idx] = + reinterpret_cast(null_col->get_data().data()); + } + ((int64_t*) jni_ctx->input_values_buffer_ptr)[arg_idx] = + reinterpret_cast(data_col->get_raw_data().data); + arg_idx++; + } + + if (return_type->is_nullable()) { + auto null_type = std::reinterpret_pointer_cast(return_type); + auto data_col = null_type->get_nested_type()->create_column(); + auto null_col = ColumnUInt8::create(data_col->size(), 0); + null_col->reserve(num_rows); + null_col->resize(num_rows); + data_col->reserve(num_rows); + data_col->resize(num_rows); + + *((int64_t*) jni_ctx->output_null_value) = + reinterpret_cast(null_col->get_data().data()); + *((int64_t*) jni_ctx->output_value_buffer) = reinterpret_cast(data_col->get_raw_data().data); + block.replace_by_position(result, + ColumnNullable::create(std::move(data_col), std::move(null_col))); + } else { + auto data_col = return_type->create_column(); + data_col->reserve(num_rows); + data_col->resize(num_rows); + + *((int64_t*) jni_ctx->output_value_buffer) = reinterpret_cast(data_col->get_raw_data().data); + block.replace_by_position(result, std::move(data_col)); + } + *((int32_t*) jni_ctx->batch_size_ptr) = num_rows; + // Using this version of Call has the lowest overhead. This eliminates the + // vtable lookup and setting up return stacks. + env->CallNonvirtualVoidMethodA( + jni_ctx->executor, executor_cl_, executor_evaluate_id_, nullptr); + return JniUtil::GetJniExceptionMsg(env); +} + +Status JavaFunctionCall::close(FunctionContext* context, FunctionContext::FunctionStateScope scope) { + JniContext* jni_ctx = reinterpret_cast( + context->get_function_state(FunctionContext::THREAD_LOCAL)); + if (jni_ctx != NULL) { + delete jni_ctx; + context->set_function_state(FunctionContext::THREAD_LOCAL, nullptr); + } + return Status::OK(); +} +} // namespace doris::vectorized +#endif diff --git a/be/src/vec/functions/function_java_udf.h b/be/src/vec/functions/function_java_udf.h new file mode 100644 index 0000000000..2bc8ce88d8 --- /dev/null +++ b/be/src/vec/functions/function_java_udf.h @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#ifdef LIBJVM +#include + +#include "gen_cpp/Exprs_types.h" +#include "util/jni-util.h" +#include "vec/functions/function.h" + +namespace doris { + +namespace vectorized { +class JavaFunctionCall : public IFunctionBase { +public: + JavaFunctionCall(const TFunction& fn, const DataTypes& argument_types, + const DataTypePtr& return_type); + + static FunctionBasePtr create(const TFunction& fn, + const ColumnsWithTypeAndName& argument_types, + const DataTypePtr& return_type) { + DataTypes data_types(argument_types.size()); + for (size_t i = 0; i < argument_types.size(); ++i) { + data_types[i] = argument_types[i].type; + } + return std::make_shared(fn, data_types, return_type); + } + + /// Get the main function name. + String get_name() const override { return fn_.name.function_name; }; + + const DataTypes& get_argument_types() const override { return _argument_types; }; + const DataTypePtr& get_return_type() const override { return _return_type; }; + + PreparedFunctionPtr prepare(FunctionContext* context, const Block& sample_block, + const ColumnNumbers& arguments, size_t result) const override { + return nullptr; + } + + Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) override; + + Status execute(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + size_t result, size_t input_rows_count, bool dry_run = false) override; + + Status close(FunctionContext* context, FunctionContext::FunctionStateScope scope) override; + + bool is_deterministic() const override { return false; } + + bool is_deterministic_in_scope_of_query() const override { return false; } + +private: + const TFunction& fn_; + const DataTypes _argument_types; + const DataTypePtr _return_type; + + /// Global class reference to the UdfExecutor Java class and related method IDs. Set in + /// Init(). These have the lifetime of the process (i.e. 'executor_cl_' is never freed). + jclass executor_cl_; + jmethodID executor_ctor_id_; + jmethodID executor_evaluate_id_; + jmethodID executor_close_id_; + + struct JniContext { + JavaFunctionCall* parent = nullptr; + + jobject executor = nullptr; + + int64_t input_values_buffer_ptr; + int64_t input_nulls_buffer_ptr; + int64_t input_byte_offsets_ptr; + int64_t output_value_buffer; + int64_t output_null_value; + int64_t batch_size_ptr; + + JniContext(int64_t num_args, JavaFunctionCall* parent): + parent(parent) { + input_values_buffer_ptr = (int64_t) new int64_t[num_args]; + input_nulls_buffer_ptr = (int64_t) new int64_t[num_args]; + input_byte_offsets_ptr = (int64_t) new int64_t[num_args]; + + output_value_buffer = (int64_t) malloc(sizeof(int64_t)); + output_null_value = (int64_t) malloc(sizeof(int64_t)); + batch_size_ptr = (int64_t) malloc(sizeof(int32_t)); + } + + ~JniContext() { + VLOG_DEBUG << "Free resources for JniContext"; + JNIEnv* env; + Status status; + RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env)); + env->CallNonvirtualVoidMethodA( + executor, parent->executor_cl_, parent->executor_close_id_, NULL); + Status s = JniUtil::GetJniExceptionMsg(env); + if (!s.ok()) LOG(WARNING) << s.get_error_msg(); + env->DeleteGlobalRef(executor); + delete[] ((int64*) input_values_buffer_ptr); + delete[] ((int64*) input_nulls_buffer_ptr); + delete[] ((int64*) input_byte_offsets_ptr); + free((int64*) output_value_buffer); + free((int64*) output_null_value); + free((int32*) batch_size_ptr); + } + + /// These functions are cross-compiled to IR and used by codegen. + static void SetInputNullsBufferElement( + JniContext* jni_ctx, int index, uint8_t value); + static uint8_t* GetInputValuesBufferAtOffset(JniContext* jni_ctx, int offset); + }; +}; + +} // namespace vectorized +} // namespace doris +#endif diff --git a/bin/start_be.sh b/bin/start_be.sh index 7981c56938..c5b8da5c61 100755 --- a/bin/start_be.sh +++ b/bin/start_be.sh @@ -53,6 +53,59 @@ export DORIS_HOME=$( pwd ) +# add libs to CLASSPATH +for f in $DORIS_HOME/lib/*.jar; do + if [ ! -n "${DORIS_JNI_CLASSPATH_PARAMETER}" ]; then + export DORIS_JNI_CLASSPATH_PARAMETER=$f + else + export DORIS_JNI_CLASSPATH_PARAMETER=$f:${DORIS_JNI_CLASSPATH_PARAMETER} + fi +done +# DORIS_JNI_CLASSPATH_PARAMETER is used to configure additional jar path to jvm. e.g. -Djava.class.path=$DORIS_HOME/lib/java-udf.jar +export DORIS_JNI_CLASSPATH_PARAMETER="-Djava.class.path=${DORIS_JNI_CLASSPATH_PARAMETER}" + +jdk_version() { + local result + local java_cmd=$JAVA_HOME/bin/java + local IFS=$'\n' + # remove \r for Cygwin + local lines=$("$java_cmd" -Xms32M -Xmx32M -version 2>&1 | tr '\r' '\n') + if [[ -z $java_cmd ]] + then + result=no_java + else + for line in $lines; do + if [[ (-z $result) && ($line = *"version \""*) ]] + then + local ver=$(echo $line | sed -e 's/.*version "\(.*\)"\(.*\)/\1/; 1q') + # on macOS, sed doesn't support '?' + if [[ $ver = "1."* ]] + then + result=$(echo $ver | sed -e 's/1\.\([0-9]*\)\(.*\)/\1/; 1q') + else + result=$(echo $ver | sed -e 's/\([0-9]*\)\(.*\)/\1/; 1q') + fi + fi + done + fi + echo "$result" +} + +jvm_arch="amd64" +if [[ "${MACHINE_TYPE}" == "aarch64" ]]; then + jvm_arch="aarch64" +fi +java_version=$(jdk_version) +if [[ $java_version -gt 8 ]]; then + export LD_LIBRARY_PATH=$JAVA_HOME/lib/server:$JAVA_HOME/lib:$LD_LIBRARY_PATH +# JAVA_HOME is jdk +elif [[ -d "$JAVA_HOME/jre" ]]; then + export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/$jvm_arch/server:$JAVA_HOME/jre/lib/$jvm_arch:$LD_LIBRARY_PATH +# JAVA_HOME is jre +else + export LD_LIBRARY_PATH=$JAVA_HOME/lib/$jvm_arch/server:$JAVA_HOME/lib/$jvm_arch:$LD_LIBRARY_PATH +fi + # export env variables from be.conf # # UDF_RUNTIME_DIR diff --git a/build.sh b/build.sh index e18df14e9c..f9bb87cf4e 100755 --- a/build.sh +++ b/build.sh @@ -244,6 +244,22 @@ cd ${DORIS_HOME}/gensrc python --version make +# Assesmble FE modules +FE_MODULES= +if [ ${BUILD_FE} -eq 1 -o ${BUILD_SPARK_DPP} -eq 1 -o ${BUILD_BE} -eq 1 ]; then + if [ ${BUILD_FE} -eq 1 -a ${BUILD_BE} -eq 1 ]; then + FE_MODULES="fe-common,spark-dpp,fe-core,java-udf" + elif [ ${BUILD_FE} -eq 1 -a ${BUILD_BE} -eq 0 ]; then + FE_MODULES="fe-common,spark-dpp,fe-core" + elif [ ${BUILD_BE} -eq 1 -a ${BUILD_SPARK_DPP} -eq 0 ]; then + FE_MODULES="fe-common,fe-core,java-udf" + elif [ ${BUILD_BE} -eq 1 -a ${BUILD_SPARK_DPP} -eq 1 ]; then + FE_MODULES="fe-common,fe-core,java-udf,spark-dpp" + else + FE_MODULES="fe-common,spark-dpp" + fi +fi + # Clean and build Backend if [ ${BUILD_BE} -eq 1 ] ; then CMAKE_BUILD_TYPE=${BUILD_TYPE:-Release} @@ -272,6 +288,14 @@ if [ ${BUILD_BE} -eq 1 ] ; then -DGLIBC_COMPATIBILITY=${GLIBC_COMPATIBILITY} ../ ${BUILD_SYSTEM} -j ${PARALLEL} ${BUILD_SYSTEM} install + echo "Build Frontend Modules: java-udf" + cd ${DORIS_HOME}/fe + if [ ${CLEAN} -eq 1 ]; then + clean_fe + fi + if [ ${BUILD_FE} -eq 0 ]; then + ${MVN_CMD} package -pl fe-common,fe-core,java-udf -DskipTests + fi cd ${DORIS_HOME} fi @@ -281,17 +305,6 @@ cd ${DORIS_HOME}/docs ./build_help_zip.sh cd ${DORIS_HOME} -# Assesmble FE modules -FE_MODULES= -if [ ${BUILD_FE} -eq 1 -o ${BUILD_SPARK_DPP} -eq 1 ]; then - if [ ${BUILD_SPARK_DPP} -eq 1 ]; then - FE_MODULES="fe-common,spark-dpp" - fi - if [ ${BUILD_FE} -eq 1 ]; then - FE_MODULES="fe-common,spark-dpp,fe-core" - fi -fi - function build_ui() { # check NPM env here, not in env.sh. # Because UI should be considered a non-essential component at runtime. @@ -384,6 +397,7 @@ if [ ${BUILD_BE} -eq 1 ]; then cp -r -p ${DORIS_HOME}/be/output/udf/*.a ${DORIS_OUTPUT}/udf/lib/ cp -r -p ${DORIS_HOME}/be/output/udf/include/* ${DORIS_OUTPUT}/udf/include/ cp -r -p ${DORIS_HOME}/webroot/be/* ${DORIS_OUTPUT}/be/www/ + cp -r -p ${DORIS_HOME}/fe/java-udf/target/java-udf-jar-with-dependencies.jar ${DORIS_OUTPUT}/be/lib/ cp -r -p ${DORIS_THIRDPARTY}/installed/webroot/* ${DORIS_OUTPUT}/be/www/ mkdir -p ${DORIS_OUTPUT}/be/log diff --git a/fe/fe-core/src/main/java/org/apache/doris/monitor/jvm/JvmPauseMonitor.java b/fe/fe-core/src/main/java/org/apache/doris/monitor/jvm/JvmPauseMonitor.java new file mode 100644 index 0000000000..107062c004 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/monitor/jvm/JvmPauseMonitor.java @@ -0,0 +1,318 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.monitor.jvm; + +import com.google.common.base.Joiner; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.apache.log4j.Logger; + +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Class which sets up a simple thread which runs in a loop sleeping + * for a short interval of time. If the sleep takes significantly longer + * than its target time, it implies that the JVM or host machine has + * paused processing, which may cause other problems. If such a pause is + * detected, the thread logs a message. + */ +public class JvmPauseMonitor { + private static final Logger LOG = Logger.getLogger(JvmPauseMonitor.class); + + // The target sleep time. + private static final long SLEEP_INTERVAL_MS = 500; + + // Check for Java deadlocks at this interval. Set by init(). 0 or negative means that + // the deadlock checks are disabled. + private long deadlockCheckIntervalS_ = 0; + + // log WARN if we detect a pause longer than this threshold. + private long warnThresholdMs_; + private static final long WARN_THRESHOLD_MS = 10000; + + // log INFO if we detect a pause longer than this threshold. + private long infoThresholdMs_; + private static final long INFO_THRESHOLD_MS = 1000; + + // Overall metrics + // Volatile to allow populating metrics concurrently with the values + // being updated without staleness (but with no other synchronization + // guarantees). + private volatile long numGcWarnThresholdExceeded = 0; + private volatile long numGcInfoThresholdExceeded = 0; + private volatile long totalGcExtraSleepTime = 0; + + // Daemon thread running the pause monitor loop. + private Thread monitorThread_; + private volatile boolean shouldRun = true; + + // Singleton instance of this pause monitor. + public static JvmPauseMonitor INSTANCE = new JvmPauseMonitor(); + + // Initializes the pause monitor. No-op if called multiple times. + public static void initPauseMonitor(long deadlockCheckIntervalS) { + if (INSTANCE.isStarted()) return; + INSTANCE.init(deadlockCheckIntervalS); + } + + private JvmPauseMonitor() { + this(INFO_THRESHOLD_MS, WARN_THRESHOLD_MS); + } + + private JvmPauseMonitor(long infoThresholdMs, long warnThresholdMs) { + this.infoThresholdMs_ = infoThresholdMs; + this.warnThresholdMs_ = warnThresholdMs; + } + + protected void init(long deadlockCheckIntervalS) { + deadlockCheckIntervalS_ = deadlockCheckIntervalS; + monitorThread_ = new Thread(new Monitor(), "JVM pause monitor"); + monitorThread_.setDaemon(true); + monitorThread_.start(); + } + + public boolean isStarted() { + return monitorThread_ != null; + } + + public long getNumGcWarnThresholdExceeded() { + return numGcWarnThresholdExceeded; + } + + public long getNumGcInfoThresholdExceeded() { + return numGcInfoThresholdExceeded; + } + + public long getTotalGcExtraSleepTime() { + return totalGcExtraSleepTime; + } + + /** + * Helper method that formats the message to be logged, along with + * the GC metrics. + */ + private String formatMessage(long extraSleepTime, + Map gcTimesAfterSleep, + Map gcTimesBeforeSleep) { + + Set gcBeanNames = Sets.intersection( + gcTimesAfterSleep.keySet(), + gcTimesBeforeSleep.keySet()); + List gcDiffs = Lists.newArrayList(); + for (String name : gcBeanNames) { + GcTimes diff = gcTimesAfterSleep.get(name).subtract( + gcTimesBeforeSleep.get(name)); + if (diff.gcCount != 0) { + gcDiffs.add("GC pool '" + name + "' had collection(s): " + + diff.toString()); + } + } + + String ret = "Detected pause in JVM or host machine (eg GC): " + + "pause of approximately " + extraSleepTime + "ms\n"; + if (gcDiffs.isEmpty()) { + ret += "No GCs detected"; + } else { + ret += Joiner.on("\n").join(gcDiffs); + } + return ret; + } + + private Map getGcTimes() { + Map map = Maps.newHashMap(); + List gcBeans = + ManagementFactory.getGarbageCollectorMXBeans(); + for (GarbageCollectorMXBean gcBean : gcBeans) { + map.put(gcBean.getName(), new GcTimes(gcBean)); + } + return map; + } + + private static class GcTimes { + private GcTimes(GarbageCollectorMXBean gcBean) { + gcCount = gcBean.getCollectionCount(); + gcTimeMillis = gcBean.getCollectionTime(); + } + + private GcTimes(long count, long time) { + this.gcCount = count; + this.gcTimeMillis = time; + } + + private GcTimes subtract(GcTimes other) { + return new GcTimes(this.gcCount - other.gcCount, + this.gcTimeMillis - other.gcTimeMillis); + } + + @Override + public String toString() { + return "count=" + gcCount + " time=" + gcTimeMillis + "ms"; + } + + private long gcCount; + private long gcTimeMillis; + } + + /** + * Runnable instance of the pause monitor loop. Launched from serviceStart(). + */ + private class Monitor implements Runnable { + @Override + public void run() { + Stopwatch sw = Stopwatch.createUnstarted(); + Stopwatch timeSinceDeadlockCheck = Stopwatch.createStarted(); + Map gcTimesBeforeSleep = getGcTimes(); + LOG.info("Starting JVM pause monitor"); + while (shouldRun) { + sw.reset().start(); + try { + Thread.sleep(SLEEP_INTERVAL_MS); + } catch (InterruptedException ie) { + LOG.error("JVM pause monitor interrupted", ie); + return; + } + sw.stop(); + long extraSleepTime = sw.elapsed(TimeUnit.MILLISECONDS) - SLEEP_INTERVAL_MS; + Map gcTimesAfterSleep = getGcTimes(); + + if (extraSleepTime > warnThresholdMs_) { + ++numGcWarnThresholdExceeded; + LOG.warn(formatMessage( + extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep)); + } else if (extraSleepTime > infoThresholdMs_) { + ++numGcInfoThresholdExceeded; + LOG.info(formatMessage( + extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep)); + } + totalGcExtraSleepTime += extraSleepTime; + gcTimesBeforeSleep = gcTimesAfterSleep; + + if (deadlockCheckIntervalS_ > 0 && + timeSinceDeadlockCheck.elapsed(TimeUnit.SECONDS) >= deadlockCheckIntervalS_) { + checkForDeadlocks(); + timeSinceDeadlockCheck.reset().start(); + } + } + } + + /** + * Check for deadlocks between Java threads using the JVM's deadlock detector. + * If a deadlock is found, log info about the deadlocked threads and exit the + * process. + *

+ * We choose to exit the process this situation because the deadlock will likely + * cause hangs and other forms of service unavailability and there is no way to + * recover from the deadlock except by restarting the process. + */ + private void checkForDeadlocks() { + ThreadMXBean threadMx = ManagementFactory.getThreadMXBean(); + long deadlockedTids[] = threadMx.findDeadlockedThreads(); + if (deadlockedTids != null) { + ThreadInfo deadlockedThreads[] = + threadMx.getThreadInfo(deadlockedTids, true, true); + // Log diagnostics with error before aborting the process with a FATAL log. + LOG.error("Found " + deadlockedThreads.length + " threads in deadlock: "); + for (ThreadInfo thread : deadlockedThreads) { + // Defensively check for null in case the thread somehow disappeared between + // findDeadlockedThreads() and getThreadInfo(). + if (thread != null) LOG.error(thread.toString()); + } + LOG.warn("All threads:"); + for (ThreadInfo thread : threadMx.dumpAllThreads(true, true)) { + LOG.error(thread.toString()); + } + // In the context of an Doris service, LOG.fatal calls glog's fatal, which + // aborts the process, which will produce a coredump if coredumps are enabled. + LOG.fatal("Aborting because of deadlocked threads in JVM."); + System.exit(1); + } + } + } + + /** + * Helper for manual testing that causes a deadlock between java threads. + */ + private static void causeDeadlock() { + final Object obj1 = new Object(); + final Object obj2 = new Object(); + + new Thread(new Runnable() { + + @Override + public void run() { + while (true) { + synchronized (obj2) { + synchronized (obj1) { + System.err.println("Thread 1 got locks"); + } + } + } + } + }).start(); + + while (true) { + synchronized (obj1) { + synchronized (obj2) { + System.err.println("Thread 2 got locks"); + } + } + } + } + + /** + * This function just leaks memory into a list. Running this function + * with a 1GB heap will very quickly go into "GC hell" and result in + * log messages about the GC pauses. + */ + private static void allocateMemory() { + List list = Lists.newArrayList(); + int i = 0; + while (true) { + list.add(String.valueOf(i++)); + } + } + + /** + * Simple 'main' to facilitate manual testing of the pause monitor. + */ + @SuppressWarnings("resource") + public static void main(String[] args) throws Exception { + JvmPauseMonitor monitor = new JvmPauseMonitor(); + monitor.init(60); + if (args[0].equals("gc")) { + allocateMemory(); + } else if (args[0].equals("deadlock")) { + causeDeadlock(); + } else { + System.err.println("Unknown mode"); + } + } + +} + + diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml new file mode 100644 index 0000000000..e1433d15f1 --- /dev/null +++ b/fe/java-udf/pom.xml @@ -0,0 +1,111 @@ + + + + 4.0.0 + + + org.apache.doris + ${revision} + fe + ../pom.xml + + + java-udf + jar + + ${basedir}/../../ + 1 + + + + ${project.groupId} + fe-common + ${project.version} + + + ${project.groupId} + fe-core + ${project.version} + + + org.apache.hive + hive-exec + ${hive.version} + + + org.pentaho + * + + + + + + java-udf + + + maven-assembly-plugin + + + jar-with-dependencies + + + + + + + + + + make-assembly + package + + single + + + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 3.1.0 + + checkstyle.xml + UTF-8 + true + true + false + **/jmockit/**/* + + + + validate + validate + + check + + + + + + + diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/InternalException.java b/fe/java-udf/src/main/java/org/apache/doris/udf/InternalException.java new file mode 100644 index 0000000000..1b1e335ae2 --- /dev/null +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/InternalException.java @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.udf; + +public class InternalException extends Exception { + public InternalException(String msg, Throwable cause) { + super(msg, cause); + } + + public InternalException(String msg) { + super(msg); + } +} diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JMXJsonUtil.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JMXJsonUtil.java new file mode 100644 index 0000000000..47057d26b4 --- /dev/null +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JMXJsonUtil.java @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.udf; + +import org.apache.log4j.Logger; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonGenerator; + +import javax.management.AttributeNotFoundException; +import javax.management.InstanceNotFoundException; +import javax.management.IntrospectionException; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanException; +import javax.management.MBeanInfo; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.ReflectionException; +import javax.management.RuntimeErrorException; +import javax.management.RuntimeMBeanException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.TabularData; +import java.io.IOException; +import java.io.StringWriter; +import java.lang.management.ManagementFactory; +import java.lang.reflect.Array; +import java.util.Iterator; +import java.util.Set; + +/** + * Utility class that returns a JSON representation of the JMX beans. + * This is based on hadoop-common's implementation of JMXJsonServlet. + *

+ * Output format: + * { + * "beans" : [ + * { + * "name":"bean-name" + * ... + * } + * ] + * } + * Each bean's attributes will be converted to a JSON object member. + * If the attribute is a boolean, a number, a string, or an array + * it will be converted to the JSON equivalent. + *

+ * If the value is a {@link CompositeData} then it will be converted + * to a JSON object with the keys as the name of the JSON member and + * the value is converted following these same rules. + * If the value is a {@link TabularData} then it will be converted + * to an array of the {@link CompositeData} elements that it contains. + * All other objects will be converted to a string and output as such. + * The bean's name and modelerType will be returned for all beans. + */ +public class JMXJsonUtil { + // MBean server instance + protected static transient MBeanServer mBeanServer = + ManagementFactory.getPlatformMBeanServer(); + + private static final Logger LOG = Logger.getLogger(JMXJsonUtil.class); + + // Returns the JMX beans as a JSON string. + public static String getJMXJson() { + StringWriter writer = new StringWriter(); + try { + JsonGenerator jg = null; + try { + JsonFactory jsonFactory = new JsonFactory(); + jg = jsonFactory.createJsonGenerator(writer); + jg.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); + jg.writeStartObject(); + if (mBeanServer == null) { + jg.writeStringField("result", "ERROR"); + jg.writeStringField("message", "No MBeanServer could be found"); + jg.close(); + LOG.error("No MBeanServer could be found."); + return writer.toString(); + } + listBeans(jg); + } finally { + if (jg != null) { + jg.close(); + } + if (writer != null) { + writer.close(); + } + } + } catch (IOException e) { + LOG.error("Caught an exception while processing JMX request", e); + } + return writer.toString(); + } + + // Utility method that lists all the mbeans and write them using the supplied + // JsonGenerator. + private static void listBeans(JsonGenerator jg) throws IOException { + Set names; + names = mBeanServer.queryNames(null, null); + jg.writeArrayFieldStart("beans"); + Iterator it = names.iterator(); + while (it.hasNext()) { + ObjectName oname = it.next(); + MBeanInfo minfo; + String code = ""; + Object attributeinfo = null; + try { + minfo = mBeanServer.getMBeanInfo(oname); + code = minfo.getClassName(); + String prs = ""; + try { + if ("org.apache.commons.modeler.BaseModelMBean".equals(code)) { + prs = "modelerType"; + code = (String) mBeanServer.getAttribute(oname, prs); + } + } catch (AttributeNotFoundException e) { + // If the modelerType attribute was not found, the class name is used + // instead. + LOG.error("getting attribute " + prs + " of " + oname + + " threw an exception", e); + } catch (MBeanException e) { + // The code inside the attribute getter threw an exception so log it, + // and fall back on the class name + LOG.error("getting attribute " + prs + " of " + oname + + " threw an exception", e); + } catch (RuntimeException e) { + // For some reason even with an MBeanException available to them + // Runtime exceptionscan still find their way through, so treat them + // the same as MBeanException + LOG.error("getting attribute " + prs + " of " + oname + + " threw an exception", e); + } catch (ReflectionException e) { + // This happens when the code inside the JMX bean (setter?? from the + // java docs) threw an exception, so log it and fall back on the + // class name + LOG.error("getting attribute " + prs + " of " + oname + + " threw an exception", e); + } + } catch (InstanceNotFoundException e) { + //Ignored for some reason the bean was not found so don't output it + continue; + } catch (IntrospectionException | ReflectionException e) { + // This is an internal error, something odd happened with reflection so + // log it and don't output the bean. + LOG.error("Problem while trying to process JMX query with MBean " + oname, e); + continue; + } + jg.writeStartObject(); + jg.writeStringField("name", oname.toString()); + jg.writeStringField("modelerType", code); + MBeanAttributeInfo attrs[] = minfo.getAttributes(); + for (int i = 0; i < attrs.length; i++) { + writeAttribute(jg, oname, attrs[i]); + } + jg.writeEndObject(); + } + jg.writeEndArray(); + } + + // Utility method to write mBean attributes. + private static void writeAttribute(JsonGenerator jg, ObjectName oname, + MBeanAttributeInfo attr) throws IOException { + if (!attr.isReadable()) { + return; + } + String attName = attr.getName(); + if ("modelerType".equals(attName)) { + return; + } + if (attName.indexOf("=") >= 0 || attName.indexOf(":") >= 0 + || attName.indexOf(" ") >= 0) { + return; + } + Object value = null; + try { + value = mBeanServer.getAttribute(oname, attName); + } catch (RuntimeMBeanException e) { + // UnsupportedOperationExceptions happen in the normal course of business, + // so no need to log them as errors all the time. + if (e.getCause() instanceof UnsupportedOperationException) { + LOG.trace("getting attribute " + attName + " of " + oname + " threw an exception", e); + } else { + LOG.error("getting attribute " + attName + " of " + oname + " threw an exception", e); + } + return; + } catch (RuntimeErrorException e) { + // RuntimeErrorException happens when an unexpected failure occurs in getAttribute + // for example https://issues.apache.org/jira/browse/DAEMON-120 + LOG.debug("getting attribute " + attName + " of " + oname + " threw an exception", e); + return; + } catch (AttributeNotFoundException e) { + //Ignored the attribute was not found, which should never happen because the bean + //just told us that it has this attribute, but if this happens just don't output + //the attribute. + return; + } catch (MBeanException e) { + //The code inside the attribute getter threw an exception so log it, and + // skip outputting the attribute + LOG.error("getting attribute " + attName + " of " + oname + " threw an exception", e); + return; + } catch (RuntimeException e) { + //For some reason even with an MBeanException available to them Runtime exceptions + //can still find their way through, so treat them the same as MBeanException + LOG.error("getting attribute " + attName + " of " + oname + " threw an exception", e); + return; + } catch (ReflectionException e) { + //This happens when the code inside the JMX bean (setter?? from the java docs) + //threw an exception, so log it and skip outputting the attribute + LOG.error("getting attribute " + attName + " of " + oname + " threw an exception", e); + return; + } catch (InstanceNotFoundException e) { + //Ignored the mbean itself was not found, which should never happen because we + //just accessed it (perhaps something unregistered in-between) but if this + //happens just don't output the attribute. + return; + } + writeAttribute(jg, attName, value); + } + + private static void writeAttribute(JsonGenerator jg, String attName, Object value) + throws IOException { + jg.writeFieldName(attName); + writeObject(jg, value); + } + + private static void writeObject(JsonGenerator jg, Object value) throws IOException { + if (value == null) { + jg.writeNull(); + } else { + Class c = value.getClass(); + if (c.isArray()) { + jg.writeStartArray(); + int len = Array.getLength(value); + for (int j = 0; j < len; j++) { + Object item = Array.get(value, j); + writeObject(jg, item); + } + jg.writeEndArray(); + } else if (value instanceof Number) { + Number n = (Number) value; + jg.writeNumber(n.toString()); + } else if (value instanceof Boolean) { + Boolean b = (Boolean) value; + jg.writeBoolean(b); + } else if (value instanceof CompositeData) { + CompositeData cds = (CompositeData) value; + CompositeType comp = cds.getCompositeType(); + Set keys = comp.keySet(); + jg.writeStartObject(); + for (String key : keys) { + writeAttribute(jg, key, cds.get(key)); + } + jg.writeEndObject(); + } else if (value instanceof TabularData) { + TabularData tds = (TabularData) value; + jg.writeStartArray(); + for (Object entry : tds.values()) { + writeObject(jg, entry); + } + jg.writeEndArray(); + } else { + jg.writeString(value.toString()); + } + } + } +} diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JniUtil.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JniUtil.java new file mode 100644 index 0000000000..8812376ef6 --- /dev/null +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JniUtil.java @@ -0,0 +1,275 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.udf; + +import com.google.common.base.Joiner; + +import org.apache.doris.thrift.TJvmMemoryPool; +import org.apache.doris.thrift.TGetJvmMemoryMetricsResponse; +import org.apache.doris.thrift.TJvmThreadInfo; +import org.apache.doris.thrift.TGetJvmThreadsInfoRequest; +import org.apache.doris.thrift.TGetJvmThreadsInfoResponse; +import org.apache.doris.thrift.TGetJMXJsonResponse; +import org.apache.doris.monitor.jvm.JvmPauseMonitor; +import org.apache.thrift.TBase; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocolFactory; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.io.Writer; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.MemoryPoolMXBean; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.lang.management.RuntimeMXBean; +import java.lang.management.ThreadInfo; +import java.util.ArrayList; +import java.util.Map; + +/** + * Utility class with methods intended for JNI clients + */ +public class JniUtil { + private final static TBinaryProtocol.Factory protocolFactory_ = + new TBinaryProtocol.Factory(); + + /** + * Initializes the JvmPauseMonitor instance. + */ + public static void initPauseMonitor(long deadlockCheckIntervalS) { + JvmPauseMonitor.INSTANCE.initPauseMonitor(deadlockCheckIntervalS); + } + + /** + * Returns a formatted string containing the simple exception name and the + * exception message without the full stack trace. Includes the + * the chain of causes each in a separate line. + */ + public static String throwableToString(Throwable t) { + StringWriter output = new StringWriter(); + output.write(String.format("%s: %s", t.getClass().getSimpleName(), + t.getMessage())); + // Follow the chain of exception causes and print them as well. + Throwable cause = t; + while ((cause = cause.getCause()) != null) { + output.write(String.format("\nCAUSED BY: %s: %s", + cause.getClass().getSimpleName(), cause.getMessage())); + } + return output.toString(); + } + + /** + * Returns the stack trace of the Throwable object. + */ + public static String throwableToStackTrace(Throwable t) { + Writer output = new StringWriter(); + t.printStackTrace(new PrintWriter(output)); + return output.toString(); + } + + /** + * Serializes input into a byte[] using the default protocol factory. + */ + public static > + byte[] serializeToThrift(T input) throws InternalException { + TSerializer serializer = new TSerializer(protocolFactory_); + try { + return serializer.serialize(input); + } catch (TException e) { + throw new InternalException(e.getMessage()); + } + } + + /** + * Serializes input into a byte[] using a given protocol factory. + */ + public static , F extends TProtocolFactory> + byte[] serializeToThrift(T input, F protocolFactory) throws InternalException { + TSerializer serializer = new TSerializer(protocolFactory); + try { + return serializer.serialize(input); + } catch (TException e) { + throw new InternalException(e.getMessage()); + } + } + + public static > + void deserializeThrift(T result, byte[] thriftData) throws InternalException { + deserializeThrift(protocolFactory_, result, thriftData); + } + + /** + * Deserialize a serialized form of a Thrift data structure to its object form. + */ + public static , F extends TProtocolFactory> + void deserializeThrift(F protocolFactory, T result, byte[] thriftData) + throws InternalException { + // TODO: avoid creating deserializer for each query? + TDeserializer deserializer = new TDeserializer(protocolFactory); + try { + deserializer.deserialize(result, thriftData); + } catch (TException e) { + throw new InternalException(e.getMessage()); + } + } + + /** + * Collect the JVM's memory statistics into a thrift structure for translation into + * Doris metrics by the backend. A synthetic 'total' memory pool is included with + * aggregate statistics for all real pools. Metrics for the JvmPauseMonitor + * and Garbage Collection are also included. + */ + public static byte[] getJvmMemoryMetrics() throws InternalException { + TGetJvmMemoryMetricsResponse jvmMetrics = new TGetJvmMemoryMetricsResponse(); + jvmMetrics.setMemoryPools(new ArrayList()); + TJvmMemoryPool totalUsage = new TJvmMemoryPool(); + + totalUsage.setName("total"); + jvmMetrics.getMemoryPools().add(totalUsage); + + for (MemoryPoolMXBean memBean : ManagementFactory.getMemoryPoolMXBeans()) { + TJvmMemoryPool usage = new TJvmMemoryPool(); + MemoryUsage beanUsage = memBean.getUsage(); + usage.setCommitted(beanUsage.getCommitted()); + usage.setInit(beanUsage.getInit()); + usage.setMax(beanUsage.getMax()); + usage.setUsed(beanUsage.getUsed()); + usage.setName(memBean.getName()); + + totalUsage.committed += beanUsage.getCommitted(); + totalUsage.init += beanUsage.getInit(); + totalUsage.max += beanUsage.getMax(); + totalUsage.used += beanUsage.getUsed(); + + MemoryUsage peakUsage = memBean.getPeakUsage(); + usage.setPeakCommitted(peakUsage.getCommitted()); + usage.setPeakInit(peakUsage.getInit()); + usage.setPeakMax(peakUsage.getMax()); + usage.setPeakUsed(peakUsage.getUsed()); + + totalUsage.peak_committed += peakUsage.getCommitted(); + totalUsage.peak_init += peakUsage.getInit(); + totalUsage.peak_max += peakUsage.getMax(); + totalUsage.peak_used += peakUsage.getUsed(); + + jvmMetrics.getMemoryPools().add(usage); + } + + // Populate heap usage + MemoryMXBean mBean = ManagementFactory.getMemoryMXBean(); + TJvmMemoryPool heap = new TJvmMemoryPool(); + MemoryUsage heapUsage = mBean.getHeapMemoryUsage(); + heap.setCommitted(heapUsage.getCommitted()); + heap.setInit(heapUsage.getInit()); + heap.setMax(heapUsage.getMax()); + heap.setUsed(heapUsage.getUsed()); + heap.setName("heap"); + heap.setPeakCommitted(0); + heap.setPeakInit(0); + heap.setPeakMax(0); + heap.setPeakUsed(0); + jvmMetrics.getMemoryPools().add(heap); + + // Populate non-heap usage + TJvmMemoryPool nonHeap = new TJvmMemoryPool(); + MemoryUsage nonHeapUsage = mBean.getNonHeapMemoryUsage(); + nonHeap.setCommitted(nonHeapUsage.getCommitted()); + nonHeap.setInit(nonHeapUsage.getInit()); + nonHeap.setMax(nonHeapUsage.getMax()); + nonHeap.setUsed(nonHeapUsage.getUsed()); + nonHeap.setName("non-heap"); + nonHeap.setPeakCommitted(0); + nonHeap.setPeakInit(0); + nonHeap.setPeakMax(0); + nonHeap.setPeakUsed(0); + jvmMetrics.getMemoryPools().add(nonHeap); + + // Populate JvmPauseMonitor metrics + jvmMetrics.setGcNumWarnThresholdExceeded( + JvmPauseMonitor.INSTANCE.getNumGcWarnThresholdExceeded()); + jvmMetrics.setGcNumInfoThresholdExceeded( + JvmPauseMonitor.INSTANCE.getNumGcInfoThresholdExceeded()); + jvmMetrics.setGcTotalExtraSleepTimeMillis( + JvmPauseMonitor.INSTANCE.getTotalGcExtraSleepTime()); + + // And Garbage Collector metrics + long gcCount = 0; + long gcTimeMillis = 0; + for (GarbageCollectorMXBean bean : ManagementFactory.getGarbageCollectorMXBeans()) { + gcCount += bean.getCollectionCount(); + gcTimeMillis += bean.getCollectionTime(); + } + jvmMetrics.setGcCount(gcCount); + jvmMetrics.setGcTimeMillis(gcTimeMillis); + + return serializeToThrift(jvmMetrics, protocolFactory_); + } + + /** + * Get information about the live JVM threads. + */ + public static byte[] getJvmThreadsInfo(byte[] argument) throws InternalException { + TGetJvmThreadsInfoRequest request = new TGetJvmThreadsInfoRequest(); + JniUtil.deserializeThrift(protocolFactory_, request, argument); + TGetJvmThreadsInfoResponse response = new TGetJvmThreadsInfoResponse(); + ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); + response.setTotalThreadCount(threadBean.getThreadCount()); + response.setDaemonThreadCount(threadBean.getDaemonThreadCount()); + response.setPeakThreadCount(threadBean.getPeakThreadCount()); + if (request.get_complete_info) { + for (ThreadInfo threadInfo : threadBean.dumpAllThreads(true, true)) { + TJvmThreadInfo tThreadInfo = new TJvmThreadInfo(); + long id = threadInfo.getThreadId(); + tThreadInfo.setSummary(threadInfo.toString()); + tThreadInfo.setCpuTimeInNs(threadBean.getThreadCpuTime(id)); + tThreadInfo.setUserTimeInNs(threadBean.getThreadUserTime(id)); + tThreadInfo.setBlockedCount(threadInfo.getBlockedCount()); + tThreadInfo.setBlockedTimeInMs(threadInfo.getBlockedTime()); + tThreadInfo.setIsInNative(threadInfo.isInNative()); + response.addToThreads(tThreadInfo); + } + } + return serializeToThrift(response, protocolFactory_); + } + + public static byte[] getJMXJson() throws InternalException { + TGetJMXJsonResponse response = new TGetJMXJsonResponse(JMXJsonUtil.getJMXJson()); + return serializeToThrift(response, protocolFactory_); + } + + /** + * Get Java version, input arguments and system properties. + */ + public static String getJavaVersion() { + RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean(); + StringBuilder sb = new StringBuilder(); + sb.append("Java Input arguments:\n"); + sb.append(Joiner.on(" ").join(runtime.getInputArguments())); + sb.append("\nJava System properties:\n"); + for (Map.Entry entry : runtime.getSystemProperties().entrySet()) { + sb.append(entry.getKey() + ":" + entry.getValue() + "\n"); + } + return sb.toString(); + } +} diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java new file mode 100644 index 0000000000..89b6ea79a2 --- /dev/null +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java @@ -0,0 +1,437 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.udf; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import org.apache.doris.catalog.Type; +import org.apache.doris.thrift.TJavaUdfExecutorCtorParams; +import org.apache.doris.thrift.TPrimitiveType; +import org.apache.log4j.Logger; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; + +import sun.misc.Unsafe; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.ArrayList; + +public class UdfExecutor { + private static final Logger LOG = Logger.getLogger(UdfExecutor.class); + public static final Unsafe UNSAFE; + + static { + UNSAFE = (Unsafe) AccessController.doPrivileged( + (PrivilegedAction) () -> { + try { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return f.get(null); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new Error(); + } + }); + } + + // By convention, the function in the class must be called evaluate() + public static final String UDF_FUNCTION_NAME = "evaluate"; + + // Object to deserialize ctor params from BE. + private final static TBinaryProtocol.Factory PROTOCOL_FACTORY = + new TBinaryProtocol.Factory(); + + private Object udf_; + // setup by init() and cleared by close() + private Method method_; + // setup by init() and cleared by close() + private URLClassLoader classLoader_; + + // Return and argument types of the function inferred from the udf method signature. + // The JavaUdfDataType enum maps it to corresponding primitive type. + private JavaUdfDataType[] argTypes_; + private JavaUdfDataType retType_; + + // Input buffer from the backend. This is valid for the duration of an evaluate() call. + // These buffers are allocated in the BE. + private final long inputBufferPtrs_; + private final long inputNullsPtrs_; + + // Output buffer to return non-string values. These buffers are allocated in the BE. + private final long outputBufferPtr_; + private final long outputNullPtr_; + + // Pre-constructed input objects for the UDF. This minimizes object creation overhead + // as these objects are reused across calls to evaluate(). + private Object[] inputObjects_; + // inputArgs_[i] is either inputObjects_[i] or null + private Object[] inputArgs_; + + private final long batch_size_ptr_; + + // Data types that are supported as return or argument types in Java UDFs. + public enum JavaUdfDataType { + INVALID_TYPE("INVALID_TYPE", TPrimitiveType.INVALID_TYPE, 0), + BOOLEAN("BOOLEAN", TPrimitiveType.BOOLEAN, 1), + TINYINT("TINYINT", TPrimitiveType.TINYINT, 1), + SMALLINT("SMALLINT", TPrimitiveType.SMALLINT, 2), + INT("INT", TPrimitiveType.INT, 4), + BIGINT("BIGINT", TPrimitiveType.BIGINT, 8), + FLOAT("FLOAT", TPrimitiveType.FLOAT, 4), + DOUBLE("DOUBLE", TPrimitiveType.DOUBLE, 8); + + private final String description_; + private final TPrimitiveType thriftType_; + private final int len_; + + JavaUdfDataType(String description, TPrimitiveType thriftType, int len) { + description_ = description; + thriftType_ = thriftType; + len_ = len; + } + + @Override + public String toString() { + return description_; + } + + public TPrimitiveType getPrimitiveType() { + return thriftType_; + } + + public int getLen() { + return len_; + } + + public static JavaUdfDataType getType(Class c) { + if (c == boolean.class || c == Boolean.class) { + return JavaUdfDataType.BOOLEAN; + } else if (c == byte.class || c == Byte.class) { + return JavaUdfDataType.TINYINT; + } else if (c == short.class || c == Short.class) { + return JavaUdfDataType.SMALLINT; + } else if (c == int.class || c == Integer.class) { + return JavaUdfDataType.INT; + } else if (c == long.class || c == Long.class) { + return JavaUdfDataType.BIGINT; + } else if (c == float.class || c == Float.class) { + return JavaUdfDataType.FLOAT; + } else if (c == double.class || c == Double.class) { + return JavaUdfDataType.DOUBLE; + } + return JavaUdfDataType.INVALID_TYPE; + } + + public static boolean isSupported(Type t) { + for (JavaUdfDataType javaType : JavaUdfDataType.values()) { + if (javaType == JavaUdfDataType.INVALID_TYPE) continue; + if (javaType.getPrimitiveType() == t.getPrimitiveType().toThrift()) { + return true; + } + } + return false; + } + } + + /** + * Create a UdfExecutor, using parameters from a serialized thrift object. Used by + * the backend. + */ + public UdfExecutor(byte[] thriftParams) throws Exception { + TJavaUdfExecutorCtorParams request = new TJavaUdfExecutorCtorParams(); + + TDeserializer deserializer = new TDeserializer(PROTOCOL_FACTORY); + try { + deserializer.deserialize(request, thriftParams); + } catch (TException e) { + throw new InternalException(e.getMessage()); + } + + String className = request.fn.scalar_fn.symbol; + String jarFile = request.location; + Type retType = UdfUtils.fromThrift(request.fn.ret_type, 0).first; + Type[] parameterTypes = new Type[request.fn.arg_types.size()]; + for (int i = 0; i < request.fn.arg_types.size(); ++i) { + parameterTypes[i] = Type.fromThrift(request.fn.arg_types.get(i)); + } + batch_size_ptr_ = request.batch_size_ptr; + inputBufferPtrs_ = request.input_buffer_ptrs; + inputNullsPtrs_ = request.input_nulls_ptrs; + outputBufferPtr_ = request.output_buffer_ptr; + outputNullPtr_ = request.output_null_ptr; + + init(jarFile, className, retType, parameterTypes); + } + + @Override + protected void finalize() throws Throwable { + close(); + super.finalize(); + } + + /** + * Close the class loader we may have created. + */ + public void close() { + if (classLoader_ != null) { + try { + classLoader_.close(); + } catch (IOException e) { + // Log and ignore. + LOG.debug("Error closing the URLClassloader.", e); + } + } + // We are now un-usable (because the class loader has been + // closed), so null out method_ and classLoader_. + method_ = null; + classLoader_ = null; + } + + /** + * evaluate function called by the backend. The inputs to the UDF have + * been serialized to 'input' + */ + public void evaluate() throws UdfRuntimeException { + try { + int batch_size = UNSAFE.getInt(null, batch_size_ptr_); + for (int row = 0; row < batch_size; row++) { + allocateInputObjects(row); + for (int i = 0; i < argTypes_.length; ++i) { + if (UNSAFE.getByte(null, UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputNullsPtrs_, i)) + row * 1L) == 0) { + inputArgs_[i] = inputObjects_[i]; + } else { + inputArgs_[i] = null; + } + } + storeUdfResult(evaluate(inputArgs_), row); + } + } catch (Exception e) { + throw new UdfRuntimeException("UDF::evaluate() ran into a problem.", e); + } + } + + /** + * Evaluates the UDF with 'args' as the input to the UDF. + */ + private Object evaluate(Object... args) throws UdfRuntimeException { + try { + return method_.invoke(udf_, args); + } catch (Exception e) { + throw new UdfRuntimeException("UDF failed to evaluate", e); + } + } + + public Method getMethod() { + return method_; + } + + // Sets the result object 'obj' into the outputBufferPtr_ and outputNullPtr_ + private void storeUdfResult(Object obj, int row) throws UdfRuntimeException { + if (obj == null) { + UNSAFE.putByte(null, UNSAFE.getLong(null, outputNullPtr_) + row * 1L, (byte) 1); + return; + } + UNSAFE.putByte(UNSAFE.getLong(null, outputNullPtr_) + row * 1L, (byte) 0); + switch (retType_) { + case BOOLEAN: { + boolean val = (boolean) obj; + UNSAFE.putByte(UNSAFE.getLong(null, outputBufferPtr_) + row * retType_.getLen(), val ? (byte) 1 : 0); + return; + } + case TINYINT: { + UNSAFE.putByte(UNSAFE.getLong(null, outputBufferPtr_) + row * retType_.getLen(), (byte) obj); + return; + } + case SMALLINT: { + UNSAFE.putShort(UNSAFE.getLong(null, outputBufferPtr_) + row * retType_.getLen(), (short) obj); + return; + } + case INT: { + UNSAFE.putInt(UNSAFE.getLong(null, outputBufferPtr_) + row * retType_.getLen(), (int) obj); + return; + } + case BIGINT: { + UNSAFE.putLong(UNSAFE.getLong(null, outputBufferPtr_) + row * retType_.getLen(), (long) obj); + return; + } + case FLOAT: { + UNSAFE.putFloat(UNSAFE.getLong(null, outputBufferPtr_) + row * retType_.getLen(), (float) obj); + return; + } + case DOUBLE: { + UNSAFE.putDouble(UNSAFE.getLong(null, outputBufferPtr_) + row * retType_.getLen(), (double) obj); + return; + } + default: + throw new UdfRuntimeException("Unsupported return type: " + retType_); + } + } + + // Preallocate the input objects that will be passed to the underlying UDF. + // These objects are allocated once and reused across calls to evaluate() + private void allocateInputObjects(int row) throws UdfRuntimeException { + inputObjects_ = new Object[argTypes_.length]; + inputArgs_ = new Object[argTypes_.length]; + + for (int i = 0; i < argTypes_.length; ++i) { + switch (argTypes_[i]) { + case BOOLEAN: + inputObjects_[i] = UNSAFE.getBoolean(null, UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputBufferPtrs_, i)) + 1L * row); + break; + case TINYINT: + inputObjects_[i] = UNSAFE.getByte(null, UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputBufferPtrs_, i)) + 1L * row); + break; + case SMALLINT: + inputObjects_[i] = UNSAFE.getShort(null, UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputBufferPtrs_, i)) + 2L * row); + break; + case INT: + inputObjects_[i] = UNSAFE.getInt(null, UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputBufferPtrs_, i)) + 4L * row); + break; + case BIGINT: + inputObjects_[i] = UNSAFE.getLong(null, UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputBufferPtrs_, i)) + 8L * row); + break; + case FLOAT: + inputObjects_[i] = UNSAFE.getFloat(null, UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputBufferPtrs_, i)) + 4L * row); + break; + case DOUBLE: + inputObjects_[i] = UNSAFE.getDouble(null, UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputBufferPtrs_, i)) + 8L * row); + break; + default: + throw new UdfRuntimeException("Unsupported argument type: " + argTypes_[i]); + } + } + } + + private URLClassLoader getClassLoader(String jarPath) throws MalformedURLException { + URL url = new File(jarPath).toURI().toURL(); + return URLClassLoader.newInstance(new URL[]{url}, getClass().getClassLoader()); + } + + /** + * Sets the return type of a Java UDF. Returns true if the return type is compatible + * with the return type from the function definition. Throws an UdfRuntimeException + * if the return type is not supported. + */ + private boolean setReturnType(Type retType, Class udfReturnType) + throws InternalException { + if (!JavaUdfDataType.isSupported(retType)) { + throw new InternalException("Unsupported return type: " + retType.toSql()); + } + JavaUdfDataType javaType = JavaUdfDataType.getType(udfReturnType); + // Check if the evaluate method return type is compatible with the return type from + // the function definition. This happens when both of them map to the same primitive + // type. + if (retType.getPrimitiveType().toThrift() != javaType.getPrimitiveType()) { + return false; + } + retType_ = javaType; + return true; + } + + /** + * Sets the argument types of a Java UDF. Returns true if the argument types specified + * in the UDF are compatible with the argument types of the evaluate() function loaded + * from the associated JAR file. + */ + private boolean setArgTypes(Type[] parameterTypes, Class[] udfArgTypes) { + Preconditions.checkNotNull(argTypes_); + for (int i = 0; i < udfArgTypes.length; ++i) { + argTypes_[i] = JavaUdfDataType.getType(udfArgTypes[i]); + if (argTypes_[i].getPrimitiveType() + != parameterTypes[i].getPrimitiveType().toThrift()) { + return false; + } + } + return true; + } + + private void init(String jarPath, String udfPath, + Type retType, Type... parameterTypes) throws UdfRuntimeException { + ArrayList signatures = Lists.newArrayList(); + try { + LOG.debug("Loading UDF '" + udfPath + "' from " + jarPath); + ClassLoader loader; + if (jarPath != null) { + // Save for cleanup. + classLoader_ = getClassLoader(jarPath); + loader = classLoader_; + } else { + loader = ClassLoader.getSystemClassLoader(); + } + Class c = Class.forName(udfPath, true, loader); + Constructor ctor = c.getConstructor(); + udf_ = ctor.newInstance(); + argTypes_ = new JavaUdfDataType[parameterTypes.length]; + Method[] methods = c.getMethods(); + for (Method m : methods) { + // By convention, the udf must contain the function "evaluate" + if (!m.getName().equals(UDF_FUNCTION_NAME)) continue; + signatures.add(m.toGenericString()); + Class[] methodTypes = m.getParameterTypes(); + + // Try to match the arguments + if (methodTypes.length != parameterTypes.length) continue; + method_ = m; + if (methodTypes.length == 0 && parameterTypes.length == 0) { + // Special case where the UDF doesn't take any input args + if (!setReturnType(retType, m.getReturnType())) continue; + LOG.debug("Loaded UDF '" + udfPath + "' from " + jarPath); + return; + } + if (!setReturnType(retType, m.getReturnType())) continue; + if (!setArgTypes(parameterTypes, methodTypes)) continue; + LOG.debug("Loaded UDF '" + udfPath + "' from " + jarPath); + return; + } + + StringBuilder sb = new StringBuilder(); + sb.append("Unable to find evaluate function with the correct signature: ") + .append(udfPath + ".evaluate(") + .append(Joiner.on(", ").join(parameterTypes)) + .append(")\n") + .append("UDF contains: \n ") + .append(Joiner.on("\n ").join(signatures)); + throw new UdfRuntimeException(sb.toString()); + } catch (MalformedURLException e) { + throw new UdfRuntimeException("Unable to load jar.", e); + } catch (SecurityException e) { + throw new UdfRuntimeException("Unable to load function.", e); + } catch (ClassNotFoundException e) { + throw new UdfRuntimeException("Unable to find class.", e); + } catch (NoSuchMethodException e) { + throw new UdfRuntimeException( + "Unable to find constructor with no arguments.", e); + } catch (IllegalArgumentException e) { + throw new UdfRuntimeException( + "Unable to call UDF constructor with no arguments.", e); + } catch (Exception e) { + throw new UdfRuntimeException("Unable to call create UDF instance.", e); + } + } +} diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/UdfRuntimeException.java b/fe/java-udf/src/main/java/org/apache/doris/udf/UdfRuntimeException.java new file mode 100644 index 0000000000..ef02ae5c36 --- /dev/null +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/UdfRuntimeException.java @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.udf; + +public class UdfRuntimeException extends Exception { + public UdfRuntimeException(String msg, Throwable cause) { + super(msg, cause); + } + + public UdfRuntimeException(String msg) { + super(msg); + } +} diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/UdfUtils.java b/fe/java-udf/src/main/java/org/apache/doris/udf/UdfUtils.java new file mode 100644 index 0000000000..9f33977df3 --- /dev/null +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/UdfUtils.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.udf; + +import com.google.common.base.Preconditions; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.Pair; + +import org.apache.doris.thrift.TPrimitiveType; +import org.apache.doris.thrift.TScalarType; +import org.apache.doris.thrift.TTypeDesc; +import org.apache.doris.thrift.TTypeNode; + +public class UdfUtils { + protected static Pair fromThrift(TTypeDesc typeDesc, int nodeIdx) throws InternalException { + TTypeNode node = typeDesc.getTypes().get(nodeIdx); + Type type = null; + switch (node.getType()) { + case SCALAR: { + Preconditions.checkState(node.isSetScalarType()); + TScalarType scalarType = node.getScalarType(); + if (scalarType.getType() == TPrimitiveType.CHAR) { + Preconditions.checkState(scalarType.isSetLen()); + type = ScalarType.createCharType(scalarType.getLen()); + } else if (scalarType.getType() == TPrimitiveType.VARCHAR) { + Preconditions.checkState(scalarType.isSetLen()); + type = ScalarType.createVarcharType(scalarType.getLen()); + } else if (scalarType.getType() == TPrimitiveType.DECIMALV2) { + Preconditions.checkState(scalarType.isSetPrecision() + && scalarType.isSetScale()); + type = ScalarType.createDecimalV2Type(scalarType.getPrecision(), + scalarType.getScale()); + } else { + type = ScalarType.createType( + PrimitiveType.fromThrift(scalarType.getType())); + } + break; + } + default: + throw new InternalException("Return type " + node.getType() + " is not supported now!"); + } + return new Pair(type, nodeIdx); + } + + protected static long getAddressAtOffset(long base, int offset) { + return base + 8L * offset; + } +} diff --git a/fe/java-udf/src/test/java/org/apache/doris/udf/ConstantOneUdf.java b/fe/java-udf/src/test/java/org/apache/doris/udf/ConstantOneUdf.java new file mode 100644 index 0000000000..458cdb3cc1 --- /dev/null +++ b/fe/java-udf/src/test/java/org/apache/doris/udf/ConstantOneUdf.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.udf; + +public class ConstantOneUdf { + public int evaluate() { + return 1; + } +} diff --git a/fe/java-udf/src/test/java/org/apache/doris/udf/SimpleAddUdf.java b/fe/java-udf/src/test/java/org/apache/doris/udf/SimpleAddUdf.java new file mode 100644 index 0000000000..d0ed615156 --- /dev/null +++ b/fe/java-udf/src/test/java/org/apache/doris/udf/SimpleAddUdf.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.udf; + +public class SimpleAddUdf { + public Integer evaluate(Integer a, int b) { + return a == null? null: a + b; + } +} diff --git a/fe/java-udf/src/test/java/org/apache/doris/udf/UdfExecutorTest.java b/fe/java-udf/src/test/java/org/apache/doris/udf/UdfExecutorTest.java new file mode 100644 index 0000000000..6b1c487604 --- /dev/null +++ b/fe/java-udf/src/test/java/org/apache/doris/udf/UdfExecutorTest.java @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.udf; + +import org.apache.doris.thrift.*; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; + +public class UdfExecutorTest { + @Test + public void testConstantOneUdf() throws Exception { + TScalarFunction scalarFunction = new TScalarFunction(); + scalarFunction.symbol = "org.apache.doris.udf.ConstantOneUdf"; + + TFunction fn = new TFunction(); + fn.binary_type = TFunctionBinaryType.JAVA_UDF; + TTypeNode typeNode = new TTypeNode(TTypeNodeType.SCALAR); + typeNode.scalar_type = new TScalarType(TPrimitiveType.INT); + fn.ret_type = new TTypeDesc(Collections.singletonList(typeNode)); + fn.arg_types = new ArrayList<>(); + fn.scalar_fn = scalarFunction; + fn.name = new TFunctionName("ConstantOne"); + + + long batchSizePtr = UdfExecutor.UNSAFE.allocateMemory(32); + int batchSize = 10; + UdfExecutor.UNSAFE.putInt(batchSizePtr, batchSize); + TJavaUdfExecutorCtorParams params = new TJavaUdfExecutorCtorParams(); + params.batch_size_ptr = batchSizePtr; + params.fn = fn; + + long outputBuffer = UdfExecutor.UNSAFE.allocateMemory(32 * batchSize); + long outputNull = UdfExecutor.UNSAFE.allocateMemory(8 * batchSize); + long outputBufferPtr = UdfExecutor.UNSAFE.allocateMemory(64); + UdfExecutor.UNSAFE.putLong(outputBufferPtr, outputBuffer); + long outputNullPtr = UdfExecutor.UNSAFE.allocateMemory(64); + UdfExecutor.UNSAFE.putLong(outputNullPtr, outputNull); + params.output_buffer_ptr = outputBufferPtr; + params.output_null_ptr = outputNullPtr; + params.input_buffer_ptrs = 0; + params.input_nulls_ptrs = 0; + params.input_byte_offsets = 0; + + TBinaryProtocol.Factory factory = + new TBinaryProtocol.Factory(); + TSerializer serializer = new TSerializer(factory); + + UdfExecutor executor; + executor = new UdfExecutor(serializer.serialize(params)); + + executor.evaluate(); + for (int i = 0; i < 10; i ++) { + assert (UdfExecutor.UNSAFE.getByte(outputNull + 8 * i) == 0); + assert (UdfExecutor.UNSAFE.getInt(outputBuffer + 32 * i) == 1); + } + } + + @Test + public void testSimpleAddUdf() throws Exception { + TScalarFunction scalarFunction = new TScalarFunction(); + scalarFunction.symbol = "org.apache.doris.udf.SimpleAddUdf"; + + TFunction fn = new TFunction(); + fn.binary_type = TFunctionBinaryType.JAVA_UDF; + TTypeNode typeNode = new TTypeNode(TTypeNodeType.SCALAR); + typeNode.scalar_type = new TScalarType(TPrimitiveType.INT); + TTypeDesc typeDesc = new TTypeDesc(Collections.singletonList(typeNode)); + fn.ret_type = typeDesc; + fn.arg_types = Arrays.asList(typeDesc, typeDesc); + fn.scalar_fn = scalarFunction; + fn.name = new TFunctionName("SimpleAdd"); + + + long batchSizePtr = UdfExecutor.UNSAFE.allocateMemory(32); + int batchSize = 10; + UdfExecutor.UNSAFE.putInt(batchSizePtr, batchSize); + + TJavaUdfExecutorCtorParams params = new TJavaUdfExecutorCtorParams(); + params.batch_size_ptr = batchSizePtr; + params.fn = fn; + + long outputBufferPtr = UdfExecutor.UNSAFE.allocateMemory(64); + long outputNullPtr = UdfExecutor.UNSAFE.allocateMemory(64); + long outputBuffer = UdfExecutor.UNSAFE.allocateMemory(32 * batchSize); + long outputNull = UdfExecutor.UNSAFE.allocateMemory(8 * batchSize); + UdfExecutor.UNSAFE.putLong(outputBufferPtr, outputBuffer); + UdfExecutor.UNSAFE.putLong(outputNullPtr, outputNull); + + params.output_buffer_ptr = outputBufferPtr; + params.output_null_ptr = outputNullPtr; + + int numCols = 2; + long inputBufferPtr = UdfExecutor.UNSAFE.allocateMemory(64 * numCols); + long inputNullPtr = UdfExecutor.UNSAFE.allocateMemory(64 * numCols); + + long inputBuffer1 = UdfExecutor.UNSAFE.allocateMemory(32 * batchSize); + long inputNull1 = UdfExecutor.UNSAFE.allocateMemory(8 * batchSize); + long inputBuffer2 = UdfExecutor.UNSAFE.allocateMemory(32 * batchSize); + long inputNull2 = UdfExecutor.UNSAFE.allocateMemory(8 * batchSize); + + UdfExecutor.UNSAFE.putLong(inputBufferPtr, inputBuffer1); + UdfExecutor.UNSAFE.putLong(inputBufferPtr + 64, inputBuffer2); + UdfExecutor.UNSAFE.putLong(inputNullPtr, inputNull1); + UdfExecutor.UNSAFE.putLong(inputNullPtr + 64, inputNull2); + + for (int i = 0; i < batchSize; i ++) { + UdfExecutor.UNSAFE.putInt(null, inputBuffer1 + i * 32, i); + UdfExecutor.UNSAFE.putInt(null, inputBuffer2 + i * 32, i); + + if (i % 2 == 0) { + UdfExecutor.UNSAFE.putByte(null, inputNull1 + i * 8, (byte) 1); + } else { + UdfExecutor.UNSAFE.putByte(null, inputNull1 + i * 8, (byte) 0); + } + UdfExecutor.UNSAFE.putByte(null, inputNull2 + i * 8, (byte) 0); + } + params.input_buffer_ptrs = inputBufferPtr; + params.input_nulls_ptrs = inputNullPtr; + params.input_byte_offsets = 0; + + TBinaryProtocol.Factory factory = + new TBinaryProtocol.Factory(); + TSerializer serializer = new TSerializer(factory); + + UdfExecutor executor; + executor = new UdfExecutor(serializer.serialize(params)); + + executor.evaluate(); + for (int i = 0; i < batchSize; i ++) { + if (i % 2 == 0) { + assert (UdfExecutor.UNSAFE.getByte(outputNull + 8 * i) == 1); + } else { + assert (UdfExecutor.UNSAFE.getByte(outputNull + 8 * i) == 0); + assert (UdfExecutor.UNSAFE.getInt(outputBuffer + 32 * i) == i * 2); + } + } + } +} diff --git a/fe/pom.xml b/fe/pom.xml index 5812cb14e6..b944ebd04f 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -118,6 +118,7 @@ under the License. spark-dpp fe-core hive-udf + java-udf ${basedir}/../ diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 71dd38a613..a9f9a7d1de 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -337,6 +337,136 @@ struct TFunction { 13: optional bool vectorized = false } +struct TJavaUdfExecutorCtorParams { + 1: optional TFunction fn + + // Local path to the UDF's jar file + 2: optional string location + + // The byte offset for each argument in the input buffer. The BE will + // call the Java executor with a buffer for all the inputs. + // input_byte_offsets[0] is the byte offset in the buffer for the first + // argument; input_byte_offsets[1] is the second, etc. + 3: optional i64 input_byte_offsets + + // Native input buffer ptr (cast as i64) for the inputs. The input arguments + // are written to this buffer directly and read from java with no copies + // input_null_ptr[i] is true if the i-th input is null. + // input_buffer_ptr[input_byte_offsets[i]] is the value of the i-th input. + 4: optional i64 input_nulls_ptrs + 5: optional i64 input_buffer_ptrs + + // Native output buffer ptr. For non-variable length types, the output is + // written here and read from the native side with no copies. + // The UDF should set *output_null_ptr to true, if the result of the UDF is + // NULL. + 6: optional i64 output_null_ptr + 7: optional i64 output_buffer_ptr + + 8: optional i64 batch_size_ptr +} + +// Contains all interesting statistics from a single 'memory pool' in the JVM. +// All numeric values are measured in bytes. +struct TJvmMemoryPool { + // Memory committed by the operating system to this pool (i.e. not just virtual address + // space) + 1: required i64 committed + + // The initial amount of memory committed to this pool + 2: required i64 init + + // The maximum amount of memory this pool will use. + 3: required i64 max + + // The amount of memory currently in use by this pool (will be <= committed). + 4: required i64 used + + // Maximum committed memory over time + 5: required i64 peak_committed + + // Should be always == init + 6: required i64 peak_init + + // Peak maximum memory over time (usually will not change) + 7: required i64 peak_max + + // Peak consumed memory over time + 8: required i64 peak_used + + // Name of this pool, defined by the JVM + 9: required string name +} + +// Response from JniUtil::GetJvmMemoryMetrics() +struct TGetJvmMemoryMetricsResponse { + // One entry for every pool tracked by the Jvm, plus a synthetic aggregate pool called + // 'total' + 1: required list memory_pools + + // Metrics from JvmPauseMonitor, measuring how much time is spend + // pausing, presumably because of Garbage Collection. These + // names are consistent with Hadoop's metric names. + 2: required i64 gc_num_warn_threshold_exceeded + 3: required i64 gc_num_info_threshold_exceeded + 4: required i64 gc_total_extra_sleep_time_millis + + // Metrics for JVM Garbage Collection, from the management beans; + // these are cumulative across all types of GCs. + 5: required i64 gc_count + 6: required i64 gc_time_millis +} + +// Contains information about a JVM thread +struct TJvmThreadInfo { + // Summary of a JVM thread. Includes stacktraces, locked monitors + // and synchronizers. + 1: required string summary + + // The total CPU time for this thread in nanoseconds + 2: required i64 cpu_time_in_ns + + // The CPU time that this thread has executed in user mode in nanoseconds + 3: required i64 user_time_in_ns + + // The number of times this thread blocked to enter or reenter a monitor + 4: required i64 blocked_count + + // Approximate accumulated elapsed time (in milliseconds) that this thread has blocked + // to enter or reenter a monitor + 5: required i64 blocked_time_in_ms + + // True if this thread is executing native code via the Java Native Interface (JNI) + 6: required bool is_in_native +} + +// Request to get information about JVM threads +struct TGetJvmThreadsInfoRequest { + // If set, return complete info about JVM threads. Otherwise, return only + // the total number of live JVM threads. + 1: required bool get_complete_info +} + +struct TGetJvmThreadsInfoResponse { + // The current number of live threads including both daemon and non-daemon threads + 1: required i32 total_thread_count + + // The current number of live daemon threads + 2: required i32 daemon_thread_count + + // The peak live thread count since the Java virtual machine started + 3: required i32 peak_thread_count + + // Information about JVM threads. It is not included when + // TGetJvmThreadsInfoRequest.get_complete_info is false. + 4: optional list threads +} + +struct TGetJMXJsonResponse { + // JMX of the JVM serialized to a json string. + 1: required string jmx_json +} + enum TLoadJobState { PENDING, ETL, diff --git a/tools/find_libjvm.sh b/tools/find_libjvm.sh new file mode 100644 index 0000000000..4017616874 --- /dev/null +++ b/tools/find_libjvm.sh @@ -0,0 +1,60 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +jdk_version() { + local result + local java_cmd=$JAVA_HOME/bin/java + local IFS=$'\n' + # remove \r for Cygwin + local lines=$("$java_cmd" -Xms32M -Xmx32M -version 2>&1 | tr '\r' '\n') + if [[ -z $java_cmd ]] + then + result=no_java + else + for line in $lines; do + if [[ (-z $result) && ($line = *"version \""*) ]] + then + local ver=$(echo $line | sed -e 's/.*version "\(.*\)"\(.*\)/\1/; 1q') + # on macOS, sed doesn't support '?' + if [[ $ver = "1."* ]] + then + result=$(echo $ver | sed -e 's/1\.\([0-9]*\)\(.*\)/\1/; 1q') + else + result=$(echo $ver | sed -e 's/\([0-9]*\)\(.*\)/\1/; 1q') + fi + fi + done + fi + echo "$result" +} +java_version=$(jdk_version) +MACHINE_TYPE=$(uname -m) +jvm_arch="amd64" +if [[ "${MACHINE_TYPE}" == "aarch64" ]]; then + jvm_arch="aarch64" +fi +if [[ $java_version -gt 8 ]]; then + export LIBJVM_PATH=$JAVA_HOME/lib +# JAVA_HOME is jdk +elif [[ -d "$JAVA_HOME/jre" ]]; then + export LIBJVM_PATH=$JAVA_HOME/jre/lib/$jvm_arch +# JAVA_HOME is jre +else + export LIBJVM_PATH=$JAVA_HOME/lib/$jvm_arch +fi +echo ${LIBJVM_PATH}/*/libjvm.so