From 082bcd820b0cfe1c8533b84352b574a6ec5f20cb Mon Sep 17 00:00:00 2001 From: huanghaibin <284824253@qq.com> Date: Tue, 26 Sep 2023 14:46:24 +0800 Subject: [PATCH] [feature](insert) Support wal for group commit insert (#23053) --- be/src/common/config.cpp | 6 + be/src/common/config.h | 6 + be/src/http/action/http_stream.cpp | 6 +- be/src/http/http_common.h | 2 + be/src/http/utils.cpp | 10 +- be/src/olap/wal_manager.cpp | 252 +++++++++++++++++ be/src/olap/wal_manager.h | 61 ++++ be/src/olap/wal_reader.cpp | 91 ++++++ be/src/olap/wal_reader.h | 45 +++ be/src/olap/wal_table.cpp | 265 ++++++++++++++++++ be/src/olap/wal_table.h | 58 ++++ be/src/olap/wal_writer.cpp | 75 +++++ be/src/olap/wal_writer.h | 49 ++++ be/src/runtime/exec_env.h | 3 + be/src/runtime/exec_env_init.cpp | 5 + be/src/runtime/group_commit_mgr.cpp | 31 +- be/src/runtime/plan_fragment_executor.cpp | 3 + be/src/runtime/runtime_state.h | 7 + .../runtime/stream_load/stream_load_context.h | 1 + be/src/util/load_util.cpp | 3 + be/src/vec/exec/format/wal/wal_reader.cpp | 77 +++++ be/src/vec/exec/format/wal/wal_reader.h | 43 +++ be/src/vec/exec/scan/vfile_scanner.cpp | 10 + be/src/vec/sink/vtablet_finder.cpp | 1 + be/src/vec/sink/vtablet_finder.h | 6 +- be/src/vec/sink/vtablet_sink.cpp | 2 +- be/src/vec/sink/vtablet_sink.h | 1 + be/src/vec/sink/writer/vtablet_writer.cpp | 64 ++++- be/src/vec/sink/writer/vtablet_writer.h | 17 +- be/test/exec/test_data/wal_scanner/wal | Bin 0 -> 132 bytes be/test/olap/wal_manager_test.cpp | 165 +++++++++++ be/test/olap/wal_reader_writer_test.cpp | 137 +++++++++ be/test/vec/exec/vtablet_sink_test.cpp | 263 +++++++++++++++++ .../doris/analysis/DataDescription.java | 4 + .../org/apache/doris/common/util/Util.java | 2 + .../planner/external/FileQueryScanNode.java | 3 +- .../doris/service/FrontendServiceImpl.java | 9 +- .../ExternalFileTableValuedFunction.java | 23 ++ gensrc/thrift/FrontendService.thrift | 10 + gensrc/thrift/PaloInternalService.thrift | 2 + gensrc/thrift/PlanNodes.thrift | 1 + 41 files changed, 1796 insertions(+), 23 deletions(-) create mode 100644 be/src/olap/wal_manager.cpp create mode 100644 be/src/olap/wal_manager.h create mode 100644 be/src/olap/wal_reader.cpp create mode 100644 be/src/olap/wal_reader.h create mode 100644 be/src/olap/wal_table.cpp create mode 100644 be/src/olap/wal_table.h create mode 100644 be/src/olap/wal_writer.cpp create mode 100644 be/src/olap/wal_writer.h create mode 100644 be/src/vec/exec/format/wal/wal_reader.cpp create mode 100644 be/src/vec/exec/format/wal/wal_reader.h create mode 100644 be/test/exec/test_data/wal_scanner/wal create mode 100644 be/test/olap/wal_manager_test.cpp create mode 100644 be/test/olap/wal_reader_writer_test.cpp diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index c3d3193c2a..643f6eb23c 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1088,6 +1088,12 @@ DEFINE_Int32(grace_shutdown_wait_seconds, "120"); DEFINE_Int16(bitmap_serialize_version, "1"); +// group commit insert config +DEFINE_String(group_commit_replay_wal_dir, "./wal"); +DEFINE_Int32(group_commit_replay_wal_retry_num, "10"); +DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5"); +DEFINE_Int32(group_commit_sync_wal_batch, "10"); + // the count of thread to group commit insert DEFINE_Int32(group_commit_insert_threads, "10"); diff --git a/be/src/common/config.h b/be/src/common/config.h index fba48bf3f4..349a8a93a9 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1159,6 +1159,12 @@ DECLARE_Int32(grace_shutdown_wait_seconds); // BitmapValue serialize version. DECLARE_Int16(bitmap_serialize_version); +// group commit insert config +DECLARE_String(group_commit_replay_wal_dir); +DECLARE_Int32(group_commit_replay_wal_retry_num); +DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds); +DECLARE_Int32(group_commit_sync_wal_batch); + // This config can be set to limit thread number in group commit insert thread pool. DECLARE_mInt32(group_commit_insert_threads); diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index f8068389b2..11e69615d2 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -223,12 +223,13 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) { if (ctx == nullptr || !ctx->status.ok()) { return; } - + if (!req->header(HTTP_WAL_ID_KY).empty()) { + ctx->wal_id = std::stoll(req->header(HTTP_WAL_ID_KY)); + } struct evhttp_request* ev_req = req->get_evhttp_request(); auto evbuf = evhttp_request_get_input_buffer(ev_req); int64_t start_read_data_time = MonotonicNanos(); - while (evbuffer_get_length(evbuf) > 0) { auto bb = ByteBuffer::allocate(128 * 1024); auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); @@ -306,6 +307,7 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req, ctx->db = ctx->put_result.params.db_name; ctx->table = ctx->put_result.params.table_name; ctx->txn_id = ctx->put_result.params.txn_conf.txn_id; + ctx->put_result.params.__set_wal_id(ctx->wal_id); return _exec_env->stream_load_executor()->execute_plan_fragment(ctx); } diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index bcbfa33e10..8e84140707 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -63,5 +63,7 @@ static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit"; static const std::string HTTP_TXN_ID_KEY = "txn_id"; static const std::string HTTP_TXN_OPERATION_KEY = "txn_operation"; static const std::string HTTP_MEMTABLE_ON_SINKNODE = "memtable_on_sink_node"; +static const std::string HTTP_WAL_ID_KY = "wal_id"; +static const std::string HTTP_AUTH_CODE = "auth_code"; } // namespace doris diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp index fe11738d5a..f55b5d4769 100644 --- a/be/src/http/utils.cpp +++ b/be/src/http/utils.cpp @@ -30,6 +30,7 @@ #include "common/status.h" #include "common/utils.h" #include "http/http_channel.h" +#include "http/http_common.h" #include "http/http_headers.h" #include "http/http_method.h" #include "http/http_request.h" @@ -72,7 +73,12 @@ bool parse_basic_auth(const HttpRequest& req, std::string* user, std::string* pa bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth) { auto& token = req.header("token"); - if (token.empty()) { + auto& auth_code = req.header(HTTP_AUTH_CODE); + if (!token.empty()) { + auth->token = token; + } else if (!auth_code.empty()) { + auth->auth_code = std::stoll(auth_code); + } else { std::string full_user; if (!parse_basic_auth(req, &full_user, &auth->passwd)) { return false; @@ -84,8 +90,6 @@ bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth) { } else { auth->user = full_user; } - } else { - auth->token = token; } // set user ip diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal_manager.cpp new file mode 100644 index 0000000000..8379c9e0d8 --- /dev/null +++ b/be/src/olap/wal_manager.cpp @@ -0,0 +1,252 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_manager.h" + +#include + +#include +#include + +#include "io/fs/local_file_system.h" +#include "runtime/client_cache.h" +#include "runtime/fragment_mgr.h" +#include "runtime/plan_fragment_executor.h" +#include "runtime/stream_load/stream_load_context.h" +#include "util/path_util.h" +#include "util/thrift_rpc_helper.h" +#include "vec/exec/format/wal/wal_reader.h" + +namespace doris { +WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) + : _exec_env(exec_env), _stop_background_threads_latch(1) { + doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs); +} + +WalManager::~WalManager() { + _stop_background_threads_latch.count_down(); + if (_replay_thread) { + _replay_thread->join(); + } + LOG(INFO) << "WalManager is destoried"; +} +void WalManager::stop() { + _stop = true; + LOG(INFO) << "WalManager is stopped"; +} + +Status WalManager::init() { + bool exists = false; + for (auto wal_dir : _wal_dirs) { + std::string tmp_dir = wal_dir + "/tmp"; + LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir, &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_dir)); + } + RETURN_IF_ERROR(io::global_local_filesystem()->exists(tmp_dir, &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_dir)); + } + RETURN_IF_ERROR(scan_wals(wal_dir)); + } + return Thread::create( + "WalMgr", "replay_wal", [this]() { this->replay(); }, &_replay_thread); +} + +Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, + const std::string& label) { + std::string base_path = + _wal_dirs.size() == 1 ? _wal_dirs[0] : _wal_dirs[rand() % _wal_dirs.size()]; + std::stringstream ss; + ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/" + << std::to_string(wal_id) << "_" << label; + { + std::lock_guard wrlock(_wal_lock); + _wal_path_map.emplace(wal_id, ss.str()); + } + return Status::OK(); +} + +Status WalManager::get_wal_path(int64_t wal_id, std::string& wal_path) { + std::shared_lock rdlock(_wal_lock); + auto it = _wal_path_map.find(wal_id); + if (it != _wal_path_map.end()) { + wal_path = _wal_path_map[wal_id]; + } else { + return Status::InternalError("can not find wal_id {} in wal_path_map", wal_id); + } + return Status::OK(); +} + +Status WalManager::create_wal_reader(const std::string& wal_path, + std::shared_ptr& wal_reader) { + wal_reader = std::make_shared(wal_path); + RETURN_IF_ERROR(wal_reader->init()); + return Status::OK(); +} + +Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr& wal_writer) { + std::string wal_path; + RETURN_IF_ERROR(get_wal_path(wal_id, wal_path)); + std::vector path_element; + doris::vectorized::WalReader::string_split(wal_path, "/", path_element); + std::stringstream ss; + for (int i = 0; i < path_element.size() - 1; i++) { + ss << path_element[i] << "/"; + } + std::string base_path = ss.str(); + bool exists = false; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(base_path, &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(base_path)); + } + LOG(INFO) << "create wal " << wal_path; + wal_writer = std::make_shared(wal_path); + RETURN_IF_ERROR(wal_writer->init()); + return Status::OK(); +} + +Status WalManager::scan_wals(const std::string& wal_path) { + size_t count = 0; + bool exists = true; + std::vector dbs; + Status st = io::global_local_filesystem()->list(wal_path, false, &dbs, &exists); + if (!st.ok()) { + LOG(WARNING) << "Failed list files for dir=" << wal_path << ", st=" << st.to_string(); + return st; + } + for (const auto& db_id : dbs) { + if (db_id.is_file) { + continue; + } + std::vector tables; + auto db_path = wal_path + "/" + db_id.file_name; + st = io::global_local_filesystem()->list(db_path, false, &tables, &exists); + if (!st.ok()) { + LOG(WARNING) << "Failed list files for dir=" << db_path << ", st=" << st.to_string(); + return st; + } + for (const auto& table_id : tables) { + if (table_id.is_file) { + continue; + } + std::vector wals; + auto table_path = db_path + "/" + table_id.file_name; + st = io::global_local_filesystem()->list(table_path, false, &wals, &exists); + if (!st.ok()) { + LOG(WARNING) << "Failed list files for dir=" << table_path + << ", st=" << st.to_string(); + return st; + } + if (wals.size() == 0) { + continue; + } + std::vector res; + for (const auto& wal : wals) { + auto wal_file = table_path + "/" + wal.file_name; + res.emplace_back(wal_file); + { + std::lock_guard wrlock(_wal_lock); + int64_t wal_id = std::strtoll(wal.file_name.c_str(), NULL, 10); + _wal_path_map.emplace(wal_id, wal_file); + } + } + st = add_recover_wal(db_id.file_name, table_id.file_name, res); + count += res.size(); + if (!st.ok()) { + LOG(WARNING) << "Failed add replay wal, db=" << db_id.file_name + << ", table=" << table_id.file_name << ", st=" << st.to_string(); + return st; + } + } + } + LOG(INFO) << "Finish list all wals, size:" << count; + return Status::OK(); +} + +Status WalManager::replay() { + do { + if (_stop || _exec_env->master_info() == nullptr) { + break; + } + // port == 0 means not received heartbeat yet + while (_exec_env->master_info()->network_address.port == 0) { + sleep(1); + continue; + } + std::vector replay_tables; + { + std::lock_guard wrlock(_lock); + auto it = _table_map.begin(); + while (it != _table_map.end()) { + if (it->second->size() == 0) { + it = _table_map.erase(it); + } else { + replay_tables.push_back(it->first); + it++; + } + } + } + for (const auto& table_id : replay_tables) { + auto st = _table_map[table_id]->replay_wals(); + if (!st.ok()) { + LOG(WARNING) << "Failed add replay wal on table " << table_id; + } + } + } while (!_stop_background_threads_latch.wait_for( + std::chrono::seconds(config::group_commit_replay_wal_retry_interval_seconds))); + return Status::OK(); +} + +Status WalManager::add_recover_wal(const std::string& db_id, const std::string& table_id, + std::vector wals) { + std::lock_guard wrlock(_lock); + std::shared_ptr table_ptr; + auto it = _table_map.find(table_id); + if (it == _table_map.end()) { + table_ptr = std::make_shared(_exec_env, std::stoll(db_id), std::stoll(table_id)); + _table_map.emplace(table_id, table_ptr); + } else { + table_ptr = it->second; + } + table_ptr->add_wals(wals); + return Status::OK(); +} + +size_t WalManager::get_wal_table_size(const std::string& table_id) { + std::shared_lock rdlock(_lock); + auto it = _table_map.find(table_id); + if (it != _table_map.end()) { + return it->second->size(); + } else { + return 0; + } +} + +Status WalManager::delete_wal(int64_t wal_id) { + { + std::lock_guard wrlock(_wal_lock); + std::string wal_path = _wal_path_map[wal_id]; + RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path)); + LOG(INFO) << "delete file=" << wal_path; + _wal_path_map.erase(wal_id); + } + return Status::OK(); +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h new file mode 100644 index 0000000000..f5b49f6dda --- /dev/null +++ b/be/src/olap/wal_manager.h @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "common/config.h" +#include "gen_cpp/FrontendService.h" +#include "gen_cpp/FrontendService_types.h" +#include "gen_cpp/HeartbeatService_types.h" +#include "olap/wal_reader.h" +#include "olap/wal_table.h" +#include "olap/wal_writer.h" +#include "runtime/exec_env.h" +#include "runtime/stream_load/stream_load_context.h" +#include "util/thread.h" + +namespace doris { +class WalManager { + ENABLE_FACTORY_CREATOR(WalManager); + +public: + WalManager(ExecEnv* exec_env, const std::string& wal_dir); + ~WalManager(); + Status delete_wal(int64_t wal_id); + Status init(); + Status scan_wals(const std::string& wal_path); + Status replay(); + Status create_wal_reader(const std::string& wal_path, std::shared_ptr& wal_reader); + Status create_wal_writer(int64_t wal_id, std::shared_ptr& wal_writer); + Status scan(); + size_t get_wal_table_size(const std::string& table_id); + Status add_recover_wal(const std::string& db_id, const std::string& table_id, + std::vector wals); + Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const std::string& label); + Status get_wal_path(int64_t wal_id, std::string& wal_path); + void stop(); + +private: + ExecEnv* _exec_env; + std::shared_mutex _lock; + scoped_refptr _replay_thread; + CountDownLatch _stop_background_threads_latch; + std::map> _table_map; + std::vector _wal_dirs; + std::shared_mutex _wal_lock; + std::unordered_map _wal_path_map; + bool _stop = false; +}; +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_reader.cpp b/be/src/olap/wal_reader.cpp new file mode 100644 index 0000000000..180e391016 --- /dev/null +++ b/be/src/olap/wal_reader.cpp @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_reader.h" + +#include "common/status.h" +#include "io/fs/file_reader.h" +#include "io/fs/local_file_system.h" +#include "io/fs/path.h" +#include "util/crc32c.h" +#include "wal_writer.h" + +namespace doris { + +WalReader::WalReader(const std::string& file_name) : _file_name(file_name), _offset(0) {} + +WalReader::~WalReader() {} + +Status WalReader::init() { + RETURN_IF_ERROR(io::global_local_filesystem()->open_file(_file_name, &file_reader)); + return Status::OK(); +} + +Status WalReader::finalize() { + auto st = file_reader->close(); + if (!st.ok()) { + LOG(WARNING) << "fail to close file " << _file_name; + } + return Status::OK(); +} + +Status WalReader::read_block(PBlock& block) { + if (_offset >= file_reader->size()) { + return Status::EndOfFile("end of wal file"); + } + size_t bytes_read = 0; + uint8_t row_len_buf[WalWriter::LENGTH_SIZE]; + RETURN_IF_ERROR( + file_reader->read_at(_offset, {row_len_buf, WalWriter::LENGTH_SIZE}, &bytes_read)); + _offset += WalWriter::LENGTH_SIZE; + size_t block_len; + memcpy(&block_len, row_len_buf, WalWriter::LENGTH_SIZE); + // read block + std::string block_buf; + block_buf.resize(block_len); + RETURN_IF_ERROR(file_reader->read_at(_offset, {block_buf.c_str(), block_len}, &bytes_read)); + _offset += block_len; + RETURN_IF_ERROR(_deserialize(block, block_buf)); + // checksum + uint8_t checksum_len_buf[WalWriter::CHECKSUM_SIZE]; + RETURN_IF_ERROR(file_reader->read_at(_offset, {checksum_len_buf, WalWriter::CHECKSUM_SIZE}, + &bytes_read)); + _offset += WalWriter::CHECKSUM_SIZE; + uint32_t checksum; + memcpy(&checksum, checksum_len_buf, WalWriter::CHECKSUM_SIZE); + RETURN_IF_ERROR(_check_checksum(block_buf.data(), block_len, checksum)); + return Status::OK(); +} + +Status WalReader::_deserialize(PBlock& block, std::string& buf) { + if (UNLIKELY(!block.ParseFromString(buf))) { + return Status::InternalError("failed to deserialize row"); + } + return Status::OK(); +} + +Status WalReader::_check_checksum(const char* binary, size_t size, uint32_t checksum) { + uint32_t computed_checksum = crc32c::Value(binary, size); + if (LIKELY(computed_checksum == checksum)) { + return Status::OK(); + } + return Status::InternalError("checksum failed for wal=" + _file_name + + ", computed checksum=" + std::to_string(computed_checksum) + + ", expected=" + std::to_string(checksum)); +} + +} // namespace doris diff --git a/be/src/olap/wal_reader.h b/be/src/olap/wal_reader.h new file mode 100644 index 0000000000..825d11fae3 --- /dev/null +++ b/be/src/olap/wal_reader.h @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "common/status.h" +#include "gen_cpp/internal_service.pb.h" +#include "io/fs/file_reader_writer_fwd.h" + +namespace doris { + +class WalReader { +public: + explicit WalReader(const std::string& file_name); + ~WalReader(); + + Status init(); + Status finalize(); + + Status read_block(PBlock& block); + +private: + Status _deserialize(PBlock& block, std::string& buf); + Status _check_checksum(const char* binary, size_t size, uint32_t checksum); + + std::string _file_name; + size_t _offset; + io::FileReaderSPtr file_reader; +}; + +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_table.cpp b/be/src/olap/wal_table.cpp new file mode 100644 index 0000000000..40b48ad773 --- /dev/null +++ b/be/src/olap/wal_table.cpp @@ -0,0 +1,265 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_table.h" + +#include +#include +#include +#include +#include + +#include "evhttp.h" +#include "http/action/stream_load.h" +#include "http/ev_http_server.h" +#include "http/http_common.h" +#include "http/http_headers.h" +#include "http/utils.h" +#include "io/fs/local_file_system.h" +#include "olap/wal_manager.h" +#include "runtime/client_cache.h" +#include "runtime/fragment_mgr.h" +#include "runtime/plan_fragment_executor.h" +#include "util/path_util.h" +#include "util/thrift_rpc_helper.h" +#include "vec/exec/format/wal/wal_reader.h" + +namespace doris { + +WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id) + : _exec_env(exec_env), _db_id(db_id), _table_id(table_id) {} +WalTable::~WalTable() {} + +#ifdef BE_TEST +std::string k_request_line; +#endif + +void WalTable::add_wals(std::vector wals) { + std::lock_guard lock(_replay_wal_lock); + for (const auto& wal : wals) { + LOG(INFO) << "add replay wal " << wal; + _replay_wal_map.emplace(wal, replay_wal_info {0, UnixMillis(), false}); + } +} +Status WalTable::replay_wals() { + std::vector need_replay_wals; + { + std::lock_guard lock(_replay_wal_lock); + if (_replay_wal_map.empty()) { + return Status::OK(); + } + VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" << _table_id + << ", wal size=" << _replay_wal_map.size(); + for (auto& [wal, info] : _replay_wal_map) { + auto& [retry_num, start_ts, replaying] = info; + if (replaying) { + continue; + } + if (retry_num >= config::group_commit_replay_wal_retry_num) { + LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", table=" << _table_id + << ", wal=" << wal + << ", retry_num=" << config::group_commit_replay_wal_retry_num; + std::string rename_path = get_tmp_path(wal); + LOG(INFO) << "rename wal from " << wal << " to " << rename_path; + std::rename(wal.c_str(), rename_path.c_str()); + _replay_wal_map.erase(wal); + continue; + } + if (need_replay(info)) { + replaying = true; + need_replay_wals.push_back(wal); + } + } + std::sort(need_replay_wals.begin(), need_replay_wals.end()); + } + for (const auto& wal : need_replay_wals) { + auto st = replay_wal_internal(wal); + if (!st.ok()) { + std::lock_guard lock(_replay_wal_lock); + auto it = _replay_wal_map.find(wal); + if (it != _replay_wal_map.end()) { + auto& [retry_num, start_time, replaying] = it->second; + replaying = false; + } + LOG(WARNING) << "failed replay wal, drop this round, db=" << _db_id + << ", table=" << _table_id << ", wal=" << wal << ", st=" << st.to_string(); + break; + } + VLOG_NOTICE << "replay wal, db=" << _db_id << ", table=" << _table_id << ", label=" << wal + << ", st=" << st.to_string(); + } + return Status::OK(); +} + +std::string WalTable::get_tmp_path(const std::string wal) { + std::vector path_element; + doris::vectorized::WalReader::string_split(wal, "/", path_element); + std::stringstream ss; + int index = 0; + while (index < path_element.size() - 3) { + ss << path_element[index] << "/"; + index++; + } + ss << "tmp/"; + while (index < path_element.size()) { + if (index != path_element.size() - 1) { + ss << path_element[index] << "_"; + } else { + ss << path_element[index]; + } + index++; + } + return ss.str(); +} + +bool WalTable::need_replay(const doris::WalTable::replay_wal_info& info) { +#ifndef BE_TEST + auto& [retry_num, start_ts, replaying] = info; + auto replay_interval = + pow(2, retry_num) * config::group_commit_replay_wal_retry_interval_seconds * 1000; + return UnixMillis() - start_ts >= replay_interval; +#else + return true; +#endif +} + +Status WalTable::replay_wal_internal(const std::string& wal) { + LOG(INFO) << "Start replay wal for db=" << _db_id << ", table=" << _table_id << ", wal=" << wal; + // start a new stream load + { + std::lock_guard lock(_replay_wal_lock); + auto it = _replay_wal_map.find(wal); + if (it != _replay_wal_map.end()) { + auto& [retry_num, start_time, replaying] = it->second; + ++retry_num; + replaying = true; + } else { + LOG(WARNING) << "can not find wal in stream load replay map. db=" << _db_id + << ", table=" << _table_id << ", wal=" << wal; + return Status::OK(); + } + } + auto pair = get_wal_info(wal); + auto wal_id = pair.first; + auto label = pair.second; + RETURN_IF_ERROR(send_request(wal_id, wal, label)); + return Status::OK(); +} + +std::pair WalTable::get_wal_info(const std::string& wal) { + std::vector path_element; + doris::vectorized::WalReader::string_split(wal, "/", path_element); + auto pos = path_element[path_element.size() - 1].find("_"); + int64_t wal_id = + std::strtoll(path_element[path_element.size() - 1].substr(0, pos).c_str(), NULL, 10); + auto label = path_element[path_element.size() - 1].substr(pos + 1); + return std::make_pair(wal_id, label); +} + +void http_request_done(struct evhttp_request* req, void* arg) { + event_base_loopbreak((struct event_base*)arg); +} + +Status WalTable::send_request(int64_t wal_id, const std::string& wal, const std::string& label) { +#ifndef BE_TEST + struct event_base* base = nullptr; + struct evhttp_connection* conn = nullptr; + struct evhttp_request* req = nullptr; + event_init(); + base = event_base_new(); + conn = evhttp_connection_new("127.0.0.1", doris::config::webserver_port); + evhttp_connection_set_base(conn, base); + req = evhttp_request_new(http_request_done, base); + evhttp_add_header(req->output_headers, HTTP_LABEL_KEY.c_str(), label.c_str()); + evhttp_add_header(req->output_headers, HTTP_AUTH_CODE.c_str(), std::to_string(wal_id).c_str()); + evhttp_add_header(req->output_headers, HTTP_WAL_ID_KY.c_str(), std::to_string(wal_id).c_str()); + std::stringstream ss; + ss << "insert into " << std::to_string(_table_id) << " WITH LABEL " << label + << " select * from " + "http_stream(\"format\" = \"wal\", \"table_id\" = \"" + << std::to_string(_table_id) << "\")"; + evhttp_add_header(req->output_headers, HTTP_SQL.c_str(), ss.str().c_str()); + evbuffer* output = evhttp_request_get_output_buffer(req); + evbuffer_add_printf(output, "replay wal %s", std::to_string(wal_id).c_str()); + + evhttp_make_request(conn, req, EVHTTP_REQ_PUT, "/api/_http_stream"); + evhttp_connection_set_timeout(req->evcon, 300); + + event_base_dispatch(base); + evhttp_connection_free(conn); + event_base_free(base); + +#endif + bool retry = false; + std::string status; + std::string msg; + std::stringstream out; + rapidjson::Document doc; +#ifndef BE_TEST + size_t len = 0; + auto input = evhttp_request_get_input_buffer(req); + char* request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF); + while (request_line != nullptr) { + std::string s(request_line); + out << request_line; + request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF); + } +#else + out << k_request_line; +#endif + auto out_str = out.str(); + if (!out_str.empty()) { + doc.Parse(out.str().c_str()); + status = std::string(doc["Status"].GetString()); + msg = std::string(doc["Message"].GetString()); + LOG(INFO) << "replay wal " << wal_id << " status:" << status << ",msg:" << msg; + if (status.find("Fail") != status.npos) { + if (msg.find("Label") != msg.npos && msg.find("has already been used") != msg.npos) { + retry = false; + } else { + retry = true; + } + } else { + retry = false; + } + } else { + retry = true; + } + if (retry) { + LOG(INFO) << "fail to replay wal =" << wal << ",status:" << status << ",msg:" << msg; + std::lock_guard lock(_replay_wal_lock); + auto it = _replay_wal_map.find(wal); + if (it != _replay_wal_map.end()) { + auto& [retry_num, start_time, replaying] = it->second; + replaying = false; + } else { + _replay_wal_map.emplace(wal, replay_wal_info {0, UnixMillis(), false}); + } + } else { + LOG(INFO) << "success to replay wal =" << wal << ",status:" << status << ",msg:" << msg; + RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id)); + std::lock_guard lock(_replay_wal_lock); + _replay_wal_map.erase(wal); + } + return Status::OK(); +} + +size_t WalTable::size() { + std::lock_guard lock(_replay_wal_lock); + return _replay_wal_map.size(); +} +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_table.h b/be/src/olap/wal_table.h new file mode 100644 index 0000000000..2dd63240d3 --- /dev/null +++ b/be/src/olap/wal_table.h @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once +#include +#include +#include +#include + +#include "common/status.h" +#include "gen_cpp/FrontendService.h" +#include "gen_cpp/FrontendService_types.h" +#include "gen_cpp/HeartbeatService_types.h" +#include "runtime/exec_env.h" +#include "runtime/stream_load/stream_load_context.h" +namespace doris { +class WalTable { +public: + WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id); + ~WalTable(); + // + using replay_wal_info = std::tuple; + // used when be start and there are wals need to do recovery + void add_wals(std::vector wals); + Status replay_wals(); + size_t size(); + +private: + std::pair get_wal_info(const std::string& wal); + std::string get_tmp_path(const std::string wal); + Status send_request(int64_t wal_id, const std::string& wal, const std::string& label); + +private: + ExecEnv* _exec_env; + int64_t _db_id; + int64_t _table_id; + std::string _relay = "relay"; + std::string _split = "_"; + mutable std::mutex _replay_wal_lock; + // key is wal_id + std::map _replay_wal_map; + bool need_replay(const replay_wal_info& info); + Status replay_wal_internal(const std::string& wal); +}; +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_writer.cpp b/be/src/olap/wal_writer.cpp new file mode 100644 index 0000000000..7cd427453b --- /dev/null +++ b/be/src/olap/wal_writer.cpp @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_writer.h" + +#include "io/fs/file_writer.h" +#include "io/fs/local_file_system.h" +#include "io/fs/path.h" +#include "olap/storage_engine.h" +#include "util/crc32c.h" + +namespace doris { + +WalWriter::WalWriter(const std::string& file_name) : _file_name(file_name) {} + +WalWriter::~WalWriter() {} + +Status WalWriter::init() { + _batch = config::group_commit_sync_wal_batch; + RETURN_IF_ERROR(io::global_local_filesystem()->create_file(_file_name, &_file_writer)); + return Status::OK(); +} + +Status WalWriter::finalize() { + auto st = _file_writer->close(); + if (!st.ok()) { + LOG(WARNING) << "fail to close file " << _file_name; + } + return Status::OK(); +} + +Status WalWriter::append_blocks(const PBlockArray& blocks) { + size_t total_size = 0; + for (const auto& block : blocks) { + total_size += LENGTH_SIZE + block->ByteSizeLong() + CHECKSUM_SIZE; + } + std::string binary(total_size, '\0'); + char* row_binary = binary.data(); + size_t offset = 0; + for (const auto& block : blocks) { + unsigned long row_length = block->GetCachedSize(); + memcpy(row_binary + offset, &row_length, LENGTH_SIZE); + offset += LENGTH_SIZE; + memcpy(row_binary + offset, block->SerializeAsString().data(), row_length); + offset += row_length; + uint32_t checksum = crc32c::Value(block->SerializeAsString().data(), row_length); + memcpy(row_binary + offset, &checksum, CHECKSUM_SIZE); + offset += CHECKSUM_SIZE; + } + DCHECK(offset == total_size); + // write rows + RETURN_IF_ERROR(_file_writer->append({row_binary, offset})); + _count++; + if (_count % _batch == 0) { + //todo sync data + //LOG(INFO) << "count=" << count << ",do sync"; + } + return Status::OK(); +} + +} // namespace doris diff --git a/be/src/olap/wal_writer.h b/be/src/olap/wal_writer.h new file mode 100644 index 0000000000..12fd84f258 --- /dev/null +++ b/be/src/olap/wal_writer.h @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "common/status.h" +#include "gen_cpp/internal_service.pb.h" +#include "io/fs/file_reader_writer_fwd.h" + +namespace doris { + +using PBlockArray = std::vector; + +class WalWriter { +public: + explicit WalWriter(const std::string& file_name); + ~WalWriter(); + + Status init(); + Status finalize(); + + Status append_blocks(const PBlockArray& blocks); + + std::string file_name() { return _file_name; }; + static const int64_t LENGTH_SIZE = 8; + static const int64_t CHECKSUM_SIZE = 4; + +private: + std::string _file_name; + io::FileWriterPtr _file_writer; + int64_t _count; + int64_t _batch; +}; + +} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index ed59a81916..9fffd859ba 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -99,6 +99,7 @@ class SegmentLoader; class LookupConnectionCache; class RowCache; class CacheManager; +class WalManager; inline bool k_doris_exit = false; @@ -209,6 +210,7 @@ public: doris::vectorized::ScannerScheduler* scanner_scheduler() { return _scanner_scheduler; } FileMetaCache* file_meta_cache() { return _file_meta_cache; } MemTableMemoryLimiter* memtable_memory_limiter() { return _memtable_memory_limiter.get(); } + WalManager* wal_mgr() { return _wal_manager.get(); } #ifdef BE_TEST void set_ready() { this->_s_ready = true; } void set_not_ready() { this->_s_ready = false; } @@ -344,6 +346,7 @@ private: std::unique_ptr _memtable_memory_limiter; std::unique_ptr _load_stream_stub_pool; std::unique_ptr _delta_writer_v2_pool; + std::shared_ptr _wal_manager; std::mutex _frontends_lock; std::map _frontends; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index ebdf0dd4fc..59c2b96d53 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -48,6 +48,7 @@ #include "olap/schema_cache.h" #include "olap/segment_loader.h" #include "olap/storage_engine.h" +#include "olap/wal_manager.h" #include "pipeline/task_queue.h" #include "pipeline/task_scheduler.h" #include "runtime/block_spill_manager.h" @@ -210,6 +211,7 @@ Status ExecEnv::_init(const std::vector& store_paths, _memtable_memory_limiter = std::make_unique(); _load_stream_stub_pool = std::make_unique(); _delta_writer_v2_pool = std::make_unique(); + _wal_manager = WalManager::create_shared(this, config::group_commit_replay_wal_dir); _backend_client_cache->init_metrics("backend"); _frontend_client_cache->init_metrics("frontend"); @@ -232,6 +234,7 @@ Status ExecEnv::_init(const std::vector& store_paths, RETURN_IF_ERROR(_memtable_memory_limiter->init(MemInfo::mem_limit())); RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit())); + RETURN_IF_ERROR(_wal_manager->init()); _heartbeat_flags = new HeartbeatFlags(); _register_metrics(); @@ -526,6 +529,8 @@ void ExecEnv::destroy() { // Memory barrier to prevent other threads from accessing destructed resources _s_ready = false; + SAFE_STOP(_wal_manager); + _wal_manager.reset(); SAFE_STOP(_tablet_schema_cache); SAFE_STOP(_load_channel_mgr); SAFE_STOP(_scanner_scheduler); diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 94c0dba30a..45ccb8d6b4 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -27,6 +27,7 @@ #include "common/object_pool.h" #include "exec/data_sink.h" #include "io/fs/stream_load_pipe.h" +#include "olap/wal_manager.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" #include "runtime/runtime_state.h" @@ -185,16 +186,17 @@ Status GroupCommitTable::get_first_block_load_queue( Status GroupCommitTable::_create_group_commit_load( int64_t table_id, std::shared_ptr& load_block_queue) { TStreamLoadPutRequest request; - std::stringstream ss; - ss << "insert into " << table_id << " select * from group_commit(\"table_id\"=\"" << table_id - << "\")"; - request.__set_load_sql(ss.str()); UniqueId load_id = UniqueId::gen_uid(); TUniqueId tload_id; tload_id.__set_hi(load_id.hi); tload_id.__set_lo(load_id.lo); + std::regex reg("-"); + std::string label = "group_commit_" + std::regex_replace(load_id.to_string(), reg, "_"); + std::stringstream ss; + ss << "insert into " << table_id << " WITH LABEL " << label + << " select * from group_commit(\"table_id\"=\"" << table_id << "\")"; + request.__set_load_sql(ss.str()); request.__set_loadId(tload_id); - std::string label = "group_commit_" + load_id.to_string(); request.__set_label(label); request.__set_token("group_commit"); // this is a fake, fe not check it now request.__set_max_filter_ratio(1.0); @@ -243,6 +245,7 @@ Status GroupCommitTable::_create_group_commit_load( std::unique_lock l(_lock); _load_block_queues.emplace(instance_id, load_block_queue); } + params.__set_import_label(label); st = _exec_plan_fragment(_db_id, table_id, label, txn_id, is_pipeline, params, pipeline_params); if (!st.ok()) { _finish_group_commit_load(_db_id, table_id, label, txn_id, instance_id, st, true, nullptr); @@ -308,9 +311,27 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ << ", instance_id=" << print_id(instance_id) << ", executor status=" << status.to_string() << ", request commit status=" << st.to_string(); + if (!prepare_failed) { + RETURN_IF_ERROR(_exec_env->wal_mgr()->add_wal_path(_db_id, table_id, txn_id, label)); + std::string wal_path; + RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path)); + RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal( + std::to_string(db_id), std::to_string(table_id), + std::vector {wal_path})); + } return st; } // TODO handle execute and commit error + if (!prepare_failed && !result_status.ok()) { + RETURN_IF_ERROR(_exec_env->wal_mgr()->add_wal_path(_db_id, table_id, txn_id, label)); + std::string wal_path; + RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path)); + RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal(std::to_string(db_id), + std::to_string(table_id), + std::vector {wal_path})); + } else { + RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(txn_id)); + } std::stringstream ss; ss << "finish group commit, db_id=" << db_id << ", table_id=" << table_id << ", label=" << label << ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 18ef1871ec..59a89b44af 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -156,6 +156,9 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { if (request.__isset.load_job_id) { _runtime_state->set_load_job_id(request.load_job_id); } + if (request.__isset.wal_id) { + _runtime_state->set_wal_id(request.wal_id); + } if (request.query_options.__isset.is_report_success) { _is_report_success = request.query_options.is_report_success; diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 5275a21c64..87f0f45b6a 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -229,6 +229,12 @@ public: const std::string& db_name() { return _db_name; } + void set_wal_id(int64_t wal_id) { _wal_id = wal_id; } + + int64_t wal_id() { return _wal_id; } + + const std::string& import_label() { return _import_label; } + const std::string& load_dir() const { return _load_dir; } void set_load_job_id(int64_t job_id) { _load_job_id = job_id; } @@ -547,6 +553,7 @@ private: std::string _db_name; std::string _load_dir; int64_t _load_job_id; + int64_t _wal_id = -1; // mini load int64_t _normal_row_number; diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index b4be684fbb..844caea126 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -134,6 +134,7 @@ public: std::string db; int64_t db_id = -1; + int64_t wal_id = -1; std::string table; std::string label; // optional diff --git a/be/src/util/load_util.cpp b/be/src/util/load_util.cpp index 1277132378..1a0dff2f90 100644 --- a/be/src/util/load_util.cpp +++ b/be/src/util/load_util.cpp @@ -67,6 +67,8 @@ void LoadUtil::parse_format(const std::string& format_str, const std::string& co *format_type = TFileFormatType::FORMAT_PARQUET; } else if (iequal(format_str, "ORC")) { *format_type = TFileFormatType::FORMAT_ORC; + } else if (iequal(format_str, "WAL")) { + *format_type = TFileFormatType::FORMAT_WAL; } return; } @@ -82,6 +84,7 @@ bool LoadUtil::is_format_support_streaming(TFileFormatType::type format) { case TFileFormatType::FORMAT_CSV_LZO: case TFileFormatType::FORMAT_CSV_LZOP: case TFileFormatType::FORMAT_JSON: + case TFileFormatType::FORMAT_WAL: return true; default: return false; diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp b/be/src/vec/exec/format/wal/wal_reader.cpp new file mode 100644 index 0000000000..f616740e68 --- /dev/null +++ b/be/src/vec/exec/format/wal/wal_reader.cpp @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "wal_reader.h" + +#include "common/logging.h" +#include "olap/wal_manager.h" +#include "runtime/runtime_state.h" +namespace doris::vectorized { +WalReader::WalReader(RuntimeState* state) : _state(state) { + _wal_id = state->wal_id(); +} +WalReader::~WalReader() { + if (_wal_reader.get() != nullptr) { + _wal_reader->finalize(); + } +} +Status WalReader::init_reader() { + RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_path(_wal_id, _wal_path)); + RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_reader(_wal_path, _wal_reader)); + return Status::OK(); +} +Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + PBlock pblock; + auto st = _wal_reader->read_block(pblock); + if (st.is()) { + LOG(INFO) << "read eof on wal:" << _wal_path; + *read_rows = 0; + *eof = true; + return Status::OK(); + } + if (!st.ok()) { + LOG(WARNING) << "Failed to read wal on path = " << _wal_path; + return st; + } + vectorized::Block tmp_block; + tmp_block.deserialize(pblock); + block->swap(tmp_block); + *read_rows = block->rows(); + VLOG_DEBUG << "read block rows:" << *read_rows; + return Status::OK(); +} + +void WalReader::string_split(const std::string& str, const std::string& splits, + std::vector& res) { + if (str == "") return; + std::string strs = str + splits; + size_t pos = strs.find(splits); + int step = splits.size(); + while (pos != strs.npos) { + std::string temp = strs.substr(0, pos); + res.push_back(temp); + strs = strs.substr(pos + step, strs.size()); + pos = strs.find(splits); + } +} + +Status WalReader::get_columns(std::unordered_map* name_to_type, + std::unordered_set* missing_cols) { + return Status::OK(); +} + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/format/wal/wal_reader.h b/be/src/vec/exec/format/wal/wal_reader.h new file mode 100644 index 0000000000..c204cd9b5e --- /dev/null +++ b/be/src/vec/exec/format/wal/wal_reader.h @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once +#include "olap/wal_reader.h" +#include "vec/exec/format/generic_reader.h" +namespace doris { +namespace vectorized { +struct ScannerCounter; +class WalReader : public GenericReader { +public: + WalReader(RuntimeState* state); + ~WalReader() override; + Status init_reader(); + Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; + Status get_columns(std::unordered_map* name_to_type, + std::unordered_set* missing_cols) override; + static void string_split(const std::string& str, const std::string& splits, + std::vector& res); + +private: + RuntimeState* _state; + std::string _wal_path; + std::string _path_split = "/"; + int64_t _wal_id; + std::shared_ptr _wal_reader = nullptr; +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 505b6807b4..dd96ef88a4 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -32,6 +32,7 @@ #include #include "vec/data_types/data_type_factory.hpp" +#include "vec/exec/format/wal/wal_reader.h" // IWYU pragma: no_include #include "common/compiler_util.h" // IWYU pragma: keep @@ -258,6 +259,10 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo RETURN_IF_ERROR( _cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof)); } + if (_params->format_type == TFileFormatType::FORMAT_WAL) { + block->swap(*_src_block_ptr); + break; + } // use read_rows instead of _src_block_ptr->rows(), because the first column of _src_block_ptr // may not be filled after calling `get_next_block()`, so _src_block_ptr->rows() may return wrong result. if (read_rows > 0) { @@ -784,6 +789,11 @@ Status VFileScanner::_get_next_reader() { ->init_fetch_table_reader(_colname_to_value_range); break; } + case TFileFormatType::FORMAT_WAL: { + _cur_reader.reset(new WalReader(_state)); + init_status = ((WalReader*)(_cur_reader.get()))->init_reader(); + break; + } default: return Status::InternalError("Not supported file format: {}", _params->format_type); } diff --git a/be/src/vec/sink/vtablet_finder.cpp b/be/src/vec/sink/vtablet_finder.cpp index 5d030a7ffa..421b3ebb11 100644 --- a/be/src/vec/sink/vtablet_finder.cpp +++ b/be/src/vec/sink/vtablet_finder.cpp @@ -66,6 +66,7 @@ Status OlapTabletFinder::find_tablet(RuntimeState* state, Block* block, int row_ }, &stop_processing)); _num_filtered_rows++; + _filter_bitmap.Set(row_index, true); if (stop_processing) { return Status::EndOfFile("Encountered unqualified data, stop processing"); } diff --git a/be/src/vec/sink/vtablet_finder.h b/be/src/vec/sink/vtablet_finder.h index 2258e15de0..28d71c6a1e 100644 --- a/be/src/vec/sink/vtablet_finder.h +++ b/be/src/vec/sink/vtablet_finder.h @@ -21,6 +21,7 @@ #include "common/status.h" #include "exec/tablet_info.h" +#include "util/bitmap.h" #include "vec/core/block.h" namespace doris::vectorized { @@ -36,7 +37,7 @@ public: enum FindTabletMode { FIND_TABLET_EVERY_ROW, FIND_TABLET_EVERY_BATCH, FIND_TABLET_EVERY_SINK }; OlapTabletFinder(VOlapTablePartitionParam* vpartition, FindTabletMode mode) - : _vpartition(vpartition), _find_tablet_mode(mode) {}; + : _vpartition(vpartition), _find_tablet_mode(mode), _filter_bitmap(1024) {}; Status find_tablet(RuntimeState* state, vectorized::Block* block, int row_index, const VOlapTablePartition** partition, uint32_t& tablet_index, @@ -62,6 +63,8 @@ public: return _num_immutable_partition_filtered_rows; } + Bitmap& filter_bitmap() { return _filter_bitmap; } + private: VOlapTablePartitionParam* _vpartition; FindTabletMode _find_tablet_mode; @@ -70,6 +73,7 @@ private: int64_t _num_filtered_rows = 0; int64_t _num_immutable_partition_filtered_rows = 0; + Bitmap _filter_bitmap; }; } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 39f4bd6ef9..bc351c0f65 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -134,7 +134,7 @@ VOlapTableSink::VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc, Status VOlapTableSink::init(const TDataSink& t_sink) { RETURN_IF_ERROR(AsyncWriterSink::init(t_sink)); - _writer->init_properties(_pool, _group_commit); + RETURN_IF_ERROR(_writer->init_properties(_pool, _group_commit)); return Status::OK(); } diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h index a5208f070c..f0c9874578 100644 --- a/be/src/vec/sink/vtablet_sink.h +++ b/be/src/vec/sink/vtablet_sink.h @@ -57,6 +57,7 @@ #include "exec/data_sink.h" #include "exec/tablet_info.h" #include "gutil/ref_counted.h" +#include "olap/wal_writer.h" #include "runtime/decimalv2_value.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 322451cde3..1fdd1004f9 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -43,6 +43,7 @@ #include #include +#include "olap/wal_manager.h" #include "util/runtime_profile.h" #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr_fwd.h" @@ -969,11 +970,17 @@ void VNodeChannel::mark_close() { VTabletWriter::VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs) : AsyncResultWriter(output_exprs), _t_sink(t_sink) { _transfer_large_data_by_brpc = config::transfer_large_data_by_brpc; + DCHECK(t_sink.__isset.olap_table_sink); + auto& table_sink = t_sink.olap_table_sink; + _db_id = table_sink.db_id; + _tb_id = table_sink.table_id; + _wal_id = table_sink.txn_id; } -void VTabletWriter::init_properties(doris::ObjectPool* pool, bool group_commit) { +Status VTabletWriter::init_properties(doris::ObjectPool* pool, bool group_commit) { _pool = pool; _group_commit = group_commit; + return Status::OK(); } void VTabletWriter::_send_batch_process() { @@ -1198,6 +1205,11 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { auto [part_ctx, part_func] = _get_partition_function(); RETURN_IF_ERROR(part_func->prepare(_state, *_output_row_desc, part_ctx.get())); } + if (_group_commit) { + RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->add_wal_path(_db_id, _tb_id, _wal_id, + _state->import_label())); + RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id, _wal_writer)); + } _prepare = true; return Status::OK(); @@ -1619,6 +1631,9 @@ Status VTabletWriter::close(Status exec_status) { [](const std::shared_ptr& ch) { ch->clear_all_blocks(); }); } + if (_wal_writer.get() != nullptr) { + _wal_writer->finalize(); + } return _close_status; } @@ -1659,6 +1674,7 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { _tablet_finder->clear_for_new_batch(); _row_distribution_watch.start(); auto num_rows = block->rows(); + _tablet_finder->filter_bitmap().Reset(num_rows); size_t partition_num = _vpartition->get_partitions().size(); if (!_vpartition->is_auto_partition() && partition_num == 1 && _tablet_finder->is_find_tablet_every_sink()) { @@ -1778,9 +1794,10 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { } if (_group_commit) { - _group_commit_block(&input_block, rows, + _group_commit_block(&input_block, num_rows, _block_convertor->num_filtered_rows() + - _tablet_finder->num_filtered_rows() - filtered_rows); + _tablet_finder->num_filtered_rows() - filtered_rows, + _state, block.get(), _block_convertor.get(), _tablet_finder.get()); } // TODO: Before load, we need to projection unuseful column // auto slots = _schema->tuple_desc()->slots(); @@ -1808,11 +1825,48 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { return Status::OK(); } -void VTabletWriter::_group_commit_block(Block* input_block, int64_t rows, int64_t filter_rows) { +Status VTabletWriter::write_wal(OlapTableBlockConvertor* block_convertor, + OlapTabletFinder* tablet_finder, vectorized::Block* block, + RuntimeState* state, int64_t num_rows, int64_t filtered_rows) { + PBlock pblock; + size_t uncompressed_bytes = 0, compressed_bytes = 0; + if (filtered_rows == 0) { + RETURN_IF_ERROR(block->serialize(state->be_exec_version(), &pblock, &uncompressed_bytes, + &compressed_bytes, segment_v2::CompressionTypePB::SNAPPY)); + RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector {&pblock})); + } else { + auto cloneBlock = block->clone_without_columns(); + auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); + for (int i = 0; i < num_rows; ++i) { + if (block_convertor->num_filtered_rows() > 0 && + block_convertor->filter_bitmap().Get(i)) { + continue; + } + if (tablet_finder->num_filtered_rows() > 0 && tablet_finder->filter_bitmap().Get(i)) { + continue; + } + res_block.add_row(block, i); + } + RETURN_IF_ERROR(res_block.to_block().serialize(state->be_exec_version(), &pblock, + &uncompressed_bytes, &compressed_bytes, + segment_v2::CompressionTypePB::SNAPPY)); + RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector {&pblock})); + } + return Status::OK(); +} + +void VTabletWriter::_group_commit_block(vectorized::Block* input_block, int64_t num_rows, + int64_t filter_rows, RuntimeState* state, + vectorized::Block* block, + OlapTableBlockConvertor* block_convertor, + OlapTabletFinder* tablet_finder) { + write_wal(block_convertor, tablet_finder, block, state, num_rows, filter_rows); +#ifndef BE_TEST auto* future_block = assert_cast(input_block); std::unique_lock l(*(future_block->lock)); - future_block->set_result(Status::OK(), rows, rows - filter_rows); + future_block->set_result(Status::OK(), num_rows, num_rows - filter_rows); future_block->cv->notify_all(); +#endif } } // namespace vectorized diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index d2de0415fd..1e7937cc80 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -33,6 +33,8 @@ #include #include + +#include "olap/wal_writer.h" // IWYU pragma: no_include #include // IWYU pragma: keep #include @@ -527,7 +529,7 @@ class VTabletWriter final : public AsyncResultWriter { public: VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); - void init_properties(ObjectPool* pool, bool group_commit); + Status init_properties(ObjectPool* pool, bool group_commit); Status append_block(Block& block) override; @@ -575,7 +577,14 @@ private: Status _incremental_open_node_channel(const std::vector& partitions); - void _group_commit_block(vectorized::Block* input_block, int64_t rows, int64_t filter_rows); + Status write_wal(OlapTableBlockConvertor* block_convertor, OlapTabletFinder* tablet_finder, + vectorized::Block* block, RuntimeState* state, int64_t num_rows, + int64_t filtered_rows); + + void _group_commit_block(vectorized::Block* input_block, int64_t num_rows, int64_t filter_rows, + RuntimeState* state, vectorized::Block* block, + OlapTableBlockConvertor* block_convertor, + OlapTabletFinder* tablet_finder); TDataSink _t_sink; @@ -673,5 +682,9 @@ private: RuntimeState* _state = nullptr; // not owned, set when open RuntimeProfile* _profile = nullptr; // not owned, set when open bool _group_commit = false; + std::shared_ptr _wal_writer = nullptr; + int64_t _tb_id; + int64_t _db_id; + int64_t _wal_id; }; } // namespace doris::vectorized diff --git a/be/test/exec/test_data/wal_scanner/wal b/be/test/exec/test_data/wal_scanner/wal new file mode 100644 index 0000000000000000000000000000000000000000..976c60e8489e470836a46ed1e700b58b0dab4745 GIT binary patch literal 132 zcmb + +#include +#include +#include +#include + +#include "common/config.h" +#include "gen_cpp/HeartbeatService_types.h" +#include "gen_cpp/internal_service.pb.h" +#include "io/fs/local_file_system.h" +#include "runtime/decimalv2_value.h" +#include "runtime/exec_env.h" +#include "runtime/result_queue_mgr.h" +#include "runtime/runtime_state.h" +#include "runtime/stream_load/new_load_stream_mgr.h" +#include "runtime/types.h" +#include "service/brpc.h" +#include "util/brpc_client_cache.h" +#include "util/cpu_info.h" +#include "util/debug/leakcheck_disabler.h" +#include "util/proto_util.h" + +namespace doris { + +extern TLoadTxnBeginResult k_stream_load_begin_result; +extern Status k_stream_load_plan_status; +extern std::string k_request_line; + +ExecEnv* _env = nullptr; +std::string wal_dir = "./wal_test"; +std::string tmp_dir = "./wal_test/tmp"; + +class WalManagerTest : public testing::Test { +public: + WalManagerTest() {} + virtual ~WalManagerTest() {} + void SetUp() override { + prepare(); + _env = ExecEnv::GetInstance(); + _env->_master_info = new TMasterInfo(); + _env->_master_info->network_address.hostname = "host name"; + _env->_master_info->network_address.port = 1234; + _env->new_load_stream_mgr() = NewLoadStreamMgr::create_shared(); + _env->_internal_client_cache = new BrpcClientCache(); + _env->_function_client_cache = new BrpcClientCache(); + _env->_stream_load_executor = StreamLoadExecutor::create_shared(_env); + _env->_wal_manager = WalManager::create_shared(_env, wal_dir); + k_stream_load_begin_result = TLoadTxnBeginResult(); + k_stream_load_plan_status = Status::OK(); + } + void TearDown() override { + io::global_local_filesystem()->delete_directory(wal_dir); + SAFE_DELETE(_env->_function_client_cache); + SAFE_DELETE(_env->_internal_client_cache); + SAFE_DELETE(_env->_master_info); + } + + void prepare() { io::global_local_filesystem()->create_directory(wal_dir); } + + void createWal(const std::string& wal_path) { + auto wal_writer = WalWriter(wal_path); + wal_writer.init(); + wal_writer.finalize(); + } +}; + +TEST_F(WalManagerTest, recovery_normal) { + k_request_line = "{\"Status\": \"Success\", \"Message\": \"Test\"}"; + + std::string db_id = "1"; + std::string tb_1_id = "1"; + std::string wal_100_id = "100"; + std::string wal_101_id = "101"; + std::string tb_2_id = "2"; + std::string wal_200_id = "200"; + std::string wal_201_id = "201"; + + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_1_id); + std::string wal_100 = wal_dir + "/" + db_id + "/" + tb_1_id + "/" + wal_100_id; + std::string wal_101 = wal_dir + "/" + db_id + "/" + tb_1_id + "/" + wal_101_id; + createWal(wal_100); + createWal(wal_101); + + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_2_id); + std::string wal_200 = wal_dir + "/" + db_id + "/" + tb_2_id + "/" + wal_200_id; + std::string wal_201 = wal_dir + "/" + db_id + "/" + tb_2_id + "/" + wal_201_id; + createWal(wal_200); + createWal(wal_201); + _env->wal_mgr()->init(); + + while (_env->wal_mgr()->get_wal_table_size(tb_1_id) > 0 || + _env->wal_mgr()->get_wal_table_size(tb_2_id) > 0) { + sleep(1); + continue; + } + ASSERT_TRUE(!std::filesystem::exists(wal_100)); + ASSERT_TRUE(!std::filesystem::exists(wal_101)); + ASSERT_TRUE(!std::filesystem::exists(wal_200)); + ASSERT_TRUE(!std::filesystem::exists(wal_201)); +} + +TEST_F(WalManagerTest, not_need_recovery) { + std::string db_id = "1"; + std::string tb_id = "1"; + std::string wal_id = "100"; + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_id); + std::string wal_100 = wal_dir + "/" + db_id + "/" + tb_id + "/" + wal_id; + createWal(wal_100); + + _env->wal_mgr()->init(); + + while (_env->wal_mgr()->get_wal_table_size("1") > 0) { + sleep(1); + continue; + } + ASSERT_TRUE(!std::filesystem::exists(wal_100)); +} + +TEST_F(WalManagerTest, recover_fail) { + k_request_line = "{\"Status\": \"Fail\", \"Message\": \"Test\"}"; + config::group_commit_replay_wal_retry_num = 3; + config::group_commit_replay_wal_retry_interval_seconds = 1; + + std::string db_id = "1"; + std::string tb_id = "1"; + std::string wal_id = "100"; + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_id); + std::string wal_100 = wal_dir + "/" + db_id + "/" + tb_id + "/" + wal_id; + createWal(wal_100); + + _env->wal_mgr()->init(); + + while (_env->wal_mgr()->get_wal_table_size("1") > 0) { + sleep(1); + continue; + } + std::string tmp_file = tmp_dir + "/" + db_id + "_" + tb_id + "_" + wal_id; + ASSERT_TRUE(std::filesystem::exists(tmp_file)); + ASSERT_TRUE(!std::filesystem::exists(wal_100)); +} + +} // namespace doris \ No newline at end of file diff --git a/be/test/olap/wal_reader_writer_test.cpp b/be/test/olap/wal_reader_writer_test.cpp new file mode 100644 index 0000000000..070f2b8942 --- /dev/null +++ b/be/test/olap/wal_reader_writer_test.cpp @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include + +#include + +#include "agent/be_exec_version_manager.h" +#include "common/object_pool.h" +#include "gen_cpp/internal_service.pb.h" +#include "gmock/gmock.h" +#include "io/fs/local_file_system.h" +#include "olap/wal_reader.h" +#include "olap/wal_writer.h" +#include "runtime/exec_env.h" +#include "service/brpc.h" +#include "testutil/test_util.h" +#include "util/proto_util.h" +#include "vec/columns/columns_number.h" +#include "vec/data_types/data_type_number.h" +#include "vec/runtime/vdata_stream_mgr.h" +#include "vec/runtime/vdata_stream_recvr.h" + +using ::testing::_; +using ::testing::Return; +using ::testing::SetArgPointee; +using std::string; + +namespace doris { + +class WalReaderWriterTest : public testing::Test { +public: + // create a mock cgroup folder + virtual void SetUp() { io::global_local_filesystem()->create_directory(_s_test_data_path); } + + // delete the mock cgroup folder + virtual void TearDown() { io::global_local_filesystem()->delete_directory(_s_test_data_path); } + + static std::string _s_test_data_path; +}; + +std::string WalReaderWriterTest::_s_test_data_path = "./log/wal_reader_writer_test"; +size_t block_rows = 1024; + +void covert_block_to_pb( + const vectorized::Block& block, PBlock* pblock, + segment_v2::CompressionTypePB compression_type = segment_v2::CompressionTypePB::SNAPPY) { + size_t uncompressed_bytes = 0; + size_t compressed_bytes = 0; + Status st = block.serialize(BeExecVersionManager::get_newest_version(), pblock, + &uncompressed_bytes, &compressed_bytes, compression_type); + EXPECT_TRUE(st.ok()); + EXPECT_TRUE(uncompressed_bytes >= compressed_bytes); + EXPECT_EQ(compressed_bytes, pblock->column_values().size()); + + const vectorized::ColumnWithTypeAndName& type_and_name = + block.get_columns_with_type_and_name()[0]; + EXPECT_EQ(type_and_name.name, pblock->column_metas()[0].name()); +} + +void generate_block(PBlock& pblock, int row_index) { + auto vec = vectorized::ColumnVector::create(); + auto& data = vec->get_data(); + for (int i = 0; i < block_rows; ++i) { + data.push_back(i + row_index); + } + vectorized::DataTypePtr data_type(std::make_shared()); + vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, "test_int"); + vectorized::Block block({type_and_name}); + covert_block_to_pb(block, &pblock, segment_v2::CompressionTypePB::SNAPPY); +} + +TEST_F(WalReaderWriterTest, TestWriteAndRead1) { + std::string file_name = _s_test_data_path + "/abcd123.txt"; + auto wal_writer = WalWriter(file_name); + wal_writer.init(); + size_t file_len = 0; + int64_t file_size = -1; + // add 1 block + { + PBlock pblock; + generate_block(pblock, 0); + + EXPECT_EQ(Status::OK(), wal_writer.append_blocks(std::vector {&pblock})); + file_len += pblock.ByteSizeLong() + WalWriter::LENGTH_SIZE + WalWriter::CHECKSUM_SIZE; + io::global_local_filesystem()->file_size(file_name, &file_size); + EXPECT_EQ(file_len, file_size); + } + // add 2 block + { + PBlock pblock; + generate_block(pblock, 1024); + file_len += pblock.ByteSizeLong() + WalWriter::LENGTH_SIZE + WalWriter::CHECKSUM_SIZE; + + PBlock pblock1; + generate_block(pblock1, 2048); + file_len += pblock1.ByteSizeLong() + WalWriter::LENGTH_SIZE + WalWriter::CHECKSUM_SIZE; + + EXPECT_EQ(Status::OK(), wal_writer.append_blocks(std::vector {&pblock, &pblock1})); + io::global_local_filesystem()->file_size(file_name, &file_size); + EXPECT_EQ(file_len, file_size); + } + wal_writer.finalize(); + // read block + auto wal_reader = WalReader(file_name); + wal_reader.init(); + auto block_count = 0; + while (true) { + doris::PBlock pblock; + Status st = wal_reader.read_block(pblock); + EXPECT_TRUE(st.ok() || st.is()); + if (st.ok()) { + ++block_count; + } else if (st.is()) { + break; + } + vectorized::Block block; + block.deserialize(pblock); + EXPECT_EQ(block_rows, block.rows()); + } + wal_reader.finalize(); + EXPECT_EQ(3, block_count); +} +} // namespace doris \ No newline at end of file diff --git a/be/test/vec/exec/vtablet_sink_test.cpp b/be/test/vec/exec/vtablet_sink_test.cpp index 122232c74f..4cd7265797 100644 --- a/be/test/vec/exec/vtablet_sink_test.cpp +++ b/be/test/vec/exec/vtablet_sink_test.cpp @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -32,7 +33,9 @@ #include "gen_cpp/HeartbeatService_types.h" #include "gen_cpp/internal_service.pb.h" #include "gtest/gtest_pred_impl.h" +#include "io/fs/local_file_system.h" #include "olap/olap_define.h" +#include "olap/wal_manager.h" #include "runtime/decimalv2_value.h" #include "runtime/define_primitive_type.h" #include "runtime/descriptor_helper.h" @@ -358,8 +361,12 @@ public: k_add_batch_status = Status::OK(); _env = ExecEnv::GetInstance(); _env->_master_info = new TMasterInfo(); + _env->_master_info->network_address.hostname = "host name"; + _env->_master_info->network_address.port = 1234; _env->_internal_client_cache = new BrpcClientCache(); _env->_function_client_cache = new BrpcClientCache(); + _env->_wal_manager = WalManager::create_shared(_env, wal_dir); + _env->wal_mgr()->init(); ThreadPoolBuilder("SendBatchThreadPool") .set_min_threads(1) .set_max_threads(5) @@ -370,6 +377,7 @@ public: } void TearDown() override { + io::global_local_filesystem()->delete_directory(wal_dir); SAFE_DELETE(_env->_internal_client_cache); SAFE_DELETE(_env->_function_client_cache); SAFE_DELETE(_env->_master_info); @@ -488,6 +496,7 @@ public: private: ExecEnv* _env = nullptr; brpc::Server* _server = nullptr; + std::string wal_dir = "./wal_test"; }; TEST_F(VOlapTableSinkTest, normal) { @@ -835,5 +844,259 @@ TEST_F(VOlapTableSinkTest, decimal) { ASSERT_TRUE(output_set.count("(12, 12.300000000)") > 0); ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0); } + +TEST_F(VOlapTableSinkTest, group_commit) { + // start brpc service first + _server = new brpc::Server(); + auto service = new VTestInternalService(); + ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0); + brpc::ServerOptions options; + { + debug::ScopedLeakCheckDisabler disable_lsan; + _server->Start(4356, &options); + } + + TUniqueId fragment_id; + TQueryOptions query_options; + query_options.batch_size = 1; + query_options.be_exec_version = 0; + RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env); + state.init_mem_trackers(TUniqueId()); + + ObjectPool obj_pool; + TDescriptorTable tdesc_tbl; + auto t_data_sink = get_data_sink(&tdesc_tbl); + + // crate desc_tabl + DescriptorTbl* desc_tbl = nullptr; + auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl); + ASSERT_TRUE(st.ok()); + state._desc_tbl = desc_tbl; + state._wal_id = 789; + state._import_label = "test"; + + TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); + LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string(); + + RowDescriptor row_desc(*desc_tbl, {0}, {false}); + service->_row_desc = &row_desc; + std::set output_set; + service->_output_set = &output_set; + + std::vector exprs; + VOlapTableSink sink(&obj_pool, row_desc, exprs, true); + + // init + st = sink.init(t_data_sink); + ASSERT_TRUE(st.ok()); + // prepare + st = sink.prepare(&state); + ASSERT_TRUE(st.ok()); + // open + st = sink.open(&state); + ASSERT_TRUE(st.ok()); + + int slot_count = tuple_desc->slots().size(); + std::vector columns(slot_count); + for (int i = 0; i < slot_count; i++) { + columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column(); + } + + int col_idx = 0; + auto* column_ptr = columns[col_idx++].get(); + auto column_vector_int = column_ptr; + int int_val = 12; + column_vector_int->insert_data((const char*)&int_val, 0); + int_val = 13; + column_vector_int->insert_data((const char*)&int_val, 0); + int_val = 14; + column_vector_int->insert_data((const char*)&int_val, 0); + + column_ptr = columns[col_idx++].get(); + auto column_vector_bigint = column_ptr; + int64_t int64_val = 9; + column_vector_bigint->insert_data((const char*)&int64_val, 0); + int64_val = 25; + column_vector_bigint->insert_data((const char*)&int64_val, 0); + int64_val = 50; + column_vector_bigint->insert_data((const char*)&int64_val, 0); + + column_ptr = columns[col_idx++].get(); + auto column_vector_str = column_ptr; + column_vector_str->insert_data("abc", 3); + column_vector_str->insert_data("abcd", 4); + column_vector_str->insert_data("1234567890", 10); + + vectorized::Block block; + col_idx = 0; + for (const auto slot_desc : tuple_desc->slots()) { + block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + vectorized::Block org_block(block); + + // send + st = sink.send(&state, &block); + ASSERT_TRUE(st.ok()); + // close + st = sink.close(&state, Status::OK()); + ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close failed. ") + << st.to_string(); + + // each node has a eof + ASSERT_EQ(2, service->_eof_counters); + ASSERT_EQ(2 * 3, service->_row_counters); + + // 2node * 2 + ASSERT_EQ(0, state.num_rows_load_filtered()); + + std::string wal_path = wal_dir + "/" + std::to_string(t_data_sink.olap_table_sink.db_id) + "/" + + std::to_string(t_data_sink.olap_table_sink.table_id) + "/" + + std::to_string(t_data_sink.olap_table_sink.txn_id) + "_" + + state.import_label(); + doris::PBlock pblock; + auto wal_reader = WalReader(wal_path); + st = wal_reader.init(); + ASSERT_TRUE(st.ok()); + st = wal_reader.read_block(pblock); + ASSERT_TRUE(st.ok()); + vectorized::Block wal_block; + wal_block.deserialize(pblock); + ASSERT_TRUE(st.ok() || st.is()); + ASSERT_EQ(org_block.rows(), wal_block.rows()); + for (int i = 0; i < org_block.rows(); i++) { + std::string srcRow = org_block.dump_one_line(i, org_block.columns()); + std::string walRow = wal_block.dump_one_line(i, org_block.columns()); + ASSERT_TRUE(std::strcmp(srcRow.c_str(), walRow.c_str()) == 0); + } +} + +TEST_F(VOlapTableSinkTest, group_commit_with_filter_row) { + // start brpc service first + _server = new brpc::Server(); + auto service = new VTestInternalService(); + ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0); + brpc::ServerOptions options; + { + debug::ScopedLeakCheckDisabler disable_lsan; + _server->Start(4356, &options); + } + + TUniqueId fragment_id; + TQueryOptions query_options; + query_options.batch_size = 1; + query_options.be_exec_version = 0; + RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env); + state.init_mem_trackers(TUniqueId()); + + ObjectPool obj_pool; + TDescriptorTable tdesc_tbl; + auto t_data_sink = get_data_sink(&tdesc_tbl); + + // crate desc_tabl + DescriptorTbl* desc_tbl = nullptr; + auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl); + ASSERT_TRUE(st.ok()); + state._desc_tbl = desc_tbl; + state._wal_id = 789; + state._import_label = "test"; + + TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); + LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string(); + + RowDescriptor row_desc(*desc_tbl, {0}, {false}); + service->_row_desc = &row_desc; + std::set output_set; + service->_output_set = &output_set; + + std::vector exprs; + VOlapTableSink sink(&obj_pool, row_desc, exprs, true); + + // init + st = sink.init(t_data_sink); + ASSERT_TRUE(st.ok()); + // prepare + st = sink.prepare(&state); + ASSERT_TRUE(st.ok()); + // open + st = sink.open(&state); + ASSERT_TRUE(st.ok()); + + int slot_count = tuple_desc->slots().size(); + std::vector columns(slot_count); + for (int i = 0; i < slot_count; i++) { + columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column(); + } + + int col_idx = 0; + auto* column_ptr = columns[col_idx++].get(); + auto column_vector_int = column_ptr; + int int_val = 12; + column_vector_int->insert_data((const char*)&int_val, 0); + int_val = 13; + column_vector_int->insert_data((const char*)&int_val, 0); + int_val = 14; + column_vector_int->insert_data((const char*)&int_val, 0); + + column_ptr = columns[col_idx++].get(); + auto column_vector_bigint = column_ptr; + int64_t int64_val = 9; + column_vector_bigint->insert_data((const char*)&int64_val, 0); + int64_val = 25; + column_vector_bigint->insert_data((const char*)&int64_val, 0); + int64_val = 50; + column_vector_bigint->insert_data((const char*)&int64_val, 0); + + column_ptr = columns[col_idx++].get(); + auto column_vector_str = column_ptr; + column_vector_str->insert_data("abc", 3); + column_vector_str->insert_data("abcd", 4); + column_vector_str->insert_data("abcde1234567890", 15); + + vectorized::Block block; + col_idx = 0; + for (const auto slot_desc : tuple_desc->slots()) { + block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + vectorized::Block org_block(block); + + // send + st = sink.send(&state, &block); + ASSERT_TRUE(st.ok()); + // close + st = sink.close(&state, Status::OK()); + ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close failed. ") + << st.to_string(); + + // each node has a eof + ASSERT_EQ(2, service->_eof_counters); + ASSERT_EQ(2 * 2, service->_row_counters); + + // 2node * 2 + ASSERT_EQ(1, state.num_rows_load_filtered()); + + std::string wal_path = wal_dir + "/" + std::to_string(t_data_sink.olap_table_sink.db_id) + "/" + + std::to_string(t_data_sink.olap_table_sink.table_id) + "/" + + std::to_string(t_data_sink.olap_table_sink.txn_id) + "_" + + state.import_label(); + doris::PBlock pblock; + auto wal_reader = WalReader(wal_path); + st = wal_reader.init(); + ASSERT_TRUE(st.ok()); + st = wal_reader.read_block(pblock); + ASSERT_TRUE(st.ok()); + vectorized::Block wal_block; + wal_block.deserialize(pblock); + ASSERT_TRUE(st.ok() || st.is()); + ASSERT_EQ(org_block.rows() - 1, wal_block.rows()); + for (int i = 0; i < wal_block.rows(); i++) { + std::string srcRow = org_block.dump_one_line(i, org_block.columns()); + std::string walRow = wal_block.dump_one_line(i, org_block.columns()); + ASSERT_TRUE(std::strcmp(srcRow.c_str(), walRow.c_str()) == 0); + } +} } // namespace vectorized } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index fc867847f1..b877555ee3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -354,6 +354,9 @@ public class DataDescription implements InsertStmt.DataDesc { case FORMAT_JSON: this.fileFormat = "json"; break; + case FORMAT_WAL: + this.fileFormat = "wal"; + break; default: this.fileFormat = "unknown"; break; @@ -1114,6 +1117,7 @@ public class DataDescription implements InsertStmt.DataDesc { && !fileFormat.equalsIgnoreCase(FeConstants.csv) && !fileFormat.equalsIgnoreCase("orc") && !fileFormat.equalsIgnoreCase("json") + && !fileFormat.equalsIgnoreCase("wal") && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names) && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names_and_types) && !fileFormat.equalsIgnoreCase("hive_text")) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java index ae471ce8dd..01198f9493 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java @@ -563,6 +563,8 @@ public class Util { // TODO: Add TEXTFILE to TFileFormatType to Support hive text file format. || lowerFileFormat.equals(FeConstants.text)) { return TFileFormatType.FORMAT_CSV_PLAIN; + } else if (lowerFileFormat.equals("wal")) { + return TFileFormatType.FORMAT_WAL; } else { return TFileFormatType.FORMAT_UNKNOWN; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 8e6976b5ef..b1638be5b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -274,7 +274,8 @@ public abstract class FileQueryScanNode extends FileScanNode { TFileFormatType fileFormatType = getFileFormatType(); params.setFormatType(fileFormatType); boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON; - if (isCsvOrJson) { + boolean isWal = fileFormatType == TFileFormatType.FORMAT_WAL; + if (isCsvOrJson || isWal) { params.setFileAttributes(getFileAttributes()); if (getLocationType() == TFileType.FILE_STREAM) { params.setFileType(TFileType.FILE_STREAM); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index e2a9cc23ab..304cbe6ab5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1174,8 +1174,9 @@ public class FrontendServiceImpl implements FrontendService.Iface { if (Strings.isNullOrEmpty(cluster)) { cluster = SystemInfoService.DEFAULT_CLUSTER; } - - if (Strings.isNullOrEmpty(request.getToken())) { + if (request.isSetAuthCode()) { + // TODO(cmy): find a way to check + } else if (Strings.isNullOrEmpty(request.getToken())) { checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); } @@ -2040,7 +2041,9 @@ public class FrontendServiceImpl implements FrontendService.Iface { cluster = SystemInfoService.DEFAULT_CLUSTER; } ConnectContext ctx = new ConnectContext(); - if (Strings.isNullOrEmpty(request.getToken())) { + if (request.isSetAuthCode()) { + // TODO(cmy): find a way to check + } else if (Strings.isNullOrEmpty(request.getToken())) { checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java index ff55bb1d7a..69b002c3cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -25,10 +25,12 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HdfsResource; import org.apache.doris.catalog.MapType; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.StructField; import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeConstants; @@ -94,6 +96,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio protected static String DEFAULT_COLUMN_SEPARATOR = ","; protected static final String DEFAULT_LINE_DELIMITER = "\n"; public static final String FORMAT = "format"; + public static final String TABLE_ID = "table_id"; public static final String COLUMN_SEPARATOR = "column_separator"; public static final String LINE_DELIMITER = "line_delimiter"; protected static final String JSON_ROOT = "json_root"; @@ -114,6 +117,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio protected static final ImmutableSet FILE_FORMAT_PROPERTIES = new ImmutableSet.Builder() .add(FORMAT) + .add(TABLE_ID) .add(JSON_ROOT) .add(JSON_PATHS) .add(STRIP_OUTER_ARRAY) @@ -157,6 +161,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio private boolean fuzzyParse; private boolean trimDoubleQuotes; private int skipLines; + private long tableId; public abstract TFileType getTFileType(); @@ -236,10 +241,14 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio case "avro": this.fileFormatType = TFileFormatType.FORMAT_AVRO; break; + case "wal": + this.fileFormatType = TFileFormatType.FORMAT_WAL; + break; default: throw new AnalysisException("format:" + formatString + " is not supported."); } + tableId = Long.valueOf(validParams.getOrDefault(TABLE_ID, "-1")).longValue(); columnSeparator = validParams.getOrDefault(COLUMN_SEPARATOR, DEFAULT_COLUMN_SEPARATOR); if (Strings.isNullOrEmpty(columnSeparator)) { throw new AnalysisException("column_separator can not be empty."); @@ -401,6 +410,20 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio throw new AnalysisException("No Alive backends"); } + if (this.fileFormatType == TFileFormatType.FORMAT_WAL) { + List fileColumns = new ArrayList<>(); + Table table = Env.getCurrentInternalCatalog().getTableByTableId(tableId); + List tableColumns = table.getBaseSchema(false); + for (int i = 1; i <= tableColumns.size(); i++) { + fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getDataType(), true)); + } + Column deleteSignColumn = ((OlapTable) table).getDeleteSignColumn(); + if (deleteSignColumn != null) { + fileColumns.add(new Column("c" + (tableColumns.size() + 1), deleteSignColumn.getDataType(), true)); + } + return fileColumns; + } + TNetworkAddress address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); try { PFetchTableSchemaRequest request = getFetchTableStructureRequest(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 6ef2d512ac..b1ccf7db08 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -666,6 +666,16 @@ struct TStreamLoadWithLoadStatusResult { 6: optional i64 unselected_rows } +struct TCheckWalRequest { + 1: optional i64 wal_id + 2: optional i64 db_id +} + +struct TCheckWalResult { + 1: optional Status.TStatus status + 2: optional bool need_recovery +} + struct TKafkaRLTaskProgress { 1: required map partitionCmtOffset } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 2588996cd4..cc7102e408 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -445,6 +445,8 @@ struct TExecPlanFragmentParams { // scan node id -> scan range params, only for external file scan 24: optional map file_scan_params + + 25: optional i64 wal_id } struct TExecPlanFragmentParamsList { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 11c72e5ae5..3cc1c56911 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -119,6 +119,7 @@ enum TFileFormatType { FORMAT_AVRO, FORMAT_CSV_LZ4BLOCK, FORMAT_CSV_SNAPPYBLOCK, + FORMAT_WAL, } // In previous versions, the data compression format and file format were stored together, as TFileFormatType,