From 7592f52d2e20266e2755cee74242e90226355f57 Mon Sep 17 00:00:00 2001
From: pengxiangyu
Date: Wed, 21 Jul 2021 10:54:11 +0800
Subject: [PATCH] [Feature][Insert] Add transaction for the operation of insert #6244 (#6245)

## Proposed changes

Add a transaction mode for the insert operation. It costs much less time than the non-transactional path (roughly 1/1000 of the time) when you want to insert a large number of rows.

### Syntax

```
BEGIN [ WITH LABEL label];
INSERT INTO table_name ...
[COMMIT | ROLLBACK];
```

### Example

Commit a transaction:

```
begin;
insert into Tbl values(11, 22, 33);
commit;
```

Roll back a transaction:

```
begin;
insert into Tbl values(11, 22, 33);
rollback;
```

Commit a transaction with a label:

```
begin with label test_label;
insert into Tbl values(11, 22, 33);
commit;
```

### Description

```
begin: begin a transaction; the following inserts are executed in the transaction until commit/rollback;
commit: commit the transaction; the data in the transaction is inserted into the table;
rollback: abort the transaction; nothing is inserted into the table;
```

### The main realization principle

```
1. begin starts a transaction in the session; the following SQL is executed in the transaction;
2. the insert SQL is parsed to get the database name and table name, which are used to select a BE and create a pipe to accept data;
3. all inserted values are sent to the BE and written into the pipe;
4. a thread reads the data from the pipe and writes it to disk;
5. commit completes the transaction and makes the data visible;
6. rollback aborts the transaction.
```

### Some restrictions on the use of this syntax

1. Only ```insert``` can be called in a transaction.
2. If any error happens, ```commit``` will not succeed; the transaction is rolled back directly.
3. By default, if part of the inserts in the transaction are invalid, ```commit``` only inserts the remaining correct data into the table.
4. If you need ```commit``` to fail when any insert in the transaction is invalid, execute ```set enable_insert_strict = true``` before ```begin``` (see the sketch below).
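
For completeness, a minimal sketch (not part of the diff below) of how restriction 4 would look from a client session. The table name `Tbl`, its column types, and the invalid value are assumptions for illustration only; the statements themselves (`set enable_insert_strict`, `begin with label`, `show transaction where label = ...`) are the ones introduced or documented by this patch.

```
-- Hypothetical strict-mode session: commit is expected to fail if any insert is invalid.
set enable_insert_strict = true;            -- must be executed before begin
begin with label strict_txn_demo;
insert into Tbl values(11, 22, 33);         -- valid row
insert into Tbl values(11, 'oops', 33);     -- assumed-invalid row (type mismatch)
commit;                                     -- does not succeed; the transaction is rolled back
show transaction where label = 'strict_txn_demo';  -- inspect the transaction state by label
```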
--- be/src/common/utils.h | 4 + be/src/exec/CMakeLists.txt | 1 + be/src/exec/broker_scanner.cpp | 89 ++-- be/src/exec/broker_scanner.h | 2 + be/src/exec/plain_binary_line_reader.cpp | 48 ++ be/src/exec/plain_binary_line_reader.h | 40 ++ be/src/runtime/fragment_mgr.cpp | 72 ++- be/src/runtime/fragment_mgr.h | 5 + .../runtime/stream_load/stream_load_context.h | 2 + .../stream_load/stream_load_executor.cpp | 27 +- .../stream_load/stream_load_executor.h | 4 + be/src/runtime/stream_load/stream_load_pipe.h | 36 +- be/src/service/internal_service.cpp | 62 +++ be/src/service/internal_service.h | 12 + .../sql-statements/Data Manipulation/BEGIN.md | 92 ++++ .../Data Manipulation/SHOW TRANSACTION.md | 10 +- .../sql-statements/Data Manipulation/BEGIN.md | 93 ++++ .../Data Manipulation/SHOW TRANSACTION.md | 9 +- fe/fe-core/src/main/cup/sql_parser.cup | 35 +- .../java/org/apache/doris/analysis/Expr.java | 3 + .../org/apache/doris/analysis/InsertStmt.java | 18 +- .../org/apache/doris/analysis/IntLiteral.java | 62 +-- .../doris/analysis/ShowTransactionStmt.java | 24 +- .../doris/analysis/TransactionBeginStmt.java | 45 ++ .../doris/analysis/TransactionCommitStmt.java | 22 + .../analysis/TransactionRollbackStmt.java | 22 + .../doris/analysis/TransactionStmt.java | 34 ++ .../java/org/apache/doris/common/Config.java | 7 + .../org/apache/doris/planner/Planner.java | 2 +- .../org/apache/doris/qe/ConnectContext.java | 48 ++ .../org/apache/doris/qe/ConnectScheduler.java | 1 + .../doris/qe/InsertStreamTxnExecutor.java | 188 +++++++ .../apache/doris/qe/MasterTxnExecutor.java | 141 +++++ .../org/apache/doris/qe/ShowExecutor.java | 9 +- .../org/apache/doris/qe/StmtExecutor.java | 504 ++++++++++++++---- .../doris/rpc/BackendServiceClient.java | 12 + .../apache/doris/rpc/BackendServiceProxy.java | 43 ++ .../doris/service/FrontendServiceImpl.java | 76 ++- .../transaction/DatabaseTransactionMgr.java | 14 + .../transaction/GlobalTransactionMgr.java | 53 ++ .../doris/transaction/TransactionEntry.java | 119 +++++ .../doris/transaction/TransactionState.java | 13 +- .../org/apache/doris/qe/StmtExecutorTest.java | 3 +- gensrc/proto/internal_service.proto | 36 ++ gensrc/thrift/FrontendService.thrift | 21 + gensrc/thrift/PaloInternalService.thrift | 13 + gensrc/thrift/PlanNodes.thrift | 1 + 47 files changed, 1939 insertions(+), 238 deletions(-) create mode 100644 be/src/exec/plain_binary_line_reader.cpp create mode 100644 be/src/exec/plain_binary_line_reader.h create mode 100644 docs/en/sql-reference/sql-statements/Data Manipulation/BEGIN.md create mode 100644 docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BEGIN.md create mode 100644 fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionBeginStmt.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionCommitStmt.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionRollbackStmt.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionStmt.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/qe/MasterTxnExecutor.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java diff --git a/be/src/common/utils.h b/be/src/common/utils.h index 479848317e..50c6344475 100644 --- a/be/src/common/utils.h +++ b/be/src/common/utils.h @@ -28,6 +28,7 @@ struct AuthInfo { std::string user_ip; // -1 as unset int64_t auth_code = -1; + std::string 
auth_code_uuid = ""; }; template @@ -39,6 +40,9 @@ void set_request_auth(T* req, const AuthInfo& auth) { // so they have to be set. req->user = ""; req->passwd = ""; + } else if (auth.auth_code_uuid != "") { + req->__isset.auth_code_uuid = true; + req->auth_code_uuid = auth.auth_code_uuid; } else { req->user = auth.user; req->passwd = auth.passwd; diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 27ef5ed911..9e826ccf99 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -54,6 +54,7 @@ set(EXEC_FILES olap_common.cpp tablet_info.cpp tablet_sink.cpp + plain_binary_line_reader.cpp plain_text_line_reader.cpp csv_scan_node.cpp csv_scanner.cpp diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index b275647143..058b3e8aef 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -25,6 +25,7 @@ #include "exec/decompressor.h" #include "exec/exec_node.h" #include "exec/local_file_reader.h" +#include "exec/plain_binary_line_reader.h" #include "exec/plain_text_line_reader.h" #include "exec/s3_reader.h" #include "exec/text_converter.h" @@ -222,6 +223,7 @@ Status BrokerScanner::create_decompressor(TFileFormatType::type type) { switch (type) { case TFileFormatType::FORMAT_CSV_PLAIN: case TFileFormatType::FORMAT_JSON: + case TFileFormatType::FORMAT_PROTO: compress_type = CompressType::UNCOMPRESSED; break; case TFileFormatType::FORMAT_CSV_GZ: @@ -279,6 +281,7 @@ Status BrokerScanner::open_line_reader() { // _decompressor may be NULL if this is not a compressed file RETURN_IF_ERROR(create_decompressor(range.format_type)); + _file_format_type = range.format_type; // open line reader switch (range.format_type) { case TFileFormatType::FORMAT_CSV_PLAIN: @@ -290,6 +293,9 @@ Status BrokerScanner::open_line_reader() { _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader, _cur_decompressor, size, _line_delimiter, _line_delimiter_length); break; + case TFileFormatType::FORMAT_PROTO: + _cur_line_reader = new PlainBinaryLineReader(_cur_file_reader); + break; default: { std::stringstream ss; ss << "Unknown format type, cannot init line reader, type=" << range.format_type; @@ -326,40 +332,53 @@ void BrokerScanner::close() { void BrokerScanner::split_line(const Slice& line) { _split_values.clear(); - const char* value = line.data; - size_t start = 0; // point to the start pos of next col value. - size_t curpos = 0; // point to the start pos of separator matching sequence. - size_t p1 = 0; // point to the current pos of separator matching sequence. + if (_file_format_type == TFileFormatType::FORMAT_PROTO) { + PDataRow** ptr = reinterpret_cast(line.data); + PDataRow* row = *ptr; + for (const PDataColumn& col : (row)->col()) { + int len = col.value().size(); + uint8_t* buf = new uint8_t[len]; + memcpy(buf, col.value().c_str(), len); + _split_values.emplace_back(buf, len); + } + delete row; + delete ptr; + } else { + const char *value = line.data; + size_t start = 0; // point to the start pos of next col value. + size_t curpos = 0; // point to the start pos of separator matching sequence. + size_t p1 = 0; // point to the current pos of separator matching sequence. 
- // Separator: AAAA - // - // curpos - // ▼ - // AAAA - // 1000AAAA2000AAAA - // ▲ ▲ - // Start │ - // p1 + // Separator: AAAA + // + // curpos + // ▼ + // AAAA + // 1000AAAA2000AAAA + // ▲ ▲ + // Start │ + // p1 - while (curpos < line.size) { - if (*(value + curpos + p1) != _value_separator[p1]) { - // Not match, move forward: - curpos += (p1 == 0 ? 1 : p1); - p1 = 0; - } else { - p1++; - if (p1 == _value_separator_length) { - // Match a separator - _split_values.emplace_back(value + start, curpos - start); - start = curpos + _value_separator_length; - curpos = start; + while (curpos < line.size) { + if (*(value + curpos + p1) != _value_separator[p1]) { + // Not match, move forward: + curpos += (p1 == 0 ? 1 : p1); p1 = 0; + } else { + p1++; + if (p1 == _value_separator_length) { + // Match a separator + _split_values.emplace_back(value + start, curpos - start); + start = curpos + _value_separator_length; + curpos = start; + p1 = 0; + } } } - } - CHECK(curpos == line.size) << curpos << " vs " << line.size; - _split_values.emplace_back(value + start, curpos - start); + CHECK(curpos == line.size) << curpos << " vs " << line.size; + _split_values.emplace_back(value + start, curpos - start); + } } void BrokerScanner::fill_fix_length_string(const Slice& value, MemPool* pool, char** new_value_p, @@ -454,7 +473,7 @@ bool BrokerScanner::convert_one_row(const Slice& line, Tuple* tuple, MemPool* tu // Convert one row to this tuple bool BrokerScanner::line_to_src_tuple(const Slice& line) { - if (!validate_utf8(line.data, line.size)) { + if (_file_format_type != TFileFormatType::FORMAT_PROTO && !validate_utf8(line.data, line.size)) { std::stringstream error_msg; error_msg << "data is not encoded by UTF-8"; _state->append_error_msg_to_file("Unable to display", error_msg.str()); @@ -474,7 +493,11 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) { << _value_separator << "], " << "line delimiter: [" << _line_delimiter << "], " << "schema number: " << _src_slot_descs.size() << "; "; - _state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str()); + if (_file_format_type == TFileFormatType::FORMAT_PROTO) { + _state->append_error_msg_to_file("", error_msg.str()); + } else { + _state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str()); + } _counter->num_rows_filtered++; return false; } else if (_split_values.size() + columns_from_path.size() > _src_slot_descs.size()) { @@ -484,7 +507,11 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) { << _value_separator << "], " << "line delimiter: [" << _line_delimiter << "], " << "schema number: " << _src_slot_descs.size() << "; "; - _state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str()); + if (_file_format_type == TFileFormatType::FORMAT_PROTO) { + _state->append_error_msg_to_file("", error_msg.str()); + } else { + _state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str()); + } _counter->num_rows_filtered++; return false; } diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index 4578a77bf7..1da3e11d67 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -27,6 +27,7 @@ #include "exec/base_scanner.h" #include "gen_cpp/PlanNodes_types.h" #include "gen_cpp/Types_types.h" +#include "gen_cpp/internal_service.pb.h" #include "runtime/mem_pool.h" #include "util/runtime_profile.h" #include "util/slice.h" @@ -99,6 +100,7 @@ private: std::string _value_separator; std::string _line_delimiter; + 
TFileFormatType::type _file_format_type; int _value_separator_length; int _line_delimiter_length; diff --git a/be/src/exec/plain_binary_line_reader.cpp b/be/src/exec/plain_binary_line_reader.cpp new file mode 100644 index 0000000000..e680e15023 --- /dev/null +++ b/be/src/exec/plain_binary_line_reader.cpp @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/plain_binary_line_reader.h" + +#include "common/status.h" +#include "exec/file_reader.h" + +namespace doris { + +PlainBinaryLineReader::PlainBinaryLineReader(FileReader* file_reader) + : _file_reader(file_reader) { +} + +PlainBinaryLineReader::~PlainBinaryLineReader() { + close(); +} + +void PlainBinaryLineReader::close() { +} + +Status PlainBinaryLineReader::read_line(const uint8_t** ptr, size_t* size, bool* eof) { + std::unique_ptr file_buf; + int64_t read_size = 0; + RETURN_IF_ERROR(_file_reader->read_one_message(&file_buf, &read_size)); + *ptr = file_buf.release(); + *size = read_size; + if (read_size == 0) { + *eof = true; + } + return Status::OK(); +} + +} // namespace doris diff --git a/be/src/exec/plain_binary_line_reader.h b/be/src/exec/plain_binary_line_reader.h new file mode 100644 index 0000000000..9e1143b60c --- /dev/null +++ b/be/src/exec/plain_binary_line_reader.h @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "exec/line_reader.h" + +namespace doris { + +class FileReader; + +class PlainBinaryLineReader : public LineReader { +public: + PlainBinaryLineReader(FileReader* file_reader); + + virtual ~PlainBinaryLineReader(); + + virtual Status read_line(const uint8_t** ptr, size_t* size, bool* eof) override; + + virtual void close() override; + +private: + FileReader* _file_reader; +}; + +} // namespace doris diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index a7832f8a6e..415e188fd1 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -40,6 +40,9 @@ #include "runtime/exec_env.h" #include "runtime/plan_fragment_executor.h" #include "runtime/runtime_filter_mgr.h" +#include "runtime/stream_load/stream_load_pipe.h" +#include "runtime/stream_load/load_stream_mgr.h" +#include "runtime/stream_load/stream_load_context.h" #include "service/backend_options.h" #include "util/debug_util.h" #include "util/doris_metrics.h" @@ -136,6 +139,9 @@ public: std::shared_ptr get_fragments_ctx() { return _fragments_ctx; } + void set_pipe(std::shared_ptr pipe) { _pipe = pipe; } + std::shared_ptr get_pipe() const { return _pipe; } + private: void coordinator_callback(const Status& status, RuntimeProfile* profile, bool done); @@ -166,6 +172,8 @@ private: std::shared_ptr _fragments_ctx; std::shared_ptr _merge_controller_handler; + // The pipe for data transfering, such as insert. + std::shared_ptr _pipe; }; FragmentExecState::FragmentExecState(const TUniqueId& query_id, @@ -239,6 +247,9 @@ Status FragmentExecState::cancel_before_execute() { // set status as 'abort', cuz cancel() won't effect the status arg of DataSink::close(). _executor.set_abort(); _executor.cancel(); + if (_pipe != nullptr) { + _pipe->cancel(); + } return Status::OK(); } @@ -249,6 +260,9 @@ Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason) { _executor.set_is_report_on_cancel(false); } _executor.cancel(); + if (_pipe != nullptr) { + _pipe->cancel(); + } return Status::OK(); } @@ -457,7 +471,63 @@ void FragmentMgr::_exec_actual(std::shared_ptr exec_state, Fi } Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) { - return exec_plan_fragment(params, std::bind(&empty_function, std::placeholders::_1)); + if (params.txn_conf.need_txn) { + StreamLoadContext* stream_load_cxt = new StreamLoadContext(_exec_env); + stream_load_cxt->db = params.txn_conf.db; + stream_load_cxt->db_id = params.txn_conf.db_id; + stream_load_cxt->table = params.txn_conf.tbl; + stream_load_cxt->txn_id = params.txn_conf.txn_id; + stream_load_cxt->id = UniqueId(params.params.query_id); + stream_load_cxt->put_result.params = params; + stream_load_cxt->use_streaming = true; + stream_load_cxt->load_type = TLoadType::MANUL_LOAD; + stream_load_cxt->load_src_type = TLoadSourceType::RAW; + stream_load_cxt->label = params.import_label; + stream_load_cxt->format = TFileFormatType::FORMAT_CSV_PLAIN; + stream_load_cxt->timeout_second = 3600; + stream_load_cxt->auth.auth_code_uuid = params.txn_conf.auth_code_uuid; + stream_load_cxt->need_commit_self = true; + stream_load_cxt->need_rollback = true; + // total_length == -1 means read one message from pipe in once time, don't care the length. 
+ auto pipe = std::make_shared( + 1024 * 1024 /* max_buffered_bytes */, + 64 * 1024 /* min_chunk_size */, + -1 /* total_length */, + true /* use_proto */); + stream_load_cxt->body_sink = pipe; + stream_load_cxt->max_filter_ratio = params.txn_conf.max_filter_ratio; + + RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(stream_load_cxt->id, pipe)); + + RETURN_IF_ERROR( + _exec_env->stream_load_executor()->execute_plan_fragment(stream_load_cxt, pipe)); + set_pipe(params.params.fragment_instance_id, pipe); + return Status::OK(); + } else { + return exec_plan_fragment(params, std::bind(&empty_function, std::placeholders::_1)); + } +} + +void FragmentMgr::set_pipe(const TUniqueId& fragment_instance_id, std::shared_ptr pipe) { + { + std::lock_guard lock(_lock); + auto iter = _fragment_map.find(fragment_instance_id); + if (iter != _fragment_map.end()) { + _fragment_map[fragment_instance_id]->set_pipe(pipe); + } + } +} + +std::shared_ptr FragmentMgr::get_pipe(const TUniqueId& fragment_instance_id) { + { + std::lock_guard lock(_lock); + auto iter = _fragment_map.find(fragment_instance_id); + if (iter != _fragment_map.end()) { + return _fragment_map[fragment_instance_id]->get_pipe(); + } else { + return nullptr; + } + } } Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb) { diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index a6329cb87a..2ea0cba4db 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -48,6 +48,7 @@ class TExecPlanFragmentParams; class TExecPlanFragmentParamsList; class TUniqueId; class RuntimeFilterMergeController; +class StreamLoadPipe; std::string to_load_error_http_path(const std::string& file_name); @@ -88,6 +89,10 @@ public: Status merge_filter(const PMergeFilterRequest* request, const char* attach_data); + void set_pipe(const TUniqueId& fragment_instance_id, std::shared_ptr pipe); + + std::shared_ptr get_pipe(const TUniqueId& fragment_instance_id); + private: void _exec_actual(std::shared_ptr exec_state, FinishCallback cb); diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index aeca593605..f6e7b4ae96 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -126,6 +126,7 @@ public: UniqueId id; std::string db; + int64_t db_id = -1; std::string table; std::string label; // optional @@ -193,6 +194,7 @@ public: // to identified a specified data consumer. int64_t consumer_id; + bool need_commit_self = false; public: ExecEnv* exec_env() { return _exec_env; } diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 3c9a237b7e..781792bb93 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -42,6 +42,10 @@ Status k_stream_load_plan_status; #endif Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { + return execute_plan_fragment(ctx, nullptr); +} + +Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx, std::shared_ptr pipe) { DorisMetrics::instance()->txn_exec_plan_total->increment(1); // submit this params #ifndef BE_TEST @@ -50,7 +54,7 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { LOG(INFO) << "begin to execute job. 
label=" << ctx->label << ", txn_id=" << ctx->txn_id << ", query_id=" << print_id(ctx->put_result.params.params.query_id); auto st = _exec_env->fragment_mgr()->exec_plan_fragment( - ctx->put_result.params, [ctx](PlanFragmentExecutor* executor) { + ctx->put_result.params, [ctx, pipe, this](PlanFragmentExecutor* executor) { ctx->commit_infos = std::move(executor->runtime_state()->tablet_commit_infos()); Status status = executor->status(); if (status.ok()) { @@ -105,6 +109,15 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos; ctx->promise.set_value(status); + if (ctx->need_commit_self && pipe != nullptr) { + if (pipe->closed() || !status.ok()) { + ctx->status = status; + this->rollback_txn(ctx); + } else { + this->commit_txn(ctx); + } + } + if (ctx->unref()) { delete ctx; } @@ -119,7 +132,6 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { #endif return Status::OK(); } - Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { DorisMetrics::instance()->txn_begin_request_total->increment(1); @@ -156,6 +168,9 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { return status; } ctx->txn_id = result.txnId; + if (result.__isset.db_id) { + ctx->db_id = result.db_id; + } ctx->need_rollback = true; return Status::OK(); @@ -167,6 +182,10 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { TLoadTxnCommitRequest request; set_request_auth(&request, ctx->auth); request.db = ctx->db; + if (ctx->db_id > 0) { + request.db_id = ctx->db_id; + request.__isset.db_id = true; + } request.tbl = ctx->table; request.txnId = ctx->txn_id; request.sync = true; @@ -217,6 +236,10 @@ void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) { TLoadTxnRollbackRequest request; set_request_auth(&request, ctx->auth); request.db = ctx->db; + if (ctx->db_id > 0) { + request.db_id = ctx->db_id; + request.__isset.db_id = true; + } request.tbl = ctx->table; request.txnId = ctx->txn_id; request.__set_reason(ctx->status.get_error_msg()); diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/runtime/stream_load/stream_load_executor.h index 127e710ee5..4ba791c238 100644 --- a/be/src/runtime/stream_load/stream_load_executor.h +++ b/be/src/runtime/stream_load/stream_load_executor.h @@ -17,12 +17,15 @@ #pragma once +#include + namespace doris { class ExecEnv; class StreamLoadContext; class Status; class TTxnCommitAttachment; +class StreamLoadPipe; class StreamLoadExecutor { public: @@ -36,6 +39,7 @@ public: Status execute_plan_fragment(StreamLoadContext* ctx); + Status execute_plan_fragment(StreamLoadContext* ctx, std::shared_ptr pipe); private: // collect the load statistics from context and set them to stat // return true if stat is set, otherwise, return false diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h index 95b0adfba7..3912a0fcae 100644 --- a/be/src/runtime/stream_load/stream_load_pipe.h +++ b/be/src/runtime/stream_load/stream_load_pipe.h @@ -25,6 +25,7 @@ #include "runtime/message_body_sink.h" #include "util/bit_util.h" #include "util/byte_buffer.h" +#include "gen_cpp/internal_service.pb.h" namespace doris { @@ -33,22 +34,24 @@ namespace doris { class StreamLoadPipe : public MessageBodySink, public FileReader { public: StreamLoadPipe(size_t max_buffered_bytes = 1024 * 1024, size_t min_chunk_size = 64 * 1024, - int64_t total_length = -1) + int64_t total_length = -1, bool use_proto = 
false) : _buffered_bytes(0), + _proto_buffered_bytes(0), _max_buffered_bytes(max_buffered_bytes), _min_chunk_size(min_chunk_size), _total_length(total_length), + _use_proto(use_proto), _finished(false), _cancelled(false) {} virtual ~StreamLoadPipe() {} Status open() override { return Status::OK(); } - Status append_and_flush(const char* data, size_t size) { + Status append_and_flush(const char* data, size_t size, size_t proto_byte_size = 0) { ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1)); buf->put_bytes(data, size); buf->flip(); - return _append(buf); + return _append(buf, proto_byte_size); } Status append(const char* data, size_t size) override { @@ -210,23 +213,38 @@ private: _buf_queue.pop_front(); _buffered_bytes -= buf->limit; + if (_use_proto) { + PDataRow** ptr = reinterpret_cast(data->get()); + _proto_buffered_bytes -= (sizeof(PDataRow*) + (*ptr)->GetCachedSize()); + } _put_cond.notify_one(); return Status::OK(); } - Status _append(const ByteBufferPtr& buf) { + Status _append(const ByteBufferPtr& buf, size_t proto_byte_size = 0) { { std::unique_lock l(_lock); // if _buf_queue is empty, we append this buf without size check - while (!_cancelled && !_buf_queue.empty() && - _buffered_bytes + buf->remaining() > _max_buffered_bytes) { - _put_cond.wait(l); + if (_use_proto) { + while (!_cancelled && !_buf_queue.empty() && + (_proto_buffered_bytes + proto_byte_size > _max_buffered_bytes)) { + _put_cond.wait(l); + } + } else { + while (!_cancelled && !_buf_queue.empty() && + _buffered_bytes + buf->remaining() > _max_buffered_bytes) { + _put_cond.wait(l); + } } if (_cancelled) { return Status::InternalError("cancelled"); } _buf_queue.push_back(buf); - _buffered_bytes += buf->remaining(); + if (_use_proto) { + _proto_buffered_bytes += proto_byte_size; + } else { + _buffered_bytes += buf->remaining(); + } } _get_cond.notify_one(); return Status::OK(); @@ -235,6 +253,7 @@ private: // Blocking queue std::mutex _lock; size_t _buffered_bytes; + size_t _proto_buffered_bytes; size_t _max_buffered_bytes; size_t _min_chunk_size; // The total amount of data expected to be read. @@ -245,6 +264,7 @@ private: // and the length is unknown. 
// size_t is unsigned, so use int64_t int64_t _total_length = -1; + bool _use_proto = false; std::deque _buf_queue; std::condition_variable _put_cond; std::condition_variable _get_cond; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index a24f35695a..fbe7cbb451 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -288,6 +288,67 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* co st.to_protobuf(response->mutable_status()); } +template +void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller, + const PSendDataRequest* request, + PSendDataResult* response, + google::protobuf::Closure* done) { + brpc::ClosureGuard closure_guard(done); + TUniqueId fragment_instance_id; + fragment_instance_id.hi = request->fragment_instance_id().hi(); + fragment_instance_id.lo = request->fragment_instance_id().lo(); + auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); + if (pipe == nullptr) { + response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs("pipe is null"); + } else { + for (int i = 0; i < request->data_size(); ++i) { + PDataRow* row = new PDataRow(); + row->CopyFrom(request->data(i)); + pipe->append_and_flush(reinterpret_cast(&row), sizeof(row), sizeof(row) + row->ByteSize()); + } + response->mutable_status()->set_status_code(0); + } +} + +template +void PInternalServiceImpl::commit(google::protobuf::RpcController* controller, + const PCommitRequest* request, + PCommitResult* response, + google::protobuf::Closure* done) { + brpc::ClosureGuard closure_guard(done); + TUniqueId fragment_instance_id; + fragment_instance_id.hi = request->fragment_instance_id().hi(); + fragment_instance_id.lo = request->fragment_instance_id().lo(); + auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); + if (pipe == nullptr) { + response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs("pipe is null"); + } else { + pipe->finish(); + response->mutable_status()->set_status_code(0); + } +} + +template +void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller, + const PRollbackRequest* request, + PRollbackResult* response, + google::protobuf::Closure* done) { + brpc::ClosureGuard closure_guard(done); + TUniqueId fragment_instance_id; + fragment_instance_id.hi = request->fragment_instance_id().hi(); + fragment_instance_id.lo = request->fragment_instance_id().lo(); + auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); + if (pipe == nullptr) { + response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs("pipe is null"); + } else { + pipe->cancel(); + response->mutable_status()->set_status_code(0); + } +} + template void PInternalServiceImpl::fold_constant_expr( google::protobuf::RpcController* cntl_base, @@ -324,6 +385,7 @@ Status PInternalServiceImpl::_fold_constant_expr(const std::string& ser_reque FoldConstantMgr mgr(_exec_env); return mgr.fold_constant_expr(t_request, response); } + template class PInternalServiceImpl; template class PInternalServiceImpl; diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index b51680db93..9a4f2400c3 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -93,6 +93,18 @@ public: ::doris::PPublishFilterResponse* response, ::google::protobuf::Closure* done) override; + void send_data(google::protobuf::RpcController* 
controller, + const PSendDataRequest* request, + PSendDataResult* response, + google::protobuf::Closure* done); + void commit(google::protobuf::RpcController* controller, + const PCommitRequest* request, + PCommitResult* response, + google::protobuf::Closure* done); + void rollback(google::protobuf::RpcController* controller, + const PRollbackRequest* request, + PRollbackResult* response, + google::protobuf::Closure* done); void fold_constant_expr(google::protobuf::RpcController* controller, const PConstantExprRequest* request, PConstantExprResult* response, diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/BEGIN.md b/docs/en/sql-reference/sql-statements/Data Manipulation/BEGIN.md new file mode 100644 index 0000000000..069eeeb786 --- /dev/null +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/BEGIN.md @@ -0,0 +1,92 @@ +--- +{ + "title": "BEGIN", + "language": "en" +} +--- + + + +# BEGIN, COMMIT, ROLLBACK +## Description +### Syntax + +``` +BEGIN; +INSERT INTO table_name ... +COMMIT; +``` +``` +BEGIN [ WITH LABEL label]; +INSERT INTO table_name ... +ROLLBACK; +``` +### Parameters + +> label: the label for this transaction, if you need to set it to a string. + +### Note + +A transaction can only be used on insert, nor update or delete. You can check the state of this transaction by `SHOW TRANSACTION WHERE LABEL = 'label'` + +## example + +1. Begin a transaction without a label, then commit it + +``` +BEGIN +INSERT INTO test VALUES (1, 2); +INSERT INTO test (c1, c2) VALUES (1, 2); +INSERT INTO test (c1, c2) VALUES (1, DEFAULT); +INSERT INTO test (c1) VALUES (1); +COMMIT: +``` + +All the data in the sql between `begin` and `commit` will be inserted into the table. + +2. Begin a transaction without a label, then abort it + +``` +BEGIN +INSERT INTO test VALUES (1, 2); +INSERT INTO test (c1, c2) VALUES (1, 2); +INSERT INTO test (c1, c2) VALUES (1, DEFAULT); +INSERT INTO test (c1) VALUES (1); +ROLLBACK: +``` + +All the data in the sql between `begin` and `rollback` will be aborted, nothing will be inserted into the table. + +3. Begin a transaction with a label, then commit it + +``` +BEGIN WITH LABEL test_label1 +INSERT INTO test VALUES (1, 2); +INSERT INTO test (c1, c2) VALUES (1, 2); +INSERT INTO test (c1, c2) VALUES (1, DEFAULT); +INSERT INTO test (c1) VALUES (1); +COMMIT: +``` + +All the data in the sql between `begin` and `commit` will be inserted into the table. +The label of `test_label1` will be set to mark this transaction. You can check this transaction by `SHOW TRANSACTION WHERE LABEL = 'test_label1'`. + +## keyword +BEGIN, COMMIT, ROLLBACK diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md index 0639f15356..310636d458 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md @@ -27,14 +27,16 @@ under the License. # SHOW TRANSACTION ## description -This syntax is used to view transaction details for the specified transaction id. +This syntax is used to view transaction details for the specified transaction id or label name. grammar: ``` SHOW TRANSACTION [FROM db_name] -WHERE id = transaction_id; +WHERE +[id = transaction_id] +[label = label_name]; ``` Example return result: @@ -81,6 +83,10 @@ ErrorReplicasCount: 0 SHOW TRANSACTION FROM db WHERE ID = 4005; +3. 
View the transaction with label `label_name`: + + SHOW TRANSACTION WHERE LABEL = 'label_name'; + ## keyword SHOW, TRANSACTION \ No newline at end of file diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BEGIN.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BEGIN.md new file mode 100644 index 0000000000..a286a3f785 --- /dev/null +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BEGIN.md @@ -0,0 +1,93 @@ +--- +{ + "title": "BEGIN", + "language": "zh-CN" +} +--- + + + +# BEGIN, COMMIT, ROLLBACK +## Description +### Syntax + +``` +BEGIN; +INSERT INTO table_name ... +COMMIT; +``` + +``` +BEGIN [ WITH LABEL label]; +INSERT INTO table_name ... +ROLLBACK; +``` +### Parameters + +> label: 用于指定当前事务的标签名。 + +### Note + +事务只能对insert使用,而不能对update和delete使用,当指定标签时,可通过以下命令检查事务的运行状态: `SHOW TRANSACTION WHERE LABEL = 'label'` + +## example + +1. 开启一个事务,不指定标签,执行insert后提交。 + +``` +BEGIN +INSERT INTO test VALUES (1, 2); +INSERT INTO test (c1, c2) VALUES (1, 2); +INSERT INTO test (c1, c2) VALUES (1, DEFAULT); +INSERT INTO test (c1) VALUES (1); +COMMIT: +``` + +所有在`begin`和`commit`之间的数据会被插入到test表中。 + +2. 开启一个事务,不指定标签,执行insert后,回滚。 + +``` +BEGIN +INSERT INTO test VALUES (1, 2); +INSERT INTO test (c1, c2) VALUES (1, 2); +INSERT INTO test (c1, c2) VALUES (1, DEFAULT); +INSERT INTO test (c1) VALUES (1); +ROLLBACK: +``` + +所有在`begin`和`commit`之间的数据会取消,没有任何数据插入到test表中。 + +3. 开启一个事务,指定标签为test_label1,执行insert后提交。 + +``` +BEGIN WITH LABEL test_label1 +INSERT INTO test VALUES (1, 2); +INSERT INTO test (c1, c2) VALUES (1, 2); +INSERT INTO test (c1, c2) VALUES (1, DEFAULT); +INSERT INTO test (c1) VALUES (1); +COMMIT: +``` + +所有在`begin`和`commit`之间的数据会被插入到test表中。 +标签`test_label1`用于标记该事务,可以通过以下命令来检查事务的状态:`SHOW TRANSACTION WHERE LABEL = 'test_label1'`。 + +## keyword +BEGIN, COMMIT, ROLLBACK diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md index ad9dabf55f..f3e83b568c 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md @@ -27,14 +27,16 @@ under the License. # SHOW TRANSACTION ## description -该语法用于查看指定 transaction id 的事务详情。 +该语法用于查看指定 transaction id 或 label 的事务详情。 语法: ``` SHOW TRANSACTION [FROM db_name] -WHERE id = transaction_id; +WHERE +[id = transaction_id] +[label = label_name]; ``` 返回结果示例: @@ -81,6 +83,9 @@ ErrorReplicasCount: 0 SHOW TRANSACTION FROM db WHERE ID=4005; +3. 
查看 label 为 label_name的事务: + SHOW TRANSACTION WHERE LABEL = 'label_name'; + ## keyword SHOW, TRANSACTION diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index caa9ade91c..c3c3ed1407 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -292,10 +292,11 @@ nonterminal StatementBase stmt, show_stmt, show_param, help_stmt, load_stmt, show_routine_load_stmt, show_routine_load_task_stmt, show_create_routine_load_stmt, describe_stmt, alter_stmt, use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt, - link_stmt, migrate_stmt, enter_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt, + link_stmt, migrate_stmt, enter_stmt, transaction_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt, import_columns_stmt, import_delete_on_stmt, import_sequence_stmt, import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt, import_preceding_filter_stmt; +nonterminal String transaction_label; nonterminal ImportColumnDesc import_column_desc; nonterminal List import_column_descs; @@ -689,6 +690,8 @@ stmt ::= {: RESULT = stmt; :} | restore_stmt : stmt {: RESULT = stmt; :} + | transaction_stmt : stmt + {: RESULT = stmt; :} | unsupported_stmt : stmt {: RESULT = stmt; :} | export_stmt : stmt @@ -4810,20 +4813,38 @@ truncate_stmt ::= :} ; -unsupported_stmt ::= - KW_START KW_TRANSACTION opt_with_consistent_snapshot:v +transaction_stmt ::= + KW_BEGIN {: - RESULT = new UnsupportedStmt(); + RESULT = new TransactionBeginStmt(); :} - | KW_BEGIN opt_work:work + | KW_BEGIN KW_WITH KW_LABEL transaction_label:label {: - RESULT = new UnsupportedStmt(); + RESULT = new TransactionBeginStmt(label); :} | KW_COMMIT opt_work opt_chain opt_release {: - RESULT = new UnsupportedStmt(); + RESULT = new TransactionCommitStmt(); :} | KW_ROLLBACK opt_work opt_chain opt_release + {: + RESULT = new TransactionRollbackStmt(); + :} + ; + +transaction_label ::= + /* empty */ + {: + RESULT = null; + :} + | ident:label + {: + RESULT = label; + :} + ; + +unsupported_stmt ::= + KW_START KW_TRANSACTION opt_with_consistent_snapshot:v {: RESULT = new UnsupportedStmt(); :} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java index 2823d98c14..b6b503f5e9 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java @@ -279,6 +279,8 @@ abstract public class Expr extends TreeNode implements ParseNode, Cloneabl return isAnalyzed; } + public void checkValueValid() throws AnalysisException {} + public ExprId getId() { return id; } @@ -1779,4 +1781,5 @@ abstract public class Expr extends TreeNode implements ParseNode, Cloneabl } return false; } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java index 8c6e0d6f7d..3b961bdb25 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java @@ -187,6 +187,10 @@ public class InsertStmt extends DdlStmt { return tblName.getDb(); } + public String getTbl() { + return tblName.getTbl(); + } + public void getTables(Analyzer analyzer, Map tableMap, Set parentViewNameSet) throws AnalysisException { // get dbs of statement queryStmt.getTables(analyzer, tableMap, 
parentViewNameSet); @@ -291,6 +295,10 @@ public class InsertStmt extends DdlStmt { analyzePlanHints(analyzer); + if (analyzer.getContext().isTxnModel()) { + return; + } + // create data sink createDataSink(); @@ -298,10 +306,10 @@ public class InsertStmt extends DdlStmt { // create label and begin transaction long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS(); + if (Strings.isNullOrEmpty(label)) { + label = "insert_" + DebugUtil.printId(analyzer.getContext().queryId()); + } if (!isExplain() && !isTransactionBegin) { - if (Strings.isNullOrEmpty(label)) { - label = "insert_" + DebugUtil.printId(analyzer.getContext().queryId()); - } if (targetTable instanceof OlapTable) { LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING; @@ -720,7 +728,9 @@ public class InsertStmt extends DdlStmt { if (col.getDataType().equals(expr.getType().getPrimitiveType())) { return expr; } - return expr.castTo(col.getType()); + Expr newExpr = expr.castTo(col.getType()); + newExpr.checkValueValid(); + return newExpr; } public void prepareExpressions() throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java index 64b3973c24..65aee31997 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java @@ -66,37 +66,7 @@ public class IntLiteral extends LiteralExpr { public IntLiteral(long longValue, Type type) throws AnalysisException { super(); - boolean valid = true; - switch (type.getPrimitiveType()) { - case TINYINT: - if (longValue < TINY_INT_MIN || longValue > TINY_INT_MAX) { - valid = false; - } - break; - case SMALLINT: - if (longValue < SMALL_INT_MIN || longValue > SMALL_INT_MAX) { - valid = false; - } - break; - case INT: - if (longValue < INT_MIN || longValue > INT_MAX) { - valid = false; - } - break; - case BIGINT: - if (longValue < BIG_INT_MIN) { - valid = false; - } - // no need to check upper bound - break; - default: - valid = false; - break; - } - - if (!valid) { - throw new AnalysisException("Number out of range[" + value + "]. type: " + type); - } + checkValueValid(longValue, type); this.value = longValue; this.type = type; @@ -107,11 +77,28 @@ public class IntLiteral extends LiteralExpr { super(); long longValue = -1L; try { - longValue = Long.valueOf(value); + longValue = Long.parseLong(value); } catch (NumberFormatException e) { throw new AnalysisException("Invalid number format: " + value); } + checkValueValid(longValue, type); + this.value = longValue; + this.type = type; + analysisDone(); + } + + protected IntLiteral(IntLiteral other) { + super(other); + value = other.value; + } + + @Override + public void checkValueValid() throws AnalysisException { + checkValueValid(value, type); + } + + private void checkValueValid(long longValue, Type type) throws AnalysisException { boolean valid = true; switch (type.getPrimitiveType()) { case TINYINT: @@ -141,17 +128,8 @@ public class IntLiteral extends LiteralExpr { } if (!valid) { - throw new AnalysisException("Number out of range[" + value + "]. type: " + type); + throw new AnalysisException("Number out of range[" + longValue + "]. 
type: " + type); } - - this.value = longValue; - this.type = type; - analysisDone(); - } - - protected IntLiteral(IntLiteral other) { - super(other); - value = other.value; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTransactionStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTransactionStmt.java index 0115247f3b..2c42cf5ca8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTransactionStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTransactionStmt.java @@ -40,7 +40,8 @@ public class ShowTransactionStmt extends ShowStmt { private String dbName; private Expr whereClause; - private long txnId; + private long txnId = -1; + private String label = ""; public ShowTransactionStmt(String dbName, Expr whereClause) { this.dbName = dbName; @@ -55,6 +56,10 @@ public class ShowTransactionStmt extends ShowStmt { return txnId; } + public String getLabel() { + return label; + } + @Override public void analyze(Analyzer analyzer) throws AnalysisException, UserException { super.analyze(analyzer); @@ -99,22 +104,17 @@ public class ShowTransactionStmt extends ShowStmt { break CHECK; } String leftKey = ((SlotRef) whereClause.getChild(0)).getColumnName(); - if (!leftKey.equalsIgnoreCase("id")) { + if (leftKey.equalsIgnoreCase("id") && (whereClause.getChild(1) instanceof IntLiteral)) { + txnId = ((IntLiteral) whereClause.getChild(1)).getLongValue(); + } else if (leftKey.equalsIgnoreCase("label") && (whereClause.getChild(1) instanceof StringLiteral)) { + label = ((StringLiteral) whereClause.getChild(1)).getStringValue(); + } else { valid = false; - break CHECK; } - - // right child - if (!(whereClause.getChild(1) instanceof IntLiteral)) { - valid = false; - break CHECK; - } - - txnId = ((IntLiteral) whereClause.getChild(1)).getLongValue(); } if (!valid) { - throw new AnalysisException("Where clause should looks like: id = 123"); + throw new AnalysisException("Where clause should looks like one of them: id = 123 or label = 'label'"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionBeginStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionBeginStmt.java new file mode 100644 index 0000000000..7e631b536e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionBeginStmt.java @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.analysis; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.transaction.TransactionEntry; + +public class TransactionBeginStmt extends TransactionStmt { + private String label = null; + public TransactionBeginStmt() { + this.label = ""; + } + public TransactionBeginStmt(final String label) { + this.label = label; + } + @Override + public void analyze(Analyzer analyzer) throws AnalysisException, UserException { + if (label == null || label.isEmpty()) { + label = "txn_insert_" + DebugUtil.printId(analyzer.getContext().queryId()); + } + if (analyzer.getContext().getTxnEntry() == null) { + analyzer.getContext().setTxnEntry(new TransactionEntry()); + } + analyzer.getContext().getTxnEntry().setLabel(label); + super.analyze(analyzer); + } + +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionCommitStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionCommitStmt.java new file mode 100644 index 0000000000..08bca1845e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionCommitStmt.java @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +public class TransactionCommitStmt extends TransactionStmt { + +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionRollbackStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionRollbackStmt.java new file mode 100644 index 0000000000..d62028e6d3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionRollbackStmt.java @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.analysis; + +public class TransactionRollbackStmt extends TransactionStmt { + +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionStmt.java new file mode 100644 index 0000000000..1aeb635ae3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionStmt.java @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; + +public class TransactionStmt extends StatementBase { + + @Override + public RedirectStatus getRedirectStatus() { + return RedirectStatus.NO_FORWARD; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException, UserException { + super.analyze(analyzer); + } +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index b77a8ceaeb..3f0155e90b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -432,6 +432,13 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int publish_version_timeout_second = 30; // 30 seconds + /** + * Maximal waiting time for all data inserted before one transaction to be committed + * This is the timeout second for the command "commit" + */ + @ConfField(mutable = true, masterOnly = true) + public static int commit_timeout_second = 30; // 30 seconds + /** * minimal intervals between two publish version action */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java index 6fd8d6e69f..86b2fd2abd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java @@ -225,7 +225,7 @@ public class Planner { RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot()); } - if (statement instanceof InsertStmt) { + if (statement instanceof InsertStmt && !analyzer.getContext().isTxnModel()) { InsertStmt insertStmt = (InsertStmt) statement; rootFragment = distributedPlanner.createInsertFragment(rootFragment, insertStmt, fragments); rootFragment.setSink(insertStmt.getDataSink()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index d76252685c..8f1dad822c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -19,7 +19,9 @@ package org.apache.doris.qe; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.UserException; import org.apache.doris.mysql.MysqlCapability; import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlCommand; @@ -27,6 +29,7 @@ import org.apache.doris.mysql.MysqlSerializer; import org.apache.doris.plugin.AuditEvent.AuditEventBuilder; import org.apache.doris.thrift.TResourceInfo; import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.transaction.TransactionEntry; import com.google.common.collect.Lists; @@ -64,6 +67,9 @@ public class ConnectContext { protected volatile boolean isKilled; // Db protected volatile String currentDb = ""; + protected volatile long currentDbId = -1; + // Transaction + protected volatile TransactionEntry txnEntry = null; // cluster name protected volatile String clusterName = ""; // username@host of current login user @@ -154,6 +160,30 @@ public class ConnectContext { queryDetail = null; } + public boolean isTxnModel() { + return txnEntry != null && txnEntry.isTxnModel(); + } + public boolean isTxnIniting() { + return txnEntry != null && txnEntry.isTxnIniting(); + } + public boolean isTxnBegin() { + return txnEntry != null && txnEntry.isTxnBegin(); + } + public void closeTxn() { + if (isTxnModel()) { + if (isTxnBegin()) { + try { + Catalog.getCurrentGlobalTransactionMgr().abortTransaction( + currentDbId, txnEntry.getTxnConf().getTxnId(), "timeout"); + } catch (UserException e) { + LOG.error("db: {}, txnId: {}, rollback error.", currentDb, + txnEntry.getTxnConf().getTxnId(), e); + } + } + txnEntry = null; + } + } + // Just for unit test public void resetSessionVariables() { sessionVariable = VariableMgr.newSessionVariable(); @@ -199,6 +229,18 @@ public class ConnectContext { threadLocalInfo.set(this); } + public long getCurrentDbId() { + return currentDbId; + } + + public TransactionEntry getTxnEntry() { + return txnEntry; + } + + public void setTxnEntry(TransactionEntry txnEntry) { + this.txnEntry = txnEntry; + } + public TResourceInfo toResourceCtx() { return new TResourceInfo(qualifiedUser, sessionVariable.getResourceGroup()); } @@ -315,6 +357,12 @@ public class ConnectContext { public void setDatabase(String db) { currentDb = db; + Database database = Catalog.getCurrentCatalog().getDb(db); + if (database == null) { + currentDbId = -1; + } else { + currentDbId = database.getId(); + } } public void setExecutor(StmtExecutor executor) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java index ae838f2748..80f7a9242f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java @@ -113,6 +113,7 @@ public class ConnectScheduler { } public synchronized void unregisterConnection(ConnectContext ctx) { + ctx.closeTxn(); if (connectionMap.remove((long) ctx.getConnectionId()) != null) { numberConnection--; AtomicInteger conns = connByUser.get(ctx.getQualifiedUser()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java new file mode 100644 index 0000000000..9106006e9f --- /dev/null +++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java @@ -0,0 +1,188 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.UserException; +import org.apache.doris.planner.StreamLoadPlanner; +import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.Types; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.system.Backend; +import org.apache.doris.task.StreamLoadTask; +import org.apache.doris.thrift.TBrokerRangeDesc; +import org.apache.doris.thrift.TExecPlanFragmentParams; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TScanRangeParams; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.thrift.TStreamLoadPutRequest; +import org.apache.doris.thrift.TTxnParams; +import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.transaction.TransactionEntry; + +import org.apache.thrift.TException; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class InsertStreamTxnExecutor { + private long txnId; + private TUniqueId loadId; + private TransactionEntry txnEntry; + + public InsertStreamTxnExecutor(TransactionEntry txnEntry) { + this.txnEntry = txnEntry; + } + + public void beginTransaction(TStreamLoadPutRequest request) throws UserException, TException, TimeoutException, + InterruptedException, ExecutionException { + TTxnParams txnConf = txnEntry.getTxnConf(); + StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request); + StreamLoadPlanner planner = new StreamLoadPlanner(txnEntry.getDb(), (OlapTable) txnEntry.getTable(), streamLoadTask); + TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId()); + List beIds = Catalog.getCurrentSystemInfo().seqChooseBackendIds( + 1, true, true, txnEntry.getDb().getClusterName()); + if (beIds == null || beIds.isEmpty()) { + throw new UserException("there is no scanNode Backend."); + } + + tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel()); + for (Map.Entry> entry : tRequest.params.per_node_scan_ranges.entrySet()) { + for (TScanRangeParams scanRangeParams : entry.getValue()) { + for (TBrokerRangeDesc desc : scanRangeParams.scan_range.broker_scan_range.ranges) { + desc.setFormatType(TFileFormatType.FORMAT_PROTO); + } + } + } + txnConf.setFragmentInstanceId(tRequest.params.fragment_instance_id); + + Backend backend = 
Catalog.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0)); + txnConf.setUserIp(backend.getHost()); + txnEntry.setBackend(backend); + TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); + try { + Future future = BackendServiceProxy.getInstance().execPlanFragmentAsync( + address, tRequest); + InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS); + TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + throw new TException("failed to execute plan fragment: " + result.getStatus().getErrorMsgsList()); + } + } catch (RpcException e) { + throw new TException(e); + } + } + + public void commitTransaction() throws TException, TimeoutException, + InterruptedException, ExecutionException { + TTxnParams txnConf = txnEntry.getTxnConf(); + Types.PUniqueId fragmentInstanceId = Types.PUniqueId.newBuilder() + .setHi(txnConf.getFragmentInstanceId().getHi()) + .setLo(txnConf.getFragmentInstanceId().getLo()).build(); + + Backend backend = txnEntry.getBackend(); + TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); + try { + Future future = BackendServiceProxy.getInstance().commit(address, fragmentInstanceId); + InternalService.PCommitResult result = future.get(5, TimeUnit.SECONDS); + TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + throw new TException("failed to commit txn: " + result.getStatus().getErrorMsgsList()); + } + } catch (RpcException e) { + throw new TException(e); + } + } + + public void abortTransaction() throws TException, TimeoutException, + InterruptedException, ExecutionException { + TTxnParams txnConf = txnEntry.getTxnConf(); + Types.PUniqueId fragmentInstanceId = Types.PUniqueId.newBuilder() + .setHi(txnConf.getFragmentInstanceId().getHi()) + .setLo(txnConf.getFragmentInstanceId().getLo()).build(); + + Backend be = txnEntry.getBackend(); + TNetworkAddress address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); + try { + Future future = BackendServiceProxy.getInstance().rollback(address, + fragmentInstanceId); + InternalService.PRollbackResult result = future.get(5, TimeUnit.SECONDS); + TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + throw new TException("failed to rollback txn: " + result.getStatus().getErrorMsgsList()); + } + } catch (RpcException e) { + throw new TException(e); + } + } + + public void sendData() throws TException, TimeoutException, + InterruptedException, ExecutionException { + if (txnEntry.getDataToSend() == null || txnEntry.getDataToSend().isEmpty()) { + return; + } + + TTxnParams txnConf = txnEntry.getTxnConf(); + Types.PUniqueId fragmentInstanceId = Types.PUniqueId.newBuilder() + .setHi(txnConf.getFragmentInstanceId().getHi()) + .setLo(txnConf.getFragmentInstanceId().getLo()).build(); + + Backend backend = txnEntry.getBackend(); + TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); + try { + Future future = BackendServiceProxy.getInstance().sendData( + address, fragmentInstanceId, txnEntry.getDataToSend()); + InternalService.PSendDataResult result = future.get(5, TimeUnit.SECONDS); + TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + throw new TException("failed to insert data: " + result.getStatus().getErrorMsgsList()); + } + txnEntry.clearDataToSend(); + } catch 
(RpcException e) { + throw new TException(e); + } + } + + public TUniqueId getLoadId() { + return loadId; + } + + public void setLoadId(TUniqueId loadId) { + this.loadId = loadId; + } + + public long getTxnId() { + return txnId; + } + + public void setTxnId(long txnId) { + this.txnId = txnId; + } + + public TransactionEntry getTxnEntry() { + return txnEntry; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterTxnExecutor.java new file mode 100644 index 0000000000..917c4cc307 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterTxnExecutor.java @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + +import org.apache.doris.common.ClientPool; +import org.apache.doris.thrift.FrontendService; +import org.apache.doris.thrift.TLoadTxnBeginRequest; +import org.apache.doris.thrift.TLoadTxnBeginResult; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.thrift.TWaitingTxnStatusRequest; +import org.apache.doris.thrift.TWaitingTxnStatusResult; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; + +public class MasterTxnExecutor { + private static final Logger LOG = LogManager.getLogger(MasterTxnExecutor.class); + + private int waitTimeoutMs; + // the total time of thrift connectTime add readTime and writeTime + private int thriftTimeoutMs; + private ConnectContext ctx; + + public MasterTxnExecutor(ConnectContext ctx) { + this.ctx = ctx; + this.waitTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000; + this.thriftTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000; + } + + private TNetworkAddress getMasterAddress() throws TException { + if (!ctx.getCatalog().isReady()) { + throw new TException("Node catalog is not ready, please wait for a while."); + } + String masterHost = ctx.getCatalog().getMasterIp(); + int masterRpcPort = ctx.getCatalog().getMasterRpcPort(); + return new TNetworkAddress(masterHost, masterRpcPort); + } + + private FrontendService.Client getClient(TNetworkAddress thriftAddress) throws TException { + try { + return ClientPool.frontendPool.borrowObject(thriftAddress, thriftTimeoutMs); + } catch (Exception e) { + // may throw NullPointerException. 
add err msg + throw new TException("Failed to get master client.", e); + } + } + // Send request to Master + public TLoadTxnBeginResult beginTxn(TLoadTxnBeginRequest request) throws TException { + TNetworkAddress thriftAddress = getMasterAddress(); + + FrontendService.Client client = getClient(thriftAddress); + + LOG.info("Send begin transaction {} to Master {}", ctx.getStmtId(), thriftAddress); + + boolean isReturnToPool = false; + try { + TLoadTxnBeginResult result = client.loadTxnBegin(request); + isReturnToPool = true; + if (result.getStatus().getStatusCode() != TStatusCode.OK) { + throw new TException("begin txn failed."); + } + return result; + } catch (TTransportException e) { + boolean ok = ClientPool.frontendPool.reopen(client, thriftTimeoutMs); + if (!ok) { + throw e; + } + if (e.getType() == TTransportException.TIMED_OUT) { + throw e; + } else { + TLoadTxnBeginResult result = client.loadTxnBegin(request); + isReturnToPool = true; + return result; + } + } finally { + if (isReturnToPool) { + ClientPool.frontendPool.returnObject(thriftAddress, client); + } else { + ClientPool.frontendPool.invalidateObject(thriftAddress, client); + } + } + } + + public TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request) throws TException { + TNetworkAddress thriftAddress = getMasterAddress(); + + FrontendService.Client client = getClient(thriftAddress); + + LOG.info("Send waiting transaction status {} to Master {}", ctx.getStmtId(), thriftAddress); + + boolean isReturnToPool = false; + try { + TWaitingTxnStatusResult result = client.waitingTxnStatus(request); + isReturnToPool = true; + if (result.getStatus().getStatusCode() != TStatusCode.OK) { + throw new TException("get txn status failed."); + } + return result; + } catch (TTransportException e) { + boolean ok = ClientPool.frontendPool.reopen(client, thriftTimeoutMs); + if (!ok) { + throw e; + } + if (e.getType() == TTransportException.TIMED_OUT) { + throw e; + } else { + TWaitingTxnStatusResult result = client.waitingTxnStatus(request); + if (result.getStatus().getStatusCode() != TStatusCode.OK) { + throw new TException("commit failed."); + } + isReturnToPool = true; + return result; + } + } finally { + if (isReturnToPool) { + ClientPool.frontendPool.returnObject(thriftAddress, client); + } else { + ClientPool.frontendPool.invalidateObject(thriftAddress, client); + } + } + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index fecc3e1b20..8c54369571 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -1853,8 +1853,15 @@ public class ShowExecutor { ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR, showStmt.getDbName()); } - long txnId = showStmt.getTxnId(); + Long txnId = showStmt.getTxnId(); + String label = showStmt.getLabel(); GlobalTransactionMgr transactionMgr = Catalog.getCurrentGlobalTransactionMgr(); + if (!label.isEmpty()) { + txnId = transactionMgr.getTransactionId(db.getId(), label); + if (txnId == null) { + throw new AnalysisException("transaction with label " + label + " does not exist"); + } + } resultSet = new ShowResultSet(showStmt.getMetaData(), transactionMgr.getSingleTranInfo(db.getId(), txnId)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index f9b0ca9471..5938712360 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -26,6 +26,7 @@ import org.apache.doris.analysis.ExportStmt; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.InsertStmt; import org.apache.doris.analysis.KillStmt; +import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.OutFileClause; import org.apache.doris.analysis.QueryStmt; import org.apache.doris.analysis.RedirectStatus; @@ -38,10 +39,15 @@ import org.apache.doris.analysis.SqlScanner; import org.apache.doris.analysis.StatementBase; import org.apache.doris.analysis.StmtRewriter; import org.apache.doris.analysis.StringLiteral; +import org.apache.doris.analysis.TransactionBeginStmt; +import org.apache.doris.analysis.TransactionCommitStmt; +import org.apache.doris.analysis.TransactionRollbackStmt; +import org.apache.doris.analysis.TransactionStmt; import org.apache.doris.analysis.UnsupportedStmt; import org.apache.doris.analysis.UseStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Table; @@ -80,13 +86,25 @@ import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode; import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; import org.apache.doris.rpc.RpcException; +import org.apache.doris.service.FrontendOptions; import org.apache.doris.task.LoadEtlTask; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TLoadTxnBeginRequest; +import org.apache.doris.thrift.TLoadTxnBeginResult; +import org.apache.doris.thrift.TMergeType; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TResultBatch; +import org.apache.doris.thrift.TStreamLoadPutRequest; +import org.apache.doris.thrift.TTxnParams; import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.thrift.TWaitingTxnStatusRequest; +import org.apache.doris.thrift.TWaitingTxnStatusResult; import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionCommitFailedException; +import org.apache.doris.transaction.TransactionEntry; +import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; import com.google.common.base.Strings; @@ -96,6 +114,7 @@ import com.google.protobuf.ByteString; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; import org.glassfish.jersey.internal.guava.Sets; import java.io.IOException; @@ -105,6 +124,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -115,6 +136,7 @@ public class StmtExecutor implements ProfileWriter { private static final Logger LOG = LogManager.getLogger(StmtExecutor.class); private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0); + private static final int MAX_DATA_TO_SEND_FOR_TXN = 100; private ConnectContext context; private MysqlSerializer serializer; @@ -264,22 +286,28 @@ public class StmtExecutor implements ProfileWriter { 
context.setQueryId(queryId); try { + if (context.isTxnModel() && !(parsedStmt instanceof InsertStmt) + && !(parsedStmt instanceof TransactionStmt)) { + throw new TException("This is in a transaction, only insert, commit, rollback is acceptable."); + } // support select hint e.g. select /*+ SET_VAR(query_timeout=1) */ sleep(3); analyzeVariablesInStmt(); - // analyze this query - SessionVariable sessionVariable = context.getSessionVariable(); - analyze(sessionVariable.toThrift()); - if (isForwardToMaster()) { - forwardToMaster(); - if (masterOpExecutor != null && masterOpExecutor.getQueryId() != null) { - // If the query id changed in master, we set it in context. - // WARN: when query timeout, this code may not be reach. - context.setQueryId(masterOpExecutor.getQueryId()); + if (!context.isTxnModel()) { + // analyze this query + analyze(context.getSessionVariable().toThrift()); + if (isForwardToMaster()) { + forwardToMaster(); + if (masterOpExecutor != null && masterOpExecutor.getQueryId() != null) { + context.setQueryId(masterOpExecutor.getQueryId()); + } + return; + } else { + LOG.debug("no need to transfer to Master. stmt: {}", context.getStmtId()); } - return; } else { - LOG.debug("no need to transfer to Master. stmt: {}", context.getStmtId()); + analyzer = new Analyzer(context.getCatalog(), context); + parsedStmt.analyze(analyzer); } if (parsedStmt instanceof QueryStmt) { @@ -320,6 +348,8 @@ public class StmtExecutor implements ProfileWriter { handleEnterStmt(); } else if (parsedStmt instanceof UseStmt) { handleUseStmt(); + } else if (parsedStmt instanceof TransactionStmt) { + handleTransactionStmt(); } else if (parsedStmt instanceof CreateTableAsSelectStmt) { handleInsertStmt(); } else if (parsedStmt instanceof InsertStmt) { // Must ahead of DdlStmt because InserStmt is its subclass @@ -378,7 +408,7 @@ public class StmtExecutor implements ProfileWriter { LOG.warn("failed to revert Session value.", e); context.getState().setError(e.getMessage()); } - if (parsedStmt instanceof InsertStmt) { + if (!context.isTxnModel() && parsedStmt instanceof InsertStmt) { InsertStmt insertStmt = (InsertStmt) parsedStmt; // The transaction of a insert operation begin at analyze phase. // So we should abort the transaction at this finally block if it encounter exception. 
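The dispatch above keys off the session's transaction state: BEGIN installs a TransactionEntry whose TTxnParams has needTxn=true and txnId=-1 (the "initing" state, see the handlers in the next hunk), and the first INSERT replaces the placeholder txnId with a real one once a backend transaction is opened (the "begun" state). A minimal sketch of those three checks, using only the TTxnParams and TransactionEntry accessors introduced in this patch (the txn id below is hypothetical):

```java
import org.apache.doris.thrift.TTxnParams;
import org.apache.doris.transaction.TransactionEntry;

public class TxnStateSketch {
    public static void main(String[] args) {
        TransactionEntry entry = new TransactionEntry();
        // No TTxnParams yet: the session is not in transaction mode.
        System.out.println(entry.isTxnModel());   // false

        // After BEGIN: needTxn is set but no backend transaction exists yet, so txnId stays -1.
        TTxnParams conf = new TTxnParams();
        conf.setNeedTxn(true).setTxnId(-1).setDb("").setTbl("");
        entry.setTxnConf(conf);
        System.out.println(entry.isTxnIniting()); // true  -> the first INSERT will call beginTransaction()
        System.out.println(entry.isTxnBegin());   // false

        // After the first INSERT begins a transaction on a BE (hypothetical id).
        conf.setTxnId(12345L);
        System.out.println(entry.isTxnBegin());   // true  -> COMMIT/ROLLBACK are sent to that BE
    }
}
```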
@@ -852,6 +882,231 @@ public class StmtExecutor implements ProfileWriter { plannerProfile.setQueryFetchResultFinishTime(); } + + private TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request) throws Exception { + TWaitingTxnStatusResult statusResult = null; + if (Catalog.getCurrentCatalog().isMaster()) { + statusResult = Catalog.getCurrentGlobalTransactionMgr() + .getWaitingTxnStatus(request); + } else { + MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(context); + statusResult = masterTxnExecutor.getWaitingTxnStatus(request); + } + return statusResult; + } + + private void handleTransactionStmt() throws Exception { + // Every time set no send flag and clean all data in buffer + context.getMysqlChannel().reset(); + context.getState().setOk(0, 0, ""); + // create plan + if (context.getTxnEntry() != null && context.getTxnEntry().getRowsInTransaction() == 0 + && (parsedStmt instanceof TransactionCommitStmt || parsedStmt instanceof TransactionRollbackStmt)) { + context.setTxnEntry(null); + } else if (parsedStmt instanceof TransactionBeginStmt) { + if (context.isTxnModel()) { + LOG.info("A transaction has already begin"); + return; + } + TTxnParams txnParams = new TTxnParams(); + txnParams.setNeedTxn(true).setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("").setTbl(""); + if (context.getSessionVariable().getEnableInsertStrict()) { + txnParams.setMaxFilterRatio(0); + } else { + txnParams.setMaxFilterRatio(1.0); + } + if (context.getTxnEntry() == null) { + context.setTxnEntry(new TransactionEntry()); + } + TransactionEntry txnEntry = context.getTxnEntry(); + txnEntry.setTxnConf(txnParams); + StringBuilder sb = new StringBuilder(); + sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'") + .append(TransactionStatus.PREPARE.name()); + sb.append("', 'txnId':'").append("'").append("}"); + context.getState().setOk(0, 0, sb.toString()); + } else if (parsedStmt instanceof TransactionCommitStmt) { + if (!context.isTxnModel()) { + LOG.info("No transaction to commit"); + return; + } + + TTxnParams txnConf = context.getTxnEntry().getTxnConf(); + try { + InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(context.getTxnEntry()); + if (context.getTxnEntry().getDataToSend().size() > 0) { + // send rest data + executor.sendData(); + } + // commit txn + executor.commitTransaction(); + + // wait txn visible + TWaitingTxnStatusRequest request = new TWaitingTxnStatusRequest(); + request.setDbId(txnConf.getDbId()).setTxnId(txnConf.getTxnId()); + request.setLabelIsSet(false); + request.setTxnIdIsSet(true); + + TWaitingTxnStatusResult statusResult = getWaitingTxnStatus(request); + TransactionStatus txnStatus = TransactionStatus.valueOf(statusResult.getTxnStatusId()); + if (txnStatus == TransactionStatus.COMMITTED) { + throw new AnalysisException("transaction commit successfully, BUT data will be visible later."); + } else if (txnStatus != TransactionStatus.VISIBLE) { + String errMsg = "commit failed, rollback."; + if (statusResult.getStatus().isSetErrorMsgs() + && statusResult.getStatus().getErrorMsgs().size() > 0) { + errMsg = String.join(". 
", statusResult.getStatus().getErrorMsgs()); + } + throw new AnalysisException(errMsg); + } + StringBuilder sb = new StringBuilder(); + sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'") + .append(txnStatus.name()).append("', 'txnId':'") + .append(context.getTxnEntry().getTxnConf().getTxnId()).append("'").append("}"); + context.getState().setOk(0, 0, sb.toString()); + } catch (Exception e) { + throw new AnalysisException(e.getMessage()); + } finally { + context.setTxnEntry(null); + } + } else if (parsedStmt instanceof TransactionRollbackStmt) { + if (!context.isTxnModel()) { + LOG.info("No transaction to rollback"); + return; + } + try { + // abort txn + InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(context.getTxnEntry()); + executor.abortTransaction(); + + StringBuilder sb = new StringBuilder(); + sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'") + .append(TransactionStatus.ABORTED.name()).append("', 'txnId':'") + .append(context.getTxnEntry().getTxnConf().getTxnId()).append("'").append("}"); + context.getState().setOk(0, 0, sb.toString()); + } catch (Exception e) { + throw new AnalysisException(e.getMessage()); + } finally { + context.setTxnEntry(null); + } + } else { + throw new TException("parsedStmt type is not TransactionStmt"); + } + } + + public int executeForTxn(InsertStmt insertStmt) + throws UserException, TException, InterruptedException, ExecutionException, TimeoutException { + if (context.isTxnIniting()) { // first time, begin txn + beginTxn(insertStmt.getDb(), insertStmt.getTbl()); + } + if (!context.getTxnEntry().getTxnConf().getDb().equals(insertStmt.getDb()) || + !context.getTxnEntry().getTxnConf().getTbl().equals(insertStmt.getTbl())) { + throw new TException("Only one table can be inserted in one transaction."); + } + + QueryStmt queryStmt = insertStmt.getQueryStmt(); + if (!(queryStmt instanceof SelectStmt)) { + throw new TException("queryStmt is not SelectStmt, insert command error"); + } + TransactionEntry txnEntry = context.getTxnEntry(); + SelectStmt selectStmt = (SelectStmt) queryStmt; + int effectRows = 0; + if (selectStmt.getValueList() != null) { + Table tbl = txnEntry.getTable(); + int schemaSize = tbl.getBaseSchema(false).size(); + for (List row : selectStmt.getValueList().getRows()) { + // the value columns are columns which are visible to user, so here we use + // getBaseSchema(), not getFullSchema() + if (schemaSize != row.size()) { + throw new TException("Column count doesn't match value count"); + } + } + for (List row : selectStmt.getValueList().getRows()) { + ++effectRows; + InternalService.PDataRow data = getRowStringValue(row); + if (data == null) { + continue; + } + List dataToSend = txnEntry.getDataToSend(); + dataToSend.add(data); + if (dataToSend.size() >= MAX_DATA_TO_SEND_FOR_TXN) { + // send data + InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(txnEntry); + executor.sendData(); + } + } + } + txnEntry.setRowsInTransaction(txnEntry.getRowsInTransaction() + effectRows); + return effectRows; + } + + private void beginTxn(String dbName, String tblName) throws UserException, TException, + InterruptedException, ExecutionException, TimeoutException { + TransactionEntry txnEntry = context.getTxnEntry(); + TTxnParams txnConf = txnEntry.getTxnConf(); + long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS(); + TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING; + Database 
dbObj = Catalog.getCurrentCatalog().getDb(dbName); + if (dbObj == null) { + throw new TException("database is invalid for dbName: " + dbName); + } + Table tblObj = dbObj.getTable(tblName); + if (tblObj == null) { + throw new TException("table is invalid: " + tblName); + } + txnConf.setDbId(dbObj.getId()).setTbl(tblName).setDb(dbName); + txnEntry.setTable(tblObj); + txnEntry.setDb(dbObj); + String label = txnEntry.getLabel(); + if (Catalog.getCurrentCatalog().isMaster()) { + long txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction( + txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), + label, new TransactionState.TxnCoordinator( + TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + sourceType, timeoutSecond); + txnConf.setTxnId(txnId); + String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr().getTransactionState( + txnConf.getDbId(), txnConf.getTxnId()).getAuthCode(); + txnConf.setAuthCodeUuid(authCodeUuid); + } else { + String authCodeUuid = UUID.randomUUID().toString(); + MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(context); + TLoadTxnBeginRequest request = new TLoadTxnBeginRequest(); + request.setDb(txnConf.getDb()).setTbl(txnConf.getTbl()).setAuthCodeUuid(authCodeUuid) + .setCluster(dbObj.getClusterName()).setLabel(label).setUser("").setUserIp("").setPasswd(""); + TLoadTxnBeginResult result = masterTxnExecutor.beginTxn(request); + txnConf.setTxnId(result.getTxnId()); + txnConf.setAuthCodeUuid(authCodeUuid); + } + + TStreamLoadPutRequest request = new TStreamLoadPutRequest(); + request.setTxnId(txnConf.getTxnId()).setDb(txnConf.getDb()) + .setTbl(txnConf.getTbl()) + .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN) + .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(context.queryId()); + + // execute begin txn + InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(txnEntry); + executor.beginTransaction(request); + } + + private static final String NULL_VALUE_FOR_LOAD = "\\N"; + + public static InternalService.PDataRow getRowStringValue(List cols) { + if (cols.size() == 0) { + return null; + } + InternalService.PDataRow.Builder row = InternalService.PDataRow.newBuilder(); + for (Expr expr : cols) { + if (expr instanceof NullLiteral) { + row.addColBuilder().setValue(NULL_VALUE_FOR_LOAD); + } else { + row.addColBuilder().setValue(expr.getStringValue()); + } + } + return row.build(); + } + // Process a select statement. 
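getRowStringValue above serializes each literal row into an InternalService.PDataRow: every column is carried as its string form, with NULL literals encoded as the marker \N (NULL_VALUE_FOR_LOAD); beginTransaction marks the scan ranges as FORMAT_PROTO, so the BE receives these rows as protobuf messages rather than raw text. As a rough sketch, the literal row (11, 'abc', NULL) should come out as the following message, built here with the same protobuf builders the patch uses:

```java
import org.apache.doris.proto.InternalService;

public class PDataRowSketch {
    public static void main(String[] args) {
        // Expected encoding of the literal row (11, 'abc', NULL).
        InternalService.PDataRow.Builder row = InternalService.PDataRow.newBuilder();
        row.addColBuilder().setValue("11");   // IntLiteral    -> Expr.getStringValue()
        row.addColBuilder().setValue("abc");  // StringLiteral -> Expr.getStringValue()
        row.addColBuilder().setValue("\\N");  // NullLiteral   -> NULL_VALUE_FOR_LOAD
        InternalService.PDataRow data = row.build();

        // executeForTxn buffers such rows in TransactionEntry.dataToSend and flushes them
        // to the BE once MAX_DATA_TO_SEND_FOR_TXN (100) rows have accumulated.
        System.out.println(data.getColCount()); // 3
    }
}
```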
private void handleInsertStmt() throws Exception { // Every time set no send flag and clean all data in buffer @@ -878,131 +1133,145 @@ public class StmtExecutor implements ProfileWriter { long createTime = System.currentTimeMillis(); Throwable throwable = null; - - String label = insertStmt.getLabel(); - LOG.info("Do insert [{}] with query id: {}", label, DebugUtil.printId(context.queryId())); - + long txnId = -1; + String label = ""; long loadedRows = 0; int filteredRows = 0; TransactionStatus txnStatus = TransactionStatus.ABORTED; - try { - coord = new Coordinator(context, analyzer, planner); - coord.setQueryType(TQueryType.LOAD); - - QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord); - - coord.exec(); - - coord.join(context.getSessionVariable().getQueryTimeoutS()); - if (!coord.isDone()) { - coord.cancel(); - ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT); + String errMsg = ""; + if (context.isTxnModel()) { + if (insertStmt.getQueryStmt() instanceof SelectStmt) { + if (((SelectStmt) insertStmt.getQueryStmt()).getTableRefs().size() > 0) { + throw new TException("Insert into ** select is not supported in a transaction"); + } } + txnStatus = TransactionStatus.PREPARE; + loadedRows = executeForTxn(insertStmt); + label = context.getTxnEntry().getLabel(); + txnId = context.getTxnEntry().getTxnConf().getTxnId(); + } else { + label = insertStmt.getLabel(); + LOG.info("Do insert [{}] with query id: {}", label, DebugUtil.printId(context.queryId())); - if (!coord.getExecStatus().ok()) { - String errMsg = coord.getExecStatus().getErrorMsg(); - LOG.warn("insert failed: {}", errMsg); - ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); - } + try { + coord = new Coordinator(context, analyzer, planner); + coord.setQueryType(TQueryType.LOAD); - LOG.debug("delta files is {}", coord.getDeltaUrls()); + QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord); - if (coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) { - loadedRows = Long.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL)); - } - if (coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) { - filteredRows = Integer.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL)); - } + coord.exec(); - // if in strict mode, insert will fail if there are filtered rows - if (context.getSessionVariable().getEnableInsertStrict()) { - if (filteredRows > 0) { - context.getState().setError("Insert has filtered data in strict mode, tracking_url=" - + coord.getTrackingUrl()); + coord.join(context.getSessionVariable().getQueryTimeoutS()); + if (!coord.isDone()) { + coord.cancel(); + ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT); + } + + if (!coord.getExecStatus().ok()) { + errMsg = coord.getExecStatus().getErrorMsg(); + LOG.warn("insert failed: {}", errMsg); + ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); + } + + LOG.debug("delta files is {}", coord.getDeltaUrls()); + + if (coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) { + loadedRows = Long.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL)); + } + if (coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) { + filteredRows = Integer.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL)); + } + + // if in strict mode, insert will fail if there are filtered rows + if (context.getSessionVariable().getEnableInsertStrict()) { + if (filteredRows > 0) { + context.getState().setError("Insert has filtered data in 
strict mode, tracking_url=" + + coord.getTrackingUrl()); + return; + } + } + + if (insertStmt.getTargetTable().getType() != TableType.OLAP) { + // no need to add load job. + // MySQL table is already being inserted. + context.getState().setOk(loadedRows, filteredRows, null); return; } - } - if (insertStmt.getTargetTable().getType() != TableType.OLAP) { - // no need to add load job. - // MySQL table is already being inserted. - context.getState().setOk(loadedRows, filteredRows, null); - return; - } - - if (loadedRows == 0 && filteredRows == 0) { - // if no data, just abort txn and return ok - Catalog.getCurrentGlobalTransactionMgr().abortTransaction(insertStmt.getDbObj().getId(), - insertStmt.getTransactionId(), TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG); - context.getState().setOk(); - return; - } - if (Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction( - insertStmt.getDbObj(), Lists.newArrayList(insertStmt.getTargetTable()), insertStmt.getTransactionId(), - TabletCommitInfo.fromThrift(coord.getCommitInfos()), - context.getSessionVariable().getInsertVisibleTimeoutMs())) { - txnStatus = TransactionStatus.VISIBLE; - MetricRepo.COUNTER_LOAD_FINISHED.increase(1L); - } else { - txnStatus = TransactionStatus.COMMITTED; - } - - } catch (Throwable t) { - // if any throwable being thrown during insert operation, first we should abort this txn - LOG.warn("handle insert stmt fail: {}", label, t); - try { - Catalog.getCurrentGlobalTransactionMgr().abortTransaction( - insertStmt.getDbObj().getId(), insertStmt.getTransactionId(), - t.getMessage() == null ? "unknown reason" : t.getMessage()); - } catch (Exception abortTxnException) { - // just print a log if abort txn failed. This failure do not need to pass to user. - // user only concern abort how txn failed. - LOG.warn("errors when abort txn", abortTxnException); - } - - if (!Config.using_old_load_usage_pattern) { - // if not using old load usage pattern, error will be returned directly to user - StringBuilder sb = new StringBuilder(t.getMessage()); - if (!Strings.isNullOrEmpty(coord.getTrackingUrl())) { - sb.append(". url: " + coord.getTrackingUrl()); + if (loadedRows == 0 && filteredRows == 0) { + // if no data, just abort txn and return ok + Catalog.getCurrentGlobalTransactionMgr().abortTransaction(insertStmt.getDbObj().getId(), + insertStmt.getTransactionId(), TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG); + context.getState().setOk(); + return; } - context.getState().setError(sb.toString()); - return; + if (Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction( + insertStmt.getDbObj(), Lists.newArrayList(insertStmt.getTargetTable()), insertStmt.getTransactionId(), + TabletCommitInfo.fromThrift(coord.getCommitInfos()), + context.getSessionVariable().getInsertVisibleTimeoutMs())) { + txnStatus = TransactionStatus.VISIBLE; + MetricRepo.COUNTER_LOAD_FINISHED.increase(1L); + } else { + txnStatus = TransactionStatus.COMMITTED; + } + + } catch (Throwable t) { + // if any throwable being thrown during insert operation, first we should abort this txn + LOG.warn("handle insert stmt fail: {}", label, t); + try { + Catalog.getCurrentGlobalTransactionMgr().abortTransaction( + insertStmt.getDbObj().getId(), insertStmt.getTransactionId(), + t.getMessage() == null ? "unknown reason" : t.getMessage()); + } catch (Exception abortTxnException) { + // just print a log if abort txn failed. This failure do not need to pass to user. + // user only concern abort how txn failed. 
+ LOG.warn("errors when abort txn", abortTxnException); + } + + if (!Config.using_old_load_usage_pattern) { + // if not using old load usage pattern, error will be returned directly to user + StringBuilder sb = new StringBuilder(t.getMessage()); + if (!Strings.isNullOrEmpty(coord.getTrackingUrl())) { + sb.append(". url: " + coord.getTrackingUrl()); + } + context.getState().setError(sb.toString()); + return; + } + + /* + * If config 'using_old_load_usage_pattern' is true. + * Doris will return a label to user, and user can use this label to check load job's status, + * which exactly like the old insert stmt usage pattern. + */ + throwable = t; } - /* - * If config 'using_old_load_usage_pattern' is true. - * Doris will return a label to user, and user can use this label to check load job's status, - * which exactly like the old insert stmt usage pattern. - */ - throwable = t; - } + // Go here, which means: + // 1. transaction is finished successfully (COMMITTED or VISIBLE), or + // 2. transaction failed but Config.using_old_load_usage_pattern is true. + // we will record the load job info for these 2 cases - // Go here, which means: - // 1. transaction is finished successfully (COMMITTED or VISIBLE), or - // 2. transaction failed but Config.using_old_load_usage_pattern is true. - // we will record the load job info for these 2 cases - - String errMsg = ""; - try { - context.getCatalog().getLoadManager().recordFinishedLoadJob( - label, - insertStmt.getDb(), - insertStmt.getTargetTable().getId(), - EtlJobType.INSERT, - createTime, - throwable == null ? "" : throwable.getMessage(), - coord.getTrackingUrl()); - } catch (MetaNotFoundException e) { - LOG.warn("Record info of insert load with error {}", e.getMessage(), e); - errMsg = "Record info of insert load with error " + e.getMessage(); + try { + context.getCatalog().getLoadManager().recordFinishedLoadJob( + label, + insertStmt.getDb(), + insertStmt.getTargetTable().getId(), + EtlJobType.INSERT, + createTime, + throwable == null ? 
"" : throwable.getMessage(), + coord.getTrackingUrl()); + } catch (MetaNotFoundException e) { + LOG.warn("Record info of insert load with error {}", e.getMessage(), e); + errMsg = "Record info of insert load with error " + e.getMessage(); + } + txnId = insertStmt.getTransactionId(); } // {'label':'my_label1', 'status':'visible', 'txnId':'123'} // {'label':'my_label1', 'status':'visible', 'txnId':'123' 'err':'error messages'} StringBuilder sb = new StringBuilder(); sb.append("{'label':'").append(label).append("', 'status':'").append(txnStatus.name()); - sb.append("', 'txnId':'").append(insertStmt.getTransactionId()).append("'"); + sb.append("', 'txnId':'").append(txnId).append("'"); if (!Strings.isNullOrEmpty(errMsg)) { sb.append(", 'err':'").append(errMsg).append("'"); } @@ -1165,6 +1434,9 @@ public class StmtExecutor implements ProfileWriter { if (!statisticsForAuditLog.hasScanRows()) { statisticsForAuditLog.setScanRows(0L); } + if (statisticsForAuditLog.hasReturnedRows()) { + statisticsForAuditLog.setReturnedRows(0L); + } if (!statisticsForAuditLog.hasCpuMs()) { statisticsForAuditLog.setCpuMs(0L); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index ffd7e25b97..143b066178 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -77,6 +77,18 @@ public class BackendServiceClient { return stub.getInfo(request); } + public Future sendData(InternalService.PSendDataRequest request) { + return stub.sendData(request); + } + + public Future rollback(InternalService.PRollbackRequest request) { + return stub.rollback(request); + } + + public Future commit(InternalService.PCommitRequest request) { + return stub.commit(request); + } + public Future foldConstantExpr(InternalService.PConstantExprRequest request) { return stub.foldConstantExpr(request); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 40f2897520..2210645b1b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -32,6 +32,7 @@ import org.apache.thrift.TSerializer; import com.google.common.collect.Maps; import com.google.protobuf.ByteString; +import java.util.List; import java.util.Map; import java.util.concurrent.Future; @@ -169,6 +170,48 @@ public class BackendServiceProxy { } } + public Future sendData( + TNetworkAddress address, Types.PUniqueId fragmentInstanceId, List data) + throws RpcException { + + final InternalService.PSendDataRequest.Builder pRequest = InternalService.PSendDataRequest.newBuilder(); + pRequest.setFragmentInstanceId(fragmentInstanceId); + pRequest.addAllData(data); + try { + final BackendServiceClient client = getProxy(address); + return client.sendData(pRequest.build()); + } catch (Throwable e) { + LOG.warn("failed to send data, address={}:{}", address.getHostname(), address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + + public Future rollback(TNetworkAddress address, Types.PUniqueId fragmentInstanceId) + throws RpcException { + final InternalService.PRollbackRequest pRequest = InternalService.PRollbackRequest.newBuilder() + .setFragmentInstanceId(fragmentInstanceId).build(); + try { + final BackendServiceClient client = 
getProxy(address); + return client.rollback(pRequest); + } catch (Throwable e) { + LOG.warn("failed to rollback, address={}:{}", address.getHostname(), address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + + public Future commit(TNetworkAddress address, Types.PUniqueId fragmentInstanceId) + throws RpcException { + final InternalService.PCommitRequest pRequest = InternalService.PCommitRequest.newBuilder() + .setFragmentInstanceId(fragmentInstanceId).build(); + try { + final BackendServiceClient client = getProxy(address); + return client.commit(pRequest); + } catch (Throwable e) { + LOG.warn("failed to commit, address={}:{}", address.getHostname(), address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + public Future foldConstantExpr( TNetworkAddress address, TFoldConstantParams tParams) throws RpcException, TException { final InternalService.PConstantExprRequest pRequest = InternalService.PConstantExprRequest.newBuilder() diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 1bc16e7140..a25f29fc14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -105,6 +105,8 @@ import org.apache.doris.thrift.TPrivilegeStatus; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.thrift.TUpdateExportTaskStatusRequest; import org.apache.doris.thrift.TUpdateMiniEtlTaskStatusRequest; +import org.apache.doris.thrift.TWaitingTxnStatusRequest; +import org.apache.doris.thrift.TWaitingTxnStatusResult; import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionState.TxnCoordinator; @@ -126,6 +128,7 @@ import java.net.UnknownHostException; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.apache.doris.thrift.TStatusCode.NOT_IMPLEMENTED_ERROR; @@ -663,6 +666,23 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } + + private void checkAuthCodeUuid(String dbName, long txnId, String authCodeUuid) throws AuthenticationException { + Database db = Catalog.getCurrentCatalog().getDb(dbName); + if (db == null) { + throw new AuthenticationException("invalid db name: " + dbName); + } + TransactionState transactionState = Catalog.getCurrentGlobalTransactionMgr(). 
+ getTransactionState(db.getId(), txnId); + if (transactionState == null) { + throw new AuthenticationException("invalid transactionState: " + txnId); + } + if (!authCodeUuid.equals(transactionState.getAuthCode())) { + throw new AuthenticationException( + "Access denied; you need (at least one of) the LOAD privilege(s) for this operation"); + } + } + private void checkPasswordAndPrivs(String cluster, String user, String passwd, String db, String tbl, String clientIp, PrivPredicate predicate) throws AuthenticationException { @@ -718,7 +738,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { TStatus status = new TStatus(TStatusCode.OK); result.setStatus(status); try { - result.setTxnId(loadTxnBeginImpl(request, clientAddr)); + TLoadTxnBeginResult tmpRes = loadTxnBeginImpl(request, clientAddr); + result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId()); } catch (DuplicatedRequestException e) { // this is a duplicate request, just return previous txn id LOG.warn("duplicate request for stream load. request id: {}, txn: {}", e.getDuplicatedRequestId(), e.getTxnId()); @@ -740,14 +761,16 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } - private long loadTxnBeginImpl(TLoadTxnBeginRequest request, String clientIp) throws UserException { + private TLoadTxnBeginResult loadTxnBeginImpl(TLoadTxnBeginRequest request, String clientIp) throws UserException { String cluster = request.getCluster(); if (Strings.isNullOrEmpty(cluster)) { cluster = SystemInfoService.DEFAULT_CLUSTER; } - checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), - request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); + if (Strings.isNullOrEmpty(request.getAuthCodeUuid())) { + checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), + request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); + } // check label if (Strings.isNullOrEmpty(request.getLabel())) { @@ -769,10 +792,17 @@ public class FrontendServiceImpl implements FrontendService.Iface { // begin long timeoutSecond = request.isSetTimeout() ? 
request.getTimeout() : Config.stream_load_default_timeout_second; MetricRepo.COUNTER_LOAD_ADD.increase(1L); - return Catalog.getCurrentGlobalTransactionMgr().beginTransaction( + long txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction( db.getId(), Lists.newArrayList(table.getId()), request.getLabel(), request.getRequestId(), new TxnCoordinator(TxnSourceType.BE, clientIp), TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond); + if (!Strings.isNullOrEmpty(request.getAuthCodeUuid())) { + Catalog.getCurrentGlobalTransactionMgr().getTransactionState(db.getId(), txnId) + .setAuthCode(request.getAuthCodeUuid()); + } + TLoadTxnBeginResult result = new TLoadTxnBeginResult(); + result.setTxnId(txnId).setDbId(db.getId()); + return result; } @Override @@ -811,6 +841,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { if (request.isSetAuthCode()) { // TODO(cmy): find a way to check + } else if (request.isSetAuthCodeUuid()) { + checkAuthCodeUuid(request.getDb(), request.getTxnId(), request.getAuthCodeUuid()); } else { checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); @@ -819,7 +851,12 @@ public class FrontendServiceImpl implements FrontendService.Iface { // get database Catalog catalog = Catalog.getCurrentCatalog(); String fullDbName = ClusterNamespace.getFullName(cluster, request.getDb()); - Database db = catalog.getDb(fullDbName); + Database db = null; + if (request.isSetDbId() && request.getDbId() > 0) { + db = catalog.getDb(request.getDbId()); + } else { + db = catalog.getDb(fullDbName); + } if (db == null) { String dbName = fullDbName; if (Strings.isNullOrEmpty(request.getCluster())) { @@ -872,12 +909,19 @@ public class FrontendServiceImpl implements FrontendService.Iface { if (request.isSetAuthCode()) { // TODO(cmy): find a way to check + } else if (request.isSetAuthCodeUuid()) { + checkAuthCodeUuid(request.getDb(), request.getTxnId(), request.getAuthCodeUuid()); } else { checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); } String dbName = ClusterNamespace.getFullName(cluster, request.getDb()); - Database db = Catalog.getCurrentCatalog().getDb(dbName); + Database db = null; + if (request.isSetDbId() && request.getDbId() > 0) { + db = Catalog.getCurrentCatalog().getDb(request.getDbId()); + } else { + db = Catalog.getCurrentCatalog().getDb(dbName); + } if (db == null) { throw new MetaNotFoundException("db " + request.getDb() + " does not exist"); } @@ -1003,6 +1047,24 @@ public class FrontendServiceImpl implements FrontendService.Iface { TNetworkAddress addr = getClientAddr(); return addr == null ? 
"unknown" : addr.hostname; } + + @Override + public TWaitingTxnStatusResult waitingTxnStatus(TWaitingTxnStatusRequest request) throws TException { + TWaitingTxnStatusResult result = new TWaitingTxnStatusResult(); + result.setStatus(new TStatus()); + try { + result = Catalog.getCurrentGlobalTransactionMgr().getWaitingTxnStatus(request); + result.status.setStatusCode(TStatusCode.OK); + } catch (TimeoutException e) { + result.status.setStatusCode(TStatusCode.INCOMPLETE); + result.status.addToErrorMsgs(e.getMessage()); + } catch (AnalysisException e) { + result.status.setStatusCode(TStatusCode.INTERNAL_ERROR); + result.status.addToErrorMsgs(e.getMessage()); + } + return result; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 121ff60332..23b42eea47 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -654,6 +654,20 @@ public class DatabaseTransactionMgr { } } + public Long getTransactionId(String label) { + readLock(); + try { + Set existingTxnIds = unprotectedGetTxnIdsByLabel(label); + if (existingTxnIds == null || existingTxnIds.isEmpty()) { + return null; + } + // find the latest txn (which id is largest) + return existingTxnIds.stream().max(Comparator.comparingLong(Long::valueOf)).get(); + } finally { + readUnlock(); + } + } + public List getCommittedTxnList() { readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 6444b6e3e7..83ebf070a3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -32,7 +32,10 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.persist.BatchRemoveTransactionsOperation; import org.apache.doris.persist.EditLog; +import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.thrift.TWaitingTxnStatusRequest; +import org.apache.doris.thrift.TWaitingTxnStatusResult; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; import org.apache.doris.transaction.TransactionState.TxnCoordinator; @@ -52,6 +55,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Transaction Manager @@ -159,6 +163,16 @@ public class GlobalTransactionMgr implements Writable { } + public Long getTransactionId(long dbId, String label) { + try { + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId); + return dbTransactionMgr.getTransactionId(label); + } catch (AnalysisException e) { + LOG.warn("Get transaction id by label " + label + " failed", e); + return null; + } + } + public void commitTransaction(long dbId, List tableList, long transactionId, List tabletCommitInfos) throws UserException { commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, null); @@ -495,4 +509,43 @@ public class GlobalTransactionMgr implements Writable { DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId); dbTransactionMgr.updateDatabaseUsedQuotaData(usedQuotaDataBytes); } + + public 
TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request) throws AnalysisException, TimeoutException { + long dbId = request.getDbId(); + int commitTimeoutSec = Config.commit_timeout_second; + for (int i = 0; i < commitTimeoutSec; ++i) { + Database db = Catalog.getCurrentCatalog().getDb(dbId); + if (db == null) { + throw new AnalysisException("invalid db id: " + dbId); + } + TWaitingTxnStatusResult statusResult = new TWaitingTxnStatusResult(); + statusResult.status = new TStatus(); + TransactionStatus txnStatus = null; + if (request.isSetTxnId()) { + long txnId = request.getTxnId(); + TransactionState txnState = Catalog.getCurrentGlobalTransactionMgr(). + getTransactionState(dbId, txnId); + if (txnState == null) { + throw new AnalysisException("txn does not exist: " + txnId); + } + txnStatus = txnState.getTransactionStatus(); + if (!txnState.getReason().trim().isEmpty()) { + statusResult.status.setErrorMsgsIsSet(true); + statusResult.status.addToErrorMsgs(txnState.getReason()); + } + } else { + txnStatus = getLabelState(dbId, request.getLabel()); + } + if (txnStatus == TransactionStatus.UNKNOWN || txnStatus.isFinalStatus()) { + statusResult.setTxnStatusId(txnStatus.value()); + return statusResult; + } + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + LOG.info("commit sleep exception.", e); + } + } + throw new TimeoutException("Operation is timeout"); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java new file mode 100644 index 0000000000..4db596dc55 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
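getWaitingTxnStatus above polls the transaction once per second for at most Config.commit_timeout_second seconds and returns only when the status is UNKNOWN or final, throwing a TimeoutException otherwise; the commit handler in StmtExecutor then treats anything other than VISIBLE as a failure, reporting COMMITTED as a commit whose data will only become visible later. A condensed sketch of that wait loop, with a hypothetical statusOf() lookup standing in for the transaction manager and VISIBLE/ABORTED taken as the final states:

```java
import java.util.concurrent.TimeoutException;
import java.util.function.LongFunction;

public class WaitVisibleSketch {
    // Poll once per second, up to timeoutSec seconds, until the transaction reaches a final
    // status. statusOf is a hypothetical stand-in for looking the status up via the
    // transaction manager.
    static String waitFinalStatus(long txnId, int timeoutSec, LongFunction<String> statusOf)
            throws TimeoutException, InterruptedException {
        for (int i = 0; i < timeoutSec; ++i) {
            String status = statusOf.apply(txnId);
            if ("VISIBLE".equals(status) || "ABORTED".equals(status) || "UNKNOWN".equals(status)) {
                return status;
            }
            Thread.sleep(1000L); // same 1s cadence as getWaitingTxnStatus
        }
        throw new TimeoutException("Operation is timeout");
    }

    public static void main(String[] args) throws Exception {
        // Fake transaction manager: COMMITTED for two polls, then VISIBLE.
        int[] polls = {0};
        System.out.println(waitFinalStatus(12345L, 30,
                txnId -> ++polls[0] < 3 ? "COMMITTED" : "VISIBLE"));
    }
}
```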
+ +package org.apache.doris.transaction; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Table; +import org.apache.doris.proto.InternalService; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TTxnParams; + +import java.util.ArrayList; +import java.util.List; + +public class TransactionEntry { + + private String label = ""; + private Database db; + private Table table; + private Backend backend; + private TTxnParams txnConf; + private List dataToSend = new ArrayList<>(); + private long rowsInTransaction = 0; + + public TransactionEntry() { + } + + public TransactionEntry(TTxnParams txnConf, Database db, Table table) { + this.txnConf = txnConf; + this.db = db; + this.table = table; + } + + public String getLabel() { + return label; + } + + public void setLabel(String label) { + this.label = label; + } + + public Database getDb() { + return db; + } + + public void setDb(Database db) { + this.db = db; + } + + public Table getTable() { + return table; + } + + public void setTable(Table table) { + this.table = table; + } + + public Backend getBackend() { + return backend; + } + + public void setBackend(Backend backend) { + this.backend = backend; + } + + public TTxnParams getTxnConf() { + return txnConf; + } + + public void setTxnConf(TTxnParams txnConf) { + this.txnConf = txnConf; + } + + public boolean isTxnModel() { + return txnConf != null && txnConf.isNeedTxn(); + } + + public boolean isTxnIniting() { + return isTxnModel() && txnConf.getTxnId() == -1; + } + + public boolean isTxnBegin() { + return isTxnModel() && txnConf.getTxnId() != -1; + } + + public List getDataToSend() { + return dataToSend; + } + + public void setDataToSend(List dataToSend) { + this.dataToSend = dataToSend; + } + + public void clearDataToSend() { + dataToSend.clear(); + } + + public long getRowsInTransaction() { + return rowsInTransaction; + } + + public void setRowsInTransaction(long rowsInTransaction) { + this.rowsInTransaction = rowsInTransaction; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index 23c8053969..28a77e3372 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -45,6 +45,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -205,6 +206,7 @@ public class TransactionState implements Writable { // 3. in afterStateTransform(), callback object can not be found, so the write lock can not be released. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 23c8053969..28a77e3372 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -45,6 +45,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -205,6 +206,7 @@ public class TransactionState implements Writable {
     // 3. in afterStateTransform(), callback object can not be found, so the write lock can not be released.
     private TxnStateChangeCallback callback = null;
     private long timeoutMs = Config.stream_load_default_timeout_second;
+    private String authCode = "";
 
     // is set to true, we will double the publish timeout
     private boolean prolongPublishTimeout = false;
@@ -241,6 +243,7 @@ public class TransactionState implements Writable {
         this.publishVersionTasks = Maps.newHashMap();
         this.hasSendTask = false;
         this.latch = new CountDownLatch(1);
+        this.authCode = UUID.randomUUID().toString();
     }
 
     public TransactionState(long dbId, List<Long> tableIdList, long transactionId, String label, TUniqueId requestId,
@@ -264,8 +267,16 @@ public class TransactionState implements Writable {
         this.latch = new CountDownLatch(1);
         this.callbackId = callbackId;
         this.timeoutMs = timeoutMs;
+        this.authCode = UUID.randomUUID().toString();
     }
-
+
+    public void setAuthCode(String authCode) {
+        this.authCode = authCode;
+    }
+    public String getAuthCode() {
+        return authCode;
+    }
+
     public void setErrorReplicas(Set<Long> newErrorReplicas) {
         this.errorReplicas = newErrorReplicas;
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
index 5d7ddb5a0b..506b08ca9d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
@@ -67,6 +67,7 @@ public class StmtExecutorTest {
     private ConnectContext ctx;
     private QueryState state;
     private ConnectScheduler scheduler;
+    private MysqlChannel channel = null;
 
     @Mocked
     SocketChannel socketChannel;
@@ -92,7 +93,7 @@ public class StmtExecutorTest {
         MysqlSerializer serializer = MysqlSerializer.newInstance();
         Catalog catalog = AccessTestUtil.fetchAdminCatalog();
 
-        MysqlChannel channel = new MysqlChannel(socketChannel);
+        channel = new MysqlChannel(socketChannel);
         new Expectations(channel) {
             {
                 channel.sendOnePacket((ByteBuffer) any);
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index d900073a15..4ebb671556 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -256,6 +256,39 @@ message PProxyResult {
     optional PKafkaPartitionOffsets partition_offsets = 3;
 };
 
+message PDataColumn {
+    optional string value = 1;
+}
+
+message PDataRow {
+    repeated PDataColumn col = 1;
+}
+
+message PSendDataRequest {
+    required PUniqueId fragment_instance_id = 1;
+    repeated PDataRow data = 2;
+}
+
+message PSendDataResult {
+    required PStatus status = 1;
+}
+
+message PCommitRequest {
+    required PUniqueId fragment_instance_id = 1;
+}
+
+message PCommitResult {
+    required PStatus status = 1;
+}
+
+message PRollbackRequest {
+    required PUniqueId fragment_instance_id = 1;
+}
+
+message PRollbackResult {
+    required PStatus status = 1;
+}
+
 message PBloomFilter {
     required bool always_true = 2;
     required int32 filter_length = 1;
@@ -358,6 +391,9 @@ service PBackendService {
     rpc update_cache(PUpdateCacheRequest) returns (PCacheResponse);
     rpc fetch_cache(PFetchCacheRequest) returns (PFetchCacheResult);
     rpc clear_cache(PClearCacheRequest) returns (PCacheResponse);
+    rpc send_data(PSendDataRequest) returns (PSendDataResult);
+    rpc commit(PCommitRequest) returns (PCommitResult);
+    rpc rollback(PRollbackRequest) returns (PRollbackResult);
     rpc merge_filter(PMergeFilterRequest) returns (PMergeFilterResponse);
     rpc apply_filter(PPublishFilterRequest) returns (PPublishFilterResponse);
     rpc fold_constant_expr(PConstantExprRequest) returns (PConstantExprResult);
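Rows travel from the FE to the selected BE through the new send_data RPC as PDataRow messages, one per inserted row, with each column carried as a string-valued PDataColumn. A hedged sketch of assembling such a row with the protobuf-generated builders; the helper class and sample values are placeholders, not code from this patch.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.doris.proto.InternalService;

public class PDataRowSketch {
    // Builds one PDataRow; every column value is sent as a string in this sketch.
    public static InternalService.PDataRow buildRow(List<String> columnValues) {
        InternalService.PDataRow.Builder row = InternalService.PDataRow.newBuilder();
        for (String value : columnValues) {
            row.addCol(InternalService.PDataColumn.newBuilder().setValue(value).build());
        }
        return row.build();
    }

    public static void main(String[] args) {
        InternalService.PDataRow row = buildRow(Arrays.asList("1", "2", "3"));
        System.out.println(row.getColCount()); // 3
    }
}
```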
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 0f8b420219..78e4ea3041 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -503,6 +503,7 @@ struct TMiniLoadBeginRequest {
     11: optional i64 auth_code
     12: optional i64 create_timestamp
     13: optional Types.TUniqueId request_id
+    14: optional string auth_code_uuid
 }
 
 struct TIsMethodSupportedRequest {
@@ -533,12 +534,14 @@ struct TLoadTxnBeginRequest {
     // The real value of timeout should be i32. i64 ensures the compatibility of interface.
     10: optional i64 timeout
     11: optional Types.TUniqueId request_id
+    12: optional string auth_code_uuid
 }
 
 struct TLoadTxnBeginResult {
     1: required Status.TStatus status
     2: optional i64 txnId
     3: optional string job_status // if label already used, set status of existing job
+    4: optional i64 db_id
 }
 
 // StreamLoad request, used to load a streaming to engine
@@ -587,6 +590,7 @@ struct TStreamLoadPutRequest {
     31: optional bool fuzzy_parse
     32: optional string line_delimiter
     33: optional bool read_json_by_line
+    34: optional string auth_code_uuid
 }
 
 struct TStreamLoadPutResult {
@@ -638,6 +642,8 @@ struct TLoadTxnCommitRequest {
     10: optional i64 auth_code
     11: optional TTxnCommitAttachment txnCommitAttachment
     12: optional i64 thrift_rpc_timeout_ms
+    13: optional string auth_code_uuid
+    14: optional i64 db_id
 }
 
 struct TLoadTxnCommitResult {
@@ -655,6 +661,8 @@ struct TLoadTxnRollbackRequest {
     8: optional string reason
     9: optional i64 auth_code
     10: optional TTxnCommitAttachment txnCommitAttachment
+    11: optional string auth_code_uuid
+    12: optional i64 db_id
 }
 
 struct TLoadTxnRollbackResult {
@@ -695,6 +703,17 @@ struct TPropertyVal {
     4: optional bool boolVal
 }
 
+struct TWaitingTxnStatusRequest {
+    1: optional i64 db_id
+    2: optional i64 txn_id
+    3: optional string label
+}
+
+struct TWaitingTxnStatusResult {
+    1: optional Status.TStatus status
+    2: optional i32 txn_status_id
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1:TGetDbsParams params)
     TGetTablesResult getTableNames(1:TGetTablesParams params)
@@ -727,6 +746,8 @@ service FrontendService {
     TLoadTxnCommitResult loadTxnCommit(1: TLoadTxnCommitRequest request)
     TLoadTxnRollbackResult loadTxnRollback(1: TLoadTxnRollbackRequest request)
 
+    TWaitingTxnStatusResult waitingTxnStatus(1: TWaitingTxnStatusRequest request)
+
     TStreamLoadPutResult streamLoadPut(1: TStreamLoadPutRequest request)
 
     Status.TStatus snapshotLoaderReport(1: TSnapshotLoaderReportRequest request)
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index ae64ef5bf2..434c661296 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -244,6 +244,18 @@ enum PaloInternalServiceVersion {
     V1
 }
 
+struct TTxnParams {
+    1: optional bool need_txn
+    2: optional string auth_code_uuid
+    3: optional i64 thrift_rpc_timeout_ms
+    4: optional string db
+    5: optional string tbl
+    6: optional string user_ip
+    7: optional i64 txn_id
+    8: optional Types.TUniqueId fragment_instance_id
+    9: optional i64 db_id
+    10: optional double max_filter_ratio
+}
 
 // ExecPlanFragment
 
@@ -299,6 +311,7 @@ struct TExecPlanFragmentParams {
     // If true, all @Common components is unset and should be got from BE's cache
     // If this field is unset or it set to false, all @Common components is set.
     16: optional bool is_simplified_param
+    17: optional TTxnParams txn_conf
 }
 
 struct TExecPlanFragmentResult {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 8d968dc000..029f5c4bc3 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -106,6 +106,7 @@ enum TFileFormatType {
     FORMAT_CSV_DEFLATE,
     FORMAT_ORC,
     FORMAT_JSON,
+    FORMAT_PROTO,
 }
 
 struct THdfsConf {