If the tmp file is deleted, the flush task will not be retried; delete the tmp file as soon as possible;

This commit is contained in:
obdev 2024-08-26 07:26:48 +00:00 committed by ob-robot
parent 2509b30374
commit 9b2deedb61
6 changed files with 56 additions and 15 deletions

View File

@ -197,7 +197,9 @@ void ObTmpFileBatchFlushContext::try_update_prepare_finished_cnt(const ObTmpFile
if (OB_UNLIKELY(flush_seq_ctx_.prepare_finished_cnt_ >= flush_seq_ctx_.create_flush_task_cnt_)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected flush_seq_ctx_", KPC(this));
} else if (ObTmpFileFlushTask::TFFT_WAIT == flush_task.get_state() || flush_task.get_data_length() == 0) {
} else if (ObTmpFileFlushTask::TFFT_WAIT == flush_task.get_state() ||
ObTmpFileFlushTask::TFFT_ABORT == flush_task.get_state() ||
flush_task.get_data_length() == 0) {
++flush_seq_ctx_.prepare_finished_cnt_;
recorded = true;
}

View File

@ -240,6 +240,7 @@ public:
TFFT_ASYNC_WRITE = 5,
TFFT_WAIT = 6,
TFFT_FINISH = 7,
TFFT_ABORT = 8,
};
public:
void destroy();

View File

@ -650,6 +650,27 @@ int ObTmpFileFlushManager::drive_flush_task_prepare_(ObTmpFileFlushTask &flush_t
return ret;
}
// Drop from `flush_task` every flush info whose file reports is_deleting().
//
// Each dropped entry is reset() first and then removed from the task's
// flush-info array. Scanning stops at the first unexpected condition
// (an entry without a file, or a failed array removal); the error is only
// logged because this helper returns void.
//
// @param flush_task  task whose flush-info array is pruned in place
void ObTmpFileFlushManager::try_remove_unused_flush_info_(ObTmpFileFlushTask &flush_task)
{
  int ret = OB_SUCCESS;
  ObArray<ObTmpFileFlushInfo> &flush_infos = flush_task.get_flush_infos();
  int64_t i = 0;
  while (OB_SUCC(ret) && i < flush_infos.count()) {
    ObTmpFileFlushInfo &flush_info = flush_infos.at(i);
    ObSharedNothingTmpFile *file = flush_info.file_handle_.get();
    if (OB_ISNULL(file)) {
      ret = OB_ERR_UNEXPECTED;
      STORAGE_LOG(WARN, "file is nullptr", KR(ret), K(flush_info));
    } else if (file->is_deleting()) {
      STORAGE_LOG(INFO, "the file is deleting, abort this flush info",
          KR(ret), K(flush_info), K(flush_task));
      flush_info.reset();
      // Check the removal result instead of dropping it: on failure the entry
      // that was just reset is still in the array, so stop here rather than
      // revisit it and report a misleading "file is nullptr".
      if (OB_FAIL(flush_infos.remove(i))) {
        STORAGE_LOG(WARN, "fail to remove flush info", KR(ret), K(i), K(flush_task));
      }
      // On success `i` is intentionally not advanced: the following entry is
      // expected to occupy slot `i` now (the same assumption the previous
      // `--i` / `++i` pair relied on).
    } else {
      ++i;
    }
  }
}
int ObTmpFileFlushManager::drive_flush_task_retry_(
ObTmpFileFlushTask &flush_task,
const FlushState state,
@ -664,7 +685,11 @@ int ObTmpFileFlushManager::drive_flush_task_retry_(
}
break;
case FlushState::TFFT_ASYNC_WRITE:
if (OB_FAIL(handle_async_write_(flush_task, next_state))) {
try_remove_unused_flush_info_(flush_task);
if (0 == flush_task.get_flush_infos().count()) {
STORAGE_LOG(INFO, "all flush info is aborted", KR(ret), K(flush_task));
next_state = FlushState::TFFT_ABORT;
} else if (OB_FAIL(handle_async_write_(flush_task, next_state))) {
STORAGE_LOG(WARN, "fail to handle flush task async write", KR(ret), K(flush_task));
}
break;
@ -710,7 +735,7 @@ int ObTmpFileFlushManager::retry(ObTmpFileFlushTask &flush_task)
} else if (OB_FAIL(advance_status_(flush_task, next_state))) {
STORAGE_LOG(WARN, "fail to advance status", KR(ret), K(state), K(next_state), K(flush_task));
}
} while (OB_SUCC(ret) && FlushState::TFFT_WAIT != next_state);
} while (OB_SUCC(ret) && FlushState::TFFT_WAIT != next_state && FlushState::TFFT_ABORT != next_state);
if (!flush_task.get_recorded_as_prepare_finished()) {
if (flush_task.get_flush_seq() != flush_ctx_.get_flush_sequence()) {

View File

@ -110,6 +110,7 @@ private:
int evict_pages_and_retry_insert_(ObTmpFileFlushTask &flush_task,
ObTmpFileFlushInfo &flush_info,
const int64_t logic_block_index);
void try_remove_unused_flush_info_(ObTmpFileFlushTask &flush_task);
DISALLOW_COPY_AND_ASSIGN(ObTmpFileFlushManager);
private:
bool is_inited_;

View File

@ -382,7 +382,10 @@ int ObTmpFileFlushTG::retry_task_()
}
// push task into wait_list_/retry_list_ according to task state, ignore error code
FlushState state = flush_task->get_state();
if (FlushState::TFFT_WAIT == state) {
if (FlushState::TFFT_ABORT == state) {
STORAGE_LOG(INFO, "free abort flush task", KPC(flush_task));
flush_task_finished_(flush_task);
} else if (FlushState::TFFT_WAIT == state) {
push_wait_list_(flush_task);
} else if (FlushState::TFFT_FILL_BLOCK_BUF < state) {
push_retry_list_(flush_task);
@ -455,29 +458,37 @@ int ObTmpFileFlushTG::check_flush_task_io_finished_()
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "flush task is nullptr", KR(ret));
} else {
bool is_flush_tree_page = flush_task->get_is_fast_flush_tree();
STORAGE_LOG(DEBUG, "flush task io complete", K(flushing_block_num_), KPC(flush_task));
// if the update fails, it will be retried during the next wakeup
if (OB_FAIL(flush_mgr_.update_file_meta_after_flush(*flush_task))) {
STORAGE_LOG(WARN, "fail to drive flush state machine", KR(ret), KPC(flush_task));
push_finished_list_(flush_task);
} else {
flush_mgr_.free_flush_task(flush_task);
ATOMIC_DEC(&flushing_block_num_);
fast_flush_meta_task_cnt_ -= is_flush_tree_page ? 1 : 0;
if (fast_flush_meta_task_cnt_ == 0) {
// reset is_fast_flush_meta_ flag to resume retry task and flush
is_fast_flush_meta_ = false;
} else if (OB_UNLIKELY(fast_flush_meta_task_cnt_ < 0)) {
STORAGE_LOG(ERROR, "fast_flush_meta_task_cnt_ is negative", KPC(this));
}
signal_io_finish();
flush_task_finished_(flush_task);
}
}
}
return ret;
}
// Retire a flush task and update the thread group's bookkeeping:
//   1. hand the task to flush_mgr_.free_flush_task(),
//   2. decrement flushing_block_num_,
//   3. decrement fast_flush_meta_task_cnt_ when the task was a fast-flush-tree task,
//   4. clear is_fast_flush_meta_ once that counter reaches zero,
//   5. call signal_io_finish().
//
// @param flush_task  task to retire; it is dereferenced without a null check,
//                    so callers must pass a valid pointer
void ObTmpFileFlushTG::flush_task_finished_(ObTmpFileFlushTask *flush_task)
{
  int ret = OB_SUCCESS;
  // Read the flag before the task is handed to free_flush_task().
  const bool was_fast_flush_tree = flush_task->get_is_fast_flush_tree();

  flush_mgr_.free_flush_task(flush_task);
  ATOMIC_DEC(&flushing_block_num_);

  if (was_fast_flush_tree) {
    fast_flush_meta_task_cnt_ -= 1;
  }

  if (OB_UNLIKELY(fast_flush_meta_task_cnt_ < 0)) {
    ret = OB_ERR_UNEXPECTED;
    STORAGE_LOG(ERROR, "fast_flush_meta_task_cnt_ is negative", KR(ret), KPC(this));
  } else if (0 == fast_flush_meta_task_cnt_) {
    // No fast-flush meta task is pending any more: reset the flag to resume
    // retry task and flush.
    is_fast_flush_meta_ = false;
  }

  signal_io_finish();
}
int ObTmpFileFlushTG::push_wait_list_(ObTmpFileFlushTask *flush_task)
{
int ret = OB_SUCCESS;

View File

@ -74,6 +74,7 @@ private:
int pop_retry_list_(ObTmpFileFlushTask *&flush_task);
int push_finished_list_(ObTmpFileFlushTask *flush_task);
int pop_finished_list_(ObTmpFileFlushTask *&flush_task);
void flush_task_finished_(ObTmpFileFlushTask *flush_task);
private:
bool is_inited_;
RUNNING_MODE mode_;