From b5012dc55aa9aa4421eb41eb390939fe46498738 Mon Sep 17 00:00:00 2001 From: abmdocrt Date: Sun, 18 Feb 2024 10:16:07 +0800 Subject: [PATCH] [Enhancement](group commit) optimize pre allocated calculation (#30893) --- be/src/olap/wal/wal_dirs_info.cpp | 44 ++++++++++--------- be/src/olap/wal/wal_dirs_info.h | 21 +++++---- be/src/olap/wal/wal_manager.cpp | 14 +++--- be/src/olap/wal/wal_manager.h | 7 +-- be/src/runtime/group_commit_mgr.cpp | 17 +++---- be/src/runtime/group_commit_mgr.h | 4 +- be/src/vec/sink/group_commit_block_sink.cpp | 40 ++++++++++++++--- be/src/vec/sink/group_commit_block_sink.h | 4 +- be/test/olap/wal/wal_dirs_info_test.cpp | 25 ++++++----- ...ommit_async_wal_msg_fault_injection.groovy | 4 +- 10 files changed, 105 insertions(+), 75 deletions(-) diff --git a/be/src/olap/wal/wal_dirs_info.cpp b/be/src/olap/wal/wal_dirs_info.cpp index c1ca6fce2f..36af47d1b0 100644 --- a/be/src/olap/wal/wal_dirs_info.cpp +++ b/be/src/olap/wal/wal_dirs_info.cpp @@ -50,20 +50,21 @@ void WalDirInfo::set_used(size_t used) { _used = used; } -size_t WalDirInfo::get_pre_allocated() { +size_t WalDirInfo::get_estimated_wal_bytes() { std::shared_lock rlock(_lock); - return _pre_allocated; + return _estimated_wal_bytes; } -void WalDirInfo::set_pre_allocated(size_t increase_pre_allocated, size_t decrease_pre_allocated) { +void WalDirInfo::set_estimated_wal_bytes(size_t increase_estimated_wal_bytes, + size_t decrease_estimated_wal_bytes) { std::unique_lock wlock(_lock); - _pre_allocated += increase_pre_allocated; - _pre_allocated -= decrease_pre_allocated; + _estimated_wal_bytes += increase_estimated_wal_bytes; + _estimated_wal_bytes -= decrease_estimated_wal_bytes; } size_t WalDirInfo::available() { std::unique_lock wlock(_lock); - int64_t available = _limit - _used - _pre_allocated; + int64_t available = _limit - _used - _estimated_wal_bytes; return available > 0 ? available : 0; } @@ -102,19 +103,20 @@ Status WalDirInfo::update_wal_dir_used(size_t used) { return Status::OK(); } -void WalDirInfo::update_wal_dir_pre_allocated(size_t increase_pre_allocated, - size_t decrease_pre_allocated) { - set_pre_allocated(increase_pre_allocated, decrease_pre_allocated); +void WalDirInfo::update_wal_dir_estimated_wal_bytes(size_t increase_estimated_wal_bytes, + size_t decrease_estimated_wal_bytes) { + set_estimated_wal_bytes(increase_estimated_wal_bytes, decrease_estimated_wal_bytes); } std::string WalDirInfo::get_wal_dir_info_string() { return "[" + _wal_dir + ": limit " + std::to_string(_limit) + " Bytes, used " + - std::to_string(_used) + " Bytes, pre allocated " + std::to_string(_pre_allocated) + - " Bytes, available " + std::to_string(available()) + "Bytes.]"; + std::to_string(_used) + " Bytes, estimated wal bytes " + + std::to_string(_estimated_wal_bytes) + " Bytes, available " + + std::to_string(available()) + " Bytes.]"; } Status WalDirsInfo::add(const std::string& wal_dir, size_t limit, size_t used, - size_t pre_allocated) { + size_t estimated_wal_bytes) { for (const auto& it : _wal_dirs_info_vec) { if (it->get_wal_dir() == wal_dir) { #ifdef BE_TEST @@ -125,7 +127,7 @@ Status WalDirsInfo::add(const std::string& wal_dir, size_t limit, size_t used, } std::unique_lock wlock(_lock); _wal_dirs_info_vec.emplace_back( - std::make_shared(wal_dir, limit, used, pre_allocated)); + std::make_shared(wal_dir, limit, used, estimated_wal_bytes)); return Status::OK(); } @@ -166,7 +168,7 @@ size_t WalDirsInfo::get_max_available_size() { std::string WalDirsInfo::get_wal_dirs_info_string() { std::string wal_dirs_info_string; for (const auto& wal_dir_info : _wal_dirs_info_vec) { - wal_dirs_info_string += wal_dir_info->get_wal_dir_info_string() + "\n"; + wal_dirs_info_string += wal_dir_info->get_wal_dir_info_string() + ";"; } return wal_dirs_info_string; } @@ -205,18 +207,18 @@ Status WalDirsInfo::update_all_wal_dir_used() { return Status::OK(); } -Status WalDirsInfo::update_wal_dir_pre_allocated(const std::string& wal_dir, - size_t increase_pre_allocated, - size_t decrease_pre_allocated) { +Status WalDirsInfo::update_wal_dir_estimated_wal_bytes(const std::string& wal_dir, + size_t increase_estimated_wal_bytes, + size_t decrease_estimated_wal_bytes) { for (const auto& wal_dir_info : _wal_dirs_info_vec) { if (wal_dir_info->get_wal_dir() == wal_dir) { - wal_dir_info->update_wal_dir_pre_allocated(increase_pre_allocated, - decrease_pre_allocated); + wal_dir_info->update_wal_dir_estimated_wal_bytes(increase_estimated_wal_bytes, + decrease_estimated_wal_bytes); return Status::OK(); } } - return Status::InternalError("Can not find wal dir {} when update wal dir pre allocated", - wal_dir); + return Status::InternalError( + "Can not find wal dir {} when update wal dir estimated wal bytes", wal_dir); } Status WalDirsInfo::get_wal_dir_available_size(const std::string& wal_dir, diff --git a/be/src/olap/wal/wal_dirs_info.h b/be/src/olap/wal/wal_dirs_info.h index cfd3473300..e6f732dc43 100644 --- a/be/src/olap/wal/wal_dirs_info.h +++ b/be/src/olap/wal/wal_dirs_info.h @@ -33,29 +33,31 @@ class WalDirInfo { ENABLE_FACTORY_CREATOR(WalDirInfo); public: - WalDirInfo(std::string wal_dir, size_t limit, size_t used, size_t pre_allocated) + WalDirInfo(std::string wal_dir, size_t limit, size_t used, size_t estimated_wal_bytes) : _wal_dir(std::move(wal_dir)), _limit(limit), _used(used), - _pre_allocated(pre_allocated) {} + _estimated_wal_bytes(estimated_wal_bytes) {} const std::string& get_wal_dir() const; size_t get_limit(); size_t get_used(); - size_t get_pre_allocated(); + size_t get_estimated_wal_bytes(); void set_limit(size_t limit); void set_used(size_t used); - void set_pre_allocated(size_t increase_pre_allocated, size_t decrease_pre_allocated); + void set_estimated_wal_bytes(size_t increase_estimated_wal_bytes, + size_t decrease_estimated_wal_bytes); size_t available(); Status update_wal_dir_limit(size_t limit = -1); Status update_wal_dir_used(size_t used = -1); - void update_wal_dir_pre_allocated(size_t increase_pre_allocated, size_t decrease_pre_allocated); + void update_wal_dir_estimated_wal_bytes(size_t increase_estimated_wal_bytes, + size_t decrease_estimated_wal_bytes); std::string get_wal_dir_info_string(); private: std::string _wal_dir; size_t _limit; size_t _used; - size_t _pre_allocated; + size_t _estimated_wal_bytes; std::shared_mutex _lock; }; @@ -65,15 +67,16 @@ class WalDirsInfo { public: WalDirsInfo() = default; ~WalDirsInfo() = default; - Status add(const std::string& wal_dir, size_t limit, size_t used, size_t pre_allocated); + Status add(const std::string& wal_dir, size_t limit, size_t used, size_t estimated_wal_bytes); std::string get_available_random_wal_dir(); size_t get_max_available_size(); Status update_wal_dir_limit(const std::string& wal_dir, size_t limit = -1); Status update_all_wal_dir_limit(); Status update_wal_dir_used(const std::string& wal_dir, size_t used = -1); Status update_all_wal_dir_used(); - Status update_wal_dir_pre_allocated(const std::string& wal_dir, size_t increase_pre_allocated, - size_t decrease_pre_allocated); + Status update_wal_dir_estimated_wal_bytes(const std::string& wal_dir, + size_t increase_estimated_wal_bytes, + size_t decrease_estimated_wal_bytes); Status get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes); Status get_wal_dir_info(const std::string& wal_dir, std::shared_ptr& wal_dir_info); std::string get_wal_dirs_info_string(); diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp index 49fbbfa43b..38506046b9 100644 --- a/be/src/olap/wal/wal_manager.cpp +++ b/be/src/olap/wal/wal_manager.cpp @@ -392,11 +392,11 @@ Status WalManager::update_wal_dir_used(const std::string& wal_dir, size_t used) return _wal_dirs_info->update_wal_dir_used(wal_dir, used); } -Status WalManager::update_wal_dir_pre_allocated(const std::string& wal_dir, - size_t increase_pre_allocated, - size_t decrease_pre_allocated) { - return _wal_dirs_info->update_wal_dir_pre_allocated(wal_dir, increase_pre_allocated, - decrease_pre_allocated); +Status WalManager::update_wal_dir_estimated_wal_bytes(const std::string& wal_dir, + size_t increase_estimated_wal_bytes, + size_t decrease_estimated_wal_bytes) { + return _wal_dirs_info->update_wal_dir_estimated_wal_bytes(wal_dir, increase_estimated_wal_bytes, + decrease_estimated_wal_bytes); } Status WalManager::_update_wal_dir_info_thread() { @@ -488,7 +488,7 @@ Status WalManager::get_lock_and_cv(int64_t wal_id, std::shared_ptr& return Status::OK(); } -Status WalManager::delete_wal(int64_t table_id, int64_t wal_id, size_t block_queue_pre_allocated) { +Status WalManager::delete_wal(int64_t table_id, int64_t wal_id) { std::string wal_path; { std::lock_guard wrlock(_wal_path_lock); @@ -505,8 +505,6 @@ Status WalManager::delete_wal(int64_t table_id, int64_t wal_id, size_t block_que } } erase_wal_queue(table_id, wal_id); - RETURN_IF_ERROR(update_wal_dir_pre_allocated(get_base_wal_path(wal_path), 0, - block_queue_pre_allocated)); return Status::OK(); } diff --git a/be/src/olap/wal/wal_manager.h b/be/src/olap/wal/wal_manager.h index db75d99115..8e86f22a59 100644 --- a/be/src/olap/wal/wal_manager.h +++ b/be/src/olap/wal/wal_manager.h @@ -57,8 +57,9 @@ public: // wal back pressure Status update_wal_dir_limit(const std::string& wal_dir, size_t limit = -1); Status update_wal_dir_used(const std::string& wal_dir, size_t used = -1); - Status update_wal_dir_pre_allocated(const std::string& wal_dir, size_t increase_pre_allocated, - size_t decrease_pre_allocated); + Status update_wal_dir_estimated_wal_bytes(const std::string& wal_dir, + size_t increase_estimated_wal_bytes, + size_t decrease_estimated_wal_bytes); Status get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes); size_t get_max_available_size(); std::string get_wal_dirs_info_string(); @@ -67,7 +68,7 @@ public: Status create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const std::string& label, std::string& base_path); Status get_wal_path(int64_t wal_id, std::string& wal_path); - Status delete_wal(int64_t table_id, int64_t wal_id, size_t block_queue_pre_allocated = 0); + Status delete_wal(int64_t table_id, int64_t wal_id); Status rename_to_tmp_path(const std::string wal, int64_t table_id, int64_t wal_id); Status add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, std::string wal); void add_wal_queue(int64_t table_id, int64_t wal_id); diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index b793efa6e7..7422890aa4 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -401,8 +401,7 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ if (status.ok() && st.ok() && (result_status.ok() || result_status.is())) { if (!config::group_commit_wait_replay_wal_finish) { - auto delete_st = _exec_env->wal_mgr()->delete_wal( - table_id, txn_id, load_block_queue->block_queue_pre_allocated()); + auto delete_st = _exec_env->wal_mgr()->delete_wal(table_id, txn_id); if (!delete_st.ok()) { LOG(WARNING) << "fail to delete wal " << txn_id << ", st=" << delete_st.to_string(); } @@ -411,9 +410,6 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ std::string wal_path; RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path)); RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal(db_id, table_id, txn_id, wal_path)); - RETURN_IF_ERROR(_exec_env->wal_mgr()->update_wal_dir_pre_allocated( - WalManager::get_base_wal_path(wal_path), 0, - load_block_queue->block_queue_pre_allocated())); } std::stringstream ss; ss << "finish group commit, db_id=" << db_id << ", table_id=" << table_id << ", label=" << label @@ -421,7 +417,6 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ << ", exec_plan_fragment status=" << status.to_string() << ", commit/abort txn rpc status=" << st.to_string() << ", commit/abort txn status=" << result_status.to_string() - << ", block queue pre allocated size is " << load_block_queue->block_queue_pre_allocated() << ", wal space info:" << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string(); if (state) { if (!state->get_error_log_file_path().empty()) { @@ -539,7 +534,7 @@ Status LoadBlockQueue::close_wal() { return Status::OK(); } -bool LoadBlockQueue::has_enough_wal_disk_space(size_t pre_allocated) { +bool LoadBlockQueue::has_enough_wal_disk_space(size_t estimated_wal_bytes) { DBUG_EXECUTE_IF("LoadBlockQueue.has_enough_wal_disk_space.low_space", { return false; }); auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr(); size_t available_bytes = 0; @@ -549,12 +544,12 @@ bool LoadBlockQueue::has_enough_wal_disk_space(size_t pre_allocated) { LOG(WARNING) << "get wal dir available size failed, st=" << st.to_string(); } } - if (pre_allocated < available_bytes) { - Status st = wal_mgr->update_wal_dir_pre_allocated(_wal_base_path, pre_allocated, 0); + if (estimated_wal_bytes < available_bytes) { + Status st = + wal_mgr->update_wal_dir_estimated_wal_bytes(_wal_base_path, estimated_wal_bytes, 0); if (!st.ok()) { - LOG(WARNING) << "update wal dir pre_allocated failed, reason: " << st.to_string(); + LOG(WARNING) << "update wal dir estimated_wal_bytes failed, reason: " << st.to_string(); } - _block_queue_pre_allocated.fetch_add(pre_allocated); return true; } else { return false; diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index 6467ca3856..e3b28be580 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -71,8 +71,7 @@ public: WalManager* wal_manager, std::vector& slot_desc, int be_exe_version); Status close_wal(); - bool has_enough_wal_disk_space(size_t pre_allocated); - size_t block_queue_pre_allocated() { return _block_queue_pre_allocated.load(); } + bool has_enough_wal_disk_space(size_t estimated_wal_bytes); UniqueId load_instance_id; std::string label; @@ -96,7 +95,6 @@ private: // wal std::string _wal_base_path; std::shared_ptr _v_wal_writer; - std::atomic_size_t _block_queue_pre_allocated = 0; // commit bool _need_commit = false; diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index aee7138eba..6ff9d4a142 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -23,6 +23,7 @@ #include #include "common/exception.h" +#include "common/status.h" #include "runtime/exec_env.h" #include "runtime/group_commit_mgr.h" #include "runtime/runtime_state.h" @@ -45,6 +46,7 @@ GroupCommitBlockSink::GroupCommitBlockSink(ObjectPool* pool, const RowDescriptor GroupCommitBlockSink::~GroupCommitBlockSink() { if (_load_block_queue) { + _remove_estimated_wal_bytes(); _load_block_queue->remove_load_id(_load_id); } } @@ -117,6 +119,7 @@ Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) { RETURN_IF_ERROR(_add_blocks(state, true)); } if (_load_block_queue) { + _remove_estimated_wal_bytes(); _load_block_queue->remove_load_id(_load_id); } // wait to wal @@ -240,16 +243,20 @@ Status GroupCommitBlockSink::_add_blocks(RuntimeState* state, _db_id, _table_id, _base_schema_version, load_id, _load_block_queue, _state->be_exec_version())); if (_group_commit_mode == TGroupCommitMode::ASYNC_MODE) { - size_t pre_allocated = _pre_allocated(is_blocks_contain_all_load_data); - _group_commit_mode = _load_block_queue->has_enough_wal_disk_space(pre_allocated) - ? TGroupCommitMode::ASYNC_MODE - : TGroupCommitMode::SYNC_MODE; + size_t estimated_wal_bytes = + _calculate_estimated_wal_bytes(is_blocks_contain_all_load_data); + _group_commit_mode = + _load_block_queue->has_enough_wal_disk_space(estimated_wal_bytes) + ? TGroupCommitMode::ASYNC_MODE + : TGroupCommitMode::SYNC_MODE; if (_group_commit_mode == TGroupCommitMode::SYNC_MODE) { LOG(INFO) << "Load id=" << print_id(_state->query_id()) << ", use group commit label=" << _load_block_queue->label << " will not write wal because wal disk space usage reach max " "limit. Detail info: " << _state->exec_env()->wal_mgr()->get_wal_dirs_info_string(); + } else { + _estimated_wal_bytes = estimated_wal_bytes; } } _state->set_import_label(_load_block_queue->label); @@ -266,6 +273,7 @@ Status GroupCommitBlockSink::_add_blocks(RuntimeState* state, _blocks.clear(); DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg", { if (_load_block_queue) { + _remove_estimated_wal_bytes(); _load_block_queue->remove_load_id(_load_id); } if (ExecEnv::GetInstance()->group_commit_mgr()->debug_future.wait_for( @@ -281,7 +289,7 @@ Status GroupCommitBlockSink::_add_blocks(RuntimeState* state, return Status::OK(); } -size_t GroupCommitBlockSink::_pre_allocated(bool is_blocks_contain_all_load_data) { +size_t GroupCommitBlockSink::_calculate_estimated_wal_bytes(bool is_blocks_contain_all_load_data) { size_t blocks_size = 0; for (auto block : _blocks) { blocks_size += block->bytes(); @@ -292,5 +300,27 @@ size_t GroupCommitBlockSink::_pre_allocated(bool is_blocks_contain_all_load_data : _state->content_length()); } +void GroupCommitBlockSink::_remove_estimated_wal_bytes() { + if (_estimated_wal_bytes == 0) { + return; + } else { + std::string wal_path; + Status st = ExecEnv::GetInstance()->wal_mgr()->get_wal_path(_load_block_queue->txn_id, + wal_path); + if (!st.ok()) { + LOG(WARNING) << "Failed to get wal path in remove estimated wal bytes, reason: " + << st.to_string(); + return; + } + st = ExecEnv::GetInstance()->wal_mgr()->update_wal_dir_estimated_wal_bytes( + WalManager::get_base_wal_path(wal_path), 0, _estimated_wal_bytes); + if (!st.ok()) { + LOG(WARNING) << "Failed to remove estimated wal bytes, reason: " << st.to_string(); + return; + } + _estimated_wal_bytes = 0; + } +}; + } // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/group_commit_block_sink.h b/be/src/vec/sink/group_commit_block_sink.h index cfe7727e3e..be31c1ef1f 100644 --- a/be/src/vec/sink/group_commit_block_sink.h +++ b/be/src/vec/sink/group_commit_block_sink.h @@ -48,7 +48,8 @@ public: private: Status _add_block(RuntimeState* state, std::shared_ptr block); Status _add_blocks(RuntimeState* state, bool is_blocks_contain_all_load_data); - size_t _pre_allocated(bool is_blocks_contain_all_load_data); + size_t _calculate_estimated_wal_bytes(bool is_blocks_contain_all_load_data); + void _remove_estimated_wal_bytes(); vectorized::VExprContextSPtrs _output_vexpr_ctxs; @@ -78,6 +79,7 @@ private: std::vector _partitions; Bitmap _filter_bitmap; bool _has_filtered_rows = false; + size_t _estimated_wal_bytes = 0; }; } // namespace vectorized diff --git a/be/test/olap/wal/wal_dirs_info_test.cpp b/be/test/olap/wal/wal_dirs_info_test.cpp index 7cb8ec5eef..04967350a9 100644 --- a/be/test/olap/wal/wal_dirs_info_test.cpp +++ b/be/test/olap/wal/wal_dirs_info_test.cpp @@ -28,20 +28,20 @@ public: WalDirsInfoTest() = default; ~WalDirsInfoTest() override = default; void SetUp() override { - // limit 1000 used 100 preallocated 200 available 700 + // limit 1000 used 100 estimated bytes in wal 200 available 700 Status st = wal_dirs_info.add(wal_dir_test_1, 0, 0, 0); EXPECT_EQ(st, Status::OK()); - // limit 1000 used 200 preallocated 300 available 500 + // limit 1000 used 200 estimated bytes in wal 300 available 500 st = wal_dirs_info.add(wal_dir_test_2, 0, 0, 0); EXPECT_EQ(st, Status::OK()); - // limit 1000 used 400 preallocated 500 available 100 + // limit 1000 used 400 estimated bytes in wal 500 available 100 st = wal_dirs_info.add(wal_dir_test_3, 0, 0, 0); EXPECT_EQ(st, Status::OK()); } void TearDown() override {} void set_and_check_success(std::string wal_dir, size_t limit, size_t used, - size_t pre_allocated) { + size_t estimated_wal_bytes) { Status st = wal_dirs_info.update_wal_dir_limit(wal_dir, limit); EXPECT_EQ(st, Status::OK()); std::shared_ptr wal_dir_info; @@ -57,31 +57,32 @@ public: EXPECT_EQ(st, Status::OK()); EXPECT_EQ(wal_dir_info->get_used(), used); - st = wal_dirs_info.update_wal_dir_pre_allocated(wal_dir, pre_allocated, 0); + st = wal_dirs_info.update_wal_dir_estimated_wal_bytes(wal_dir, estimated_wal_bytes, 0); EXPECT_EQ(st, Status::OK()); st = wal_dirs_info.get_wal_dir_info(wal_dir, wal_dir_info); EXPECT_NE(wal_dir_info, nullptr); EXPECT_EQ(st, Status::OK()); - EXPECT_EQ(wal_dir_info->get_pre_allocated(), pre_allocated); + EXPECT_EQ(wal_dir_info->get_estimated_wal_bytes(), estimated_wal_bytes); - st = wal_dirs_info.update_wal_dir_pre_allocated(wal_dir, 0, pre_allocated); + st = wal_dirs_info.update_wal_dir_estimated_wal_bytes(wal_dir, 0, estimated_wal_bytes); EXPECT_EQ(st, Status::OK()); st = wal_dirs_info.get_wal_dir_info(wal_dir, wal_dir_info); EXPECT_NE(wal_dir_info, nullptr); EXPECT_EQ(st, Status::OK()); - EXPECT_EQ(wal_dir_info->get_pre_allocated(), 0); + EXPECT_EQ(wal_dir_info->get_estimated_wal_bytes(), 0); - st = wal_dirs_info.update_wal_dir_pre_allocated(wal_dir, pre_allocated, 0); + st = wal_dirs_info.update_wal_dir_estimated_wal_bytes(wal_dir, estimated_wal_bytes, 0); EXPECT_EQ(st, Status::OK()); st = wal_dirs_info.get_wal_dir_info(wal_dir, wal_dir_info); EXPECT_NE(wal_dir_info, nullptr); EXPECT_EQ(st, Status::OK()); - EXPECT_EQ(wal_dir_info->get_pre_allocated(), pre_allocated); + EXPECT_EQ(wal_dir_info->get_estimated_wal_bytes(), estimated_wal_bytes); - EXPECT_EQ(wal_dir_info->available(), limit - used - pre_allocated); + EXPECT_EQ(wal_dir_info->available(), limit - used - estimated_wal_bytes); } - void set_and_check_fail(std::string wal_dir, size_t limit, size_t used, size_t pre_allocated) { + void set_and_check_fail(std::string wal_dir, size_t limit, size_t used, + size_t estimated_wal_bytes) { Status st = wal_dirs_info.update_wal_dir_limit(wal_dir, limit); EXPECT_EQ(st, Status::InternalError("")); } diff --git a/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy index 0ad2d8b8f2..cdf537749c 100644 --- a/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy @@ -49,7 +49,7 @@ suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") { assertFalse(true); } catch (Exception e) { logger.info(e.getMessage()) - assertTrue(e.getMessage().contains('pre allocated 0 Bytes')) + assertTrue(e.getMessage().contains('estimated wal bytes 0 Bytes')) exception = true; } finally { GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg") @@ -86,7 +86,7 @@ suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") { assertFalse(true); } catch (Exception e) { logger.info(e.getMessage()) - assertTrue(e.getMessage().contains('pre allocated 0 Bytes')) + assertTrue(e.getMessage().contains('estimated wal bytes 0 Bytes')) exception = true; } finally { GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")