From 7a1bd6abb08041957dcbcb3f5c6b6e24ae0bdb7a Mon Sep 17 00:00:00 2001 From: huanghaibin <284824253@qq.com> Date: Mon, 19 Feb 2024 21:45:02 +0800 Subject: [PATCH] [improvment](group_commit) Refector scan wal function (#30939) Co-authored-by: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com> --- be/src/olap/wal/wal_manager.cpp | 149 ++++++++++++++++++------- be/src/olap/wal/wal_manager.h | 28 ++++- be/src/olap/wal/wal_table.cpp | 30 ++--- be/src/olap/wal/wal_table.h | 1 - be/test/olap/wal/wal_manager_test.cpp | 55 +++++---- be/test/vec/exec/vwal_scanner_test.cpp | 15 ++- 6 files changed, 183 insertions(+), 95 deletions(-) diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp index 38506046b9..06e1bd37d7 100644 --- a/be/src/olap/wal/wal_manager.cpp +++ b/be/src/olap/wal/wal_manager.cpp @@ -39,7 +39,10 @@ namespace doris { WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) - : _exec_env(exec_env), _stop(false), _stop_background_threads_latch(1) { + : _exec_env(exec_env), + _stop(false), + _stop_background_threads_latch(1), + _first_replay(true) { _wal_dirs = strings::Split(wal_dir_list, ";", strings::SkipWhitespace()); static_cast(ThreadPoolBuilder("GroupCommitReplayWalThreadPool") .set_min_threads(1) @@ -76,11 +79,8 @@ Status WalManager::init() { RETURN_IF_ERROR(_init_wal_dirs_conf()); RETURN_IF_ERROR(_init_wal_dirs()); RETURN_IF_ERROR(_init_wal_dirs_info()); - for (auto wal_dir : _wal_dirs) { - RETURN_IF_ERROR(_scan_wals(wal_dir)); - } return Thread::create( - "WalMgr", "replay_wal", [this]() { static_cast(this->_replay()); }, + "WalMgr", "replay_wal", [this]() { static_cast(this->_replay_background()); }, &_replay_thread); } @@ -209,6 +209,7 @@ Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_ base_path = _wal_dirs_info->get_available_random_wal_dir(); std::stringstream ss; ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/" + << _wal_version << "_" << _exec_env->master_info()->backend_id << "_" << std::to_string(wal_id) << "_" << label; { std::lock_guard wrlock(_wal_path_lock); @@ -232,9 +233,70 @@ Status WalManager::get_wal_path(int64_t wal_id, std::string& wal_path) { return Status::OK(); } -Status WalManager::_scan_wals(const std::string& wal_path) { - size_t count = 0; - bool exists = true; +Status WalManager::parse_wal_path(const std::string& file_name, int64_t& version, + int64_t& backend_id, int64_t& wal_id, std::string& label) { + try { + // find version + auto pos = file_name.find("_"); + version = std::strtoll(file_name.substr(0, pos).c_str(), NULL, 10); + // find be id + auto substring1 = file_name.substr(pos + 1); + pos = substring1.find("_"); + backend_id = std::strtoll(substring1.substr(0, pos).c_str(), NULL, 10); + // find wal id + auto substring2 = substring1.substr(pos + 1); + pos = substring2.find("_"); + wal_id = std::strtoll(substring2.substr(0, pos).c_str(), NULL, 10); + // find label + label = substring2.substr(pos + 1); + VLOG_DEBUG << "version:" << version << "backend_id:" << backend_id << ",wal_id:" << wal_id + << ",label:" << label; + } catch (const std::invalid_argument& e) { + return Status::InvalidArgument("Invalid format, {}", e.what()); + } + return Status::OK(); +} + +Status WalManager::_load_wals() { + std::vector wals; + for (auto wal_dir : _wal_dirs) { + WARN_IF_ERROR(_scan_wals(wal_dir, wals), fmt::format("fail to scan wal dir={}", wal_dir)); + } + for (const auto& wal : wals) { + bool exists = false; + WARN_IF_ERROR(io::global_local_filesystem()->exists(wal.wal_path, &exists), + fmt::format("fail to check exist on wal file={}", wal.wal_path)); + if (!exists) { + continue; + } + LOG(INFO) << "find wal: " << wal.wal_path; + { + std::lock_guard wrlock(_wal_path_lock); + auto it = _wal_path_map.find(wal.wal_id); + if (it != _wal_path_map.end()) { + LOG(INFO) << "wal_id " << wal.wal_id << " already in wal_path_map, skip it"; + continue; + } + _wal_path_map.emplace(wal.wal_id, wal.wal_path); + } + // this config is use for test p0 case in pipeline + if (config::group_commit_wait_replay_wal_finish) { + auto lock = std::make_shared(); + auto cv = std::make_shared(); + auto add_st = add_wal_cv_map(wal.wal_id, lock, cv); + if (!add_st.ok()) { + LOG(WARNING) << "fail to add wal_id " << wal.wal_id << " to wal_cv_map"; + continue; + } + } + WARN_IF_ERROR(add_recover_wal(wal.db_id, wal.tb_id, wal.wal_id, wal.wal_path), + fmt::format("Failed to add recover wal={}", wal.wal_path)); + } + return Status::OK(); +} + +Status WalManager::_scan_wals(const std::string& wal_path, std::vector& res) { + bool exists = false; std::vector dbs; Status st = io::global_local_filesystem()->list(wal_path, false, &dbs, &exists); if (!st.ok()) { @@ -249,7 +311,7 @@ Status WalManager::_scan_wals(const std::string& wal_path) { auto db_path = wal_path + "/" + database_id.file_name; st = io::global_local_filesystem()->list(db_path, false, &tables, &exists); if (!st.ok()) { - LOG(WARNING) << "failed to list files for wal_dir=" << db_path + LOG(WARNING) << "failed list files for wal_dir=" << db_path << ", st=" << st.to_string(); return st; } @@ -261,49 +323,48 @@ Status WalManager::_scan_wals(const std::string& wal_path) { auto table_path = db_path + "/" + table_id.file_name; st = io::global_local_filesystem()->list(table_path, false, &wals, &exists); if (!st.ok()) { - LOG(WARNING) << "failed to list files for wal_dir=" << table_path + LOG(WARNING) << "failed list files for wal_dir=" << table_path << ", st=" << st.to_string(); return st; } if (wals.empty()) { continue; } - std::vector res; - for (const auto& wal : wals) { - auto wal_file = table_path + "/" + wal.file_name; - res.emplace_back(wal_file); - { - std::lock_guard wrlock(_wal_path_lock); - auto pos = wal.file_name.find("_"); - try { - int64_t wal_id = - std::strtoll(wal.file_name.substr(0, pos).c_str(), NULL, 10); - _wal_path_map.emplace(wal_id, wal_file); - int64_t db_id = std::strtoll(database_id.file_name.c_str(), NULL, 10); - int64_t tb_id = std::strtoll(table_id.file_name.c_str(), NULL, 10); - if (config::group_commit_wait_replay_wal_finish) { - std::shared_ptr lock = std::make_shared(); - std::shared_ptr cv = - std::make_shared(); - auto add_st = add_wal_cv_map(wal_id, lock, cv); - if (!add_st.ok()) { - LOG(WARNING) << "fail to add wal_id " << wal_id << " to wal_cv_map"; - } - } - RETURN_IF_ERROR(add_recover_wal(db_id, tb_id, wal_id, wal_file)); - } catch (const std::invalid_argument& e) { - return Status::InvalidArgument("Invalid format, {}", e.what()); - } - } + int64_t db_id = -1; + int64_t tb_id = -1; + try { + db_id = std::strtoll(database_id.file_name.c_str(), NULL, 10); + tb_id = std::strtoll(table_id.file_name.c_str(), NULL, 10); + } catch (const std::invalid_argument& e) { + return Status::InvalidArgument("Invalid format, {}", e.what()); + } + for (const auto& wal : wals) { + int64_t version = -1; + int64_t backend_id = -1; + int64_t wal_id = -1; + std::string label = ""; + auto parse_st = parse_wal_path(wal.file_name, version, backend_id, wal_id, label); + if (!parse_st.ok()) { + LOG(WARNING) << "fail to parse file=" << wal.file_name + << ",st=" << parse_st.to_string(); + continue; + } + auto wal_file = table_path + "/" + wal.file_name; + struct ScanWalInfo scan_wal_info; + scan_wal_info.wal_path = wal_file; + scan_wal_info.db_id = db_id; + scan_wal_info.tb_id = tb_id; + scan_wal_info.wal_id = wal_id; + scan_wal_info.be_id = backend_id; + res.emplace_back(scan_wal_info); } - count += res.size(); } } - LOG(INFO) << "Finish list wal_dir=" << wal_path << ", wal count=" << count; + LOG(INFO) << "Finish list wal_dir=" << wal_path << ", wal count=" << res.size(); return Status::OK(); } -Status WalManager::_replay() { +Status WalManager::_replay_background() { do { if (_stop.load()) { break; @@ -313,6 +374,12 @@ Status WalManager::_replay() { _exec_env->master_info()->network_address.port == 0) { continue; } + // replay residual wal,only replay once + bool expected = true; + if (_first_replay.compare_exchange_strong(expected, false)) { + RETURN_IF_ERROR(_load_wals()); + } + // replay wal of current process std::vector replay_tables; { std::lock_guard wrlock(_table_lock); @@ -546,4 +613,4 @@ Status WalManager::rename_to_tmp_path(const std::string wal, int64_t table_id, i return Status::OK(); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/olap/wal/wal_manager.h b/be/src/olap/wal/wal_manager.h index 8e86f22a59..3710b9e31c 100644 --- a/be/src/olap/wal/wal_manager.h +++ b/be/src/olap/wal/wal_manager.h @@ -46,6 +46,13 @@ namespace doris { class WalManager { ENABLE_FACTORY_CREATOR(WalManager); + struct ScanWalInfo { + std::string wal_path; + int64_t db_id; + int64_t tb_id; + int64_t wal_id; + int64_t be_id; + }; public: WalManager(ExecEnv* exec_env, const std::string& wal_dir); @@ -74,6 +81,13 @@ public: void add_wal_queue(int64_t table_id, int64_t wal_id); void erase_wal_queue(int64_t table_id, int64_t wal_id); size_t get_wal_queue_size(int64_t table_id); + // filename format:a_b_c_group_commit_xxx + // a:version + // b:be id + // c:wal id + // group_commit_xxx:label + static Status parse_wal_path(const std::string& file_name, int64_t& version, + int64_t& backend_id, int64_t& wal_id, std::string& label); // fot ut size_t get_wal_table_size(int64_t table_id); @@ -94,9 +108,12 @@ private: Status _init_wal_dirs_info(); Status _update_wal_dir_info_thread(); - // replay wal - Status _scan_wals(const std::string& wal_path); - Status _replay(); + // scan all wal files under storage path + Status _scan_wals(const std::string& wal_path, std::vector& res); + // use a background thread to do replay task + Status _replay_background(); + // load residual wals + Status _load_wals(); void _stop_relay_wal(); public: @@ -127,6 +144,9 @@ private: std::shared_mutex _wal_queue_lock; std::unordered_map> _wal_queues; + int64_t _wal_version = 0; + std::atomic _first_replay; + // for test relay // using WalCvInfo = @@ -134,4 +154,4 @@ private: std::shared_mutex _wal_cv_lock; std::unordered_map _wal_cv_map; }; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp index cb5f53508e..93cd12765f 100644 --- a/be/src/olap/wal/wal_table.cpp +++ b/be/src/olap/wal/wal_table.cpp @@ -91,13 +91,7 @@ void WalTable::_pick_relay_wals() { Status WalTable::_relay_wal_one_by_one() { std::vector> need_retry_wals; std::vector> need_delete_wals; - while (!_replaying_queue.empty()) { - std::shared_ptr wal_info = nullptr; - { - std::lock_guard lock(_replay_wal_lock); - wal_info = _replaying_queue.front(); - _replaying_queue.pop_front(); - } + for (auto wal_info : _replaying_queue) { wal_info->add_retry_num(); auto st = _replay_wal_internal(wal_info->get_wal_path()); if (!st.ok()) { @@ -117,6 +111,7 @@ Status WalTable::_relay_wal_one_by_one() { } { std::lock_guard lock(_replay_wal_lock); + _replaying_queue.clear(); for (auto retry_wal_info : need_retry_wals) { _replay_wal_map.emplace(retry_wal_info->get_wal_path(), retry_wal_info); } @@ -193,9 +188,13 @@ Status WalTable::_try_abort_txn(int64_t db_id, std::string& label) { Status WalTable::_replay_wal_internal(const std::string& wal) { LOG(INFO) << "start replay wal=" << wal; - int64_t wal_id = 0; + int64_t version = -1; + int64_t backend_id = -1; + int64_t wal_id = -1; std::string label = ""; - RETURN_IF_ERROR(_parse_wal_path(wal, wal_id, label)); + io::Path wal_path = wal; + auto file_name = wal_path.filename().string(); + RETURN_IF_ERROR(WalManager::parse_wal_path(file_name, version, backend_id, wal_id, label)); #ifndef BE_TEST if (!config::group_commit_wait_replay_wal_finish) { [[maybe_unused]] auto st = _try_abort_txn(_db_id, label); @@ -204,19 +203,6 @@ Status WalTable::_replay_wal_internal(const std::string& wal) { return _replay_one_txn_with_stremaload(wal_id, wal, label); } -Status WalTable::_parse_wal_path(const std::string& wal, int64_t& wal_id, std::string& label) { - io::Path wal_path = wal; - auto file_name = wal_path.filename().string(); - auto pos = file_name.find("_"); - try { - wal_id = std::strtoll(file_name.substr(0, pos).c_str(), NULL, 10); - label = file_name.substr(pos + 1); - } catch (const std::invalid_argument& e) { - return Status::InvalidArgument("Invalid format, {}", e.what()); - } - return Status::OK(); -} - Status WalTable::_construct_sql_str(const std::string& wal, const std::string& label, std::string& sql_str) { std::string columns; diff --git a/be/src/olap/wal/wal_table.h b/be/src/olap/wal/wal_table.h index 9b1ead87a2..f6ed3d865b 100644 --- a/be/src/olap/wal/wal_table.h +++ b/be/src/olap/wal/wal_table.h @@ -46,7 +46,6 @@ private: Status _relay_wal_one_by_one(); Status _replay_wal_internal(const std::string& wal); - Status _parse_wal_path(const std::string& wal, int64_t& wal_id, std::string& label); Status _try_abort_txn(int64_t db_id, std::string& label); Status _get_column_info(int64_t db_id, int64_t tb_id, std::map& column_info_map); diff --git a/be/test/olap/wal/wal_manager_test.cpp b/be/test/olap/wal/wal_manager_test.cpp index b0e3f9f1dc..d264cb621c 100644 --- a/be/test/olap/wal/wal_manager_test.cpp +++ b/be/test/olap/wal/wal_manager_test.cpp @@ -59,6 +59,7 @@ public: _env->_master_info = new TMasterInfo(); _env->_master_info->network_address.hostname = "host name"; _env->_master_info->network_address.port = 1234; + _env->_master_info->backend_id = 1001; _env->new_load_stream_mgr() = NewLoadStreamMgr::create_shared(); _env->_internal_client_cache = new BrpcClientCache(); _env->_function_client_cache = new BrpcClientCache(); @@ -71,6 +72,8 @@ public: Status st = io::global_local_filesystem()->delete_directory(wal_dir); if (!st.ok()) { LOG(WARNING) << "fail to delete " << wal_dir.string(); + } else { + LOG(INFO) << "delete " << wal_dir.string(); } SAFE_STOP(_env->_wal_manager); SAFE_DELETE(_env->_function_client_cache); @@ -81,7 +84,7 @@ public: void prepare() { Status st = io::global_local_filesystem()->create_directory(wal_dir); if (!st.ok()) { - LOG(WARNING) << "create dir " << wal_dir.string(); + LOG(WARNING) << "fail to create dir " << wal_dir.string(); } } @@ -104,47 +107,53 @@ TEST_F(WalManagerTest, recovery_normal) { std::string db_id = "1"; int64_t tb_1_id = 1; - std::string wal_100_id = "100"; - std::string wal_101_id = "101"; + std::string wal_file_1 = "0_1001_1_group_commit_label1"; + std::string wal_file_2 = "0_1001_2_group_commit_label2"; int64_t tb_2_id = 2; - std::string wal_200_id = "200"; - std::string wal_201_id = "201"; + std::string wal_file_3 = "0_1001_3_group_commit_label3"; + std::string wal_file_4 = "0_1001_4_group_commit_label4"; bool res = std::filesystem::create_directory(wal_dir.string() + "/" + db_id); ASSERT_TRUE(res); res = std::filesystem::create_directory(wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_1_id)); ASSERT_TRUE(res); - std::string wal_100 = - wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_1_id) + "/" + wal_100_id; - std::string wal_101 = - wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_1_id) + "/" + wal_101_id; - createWal(wal_100); - createWal(wal_101); + std::string wal_1 = + wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_1_id) + "/" + wal_file_1; + std::string wal_2 = + wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_1_id) + "/" + wal_file_2; + createWal(wal_1); + createWal(wal_2); res = std::filesystem::create_directory(wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_2_id)); ASSERT_TRUE(res); - std::string wal_200 = - wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_2_id) + "/" + wal_200_id; - std::string wal_201 = - wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_2_id) + "/" + wal_201_id; - createWal(wal_200); - createWal(wal_201); + std::string wal_3 = + wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_2_id) + "/" + wal_file_3; + std::string wal_4 = + wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_2_id) + "/" + wal_file_4; + createWal(wal_3); + createWal(wal_4); Status st = _env->wal_mgr()->init(); if (!st.ok()) { LOG(WARNING) << "fail to int wal manager "; } - while (_env->wal_mgr()->get_wal_table_size(tb_1_id) > 0 || - _env->wal_mgr()->get_wal_table_size(tb_2_id) > 0) { + auto count = 0; + while (std::filesystem::exists(wal_1) || std::filesystem::exists(wal_2) || + std::filesystem::exists(wal_3) || std::filesystem::exists(wal_4)) { + if (count > 30) { + LOG(WARNING) << "wait time out"; + break; + } sleep(1); + count++; continue; } - ASSERT_TRUE(!std::filesystem::exists(wal_100)); - ASSERT_TRUE(!std::filesystem::exists(wal_101)); - ASSERT_TRUE(!std::filesystem::exists(wal_200)); - ASSERT_TRUE(!std::filesystem::exists(wal_201)); + ASSERT_TRUE(!std::filesystem::exists(wal_1)); + ASSERT_TRUE(!std::filesystem::exists(wal_2)); + ASSERT_TRUE(!std::filesystem::exists(wal_3)); + ASSERT_TRUE(!std::filesystem::exists(wal_4)); } TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) { diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp index b47d756345..8d4fe6ad75 100644 --- a/be/test/vec/exec/vwal_scanner_test.cpp +++ b/be/test/vec/exec/vwal_scanner_test.cpp @@ -60,6 +60,8 @@ private: int64_t db_id = 1; int64_t tb_id = 2; int64_t txn_id = 789; + int64_t version = 0; + int64_t backend_id = 1001; std::string label = "test"; TupleId _dst_tuple_id = 0; @@ -197,10 +199,6 @@ void VWalScannerTest::init() { init_desc_table(); static_cast(io::global_local_filesystem()->create_directory( wal_dir + "/" + std::to_string(db_id) + "/" + std::to_string(tb_id))); - std::string src = "./be/test/exec/test_data/wal_scanner/wal"; - std::string dst = wal_dir + "/" + std::to_string(db_id) + "/" + std::to_string(tb_id) + "/" + - std::to_string(txn_id) + "_" + label; - std::filesystem::copy(src, dst); // Node Id _tnode.node_id = 0; @@ -213,10 +211,19 @@ void VWalScannerTest::init() { _tnode.__isset.file_scan_node = true; _env = ExecEnv::GetInstance(); + _env->_master_info = new TMasterInfo(); + _env->_master_info->network_address.hostname = "host name"; + _env->_master_info->network_address.port = backend_id; + _env->_master_info->backend_id = 1001; _env->_wal_manager = WalManager::create_shared(_env, wal_dir); std::string base_path; auto st = _env->_wal_manager->_init_wal_dirs_info(); st = _env->_wal_manager->create_wal_path(db_id, tb_id, txn_id, label, base_path); + std::string src = "./be/test/exec/test_data/wal_scanner/wal"; + std::string dst = wal_dir + "/" + std::to_string(db_id) + "/" + std::to_string(tb_id) + "/" + + std::to_string(version) + "_" + std::to_string(backend_id) + "_" + + std::to_string(txn_id) + "_" + label; + std::filesystem::copy(src, dst); } TEST_F(VWalScannerTest, normal) {