[refactor](wal) move group commit load content length to runtime state (#29188)
This commit is contained in:
@ -498,33 +498,17 @@ Status LoadBlockQueue::close_wal() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
bool LoadBlockQueue::has_enough_wal_disk_space(
|
||||
const std::vector<std::shared_ptr<vectorized::Block>>& blocks, const TUniqueId& load_id,
|
||||
bool is_blocks_contain_all_load_data) {
|
||||
size_t blocks_size = 0;
|
||||
for (auto block : blocks) {
|
||||
blocks_size += block->bytes();
|
||||
}
|
||||
size_t content_length = 0;
|
||||
Status st = ExecEnv::GetInstance()->group_commit_mgr()->get_load_info(load_id, &content_length);
|
||||
if (st.ok()) {
|
||||
RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->remove_load_info(load_id));
|
||||
} else {
|
||||
return Status::InternalError("can not find load id.");
|
||||
}
|
||||
size_t pre_allocated = is_blocks_contain_all_load_data
|
||||
? blocks_size
|
||||
: (blocks_size > content_length ? blocks_size : content_length);
|
||||
bool LoadBlockQueue::has_enough_wal_disk_space(size_t pre_allocated) {
|
||||
auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
|
||||
size_t available_bytes = 0;
|
||||
{
|
||||
st = wal_mgr->get_wal_dir_available_size(wal_base_path, &available_bytes);
|
||||
Status st = wal_mgr->get_wal_dir_available_size(wal_base_path, &available_bytes);
|
||||
if (!st.ok()) {
|
||||
LOG(WARNING) << "get wal disk available size filed!";
|
||||
}
|
||||
}
|
||||
if (pre_allocated < available_bytes) {
|
||||
st = wal_mgr->update_wal_dir_pre_allocated(wal_base_path, pre_allocated, true);
|
||||
Status st = wal_mgr->update_wal_dir_pre_allocated(wal_base_path, pre_allocated, true);
|
||||
if (!st.ok()) {
|
||||
LOG(WARNING) << "update wal dir pre_allocated failed, reason: " << st.to_string();
|
||||
}
|
||||
@ -534,30 +518,4 @@ bool LoadBlockQueue::has_enough_wal_disk_space(
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
Status GroupCommitMgr::update_load_info(TUniqueId load_id, size_t content_length) {
|
||||
std::unique_lock l(_load_info_lock);
|
||||
if (_load_id_to_content_length_map.find(load_id) == _load_id_to_content_length_map.end()) {
|
||||
_load_id_to_content_length_map.insert(std::make_pair(load_id, content_length));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status GroupCommitMgr::get_load_info(TUniqueId load_id, size_t* content_length) {
|
||||
std::shared_lock l(_load_info_lock);
|
||||
if (_load_id_to_content_length_map.find(load_id) != _load_id_to_content_length_map.end()) {
|
||||
*content_length = _load_id_to_content_length_map[load_id];
|
||||
return Status::OK();
|
||||
}
|
||||
return Status::InternalError("can not find load id!");
|
||||
}
|
||||
|
||||
Status GroupCommitMgr::remove_load_info(TUniqueId load_id) {
|
||||
std::unique_lock l(_load_info_lock);
|
||||
if (_load_id_to_content_length_map.find(load_id) == _load_id_to_content_length_map.end()) {
|
||||
return Status::InternalError("can not remove load id!");
|
||||
}
|
||||
_load_id_to_content_length_map.erase(load_id);
|
||||
return Status::OK();
|
||||
}
|
||||
} // namespace doris
|
||||
|
||||
Reference in New Issue
Block a user