[Enhancement](group commit) optimize pre allocated calculation (#30893)
This commit is contained in:
@ -401,8 +401,7 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
|
||||
if (status.ok() && st.ok() &&
|
||||
(result_status.ok() || result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) {
|
||||
if (!config::group_commit_wait_replay_wal_finish) {
|
||||
auto delete_st = _exec_env->wal_mgr()->delete_wal(
|
||||
table_id, txn_id, load_block_queue->block_queue_pre_allocated());
|
||||
auto delete_st = _exec_env->wal_mgr()->delete_wal(table_id, txn_id);
|
||||
if (!delete_st.ok()) {
|
||||
LOG(WARNING) << "fail to delete wal " << txn_id << ", st=" << delete_st.to_string();
|
||||
}
|
||||
@ -411,9 +410,6 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
|
||||
std::string wal_path;
|
||||
RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path));
|
||||
RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal(db_id, table_id, txn_id, wal_path));
|
||||
RETURN_IF_ERROR(_exec_env->wal_mgr()->update_wal_dir_pre_allocated(
|
||||
WalManager::get_base_wal_path(wal_path), 0,
|
||||
load_block_queue->block_queue_pre_allocated()));
|
||||
}
|
||||
std::stringstream ss;
|
||||
ss << "finish group commit, db_id=" << db_id << ", table_id=" << table_id << ", label=" << label
|
||||
@ -421,7 +417,6 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
|
||||
<< ", exec_plan_fragment status=" << status.to_string()
|
||||
<< ", commit/abort txn rpc status=" << st.to_string()
|
||||
<< ", commit/abort txn status=" << result_status.to_string()
|
||||
<< ", block queue pre allocated size is " << load_block_queue->block_queue_pre_allocated()
|
||||
<< ", wal space info:" << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
|
||||
if (state) {
|
||||
if (!state->get_error_log_file_path().empty()) {
|
||||
@ -539,7 +534,7 @@ Status LoadBlockQueue::close_wal() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
bool LoadBlockQueue::has_enough_wal_disk_space(size_t pre_allocated) {
|
||||
bool LoadBlockQueue::has_enough_wal_disk_space(size_t estimated_wal_bytes) {
|
||||
DBUG_EXECUTE_IF("LoadBlockQueue.has_enough_wal_disk_space.low_space", { return false; });
|
||||
auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
|
||||
size_t available_bytes = 0;
|
||||
@ -549,12 +544,12 @@ bool LoadBlockQueue::has_enough_wal_disk_space(size_t pre_allocated) {
|
||||
LOG(WARNING) << "get wal dir available size failed, st=" << st.to_string();
|
||||
}
|
||||
}
|
||||
if (pre_allocated < available_bytes) {
|
||||
Status st = wal_mgr->update_wal_dir_pre_allocated(_wal_base_path, pre_allocated, 0);
|
||||
if (estimated_wal_bytes < available_bytes) {
|
||||
Status st =
|
||||
wal_mgr->update_wal_dir_estimated_wal_bytes(_wal_base_path, estimated_wal_bytes, 0);
|
||||
if (!st.ok()) {
|
||||
LOG(WARNING) << "update wal dir pre_allocated failed, reason: " << st.to_string();
|
||||
LOG(WARNING) << "update wal dir estimated_wal_bytes failed, reason: " << st.to_string();
|
||||
}
|
||||
_block_queue_pre_allocated.fetch_add(pre_allocated);
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
|
||||
Reference in New Issue
Block a user