742 lines
32 KiB
C++
742 lines
32 KiB
C++
// Licensed to the Apache Software Foundation (ASF) under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
|
|
#include "vec/exec/vjdbc_connector.h"
|
|
|
|
#include <gen_cpp/Types_types.h>
|
|
|
|
#include <algorithm>
|
|
#include <boost/iterator/iterator_facade.hpp>
|
|
// IWYU pragma: no_include <bits/std_abs.h>
|
|
#include <cmath> // IWYU pragma: keep
|
|
#include <memory>
|
|
#include <ostream>
|
|
#include <utility>
|
|
|
|
#include "common/logging.h"
|
|
#include "common/status.h"
|
|
#include "exec/table_connector.h"
|
|
#include "gutil/strings/substitute.h"
|
|
#include "jni.h"
|
|
#include "runtime/define_primitive_type.h"
|
|
#include "runtime/descriptors.h"
|
|
#include "runtime/runtime_state.h"
|
|
#include "runtime/types.h"
|
|
#include "runtime/user_function_cache.h"
|
|
#include "util/jni-util.h"
|
|
#include "util/runtime_profile.h"
|
|
#include "vec/columns/column_nullable.h"
|
|
#include "vec/core/block.h"
|
|
#include "vec/data_types/data_type_nullable.h"
|
|
#include "vec/data_types/data_type_string.h"
|
|
#include "vec/exec/jni_connector.h"
|
|
#include "vec/exprs/vexpr.h"
|
|
#include "vec/functions/simple_function_factory.h"
|
|
#include "vec/io/reader_buffer.h"
|
|
|
|
namespace doris::vectorized {
|
|
// Java class (JNI slash-separated form) that performs all JDBC work on behalf of this connector.
const char* JDBC_EXECUTOR_CLASS = "org/apache/doris/jdbc/JdbcExecutor";

// ---- JNI method signatures registered in JdbcConnector::_register_func_id() ----

// Constructor taking the thrift-serialized TJdbcExecutorCtorParams as a byte[].
const char* JDBC_EXECUTOR_CTOR_SIGNATURE = "([B)V";
// write(java.util.Map) -> int
const char* JDBC_EXECUTOR_STMT_WRITE_SIGNATURE = "(Ljava/util/Map;)I";
// hasNext() -> boolean
const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z";
// getResultColumnTypeNames() -> java.util.List
const char* JDBC_EXECUTOR_GET_TYPES_SIGNATURE = "()Ljava/util/List;";
// close() -> void
const char* JDBC_EXECUTOR_CLOSE_SIGNATURE = "()V";
// Shared by openTrans(), commitTrans() and rollbackTrans(): no arguments, void result.
const char* JDBC_EXECUTOR_TRANSACTION_SIGNATURE = "()V";
|
|
|
|
// Builds a connector from `connector_param`. The tuple descriptor, transaction flag, table name
// and query text go to the TableConnector base; the full parameter set is kept for open().
// No JNI work happens here: the Java executor is created lazily in open().
JdbcConnector::JdbcConnector(const JdbcConnectorParam& connector_param)
        : TableConnector(connector_param.tuple_desc, connector_param.use_transaction,
                         connector_param.table_name, connector_param.query_string),
          _conn_param(connector_param),
          _closed(false) {}
|
|
|
|
// Releases JNI resources if the owner never called close(). Any error from close() is
// deliberately discarded: a destructor has no way to report it.
JdbcConnector::~JdbcConnector() {
    if (_closed) {
        return;
    }
    static_cast<void>(close());
}
|
|
|
|
// Obtains a global reference to the Java class named JAVA_TYPE and stores it in the member
// `_executor_<CPP_TYPE>_clazz`, returning early from the enclosing function on failure.
// Requires a `JNIEnv* env` in the calling scope. It is #undef'd again later in this file.
#define GET_BASIC_JAVA_CLAZZ(JAVA_TYPE, CPP_TYPE) \
    RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, JAVA_TYPE, &_executor_##CPP_TYPE##_clazz));

// Releases the global class reference held in `_executor_<CPP_TYPE>_clazz` through the
// `JNIEnv* env` in the calling scope. It is #undef'd again later in this file.
#define DELETE_BASIC_JAVA_CLAZZ_REF(CPP_TYPE) env->DeleteGlobalRef(_executor_##CPP_TYPE##_clazz);
|
|
|
|
// Shuts down the Java executor and releases every JNI global reference taken in open().
//
// Calling this more than once is safe: after a successful JNI release, `_is_open` is cleared,
// so a second call returns immediately.
//
// @return the status of JdbcExecutor.close(). The global references are released even when
//         that call throws.
Status JdbcConnector::close(Status /*unused*/) {
    SCOPED_RAW_TIMER(&_jdbc_statistic._connector_close_timer);
    _closed = true;
    if (!_is_open) {
        return Status::OK();
    }
    if (_is_in_transaction) {
        // Best effort: a failed rollback must not stop the resources below from being released.
        Status abort_status = abort_trans();
        if (!abort_status.ok()) {
            LOG(WARNING) << "JdbcConnector failed to abort transaction on close: "
                         << abort_status.to_string();
        }
    }

    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));

    // Call close() while `_executor_clazz` is still a live global reference. The previous code
    // deleted the class ref first, then used it in this call.
    env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_close_id);
    // Capture (and clear) any Java exception, but keep going so no global ref is leaked.
    Status close_status = JniUtil::GetJniExceptionMsg(env);

    env->DeleteGlobalRef(_executor_obj);
    env->DeleteGlobalRef(_executor_clazz);
    env->DeleteGlobalRef(_executor_object_clazz);
    env->DeleteGlobalRef(_executor_string_clazz);
    env->DeleteGlobalRef(_executor_list_clazz);
#undef DELETE_BASIC_JAVA_CLAZZ_REF
    _executor_obj = nullptr;
    _executor_clazz = nullptr;
    _executor_object_clazz = nullptr;
    _executor_string_clazz = nullptr;
    _executor_list_clazz = nullptr;
    // Mark the connector closed so a repeated close() does not delete the refs twice.
    _is_open = false;

    return close_status;
}
|
|
|
|
// Resolves the JDBC driver jar and creates the Java JdbcExecutor. When the connector is
// configured for transactions, it also opens a transaction.
//
// @param state runtime state; only its batch size is read, and only when `read` is true.
// @param read  true for a scan (READ op, batch size from `state`), false for a sink (WRITE op).
// @return error if any JNI step, the jar lookup, executor construction or begin_trans() fails.
Status JdbcConnector::open(RuntimeState* state, bool read) {
    if (_is_open) {
        LOG(INFO) << "this scanner of jdbc already opened";
        return Status::OK();
    }

    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
    RETURN_IF_ERROR(JniUtil::get_jni_scanner_class(env, JDBC_EXECUTOR_CLASS, &_executor_clazz));
    GET_BASIC_JAVA_CLAZZ("java/util/List", list)
    GET_BASIC_JAVA_CLAZZ("java/lang/Object", object)
    GET_BASIC_JAVA_CLAZZ("java/lang/String", string)

#undef GET_BASIC_JAVA_CLAZZ
    RETURN_IF_ERROR(_register_func_id(env));

    // Add a scoped cleanup jni reference object. This cleans up local refs made below.
    JniLocalFrame jni_frame;
    {
        // For jdbcExternalTable, _conn_param.resource_name == "", so the driver path is used as
        // the jar cache key. Otherwise the resource name is the key.
        const std::string& jar_cache_key = _conn_param.resource_name.empty()
                                                   ? _conn_param.driver_path
                                                   : _conn_param.resource_name;
        std::string local_location;
        {
            SCOPED_RAW_TIMER(&_jdbc_statistic._load_jar_timer);
            RETURN_IF_ERROR(UserFunctionCache::instance()->get_jarpath(
                    std::abs(static_cast<int64_t>(std::hash<std::string> {}(jar_cache_key))),
                    _conn_param.driver_path, _conn_param.driver_checksum, &local_location));
        }
        VLOG_QUERY << "driver local path = " << local_location;

        TJdbcExecutorCtorParams ctor_params;
        ctor_params.__set_statement(_sql_str);
        ctor_params.__set_jdbc_url(_conn_param.jdbc_url);
        ctor_params.__set_jdbc_user(_conn_param.user);
        ctor_params.__set_jdbc_password(_conn_param.passwd);
        ctor_params.__set_jdbc_driver_class(_conn_param.driver_class);
        ctor_params.__set_driver_path(local_location);
        ctor_params.__set_batch_size(read ? state->batch_size() : 0);
        ctor_params.__set_op(read ? TJdbcOperation::READ : TJdbcOperation::WRITE);
        ctor_params.__set_table_type(_conn_param.table_type);

        jbyteArray ctor_params_bytes = nullptr;
        // Pushed frame will be popped when jni_frame goes out-of-scope.
        RETURN_IF_ERROR(jni_frame.push(env));
        RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
        {
            SCOPED_RAW_TIMER(&_jdbc_statistic._init_connector_timer);
            _executor_obj = env->NewObject(_executor_clazz, _executor_ctor_id, ctor_params_bytes);
        }
        // DeleteLocalRef may be called while an exception is pending. The old
        // Get/ReleaseByteArrayElements pair could not be called then, and it did no work anyway.
        env->DeleteLocalRef(ctor_params_bytes);
    }
    RETURN_ERROR_IF_EXC(env);
    RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, _executor_obj, &_executor_obj));
    _is_open = true;
    // Propagate the failure: otherwise writes would silently run outside the requested
    // transaction. `_is_open` is already set, so close() will still release the executor.
    RETURN_IF_ERROR(begin_trans());

    return Status::OK();
}
|
|
|
|
// Executes the read statement on the Java side and checks that the remote result has exactly
// one column per materialized slot. For every table type except NEBULA, it then checks that each
// column's Java type is compatible with its slot.
Status JdbcConnector::query() {
    if (!_is_open) {
        return Status::InternalError("Query before open of JdbcConnector.");
    }

    // Only materialized slots are fetched, so only they must match the remote column count.
    const auto& slots = _tuple_desc->slots();
    const int materialized_count = static_cast<int>(std::count_if(
            slots.begin(), slots.end(), [](auto* slot) { return slot->is_materialized(); }));

    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
    {
        SCOPED_RAW_TIMER(&_jdbc_statistic._execte_read_timer);
        const jint remote_column_count =
                env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_read_id);
        Status exception_status = JniUtil::GetJniExceptionMsg(env);
        if (!exception_status.ok()) {
            return Status::InternalError("GetJniExceptionMsg meet error, query={}, msg={}",
                                         _conn_param.query_string, exception_status.to_string());
        }
        if (remote_column_count != materialized_count) {
            return Status::InternalError("input and output column num not equal of jdbc query.");
        }
    }

    LOG(INFO) << "JdbcConnector::query has exec success: " << _sql_str;
    if (_conn_param.table_type == TOdbcTableType::NEBULA) {
        return Status::OK();
    }
    RETURN_IF_ERROR(_check_column_type());
    return Status::OK();
}
|
|
|
|
// Fetches the next batch from the Java executor into `block`.
//
// @param eos        set to true when the executor reports no more rows; left untouched otherwise.
// @param block      destination block, filled from the address returned by getBlockAddress().
// @param batch_size maximum number of rows requested from Java.
// @return error on any Java exception, fill failure or special-type cast failure.
Status JdbcConnector::get_next(bool* eos, Block* block, int batch_size) {
    if (!_is_open) {
        return Status::InternalError("get_next before open of jdbc connector.");
    }
    SCOPED_RAW_TIMER(&_jdbc_statistic._get_data_timer);
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));

    const jboolean has_next =
            env->CallNonvirtualBooleanMethod(_executor_obj, _executor_clazz, _executor_has_next_id);
    // Check for an exception BEFORE interpreting the result. A throwing hasNext() yields
    // JNI_FALSE, which would otherwise be reported as a clean end-of-stream.
    RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
    if (has_next != JNI_TRUE) {
        *eos = true;
        return Status::OK();
    }

    const size_t column_size = _tuple_desc->slots().size();

    jobject map = _get_reader_params(block, env, column_size);
    long address = 0;
    {
        // Scoped so that this timer measures only the getBlockAddress() call.
        SCOPED_RAW_TIMER(&_jdbc_statistic._get_block_address_timer);
        address = env->CallLongMethod(_executor_obj, _executor_get_block_address_id, batch_size,
                                      map);
    }
    // Release the params map before checking, so it is not leaked on the error path
    // (DeleteLocalRef is permitted while an exception is pending).
    env->DeleteLocalRef(map);
    RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));

    std::vector<size_t> all_columns(column_size);
    for (size_t i = 0; i < column_size; ++i) {
        all_columns[i] = i;
    }
    {
        SCOPED_RAW_TIMER(&_jdbc_statistic._fill_block_timer);
        RETURN_IF_ERROR(JniConnector::fill_block(block, all_columns, address));
        RETURN_IF_ERROR(_cast_string_to_special(block, env, column_size));
    }

    return JniUtil::GetJniExceptionMsg(env);
}
|
|
|
|
// Writes `block` through the Java executor and adds the written row count to the sent-rows
// counter. `start_send_row` and `table_type` are not used here (presumably kept for the shared
// connector interface — verify against TableConnector). exec_stmt_write sends the whole block
// in a single call.
Status JdbcConnector::append(vectorized::Block* block,
                             const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
                             uint32_t start_send_row, uint32_t* num_rows_sent,
                             TOdbcTableType::type table_type) {
    Status write_status = exec_stmt_write(block, output_vexpr_ctxs, num_rows_sent);
    if (!write_status.ok()) {
        return write_status;
    }
    COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent);
    return Status::OK();
}
|
|
|
|
// Sends all rows of `block` to the Java executor's write(Map) method. Java receives the address
// of the table metadata produced by JniConnector::to_java_table plus the block's schema strings.
// `output_vexpr_ctxs` is not used here.
//
// @param num_rows_sent set to block->rows() on success.
Status JdbcConnector::exec_stmt_write(Block* block, const VExprContextSPtrs& output_vexpr_ctxs,
                                      uint32_t* num_rows_sent) {
    SCOPED_TIMER(_result_send_timer);
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));

    // `meta_data` must outlive the Java call below: Java is handed only its raw address.
    std::unique_ptr<long[]> meta_data;
    RETURN_IF_ERROR(JniConnector::to_java_table(block, meta_data));
    const long meta_address = reinterpret_cast<long>(meta_data.get());
    const auto table_schema = JniConnector::parse_table_schema(block);

    std::map<String, String> write_params;
    write_params.emplace("meta_address", std::to_string(meta_address));
    write_params.emplace("required_fields", table_schema.first);
    write_params.emplace("columns_types", table_schema.second);
    jobject java_params = JniUtil::convert_to_java_map(env, write_params);

    env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_stmt_write_id,
                                 java_params);
    env->DeleteLocalRef(java_params);
    RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));

    *num_rows_sent = block->rows();
    return Status::OK();
}
|
|
|
|
// Opens a transaction on the Java executor (openTrans()) when the connector was configured to
// use one. On success it marks the connector as being inside a transaction. It is a no-op
// otherwise.
Status JdbcConnector::begin_trans() {
    if (!_use_tranaction) {
        return Status::OK();
    }
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
    env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_begin_trans_id);
    RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
    _is_in_transaction = true;
    return Status::OK();
}
|
|
|
|
// Rolls back the open transaction on the Java executor (rollbackTrans()).
//
// After a successful rollback, the connector is marked as no longer in a transaction, matching
// finish_trans(). Without this, close() would issue a second rollback after an explicit abort.
//
// @return InternalError if no transaction is open, or the Java exception if the rollback throws
//         (the connector then stays marked in-transaction).
Status JdbcConnector::abort_trans() {
    if (!_is_in_transaction) {
        return Status::InternalError("Abort transaction before begin trans.");
    }
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
    env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_abort_trans_id);
    RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
    _is_in_transaction = false;
    return Status::OK();
}
|
|
|
|
// Commits the open transaction on the Java executor (commitTrans()). It does nothing unless
// transactions are enabled and one is currently open. On success the in-transaction flag is
// cleared.
Status JdbcConnector::finish_trans() {
    const bool has_open_transaction = _use_tranaction && _is_in_transaction;
    if (!has_open_transaction) {
        return Status::OK();
    }
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
    env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_finish_trans_id);
    RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
    _is_in_transaction = false;
    return Status::OK();
}
|
|
|
|
// Looks up every Java method ID the connector uses, in a fixed order, and stops at the first
// lookup that raises a Java exception.
//
// @return OK, or InternalError wrapping the first JNI exception message.
Status JdbcConnector::_register_func_id(JNIEnv* env) {
    // One entry per method: owning class, Java name, JNI signature, member to receive the ID.
    struct MethodSpec {
        jclass clazz;
        const char* name;
        const char* signature;
        jmethodID* id;
    };
    const MethodSpec method_specs[] = {
            {_executor_clazz, "<init>", JDBC_EXECUTOR_CTOR_SIGNATURE, &_executor_ctor_id},
            {_executor_clazz, "write", JDBC_EXECUTOR_STMT_WRITE_SIGNATURE,
             &_executor_stmt_write_id},
            {_executor_clazz, "read", "()I", &_executor_read_id},
            {_executor_clazz, "close", JDBC_EXECUTOR_CLOSE_SIGNATURE, &_executor_close_id},
            {_executor_clazz, "hasNext", JDBC_EXECUTOR_HAS_NEXT_SIGNATURE,
             &_executor_has_next_id},
            {_executor_clazz, "getBlockAddress", "(ILjava/util/Map;)J",
             &_executor_get_block_address_id},
            {_executor_clazz, "getCurBlockRows", "()I", &_executor_block_rows_id},
            {_executor_list_clazz, "get", "(I)Ljava/lang/Object;", &_executor_get_list_id},
            {_executor_string_clazz, "getBytes", "(Ljava/lang/String;)[B", &_get_bytes_id},
            {_executor_object_clazz, "toString", "()Ljava/lang/String;", &_to_string_id},
            {_executor_clazz, "openTrans", JDBC_EXECUTOR_TRANSACTION_SIGNATURE,
             &_executor_begin_trans_id},
            {_executor_clazz, "commitTrans", JDBC_EXECUTOR_TRANSACTION_SIGNATURE,
             &_executor_finish_trans_id},
            {_executor_clazz, "rollbackTrans", JDBC_EXECUTOR_TRANSACTION_SIGNATURE,
             &_executor_abort_trans_id},
            {_executor_clazz, "getResultColumnTypeNames", JDBC_EXECUTOR_GET_TYPES_SIGNATURE,
             &_executor_get_types_id},
    };

    for (const MethodSpec& spec : method_specs) {
        *spec.id = env->GetMethodID(spec.clazz, spec.name, spec.signature);
        Status lookup_status = JniUtil::GetJniExceptionMsg(env);
        if (!lookup_status.ok()) {
            return Status::InternalError(strings::Substitute(
                    "Jdbc connector _register_func_id meet error and error is $0",
                    lookup_status.to_string()));
        }
    }
    return Status::OK();
}
|
|
|
|
// Checks every materialized slot against the Java class name the driver reports for the
// matching result column (getResultColumnTypeNames()).
//
// Java exceptions are checked after each JNI call, so no further JNI call is made while one is
// pending. Local references are released on every path, including when _check_type() rejects
// a column.
//
// @return the first Java exception or type mismatch, otherwise OK.
Status JdbcConnector::_check_column_type() {
    SCOPED_RAW_TIMER(&_jdbc_statistic._check_type_timer);
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
    jobject type_lists =
            env->CallNonvirtualObjectMethod(_executor_obj, _executor_clazz, _executor_get_types_id);
    RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));

    const auto& slots = _tuple_desc->slots();
    Status status = Status::OK();
    // Index into the Java list, which contains only materialized columns.
    int materialized_column_index = 0;
    for (size_t column_index = 0; column_index < slots.size() && status.ok(); ++column_index) {
        auto* slot_desc = slots[column_index];
        if (!slot_desc->is_materialized()) {
            continue;
        }
        jobject column_type =
                env->CallObjectMethod(type_lists, _executor_get_list_id, materialized_column_index);
        status = JniUtil::GetJniExceptionMsg(env);
        if (status.ok()) {
            const std::string type_str = _jobject_to_string(env, column_type);
            status = JniUtil::GetJniExceptionMsg(env);
            if (status.ok()) {
                status = _check_type(slot_desc, type_str, static_cast<int>(column_index));
            }
        }
        env->DeleteLocalRef(column_type);
        ++materialized_column_index;
    }
    env->DeleteLocalRef(type_lists);
    RETURN_IF_ERROR(status);
    return JniUtil::GetJniExceptionMsg(env);
}
|
|
|
|
/* type mapping: https://doris.apache.org/zh-CN/docs/dev/ecosystem/external-table/jdbc-of-doris?_highlight=jdbc
|
|
|
|
Doris MYSQL PostgreSQL Oracle SQLServer
|
|
|
|
BOOLEAN java.lang.Boolean java.lang.Boolean java.lang.Boolean
|
|
TINYINT java.lang.Integer java.lang.Short
|
|
SMALLINT java.lang.Integer java.lang.Integer java.math.BigDecimal java.lang.Short
|
|
INT java.lang.Integer java.lang.Integer java.math.BigDecimal java.lang.Integer
|
|
BIGINT java.lang.Long java.lang.Long java.lang.Long
|
|
LARGEINT java.math.BigInteger
|
|
DECIMAL java.math.BigDecimal java.math.BigDecimal java.math.BigDecimal java.math.BigDecimal
|
|
VARCHAR java.lang.String java.lang.String java.lang.String java.lang.String
|
|
DOUBLE java.lang.Double java.lang.Double java.lang.Double java.lang.Double
|
|
FLOAT java.lang.Float java.lang.Float java.lang.Float
|
|
DATE java.sql.Date java.sql.Date java.sql.Date
|
|
DATETIME java.sql.Timestamp java.sql.Timestamp java.sql.Timestamp java.sql.Timestamp
|
|
|
|
NOTE: because Oracle creates every numeric type as NUMBER(p,s), the reported Java type may be java.math.BigDecimal
|
|
*/
|
|
|
|
// Checks that the Java class name `type_str` reported for a result column can be converted to
// the slot's Doris type.
//
// JSONB, HLL and OBJECT (bitmap) slots are read as strings. For those, a string staging type and
// column are also registered under `column_index`, to be cast to the slot type after each batch.
//
// @return InternalError describing the mismatch, or OK.
Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string& type_str,
                                  int column_index) {
    const std::string error_msg = fmt::format(
            "Fail to convert jdbc type of {} to doris type {} on column: {}. You need to "
            "check this column type between external table and doris table.",
            type_str, slot_desc->type().debug_string(), slot_desc->col_name());

    // True when `type_str` equals one of the listed Java class names.
    auto is_one_of = [&type_str](std::initializer_list<const char*> candidates) {
        return std::any_of(candidates.begin(), candidates.end(),
                           [&type_str](const char* candidate) { return type_str == candidate; });
    };

    // Registers a (nullable-if-the-slot-is) string staging type and column for this column.
    auto add_string_staging = [&](auto& cast_idx_map, auto& staging_types, auto& staging_cols) {
        cast_idx_map[column_index] = staging_types.size();
        if (slot_desc->is_nullable()) {
            staging_types.push_back(make_nullable(std::make_shared<DataTypeString>()));
        } else {
            staging_types.push_back(std::make_shared<DataTypeString>());
        }
        staging_cols.push_back(staging_types[cast_idx_map[column_index]]->create_column());
    };

    bool compatible = true;
    switch (slot_desc->type().type) {
    case TYPE_BOOLEAN:
        compatible = is_one_of({"java.lang.Boolean", "java.lang.Byte", "java.lang.Integer"});
        break;
    case TYPE_TINYINT:
    case TYPE_SMALLINT:
    case TYPE_INT:
        compatible = is_one_of({"java.lang.Short", "java.lang.Integer", "java.math.BigDecimal",
                                "java.lang.Byte", "com.clickhouse.data.value.UnsignedByte",
                                "com.clickhouse.data.value.UnsignedShort", "java.lang.Long"});
        break;
    case TYPE_BIGINT:
    case TYPE_LARGEINT:
        compatible = is_one_of({"java.lang.Long", "java.math.BigDecimal", "java.math.BigInteger",
                                "java.lang.String", "com.clickhouse.data.value.UnsignedInteger",
                                "com.clickhouse.data.value.UnsignedLong"});
        break;
    case TYPE_FLOAT:
        compatible = is_one_of({"java.lang.Float", "java.math.BigDecimal"});
        break;
    case TYPE_DOUBLE:
        compatible = is_one_of({"java.lang.Double", "java.math.BigDecimal"});
        break;
    case TYPE_CHAR:
    case TYPE_VARCHAR:
    case TYPE_STRING:
        // String slots accept whatever Java type the driver reports.
        break;
    case TYPE_DATE:
    case TYPE_DATEV2:
    case TYPE_TIMEV2:
    case TYPE_DATETIME:
    case TYPE_DATETIMEV2:
        compatible = is_one_of({"java.sql.Timestamp", "java.time.LocalDateTime", "java.sql.Date",
                                "java.time.LocalDate", "oracle.sql.TIMESTAMP",
                                "java.time.OffsetDateTime"});
        break;
    case TYPE_DECIMALV2:
    case TYPE_DECIMAL32:
    case TYPE_DECIMAL64:
    case TYPE_DECIMAL128I:
    case TYPE_DECIMAL256:
        compatible = is_one_of({"java.math.BigDecimal"});
        break;
    case TYPE_ARRAY:
        compatible = is_one_of({"java.sql.Array", "java.lang.String", "java.lang.Object"});
        break;
    case TYPE_JSONB:
        compatible = is_one_of({"java.lang.String", "org.postgresql.util.PGobject"});
        if (compatible) {
            add_string_staging(_map_column_idx_to_cast_idx_json, _input_json_string_types,
                               str_json_cols);
        }
        break;
    case TYPE_HLL:
        compatible = is_one_of({"java.lang.String"});
        if (compatible) {
            add_string_staging(_map_column_idx_to_cast_idx_hll, _input_hll_string_types,
                               str_hll_cols);
        }
        break;
    case TYPE_OBJECT:
        compatible = is_one_of({"java.lang.String"});
        if (compatible) {
            add_string_staging(_map_column_idx_to_cast_idx_bitmap, _input_bitmap_string_types,
                               str_bitmap_cols);
        }
        break;
    default:
        compatible = false;
        break;
    }

    if (!compatible) {
        return Status::InternalError(error_msg);
    }
    return Status::OK();
}
|
|
|
|
// Returns `jobj.toString()` as a UTF-8 encoded std::string by calling
// String.getBytes("UTF-8") and copying the resulting byte[]. All local references created here
// are released before returning. Java exceptions are not checked here; callers check them.
std::string JdbcConnector::_jobject_to_string(JNIEnv* env, jobject jobj) {
    jobject java_string = env->CallObjectMethod(jobj, _to_string_id);
    jstring charset_name = env->NewStringUTF("UTF-8");
    const auto utf8_bytes =
            static_cast<jbyteArray>(env->CallObjectMethod(java_string, _get_bytes_id, charset_name));

    const auto byte_count = static_cast<size_t>(env->GetArrayLength(utf8_bytes));
    jbyte* raw_bytes = env->GetByteArrayElements(utf8_bytes, nullptr);
    std::string result(reinterpret_cast<char*>(raw_bytes), byte_count);
    // JNI_ABORT: the bytes were only read, so there is nothing to copy back.
    env->ReleaseByteArrayElements(utf8_bytes, raw_bytes, JNI_ABORT);

    env->DeleteLocalRef(utf8_bytes);
    env->DeleteLocalRef(java_string);
    env->DeleteLocalRef(charset_name);
    return result;
}
|
|
|
|
// Builds the java.util.Map passed to getBlockAddress(). Its keys are:
//   is_nullable     - "true,"/"false," for each materialized slot
//   replace_string  - "bitmap,"/"hll,"/"jsonb,"/"not_replace," for each materialized slot
//   required_fields - comma-separated column names of every slot
//   columns_types   - '#'-separated JNI type of every slot ("string" for bitmap/hll/json)
// As a side effect, each materialized bitmap/hll/json column of `block` is reset to an empty
// (nullable if the slot is) string column, so Java can fill it with text.
jobject JdbcConnector::_get_reader_params(Block* block, JNIEnv* env, size_t column_size) {
    std::ostringstream nullable_flags;
    std::ostringstream replace_kinds;
    std::ostringstream field_names;
    std::ostringstream jni_types;

    for (size_t i = 0; i < column_size; ++i) {
        auto* slot = _tuple_desc->slots()[i];
        const bool is_bitmap = slot->type().is_bitmap_type();
        const bool is_hll = slot->type().is_hll_type();
        const bool is_json = slot->type().is_json_type();
        const bool read_as_string = is_bitmap || is_hll || is_json;

        if (slot->is_materialized()) {
            nullable_flags << (slot->is_nullable() ? "true" : "false") << ",";

            std::string replace_kind;
            if (is_bitmap) {
                replace_kind = "bitmap";
            } else if (is_hll) {
                replace_kind = "hll";
            } else if (is_json) {
                replace_kind = "jsonb";
            } else {
                replace_kind = "not_replace";
            }
            replace_kinds << replace_kind << ",";

            if (read_as_string) {
                DataTypePtr staging_type = std::make_shared<DataTypeString>();
                ColumnPtr staging_column =
                        staging_type->create_column()->convert_to_full_column_if_const();
                if (slot->is_nullable()) {
                    staging_column = make_nullable(staging_column);
                    staging_type = make_nullable(staging_type);
                }
                auto& target = block->get_by_position(i);
                target.column = staging_column;
                target.type = staging_type;
            }
        }

        const std::string jni_type =
                read_as_string ? std::string("string") : JniConnector::get_jni_type(slot->type());
        const bool is_first = (i == 0);
        field_names << (is_first ? "" : ",") << slot->col_name();
        jni_types << (is_first ? "" : "#") << jni_type;
    }

    std::map<String, String> reader_params = {{"is_nullable", nullable_flags.str()},
                                              {"replace_string", replace_kinds.str()},
                                              {"required_fields", field_names.str()},
                                              {"columns_types", jni_types.str()}};
    return JniUtil::convert_to_java_map(env, reader_params);
}
|
|
|
|
// Casts the string staging columns of materialized HLL, JSON and bitmap slots in `block` to
// their real slot types.
//
// The row count comes from getCurBlockRows(). It is fetched once, at the first materialized
// column: no JNI call happens between casts, so the value cannot change mid-loop.
// Cast failures are now propagated; previously they were silently discarded.
//
// @return the first Java exception or cast failure, otherwise OK.
Status JdbcConnector::_cast_string_to_special(Block* block, JNIEnv* env, size_t column_size) {
    jint num_rows = 0;
    bool num_rows_fetched = false;
    for (size_t column_index = 0; column_index < column_size; ++column_index) {
        auto* slot_desc = _tuple_desc->slots()[column_index];
        // because the fe planner filter the non_materialize column
        if (!slot_desc->is_materialized()) {
            continue;
        }
        if (!num_rows_fetched) {
            num_rows = env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz,
                                                    _executor_block_rows_id);
            RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
            num_rows_fetched = true;
        }

        if (slot_desc->type().is_hll_type()) {
            RETURN_IF_ERROR(_cast_string_to_hll(slot_desc, block, column_index, num_rows));
        } else if (slot_desc->type().is_json_type()) {
            RETURN_IF_ERROR(_cast_string_to_json(slot_desc, block, column_index, num_rows));
        } else if (slot_desc->type().is_bitmap_type()) {
            RETURN_IF_ERROR(_cast_string_to_bitmap(slot_desc, block, column_index, num_rows));
        }
    }
    return Status::OK();
}
|
|
|
|
// Replaces the string staging column at `column_index` of `block` with its CAST to the slot's
// HLL type. If the slot is not nullable, the nested column of the nullable CAST result is used.
//
// @param rows number of rows to cast.
// @return InternalError if no CAST function is found, or the CAST's own error status.
Status JdbcConnector::_cast_string_to_hll(const SlotDescriptor* slot_desc, Block* block,
                                          int column_index, int rows) {
    DataTypePtr target_type = slot_desc->get_data_type_ptr();
    std::string target_type_name = target_type->get_name();
    // Second CAST argument: a constant column whose type names the cast target.
    ColumnPtr target_type_arg = target_type->create_column_const_with_default_value(1);

    auto& input_col = block->get_by_position(column_index).column;

    ColumnsWithTypeAndName argument_template;
    argument_template.reserve(2);
    argument_template.emplace_back(
            std::move(input_col),
            _input_hll_string_types[_map_column_idx_to_cast_idx_hll[column_index]],
            "java.sql.String");
    argument_template.emplace_back(target_type_arg, target_type, target_type_name);
    FunctionBasePtr func_cast = SimpleFunctionFactory::instance().get_function(
            "CAST", argument_template, make_nullable(target_type));
    if (func_cast == nullptr) {
        return Status::InternalError("Can not find CAST function from string to {} for column {}",
                                     target_type_name, slot_desc->col_name());
    }

    Block cast_block(argument_template);
    int result_idx = cast_block.columns();
    cast_block.insert({nullptr, make_nullable(target_type), "cast_result"});
    RETURN_IF_ERROR(func_cast->execute(nullptr, cast_block, {0, 1}, result_idx, rows));

    auto res_col = cast_block.get_by_position(result_idx).column;
    block->get_by_position(column_index).type = target_type;
    if (target_type->is_nullable()) {
        block->replace_by_position(column_index, res_col);
    } else {
        // The CAST result type is make_nullable(target_type), so res_col is a ColumnNullable.
        auto nested_ptr = static_cast<const vectorized::ColumnNullable*>(res_col.get())
                                  ->get_nested_column_ptr();
        block->replace_by_position(column_index, nested_ptr);
    }

    return Status::OK();
}
|
|
|
|
// Replaces the string staging column at `column_index` of `block` with its CAST to the slot's
// bitmap type. If the slot is not nullable, the nested column of the nullable CAST result is
// used.
//
// @param rows number of rows to cast.
// @return InternalError if no CAST function is found, or the CAST's own error status.
Status JdbcConnector::_cast_string_to_bitmap(const SlotDescriptor* slot_desc, Block* block,
                                             int column_index, int rows) {
    DataTypePtr target_type = slot_desc->get_data_type_ptr();
    std::string target_type_name = target_type->get_name();
    // Second CAST argument: a constant column whose type names the cast target.
    ColumnPtr target_type_arg = target_type->create_column_const_with_default_value(1);

    auto& input_col = block->get_by_position(column_index).column;

    ColumnsWithTypeAndName argument_template;
    argument_template.reserve(2);
    argument_template.emplace_back(
            std::move(input_col),
            _input_bitmap_string_types[_map_column_idx_to_cast_idx_bitmap[column_index]],
            "java.sql.String");
    argument_template.emplace_back(target_type_arg, target_type, target_type_name);
    FunctionBasePtr func_cast = SimpleFunctionFactory::instance().get_function(
            "CAST", argument_template, make_nullable(target_type));
    if (func_cast == nullptr) {
        return Status::InternalError("Can not find CAST function from string to {} for column {}",
                                     target_type_name, slot_desc->col_name());
    }

    Block cast_block(argument_template);
    int result_idx = cast_block.columns();
    cast_block.insert({nullptr, make_nullable(target_type), "cast_result"});
    RETURN_IF_ERROR(func_cast->execute(nullptr, cast_block, {0, 1}, result_idx, rows));

    auto res_col = cast_block.get_by_position(result_idx).column;
    block->get_by_position(column_index).type = target_type;
    if (target_type->is_nullable()) {
        block->replace_by_position(column_index, res_col);
    } else {
        // The CAST result type is make_nullable(target_type), so res_col is a ColumnNullable.
        auto nested_ptr = static_cast<const vectorized::ColumnNullable*>(res_col.get())
                                  ->get_nested_column_ptr();
        block->replace_by_position(column_index, nested_ptr);
    }

    return Status::OK();
}
|
|
|
|
// Replaces the string staging column at `column_index` of `block` with its CAST to the slot's
// JSONB type. If the slot is not nullable, the nested column of the nullable CAST result is
// used.
//
// @param rows number of rows to cast.
// @return InternalError if no CAST function is found, or the CAST's own error status.
Status JdbcConnector::_cast_string_to_json(const SlotDescriptor* slot_desc, Block* block,
                                           int column_index, int rows) {
    DataTypePtr target_type = slot_desc->get_data_type_ptr();
    std::string target_type_name = target_type->get_name();
    // Second CAST argument: a constant "{}" column whose type names the cast target.
    ColumnPtr target_type_arg = target_type->create_column_const(1, "{}");

    auto& input_col = block->get_by_position(column_index).column;

    ColumnsWithTypeAndName argument_template;
    argument_template.reserve(2);
    argument_template.emplace_back(
            std::move(input_col),
            _input_json_string_types[_map_column_idx_to_cast_idx_json[column_index]],
            "java.sql.String");
    argument_template.emplace_back(target_type_arg, target_type, target_type_name);
    FunctionBasePtr func_cast = SimpleFunctionFactory::instance().get_function(
            "CAST", argument_template, make_nullable(target_type));
    if (func_cast == nullptr) {
        return Status::InternalError("Can not find CAST function from string to {} for column {}",
                                     target_type_name, slot_desc->col_name());
    }

    Block cast_block(argument_template);
    int result_idx = cast_block.columns();
    cast_block.insert({nullptr, make_nullable(target_type), "cast_result"});
    RETURN_IF_ERROR(func_cast->execute(nullptr, cast_block, {0, 1}, result_idx, rows));

    auto res_col = cast_block.get_by_position(result_idx).column;
    block->get_by_position(column_index).type = target_type;
    if (target_type->is_nullable()) {
        block->replace_by_position(column_index, res_col);
    } else {
        // The CAST result type is make_nullable(target_type), so res_col is a ColumnNullable.
        auto nested_ptr = static_cast<const vectorized::ColumnNullable*>(res_col.get())
                                  ->get_nested_column_ptr();
        block->replace_by_position(column_index, nested_ptr);
    }

    return Status::OK();
}
|
|
|
|
} // namespace doris::vectorized
|