[BUG.FIX] fix data corrupt in tmp file
This commit is contained in:
@ -622,6 +622,8 @@ int ObTmpFileExtent::try_sync_block()
|
|||||||
} else if (OB_FAIL(OB_TMP_FILE_STORE.wash_block(owner_->get_tenant_id(), block_id_, handle))) {
|
} else if (OB_FAIL(OB_TMP_FILE_STORE.wash_block(owner_->get_tenant_id(), block_id_, handle))) {
|
||||||
// try to flush the block to the disk. If fails, do nothing.
|
// try to flush the block to the disk. If fails, do nothing.
|
||||||
STORAGE_LOG(DEBUG, "fail to sync block", K(ret), K(owner_->get_tenant_id()), K(block_id_));
|
STORAGE_LOG(DEBUG, "fail to sync block", K(ret), K(owner_->get_tenant_id()), K(block_id_));
|
||||||
|
} else {
|
||||||
|
STORAGE_LOG(DEBUG, "succeed to sync wash block", K(block_id_));
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
@ -1137,6 +1137,7 @@ int ObTmpTenantMemBlockManager::cleanup()
|
|||||||
} else if (OB_FAIL(t_mblk_map_.foreach_refactored(op))) {
|
} else if (OB_FAIL(t_mblk_map_.foreach_refactored(op))) {
|
||||||
STORAGE_LOG(WARN, "choose blks failed", K(ret));
|
STORAGE_LOG(WARN, "choose blks failed", K(ret));
|
||||||
} else {
|
} else {
|
||||||
|
const int64_t candidate_cnt = heap.count();
|
||||||
bool wash_success = false;
|
bool wash_success = false;
|
||||||
while (OB_SUCC(ret) && heap.count() > 0 && !wash_success) {
|
while (OB_SUCC(ret) && heap.count() > 0 && !wash_success) {
|
||||||
const BlockInfo info = heap.top();
|
const BlockInfo info = heap.top();
|
||||||
@ -1145,6 +1146,7 @@ int ObTmpTenantMemBlockManager::cleanup()
|
|||||||
STORAGE_LOG(WARN, "fail to wash", K(ret), K_(tenant_id), K(info.block_id_));
|
STORAGE_LOG(WARN, "fail to wash", K(ret), K_(tenant_id), K(info.block_id_));
|
||||||
} else {
|
} else {
|
||||||
wash_success = handle.is_valid();
|
wash_success = handle.is_valid();
|
||||||
|
STORAGE_LOG(DEBUG, "succeed to wash block for cleanup", K(info));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
@ -1155,6 +1157,7 @@ int ObTmpTenantMemBlockManager::cleanup()
|
|||||||
}
|
}
|
||||||
if (OB_SUCC(ret) && !wash_success) {
|
if (OB_SUCC(ret) && !wash_success) {
|
||||||
ret = OB_STATE_NOT_MATCH;
|
ret = OB_STATE_NOT_MATCH;
|
||||||
|
STORAGE_LOG(WARN, "fail to cleanup", K(ret), K(t_mblk_map_.size()), K(candidate_cnt));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1217,6 +1220,7 @@ int ObTmpTenantMemBlockManager::ChooseBlocksMapOp::operator () (oceanbase::commo
|
|||||||
STORAGE_LOG(WARN, "insert block to array failed", K(ret));
|
STORAGE_LOG(WARN, "insert block to array failed", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
STORAGE_LOG(DEBUG, "choose one block", K(ret), KPC(blk));
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1270,7 +1274,7 @@ int ObTmpTenantMemBlockManager::refresh_dir_to_blk_map(const int64_t dir_id,
|
|||||||
int ObTmpTenantMemBlockManager::get_block_and_set_washing(int64_t block_id, ObTmpMacroBlock *&m_blk)
|
int ObTmpTenantMemBlockManager::get_block_and_set_washing(int64_t block_id, ObTmpMacroBlock *&m_blk)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
bool is_closed = false;
|
bool is_sealed = false;
|
||||||
uint64_t hash_val = 0;
|
uint64_t hash_val = 0;
|
||||||
hash_val = murmurhash(&block_id, sizeof(block_id), hash_val);
|
hash_val = murmurhash(&block_id, sizeof(block_id), hash_val);
|
||||||
ObBucketHashRLockGuard lock_guard(map_lock_, hash_val);
|
ObBucketHashRLockGuard lock_guard(map_lock_, hash_val);
|
||||||
@ -1291,11 +1295,11 @@ int ObTmpTenantMemBlockManager::get_block_and_set_washing(int64_t block_id, ObTm
|
|||||||
}
|
}
|
||||||
// refresh ret can be ignored. overwrite the ret.
|
// refresh ret can be ignored. overwrite the ret.
|
||||||
ret = OB_STATE_NOT_MATCH;
|
ret = OB_STATE_NOT_MATCH;
|
||||||
} else if (OB_FAIL(m_blk->close(is_closed))) {
|
} else if (OB_FAIL(m_blk->seal(is_sealed))) {
|
||||||
STORAGE_LOG(WARN, "fail to close block", K(ret), K(*m_blk));
|
STORAGE_LOG(WARN, "fail to seal block", K(ret), K(*m_blk));
|
||||||
} else if (!is_closed) {
|
} else if (OB_UNLIKELY(!is_sealed)) {
|
||||||
ret = OB_STATE_NOT_MATCH;
|
ret = OB_STATE_NOT_MATCH;
|
||||||
STORAGE_LOG(WARN, "the block has some unclosed extents", K(ret), K(*m_blk));
|
STORAGE_LOG(WARN, "the block has some unclosed extents", K(ret), K(is_sealed), K(*m_blk));
|
||||||
} else if (OB_UNLIKELY(0 == m_blk->get_used_page_nums())) {
|
} else if (OB_UNLIKELY(0 == m_blk->get_used_page_nums())) {
|
||||||
ret = OB_STATE_NOT_MATCH;
|
ret = OB_STATE_NOT_MATCH;
|
||||||
STORAGE_LOG(WARN, "the block write has not been finished", K(ret), K(*m_blk));
|
STORAGE_LOG(WARN, "the block write has not been finished", K(ret), K(*m_blk));
|
||||||
|
|||||||
@ -420,6 +420,7 @@ ObTmpMacroBlock::ObTmpMacroBlock()
|
|||||||
tmp_file_header_(),
|
tmp_file_header_(),
|
||||||
io_desc_(),
|
io_desc_(),
|
||||||
block_status_(MAX),
|
block_status_(MAX),
|
||||||
|
is_sealed_(false),
|
||||||
is_inited_(false),
|
is_inited_(false),
|
||||||
alloc_time_(0),
|
alloc_time_(0),
|
||||||
access_time_(0)
|
access_time_(0)
|
||||||
@ -465,21 +466,24 @@ void ObTmpMacroBlock::destroy()
|
|||||||
buffer_ = NULL;
|
buffer_ = NULL;
|
||||||
handle_.reset();
|
handle_.reset();
|
||||||
ATOMIC_STORE(&block_status_, MAX);
|
ATOMIC_STORE(&block_status_, MAX);
|
||||||
|
is_sealed_ = false;
|
||||||
alloc_time_ = 0;
|
alloc_time_ = 0;
|
||||||
ATOMIC_STORE(&access_time_, 0);
|
ATOMIC_STORE(&access_time_, 0);
|
||||||
|
is_inited_ = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTmpMacroBlock::close(bool &is_all_close)
|
int ObTmpMacroBlock::seal(bool &is_sealed)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
is_all_close = true;
|
is_sealed = false;
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
is_all_close = false;
|
ret = OB_NOT_INIT;
|
||||||
STORAGE_LOG(WARN, "ObTmpMacroBlock has not been inited");
|
STORAGE_LOG(WARN, "ObTmpMacroBlock has not been inited", K(ret), KPC(this));
|
||||||
} else {
|
} else {
|
||||||
ObTmpFileExtent *tmp = NULL;
|
ObTmpFileExtent *tmp = NULL;
|
||||||
SpinWLockGuard guard(lock_);
|
SpinWLockGuard guard(lock_);
|
||||||
for (int32_t i = 0; OB_SUCC(ret) && i < using_extents_.count() && is_all_close; i++) {
|
bool is_all_closed = using_extents_.count() == 0 ? false : true;
|
||||||
|
for (int32_t i = 0; OB_SUCC(ret) && i < using_extents_.count() && is_all_closed; ++i) {
|
||||||
tmp = using_extents_.at(i);
|
tmp = using_extents_.at(i);
|
||||||
if (NULL != tmp && !tmp->is_closed()) {
|
if (NULL != tmp && !tmp->is_closed()) {
|
||||||
uint8_t start_id = ObTmpFilePageBuddy::MAX_PAGE_NUMS;
|
uint8_t start_id = ObTmpFilePageBuddy::MAX_PAGE_NUMS;
|
||||||
@ -497,18 +501,23 @@ int ObTmpMacroBlock::close(bool &is_all_close)
|
|||||||
}
|
}
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
tmp->unclose(page_nums);
|
tmp->unclose(page_nums);
|
||||||
is_all_close = false;
|
is_all_closed = false;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
is_all_close = false;
|
is_all_closed = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (OB_SUCC(ret) && is_all_closed) {
|
||||||
|
is_sealed = true;
|
||||||
|
ATOMIC_SET(&is_sealed_, true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTmpMacroBlock::is_extents_closed(bool &is_extents_closed) {
|
int ObTmpMacroBlock::is_extents_closed(bool &is_extents_closed)
|
||||||
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
is_extents_closed = true;
|
is_extents_closed = true;
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
@ -516,9 +525,10 @@ int ObTmpMacroBlock::is_extents_closed(bool &is_extents_closed) {
|
|||||||
STORAGE_LOG(WARN, "ObTmpMacroBlock has not been inited");
|
STORAGE_LOG(WARN, "ObTmpMacroBlock has not been inited");
|
||||||
} else {
|
} else {
|
||||||
SpinRLockGuard guard(lock_);
|
SpinRLockGuard guard(lock_);
|
||||||
|
is_extents_closed = using_extents_.count() == 0 ? false : true;
|
||||||
for (int64_t i = 0; i< using_extents_.count(); ++i) {
|
for (int64_t i = 0; i< using_extents_.count(); ++i) {
|
||||||
if (!using_extents_.at(i)->is_closed()) {
|
if (!using_extents_.at(i)->is_closed()) {
|
||||||
STORAGE_LOG(DEBUG, "the tmp macro block's extents is not all closed", K(ret), K(tmp_file_header_.block_id_));
|
STORAGE_LOG(DEBUG, "the tmp macro block's extents is not all closed", K(tmp_file_header_.block_id_));
|
||||||
is_extents_closed = false;
|
is_extents_closed = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -587,12 +597,13 @@ int ObTmpMacroBlock::alloc_all_pages(ObTmpFileExtent &extent)
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
SpinWLockGuard guard(lock_);
|
SpinWLockGuard guard(lock_);
|
||||||
|
const bool sealed = is_sealed();
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
STORAGE_LOG(WARN, "ObTmpMacroBlock has not been inited", K(ret));
|
STORAGE_LOG(WARN, "ObTmpMacroBlock has not been inited", K(ret));
|
||||||
} else if (!is_memory()) {
|
} else if (OB_UNLIKELY(!is_memory() || sealed)) {
|
||||||
ret = OB_STATE_NOT_MATCH;
|
ret = OB_STATE_NOT_MATCH;
|
||||||
STORAGE_LOG(WARN, "the block is not in memory", K(ret), K(ATOMIC_LOAD(&block_status_)));
|
STORAGE_LOG(WARN, "the block is not in memory", K(ret), K(ATOMIC_LOAD(&block_status_)), K(sealed));
|
||||||
} else if (OB_FAIL(page_buddy_.alloc_all_pages())) {
|
} else if (OB_FAIL(page_buddy_.alloc_all_pages())) {
|
||||||
STORAGE_LOG(WARN, "Fail to allocate the tmp extent", K(ret), K_(tmp_file_header), K_(page_buddy));
|
STORAGE_LOG(WARN, "Fail to allocate the tmp extent", K(ret), K_(tmp_file_header), K_(page_buddy));
|
||||||
} else {
|
} else {
|
||||||
@ -620,12 +631,13 @@ int ObTmpMacroBlock::alloc(const uint8_t page_nums, ObTmpFileExtent &extent)
|
|||||||
uint8_t start_page_id = extent.get_start_page_id();
|
uint8_t start_page_id = extent.get_start_page_id();
|
||||||
uint8_t alloced_page_nums = 0;
|
uint8_t alloced_page_nums = 0;
|
||||||
SpinWLockGuard guard(lock_);
|
SpinWLockGuard guard(lock_);
|
||||||
|
const bool sealed = is_sealed();
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
STORAGE_LOG(WARN, "ObTmpMacroBlock has not been inited", K(ret));
|
STORAGE_LOG(WARN, "ObTmpMacroBlock has not been inited", K(ret));
|
||||||
} else if (!is_memory()) {
|
} else if (OB_UNLIKELY(!is_memory() || sealed)) {
|
||||||
ret = OB_STATE_NOT_MATCH;
|
ret = OB_STATE_NOT_MATCH;
|
||||||
STORAGE_LOG(WARN, "the block is not in memory", K(ret), K(ATOMIC_LOAD(&block_status_)));
|
STORAGE_LOG(WARN, "the block is not in memory", K(ret), K(ATOMIC_LOAD(&block_status_)), K(sealed));
|
||||||
} else if (OB_FAIL(page_buddy_.alloc(page_nums, start_page_id, alloced_page_nums))) {
|
} else if (OB_FAIL(page_buddy_.alloc(page_nums, start_page_id, alloced_page_nums))) {
|
||||||
STORAGE_LOG(WARN, "Fail to allocate the tmp extent", K(ret), K_(tmp_file_header), K_(page_buddy));
|
STORAGE_LOG(WARN, "Fail to allocate the tmp extent", K(ret), K_(tmp_file_header), K_(page_buddy));
|
||||||
} else {
|
} else {
|
||||||
@ -696,7 +708,7 @@ int64_t ObTmpMacroBlock::get_used_page_nums() const
|
|||||||
SpinRLockGuard guard(lock_);
|
SpinRLockGuard guard(lock_);
|
||||||
int64_t used_page_nums = 0;
|
int64_t used_page_nums = 0;
|
||||||
for (int64_t i = using_extents_.count() - 1; i >= 0; --i) {
|
for (int64_t i = using_extents_.count() - 1; i >= 0; --i) {
|
||||||
used_page_nums += std::ceil(using_extents_.at(i)->get_offset() / DEFAULT_PAGE_SIZE);
|
used_page_nums += std::ceil((1.0 * using_extents_.at(i)->get_offset()) / DEFAULT_PAGE_SIZE);
|
||||||
}
|
}
|
||||||
return used_page_nums;
|
return used_page_nums;
|
||||||
}
|
}
|
||||||
@ -1364,9 +1376,8 @@ int ObTmpTenantFileStore::write(const ObTmpBlockIOInfo &io_info)
|
|||||||
block->set_io_desc(io_info.io_desc_);
|
block->set_io_desc(io_info.io_desc_);
|
||||||
MEMCPY(block->get_buffer() + io_info.offset_, io_info.buf_, io_info.size_);
|
MEMCPY(block->get_buffer() + io_info.offset_, io_info.buf_, io_info.size_);
|
||||||
} else {
|
} else {
|
||||||
// Skip washing and disked status
|
// The washing and disked block shouldn't write data, Otherwise, it will cause data corrupt.
|
||||||
// Won't change io_info for retry alloc block.
|
ret = OB_NOT_SUPPORTED;
|
||||||
ret = OB_EAGAIN;
|
|
||||||
STORAGE_LOG(WARN, "block status is not correct", K(ret), K(io_info));
|
STORAGE_LOG(WARN, "block status is not correct", K(ret), K(io_info));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
@ -1416,6 +1427,8 @@ int ObTmpTenantFileStore::sync_block(const int64_t block_id,
|
|||||||
//do nothing
|
//do nothing
|
||||||
} else if (OB_FAIL(tmp_mem_block_manager_.wash_block(block_id, handle))) {
|
} else if (OB_FAIL(tmp_mem_block_manager_.wash_block(block_id, handle))) {
|
||||||
STORAGE_LOG(WARN, "wash block failed", K(ret), K(block_id));
|
STORAGE_LOG(WARN, "wash block failed", K(ret), K(block_id));
|
||||||
|
} else {
|
||||||
|
STORAGE_LOG(DEBUG, "succeed to sync block", K(block_id));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -186,14 +186,16 @@ public:
|
|||||||
common::ObIArray<ObTmpFileExtent *> &get_extents() { return using_extents_; }
|
common::ObIArray<ObTmpFileExtent *> &get_extents() { return using_extents_; }
|
||||||
ObTmpBlockValueHandle &get_handle() { return handle_; }
|
ObTmpBlockValueHandle &get_handle() { return handle_; }
|
||||||
bool is_empty() const { return page_buddy_.is_empty(); }
|
bool is_empty() const { return page_buddy_.is_empty(); }
|
||||||
int close(bool &is_all_close);
|
int seal(bool &is_sealed);
|
||||||
int is_extents_closed(bool &is_extents_closed);
|
int is_extents_closed(bool &is_extents_closed);
|
||||||
int give_back_buf_into_cache(const bool is_wash = false);
|
int give_back_buf_into_cache(const bool is_wash = false);
|
||||||
|
|
||||||
TO_STRING_KV(KP_(buffer), K_(page_buddy), K_(handle), K_(macro_block_handle), K_(tmp_file_header),
|
TO_STRING_KV(KP_(buffer), K_(page_buddy), K_(handle), K_(macro_block_handle), K_(tmp_file_header),
|
||||||
K_(io_desc), K_(block_status), K_(is_inited), K_(alloc_time), K_(access_time));
|
K_(io_desc), K_(block_status), K_(is_inited), K_(alloc_time), K_(access_time));
|
||||||
private:
|
private:
|
||||||
static const int64_t DEFAULT_PAGE_SIZE;
|
bool is_sealed() const { return ATOMIC_LOAD(&is_sealed_); }
|
||||||
|
private:
|
||||||
|
static const int64_t DEFAULT_PAGE_SIZE;
|
||||||
char *buffer_;
|
char *buffer_;
|
||||||
ObTmpFilePageBuddy page_buddy_;
|
ObTmpFilePageBuddy page_buddy_;
|
||||||
ObTmpBlockValueHandle handle_;
|
ObTmpBlockValueHandle handle_;
|
||||||
@ -203,6 +205,7 @@ private:
|
|||||||
common::ObIOFlag io_desc_;
|
common::ObIOFlag io_desc_;
|
||||||
common::SpinRWLock lock_;
|
common::SpinRWLock lock_;
|
||||||
BlockStatus block_status_;
|
BlockStatus block_status_;
|
||||||
|
bool is_sealed_;
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
int64_t alloc_time_;
|
int64_t alloc_time_;
|
||||||
int64_t access_time_;
|
int64_t access_time_;
|
||||||
|
|||||||
Reference in New Issue
Block a user