From 706463781c3fc6891d9bd22f3acc7b87e1123b32 Mon Sep 17 00:00:00 2001 From: meiyi Date: Tue, 2 Jan 2024 15:52:03 +0800 Subject: [PATCH] [refactor](group commit) refactor group commit wal code (#29375) --- be/src/http/action/http_stream.cpp | 1 - be/src/http/action/stream_load.cpp | 1 - be/src/http/utils.cpp | 2 +- be/src/olap/{ => wal}/wal_dirs_info.cpp | 42 ++----- be/src/olap/{ => wal}/wal_dirs_info.h | 11 +- be/src/olap/{ => wal}/wal_info.cpp | 9 +- be/src/olap/{ => wal}/wal_info.h | 2 +- be/src/olap/{ => wal}/wal_manager.cpp | 48 ++++---- be/src/olap/{ => wal}/wal_manager.h | 108 ++++++++++-------- be/src/olap/{ => wal}/wal_reader.cpp | 2 +- be/src/olap/{ => wal}/wal_reader.h | 0 be/src/olap/{ => wal}/wal_table.cpp | 16 ++- be/src/olap/{ => wal}/wal_table.h | 39 ++++--- be/src/olap/{ => wal}/wal_writer.cpp | 8 +- be/src/olap/{ => wal}/wal_writer.h | 6 - be/src/runtime/exec_env_init.cpp | 2 +- be/src/runtime/group_commit_mgr.cpp | 13 --- be/src/runtime/group_commit_mgr.h | 2 +- be/src/service/internal_service.cpp | 2 +- .../format/arrow/arrow_pip_input_stream.cpp | 1 - .../exec/format/arrow/arrow_stream_reader.cpp | 1 - be/src/vec/exec/format/wal/wal_reader.cpp | 6 +- be/src/vec/exec/format/wal/wal_reader.h | 9 +- be/src/vec/sink/volap_table_sink.h | 1 - be/src/vec/sink/writer/vtablet_writer.cpp | 1 - be/src/vec/sink/writer/vwal_writer.cpp | 18 --- be/src/vec/sink/writer/vwal_writer.h | 61 +--------- be/test/http/stream_load_test.cpp | 2 +- be/test/olap/wal_manager_test.cpp | 2 +- be/test/olap/wal_reader_writer_test.cpp | 4 +- be/test/vec/exec/vtablet_sink_test.cpp | 2 +- be/test/vec/exec/vwal_scanner_test.cpp | 2 +- .../doris/planner/GroupCommitPlanner.java | 2 + 33 files changed, 164 insertions(+), 262 deletions(-) rename be/src/olap/{ => wal}/wal_dirs_info.cpp (88%) rename be/src/olap/{ => wal}/wal_dirs_info.h (92%) rename be/src/olap/{ => wal}/wal_info.cpp (96%) rename be/src/olap/{ => wal}/wal_info.h (97%) rename be/src/olap/{ => wal}/wal_manager.cpp (95%) rename be/src/olap/{ => wal}/wal_manager.h (85%) rename be/src/olap/{ => wal}/wal_reader.cpp (99%) rename be/src/olap/{ => wal}/wal_reader.h (100%) rename be/src/olap/{ => wal}/wal_table.cpp (98%) rename be/src/olap/{ => wal}/wal_table.h (91%) rename be/src/olap/{ => wal}/wal_writer.cpp (96%) rename be/src/olap/{ => wal}/wal_writer.h (91%) diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index 15d2a1a18d..a965270783 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -46,7 +46,6 @@ #include "http/utils.h" #include "io/fs/stream_load_pipe.h" #include "olap/storage_engine.h" -#include "olap/wal_manager.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index a702be65d0..6f6b7ad6b0 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -51,7 +51,6 @@ #include "http/utils.h" #include "io/fs/stream_load_pipe.h" #include "olap/storage_engine.h" -#include "olap/wal_manager.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" #include "runtime/group_commit_mgr.h" diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp index 9fbf4c7930..72b80f875e 100644 --- a/be/src/http/utils.cpp +++ b/be/src/http/utils.cpp @@ -39,7 +39,7 @@ #include "http/http_status.h" #include "io/fs/file_system.h" #include "io/fs/local_file_system.h" -#include "olap/wal_manager.h" +#include "olap/wal/wal_manager.h" #include "runtime/exec_env.h" #include "util/path_util.h" #include "util/url_coding.h" diff --git a/be/src/olap/wal_dirs_info.cpp b/be/src/olap/wal/wal_dirs_info.cpp similarity index 88% rename from be/src/olap/wal_dirs_info.cpp rename to be/src/olap/wal/wal_dirs_info.cpp index 1b7216b072..19ad256277 100644 --- a/be/src/olap/wal_dirs_info.cpp +++ b/be/src/olap/wal/wal_dirs_info.cpp @@ -15,11 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "olap/wal_dirs_info.h" - -#include -#include -#include +#include "olap/wal/wal_dirs_info.h" #include "common/config.h" #include "common/status.h" @@ -37,36 +33,23 @@ size_t WalDirInfo::get_limit() { return _limit; } -size_t WalDirInfo::get_used() { - std::shared_lock rlock(_lock); - return _used; -} - -size_t WalDirInfo::get_pre_allocated() { - std::shared_lock rlock(_lock); - return _pre_allocated; -} - -Status WalDirInfo::set_limit(size_t limit) { +void WalDirInfo::set_limit(size_t limit) { std::unique_lock wlock(_lock); _limit = limit; - return Status::OK(); } -Status WalDirInfo::set_used(size_t used) { +void WalDirInfo::set_used(size_t used) { std::unique_lock wlock(_lock); _used = used; - return Status::OK(); } -Status WalDirInfo::set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { +void WalDirInfo::set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { std::unique_lock wlock(_lock); if (is_add_pre_allocated) { _pre_allocated += pre_allocated; } else { _pre_allocated -= pre_allocated; } - return Status::OK(); } size_t WalDirInfo::available() { @@ -77,7 +60,7 @@ size_t WalDirInfo::available() { Status WalDirInfo::update_wal_dir_limit(size_t limit) { if (limit != static_cast(-1)) { - RETURN_IF_ERROR(set_limit(limit)); + set_limit(limit); } else { size_t available_bytes; size_t disk_capacity_bytes; @@ -91,24 +74,25 @@ Status WalDirInfo::update_wal_dir_limit(size_t limit) { } size_t wal_dir_size = 0; RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); - RETURN_IF_ERROR(set_limit(wal_disk_limit)); + // TODO should be wal_disk_limit + wal_dir_size + set_limit(wal_disk_limit); } return Status::OK(); } Status WalDirInfo::update_wal_dir_used(size_t used) { if (used != static_cast(-1)) { - RETURN_IF_ERROR(set_used(used)); + set_used(used); } else { size_t wal_dir_size = 0; RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); - RETURN_IF_ERROR(set_used(wal_dir_size)); + set_used(wal_dir_size); } return Status::OK(); } Status WalDirInfo::update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { - RETURN_IF_ERROR(set_pre_allocated(pre_allocated, is_add_pre_allocated)); + set_pre_allocated(pre_allocated, is_add_pre_allocated); return Status::OK(); } @@ -128,12 +112,6 @@ Status WalDirsInfo::add(const std::string& wal_dir, size_t limit, size_t used, return Status::OK(); } -Status WalDirsInfo::clear() { - std::unique_lock wlock(_lock); - _wal_dirs_info_vec.clear(); - return Status::OK(); -} - std::string WalDirsInfo::get_available_random_wal_dir() { if (_wal_dirs_info_vec.size() == 1) { return (*_wal_dirs_info_vec.begin())->get_wal_dir(); diff --git a/be/src/olap/wal_dirs_info.h b/be/src/olap/wal/wal_dirs_info.h similarity index 92% rename from be/src/olap/wal_dirs_info.h rename to be/src/olap/wal/wal_dirs_info.h index 048cd8f956..91a26af2b5 100644 --- a/be/src/olap/wal_dirs_info.h +++ b/be/src/olap/wal/wal_dirs_info.h @@ -1,4 +1,3 @@ - // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information @@ -41,11 +40,10 @@ public: _pre_allocated(pre_allocated) {} std::string get_wal_dir(); size_t get_limit(); - size_t get_used(); - size_t get_pre_allocated(); - Status set_limit(size_t limit); - Status set_used(size_t used); - Status set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated); + void set_limit(size_t limit); + void set_used(size_t used); + // TODO increase_pre_allocated and decrease_pre_allocated + void set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated); size_t available(); Status update_wal_dir_limit(size_t limit = -1); Status update_wal_dir_used(size_t used = -1); @@ -66,7 +64,6 @@ public: WalDirsInfo() = default; ~WalDirsInfo() = default; Status add(const std::string& wal_dir, size_t limit, size_t used, size_t pre_allocated); - Status clear(); std::string get_available_random_wal_dir(); size_t get_max_available_size(); Status update_wal_dir_limit(std::string wal_dir, size_t limit = -1); diff --git a/be/src/olap/wal_info.cpp b/be/src/olap/wal/wal_info.cpp similarity index 96% rename from be/src/olap/wal_info.cpp rename to be/src/olap/wal/wal_info.cpp index d93593cfaf..3c6fc4190b 100644 --- a/be/src/olap/wal_info.cpp +++ b/be/src/olap/wal/wal_info.cpp @@ -15,26 +15,31 @@ // specific language governing permissions and limitations // under the License. -#include "olap/wal_info.h" +#include "olap/wal/wal_info.h" + namespace doris { WalInfo::WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms) : _wal_id(wal_id), _wal_path(wal_path), _retry_num(retry_num), _start_time_ms(start_time_ms) {} -WalInfo::~WalInfo() {} + int64_t WalInfo::get_wal_id() { return _wal_id; } + std::string WalInfo::get_wal_path() { return _wal_path; } + int64_t WalInfo::get_retry_num() { return _retry_num; } + int64_t WalInfo::get_start_time_ms() { return _start_time_ms; } + void WalInfo::add_retry_num() { _retry_num++; } diff --git a/be/src/olap/wal_info.h b/be/src/olap/wal/wal_info.h similarity index 97% rename from be/src/olap/wal_info.h rename to be/src/olap/wal/wal_info.h index 0383ac68f2..0365e23abd 100644 --- a/be/src/olap/wal_info.h +++ b/be/src/olap/wal/wal_info.h @@ -21,7 +21,7 @@ namespace doris { class WalInfo { public: WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms); - ~WalInfo(); + ~WalInfo() = default; int64_t get_wal_id(); int64_t get_retry_num(); int64_t get_start_time_ms(); diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp similarity index 95% rename from be/src/olap/wal_manager.cpp rename to be/src/olap/wal/wal_manager.cpp index a9c3579469..9af91cf7b5 100644 --- a/be/src/olap/wal_manager.cpp +++ b/be/src/olap/wal/wal_manager.cpp @@ -15,41 +15,31 @@ // specific language governing permissions and limitations // under the License. -#include "olap/wal_manager.h" +#include "olap/wal/wal_manager.h" #include -#include -#include #include -#include #include -#include -#include #include #include #include -#include #include #include "common/config.h" #include "common/status.h" #include "io/fs/local_file_system.h" -#include "olap/wal_dirs_info.h" -#include "olap/wal_writer.h" -#include "runtime/client_cache.h" +#include "olap/wal/wal_dirs_info.h" +#include "olap/wal/wal_writer.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" #include "runtime/plan_fragment_executor.h" -#include "runtime/stream_load/stream_load_context.h" #include "util/parse_util.h" -#include "util/path_util.h" -#include "util/thrift_rpc_helper.h" #include "vec/exec/format/wal/wal_reader.h" namespace doris { WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) - : _exec_env(exec_env), _stop_background_threads_latch(1), _stop(false) { + : _exec_env(exec_env), _stop(false), _stop_background_threads_latch(1) { doris::vectorized::WalReader::string_split(wal_dir_list, ";", _wal_dirs); static_cast(ThreadPoolBuilder("GroupCommitReplayWalThreadPool") .set_min_threads(1) @@ -62,10 +52,14 @@ WalManager::~WalManager() { LOG(INFO) << "WalManager is destoried"; } +bool WalManager::is_running() { + return !_stop.load(); +} + void WalManager::stop() { if (!this->_stop.load()) { this->_stop.store(true); - stop_relay_wal(); + _stop_relay_wal(); _stop_background_threads_latch.count_down(); if (_replay_thread) { _replay_thread->join(); @@ -83,10 +77,10 @@ Status WalManager::init() { RETURN_IF_ERROR(_init_wal_dirs()); RETURN_IF_ERROR(_init_wal_dirs_info()); for (auto wal_dir : _wal_dirs) { - RETURN_IF_ERROR(scan_wals(wal_dir)); + RETURN_IF_ERROR(_scan_wals(wal_dir)); } return Thread::create( - "WalMgr", "replay_wal", [this]() { static_cast(this->replay()); }, + "WalMgr", "replay_wal", [this]() { static_cast(this->_replay()); }, &_replay_thread); } @@ -276,6 +270,7 @@ Status WalManager::create_wal_reader(const std::string& wal_path, Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr& wal_writer) { std::string wal_path; RETURN_IF_ERROR(get_wal_path(wal_id, wal_path)); + // TODO move the create_dir into wal_writer::init std::vector path_element; doris::vectorized::WalReader::string_split(wal_path, "/", path_element); std::stringstream ss; @@ -292,13 +287,14 @@ Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr& wal_writer = std::make_shared(wal_path); RETURN_IF_ERROR(wal_writer->init()); { + // TODO no use, should remove it std::lock_guard wrlock(_wal_lock); _wal_id_to_writer_map.emplace(wal_id, wal_writer); } return Status::OK(); } -Status WalManager::scan_wals(const std::string& wal_path) { +Status WalManager::_scan_wals(const std::string& wal_path) { size_t count = 0; bool exists = true; std::vector dbs; @@ -360,7 +356,7 @@ Status WalManager::scan_wals(const std::string& wal_path) { return Status::OK(); } -Status WalManager::replay() { +Status WalManager::_replay() { do { if (_stop.load()) { break; @@ -372,7 +368,7 @@ Status WalManager::replay() { } std::vector replay_tables; { - std::lock_guard wrlock(_lock); + std::lock_guard wrlock(_table_lock); auto it = _table_map.begin(); while (it != _table_map.end()) { if (it->second->size() > 0) { @@ -396,7 +392,7 @@ Status WalManager::replay() { Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, std::string wal) { - std::lock_guard wrlock(_lock); + std::lock_guard wrlock(_table_lock); std::shared_ptr table_ptr; auto it = _table_map.find(table_id); if (it == _table_map.end()) { @@ -414,7 +410,7 @@ Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_ } size_t WalManager::get_wal_table_size(int64_t table_id) { - std::shared_lock rdlock(_lock); + std::shared_lock rdlock(_table_lock); auto it = _table_map.find(table_id); if (it != _table_map.end()) { return it->second->size(); @@ -440,12 +436,8 @@ Status WalManager::delete_wal(int64_t wal_id, size_t block_queue_pre_allocated) return Status::OK(); } -bool WalManager::is_running() { - return !_stop.load(); -} - -void WalManager::stop_relay_wal() { - std::lock_guard wrlock(_lock); +void WalManager::_stop_relay_wal() { + std::lock_guard wrlock(_table_lock); for (auto it = _table_map.begin(); it != _table_map.end(); it++) { it->second->stop(); } diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal/wal_manager.h similarity index 85% rename from be/src/olap/wal_manager.h rename to be/src/olap/wal/wal_manager.h index 4b42beaf45..f4e28445f8 100644 --- a/be/src/olap/wal_manager.h +++ b/be/src/olap/wal/wal_manager.h @@ -33,10 +33,10 @@ #include "gen_cpp/FrontendService_types.h" #include "gen_cpp/HeartbeatService_types.h" #include "gutil/ref_counted.h" -#include "olap/wal_dirs_info.h" -#include "olap/wal_reader.h" -#include "olap/wal_table.h" -#include "olap/wal_writer.h" +#include "olap/wal/wal_dirs_info.h" +#include "olap/wal/wal_reader.h" +#include "olap/wal/wal_table.h" +#include "olap/wal/wal_writer.h" #include "runtime/exec_env.h" #include "runtime/stream_load/stream_load_context.h" #include "util/thread.h" @@ -49,39 +49,18 @@ class WalManager { public: enum WalStatus { PREPARE = 0, - REPLAY, - CREATE, + REPLAY = 1, + CREATE = 2, }; public: WalManager(ExecEnv* exec_env, const std::string& wal_dir); ~WalManager(); - // used for wal - Status delete_wal(int64_t wal_id, size_t block_queue_pre_allocated = 0); Status init(); - Status scan_wals(const std::string& wal_path); - Status replay(); - Status create_wal_reader(const std::string& wal_path, std::shared_ptr& wal_reader); - Status create_wal_writer(int64_t wal_id, std::shared_ptr& wal_writer); - Status scan(); - size_t get_wal_table_size(int64_t table_id); - Status add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, std::string wal); - Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const std::string& label, - std::string& base_path); - Status get_wal_path(int64_t wal_id, std::string& wal_path); - Status get_wal_status_queue_size(const PGetWalQueueSizeRequest* request, - PGetWalQueueSizeResponse* response); - void add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status); - Status erase_wal_status_queue(int64_t table_id, int64_t wal_id); - void print_wal_status_queue(); - void stop(); bool is_running(); - void stop_relay_wal(); - void add_wal_column_index(int64_t wal_id, std::vector& column_index); - void erase_wal_column_index(int64_t wal_id); - Status get_wal_column_index(int64_t wal_id, std::vector& column_index); + void stop(); - // used for limit + // wal back pressure Status update_wal_dir_limit(const std::string& wal_dir, size_t limit = -1); Status update_wal_dir_used(const std::string& wal_dir, size_t used = -1); Status update_wal_dir_pre_allocated(const std::string& wal_dir, size_t pre_allocated, @@ -89,15 +68,43 @@ public: Status get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes); size_t get_max_available_size(); + // replay wal + Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const std::string& label, + std::string& base_path); + Status get_wal_path(int64_t wal_id, std::string& wal_path); + Status delete_wal(int64_t wal_id, size_t block_queue_pre_allocated = 0); + Status add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, std::string wal); + // used for ut + size_t get_wal_table_size(int64_t table_id); + // TODO util function, should remove + Status create_wal_reader(const std::string& wal_path, std::shared_ptr& wal_reader); + Status create_wal_writer(int64_t wal_id, std::shared_ptr& wal_writer); + + // for wal status, can be removed + Status get_wal_status_queue_size(const PGetWalQueueSizeRequest* request, + PGetWalQueueSizeResponse* response); + void add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status); + Status erase_wal_status_queue(int64_t table_id, int64_t wal_id); + void print_wal_status_queue(); + + // for _wal_column_id_map, can be removed + void add_wal_column_index(int64_t wal_id, std::vector& column_index); + void erase_wal_column_index(int64_t wal_id); + Status get_wal_column_index(int64_t wal_id, std::vector& column_index); + private: - // used for limit + // wal back pressure Status _init_wal_dirs_conf(); Status _init_wal_dirs(); Status _init_wal_dirs_info(); std::string _get_base_wal_path(const std::string& wal_path_str); - const std::string& _get_available_random_wal_dir(); Status _update_wal_dir_info_thread(); + // replay wal + Status _scan_wals(const std::string& wal_path); + Status _replay(); + void _stop_relay_wal(); + public: // used for be ut size_t wal_limit_test_bytes; @@ -105,24 +112,33 @@ public: const std::string tmp = "tmp"; private: - //used for wal ExecEnv* _exec_env = nullptr; - std::shared_mutex _lock; - scoped_refptr _replay_thread; - CountDownLatch _stop_background_threads_latch; - std::map> _table_map; - std::vector _wal_dirs; - std::shared_mutex _wal_lock; - std::shared_mutex _wal_status_lock; - std::unordered_map _wal_path_map; - std::unordered_map> _wal_id_to_writer_map; - std::unordered_map> _wal_status_queues; std::atomic _stop; - std::shared_mutex _wal_column_id_map_lock; - std::unordered_map&> _wal_column_id_map; - std::unique_ptr _thread_pool; - // used for limit + CountDownLatch _stop_background_threads_latch; + + // wal back pressure + std::vector _wal_dirs; scoped_refptr _update_wal_dirs_info_thread; std::unique_ptr _wal_dirs_info; + + // replay wal + scoped_refptr _replay_thread; + std::unique_ptr _thread_pool; + + std::shared_mutex _table_lock; + std::map> _table_map; + + std::shared_mutex _wal_lock; + std::unordered_map _wal_path_map; + // TODO no use? need remove it. And the map dose not clear + std::unordered_map> _wal_id_to_writer_map; + + // TODO Now only used for debug wal status, consider remove it + std::shared_mutex _wal_status_lock; + std::unordered_map> _wal_status_queues; + + // TODO should remove + std::shared_mutex _wal_column_id_map_lock; + std::unordered_map&> _wal_column_id_map; }; } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_reader.cpp b/be/src/olap/wal/wal_reader.cpp similarity index 99% rename from be/src/olap/wal_reader.cpp rename to be/src/olap/wal/wal_reader.cpp index c3a9b225dd..75bbaf1094 100644 --- a/be/src/olap/wal_reader.cpp +++ b/be/src/olap/wal/wal_reader.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "olap/wal_reader.h" +#include "olap/wal/wal_reader.h" #include "common/status.h" #include "io/fs/file_reader.h" diff --git a/be/src/olap/wal_reader.h b/be/src/olap/wal/wal_reader.h similarity index 100% rename from be/src/olap/wal_reader.h rename to be/src/olap/wal/wal_reader.h diff --git a/be/src/olap/wal_table.cpp b/be/src/olap/wal/wal_table.cpp similarity index 98% rename from be/src/olap/wal_table.cpp rename to be/src/olap/wal/wal_table.cpp index a1c9a12f3e..f89794a458 100644 --- a/be/src/olap/wal_table.cpp +++ b/be/src/olap/wal/wal_table.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "olap/wal_table.h" +#include "olap/wal/wal_table.h" #include @@ -27,7 +27,7 @@ #include "http/utils.h" #include "io/fs/local_file_system.h" #include "io/fs/stream_load_pipe.h" -#include "olap/wal_manager.h" +#include "olap/wal/wal_manager.h" #include "runtime/client_cache.h" #include "runtime/fragment_mgr.h" #include "runtime/plan_fragment_executor.h" @@ -53,7 +53,8 @@ void WalTable::add_wal(int64_t wal_id, std::string wal) { auto wal_info = std::make_shared(wal_id, wal, 0, UnixMillis()); _replay_wal_map.emplace(wal, wal_info); } -void WalTable::pick_relay_wals() { + +void WalTable::_pick_relay_wals() { std::lock_guard lock(_replay_wal_lock); std::vector need_replay_wals; std::vector need_erase_wals; @@ -84,7 +85,7 @@ void WalTable::pick_relay_wals() { } } -Status WalTable::relay_wal_one_by_one() { +Status WalTable::_relay_wal_one_by_one() { std::vector> need_retry_wals; std::vector> need_delete_wals; while (!_replaying_queue.empty()) { @@ -124,6 +125,7 @@ Status WalTable::relay_wal_one_by_one() { } return Status::OK(); } + Status WalTable::replay_wals() { { std::lock_guard lock(_replay_wal_lock); @@ -138,8 +140,8 @@ Status WalTable::replay_wals() { } VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" << _table_id << ", wal size=" << _replay_wal_map.size(); - pick_relay_wals(); - RETURN_IF_ERROR(relay_wal_one_by_one()); + _pick_relay_wals(); + RETURN_IF_ERROR(_relay_wal_one_by_one()); return Status::OK(); } @@ -184,6 +186,7 @@ Status WalTable::_try_abort_txn(int64_t db_id, int64_t wal_id) { TLoadTxnRollbackRequest request; request.__set_auth_code(0); // this is a fake, fe not check it now request.__set_db_id(db_id); + // TODO should we use label, because the replay wal use the same label and different wal_id request.__set_txnId(wal_id); std::string reason = "relay wal " + std::to_string(wal_id); request.__set_reason(reason); @@ -265,6 +268,7 @@ Status WalTable::_construct_sql_str(const std::string& wal, const std::string& l sql_str = ss.str().data(); return Status::OK(); } + Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal, const std::string& label) { std::string sql_str; diff --git a/be/src/olap/wal_table.h b/be/src/olap/wal/wal_table.h similarity index 91% rename from be/src/olap/wal_table.h rename to be/src/olap/wal/wal_table.h index 251e8d51a6..66ee4fd372 100644 --- a/be/src/olap/wal_table.h +++ b/be/src/olap/wal/wal_table.h @@ -25,9 +25,10 @@ #include "gen_cpp/FrontendService_types.h" #include "gen_cpp/HeartbeatService_types.h" #include "http/action/http_stream.h" -#include "olap/wal_info.h" +#include "olap/wal/wal_info.h" #include "runtime/exec_env.h" #include "runtime/stream_load/stream_load_context.h" + namespace doris { class WalTable { public: @@ -35,44 +36,46 @@ public: ~WalTable(); // used when be start and there are wals need to do recovery void add_wal(int64_t wal_id, std::string wal); - void pick_relay_wals(); - Status relay_wal_one_by_one(); Status replay_wals(); size_t size(); void stop(); -public: +private: // using ColumnInfo = std::pair; -private: + void _pick_relay_wals(); + bool _need_replay(std::shared_ptr); + Status _relay_wal_one_by_one(); + Status _delete_wal(int64_t wal_id); + Status _rename_to_tmp_path(const std::string wal); + + Status _replay_wal_internal(const std::string& wal); + // TODO change the param: (wal, int64_t* wal_id, std::string* label) Status _parse_wal_path(const std::string& wal, std::shared_ptr>&); - Status _rename_to_tmp_path(const std::string wal); - Status _replay_one_txn_with_stremaload(int64_t wal_id, const std::string& wal, - const std::string& label); Status _try_abort_txn(int64_t db_id, int64_t wal_id); Status _get_column_info(int64_t db_id, int64_t tb_id); - Status _read_wal_header(const std::string& wal, std::string& columns); - bool _need_replay(std::shared_ptr); - Status _replay_wal_internal(const std::string& wal); - Status _delete_wal(int64_t wal_id); + + Status _replay_one_txn_with_stremaload(int64_t wal_id, const std::string& wal, + const std::string& label); + Status _handle_stream_load(int64_t wal_id, const std::string& wal, const std::string& label); Status _construct_sql_str(const std::string& wal, const std::string& label, std::string& sql_str, std::vector& index_vector); - Status _handle_stream_load(int64_t wal_id, const std::string& wal, const std::string& label); + Status _read_wal_header(const std::string& wal, std::string& columns); private: ExecEnv* _exec_env; int64_t _db_id; int64_t _table_id; - std::string _relay = "relay"; - std::string _split = "_"; + // TODO the stop is not used? + std::atomic _stop; + std::shared_ptr _http_stream_action; mutable std::mutex _replay_wal_lock; - // key is wal_id + // key is wal_path std::map> _replay_wal_map; std::list> _replaying_queue; - std::atomic _stop; + // TODO should not use this map std::map> _column_id_info_map; - std::shared_ptr _http_stream_action; }; } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_writer.cpp b/be/src/olap/wal/wal_writer.cpp similarity index 96% rename from be/src/olap/wal_writer.cpp rename to be/src/olap/wal/wal_writer.cpp index beae6ec80b..25f7cc0870 100644 --- a/be/src/olap/wal_writer.cpp +++ b/be/src/olap/wal/wal_writer.cpp @@ -15,18 +15,14 @@ // specific language governing permissions and limitations // under the License. -#include "olap/wal_writer.h" - -#include -#include -#include +#include "olap/wal/wal_writer.h" #include "common/config.h" #include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" #include "io/fs/path.h" #include "olap/storage_engine.h" -#include "olap/wal_manager.h" +#include "olap/wal/wal_manager.h" #include "util/crc32c.h" namespace doris { diff --git a/be/src/olap/wal_writer.h b/be/src/olap/wal/wal_writer.h similarity index 91% rename from be/src/olap/wal_writer.h rename to be/src/olap/wal/wal_writer.h index ea8bba4f02..08d2a4eb71 100644 --- a/be/src/olap/wal_writer.h +++ b/be/src/olap/wal/wal_writer.h @@ -17,11 +17,6 @@ #pragma once -#include -#include -#include -#include - #include "common/status.h" #include "gen_cpp/internal_service.pb.h" #include "io/fs/file_reader_writer_fwd.h" @@ -51,7 +46,6 @@ public: static const int64_t VERSION_SIZE = 4; private: - static constexpr size_t MAX_WAL_WRITE_WAIT_TIME = 1000; std::string _file_name; io::FileWriterPtr _file_writer; }; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 2a1233c9b5..eae038aaa4 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -49,7 +49,7 @@ #include "olap/segment_loader.h" #include "olap/storage_engine.h" #include "olap/tablet_schema_cache.h" -#include "olap/wal_manager.h" +#include "olap/wal/wal_manager.h" #include "pipeline/task_queue.h" #include "pipeline/task_scheduler.h" #include "runtime/block_spill_manager.h" diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index b965efadd7..69f150b6c6 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -20,24 +20,11 @@ #include #include -#include -#include -#include -#include -#include -#include -#include -#include - #include "client_cache.h" #include "common/config.h" -#include "common/status.h" -#include "olap/wal_manager.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" -#include "runtime/runtime_state.h" #include "util/thrift_rpc_helper.h" -#include "vec/core/block.h" namespace doris { diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index 03c917f8fc..bf89aa2aa5 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -29,7 +29,7 @@ #include #include "common/status.h" -#include "olap/wal_manager.h" +#include "olap/wal/wal_manager.h" #include "runtime/exec_env.h" #include "util/threadpool.h" #include "vec/core/block.h" diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index ba2686c0fd..a92775a6f6 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -83,7 +83,7 @@ #include "olap/tablet_schema.h" #include "olap/txn_manager.h" #include "olap/utils.h" -#include "olap/wal_manager.h" +#include "olap/wal/wal_manager.h" #include "runtime/buffer_control_block.h" #include "runtime/cache/result_cache.h" #include "runtime/define_primitive_type.h" diff --git a/be/src/vec/exec/format/arrow/arrow_pip_input_stream.cpp b/be/src/vec/exec/format/arrow/arrow_pip_input_stream.cpp index 1c9edb84a4..7bc32c7ab4 100644 --- a/be/src/vec/exec/format/arrow/arrow_pip_input_stream.cpp +++ b/be/src/vec/exec/format/arrow/arrow_pip_input_stream.cpp @@ -27,7 +27,6 @@ #include "arrow/result.h" #include "common/logging.h" #include "io/fs/stream_load_pipe.h" -#include "olap/wal_manager.h" #include "runtime/runtime_state.h" namespace doris::vectorized { diff --git a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp index 33ac46a827..d1a1a9d35c 100644 --- a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp +++ b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp @@ -27,7 +27,6 @@ #include "arrow_pip_input_stream.h" #include "common/logging.h" #include "io/fs/stream_load_pipe.h" -#include "olap/wal_manager.h" #include "runtime/descriptors.h" #include "runtime/runtime_state.h" #include "vec/utils/arrow_column_to_doris_column.h" diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp b/be/src/vec/exec/format/wal/wal_reader.cpp index 32fd66cc30..7344b4cbd9 100644 --- a/be/src/vec/exec/format/wal/wal_reader.cpp +++ b/be/src/vec/exec/format/wal/wal_reader.cpp @@ -18,23 +18,27 @@ #include "wal_reader.h" #include "common/logging.h" -#include "olap/wal_manager.h" +#include "olap/wal/wal_manager.h" #include "runtime/runtime_state.h" #include "vec/data_types/data_type_string.h" + namespace doris::vectorized { WalReader::WalReader(RuntimeState* state) : _state(state) { _wal_id = state->wal_id(); } + WalReader::~WalReader() { if (_wal_reader.get() != nullptr) { static_cast(_wal_reader->finalize()); } } + Status WalReader::init_reader() { RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_path(_wal_id, _wal_path)); RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_reader(_wal_path, _wal_reader)); return Status::OK(); } + Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { //read src block PBlock pblock; diff --git a/be/src/vec/exec/format/wal/wal_reader.h b/be/src/vec/exec/format/wal/wal_reader.h index ed55d6e166..78f2c31d92 100644 --- a/be/src/vec/exec/format/wal/wal_reader.h +++ b/be/src/vec/exec/format/wal/wal_reader.h @@ -16,8 +16,9 @@ // under the License. #pragma once -#include "olap/wal_reader.h" +#include "olap/wal/wal_reader.h" #include "vec/exec/format/generic_reader.h" + namespace doris { namespace vectorized { struct ScannerCounter; @@ -29,16 +30,16 @@ public: Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; Status get_columns(std::unordered_map* name_to_type, std::unordered_set* missing_cols) override; + // TODO move it static void string_split(const std::string& str, const std::string& splits, std::vector& res); - std::vector get_index() { return _column_index; } private: RuntimeState* _state = nullptr; - std::string _wal_path; - std::string _path_split = "/"; int64_t _wal_id; + std::string _wal_path; std::shared_ptr _wal_reader; + // TODO version should in olap/wal_reader uint32_t _version = 0; std::string _col_ids; std::vector _column_index; diff --git a/be/src/vec/sink/volap_table_sink.h b/be/src/vec/sink/volap_table_sink.h index 68315eb5a9..add285cdfd 100644 --- a/be/src/vec/sink/volap_table_sink.h +++ b/be/src/vec/sink/volap_table_sink.h @@ -57,7 +57,6 @@ #include "exec/data_sink.h" #include "exec/tablet_info.h" #include "gutil/ref_counted.h" -#include "olap/wal_writer.h" #include "runtime/decimalv2_value.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 40b1c544e2..89519ad6ca 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -47,7 +47,6 @@ #include #include "common/config.h" -#include "olap/wal_manager.h" #include "util/runtime_profile.h" #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr_fwd.h" diff --git a/be/src/vec/sink/writer/vwal_writer.cpp b/be/src/vec/sink/writer/vwal_writer.cpp index 2ab37c340a..c697d373f1 100644 --- a/be/src/vec/sink/writer/vwal_writer.cpp +++ b/be/src/vec/sink/writer/vwal_writer.cpp @@ -19,25 +19,7 @@ #include -#include #include -#include -#include -#include - -#include "common/compiler_util.h" -#include "common/status.h" -#include "runtime/client_cache.h" -#include "runtime/descriptors.h" -#include "runtime/runtime_state.h" -#include "util/doris_metrics.h" -#include "util/network_util.h" -#include "util/proto_util.h" -#include "util/thrift_util.h" -#include "vec/common/assert_cast.h" -#include "vec/core/block.h" -#include "vec/sink/vtablet_block_convertor.h" -#include "vec/sink/vtablet_finder.h" namespace doris { namespace vectorized { diff --git a/be/src/vec/sink/writer/vwal_writer.h b/be/src/vec/sink/writer/vwal_writer.h index 324409e9d4..6593837373 100644 --- a/be/src/vec/sink/writer/vwal_writer.h +++ b/be/src/vec/sink/writer/vwal_writer.h @@ -16,68 +16,14 @@ // under the License. #pragma once -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include -#include -#include -#include -#include #include -#include "common/config.h" #include "common/status.h" -#include "exec/data_sink.h" -#include "exec/tablet_info.h" -#include "gutil/ref_counted.h" -#include "olap/wal_manager.h" -#include "olap/wal_writer.h" -#include "runtime/decimalv2_value.h" -#include "runtime/exec_env.h" -#include "runtime/memory/mem_tracker.h" -#include "runtime/thread_context.h" -#include "runtime/types.h" -#include "util/countdown_latch.h" -#include "util/ref_count_closure.h" -#include "util/runtime_profile.h" -#include "util/spinlock.h" -#include "util/stopwatch.hpp" -#include "vec/columns/column.h" -#include "vec/common/allocator.h" +#include "olap/wal/wal_manager.h" +#include "olap/wal/wal_writer.h" #include "vec/core/block.h" -#include "vec/data_types/data_type.h" -#include "vec/exprs/vexpr_fwd.h" -#include "vec/runtime/vfile_format_transformer.h" -#include "vec/sink/vrow_distribution.h" -#include "vec/sink/vtablet_block_convertor.h" -#include "vec/sink/vtablet_finder.h" -#include "vec/sink/writer/async_result_writer.h" + namespace doris { namespace vectorized { @@ -94,6 +40,7 @@ public: private: int64_t _tb_id; int64_t _wal_id; + // TODO version should in olap/wal_writer uint32_t _version = 0; std::string _label; WalManager* _wal_manager; diff --git a/be/test/http/stream_load_test.cpp b/be/test/http/stream_load_test.cpp index 74dcf9f1f3..881154e857 100644 --- a/be/test/http/stream_load_test.cpp +++ b/be/test/http/stream_load_test.cpp @@ -35,7 +35,7 @@ #include "http/http_headers.h" #include "http/http_request.h" #include "http/utils.h" -#include "olap/wal_manager.h" +#include "olap/wal/wal_manager.h" #include "runtime/exec_env.h" namespace doris { diff --git a/be/test/olap/wal_manager_test.cpp b/be/test/olap/wal_manager_test.cpp index c5c216f12a..d5db9bc750 100644 --- a/be/test/olap/wal_manager_test.cpp +++ b/be/test/olap/wal_manager_test.cpp @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -#include "olap/wal_manager.h" +#include "olap/wal/wal_manager.h" #include diff --git a/be/test/olap/wal_reader_writer_test.cpp b/be/test/olap/wal_reader_writer_test.cpp index d24db72868..47a93fa7f2 100644 --- a/be/test/olap/wal_reader_writer_test.cpp +++ b/be/test/olap/wal_reader_writer_test.cpp @@ -24,8 +24,8 @@ #include "gen_cpp/internal_service.pb.h" #include "gmock/gmock.h" #include "io/fs/local_file_system.h" -#include "olap/wal_reader.h" -#include "olap/wal_writer.h" +#include "olap/wal/wal_reader.h" +#include "olap/wal/wal_writer.h" #include "runtime/exec_env.h" #include "service/brpc.h" #include "testutil/test_util.h" diff --git a/be/test/vec/exec/vtablet_sink_test.cpp b/be/test/vec/exec/vtablet_sink_test.cpp index f0b8e8c964..51ad8660ea 100644 --- a/be/test/vec/exec/vtablet_sink_test.cpp +++ b/be/test/vec/exec/vtablet_sink_test.cpp @@ -33,7 +33,7 @@ #include "gtest/gtest_pred_impl.h" #include "io/fs/local_file_system.h" #include "olap/olap_define.h" -#include "olap/wal_manager.h" +#include "olap/wal/wal_manager.h" #include "runtime/decimalv2_value.h" #include "runtime/define_primitive_type.h" #include "runtime/descriptor_helper.h" diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp index ac066aceb9..2d786679d4 100644 --- a/be/test/vec/exec/vwal_scanner_test.cpp +++ b/be/test/vec/exec/vwal_scanner_test.cpp @@ -23,7 +23,7 @@ #include "gen_cpp/Descriptors_types.h" #include "gen_cpp/PlanNodes_types.h" #include "io/fs/local_file_system.h" -#include "olap/wal_manager.h" +#include "olap/wal/wal_manager.h" #include "runtime/descriptors.h" #include "runtime/memory/mem_tracker.h" #include "runtime/runtime_state.h" diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java index 6593142630..8b9f6b1833 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java @@ -107,6 +107,7 @@ public class GroupCommitPlanner { StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest); StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask); // Will using load id as query id in fragment + // TODO support pipeline TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId()); for (Map.Entry> entry : tRequest.params.per_node_scan_ranges.entrySet()) { for (TScanRangeParams scanRangeParams : entry.getValue()) { @@ -116,6 +117,7 @@ public class GroupCommitPlanner { TFileCompressType.PLAIN); } } + tRequest.query_options.setEnablePipelineEngine(false); List scanRangeParams = tRequest.params.per_node_scan_ranges.values().stream() .flatMap(Collection::stream).collect(Collectors.toList()); Preconditions.checkState(scanRangeParams.size() == 1);