patch refactor compaction iter & del useless code
This commit is contained in:
@ -1121,50 +1121,40 @@ int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge(
|
||||
} else {
|
||||
ObITable *last_major = table_store_wrapper.get_member()->get_major_sstables().get_boundary_table(true/*last*/);
|
||||
if (MTL(ObTenantTabletScheduler *)->could_major_merge_start() && OB_NOT_NULL(last_major)) {
|
||||
ObArenaAllocator temp_allocator("GetMediumInfo", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); // for load medium info
|
||||
const int64_t last_major_snapshot = last_major->get_snapshot_version();
|
||||
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
||||
const ObLSID &ls_id = ls.get_ls_id();
|
||||
const int64_t major_frozen_snapshot = 0 == input_major_snapshot ? MTL(ObTenantTabletScheduler *)->get_frozen_version() : input_major_snapshot;
|
||||
ObMediumCompactionInfo::ObCompactionType compaction_type = ObMediumCompactionInfo::COMPACTION_TYPE_MAX;
|
||||
int64_t schedule_scn = 0;
|
||||
{
|
||||
storage::ObTabletMediumInfoReader reader(tablet);
|
||||
ObMediumCompactionInfoKey key;
|
||||
ObMediumCompactionInfo medium_info;
|
||||
if (OB_FAIL(reader.init(temp_allocator))) {
|
||||
LOG_WARN("failed to init medium info reader", K(ret));
|
||||
} else {
|
||||
LOG_DEBUG("schedule tablet medium merge", K(ret), K(schedule_scn), K(major_frozen_snapshot), K(ls_id), K(tablet_id));
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(reader.get_next_medium_info(temp_allocator, key, medium_info))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
} else {
|
||||
LOG_WARN("failed to get medium info", K(ret));
|
||||
}
|
||||
} else if (key.get_medium_snapshot() <= last_major_snapshot) {
|
||||
// finished, this medium info could recycle
|
||||
} else {
|
||||
if (medium_info.is_medium_compaction() || medium_info.medium_snapshot_ <= major_frozen_snapshot) {
|
||||
schedule_scn = medium_info.medium_snapshot_;
|
||||
compaction_type = (ObMediumCompactionInfo::ObCompactionType)medium_info.compaction_type_;
|
||||
LOG_TRACE("set schedule scn and compaction type", K(ret), K(ls_id), K(tablet_id),
|
||||
K(schedule_scn), K(compaction_type), K(major_frozen_snapshot));
|
||||
}
|
||||
break; // break when met one undo medium info
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
LOG_DEBUG("iter read medium info", K(ret), K(ls_id), K(tablet_id), K(medium_info));
|
||||
}
|
||||
medium_info.reset();
|
||||
} // end of while
|
||||
ObArenaAllocator temp_allocator("GetMediumInfo", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); // for load medium info
|
||||
const ObMediumCompactionInfoList *medium_list = nullptr;
|
||||
bool schedule_flag = false;
|
||||
const int64_t inner_table_merged_version = MTL(ObTenantTabletScheduler *)->get_inner_table_merged_scn();
|
||||
|
||||
if (OB_FAIL(tablet.read_medium_info_list(temp_allocator, medium_list))) {
|
||||
LOG_WARN("failed to load medium info list", K(ret), K(tablet));
|
||||
} else if (ObMediumCompactionInfo::MAJOR_COMPACTION == medium_list->get_last_compaction_type()
|
||||
&& inner_table_merged_version < medium_list->get_last_compaction_scn()
|
||||
&& !MTL_IS_PRIMARY_TENANT()) { // for STANDBY/RESTORE TENANT
|
||||
ObTabletCompactionScnInfo ret_info;
|
||||
// for standby/restore tenant, need select inner_table to check RS status before schedule new round
|
||||
if (!scheduler_called) { // should not visit inner table, wait for scheduler loop
|
||||
} else if (OB_FAIL(get_status_from_inner_table(ls_id, tablet_id, ret_info))) {
|
||||
LOG_WARN("failed to get status from inner tablet", K(ret), K(ls_id), K(tablet_id));
|
||||
} else if (ret_info.could_schedule_next_round(medium_list->get_last_compaction_scn())) {
|
||||
LOG_INFO("success to check RS major checksum validation finished", K(ret), K(ls_id), K(tablet_id));
|
||||
schedule_flag = true;
|
||||
}
|
||||
} else {
|
||||
schedule_flag = true;
|
||||
}
|
||||
if (OB_FAIL(ret) || !schedule_flag) {
|
||||
} else {
|
||||
const int64_t major_frozen_snapshot = 0 == input_major_snapshot ? MTL(ObTenantTabletScheduler *)->get_frozen_version() : input_major_snapshot;
|
||||
ObMediumCompactionInfo::ObCompactionType compaction_type = ObMediumCompactionInfo::COMPACTION_TYPE_MAX;
|
||||
int64_t schedule_scn = 0;
|
||||
if (OB_FAIL(read_medium_info_from_list(*medium_list, last_major->get_snapshot_version(),
|
||||
major_frozen_snapshot, compaction_type, schedule_scn))) {
|
||||
} else if (schedule_scn > 0 && OB_FAIL(check_need_merge_and_schedule(ls, tablet, schedule_scn, compaction_type))) {
|
||||
LOG_WARN("failed to check medium merge", K(ret), K(ls_id), K(tablet_id), K(schedule_scn));
|
||||
}
|
||||
}// to make reader destruct to release mds unit lock
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (schedule_scn > 0 && OB_FAIL(check_need_merge_and_schedule(ls, tablet, schedule_scn, compaction_type))) {
|
||||
LOG_WARN("failed to check medium merge", K(ret), K(ls_id), K(tablet_id), K(schedule_scn));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1172,6 +1162,29 @@ int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::read_medium_info_from_list(
|
||||
const ObMediumCompactionInfoList &medium_list,
|
||||
const int64_t last_major_snapshot,
|
||||
const int64_t major_frozen_snapshot,
|
||||
ObMediumCompactionInfo::ObCompactionType &compaction_type,
|
||||
int64_t &schedule_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
DLIST_FOREACH_X(info, medium_list.get_list(), OB_SUCC(ret)) {
|
||||
if (info->medium_snapshot_ <= last_major_snapshot) {
|
||||
// finished, this medium info could recycle
|
||||
} else {
|
||||
if (info->is_medium_compaction()
|
||||
|| info->medium_snapshot_ <= major_frozen_snapshot) {
|
||||
schedule_scn = info->medium_snapshot_;
|
||||
compaction_type = (ObMediumCompactionInfo::ObCompactionType)info->compaction_type_;
|
||||
}
|
||||
break; // found one unfinish medium info, loop end
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::get_palf_role(const ObLSID &ls_id, ObRole &role)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -41,6 +41,12 @@ public:
|
||||
ObTablet &tablet,
|
||||
const int64_t major_frozen_scn = 0,
|
||||
const bool scheduler_called = false);
|
||||
static int read_medium_info_from_list(
|
||||
const ObMediumCompactionInfoList &medium_list,
|
||||
const int64_t major_frozen_snapshot,
|
||||
const int64_t last_major_snapshot,
|
||||
ObMediumCompactionInfo::ObCompactionType &compaction_type,
|
||||
int64_t &schedule_scn);
|
||||
static int get_palf_role(const share::ObLSID &ls_id, ObRole &role);
|
||||
static int get_table_schema_to_merge(
|
||||
ObMultiVersionSchemaService &schema_service,
|
||||
|
@ -475,18 +475,17 @@ int ObTenantTabletScheduler::schedule_all_tablets_minor()
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObLSHandle ls_handle;
|
||||
ObLS *ls = nullptr;
|
||||
int64_t schedule_tablet_cnt = 0;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("The ObTenantTabletScheduler has not been inited", K(ret));
|
||||
} else if (!minor_ls_tablet_iter_.is_valid() && OB_FAIL(minor_ls_tablet_iter_.build_iter())) {
|
||||
} else if (OB_FAIL(minor_ls_tablet_iter_.build_iter())) {
|
||||
LOG_WARN("failed to init iterator", K(ret));
|
||||
} else {
|
||||
LOG_INFO("start schedule all tablet minor merge", K(minor_ls_tablet_iter_));
|
||||
}
|
||||
|
||||
while (OB_SUCC(ret) && schedule_tablet_cnt < SCHEDULE_TABLET_BATCH_CNT) {
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(minor_ls_tablet_iter_.get_next_ls(ls_handle))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
@ -499,7 +498,7 @@ int ObTenantTabletScheduler::schedule_all_tablets_minor()
|
||||
LOG_WARN("ls is null", K(ret), K(ls));
|
||||
} else {
|
||||
const ObLSID &ls_id = ls->get_ls_id();
|
||||
if (OB_TMP_FAIL(schedule_ls_minor_merge(ls_handle, schedule_tablet_cnt))) {
|
||||
if (OB_TMP_FAIL(schedule_ls_minor_merge(ls_handle))) {
|
||||
LOG_TRACE("meet error when schedule", K(tmp_ret), K(minor_ls_tablet_iter_));
|
||||
minor_ls_tablet_iter_.skip_cur_ls();
|
||||
if (!schedule_ignore_error(tmp_ret)) {
|
||||
@ -1077,8 +1076,7 @@ int ObTenantTabletScheduler::schedule_merge_execute_dag(
|
||||
}
|
||||
|
||||
int ObTenantTabletScheduler::schedule_ls_minor_merge(
|
||||
ObLSHandle &ls_handle,
|
||||
int64_t &schedule_tablet_cnt)
|
||||
ObLSHandle &ls_handle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool need_merge = false;
|
||||
@ -1096,9 +1094,9 @@ int ObTenantTabletScheduler::schedule_ls_minor_merge(
|
||||
ObTablet *tablet = nullptr;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
bool schedule_minor_flag = true;
|
||||
while (OB_SUCC(ret) && schedule_minor_flag && schedule_tablet_cnt < SCHEDULE_TABLET_BATCH_CNT) { // loop all tablet in ls
|
||||
while (OB_SUCC(ret) && schedule_minor_flag) { // loop all tablet in ls
|
||||
bool tablet_merge_finish = false;
|
||||
if (OB_FAIL(minor_ls_tablet_iter_.get_next_tablet(ls_handle, tablet_handle))) {
|
||||
if (OB_FAIL(minor_ls_tablet_iter_.get_next_tablet(tablet_handle))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
@ -1127,7 +1125,6 @@ int ObTenantTabletScheduler::schedule_ls_minor_merge(
|
||||
}
|
||||
}
|
||||
} else { // data tablet
|
||||
schedule_tablet_cnt++;
|
||||
if (OB_TMP_FAIL(schedule_tablet_minor_merge<ObTabletMergeExecuteDag>(ls_handle, tablet_handle))) {
|
||||
if (OB_SIZE_OVERFLOW == tmp_ret) {
|
||||
schedule_minor_flag = false;
|
||||
@ -1180,8 +1177,7 @@ int ObTenantTabletScheduler::check_tablet_could_schedule_by_status(const ObTable
|
||||
int ObTenantTabletScheduler::schedule_ls_medium_merge(
|
||||
int64_t &merge_version,
|
||||
ObLSHandle &ls_handle,
|
||||
bool &all_ls_weak_read_ts_ready,
|
||||
int64_t &schedule_tablet_cnt)
|
||||
bool &all_ls_weak_read_ts_ready)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool need_merge = false;
|
||||
@ -1264,11 +1260,11 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
|
||||
FLOG_INFO("disable adaptive compaction due to the high load CPU", K(ret), K(cur_sys_stat));
|
||||
}
|
||||
|
||||
while (OB_SUCC(ret) && schedule_tablet_cnt < SCHEDULE_TABLET_BATCH_CNT) { // loop all tablet in ls
|
||||
while (OB_SUCC(ret)) { // loop all tablet in ls
|
||||
bool tablet_merge_finish = false;
|
||||
// ATTENTION!!! load weak ts before get tablet
|
||||
const share::SCN &weak_read_ts = ls.get_ls_wrs_handler()->get_ls_weak_read_ts();
|
||||
if (OB_FAIL(medium_ls_tablet_iter_.get_next_tablet(ls_handle, tablet_handle))) {
|
||||
if (OB_FAIL(medium_ls_tablet_iter_.get_next_tablet(tablet_handle))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
@ -1288,7 +1284,6 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
|
||||
} else if (FALSE_IT(tablet_id = tablet->get_tablet_meta().tablet_id_)) {
|
||||
} else if (tablet_id.is_ls_inner_tablet()) {
|
||||
// do nothing
|
||||
} else if (FALSE_IT(++schedule_tablet_cnt)) { // inc tablet cnt
|
||||
} else if (OB_TMP_FAIL(schedule_tablet_medium(
|
||||
ls_handle, tablet_handle, major_frozen_scn, weak_read_ts,
|
||||
could_major_merge, enable_adaptive_compaction, ls_could_schedule_medium, ls_locality,
|
||||
@ -1421,14 +1416,13 @@ int ObTenantTabletScheduler::schedule_all_tablets_medium()
|
||||
LOG_WARN("failed to add suspect info", K(tmp_ret));
|
||||
}
|
||||
}
|
||||
} else if (!medium_ls_tablet_iter_.is_valid() && OB_FAIL(medium_ls_tablet_iter_.build_iter())) {
|
||||
} else if (OB_FAIL(medium_ls_tablet_iter_.build_iter())) {
|
||||
LOG_WARN("failed to init iterator", K(ret));
|
||||
} else {
|
||||
bool all_ls_weak_read_ts_ready = true;
|
||||
int64_t merge_version = get_frozen_version();
|
||||
ObLSHandle ls_handle;
|
||||
ObLS *ls = nullptr;
|
||||
int64_t schedule_tablet_cnt = 0;
|
||||
LOG_INFO("start schedule all tablet merge", K(merge_version), K(medium_ls_tablet_iter_));
|
||||
|
||||
if (INIT_COMPACTION_SCN == merge_version) {
|
||||
@ -1454,7 +1448,7 @@ int ObTenantTabletScheduler::schedule_all_tablets_medium()
|
||||
medium_ls_tablet_iter_.set_report_scn_flag();
|
||||
#endif
|
||||
|
||||
while (OB_SUCC(ret) && schedule_tablet_cnt < SCHEDULE_TABLET_BATCH_CNT) {
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(medium_ls_tablet_iter_.get_next_ls(ls_handle))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
@ -1467,7 +1461,7 @@ int ObTenantTabletScheduler::schedule_all_tablets_medium()
|
||||
LOG_WARN("ls is null", K(ret), K(ls));
|
||||
} else if (OB_TMP_FAIL(schedule_ls_medium_merge(
|
||||
merge_version, ls_handle,
|
||||
all_ls_weak_read_ts_ready, schedule_tablet_cnt))) {
|
||||
all_ls_weak_read_ts_ready))) {
|
||||
medium_ls_tablet_iter_.skip_cur_ls(); // for any errno, skip cur ls
|
||||
medium_ls_tablet_iter_.update_merge_finish(false);
|
||||
if (OB_SIZE_OVERFLOW == tmp_ret) {
|
||||
@ -1538,7 +1532,7 @@ int ObTenantTabletScheduler::schedule_all_tablets_medium()
|
||||
|
||||
LOG_INFO("finish schedule all tablet merge", K(merge_version), K(schedule_stats_),
|
||||
"tenant_merge_finish", medium_ls_tablet_iter_.tenant_merge_finish(),
|
||||
K(merged_version_), K(schedule_tablet_cnt));
|
||||
K(merged_version_));
|
||||
if (medium_ls_tablet_iter_.is_scan_finish()) {
|
||||
schedule_stats_.clear_tablet_cnt();
|
||||
}
|
||||
@ -1610,22 +1604,27 @@ int ObTenantTabletScheduler::update_report_scn_as_ls_leader(ObLS &ls)
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
// ------------------- ObCompactionScheduleIterator -------------------- //
|
||||
int ObCompactionScheduleIterator::build_iter()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ls_ids_.reuse();
|
||||
if (OB_FAIL(MTL(ObLSService *)->get_ls_ids(ls_ids_))) {
|
||||
LOG_WARN("failed to get all ls id", K(ret));
|
||||
} else {
|
||||
ls_idx_ = -1;
|
||||
tablet_idx_ = 0;
|
||||
tablet_ids_.reuse();
|
||||
scan_finish_ = false;
|
||||
merge_finish_ = true;
|
||||
report_scn_flag_ = false;
|
||||
LOG_TRACE("build iter", K(ret), K(ls_ids_));
|
||||
if (!is_valid()) {
|
||||
ls_ids_.reuse();
|
||||
if (OB_FAIL(MTL(ObLSService *)->get_ls_ids(ls_ids_))) {
|
||||
LOG_WARN("failed to get all ls id", K(ret));
|
||||
} else {
|
||||
ls_idx_ = -1;
|
||||
tablet_idx_ = 0;
|
||||
tablet_ids_.reuse();
|
||||
scan_finish_ = false;
|
||||
merge_finish_ = true;
|
||||
ls_tablet_svr_ = nullptr;
|
||||
schedule_tablet_cnt_ = 0;
|
||||
report_scn_flag_ = false;
|
||||
LOG_TRACE("build iter", K(ret), KPC(this));
|
||||
}
|
||||
} else { // iter is invalid, no need to build, just set var to start cur batch
|
||||
(void) start_cur_batch();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -1636,20 +1635,25 @@ int ObCompactionScheduleIterator::get_next_ls(ObLSHandle &ls_handle)
|
||||
if (-1 == ls_idx_
|
||||
|| tablet_idx_ >= tablet_ids_.count()) { // tablet iter end, need get next ls
|
||||
++ls_idx_;
|
||||
ls_tablet_svr_ = nullptr;
|
||||
tablet_ids_.reuse();
|
||||
LOG_TRACE("tablet iter end", K(ret), K(ls_idx_), K(tablet_idx_));
|
||||
LOG_TRACE("tablet iter end", K(ret), K(ls_idx_), K(tablet_idx_), "tablet_cnt", tablet_ids_.count());
|
||||
}
|
||||
do {
|
||||
if (ls_idx_ >= ls_ids_.count()) {
|
||||
if (finish_cur_batch_) {
|
||||
ret = OB_ITER_END;
|
||||
} else if (ls_idx_ >= ls_ids_.count()) {
|
||||
scan_finish_ = true;
|
||||
ret = OB_ITER_END;
|
||||
} else if (OB_FAIL((MTL(storage::ObLSService *)->get_ls(ls_ids_[ls_idx_], ls_handle, mod_)))) {
|
||||
} else if (OB_FAIL(get_cur_ls_handle(ls_handle))) {
|
||||
if (OB_LS_NOT_EXIST == ret) {
|
||||
LOG_TRACE("ls not exist", K(ret), K(ls_idx_), K(ls_ids_[ls_idx_]));
|
||||
skip_cur_ls();
|
||||
} else {
|
||||
LOG_WARN("failed to get ls", K(ret), K(ls_idx_), K(ls_ids_[ls_idx_]));
|
||||
}
|
||||
} else {
|
||||
ls_tablet_svr_ = ls_handle.get_ls()->get_tablet_svr();
|
||||
}
|
||||
} while (OB_LS_NOT_EXIST == ret);
|
||||
return ret;
|
||||
@ -1657,36 +1661,34 @@ int ObCompactionScheduleIterator::get_next_ls(ObLSHandle &ls_handle)
|
||||
|
||||
void ObCompactionScheduleIterator::reset()
|
||||
{
|
||||
mod_ = ObLSGetMod::INVALID_MOD;
|
||||
timeout_us_ = 0;
|
||||
ls_idx_ = 0;
|
||||
tablet_idx_ = 0;
|
||||
ls_ids_.reuse();
|
||||
tablet_ids_.reuse();
|
||||
scan_finish_ = false;
|
||||
merge_finish_ = false;
|
||||
finish_cur_batch_ = false;
|
||||
ls_idx_ = 0;
|
||||
tablet_idx_ = 0;
|
||||
schedule_tablet_cnt_ = 0;
|
||||
ls_ids_.reuse();
|
||||
tablet_ids_.reuse();
|
||||
ls_tablet_svr_ = nullptr;
|
||||
report_scn_flag_ = false;
|
||||
}
|
||||
|
||||
bool ObCompactionScheduleIterator::is_valid() const
|
||||
{
|
||||
return ls_ids_.count() > 0 && ls_idx_ >= 0
|
||||
return ls_ids_.count() > 0 && ls_idx_ >= 0 && nullptr != ls_tablet_svr_
|
||||
&& (ls_idx_ < ls_ids_.count() - 1
|
||||
|| (ls_idx_ == ls_ids_.count() - 1 && tablet_idx_ < tablet_ids_.count()));
|
||||
// have remain ls or have remain tablet
|
||||
}
|
||||
|
||||
int ObCompactionScheduleIterator::get_next_tablet(
|
||||
ObLSHandle &ls_handle,
|
||||
ObTabletHandle &tablet_handle)
|
||||
int ObCompactionScheduleIterator::get_next_tablet(ObTabletHandle &tablet_handle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!ls_handle.is_valid())) {
|
||||
if (OB_ISNULL(ls_tablet_svr_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ls_handle is invalid", K(ret), K(ls_idx_), K(ls_ids_[ls_idx_]));
|
||||
LOG_WARN("ls tablet svr is unexpected null", K(ret), KPC(this));
|
||||
} else if (tablet_ids_.empty()) {
|
||||
if (OB_FAIL(ls_handle.get_ls()->get_tablet_svr()->get_all_tablet_ids(
|
||||
is_major_/*except_ls_inner_tablet*/, tablet_ids_))) {
|
||||
if (OB_FAIL(get_tablet_ids())) {
|
||||
LOG_WARN("failed to get tablet ids", K(ret));
|
||||
} else {
|
||||
tablet_idx_ = 0; // for new ls, set tablet_idx_ = 0
|
||||
@ -1697,32 +1699,53 @@ int ObCompactionScheduleIterator::get_next_tablet(
|
||||
do {
|
||||
if (tablet_idx_ >= tablet_ids_.count()) {
|
||||
ret = OB_ITER_END;
|
||||
} else if (schedule_tablet_cnt_ >= max_batch_tablet_cnt_) {
|
||||
finish_cur_batch_ = true;
|
||||
ret = OB_ITER_END;
|
||||
} else {
|
||||
const common::ObTabletID &tablet_id = tablet_ids_.at(tablet_idx_);
|
||||
if (OB_FAIL(ls_handle.get_ls()->get_tablet_svr()->get_tablet(tablet_id, tablet_handle, timeout_us_, ObMDSGetTabletMode::READ_ALL_COMMITED))) {
|
||||
if (OB_FAIL(get_tablet_handle(tablet_id, tablet_handle))) {
|
||||
if (OB_TABLET_NOT_EXIST == ret) {
|
||||
tablet_idx_++;
|
||||
} else {
|
||||
LOG_WARN("fail to get tablet", K(ret), K(tablet_idx_), K(tablet_id), K_(timeout_us));
|
||||
LOG_WARN("fail to get tablet", K(ret), K(tablet_idx_), K(tablet_id));
|
||||
}
|
||||
} else {
|
||||
tablet_handle.set_wash_priority(WashTabletPriority::WTP_LOW);
|
||||
tablet_idx_++;
|
||||
schedule_tablet_cnt_++;
|
||||
}
|
||||
}
|
||||
} while (OB_TABLET_NOT_EXIST == ret);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
// TODO(@lixia.yq) add errsim obtest here
|
||||
int ObCompactionScheduleIterator::get_cur_ls_handle(ObLSHandle &ls_handle)
|
||||
{
|
||||
return MTL(storage::ObLSService *)->get_ls(ls_ids_[ls_idx_], ls_handle, mod_);
|
||||
}
|
||||
|
||||
int ObCompactionScheduleIterator::get_tablet_ids()
|
||||
{
|
||||
return ls_tablet_svr_->get_all_tablet_ids(is_major_/*except_ls_inner_tablet*/, tablet_ids_);
|
||||
}
|
||||
|
||||
int ObCompactionScheduleIterator::get_tablet_handle(
|
||||
const ObTabletID &tablet_id, ObTabletHandle &tablet_handle)
|
||||
{
|
||||
return ls_tablet_svr_->get_tablet(tablet_id, tablet_handle, 0/*timeout*/);
|
||||
}
|
||||
|
||||
int64_t ObCompactionScheduleIterator::to_string(char *buf, const int64_t buf_len) const
|
||||
{
|
||||
int64_t pos = 0;
|
||||
J_OBJ_START();
|
||||
J_KV(K_(report_scn_flag), K_(ls_idx), K_(ls_ids), K_(tablet_idx), K(tablet_ids_.count()));
|
||||
J_KV(K_(ls_idx), K_(ls_ids), K_(tablet_idx), K(tablet_ids_.count()), K_(schedule_tablet_cnt), K_(max_batch_tablet_cnt));
|
||||
if (is_valid()) {
|
||||
J_COMMA();
|
||||
J_KV("cur_ls", ls_ids_.at(ls_idx_), K_(tablet_idx));
|
||||
if (!tablet_ids_.empty() && tablet_idx_ < tablet_ids_.count()) {
|
||||
J_KV("cur_ls", ls_ids_.at(ls_idx_));
|
||||
if (!tablet_ids_.empty() && tablet_idx_ > 0 && tablet_idx_ < tablet_ids_.count()) {
|
||||
J_COMMA();
|
||||
J_KV("next_tablet", tablet_ids_.at(tablet_idx_));
|
||||
}
|
||||
|
@ -37,6 +37,7 @@ class ObLS;
|
||||
class ObTablet;
|
||||
class ObITable;
|
||||
class ObTabletDDLKvMgr;
|
||||
class ObLSTabletService;
|
||||
enum ProhibitMediumTask
|
||||
{
|
||||
TRANSFER = 0,
|
||||
@ -59,7 +60,6 @@ private:
|
||||
bool enable_fast_freeze_;
|
||||
};
|
||||
|
||||
|
||||
// record ls_id/tablet_id
|
||||
class ObCompactionScheduleIterator
|
||||
{
|
||||
@ -67,54 +67,76 @@ public:
|
||||
ObCompactionScheduleIterator(
|
||||
const bool is_major,
|
||||
ObLSGetMod mod = ObLSGetMod::STORAGE_MOD,
|
||||
const int64_t timeout_us = 0)
|
||||
const int64_t batch_tablet_cnt = SCHEDULE_TABLET_BATCH_CNT)
|
||||
: mod_(mod),
|
||||
report_scn_flag_(false),
|
||||
is_major_(is_major),
|
||||
scan_finish_(false),
|
||||
merge_finish_(false),
|
||||
timeout_us_(timeout_us),
|
||||
ls_idx_(0),
|
||||
finish_cur_batch_(false),
|
||||
report_scn_flag_(false),
|
||||
ls_idx_(-1),
|
||||
tablet_idx_(0),
|
||||
schedule_tablet_cnt_(0),
|
||||
max_batch_tablet_cnt_(batch_tablet_cnt),
|
||||
ls_tablet_svr_(nullptr),
|
||||
ls_ids_(),
|
||||
tablet_ids_()
|
||||
{}
|
||||
{
|
||||
ls_ids_.set_attr(ObMemAttr(MTL_ID(), "CompIter"));
|
||||
tablet_ids_.set_attr(ObMemAttr(MTL_ID(), "CompIter"));
|
||||
}
|
||||
~ObCompactionScheduleIterator() { reset(); }
|
||||
int build_iter();
|
||||
int get_next_ls(ObLSHandle &ls_handle);
|
||||
int get_next_tablet(ObLSHandle &ls_handle, ObTabletHandle &tablet_handle);
|
||||
int get_next_tablet(ObTabletHandle &tablet_handle);
|
||||
bool is_scan_finish() const { return scan_finish_; }
|
||||
bool tenant_merge_finish() const { return merge_finish_ & scan_finish_; }
|
||||
void set_report_scn_flag() { report_scn_flag_ = true; }
|
||||
bool need_report_scn() const { return report_scn_flag_; }
|
||||
void update_merge_finish(bool merge_finish) {
|
||||
void update_merge_finish(const bool merge_finish) {
|
||||
merge_finish_ &= merge_finish;
|
||||
}
|
||||
void set_report_scn_flag() { report_scn_flag_ = true; }
|
||||
bool need_report_scn() const { return report_scn_flag_; }
|
||||
void reset();
|
||||
bool is_valid() const;
|
||||
void skip_cur_ls()
|
||||
{
|
||||
++ls_idx_;
|
||||
tablet_idx_ = -1;
|
||||
ls_tablet_svr_ = nullptr;
|
||||
tablet_ids_.reuse();
|
||||
}
|
||||
OB_INLINE int64_t to_string(char *buf, const int64_t buf_len) const;
|
||||
void start_cur_batch()
|
||||
{
|
||||
schedule_tablet_cnt_ = 0;
|
||||
finish_cur_batch_ = false;
|
||||
}
|
||||
int64_t to_string(char *buf, const int64_t buf_len) const;
|
||||
private:
|
||||
// virtual for unittest
|
||||
virtual int get_cur_ls_handle(ObLSHandle &ls_handle);
|
||||
virtual int get_tablet_ids();
|
||||
virtual int get_tablet_handle(const ObTabletID &tablet_id, ObTabletHandle &tablet_handle);
|
||||
|
||||
static const int64_t LS_ID_ARRAY_CNT = 10;
|
||||
static const int64_t TABLET_ID_ARRAY_CNT = 2000;
|
||||
static const int64_t SCHEDULE_TABLET_BATCH_CNT = 50 * 1000L; // 5w
|
||||
|
||||
ObLSGetMod mod_;
|
||||
bool report_scn_flag_;
|
||||
bool is_major_;
|
||||
bool scan_finish_;
|
||||
bool merge_finish_;
|
||||
bool finish_cur_batch_;
|
||||
bool report_scn_flag_;
|
||||
int64_t timeout_us_;
|
||||
int64_t ls_idx_;
|
||||
uint64_t tablet_idx_;
|
||||
int64_t tablet_idx_;
|
||||
int64_t schedule_tablet_cnt_;
|
||||
int64_t max_batch_tablet_cnt_;
|
||||
storage::ObLSTabletService *ls_tablet_svr_;
|
||||
common::ObSEArray<share::ObLSID, LS_ID_ARRAY_CNT> ls_ids_;
|
||||
common::ObSEArray<ObTabletID, TABLET_ID_ARRAY_CNT> tablet_ids_;
|
||||
};
|
||||
|
||||
|
||||
class ObTenantTabletScheduler
|
||||
{
|
||||
public:
|
||||
@ -243,8 +265,7 @@ private:
|
||||
int schedule_ls_medium_merge(
|
||||
int64_t &merge_version,
|
||||
ObLSHandle &ls_handle,
|
||||
bool &all_ls_weak_read_ts_ready,
|
||||
int64_t &schedule_tablet_cnt);
|
||||
bool &all_ls_weak_read_ts_ready);
|
||||
OB_INLINE int schedule_tablet_medium(
|
||||
ObLSHandle &ls_handle,
|
||||
ObTabletHandle &tablet_handle,
|
||||
@ -258,8 +279,7 @@ private:
|
||||
bool &is_leader,
|
||||
bool &tablet_merge_finish);
|
||||
int schedule_ls_minor_merge(
|
||||
ObLSHandle &ls_handle,
|
||||
int64_t &schedule_tablet_cnt);
|
||||
ObLSHandle &ls_handle);
|
||||
int try_remove_old_table(ObLS &ls);
|
||||
int restart_schedule_timer_task(
|
||||
const int64_t interval,
|
||||
|
@ -87,6 +87,7 @@ storage_unittest(test_parallel_minor_dag)
|
||||
storage_dml_unittest(test_partition_range_splite)
|
||||
storage_dml_unittest(test_major_rows_merger)
|
||||
storage_dml_unittest(test_tablet tablet/test_tablet.cpp)
|
||||
storage_unittest(test_compaction_iter compaction/test_compaction_iter.cpp)
|
||||
|
||||
#storage_dml_unittest(test_table_scan_pure_index_table)
|
||||
|
||||
|
233
unittest/storage/compaction/test_compaction_iter.cpp
Normal file
233
unittest/storage/compaction/test_compaction_iter.cpp
Normal file
@ -0,0 +1,233 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#define USING_LOG_PREFIX STORAGE
|
||||
#define protected public
|
||||
#define private public
|
||||
|
||||
#include "share/rc/ob_tenant_base.h"
|
||||
#include "storage/ls/ob_ls.h"
|
||||
#include "storage/compaction/ob_tenant_tablet_scheduler.h"
|
||||
#include "mtlenv/mock_tenant_module_env.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
using namespace share;
|
||||
using namespace common;
|
||||
namespace unittest
|
||||
{
|
||||
|
||||
class MockObCompactionScheduleIterator : public storage::ObCompactionScheduleIterator
|
||||
{
|
||||
public:
|
||||
MockObCompactionScheduleIterator(const int64_t max_batch_tablet_cnt)
|
||||
: ObCompactionScheduleIterator(
|
||||
true/*is_major, no meaning*/,
|
||||
ObLSGetMod::STORAGE_MOD,
|
||||
max_batch_tablet_cnt),
|
||||
mock_tablet_id_cnt_(0),
|
||||
error_tablet_idx_(-1),
|
||||
errno_(OB_SUCCESS)
|
||||
{}
|
||||
virtual int get_cur_ls_handle(ObLSHandle &ls_handle) override
|
||||
{
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
virtual int get_tablet_ids() override
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < mock_tablet_id_cnt_; ++i) {
|
||||
ret = tablet_ids_.push_back(ObTabletID(i));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
virtual int get_tablet_handle(const ObTabletID &tablet_id, ObTabletHandle &tablet_handle) override
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (tablet_idx_ == error_tablet_idx_) {
|
||||
ret = errno_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
void prepare_ls_id_array(const int64_t ls_cnt)
|
||||
{
|
||||
for (int64_t i = 0; i < ls_cnt; ++i) {
|
||||
ASSERT_EQ(OB_SUCCESS, ls_ids_.push_back(ObLSID(i)));
|
||||
}
|
||||
}
|
||||
int64_t mock_tablet_id_cnt_;
|
||||
int64_t error_tablet_idx_;
|
||||
int errno_;
|
||||
};
|
||||
|
||||
class TestCompactionIter : public ::testing::Test
|
||||
{
|
||||
public:
|
||||
void test_iter(
|
||||
const int64_t ls_cnt,
|
||||
const int64_t max_batch_tablet_cnt,
|
||||
const int64_t tablet_cnt_per_ls,
|
||||
const int64_t error_tablet_idx = -1,
|
||||
const int input_errno = OB_SUCCESS);
|
||||
};
|
||||
|
||||
void TestCompactionIter::test_iter(
|
||||
const int64_t ls_cnt,
|
||||
const int64_t max_batch_tablet_cnt,
|
||||
const int64_t tablet_cnt_per_ls,
|
||||
const int64_t error_tablet_idx,
|
||||
const int input_errno)
|
||||
{
|
||||
MockObCompactionScheduleIterator iter(max_batch_tablet_cnt);
|
||||
iter.prepare_ls_id_array(ls_cnt);
|
||||
iter.mock_tablet_id_cnt_ = tablet_cnt_per_ls;
|
||||
iter.error_tablet_idx_ = error_tablet_idx;
|
||||
iter.errno_ = input_errno;
|
||||
|
||||
int ret = OB_SUCCESS;
|
||||
int iter_cnt = 0;
|
||||
int loop_cnt = 0;
|
||||
ObLSHandle ls_handle;
|
||||
ObTabletHandle tablet_handle;
|
||||
while (OB_SUCC(ret)) {
|
||||
ret = iter.get_next_ls(ls_handle);
|
||||
if (OB_ITER_END == ret) {
|
||||
if (iter.ls_idx_ == iter.ls_ids_.count()) {
|
||||
loop_cnt++;
|
||||
} else {
|
||||
STORAGE_LOG(WARN, "unexpected error", K(ret), K(iter), K(iter_cnt));
|
||||
}
|
||||
}
|
||||
while (OB_SUCC(ret)) {
|
||||
ret = iter.get_next_tablet(tablet_handle);
|
||||
if (OB_SUCC(ret)) {
|
||||
iter_cnt++;
|
||||
} else if (OB_ITER_END != ret) {
|
||||
} else if (iter.tablet_idx_ == iter.tablet_ids_.count()) {
|
||||
STORAGE_LOG(INFO, "iter tablet array finish", K(ret), K(iter), K(iter_cnt));
|
||||
} else if (iter_cnt > 0 && iter_cnt % max_batch_tablet_cnt == 0) {
|
||||
STORAGE_LOG(INFO, "iter batch finish", K(ret), K(iter), K(iter_cnt));
|
||||
ASSERT_EQ(iter.finish_cur_batch_, true);
|
||||
iter.start_cur_batch();
|
||||
loop_cnt++;
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "unexpected error", K(ret), K(iter), K(iter_cnt));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
if (OB_ITER_END != ret) {
|
||||
iter.skip_cur_ls();
|
||||
}
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
}
|
||||
} // end of while
|
||||
}
|
||||
if (input_errno == OB_SUCCESS) {
|
||||
ASSERT_EQ(iter_cnt, ls_cnt * tablet_cnt_per_ls);
|
||||
ASSERT_EQ(loop_cnt, iter_cnt / max_batch_tablet_cnt + (iter_cnt % max_batch_tablet_cnt != 0));
|
||||
} else if (error_tablet_idx < 0 || error_tablet_idx >= tablet_cnt_per_ls) {
|
||||
// no errno
|
||||
} else if (OB_TABLET_NOT_EXIST == input_errno) {
|
||||
// for this errno, just skip this tablet
|
||||
ASSERT_EQ(iter_cnt, ls_cnt * (tablet_cnt_per_ls - 1));
|
||||
ASSERT_EQ(loop_cnt, iter_cnt / max_batch_tablet_cnt + (iter_cnt % max_batch_tablet_cnt != 0));
|
||||
} else {
|
||||
// for this errno, just skip cur ls, reset tablet won't be scheduled
|
||||
ASSERT_EQ(iter_cnt, ls_cnt * error_tablet_idx);
|
||||
ASSERT_EQ(loop_cnt, iter_cnt / max_batch_tablet_cnt + (iter_cnt % max_batch_tablet_cnt != 0));
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TestCompactionIter, test_normal_loop)
|
||||
{
|
||||
test_iter(
|
||||
3,/*ls_cnt*/
|
||||
10000,/*max_batch_tablet_cnt*/
|
||||
10000/*tablet_cnt_per_ls*/
|
||||
);
|
||||
test_iter(
|
||||
5,/*ls_cnt*/
|
||||
1000,/*max_batch_tablet_cnt*/
|
||||
10000/*tablet_cnt_per_ls*/
|
||||
);
|
||||
test_iter(
|
||||
5,/*ls_cnt*/
|
||||
1000,/*max_batch_tablet_cnt*/
|
||||
100/*tablet_cnt_per_ls*/
|
||||
);
|
||||
}
|
||||
|
||||
TEST_F(TestCompactionIter, test_single_ls)
|
||||
{
|
||||
test_iter(
|
||||
1,/*ls_cnt*/
|
||||
1000,/*max_batch_tablet_cnt*/
|
||||
100/*tablet_cnt_per_ls*/
|
||||
);
|
||||
test_iter(
|
||||
1,/*ls_cnt*/
|
||||
1000,/*max_batch_tablet_cnt*/
|
||||
1000/*tablet_cnt_per_ls*/
|
||||
);
|
||||
test_iter(
|
||||
1,/*ls_cnt*/
|
||||
1000,/*max_batch_tablet_cnt*/
|
||||
10000/*tablet_cnt_per_ls*/
|
||||
);
|
||||
}
|
||||
|
||||
TEST_F(TestCompactionIter, test_loop_with_not_exist_tablet)
|
||||
{
|
||||
test_iter(
|
||||
2,/*ls_cnt*/
|
||||
1000,/*max_batch_tablet_cnt*/
|
||||
10000,/*tablet_cnt_per_ls*/
|
||||
50,/*error_tablet_idx*/
|
||||
OB_TABLET_NOT_EXIST/*errno*/
|
||||
);
|
||||
test_iter(
|
||||
2,/*ls_cnt*/
|
||||
1000,/*max_batch_tablet_cnt*/
|
||||
10000,/*tablet_cnt_per_ls*/
|
||||
50,/*error_tablet_idx*/
|
||||
OB_TABLET_NOT_EXIST/*errno*/
|
||||
);
|
||||
test_iter(
|
||||
2,/*ls_cnt*/
|
||||
1000,/*max_batch_tablet_cnt*/
|
||||
10000,/*tablet_cnt_per_ls*/
|
||||
50,/*error_tablet_idx*/
|
||||
OB_ERR_UNEXPECTED/*errno*/
|
||||
);
|
||||
test_iter(
|
||||
2,/*ls_cnt*/
|
||||
1000,/*max_batch_tablet_cnt*/
|
||||
1000,/*tablet_cnt_per_ls*/
|
||||
999,/*error_tablet_idx*/
|
||||
OB_ERR_UNEXPECTED/*errno*/
|
||||
);
|
||||
}
|
||||
|
||||
} // namespace unittest
|
||||
} //namespace oceanbase
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
system("rm -rf test_compaction_iter.log*");
|
||||
OB_LOGGER.set_file_name("test_compaction_iter.log");
|
||||
oceanbase::common::ObLogger::get_logger().set_log_level("TRACE");
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
Reference in New Issue
Block a user