From 22a8d35999d6cd0806f3de553fe63994246e6bef Mon Sep 17 00:00:00 2001 From: zhangstar333 <87313068+zhangstar333@users.noreply.github.com> Date: Thu, 15 Sep 2022 11:08:41 +0800 Subject: [PATCH] [Feature](vectorized) support jdbc sink for insert into data to table (#12534) --- be/src/exec/CMakeLists.txt | 1 + be/src/exec/data_sink.cpp | 17 + be/src/exec/odbc_connector.cpp | 252 +-------------- be/src/exec/odbc_connector.h | 67 +--- be/src/exec/table_connector.cpp | 296 ++++++++++++++++++ be/src/exec/table_connector.h | 84 +++++ be/src/runtime/odbc_table_sink.cpp | 7 +- be/src/vec/CMakeLists.txt | 2 + be/src/vec/exec/vjdbc_connector.cpp | 70 ++++- be/src/vec/exec/vjdbc_connector.h | 28 +- be/src/vec/exec/vjdbc_scan_node.cpp | 2 +- be/src/vec/sink/vjdbc_table_sink.cpp | 102 ++++++ be/src/vec/sink/vjdbc_table_sink.h | 46 +++ be/src/vec/sink/vmysql_table_sink.cpp | 40 +-- be/src/vec/sink/vmysql_table_sink.h | 34 +- be/src/vec/sink/vodbc_table_sink.cpp | 49 +-- be/src/vec/sink/vodbc_table_sink.h | 31 +- be/src/vec/sink/vtable_sink.cpp | 74 +++++ be/src/vec/sink/vtable_sink.h | 62 ++++ .../ecosystem/external-table/jdbc-of-doris.md | 22 +- .../ecosystem/external-table/jdbc-of-doris.md | 45 ++- .../apache/doris/analysis/DescribeStmt.java | 8 + .../org/apache/doris/analysis/ExportStmt.java | 1 + .../org/apache/doris/analysis/InsertStmt.java | 4 +- .../apache/doris/catalog/JdbcResource.java | 14 +- .../org/apache/doris/catalog/JdbcTable.java | 129 ++++++-- .../java/org/apache/doris/load/ExportJob.java | 6 + .../org/apache/doris/planner/DataSink.java | 3 + .../doris/planner/DistributedPlanner.java | 2 +- .../apache/doris/planner/JdbcTableSink.java | 102 ++++++ .../org/apache/doris/udf/JdbcExecutor.java | 35 ++- gensrc/thrift/DataSinks.thrift | 7 + 32 files changed, 1151 insertions(+), 491 deletions(-) create mode 100644 be/src/exec/table_connector.cpp create mode 100644 be/src/exec/table_connector.h create mode 100644 be/src/vec/sink/vjdbc_table_sink.cpp create mode 100644 be/src/vec/sink/vjdbc_table_sink.h create mode 100644 be/src/vec/sink/vtable_sink.cpp create mode 100644 be/src/vec/sink/vtable_sink.h create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/JdbcTableSink.java diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 79cbe11a90..1c7f2be313 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -96,6 +96,7 @@ set(EXEC_FILES parquet_writer.cpp orc_scanner.cpp odbc_connector.cpp + table_connector.cpp json_scanner.cpp assert_num_rows_node.cpp diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index 8c581181ca..cb309bd0d8 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -35,6 +35,7 @@ #include "runtime/result_sink.h" #include "runtime/runtime_state.h" #include "vec/sink/vdata_stream_sender.h" +#include "vec/sink/vjdbc_table_sink.h" #include "vec/sink/vmysql_table_sink.h" #include "vec/sink/vodbc_table_sink.h" #include "vec/sink/vresult_file_sink.h" @@ -165,6 +166,22 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink break; } + case TDataSinkType::JDBC_TABLE_SINK: { + if (!thrift_sink.__isset.jdbc_table_sink) { + return Status::InternalError("Missing data jdbc sink."); + } + if (is_vec) { +#ifdef LIBJVM + sink->reset(new vectorized::VJdbcTableSink(pool, row_desc, output_exprs)); +#else + return Status::InternalError("Jdbc table sink is disabled since no libjvm is found!"); +#endif + } else { + return Status::InternalError("only support jdbc sink in vectorized engine."); + } + break; + } + case TDataSinkType::EXPORT_SINK: { if (!thrift_sink.__isset.export_sink) { return Status::InternalError("Missing export sink sink."); diff --git a/be/src/exec/odbc_connector.cpp b/be/src/exec/odbc_connector.cpp index 169b626726..0ffedf4f9a 100644 --- a/be/src/exec/odbc_connector.cpp +++ b/be/src/exec/odbc_connector.cpp @@ -21,14 +21,8 @@ #include -#include "common/config.h" -#include "exprs/expr.h" #include "runtime/primitive_type.h" -#include "util/mysql_global.h" #include "util/types.h" -#include "vec/core/block.h" -#include "vec/exprs/vexpr.h" -#include "vec/exprs/vexpr_context.h" #define ODBC_DISPOSE(h, ht, x, op) \ { \ @@ -45,22 +39,12 @@ static constexpr uint32_t SMALL_COLUMN_SIZE_BUFFER = 100; // Now we only treat HLL, CHAR, VARCHAR as big column static constexpr uint32_t BIG_COLUMN_SIZE_BUFFER = 65535; -// Default max buffer size use in insert to: 50MB, normally a batch is smaller than the size -static constexpr uint32_t INSERT_BUFFER_SIZE = 1024l * 1024 * 50; - -static std::u16string utf8_to_u16string(const char* first, const char* last) { - std::wstring_convert, char16_t> utf8_utf16_cvt; - return utf8_utf16_cvt.from_bytes(first, last); -} namespace doris { ODBCConnector::ODBCConnector(const ODBCConnectorParam& param) - : _connect_string(param.connect_string), - _sql_str(param.query_string), - _tuple_desc(param.tuple_desc), - _output_expr_ctxs(param.output_expr_ctxs), - _is_open(false), + : TableConnector(param.tuple_desc, param.query_string), + _connect_string(param.connect_string), _field_num(0), _env(nullptr), _dbc(nullptr), @@ -189,18 +173,12 @@ Status ODBCConnector::get_next_row(bool* eos) { return Status::OK(); } -void ODBCConnector::_init_profile(doris::RuntimeProfile* profile) { - _convert_tuple_timer = ADD_TIMER(profile, "TupleConvertTime"); - _result_send_timer = ADD_TIMER(profile, "ResultSendTime"); - _sent_rows_counter = ADD_COUNTER(profile, "NumSentRows", TUnit::UNIT); -} - Status ODBCConnector::init_to_write(doris::RuntimeProfile* profile) { if (!_is_open) { return Status::InternalError("Init before open."); } - _init_profile(profile); + init_profile(profile); // Allocate a statement handle ODBC_DISPOSE(_dbc, SQL_HANDLE_DBC, SQLAllocHandle(SQL_HANDLE_STMT, _dbc, &_stmt), "alloc statement"); @@ -208,116 +186,12 @@ Status ODBCConnector::init_to_write(doris::RuntimeProfile* profile) { return Status::OK(); } -Status ODBCConnector::append(const std::string& table_name, RowBatch* batch, - uint32_t start_send_row, uint32* num_rows_sent) { - _insert_stmt_buffer.clear(); - std::u16string insert_stmt; - { - SCOPED_TIMER(_convert_tuple_timer); - fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", table_name); - - int num_rows = batch->num_rows(); - for (int i = start_send_row; i < num_rows; ++i) { - auto row = batch->get_row(i); - (*num_rows_sent)++; - - // Construct insert statement of odbc table - int num_columns = _output_expr_ctxs.size(); - for (int j = 0; j < num_columns; ++j) { - if (j != 0) { - fmt::format_to(_insert_stmt_buffer, "{}", ", "); - } - void* item = _output_expr_ctxs[j]->get_value(row); - if (item == nullptr) { - fmt::format_to(_insert_stmt_buffer, "{}", "NULL"); - continue; - } - switch (_output_expr_ctxs[j]->root()->type().type) { - case TYPE_BOOLEAN: - case TYPE_TINYINT: - fmt::format_to(_insert_stmt_buffer, "{}", *static_cast(item)); - break; - case TYPE_SMALLINT: - fmt::format_to(_insert_stmt_buffer, "{}", *static_cast(item)); - break; - case TYPE_INT: - fmt::format_to(_insert_stmt_buffer, "{}", *static_cast(item)); - break; - case TYPE_BIGINT: - fmt::format_to(_insert_stmt_buffer, "{}", *static_cast(item)); - break; - case TYPE_FLOAT: - fmt::format_to(_insert_stmt_buffer, "{}", *static_cast(item)); - break; - case TYPE_DOUBLE: - fmt::format_to(_insert_stmt_buffer, "{}", *static_cast(item)); - break; - case TYPE_DATE: - case TYPE_DATETIME: { - char buf[64]; - const auto* time_val = (const DateTimeValue*)(item); - time_val->to_string(buf); - fmt::format_to(_insert_stmt_buffer, "'{}'", buf); - break; - } - case TYPE_VARCHAR: - case TYPE_CHAR: - case TYPE_STRING: { - const auto* string_val = (const StringValue*)(item); - - if (string_val->ptr == nullptr) { - if (string_val->len == 0) { - fmt::format_to(_insert_stmt_buffer, "{}", "''"); - } else { - fmt::format_to(_insert_stmt_buffer, "{}", "NULL"); - } - } else { - fmt::format_to(_insert_stmt_buffer, "'{}'", - fmt::basic_string_view(string_val->ptr, string_val->len)); - } - break; - } - case TYPE_DECIMALV2: { - const DecimalV2Value decimal_val( - reinterpret_cast(item)->value); - char buffer[MAX_DECIMAL_WIDTH]; - int output_scale = _output_expr_ctxs[j]->root()->output_scale(); - int len = decimal_val.to_buffer(buffer, output_scale); - _insert_stmt_buffer.append(buffer, buffer + len); - break; - } - case TYPE_LARGEINT: { - fmt::format_to(_insert_stmt_buffer, "{}", - reinterpret_cast(item)->value); - break; - } - default: { - return Status::InternalError("can't convert this type to mysql type. type = {}", - _output_expr_ctxs[j]->root()->type().type); - } - } - } - - if (i < num_rows - 1 && _insert_stmt_buffer.size() < INSERT_BUFFER_SIZE) { - fmt::format_to(_insert_stmt_buffer, "{}", "),("); - } else { - // batch exhausted or _insert_stmt_buffer is full, need to do real insert stmt - fmt::format_to(_insert_stmt_buffer, "{}", ")"); - break; - } - } - // Translate utf8 string to utf16 to use unicode encodeing - insert_stmt = utf8_to_u16string(_insert_stmt_buffer.data(), - _insert_stmt_buffer.data() + _insert_stmt_buffer.size()); - } - - { - SCOPED_TIMER(_result_send_timer); - ODBC_DISPOSE(_stmt, SQL_HANDLE_STMT, - SQLExecDirectW(_stmt, (SQLWCHAR*)(insert_stmt.c_str()), SQL_NTS), - _insert_stmt_buffer.data()); - } - COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent); +Status ODBCConnector::exec_write_sql(const std::u16string& insert_stmt, + const fmt::memory_buffer& insert_stmt_buffer) { + SCOPED_TIMER(_result_send_timer); + ODBC_DISPOSE(_stmt, SQL_HANDLE_STMT, + SQLExecDirectW(_stmt, (SQLWCHAR*)(insert_stmt.c_str()), SQL_NTS), + insert_stmt_buffer.data()); return Status::OK(); } @@ -397,112 +271,4 @@ std::string ODBCConnector::handle_diagnostic_record(SQLHANDLE hHandle, SQLSMALLI return diagnostic_msg; } -Status ODBCConnector::append(const std::string& table_name, vectorized::Block* block, - const std::vector& _output_vexpr_ctxs, - uint32_t start_send_row, uint32_t* num_rows_sent) { - _insert_stmt_buffer.clear(); - std::u16string insert_stmt; - { - SCOPED_TIMER(_convert_tuple_timer); - fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", table_name); - - int num_rows = block->rows(); - int num_columns = block->columns(); - for (int i = start_send_row; i < num_rows; ++i) { - (*num_rows_sent)++; - - // Construct insert statement of odbc table - for (int j = 0; j < num_columns; ++j) { - if (j != 0) { - fmt::format_to(_insert_stmt_buffer, "{}", ", "); - } - auto [item, size] = block->get_by_position(j).column->get_data_at(i); - if (item == nullptr) { - fmt::format_to(_insert_stmt_buffer, "{}", "NULL"); - continue; - } - switch (_output_vexpr_ctxs[j]->root()->type().type) { - case TYPE_BOOLEAN: - case TYPE_TINYINT: - fmt::format_to(_insert_stmt_buffer, "{}", - *reinterpret_cast(item)); - break; - case TYPE_SMALLINT: - fmt::format_to(_insert_stmt_buffer, "{}", - *reinterpret_cast(item)); - break; - case TYPE_INT: - fmt::format_to(_insert_stmt_buffer, "{}", - *reinterpret_cast(item)); - break; - case TYPE_BIGINT: - fmt::format_to(_insert_stmt_buffer, "{}", - *reinterpret_cast(item)); - break; - case TYPE_FLOAT: - fmt::format_to(_insert_stmt_buffer, "{}", - *reinterpret_cast(item)); - break; - case TYPE_DOUBLE: - fmt::format_to(_insert_stmt_buffer, "{}", - *reinterpret_cast(item)); - break; - case TYPE_DATE: - case TYPE_DATETIME: { - vectorized::VecDateTimeValue value = - binary_cast( - *(int64_t*)item); - - char buf[64]; - char* pos = value.to_string(buf); - std::string_view str(buf, pos - buf - 1); - fmt::format_to(_insert_stmt_buffer, "'{}'", str); - break; - } - case TYPE_VARCHAR: - case TYPE_CHAR: - case TYPE_STRING: { - fmt::format_to(_insert_stmt_buffer, "'{}'", fmt::basic_string_view(item, size)); - break; - } - case TYPE_DECIMALV2: { - DecimalV2Value value = *(DecimalV2Value*)(item); - fmt::format_to(_insert_stmt_buffer, "{}", value.to_string()); - break; - } - case TYPE_LARGEINT: { - fmt::format_to(_insert_stmt_buffer, "{}", - *reinterpret_cast(item)); - break; - } - default: { - return Status::InternalError("can't convert this type to mysql type. type = {}", - _output_expr_ctxs[j]->root()->type().type); - } - } - } - - if (i < num_rows - 1 && _insert_stmt_buffer.size() < INSERT_BUFFER_SIZE) { - fmt::format_to(_insert_stmt_buffer, "{}", "),("); - } else { - // batch exhausted or _insert_stmt_buffer is full, need to do real insert stmt - fmt::format_to(_insert_stmt_buffer, "{}", ")"); - break; - } - } - // Translate utf8 string to utf16 to use unicode encodeing - insert_stmt = utf8_to_u16string(_insert_stmt_buffer.data(), - _insert_stmt_buffer.data() + _insert_stmt_buffer.size()); - } - - { - SCOPED_TIMER(_result_send_timer); - ODBC_DISPOSE(_stmt, SQL_HANDLE_STMT, - SQLExecDirectW(_stmt, (SQLWCHAR*)(insert_stmt.c_str()), SQL_NTS), - _insert_stmt_buffer.data()); - } - COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent); - return Status::OK(); -} - } // namespace doris diff --git a/be/src/exec/odbc_connector.h b/be/src/exec/odbc_connector.h index 6782a14ae0..41c7280741 100644 --- a/be/src/exec/odbc_connector.h +++ b/be/src/exec/odbc_connector.h @@ -16,26 +16,12 @@ // under the License. #pragma once - -#include -#include - -#include -#include -#include -#include +#include #include "common/status.h" -#include "exprs/expr_context.h" -#include "runtime/descriptors.h" -#include "runtime/row_batch.h" +#include "exec/table_connector.h" namespace doris { - -namespace vectorized { -class VExprContext; -} - struct ODBCConnectorParam { std::string connect_string; @@ -63,66 +49,39 @@ struct DataBinding { }; // ODBC Connector for scan data from ODBC -class ODBCConnector { +class ODBCConnector : public TableConnector { public: explicit ODBCConnector(const ODBCConnectorParam& param); - ~ODBCConnector(); + ~ODBCConnector() override; - Status open(); + Status open() override; // query for ODBC table - Status query(); + Status query() override; + Status get_next_row(bool* eos); - // write for ODBC table - Status init_to_write(RuntimeProfile* profile); - Status append(const std::string& table_name, RowBatch* batch, uint32_t start_send_row, - uint32_t* num_rows_sent); - - Status append(const std::string& table_name, vectorized::Block* block, - const std::vector& _output_vexpr_ctxs, - uint32_t start_send_row, uint32_t* num_rows_sent); + Status exec_write_sql(const std::u16string& insert_stmt, + const fmt::memory_buffer& insert_stmt_buffer) override; // use in ODBC transaction - Status begin_trans(); // should be call after connect and before query or init_to_write - Status abort_trans(); // should be call after transaction abort - Status finish_trans(); // should be call after transaction commit + Status begin_trans() override; // should be call after connect and before query or init_to_write + Status abort_trans() override; // should be call after transaction abort + Status finish_trans() override; // should be call after transaction commit const DataBinding& get_column_data(int i) const { return *_columns_data.at(i).get(); } + Status init_to_write(RuntimeProfile* profile); private: - void _init_profile(RuntimeProfile*); - static Status error_status(const std::string& prefix, const std::string& error_msg); static std::string handle_diagnostic_record(SQLHANDLE hHandle, SQLSMALLINT hType, RETCODE RetCode); std::string _connect_string; - // only use in query - std::string _sql_str; - const TupleDescriptor* _tuple_desc; - - // only use in write - const std::vector _output_expr_ctxs; - fmt::memory_buffer _insert_stmt_buffer; - - // profile use in write - // tuple convert timer, child timer of _append_row_batch_timer - RuntimeProfile::Counter* _convert_tuple_timer = nullptr; - // file write timer, child timer of _append_row_batch_timer - RuntimeProfile::Counter* _result_send_timer = nullptr; - // number of sent rows - RuntimeProfile::Counter* _sent_rows_counter = nullptr; - - bool _is_open; - bool _is_in_transaction; - SQLSMALLINT _field_num; - SQLHENV _env; SQLHDBC _dbc; SQLHSTMT _stmt; - std::vector> _columns_data; }; diff --git a/be/src/exec/table_connector.cpp b/be/src/exec/table_connector.cpp new file mode 100644 index 0000000000..27af61c8f3 --- /dev/null +++ b/be/src/exec/table_connector.cpp @@ -0,0 +1,296 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/table_connector.h" + +#include "exprs/expr.h" +#include "runtime/primitive_type.h" +#include "util/mysql_global.h" +#include "vec/core/block.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" + +namespace doris { + +// Default max buffer size use in insert to: 50MB, normally a batch is smaller than the size +static constexpr uint32_t INSERT_BUFFER_SIZE = 1024l * 1024 * 50; + +TableConnector::TableConnector(const TupleDescriptor* tuple_desc, const std::string& sql_str) + : _is_open(false), _is_in_transaction(false), _tuple_desc(tuple_desc), _sql_str(sql_str) {} + +void TableConnector::init_profile(doris::RuntimeProfile* profile) { + _convert_tuple_timer = ADD_TIMER(profile, "TupleConvertTime"); + _result_send_timer = ADD_TIMER(profile, "ResultSendTime"); + _sent_rows_counter = ADD_COUNTER(profile, "NumSentRows", TUnit::UNIT); +} + +std::u16string TableConnector::utf8_to_u16string(const char* first, const char* last) { + std::wstring_convert, char16_t> utf8_utf16_cvt; + return utf8_utf16_cvt.from_bytes(first, last); +} + +Status TableConnector::append(const std::string& table_name, RowBatch* batch, + const std::vector& output_expr_ctxs, + uint32_t start_send_row, uint32* num_rows_sent) { + _insert_stmt_buffer.clear(); + std::u16string insert_stmt; + { + SCOPED_TIMER(_convert_tuple_timer); + fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", table_name); + + int num_rows = batch->num_rows(); + for (int i = start_send_row; i < num_rows; ++i) { + auto row = batch->get_row(i); + (*num_rows_sent)++; + + // Construct insert statement of odbc table + int num_columns = output_expr_ctxs.size(); + for (int j = 0; j < num_columns; ++j) { + if (j != 0) { + fmt::format_to(_insert_stmt_buffer, "{}", ", "); + } + void* item = output_expr_ctxs[j]->get_value(row); + if (item == nullptr) { + fmt::format_to(_insert_stmt_buffer, "{}", "NULL"); + continue; + } + switch (output_expr_ctxs[j]->root()->type().type) { + case TYPE_BOOLEAN: + case TYPE_TINYINT: + fmt::format_to(_insert_stmt_buffer, "{}", *static_cast(item)); + break; + case TYPE_SMALLINT: + fmt::format_to(_insert_stmt_buffer, "{}", *static_cast(item)); + break; + case TYPE_INT: + fmt::format_to(_insert_stmt_buffer, "{}", *static_cast(item)); + break; + case TYPE_BIGINT: + fmt::format_to(_insert_stmt_buffer, "{}", *static_cast(item)); + break; + case TYPE_FLOAT: + fmt::format_to(_insert_stmt_buffer, "{}", *static_cast(item)); + break; + case TYPE_DOUBLE: + fmt::format_to(_insert_stmt_buffer, "{}", *static_cast(item)); + break; + case TYPE_DATE: + case TYPE_DATETIME: { + char buf[64]; + const auto* time_val = (const DateTimeValue*)(item); + time_val->to_string(buf); + fmt::format_to(_insert_stmt_buffer, "'{}'", buf); + break; + } + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_STRING: { + const auto* string_val = (const StringValue*)(item); + + if (string_val->ptr == nullptr) { + if (string_val->len == 0) { + fmt::format_to(_insert_stmt_buffer, "{}", "''"); + } else { + fmt::format_to(_insert_stmt_buffer, "{}", "NULL"); + } + } else { + fmt::format_to(_insert_stmt_buffer, "'{}'", + fmt::basic_string_view(string_val->ptr, string_val->len)); + } + break; + } + case TYPE_DECIMALV2: { + const DecimalV2Value decimal_val( + reinterpret_cast(item)->value); + char buffer[MAX_DECIMAL_WIDTH]; + int output_scale = output_expr_ctxs[j]->root()->output_scale(); + int len = decimal_val.to_buffer(buffer, output_scale); + _insert_stmt_buffer.append(buffer, buffer + len); + break; + } + case TYPE_LARGEINT: { + fmt::format_to(_insert_stmt_buffer, "{}", + reinterpret_cast(item)->value); + break; + } + default: { + return Status::InternalError("can't convert this type to mysql type. type = {}", + output_expr_ctxs[j]->root()->type().type); + } + } + } + + if (i < num_rows - 1 && _insert_stmt_buffer.size() < INSERT_BUFFER_SIZE) { + fmt::format_to(_insert_stmt_buffer, "{}", "),("); + } else { + // batch exhausted or _insert_stmt_buffer is full, need to do real insert stmt + fmt::format_to(_insert_stmt_buffer, "{}", ")"); + break; + } + } + // Translate utf8 string to utf16 to use unicode encodeing + insert_stmt = utf8_to_u16string(_insert_stmt_buffer.data(), + _insert_stmt_buffer.data() + _insert_stmt_buffer.size()); + } + + RETURN_IF_ERROR(exec_write_sql(insert_stmt, _insert_stmt_buffer)); + COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent); + return Status::OK(); +} + +Status TableConnector::append(const std::string& table_name, vectorized::Block* block, + const std::vector& output_vexpr_ctxs, + uint32_t start_send_row, uint32_t* num_rows_sent) { + _insert_stmt_buffer.clear(); + std::u16string insert_stmt; + { + SCOPED_TIMER(_convert_tuple_timer); + fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", table_name); + + int num_rows = block->rows(); + int num_columns = block->columns(); + for (int i = start_send_row; i < num_rows; ++i) { + (*num_rows_sent)++; + + // Construct insert statement of odbc/jdbc table + for (int j = 0; j < num_columns; ++j) { + if (j != 0) { + fmt::format_to(_insert_stmt_buffer, "{}", ", "); + } + auto& column_ptr = block->get_by_position(j).column; + auto& type_ptr = block->get_by_position(j).type; + vectorized::ColumnPtr column; + if (type_ptr->is_nullable()) { + column = assert_cast(*column_ptr) + .get_nested_column_ptr(); + if (column_ptr->is_null_at(i)) { + fmt::format_to(_insert_stmt_buffer, "{}", "NULL"); + continue; + } + } else { + column = column_ptr; + } + auto [item, size] = column->get_data_at(i); + switch (output_vexpr_ctxs[j]->root()->type().type) { + case TYPE_BOOLEAN: + case TYPE_TINYINT: + fmt::format_to(_insert_stmt_buffer, "{}", + *reinterpret_cast(item)); + break; + case TYPE_SMALLINT: + fmt::format_to(_insert_stmt_buffer, "{}", + *reinterpret_cast(item)); + break; + case TYPE_INT: + fmt::format_to(_insert_stmt_buffer, "{}", + *reinterpret_cast(item)); + break; + case TYPE_BIGINT: + fmt::format_to(_insert_stmt_buffer, "{}", + *reinterpret_cast(item)); + break; + case TYPE_FLOAT: + fmt::format_to(_insert_stmt_buffer, "{}", + *reinterpret_cast(item)); + break; + case TYPE_DOUBLE: + fmt::format_to(_insert_stmt_buffer, "{}", + *reinterpret_cast(item)); + break; + case TYPE_DATE: + case TYPE_DATETIME: { + vectorized::VecDateTimeValue value = + binary_cast( + *(int64_t*)item); + + char buf[64]; + char* pos = value.to_string(buf); + std::string_view str(buf, pos - buf - 1); + fmt::format_to(_insert_stmt_buffer, "'{}'", str); + break; + } + case TYPE_DATEV2: { + vectorized::DateV2Value value = binary_cast< + uint32_t, doris::vectorized::DateV2Value>( + *(int32_t*)item); + + char buf[64]; + char* pos = value.to_string(buf); + std::string str(buf, pos - buf - 1); + fmt::format_to(_insert_stmt_buffer, "'{}'", str); + break; + } + case TYPE_DATETIMEV2: { + vectorized::DateV2Value value = binary_cast< + uint64_t, + doris::vectorized::DateV2Value>( + *(int64_t*)item); + + char buf[64]; + char* pos = value.to_string(buf, output_vexpr_ctxs[i]->root()->type().scale); + std::string str(buf, pos - buf - 1); + fmt::format_to(_insert_stmt_buffer, "'{}'", str); + break; + } + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_STRING: { + fmt::format_to(_insert_stmt_buffer, "'{}'", fmt::basic_string_view(item, size)); + break; + } + case TYPE_DECIMALV2: { + DecimalV2Value value = *(DecimalV2Value*)(item); + fmt::format_to(_insert_stmt_buffer, "{}", value.to_string()); + break; + } + case TYPE_DECIMAL32: + case TYPE_DECIMAL64: + case TYPE_DECIMAL128: { + auto val = type_ptr->to_string(*column, i); + fmt::format_to(_insert_stmt_buffer, "{}", val); + break; + } + case TYPE_LARGEINT: { + fmt::format_to(_insert_stmt_buffer, "{}", + *reinterpret_cast(item)); + break; + } + default: { + return Status::InternalError("can't convert this type to mysql type. type = {}", + output_vexpr_ctxs[j]->root()->type().type); + } + } + } + + if (i < num_rows - 1 && _insert_stmt_buffer.size() < INSERT_BUFFER_SIZE) { + fmt::format_to(_insert_stmt_buffer, "{}", "),("); + } else { + // batch exhausted or _insert_stmt_buffer is full, need to do real insert stmt + fmt::format_to(_insert_stmt_buffer, "{}", ")"); + break; + } + } + // Translate utf8 string to utf16 to use unicode encodeing + insert_stmt = utf8_to_u16string(_insert_stmt_buffer.data(), + _insert_stmt_buffer.data() + _insert_stmt_buffer.size()); + } + RETURN_IF_ERROR(exec_write_sql(insert_stmt, _insert_stmt_buffer)); + COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent); + return Status::OK(); +} + +} // namespace doris diff --git a/be/src/exec/table_connector.h b/be/src/exec/table_connector.h new file mode 100644 index 0000000000..8b636bab5a --- /dev/null +++ b/be/src/exec/table_connector.h @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include +#include +#include +#include + +#include "common/status.h" +#include "exprs/expr_context.h" +#include "runtime/descriptors.h" +#include "runtime/row_batch.h" +#include "vec/exprs/vexpr_context.h" + +namespace doris { + +// Table Connector for scan data from ODBC/JDBC +class TableConnector { +public: + TableConnector(const TupleDescriptor* tuple_desc, const std::string& sql_str); + virtual ~TableConnector() = default; + + virtual Status open() = 0; + // exec query for table + virtual Status query() = 0; + + // use in ODBC/JDBC transaction + virtual Status begin_trans() = 0; // should be call after connect and before query + virtual Status abort_trans() = 0; // should be call after transaction abort + virtual Status finish_trans() = 0; // should be call after transaction commit + + virtual Status exec_write_sql(const std::u16string& insert_stmt, + const fmt::memory_buffer& _insert_stmt_buffer) = 0; + //write data into table row batch + Status append(const std::string& table_name, RowBatch* batch, + const std::vector& _output_expr_ctxs, uint32_t start_send_row, + uint32_t* num_rows_sent); + + //write data into table vectorized + Status append(const std::string& table_name, vectorized::Block* block, + const std::vector& _output_vexpr_ctxs, + uint32_t start_send_row, uint32_t* num_rows_sent); + + void init_profile(RuntimeProfile*); + + std::u16string utf8_to_u16string(const char* first, const char* last); + +protected: + bool _is_open; + bool _is_in_transaction; + const TupleDescriptor* _tuple_desc; + // only use in query + std::string _sql_str; + // only use in write + fmt::memory_buffer _insert_stmt_buffer; + + // profile use in write + // tuple convert timer, child timer of _append_row_batch_timer + RuntimeProfile::Counter* _convert_tuple_timer = nullptr; + // file write timer, child timer of _append_row_batch_timer + RuntimeProfile::Counter* _result_send_timer = nullptr; + // number of sent rows + RuntimeProfile::Counter* _sent_rows_counter = nullptr; +}; + +} // namespace doris diff --git a/be/src/runtime/odbc_table_sink.cpp b/be/src/runtime/odbc_table_sink.cpp index 11b175a90c..3191431138 100644 --- a/be/src/runtime/odbc_table_sink.cpp +++ b/be/src/runtime/odbc_table_sink.cpp @@ -80,8 +80,11 @@ Status OdbcTableSink::send(RuntimeState* state, RowBatch* batch) { uint32_t start_send_row = 0; uint32_t num_row_sent = 0; while (start_send_row < batch->num_rows()) { - auto status = _writer->append(_odbc_tbl, batch, start_send_row, &num_row_sent); - if (UNLIKELY(!status.ok())) return status; + auto status = + _writer->append(_odbc_tbl, batch, _output_expr_ctxs, start_send_row, &num_row_sent); + if (UNLIKELY(!status.ok())) { + return status; + } start_send_row += num_row_sent; num_row_sent = 0; } diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 36cadb284f..4cb28ce405 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -212,6 +212,8 @@ set(VEC_FILES sink/vmysql_table_sink.cpp sink/vodbc_table_sink.cpp sink/vresult_file_sink.cpp + sink/vjdbc_table_sink.cpp + sink/vtable_sink.cpp runtime/vdatetime_value.cpp runtime/vdata_stream_recvr.cpp runtime/vdata_stream_mgr.cpp diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index d598ef26c7..2551fd88c2 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -17,12 +17,15 @@ #include "vec/exec/vjdbc_connector.h" #ifdef LIBJVM +#include "exec/table_connector.h" #include "gen_cpp/Types_types.h" #include "gutil/strings/substitute.h" #include "jni.h" #include "runtime/user_function_cache.h" #include "util/jni-util.h" #include "vec/columns/column_nullable.h" +#include "vec/exprs/vexpr.h" + namespace doris { namespace vectorized { const char* JDBC_EXECUTOR_CLASS = "org/apache/doris/udf/JdbcExecutor"; @@ -33,17 +36,18 @@ const char* JDBC_EXECUTOR_GET_BLOCK_SIGNATURE = "(I)Ljava/util/List;"; const char* JDBC_EXECUTOR_CLOSE_SIGNATURE = "()V"; const char* JDBC_EXECUTOR_CONVERT_DATE_SIGNATURE = "(Ljava/lang/Object;)J"; const char* JDBC_EXECUTOR_CONVERT_DATETIME_SIGNATURE = "(Ljava/lang/Object;)J"; +const char* JDBC_EXECUTOR_TRANSACTION_SIGNATURE = "()V"; JdbcConnector::JdbcConnector(const JdbcConnectorParam& param) - : _is_open(false), - _query_string(param.query_string), - _tuple_desc(param.tuple_desc), - _conn_param(param) {} + : TableConnector(param.tuple_desc, param.query_string), _conn_param(param) {} JdbcConnector::~JdbcConnector() { if (!_is_open) { return; } + if (_is_in_transaction) { + abort_trans(); + } JNIEnv* env; Status status; RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env)); @@ -101,7 +105,7 @@ Status JdbcConnector::open() { return Status::OK(); } -Status JdbcConnector::query_exec() { +Status JdbcConnector::query() { if (!_is_open) { return Status::InternalError("Query before open of JdbcConnector."); } @@ -115,7 +119,7 @@ Status JdbcConnector::query_exec() { JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); - jstring query_sql = env->NewStringUTF(_query_string.c_str()); + jstring query_sql = env->NewStringUTF(_sql_str.c_str()); jint colunm_count = env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_query_id, query_sql); env->DeleteLocalRef(query_sql); @@ -129,7 +133,7 @@ Status JdbcConnector::query_exec() { Status JdbcConnector::get_next(bool* eos, std::vector& columns, int batch_size) { if (!_is_open) { - return Status::InternalError("get_next before open if jdbc."); + return Status::InternalError("get_next before open of jdbc connector."); } JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); @@ -203,6 +207,12 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) { RETURN_IF_ERROR( register_id(_executor_object_clazz, "toString", "()Ljava/lang/String;", _to_string_id)); + RETURN_IF_ERROR(register_id(_executor_clazz, "openTrans", JDBC_EXECUTOR_TRANSACTION_SIGNATURE, + _executor_begin_trans_id)); + RETURN_IF_ERROR(register_id(_executor_clazz, "commitTrans", JDBC_EXECUTOR_TRANSACTION_SIGNATURE, + _executor_finish_trans_id)); + RETURN_IF_ERROR(register_id(_executor_clazz, "rollbackTrans", + JDBC_EXECUTOR_TRANSACTION_SIGNATURE, _executor_abort_trans_id)); return Status::OK(); } @@ -301,6 +311,18 @@ Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj, return Status::OK(); } +Status JdbcConnector::exec_write_sql(const std::u16string& insert_stmt, + const fmt::memory_buffer& insert_stmt_buffer) { + SCOPED_TIMER(_result_send_timer); + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + jstring query_sql = env->NewString((const jchar*)insert_stmt.c_str(), insert_stmt.size()); + env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_query_id, query_sql); + env->DeleteLocalRef(query_sql); + RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); + return Status::OK(); +} + std::string JdbcConnector::_jobject_to_string(JNIEnv* env, jobject jobj) { jobject jstr = env->CallObjectMethod(jobj, _to_string_id); const jbyteArray stringJbytes = @@ -323,6 +345,40 @@ int64_t JdbcConnector::_jobject_to_datetime(JNIEnv* env, jobject jobj) { _executor_convert_datetime_id, jobj); } +Status JdbcConnector::begin_trans() { + if (!_is_open) { + return Status::InternalError("Begin transaction before open."); + } + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_begin_trans_id); + RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); + _is_in_transaction = true; + return Status::OK(); +} + +Status JdbcConnector::abort_trans() { + if (!_is_in_transaction) { + return Status::InternalError("Abort transaction before begin trans."); + } + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_abort_trans_id); + return JniUtil::GetJniExceptionMsg(env); +} + +Status JdbcConnector::finish_trans() { + if (!_is_in_transaction) { + return Status::InternalError("Abort transaction before begin trans."); + } + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_finish_trans_id); + RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); + _is_in_transaction = false; + return Status::OK(); +} + #define FUNC_IMPL_TO_CONVERT_DATA(cpp_return_type, java_type, sig, java_return_type) \ cpp_return_type JdbcConnector::_jobject_to_##cpp_return_type(JNIEnv* env, jobject jobj) { \ jmethodID method_id_##cpp_return_type = env->GetMethodID( \ diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h index 750096fabf..04aa8982da 100644 --- a/be/src/vec/exec/vjdbc_connector.h +++ b/be/src/vec/exec/vjdbc_connector.h @@ -18,10 +18,8 @@ #pragma once #ifdef LIBJVM -#include - +#include "exec/table_connector.h" #include "jni.h" -#include "runtime/descriptors.h" namespace doris { namespace vectorized { @@ -38,18 +36,26 @@ struct JdbcConnectorParam { const TupleDescriptor* tuple_desc; }; -class JdbcConnector { +class JdbcConnector : public TableConnector { public: JdbcConnector(const JdbcConnectorParam& param); - ~JdbcConnector(); + ~JdbcConnector() override; - Status open(); + Status open() override; - Status query_exec(); + Status query() override; + + Status exec_write_sql(const std::u16string& insert_stmt, + const fmt::memory_buffer& insert_stmt_buffer) override; Status get_next(bool* eos, std::vector& columns, int batch_size); + // use in JDBC transaction + Status begin_trans() override; // should be call after connect and before query or init_to_write + Status abort_trans() override; // should be call after transaction abort + Status finish_trans() override; // should be call after transaction commit + private: Status _register_func_id(JNIEnv* env); Status _convert_column_data(JNIEnv* env, jobject jobj, const SlotDescriptor* slot_desc, @@ -58,10 +64,7 @@ private: int64_t _jobject_to_date(JNIEnv* env, jobject jobj); int64_t _jobject_to_datetime(JNIEnv* env, jobject jobj); - bool _is_open; - std::string _query_string; - const TupleDescriptor* _tuple_desc; - const JdbcConnectorParam _conn_param; + const JdbcConnectorParam& _conn_param; jclass _executor_clazz; jclass _executor_list_clazz; jclass _executor_object_clazz; @@ -78,6 +81,9 @@ private: jmethodID _executor_convert_datetime_id; jmethodID _get_bytes_id; jmethodID _to_string_id; + jmethodID _executor_begin_trans_id; + jmethodID _executor_finish_trans_id; + jmethodID _executor_abort_trans_id; #define FUNC_VARI_DECLARE(RETURN_TYPE) \ RETURN_TYPE _jobject_to_##RETURN_TYPE(JNIEnv* env, jobject jobj); \ diff --git a/be/src/vec/exec/vjdbc_scan_node.cpp b/be/src/vec/exec/vjdbc_scan_node.cpp index d302d01be8..e857218e97 100644 --- a/be/src/vec/exec/vjdbc_scan_node.cpp +++ b/be/src/vec/exec/vjdbc_scan_node.cpp @@ -92,7 +92,7 @@ Status VJdbcScanNode::open(RuntimeState* state) { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(_jdbc_connector->open()); - RETURN_IF_ERROR(_jdbc_connector->query_exec()); + RETURN_IF_ERROR(_jdbc_connector->query()); return Status::OK(); } diff --git a/be/src/vec/sink/vjdbc_table_sink.cpp b/be/src/vec/sink/vjdbc_table_sink.cpp new file mode 100644 index 0000000000..b1037d9a6f --- /dev/null +++ b/be/src/vec/sink/vjdbc_table_sink.cpp @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/vjdbc_table_sink.h" + +#ifdef LIBJVM +#include + +#include + +#include "vec/core/materialize_block.h" +#include "vec/sink/vtable_sink.h" + +namespace doris { +namespace vectorized { + +VJdbcTableSink::VJdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc, + const std::vector& t_exprs) + : VTableSink(pool, row_desc, t_exprs) { + _name = "VJdbcTableSink"; +} + +Status VJdbcTableSink::init(const TDataSink& t_sink) { + RETURN_IF_ERROR(VTableSink::init(t_sink)); + const TJdbcTableSink& t_jdbc_sink = t_sink.jdbc_table_sink; + + _jdbc_param.jdbc_url = t_jdbc_sink.jdbc_table.jdbc_url; + _jdbc_param.user = t_jdbc_sink.jdbc_table.jdbc_user; + _jdbc_param.passwd = t_jdbc_sink.jdbc_table.jdbc_password; + _jdbc_param.driver_class = t_jdbc_sink.jdbc_table.jdbc_driver_class; + _jdbc_param.driver_path = t_jdbc_sink.jdbc_table.jdbc_driver_url; + _jdbc_param.driver_checksum = t_jdbc_sink.jdbc_table.jdbc_driver_checksum; + _jdbc_param.resource_name = t_jdbc_sink.jdbc_table.jdbc_resource_name; + _table_name = t_jdbc_sink.jdbc_table.jdbc_table_name; + _use_transaction = t_jdbc_sink.use_transaction; + + return Status::OK(); +} + +Status VJdbcTableSink::open(RuntimeState* state) { + START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJdbcTableSink::open"); + RETURN_IF_ERROR(VTableSink::open(state)); + + // create writer + _writer.reset(new JdbcConnector(_jdbc_param)); + RETURN_IF_ERROR(_writer->open()); + if (_use_transaction) { + RETURN_IF_ERROR(_writer->begin_trans()); + } + + _writer->init_profile(_profile); + return Status::OK(); +} + +Status VJdbcTableSink::send(RuntimeState* state, Block* block) { + INIT_AND_SCOPE_SEND_SPAN(state->get_tracer(), _send_span, "VJdbcTableSink::send"); + Status status = Status::OK(); + if (block == nullptr || block->rows() == 0) { + return status; + } + + auto output_block = vectorized::VExprContext::get_output_block_after_execute_exprs( + _output_vexpr_ctxs, *block, status); + materialize_block_inplace(output_block); + + uint32_t start_send_row = 0; + uint32_t num_row_sent = 0; + while (start_send_row < output_block.rows()) { + RETURN_IF_ERROR(_writer->append(_table_name, &output_block, _output_vexpr_ctxs, + start_send_row, &num_row_sent)); + start_send_row += num_row_sent; + num_row_sent = 0; + } + + return Status::OK(); +} + +Status VJdbcTableSink::close(RuntimeState* state, Status exec_status) { + START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJdbcTableSink::close"); + RETURN_IF_ERROR(VTableSink::close(state, exec_status)); + if (exec_status.ok() && _use_transaction) { + RETURN_IF_ERROR(_writer->finish_trans()); + } + return DataSink::close(state, exec_status); +} +} // namespace vectorized +} // namespace doris +#endif \ No newline at end of file diff --git a/be/src/vec/sink/vjdbc_table_sink.h b/be/src/vec/sink/vjdbc_table_sink.h new file mode 100644 index 0000000000..726e276bdf --- /dev/null +++ b/be/src/vec/sink/vjdbc_table_sink.h @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once +#ifdef LIBJVM +#include "common/status.h" +#include "vec/exec/vjdbc_connector.h" +#include "vec/sink/vtable_sink.h" + +namespace doris { +namespace vectorized { + +// This class is a sinker, which put input data to jdbc table +class VJdbcTableSink : public VTableSink { +public: + VJdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc, + const std::vector& t_exprs); + + Status init(const TDataSink& thrift_sink) override; + + Status open(RuntimeState* state) override; + + Status send(RuntimeState* state, vectorized::Block* block) override; + + Status close(RuntimeState* state, Status exec_status) override; + +private: + JdbcConnectorParam _jdbc_param; + std::unique_ptr _writer; +}; +} // namespace vectorized +} // namespace doris +#endif \ No newline at end of file diff --git a/be/src/vec/sink/vmysql_table_sink.cpp b/be/src/vec/sink/vmysql_table_sink.cpp index 49d95f27ec..bdb2374b3a 100644 --- a/be/src/vec/sink/vmysql_table_sink.cpp +++ b/be/src/vec/sink/vmysql_table_sink.cpp @@ -20,63 +20,39 @@ #include #include "runtime/runtime_state.h" -#include "util/debug_util.h" -#include "util/runtime_profile.h" -#include "vec/exprs/vexpr.h" +#include "vec/sink/vtable_sink.h" namespace doris { namespace vectorized { VMysqlTableSink::VMysqlTableSink(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector& t_exprs) - : _pool(pool), _row_desc(row_desc), _t_output_expr(t_exprs) { + : VTableSink(pool, row_desc, t_exprs) { _name = "VMysqlTableSink"; } -VMysqlTableSink::~VMysqlTableSink() {} - Status VMysqlTableSink::init(const TDataSink& t_sink) { - RETURN_IF_ERROR(DataSink::init(t_sink)); + RETURN_IF_ERROR(VTableSink::init(t_sink)); const TMysqlTableSink& t_mysql_sink = t_sink.mysql_table_sink; - _conn_info.host = t_mysql_sink.host; _conn_info.port = t_mysql_sink.port; _conn_info.user = t_mysql_sink.user; _conn_info.passwd = t_mysql_sink.passwd; _conn_info.db = t_mysql_sink.db; - _mysql_tbl = t_mysql_sink.table; + _table_name = t_mysql_sink.table; _conn_info.charset = t_mysql_sink.charset; - - // From the thrift expressions create the real exprs. - RETURN_IF_ERROR(VExpr::create_expr_trees(_pool, _t_output_expr, &_output_expr_ctxs)); - return Status::OK(); -} - -Status VMysqlTableSink::prepare(RuntimeState* state) { - RETURN_IF_ERROR(DataSink::prepare(state)); - // Prepare the exprs to run. - RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, _row_desc)); - std::stringstream title; - title << "VMysqlTableSink (frag_id=" << state->fragment_instance_id() << ")"; - // create profile - _profile = state->obj_pool()->add(new RuntimeProfile(title.str())); return Status::OK(); } Status VMysqlTableSink::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VMysqlTableSink::open"); // Prepare the exprs to run. - RETURN_IF_ERROR(VExpr::open(_output_expr_ctxs, state)); + RETURN_IF_ERROR(VTableSink::open(state)); // create writer - _writer = state->obj_pool()->add(new VMysqlTableWriter(_output_expr_ctxs)); - RETURN_IF_ERROR(_writer->open(_conn_info, _mysql_tbl)); + _writer.reset(new VMysqlTableWriter(_output_vexpr_ctxs)); + RETURN_IF_ERROR(_writer->open(_conn_info, _table_name)); return Status::OK(); } -Status VMysqlTableSink::send(RuntimeState* state, RowBatch* batch) { - return Status::NotSupported( - "Not Implemented VMysqlTableSink::send(RuntimeState* state, RowBatch* batch)"); -} - Status VMysqlTableSink::send(RuntimeState* state, Block* block) { INIT_AND_SCOPE_SEND_SPAN(state->get_tracer(), _send_span, "VMysqlTableSink::send"); return _writer->append(block); @@ -84,7 +60,7 @@ Status VMysqlTableSink::send(RuntimeState* state, Block* block) { Status VMysqlTableSink::close(RuntimeState* state, Status exec_status) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VMysqlTableSink::close"); - VExpr::close(_output_expr_ctxs, state); + RETURN_IF_ERROR(VTableSink::close(state, exec_status)); return Status::OK(); } } // namespace vectorized diff --git a/be/src/vec/sink/vmysql_table_sink.h b/be/src/vec/sink/vmysql_table_sink.h index 58c7dfce13..6a30275a8e 100644 --- a/be/src/vec/sink/vmysql_table_sink.h +++ b/be/src/vec/sink/vmysql_table_sink.h @@ -18,55 +18,29 @@ #include #include "common/status.h" -#include "exec/data_sink.h" #include "vec/sink/vmysql_table_writer.h" +#include "vec/sink/vtable_sink.h" namespace doris { - -class RowDescriptor; -class TExpr; -class TMysqlTableSink; -class RuntimeState; -class RuntimeProfile; namespace vectorized { -class VExprContext; -class VExpr; // This class is a sinker, which put input data to mysql table -class VMysqlTableSink : public DataSink { +class VMysqlTableSink : public VTableSink { public: VMysqlTableSink(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector& t_exprs); - ~VMysqlTableSink(); - Status init(const TDataSink& thrift_sink) override; - Status prepare(RuntimeState* state) override; - Status open(RuntimeState* state) override; - Status send(RuntimeState* state, RowBatch* batch) override; - Status send(RuntimeState* state, vectorized::Block* block) override; - // Flush all buffered data and close all existing channels to destination - // hosts. Further send() calls are illegal after calling close(). + Status close(RuntimeState* state, Status exec_status) override; - RuntimeProfile* profile() override { return _profile; } - private: - // owned by RuntimeState - ObjectPool* _pool; - const RowDescriptor& _row_desc; - const std::vector& _t_output_expr; - - std::vector _output_expr_ctxs; MysqlConnInfo _conn_info; - std::string _mysql_tbl; - VMysqlTableWriter* _writer; - - RuntimeProfile* _profile; + std::unique_ptr _writer; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/vodbc_table_sink.cpp b/be/src/vec/sink/vodbc_table_sink.cpp index 04fdef8e27..695c06ac68 100644 --- a/be/src/vec/sink/vodbc_table_sink.cpp +++ b/be/src/vec/sink/vodbc_table_sink.cpp @@ -20,47 +20,31 @@ #include #include "runtime/runtime_state.h" -#include "util/debug_util.h" #include "util/runtime_profile.h" #include "vec/core/materialize_block.h" -#include "vec/exprs/vexpr.h" +#include "vec/sink/vtable_sink.h" namespace doris { namespace vectorized { VOdbcTableSink::VOdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector& t_exprs) - : _pool(pool), _row_desc(row_desc), _t_output_expr(t_exprs) { + : VTableSink(pool, row_desc, t_exprs) { _name = "VOdbcTableSink"; } Status VOdbcTableSink::init(const TDataSink& t_sink) { - RETURN_IF_ERROR(DataSink::init(t_sink)); + RETURN_IF_ERROR(VTableSink::init(t_sink)); const TOdbcTableSink& t_odbc_sink = t_sink.odbc_table_sink; - _odbc_param.connect_string = t_odbc_sink.connect_string; - _odbc_tbl = t_odbc_sink.table; + _table_name = t_odbc_sink.table; _use_transaction = t_odbc_sink.use_transaction; - - // From the thrift expressions create the real exprs. - RETURN_IF_ERROR(VExpr::create_expr_trees(_pool, _t_output_expr, &_output_expr_ctxs)); - return Status::OK(); -} - -Status VOdbcTableSink::prepare(RuntimeState* state) { - RETURN_IF_ERROR(DataSink::prepare(state)); - // Prepare the exprs to run. - RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, _row_desc)); - std::stringstream title; - title << "VOdbcTableSink (frag_id=" << state->fragment_instance_id() << ")"; - // create profile - _profile = state->obj_pool()->add(new RuntimeProfile(title.str())); return Status::OK(); } Status VOdbcTableSink::open(RuntimeState* state) { - // Prepare the exprs to run. - RETURN_IF_ERROR(VExpr::open(_output_expr_ctxs, state)); + START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOdbcTableSink::open"); + RETURN_IF_ERROR(VTableSink::open(state)); // create writer _writer.reset(new ODBCConnector(_odbc_param)); @@ -72,27 +56,22 @@ Status VOdbcTableSink::open(RuntimeState* state) { return Status::OK(); } -Status VOdbcTableSink::send(RuntimeState* state, RowBatch* batch) { - return Status::NotSupported( - "Not Implemented VOdbcTableSink::send(RuntimeState* state, RowBatch* batch)"); -} - Status VOdbcTableSink::send(RuntimeState* state, Block* block) { + INIT_AND_SCOPE_SEND_SPAN(state->get_tracer(), _send_span, "VOdbcTableSink::send"); Status status = Status::OK(); if (block == nullptr || block->rows() == 0) { return status; } auto output_block = vectorized::VExprContext::get_output_block_after_execute_exprs( - _output_expr_ctxs, *block, status); + _output_vexpr_ctxs, *block, status); materialize_block_inplace(output_block); uint32_t start_send_row = 0; uint32_t num_row_sent = 0; while (start_send_row < output_block.rows()) { - status = _writer->append(_odbc_tbl, &output_block, _output_expr_ctxs, start_send_row, - &num_row_sent); - if (UNLIKELY(!status.ok())) return status; + RETURN_IF_ERROR(_writer->append(_table_name, &output_block, _output_vexpr_ctxs, + start_send_row, &num_row_sent)); start_send_row += num_row_sent; num_row_sent = 0; } @@ -101,8 +80,12 @@ Status VOdbcTableSink::send(RuntimeState* state, Block* block) { } Status VOdbcTableSink::close(RuntimeState* state, Status exec_status) { - VExpr::close(_output_expr_ctxs, state); - return Status::OK(); + START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOdbcTableSink::close"); + RETURN_IF_ERROR(VTableSink::close(state, exec_status)); + if (exec_status.ok() && _use_transaction) { + RETURN_IF_ERROR(_writer->finish_trans()); + } + return DataSink::close(state, exec_status); } } // namespace vectorized } // namespace doris \ No newline at end of file diff --git a/be/src/vec/sink/vodbc_table_sink.h b/be/src/vec/sink/vodbc_table_sink.h index 786b3cfb57..dc3d38efee 100644 --- a/be/src/vec/sink/vodbc_table_sink.h +++ b/be/src/vec/sink/vodbc_table_sink.h @@ -17,54 +17,29 @@ #pragma once #include "common/status.h" -#include "exec/data_sink.h" #include "exec/odbc_connector.h" -#include "vec/sink/vmysql_table_writer.h" +#include "vec/sink/vtable_sink.h" namespace doris { - -class RowDescriptor; -class TExpr; -class RuntimeState; -class RuntimeProfile; namespace vectorized { // This class is a sinker, which put input data to odbc table -class VOdbcTableSink : public DataSink { +class VOdbcTableSink : public VTableSink { public: VOdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector& t_exprs); Status init(const TDataSink& thrift_sink) override; - Status prepare(RuntimeState* state) override; - Status open(RuntimeState* state) override; - Status send(RuntimeState* state, RowBatch* batch) override; - Status send(RuntimeState* state, vectorized::Block* block) override; - // Flush all buffered data and close all existing channels to destination - // hosts. Further send() calls are illegal after calling close(). + Status close(RuntimeState* state, Status exec_status) override; - RuntimeProfile* profile() override { return _profile; } - private: - // owned by RuntimeState - ObjectPool* _pool; - const RowDescriptor& _row_desc; - const std::vector& _t_output_expr; - - std::vector _output_expr_ctxs; - - RuntimeProfile* _profile; - ODBCConnectorParam _odbc_param; - std::string _odbc_tbl; std::unique_ptr _writer; - // whether use transaction - bool _use_transaction; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/vtable_sink.cpp b/be/src/vec/sink/vtable_sink.cpp new file mode 100644 index 0000000000..c67a9a933a --- /dev/null +++ b/be/src/vec/sink/vtable_sink.cpp @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/vtable_sink.h" + +#include + +#include "runtime/runtime_state.h" +#include "vec/exprs/vexpr.h" + +namespace doris { +namespace vectorized { + +VTableSink::VTableSink(ObjectPool* pool, const RowDescriptor& row_desc, + const std::vector& t_exprs) + : _pool(pool), _row_desc(row_desc), _t_output_expr(t_exprs) {} + +Status VTableSink::init(const TDataSink& t_sink) { + RETURN_IF_ERROR(DataSink::init(t_sink)); + // From the thrift expressions create the real exprs. + RETURN_IF_ERROR(VExpr::create_expr_trees(_pool, _t_output_expr, &_output_vexpr_ctxs)); + return Status::OK(); +} + +Status VTableSink::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSink::prepare(state)); + // Prepare the exprs to run. + RETURN_IF_ERROR(VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); + std::stringstream title; + title << _name << " (frag_id=" << state->fragment_instance_id() << ")"; + // create profile + _profile = state->obj_pool()->add(new RuntimeProfile(title.str())); + return Status::OK(); +} + +Status VTableSink::open(RuntimeState* state) { + START_AND_SCOPE_SPAN(state->get_tracer(), span, "VTableSink::open"); + // Prepare the exprs to run. + RETURN_IF_ERROR(VExpr::open(_output_vexpr_ctxs, state)); + return Status::OK(); +} + +Status VTableSink::send(RuntimeState* state, RowBatch* batch) { + return Status::NotSupported( + "Not Implemented VTableSink::send(RuntimeState* state, RowBatch* batch)"); +} + +Status VTableSink::send(RuntimeState* state, Block* block) { + INIT_AND_SCOPE_SEND_SPAN(state->get_tracer(), _send_span, "VTableSink::send"); + return Status::OK(); +} +Status VTableSink::close(RuntimeState* state, Status exec_status) { + if (_closed) { + return Status::OK(); + } + VExpr::close(_output_vexpr_ctxs, state); + return Status::OK(); +} +} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/sink/vtable_sink.h b/be/src/vec/sink/vtable_sink.h new file mode 100644 index 0000000000..ccad0cb3aa --- /dev/null +++ b/be/src/vec/sink/vtable_sink.h @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include "common/status.h" +#include "exec/data_sink.h" +#include "vec/exprs/vexpr_context.h" + +namespace doris { + +class RowDescriptor; +class TExpr; +class RuntimeState; +class RuntimeProfile; +namespace vectorized { + +class VTableSink : public DataSink { +public: + VTableSink(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector& t_exprs); + + Status init(const TDataSink& thrift_sink) override; + + Status prepare(RuntimeState* state) override; + + Status open(RuntimeState* state) override; + + Status send(RuntimeState* state, RowBatch* batch) override; + + Status send(RuntimeState* state, vectorized::Block* block) override; + // Flush all buffered data and close all existing channels to destination + // hosts. Further send() calls are illegal after calling close(). + Status close(RuntimeState* state, Status exec_status) override; + + RuntimeProfile* profile() override { return _profile; } + +protected: + // owned by RuntimeState + ObjectPool* _pool; + const RowDescriptor& _row_desc; + const std::vector& _t_output_expr; + std::vector _output_vexpr_ctxs; + RuntimeProfile* _profile; + std::string _table_name; + // whether use transaction + bool _use_transaction; +}; +} // namespace vectorized +} // namespace doris diff --git a/docs/en/docs/ecosystem/external-table/jdbc-of-doris.md b/docs/en/docs/ecosystem/external-table/jdbc-of-doris.md index ede844c68e..2ca0418c0d 100644 --- a/docs/en/docs/ecosystem/external-table/jdbc-of-doris.md +++ b/docs/en/docs/ecosystem/external-table/jdbc-of-doris.md @@ -85,6 +85,24 @@ Parameter Description: ``` select * from mysql_table where k1 > 1000 and k3 ='term'; ``` +### Data write + +After the JDBC external table is create in Doris, the data can be written directly by the `insert into` statement, the query results of Doris can be written to the JDBC external table, or the data can be imported from one JDBC table to another. + +``` +insert into mysql_table values(1, "doris"); +insert into mysql_table select * from table; +``` +#### Transaction + + +The data of Doris is written to the external table by a group of batch. If the import is interrupted, the data written before may need to be rolled back. Therefore, the JDBC external table supports transactions when data is written. Transaction support needs to be supported set by session variable: `enable_odbc_transcation`. ODBC transactions are also controlled by this variable. + +``` +set enable_odbc_transcation = true; +``` + +Transactions ensure the atomicity of JDBC external table writing, but it will reduce the performance of data writing, so we can consider turning on the way as appropriate. #### 1.Mysql @@ -97,12 +115,12 @@ select * from mysql_table where k1 > 1000 and k3 ='term'; | ------------------ | ------------------------------ | | 14.5 | postgresql-42.5.0.jar | -#### 2 SQLServer +#### 3.SQLServer | SQLserver Version | SQLserver JDBC Driver Version | | ------------- | -------------------------- | | 2022 | mssql-jdbc-11.2.0.jre8.jar | -#### 2.oracle +#### 4.oracle | Oracle Version | Oracle JDBC Driver Version | | ---------- | ------------------- | | 11 | ojdbc6.jar | diff --git a/docs/zh-CN/docs/ecosystem/external-table/jdbc-of-doris.md b/docs/zh-CN/docs/ecosystem/external-table/jdbc-of-doris.md index 068a6a4d40..bf9a45a277 100644 --- a/docs/zh-CN/docs/ecosystem/external-table/jdbc-of-doris.md +++ b/docs/zh-CN/docs/ecosystem/external-table/jdbc-of-doris.md @@ -65,17 +65,17 @@ PROPERTIES ( ``` 参数说明: -| 参数 | 说明 | -| ---------------- | ---------------------------------------------------------------------------------------------------------------------------------- | -| **type** | "jdbc", 必填项标志资源类型 | -| **user** | 访问外表数据库所使的用户名 | -| **password** | 该用户对应的密码信息 | -| **jdbc_url** | JDBC的URL协议,包括数据库类型,IP地址,端口号和数据库名,不同数据库协议格式不一样。例如mysql: "jdbc:mysql://127.0.0.1:3306/test"。 | -| **driver_class** | 访问外表数据库的驱动包类名,例如mysql是:com.mysql.jdbc.Driver. | -| **driver_url** | 用于下载访问外部数据库的jar包驱动URL。http://IP:port/mysql-connector-java-5.1.47.jar | -| **resource** | 在Doris中建立外表时依赖的资源名,对应上步创建资源时的名字。 | -| **table** | 在Doris中建立外表时,与外部数据库相映射的表名。 | -| **table_type** | 在Doris中建立外表时,该表来自那个数据库。例如mysql,postgresql,sqlserver,oracle | +| 参数 | 说明| +| ---------------- | ------------ | +| **type** | "jdbc", 必填项标志资源类型 | +| **user** | 访问外表数据库所使的用户名 | +| **password** | 该用户对应的密码信息 | +| **jdbc_url** | JDBC的URL协议,包括数据库类型,IP地址,端口号和数据库名,不同数据库协议格式不一样。例如mysql: "jdbc:mysql://127.0.0.1:3306/test"。| +| **driver_class** | 访问外表数据库的驱动包类名,例如mysql是:com.mysql.jdbc.Driver. | +| **driver_url** | 用于下载访问外部数据库的jar包驱动URL。http://IP:port/mysql-connector-java-5.1.47.jar | +| **resource** | 在Doris中建立外表时依赖的资源名,对应上步创建资源时的名字。| +| **table** | 在Doris中建立外表时,与外部数据库相映射的表名。| +| **table_type** | 在Doris中建立外表时,该表来自那个数据库。例如mysql,postgresql,sqlserver,oracle| ### 查询用法 @@ -83,6 +83,25 @@ PROPERTIES ( select * from mysql_table where k1 > 1000 and k3 ='term'; ``` +### 数据写入 + +在Doris中建立JDBC外表后,可以通过insert into语句直接写入数据,也可以将Doris执行完查询之后的结果写入JDBC外表,或者是从一个JDBC外表将数据导入另一个JDBC外表。 + + +``` +insert into mysql_table values(1, "doris"); +insert into mysql_table select * from table; +``` +#### 事务 + +Doris的数据是由一组batch的方式写入外部表的,如果中途导入中断,之前写入数据可能需要回滚。所以JDBC外表支持数据写入时的事务,事务的支持需要通过设置session variable: `enable_odbc_transcation `(ODBC事务也受此变量控制)。 + +``` +set enable_odbc_transcation = true; +``` + +事务保证了JDBC外表数据写入的原子性,但是一定程度上会降低数据写入的性能,可以考虑酌情开启该功能。 + #### 1.Mysql测试 | Mysql版本 | Mysql JDBC驱动版本 | @@ -94,12 +113,12 @@ select * from mysql_table where k1 > 1000 and k3 ='term'; | -------------- | ----------------------- | | 14.5 | postgresql-42.5.0.jar | -#### 2 SQLServer测试 +#### 3.SQLServer测试 | SQLserver版本 | SQLserver JDBC驱动版本 | | ------------- | -------------------------- | | 2022 | mssql-jdbc-11.2.0.jre8.jar | -#### 2.oracle测试 +#### 4.oracle测试 | Oracle版本 | Oracle JDBC驱动版本 | | ---------- | ------------------- | | 11 | ojdbc6.jar | diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java index f8158d4666..8e481da21c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java @@ -20,6 +20,7 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.JdbcTable; import org.apache.doris.catalog.MaterializedIndexMeta; import org.apache.doris.catalog.MysqlTable; import org.apache.doris.catalog.OdbcTable; @@ -201,6 +202,13 @@ public class DescribeStmt extends ShowStmt { odbcTable.getOdbcDriver(), odbcTable.getOdbcTableTypeName()); totalRows.add(row); + } else if (table.getType() == TableType.JDBC) { + isOlapTable = false; + JdbcTable jdbcTable = (JdbcTable) table; + List row = Arrays.asList(jdbcTable.getJdbcUrl(), jdbcTable.getJdbcUser(), + jdbcTable.getJdbcPasswd(), jdbcTable.getDriverClass(), jdbcTable.getDriverUrl(), + jdbcTable.getExternalTableName(), jdbcTable.getResourceName(), jdbcTable.getJdbcTypeName()); + totalRows.add(row); } else if (table.getType() == TableType.MYSQL) { isOlapTable = false; MysqlTable mysqlTable = (MysqlTable) table; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java index e28aa6a700..5048e96a61 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java @@ -207,6 +207,7 @@ public class ExportStmt extends StatementBase { switch (tblType) { case MYSQL: case ODBC: + case JDBC: case OLAP: break; case BROKER: diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java index 9a1aac6216..cc40a5512d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.JdbcTable; import org.apache.doris.catalog.MysqlTable; import org.apache.doris.catalog.OdbcTable; import org.apache.doris.catalog.OlapTable; @@ -355,7 +356,8 @@ public class InsertStmt extends DdlStmt { } // will use it during create load job indexIdToSchemaHash = olapTable.getIndexIdToSchemaHash(); - } else if (targetTable instanceof MysqlTable || targetTable instanceof OdbcTable) { + } else if (targetTable instanceof MysqlTable || targetTable instanceof OdbcTable + || targetTable instanceof JdbcTable) { if (targetPartitionNames != null) { ErrorReport.reportAnalysisException(ErrorCode.ERR_PARTITION_CLAUSE_NO_ALLOWED); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java index 883a624ad5..5c868ac3f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java @@ -55,13 +55,13 @@ import java.util.Map; * DROP RESOURCE "jdbc_mysql"; */ public class JdbcResource extends Resource { - public static final String URL = "jdbc_url"; - public static final String USER = "user"; - public static final String PASSWORD = "password"; - public static final String DRIVER_CLASS = "driver_class"; - public static final String DRIVER_URL = "driver_url"; - public static final String TYPE = "type"; - public static final String CHECK_SUM = "checksum"; + private static final String URL = "jdbc_url"; + private static final String USER = "user"; + private static final String PASSWORD = "password"; + private static final String DRIVER_CLASS = "driver_class"; + private static final String DRIVER_URL = "driver_url"; + private static final String TYPE = "type"; + private static final String CHECK_SUM = "checksum"; private static final Logger LOG = LogManager.getLogger(JdbcResource.class); // timeout for both connection and read. 10 seconds is long enough. private static final int HTTP_TIMEOUT_MS = 10000; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java index 1045ca746b..11e2d2b302 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java @@ -28,6 +28,7 @@ import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; import com.google.common.base.Strings; +import com.google.common.collect.Maps; import org.apache.commons.codec.digest.DigestUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -46,13 +47,24 @@ public class JdbcTable extends Table { private static final String TABLE = "table"; private static final String RESOURCE = "resource"; private static final String TABLE_TYPE = "table_type"; - // TODO: We may have to support various types of databases like ODBC - // map now jdbc external table Doris support now + private static final String URL = "jdbc_url"; + private static final String USER = "user"; + private static final String PASSWORD = "password"; + private static final String DRIVER_CLASS = "driver_class"; + private static final String DRIVER_URL = "driver_url"; + private static final String CHECK_SUM = "checksum"; private static Map TABLE_TYPE_MAP; private String resourceName; private String externalTableName; private String jdbcTypeName; + private String jdbcUrl; + private String jdbcUser; + private String jdbcPasswd; + private String driverClass; + private String driverUrl; + private String checkSum; + static { Map tempMap = new HashMap<>(); tempMap.put("mysql", TOdbcTableType.MYSQL); @@ -72,18 +84,49 @@ public class JdbcTable extends Table { validate(properties); } + public String getCheckSum() { + return checkSum; + } + + public String getExternalTableName() { + return externalTableName; + } + + public String getJdbcTypeName() { + return jdbcTypeName; + } + + public String getJdbcUrl() { + return jdbcUrl; + } + + public String getJdbcUser() { + return jdbcUser; + } + + public String getJdbcPasswd() { + return jdbcPasswd; + } + + public String getDriverClass() { + return driverClass; + } + + public String getDriverUrl() { + return driverUrl; + } + @Override public TTableDescriptor toThrift() { TJdbcTable tJdbcTable = new TJdbcTable(); - JdbcResource jdbcResource = (JdbcResource) (Env.getCurrentEnv().getResourceMgr().getResource(resourceName)); - tJdbcTable.setJdbcUrl(jdbcResource.getProperty(JdbcResource.URL)); - tJdbcTable.setJdbcUser(jdbcResource.getProperty(JdbcResource.USER)); - tJdbcTable.setJdbcPassword(jdbcResource.getProperty(JdbcResource.PASSWORD)); + tJdbcTable.setJdbcUrl(jdbcUrl); + tJdbcTable.setJdbcUser(jdbcUser); + tJdbcTable.setJdbcPassword(jdbcPasswd); tJdbcTable.setJdbcTableName(externalTableName); - tJdbcTable.setJdbcDriverClass(jdbcResource.getProperty(JdbcResource.DRIVER_CLASS)); - tJdbcTable.setJdbcDriverUrl(jdbcResource.getProperty(JdbcResource.DRIVER_URL)); - tJdbcTable.setJdbcResourceName(jdbcResource.getName()); - tJdbcTable.setJdbcDriverChecksum(jdbcResource.getProperty(JdbcResource.CHECK_SUM)); + tJdbcTable.setJdbcDriverClass(driverClass); + tJdbcTable.setJdbcDriverUrl(driverUrl); + tJdbcTable.setJdbcResourceName(resourceName); + tJdbcTable.setJdbcDriverChecksum(checkSum); TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.JDBC_TABLE, fullSchema.size(), 0, getName(), ""); @@ -94,17 +137,50 @@ public class JdbcTable extends Table { @Override public void write(DataOutput out) throws IOException { super.write(out); - Text.writeString(out, externalTableName); - Text.writeString(out, resourceName); - Text.writeString(out, jdbcTypeName); + Map serializeMap = Maps.newHashMap(); + serializeMap.put(TABLE, externalTableName); + serializeMap.put(RESOURCE, resourceName); + serializeMap.put(TABLE_TYPE, jdbcTypeName); + serializeMap.put(URL, jdbcUrl); + serializeMap.put(USER, jdbcUser); + serializeMap.put(PASSWORD, jdbcPasswd); + serializeMap.put(DRIVER_CLASS, driverClass); + serializeMap.put(DRIVER_URL, driverUrl); + serializeMap.put(CHECK_SUM, checkSum); + + int size = (int) serializeMap.values().stream().filter(v -> { + return v != null; + }).count(); + out.writeInt(size); + + for (Map.Entry kv : serializeMap.entrySet()) { + if (kv.getValue() != null) { + Text.writeString(out, kv.getKey()); + Text.writeString(out, kv.getValue()); + } + } } @Override public void readFields(DataInput in) throws IOException { super.readFields(in); - externalTableName = Text.readString(in); - resourceName = Text.readString(in); - jdbcTypeName = Text.readString(in); + + int size = in.readInt(); + Map serializeMap = Maps.newHashMap(); + for (int i = 0; i < size; i++) { + String key = Text.readString(in); + String value = Text.readString(in); + serializeMap.put(key, value); + } + externalTableName = serializeMap.get(TABLE); + resourceName = serializeMap.get(RESOURCE); + jdbcTypeName = serializeMap.get(TABLE_TYPE); + jdbcUrl = serializeMap.get(URL); + jdbcUser = serializeMap.get(USER); + jdbcPasswd = serializeMap.get(PASSWORD); + driverClass = serializeMap.get(DRIVER_CLASS); + driverUrl = serializeMap.get(DRIVER_URL); + checkSum = serializeMap.get(CHECK_SUM); } public String getResourceName() { @@ -125,18 +201,17 @@ public class JdbcTable extends Table { @Override public String getSignature(int signatureVersion) { - JdbcResource jdbcResource = (JdbcResource) (Env.getCurrentEnv().getResourceMgr().getResource(resourceName)); StringBuilder sb = new StringBuilder(signatureVersion); sb.append(name); sb.append(type); sb.append(resourceName); sb.append(externalTableName); - sb.append(jdbcTypeName); - sb.append(jdbcResource.getProperty(JdbcResource.URL)); - sb.append(jdbcResource.getProperty(JdbcResource.USER)); - sb.append(jdbcResource.getProperty(JdbcResource.PASSWORD)); - sb.append(jdbcResource.getProperty(JdbcResource.DRIVER_CLASS)); - sb.append(jdbcResource.getProperty(JdbcResource.DRIVER_URL)); + sb.append(jdbcUrl); + sb.append(jdbcUser); + sb.append(jdbcPasswd); + sb.append(driverClass); + sb.append(driverUrl); + sb.append(checkSum); String md5 = DigestUtils.md5Hex(sb.toString()); LOG.debug("get signature of odbc table {}: {}. signature string: {}", name, md5, sb.toString()); @@ -181,5 +256,13 @@ public class JdbcTable extends Table { if (resource.getType() != ResourceType.JDBC) { throw new DdlException("resource [" + resourceName + "] is not jdbc resource"); } + + JdbcResource jdbcResource = (JdbcResource) resource; + jdbcUrl = jdbcResource.getProperty(URL); + jdbcUser = jdbcResource.getProperty(USER); + jdbcPasswd = jdbcResource.getProperty(PASSWORD); + driverClass = jdbcResource.getProperty(DRIVER_CLASS); + driverUrl = jdbcResource.getProperty(DRIVER_URL); + checkSum = jdbcResource.getProperty(CHECK_SUM); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index 5fe50768b3..31c3ac918c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -38,6 +38,7 @@ import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.JdbcTable; import org.apache.doris.catalog.MysqlTable; import org.apache.doris.catalog.OdbcTable; import org.apache.doris.catalog.PrimitiveType; @@ -55,6 +56,7 @@ import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.ExportSink; +import org.apache.doris.planner.JdbcScanNode; import org.apache.doris.planner.MysqlScanNode; import org.apache.doris.planner.OdbcScanNode; import org.apache.doris.planner.OlapScanNode; @@ -417,6 +419,9 @@ public class ExportJob implements Writable { case MYSQL: scanNode = new MysqlScanNode(new PlanNodeId(0), exportTupleDesc, (MysqlTable) this.exportTable); break; + case JDBC: + scanNode = new JdbcScanNode(new PlanNodeId(0), exportTupleDesc, (JdbcTable) this.exportTable); + break; default: break; } @@ -446,6 +451,7 @@ public class ExportJob implements Writable { new PlanFragmentId(nextId.getAndIncrement()), scanNode, DataPartition.RANDOM); break; case ODBC: + case JDBC: case MYSQL: fragment = new PlanFragment( new PlanFragmentId(nextId.getAndIncrement()), scanNode, DataPartition.UNPARTITIONED); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java index 3e0ff32d06..2d65fd2035 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java @@ -20,6 +20,7 @@ package org.apache.doris.planner; +import org.apache.doris.catalog.JdbcTable; import org.apache.doris.catalog.MysqlTable; import org.apache.doris.catalog.OdbcTable; import org.apache.doris.catalog.Table; @@ -65,6 +66,8 @@ public abstract class DataSink { return new MysqlTableSink((MysqlTable) table); } else if (table instanceof OdbcTable) { return new OdbcTableSink((OdbcTable) table); + } else if (table instanceof JdbcTable) { + return new JdbcTableSink((JdbcTable) table); } else { throw new AnalysisException("Unknown table type " + table.getType()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index c578e6f822..e9623a1fb1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -271,7 +271,7 @@ public class DistributedPlanner { * TODO: hbase scans are range-partitioned on the row key */ private PlanFragment createScanFragment(PlanNode node) throws UserException { - if (node instanceof MysqlScanNode || node instanceof OdbcScanNode) { + if (node instanceof MysqlScanNode || node instanceof OdbcScanNode || node instanceof JdbcScanNode) { return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.UNPARTITIONED); } else if (node instanceof SchemaScanNode) { return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.RANDOM); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcTableSink.java new file mode 100644 index 0000000000..eefe8a2a00 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcTableSink.java @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.catalog.JdbcTable; +import org.apache.doris.catalog.OdbcTable; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TDataSink; +import org.apache.doris.thrift.TDataSinkType; +import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TJdbcTable; +import org.apache.doris.thrift.TJdbcTableSink; +import org.apache.doris.thrift.TOdbcTableType; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class JdbcTableSink extends DataSink { + private static final Logger LOG = LogManager.getLogger(JdbcTableSink.class); + + private final String resourceName; + private final String externalTableName; + private final String dorisTableName; + private final String jdbcUrl; + private final String jdbcUser; + private final String jdbcPasswd; + private final String driverClass; + private final String driverUrl; + private final String checkSum; + private final TOdbcTableType jdbcType; + private final boolean useTransaction; + + public JdbcTableSink(JdbcTable jdbcTable) { + resourceName = jdbcTable.getResourceName(); + jdbcType = jdbcTable.getJdbcTableType(); + externalTableName = OdbcTable.databaseProperName(jdbcType, jdbcTable.getExternalTableName()); + useTransaction = ConnectContext.get().getSessionVariable().isEnableOdbcTransaction(); + jdbcUrl = jdbcTable.getJdbcUrl(); + jdbcUser = jdbcTable.getJdbcUser(); + jdbcPasswd = jdbcTable.getJdbcPasswd(); + driverClass = jdbcTable.getDriverClass(); + driverUrl = jdbcTable.getDriverUrl(); + checkSum = jdbcTable.getCheckSum(); + dorisTableName = jdbcTable.getName(); + } + + @Override + public String getExplainString(String prefix, TExplainLevel explainLevel) { + StringBuilder strBuilder = new StringBuilder(); + strBuilder.append(prefix + "JDBC TABLE SINK:\n"); + strBuilder.append(prefix + "TABLENAME IN DORIS: ").append(dorisTableName).append("\n"); + strBuilder.append(prefix + "TABLE TYPE: ").append(jdbcType.toString()).append("\n"); + strBuilder.append(prefix + "TABLENAME OF EXTERNAL TABLE: ").append(externalTableName).append("\n"); + strBuilder.append(prefix + "EnableTransaction: ").append(useTransaction ? "true" : "false").append("\n"); + return strBuilder.toString(); + } + + @Override + protected TDataSink toThrift() { + TDataSink tDataSink = new TDataSink(TDataSinkType.JDBC_TABLE_SINK); + TJdbcTableSink jdbcTableSink = new TJdbcTableSink(); + TJdbcTable jdbcTable = new TJdbcTable(); + jdbcTableSink.setJdbcTable(jdbcTable); + jdbcTableSink.jdbc_table.setJdbcUrl(jdbcUrl); + jdbcTableSink.jdbc_table.setJdbcUser(jdbcUser); + jdbcTableSink.jdbc_table.setJdbcPassword(jdbcPasswd); + jdbcTableSink.jdbc_table.setJdbcTableName(externalTableName); + jdbcTableSink.jdbc_table.setJdbcDriverUrl(driverUrl); + jdbcTableSink.jdbc_table.setJdbcDriverClass(driverClass); + jdbcTableSink.jdbc_table.setJdbcDriverChecksum(checkSum); + jdbcTableSink.jdbc_table.setJdbcResourceName(resourceName); + jdbcTableSink.setUseTransaction(useTransaction); + + tDataSink.setJdbcTableSink(jdbcTableSink); + return tDataSink; + } + + @Override + public PlanNodeId getExchNodeId() { + return null; + } + + @Override + public DataPartition getOutputPartition() { + return null; + } +} diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java index ce1e3d6df1..0468801683 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java @@ -76,7 +76,6 @@ public class JdbcExecutor { } } - public int querySQL(String sql) throws UdfRuntimeException { try { boolean res = stmt.execute(sql); @@ -84,14 +83,44 @@ public class JdbcExecutor { resultSet = stmt.getResultSet(); resultSetMetaData = resultSet.getMetaData(); return resultSetMetaData.getColumnCount(); - } else { //TODO: update query - return 0; + } else { + return stmt.getUpdateCount(); } } catch (SQLException e) { throw new UdfRuntimeException("JDBC executor sql has error: ", e); } } + public void openTrans() throws UdfRuntimeException { + try { + if (conn != null) { + conn.setAutoCommit(false); + } + } catch (SQLException e) { + throw new UdfRuntimeException("JDBC executor open transaction has error: ", e); + } + } + + public void commitTrans() throws UdfRuntimeException { + try { + if (conn != null) { + conn.commit(); + } + } catch (SQLException e) { + throw new UdfRuntimeException("JDBC executor commit transaction has error: ", e); + } + } + + public void rollbackTrans() throws UdfRuntimeException { + try { + if (conn != null) { + conn.rollback(); + } + } catch (SQLException e) { + throw new UdfRuntimeException("JDBC executor rollback transaction has error: ", e); + } + } + public List> getBlock(int batchSize) throws UdfRuntimeException { List> block = null; try { diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 1ca7ad45fc..5678041d17 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -34,6 +34,7 @@ enum TDataSinkType { MEMORY_SCRATCH_SINK, ODBC_TABLE_SINK, RESULT_FILE_SINK, + JDBC_TABLE_SINK, } enum TResultSinkType { @@ -103,6 +104,11 @@ struct TOdbcTableSink { 3: optional bool use_transaction } +struct TJdbcTableSink { + 1: optional Descriptors.TJdbcTable jdbc_table + 2: optional bool use_transaction +} + struct TExportSink { 1: required Types.TFileType file_type 2: required string export_path @@ -145,5 +151,6 @@ struct TDataSink { 8: optional TMemoryScratchSink memory_scratch_sink 9: optional TOdbcTableSink odbc_table_sink 10: optional TResultFileSink result_file_sink + 11: optional TJdbcTableSink jdbc_table_sink }