[BUG.FIX] fix coredump in tmp file.

This commit is contained in:
Tyshawn
2023-07-25 08:42:27 +00:00
committed by ob-robot
parent 0870baa73f
commit 17a96954c7
4 changed files with 139 additions and 91 deletions

View File

@ -655,7 +655,7 @@ void ObTmpFileMemTask::runTimerTask()
ObTmpTenantMemBlockManager::IOWaitInfo::IOWaitInfo(
ObMacroBlockHandle &block_handle, ObTmpMacroBlock &block, ObIAllocator &allocator)
: block_handle_(block_handle), block_(block), allocator_(allocator), ref_cnt_(0)
: block_handle_(block_handle), block_(block), allocator_(allocator), ref_cnt_(0), ret_code_(OB_SUCCESS)
{
}
@ -691,6 +691,8 @@ int ObTmpTenantMemBlockManager::IOWaitInfo::wait(int64_t timeout_ms)
while (OB_SUCC(ret) && block_.is_washing() && wait_ms > 0) {
if (OB_FAIL(cond_.wait(wait_ms))) {
STORAGE_LOG(WARN, "fail to wait block write condition", K(ret), K(wait_ms), K(block_.get_block_id()));
} else if (OB_FAIL(ret = ret_code_)) {
STORAGE_LOG(WARN, "fail to wait io info", K(ret), KPC(this));
} else if (block_.is_washing()) {
wait_ms = timeout_ms - (ObTimeUtility::fast_current_time() - begin_us) / 1000;
}
@ -713,9 +715,8 @@ int ObTmpTenantMemBlockManager::IOWaitInfo::exec_wait(int64_t io_timeout_ms)
STORAGE_LOG(ERROR, "lock io request condition failed", K(ret), K(block_.get_block_id()));
} else if (OB_FAIL(block_handle_.wait(io_timeout_ms))) {
STORAGE_LOG(WARN, "wait handle wait io failed", K(ret), K(block_.get_block_id()));
} else if (OB_SERVER_BLOCK_MGR.dec_ref(block_handle_.get_macro_id())) {
STORAGE_LOG(WARN, "dec block ref failed", K(ret), K(block_.get_block_id()));
}
reset_io();
return ret;
}
@ -738,7 +739,12 @@ ObTmpTenantMemBlockManager::IOWaitInfo::~IOWaitInfo()
void ObTmpTenantMemBlockManager::IOWaitInfo::destroy()
{
ret_code_ = OB_SUCCESS;
block_handle_.get_io_handle().reset();
if (0 != ATOMIC_LOAD(&ref_cnt_)) {
int ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "unexpected error, ref cnt isn't zero", K(ret), KPC(this));
}
}
void ObTmpTenantMemBlockManager::IOWaitInfo::reset_io()
@ -1128,7 +1134,7 @@ int ObTmpTenantMemBlockManager::cleanup()
const int64_t clean_nums = t_mblk_map_.size() - wash_threshold - ATOMIC_LOAD(&washing_count_);
if (clean_nums <= 0) {
STORAGE_LOG(DEBUG, "there is no need to wash blocks", K(ret), K(clean_nums));
} else if OB_FAIL(t_mblk_map_.foreach_refactored(op)) {
} else if (OB_FAIL(t_mblk_map_.foreach_refactored(op))) {
STORAGE_LOG(WARN, "choose blks failed", K(ret));
} else {
bool wash_success = false;
@ -1295,7 +1301,9 @@ int ObTmpTenantMemBlockManager::get_block_and_set_washing(int64_t block_id, ObTm
STORAGE_LOG(WARN, "the block write has not been finished", K(ret), K(*m_blk));
} else if (OB_FAIL(m_blk->check_and_set_status(
ObTmpMacroBlock::BlockStatus::MEMORY, ObTmpMacroBlock::BlockStatus::WASHING))) {
STORAGE_LOG(WARN, "check and set status failed", K(ret), K(*m_blk));
if (OB_STATE_NOT_MATCH != ret) {
STORAGE_LOG(WARN, "check and set status failed", K(ret), K(*m_blk));
}
}
return ret;
@ -1326,7 +1334,7 @@ int ObTmpTenantMemBlockManager::wash_block(const int64_t block_id, ObIOWaitInfoH
STORAGE_LOG(WARN, "fail to get wash io info", K(ret), K_(tenant_id), K(m_blk));
} else if (OB_FAIL(write_io(info, mb_handle))) {
STORAGE_LOG(WARN, "fail to write tmp block", K(ret), K_(tenant_id), K(info), K(*m_blk));
} else if(OB_ISNULL(buf = static_cast<char *>(allocator_->alloc(sizeof(IOWaitInfo))))) {
} else if(OB_ISNULL(buf = static_cast<char *>(allocator_->alloc(sizeof(IOWaitInfo))))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "fail to alloc io wait info memory", K(ret), K_(tenant_id));
} else if (FALSE_IT(wait_info = new (buf) IOWaitInfo(mb_handle, *m_blk, *allocator_))) {
@ -1346,13 +1354,18 @@ int ObTmpTenantMemBlockManager::wash_block(const int64_t block_id, ObIOWaitInfoH
}
if (OB_FAIL(ret) && OB_NOT_NULL(m_blk)) {
mb_handle.reset();
if (OB_NOT_NULL(wait_info)) {
// don't release wait info unless ObIOWaitInfoHandle doesn't hold its ref
if (OB_NOT_NULL(wait_info) && OB_ISNULL(handle.get_wait_info())) {
wait_info->~IOWaitInfo();
allocator_->free(wait_info);
wait_info = nullptr;
}
m_blk->check_and_set_status(ObTmpMacroBlock::BlockStatus::WASHING,
ObTmpMacroBlock::BlockStatus::MEMORY);
handle.reset();
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(m_blk->check_and_set_status(ObTmpMacroBlock::BlockStatus::WASHING,
ObTmpMacroBlock::BlockStatus::MEMORY))) {
STORAGE_LOG(ERROR, "fail to rollback block status", K(ret), K(tmp_ret));
}
}
}
return ret;
@ -1380,8 +1393,6 @@ int ObTmpTenantMemBlockManager::write_io(
write_info.io_desc_.set_group_id(ObIOModule::TMP_TENANT_MEM_BLOCK_IO);
if (OB_FAIL(ObBlockManager::async_write_block(write_info, handle))) {
STORAGE_LOG(WARN, "Fail to async write block", K(ret), K(write_info), K(handle));
} else if (OB_FAIL(OB_SERVER_BLOCK_MGR.inc_ref(handle.get_macro_id()))) {
STORAGE_LOG(WARN, "fail to add macro id", K(ret), "macro id", handle.get_macro_id());
} else if (OB_FAIL(OB_SERVER_BLOCK_MGR.update_write_time(handle.get_macro_id(),
true/*update_to_max_time*/))) { //just to skip bad block inspect
STORAGE_LOG(WARN, "fail to update macro id write time", K(ret), "macro id", handle.get_macro_id());
@ -1415,83 +1426,70 @@ int64_t ObTmpTenantMemBlockManager::get_tenant_mem_block_num()
return tenant_mem_block_num;
}
int ObTmpTenantMemBlockManager::erase_block_from_map(const int64_t block_id)
{
int ret = OB_SUCCESS;
if (OB_FAIL(t_mblk_map_.erase_refactored(block_id))) {
STORAGE_LOG(WARN, "fail to erase t_mblk_map", K(ret), K(block_id));
}
int tmp_ret = ret;
if (OB_FAIL(wait_handles_map_.erase_refactored(block_id))) {
STORAGE_LOG(WARN, "fail to erase wait handles map", K(ret), K(block_id));
}
// need erase block_id from both map, return the error if not OB_SUCCESS
return OB_SUCCESS == tmp_ret ? ret : tmp_ret;
}
int ObTmpTenantMemBlockManager::exec_wait()
{
int ret = OB_SUCCESS;
const int64_t io_timeout_ms = GCONF._data_storage_io_timeout / 1000L;
int64_t wait_io_cnt = 0;
int64_t loop_nums = 0;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "ObTmpFileStore has not been inited", K(ret));
} else if (!stopped_) {
common::ObSpLinkQueue::Link *node = NULL;
ObIOWaitInfoHandle wait_handle;
IOWaitInfo *wait_info = NULL;
SpinWLockGuard io_guard(io_lock_);
int64_t begin_us = ObTimeUtility::fast_current_time();
const int64_t begin_us = ObTimeUtility::fast_current_time();
while (OB_SUCC(wait_info_queue_.pop(node)) &&
(ObTimeUtility::fast_current_time() - begin_us)/1000 < TASK_INTERVAL) {
if (OB_ISNULL(node)) {
IOWaitInfo *wait_info = NULL;
++loop_nums;
if (OB_ISNULL(wait_info = static_cast<IOWaitInfo*>(node))) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "unexpected error", K(ret));
} else if (FALSE_IT(wait_info = static_cast<IOWaitInfo*>(node))) {
} else if (OB_FAIL(wait_handles_map_.get_refactored(wait_info->get_block().get_block_id(), wait_handle))) {
STORAGE_LOG(WARN, "fail to get wait handles", K(ret), K(wait_info->get_block().get_block_id()), K_(tenant_id));
} else if (OB_FAIL(wait_info->exec_wait(io_timeout_ms))) {
STORAGE_LOG(WARN, "fail to exec iohandle wait", K(ret), K_(tenant_id));
} else if (FALSE_IT(wait_info->reset_io())) {
STORAGE_LOG(ERROR, "unexpected error, wait info is nullptr", K(ret), KP(node));
} else {
bool need_rollback = true;
ObTmpMacroBlock &blk = wait_info->get_block();
const MacroBlockId &macro_id = wait_info->block_handle_.get_macro_id();
const int64_t block_id = blk.get_block_id();
STORAGE_LOG(INFO, "start to wash a block", KPC(&blk));
ObThreadCondGuard cond_guard(cond_);
if (OB_FAIL(cond_guard.get_ret())) {
STORAGE_LOG(ERROR, "fail to guard request condition", K(ret));
} else if (OB_FAIL(erase_block_from_map(block_id))) {
STORAGE_LOG(WARN, "fail to erase t_mblk_map", K(ret), K(block_id));
} else if (OB_FAIL(blk.give_back_buf_into_cache(true/*set block disked for washed block*/))) {
STORAGE_LOG(WARN, "fail to give back buf into cache", K(ret), K(block_id));
} else if (FALSE_IT(need_rollback = false)) {
} else if (OB_FAIL(wait_info->broadcast())) {
STORAGE_LOG(ERROR, "signal io request condition failed", K(ret), K(block_id));
} else if (FALSE_IT(OB_TMP_FILE_STORE.dec_block_cache_num(tenant_id_, 1))) {
const int64_t free_page_nums = blk.get_free_page_nums();
if (OB_FAIL(wait_info->exec_wait(io_timeout_ms))) {
STORAGE_LOG(WARN, "fail to exec io handle wait", K(ret), K_(tenant_id), KPC(wait_info));
} else {
ObTaskController::get().allow_next_syslog();
STORAGE_LOG(INFO, "succeed to wash a block", K(block_id), K(t_mblk_map_.size()));
ATOMIC_DEC(&washing_count_);
}
if (OB_FAIL(ret) && need_rollback) {
STORAGE_LOG(WARN, "fail to wash a block, need rollback", K(ret), K(blk));
int tmp_ret = OB_SUCCESS;
blk.set_block_status(ObTmpMacroBlock::BlockStatus::MEMORY);
if (OB_SUCCESS != (tmp_ret = t_mblk_map_.set_refactored(block_id, &blk, 1/*overwrite*/))) {
STORAGE_LOG(INFO, "fail to retry wash block", K(tmp_ret), K(block_id), K(t_mblk_map_.size()));
STORAGE_LOG(INFO, "start to wash a block", K(block_id), KPC(&blk));
ObThreadCondGuard cond_guard(cond_);
if (OB_FAIL(cond_guard.get_ret())) {
STORAGE_LOG(WARN, "fail to guard request condition", K(ret));
} else {
ret = OB_SUCCESS;
ATOMIC_DEC(&washing_count_);
if (OB_FAIL(blk.give_back_buf_into_cache(true/*set block disked for washed block*/))) {
STORAGE_LOG(WARN, "fail to give back buf into cache", K(ret), K(block_id));
} else if (OB_FAIL(t_mblk_map_.erase_refactored(block_id))) {
if (OB_HASH_NOT_EXIST != ret) {
STORAGE_LOG(WARN, "fail to erase t_mblk_map", K(ret), K(block_id));
} else {
ret = OB_SUCCESS;
}
} else {
++wait_io_cnt;
OB_TMP_FILE_STORE.dec_block_cache_num(tenant_id_, 1);
ObTaskController::get().allow_next_syslog();
STORAGE_LOG(INFO, "succeed to wash a block", K(block_id), K(macro_id),
"free_page_nums:", free_page_nums, K(t_mblk_map_.size()));
}
}
}
wait_info->ret_code_ = ret;
int64_t tmp_ret = OB_SUCCESS;
// The broadcast() is executed regardless of success or failure, and the error code is ignored
// so that the next request can be executed.
if (OB_TMP_FAIL(wait_info->broadcast())) {
STORAGE_LOG(ERROR, "signal io request condition failed", K(ret), K(tmp_ret), K(block_id));
}
// Regardless of success or failure, need to erase wait info handle from map.
if (OB_TMP_FAIL(wait_handles_map_.erase_refactored(block_id))) {
if (OB_HASH_NOT_EXIST != tmp_ret) {
STORAGE_LOG(ERROR, "fail to erase wait handles map", K(ret), K(tmp_ret), K(block_id));
}
}
}
}
// no wait handle to process.
if (OB_FAIL(ret)) {
if (OB_EAGAIN != ret) {
STORAGE_LOG(ERROR, "unexpected error", K(ret));
}
}
}
@ -1500,6 +1498,16 @@ int ObTmpTenantMemBlockManager::exec_wait()
if (OB_TMP_FAIL(cond_.broadcast())) {
STORAGE_LOG(ERROR, "signal wash condition failed", K(ret), K(tmp_ret));
}
if (loop_nums > 0) {
const int64_t washing_count = ATOMIC_LOAD(&washing_count_);
int64_t block_cache_num = -1;
int64_t page_cache_num = -1;
OB_TMP_FILE_STORE.get_block_cache_num(tenant_id_, block_cache_num);
OB_TMP_FILE_STORE.get_page_cache_num(tenant_id_, page_cache_num);
ObTaskController::get().allow_next_syslog();
STORAGE_LOG(INFO, "succeed to do one round of tmp block io", K(ret), K(loop_nums),
K(wait_io_cnt), K(washing_count), K(block_cache_num), K(page_cache_num));
}
}
return ret;
}