Modify marker status.

This commit is contained in:
ND501 2024-09-18 09:17:00 +00:00 committed by ob-robot
parent b331e767c7
commit a010054307
6 changed files with 144 additions and 90 deletions

View File

@ -170,7 +170,7 @@ int ObAllVirtualMacroBlockMarkerStatus::inner_get_next_row(common::ObNewRow *&ro
}
case OB_APP_MIN_COLUMN_ID + 18: {
// whether finished marking
cur_row_.cells_[i].set_bool(true);
cur_row_.cells_[i].set_bool(marker_status_.mark_finished_);
break;
}
case OB_APP_MIN_COLUMN_ID + 19: {

View File

@ -808,29 +808,30 @@ int ObBlockManager::get_marker_status(ObMacroBlockMarkerStatus &status) {
return ret;
}
void ObBlockManager::update_marker_status(
const ObMacroBlockMarkerStatus &tmp_status) {
void ObBlockManager::update_marker_status(const ObMacroBlockMarkerStatus &tmp_status)
{
SpinWLockGuard guard(marker_lock_);
marker_status_.reset();
marker_status_.total_block_count_ =
OB_STORAGE_OBJECT_MGR.get_total_macro_block_count();
marker_status_.reserved_block_count_ =
io_device_->get_reserved_block_count() + tmp_status.reserved_block_count_;
marker_status_.total_block_count_ = OB_STORAGE_OBJECT_MGR.get_total_macro_block_count();
marker_status_.reserved_block_count_ = io_device_->get_reserved_block_count() + tmp_status.reserved_block_count_;
marker_status_.free_count_ = get_free_macro_block_count();
marker_status_.hold_count_ = tmp_status.hold_count_;
marker_status_.mark_cost_time_ = tmp_status.mark_cost_time_;
marker_status_.sweep_cost_time_ = tmp_status.sweep_cost_time_;
marker_status_.start_time_ = tmp_status.start_time_;
marker_status_.last_end_time_ = tmp_status.last_end_time_;
marker_status_.linked_block_count_ = tmp_status.linked_block_count_;
marker_status_.index_block_count_ = tmp_status.index_block_count_;
marker_status_.ids_block_count_ = tmp_status.ids_block_count_;
marker_status_.tmp_file_count_ = tmp_status.tmp_file_count_;
marker_status_.data_block_count_ = tmp_status.data_block_count_;
marker_status_.shared_data_block_count_ = tmp_status.shared_data_block_count_;
marker_status_.pending_free_count_ = tmp_status.pending_free_count_;
marker_status_.shared_meta_block_count_ = tmp_status.shared_meta_block_count_;
marker_status_.hold_info_ = tmp_status.hold_info_;
marker_status_.mark_finished_ = tmp_status.mark_finished_;
if (tmp_status.mark_finished_) {
marker_status_.last_end_time_ = tmp_status.last_end_time_;
marker_status_.linked_block_count_ = tmp_status.linked_block_count_;
marker_status_.index_block_count_ = tmp_status.index_block_count_;
marker_status_.ids_block_count_ = tmp_status.ids_block_count_;
marker_status_.tmp_file_count_ = tmp_status.tmp_file_count_;
marker_status_.data_block_count_ = tmp_status.data_block_count_;
marker_status_.shared_data_block_count_ = tmp_status.shared_data_block_count_;
marker_status_.pending_free_count_ = tmp_status.pending_free_count_;
marker_status_.shared_meta_block_count_ = tmp_status.shared_meta_block_count_;
marker_status_.hold_info_ = tmp_status.hold_info_;
}
}
bool ObBlockManager::GetOldestHoldBlockFunctor::operator()(
@ -949,17 +950,15 @@ int ObBlockManager::sweep_one_block(const MacroBlockId &macro_id) {
return ret;
}
void ObBlockManager::mark_and_sweep() {
void ObBlockManager::mark_and_sweep()
{
int ret = OB_SUCCESS;
ObHashSet<MacroBlockId, NoPthreadDefendMode> macro_id_set;
MacroBlkIdMap mark_info;
ObMacroBlockMarkerStatus tmp_status;
bool skip_mark = false;
// we must assign alloc_num_ before mark_macro_blocks, because it will be set
// to 0 in this func
// we must assign alloc_num_ before mark_macro_blocks, because it will be set to 0 in this func
int64_t alloc_num = 0;
// recycle maximum 400 GB space, but no more than 8MB memory consumption for
// mark_info
// recycle maximum 400 GB space, but no more than 8MB memory consumption for mark_info
const int64_t MAX_FREE_BLOCK_COUNT_PER_ROUND = 200000;
if (IS_NOT_INIT) {
@ -972,11 +971,11 @@ void ObBlockManager::mark_and_sweep() {
LOG_WARN("slog replay hasn't finished, this task can't start", K(ret));
}
} else {
if (OB_FAIL(mark_info.init(ObModIds::OB_STORAGE_FILE_BLOCK_REF,
OB_SERVER_TENANT_ID))) {
if (OB_FAIL(mark_info.init(ObModIds::OB_STORAGE_FILE_BLOCK_REF, OB_SERVER_TENANT_ID))) {
LOG_WARN("fail to init mark info, ", K(ret));
} else if (OB_FAIL(macro_id_set.create(MAX(2, block_map_.get_bkt_cnt()),
"BlkIdSetBkt", "BlkIdSetNode",
"BlkIdSetBkt",
"BlkIdSetNode",
OB_SERVER_TENANT_ID))) {
LOG_WARN("fail to create macro id set", K(ret));
} else {
@ -987,62 +986,47 @@ void ObBlockManager::mark_and_sweep() {
ret = pending_free_functor.get_ret_code();
LOG_WARN("fail to get pending free blocks", K(ret));
} else if ((mark_info.count() < MAX_FREE_BLOCK_COUNT_PER_ROUND)) {
// Only try to set alloc_num_ to 0 when macro info is complete, else do
// mark and sweep again.
// Only try to set alloc_num_ to 0 when macro info is complete, else do mark and sweep again.
if (0 != (alloc_num = ATOMIC_SET(&alloc_num_, 0))) {
// Some one alloc block after GetPendingFreeBlockFunctor concurrently.
// let mark and sweep do again next round whatever mark_info is empty
// or not
// Some one alloc block after GetPendingFreeBlockFunctor concurrently. let mark and sweep do again next round
// whatever mark_info is empty or not
ATOMIC_SET(&alloc_num_, alloc_num);
}
}
if (OB_FAIL(ret)) {
} else if (0 == mark_info.count()) {
skip_mark = true;
LOG_INFO("no block alloc/free, no need to mark blocks", K(ret),
K(mark_info.count()));
} else if (OB_FAIL(
mark_macro_blocks(mark_info, macro_id_set, tmp_status))) {
} else if (OB_FAIL(mark_macro_blocks(mark_info, macro_id_set, tmp_status))) {
if (OB_ALLOCATE_MEMORY_FAILED == ret) {
LOG_INFO("mark blocks meet memory issue, still countinue sweep to "
"lease compaction space");
LOG_INFO("mark blocks meet memory issue, still countinue sweep to lease compaction space");
ret = OB_SUCCESS;
skip_mark = true;
} else {
LOG_WARN("fail to mark macro blocks", K(ret));
}
}
if (OB_FAIL(ret)) {
ATOMIC_FAA(&alloc_num_,
alloc_num); // add alloc_num back to trigger next round mark
ATOMIC_FAA(&alloc_num_, alloc_num); // add alloc_num back to trigger next round mark
} else {
tmp_status.pending_free_count_ += mark_info.count();
tmp_status.mark_cost_time_ =
ObTimeUtility::fast_current_time() - tmp_status.start_time_;
tmp_status.mark_cost_time_ = ObTimeUtility::fast_current_time() - tmp_status.start_time_;
// sweep
SpinWLockGuard guard(sweep_lock_);
if (OB_FAIL(do_sweep(mark_info))) {
LOG_WARN("do sweep fail", K(ret));
} else if (!skip_mark) {
} else if (tmp_status.mark_finished_) {
tmp_status.last_end_time_ = ObTimeUtility::fast_current_time();
tmp_status.sweep_cost_time_ = tmp_status.last_end_time_ -
tmp_status.start_time_ -
tmp_status.mark_cost_time_;
tmp_status.sweep_cost_time_ = tmp_status.last_end_time_ - tmp_status.start_time_ - tmp_status.mark_cost_time_;
GetOldestHoldBlockFunctor hold_info_functor(macro_id_set,
tmp_status.hold_info_);
GetOldestHoldBlockFunctor hold_info_functor(macro_id_set, tmp_status.hold_info_);
if (OB_FAIL(block_map_.for_each(hold_info_functor))) {
ret = hold_info_functor.get_ret_code();
LOG_WARN("fail to get oldest hold block", K(ret));
} else {
update_marker_status(tmp_status);
}
} else {
update_partial_status(tmp_status);
}
}
if (OB_SUCC(ret)) {
update_marker_status(tmp_status);
}
FLOG_INFO("finish once mark and sweep", K(ret), K(alloc_num),
K(mark_info.count()), K_(marker_status), "map_cnt",
block_map_.count());
@ -1051,23 +1035,11 @@ void ObBlockManager::mark_and_sweep() {
macro_id_set.destroy();
}
void ObBlockManager::update_partial_status(
const ObMacroBlockMarkerStatus &tmp_status) {
SpinWLockGuard guard(marker_lock_);
marker_status_.pending_free_count_ = tmp_status.pending_free_count_;
marker_status_.last_end_time_ = ObTimeUtility::fast_current_time();
marker_status_.mark_cost_time_ = tmp_status.mark_cost_time_;
marker_status_.sweep_cost_time_ = 0;
marker_status_.start_time_ = tmp_status.start_time_;
marker_status_.hold_count_ = tmp_status.hold_count_;
marker_status_.free_count_ = get_free_macro_block_count();
}
int ObBlockManager::mark_macro_blocks(
MacroBlkIdMap &mark_info,
common::hash::ObHashSet<MacroBlockId, common::hash::NoPthreadDefendMode>
&macro_id_set,
ObMacroBlockMarkerStatus &tmp_status) {
common::hash::ObHashSet<MacroBlockId, common::hash::NoPthreadDefendMode> &macro_id_set,
ObMacroBlockMarkerStatus &tmp_status)
{
int ret = OB_SUCCESS;
omt::ObMultiTenant *omt = GCTX.omt_;
common::ObSEArray<uint64_t, 8> mtl_tenant_ids;
@ -1075,39 +1047,37 @@ int ObBlockManager::mark_macro_blocks(
if (OB_ISNULL(omt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error, omt is nullptr", K(ret), KP(omt));
} else if (OB_FAIL(
mark_tmp_file_blocks(mark_info, macro_id_set, tmp_status))) {
} else if (0 == mark_info.count()) {
tmp_status.mark_finished_ = false;
LOG_INFO("no block alloc/free, no need to mark blocks", K(ret), K(mark_info.count()));
} else if (OB_FAIL(mark_tmp_file_blocks(mark_info, macro_id_set, tmp_status))) {
LOG_WARN("fail to mark tmp file blocks", K(ret));
} else if (OB_FAIL(mark_server_meta_blocks(mark_info, macro_id_set,
tmp_status))) {
} else if (OB_FAIL(mark_server_meta_blocks(mark_info, macro_id_set, tmp_status))) {
LOG_WARN("fail to mark server meta blocks", K(ret));
} else {
omt->get_mtl_tenant_ids(mtl_tenant_ids);
for (int64_t i = 0; OB_SUCC(ret) && i < mtl_tenant_ids.count(); i++) {
const uint64_t tenant_id = mtl_tenant_ids.at(i);
MacroBlockId macro_id;
MTL_SWITCH(tenant_id) {
MTL_SWITCH(tenant_id)
{
CONSUMER_GROUP_FUNC_GUARD(ObFunctionType::PRIO_GC_MACRO_BLOCK);
if (OB_FAIL(mark_tenant_blocks(mark_info, macro_id_set,
tmp_status))) {
if (OB_FAIL(mark_tenant_blocks(mark_info, macro_id_set, tmp_status))) {
LOG_WARN("fail to mark tenant blocks", K(ret), K(tenant_id));
} else if (OB_FALSE_IT(MTL(ObSharedMacroBlockMgr *)
->get_cur_shared_block(macro_id))) {
} else if (OB_FAIL(mark_held_shared_block(macro_id, mark_info,
macro_id_set, tmp_status))) {
LOG_WARN(
"fail to mark shared block held by shared_macro_block_manager",
K(ret), K(macro_id));
} else if (OB_FALSE_IT(MTL(ObSharedMacroBlockMgr *)->get_cur_shared_block(macro_id))) {
} else if (OB_FAIL(mark_held_shared_block(macro_id, mark_info, macro_id_set, tmp_status))) {
LOG_WARN("fail to mark shared block held by shared_macro_block_manager", K(ret), K(macro_id));
} else if (OB_FALSE_IT(MTL(ObTenantStorageMetaService *)
->get_shared_object_reader_writer()
.get_cur_shared_block(macro_id))) {
} else if (OB_FAIL(mark_held_shared_block(macro_id, mark_info,
macro_id_set, tmp_status))) {
LOG_WARN("fail to mark shared block held by shared_reader_writer",
K(ret), K(macro_id));
} else if (OB_FAIL(mark_held_shared_block(macro_id, mark_info, macro_id_set, tmp_status))) {
LOG_WARN("fail to mark shared block held by shared_reader_writer", K(ret), K(macro_id));
}
}
}
if (OB_SUCC(ret)) {
tmp_status.mark_finished_ = true;
}
}
return ret;
}

View File

@ -345,7 +345,6 @@ private:
};
private:
void update_partial_status(const ObMacroBlockMarkerStatus &tmp_status);
int get_macro_block_info(const MacroBlockId &macro_id,
ObMacroBlockInfo &macro_block_info,
ObMacroBlockHandle &macro_block_handle);

View File

@ -515,7 +515,8 @@ ObMacroBlockMarkerStatus::ObMacroBlockMarkerStatus()
sweep_cost_time_(0),
start_time_(0),
last_end_time_(0),
hold_info_()
hold_info_(),
mark_finished_(false)
{
}
@ -562,6 +563,7 @@ void ObMacroBlockMarkerStatus::reuse()
start_time_ = 0;
last_end_time_ = 0;
hold_info_.reset();
mark_finished_ = false;
}
ObRecordHeaderV3::ObRecordHeaderV3()

View File

@ -971,7 +971,8 @@ public:
K_(sweep_cost_time),
KTIME_(start_time),
KTIME_(last_end_time),
K_(hold_info));
K_(hold_info),
K_(mark_finished));
public:
int64_t total_block_count_;
int64_t reserved_block_count_;
@ -992,6 +993,7 @@ public:
int64_t start_time_;
int64_t last_end_time_;
ObSimpleMacroBlockInfo hold_info_;
bool mark_finished_;
};
/****************************** following codes are inline functions ****************************/

View File

@ -14,6 +14,7 @@
#include <sys/statvfs.h>
#include <gtest/gtest.h>
#include <algorithm>
#include <iostream>
#define USING_LOG_PREFIX STORAGE
@ -42,6 +43,13 @@ public:
TestBlockManager();
virtual ~TestBlockManager() = default;
virtual void SetUp() override;
private:
int init_multi_tenant();
private:
common::ObAddr addr_;
omt::ObMultiTenant multi_tenant_;
};
TestBlockManager::TestBlockManager()
@ -49,10 +57,25 @@ TestBlockManager::TestBlockManager()
{
}
int TestBlockManager::init_multi_tenant()
{
int ret = OB_SUCCESS;
GCONF.cpu_count = 6;
if (OB_SUCCESS != (ret = multi_tenant_.init(addr_))) {
STORAGE_LOG(WARN, "init multi_tenant failed", K(ret));
} else {
multi_tenant_.start();
GCTX.omt_ = &multi_tenant_;
}
return ret;
}
void TestBlockManager::SetUp()
{
TestDataFilePrepare::SetUp();
ASSERT_EQ(OB_SUCCESS, init_multi_tenant());
OB_SERVER_BLOCK_MGR.block_map_.reset();
SERVER_STORAGE_META_SERVICE.is_started_ = true;
}
TEST_F(TestBlockManager, test_inc_and_dec_ref_cnt)
@ -182,6 +205,64 @@ TEST_F(TestBlockManager, test_mark_and_sweep)
common::ObClockGenerator::destroy();
}
TEST_F(TestBlockManager, test_mark_and_sweep_skip_mark)
{
int ret = OB_SUCCESS;
ObBlockManager::BlockInfo block_info;
ObMacroBlockHandle macro_handle;
ASSERT_EQ(0, OB_SERVER_BLOCK_MGR.block_map_.count());
const int64_t bucket_num = 1024;
const int64_t max_cache_size = 1024 * 1024 * 1024;
const int64_t block_size = common::OB_MALLOC_BIG_BLOCK_SIZE;
ret = ObKVGlobalCache::get_instance().init(&getter, bucket_num, max_cache_size, block_size);
if (OB_INIT_TWICE == ret) {
ret = common::OB_SUCCESS;
} else {
ASSERT_EQ(common::OB_SUCCESS, ret);
}
ASSERT_EQ(common::OB_SUCCESS, ret);
ASSERT_EQ(0, OB_SERVER_BLOCK_MGR.block_map_.count());
ObBlockManager::MacroBlkIdMap mark_info;
ret = mark_info.init(ObModIds::OB_STORAGE_FILE_BLOCK_REF, OB_SERVER_TENANT_ID);
ASSERT_EQ(OB_SUCCESS, ret);
common::hash::ObHashSet<MacroBlockId, common::hash::NoPthreadDefendMode> macro_id_set;
ret = macro_id_set.create(MAX(2, OB_SERVER_BLOCK_MGR.block_map_.count()));
ASSERT_EQ(OB_SUCCESS, ret);
int64_t safe_ts = ObTimeUtility::current_time();
int64_t hold_cnt = 0;
ObBlockManager::GetPendingFreeBlockFunctor functor(1000000, mark_info, hold_cnt);
ret = OB_SERVER_BLOCK_MGR.block_map_.for_each(functor);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(0, mark_info.count());
// first mark_and_sweep, should update start_time_
OB_SERVER_BLOCK_MGR.mark_and_sweep();
ASSERT_EQ(false, OB_SERVER_BLOCK_MGR.marker_status_.mark_finished_);
int64_t first_start_time = OB_SERVER_BLOCK_MGR.marker_status_.start_time_;
int64_t first_last_end_time = OB_SERVER_BLOCK_MGR.marker_status_.last_end_time_;
std::cout << ObTimeUtility::fast_current_time() << std::endl;
sleep(1);
std::cout << ObTimeUtility::fast_current_time() << std::endl;
// second mark_and_sweep, should update start_time_, should not update last_end_time_
OB_SERVER_BLOCK_MGR.mark_and_sweep();
ASSERT_EQ(false, OB_SERVER_BLOCK_MGR.marker_status_.mark_finished_);
ASSERT_NE(first_start_time, OB_SERVER_BLOCK_MGR.marker_status_.start_time_);
ASSERT_EQ(first_last_end_time, OB_SERVER_BLOCK_MGR.marker_status_.last_end_time_);
macro_handle.reset();
// FILE_MANAGER_INSTANCE_V2.destroy();
ObKVGlobalCache::get_instance().destroy();
}
TEST_F(TestBlockManager, test_ref_cnt_wash_and_load)
{
int ret = OB_SUCCESS;