Limit memory consumption during block manager mark and sweep
This commit is contained in:
parent
4458733d23
commit
98d518f292
@ -840,6 +840,8 @@ bool ObBlockManager::GetPendingFreeBlockFunctor::operator()(const MacroBlockId &
|
||||
} else if (OB_UNLIKELY(value.ref_cnt_ < 0)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("fatal error, macro block ref cnt less than 0", K(ret), K(key), K(value));
|
||||
} else if (OB_UNLIKELY(blk_map_.count() >= max_free_blk_cnt_)) {
|
||||
// skip inserting more free block
|
||||
} else if (OB_FAIL(blk_map_.insert(key, true))) {
|
||||
LOG_WARN("push back block id fail", K(ret), K(key));
|
||||
}
|
||||
@ -861,18 +863,21 @@ bool ObBlockManager::GetAllMacroBlockIdFunctor::operator()(const MacroBlockId &k
|
||||
return OB_SUCCESS == ret;
|
||||
}
|
||||
|
||||
bool ObBlockManager::CopyBlockToArrayFunctor::operator()(const MacroBlockId ¯o_id,
|
||||
const bool can_free)
|
||||
bool ObBlockManager::DoBlockSweepFunctor::operator()(
|
||||
const MacroBlockId ¯o_id,
|
||||
const bool can_free)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!can_free)) {
|
||||
// ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected error, this block cannot be freed", K(macro_id), K(can_free));
|
||||
} else if (OB_FAIL(block_ids_.push_back(macro_id))) {
|
||||
LOG_WARN("fail to push back block id into array", K(ret), K(macro_id));
|
||||
} else if (OB_FAIL(block_manager_.sweep_one_block(macro_id))) {
|
||||
LOG_WARN("fail to sweep one block", K(ret), K(macro_id));
|
||||
}
|
||||
ret_code_ = ret;
|
||||
return OB_SUCCESS == ret;
|
||||
// record last failure ret
|
||||
ret_code_ = OB_SUCCESS == ret ? ret_code_ : ret;
|
||||
// ignore ret to sweep all blocks
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ObBlockManager::is_bad_block(const MacroBlockId ¯o_block_id)
|
||||
@ -891,34 +896,33 @@ bool ObBlockManager::is_bad_block(const MacroBlockId ¯o_block_id)
|
||||
int ObBlockManager::do_sweep(MacroBlkIdMap &mark_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
common::ObSEArray<blocksstable::MacroBlockId, 256> blocks;
|
||||
CopyBlockToArrayFunctor functor(blocks);
|
||||
DoBlockSweepFunctor functor(*this);
|
||||
if (0 == mark_info.count()) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(mark_info.for_each(functor))) {
|
||||
ret = functor.get_ret_code();
|
||||
LOG_WARN("fail to copy block into pending free list", K(ret));
|
||||
LOG_WARN("fail to do block sweep", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBlockManager::sweep_one_block(const MacroBlockId& macro_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObBucketHashWLockGuard lock_guard(bucket_lock_, macro_id.hash());
|
||||
BlockInfo block_info;
|
||||
ObIOFd io_fd;
|
||||
io_fd.first_id_ = macro_id.first_id();
|
||||
io_fd.second_id_ = macro_id.second_id();
|
||||
if (OB_FAIL(block_map_.get(macro_id, block_info))) {
|
||||
LOG_WARN("fail to get block info from block map", K(ret), K(macro_id));
|
||||
} else if (OB_UNLIKELY(block_info.ref_cnt_ > 0)) {
|
||||
// skip using block.
|
||||
} else if (OB_FAIL(block_map_.erase(macro_id))) {
|
||||
LOG_WARN("fail to erase block info from block map", K(ret), K(macro_id));
|
||||
} else {
|
||||
// ignore ret to sweep all blocks
|
||||
for (int64_t i = 0; i < blocks.count(); i++) {
|
||||
const MacroBlockId ¯o_id = blocks.at(i);
|
||||
ObBucketHashWLockGuard lock_guard(bucket_lock_, macro_id.hash());
|
||||
BlockInfo block_info;
|
||||
ObIOFd io_fd;
|
||||
io_fd.first_id_ = macro_id.first_id();
|
||||
io_fd.second_id_ = macro_id.second_id();
|
||||
if (OB_FAIL(block_map_.get(macro_id, block_info))) {
|
||||
LOG_WARN("fail to get block info from block map", K(ret), K(macro_id));
|
||||
} else if (OB_UNLIKELY(block_info.ref_cnt_ > 0)) {
|
||||
// skip using block.
|
||||
continue;
|
||||
} else if (OB_FAIL(block_map_.erase(macro_id))) {
|
||||
LOG_WARN("fail to erase block info from block map", K(ret), K(macro_id));
|
||||
} else {
|
||||
io_device_->free_block(io_fd);
|
||||
FLOG_INFO("block manager free block", K(macro_id), K(io_fd));
|
||||
}
|
||||
}
|
||||
io_device_->free_block(io_fd);
|
||||
FLOG_INFO("block manager free block", K(macro_id), K(io_fd));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -932,6 +936,8 @@ void ObBlockManager::mark_and_sweep()
|
||||
bool skip_mark = false;
|
||||
// we must assign alloc_num_ before mark_macro_blocks, because it will be set to 0 in this func
|
||||
int64_t alloc_num = 0;
|
||||
// recycle maximum 200 GB space, but no more than 6MB memory consumption for mark_info
|
||||
const int64_t MAX_FREE_BLOCK_COUNT_PER_ROUND = 100000;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
@ -945,16 +951,27 @@ void ObBlockManager::mark_and_sweep()
|
||||
} else {
|
||||
if (OB_FAIL(mark_info.init(ObModIds::OB_STORAGE_FILE_BLOCK_REF, OB_SERVER_TENANT_ID))) {
|
||||
LOG_WARN("fail to init mark info, ", K(ret));
|
||||
} else if (OB_FAIL(macro_id_set.create(MAX(2, block_map_.count()), "BlkIdSetBkt", "BlkIdSetNode",
|
||||
} else if (OB_FAIL(macro_id_set.create(MAX(2, block_map_.get_bkt_cnt()), "BlkIdSetBkt", "BlkIdSetNode",
|
||||
OB_SERVER_TENANT_ID))) {
|
||||
LOG_WARN("fail to create macro id set", K(ret));
|
||||
} else {
|
||||
GetPendingFreeBlockFunctor pending_free_functor(mark_info, tmp_status.hold_count_);
|
||||
GetPendingFreeBlockFunctor pending_free_functor(
|
||||
MAX_FREE_BLOCK_COUNT_PER_ROUND, mark_info, tmp_status.hold_count_);
|
||||
tmp_status.start_time_ = ObTimeUtility::fast_current_time();
|
||||
if (OB_FAIL(block_map_.for_each(pending_free_functor))) {
|
||||
ret = pending_free_functor.get_ret_code();
|
||||
LOG_WARN("fail to get pending free blocks", K(ret));
|
||||
} else if (0 == (alloc_num = ATOMIC_SET(&alloc_num_, 0)) && 0 == mark_info.count()) {
|
||||
} else if ((mark_info.count() < MAX_FREE_BLOCK_COUNT_PER_ROUND)) {
|
||||
// Only try to set alloc_num_ to 0 when macro info is complete, else do mark and sweep again.
|
||||
if (0 != (alloc_num = ATOMIC_SET(&alloc_num_, 0))) {
|
||||
// Some one alloc block after GetPendingFreeBlockFunctor concurrently.
|
||||
// let mark and sweep do again next round whatever mark_info is empty or not
|
||||
ATOMIC_SET(&alloc_num_, alloc_num);
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (0 == mark_info.count()) {
|
||||
skip_mark = true;
|
||||
LOG_INFO("no block alloc/free, no need to mark blocks", K(ret), K(mark_info.count()));
|
||||
} else if (OB_FAIL(mark_macro_blocks(mark_info, macro_id_set, tmp_status))) {//mark
|
||||
|
@ -300,9 +300,11 @@ private:
|
||||
{
|
||||
public:
|
||||
GetPendingFreeBlockFunctor(
|
||||
const int64_t max_free_blk_cnt,
|
||||
MacroBlkIdMap &blk_map,
|
||||
int64_t &hold_count)
|
||||
: ret_code_(common::OB_SUCCESS),
|
||||
max_free_blk_cnt_(max_free_blk_cnt),
|
||||
blk_map_(blk_map),
|
||||
hold_count_(hold_count)
|
||||
{}
|
||||
@ -313,25 +315,26 @@ private:
|
||||
|
||||
private:
|
||||
int ret_code_;
|
||||
int64_t max_free_blk_cnt_;
|
||||
MacroBlkIdMap &blk_map_;
|
||||
int64_t &hold_count_;
|
||||
};
|
||||
|
||||
class CopyBlockToArrayFunctor final
|
||||
class DoBlockSweepFunctor final
|
||||
{
|
||||
public:
|
||||
CopyBlockToArrayFunctor(common::ObIArray<MacroBlockId> &block_ids)
|
||||
DoBlockSweepFunctor(ObBlockManager& block_manager)
|
||||
: ret_code_(common::OB_SUCCESS),
|
||||
block_ids_(block_ids)
|
||||
block_manager_(block_manager)
|
||||
{}
|
||||
~CopyBlockToArrayFunctor() = default;
|
||||
~DoBlockSweepFunctor() = default;
|
||||
|
||||
bool operator()(const MacroBlockId ¯o_id, const bool can_free);
|
||||
int get_ret_code() const { return ret_code_; }
|
||||
|
||||
private:
|
||||
int ret_code_;
|
||||
common::ObIArray<MacroBlockId> &block_ids_;
|
||||
ObBlockManager& block_manager_;
|
||||
};
|
||||
|
||||
private:
|
||||
@ -391,6 +394,7 @@ private:
|
||||
ObMacroBlockMarkerStatus &tmp_status);
|
||||
int set_group_id(const uint64_t tenant_id);
|
||||
int do_sweep(MacroBlkIdMap &mark_info);
|
||||
int sweep_one_block(const MacroBlockId& macro_id);
|
||||
|
||||
int update_mark_info(
|
||||
const common::ObIArray<MacroBlockId> ¯o_block_list,
|
||||
|
@ -170,7 +170,7 @@ TEST_F(TestBlockManager, test_mark_and_sweep)
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
int64_t safe_ts = ObTimeUtility::current_time();
|
||||
int64_t hold_cnt = 0;
|
||||
ObBlockManager::GetPendingFreeBlockFunctor functor(mark_info, hold_cnt);
|
||||
ObBlockManager::GetPendingFreeBlockFunctor functor(1000000, mark_info, hold_cnt);
|
||||
ret = OB_SERVER_BLOCK_MGR.block_map_.for_each(functor);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(blk_cnt - 1, mark_info.count());
|
||||
|
Loading…
x
Reference in New Issue
Block a user