diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp index 0191a80bf8..1293c55576 100644 --- a/be/src/util/jni-util.cpp +++ b/be/src/util/jni-util.cpp @@ -153,6 +153,8 @@ jmethodID JniUtil::throwable_to_stack_trace_id_ = NULL; jmethodID JniUtil::get_jvm_metrics_id_ = NULL; jmethodID JniUtil::get_jvm_threads_id_ = NULL; jmethodID JniUtil::get_jmx_json_ = NULL; +jobject JniUtil::jni_scanner_loader_obj_ = NULL; +jmethodID JniUtil::jni_scanner_loader_method_ = NULL; Status JniUtfCharGuard::create(JNIEnv* env, jstring jstr, JniUtfCharGuard* out) { DCHECK(jstr != nullptr); @@ -340,12 +342,57 @@ Status JniUtil::LocalToGlobalRef(JNIEnv* env, jobject local_ref, jobject* global return Status::OK(); } +Status JniUtil::init_jni_scanner_loader(JNIEnv* env) { + // Get scanner loader; + jclass jni_scanner_loader_cls; + std::string jni_scanner_loader_str = "org/apache/doris/common/classloader/ScannerLoader"; + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, jni_scanner_loader_str.c_str(), + &jni_scanner_loader_cls)); + jmethodID jni_scanner_loader_constructor = + env->GetMethodID(jni_scanner_loader_cls, "", "()V"); + RETURN_ERROR_IF_EXC(env); + jni_scanner_loader_method_ = env->GetMethodID(jni_scanner_loader_cls, "getLoadedClass", + "(Ljava/lang/String;)Ljava/lang/Class;"); + if (jni_scanner_loader_method_ == NULL) { + if (env->ExceptionOccurred()) env->ExceptionDescribe(); + return Status::InternalError("Failed to find ScannerLoader.getLoadedClass method."); + } + RETURN_ERROR_IF_EXC(env); + jmethodID load_jni_scanner = + env->GetMethodID(jni_scanner_loader_cls, "loadAllScannerJars", "()V"); + RETURN_ERROR_IF_EXC(env); + + jni_scanner_loader_obj_ = + env->NewObject(jni_scanner_loader_cls, jni_scanner_loader_constructor); + RETURN_ERROR_IF_EXC(env); + if (jni_scanner_loader_obj_ == NULL) { + if (env->ExceptionOccurred()) env->ExceptionDescribe(); + return Status::InternalError("Failed to create ScannerLoader object."); + } + env->CallVoidMethod(jni_scanner_loader_obj_, load_jni_scanner); + RETURN_ERROR_IF_EXC(env); + return Status::OK(); +} + +Status JniUtil::get_jni_scanner_class(JNIEnv* env, std::string classname, + jclass* jni_scanner_class) { + // Get JNI scanner class by class name; + jobject loaded_class_obj = + env->CallObjectMethod(jni_scanner_loader_obj_, jni_scanner_loader_method_, + env->NewStringUTF(classname.c_str())); + RETURN_ERROR_IF_EXC(env); + *jni_scanner_class = reinterpret_cast(env->NewGlobalRef(loaded_class_obj)); + RETURN_ERROR_IF_EXC(env); + return Status::OK(); +} + Status JniUtil::Init() { RETURN_IF_ERROR(LibJVMLoader::instance().load()); // Get the JNIEnv* corresponding to current thread. JNIEnv* env; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + if (env == NULL) return Status::InternalError("Failed to get/create JVM"); // Find JniUtil class and create a global ref. jclass local_jni_util_cl = env->FindClass("org/apache/doris/common/jni/utils/JniUtil"); @@ -454,6 +501,7 @@ Status JniUtil::Init() { if (env->ExceptionOccurred()) env->ExceptionDescribe(); return Status::InternalError("Failed to find JniUtil.getJMXJson method."); } + RETURN_IF_ERROR(init_jni_scanner_loader(env)); jvm_inited_ = true; return Status::OK(); } diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h index bb08612be0..15649b8efc 100644 --- a/be/src/util/jni-util.h +++ b/be/src/util/jni-util.h @@ -30,7 +30,7 @@ #include "util/thrift_util.h" #ifdef USE_HADOOP_HDFS -// defined in hadoop_hdfs/hdfs.h +// defined in hadoop/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c extern "C" JNIEnv* getJNIEnv(void); #endif @@ -74,12 +74,13 @@ public: static inline int64_t IncreaseReservedBufferSize(int n) { return INITIAL_RESERVED_BUFFER_SIZE << n; } - + static Status get_jni_scanner_class(JNIEnv* env, std::string classname, jclass* loaded_class); static jobject convert_to_java_map(JNIEnv* env, const std::map& map); static std::map convert_to_cpp_map(JNIEnv* env, jobject map); private: static Status GetJNIEnvSlowPath(JNIEnv** env); + static Status init_jni_scanner_loader(JNIEnv* env); static bool jvm_inited_; static jclass internal_exc_cl_; @@ -90,7 +91,9 @@ private: static jmethodID get_jvm_metrics_id_; static jmethodID get_jvm_threads_id_; static jmethodID get_jmx_json_; - + // JNI scanner loader + static jobject jni_scanner_loader_obj_; + static jmethodID jni_scanner_loader_method_; // Thread-local cache of the JNIEnv for this thread. static __thread JNIEnv* tls_env_; }; diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp index ea7e9e82c4..5bac2759f0 100644 --- a/be/src/vec/exec/jni_connector.cpp +++ b/be/src/vec/exec/jni_connector.cpp @@ -201,7 +201,12 @@ Status JniConnector::close() { } Status JniConnector::_init_jni_scanner(JNIEnv* env, int batch_size) { - RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, _connector_class.c_str(), &_jni_scanner_cls)); + RETURN_IF_ERROR(JniUtil::get_jni_scanner_class(env, _connector_class, &_jni_scanner_cls)); + if (_jni_scanner_cls == NULL) { + if (env->ExceptionOccurred()) env->ExceptionDescribe(); + return Status::InternalError("Fail to get JniScanner class."); + } + RETURN_ERROR_IF_EXC(env); jmethodID scanner_constructor = env->GetMethodID(_jni_scanner_cls, "", "(ILjava/util/Map;)V"); RETURN_ERROR_IF_EXC(env); diff --git a/bin/start_be.sh b/bin/start_be.sh index b0ea52ea06..5f17fefca4 100755 --- a/bin/start_be.sh +++ b/bin/start_be.sh @@ -78,7 +78,7 @@ if [[ "${MAX_FILE_COUNT}" -lt 65536 ]]; then fi # add java libs -for f in "${DORIS_HOME}/lib/java_extensions"/*.jar; do +for f in "${DORIS_HOME}/lib/java_extensions/java-udf"/*.jar; do if [[ -z "${DORIS_CLASSPATH}" ]]; then export DORIS_CLASSPATH="${f}" else diff --git a/build.sh b/build.sh index ff12f464e3..92f98f31a9 100755 --- a/build.sh +++ b/build.sh @@ -667,8 +667,7 @@ EOF cp -r -p "${DORIS_HOME}/bin/run-fs-benchmark.sh" "${DORIS_OUTPUT}/be/bin/"/ fi - extensions_modules=("") - extensions_modules+=("java-udf") + extensions_modules=("java-udf") extensions_modules+=("jdbc-scanner") extensions_modules+=("hudi-scanner") extensions_modules+=("paimon-scanner") @@ -680,8 +679,9 @@ EOF mkdir "${BE_JAVA_EXTENSIONS_DIR}" for extensions_module in "${extensions_modules[@]}"; do module_path="${DORIS_HOME}/fe/be-java-extensions/${extensions_module}/target/${extensions_module}-jar-with-dependencies.jar" + mkdir "${BE_JAVA_EXTENSIONS_DIR}"/"${extensions_module}" if [[ -f "${module_path}" ]]; then - cp "${module_path}" "${BE_JAVA_EXTENSIONS_DIR}"/ + cp "${module_path}" "${BE_JAVA_EXTENSIONS_DIR}"/"${extensions_module}" fi done diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/JniScannerClassLoader.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/JniScannerClassLoader.java new file mode 100644 index 0000000000..12105546cf --- /dev/null +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/JniScannerClassLoader.java @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.classloader; + +import java.net.URL; +import java.net.URLClassLoader; +import java.util.List; + +public class JniScannerClassLoader extends URLClassLoader { + + private final String scannerName; + + public JniScannerClassLoader(String scannerName, List urls) { + super(urls.toArray(new URL[0]), ClassLoader.getSystemClassLoader()); + this.scannerName = scannerName; + } + + @Override + public String toString() { + return "JniScannerClassLoader{" + + "scannerName='" + scannerName + + '}'; + } +} diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java new file mode 100644 index 0000000000..726a0dbe0e --- /dev/null +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.classloader; + +import com.google.common.collect.Streams; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.jar.JarEntry; +import java.util.jar.JarFile; +import java.util.stream.Collectors; + +/** + * BE will load scanners by JNI call, and then the JniConnector on BE will get scanner class by getLoadedClass. + */ +public class ScannerLoader { + private static final Map> loadedClasses = new HashMap<>(); + private static final String CLASS_SUFFIX = ".class"; + private static final String LOAD_PACKAGE = "org.apache.doris"; + + /** + * Load all classes from $DORIS_HOME/lib/java_extensions/* + */ + public void loadAllScannerJars() { + String basePath = System.getenv("DORIS_HOME"); + File library = new File(basePath, "/lib/java_extensions/"); + // TODO: add thread pool to load each scanner + listFiles(library).stream().filter(File::isDirectory).forEach(sd -> { + JniScannerClassLoader classLoader = new JniScannerClassLoader(sd.getName(), buildClassPath(sd)); + try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) { + loadJarClassFromDir(sd, classLoader); + } + }); + } + + /** + * Get loaded class for JNI scanners + * @param className JNI scanner class name + * @return scanner class object + * @throws ClassNotFoundException JNI scanner class not found + */ + public Class getLoadedClass(String className) throws ClassNotFoundException { + String loadedClassName = getPackagePathName(className); + if (loadedClasses.containsKey(loadedClassName)) { + return loadedClasses.get(loadedClassName); + } else { + throw new ClassNotFoundException("JNI scanner has not been loaded or no such class: " + className); + } + } + + private static List buildClassPath(File path) { + return listFiles(path).stream() + .map(ScannerLoader::classFileUrl) + .collect(Collectors.toList()); + } + + private static URL classFileUrl(File file) { + try { + return file.toURI().toURL(); + } catch (MalformedURLException e) { + throw new UncheckedIOException(e); + } + } + + public static List listFiles(File library) { + try (DirectoryStream directoryStream = Files.newDirectoryStream(library.toPath())) { + return Streams.stream(directoryStream) + .map(Path::toFile) + .sorted() + .collect(Collectors.toList()); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static void loadJarClassFromDir(File dir, JniScannerClassLoader classLoader) { + listFiles(dir).forEach(file -> { + Enumeration entryEnumeration; + List loadClassNames = new ArrayList<>(); + try { + try (JarFile jar = new JarFile(file)) { + entryEnumeration = jar.entries(); + while (entryEnumeration.hasMoreElements()) { + JarEntry entry = entryEnumeration.nextElement(); + String className = entry.getName(); + if (!className.endsWith(CLASS_SUFFIX)) { + continue; + } + className = className.substring(0, className.length() - CLASS_SUFFIX.length()); + String packageClassName = getPackagePathName(className); + if (needToLoad(packageClassName)) { + loadClassNames.add(packageClassName); + } + } + } + for (String className : loadClassNames) { + loadedClasses.putIfAbsent(className, classLoader.loadClass(className)); + } + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + }); + } + + private static String getPackagePathName(String className) { + return className.replace("/", "."); + } + + private static boolean needToLoad(String className) { + return className.contains(LOAD_PACKAGE) && !className.contains("$"); + } +} diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ThreadClassLoaderContext.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ThreadClassLoaderContext.java new file mode 100644 index 0000000000..3f358a5016 --- /dev/null +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ThreadClassLoaderContext.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.classloader; + +import java.io.Closeable; + +public class ThreadClassLoaderContext implements Closeable { + + private final ClassLoader originClassLoader; + + public ThreadClassLoaderContext(ClassLoader contextClassLoader) { + this.originClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(contextClassLoader); + } + + @Override + public void close() { + Thread.currentThread().setContextClassLoader(originClassLoader); + } +}