Limit memory consumption during block manager mark and sweep
This commit is contained in:
@ -840,6 +840,8 @@ bool ObBlockManager::GetPendingFreeBlockFunctor::operator()(const MacroBlockId &
|
|||||||
} else if (OB_UNLIKELY(value.ref_cnt_ < 0)) {
|
} else if (OB_UNLIKELY(value.ref_cnt_ < 0)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_ERROR("fatal error, macro block ref cnt less than 0", K(ret), K(key), K(value));
|
LOG_ERROR("fatal error, macro block ref cnt less than 0", K(ret), K(key), K(value));
|
||||||
|
} else if (OB_UNLIKELY(blk_map_.count() >= max_free_blk_cnt_)) {
|
||||||
|
// skip inserting more free block
|
||||||
} else if (OB_FAIL(blk_map_.insert(key, true))) {
|
} else if (OB_FAIL(blk_map_.insert(key, true))) {
|
||||||
LOG_WARN("push back block id fail", K(ret), K(key));
|
LOG_WARN("push back block id fail", K(ret), K(key));
|
||||||
}
|
}
|
||||||
@ -861,18 +863,21 @@ bool ObBlockManager::GetAllMacroBlockIdFunctor::operator()(const MacroBlockId &k
|
|||||||
return OB_SUCCESS == ret;
|
return OB_SUCCESS == ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ObBlockManager::CopyBlockToArrayFunctor::operator()(const MacroBlockId ¯o_id,
|
bool ObBlockManager::DoBlockSweepFunctor::operator()(
|
||||||
|
const MacroBlockId ¯o_id,
|
||||||
const bool can_free)
|
const bool can_free)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_UNLIKELY(!can_free)) {
|
if (OB_UNLIKELY(!can_free)) {
|
||||||
// ret = OB_ERR_UNEXPECTED;
|
// ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unexpected error, this block cannot be freed", K(macro_id), K(can_free));
|
LOG_WARN("unexpected error, this block cannot be freed", K(macro_id), K(can_free));
|
||||||
} else if (OB_FAIL(block_ids_.push_back(macro_id))) {
|
} else if (OB_FAIL(block_manager_.sweep_one_block(macro_id))) {
|
||||||
LOG_WARN("fail to push back block id into array", K(ret), K(macro_id));
|
LOG_WARN("fail to sweep one block", K(ret), K(macro_id));
|
||||||
}
|
}
|
||||||
ret_code_ = ret;
|
// record last failure ret
|
||||||
return OB_SUCCESS == ret;
|
ret_code_ = OB_SUCCESS == ret ? ret_code_ : ret;
|
||||||
|
// ignore ret to sweep all blocks
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ObBlockManager::is_bad_block(const MacroBlockId ¯o_block_id)
|
bool ObBlockManager::is_bad_block(const MacroBlockId ¯o_block_id)
|
||||||
@ -891,17 +896,19 @@ bool ObBlockManager::is_bad_block(const MacroBlockId ¯o_block_id)
|
|||||||
int ObBlockManager::do_sweep(MacroBlkIdMap &mark_info)
|
int ObBlockManager::do_sweep(MacroBlkIdMap &mark_info)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
common::ObSEArray<blocksstable::MacroBlockId, 256> blocks;
|
DoBlockSweepFunctor functor(*this);
|
||||||
CopyBlockToArrayFunctor functor(blocks);
|
|
||||||
if (0 == mark_info.count()) {
|
if (0 == mark_info.count()) {
|
||||||
// do nothing
|
// do nothing
|
||||||
} else if (OB_FAIL(mark_info.for_each(functor))) {
|
} else if (OB_FAIL(mark_info.for_each(functor))) {
|
||||||
ret = functor.get_ret_code();
|
ret = functor.get_ret_code();
|
||||||
LOG_WARN("fail to copy block into pending free list", K(ret));
|
LOG_WARN("fail to do block sweep", K(ret));
|
||||||
} else {
|
}
|
||||||
// ignore ret to sweep all blocks
|
return ret;
|
||||||
for (int64_t i = 0; i < blocks.count(); i++) {
|
}
|
||||||
const MacroBlockId ¯o_id = blocks.at(i);
|
|
||||||
|
int ObBlockManager::sweep_one_block(const MacroBlockId& macro_id)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
ObBucketHashWLockGuard lock_guard(bucket_lock_, macro_id.hash());
|
ObBucketHashWLockGuard lock_guard(bucket_lock_, macro_id.hash());
|
||||||
BlockInfo block_info;
|
BlockInfo block_info;
|
||||||
ObIOFd io_fd;
|
ObIOFd io_fd;
|
||||||
@ -911,15 +918,12 @@ int ObBlockManager::do_sweep(MacroBlkIdMap &mark_info)
|
|||||||
LOG_WARN("fail to get block info from block map", K(ret), K(macro_id));
|
LOG_WARN("fail to get block info from block map", K(ret), K(macro_id));
|
||||||
} else if (OB_UNLIKELY(block_info.ref_cnt_ > 0)) {
|
} else if (OB_UNLIKELY(block_info.ref_cnt_ > 0)) {
|
||||||
// skip using block.
|
// skip using block.
|
||||||
continue;
|
|
||||||
} else if (OB_FAIL(block_map_.erase(macro_id))) {
|
} else if (OB_FAIL(block_map_.erase(macro_id))) {
|
||||||
LOG_WARN("fail to erase block info from block map", K(ret), K(macro_id));
|
LOG_WARN("fail to erase block info from block map", K(ret), K(macro_id));
|
||||||
} else {
|
} else {
|
||||||
io_device_->free_block(io_fd);
|
io_device_->free_block(io_fd);
|
||||||
FLOG_INFO("block manager free block", K(macro_id), K(io_fd));
|
FLOG_INFO("block manager free block", K(macro_id), K(io_fd));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -932,6 +936,8 @@ void ObBlockManager::mark_and_sweep()
|
|||||||
bool skip_mark = false;
|
bool skip_mark = false;
|
||||||
// we must assign alloc_num_ before mark_macro_blocks, because it will be set to 0 in this func
|
// we must assign alloc_num_ before mark_macro_blocks, because it will be set to 0 in this func
|
||||||
int64_t alloc_num = 0;
|
int64_t alloc_num = 0;
|
||||||
|
// recycle maximum 200 GB space, but no more than 6MB memory consumption for mark_info
|
||||||
|
const int64_t MAX_FREE_BLOCK_COUNT_PER_ROUND = 100000;
|
||||||
|
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
@ -945,16 +951,27 @@ void ObBlockManager::mark_and_sweep()
|
|||||||
} else {
|
} else {
|
||||||
if (OB_FAIL(mark_info.init(ObModIds::OB_STORAGE_FILE_BLOCK_REF, OB_SERVER_TENANT_ID))) {
|
if (OB_FAIL(mark_info.init(ObModIds::OB_STORAGE_FILE_BLOCK_REF, OB_SERVER_TENANT_ID))) {
|
||||||
LOG_WARN("fail to init mark info, ", K(ret));
|
LOG_WARN("fail to init mark info, ", K(ret));
|
||||||
} else if (OB_FAIL(macro_id_set.create(MAX(2, block_map_.count()), "BlkIdSetBkt", "BlkIdSetNode",
|
} else if (OB_FAIL(macro_id_set.create(MAX(2, block_map_.get_bkt_cnt()), "BlkIdSetBkt", "BlkIdSetNode",
|
||||||
OB_SERVER_TENANT_ID))) {
|
OB_SERVER_TENANT_ID))) {
|
||||||
LOG_WARN("fail to create macro id set", K(ret));
|
LOG_WARN("fail to create macro id set", K(ret));
|
||||||
} else {
|
} else {
|
||||||
GetPendingFreeBlockFunctor pending_free_functor(mark_info, tmp_status.hold_count_);
|
GetPendingFreeBlockFunctor pending_free_functor(
|
||||||
|
MAX_FREE_BLOCK_COUNT_PER_ROUND, mark_info, tmp_status.hold_count_);
|
||||||
tmp_status.start_time_ = ObTimeUtility::fast_current_time();
|
tmp_status.start_time_ = ObTimeUtility::fast_current_time();
|
||||||
if (OB_FAIL(block_map_.for_each(pending_free_functor))) {
|
if (OB_FAIL(block_map_.for_each(pending_free_functor))) {
|
||||||
ret = pending_free_functor.get_ret_code();
|
ret = pending_free_functor.get_ret_code();
|
||||||
LOG_WARN("fail to get pending free blocks", K(ret));
|
LOG_WARN("fail to get pending free blocks", K(ret));
|
||||||
} else if (0 == (alloc_num = ATOMIC_SET(&alloc_num_, 0)) && 0 == mark_info.count()) {
|
} else if ((mark_info.count() < MAX_FREE_BLOCK_COUNT_PER_ROUND)) {
|
||||||
|
// Only try to set alloc_num_ to 0 when macro info is complete, else do mark and sweep again.
|
||||||
|
if (0 != (alloc_num = ATOMIC_SET(&alloc_num_, 0))) {
|
||||||
|
// Some one alloc block after GetPendingFreeBlockFunctor concurrently.
|
||||||
|
// let mark and sweep do again next round whatever mark_info is empty or not
|
||||||
|
ATOMIC_SET(&alloc_num_, alloc_num);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
} else if (0 == mark_info.count()) {
|
||||||
skip_mark = true;
|
skip_mark = true;
|
||||||
LOG_INFO("no block alloc/free, no need to mark blocks", K(ret), K(mark_info.count()));
|
LOG_INFO("no block alloc/free, no need to mark blocks", K(ret), K(mark_info.count()));
|
||||||
} else if (OB_FAIL(mark_macro_blocks(mark_info, macro_id_set, tmp_status))) {//mark
|
} else if (OB_FAIL(mark_macro_blocks(mark_info, macro_id_set, tmp_status))) {//mark
|
||||||
|
|||||||
@ -300,9 +300,11 @@ private:
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
GetPendingFreeBlockFunctor(
|
GetPendingFreeBlockFunctor(
|
||||||
|
const int64_t max_free_blk_cnt,
|
||||||
MacroBlkIdMap &blk_map,
|
MacroBlkIdMap &blk_map,
|
||||||
int64_t &hold_count)
|
int64_t &hold_count)
|
||||||
: ret_code_(common::OB_SUCCESS),
|
: ret_code_(common::OB_SUCCESS),
|
||||||
|
max_free_blk_cnt_(max_free_blk_cnt),
|
||||||
blk_map_(blk_map),
|
blk_map_(blk_map),
|
||||||
hold_count_(hold_count)
|
hold_count_(hold_count)
|
||||||
{}
|
{}
|
||||||
@ -313,25 +315,26 @@ private:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
int ret_code_;
|
int ret_code_;
|
||||||
|
int64_t max_free_blk_cnt_;
|
||||||
MacroBlkIdMap &blk_map_;
|
MacroBlkIdMap &blk_map_;
|
||||||
int64_t &hold_count_;
|
int64_t &hold_count_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class CopyBlockToArrayFunctor final
|
class DoBlockSweepFunctor final
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
CopyBlockToArrayFunctor(common::ObIArray<MacroBlockId> &block_ids)
|
DoBlockSweepFunctor(ObBlockManager& block_manager)
|
||||||
: ret_code_(common::OB_SUCCESS),
|
: ret_code_(common::OB_SUCCESS),
|
||||||
block_ids_(block_ids)
|
block_manager_(block_manager)
|
||||||
{}
|
{}
|
||||||
~CopyBlockToArrayFunctor() = default;
|
~DoBlockSweepFunctor() = default;
|
||||||
|
|
||||||
bool operator()(const MacroBlockId ¯o_id, const bool can_free);
|
bool operator()(const MacroBlockId ¯o_id, const bool can_free);
|
||||||
int get_ret_code() const { return ret_code_; }
|
int get_ret_code() const { return ret_code_; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int ret_code_;
|
int ret_code_;
|
||||||
common::ObIArray<MacroBlockId> &block_ids_;
|
ObBlockManager& block_manager_;
|
||||||
};
|
};
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -391,6 +394,7 @@ private:
|
|||||||
ObMacroBlockMarkerStatus &tmp_status);
|
ObMacroBlockMarkerStatus &tmp_status);
|
||||||
int set_group_id(const uint64_t tenant_id);
|
int set_group_id(const uint64_t tenant_id);
|
||||||
int do_sweep(MacroBlkIdMap &mark_info);
|
int do_sweep(MacroBlkIdMap &mark_info);
|
||||||
|
int sweep_one_block(const MacroBlockId& macro_id);
|
||||||
|
|
||||||
int update_mark_info(
|
int update_mark_info(
|
||||||
const common::ObIArray<MacroBlockId> ¯o_block_list,
|
const common::ObIArray<MacroBlockId> ¯o_block_list,
|
||||||
|
|||||||
@ -170,7 +170,7 @@ TEST_F(TestBlockManager, test_mark_and_sweep)
|
|||||||
ASSERT_EQ(OB_SUCCESS, ret);
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
int64_t safe_ts = ObTimeUtility::current_time();
|
int64_t safe_ts = ObTimeUtility::current_time();
|
||||||
int64_t hold_cnt = 0;
|
int64_t hold_cnt = 0;
|
||||||
ObBlockManager::GetPendingFreeBlockFunctor functor(mark_info, hold_cnt);
|
ObBlockManager::GetPendingFreeBlockFunctor functor(1000000, mark_info, hold_cnt);
|
||||||
ret = OB_SERVER_BLOCK_MGR.block_map_.for_each(functor);
|
ret = OB_SERVER_BLOCK_MGR.block_map_.for_each(functor);
|
||||||
ASSERT_EQ(OB_SUCCESS, ret);
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
ASSERT_EQ(blk_cnt - 1, mark_info.count());
|
ASSERT_EQ(blk_cnt - 1, mark_info.count());
|
||||||
|
|||||||
Reference in New Issue
Block a user