[CP] should not push multi_version_start before reserved_snapshot_clog sync

This commit is contained in:
obdev 2024-02-06 15:51:55 +00:00 committed by ob-robot
parent dabdcef821
commit ff4b63ff0f
13 changed files with 98 additions and 74 deletions

View File

@ -325,9 +325,13 @@ TEST_F(TestMediumInfoReader, pure_mds_table)
ObTabletMediumInfoReader reader(*tablet);
ASSERT_EQ(OB_SUCCESS, reader.init(allocator));
int64_t medium_snapshot = 0;
ret = reader.get_min_medium_snapshot(medium_snapshot);
ret = reader.get_min_medium_snapshot(0, medium_snapshot);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(medium_snapshot, 1);
ret = reader.get_min_medium_snapshot(1, medium_snapshot);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(medium_snapshot, 2);
}
}
@ -494,7 +498,7 @@ TEST_F(TestMediumInfoReader, mds_table_dump_data_overlap)
ObTabletMediumInfoReader reader(*tablet);
ASSERT_EQ(OB_SUCCESS, reader.init(allocator));
int64_t medium_snapshot = 0;
ret = reader.get_min_medium_snapshot(medium_snapshot);
ret = reader.get_min_medium_snapshot(0, medium_snapshot);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(medium_snapshot, 1);
}
@ -620,7 +624,7 @@ TEST_F(TestMediumInfoReader, mds_table_dump_data_no_overlap)
ObTabletMediumInfoReader reader(*tablet);
ASSERT_EQ(OB_SUCCESS, reader.init(allocator));
int64_t medium_snapshot = 0;
ret = reader.get_min_medium_snapshot(medium_snapshot);
ret = reader.get_min_medium_snapshot(0, medium_snapshot);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(medium_snapshot, 1);
}

View File

@ -48,6 +48,22 @@ int ObSortColumnIdArray::build(
return ret;
}
int32_t ObSortColumnIdArray::get_func_from_map(
ObSortColumnIdArray &sort_array,
const int64_t column_id,
int64_t &input_array_idx)
{
int ret = OB_SUCCESS;
if (OB_FAIL(sort_array.map_.get_refactored(column_id, input_array_idx))) {
if (OB_HASH_NOT_EXIST == ret) {
ret = OB_ENTRY_NOT_EXIST;
} else {
LOG_WARN("failed to get column id from map", KR(ret), K(column_id));
}
}
return ret;
}
int ObSortColumnIdArray::build_hash_map(
const uint64_t tenant_id,
const ObIArray<ObColDesc> &column_descs)
@ -63,17 +79,25 @@ int ObSortColumnIdArray::build_hash_map(
}
} // end of for
if (OB_SUCC(ret)) {
get_func_ = [](ObSortColumnIdArray &sort_array, const int64_t column_id, int64_t &array_idx) -> int {
int ret = OB_SUCCESS;
if (OB_FAIL(sort_array.map_.get_refactored(column_id, array_idx))) {
if (OB_HASH_NOT_EXIST == ret) {
ret = OB_ENTRY_NOT_EXIST;
} else {
LOG_WARN("failed to get column id from map", KR(ret), K(column_id));
}
}
return ret;
};
get_func_ = get_func_from_map;
}
return ret;
}
int32_t ObSortColumnIdArray::get_func_from_array(
ObSortColumnIdArray &sort_array,
const int64_t column_id,
int64_t &input_array_idx)
{
int ret = OB_SUCCESS;
const int64_t array_idx =
std::lower_bound(sort_array.array_.begin(), sort_array.array_.end(),
ObColumnIdToIdx(column_id)) - sort_array.array_.begin();
if ((sort_array.array_.count() != array_idx) &&
(sort_array.array_[array_idx].column_id_ == column_id)) {
input_array_idx = sort_array.array_[array_idx].idx_;
} else {
ret = OB_ENTRY_NOT_EXIST;
}
return ret;
}
@ -97,20 +121,7 @@ int ObSortColumnIdArray::build_sort_array(
if (OB_SUCC(ret)) {
std::sort(array_.begin(), array_.end());
LOG_TRACE("success to sort array", KR(ret), K(array_));
get_func_ = [](ObSortColumnIdArray &sort_array, const int64_t column_id, int64_t &input_array_idx) -> int {
int ret = OB_SUCCESS;
const int64_t array_idx = std::lower_bound(
sort_array.array_.begin(),
sort_array.array_.end(),
ObColumnIdToIdx(column_id)) - sort_array.array_.begin();
if ((sort_array.array_.count() != array_idx)
&& (sort_array.array_[array_idx].column_id_ == column_id)) {
input_array_idx = sort_array.array_[array_idx].idx_;
} else {
ret = OB_ENTRY_NOT_EXIST;
}
return ret;
};
get_func_ = get_func_from_array;
}
}
return ret;

View File

@ -83,6 +83,8 @@ public:
private:
int build_hash_map(const uint64_t tenant_id, const ObIArray<share::schema::ObColDesc> &column_descs);
int build_sort_array(const ObIArray<share::schema::ObColDesc> &column_descs);
static int32_t get_func_from_array(ObSortColumnIdArray &sort_array, const int64_t column_id, int64_t &input_array_idx);
static int32_t get_func_from_map(ObSortColumnIdArray &sort_array, const int64_t column_id, int64_t &input_array_idx);
typedef int32_t (*GET_FUNC)(ObSortColumnIdArray&, const int64_t, int64_t&);
typedef hash::ObHashMap<int64_t, int64_t> ColIdToIdxMap;
typedef common::ObSEArray<ObColumnIdToIdx, share::ObTabletReplicaReportColumnMeta::DEFAULT_COLUMN_CNT> ColIdToIdxArray;

View File

@ -21,8 +21,7 @@ namespace compaction
ObCompactionScheduleIterator::ObCompactionScheduleIterator(
const bool is_major,
ObLSGetMod mod,
const int64_t batch_tablet_cnt)
ObLSGetMod mod)
: mod_(mod),
is_major_(is_major),
scan_finish_(false),
@ -31,7 +30,7 @@ ObCompactionScheduleIterator::ObCompactionScheduleIterator(
ls_idx_(-1),
tablet_idx_(0),
schedule_tablet_cnt_(0),
max_batch_tablet_cnt_(batch_tablet_cnt),
max_batch_tablet_cnt_(0),
ls_tablet_svr_(nullptr),
ls_ids_(),
tablet_ids_()

View File

@ -33,11 +33,9 @@ namespace compaction
class ObCompactionScheduleIterator
{
public:
static const int64_t SCHEDULE_TABLET_BATCH_CNT = 50 * 1000L; // 5w
ObCompactionScheduleIterator(
const bool is_major,
storage::ObLSGetMod mod = storage::ObLSGetMod::STORAGE_MOD,
const int64_t batch_tablet_cnt = SCHEDULE_TABLET_BATCH_CNT);
storage::ObLSGetMod mod = storage::ObLSGetMod::STORAGE_MOD);
~ObCompactionScheduleIterator() { reset(); }
int build_iter(const int64_t schedule_batch_size);
int get_next_ls(ObLSHandle &ls_handle);

View File

@ -68,8 +68,6 @@ int64_t ObBatchFinishCheckStat::to_string(char *buf, const int64_t buf_len) cons
/*
* ObTenantMediumChecker implement
* */
const int64_t ObTenantMediumChecker::MAX_BATCH_CHECK_NUM;
int ObTenantMediumChecker::mtl_init(ObTenantMediumChecker *&tablet_medium_checker)
{
return tablet_medium_checker->init();
@ -254,15 +252,17 @@ int ObTenantMediumChecker::check_medium_finish_schedule()
tablet_ls_set_.clear();
}
}
if (FAILEDx(batch_tablet_ls_infos.reserve(MAX_BATCH_CHECK_NUM))) {
LOG_WARN("fail to reserve array", K(ret), "size", MAX_BATCH_CHECK_NUM);
} else if (OB_FAIL(finish_tablet_ls_infos.reserve(MAX_BATCH_CHECK_NUM))) {
LOG_WARN("fail to reserve array", K(ret), "size", MAX_BATCH_CHECK_NUM);
const int64_t batch_size = MTL(ObTenantTabletScheduler *)->get_schedule_batch_size();
if (OB_FAIL(ret) || tablet_ls_infos.empty()) {
} else if (OB_FAIL(batch_tablet_ls_infos.reserve(batch_size))) {
LOG_WARN("fail to reserve array", K(ret), "size", batch_size);
} else if (OB_FAIL(finish_tablet_ls_infos.reserve(batch_size))) {
LOG_WARN("fail to reserve array", K(ret), "size", batch_size);
} else {
// batch check
int64_t info_count = tablet_ls_infos.count();
int64_t start_idx = 0;
int64_t end_idx = min(MAX_BATCH_CHECK_NUM, info_count);
int64_t end_idx = min(batch_size, info_count);
int64_t cost_ts = ObTimeUtility::fast_current_time();
ObBatchFinishCheckStat stat;
while (start_idx < end_idx) {
@ -272,7 +272,7 @@ int ObTenantMediumChecker::check_medium_finish_schedule()
LOG_INFO("success to batch check medium finish", K(start_idx), K(end_idx), K(info_count));
}
start_idx = end_idx;
end_idx = min(start_idx + MAX_BATCH_CHECK_NUM, info_count);
end_idx = min(start_idx + batch_size, info_count);
}
cost_ts = ObTimeUtility::fast_current_time() - cost_ts;
ADD_COMPACTION_EVENT(

View File

@ -216,11 +216,11 @@ ObTenantTabletScheduler::ObTenantTabletScheduler()
minor_ls_tablet_iter_(false/*is_major*/),
medium_ls_tablet_iter_(true/*is_major*/),
gc_sst_tablet_iter_(false/*is_major*/),
schedule_tablet_batch_size_(0),
error_tablet_cnt_(0),
loop_cnt_(0),
prohibit_medium_map_(),
timer_task_mgr_()
timer_task_mgr_(),
batch_size_mgr_()
{
STATIC_ASSERT(static_cast<int64_t>(NO_MAJOR_MERGE_TYPE_CNT) == ARRAYSIZEOF(MERGE_TYPES), "merge type array len is mismatch");
}
@ -259,7 +259,7 @@ int ObTenantTabletScheduler::init()
{
int ret = OB_SUCCESS;
int64_t schedule_interval = ObTenantTabletSchedulerTaskMgr::DEFAULT_COMPACTION_SCHEDULE_INTERVAL;
int64_t schedule_batch_size = ObCompactionScheduleIterator::SCHEDULE_TABLET_BATCH_CNT;
int64_t schedule_batch_size = ObScheduleBatchSizeMgr::DEFAULT_TABLET_BATCH_CNT;
{
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
@ -289,7 +289,7 @@ int ObTenantTabletScheduler::init()
LOG_WARN("Fail to create prohibit medium ls id map", K(ret));
} else {
timer_task_mgr_.set_scheduler_interval(schedule_interval);
schedule_tablet_batch_size_ = schedule_batch_size;
batch_size_mgr_.set_tablet_batch_size(schedule_batch_size);
is_inited_ = true;
}
return ret;
@ -328,7 +328,7 @@ int ObTenantTabletScheduler::reload_tenant_config()
LOG_INFO("cache min data version", "old_data_version", cached_data_version, "new_data_version", compat_version);
}
int64_t merge_schedule_interval = ObTenantTabletSchedulerTaskMgr::DEFAULT_COMPACTION_SCHEDULE_INTERVAL;
int64_t schedule_batch_size = ObCompactionScheduleIterator::SCHEDULE_TABLET_BATCH_CNT;
int64_t schedule_batch_size = ObScheduleBatchSizeMgr::DEFAULT_TABLET_BATCH_CNT;
{
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
if (tenant_config.is_valid()) {
@ -341,9 +341,8 @@ int ObTenantTabletScheduler::reload_tenant_config()
} // end of ObTenantConfigGuard
if (OB_FAIL(timer_task_mgr_.restart_scheduler_timer_task(merge_schedule_interval))) {
LOG_WARN("failed to restart scheduler timer", K(ret));
} else if (schedule_tablet_batch_size_ != schedule_batch_size) {
schedule_tablet_batch_size_ = schedule_batch_size;
LOG_INFO("succeeded to reload new merge schedule tablet batch cnt", K(schedule_tablet_batch_size_));
} else {
batch_size_mgr_.set_tablet_batch_size(schedule_batch_size);
}
}
return ret;
@ -394,7 +393,7 @@ int ObTenantTabletScheduler::update_upper_trans_version_and_gc_sstable()
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTenantTabletScheduler not init", K(ret));
} else if (OB_FAIL(gc_sst_tablet_iter_.build_iter(schedule_tablet_batch_size_))) {
} else if (OB_FAIL(gc_sst_tablet_iter_.build_iter(get_schedule_batch_size()))) {
LOG_WARN("failed to init iterator", K(ret));
}
@ -429,7 +428,7 @@ int ObTenantTabletScheduler::schedule_all_tablets_minor()
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("The ObTenantTabletScheduler has not been inited", K(ret));
} else if (OB_FAIL(minor_ls_tablet_iter_.build_iter(schedule_tablet_batch_size_))) {
} else if (OB_FAIL(minor_ls_tablet_iter_.build_iter(get_schedule_batch_size()))) {
LOG_WARN("failed to init iterator", K(ret));
} else {
LOG_INFO("start schedule all tablet minor merge", K(minor_ls_tablet_iter_));
@ -1641,7 +1640,7 @@ int ObTenantTabletScheduler::schedule_all_tablets_medium()
LOG_WARN("failed to add suspect info", K(tmp_ret));
}
}
} else if (OB_FAIL(medium_ls_tablet_iter_.build_iter(schedule_tablet_batch_size_))) {
} else if (OB_FAIL(medium_ls_tablet_iter_.build_iter(get_schedule_batch_size()))) {
LOG_WARN("failed to init ls iterator", K(ret));
} else {
bool all_ls_weak_read_ts_ready = true;

View File

@ -23,6 +23,7 @@
#include "lib/hash/ob_hashset.h"
#include "storage/compaction/ob_tenant_tablet_scheduler_task_mgr.h"
#include "storage/compaction/ob_compaction_schedule_iterator.h"
#include "share/compaction/ob_schedule_batch_size_mgr.h"
namespace oceanbase
{
@ -277,6 +278,7 @@ public:
int schedule_next_round_for_leader(
const ObIArray<compaction::ObTabletCheckInfo> &tablet_ls_infos,
const ObIArray<compaction::ObTabletCheckInfo> &finish_tablet_ls_infos);
OB_INLINE int64_t get_schedule_batch_size() const { return batch_size_mgr_.get_schedule_batch_size(); }
private:
friend struct ObTenantTabletSchedulerTaskMgr;
int schedule_next_medium_for_leader(
@ -363,11 +365,11 @@ private:
ObCompactionScheduleIterator minor_ls_tablet_iter_;
ObCompactionScheduleIterator medium_ls_tablet_iter_;
ObCompactionScheduleIterator gc_sst_tablet_iter_;
int64_t schedule_tablet_batch_size_;
int64_t error_tablet_cnt_; // for diagnose
int64_t loop_cnt_;
ObProhibitScheduleMediumMap prohibit_medium_map_;
ObTenantTabletSchedulerTaskMgr timer_task_mgr_;
ObScheduleBatchSizeMgr batch_size_mgr_;
};
} // namespace compaction

View File

@ -166,8 +166,8 @@ int ObLSReservedSnapshotMgr::update_min_reserved_snapshot_for_leader(const int64
if (0 == dependent_tablet_set_.size()) {
if (new_snapshot_version < min_reserved_snapshot_) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("failed to update min reserved snapshot", K(ret), K(new_snapshot_version),
K(min_reserved_snapshot_));
LOG_WARN("failed to update min reserved snapshot", K(ret), "ls_id", ls_->get_ls_id(),
K(new_snapshot_version), K(min_reserved_snapshot_));
} else if (new_snapshot_version > min_reserved_snapshot_) {
// update min_reserved_snapshot and send clog
min_reserved_snapshot_ = new_snapshot_version;
@ -204,7 +204,7 @@ int ObLSReservedSnapshotMgr::try_sync_reserved_snapshot(
LOG_WARN("invalid argument", K(ret), K(new_reserved_snapshot));
} else if (update_flag) {
if (OB_FAIL(update_min_reserved_snapshot_for_leader(new_reserved_snapshot))) {
LOG_WARN("failed to update min_reserved_snapshot", K(ret), K(new_reserved_snapshot));
LOG_WARN("failed to update min_reserved_snapshot", K(ret), "ls_id", ls_->get_ls_id(), K(new_reserved_snapshot));
}
} else if (OB_FAIL(sync_clog(new_reserved_snapshot))) {
LOG_WARN("failed to send update reserved snapshot log", K(ret), K(new_reserved_snapshot));
@ -226,7 +226,7 @@ int ObLSReservedSnapshotMgr::sync_clog(const int64_t new_reserved_snapshot)
} else {
ObMutexGuard guard(sync_clog_lock_);
if (OB_FAIL(try_update_for_leader(new_reserved_snapshot, nullptr/*allocator*/))) {
LOG_WARN("failed to send update reserved snapshot log", K(ret), K(new_reserved_snapshot));
LOG_WARN("failed to send update reserved snapshot log", K(ret), "ls_id", ls_->get_ls_id(), K(new_reserved_snapshot));
}
}
return ret;

View File

@ -4432,21 +4432,25 @@ int ObTablet::get_kept_snapshot_info(
ObTabletMediumInfoReader medium_info_reader(*this);
if (OB_FAIL(medium_info_reader.init(arena_allocator))) {
LOG_WARN("failed to init medium info reader", K(ret));
} else if (OB_FAIL(medium_info_reader.get_min_medium_snapshot(min_medium_snapshot))) {
} else if (OB_FAIL(medium_info_reader.get_min_medium_snapshot(max_merged_snapshot, min_medium_snapshot))) {
LOG_WARN("failed to get min medium snapshot", K(ret), K(tablet_id));
}
}
// for compat, if receive ls_reserved_snapshot clog, should consider ls.get_min_reserved_snapshot()
if (min_reserved_snapshot_on_ls > 0) {
ls_min_reserved_snapshot = min_reserved_snapshot_on_ls;
}
if (OB_SUCC(ret)) {
bool use_multi_version_start_on_tablet = false;
const int64_t old_min_reserved_snapshot = min_reserved_snapshot;
snapshot_info.update_by_smaller_snapshot(ObStorageSnapshotInfo::SNAPSHOT_FOR_LS_RESERVED, ls_min_reserved_snapshot);
snapshot_info.update_by_smaller_snapshot(ObStorageSnapshotInfo::SNAPSHOT_FOR_MIN_MEDIUM, min_medium_snapshot);
if (snapshot_info.snapshot_ < get_multi_version_start()) {
// tablet already had large multi_version_start
if (min_reserved_snapshot_on_ls > 0) {
snapshot_info.update_by_smaller_snapshot(ObStorageSnapshotInfo::SNAPSHOT_FOR_LS_RESERVED, ls_min_reserved_snapshot);
snapshot_info.update_by_smaller_snapshot(ObStorageSnapshotInfo::SNAPSHOT_FOR_MIN_MEDIUM, min_medium_snapshot);
if (snapshot_info.snapshot_ < get_multi_version_start()) {
use_multi_version_start_on_tablet = true;
}
} else {
// if not sync ls_reserved_snapshot yet, should use multi_version_start on tablet
use_multi_version_start_on_tablet = true;
}
if (use_multi_version_start_on_tablet) {
snapshot_info.snapshot_type_ = ObStorageSnapshotInfo::SNAPSHOT_MULTI_VERSION_START_ON_TABLET;
snapshot_info.snapshot_ = get_multi_version_start();
}
@ -4467,7 +4471,7 @@ int ObTablet::get_kept_snapshot_info(
ret = OB_ERR_UNEXPECTED;
LOG_WARN("snapshot info is invalid", KR(ret), K(snapshot_info));
}
LOG_DEBUG("get multi version start", "ls_id", get_tablet_meta().ls_id_, K(tablet_id),
LOG_TRACE("get multi version start", "ls_id", get_tablet_meta().ls_id_, K(tablet_id),
K(snapshot_info), K(min_reserved_snapshot), K(get_tablet_meta()),
K(min_medium_snapshot), K(min_reserved_snapshot_on_ls), K(max_merged_snapshot));
return ret;

View File

@ -210,7 +210,9 @@ int ObTabletMediumInfoReader::get_specified_medium_info(
}
// temp solution, TODO(@xianzhi)
int ObTabletMediumInfoReader::get_min_medium_snapshot(int64_t &min_medium_snapshot)
int ObTabletMediumInfoReader::get_min_medium_snapshot(
const int64_t last_major_snapshot_version,
int64_t &min_medium_snapshot)
{
int ret = OB_SUCCESS;
ObArenaAllocator tmp_allocator;
@ -225,7 +227,7 @@ int ObTabletMediumInfoReader::get_min_medium_snapshot(int64_t &min_medium_snapsh
} else {
LOG_WARN("failed to get medium info", K(ret));
}
} else {
} else if (tmp_key.get_medium_snapshot() > last_major_snapshot_version) {
min_medium_snapshot = tmp_key.get_medium_snapshot();
break;
}

View File

@ -42,7 +42,9 @@ public:
const compaction::ObMediumCompactionInfoKey &key,
compaction::ObMediumCompactionInfo &medium_info);
int get_min_medium_snapshot(int64_t &min_medium_snapshot);
int get_min_medium_snapshot(
const int64_t last_major_snapshot_version,
int64_t &min_medium_snapshot);
int get_max_medium_snapshot(int64_t &max_medium_snapshot);
private:
int advance_mds_iter();

View File

@ -35,12 +35,13 @@ public:
MockObCompactionScheduleIterator(const int64_t batch_tablet_cnt)
: ObCompactionScheduleIterator(
true/*is_major, no meaning*/,
ObLSGetMod::STORAGE_MOD,
batch_tablet_cnt),
ObLSGetMod::STORAGE_MOD),
mock_tablet_id_cnt_(0),
error_tablet_idx_(-1),
errno_(OB_SUCCESS)
{}
{
max_batch_tablet_cnt_ = batch_tablet_cnt;
}
virtual int get_cur_ls_handle(ObLSHandle &ls_handle) override
{
return OB_SUCCESS;