Writing to the tmp file may quickly exit with an error code when OB_SERVER_OUTOF_DISK_SPACE reported;

This commit is contained in:
obdev 2024-08-27 11:49:44 +00:00 committed by ob-robot
parent ad224ebdb6
commit 2dca1d4437
7 changed files with 75 additions and 13 deletions

View File

@ -233,6 +233,7 @@ ObTmpFileFlushTask::ObTmpFileFlushTask()
: inst_handle_(),
kvpair_(nullptr),
block_handle_(),
write_block_ret_code_(OB_SUCCESS),
ret_code_(OB_SUCCESS),
data_length_(0),
block_index_(-1),
@ -254,6 +255,7 @@ void ObTmpFileFlushTask::destroy()
block_handle_.reset();
inst_handle_.reset();
kvpair_ = nullptr;
write_block_ret_code_ = OB_SUCCESS;
ret_code_ = OB_SUCCESS;
data_length_ = 0;
block_index_ = -1;
@ -298,6 +300,8 @@ int ObTmpFileFlushTask::write_one_block()
true/*update_to_max_time)*/))){ // update to max time to skip bad block inspect
LOG_WARN("failed to update write time", KR(ret), K(handle_));
}
atomic_set_write_block_ret_code(ret);
return ret;
}

View File

@ -256,6 +256,12 @@ public:
OB_INLINE char *get_data_buf() const { return block_handle_.value_ == nullptr ? nullptr : block_handle_.value_->get_buffer(); }
OB_INLINE void atomic_set_ret_code(int ret_code) { ATOMIC_SET(&ret_code_, ret_code); }
OB_INLINE int atomic_get_ret_code() const { return ATOMIC_LOAD(&ret_code_); }
OB_INLINE void atomic_set_write_block_ret_code(int write_block_ret_code) {
ATOMIC_SET(&write_block_ret_code_, write_block_ret_code);
}
OB_INLINE int atomic_get_write_block_ret_code() const {
return ATOMIC_LOAD(&write_block_ret_code_);
}
OB_INLINE void set_data_length(const int64_t len) { data_length_ = len; }
OB_INLINE int64_t get_data_length() const { return data_length_; }
OB_INLINE void set_block_index(const int64_t block_index) { block_index_ = block_index; }
@ -283,13 +289,14 @@ public:
return buffer != nullptr && get_data_buf() != nullptr &&
buffer >= get_data_buf() && buffer + length <= get_data_buf() + OB_SERVER_BLOCK_MGR.get_macro_block_size();
}
TO_STRING_KV(KP(this), KP(kvpair_), K(ret_code_), K(data_length_),
TO_STRING_KV(KP(this), KP(kvpair_), K(write_block_ret_code_), K(ret_code_), K(data_length_),
K(block_index_), K(flush_seq_), K(create_ts_), K(is_io_finished_),
K(fast_flush_tree_page_), K(recorded_as_prepare_finished_), K(task_state_), K(tmp_file_block_handle_), K(flush_infos_));
private:
ObKVCacheInstHandle inst_handle_;
ObKVCachePair *kvpair_;
ObTmpBlockValueHandle block_handle_;
int write_block_ret_code_;
int ret_code_;
int64_t data_length_; // data length (including padding to make length upper align to page size)
int64_t block_index_; // tmp file block logical index in ObTmpFileBlockManager

View File

@ -147,11 +147,15 @@ int ObTmpFilePageCacheController::invoke_swap_and_wait(int64_t expect_swap_size,
}
if (OB_NOT_NULL(swap_job)) {
if (OB_SUCCESS != swap_job->get_ret_code()) {
ret = swap_job->get_ret_code();
}
// reset swap job to set is_finished to false in case of failure to push into queue:
// otherwise job is not finished, but it will not be executed, so it will never become finished.
swap_job->reset();
if (OB_FAIL(free_swap_job_(swap_job))) {
STORAGE_LOG(ERROR, "fail to free swap job", KR(ret));
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(free_swap_job_(swap_job))) {
STORAGE_LOG(ERROR, "fail to free swap job", KR(ret), KR(tmp_ret));
}
}
return ret;

View File

@ -33,6 +33,7 @@ int ObTmpFileSwapJob::init(int64_t expect_swap_size, uint32_t timeout_ms)
STORAGE_LOG(WARN, "ObTmpFileSwapJob init cond failed", KR(ret));
} else {
is_inited_ = true;
ret_code_ = OB_SUCCESS;
is_finished_ = false;
expect_swap_size_ = expect_swap_size;
timeout_ms_ = timeout_ms;
@ -45,6 +46,7 @@ int ObTmpFileSwapJob::init(int64_t expect_swap_size, uint32_t timeout_ms)
void ObTmpFileSwapJob::reset()
{
is_inited_ = false;
ret_code_ = OB_SUCCESS;
is_finished_ = false;
expect_swap_size_ = 0;
timeout_ms_ = DEFAULT_TIMEOUT_MS;
@ -78,7 +80,7 @@ int ObTmpFileSwapJob::wait_swap_complete()
}
// set swap job is_finished, wake up threads that invoke swap job
int ObTmpFileSwapJob::signal_swap_complete()
int ObTmpFileSwapJob::signal_swap_complete(int ret_code)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
@ -86,6 +88,7 @@ int ObTmpFileSwapJob::signal_swap_complete()
STORAGE_LOG(WARN, "ObTmpFileSwapJob not init", KR(ret));
} else {
ObThreadCondGuard guard(swap_cond_);
ATOMIC_SET(&ret_code_, ret_code);
ATOMIC_SET(&is_finished_, true);
if (OB_FAIL(swap_cond_.signal())) {
STORAGE_LOG(WARN, "ObTmpFileSwapJob signal swap complete failed", KR(ret));

View File

@ -29,6 +29,7 @@ public:
static const uint32_t DEFAULT_TIMEOUT_MS = 10 * 1000;
ObTmpFileSwapJob()
: is_inited_(false),
ret_code_(OB_SUCCESS),
is_finished_(false),
timeout_ms_(DEFAULT_TIMEOUT_MS),
create_ts_(0),
@ -39,16 +40,18 @@ public:
int init(int64_t expect_swap_size, uint32_t timeout_ms = DEFAULT_TIMEOUT_MS);
void reset();
int wait_swap_complete();
int signal_swap_complete();
int signal_swap_complete(int ret_code);
OB_INLINE int64_t get_create_ts() const { return create_ts_; }
OB_INLINE int64_t get_abs_timeout_ts() const { return abs_timeout_ts_; }
OB_INLINE int64_t get_expect_swap_size() const { return expect_swap_size_; }
OB_INLINE bool is_valid() { return ATOMIC_LOAD(&is_inited_) && swap_cond_.is_inited(); }
OB_INLINE bool is_finished() const { return ATOMIC_LOAD(&is_finished_); }
OB_INLINE bool is_inited() const { return ATOMIC_LOAD(&is_inited_); }
OB_INLINE int get_ret_code() const { return ATOMIC_LOAD(&ret_code_); }
TO_STRING_KV(KP(this), K(is_inited_), K(is_finished_), K(create_ts_), K(timeout_ms_), K(abs_timeout_ts_), K(expect_swap_size_));
private:
bool is_inited_;
int ret_code_;
bool is_finished_;
uint32_t timeout_ms_;
int64_t create_ts_;

View File

@ -31,6 +31,7 @@ ObTmpFileFlushTG::ObTmpFileFlushTG(
: is_inited_(false),
mode_(RUNNING_MODE::INVALID),
last_flush_timestamp_(0),
flush_io_finished_ret_(OB_SUCCESS),
flush_io_finished_round_(0),
flushing_block_num_(0),
is_fast_flush_meta_(false),
@ -62,6 +63,7 @@ int ObTmpFileFlushTG::init()
is_inited_ = true;
mode_ = RUNNING_MODE::NORMAL;
last_flush_timestamp_ = 0;
flush_io_finished_ret_ = OB_SUCCESS;
flush_io_finished_round_ = 0;
flushing_block_num_ = 0;
@ -84,6 +86,7 @@ void ObTmpFileFlushTG::destroy()
clean_up_lists();
mode_ = RUNNING_MODE::INVALID;
last_flush_timestamp_ = 0;
flush_io_finished_ret_ = OB_SUCCESS;
flush_io_finished_round_ = 0;
flushing_block_num_ = 0;
@ -168,8 +171,9 @@ void ObTmpFileFlushTG::notify_doing_flush()
last_flush_timestamp_ = 0;
}
void ObTmpFileFlushTG::signal_io_finish()
void ObTmpFileFlushTG::signal_io_finish(int flush_io_finished_ret)
{
flush_io_finished_ret_ = flush_io_finished_ret;
++flush_io_finished_round_;
}
@ -178,6 +182,11 @@ int64_t ObTmpFileFlushTG::get_flush_io_finished_round()
return flush_io_finished_round_;
}
int64_t ObTmpFileFlushTG::get_flush_io_finished_ret()
{
return flush_io_finished_ret_;
}
int64_t ObTmpFileFlushTG::cal_idle_time()
{
int64_t idle_time = 0;
@ -320,6 +329,10 @@ int ObTmpFileFlushTG::handle_generated_flush_tasks_(ObSpLinkQueue &flushing_list
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("fail to switch state after data copied from tmp file", KR(ret), KPC(flush_task));
} else if (FlushState::TFFT_FILL_BLOCK_BUF < state) {
if (FlushState::TFFT_ASYNC_WRITE == state &&
flush_task->atomic_get_write_block_ret_code() == OB_SERVER_OUTOF_DISK_SPACE) {
signal_io_finish(OB_SERVER_OUTOF_DISK_SPACE);
}
push_retry_list_(flush_task);
ATOMIC_INC(&flushing_block_num_);
STORAGE_LOG(DEBUG, "push flush task to retry list", KPC(flush_task));
@ -349,7 +362,7 @@ int ObTmpFileFlushTG::wash_(const int64_t expect_flush_size, const RUNNING_MODE
bool idle_loop = flushing_task_cnt == 0;
if (idle_loop && wbp_.get_dirty_page_percentage() < ObTmpFileFlushManager::FLUSH_WATERMARK_F5) {
signal_io_finish();
signal_io_finish(OB_SUCCESS);
}
if (RUNNING_MODE::NORMAL == mode) {
@ -388,6 +401,10 @@ int ObTmpFileFlushTG::retry_task_()
} else if (FlushState::TFFT_WAIT == state) {
push_wait_list_(flush_task);
} else if (FlushState::TFFT_FILL_BLOCK_BUF < state) {
if (FlushState::TFFT_ASYNC_WRITE == state &&
flush_task->atomic_get_write_block_ret_code() == OB_SERVER_OUTOF_DISK_SPACE) {
signal_io_finish(OB_SERVER_OUTOF_DISK_SPACE);
}
push_retry_list_(flush_task);
if (FlushState::TFFT_INSERT_META_TREE == state && OB_ALLOCATE_TMP_FILE_PAGE_FAILED == ret) {
STORAGE_LOG(WARN, "fail to retry insert meta item in TFFT_INSERT_META_TREE", KPC(flush_task));
@ -486,7 +503,7 @@ void ObTmpFileFlushTG::flush_task_finished_(ObTmpFileFlushTask *flush_task)
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "fast_flush_meta_task_cnt_ is negative", KR(ret), KPC(this));
}
signal_io_finish();
signal_io_finish(OB_SUCCESS);
}
int ObTmpFileFlushTG::push_wait_list_(ObTmpFileFlushTask *flush_task)
@ -786,7 +803,7 @@ void ObTmpFileSwapTG::clean_up_lists_()
ObTmpFileSwapJob *swap_job = nullptr;
if (OB_FAIL(swap_job_dequeue(swap_job))) {
STORAGE_LOG(WARN, "fail dequeue swap job or swap job is nullptr", KR(ret), KP(swap_job));
} else if (OB_FAIL(swap_job->signal_swap_complete())){
} else if (OB_FAIL(swap_job->signal_swap_complete(OB_SUCCESS))){
STORAGE_LOG(WARN, "fail to signal swap complete", KR(ret));
}
}
@ -796,7 +813,7 @@ void ObTmpFileSwapTG::clean_up_lists_()
ObTmpFileSwapJob *swap_job = nullptr;
if (OB_FAIL(pop_working_job_(swap_job))) {
STORAGE_LOG(WARN, "fail to pop working job or ptr is null", KR(ret), KP(swap_job));
} else if (OB_FAIL(swap_job->signal_swap_complete())){
} else if (OB_FAIL(swap_job->signal_swap_complete(OB_SUCCESS))){
STORAGE_LOG(WARN, "fail to signal swap complete", KR(ret));
}
}
@ -898,6 +915,10 @@ int ObTmpFileSwapTG::swap_fast_()
int64_t wakeup_job_cnt = 0;
wakeup_satisfied_jobs_(wakeup_job_cnt);
wakeup_timeout_jobs_();
int io_finished_ret = flush_tg_ref_.get_flush_io_finished_ret();
if (OB_SERVER_OUTOF_DISK_SPACE == io_finished_ret) {
wakeup_all_jobs_(OB_SERVER_OUTOF_DISK_SPACE);
}
// do flush if could not evict enough pages
if (OB_SUCC(ret) && !working_list_.is_empty() && wakeup_job_cnt < PROCCESS_JOB_NUM_PER_BATCH) {
@ -953,7 +974,7 @@ int ObTmpFileSwapTG::wakeup_satisfied_jobs_(int64_t& wakeup_job_cnt)
wbp_free_page_cnt -= min(wbp_free_page_cnt, single_job_swap_page_cnt);
int64_t response_time = ObTimeUtility::current_time() - swap_job->get_create_ts();
swap_monitor_.record_swap_response_time(response_time);
if (OB_FAIL(swap_job->signal_swap_complete())) {
if (OB_FAIL(swap_job->signal_swap_complete(OB_SUCCESS))) {
STORAGE_LOG(WARN, "fail to signal swap complete", KR(ret), KPC(swap_job));
} else {
++wakeup_job_cnt;
@ -974,7 +995,7 @@ int ObTmpFileSwapTG::wakeup_timeout_jobs_()
// timeout, wake it up
int64_t response_time = ObTimeUtility::current_time() - swap_job->get_create_ts();
swap_monitor_.record_swap_response_time(response_time);
if (OB_FAIL(swap_job->signal_swap_complete())) {
if (OB_FAIL(swap_job->signal_swap_complete(OB_TIMEOUT))) {
STORAGE_LOG(WARN, "fail to signal swap complete", KR(ret), KP(swap_job));
}
} else {
@ -986,6 +1007,23 @@ int ObTmpFileSwapTG::wakeup_timeout_jobs_()
return ret;
}
void ObTmpFileSwapTG::wakeup_all_jobs_(int ret_code)
{
int ret = OB_SUCCESS;
for (int64_t i = working_list_size_; OB_SUCC(ret) && i > 0 && !working_list_.is_empty(); --i) {
ObTmpFileSwapJob *swap_job = nullptr;
if (OB_FAIL(pop_working_job_(swap_job))) {
STORAGE_LOG(WARN, "fail to pop working job or ptr is null", KR(ret), KP(swap_job));
} else {
int64_t response_time = ObTimeUtility::current_time() - swap_job->get_create_ts();
swap_monitor_.record_swap_response_time(response_time);
if (OB_FAIL(swap_job->signal_swap_complete(ret_code))) {
STORAGE_LOG(WARN, "fail to signal swap complete", KR(ret), KP(swap_job));
}
}
}
}
int ObTmpFileSwapTG::push_working_job_(ObTmpFileSwapJob *swap_job)
{
int ret = OB_SUCCESS;

View File

@ -50,7 +50,8 @@ public:
int try_work();
void set_running_mode(const RUNNING_MODE mode);
void notify_doing_flush();
void signal_io_finish();
void signal_io_finish(int flush_io_finished_ret);
int64_t get_flush_io_finished_ret();
int64_t get_flush_io_finished_round();
int64_t cal_idle_time();
void clean_up_lists();
@ -79,6 +80,7 @@ private:
bool is_inited_;
RUNNING_MODE mode_;
int64_t last_flush_timestamp_;
int flush_io_finished_ret_;
int64_t flush_io_finished_round_;
int64_t flushing_block_num_; // maintain it when ObTmpFileFlushTask is created and freed
@ -133,6 +135,7 @@ private:
int calculate_swap_page_num_(const int64_t batch_size, int64_t &expect_swap_cnt);
int wakeup_satisfied_jobs_(int64_t& wakeup_job_cnt);
int wakeup_timeout_jobs_();
void wakeup_all_jobs_(int ret_code);
private:
bool is_inited_;
int tg_id_;