From 85c7c531f1561546fe9cbd4de3a885b145c98dac Mon Sep 17 00:00:00 2001 From: zhangstar333 <87313068+zhangstar333@users.noreply.github.com> Date: Fri, 30 Dec 2022 00:29:08 +0800 Subject: [PATCH] [vectorized](jdbc) support array type in jdbc external table (#15303) --- be/src/exec/table_connector.cpp | 291 ++++++++++-------- be/src/exec/table_connector.h | 7 +- be/src/vec/core/column_with_type_and_name.cpp | 2 +- be/src/vec/exec/scan/new_jdbc_scanner.cpp | 2 +- be/src/vec/exec/vjdbc_connector.cpp | 179 ++++++++++- be/src/vec/exec/vjdbc_connector.h | 32 +- be/src/vec/sink/vjdbc_table_sink.cpp | 4 +- be/src/vec/sink/vjdbc_table_sink.h | 2 - .../org/apache/doris/analysis/ColumnDef.java | 2 +- .../doris/analysis/CreateTableStmt.java | 2 +- .../org/apache/doris/udf/JdbcExecutor.java | 16 + 11 files changed, 385 insertions(+), 154 deletions(-) diff --git a/be/src/exec/table_connector.cpp b/be/src/exec/table_connector.cpp index e342f9abd6..b27d949aea 100644 --- a/be/src/exec/table_connector.cpp +++ b/be/src/exec/table_connector.cpp @@ -17,12 +17,18 @@ #include "exec/table_connector.h" +#include + #include #include "exprs/expr.h" +#include "runtime/define_primitive_type.h" #include "runtime/primitive_type.h" #include "util/mysql_global.h" +#include "vec/columns/column_array.h" #include "vec/core/block.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_array.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" @@ -48,26 +54,13 @@ std::u16string TableConnector::utf8_to_u16string(const char* first, const char* Status TableConnector::append(const std::string& table_name, vectorized::Block* block, const std::vector& output_vexpr_ctxs, uint32_t start_send_row, uint32_t* num_rows_sent, - bool need_extra_convert) { + TOdbcTableType::type table_type) { _insert_stmt_buffer.clear(); std::u16string insert_stmt; { SCOPED_TIMER(_convert_tuple_timer); fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", table_name); - auto extra_convert_func = [&](const std::string_view& str, const bool& is_date) -> void { - if (!need_extra_convert) { - fmt::format_to(_insert_stmt_buffer, "'{}'", str); - } else { - if (is_date) { - fmt::format_to(_insert_stmt_buffer, "to_date('{}','yyyy-mm-dd')", str); - } else { - fmt::format_to(_insert_stmt_buffer, "to_date('{}','yyyy-mm-dd hh24:mi:ss')", - str); - } - } - }; - int num_rows = block->rows(); int num_columns = block->columns(); for (int i = start_send_row; i < num_rows; ++i) { @@ -80,117 +73,8 @@ Status TableConnector::append(const std::string& table_name, vectorized::Block* } auto& column_ptr = block->get_by_position(j).column; auto& type_ptr = block->get_by_position(j).type; - vectorized::ColumnPtr column; - if (type_ptr->is_nullable()) { - column = assert_cast(*column_ptr) - .get_nested_column_ptr(); - if (column_ptr->is_null_at(i)) { - fmt::format_to(_insert_stmt_buffer, "{}", "NULL"); - continue; - } - } else { - column = column_ptr; - } - auto [item, size] = column->get_data_at(i); - switch (output_vexpr_ctxs[j]->root()->type().type) { - case TYPE_BOOLEAN: - case TYPE_TINYINT: - fmt::format_to(_insert_stmt_buffer, "{}", - *reinterpret_cast(item)); - break; - case TYPE_SMALLINT: - fmt::format_to(_insert_stmt_buffer, "{}", - *reinterpret_cast(item)); - break; - case TYPE_INT: - fmt::format_to(_insert_stmt_buffer, "{}", - *reinterpret_cast(item)); - break; - case TYPE_BIGINT: - fmt::format_to(_insert_stmt_buffer, "{}", - *reinterpret_cast(item)); - break; - case TYPE_FLOAT: - fmt::format_to(_insert_stmt_buffer, "{}", - *reinterpret_cast(item)); - break; - case TYPE_DOUBLE: - fmt::format_to(_insert_stmt_buffer, "{}", - *reinterpret_cast(item)); - break; - case TYPE_DATE: { - vectorized::VecDateTimeValue value = - binary_cast( - *(int64_t*)item); - - char buf[64]; - char* pos = value.to_string(buf); - std::string_view str(buf, pos - buf - 1); - extra_convert_func(str, true); - break; - } - case TYPE_DATETIME: { - vectorized::VecDateTimeValue value = - binary_cast( - *(int64_t*)item); - - char buf[64]; - char* pos = value.to_string(buf); - std::string_view str(buf, pos - buf - 1); - extra_convert_func(str, false); - break; - } - case TYPE_DATEV2: { - vectorized::DateV2Value value = binary_cast< - uint32_t, doris::vectorized::DateV2Value>( - *(int32_t*)item); - - char buf[64]; - char* pos = value.to_string(buf); - std::string str(buf, pos - buf - 1); - extra_convert_func(str, true); - break; - } - case TYPE_DATETIMEV2: { - vectorized::DateV2Value value = binary_cast< - uint64_t, - doris::vectorized::DateV2Value>( - *(int64_t*)item); - - char buf[64]; - char* pos = value.to_string(buf, output_vexpr_ctxs[i]->root()->type().scale); - std::string str(buf, pos - buf - 1); - extra_convert_func(str, false); - break; - } - case TYPE_VARCHAR: - case TYPE_CHAR: - case TYPE_STRING: { - fmt::format_to(_insert_stmt_buffer, "'{}'", fmt::basic_string_view(item, size)); - break; - } - case TYPE_DECIMALV2: { - DecimalV2Value value = *(DecimalV2Value*)(item); - fmt::format_to(_insert_stmt_buffer, "{}", value.to_string()); - break; - } - case TYPE_DECIMAL32: - case TYPE_DECIMAL64: - case TYPE_DECIMAL128I: { - auto val = type_ptr->to_string(*column, i); - fmt::format_to(_insert_stmt_buffer, "{}", val); - break; - } - case TYPE_LARGEINT: { - fmt::format_to(_insert_stmt_buffer, "{}", - *reinterpret_cast(item)); - break; - } - default: { - return Status::InternalError("can't convert this type to mysql type. type = {}", - output_vexpr_ctxs[j]->root()->type().type); - } - } + RETURN_IF_ERROR(convert_column_data( + column_ptr, type_ptr, output_vexpr_ctxs[j]->root()->type(), i, table_type)); } if (i < num_rows - 1 && _insert_stmt_buffer.size() < INSERT_BUFFER_SIZE) { @@ -210,4 +94,161 @@ Status TableConnector::append(const std::string& table_name, vectorized::Block* return Status::OK(); } +Status TableConnector::convert_column_data(const vectorized::ColumnPtr& column_ptr, + const vectorized::DataTypePtr& type_ptr, + const TypeDescriptor& type, int row, + TOdbcTableType::type table_type) { + auto extra_convert_func = [&](const std::string_view& str, const bool& is_date) -> void { + if (table_type != TOdbcTableType::ORACLE) { + fmt::format_to(_insert_stmt_buffer, "\"{}\"", str); + } else { + //if is ORACLE and date type, insert into need convert + if (is_date) { + fmt::format_to(_insert_stmt_buffer, "to_date('{}','yyyy-mm-dd')", str); + } else { + fmt::format_to(_insert_stmt_buffer, "to_date('{}','yyyy-mm-dd hh24:mi:ss')", str); + } + } + }; + const vectorized::IColumn* column = column_ptr; + if (type_ptr->is_nullable()) { + auto nullable_column = assert_cast(column_ptr.get()); + if (nullable_column->is_null_at(row)) { + fmt::format_to(_insert_stmt_buffer, "{}", "NULL"); + return Status::OK(); + } + column = nullable_column->get_nested_column_ptr().get(); + } else { + column = column_ptr; + } + auto [item, size] = column->get_data_at(row); + switch (type.type) { + case TYPE_BOOLEAN: + case TYPE_TINYINT: + fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast(item)); + break; + case TYPE_SMALLINT: + fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast(item)); + break; + case TYPE_INT: + fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast(item)); + break; + case TYPE_BIGINT: + fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast(item)); + break; + case TYPE_FLOAT: + fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast(item)); + break; + case TYPE_DOUBLE: + fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast(item)); + break; + case TYPE_DATE: { + vectorized::VecDateTimeValue value = + binary_cast(*(int64_t*)item); + + char buf[64]; + char* pos = value.to_string(buf); + std::string_view str(buf, pos - buf - 1); + extra_convert_func(str, true); + break; + } + case TYPE_DATETIME: { + vectorized::VecDateTimeValue value = + binary_cast(*(int64_t*)item); + + char buf[64]; + char* pos = value.to_string(buf); + std::string_view str(buf, pos - buf - 1); + extra_convert_func(str, false); + break; + } + case TYPE_DATEV2: { + vectorized::DateV2Value value = + binary_cast>( + *(int32_t*)item); + + char buf[64]; + char* pos = value.to_string(buf); + std::string str(buf, pos - buf - 1); + extra_convert_func(str, true); + break; + } + case TYPE_DATETIMEV2: { + vectorized::DateV2Value value = + binary_cast>( + *(int64_t*)item); + + char buf[64]; + char* pos = value.to_string(buf, type.scale); + std::string str(buf, pos - buf - 1); + extra_convert_func(str, false); + break; + } + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_STRING: { + // here need check the ' is used, now for pg array string must be " + fmt::format_to(_insert_stmt_buffer, "\"{}\"", fmt::basic_string_view(item, size)); + break; + } + case TYPE_ARRAY: { + auto& arr_nested = reinterpret_cast(column)->get_data_ptr(); + auto& arr_offset = reinterpret_cast(column)->get_offsets(); + auto array_type = remove_nullable(type_ptr); + auto nested_type = + reinterpret_cast(*array_type).get_nested_type(); + + //for doris、CK insert into ---> [] + //for PG insert into ---> '{}' + if (table_type == TOdbcTableType::POSTGRESQL) { + fmt::format_to(_insert_stmt_buffer, "{}", "'{"); + } else if (table_type == TOdbcTableType::CLICKHOUSE || + table_type == TOdbcTableType::MYSQL) { + fmt::format_to(_insert_stmt_buffer, "{}", "["); + } + bool first_value = true; + for (auto idx = arr_offset[row - 1]; idx < arr_offset[row]; ++idx) { + if (first_value == false) { + fmt::format_to(_insert_stmt_buffer, "{}", ", "); + } + if (arr_nested->is_null_at(idx)) { + fmt::format_to(_insert_stmt_buffer, "{}", "NULL"); + } else { + RETURN_IF_ERROR(convert_column_data(arr_nested, nested_type, type.children[0], idx, + table_type)); + } + first_value = false; + } + if (table_type == TOdbcTableType::POSTGRESQL) { + fmt::format_to(_insert_stmt_buffer, "{}", "}'"); + } else if (table_type == TOdbcTableType::CLICKHOUSE || + table_type == TOdbcTableType::MYSQL) { + fmt::format_to(_insert_stmt_buffer, "{}", "]"); + } + break; + } + case TYPE_DECIMALV2: { + DecimalV2Value value = *(DecimalV2Value*)(item); + fmt::format_to(_insert_stmt_buffer, "{}", value.to_string()); + break; + } + case TYPE_DECIMAL32: + case TYPE_DECIMAL64: + case TYPE_DECIMAL128I: { + auto val = type_ptr->to_string(*column, row); + fmt::format_to(_insert_stmt_buffer, "{}", val); + break; + } + case TYPE_LARGEINT: { + fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast(item)); + break; + } + default: { + return Status::InternalError("can't convert this type to mysql type. type = {}", + type.debug_string()); + } + } + return Status::OK(); +} } // namespace doris diff --git a/be/src/exec/table_connector.h b/be/src/exec/table_connector.h index a6077b3227..9f5330a486 100644 --- a/be/src/exec/table_connector.h +++ b/be/src/exec/table_connector.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include @@ -54,12 +55,16 @@ public: Status append(const std::string& table_name, vectorized::Block* block, const std::vector& _output_vexpr_ctxs, uint32_t start_send_row, uint32_t* num_rows_sent, - bool need_extra_convert = false); + TOdbcTableType::type table_type = TOdbcTableType::MYSQL); void init_profile(RuntimeProfile*); std::u16string utf8_to_u16string(const char* first, const char* last); + Status convert_column_data(const vectorized::ColumnPtr& column_ptr, + const vectorized::DataTypePtr& type_ptr, const TypeDescriptor& type, + int row, TOdbcTableType::type table_type); + virtual Status close() { return Status::OK(); } protected: diff --git a/be/src/vec/core/column_with_type_and_name.cpp b/be/src/vec/core/column_with_type_and_name.cpp index e141efe8f3..e196935bff 100644 --- a/be/src/vec/core/column_with_type_and_name.cpp +++ b/be/src/vec/core/column_with_type_and_name.cpp @@ -50,7 +50,7 @@ void ColumnWithTypeAndName::dump_structure(std::ostream& out) const { out << name; if (type) - out << " "; + out << " " << type->get_name(); else out << " nullptr"; diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp index cbb9588bbd..edfb843733 100644 --- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp @@ -125,7 +125,7 @@ Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool* } } - RETURN_IF_ERROR(_jdbc_connector->get_next(&_jdbc_eos, columns, state->batch_size())); + RETURN_IF_ERROR(_jdbc_connector->get_next(&_jdbc_eos, columns, block, state->batch_size())); if (_jdbc_eos == true) { if (block->rows() == 0) { diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index 9aed3c20b8..ff6e5ecd4b 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -25,8 +25,11 @@ #include "runtime/define_primitive_type.h" #include "runtime/user_function_cache.h" #include "util/jni-util.h" +#include "vec/columns/column_array.h" #include "vec/columns/column_nullable.h" -#include "vec/exprs/vexpr.h" +#include "vec/data_types/data_type_factory.hpp" +#include "vec/data_types/data_type_string.h" +#include "vec/functions/simple_function_factory.h" namespace doris { namespace vectorized { @@ -36,6 +39,8 @@ const char* JDBC_EXECUTOR_WRITE_SIGNATURE = "(Ljava/lang/String;)I"; const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z"; const char* JDBC_EXECUTOR_GET_BLOCK_SIGNATURE = "(I)Ljava/util/List;"; const char* JDBC_EXECUTOR_GET_TYPES_SIGNATURE = "()Ljava/util/List;"; +const char* JDBC_EXECUTOR_GET_ARR_LIST_SIGNATURE = "(Ljava/lang/Object;)Ljava/util/List;"; +const char* JDBC_EXECUTOR_GET_ARR_TYPE_SIGNATURE = "()I"; const char* JDBC_EXECUTOR_CLOSE_SIGNATURE = "()V"; const char* JDBC_EXECUTOR_CONVERT_DATE_SIGNATURE = "(Ljava/lang/Object;)J"; const char* JDBC_EXECUTOR_CONVERT_DATETIME_SIGNATURE = "(Ljava/lang/Object;)J"; @@ -195,7 +200,7 @@ Status JdbcConnector::_check_column_type() { env->CallObjectMethod(type_lists, _executor_get_list_id, materialized_column_index); const std::string& type_str = _jobject_to_string(env, column_type); - RETURN_IF_ERROR(_check_type(slot_desc, type_str)); + RETURN_IF_ERROR(_check_type(slot_desc, type_str, column_index)); env->DeleteLocalRef(column_type); materialized_column_index++; } @@ -222,7 +227,8 @@ DATETIME java.sql.Timestamp java.sql.Timestamp java.sql.Tim NOTE: because oracle always use number(p,s) to create all numerical type, so it's java type maybe java.math.BigDecimal */ -Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string& type_str) { +Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string& type_str, + int column_index) { const std::string error_msg = fmt::format( "Fail to convert jdbc type of {} to doris type {} on column: {}. You need to " "check this column type between external table and doris table.", @@ -289,6 +295,30 @@ Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string& } break; } + case TYPE_ARRAY: { + if (type_str != "java.sql.Array" && type_str != "java.lang.String") { + return Status::InternalError(error_msg); + } + if (!slot_desc->type().children[0].children.empty()) { + return Status::InternalError("Now doris not support nested array type in array {}.", + slot_desc->type().debug_string()); + } + // when type is array, except pd database, others use string cast array + if (_conn_param.table_type != TOdbcTableType::POSTGRESQL) { + _need_cast_array_type = true; + _map_column_idx_to_cast_idx[column_index] = _input_array_string_types.size(); + if (slot_desc->is_nullable()) { + _input_array_string_types.push_back( + make_nullable(std::make_shared())); + } else { + _input_array_string_types.push_back(std::make_shared()); + } + str_array_cols.push_back( + _input_array_string_types[_map_column_idx_to_cast_idx[column_index]] + ->create_column()); + } + break; + } default: { return Status::InternalError(error_msg); } @@ -296,7 +326,8 @@ Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string& return Status::OK(); } -Status JdbcConnector::get_next(bool* eos, std::vector& columns, int batch_size) { +Status JdbcConnector::get_next(bool* eos, std::vector& columns, Block* block, + int batch_size) { if (!_is_open) { return Status::InternalError("get_next before open of jdbc connector."); } @@ -322,17 +353,23 @@ Status JdbcConnector::get_next(bool* eos, std::vector& columns if (!slot_desc->is_materialized()) { continue; } + const std::string& column_name = slot_desc->col_name(); jobject column_data = env->CallObjectMethod(block_obj, _executor_get_list_id, materialized_column_index); jint num_rows = env->CallIntMethod(column_data, _executor_get_list_size_id); for (int row = 0; row < num_rows; ++row) { jobject cur_data = env->CallObjectMethod(column_data, _executor_get_list_id, row); - _convert_column_data(env, cur_data, slot_desc, columns[column_index].get()); + RETURN_IF_ERROR(_convert_column_data(env, cur_data, slot_desc, + columns[column_index].get(), column_index, + column_name)); env->DeleteLocalRef(cur_data); } env->DeleteLocalRef(column_data); - + //here need to cast string to array type + if (_need_cast_array_type && slot_desc->type().is_array_type()) { + _cast_string_to_array(slot_desc, block, column_index, num_rows); + } materialized_column_index++; } // All Java objects returned by JNI functions are local references. @@ -384,25 +421,43 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) { JDBC_EXECUTOR_TRANSACTION_SIGNATURE, _executor_abort_trans_id)); RETURN_IF_ERROR(register_id(_executor_clazz, "getResultColumnTypeNames", JDBC_EXECUTOR_GET_TYPES_SIGNATURE, _executor_get_types_id)); + RETURN_IF_ERROR(register_id(_executor_clazz, "getArrayColumnData", + JDBC_EXECUTOR_GET_ARR_LIST_SIGNATURE, _executor_get_arr_list_id)); + RETURN_IF_ERROR(register_id(_executor_clazz, "getBaseTypeInt", + JDBC_EXECUTOR_GET_ARR_TYPE_SIGNATURE, _executor_get_arr_type_id)); + return Status::OK(); } Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj, const SlotDescriptor* slot_desc, - vectorized::IColumn* column_ptr) { + vectorized::IColumn* column_ptr, int column_index, + std::string_view column_name) { vectorized::IColumn* col_ptr = column_ptr; if (true == slot_desc->is_nullable()) { auto* nullable_column = reinterpret_cast(column_ptr); if (jobj == nullptr) { nullable_column->insert_data(nullptr, 0); + if (_need_cast_array_type && slot_desc->type().type == TYPE_ARRAY) { + reinterpret_cast( + str_array_cols[_map_column_idx_to_cast_idx[column_index]].get()) + ->insert_data(nullptr, 0); + } return Status::OK(); } else { nullable_column->get_null_map_data().push_back(0); col_ptr = &nullable_column->get_nested_column(); } } + RETURN_IF_ERROR( + _insert_column_data(env, jobj, slot_desc->type(), col_ptr, column_index, column_name)); + return Status::OK(); +} - switch (slot_desc->type().type) { +Status JdbcConnector::_insert_column_data(JNIEnv* env, jobject jobj, const TypeDescriptor& type, + vectorized::IColumn* col_ptr, int column_index, + std::string_view column_name) { + switch (type.type) { #define M(TYPE, CPP_TYPE, COLUMN_TYPE) \ case TYPE: { \ CPP_TYPE num = _jobject_to_##CPP_TYPE(env, jobj); \ @@ -467,8 +522,7 @@ Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj, std::string data = _jobject_to_string(env, jobj); StringParser::ParseResult result = StringParser::PARSE_SUCCESS; const Int32 decimal_slot = StringParser::string_to_decimal( - data.c_str(), data.length(), slot_desc->type().precision, slot_desc->type().scale, - &result); + data.c_str(), data.length(), type.precision, type.scale, &result); reinterpret_cast(col_ptr)->insert_data( reinterpret_cast(&decimal_slot), 0); break; @@ -477,8 +531,7 @@ Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj, std::string data = _jobject_to_string(env, jobj); StringParser::ParseResult result = StringParser::PARSE_SUCCESS; const Int64 decimal_slot = StringParser::string_to_decimal( - data.c_str(), data.length(), slot_desc->type().precision, slot_desc->type().scale, - &result); + data.c_str(), data.length(), type.precision, type.scale, &result); reinterpret_cast(col_ptr)->insert_data( reinterpret_cast(&decimal_slot), 0); break; @@ -487,22 +540,114 @@ Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj, std::string data = _jobject_to_string(env, jobj); StringParser::ParseResult result = StringParser::PARSE_SUCCESS; const Int128 decimal_slot = StringParser::string_to_decimal( - data.c_str(), data.length(), slot_desc->type().precision, slot_desc->type().scale, - &result); + data.c_str(), data.length(), type.precision, type.scale, &result); reinterpret_cast(col_ptr)->insert_data( reinterpret_cast(&decimal_slot), 0); break; } + case TYPE_ARRAY: { + if (_need_cast_array_type) { + // read array data is a big string: [1,2,3], need cast it by self + std::string data = _jobject_to_string(env, jobj); + str_array_cols[_map_column_idx_to_cast_idx[column_index]]->insert_data(data.c_str(), + data.length()); + } else { + //POSTGRESQL read array is object[], so could get data by index + jobject arr_lists = env->CallNonvirtualObjectMethod(_executor_obj, _executor_clazz, + _executor_get_arr_list_id, jobj); + jint arr_type = env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, + _executor_get_arr_type_id); + //here type check is maybe no needed,more checks affect performance + if (_arr_jdbc_map[arr_type] != type.children[0].type) { + const std::string& error_msg = fmt::format( + "Fail to convert jdbc value to array type of {} on column: {}, could check " + "this column type between external table and doris table. {}.{} ", + type.children[0].debug_string(), column_name, _arr_jdbc_map[arr_type], + arr_type); + return Status::InternalError(std::string(error_msg)); + } + jint num_rows = env->CallIntMethod(arr_lists, _executor_get_list_size_id); + RETURN_IF_ERROR(_insert_arr_column_data(env, arr_lists, type.children[0], num_rows, + col_ptr, column_index, column_name)); + env->DeleteLocalRef(arr_lists); + } + break; + } default: { - std::string error_msg = - fmt::format("Fail to convert jdbc value to {} on column: {}.", - slot_desc->type().debug_string(), slot_desc->col_name()); + const std::string& error_msg = fmt::format( + "Fail to convert jdbc value to {} on column: {}, could check this column type " + "between external table and doris table.", + type.debug_string(), column_name); return Status::InternalError(std::string(error_msg)); } } return Status::OK(); } +Status JdbcConnector::_insert_arr_column_data(JNIEnv* env, jobject arr_lists, + const TypeDescriptor& type, int nums, + vectorized::IColumn* arr_column_ptr, int column_index, + std::string_view column_name) { + auto& arr_nested = reinterpret_cast(arr_column_ptr)->get_data(); + vectorized::IColumn* col_ptr = + reinterpret_cast(arr_nested).get_nested_column_ptr(); + auto& nullmap_data = + reinterpret_cast(arr_nested).get_null_map_data(); + for (int i = 0; i < nums; ++i) { + jobject cur_data = env->CallObjectMethod(arr_lists, _executor_get_list_id, i); + if (cur_data == nullptr) { + arr_nested.insert_default(); + continue; + } else { + nullmap_data.push_back(0); + } + RETURN_IF_ERROR( + _insert_column_data(env, cur_data, type, col_ptr, column_index, column_name)); + env->DeleteLocalRef(cur_data); + } + auto old_size = + reinterpret_cast(arr_column_ptr)->get_offsets().back(); + reinterpret_cast(arr_column_ptr) + ->get_offsets() + .push_back(nums + old_size); + return Status::OK(); +} + +Status JdbcConnector::_cast_string_to_array(const SlotDescriptor* slot_desc, Block* block, + int column_index, int rows) { + DataTypePtr _target_data_type = slot_desc->get_data_type_ptr(); + std::string _target_data_type_name = DataTypeFactory::instance().get(_target_data_type); + DataTypePtr _cast_param_data_type = std::make_shared(); + ColumnPtr _cast_param = _cast_param_data_type->create_column_const(1, _target_data_type_name); + + ColumnsWithTypeAndName argument_template; + argument_template.reserve(2); + argument_template.emplace_back( + std::move(str_array_cols[_map_column_idx_to_cast_idx[column_index]]), + _input_array_string_types[_map_column_idx_to_cast_idx[column_index]], + "java.sql.String"); + argument_template.emplace_back(_cast_param, _cast_param_data_type, _target_data_type_name); + FunctionBasePtr func_cast = SimpleFunctionFactory::instance().get_function( + "CAST", argument_template, make_nullable(_target_data_type)); + + Block cast_block(argument_template); + int result_idx = cast_block.columns(); + cast_block.insert({nullptr, make_nullable(_target_data_type), "cast_result"}); + func_cast->execute(nullptr, cast_block, {0, 1}, result_idx, rows); + + auto res_col = cast_block.get_by_position(result_idx).column; + if (_target_data_type->is_nullable()) { + block->replace_by_position(column_index, res_col); + } else { + auto nested_ptr = reinterpret_cast(res_col.get()) + ->get_nested_column_ptr(); + block->replace_by_position(column_index, nested_ptr); + } + str_array_cols[_map_column_idx_to_cast_idx[column_index]] = + _input_array_string_types[_map_column_idx_to_cast_idx[column_index]]->create_column(); + return Status::OK(); +} + Status JdbcConnector::exec_write_sql(const std::u16string& insert_stmt, const fmt::memory_buffer& insert_stmt_buffer) { SCOPED_TIMER(_result_send_timer); diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h index 84ca17e02c..0d51fd278c 100644 --- a/be/src/vec/exec/vjdbc_connector.h +++ b/be/src/vec/exec/vjdbc_connector.h @@ -19,8 +19,12 @@ #include +#include + #include "common/status.h" #include "exec/table_connector.h" +#include "runtime/define_primitive_type.h" +#include "vec/data_types/data_type.h" namespace doris { namespace vectorized { @@ -51,7 +55,8 @@ public: Status exec_write_sql(const std::u16string& insert_stmt, const fmt::memory_buffer& insert_stmt_buffer) override; - Status get_next(bool* eos, std::vector& columns, int batch_size); + Status get_next(bool* eos, std::vector& columns, Block* block, + int batch_size); // use in JDBC transaction Status begin_trans() override; // should be call after connect and before query or init_to_write @@ -63,14 +68,28 @@ public: private: Status _register_func_id(JNIEnv* env); Status _check_column_type(); - Status _check_type(SlotDescriptor*, const std::string& type_str); + Status _check_type(SlotDescriptor*, const std::string& type_str, int column_index); Status _convert_column_data(JNIEnv* env, jobject jobj, const SlotDescriptor* slot_desc, - vectorized::IColumn* column_ptr); + vectorized::IColumn* column_ptr, int column_index, + std::string_view column_name); + Status _insert_column_data(JNIEnv* env, jobject jobj, const TypeDescriptor& type, + vectorized::IColumn* column_ptr, int column_index, + std::string_view column_name); + Status _insert_arr_column_data(JNIEnv* env, jobject jobj, const TypeDescriptor& type, int nums, + vectorized::IColumn* column_ptr, int column_index, + std::string_view column_name); std::string _jobject_to_string(JNIEnv* env, jobject jobj); int64_t _jobject_to_date(JNIEnv* env, jobject jobj); int64_t _jobject_to_datetime(JNIEnv* env, jobject jobj); + Status _cast_string_to_array(const SlotDescriptor* slot_desc, Block* block, int column_index, + int rows); const JdbcConnectorParam& _conn_param; + //java.sql.Types: https://docs.oracle.com/javase/7/docs/api/constant-values.html#java.sql.Types.INTEGER + std::map _arr_jdbc_map { + {-7, TYPE_BOOLEAN}, {-6, TYPE_TINYINT}, {5, TYPE_SMALLINT}, {4, TYPE_INT}, + {-5, TYPE_BIGINT}, {12, TYPE_STRING}, {7, TYPE_FLOAT}, {8, TYPE_DOUBLE}, + {91, TYPE_DATE}, {93, TYPE_DATETIME}, {2, TYPE_DECIMALV2}}; bool _closed; jclass _executor_clazz; jclass _executor_list_clazz; @@ -83,6 +102,8 @@ private: jmethodID _executor_has_next_id; jmethodID _executor_get_blocks_id; jmethodID _executor_get_types_id; + jmethodID _executor_get_arr_list_id; + jmethodID _executor_get_arr_type_id; jmethodID _executor_close_id; jmethodID _executor_get_list_id; jmethodID _executor_get_list_size_id; @@ -93,6 +114,11 @@ private: jmethodID _executor_begin_trans_id; jmethodID _executor_finish_trans_id; jmethodID _executor_abort_trans_id; + bool _need_cast_array_type; + std::map _map_column_idx_to_cast_idx; + std::vector _input_array_string_types; + std::vector + str_array_cols; // for array type to save data like big string [1,2,3] #define FUNC_VARI_DECLARE(RETURN_TYPE) \ RETURN_TYPE _jobject_to_##RETURN_TYPE(JNIEnv* env, jobject jobj); \ diff --git a/be/src/vec/sink/vjdbc_table_sink.cpp b/be/src/vec/sink/vjdbc_table_sink.cpp index 0007a8b9b7..973342da73 100644 --- a/be/src/vec/sink/vjdbc_table_sink.cpp +++ b/be/src/vec/sink/vjdbc_table_sink.cpp @@ -44,9 +44,9 @@ Status VJdbcTableSink::init(const TDataSink& t_sink) { _jdbc_param.driver_path = t_jdbc_sink.jdbc_table.jdbc_driver_url; _jdbc_param.driver_checksum = t_jdbc_sink.jdbc_table.jdbc_driver_checksum; _jdbc_param.resource_name = t_jdbc_sink.jdbc_table.jdbc_resource_name; + _jdbc_param.table_type = t_jdbc_sink.table_type; _table_name = t_jdbc_sink.jdbc_table.jdbc_table_name; _use_transaction = t_jdbc_sink.use_transaction; - _need_extra_convert = (t_jdbc_sink.table_type == TOdbcTableType::ORACLE); return Status::OK(); } @@ -81,7 +81,7 @@ Status VJdbcTableSink::send(RuntimeState* state, Block* block, bool eos) { uint32_t num_row_sent = 0; while (start_send_row < output_block.rows()) { RETURN_IF_ERROR(_writer->append(_table_name, &output_block, _output_vexpr_ctxs, - start_send_row, &num_row_sent, _need_extra_convert)); + start_send_row, &num_row_sent, _jdbc_param.table_type)); start_send_row += num_row_sent; num_row_sent = 0; } diff --git a/be/src/vec/sink/vjdbc_table_sink.h b/be/src/vec/sink/vjdbc_table_sink.h index a3de3ff62b..c313cc8214 100644 --- a/be/src/vec/sink/vjdbc_table_sink.h +++ b/be/src/vec/sink/vjdbc_table_sink.h @@ -39,8 +39,6 @@ public: private: JdbcConnectorParam _jdbc_param; std::unique_ptr _writer; - //if is ORACLE and date type, insert into need convert - bool _need_extra_convert = false; }; } // namespace vectorized } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java index 36b556c623..66670db2f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java @@ -266,7 +266,7 @@ public class ColumnDef { defaultValue = DefaultValue.BITMAP_EMPTY_DEFAULT_VALUE; } - if (type.getPrimitiveType() == PrimitiveType.ARRAY) { + if (type.getPrimitiveType() == PrimitiveType.ARRAY && isOlap) { if (isKey()) { throw new AnalysisException("Array can only be used in the non-key column of" + " the duplicate table at present."); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index 1f43193035..c72b882064 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -390,7 +390,7 @@ public class CreateTableStmt extends DdlStmt { for (ColumnDef columnDef : columnDefs) { columnDef.analyze(engineName.equals("olap")); - if (columnDef.getType().isArrayType()) { + if (columnDef.getType().isArrayType() && engineName.equals("olap")) { if (columnDef.getAggregateType() != null && columnDef.getAggregateType() != AggregateType.NONE) { throw new AnalysisException("Array column can't support aggregation " + columnDef.getAggregateType()); diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java index 9539f65aba..d48a679a6c 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java @@ -42,6 +42,7 @@ import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; public class JdbcExecutor { @@ -54,6 +55,7 @@ public class JdbcExecutor { // Use HikariDataSource to help us manage the JDBC connections. private HikariDataSource dataSource = null; private List resultColumnTypeNames = null; + private int baseTypeInt = 0; public JdbcExecutor(byte[] thriftParams) throws Exception { TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams(); @@ -124,6 +126,20 @@ public class JdbcExecutor { } } + public List getArrayColumnData(Object object) throws UdfRuntimeException { + try { + java.sql.Array obj = (java.sql.Array) object; + baseTypeInt = obj.getBaseType(); + return Arrays.asList((Object[]) obj.getArray()); + } catch (SQLException e) { + throw new UdfRuntimeException("JDBC executor getArrayColumnData has error: ", e); + } + } + + public int getBaseTypeInt() { + return baseTypeInt; + } + public void openTrans() throws UdfRuntimeException { try { if (conn != null) {