From 42bdde87504e8bebe75df7f0d16544cc8074cf81 Mon Sep 17 00:00:00 2001 From: zhangstar333 <87313068+zhangstar333@users.noreply.github.com> Date: Wed, 7 Sep 2022 10:29:41 +0800 Subject: [PATCH] [Feature](Vectorized) support jdbc scan node (#12010) --- be/src/exec/exec_node.cpp | 14 + be/src/runtime/descriptors.cpp | 28 ++ be/src/runtime/descriptors.h | 24 ++ be/src/runtime/fragment_mgr.cpp | 2 +- be/src/runtime/user_function_cache.cpp | 18 +- be/src/vec/CMakeLists.txt | 2 + be/src/vec/exec/vjdbc_connector.cpp | 344 ++++++++++++++++++ be/src/vec/exec/vjdbc_connector.h | 98 +++++ be/src/vec/exec/vjdbc_scan_node.cpp | 173 +++++++++ be/src/vec/exec/vjdbc_scan_node.h | 68 ++++ .../ecosystem/external-table/jdbc-of-doris.md | 215 +++++++++++ .../ecosystem/external-table/jdbc-of-doris.md | 211 +++++++++++ .../doris/analysis/CreateTableStmt.java | 3 +- .../java/org/apache/doris/catalog/Env.java | 9 +- .../apache/doris/catalog/JdbcResource.java | 196 ++++++++++ .../org/apache/doris/catalog/JdbcTable.java | 185 ++++++++++ .../org/apache/doris/catalog/Resource.java | 6 +- .../org/apache/doris/catalog/ResourceMgr.java | 5 +- .../java/org/apache/doris/catalog/Table.java | 2 + .../org/apache/doris/catalog/TableIf.java | 5 +- .../doris/datasource/InternalCatalog.java | 19 + .../apache/doris/persist/gson/GsonUtils.java | 4 +- .../apache/doris/planner/JdbcScanNode.java | 195 ++++++++++ .../apache/doris/planner/OdbcScanNode.java | 2 +- .../doris/planner/SingleNodePlanner.java | 4 + .../doris/statistics/StatisticalType.java | 1 + .../org/apache/doris/udf/JdbcExecutor.java | 167 +++++++++ gensrc/thrift/Descriptors.thrift | 12 + gensrc/thrift/PlanNodes.thrift | 8 + gensrc/thrift/Types.thrift | 20 +- 30 files changed, 2023 insertions(+), 17 deletions(-) create mode 100644 be/src/vec/exec/vjdbc_connector.cpp create mode 100644 be/src/vec/exec/vjdbc_connector.h create mode 100644 be/src/vec/exec/vjdbc_scan_node.cpp create mode 100644 be/src/vec/exec/vjdbc_scan_node.h create mode 100644 docs/en/docs/ecosystem/external-table/jdbc-of-doris.md create mode 100644 docs/zh-CN/docs/ecosystem/external-table/jdbc-of-doris.md create mode 100644 fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/JdbcScanNode.java create mode 100644 fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 7f6f07d1c2..be0b8690f2 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -73,6 +73,7 @@ #include "vec/exec/vexcept_node.h" #include "vec/exec/vexchange_node.h" #include "vec/exec/vintersect_node.h" +#include "vec/exec/vjdbc_scan_node.h" #include "vec/exec/vmysql_scan_node.h" #include "vec/exec/vodbc_scan_node.h" #include "vec/exec/volap_scan_node.h" @@ -417,6 +418,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN case TPlanNodeType::BROKER_SCAN_NODE: case TPlanNodeType::TABLE_VALUED_FUNCTION_SCAN_NODE: case TPlanNodeType::FILE_SCAN_NODE: + case TPlanNodeType::JDBC_SCAN_NODE: break; default: { const auto& i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type); @@ -452,6 +454,18 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN } return Status::OK(); + case TPlanNodeType::JDBC_SCAN_NODE: + if (state->enable_vectorized_exec()) { +#ifdef LIBJVM + *node = pool->add(new vectorized::VJdbcScanNode(pool, tnode, 
descs)); +#else + return Status::InternalError("Jdbc scan node is disabled since no libjvm is found!"); +#endif + } else { + return Status::InternalError("Jdbc scan node only support vectorized engine."); + } + return Status::OK(); + case TPlanNodeType::ES_HTTP_SCAN_NODE: if (state->enable_vectorized_exec()) { *node = pool->add(new vectorized::VEsHttpScanNode(pool, tnode, descs)); diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index 585f1ce574..9eea456fb2 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -20,6 +20,8 @@ #include "runtime/descriptors.h" +#include + #include #include #include @@ -221,6 +223,29 @@ std::string ODBCTableDescriptor::debug_string() const { return out.str(); } +JdbcTableDescriptor::JdbcTableDescriptor(const TTableDescriptor& tdesc) + : TableDescriptor(tdesc), + _jdbc_resource_name(tdesc.jdbcTable.jdbc_resource_name), + _jdbc_driver_url(tdesc.jdbcTable.jdbc_driver_url), + _jdbc_driver_class(tdesc.jdbcTable.jdbc_driver_class), + _jdbc_driver_checksum(tdesc.jdbcTable.jdbc_driver_checksum), + _jdbc_url(tdesc.jdbcTable.jdbc_url), + _jdbc_table_name(tdesc.jdbcTable.jdbc_table_name), + _jdbc_user(tdesc.jdbcTable.jdbc_user), + _jdbc_passwd(tdesc.jdbcTable.jdbc_password) {} + +std::string JdbcTableDescriptor::debug_string() const { + fmt::memory_buffer buf; + fmt::format_to(buf, + "JDBCTable({} ,_jdbc_resource_name={} ,_jdbc_driver_url={} " + ",_jdbc_driver_class={} ,_jdbc_driver_checksum={} ,_jdbc_url={} " + ",_jdbc_table_name={} ,_jdbc_user={} ,_jdbc_passwd={})", + TableDescriptor::debug_string(), _jdbc_resource_name, _jdbc_driver_url, + _jdbc_driver_class, _jdbc_driver_checksum, _jdbc_url, _jdbc_table_name, + _jdbc_user, _jdbc_passwd); + return fmt::to_string(buf); +} + TupleDescriptor::TupleDescriptor(const TTupleDescriptor& tdesc) : _id(tdesc.id), _table_desc(nullptr), @@ -563,6 +588,9 @@ Status DescriptorTbl::create(ObjectPool* pool, const TDescriptorTable& thrift_tb case TTableType::ICEBERG_TABLE: desc = pool->add(new IcebergTableDescriptor(tdesc)); break; + case TTableType::JDBC_TABLE: + desc = pool->add(new JdbcTableDescriptor(tdesc)); + break; default: DCHECK(false) << "invalid table type: " << tdesc.tableType; } diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index abc87e705f..c49eebb6db 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -275,6 +275,30 @@ private: TOdbcTableType::type _type; }; +class JdbcTableDescriptor : public TableDescriptor { +public: + JdbcTableDescriptor(const TTableDescriptor& tdesc); + std::string debug_string() const override; + const std::string& jdbc_resource_name() const { return _jdbc_resource_name; } + const std::string& jdbc_driver_url() const { return _jdbc_driver_url; } + const std::string& jdbc_driver_class() const { return _jdbc_driver_class; } + const std::string& jdbc_driver_checksum() const { return _jdbc_driver_checksum; } + const std::string& jdbc_url() const { return _jdbc_url; } + const std::string& jdbc_table_name() const { return _jdbc_table_name; } + const std::string& jdbc_user() const { return _jdbc_user; } + const std::string& jdbc_passwd() const { return _jdbc_passwd; } + +private: + std::string _jdbc_resource_name; + std::string _jdbc_driver_url; + std::string _jdbc_driver_class; + std::string _jdbc_driver_checksum; + std::string _jdbc_url; + std::string _jdbc_table_name; + std::string _jdbc_user; + std::string _jdbc_passwd; +}; + class TupleDescriptor { public: // virtual ~TupleDescriptor() {} 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 969900e995..c5b3cbf818 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -726,7 +726,7 @@ bool FragmentMgr::_is_scan_node(const TPlanNodeType::type& type) { type == TPlanNodeType::BROKER_SCAN_NODE || type == TPlanNodeType::ES_SCAN_NODE || type == TPlanNodeType::ES_HTTP_SCAN_NODE || type == TPlanNodeType::ODBC_SCAN_NODE || type == TPlanNodeType::TABLE_VALUED_FUNCTION_SCAN_NODE || - type == TPlanNodeType::FILE_SCAN_NODE; + type == TPlanNodeType::FILE_SCAN_NODE || type == TPlanNodeType::JDBC_SCAN_NODE; } Status FragmentMgr::cancel(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& reason, diff --git a/be/src/runtime/user_function_cache.cpp b/be/src/runtime/user_function_cache.cpp index 150cdc6e8f..47e688ed5f 100644 --- a/be/src/runtime/user_function_cache.cpp +++ b/be/src/runtime/user_function_cache.cpp @@ -18,7 +18,6 @@ #include "runtime/user_function_cache.h" #include -#include // boost::algorithm::ends_with #include #include @@ -27,6 +26,7 @@ #include "http/http_client.h" #include "util/dynamic_util.h" #include "util/file_utils.h" +#include "util/string_util.h" #ifdef LIBJVM #include "util/jni-util.h" #endif @@ -97,7 +97,7 @@ UserFunctionCacheEntry::~UserFunctionCacheEntry() { } } -UserFunctionCache::UserFunctionCache() {} +UserFunctionCache::UserFunctionCache() = default; UserFunctionCache::~UserFunctionCache() { std::lock_guard l(_cache_lock); @@ -130,8 +130,13 @@ Status UserFunctionCache::init(const std::string& lib_dir) { } Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std::string& file) { - if (!boost::algorithm::ends_with(file, ".so")) { - return Status::InternalError("unknown library file format"); + LibType lib_type; + if (ends_with(file, ".so")) { + lib_type = LibType::SO; + } else if (ends_with(file, ".jar")) { + lib_type = LibType::JAR; + } else { + return Status::InternalError("unknown library file format: " + file); } std::vector split_parts = strings::Split(file, "."); @@ -149,7 +154,7 @@ Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std } // create a cache entry and put it into entry map UserFunctionCacheEntry* entry = - new UserFunctionCacheEntry(function_id, checksum, dir + "/" + file, LibType::SO); + new UserFunctionCacheEntry(function_id, checksum, dir + "/" + file, lib_type); entry->is_downloaded = true; entry->ref(); @@ -255,7 +260,6 @@ Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url, } else { entry = new UserFunctionCacheEntry(fid, checksum, _make_lib_file(fid, checksum, type), type); - entry->ref(); _entry_map.emplace(fid, entry); } @@ -343,7 +347,7 @@ Status UserFunctionCache::_download_lib(const std::string& url, UserFunctionCach RETURN_IF_ERROR(client.execute(download_cb)); RETURN_IF_ERROR(status); digest.digest(); - if (!boost::iequals(digest.hex(), entry->checksum)) { + if (!iequal(digest.hex(), entry->checksum)) { LOG(WARNING) << "UDF's checksum is not equal, one=" << digest.hex() << ", other=" << entry->checksum; return Status::InternalError("UDF's library checksum is not match"); diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 23f8c08fad..f6214a0501 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -107,6 +107,8 @@ set(VEC_FILES exec/vbroker_scan_node.cpp exec/vbroker_scanner.cpp exec/vjson_scanner.cpp + exec/vjdbc_connector.cpp + exec/vjdbc_scan_node.cpp exec/vparquet_scanner.cpp 
exec/vorc_scanner.cpp exec/join/vhash_join_node.cpp diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp new file mode 100644 index 0000000000..d598ef26c7 --- /dev/null +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -0,0 +1,344 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/vjdbc_connector.h" +#ifdef LIBJVM +#include "gen_cpp/Types_types.h" +#include "gutil/strings/substitute.h" +#include "jni.h" +#include "runtime/user_function_cache.h" +#include "util/jni-util.h" +#include "vec/columns/column_nullable.h" +namespace doris { +namespace vectorized { +const char* JDBC_EXECUTOR_CLASS = "org/apache/doris/udf/JdbcExecutor"; +const char* JDBC_EXECUTOR_CTOR_SIGNATURE = "([B)V"; +const char* JDBC_EXECUTOR_QUERYSQL_SIGNATURE = "(Ljava/lang/String;)I"; +const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z"; +const char* JDBC_EXECUTOR_GET_BLOCK_SIGNATURE = "(I)Ljava/util/List;"; +const char* JDBC_EXECUTOR_CLOSE_SIGNATURE = "()V"; +const char* JDBC_EXECUTOR_CONVERT_DATE_SIGNATURE = "(Ljava/lang/Object;)J"; +const char* JDBC_EXECUTOR_CONVERT_DATETIME_SIGNATURE = "(Ljava/lang/Object;)J"; + +JdbcConnector::JdbcConnector(const JdbcConnectorParam& param) + : _is_open(false), + _query_string(param.query_string), + _tuple_desc(param.tuple_desc), + _conn_param(param) {} + +JdbcConnector::~JdbcConnector() { + if (!_is_open) { + return; + } + JNIEnv* env; + Status status; + RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env)); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_close_id); + RETURN_IF_STATUS_ERROR(status, JniUtil::GetJniExceptionMsg(env)); + env->DeleteGlobalRef(_executor_obj); +} + +Status JdbcConnector::open() { + if (_is_open) { + LOG(INFO) << "this scanner of jdbc already opened"; + return Status::OK(); + } + + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, JDBC_EXECUTOR_CLASS, &_executor_clazz)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/util/List", &_executor_list_clazz)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Object", &_executor_object_clazz)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Boolean", &_executor_uint8_t_clazz)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Byte", &_executor_int8_t_clazz)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Short", &_executor_int16_t_clazz)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Integer", &_executor_int32_t_clazz)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Long", &_executor_int64_t_clazz)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Float", &_executor_float_clazz)); + 
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Double", &_executor_double_clazz));
+    RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/String", &_executor_string_clazz));
+    RETURN_IF_ERROR(_register_func_id(env));
+
+    // Add a scoped cleanup jni reference object. This cleans up local refs made below.
+    JniLocalFrame jni_frame;
+    {
+        std::string local_location;
+        std::hash<std::string> hash_str;
+        auto function_cache = UserFunctionCache::instance();
+        RETURN_IF_ERROR(function_cache->get_jarpath(
+                std::abs((int64_t)hash_str(_conn_param.resource_name)), _conn_param.driver_path,
+                _conn_param.driver_checksum, &local_location));
+        TJdbcExecutorCtorParams ctor_params;
+        ctor_params.__set_jar_location_path(local_location);
+        ctor_params.__set_jdbc_url(_conn_param.jdbc_url);
+        ctor_params.__set_jdbc_user(_conn_param.user);
+        ctor_params.__set_jdbc_password(_conn_param.passwd);
+        ctor_params.__set_jdbc_driver_class(_conn_param.driver_class);
+
+        jbyteArray ctor_params_bytes;
+        // Pushed frame will be popped when jni_frame goes out-of-scope.
+        RETURN_IF_ERROR(jni_frame.push(env));
+        RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+        _executor_obj = env->NewObject(_executor_clazz, _executor_ctor_id, ctor_params_bytes);
+    }
+    RETURN_ERROR_IF_EXC(env);
+    RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, _executor_obj, &_executor_obj));
+    _is_open = true;
+    return Status::OK();
+}
+
+Status JdbcConnector::query_exec() {
+    if (!_is_open) {
+        return Status::InternalError("Query before open of JdbcConnector.");
+    }
+    // count the materialized slots; the query result must match them
+    int materialize_num = 0;
+    for (int i = 0; i < _tuple_desc->slots().size(); ++i) {
+        if (_tuple_desc->slots()[i]->is_materialized()) {
+            materialize_num++;
+        }
+    }
+
+    JNIEnv* env = nullptr;
+    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+    jstring query_sql = env->NewStringUTF(_query_string.c_str());
+    jint column_count = env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz,
+                                                     _executor_query_id, query_sql);
+    env->DeleteLocalRef(query_sql);
+    RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
+
+    if (column_count != materialize_num) {
+        return Status::InternalError("output column count of jdbc query does not match the materialized column count.");
+    }
+    return Status::OK();
+}
+
+Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns, int batch_size) {
+    if (!_is_open) {
+        return Status::InternalError("get_next before open of JdbcConnector.");
+    }
+    JNIEnv* env = nullptr;
+    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+    jboolean has_next =
+            env->CallNonvirtualBooleanMethod(_executor_obj, _executor_clazz, _executor_has_next_id);
+    if (has_next != JNI_TRUE) {
+        *eos = true;
+        return Status::OK();
+    }
+
+    jobject block_obj = env->CallNonvirtualObjectMethod(_executor_obj, _executor_clazz,
+                                                        _executor_get_blocks_id, batch_size);
+
+    RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
+
+    auto column_size = _tuple_desc->slots().size();
+    for (int column_index = 0, materialized_column_index = 0; column_index < column_size;
+         ++column_index) {
+        auto slot_desc = _tuple_desc->slots()[column_index];
+        // because the fe planner filters out the non-materialized columns
+        if (!slot_desc->is_materialized()) {
+            continue;
+        }
+        jobject column_data =
+                env->CallObjectMethod(block_obj, _executor_get_list_id, materialized_column_index);
+        jint num_rows = env->CallIntMethod(column_data, _executor_get_list_size_id);
+
+        for (int row = 0; row < num_rows; ++row) {
+            jobject cur_data = env->CallObjectMethod(column_data, _executor_get_list_id, row);
+            _convert_column_data(env, cur_data, slot_desc, columns[column_index].get());
+        }
+
+        materialized_column_index++;
+    }
+    return JniUtil::GetJniExceptionMsg(env);
+}
+
+Status JdbcConnector::_register_func_id(JNIEnv* env) {
+    auto register_id = [&](jclass clazz, const char* func_name, const char* func_sign,
+                           jmethodID& func_id) {
+        func_id = env->GetMethodID(clazz, func_name, func_sign);
+        Status s = JniUtil::GetJniExceptionMsg(env);
+        if (!s.ok()) {
+            return Status::InternalError(strings::Substitute(
+                    "Jdbc connector _register_func_id meet error and error is $0",
+                    s.get_error_msg()));
+        }
+        return s;
+    };
+
+    RETURN_IF_ERROR(register_id(_executor_clazz, "<init>", JDBC_EXECUTOR_CTOR_SIGNATURE,
+                                _executor_ctor_id));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "querySQL", JDBC_EXECUTOR_QUERYSQL_SIGNATURE,
+                                _executor_query_id));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "close", JDBC_EXECUTOR_CLOSE_SIGNATURE,
+                                _executor_close_id));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "hasNext", JDBC_EXECUTOR_HAS_NEXT_SIGNATURE,
+                                _executor_has_next_id));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "getBlock", JDBC_EXECUTOR_GET_BLOCK_SIGNATURE,
+                                _executor_get_blocks_id));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "convertDateToLong",
+                                JDBC_EXECUTOR_CONVERT_DATE_SIGNATURE, _executor_convert_date_id));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "convertDateTimeToLong",
+                                JDBC_EXECUTOR_CONVERT_DATETIME_SIGNATURE,
+                                _executor_convert_datetime_id));
+    RETURN_IF_ERROR(register_id(_executor_list_clazz, "get", "(I)Ljava/lang/Object;",
+                                _executor_get_list_id));
+    RETURN_IF_ERROR(register_id(_executor_list_clazz, "size", "()I", _executor_get_list_size_id));
+    RETURN_IF_ERROR(register_id(_executor_string_clazz, "getBytes", "(Ljava/lang/String;)[B",
+                                _get_bytes_id));
+    RETURN_IF_ERROR(
+            register_id(_executor_object_clazz, "toString", "()Ljava/lang/String;", _to_string_id));
+
+    return Status::OK();
+}
+
+Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj,
+                                           const SlotDescriptor* slot_desc,
+                                           vectorized::IColumn* column_ptr) {
+    vectorized::IColumn* col_ptr = column_ptr;
+    if (true == slot_desc->is_nullable()) {
+        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
+        if (jobj == nullptr) {
+            nullable_column->insert_data(nullptr, 0);
+            return Status::OK();
+        } else {
+            nullable_column->get_null_map_data().push_back(0);
+            col_ptr = &nullable_column->get_nested_column();
+        }
+    }
+
+    switch (slot_desc->type().type) {
+    case TYPE_BOOLEAN: {
+        uint8_t num = _jobject_to_uint8_t(env, jobj);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)->insert_value(
+                (uint8_t)num);
+        break;
+    }
+    case TYPE_TINYINT: {
+        int8_t num = _jobject_to_int8_t(env, jobj);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)->insert_value(num);
+        break;
+    }
+    case TYPE_SMALLINT: {
+        int16_t num = _jobject_to_int16_t(env, jobj);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)->insert_value(num);
+        break;
+    }
+
+    case TYPE_INT: {
+        int32_t num = _jobject_to_int32_t(env, jobj);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value(num);
+        break;
+    }
+
+    case TYPE_BIGINT: {
+        int64_t num = _jobject_to_int64_t(env, jobj);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(num);
+        break;
+    }
+
+    case TYPE_FLOAT: {
+        float num = _jobject_to_float(env, jobj);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)->insert_value(
+                num);
+        break;
+    }
+    case TYPE_DOUBLE: {
+        double num = _jobject_to_double(env, jobj);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(
+                num);
+        break;
+    }
+
+    case TYPE_STRING:
+    case TYPE_CHAR:
+    case TYPE_VARCHAR: {
+        std::string data = _jobject_to_string(env, jobj);
+        reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(data.c_str(),
+                                                                          data.length());
+        break;
+    }
+
+    case TYPE_DATE: {
+        int64_t num = _jobject_to_date(env, jobj);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(num);
+        break;
+    }
+    case TYPE_DATETIME: {
+        int64_t num = _jobject_to_datetime(env, jobj);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(num);
+        break;
+    }
+    case TYPE_DECIMALV2: {
+        std::string data = _jobject_to_string(env, jobj);
+        DecimalV2Value decimal_slot;
+        decimal_slot.parse_from_str(data.c_str(), data.length());
+        reinterpret_cast<vectorized::ColumnDecimal<vectorized::Decimal128>*>(col_ptr)->insert_value(
+                decimal_slot.value());
+        break;
+    }
+    default: {
+        std::string error_msg =
+                fmt::format("Fail to convert jdbc value to {} on column: {}.",
+                            slot_desc->type().debug_string(), slot_desc->col_name());
+        return Status::InternalError(std::string(error_msg));
+    }
+    }
+    return Status::OK();
+}
+
+std::string JdbcConnector::_jobject_to_string(JNIEnv* env, jobject jobj) {
+    jobject jstr = env->CallObjectMethod(jobj, _to_string_id);
+    const jbyteArray stringJbytes =
+            (jbyteArray)env->CallObjectMethod(jstr, _get_bytes_id, env->NewStringUTF("UTF-8"));
+    size_t length = (size_t)env->GetArrayLength(stringJbytes);
+    jbyte* pBytes = env->GetByteArrayElements(stringJbytes, nullptr);
+    std::string str = std::string((char*)pBytes, length);
+    env->ReleaseByteArrayElements(stringJbytes, pBytes, JNI_ABORT);
+    env->DeleteLocalRef(stringJbytes);
+    return str;
+}
+
+int64_t JdbcConnector::_jobject_to_date(JNIEnv* env, jobject jobj) {
+    return env->CallNonvirtualLongMethod(_executor_obj, _executor_clazz, _executor_convert_date_id,
+                                         jobj);
+}
+
+int64_t JdbcConnector::_jobject_to_datetime(JNIEnv* env, jobject jobj) {
+    return env->CallNonvirtualLongMethod(_executor_obj, _executor_clazz,
+                                         _executor_convert_datetime_id, jobj);
+}
+
+#define FUNC_IMPL_TO_CONVERT_DATA(cpp_return_type, java_type, sig, java_return_type) \
+    cpp_return_type JdbcConnector::_jobject_to_##cpp_return_type(JNIEnv* env, jobject jobj) { \
+        jmethodID method_id_##cpp_return_type = env->GetMethodID( \
+                _executor_##cpp_return_type##_clazz, #java_type "Value", "()" #sig); \
+        return env->Call##java_return_type##Method(jobj, method_id_##cpp_return_type); \
+    }
+
+FUNC_IMPL_TO_CONVERT_DATA(uint8_t, boolean, Z, Boolean)
+FUNC_IMPL_TO_CONVERT_DATA(int8_t, byte, B, Byte)
+FUNC_IMPL_TO_CONVERT_DATA(int16_t, short, S, Short)
+FUNC_IMPL_TO_CONVERT_DATA(int32_t, int, I, Int)
+FUNC_IMPL_TO_CONVERT_DATA(int64_t, long, J, Long)
+FUNC_IMPL_TO_CONVERT_DATA(float, float, F, Float)
+FUNC_IMPL_TO_CONVERT_DATA(double, double, D, Double)
+
+} // namespace vectorized
+} // namespace doris
+
+#endif
\ No newline at end of file
diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h
new file mode 100644
index 0000000000..750096fabf
--- /dev/null
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once +#ifdef LIBJVM + +#include + +#include "jni.h" +#include "runtime/descriptors.h" + +namespace doris { +namespace vectorized { +struct JdbcConnectorParam { + std::string driver_path; + std::string driver_class; + std::string resource_name; + std::string driver_checksum; + std::string jdbc_url; + std::string user; + std::string passwd; + std::string query_string; + + const TupleDescriptor* tuple_desc; +}; + +class JdbcConnector { +public: + JdbcConnector(const JdbcConnectorParam& param); + + ~JdbcConnector(); + + Status open(); + + Status query_exec(); + + Status get_next(bool* eos, std::vector& columns, int batch_size); + +private: + Status _register_func_id(JNIEnv* env); + Status _convert_column_data(JNIEnv* env, jobject jobj, const SlotDescriptor* slot_desc, + vectorized::IColumn* column_ptr); + std::string _jobject_to_string(JNIEnv* env, jobject jobj); + int64_t _jobject_to_date(JNIEnv* env, jobject jobj); + int64_t _jobject_to_datetime(JNIEnv* env, jobject jobj); + + bool _is_open; + std::string _query_string; + const TupleDescriptor* _tuple_desc; + const JdbcConnectorParam _conn_param; + jclass _executor_clazz; + jclass _executor_list_clazz; + jclass _executor_object_clazz; + jclass _executor_string_clazz; + jobject _executor_obj; + jmethodID _executor_ctor_id; + jmethodID _executor_query_id; + jmethodID _executor_has_next_id; + jmethodID _executor_get_blocks_id; + jmethodID _executor_close_id; + jmethodID _executor_get_list_id; + jmethodID _executor_get_list_size_id; + jmethodID _executor_convert_date_id; + jmethodID _executor_convert_datetime_id; + jmethodID _get_bytes_id; + jmethodID _to_string_id; + +#define FUNC_VARI_DECLARE(RETURN_TYPE) \ + RETURN_TYPE _jobject_to_##RETURN_TYPE(JNIEnv* env, jobject jobj); \ + jclass _executor_##RETURN_TYPE##_clazz; + + FUNC_VARI_DECLARE(uint8_t) + FUNC_VARI_DECLARE(int8_t) + FUNC_VARI_DECLARE(int16_t) + FUNC_VARI_DECLARE(int32_t) + FUNC_VARI_DECLARE(int64_t) + FUNC_VARI_DECLARE(float) + FUNC_VARI_DECLARE(double) +}; + +} // namespace vectorized +} // namespace doris + +#endif \ No newline at end of file diff --git a/be/src/vec/exec/vjdbc_scan_node.cpp b/be/src/vec/exec/vjdbc_scan_node.cpp new file mode 100644 index 0000000000..d302d01be8 --- /dev/null +++ b/be/src/vec/exec/vjdbc_scan_node.cpp @@ -0,0 +1,173 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#include "vec/exec/vjdbc_scan_node.h" +#ifdef LIBJVM +#include + +#include "common/status.h" + +namespace doris { +namespace vectorized { + +VJdbcScanNode::VJdbcScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : ScanNode(pool, tnode, descs), + _is_init(false), + _table_name(tnode.jdbc_scan_node.table_name), + _tuple_id(tnode.jdbc_scan_node.tuple_id), + _query_string(tnode.jdbc_scan_node.query_string), + _tuple_desc(nullptr) {} + +Status VJdbcScanNode::prepare(RuntimeState* state) { + VLOG_CRITICAL << "VJdbcScanNode::Prepare"; + if (_is_init) { + return Status::OK(); + } + + if (state == nullptr) { + return Status::InternalError("input pointer is NULL of VJdbcScanNode::prepare."); + } + + RETURN_IF_ERROR(ScanNode::prepare(state)); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + + // get tuple desc + _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); + if (_tuple_desc == nullptr) { + return Status::InternalError("Failed to get tuple descriptor."); + } + + // get jdbc table info + const JdbcTableDescriptor* jdbc_table = + static_cast(_tuple_desc->table_desc()); + if (jdbc_table == nullptr) { + return Status::InternalError("jdbc table pointer is NULL of VJdbcScanNode::prepare."); + } + _jdbc_param.driver_class = jdbc_table->jdbc_driver_class(); + _jdbc_param.driver_path = jdbc_table->jdbc_driver_url(); + _jdbc_param.resource_name = jdbc_table->jdbc_resource_name(); + _jdbc_param.driver_checksum = jdbc_table->jdbc_driver_checksum(); + _jdbc_param.jdbc_url = jdbc_table->jdbc_url(); + _jdbc_param.user = jdbc_table->jdbc_user(); + _jdbc_param.passwd = jdbc_table->jdbc_passwd(); + _jdbc_param.tuple_desc = _tuple_desc; + _jdbc_param.query_string = std::move(_query_string); + + _jdbc_connector.reset(new (std::nothrow) JdbcConnector(_jdbc_param)); + if (_jdbc_connector == nullptr) { + return Status::InternalError("new a jdbc scanner failed."); + } + + _is_init = true; + return Status::OK(); +} + +Status VJdbcScanNode::open(RuntimeState* state) { + START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJdbcScanNode::open"); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(ExecNode::open(state)); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + VLOG_CRITICAL << "VJdbcScanNode::open"; + + if (state == nullptr) { + return Status::InternalError("input pointer is NULL of VJdbcScanNode::open."); + } + + if (!_is_init) { + return Status::InternalError("used before initialize of VJdbcScanNode::open."); + } + + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(_jdbc_connector->open()); + RETURN_IF_ERROR(_jdbc_connector->query_exec()); + return Status::OK(); +} + +Status VJdbcScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { + VLOG_CRITICAL << "VJdbcScanNode::get_next"; + INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VJdbcScanNode::get_next"); + if (nullptr == state || nullptr == block || nullptr == eos) { + return Status::InternalError("input is NULL pointer"); + } + + if (!_is_init) { + return Status::InternalError("used before initialize of VJdbcScanNode::get_next."); + } + + auto column_size = _tuple_desc->slots().size(); + std::vector columns(column_size); + bool mem_reuse = block->mem_reuse(); + // only empty block should be here + DCHECK(block->rows() == 0); + + bool jdbc_eos = false; + do { + RETURN_IF_CANCELLED(state); + + columns.resize(column_size); + for (auto i = 0; i < column_size; i++) { + if (mem_reuse) { + columns[i] = std::move(*block->get_by_position(i).column).mutate(); + } else { + 
columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column(); + } + } + + RETURN_IF_ERROR(_jdbc_connector->get_next(&jdbc_eos, columns, state->batch_size())); + + if (jdbc_eos) { + *eos = true; + break; + } + + // Before really use the Block, must clear other ptr of column in block + // So here need do std::move and clear in `columns` + if (!mem_reuse) { + int column_index = 0; + for (const auto slot_desc : _tuple_desc->slots()) { + block->insert(ColumnWithTypeAndName(std::move(columns[column_index++]), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + } else { + columns.clear(); + } + VLOG_ROW << "VJdbcScanNode output rows: " << block->rows(); + } while (block->rows() == 0 && !(*eos)); + + RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block, block->columns())); + reached_limit(block, eos); + return Status::OK(); +} + +Status VJdbcScanNode::close(RuntimeState* state) { + if (is_closed()) { + return Status::OK(); + } + START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJdbcScanNode::close"); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return ExecNode::close(state); +} + +// No use +Status VJdbcScanNode::set_scan_ranges(const std::vector& scan_ranges) { + return Status::OK(); +} + +} // namespace vectorized +} // namespace doris + +#endif \ No newline at end of file diff --git a/be/src/vec/exec/vjdbc_scan_node.h b/be/src/vec/exec/vjdbc_scan_node.h new file mode 100644 index 0000000000..c9f13a544e --- /dev/null +++ b/be/src/vec/exec/vjdbc_scan_node.h @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once +#ifdef LIBJVM + +#include "exec/exec_node.h" +#include "exec/scan_node.h" +#include "vec/exec/vjdbc_connector.h" +namespace doris { + +namespace vectorized { +class VJdbcScanNode final : public ScanNode { +public: + VJdbcScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + + ~VJdbcScanNode() override = default; + + Status prepare(RuntimeState* state) override; + + Status open(RuntimeState* state) override; + + Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override { + return Status::NotSupported("Not Implemented JdbcScanNode::get_next."); + } + + Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override; + + Status close(RuntimeState* state) override; + + // No use + Status set_scan_ranges(const std::vector& scan_ranges) override; + +private: + std::string get_query_stmt(const std::string& table, const std::vector& fields, + const std::vector& filters, int64_t limit); + + bool _is_init; + std::string _table_name; + + // Tuple id resolved in prepare() to set _tuple_desc; + TupleId _tuple_id; + //SQL + std::string _query_string; + // Descriptor of tuples read from JDBC table. 
+    const TupleDescriptor* _tuple_desc;
+
+    // Scanner of JDBC.
+    std::unique_ptr<JdbcConnector> _jdbc_connector;
+    JdbcConnectorParam _jdbc_param;
+};
+} // namespace vectorized
+} // namespace doris
+#endif
\ No newline at end of file
diff --git a/docs/en/docs/ecosystem/external-table/jdbc-of-doris.md b/docs/en/docs/ecosystem/external-table/jdbc-of-doris.md
new file mode 100644
index 0000000000..ede844c68e
--- /dev/null
+++ b/docs/en/docs/ecosystem/external-table/jdbc-of-doris.md
@@ -0,0 +1,215 @@
+---
+{
+    "title": "Doris On JDBC",
+    "language": "en"
+}
+---
+
+
+# JDBC External Table Of Doris
+
+JDBC External Table Of Doris lets Doris access external tables through the standard database access interface (JDBC). External tables remove the tedious data import work and give Doris the ability to access all kinds of databases, using Doris's own OLAP capabilities to solve the data analysis problems of external tables:
+
+1. Supports connecting various data sources to Doris
+2. Supports joint queries between Doris and tables in various data sources, for more complex analysis
+
+This document mainly introduces how to use this function.
+
+
+## Instructions
+
+### Create JDBC external table in Doris
+
+For the detailed table creation syntax, see: [CREATE TABLE](../../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md)
+
+
+#### 1. Create JDBC external table through JDBC_Resource
+```sql
+CREATE EXTERNAL RESOURCE jdbc_resource
+properties (
+    "type"="jdbc",
+    "user"="root",
+    "password"="123456",
+    "jdbc_url"="jdbc:mysql://192.168.0.1:3306/test",
+    "driver_url"="http://IP:port/mysql-connector-java-5.1.47.jar",
+    "driver_class"="com.mysql.jdbc.Driver"
+);
+
+CREATE EXTERNAL TABLE `baseall_mysql` (
+  `k1` tinyint(4) NULL,
+  `k2` smallint(6) NULL,
+  `k3` int(11) NULL,
+  `k4` bigint(20) NULL,
+  `k5` decimal(9, 3) NULL
+) ENGINE=JDBC
+PROPERTIES (
+"resource" = "jdbc_resource",
+"table" = "baseall",
+"table_type"="mysql"
+);
+```
+Parameter Description:
+
+| Parameter        | Description                   |
+| ---------------- | ----------------------------- |
+| **type**         | Required. Marks the resource type; must be "jdbc". |
+| **user**         | Username used to access the external database. |
+| **password**     | Password of that user. |
+| **jdbc_url**     | JDBC connection URL, including the database type, IP address, port number and database name; the format differs per database. For example, MySQL: "jdbc:mysql://127.0.0.1:3306/test". |
+| **driver_class** | Class name of the JDBC driver used to access the external database, e.g. for MySQL: com.mysql.jdbc.Driver. |
+| **driver_url**   | URL from which the JDBC driver jar is downloaded, e.g. http://IP:port/mysql-connector-java-5.1.47.jar. |
+| **resource**     | Name of the resource the external table depends on; must match the name used when creating the resource in the previous step. |
+| **table**        | Name of the table in the external database that the Doris external table maps to. |
+| **table_type**   | The kind of database the mapped table comes from, e.g. mysql, postgresql, sqlserver, oracle. |
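+
+A resource only needs to be created once and, in principle, can be referenced by several external tables; when it is no longer needed it can be removed with `DROP RESOURCE` (shown in the resource comment of this patch). A minimal sketch reusing the resource above; `baseall_mysql2` and `baseall2` are hypothetical names, not part of the example schema:
+
+```sql
+CREATE EXTERNAL TABLE `baseall_mysql2` (
+  `k1` tinyint(4) NULL,
+  `k2` smallint(6) NULL
+) ENGINE=JDBC
+PROPERTIES (
+"resource" = "jdbc_resource",
+"table" = "baseall2",
+"table_type"="mysql"
+);
+
+-- drop the resource once no external table references it anymore
+DROP RESOURCE "jdbc_resource";
+```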
+
+### Query usage
+
+```
+select * from mysql_table where k1 > 1000 and k3 ='term';
+```
+
+#### 1. MySQL
+
+| MySQL Version | MySQL JDBC Driver Version       |
+| ------------- | ------------------------------- |
+| 8.0.30        | mysql-connector-java-5.1.47.jar |
+
+#### 2. PostgreSQL
+| PostgreSQL Version | PostgreSQL JDBC Driver Version |
+| ------------------ | ------------------------------ |
+| 14.5               | postgresql-42.5.0.jar          |
+
+#### 3. SQLServer
+| SQLServer Version | SQLServer JDBC Driver Version |
+| ----------------- | ----------------------------- |
+| 2022              | mssql-jdbc-11.2.0.jre8.jar    |
+
+#### 4. Oracle
+| Oracle Version | Oracle JDBC Driver Version |
+| -------------- | -------------------------- |
+| 11             | ojdbc6.jar                 |
+
+At present, only the versions above have been tested; results for other versions will be added after testing.
+
+
+## Type matching
+
+Data types differ between databases. The tables below list how the types in each database map to data types in Doris.
+
+### MySQL
+
+|  MySQL   |  Doris   |
+| :------: | :------: |
+| BOOLEAN  | BOOLEAN  |
+|   CHAR   |   CHAR   |
+| VARCHAR  | VARCHAR  |
+|   DATE   |   DATE   |
+|  FLOAT   |  FLOAT   |
+| TINYINT  | TINYINT  |
+| SMALLINT | SMALLINT |
+|   INT    |   INT    |
+|  BIGINT  |  BIGINT  |
+|  DOUBLE  |  DOUBLE  |
+| DATETIME | DATETIME |
+| DECIMAL  | DECIMAL  |
+
+
+### PostgreSQL
+
+|    PostgreSQL    |  Doris   |
+| :--------------: | :------: |
+|     BOOLEAN      | BOOLEAN  |
+|       CHAR       |   CHAR   |
+|     VARCHAR      | VARCHAR  |
+|       DATE       |   DATE   |
+|       REAL       |  FLOAT   |
+|     SMALLINT     | SMALLINT |
+|       INT        |   INT    |
+|      BIGINT      |  BIGINT  |
+| DOUBLE PRECISION |  DOUBLE  |
+|    TIMESTAMP     | DATETIME |
+|     DECIMAL      | DECIMAL  |
+
+### Oracle
+
+|  Oracle  |  Doris   |
+| :------: | :------: |
+|   CHAR   |   CHAR   |
+| VARCHAR  | VARCHAR  |
+|   DATE   | DATETIME |
+| SMALLINT | SMALLINT |
+|   INT    |   INT    |
+|  NUMBER  | DECIMAL  |
+
+
+### SQL server
+
+| SQLServer |  Doris   |
+| :-------: | :------: |
+|    BIT    | BOOLEAN  |
+|   CHAR    |   CHAR   |
+|  VARCHAR  | VARCHAR  |
+|   DATE    |   DATE   |
+|   REAL    |  FLOAT   |
+|  TINYINT  | TINYINT  |
+| SMALLINT  | SMALLINT |
+|    INT    |   INT    |
+|  BIGINT   |  BIGINT  |
+| DATETIME  | DATETIME |
+|  DECIMAL  | DECIMAL  |
+
+
+## Q&A
+
+1. Are databases other than MySQL, Oracle, PostgreSQL and SQL Server supported?
+
+   Currently Doris is only adapted to MySQL, Oracle, SQL Server and PostgreSQL; adaptation of other databases is being planned. In principle, any database that supports JDBC access can be accessed through a JDBC external table. If you need to access another database, you are welcome to modify the code and contribute to Doris.
+
+2. Garbled characters appear when reading or writing emoji through a MySQL external table
+
+   When Doris makes a JDBC external table connection, MySQL's default utf8 encoding is utf8mb3, which cannot represent emoji that require 4-byte encoding. Set the encoding of the corresponding columns to utf8mb4, set the server encoding to utf8mb4, and do not configure characterEncoding in the JDBC URL when creating the MySQL external table (that property does not support utf8mb4, and configuring anything other than utf8mb4 makes emoji impossible to write, so leave it unset).
+
+
+```
+The configuration items can be modified globally:
+
+Modify the my.ini file in the MySQL directory (on Linux, the my.cnf file in the etc directory)
+[client]
+default-character-set=utf8mb4
+
+[mysql]
+Set the MySQL default character set
+default-character-set=utf8mb4
+
+[mysqld]
+Set the MySQL server character set
+character-set-server=utf8mb4
+collation-server=utf8mb4_unicode_ci
+init_connect='SET NAMES utf8mb4'
+
+Modify the type of the corresponding table and column
+ALTER TABLE table_name MODIFY column_name VARCHAR(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
+
+ALTER TABLE table_name CHARSET=utf8mb4;
+
+SET NAMES utf8mb4
+
+```
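+
+After applying the settings above, the effective character-set configuration can be verified on the MySQL server itself (standard MySQL statements, executed against the external MySQL instance rather than Doris):
+
+```sql
+SHOW VARIABLES LIKE 'character_set%';
+SHOW VARIABLES LIKE 'collation%';
+```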
diff --git a/docs/zh-CN/docs/ecosystem/external-table/jdbc-of-doris.md b/docs/zh-CN/docs/ecosystem/external-table/jdbc-of-doris.md
new file mode 100644
index 0000000000..068a6a4d40
--- /dev/null
+++ b/docs/zh-CN/docs/ecosystem/external-table/jdbc-of-doris.md
@@ -0,0 +1,211 @@
+---
+{
+    "title": "Doris On JDBC",
+    "language": "zh-CN"
+}
+---
+
+
+# JDBC External Table Of Doris
+
+JDBC External Table Of Doris lets Doris access external tables through the standard database access interface (JDBC). External tables remove the tedious data import work and give Doris the ability to access all kinds of databases, using Doris's own OLAP capabilities to solve the data analysis problems of external tables:
+
+1. Supports connecting various data sources to Doris
+2. Supports joint queries between Doris and tables in various data sources, for more complex analysis
+
+This document mainly introduces how to use this function.
+
+### Create JDBC external table in Doris
+
+For the detailed table creation syntax, see: [CREATE TABLE](../../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md)
+
+
+#### 1. Create JDBC external table through JDBC_Resource
+```sql
+CREATE EXTERNAL RESOURCE jdbc_resource
+properties (
+    "type"="jdbc",
+    "user"="root",
+    "password"="123456",
+    "jdbc_url"="jdbc:mysql://192.168.0.1:3306/test",
+    "driver_url"="http://IP:port/mysql-connector-java-5.1.47.jar",
+    "driver_class"="com.mysql.jdbc.Driver"
+);
+
+CREATE EXTERNAL TABLE `baseall_mysql` (
+  `k1` tinyint(4) NULL,
+  `k2` smallint(6) NULL,
+  `k3` int(11) NULL,
+  `k4` bigint(20) NULL,
+  `k5` decimal(9, 3) NULL
+) ENGINE=JDBC
+PROPERTIES (
+"resource" = "jdbc_resource",
+"table" = "baseall",
+"table_type"="mysql"
+);
+```
+Parameter Description:
+
+| Parameter        | Description                   |
+| ---------------- | ----------------------------- |
+| **type**         | Required. Marks the resource type; must be "jdbc". |
+| **user**         | Username used to access the external database. |
+| **password**     | Password of that user. |
+| **jdbc_url**     | JDBC connection URL, including the database type, IP address, port number and database name; the format differs per database. For example, MySQL: "jdbc:mysql://127.0.0.1:3306/test". |
+| **driver_class** | Class name of the JDBC driver used to access the external database, e.g. for MySQL: com.mysql.jdbc.Driver. |
+| **driver_url**   | URL from which the JDBC driver jar is downloaded, e.g. http://IP:port/mysql-connector-java-5.1.47.jar. |
+| **resource**     | Name of the resource the external table depends on; must match the name used when creating the resource in the previous step. |
+| **table**        | Name of the table in the external database that the Doris external table maps to. |
+| **table_type**   | The kind of database the mapped table comes from, e.g. mysql, postgresql, sqlserver, oracle. |
+
+### Query usage
+
+```
+select * from mysql_table where k1 > 1000 and k3 ='term';
+```
+
+#### 1. MySQL test
+
+| MySQL Version | MySQL JDBC Driver Version       |
+| ------------- | ------------------------------- |
+| 8.0.30        | mysql-connector-java-5.1.47.jar |
+
+#### 2. PostgreSQL test
+| PostgreSQL Version | PostgreSQL JDBC Driver Version |
+| ------------------ | ------------------------------ |
+| 14.5               | postgresql-42.5.0.jar          |
+
+#### 3. SQLServer test
+| SQLServer Version | SQLServer JDBC Driver Version |
+| ----------------- | ----------------------------- |
+| 2022              | mssql-jdbc-11.2.0.jre8.jar    |
+
+#### 4. Oracle test
+| Oracle Version | Oracle JDBC Driver Version |
+| -------------- | -------------------------- |
+| 11             | ojdbc6.jar                 |
+
+At present, only the versions above have been tested; other versions will be added after testing.
+
+
+## Type matching
+
+Data types differ between databases. The tables below list how the types in each database map to data types in Doris.
+
+### MySQL
+
+|  MySQL   |  Doris   |
+| :------: | :------: |
+| BOOLEAN  | BOOLEAN  |
+|   CHAR   |   CHAR   |
+| VARCHAR  | VARCHAR  |
+|   DATE   |   DATE   |
+|  FLOAT   |  FLOAT   |
+| TINYINT  | TINYINT  |
+| SMALLINT | SMALLINT |
+|   INT    |   INT    |
+|  BIGINT  |  BIGINT  |
+|  DOUBLE  |  DOUBLE  |
+| DATETIME | DATETIME |
+| DECIMAL  | DECIMAL  |
+
+
+### PostgreSQL
+
+|    PostgreSQL    |  Doris   |
+| :--------------: | :------: |
+|     BOOLEAN      | BOOLEAN  |
+|       CHAR       |   CHAR   |
+|     VARCHAR      | VARCHAR  |
+|       DATE       |   DATE   |
+|       REAL       |  FLOAT   |
+|     SMALLINT     | SMALLINT |
+|       INT        |   INT    |
+|      BIGINT      |  BIGINT  |
+| DOUBLE PRECISION |  DOUBLE  |
+|    TIMESTAMP     | DATETIME |
+|     DECIMAL      | DECIMAL  |
+
+### Oracle
+
+|  Oracle  |  Doris   |
+| :------: | :------: |
+|   CHAR   |   CHAR   |
+| VARCHAR  | VARCHAR  |
+|   DATE   | DATETIME |
+| SMALLINT | SMALLINT |
+|   INT    |   INT    |
+|  NUMBER  | DECIMAL  |
+
+
+### SQL server
+
+| SQLServer |  Doris   |
+| :-------: | :------: |
+|    BIT    | BOOLEAN  |
+|   CHAR    |   CHAR   |
+|  VARCHAR  | VARCHAR  |
+|   DATE    |   DATE   |
+|   REAL    |  FLOAT   |
+|  TINYINT  | TINYINT  |
+| SMALLINT  | SMALLINT |
+|    INT    |   INT    |
+|  BIGINT   |  BIGINT  |
+| DATETIME  | DATETIME |
+|  DECIMAL  | DECIMAL  |
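+
+The mappings above translate directly into external table definitions. A sketch for PostgreSQL, assuming a resource named `jdbc_pg` has been created like the MySQL example earlier, but with a PostgreSQL driver jar and `org.postgresql.Driver` as the driver_class (`jdbc_pg` and `baseall_pg` are hypothetical names):
+
+```sql
+CREATE EXTERNAL TABLE `baseall_pg` (
+  `k1` smallint NULL,   -- from PostgreSQL SMALLINT
+  `k2` int NULL,        -- from PostgreSQL INT
+  `k3` double NULL,     -- from PostgreSQL DOUBLE PRECISION
+  `k4` datetime NULL    -- from PostgreSQL TIMESTAMP
+) ENGINE=JDBC
+PROPERTIES (
+"resource" = "jdbc_pg",
+"table" = "baseall",
+"table_type"="postgresql"
+);
+```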
+
+## Q&A
+
+1. Are databases other than MySQL, Oracle, PostgreSQL and SQL Server supported?
+
+   Currently Doris is only adapted to MySQL, PostgreSQL, SQL Server and Oracle; adaptation of other databases is being planned. In principle, any database that supports JDBC access can be accessed through a JDBC external table. If you need to access another database, you are welcome to modify the code and contribute to Doris.
+
+2. Garbled characters appear when reading or writing emoji through a MySQL external table
+
+   When Doris makes a JDBC external table connection, MySQL's default utf8 encoding is utf8mb3, which cannot represent emoji that require 4-byte encoding. Set the encoding of the corresponding columns to utf8mb4, set the server encoding to utf8mb4, and do not configure characterEncoding in the JDBC URL when creating the MySQL external table (that property does not support utf8mb4, and configuring anything other than utf8mb4 makes emoji impossible to write, so leave it unset).
+
+
+```
+The configuration items can be modified globally:
+
+Modify the my.ini file in the MySQL directory (on Linux, the my.cnf file in the etc directory)
+[client]
+default-character-set=utf8mb4
+
+[mysql]
+Set the MySQL default character set
+default-character-set=utf8mb4
+
+[mysqld]
+Set the MySQL server character set
+character-set-server=utf8mb4
+collation-server=utf8mb4_unicode_ci
+init_connect='SET NAMES utf8mb4'
+
+Modify the type of the corresponding table and column
+ALTER TABLE table_name MODIFY column_name VARCHAR(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
+
+ALTER TABLE table_name CHARSET=utf8mb4;
+
+SET NAMES utf8mb4
+
+```
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index 5345110ba1..a15fab5755 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -91,6 +91,7 @@ public class CreateTableStmt extends DdlStmt {
         engineNames.add("hive");
         engineNames.add("iceberg");
         engineNames.add("hudi");
+        engineNames.add("jdbc");
     }
 
     public CreateTableStmt() {
@@ -514,7 +515,7 @@ public class CreateTableStmt extends DdlStmt {
         if (engineName.equals("mysql") || engineName.equals("odbc") || engineName.equals("broker")
                 || engineName.equals("elasticsearch") || engineName.equals("hive")
-                || engineName.equals("iceberg") || engineName.equals("hudi")) {
+                || engineName.equals("iceberg") || engineName.equals("hudi") || engineName.equals("jdbc")) {
             if (!isExternal) {
                 // this is for compatibility
                 isExternal = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 3f891fe487..805f46b35a 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -2710,7 +2710,7 @@ public class Env {
         sb.append("CREATE ");
         if (table.getType() == TableType.ODBC || table.getType() == TableType.MYSQL
                 || table.getType() == TableType.ELASTICSEARCH || table.getType() == TableType.BROKER
-                || table.getType() == TableType.HIVE) {
+                || table.getType() == TableType.HIVE || table.getType() == TableType.JDBC) {
             sb.append("EXTERNAL ");
         }
         sb.append("TABLE ");
@@ -3052,6 +3052,13 @@
             sb.append("\nPROPERTIES (\n");
             sb.append(new PrintableMap<>(hudiTable.getTableProperties(), " = ", true, true, false).toString());
             sb.append("\n)");
+        } else if (table.getType() == TableType.JDBC) {
+            JdbcTable jdbcTable = (JdbcTable) table;
+            addTableComment(jdbcTable, sb);
+            sb.append("\nPROPERTIES (\n");
+            sb.append("\"resource\" = \"").append(jdbcTable.getResourceName()).append("\",\n");
+            sb.append("\"table\" = \"").append(jdbcTable.getJdbcTable()).append("\"");
+            sb.append("\n)");
         }
 
         createTableStmt.add(sb + ";");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
new file mode 100644
index 0000000000..883a624ad5
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.proc.BaseProcResult; +import org.apache.doris.common.util.Util; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; +import org.apache.commons.codec.binary.Hex; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.io.InputStream; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Map; + + +/** + * External JDBC Catalog resource for external table query. + * + * create external resource jdbc_mysql + * properties ( + * "type"="jdbc", + * "user"="root", + * "password"="123456", + * "jdbc_url"="jdbc:mysql://127.0.0.1:3306/test", + * "driver_url"="http://127.0.0.1:8888/mysql-connector-java-5.1.47.jar", + * "driver_class"="com.mysql.jdbc.Driver" + * ); + * + * DROP RESOURCE "jdbc_mysql"; + */ +public class JdbcResource extends Resource { + public static final String URL = "jdbc_url"; + public static final String USER = "user"; + public static final String PASSWORD = "password"; + public static final String DRIVER_CLASS = "driver_class"; + public static final String DRIVER_URL = "driver_url"; + public static final String TYPE = "type"; + public static final String CHECK_SUM = "checksum"; + private static final Logger LOG = LogManager.getLogger(JdbcResource.class); + // timeout for both connection and read. 10 seconds is long enough. 
+ private static final int HTTP_TIMEOUT_MS = 10000; + @SerializedName(value = "configs") + private Map configs; + + public JdbcResource() { + super(); + } + + public JdbcResource(String name) { + this(name, Maps.newHashMap()); + } + + private JdbcResource(String name, Map configs) { + super(name, ResourceType.JDBC); + this.configs = configs; + } + + public JdbcResource getCopiedResource() { + return new JdbcResource(name, Maps.newHashMap(configs)); + } + + private void checkProperties(String propertiesKey) throws DdlException { + // check the properties key + String value = configs.get(propertiesKey); + if (value == null) { + throw new DdlException("JdbcResource Missing " + propertiesKey + " in properties"); + } + } + + @Override + public void modifyProperties(Map properties) throws DdlException { + // modify properties + replaceIfEffectiveValue(this.configs, DRIVER_URL, properties.get(DRIVER_URL)); + replaceIfEffectiveValue(this.configs, DRIVER_CLASS, properties.get(DRIVER_CLASS)); + replaceIfEffectiveValue(this.configs, URL, properties.get(URL)); + replaceIfEffectiveValue(this.configs, USER, properties.get(USER)); + replaceIfEffectiveValue(this.configs, PASSWORD, properties.get(PASSWORD)); + replaceIfEffectiveValue(this.configs, TYPE, properties.get(TYPE)); + } + + @Override + public void checkProperties(Map properties) throws AnalysisException { + Map copiedProperties = Maps.newHashMap(properties); + // check properties + copiedProperties.remove(DRIVER_URL); + copiedProperties.remove(DRIVER_CLASS); + copiedProperties.remove(URL); + copiedProperties.remove(USER); + copiedProperties.remove(PASSWORD); + copiedProperties.remove(TYPE); + if (!copiedProperties.isEmpty()) { + throw new AnalysisException("Unknown JDBC catalog resource properties: " + copiedProperties); + } + } + + @Override + protected void setProperties(Map properties) throws DdlException { + Preconditions.checkState(properties != null); + for (String key : properties.keySet()) { + if (!DRIVER_URL.equals(key) && !URL.equals(key) && !USER.equals(key) && !PASSWORD.equals(key) + && !TYPE.equals(key) && !DRIVER_CLASS.equals(key)) { + throw new DdlException("JDBC resource Property of " + key + " is unknown"); + } + } + configs = properties; + computeObjectChecksum(); + checkProperties(DRIVER_URL); + checkProperties(DRIVER_CLASS); + checkProperties(URL); + checkProperties(USER); + checkProperties(PASSWORD); + checkProperties(TYPE); + } + + @Override + public Map getCopiedProperties() { + Map copiedProperties = Maps.newHashMap(configs); + return copiedProperties; + } + + @Override + protected void getProcNodeData(BaseProcResult result) { + String lowerCaseType = type.name().toLowerCase(); + for (Map.Entry entry : configs.entrySet()) { + // it's dangerous to show password in show jdbc resource + // so we use empty string to replace the real password + if (entry.getKey().equals(PASSWORD)) { + result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), "")); + } else { + result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue())); + } + } + } + + public String getProperty(String propertiesKey) { + // check the properties key + String value = configs.get(propertiesKey); + return value; + } + + private void computeObjectChecksum() throws DdlException { + if (FeConstants.runningUnitTest) { + // skip checking checksum when running ut + return; + } + + InputStream inputStream = null; + try { + inputStream = Util.getInputStreamFromUrl(getProperty(DRIVER_URL), null, HTTP_TIMEOUT_MS, HTTP_TIMEOUT_MS); + 
MessageDigest digest = MessageDigest.getInstance("MD5"); + byte[] buf = new byte[4096]; + int bytesRead = 0; + do { + bytesRead = inputStream.read(buf); + if (bytesRead < 0) { + break; + } + digest.update(buf, 0, bytesRead); + } while (true); + String checkSum = Hex.encodeHexString(digest.digest()); + configs.put(CHECK_SUM, checkSum); + } catch (IOException e) { + throw new DdlException( + "compute driver checksum from url: " + getProperty(DRIVER_URL) + " meet an IOException."); + } catch (NoSuchAlgorithmException e) { + throw new DdlException( + "compute driver checksum from url: " + getProperty(DRIVER_URL) + " could not find algorithm."); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java new file mode 100644 index 0000000000..1045ca746b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import org.apache.doris.catalog.Resource.ResourceType; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.io.DeepCopy; +import org.apache.doris.common.io.Text; +import org.apache.doris.thrift.TJdbcTable; +import org.apache.doris.thrift.TOdbcTableType; +import org.apache.doris.thrift.TTableDescriptor; +import org.apache.doris.thrift.TTableType; + +import com.google.common.base.Strings; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class JdbcTable extends Table { + private static final Logger LOG = LogManager.getLogger(JdbcTable.class); + + private static final String TABLE = "table"; + private static final String RESOURCE = "resource"; + private static final String TABLE_TYPE = "table_type"; + // TODO: We may have to support various types of databases like ODBC + // map now jdbc external table Doris support now + private static Map TABLE_TYPE_MAP; + private String resourceName; + private String externalTableName; + private String jdbcTypeName; + + static { + Map tempMap = new HashMap<>(); + tempMap.put("mysql", TOdbcTableType.MYSQL); + tempMap.put("postgresql", TOdbcTableType.POSTGRESQL); + tempMap.put("sqlserver", TOdbcTableType.SQLSERVER); + tempMap.put("oracle", TOdbcTableType.ORACLE); + TABLE_TYPE_MAP = Collections.unmodifiableMap(tempMap); + } + + public JdbcTable() { + super(TableType.JDBC); + } + + public JdbcTable(long id, String name, List schema, 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java
new file mode 100644
index 0000000000..1045ca746b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.catalog.Resource.ResourceType;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.io.DeepCopy;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.thrift.TJdbcTable;
+import org.apache.doris.thrift.TOdbcTableType;
+import org.apache.doris.thrift.TTableDescriptor;
+import org.apache.doris.thrift.TTableType;
+
+import com.google.common.base.Strings;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class JdbcTable extends Table {
+    private static final Logger LOG = LogManager.getLogger(JdbcTable.class);
+
+    private static final String TABLE = "table";
+    private static final String RESOURCE = "resource";
+    private static final String TABLE_TYPE = "table_type";
+    // TODO: we may need to support more database types, as the ODBC external table does.
+    // Map of the table types currently supported by JDBC external tables in Doris.
+    private static Map<String, TOdbcTableType> TABLE_TYPE_MAP;
+    private String resourceName;
+    private String externalTableName;
+    private String jdbcTypeName;
+
+    static {
+        Map<String, TOdbcTableType> tempMap = new HashMap<>();
+        tempMap.put("mysql", TOdbcTableType.MYSQL);
+        tempMap.put("postgresql", TOdbcTableType.POSTGRESQL);
+        tempMap.put("sqlserver", TOdbcTableType.SQLSERVER);
+        tempMap.put("oracle", TOdbcTableType.ORACLE);
+        TABLE_TYPE_MAP = Collections.unmodifiableMap(tempMap);
+    }
+
+    public JdbcTable() {
+        super(TableType.JDBC);
+    }
+
+    public JdbcTable(long id, String name, List<Column> schema, Map<String, String> properties)
+            throws DdlException {
+        super(id, name, TableType.JDBC, schema);
+        validate(properties);
+    }
+
+    @Override
+    public TTableDescriptor toThrift() {
+        TJdbcTable tJdbcTable = new TJdbcTable();
+        JdbcResource jdbcResource = (JdbcResource) (Env.getCurrentEnv().getResourceMgr().getResource(resourceName));
+        tJdbcTable.setJdbcUrl(jdbcResource.getProperty(JdbcResource.URL));
+        tJdbcTable.setJdbcUser(jdbcResource.getProperty(JdbcResource.USER));
+        tJdbcTable.setJdbcPassword(jdbcResource.getProperty(JdbcResource.PASSWORD));
+        tJdbcTable.setJdbcTableName(externalTableName);
+        tJdbcTable.setJdbcDriverClass(jdbcResource.getProperty(JdbcResource.DRIVER_CLASS));
+        tJdbcTable.setJdbcDriverUrl(jdbcResource.getProperty(JdbcResource.DRIVER_URL));
+        tJdbcTable.setJdbcResourceName(jdbcResource.getName());
+        tJdbcTable.setJdbcDriverChecksum(jdbcResource.getProperty(JdbcResource.CHECK_SUM));
+
+        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.JDBC_TABLE, fullSchema.size(), 0,
+                getName(), "");
+        tTableDescriptor.setJdbcTable(tJdbcTable);
+        return tTableDescriptor;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        super.write(out);
+        Text.writeString(out, externalTableName);
+        Text.writeString(out, resourceName);
+        Text.writeString(out, jdbcTypeName);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+        // must read fields in the same order as write() wrote them
+        externalTableName = Text.readString(in);
+        resourceName = Text.readString(in);
+        jdbcTypeName = Text.readString(in);
+    }
+
+    public String getResourceName() {
+        return resourceName;
+    }
+
+    public String getJdbcTable() {
+        return externalTableName;
+    }
+
+    public String getTableTypeName() {
+        return jdbcTypeName;
+    }
+
+    public TOdbcTableType getJdbcTableType() {
+        return TABLE_TYPE_MAP.get(getTableTypeName());
+    }
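
Note that TABLE_TYPE_MAP is keyed by lowercase names while getJdbcTableType() looks jdbcTypeName up verbatim, so a table created with "table_type" = "MySQL" would resolve to null. A small self-contained demonstration of the lookup behavior (simplified enum, hypothetical values):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class TableTypeLookupDemo {
        enum TableType { MYSQL, POSTGRESQL }

        public static void main(String[] args) {
            Map<String, TableType> map = new HashMap<>();
            map.put("mysql", TableType.MYSQL);
            Map<String, TableType> typeMap = Collections.unmodifiableMap(map);
            System.out.println(typeMap.get("mysql"));               // MYSQL
            System.out.println(typeMap.get("MySQL"));               // null -- verbatim lookup misses
            System.out.println(typeMap.get("MySQL".toLowerCase())); // MYSQL
        }
    }

Lower-casing jdbcTypeName in validate() would make the property case-insensitive.
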
signature string: {}", name, md5, sb.toString()); + return md5; + } + + @Override + public JdbcTable clone() { + JdbcTable copied = new JdbcTable(); + if (!DeepCopy.copy(this, copied, JdbcTable.class, FeConstants.meta_version)) { + LOG.warn("failed to copy jdbc table: " + getName()); + return null; + } + return copied; + } + + private void validate(Map properties) throws DdlException { + if (properties == null) { + throw new DdlException("Please set properties of jdbc table, " + + "they are: host, port, user, password, database and table"); + } + + externalTableName = properties.get(TABLE); + if (Strings.isNullOrEmpty(externalTableName)) { + throw new DdlException("property " + TABLE + " must be set"); + } + + resourceName = properties.get(RESOURCE); + if (Strings.isNullOrEmpty(resourceName)) { + throw new DdlException("property " + RESOURCE + " must be set"); + } + + jdbcTypeName = properties.get(TABLE_TYPE); + if (Strings.isNullOrEmpty(jdbcTypeName)) { + throw new DdlException("property " + TABLE_TYPE + " must be set"); + } + + Resource resource = Env.getCurrentEnv().getResourceMgr().getResource(resourceName); + if (resource == null) { + throw new DdlException("jdbc resource [" + resourceName + "] not exists"); + } + if (resource.getType() != ResourceType.JDBC) { + throw new DdlException("resource [" + resourceName + "] is not jdbc resource"); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java index fac6a30e45..2554bc589c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java @@ -44,7 +44,8 @@ public abstract class Resource implements Writable { UNKNOWN, SPARK, ODBC_CATALOG, - S3; + S3, + JDBC; public static ResourceType fromString(String resourceType) { for (ResourceType type : ResourceType.values()) { @@ -95,6 +96,9 @@ public abstract class Resource implements Writable { case S3: resource = new S3Resource(name); break; + case JDBC: + resource = new JdbcResource(name); + break; default: throw new DdlException("Unknown resource type: " + type); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java index fd783bf38b..ffe8571f97 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java @@ -72,8 +72,9 @@ public class ResourceMgr implements Writable { public void createResource(CreateResourceStmt stmt) throws DdlException { if (stmt.getResourceType() != ResourceType.SPARK && stmt.getResourceType() != ResourceType.ODBC_CATALOG - && stmt.getResourceType() != ResourceType.S3) { - throw new DdlException("Only support SPARK, ODBC_CATALOG and REMOTE_STORAGE resource."); + && stmt.getResourceType() != ResourceType.S3 + && stmt.getResourceType() != ResourceType.JDBC) { + throw new DdlException("Only support SPARK, ODBC_CATALOG ,JDBC, and REMOTE_STORAGE resource."); } Resource resource = Resource.fromStmt(stmt); createResource(resource); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index e46080e4c9..02f9632903 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -347,6 +347,8 @@ public abstract class Table extends MetaObject implements 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
index e46080e4c9..02f9632903 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
@@ -347,6 +347,8 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
             table = new IcebergTable();
         } else if (type == TableType.HUDI) {
             table = new HudiTable();
+        } else if (type == TableType.JDBC) {
+            table = new JdbcTable();
         } else {
             throw new IOException("Unknown table type: " + type.name());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index ab8955ebe1..bb0097d5d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -99,7 +99,7 @@ public interface TableIf {
      * Doris table type.
      */
     enum TableType {
-        MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI,
+        MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI, JDBC,
         TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, MATERIALIZED_VIEW;
 
         public String toEngineName() {
@@ -124,6 +124,8 @@ public interface TableIf {
                 return "Hive";
             case HUDI:
                 return "Hudi";
+            case JDBC:
+                return "jdbc";
             case TABLE_VALUED_FUNCTION:
                 return "Table_Valued_Function";
             case HMS_EXTERNAL_TABLE:
@@ -150,6 +152,7 @@ public interface TableIf {
             case ELASTICSEARCH:
             case HIVE:
             case HUDI:
+            case JDBC:
             case TABLE_VALUED_FUNCTION:
             case HMS_EXTERNAL_TABLE:
             case ES_EXTERNAL_TABLE:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index fbfee9d842..63515d092c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -79,6 +79,7 @@ import org.apache.doris.catalog.HiveTable;
 import org.apache.doris.catalog.IcebergTable;
 import org.apache.doris.catalog.Index;
 import org.apache.doris.catalog.InfoSchemaDb;
+import org.apache.doris.catalog.JdbcTable;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.MaterializedIndex;
@@ -1051,6 +1052,9 @@ public class InternalCatalog implements CatalogIf<Database> {
         } else if (engineName.equalsIgnoreCase("hudi")) {
             createHudiTable(db, stmt);
             return;
+        } else if (engineName.equalsIgnoreCase("jdbc")) {
+            createJdbcTable(db, stmt);
+            return;
         } else {
             ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_STORAGE_ENGINE, engineName);
         }
@@ -2234,6 +2238,21 @@ public class InternalCatalog implements CatalogIf<Database> {
         LOG.info("successfully create table[{}-{}]", tableName, tableId);
     }
 
+    private void createJdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
+        String tableName = stmt.getTableName();
+        List<Column> columns = stmt.getColumns();
+
+        long tableId = Env.getCurrentEnv().getNextId();
+
+        JdbcTable jdbcTable = new JdbcTable(tableId, tableName, columns, stmt.getProperties());
+        jdbcTable.setComment(stmt.getComment());
+        // check whether the table already exists
+        if (!db.createTableWithLock(jdbcTable, false, stmt.isSetIfNotExists()).first) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+        }
+        LOG.info("successfully create table[{}-{}]", tableName, tableId);
+    }
+
     private void createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState,
             DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc, TabletMeta tabletMeta,
             Set<Long> tabletIdSet, IdGeneratorBuffer idGeneratorBuffer) throws DdlException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 442eaed914..3bee6d59ff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -23,6 +23,7 @@ import org.apache.doris.alter.SchemaChangeJobV2;
 import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.DistributionInfo;
 import org.apache.doris.catalog.HashDistributionInfo;
+import org.apache.doris.catalog.JdbcResource;
 import org.apache.doris.catalog.MapType;
 import org.apache.doris.catalog.OdbcCatalogResource;
 import org.apache.doris.catalog.RandomDistributionInfo;
@@ -120,7 +121,8 @@ public class GsonUtils {
             .of(Resource.class, "clazz")
             .registerSubtype(SparkResource.class, SparkResource.class.getSimpleName())
             .registerSubtype(OdbcCatalogResource.class, OdbcCatalogResource.class.getSimpleName())
-            .registerSubtype(S3Resource.class, S3Resource.class.getSimpleName());
+            .registerSubtype(S3Resource.class, S3Resource.class.getSimpleName())
+            .registerSubtype(JdbcResource.class, JdbcResource.class.getSimpleName());
 
     // runtime adapter for class "AlterJobV2"
     private static RuntimeTypeAdapterFactory<AlterJobV2> alterJobV2TypeAdapterFactory = RuntimeTypeAdapterFactory
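
Background on why the registerSubtype line matters: GsonUtils persists the abstract Resource with a "clazz" discriminator field, and deserializing any unregistered subtype fails, which would break metadata replay for JDBC resources. A self-contained sketch of the same round-trip using RuntimeTypeAdapterFactory from gson-extras (simplified stand-in classes):

    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import com.google.gson.typeadapters.RuntimeTypeAdapterFactory;

    public class SubtypeRoundTripDemo {
        abstract static class Resource { String name = "r1"; }
        static class JdbcLikeResource extends Resource { String driverClass = "com.mysql.jdbc.Driver"; }

        public static void main(String[] args) {
            RuntimeTypeAdapterFactory<Resource> factory = RuntimeTypeAdapterFactory
                    .of(Resource.class, "clazz")
                    .registerSubtype(JdbcLikeResource.class, JdbcLikeResource.class.getSimpleName());
            Gson gson = new GsonBuilder().registerTypeAdapterFactory(factory).create();

            // serialize through the base type so the "clazz" tag is written
            String json = gson.toJson(new JdbcLikeResource(), Resource.class);
            System.out.println(json);

            // without the registerSubtype call above, this would throw JsonParseException
            Resource back = gson.fromJson(json, Resource.class);
            System.out.println(back instanceof JdbcLikeResource); // true
        }
    }
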
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcScanNode.java
new file mode 100644
index 0000000000..09c2d56f4e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcScanNode.java
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ExprSubstitutionMap;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.JdbcTable;
+import org.apache.doris.catalog.OdbcTable;
+import org.apache.doris.common.UserException;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.statistics.StatsRecursiveDerive;
+import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TJdbcScanNode;
+import org.apache.doris.thrift.TOdbcTableType;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TPlanNodeType;
+import org.apache.doris.thrift.TScanRangeLocations;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Lists;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class JdbcScanNode extends ScanNode {
+    private static final Logger LOG = LogManager.getLogger(JdbcScanNode.class);
+
+    private final List<String> columns = new ArrayList<String>();
+    private final List<String> filters = new ArrayList<String>();
+    private String tableName;
+    private TOdbcTableType jdbcType;
+
+    public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, JdbcTable tbl) {
+        super(id, desc, "SCAN JDBC", StatisticalType.JDBC_SCAN_NODE);
+        jdbcType = tbl.getJdbcTableType();
+        tableName = OdbcTable.databaseProperName(jdbcType, tbl.getJdbcTable());
+    }
+
+    @Override
+    public void init(Analyzer analyzer) throws UserException {
+        super.init(analyzer);
+        computeStats(analyzer);
+    }
+
+    @Override
+    public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
+        return null;
+    }
+
+    private void createJdbcFilters(Analyzer analyzer) {
+        if (conjuncts.isEmpty()) {
+            return;
+        }
+
+        List<SlotRef> slotRefs = Lists.newArrayList();
+        Expr.collectList(conjuncts, SlotRef.class, slotRefs);
+        ExprSubstitutionMap sMap = new ExprSubstitutionMap();
+        for (SlotRef slotRef : slotRefs) {
+            SlotRef slotRef1 = (SlotRef) slotRef.clone();
+            slotRef1.setTblName(null);
+            slotRef1.setLabel(OdbcTable.databaseProperName(jdbcType, slotRef1.getColumnName()));
+            sMap.put(slotRef, slotRef1);
+        }
+
+        ArrayList<Expr> conjunctsList = Expr.cloneList(conjuncts, sMap);
+        for (Expr p : conjunctsList) {
+            if (OdbcScanNode.shouldPushDownConjunct(jdbcType, p)) {
+                String filter = p.toMySql();
+                filters.add(filter);
+                conjuncts.remove(p);
+            }
+        }
+    }
+
+    private void createJdbcColumns(Analyzer analyzer) {
+        for (SlotDescriptor slot : desc.getSlots()) {
+            if (!slot.isMaterialized()) {
+                continue;
+            }
+            Column col = slot.getColumn();
+            columns.add(col.getName());
+        }
+        if (0 == columns.size()) {
+            columns.add("*");
+        }
+    }
+
+    private boolean shouldPushDownLimit() {
+        return limit != -1 && conjuncts.isEmpty();
+    }
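
A worked example of what getJdbcQueryStr() (next) produces for a MySQL-backed table: materialized columns id and name, one pushed-down conjunct id > 10 (removed from conjuncts by createJdbcFilters, so the limit can also be pushed), and limit 100. The assembly below mirrors the MySQL branch with pure JDK string joining; in the real code the table-name quoting comes from OdbcTable.databaseProperName:

    import java.util.Arrays;
    import java.util.List;

    public class JdbcQueryStringDemo {
        public static void main(String[] args) {
            List<String> columns = Arrays.asList("id", "name");
            List<String> filters = Arrays.asList("id > 10"); // already rendered by Expr.toMySql()
            long limit = 100;

            StringBuilder sql = new StringBuilder("SELECT ");
            sql.append(String.join(", ", columns));
            sql.append(" FROM ").append("`tbl`");
            sql.append(" WHERE (").append(String.join(") AND (", filters)).append(")");
            sql.append(" LIMIT ").append(limit);
            // prints: SELECT id, name FROM `tbl` WHERE (id > 10) LIMIT 100
            System.out.println(sql);
        }
    }
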
sql.append(" WHERE ("); + sql.append(Joiner.on(") AND (").join(filters)); + sql.append(")"); + } + + // Other DataBase use limit do top n + if (shouldPushDownLimit() + && (jdbcType == TOdbcTableType.MYSQL + || jdbcType == TOdbcTableType.POSTGRESQL + || jdbcType == TOdbcTableType.MONGODB)) { + sql.append(" LIMIT ").append(limit); + } + + return sql.toString(); + } + + @Override + public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { + StringBuilder output = new StringBuilder(); + output.append(prefix).append("TABLE: ").append(tableName).append("\n"); + if (detailLevel == TExplainLevel.BRIEF) { + return output.toString(); + } + output.append(prefix).append("QUERY: ").append(getJdbcQueryStr()).append("\n"); + return output.toString(); + } + + @Override + public void finalize(Analyzer analyzer) throws UserException { + // Convert predicates to Jdbc columns and filters. + createJdbcColumns(analyzer); + createJdbcFilters(analyzer); + } + + @Override + public void computeStats(Analyzer analyzer) throws UserException { + super.computeStats(analyzer); + // even if current node scan has no data,at least on backend will be assigned when the fragment actually execute + numNodes = numNodes <= 0 ? 1 : numNodes; + + StatsRecursiveDerive.getStatsRecursiveDerive().statsRecursiveDerive(this); + cardinality = statsDeriveResult.getRowCount(); + } + + @Override + protected void toThrift(TPlanNode msg) { + msg.node_type = TPlanNodeType.JDBC_SCAN_NODE; + msg.jdbc_scan_node = new TJdbcScanNode(); + msg.jdbc_scan_node.setTupleId(desc.getId().asInt()); + msg.jdbc_scan_node.setTableName(tableName); + msg.jdbc_scan_node.setQueryString(getJdbcQueryStr()); + } + + @Override + protected String debugString() { + MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this); + return helper.addValue(super.debugString()).toString(); + } + + @Override + public int getNumInstances() { + return 1; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OdbcScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OdbcScanNode.java index 0ab7d915a4..bcc567d0e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OdbcScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OdbcScanNode.java @@ -53,7 +53,7 @@ public class OdbcScanNode extends ScanNode { // Now some database have different function call like doris, now doris do not // push down the function call except MYSQL - private static boolean shouldPushDownConjunct(TOdbcTableType tableType, Expr expr) { + public static boolean shouldPushDownConjunct(TOdbcTableType tableType, Expr expr) { if (!tableType.equals(TOdbcTableType.MYSQL)) { List fnExprList = Lists.newArrayList(); expr.collect(FunctionCallExpr.class, fnExprList); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 1b1acce0ae..ebdf38370c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -55,6 +55,7 @@ import org.apache.doris.catalog.AggregateFunction; import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.FunctionSet; +import org.apache.doris.catalog.JdbcTable; import org.apache.doris.catalog.MysqlTable; import org.apache.doris.catalog.OdbcTable; import org.apache.doris.catalog.Table; @@ -1765,6 +1766,9 @@ public class SingleNodePlanner { scanNode 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 1b1acce0ae..ebdf38370c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -55,6 +55,7 @@ import org.apache.doris.catalog.AggregateFunction;
 import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.FunctionSet;
+import org.apache.doris.catalog.JdbcTable;
 import org.apache.doris.catalog.MysqlTable;
 import org.apache.doris.catalog.OdbcTable;
 import org.apache.doris.catalog.Table;
@@ -1765,6 +1766,9 @@ public class SingleNodePlanner {
                 scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "HudiScanNode", null, -1);
                 break;
+            case JDBC:
+                scanNode = new JdbcScanNode(ctx.getNextNodeId(), tblRef.getDesc(), (JdbcTable) tblRef.getTable());
+                break;
             case TABLE_VALUED_FUNCTION:
                 scanNode = new TableValuedFunctionScanNode(ctx.getNextNodeId(), tblRef.getDesc(),
                         "TableValuedFunctionScanNode", ((TableValuedFunctionRef) tblRef).getTableFunction());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
index 4b23337038..c8b94422b0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
@@ -46,4 +46,5 @@ public enum StatisticalType {
     UNION_NODE,
     TABLE_VALUED_FUNCTION_NODE,
     FILE_SCAN_NODE,
+    JDBC_SCAN_NODE,
 }
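
Before the executor itself, the call contract the BE drives it through: init (via the thrift ctor params), then querySQL(), then alternating hasNext()/getBlock() until the result set is exhausted, then close(). The sketch below imitates that sequence with plain JDBC, since constructing the real class requires serialized TJdbcExecutorCtorParams bytes; the URL, credentials and query are hypothetical:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class ExecutorContractDemo {
        public static void main(String[] args) throws Exception {
            // stands in for init(): driver on the classpath, endpoint hypothetical
            try (Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/test", "root", "123456");
                    Statement stmt = conn.createStatement()) {
                // querySQL(): execute and keep the ResultSet
                ResultSet rs = stmt.executeQuery("SELECT id, name FROM baseall");
                int batchSize = 4064;
                while (rs.next()) {                  // hasNext() advances the cursor
                    int rowsInBatch = 0;
                    do {                             // getBlock() reads the current row first,
                        Object c0 = rs.getObject(1); // then keeps advancing until the batch is full
                        Object c1 = rs.getObject(2);
                        rowsInBatch++;
                    } while (rowsInBatch < batchSize && rs.next());
                }
            } // close() closes ResultSet, Statement, Connection, and the URLClassLoader
        }
    }
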
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
new file mode 100644
index 0000000000..ce1e3d6df1
--- /dev/null
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+import org.apache.doris.thrift.TJdbcExecutorCtorParams;
+
+import org.apache.log4j.Logger;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+
+import java.net.MalformedURLException;
+import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+
+public class JdbcExecutor {
+    private static final Logger LOG = Logger.getLogger(JdbcExecutor.class);
+    private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new TBinaryProtocol.Factory();
+    private URLClassLoader classLoader = null;
+    private Connection conn = null;
+    private Statement stmt = null;
+    private ResultSet resultSet = null;
+    private ResultSetMetaData resultSetMetaData = null;
+
+    public JdbcExecutor(byte[] thriftParams) throws Exception {
+        TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
+        TDeserializer deserializer = new TDeserializer(PROTOCOL_FACTORY);
+        try {
+            deserializer.deserialize(request, thriftParams);
+        } catch (TException e) {
+            throw new InternalException(e.getMessage());
+        }
+        init(request.jar_location_path, request.jdbc_driver_class, request.jdbc_url, request.jdbc_user,
+                request.jdbc_password);
+    }
+
+    public void close() throws Exception {
+        if (resultSet != null) {
+            resultSet.close();
+        }
+        if (stmt != null) {
+            stmt.close();
+        }
+        if (conn != null) {
+            conn.close();
+        }
+        if (classLoader != null) {
+            classLoader.close();
+        }
+    }
+
+    public int querySQL(String sql) throws UdfRuntimeException {
+        try {
+            boolean res = stmt.execute(sql);
+            if (res) { // the statement produced a ResultSet, i.e. a query
+                resultSet = stmt.getResultSet();
+                resultSetMetaData = resultSet.getMetaData();
+                return resultSetMetaData.getColumnCount();
+            } else { // TODO: handle update statements
+                return 0;
+            }
+        } catch (SQLException e) {
+            throw new UdfRuntimeException("JDBC executor sql has error: ", e);
+        }
+    }
+
+    public List<List<Object>> getBlock(int batchSize) throws UdfRuntimeException {
+        List<List<Object>> block = null;
+        try {
+            int columnCount = resultSetMetaData.getColumnCount();
+            block = new ArrayList<>(columnCount);
+            for (int i = 0; i < columnCount; ++i) {
+                block.add(new ArrayList<>(batchSize));
+            }
+            int numRows = 0;
+            do {
+                for (int i = 0; i < columnCount; ++i) {
+                    block.get(i).add(resultSet.getObject(i + 1));
+                }
+                numRows++;
+            } while (numRows < batchSize && resultSet.next());
+        } catch (SQLException e) {
+            throw new UdfRuntimeException("get next block failed: ", e);
+        } catch (Exception e) {
+            throw new UdfRuntimeException("unable to get next : ", e);
+        }
+        return block;
+    }
+
+    public boolean hasNext() throws UdfRuntimeException {
+        try {
+            if (resultSet == null) {
+                return false;
+            }
+            return resultSet.next();
+        } catch (SQLException e) {
+            throw new UdfRuntimeException("resultSet to get next error: ", e);
+        }
+    }
+
+    public long convertDateToLong(Object obj) {
+        LocalDate date = ((Date) obj).toLocalDate();
+        long time = UdfUtils.convertDateTimeToLong(date.getYear(), date.getMonthValue(), date.getDayOfMonth(),
+                0, 0, 0, true);
+        return time;
+    }
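
The block returned by getBlock() above is column-major: block.get(i) holds column i's values for the whole batch, which is the layout the vectorized BE wants when filling its columns. Consuming such a block looks like this (hypothetical two-column batch):

    import java.util.Arrays;
    import java.util.List;

    public class ColumnMajorBlockDemo {
        public static void main(String[] args) {
            // shape: block.get(col).get(row), as produced by JdbcExecutor.getBlock()
            List<List<Object>> block = Arrays.asList(
                    Arrays.asList((Object) 1, 2, 3),        // column 0: id
                    Arrays.asList((Object) "a", "b", "c")); // column 1: name
            int numRows = block.get(0).size();
            for (int row = 0; row < numRows; ++row) {
                System.out.println(block.get(0).get(row) + ", " + block.get(1).get(row));
            }
        }
    }
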
+
+    public long convertDateTimeToLong(Object obj) {
+        LocalDateTime date = ((Timestamp) obj).toLocalDateTime();
+        long time = UdfUtils.convertDateTimeToLong(date.getYear(), date.getMonthValue(), date.getDayOfMonth(),
+                date.getHour(), date.getMinute(), date.getSecond(), false);
+        return time;
+    }
+
+    private void init(String driverPath, String driverClass, String jdbcUrl, String jdbcUser, String jdbcPassword)
+            throws UdfRuntimeException {
+        try {
+            ClassLoader loader;
+            if (driverPath != null) {
+                // load the JDBC driver from the jar downloaded via DRIVER_URL
+                ClassLoader parent = getClass().getClassLoader();
+                classLoader = UdfUtils.getClassLoader(driverPath, parent);
+                loader = classLoader;
+            } else {
+                loader = ClassLoader.getSystemClassLoader();
+            }
+            Class.forName(driverClass, true, loader);
+            conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword);
+            stmt = conn.createStatement();
+        } catch (MalformedURLException e) {
+            throw new UdfRuntimeException("MalformedURLException while loading driver from " + driverPath, e);
+        } catch (ClassNotFoundException e) {
+            throw new UdfRuntimeException("ClassNotFoundException while loading JDBC driver class " + driverClass, e);
+        } catch (SQLException e) {
+            throw new UdfRuntimeException("SQLException while connecting to " + jdbcUrl, e);
+        } catch (Exception e) {
+            throw new UdfRuntimeException("unable to init jdbc executor", e);
+        }
+    }
+}
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 8f1b75c298..ff365db7c4 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -274,6 +274,17 @@ struct THudiTable {
   3: optional map<string, string> properties
 }
 
+struct TJdbcTable {
+  1: optional string jdbc_url
+  2: optional string jdbc_table_name
+  3: optional string jdbc_user
+  4: optional string jdbc_password
+  5: optional string jdbc_driver_url
+  6: optional string jdbc_resource_name
+  7: optional string jdbc_driver_class
+  8: optional string jdbc_driver_checksum
+}
+
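The TJdbcTable struct above is what JdbcTable.toThrift() fills in on the FE. A round-trip sketch with the thrift-generated Java bindings; the setter/getter names assume the usual thrift codegen (the same setters toThrift() uses), and the values are placeholders:

    import org.apache.doris.thrift.TJdbcTable;
    import org.apache.thrift.TDeserializer;
    import org.apache.thrift.TSerializer;
    import org.apache.thrift.protocol.TBinaryProtocol;

    public class TJdbcTableRoundTripDemo {
        public static void main(String[] args) throws Exception {
            TJdbcTable table = new TJdbcTable();
            table.setJdbcUrl("jdbc:mysql://127.0.0.1:3306/test");
            table.setJdbcUser("root");
            table.setJdbcTableName("baseall");
            table.setJdbcDriverClass("com.mysql.jdbc.Driver");

            // all fields are optional, so a partially filled struct still serializes
            byte[] bytes = new TSerializer(new TBinaryProtocol.Factory()).serialize(table);
            TJdbcTable copy = new TJdbcTable();
            new TDeserializer(new TBinaryProtocol.Factory()).deserialize(copy, bytes);
            System.out.println(copy.getJdbcTableName()); // baseall
        }
    }
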
 // "Union" of all table types.
 struct TTableDescriptor {
   1: required Types.TTableId id
@@ -295,6 +306,7 @@ struct TTableDescriptor {
   17: optional THiveTable hiveTable
   18: optional TIcebergTable icebergTable
   19: optional THudiTable hudiTable
+  20: optional TJdbcTable jdbcTable
 }
 
 struct TDescriptorTable {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index b214eeb496..13c8452293 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -55,6 +55,7 @@ enum TPlanNodeType {
   TABLE_FUNCTION_NODE,
   TABLE_VALUED_FUNCTION_SCAN_NODE,
   FILE_SCAN_NODE,
+  JDBC_SCAN_NODE,
 }
 
 // phases of an execution node
@@ -312,6 +313,12 @@ struct TOdbcScanNode {
   8: optional string query_string
 }
 
+struct TJdbcScanNode {
+  1: optional Types.TTupleId tuple_id
+  2: optional string table_name
+  3: optional string query_string
+}
+
 struct TBrokerScanNode {
     1: required Types.TTupleId tuple_id
@@ -887,6 +894,7 @@ struct TPlanNode {
 
   // file scan node
   44: optional TFileScanNode file_scan_node
+  45: optional TJdbcScanNode jdbc_scan_node
 
   101: optional list<Exprs.TExpr> projections
   102: optional Types.TTupleId output_tuple_id
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index ecf3268de0..b5ba5e6752 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -356,6 +356,23 @@ struct TFunction {
   13: optional bool vectorized = false
 }
 
+struct TJdbcExecutorCtorParams {
+  // local path to the JDBC driver's jar file
+  1: optional string jar_location_path
+
+  // e.g. "jdbc:mysql://127.0.0.1:3307/test"
+  2: optional string jdbc_url
+
+  // e.g. "root"
+  3: optional string jdbc_user
+
+  // e.g. "password"
+  4: optional string jdbc_password
+
+  // e.g. "com.mysql.jdbc.Driver"
+  5: optional string jdbc_driver_class
+}
+
 struct TJavaUdfExecutorCtorParams {
   1: optional TFunction fn
 
@@ -517,7 +534,8 @@ enum TTableType {
   ODBC_TABLE,
   HIVE_TABLE,
   ICEBERG_TABLE,
-  HUDI_TABLE
+  HUDI_TABLE,
+  JDBC_TABLE
 }
 
 enum TOdbcTableType {