[Feature](Vectorized) support jdbc scan node (#12010)
This commit is contained in:
@ -73,6 +73,7 @@
|
||||
#include "vec/exec/vexcept_node.h"
|
||||
#include "vec/exec/vexchange_node.h"
|
||||
#include "vec/exec/vintersect_node.h"
|
||||
#include "vec/exec/vjdbc_scan_node.h"
|
||||
#include "vec/exec/vmysql_scan_node.h"
|
||||
#include "vec/exec/vodbc_scan_node.h"
|
||||
#include "vec/exec/volap_scan_node.h"
|
||||
@ -417,6 +418,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
|
||||
case TPlanNodeType::BROKER_SCAN_NODE:
|
||||
case TPlanNodeType::TABLE_VALUED_FUNCTION_SCAN_NODE:
|
||||
case TPlanNodeType::FILE_SCAN_NODE:
|
||||
case TPlanNodeType::JDBC_SCAN_NODE:
|
||||
break;
|
||||
default: {
|
||||
const auto& i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
|
||||
@ -452,6 +454,18 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
|
||||
}
|
||||
return Status::OK();
|
||||
|
||||
case TPlanNodeType::JDBC_SCAN_NODE:
|
||||
if (state->enable_vectorized_exec()) {
|
||||
#ifdef LIBJVM
|
||||
*node = pool->add(new vectorized::VJdbcScanNode(pool, tnode, descs));
|
||||
#else
|
||||
return Status::InternalError("Jdbc scan node is disabled since no libjvm is found!");
|
||||
#endif
|
||||
} else {
|
||||
return Status::InternalError("Jdbc scan node only support vectorized engine.");
|
||||
}
|
||||
return Status::OK();
|
||||
|
||||
case TPlanNodeType::ES_HTTP_SCAN_NODE:
|
||||
if (state->enable_vectorized_exec()) {
|
||||
*node = pool->add(new vectorized::VEsHttpScanNode(pool, tnode, descs));
|
||||
|
||||
@ -20,6 +20,8 @@
|
||||
|
||||
#include "runtime/descriptors.h"
|
||||
|
||||
#include <gen_cpp/Types_types.h>
|
||||
|
||||
#include <boost/algorithm/string/join.hpp>
|
||||
#include <ios>
|
||||
#include <sstream>
|
||||
@ -221,6 +223,29 @@ std::string ODBCTableDescriptor::debug_string() const {
|
||||
return out.str();
|
||||
}
|
||||
|
||||
JdbcTableDescriptor::JdbcTableDescriptor(const TTableDescriptor& tdesc)
|
||||
: TableDescriptor(tdesc),
|
||||
_jdbc_resource_name(tdesc.jdbcTable.jdbc_resource_name),
|
||||
_jdbc_driver_url(tdesc.jdbcTable.jdbc_driver_url),
|
||||
_jdbc_driver_class(tdesc.jdbcTable.jdbc_driver_class),
|
||||
_jdbc_driver_checksum(tdesc.jdbcTable.jdbc_driver_checksum),
|
||||
_jdbc_url(tdesc.jdbcTable.jdbc_url),
|
||||
_jdbc_table_name(tdesc.jdbcTable.jdbc_table_name),
|
||||
_jdbc_user(tdesc.jdbcTable.jdbc_user),
|
||||
_jdbc_passwd(tdesc.jdbcTable.jdbc_password) {}
|
||||
|
||||
std::string JdbcTableDescriptor::debug_string() const {
|
||||
fmt::memory_buffer buf;
|
||||
fmt::format_to(buf,
|
||||
"JDBCTable({} ,_jdbc_resource_name={} ,_jdbc_driver_url={} "
|
||||
",_jdbc_driver_class={} ,_jdbc_driver_checksum={} ,_jdbc_url={} "
|
||||
",_jdbc_table_name={} ,_jdbc_user={} ,_jdbc_passwd={})",
|
||||
TableDescriptor::debug_string(), _jdbc_resource_name, _jdbc_driver_url,
|
||||
_jdbc_driver_class, _jdbc_driver_checksum, _jdbc_url, _jdbc_table_name,
|
||||
_jdbc_user, _jdbc_passwd);
|
||||
return fmt::to_string(buf);
|
||||
}
|
||||
|
||||
TupleDescriptor::TupleDescriptor(const TTupleDescriptor& tdesc)
|
||||
: _id(tdesc.id),
|
||||
_table_desc(nullptr),
|
||||
@ -563,6 +588,9 @@ Status DescriptorTbl::create(ObjectPool* pool, const TDescriptorTable& thrift_tb
|
||||
case TTableType::ICEBERG_TABLE:
|
||||
desc = pool->add(new IcebergTableDescriptor(tdesc));
|
||||
break;
|
||||
case TTableType::JDBC_TABLE:
|
||||
desc = pool->add(new JdbcTableDescriptor(tdesc));
|
||||
break;
|
||||
default:
|
||||
DCHECK(false) << "invalid table type: " << tdesc.tableType;
|
||||
}
|
||||
|
||||
@ -275,6 +275,30 @@ private:
|
||||
TOdbcTableType::type _type;
|
||||
};
|
||||
|
||||
class JdbcTableDescriptor : public TableDescriptor {
|
||||
public:
|
||||
JdbcTableDescriptor(const TTableDescriptor& tdesc);
|
||||
std::string debug_string() const override;
|
||||
const std::string& jdbc_resource_name() const { return _jdbc_resource_name; }
|
||||
const std::string& jdbc_driver_url() const { return _jdbc_driver_url; }
|
||||
const std::string& jdbc_driver_class() const { return _jdbc_driver_class; }
|
||||
const std::string& jdbc_driver_checksum() const { return _jdbc_driver_checksum; }
|
||||
const std::string& jdbc_url() const { return _jdbc_url; }
|
||||
const std::string& jdbc_table_name() const { return _jdbc_table_name; }
|
||||
const std::string& jdbc_user() const { return _jdbc_user; }
|
||||
const std::string& jdbc_passwd() const { return _jdbc_passwd; }
|
||||
|
||||
private:
|
||||
std::string _jdbc_resource_name;
|
||||
std::string _jdbc_driver_url;
|
||||
std::string _jdbc_driver_class;
|
||||
std::string _jdbc_driver_checksum;
|
||||
std::string _jdbc_url;
|
||||
std::string _jdbc_table_name;
|
||||
std::string _jdbc_user;
|
||||
std::string _jdbc_passwd;
|
||||
};
|
||||
|
||||
class TupleDescriptor {
|
||||
public:
|
||||
// virtual ~TupleDescriptor() {}
|
||||
|
||||
@ -726,7 +726,7 @@ bool FragmentMgr::_is_scan_node(const TPlanNodeType::type& type) {
|
||||
type == TPlanNodeType::BROKER_SCAN_NODE || type == TPlanNodeType::ES_SCAN_NODE ||
|
||||
type == TPlanNodeType::ES_HTTP_SCAN_NODE || type == TPlanNodeType::ODBC_SCAN_NODE ||
|
||||
type == TPlanNodeType::TABLE_VALUED_FUNCTION_SCAN_NODE ||
|
||||
type == TPlanNodeType::FILE_SCAN_NODE;
|
||||
type == TPlanNodeType::FILE_SCAN_NODE || type == TPlanNodeType::JDBC_SCAN_NODE;
|
||||
}
|
||||
|
||||
Status FragmentMgr::cancel(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& reason,
|
||||
|
||||
@ -18,7 +18,6 @@
|
||||
#include "runtime/user_function_cache.h"
|
||||
|
||||
#include <atomic>
|
||||
#include <boost/algorithm/string/predicate.hpp> // boost::algorithm::ends_with
|
||||
#include <regex>
|
||||
#include <vector>
|
||||
|
||||
@ -27,6 +26,7 @@
|
||||
#include "http/http_client.h"
|
||||
#include "util/dynamic_util.h"
|
||||
#include "util/file_utils.h"
|
||||
#include "util/string_util.h"
|
||||
#ifdef LIBJVM
|
||||
#include "util/jni-util.h"
|
||||
#endif
|
||||
@ -97,7 +97,7 @@ UserFunctionCacheEntry::~UserFunctionCacheEntry() {
|
||||
}
|
||||
}
|
||||
|
||||
UserFunctionCache::UserFunctionCache() {}
|
||||
UserFunctionCache::UserFunctionCache() = default;
|
||||
|
||||
UserFunctionCache::~UserFunctionCache() {
|
||||
std::lock_guard<std::mutex> l(_cache_lock);
|
||||
@ -130,8 +130,13 @@ Status UserFunctionCache::init(const std::string& lib_dir) {
|
||||
}
|
||||
|
||||
Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std::string& file) {
|
||||
if (!boost::algorithm::ends_with(file, ".so")) {
|
||||
return Status::InternalError("unknown library file format");
|
||||
LibType lib_type;
|
||||
if (ends_with(file, ".so")) {
|
||||
lib_type = LibType::SO;
|
||||
} else if (ends_with(file, ".jar")) {
|
||||
lib_type = LibType::JAR;
|
||||
} else {
|
||||
return Status::InternalError("unknown library file format: " + file);
|
||||
}
|
||||
|
||||
std::vector<std::string> split_parts = strings::Split(file, ".");
|
||||
@ -149,7 +154,7 @@ Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std
|
||||
}
|
||||
// create a cache entry and put it into entry map
|
||||
UserFunctionCacheEntry* entry =
|
||||
new UserFunctionCacheEntry(function_id, checksum, dir + "/" + file, LibType::SO);
|
||||
new UserFunctionCacheEntry(function_id, checksum, dir + "/" + file, lib_type);
|
||||
entry->is_downloaded = true;
|
||||
|
||||
entry->ref();
|
||||
@ -255,7 +260,6 @@ Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url,
|
||||
} else {
|
||||
entry = new UserFunctionCacheEntry(fid, checksum, _make_lib_file(fid, checksum, type),
|
||||
type);
|
||||
|
||||
entry->ref();
|
||||
_entry_map.emplace(fid, entry);
|
||||
}
|
||||
@ -343,7 +347,7 @@ Status UserFunctionCache::_download_lib(const std::string& url, UserFunctionCach
|
||||
RETURN_IF_ERROR(client.execute(download_cb));
|
||||
RETURN_IF_ERROR(status);
|
||||
digest.digest();
|
||||
if (!boost::iequals(digest.hex(), entry->checksum)) {
|
||||
if (!iequal(digest.hex(), entry->checksum)) {
|
||||
LOG(WARNING) << "UDF's checksum is not equal, one=" << digest.hex()
|
||||
<< ", other=" << entry->checksum;
|
||||
return Status::InternalError("UDF's library checksum is not match");
|
||||
|
||||
@ -107,6 +107,8 @@ set(VEC_FILES
|
||||
exec/vbroker_scan_node.cpp
|
||||
exec/vbroker_scanner.cpp
|
||||
exec/vjson_scanner.cpp
|
||||
exec/vjdbc_connector.cpp
|
||||
exec/vjdbc_scan_node.cpp
|
||||
exec/vparquet_scanner.cpp
|
||||
exec/vorc_scanner.cpp
|
||||
exec/join/vhash_join_node.cpp
|
||||
|
||||
344
be/src/vec/exec/vjdbc_connector.cpp
Normal file
344
be/src/vec/exec/vjdbc_connector.cpp
Normal file
@ -0,0 +1,344 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "vec/exec/vjdbc_connector.h"
|
||||
#ifdef LIBJVM
|
||||
#include "gen_cpp/Types_types.h"
|
||||
#include "gutil/strings/substitute.h"
|
||||
#include "jni.h"
|
||||
#include "runtime/user_function_cache.h"
|
||||
#include "util/jni-util.h"
|
||||
#include "vec/columns/column_nullable.h"
|
||||
namespace doris {
|
||||
namespace vectorized {
|
||||
const char* JDBC_EXECUTOR_CLASS = "org/apache/doris/udf/JdbcExecutor";
|
||||
const char* JDBC_EXECUTOR_CTOR_SIGNATURE = "([B)V";
|
||||
const char* JDBC_EXECUTOR_QUERYSQL_SIGNATURE = "(Ljava/lang/String;)I";
|
||||
const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z";
|
||||
const char* JDBC_EXECUTOR_GET_BLOCK_SIGNATURE = "(I)Ljava/util/List;";
|
||||
const char* JDBC_EXECUTOR_CLOSE_SIGNATURE = "()V";
|
||||
const char* JDBC_EXECUTOR_CONVERT_DATE_SIGNATURE = "(Ljava/lang/Object;)J";
|
||||
const char* JDBC_EXECUTOR_CONVERT_DATETIME_SIGNATURE = "(Ljava/lang/Object;)J";
|
||||
|
||||
JdbcConnector::JdbcConnector(const JdbcConnectorParam& param)
|
||||
: _is_open(false),
|
||||
_query_string(param.query_string),
|
||||
_tuple_desc(param.tuple_desc),
|
||||
_conn_param(param) {}
|
||||
|
||||
JdbcConnector::~JdbcConnector() {
|
||||
if (!_is_open) {
|
||||
return;
|
||||
}
|
||||
JNIEnv* env;
|
||||
Status status;
|
||||
RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
|
||||
env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_close_id);
|
||||
RETURN_IF_STATUS_ERROR(status, JniUtil::GetJniExceptionMsg(env));
|
||||
env->DeleteGlobalRef(_executor_obj);
|
||||
}
|
||||
|
||||
Status JdbcConnector::open() {
|
||||
if (_is_open) {
|
||||
LOG(INFO) << "this scanner of jdbc already opened";
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
JNIEnv* env = nullptr;
|
||||
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
|
||||
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, JDBC_EXECUTOR_CLASS, &_executor_clazz));
|
||||
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/util/List", &_executor_list_clazz));
|
||||
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Object", &_executor_object_clazz));
|
||||
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Boolean", &_executor_uint8_t_clazz));
|
||||
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Byte", &_executor_int8_t_clazz));
|
||||
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Short", &_executor_int16_t_clazz));
|
||||
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Integer", &_executor_int32_t_clazz));
|
||||
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Long", &_executor_int64_t_clazz));
|
||||
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Float", &_executor_float_clazz));
|
||||
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Float", &_executor_double_clazz));
|
||||
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/String", &_executor_string_clazz));
|
||||
RETURN_IF_ERROR(_register_func_id(env));
|
||||
|
||||
// Add a scoped cleanup jni reference object. This cleans up local refs made below.
|
||||
JniLocalFrame jni_frame;
|
||||
{
|
||||
std::string local_location;
|
||||
std::hash<std::string> hash_str;
|
||||
auto function_cache = UserFunctionCache::instance();
|
||||
RETURN_IF_ERROR(function_cache->get_jarpath(
|
||||
std::abs((int64_t)hash_str(_conn_param.resource_name)), _conn_param.driver_path,
|
||||
_conn_param.driver_checksum, &local_location));
|
||||
TJdbcExecutorCtorParams ctor_params;
|
||||
ctor_params.__set_jar_location_path(local_location);
|
||||
ctor_params.__set_jdbc_url(_conn_param.jdbc_url);
|
||||
ctor_params.__set_jdbc_user(_conn_param.user);
|
||||
ctor_params.__set_jdbc_password(_conn_param.passwd);
|
||||
ctor_params.__set_jdbc_driver_class(_conn_param.driver_class);
|
||||
|
||||
jbyteArray ctor_params_bytes;
|
||||
// Pushed frame will be popped when jni_frame goes out-of-scope.
|
||||
RETURN_IF_ERROR(jni_frame.push(env));
|
||||
RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
|
||||
_executor_obj = env->NewObject(_executor_clazz, _executor_ctor_id, ctor_params_bytes);
|
||||
}
|
||||
RETURN_ERROR_IF_EXC(env);
|
||||
RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, _executor_obj, &_executor_obj));
|
||||
_is_open = true;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status JdbcConnector::query_exec() {
|
||||
if (!_is_open) {
|
||||
return Status::InternalError("Query before open of JdbcConnector.");
|
||||
}
|
||||
// check materialize num equal
|
||||
int materialize_num = 0;
|
||||
for (int i = 0; i < _tuple_desc->slots().size(); ++i) {
|
||||
if (_tuple_desc->slots()[i]->is_materialized()) {
|
||||
materialize_num++;
|
||||
}
|
||||
}
|
||||
|
||||
JNIEnv* env = nullptr;
|
||||
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
|
||||
jstring query_sql = env->NewStringUTF(_query_string.c_str());
|
||||
jint colunm_count = env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz,
|
||||
_executor_query_id, query_sql);
|
||||
env->DeleteLocalRef(query_sql);
|
||||
RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
|
||||
|
||||
if (colunm_count != materialize_num) {
|
||||
return Status::InternalError("input and output column num not equal of jdbc query.");
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns, int batch_size) {
|
||||
if (!_is_open) {
|
||||
return Status::InternalError("get_next before open if jdbc.");
|
||||
}
|
||||
JNIEnv* env = nullptr;
|
||||
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
|
||||
jboolean has_next =
|
||||
env->CallNonvirtualBooleanMethod(_executor_obj, _executor_clazz, _executor_has_next_id);
|
||||
if (has_next != JNI_TRUE) {
|
||||
*eos = true;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
jobject block_obj = env->CallNonvirtualObjectMethod(_executor_obj, _executor_clazz,
|
||||
_executor_get_blocks_id, batch_size);
|
||||
|
||||
RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
|
||||
|
||||
auto column_size = _tuple_desc->slots().size();
|
||||
for (int column_index = 0, materialized_column_index = 0; column_index < column_size;
|
||||
++column_index) {
|
||||
auto slot_desc = _tuple_desc->slots()[column_index];
|
||||
// because the fe planner filter the non_materialize column
|
||||
if (!slot_desc->is_materialized()) {
|
||||
continue;
|
||||
}
|
||||
jobject column_data =
|
||||
env->CallObjectMethod(block_obj, _executor_get_list_id, materialized_column_index);
|
||||
jint num_rows = env->CallIntMethod(column_data, _executor_get_list_size_id);
|
||||
|
||||
for (int row = 0; row < num_rows; ++row) {
|
||||
jobject cur_data = env->CallObjectMethod(column_data, _executor_get_list_id, row);
|
||||
_convert_column_data(env, cur_data, slot_desc, columns[column_index].get());
|
||||
}
|
||||
|
||||
materialized_column_index++;
|
||||
}
|
||||
return JniUtil::GetJniExceptionMsg(env);
|
||||
}
|
||||
|
||||
Status JdbcConnector::_register_func_id(JNIEnv* env) {
|
||||
auto register_id = [&](jclass clazz, const char* func_name, const char* func_sign,
|
||||
jmethodID& func_id) {
|
||||
func_id = env->GetMethodID(clazz, func_name, func_sign);
|
||||
Status s = JniUtil::GetJniExceptionMsg(env);
|
||||
if (!s.ok()) {
|
||||
return Status::InternalError(strings::Substitute(
|
||||
"Jdbc connector _register_func_id meet error and error is $0",
|
||||
s.get_error_msg()));
|
||||
}
|
||||
return s;
|
||||
};
|
||||
|
||||
RETURN_IF_ERROR(register_id(_executor_clazz, "<init>", JDBC_EXECUTOR_CTOR_SIGNATURE,
|
||||
_executor_ctor_id));
|
||||
RETURN_IF_ERROR(register_id(_executor_clazz, "querySQL", JDBC_EXECUTOR_QUERYSQL_SIGNATURE,
|
||||
_executor_query_id));
|
||||
RETURN_IF_ERROR(register_id(_executor_clazz, "close", JDBC_EXECUTOR_CLOSE_SIGNATURE,
|
||||
_executor_close_id));
|
||||
RETURN_IF_ERROR(register_id(_executor_clazz, "hasNext", JDBC_EXECUTOR_HAS_NEXT_SIGNATURE,
|
||||
_executor_has_next_id));
|
||||
RETURN_IF_ERROR(register_id(_executor_clazz, "getBlock", JDBC_EXECUTOR_GET_BLOCK_SIGNATURE,
|
||||
_executor_get_blocks_id));
|
||||
RETURN_IF_ERROR(register_id(_executor_clazz, "convertDateToLong",
|
||||
JDBC_EXECUTOR_CONVERT_DATE_SIGNATURE, _executor_convert_date_id));
|
||||
RETURN_IF_ERROR(register_id(_executor_clazz, "convertDateTimeToLong",
|
||||
JDBC_EXECUTOR_CONVERT_DATETIME_SIGNATURE,
|
||||
_executor_convert_datetime_id));
|
||||
RETURN_IF_ERROR(register_id(_executor_list_clazz, "get", "(I)Ljava/lang/Object;",
|
||||
_executor_get_list_id));
|
||||
RETURN_IF_ERROR(register_id(_executor_list_clazz, "size", "()I", _executor_get_list_size_id));
|
||||
RETURN_IF_ERROR(register_id(_executor_string_clazz, "getBytes", "(Ljava/lang/String;)[B",
|
||||
_get_bytes_id));
|
||||
RETURN_IF_ERROR(
|
||||
register_id(_executor_object_clazz, "toString", "()Ljava/lang/String;", _to_string_id));
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj,
|
||||
const SlotDescriptor* slot_desc,
|
||||
vectorized::IColumn* column_ptr) {
|
||||
vectorized::IColumn* col_ptr = column_ptr;
|
||||
if (true == slot_desc->is_nullable()) {
|
||||
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
|
||||
if (jobj == nullptr) {
|
||||
nullable_column->insert_data(nullptr, 0);
|
||||
return Status::OK();
|
||||
} else {
|
||||
nullable_column->get_null_map_data().push_back(0);
|
||||
col_ptr = &nullable_column->get_nested_column();
|
||||
}
|
||||
}
|
||||
|
||||
switch (slot_desc->type().type) {
|
||||
case TYPE_BOOLEAN: {
|
||||
uint8_t num = _jobject_to_uint8_t(env, jobj);
|
||||
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)->insert_value(
|
||||
(uint8_t)num);
|
||||
break;
|
||||
}
|
||||
case TYPE_TINYINT: {
|
||||
int8_t num = _jobject_to_int8_t(env, jobj);
|
||||
reinterpret_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)->insert_value(num);
|
||||
break;
|
||||
}
|
||||
case TYPE_SMALLINT: {
|
||||
int16_t num = _jobject_to_int16_t(env, jobj);
|
||||
reinterpret_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)->insert_value(num);
|
||||
break;
|
||||
}
|
||||
|
||||
case TYPE_INT: {
|
||||
int32_t num = _jobject_to_int32_t(env, jobj);
|
||||
reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value(num);
|
||||
break;
|
||||
}
|
||||
|
||||
case TYPE_BIGINT: {
|
||||
int64_t num = _jobject_to_int64_t(env, jobj);
|
||||
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(num);
|
||||
break;
|
||||
}
|
||||
|
||||
case TYPE_FLOAT: {
|
||||
float num = _jobject_to_float(env, jobj);
|
||||
reinterpret_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)->insert_value(
|
||||
num);
|
||||
break;
|
||||
}
|
||||
case TYPE_DOUBLE: {
|
||||
double num = _jobject_to_double(env, jobj);
|
||||
reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(
|
||||
num);
|
||||
break;
|
||||
}
|
||||
|
||||
case TYPE_STRING:
|
||||
case TYPE_CHAR:
|
||||
case TYPE_VARCHAR: {
|
||||
std::string data = _jobject_to_string(env, jobj);
|
||||
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(data.c_str(),
|
||||
data.length());
|
||||
break;
|
||||
}
|
||||
|
||||
case TYPE_DATE: {
|
||||
int64_t num = _jobject_to_date(env, jobj);
|
||||
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(num);
|
||||
break;
|
||||
}
|
||||
case TYPE_DATETIME: {
|
||||
int64_t num = _jobject_to_datetime(env, jobj);
|
||||
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(num);
|
||||
break;
|
||||
}
|
||||
case TYPE_DECIMALV2: {
|
||||
std::string data = _jobject_to_string(env, jobj);
|
||||
DecimalV2Value decimal_slot;
|
||||
decimal_slot.parse_from_str(data.c_str(), data.length());
|
||||
reinterpret_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)->insert_value(
|
||||
decimal_slot.value());
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
std::string error_msg =
|
||||
fmt::format("Fail to convert jdbc value to {} on column: {}.",
|
||||
slot_desc->type().debug_string(), slot_desc->col_name());
|
||||
return Status::InternalError(std::string(error_msg));
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
std::string JdbcConnector::_jobject_to_string(JNIEnv* env, jobject jobj) {
|
||||
jobject jstr = env->CallObjectMethod(jobj, _to_string_id);
|
||||
const jbyteArray stringJbytes =
|
||||
(jbyteArray)env->CallObjectMethod(jstr, _get_bytes_id, env->NewStringUTF("UTF-8"));
|
||||
size_t length = (size_t)env->GetArrayLength(stringJbytes);
|
||||
jbyte* pBytes = env->GetByteArrayElements(stringJbytes, nullptr);
|
||||
std::string str = std::string((char*)pBytes, length);
|
||||
env->ReleaseByteArrayElements(stringJbytes, pBytes, JNI_ABORT);
|
||||
env->DeleteLocalRef(stringJbytes);
|
||||
return str;
|
||||
}
|
||||
|
||||
int64_t JdbcConnector::_jobject_to_date(JNIEnv* env, jobject jobj) {
|
||||
return env->CallNonvirtualLongMethod(_executor_obj, _executor_clazz, _executor_convert_date_id,
|
||||
jobj);
|
||||
}
|
||||
|
||||
int64_t JdbcConnector::_jobject_to_datetime(JNIEnv* env, jobject jobj) {
|
||||
return env->CallNonvirtualLongMethod(_executor_obj, _executor_clazz,
|
||||
_executor_convert_datetime_id, jobj);
|
||||
}
|
||||
|
||||
#define FUNC_IMPL_TO_CONVERT_DATA(cpp_return_type, java_type, sig, java_return_type) \
|
||||
cpp_return_type JdbcConnector::_jobject_to_##cpp_return_type(JNIEnv* env, jobject jobj) { \
|
||||
jmethodID method_id_##cpp_return_type = env->GetMethodID( \
|
||||
_executor_##cpp_return_type##_clazz, #java_type "Value", "()" #sig); \
|
||||
return env->Call##java_return_type##Method(jobj, method_id_##cpp_return_type); \
|
||||
}
|
||||
|
||||
FUNC_IMPL_TO_CONVERT_DATA(uint8_t, boolean, Z, Boolean)
|
||||
FUNC_IMPL_TO_CONVERT_DATA(int8_t, byte, B, Byte)
|
||||
FUNC_IMPL_TO_CONVERT_DATA(int16_t, short, S, Short)
|
||||
FUNC_IMPL_TO_CONVERT_DATA(int32_t, int, I, Int)
|
||||
FUNC_IMPL_TO_CONVERT_DATA(int64_t, long, J, Long)
|
||||
FUNC_IMPL_TO_CONVERT_DATA(float, float, F, Float)
|
||||
FUNC_IMPL_TO_CONVERT_DATA(double, double, D, Double)
|
||||
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
|
||||
#endif
|
||||
98
be/src/vec/exec/vjdbc_connector.h
Normal file
98
be/src/vec/exec/vjdbc_connector.h
Normal file
@ -0,0 +1,98 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
#ifdef LIBJVM
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "jni.h"
|
||||
#include "runtime/descriptors.h"
|
||||
|
||||
namespace doris {
|
||||
namespace vectorized {
|
||||
struct JdbcConnectorParam {
|
||||
std::string driver_path;
|
||||
std::string driver_class;
|
||||
std::string resource_name;
|
||||
std::string driver_checksum;
|
||||
std::string jdbc_url;
|
||||
std::string user;
|
||||
std::string passwd;
|
||||
std::string query_string;
|
||||
|
||||
const TupleDescriptor* tuple_desc;
|
||||
};
|
||||
|
||||
class JdbcConnector {
|
||||
public:
|
||||
JdbcConnector(const JdbcConnectorParam& param);
|
||||
|
||||
~JdbcConnector();
|
||||
|
||||
Status open();
|
||||
|
||||
Status query_exec();
|
||||
|
||||
Status get_next(bool* eos, std::vector<MutableColumnPtr>& columns, int batch_size);
|
||||
|
||||
private:
|
||||
Status _register_func_id(JNIEnv* env);
|
||||
Status _convert_column_data(JNIEnv* env, jobject jobj, const SlotDescriptor* slot_desc,
|
||||
vectorized::IColumn* column_ptr);
|
||||
std::string _jobject_to_string(JNIEnv* env, jobject jobj);
|
||||
int64_t _jobject_to_date(JNIEnv* env, jobject jobj);
|
||||
int64_t _jobject_to_datetime(JNIEnv* env, jobject jobj);
|
||||
|
||||
bool _is_open;
|
||||
std::string _query_string;
|
||||
const TupleDescriptor* _tuple_desc;
|
||||
const JdbcConnectorParam _conn_param;
|
||||
jclass _executor_clazz;
|
||||
jclass _executor_list_clazz;
|
||||
jclass _executor_object_clazz;
|
||||
jclass _executor_string_clazz;
|
||||
jobject _executor_obj;
|
||||
jmethodID _executor_ctor_id;
|
||||
jmethodID _executor_query_id;
|
||||
jmethodID _executor_has_next_id;
|
||||
jmethodID _executor_get_blocks_id;
|
||||
jmethodID _executor_close_id;
|
||||
jmethodID _executor_get_list_id;
|
||||
jmethodID _executor_get_list_size_id;
|
||||
jmethodID _executor_convert_date_id;
|
||||
jmethodID _executor_convert_datetime_id;
|
||||
jmethodID _get_bytes_id;
|
||||
jmethodID _to_string_id;
|
||||
|
||||
#define FUNC_VARI_DECLARE(RETURN_TYPE) \
|
||||
RETURN_TYPE _jobject_to_##RETURN_TYPE(JNIEnv* env, jobject jobj); \
|
||||
jclass _executor_##RETURN_TYPE##_clazz;
|
||||
|
||||
FUNC_VARI_DECLARE(uint8_t)
|
||||
FUNC_VARI_DECLARE(int8_t)
|
||||
FUNC_VARI_DECLARE(int16_t)
|
||||
FUNC_VARI_DECLARE(int32_t)
|
||||
FUNC_VARI_DECLARE(int64_t)
|
||||
FUNC_VARI_DECLARE(float)
|
||||
FUNC_VARI_DECLARE(double)
|
||||
};
|
||||
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
|
||||
#endif
|
||||
173
be/src/vec/exec/vjdbc_scan_node.cpp
Normal file
173
be/src/vec/exec/vjdbc_scan_node.cpp
Normal file
@ -0,0 +1,173 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
#include "vec/exec/vjdbc_scan_node.h"
|
||||
#ifdef LIBJVM
|
||||
#include <string>
|
||||
|
||||
#include "common/status.h"
|
||||
|
||||
namespace doris {
|
||||
namespace vectorized {
|
||||
|
||||
VJdbcScanNode::VJdbcScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
|
||||
: ScanNode(pool, tnode, descs),
|
||||
_is_init(false),
|
||||
_table_name(tnode.jdbc_scan_node.table_name),
|
||||
_tuple_id(tnode.jdbc_scan_node.tuple_id),
|
||||
_query_string(tnode.jdbc_scan_node.query_string),
|
||||
_tuple_desc(nullptr) {}
|
||||
|
||||
Status VJdbcScanNode::prepare(RuntimeState* state) {
|
||||
VLOG_CRITICAL << "VJdbcScanNode::Prepare";
|
||||
if (_is_init) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
if (state == nullptr) {
|
||||
return Status::InternalError("input pointer is NULL of VJdbcScanNode::prepare.");
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(ScanNode::prepare(state));
|
||||
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
|
||||
|
||||
// get tuple desc
|
||||
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
|
||||
if (_tuple_desc == nullptr) {
|
||||
return Status::InternalError("Failed to get tuple descriptor.");
|
||||
}
|
||||
|
||||
// get jdbc table info
|
||||
const JdbcTableDescriptor* jdbc_table =
|
||||
static_cast<const JdbcTableDescriptor*>(_tuple_desc->table_desc());
|
||||
if (jdbc_table == nullptr) {
|
||||
return Status::InternalError("jdbc table pointer is NULL of VJdbcScanNode::prepare.");
|
||||
}
|
||||
_jdbc_param.driver_class = jdbc_table->jdbc_driver_class();
|
||||
_jdbc_param.driver_path = jdbc_table->jdbc_driver_url();
|
||||
_jdbc_param.resource_name = jdbc_table->jdbc_resource_name();
|
||||
_jdbc_param.driver_checksum = jdbc_table->jdbc_driver_checksum();
|
||||
_jdbc_param.jdbc_url = jdbc_table->jdbc_url();
|
||||
_jdbc_param.user = jdbc_table->jdbc_user();
|
||||
_jdbc_param.passwd = jdbc_table->jdbc_passwd();
|
||||
_jdbc_param.tuple_desc = _tuple_desc;
|
||||
_jdbc_param.query_string = std::move(_query_string);
|
||||
|
||||
_jdbc_connector.reset(new (std::nothrow) JdbcConnector(_jdbc_param));
|
||||
if (_jdbc_connector == nullptr) {
|
||||
return Status::InternalError("new a jdbc scanner failed.");
|
||||
}
|
||||
|
||||
_is_init = true;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status VJdbcScanNode::open(RuntimeState* state) {
|
||||
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJdbcScanNode::open");
|
||||
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
||||
RETURN_IF_ERROR(ExecNode::open(state));
|
||||
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
|
||||
VLOG_CRITICAL << "VJdbcScanNode::open";
|
||||
|
||||
if (state == nullptr) {
|
||||
return Status::InternalError("input pointer is NULL of VJdbcScanNode::open.");
|
||||
}
|
||||
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("used before initialize of VJdbcScanNode::open.");
|
||||
}
|
||||
|
||||
RETURN_IF_CANCELLED(state);
|
||||
RETURN_IF_ERROR(_jdbc_connector->open());
|
||||
RETURN_IF_ERROR(_jdbc_connector->query_exec());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status VJdbcScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
|
||||
VLOG_CRITICAL << "VJdbcScanNode::get_next";
|
||||
INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VJdbcScanNode::get_next");
|
||||
if (nullptr == state || nullptr == block || nullptr == eos) {
|
||||
return Status::InternalError("input is NULL pointer");
|
||||
}
|
||||
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("used before initialize of VJdbcScanNode::get_next.");
|
||||
}
|
||||
|
||||
auto column_size = _tuple_desc->slots().size();
|
||||
std::vector<MutableColumnPtr> columns(column_size);
|
||||
bool mem_reuse = block->mem_reuse();
|
||||
// only empty block should be here
|
||||
DCHECK(block->rows() == 0);
|
||||
|
||||
bool jdbc_eos = false;
|
||||
do {
|
||||
RETURN_IF_CANCELLED(state);
|
||||
|
||||
columns.resize(column_size);
|
||||
for (auto i = 0; i < column_size; i++) {
|
||||
if (mem_reuse) {
|
||||
columns[i] = std::move(*block->get_by_position(i).column).mutate();
|
||||
} else {
|
||||
columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column();
|
||||
}
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(_jdbc_connector->get_next(&jdbc_eos, columns, state->batch_size()));
|
||||
|
||||
if (jdbc_eos) {
|
||||
*eos = true;
|
||||
break;
|
||||
}
|
||||
|
||||
// Before really use the Block, must clear other ptr of column in block
|
||||
// So here need do std::move and clear in `columns`
|
||||
if (!mem_reuse) {
|
||||
int column_index = 0;
|
||||
for (const auto slot_desc : _tuple_desc->slots()) {
|
||||
block->insert(ColumnWithTypeAndName(std::move(columns[column_index++]),
|
||||
slot_desc->get_data_type_ptr(),
|
||||
slot_desc->col_name()));
|
||||
}
|
||||
} else {
|
||||
columns.clear();
|
||||
}
|
||||
VLOG_ROW << "VJdbcScanNode output rows: " << block->rows();
|
||||
} while (block->rows() == 0 && !(*eos));
|
||||
|
||||
RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block, block->columns()));
|
||||
reached_limit(block, eos);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status VJdbcScanNode::close(RuntimeState* state) {
|
||||
if (is_closed()) {
|
||||
return Status::OK();
|
||||
}
|
||||
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJdbcScanNode::close");
|
||||
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
||||
return ExecNode::close(state);
|
||||
}
|
||||
|
||||
// No use
|
||||
Status VJdbcScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
|
||||
#endif
|
||||
68
be/src/vec/exec/vjdbc_scan_node.h
Normal file
68
be/src/vec/exec/vjdbc_scan_node.h
Normal file
@ -0,0 +1,68 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
#ifdef LIBJVM
|
||||
|
||||
#include "exec/exec_node.h"
|
||||
#include "exec/scan_node.h"
|
||||
#include "vec/exec/vjdbc_connector.h"
|
||||
namespace doris {
|
||||
|
||||
namespace vectorized {
|
||||
class VJdbcScanNode final : public ScanNode {
|
||||
public:
|
||||
VJdbcScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
|
||||
|
||||
~VJdbcScanNode() override = default;
|
||||
|
||||
Status prepare(RuntimeState* state) override;
|
||||
|
||||
Status open(RuntimeState* state) override;
|
||||
|
||||
Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override {
|
||||
return Status::NotSupported("Not Implemented JdbcScanNode::get_next.");
|
||||
}
|
||||
|
||||
Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;
|
||||
|
||||
Status close(RuntimeState* state) override;
|
||||
|
||||
// No use
|
||||
Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
|
||||
|
||||
private:
|
||||
std::string get_query_stmt(const std::string& table, const std::vector<std::string>& fields,
|
||||
const std::vector<std::string>& filters, int64_t limit);
|
||||
|
||||
bool _is_init;
|
||||
std::string _table_name;
|
||||
|
||||
// Tuple id resolved in prepare() to set _tuple_desc;
|
||||
TupleId _tuple_id;
|
||||
//SQL
|
||||
std::string _query_string;
|
||||
// Descriptor of tuples read from JDBC table.
|
||||
const TupleDescriptor* _tuple_desc;
|
||||
|
||||
// Scanner of JDBC.
|
||||
std::unique_ptr<JdbcConnector> _jdbc_connector;
|
||||
JdbcConnectorParam _jdbc_param;
|
||||
};
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
#endif
|
||||
215
docs/en/docs/ecosystem/external-table/jdbc-of-doris.md
Normal file
215
docs/en/docs/ecosystem/external-table/jdbc-of-doris.md
Normal file
@ -0,0 +1,215 @@
|
||||
---
|
||||
{
|
||||
"title": "Doris On JDBC",
|
||||
"language": "en"
|
||||
}
|
||||
---
|
||||
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
# JDBC External Table Of Doris
|
||||
|
||||
JDBC External Table Of Doris provides Doris to access external tables through the standard interface (JDBC) of database access. External tables save the tedious data import work, allowing Doris to have the ability to access various databases, and with the help of Doris's capabilities to solve data analysis problems with external tables:
|
||||
|
||||
1. Support various data sources to access Doris
|
||||
2. Supports Doris's joint query with tables in various data sources for more complex analysis operations
|
||||
|
||||
This document mainly introduces how to use this function.
|
||||
|
||||
|
||||
## Instructions
|
||||
|
||||
### Create JDBC external table in Doris
|
||||
|
||||
Specific table building syntax reference:[CREATE TABLE](../../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md)
|
||||
|
||||
|
||||
#### 1. Create JDBC external table through JDBC_Resource
|
||||
```sql
|
||||
CREATE EXTERNAL RESOURCE jdbc_resource
|
||||
properties (
|
||||
"type"="jdbc",
|
||||
"user"="root",
|
||||
"password"="123456",
|
||||
"jdbc_url"="jdbc:mysql://192.168.0.1:3306/test",
|
||||
"driver_url"="http://IP:port/mysql-connector-java-5.1.47.jar",
|
||||
"driver_class"="com.mysql.jdbc.Driver"
|
||||
);
|
||||
|
||||
CREATE EXTERNAL TABLE `baseall_mysql` (
|
||||
`k1` tinyint(4) NULL,
|
||||
`k2` smallint(6) NULL,
|
||||
`k3` int(11) NULL,
|
||||
`k4` bigint(20) NULL,
|
||||
`k5` decimal(9, 3) NULL
|
||||
) ENGINE=JDBC
|
||||
PROPERTIES (
|
||||
"resource" = "jdbc_resource",
|
||||
"table" = "baseall",
|
||||
"table_type"="mysql"
|
||||
);
|
||||
```
|
||||
Parameter Description:
|
||||
|
||||
| Parameter | Description |
|
||||
| ---------------- | ----------------------------- |
|
||||
| **type** | "jdbc", Required flag of Resource Type 。|
|
||||
| **user** | Username used to access the external database。|
|
||||
| **password** | Password information corresponding to the user。|
|
||||
| **jdbc_url** | The URL protocol of JDBC, including database type, IP address, port number and database name, has different formats for different database protocols. for example mysql: "jdbc:mysql://127.0.0.1:3306/test"。|
|
||||
| **driver_class** | The class name of the driver package for accessing the external database,for example mysql:com.mysql.jdbc.Driver. |
|
||||
| **driver_url** | The package driver URL used to download and access external databases。http://IP:port/mysql-connector-java-5.1.47.jar .|
|
||||
| **resource** | The resource name that depends on when creating the external table in Doris corresponds to the name when creating the resource in the previous step.|
|
||||
| **table** | The table name mapped to the external database when creating the external table in Doris.|
|
||||
| **table_type** | When creating an appearance in Doris, the table comes from that database. for example mysql,postgresql,sqlserver,oracle.|
|
||||
|
||||
### Query usage
|
||||
|
||||
```
|
||||
select * from mysql_table where k1 > 1000 and k3 ='term';
|
||||
```
|
||||
|
||||
#### 1.Mysql
|
||||
|
||||
| Mysql Version | Mysql JDBC Driver Version |
|
||||
| ------------- | ------------------------------- |
|
||||
| 8.0.30 | mysql-connector-java-5.1.47.jar |
|
||||
|
||||
#### 2.PostgreSQL
|
||||
| PostgreSQL Version | PostgreSQL JDBC Driver Version |
|
||||
| ------------------ | ------------------------------ |
|
||||
| 14.5 | postgresql-42.5.0.jar |
|
||||
|
||||
#### 2 SQLServer
|
||||
| SQLserver Version | SQLserver JDBC Driver Version |
|
||||
| ------------- | -------------------------- |
|
||||
| 2022 | mssql-jdbc-11.2.0.jre8.jar |
|
||||
|
||||
#### 2.oracle
|
||||
| Oracle Version | Oracle JDBC Driver Version |
|
||||
| ---------- | ------------------- |
|
||||
| 11 | ojdbc6.jar |
|
||||
|
||||
At present, only this version has been tested, and other versions will be added after testing
|
||||
|
||||
|
||||
## Type matching
|
||||
|
||||
There are different data types among different databases. Here is a list of the matching between the types in each database and the data types in Doris.
|
||||
|
||||
### MySQL
|
||||
|
||||
| MySQL | Doris |
|
||||
| :------: | :------: |
|
||||
| BOOLEAN | BOOLEAN |
|
||||
| CHAR | CHAR |
|
||||
| VARCHAR | VARCHAR |
|
||||
| DATE | DATE |
|
||||
| FLOAT | FLOAT |
|
||||
| TINYINT | TINYINT |
|
||||
| SMALLINT | SMALLINT |
|
||||
| INT | INT |
|
||||
| BIGINT | BIGINT |
|
||||
| DOUBLE | DOUBLE |
|
||||
| DATETIME | DATETIME |
|
||||
| DECIMAL | DECIMAL |
|
||||
|
||||
|
||||
### PostgreSQL
|
||||
|
||||
| PostgreSQL | Doris |
|
||||
| :--------------: | :------: |
|
||||
| BOOLEAN | BOOLEAN |
|
||||
| CHAR | CHAR |
|
||||
| VARCHAR | VARCHAR |
|
||||
| DATE | DATE |
|
||||
| REAL | FLOAT |
|
||||
| SMALLINT | SMALLINT |
|
||||
| INT | INT |
|
||||
| BIGINT | BIGINT |
|
||||
| DOUBLE PRECISION | DOUBLE |
|
||||
| TIMESTAMP | DATETIME |
|
||||
| DECIMAL | DECIMAL |
|
||||
|
||||
### Oracle
|
||||
|
||||
| Oracle | Doris |
|
||||
| :------: | :------: |
|
||||
| CHAR | CHAR |
|
||||
| VARCHAR | VARCHAR |
|
||||
| DATE | DATETIME |
|
||||
| SMALLINT | SMALLINT |
|
||||
| INT | INT |
|
||||
| NUMBER | DECIMAL |
|
||||
|
||||
|
||||
### SQL server
|
||||
|
||||
| SQLServer | Doris |
|
||||
| :-------: | :------: |
|
||||
| BIT | BOOLEAN |
|
||||
| CHAR | CHAR |
|
||||
| VARCHAR | VARCHAR |
|
||||
| DATE | DATE |
|
||||
| REAL | FLOAT |
|
||||
| TINYINT | TINYINT |
|
||||
| SMALLINT | SMALLINT |
|
||||
| INT | INT |
|
||||
| BIGINT | BIGINT |
|
||||
| DATETIME | DATETIME |
|
||||
| DECIMAL | DECIMAL |
|
||||
|
||||
|
||||
## Q&A
|
||||
|
||||
1. Besides mysql, Oracle, PostgreSQL, and SQL Server support more databases
|
||||
|
||||
At present, Doris only adapts to MySQL, Oracle, SQL Server, and PostgreSQL. And planning to adapt other databases. In principle, any database that supports JDBC access can be accessed through the JDBC facade. If you need to access other appearances, you are welcome to modify the code and contribute to Doris.
|
||||
|
||||
1. Read the Emoji expression on the surface of MySQL, and there is garbled code
|
||||
|
||||
When Doris makes a JDBC appearance connection, because the default utf8 code in MySQL is utf8mb3, it cannot represent Emoji expressions that require 4-byte coding. Here, you need to set the code of the corresponding column to utf8mb4, set the server code to utf8mb4, and do not configure characterencoding in the JDBC URL when creating the MySQL appearance (this attribute does not support utf8mb4. If non utf8mb4 is configured, the expression cannot be written. Therefore, it should be left blank and not configured.)
|
||||
|
||||
|
||||
```
|
||||
Configuration items can be modified globally
|
||||
|
||||
Modify the my.ini file in the MySQL directory (the Linux system is the my.cnf file in the etc directory)
|
||||
[client]
|
||||
default-character-set=utf8mb4
|
||||
|
||||
[mysql]
|
||||
Set MySQL default character set
|
||||
default-character-set=utf8mb4
|
||||
|
||||
[mysqld]
|
||||
Set up MySQL character set server
|
||||
character-set-server=utf8mb4
|
||||
collation-server=utf8mb4_unicode_ci
|
||||
init_connect='SET NAMES utf8mb4
|
||||
|
||||
Modify the type of corresponding table and column
|
||||
ALTER TABLE table_name MODIFY colum_name VARCHAR(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
|
||||
|
||||
ALTER TABLE table_name CHARSET=utf8mb4;
|
||||
|
||||
SET NAMES utf8mb4
|
||||
|
||||
```
|
||||
211
docs/zh-CN/docs/ecosystem/external-table/jdbc-of-doris.md
Normal file
211
docs/zh-CN/docs/ecosystem/external-table/jdbc-of-doris.md
Normal file
@ -0,0 +1,211 @@
|
||||
---
|
||||
{
|
||||
"title": "Doris On JDBC",
|
||||
"language": "zh-CN"
|
||||
}
|
||||
---
|
||||
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
# JDBC External Table Of Doris
|
||||
|
||||
JDBC External Table Of Doris 提供了Doris通过数据库访问的标准接口(JDBC)来访问外部表,外部表省去了繁琐的数据导入工作,让Doris可以具有了访问各式数据库的能力,并借助Doris本身的OLAP的能力来解决外部表的数据分析问题:
|
||||
|
||||
1. 支持各种数据源接入Doris
|
||||
2. 支持Doris与各种数据源中的表联合查询,进行更加复杂的分析操作
|
||||
|
||||
本文档主要介绍该功能的使用方式等。
|
||||
|
||||
### Doris中创建JDBC的外表
|
||||
|
||||
具体建表语法参照:[CREATE TABLE](../../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md)
|
||||
|
||||
|
||||
#### 1. 通过JDBC_Resource来创建JDBC外表
|
||||
```sql
|
||||
CREATE EXTERNAL RESOURCE jdbc_resource
|
||||
properties (
|
||||
"type"="jdbc",
|
||||
"user"="root",
|
||||
"password"="123456",
|
||||
"jdbc_url"="jdbc:mysql://192.168.0.1:3306/test",
|
||||
"driver_url"="http://IP:port/mysql-connector-java-5.1.47.jar",
|
||||
"driver_class"="com.mysql.jdbc.Driver"
|
||||
);
|
||||
|
||||
CREATE EXTERNAL TABLE `baseall_mysql` (
|
||||
`k1` tinyint(4) NULL,
|
||||
`k2` smallint(6) NULL,
|
||||
`k3` int(11) NULL,
|
||||
`k4` bigint(20) NULL,
|
||||
`k5` decimal(9, 3) NULL
|
||||
) ENGINE=JDBC
|
||||
PROPERTIES (
|
||||
"resource" = "jdbc_resource",
|
||||
"table" = "baseall",
|
||||
"table_type"="mysql"
|
||||
);
|
||||
```
|
||||
参数说明:
|
||||
|
||||
| 参数 | 说明 |
|
||||
| ---------------- | ---------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| **type** | "jdbc", 必填项标志资源类型 |
|
||||
| **user** | 访问外表数据库所使的用户名 |
|
||||
| **password** | 该用户对应的密码信息 |
|
||||
| **jdbc_url** | JDBC的URL协议,包括数据库类型,IP地址,端口号和数据库名,不同数据库协议格式不一样。例如mysql: "jdbc:mysql://127.0.0.1:3306/test"。 |
|
||||
| **driver_class** | 访问外表数据库的驱动包类名,例如mysql是:com.mysql.jdbc.Driver. |
|
||||
| **driver_url** | 用于下载访问外部数据库的jar包驱动URL。http://IP:port/mysql-connector-java-5.1.47.jar |
|
||||
| **resource** | 在Doris中建立外表时依赖的资源名,对应上步创建资源时的名字。 |
|
||||
| **table** | 在Doris中建立外表时,与外部数据库相映射的表名。 |
|
||||
| **table_type** | 在Doris中建立外表时,该表来自那个数据库。例如mysql,postgresql,sqlserver,oracle |
|
||||
|
||||
### 查询用法
|
||||
|
||||
```
|
||||
select * from mysql_table where k1 > 1000 and k3 ='term';
|
||||
```
|
||||
|
||||
#### 1.Mysql测试
|
||||
|
||||
| Mysql版本 | Mysql JDBC驱动版本 |
|
||||
| --------- | ------------------------------- |
|
||||
| 8.0.30 | mysql-connector-java-5.1.47.jar |
|
||||
|
||||
#### 2.PostgreSQL测试
|
||||
| PostgreSQL版本 | PostgreSQL JDBC驱动版本 |
|
||||
| -------------- | ----------------------- |
|
||||
| 14.5 | postgresql-42.5.0.jar |
|
||||
|
||||
#### 2 SQLServer测试
|
||||
| SQLserver版本 | SQLserver JDBC驱动版本 |
|
||||
| ------------- | -------------------------- |
|
||||
| 2022 | mssql-jdbc-11.2.0.jre8.jar |
|
||||
|
||||
#### 2.oracle测试
|
||||
| Oracle版本 | Oracle JDBC驱动版本 |
|
||||
| ---------- | ------------------- |
|
||||
| 11 | ojdbc6.jar |
|
||||
|
||||
目前只测试了这一个版本其他版本测试后补充
|
||||
|
||||
|
||||
## 类型匹配
|
||||
|
||||
各个数据库之间数据类型存在不同,这里列出了各个数据库中的类型和Doris之中数据类型匹配的情况。
|
||||
|
||||
### MySQL
|
||||
|
||||
| MySQL | Doris |
|
||||
| :------: | :------: |
|
||||
| BOOLEAN | BOOLEAN |
|
||||
| CHAR | CHAR |
|
||||
| VARCHAR | VARCHAR |
|
||||
| DATE | DATE |
|
||||
| FLOAT | FLOAT |
|
||||
| TINYINT | TINYINT |
|
||||
| SMALLINT | SMALLINT |
|
||||
| INT | INT |
|
||||
| BIGINT | BIGINT |
|
||||
| DOUBLE | DOUBLE |
|
||||
| DATETIME | DATETIME |
|
||||
| DECIMAL | DECIMAL |
|
||||
|
||||
|
||||
### PostgreSQL
|
||||
|
||||
| PostgreSQL | Doris |
|
||||
| :--------------: | :------: |
|
||||
| BOOLEAN | BOOLEAN |
|
||||
| CHAR | CHAR |
|
||||
| VARCHAR | VARCHAR |
|
||||
| DATE | DATE |
|
||||
| REAL | FLOAT |
|
||||
| SMALLINT | SMALLINT |
|
||||
| INT | INT |
|
||||
| BIGINT | BIGINT |
|
||||
| DOUBLE PRECISION | DOUBLE |
|
||||
| TIMESTAMP | DATETIME |
|
||||
| DECIMAL | DECIMAL |
|
||||
|
||||
### Oracle
|
||||
|
||||
| Oracle | Doris |
|
||||
| :------: | :------: |
|
||||
| CHAR | CHAR |
|
||||
| VARCHAR | VARCHAR |
|
||||
| DATE | DATETIME |
|
||||
| SMALLINT | SMALLINT |
|
||||
| INT | INT |
|
||||
| NUMBER | DECIMAL |
|
||||
|
||||
|
||||
### SQL server
|
||||
|
||||
| SQLServer | Doris |
|
||||
| :-------: | :------: |
|
||||
| BIT | BOOLEAN |
|
||||
| CHAR | CHAR |
|
||||
| VARCHAR | VARCHAR |
|
||||
| DATE | DATE |
|
||||
| REAL | FLOAT |
|
||||
| TINYINT | TINYINT |
|
||||
| SMALLINT | SMALLINT |
|
||||
| INT | INT |
|
||||
| BIGINT | BIGINT |
|
||||
| DATETIME | DATETIME |
|
||||
| DECIMAL | DECIMAL |
|
||||
|
||||
## Q&A
|
||||
|
||||
1. 除了MySQL,Oracle,PostgreSQL,SQLServer是否能够支持更多的数据库
|
||||
|
||||
目前Doris只适配了MySQL,PostgreSQL,SQLServer,Oracle.关于其他的数据库的适配工作正在规划之中,原则上来说任何支持JDBC访问的数据库都能通过JDBC外表来访问。如果您有访问其他外表的需求,欢迎修改代码并贡献给Doris。
|
||||
|
||||
2. 读写mysql外表的emoji表情出现乱码
|
||||
|
||||
Doris进行jdbc外表连接时,由于mysql之中默认的utf8编码为utf8mb3,无法表示需要4字节编码的emoji表情。这里需要在建立mysql外表时设置对应列的编码为utf8mb4,设置服务器编码为utf8mb4,JDBC Url中的characterEncoding不配置.(该属性不支持utf8mb4,配置了非utf8mb4将导致无法写入表情,因此要留空,不配置)
|
||||
|
||||
|
||||
```
|
||||
可全局修改配置项
|
||||
|
||||
修改mysql目录下的my.ini文件(linux系统为etc目录下的my.cnf文件)
|
||||
[client]
|
||||
default-character-set=utf8mb4
|
||||
|
||||
[mysql]
|
||||
设置mysql默认字符集
|
||||
default-character-set=utf8mb4
|
||||
|
||||
[mysqld]
|
||||
设置mysql字符集服务器
|
||||
character-set-server=utf8mb4
|
||||
collation-server=utf8mb4_unicode_ci
|
||||
init_connect='SET NAMES utf8mb4
|
||||
|
||||
修改对应表与列的类型
|
||||
ALTER TABLE table_name MODIFY colum_name VARCHAR(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
|
||||
|
||||
ALTER TABLE table_name CHARSET=utf8mb4;
|
||||
|
||||
SET NAMES utf8mb4
|
||||
|
||||
```
|
||||
@ -91,6 +91,7 @@ public class CreateTableStmt extends DdlStmt {
|
||||
engineNames.add("hive");
|
||||
engineNames.add("iceberg");
|
||||
engineNames.add("hudi");
|
||||
engineNames.add("jdbc");
|
||||
}
|
||||
|
||||
public CreateTableStmt() {
|
||||
@ -514,7 +515,7 @@ public class CreateTableStmt extends DdlStmt {
|
||||
|
||||
if (engineName.equals("mysql") || engineName.equals("odbc") || engineName.equals("broker")
|
||||
|| engineName.equals("elasticsearch") || engineName.equals("hive")
|
||||
|| engineName.equals("iceberg") || engineName.equals("hudi")) {
|
||||
|| engineName.equals("iceberg") || engineName.equals("hudi") || engineName.equals("jdbc")) {
|
||||
if (!isExternal) {
|
||||
// this is for compatibility
|
||||
isExternal = true;
|
||||
|
||||
@ -2710,7 +2710,7 @@ public class Env {
|
||||
sb.append("CREATE ");
|
||||
if (table.getType() == TableType.ODBC || table.getType() == TableType.MYSQL
|
||||
|| table.getType() == TableType.ELASTICSEARCH || table.getType() == TableType.BROKER
|
||||
|| table.getType() == TableType.HIVE) {
|
||||
|| table.getType() == TableType.HIVE || table.getType() == TableType.JDBC) {
|
||||
sb.append("EXTERNAL ");
|
||||
}
|
||||
sb.append("TABLE ");
|
||||
@ -3052,6 +3052,13 @@ public class Env {
|
||||
sb.append("\nPROPERTIES (\n");
|
||||
sb.append(new PrintableMap<>(hudiTable.getTableProperties(), " = ", true, true, false).toString());
|
||||
sb.append("\n)");
|
||||
} else if (table.getType() == TableType.JDBC) {
|
||||
JdbcTable jdbcTable = (JdbcTable) table;
|
||||
addTableComment(jdbcTable, sb);
|
||||
sb.append("\nPROPERTIES (\n");
|
||||
sb.append("\"resource\" = \"").append(jdbcTable.getResourceName()).append("\",\n");
|
||||
sb.append("\"table\" = \"").append(jdbcTable.getJdbcTable()).append("\"");
|
||||
sb.append("\n)");
|
||||
}
|
||||
|
||||
createTableStmt.add(sb + ";");
|
||||
|
||||
@ -0,0 +1,196 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.util.Util;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.codec.binary.Hex;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;

/**
 * External JDBC Catalog resource for external table query.
 *
 * create external resource jdbc_mysql
 * properties (
 * "type"="jdbc",
 * "user"="root",
 * "password"="123456",
 * "jdbc_url"="jdbc:mysql://127.0.0.1:3306/test",
 * "driver_url"="http://127.0.0.1:8888/mysql-connector-java-5.1.47.jar",
 * "driver_class"="com.mysql.jdbc.Driver"
 * );
 *
 * DROP RESOURCE "jdbc_mysql";
 */
public class JdbcResource extends Resource {
public static final String URL = "jdbc_url";
public static final String USER = "user";
public static final String PASSWORD = "password";
public static final String DRIVER_CLASS = "driver_class";
public static final String DRIVER_URL = "driver_url";
public static final String TYPE = "type";
public static final String CHECK_SUM = "checksum";
private static final Logger LOG = LogManager.getLogger(JdbcResource.class);
// timeout for both connection and read. 10 seconds is long enough.
private static final int HTTP_TIMEOUT_MS = 10000;
@SerializedName(value = "configs")
private Map<String, String> configs;

public JdbcResource() {
super();
}

public JdbcResource(String name) {
this(name, Maps.newHashMap());
}

private JdbcResource(String name, Map<String, String> configs) {
super(name, ResourceType.JDBC);
this.configs = configs;
}

public JdbcResource getCopiedResource() {
return new JdbcResource(name, Maps.newHashMap(configs));
}

private void checkProperties(String propertiesKey) throws DdlException {
// check the properties key
String value = configs.get(propertiesKey);
if (value == null) {
throw new DdlException("JdbcResource Missing " + propertiesKey + " in properties");
|
||||
}
}

@Override
public void modifyProperties(Map<String, String> properties) throws DdlException {
// modify properties
replaceIfEffectiveValue(this.configs, DRIVER_URL, properties.get(DRIVER_URL));
replaceIfEffectiveValue(this.configs, DRIVER_CLASS, properties.get(DRIVER_CLASS));
replaceIfEffectiveValue(this.configs, URL, properties.get(URL));
replaceIfEffectiveValue(this.configs, USER, properties.get(USER));
replaceIfEffectiveValue(this.configs, PASSWORD, properties.get(PASSWORD));
replaceIfEffectiveValue(this.configs, TYPE, properties.get(TYPE));
}

@Override
public void checkProperties(Map<String, String> properties) throws AnalysisException {
Map<String, String> copiedProperties = Maps.newHashMap(properties);
// check properties
copiedProperties.remove(DRIVER_URL);
copiedProperties.remove(DRIVER_CLASS);
copiedProperties.remove(URL);
copiedProperties.remove(USER);
copiedProperties.remove(PASSWORD);
copiedProperties.remove(TYPE);
if (!copiedProperties.isEmpty()) {
throw new AnalysisException("Unknown JDBC catalog resource properties: " + copiedProperties);
}
}

@Override
protected void setProperties(Map<String, String> properties) throws DdlException {
Preconditions.checkState(properties != null);
for (String key : properties.keySet()) {
if (!DRIVER_URL.equals(key) && !URL.equals(key) && !USER.equals(key) && !PASSWORD.equals(key)
&& !TYPE.equals(key) && !DRIVER_CLASS.equals(key)) {
throw new DdlException("JDBC resource Property of " + key + " is unknown");
|
||||
}
}
configs = properties;
computeObjectChecksum();
checkProperties(DRIVER_URL);
checkProperties(DRIVER_CLASS);
checkProperties(URL);
checkProperties(USER);
checkProperties(PASSWORD);
checkProperties(TYPE);
}

@Override
public Map<String, String> getCopiedProperties() {
Map<String, String> copiedProperties = Maps.newHashMap(configs);
return copiedProperties;
}

@Override
protected void getProcNodeData(BaseProcResult result) {
String lowerCaseType = type.name().toLowerCase();
for (Map.Entry<String, String> entry : configs.entrySet()) {
// it is dangerous to show the real password in show jdbc resource,
// so we replace it with an empty string
if (entry.getKey().equals(PASSWORD)) {
result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), ""));
} else {
result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
}
}
}

public String getProperty(String propertiesKey) {
// may return null if the key was never set
return configs.get(propertiesKey);
}

private void computeObjectChecksum() throws DdlException {
if (FeConstants.runningUnitTest) {
// skip checking checksum when running ut
return;
}

InputStream inputStream = null;
try {
inputStream = Util.getInputStreamFromUrl(getProperty(DRIVER_URL), null, HTTP_TIMEOUT_MS, HTTP_TIMEOUT_MS);
MessageDigest digest = MessageDigest.getInstance("MD5");
byte[] buf = new byte[4096];
int bytesRead;
// stream the driver jar through the MD5 digest
while ((bytesRead = inputStream.read(buf)) >= 0) {
digest.update(buf, 0, bytesRead);
}
String checkSum = Hex.encodeHexString(digest.digest());
configs.put(CHECK_SUM, checkSum);
} catch (IOException e) {
throw new DdlException(
"compute driver checksum from url: " + getProperty(DRIVER_URL) + " encountered an IOException.");
} catch (NoSuchAlgorithmException e) {
throw new DdlException(
"compute driver checksum from url: " + getProperty(DRIVER_URL) + " failed: MD5 algorithm not found.");
} finally {
// always release the driver input stream
if (inputStream != null) {
try {
inputStream.close();
} catch (IOException e) {
LOG.warn("failed to close driver input stream", e);
}
}
}
}
}
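A minimal companion sketch of a table definition that references such a resource (illustrative names; the required table properties are exactly the ones JdbcTable.validate() checks below):

// create external table jdbc_mysql_tbl (k1 int, k2 varchar(128))
// engine = jdbc
// properties (
// "resource" = "jdbc_mysql",
// "table" = "test_tbl",
// "table_type" = "mysql"
// );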
fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java (new file, 185 lines)
@ -0,0 +1,185 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.catalog.Resource.ResourceType;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.DeepCopy;
import org.apache.doris.common.io.Text;
import org.apache.doris.thrift.TJdbcTable;
import org.apache.doris.thrift.TOdbcTableType;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;

import com.google.common.base.Strings;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JdbcTable extends Table {
private static final Logger LOG = LogManager.getLogger(JdbcTable.class);

private static final String TABLE = "table";
private static final String RESOURCE = "resource";
private static final String TABLE_TYPE = "table_type";
// TODO: We may have to support various types of databases like ODBC
// maps the table_type property to the jdbc external table types Doris supports now
private static Map<String, TOdbcTableType> TABLE_TYPE_MAP;
private String resourceName;
private String externalTableName;
private String jdbcTypeName;

static {
Map<String, TOdbcTableType> tempMap = new HashMap<>();
tempMap.put("mysql", TOdbcTableType.MYSQL);
tempMap.put("postgresql", TOdbcTableType.POSTGRESQL);
tempMap.put("sqlserver", TOdbcTableType.SQLSERVER);
tempMap.put("oracle", TOdbcTableType.ORACLE);
TABLE_TYPE_MAP = Collections.unmodifiableMap(tempMap);
}

public JdbcTable() {
super(TableType.JDBC);
}

public JdbcTable(long id, String name, List<Column> schema, Map<String, String> properties)
throws DdlException {
super(id, name, TableType.JDBC, schema);
validate(properties);
}

@Override
public TTableDescriptor toThrift() {
TJdbcTable tJdbcTable = new TJdbcTable();
JdbcResource jdbcResource = (JdbcResource) (Env.getCurrentEnv().getResourceMgr().getResource(resourceName));
tJdbcTable.setJdbcUrl(jdbcResource.getProperty(JdbcResource.URL));
tJdbcTable.setJdbcUser(jdbcResource.getProperty(JdbcResource.USER));
tJdbcTable.setJdbcPassword(jdbcResource.getProperty(JdbcResource.PASSWORD));
tJdbcTable.setJdbcTableName(externalTableName);
tJdbcTable.setJdbcDriverClass(jdbcResource.getProperty(JdbcResource.DRIVER_CLASS));
tJdbcTable.setJdbcDriverUrl(jdbcResource.getProperty(JdbcResource.DRIVER_URL));
tJdbcTable.setJdbcResourceName(jdbcResource.getName());
tJdbcTable.setJdbcDriverChecksum(jdbcResource.getProperty(JdbcResource.CHECK_SUM));

TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.JDBC_TABLE, fullSchema.size(), 0,
getName(), "");
tTableDescriptor.setJdbcTable(tJdbcTable);
return tTableDescriptor;
}

@Override
public void write(DataOutput out) throws IOException {
super.write(out);
Text.writeString(out, externalTableName);
Text.writeString(out, resourceName);
Text.writeString(out, jdbcTypeName);
}

@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
externalTableName = Text.readString(in);
resourceName = Text.readString(in);
jdbcTypeName = Text.readString(in);
}

public String getResourceName() {
return resourceName;
}

public String getJdbcTable() {
return externalTableName;
}

public String getTableTypeName() {
return jdbcTypeName;
}

public TOdbcTableType getJdbcTableType() {
return TABLE_TYPE_MAP.get(getTableTypeName());
}

@Override
public String getSignature(int signatureVersion) {
JdbcResource jdbcResource = (JdbcResource) (Env.getCurrentEnv().getResourceMgr().getResource(resourceName));
StringBuilder sb = new StringBuilder(signatureVersion);
sb.append(name);
sb.append(type);
sb.append(resourceName);
sb.append(externalTableName);
sb.append(jdbcTypeName);
sb.append(jdbcResource.getProperty(JdbcResource.URL));
sb.append(jdbcResource.getProperty(JdbcResource.USER));
sb.append(jdbcResource.getProperty(JdbcResource.PASSWORD));
sb.append(jdbcResource.getProperty(JdbcResource.DRIVER_CLASS));
sb.append(jdbcResource.getProperty(JdbcResource.DRIVER_URL));

String md5 = DigestUtils.md5Hex(sb.toString());
LOG.debug("get signature of odbc table {}: {}. signature string: {}", name, md5, sb.toString());
|
||||
return md5;
}

@Override
public JdbcTable clone() {
JdbcTable copied = new JdbcTable();
if (!DeepCopy.copy(this, copied, JdbcTable.class, FeConstants.meta_version)) {
LOG.warn("failed to copy jdbc table: " + getName());
return null;
}
return copied;
}

private void validate(Map<String, String> properties) throws DdlException {
if (properties == null) {
throw new DdlException("Please set properties of jdbc table, "
|
||||
+ "they are: host, port, user, password, database and table");
|
||||
}

externalTableName = properties.get(TABLE);
if (Strings.isNullOrEmpty(externalTableName)) {
throw new DdlException("property " + TABLE + " must be set");
}

resourceName = properties.get(RESOURCE);
if (Strings.isNullOrEmpty(resourceName)) {
throw new DdlException("property " + RESOURCE + " must be set");
}

jdbcTypeName = properties.get(TABLE_TYPE);
if (Strings.isNullOrEmpty(jdbcTypeName)) {
throw new DdlException("property " + TABLE_TYPE + " must be set");
}

Resource resource = Env.getCurrentEnv().getResourceMgr().getResource(resourceName);
if (resource == null) {
throw new DdlException("jdbc resource [" + resourceName + "] not exists");
|
||||
}
if (resource.getType() != ResourceType.JDBC) {
throw new DdlException("resource [" + resourceName + "] is not jdbc resource");
|
||||
}
}
}
@ -44,7 +44,8 @@ public abstract class Resource implements Writable {
UNKNOWN,
SPARK,
ODBC_CATALOG,
S3;
S3,
JDBC;

public static ResourceType fromString(String resourceType) {
for (ResourceType type : ResourceType.values()) {
@ -95,6 +96,9 @@ public abstract class Resource implements Writable {
case S3:
resource = new S3Resource(name);
break;
case JDBC:
resource = new JdbcResource(name);
break;
default:
throw new DdlException("Unknown resource type: " + type);
}

@ -72,8 +72,9 @@ public class ResourceMgr implements Writable {
public void createResource(CreateResourceStmt stmt) throws DdlException {
if (stmt.getResourceType() != ResourceType.SPARK
&& stmt.getResourceType() != ResourceType.ODBC_CATALOG
&& stmt.getResourceType() != ResourceType.S3) {
throw new DdlException("Only support SPARK, ODBC_CATALOG and REMOTE_STORAGE resource.");
&& stmt.getResourceType() != ResourceType.S3
&& stmt.getResourceType() != ResourceType.JDBC) {
throw new DdlException("Only support SPARK, ODBC_CATALOG ,JDBC, and REMOTE_STORAGE resource.");
|
||||
}
Resource resource = Resource.fromStmt(stmt);
createResource(resource);

@ -347,6 +347,8 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
table = new IcebergTable();
} else if (type == TableType.HUDI) {
table = new HudiTable();
} else if (type == TableType.JDBC) {
table = new JdbcTable();
} else {
throw new IOException("Unknown table type: " + type.name());
}

@ -99,7 +99,7 @@ public interface TableIf {
* Doris table type.
*/
enum TableType {
MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI,
MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI, JDBC,
TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, MATERIALIZED_VIEW;

public String toEngineName() {
@ -124,6 +124,8 @@ public interface TableIf {
return "Hive";
case HUDI:
return "Hudi";
case JDBC:
return "jdbc";
case TABLE_VALUED_FUNCTION:
return "Table_Valued_Function";
case HMS_EXTERNAL_TABLE:
@ -150,6 +152,7 @@ public interface TableIf {
case ELASTICSEARCH:
case HIVE:
case HUDI:
case JDBC:
case TABLE_VALUED_FUNCTION:
case HMS_EXTERNAL_TABLE:
case ES_EXTERNAL_TABLE:

@ -79,6 +79,7 @@ import org.apache.doris.catalog.HiveTable;
import org.apache.doris.catalog.IcebergTable;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.InfoSchemaDb;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.MaterializedIndex;
@ -1051,6 +1052,9 @@ public class InternalCatalog implements CatalogIf<Database> {
} else if (engineName.equalsIgnoreCase("hudi")) {
createHudiTable(db, stmt);
return;
} else if (engineName.equalsIgnoreCase("jdbc")) {
createJdbcTable(db, stmt);
return;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_STORAGE_ENGINE, engineName);
}
@ -2234,6 +2238,21 @@ public class InternalCatalog implements CatalogIf<Database> {
LOG.info("successfully create table[{}-{}]", tableName, tableId);
}

private void createJdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
String tableName = stmt.getTableName();
List<Column> columns = stmt.getColumns();

long tableId = Env.getCurrentEnv().getNextId();

JdbcTable jdbcTable = new JdbcTable(tableId, tableName, columns, stmt.getProperties());
jdbcTable.setComment(stmt.getComment());
// check whether the table already exists
if (!db.createTableWithLock(jdbcTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
}

private void createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState,
DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc, TabletMeta tabletMeta,
Set<Long> tabletIdSet, IdGeneratorBuffer idGeneratorBuffer) throws DdlException {

@ -23,6 +23,7 @@ import org.apache.doris.alter.SchemaChangeJobV2;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.OdbcCatalogResource;
import org.apache.doris.catalog.RandomDistributionInfo;
@ -120,7 +121,8 @@ public class GsonUtils {
.of(Resource.class, "clazz")
.registerSubtype(SparkResource.class, SparkResource.class.getSimpleName())
.registerSubtype(OdbcCatalogResource.class, OdbcCatalogResource.class.getSimpleName())
.registerSubtype(S3Resource.class, S3Resource.class.getSimpleName());
.registerSubtype(S3Resource.class, S3Resource.class.getSimpleName())
.registerSubtype(JdbcResource.class, JdbcResource.class.getSimpleName());

// runtime adapter for class "AlterJobV2"
private static RuntimeTypeAdapterFactory<AlterJobV2> alterJobV2TypeAdapterFactory = RuntimeTypeAdapterFactory

@ -0,0 +1,195 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.planner;

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.common.UserException;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.StatsRecursiveDerive;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TJdbcScanNode;
import org.apache.doris.thrift.TOdbcTableType;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TScanRangeLocations;

import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;

public class JdbcScanNode extends ScanNode {
private static final Logger LOG = LogManager.getLogger(JdbcScanNode.class);

private final List<String> columns = new ArrayList<String>();
private final List<String> filters = new ArrayList<String>();
private String tableName;
private TOdbcTableType jdbcType;

public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, JdbcTable tbl) {
super(id, desc, "SCAN JDBC", StatisticalType.JDBC_SCAN_NODE);
jdbcType = tbl.getJdbcTableType();
tableName = OdbcTable.databaseProperName(jdbcType, tbl.getJdbcTable());
}

@Override
public void init(Analyzer analyzer) throws UserException {
super.init(analyzer);
computeStats(analyzer);
}

@Override
public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
return null;
}

private void createJdbcFilters(Analyzer analyzer) {
if (conjuncts.isEmpty()) {
return;
}

List<SlotRef> slotRefs = Lists.newArrayList();
Expr.collectList(conjuncts, SlotRef.class, slotRefs);
ExprSubstitutionMap sMap = new ExprSubstitutionMap();
for (SlotRef slotRef : slotRefs) {
SlotRef slotRef1 = (SlotRef) slotRef.clone();
slotRef1.setTblName(null);
slotRef1.setLabel(OdbcTable.databaseProperName(jdbcType, slotRef1.getColumnName()));
sMap.put(slotRef, slotRef1);
}

ArrayList<Expr> conjunctsList = Expr.cloneList(conjuncts, sMap);
for (Expr p : conjunctsList) {
if (OdbcScanNode.shouldPushDownConjunct(jdbcType, p)) {
String filter = p.toMySql();
filters.add(filter);
conjuncts.remove(p);
}
}
}
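// For intuition (illustrative, MySQL case): a conjunct such as t1.k1 > 10 is cloned,
// its SlotRef relabeled via databaseProperName, and, if pushable, emitted as a filter
// string like `k1` > 10; conjuncts that cannot be pushed down stay in Doris.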

private void createJdbcColumns(Analyzer analyzer) {
for (SlotDescriptor slot : desc.getSlots()) {
if (!slot.isMaterialized()) {
continue;
}
Column col = slot.getColumn();
columns.add(col.getName());
}
if (columns.isEmpty()) {
columns.add("*");
}
}

private boolean shouldPushDownLimit() {
return limit != -1 && conjuncts.isEmpty();
}

private String getJdbcQueryStr() {
StringBuilder sql = new StringBuilder("SELECT ");

// Oracle uses the WHERE clause to do top-n
if (shouldPushDownLimit() && jdbcType == TOdbcTableType.ORACLE) {
filters.add("ROWNUM <= " + limit);
}

// MSSQL uses SELECT TOP to do top-n
if (shouldPushDownLimit() && jdbcType == TOdbcTableType.SQLSERVER) {
sql.append("TOP " + limit + " ");
}

sql.append(Joiner.on(", ").join(columns));
sql.append(" FROM ").append(tableName);

if (!filters.isEmpty()) {
sql.append(" WHERE (");
sql.append(Joiner.on(") AND (").join(filters));
sql.append(")");
}

// other databases use LIMIT to do top-n
if (shouldPushDownLimit()
&& (jdbcType == TOdbcTableType.MYSQL
|| jdbcType == TOdbcTableType.POSTGRESQL
|| jdbcType == TOdbcTableType.MONGODB)) {
sql.append(" LIMIT ").append(limit);
}

return sql.toString();
}
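// A sketch of the query strings this method can produce (illustrative columns, limit = 10):
//   MySQL/PostgreSQL, no conjuncts: SELECT k1, k2 FROM tbl LIMIT 10
//   Oracle, no conjuncts:           SELECT k1, k2 FROM tbl WHERE (ROWNUM <= 10)
//   SQLServer, no conjuncts:        SELECT TOP 10 k1, k2 FROM tbl
//   any dialect, with pushed filters: SELECT k1, k2 FROM tbl WHERE (k1 > 10) and no limit,
//   since shouldPushDownLimit() requires conjuncts to be empty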

@Override
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
StringBuilder output = new StringBuilder();
output.append(prefix).append("TABLE: ").append(tableName).append("\n");
if (detailLevel == TExplainLevel.BRIEF) {
return output.toString();
}
output.append(prefix).append("QUERY: ").append(getJdbcQueryStr()).append("\n");
return output.toString();
}

@Override
public void finalize(Analyzer analyzer) throws UserException {
// Convert predicates to Jdbc columns and filters.
createJdbcColumns(analyzer);
createJdbcFilters(analyzer);
}

@Override
public void computeStats(Analyzer analyzer) throws UserException {
super.computeStats(analyzer);
// even if the scan has no data, at least one backend will be assigned when the fragment actually executes
numNodes = numNodes <= 0 ? 1 : numNodes;

StatsRecursiveDerive.getStatsRecursiveDerive().statsRecursiveDerive(this);
cardinality = statsDeriveResult.getRowCount();
}

@Override
protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.JDBC_SCAN_NODE;
msg.jdbc_scan_node = new TJdbcScanNode();
msg.jdbc_scan_node.setTupleId(desc.getId().asInt());
msg.jdbc_scan_node.setTableName(tableName);
msg.jdbc_scan_node.setQueryString(getJdbcQueryStr());
}

@Override
protected String debugString() {
MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
return helper.addValue(super.debugString()).toString();
}

@Override
public int getNumInstances() {
return 1;
}
}
@ -53,7 +53,7 @@ public class OdbcScanNode extends ScanNode {

// Some databases have function calls that differ from Doris's, so for now Doris does not
// push down function calls, except for MYSQL
private static boolean shouldPushDownConjunct(TOdbcTableType tableType, Expr expr) {
public static boolean shouldPushDownConjunct(TOdbcTableType tableType, Expr expr) {
if (!tableType.equals(TOdbcTableType.MYSQL)) {
List<FunctionCallExpr> fnExprList = Lists.newArrayList();
expr.collect(FunctionCallExpr.class, fnExprList);

@ -55,6 +55,7 @@ import org.apache.doris.catalog.AggregateFunction;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.Table;
@ -1765,6 +1766,9 @@ public class SingleNodePlanner {
scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "HudiScanNode",
null, -1);
break;
case JDBC:
scanNode = new JdbcScanNode(ctx.getNextNodeId(), tblRef.getDesc(), (JdbcTable) tblRef.getTable());
break;
case TABLE_VALUED_FUNCTION:
scanNode = new TableValuedFunctionScanNode(ctx.getNextNodeId(), tblRef.getDesc(),
"TableValuedFunctionScanNode", ((TableValuedFunctionRef) tblRef).getTableFunction());

@ -46,4 +46,5 @@ public enum StatisticalType {
UNION_NODE,
TABLE_VALUED_FUNCTION_NODE,
FILE_SCAN_NODE,
JDBC_SCAN_NODE,
}

fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java (new file, 167 lines)
@ -0,0 +1,167 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.udf;

import org.apache.doris.thrift.TJdbcExecutorCtorParams;

import org.apache.log4j.Logger;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;

import java.net.MalformedURLException;
import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

public class JdbcExecutor {
private static final Logger LOG = Logger.getLogger(JdbcExecutor.class);
private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new TBinaryProtocol.Factory();
private URLClassLoader classLoader = null;
private Connection conn = null;
private Statement stmt = null;
private ResultSet resultSet = null;
private ResultSetMetaData resultSetMetaData = null;

public JdbcExecutor(byte[] thriftParams) throws Exception {
TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
TDeserializer deserializer = new TDeserializer(PROTOCOL_FACTORY);
try {
deserializer.deserialize(request, thriftParams);
} catch (TException e) {
throw new InternalException(e.getMessage());
}
init(request.jar_location_path, request.jdbc_driver_class, request.jdbc_url, request.jdbc_user,
request.jdbc_password);
}

public void close() throws Exception {
if (resultSet != null) {
resultSet.close();
}
if (stmt != null) {
stmt.close();
}
if (conn != null) {
conn.close();
}
if (classLoader != null) {
classLoader.close();
}
}

public int querySQL(String sql) throws UdfRuntimeException {
try {
boolean res = stmt.execute(sql);
if (res) { // sql query
resultSet = stmt.getResultSet();
resultSetMetaData = resultSet.getMetaData();
return resultSetMetaData.getColumnCount();
} else { //TODO: update query
return 0;
}
} catch (SQLException e) {
throw new UdfRuntimeException("JDBC executor sql has error: ", e);
}
}

public List<List<Object>> getBlock(int batchSize) throws UdfRuntimeException {
List<List<Object>> block = null;
try {
int columnCount = resultSetMetaData.getColumnCount();
block = new ArrayList<>(columnCount);
for (int i = 0; i < columnCount; ++i) {
block.add(new ArrayList<>(batchSize));
}
int numRows = 0;
do {
for (int i = 0; i < columnCount; ++i) {
block.get(i).add(resultSet.getObject(i + 1));
}
numRows++;
} while (numRows < batchSize && resultSet.next());
} catch (SQLException e) {
throw new UdfRuntimeException("get next block failed: ", e);
} catch (Exception e) {
throw new UdfRuntimeException("unable to get next : ", e);
|
||||
}
return block;
}
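// Layout note: the returned block is column-major. For two columns (id, name) and
// batchSize 3, block.get(0) holds [1, 2, 3] and block.get(1) holds ["a", "b", "c"].
// The do-while reads the current row first, so the caller is expected to have
// positioned the cursor with hasNext() before asking for a block.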

public boolean hasNext() throws UdfRuntimeException {
try {
if (resultSet == null) {
return false;
}
return resultSet.next();
} catch (SQLException e) {
throw new UdfRuntimeException("resultSet to get next error: ", e);
|
||||
}
}

public long convertDateToLong(Object obj) {
LocalDate date = ((Date) obj).toLocalDate();
long time = UdfUtils.convertDateTimeToLong(date.getYear(), date.getMonthValue(), date.getDayOfMonth(),
0, 0, 0, true);
return time;
}

public long convertDateTimeToLong(Object obj) {
LocalDateTime date = ((Timestamp) obj).toLocalDateTime();
long time = UdfUtils.convertDateTimeToLong(date.getYear(), date.getMonthValue(), date.getDayOfMonth(),
date.getHour(), date.getMinute(), date.getSecond(), false);
return time;
}

private void init(String driverPath, String driverClass, String jdbcUrl, String jdbcUser, String jdbcPassword)
throws UdfRuntimeException {
try {
ClassLoader loader;
if (driverPath != null) {
ClassLoader parent = getClass().getClassLoader();
classLoader = UdfUtils.getClassLoader(driverPath, parent);
loader = classLoader;
} else {
loader = ClassLoader.getSystemClassLoader();
}
Class.forName(driverClass, true, loader);
conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword);
stmt = conn.createStatement();
} catch (MalformedURLException e) {
throw new UdfRuntimeException("MalformedURLException to load class about " + driverPath, e);
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new UdfRuntimeException("Loading JDBC class error ClassNotFoundException about " + driverClass, e);
|
||||
} catch (SQLException e) {
|
||||
throw new UdfRuntimeException("Connection JDBC class error about " + jdbcUrl, e);
|
||||
} catch (Exception e) {
|
||||
throw new UdfRuntimeException("unable to init jdbc executor Exception ", e);
|
||||
}
|
||||
}
}
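A minimal sketch of how the backend is expected to drive this executor (hypothetical standalone harness; in practice the ctor params arrive as a serialized TJdbcExecutorCtorParams from the BE):

// JdbcExecutor executor = new JdbcExecutor(serializedCtorParams);
// int columnCount = executor.querySQL("SELECT k1, k2 FROM test_tbl");
// while (executor.hasNext()) {
//     List<List<Object>> block = executor.getBlock(4096); // column-major batch
//     // hand the block's columns over to the vectorized scan node
// }
// executor.close();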
@ -274,6 +274,17 @@ struct THudiTable {
3: optional map<string, string> properties
}

struct TJdbcTable {
1: optional string jdbc_url
2: optional string jdbc_table_name
3: optional string jdbc_user
4: optional string jdbc_password
5: optional string jdbc_driver_url
6: optional string jdbc_resource_name
7: optional string jdbc_driver_class
8: optional string jdbc_driver_checksum
}

// "Union" of all table types.
|
||||
struct TTableDescriptor {
|
||||
1: required Types.TTableId id
|
||||
@ -295,6 +306,7 @@ struct TTableDescriptor {
|
||||
17: optional THiveTable hiveTable
|
||||
18: optional TIcebergTable icebergTable
|
||||
19: optional THudiTable hudiTable
|
||||
20: optional TJdbcTable jdbcTable
|
||||
}
|
||||
|
||||
struct TDescriptorTable {
|
||||
|
||||
@ -55,6 +55,7 @@ enum TPlanNodeType {
|
||||
TABLE_FUNCTION_NODE,
|
||||
TABLE_VALUED_FUNCTION_SCAN_NODE,
|
||||
FILE_SCAN_NODE,
|
||||
JDBC_SCAN_NODE,
|
||||
}
|
||||
|
||||
// phases of an execution node
|
||||
@ -312,6 +313,12 @@ struct TOdbcScanNode {
|
||||
8: optional string query_string
|
||||
}
|
||||
|
||||
struct TJdbcScanNode {
|
||||
1: optional Types.TTupleId tuple_id
|
||||
2: optional string table_name
|
||||
3: optional string query_string
|
||||
}
|
||||
|
||||
|
||||
struct TBrokerScanNode {
|
||||
1: required Types.TTupleId tuple_id
|
||||
@ -887,6 +894,7 @@ struct TPlanNode {
|
||||
|
||||
// file scan node
|
||||
44: optional TFileScanNode file_scan_node
|
||||
45: optional TJdbcScanNode jdbc_scan_node
|
||||
|
||||
101: optional list<Exprs.TExpr> projections
|
||||
102: optional Types.TTupleId output_tuple_id
|
||||
|
||||
@ -356,6 +356,23 @@ struct TFunction {
|
||||
13: optional bool vectorized = false
|
||||
}
|
||||
|
||||
struct TJdbcExecutorCtorParams {
// local path to the jdbc driver's jar file
1: optional string jar_location_path

// e.g. "jdbc:mysql://127.0.0.1:3307/test"
2: optional string jdbc_url

// e.g. "root"
3: optional string jdbc_user

// password of the jdbc user
4: optional string jdbc_password

// e.g. "com.mysql.jdbc.Driver"
5: optional string jdbc_driver_class
}

struct TJavaUdfExecutorCtorParams {
1: optional TFunction fn

@ -517,7 +534,8 @@ enum TTableType {
ODBC_TABLE,
HIVE_TABLE,
ICEBERG_TABLE,
HUDI_TABLE
HUDI_TABLE,
JDBC_TABLE
}

enum TOdbcTableType {
Block a user