optimate schedule_ls_medium_merge
This commit is contained in:
parent
e288a7ad87
commit
ebf454f130
@ -46,7 +46,13 @@ int64_t ObMediumCompactionScheduleFunc::to_string(char *buf, const int64_t buf_l
|
||||
if (OB_ISNULL(buf) || buf_len <= 0) {
|
||||
} else {
|
||||
J_OBJ_START();
|
||||
J_KV("ls_id", ls_.get_ls_id(), "tablet_id", tablet_.get_tablet_meta().tablet_id_);
|
||||
J_KV("ls_id", ls_.get_ls_id());
|
||||
J_COMMA();
|
||||
if (OB_NOT_NULL(tablet_)) {
|
||||
J_KV("tablet_id", tablet_->get_tablet_meta().tablet_id_);
|
||||
} else {
|
||||
J_KV("tablet", tablet_);
|
||||
}
|
||||
J_OBJ_END();
|
||||
}
|
||||
return pos;
|
||||
@ -192,9 +198,9 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_for_leader(
|
||||
#ifdef ERRSIM
|
||||
ret = OB_E(EventTable::EN_SKIP_INDEX_MAJOR) ret;
|
||||
// skip schedule major for user index table
|
||||
if (OB_FAIL(ret)) {
|
||||
if (tablet_.get_tablet_meta().tablet_id_.id() > ObTabletID::MIN_USER_TABLET_ID
|
||||
&& tablet_.get_tablet_meta().tablet_id_ != tablet_.get_tablet_meta().data_tablet_id_) {
|
||||
if (OB_FAIL(ret) && OB_NOT_NULL(tablet_)) {
|
||||
if (tablet_->get_tablet_meta().tablet_id_.id() > ObTabletID::MIN_USER_TABLET_ID
|
||||
&& tablet_->get_tablet_meta().tablet_id_ != tablet_->get_tablet_meta().data_tablet_id_) {
|
||||
return ret;
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
@ -216,58 +222,63 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_primary_cluster(
|
||||
// check last medium type, select inner table for last major
|
||||
bool schedule_medium_flag = false;
|
||||
int64_t max_sync_medium_scn = 0;
|
||||
ObITable *last_major = tablet_.get_table_store().get_major_sstables().get_boundary_table(true/*last*/);
|
||||
const ObMediumCompactionInfoList &medium_list = tablet_.get_medium_compaction_info_list();
|
||||
ObITable *last_major = nullptr;
|
||||
const bool is_major = 0 != schedule_major_snapshot;
|
||||
if (OB_ISNULL(last_major)) {
|
||||
// no major, do nothing
|
||||
} else if (!medium_list.could_schedule_next_round()) { // check serialized list
|
||||
// do nothing
|
||||
} else if (OB_FAIL(tablet_.get_max_sync_medium_scn(max_sync_medium_scn))) { // check info in memory
|
||||
LOG_WARN("failed to get max sync medium scn", K(ret), K(max_sync_medium_scn));
|
||||
} else if (is_major && schedule_major_snapshot > max_sync_medium_scn) {
|
||||
schedule_medium_flag = true;
|
||||
} else if (nullptr != last_major && last_major->get_snapshot_version() < max_sync_medium_scn) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(ObAdaptiveMergePolicy::get_adaptive_merge_reason(tablet_, adaptive_merge_reason))) {
|
||||
if (OB_HASH_NOT_EXIST != ret) {
|
||||
LOG_WARN("failed to get meta merge priority", K(ret), KPC(this));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else if (adaptive_merge_reason > ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE) {
|
||||
schedule_medium_flag = true;
|
||||
}
|
||||
LOG_TRACE("schedule next medium in primary cluster", K(ret), KPC(this), K(schedule_medium_flag),
|
||||
K(schedule_major_snapshot), K(adaptive_merge_reason), KPC(last_major), K(medium_list), K(max_sync_medium_scn));
|
||||
#ifdef ERRSIM
|
||||
if (OB_SUCC(ret)) {
|
||||
if (tablet_.get_tablet_meta().tablet_id_.id() > ObTabletID::MIN_USER_TABLET_ID) {
|
||||
ret = OB_E(EventTable::EN_SCHEDULE_MEDIUM_COMPACTION) ret;
|
||||
LOG_INFO("errsim", K(ret), KPC(this));
|
||||
if (OB_FAIL(ret)) {
|
||||
FLOG_INFO("set schedule medium with errsim", KPC(this));
|
||||
if (OB_ISNULL(tablet_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected null tablet", K(ret), KP(tablet_));
|
||||
} else {
|
||||
const ObMediumCompactionInfoList &medium_list = tablet_->get_medium_compaction_info_list();
|
||||
if (OB_ISNULL(last_major = tablet_->get_table_store().get_major_sstables().get_boundary_table(true/*last*/))) {
|
||||
// no major, do nothing
|
||||
} else if (!medium_list.could_schedule_next_round()) { // check serialized list
|
||||
// do nothing
|
||||
} else if (OB_FAIL(tablet_->get_max_sync_medium_scn(max_sync_medium_scn))) { // check info in memory
|
||||
LOG_WARN("failed to get max sync medium scn", K(ret), K(max_sync_medium_scn));
|
||||
} else if (is_major && schedule_major_snapshot > max_sync_medium_scn) {
|
||||
schedule_medium_flag = true;
|
||||
} else if (nullptr != last_major && last_major->get_snapshot_version() < max_sync_medium_scn) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(ObAdaptiveMergePolicy::get_adaptive_merge_reason(*tablet_, adaptive_merge_reason))) {
|
||||
if (OB_HASH_NOT_EXIST != ret) {
|
||||
LOG_WARN("failed to get meta merge priority", K(ret), KPC(this));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
schedule_medium_flag = true;
|
||||
}
|
||||
} else if (adaptive_merge_reason > ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE) {
|
||||
schedule_medium_flag = true;
|
||||
}
|
||||
LOG_TRACE("schedule next medium in primary cluster", K(ret), KPC(this), K(schedule_medium_flag),
|
||||
K(schedule_major_snapshot), K(adaptive_merge_reason), KPC(last_major), K(medium_list), K(max_sync_medium_scn));
|
||||
#ifdef ERRSIM
|
||||
if (OB_SUCC(ret)) {
|
||||
if (tablet_->get_tablet_meta().tablet_id_.id() > ObTabletID::MIN_USER_TABLET_ID) {
|
||||
ret = OB_E(EventTable::EN_SCHEDULE_MEDIUM_COMPACTION) ret;
|
||||
LOG_INFO("errsim", K(ret), KPC(this));
|
||||
if (OB_FAIL(ret)) {
|
||||
FLOG_INFO("set schedule medium with errsim", KPC(this));
|
||||
ret = OB_SUCCESS;
|
||||
schedule_medium_flag = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
if (OB_FAIL(ret) || !schedule_medium_flag) {
|
||||
} else if (ObMediumCompactionInfo::MAJOR_COMPACTION == medium_list.get_last_compaction_type()) {
|
||||
// for normal medium, checksum error happened, wait_check_medium_scn_ will never = 0
|
||||
// for major, need select inner_table to check RS status
|
||||
if (OB_FAIL(get_status_from_inner_table(ls_.get_ls_id(), tablet_.get_tablet_meta().tablet_id_, ret_info))) {
|
||||
LOG_WARN("failed to get status from inner tablet", K(ret), KPC(this));
|
||||
} else if (ret_info.could_schedule_next_round(medium_list.get_last_compaction_scn())) {
|
||||
LOG_INFO("success to check RS major checksum validation finished", K(ret), KPC(this), K(ret_info));
|
||||
ret = decide_medium_snapshot(is_major);
|
||||
if (OB_FAIL(ret) || !schedule_medium_flag) {
|
||||
} else if (ObMediumCompactionInfo::MAJOR_COMPACTION == medium_list.get_last_compaction_type()) {
|
||||
// for normal medium, checksum error happened, wait_check_medium_scn_ will never = 0
|
||||
// for major, need select inner_table to check RS status
|
||||
if (OB_FAIL(get_status_from_inner_table(ls_.get_ls_id(), tablet_->get_tablet_meta().tablet_id_, ret_info))) {
|
||||
LOG_WARN("failed to get status from inner tablet", K(ret), KPC(this));
|
||||
} else if (ret_info.could_schedule_next_round(medium_list.get_last_compaction_scn())) {
|
||||
LOG_INFO("success to check RS major checksum validation finished", K(ret), KPC(this), K(ret_info));
|
||||
ret = decide_medium_snapshot(is_major);
|
||||
} else {
|
||||
++schedule_stat.wait_rs_validate_cnt_;
|
||||
}
|
||||
} else {
|
||||
++schedule_stat.wait_rs_validate_cnt_;
|
||||
ret = decide_medium_snapshot(is_major, adaptive_merge_reason);
|
||||
}
|
||||
} else {
|
||||
ret = decide_medium_snapshot(is_major, adaptive_merge_reason);
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -280,19 +291,24 @@ int ObMediumCompactionScheduleFunc::get_max_reserved_snapshot(int64_t &max_reser
|
||||
|
||||
int64_t max_merged_snapshot = 0;
|
||||
int64_t min_reserved_snapshot = INT64_MAX;
|
||||
const ObTabletTableStore &table_store = tablet_.get_table_store();
|
||||
if (0 == table_store.get_major_sstables().count()) {
|
||||
if (OB_ISNULL(tablet_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("major sstable should not be empty", K(ret), K(tablet_));
|
||||
} else if (0 == ls_.get_min_reserved_snapshot()) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
// not sync reserved snapshot yet, should not schedule now
|
||||
} else if (FALSE_IT(max_merged_snapshot = table_store.get_major_sstables().get_boundary_table(true/*last*/)->get_snapshot_version())) {
|
||||
} else if (OB_FAIL(MTL(ObTenantFreezeInfoMgr*)->get_min_reserved_snapshot(
|
||||
tablet_.get_tablet_meta().tablet_id_, max_merged_snapshot, min_reserved_snapshot))) {
|
||||
LOG_WARN("failed to get multi version from freeze info mgr", K(ret), K(table_id));
|
||||
LOG_WARN("unexpected null tablet", K(ret), KP(tablet_));
|
||||
} else {
|
||||
max_reserved_snapshot = MAX(ls_.get_min_reserved_snapshot(), min_reserved_snapshot);
|
||||
const ObTabletTableStore &table_store = tablet_->get_table_store();
|
||||
if (0 == table_store.get_major_sstables().count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("major sstable should not be empty", K(ret), K(tablet_));
|
||||
} else if (0 == ls_.get_min_reserved_snapshot()) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
// not sync reserved snapshot yet, should not schedule now
|
||||
} else if (FALSE_IT(max_merged_snapshot = table_store.get_major_sstables().get_boundary_table(true/*last*/)->get_snapshot_version())) {
|
||||
} else if (OB_FAIL(MTL(ObTenantFreezeInfoMgr*)->get_min_reserved_snapshot(
|
||||
tablet_->get_tablet_meta().tablet_id_, max_merged_snapshot, min_reserved_snapshot))) {
|
||||
LOG_WARN("failed to get multi version from freeze info mgr", K(ret), K(table_id));
|
||||
} else {
|
||||
max_reserved_snapshot = MAX(ls_.get_min_reserved_snapshot(), min_reserved_snapshot);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -303,104 +319,109 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
const ObTabletID &tablet_id = tablet_.get_tablet_meta().tablet_id_;
|
||||
int64_t max_sync_medium_scn = 0;
|
||||
uint64_t compat_version = 0;
|
||||
LOG_TRACE("decide_medium_snapshot", K(ret), KPC(this), K(tablet_id));
|
||||
if (OB_FAIL(tablet_.get_max_sync_medium_scn(max_sync_medium_scn))) {
|
||||
LOG_WARN("failed to get max sync medium scn", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(ls_.add_dependent_medium_tablet(tablet_id))) { // add dependent_id in ObLSReservedSnapshotMgr
|
||||
LOG_WARN("failed to add dependent tablet", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), compat_version))) {
|
||||
LOG_WARN("fail to get data version", K(ret));
|
||||
} else if (OB_UNLIKELY(compat_version < DATA_VERSION_4_1_0_0)) {
|
||||
if (OB_ISNULL(tablet_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid data version to schedule medium compaction", K(ret), K(compat_version));
|
||||
LOG_WARN("unexpected null tablet", K(ret), KP(tablet_));
|
||||
} else {
|
||||
int64_t max_reserved_snapshot = 0;
|
||||
ObGetMergeTablesResult result;
|
||||
ObMediumCompactionInfo medium_info;
|
||||
medium_info.data_version_ = compat_version;
|
||||
const ObTabletID &tablet_id = tablet_->get_tablet_meta().tablet_id_;
|
||||
int64_t max_sync_medium_scn = 0;
|
||||
uint64_t compat_version = 0;
|
||||
LOG_TRACE("decide_medium_snapshot", K(ret), KPC(this), K(tablet_id));
|
||||
if (OB_FAIL(tablet_->get_max_sync_medium_scn(max_sync_medium_scn))) {
|
||||
LOG_WARN("failed to get max sync medium scn", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(ls_.add_dependent_medium_tablet(tablet_id))) { // add dependent_id in ObLSReservedSnapshotMgr
|
||||
LOG_WARN("failed to add dependent tablet", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), compat_version))) {
|
||||
LOG_WARN("fail to get data version", K(ret));
|
||||
} else if (OB_UNLIKELY(compat_version < DATA_VERSION_4_1_0_0)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid data version to schedule medium compaction", K(ret), K(compat_version));
|
||||
} else {
|
||||
int64_t max_reserved_snapshot = 0;
|
||||
ObGetMergeTablesResult result;
|
||||
ObMediumCompactionInfo medium_info;
|
||||
medium_info.data_version_ = compat_version;
|
||||
|
||||
if (OB_FAIL(choose_medium_scn[is_major](ls_, tablet_, merge_reason, allocator_, medium_info, result))) {
|
||||
if (OB_NO_NEED_MERGE != ret) {
|
||||
LOG_WARN("failed to choose medium snapshot", K(ret), KPC(this));
|
||||
}
|
||||
} else if (medium_info.medium_snapshot_ <= max_sync_medium_scn) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
} else if (is_major) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(get_max_reserved_snapshot(max_reserved_snapshot))) {
|
||||
LOG_WARN("failed to get multi_version_start", K(ret), KPC(this));
|
||||
} else if (medium_info.medium_snapshot_ < max_reserved_snapshot) {
|
||||
// chosen medium snapshot is far too old
|
||||
LOG_INFO("chosen medium snapshot is invalid for multi_version_start", K(ret), KPC(this),
|
||||
K(medium_info), K(max_reserved_snapshot));
|
||||
const share::SCN &weak_read_ts = ls_.get_ls_wrs_handler()->get_ls_weak_read_ts();
|
||||
if (medium_info.medium_snapshot_ == tablet_.get_snapshot_version() // no uncommitted sstable
|
||||
&& weak_read_ts.get_val_for_tx() <= max_reserved_snapshot
|
||||
&& weak_read_ts.get_val_for_tx() + DEFAULT_SCHEDULE_MEDIUM_INTERVAL < ObTimeUtility::current_time_ns()) {
|
||||
const int64_t snapshot_gc_ts = MTL(ObTenantFreezeInfoMgr*)->get_snapshot_gc_ts();
|
||||
medium_info.medium_snapshot_ = MAX(max_reserved_snapshot, MIN(weak_read_ts.get_val_for_tx(), snapshot_gc_ts));
|
||||
LOG_INFO("use weak_read_ts to schedule medium", K(ret), KPC(this),
|
||||
K(medium_info), K(max_reserved_snapshot), K(weak_read_ts), K(snapshot_gc_ts));
|
||||
} else {
|
||||
if (OB_FAIL(choose_medium_scn[is_major](ls_, *tablet_, merge_reason, allocator_, medium_info, result))) {
|
||||
if (OB_NO_NEED_MERGE != ret) {
|
||||
LOG_WARN("failed to choose medium snapshot", K(ret), KPC(this));
|
||||
}
|
||||
} else if (medium_info.medium_snapshot_ <= max_sync_medium_scn) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && !is_major) {
|
||||
const int64_t current_time = ObTimeUtility::current_time_ns();
|
||||
if (max_reserved_snapshot < current_time) {
|
||||
const int64_t time_interval = (current_time - max_reserved_snapshot) / 2;
|
||||
ObSSTable *table = static_cast<ObSSTable *>(tablet_.get_table_store().get_major_sstables().get_boundary_table(true/*last*/));
|
||||
if (OB_ISNULL(table)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("table is unexpected null", K(ret), KP(table));
|
||||
} else if (table->get_snapshot_version() + time_interval > medium_info.medium_snapshot_) {
|
||||
} else if (is_major) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(get_max_reserved_snapshot(max_reserved_snapshot))) {
|
||||
LOG_WARN("failed to get multi_version_start", K(ret), KPC(this));
|
||||
} else if (medium_info.medium_snapshot_ < max_reserved_snapshot) {
|
||||
// chosen medium snapshot is far too old
|
||||
LOG_INFO("chosen medium snapshot is invalid for multi_version_start", K(ret), KPC(this),
|
||||
K(medium_info), K(max_reserved_snapshot));
|
||||
const share::SCN &weak_read_ts = ls_.get_ls_wrs_handler()->get_ls_weak_read_ts();
|
||||
if (medium_info.medium_snapshot_ == tablet_->get_snapshot_version() // no uncommitted sstable
|
||||
&& weak_read_ts.get_val_for_tx() <= max_reserved_snapshot
|
||||
&& weak_read_ts.get_val_for_tx() + DEFAULT_SCHEDULE_MEDIUM_INTERVAL < ObTimeUtility::current_time_ns()) {
|
||||
const int64_t snapshot_gc_ts = MTL(ObTenantFreezeInfoMgr*)->get_snapshot_gc_ts();
|
||||
medium_info.medium_snapshot_ = MAX(max_reserved_snapshot, MIN(weak_read_ts.get_val_for_tx(), snapshot_gc_ts));
|
||||
LOG_INFO("use weak_read_ts to schedule medium", K(ret), KPC(this),
|
||||
K(medium_info), K(max_reserved_snapshot), K(weak_read_ts), K(snapshot_gc_ts));
|
||||
} else {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
LOG_DEBUG("schedule medium frequently", K(ret), KPC(table), K(medium_info), K(time_interval));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && !is_major) {
|
||||
const int64_t current_time = ObTimeUtility::current_time_ns();
|
||||
if (max_reserved_snapshot < current_time) {
|
||||
const int64_t time_interval = (current_time - max_reserved_snapshot) / 2;
|
||||
ObSSTable *table = static_cast<ObSSTable *>(tablet_->get_table_store().get_major_sstables().get_boundary_table(true/*last*/));
|
||||
if (OB_ISNULL(table)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("table is unexpected null", K(ret), KP(table));
|
||||
} else if (table->get_snapshot_version() + time_interval > medium_info.medium_snapshot_) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
LOG_DEBUG("schedule medium frequently", K(ret), KPC(table), K(medium_info), K(time_interval));
|
||||
}
|
||||
}
|
||||
}
|
||||
#ifdef ERRSIM
|
||||
if (OB_SUCC(ret) || OB_NO_NEED_MERGE == ret) {
|
||||
if (tablet_->get_tablet_meta().tablet_id_.id() > ObTabletID::MIN_USER_TABLET_ID) {
|
||||
ret = OB_E(EventTable::EN_SCHEDULE_MEDIUM_COMPACTION) ret;
|
||||
LOG_INFO("errsim", K(ret), KPC(this));
|
||||
if (OB_FAIL(ret)) {
|
||||
FLOG_INFO("set schedule medium with errsim", KPC(this));
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
#ifdef ERRSIM
|
||||
if (OB_SUCC(ret) || OB_NO_NEED_MERGE == ret) {
|
||||
if (tablet_.get_tablet_meta().tablet_id_.id() > ObTabletID::MIN_USER_TABLET_ID) {
|
||||
ret = OB_E(EventTable::EN_SCHEDULE_MEDIUM_COMPACTION) ret;
|
||||
LOG_INFO("errsim", K(ret), KPC(this));
|
||||
#endif
|
||||
if (FAILEDx(prepare_medium_info(result, medium_info))) {
|
||||
if (OB_TABLE_IS_DELETED == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("failed to prepare medium info", K(ret), K(result), K(tablet_->get_storage_schema()));
|
||||
}
|
||||
} else if (OB_FAIL(submit_medium_clog(medium_info))) {
|
||||
LOG_WARN("failed to submit medium clog and update inner table", K(ret), KPC(this));
|
||||
} else if (OB_TMP_FAIL(ls_.tablet_freeze(tablet_id, true/*is_sync*/))) {
|
||||
// need to freeze memtable with MediumCompactionInfo
|
||||
LOG_WARN("failed to freeze tablet", K(tmp_ret), KPC(this));
|
||||
}
|
||||
// delete tablet_id in ObLSReservedSnapshotMgr even if submit clog or update inner table failed
|
||||
if (OB_TMP_FAIL(ls_.del_dependent_medium_tablet(tablet_id))) {
|
||||
LOG_ERROR("failed to delete dependent medium tablet", K(tmp_ret), KPC(this));
|
||||
ob_abort();
|
||||
}
|
||||
ret = OB_NO_NEED_MERGE == ret ? OB_SUCCESS : ret;
|
||||
if (OB_FAIL(ret)) {
|
||||
FLOG_INFO("set schedule medium with errsim", KPC(this));
|
||||
ret = OB_SUCCESS;
|
||||
// add schedule suspect info
|
||||
ADD_SUSPECT_INFO(MEDIUM_MERGE, ls_.get_ls_id(), tablet_id,
|
||||
"schedule medium failed",
|
||||
"compaction_scn", medium_info.medium_snapshot_,
|
||||
"schema_version", medium_info.storage_schema_.schema_version_,
|
||||
"error_no", ret);
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
if (FAILEDx(prepare_medium_info(result, medium_info))) {
|
||||
if (OB_TABLE_IS_DELETED == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("failed to prepare medium info", K(ret), K(result), K(tablet_.get_storage_schema()));
|
||||
}
|
||||
} else if (OB_FAIL(submit_medium_clog(medium_info))) {
|
||||
LOG_WARN("failed to submit medium clog and update inner table", K(ret), KPC(this));
|
||||
} else if (OB_TMP_FAIL(ls_.tablet_freeze(tablet_id, true/*is_sync*/))) {
|
||||
// need to freeze memtable with MediumCompactionInfo
|
||||
LOG_WARN("failed to freeze tablet", K(tmp_ret), KPC(this));
|
||||
}
|
||||
// delete tablet_id in ObLSReservedSnapshotMgr even if submit clog or update inner table failed
|
||||
if (OB_TMP_FAIL(ls_.del_dependent_medium_tablet(tablet_id))) {
|
||||
LOG_ERROR("failed to delete dependent medium tablet", K(tmp_ret), KPC(this));
|
||||
ob_abort();
|
||||
}
|
||||
ret = OB_NO_NEED_MERGE == ret ? OB_SUCCESS : ret;
|
||||
if (OB_FAIL(ret)) {
|
||||
// add schedule suspect info
|
||||
ADD_SUSPECT_INFO(MEDIUM_MERGE, ls_.get_ls_id(), tablet_id,
|
||||
"schedule medium failed",
|
||||
"compaction_scn", medium_info.medium_snapshot_,
|
||||
"schema_version", medium_info.storage_schema_.schema_version_,
|
||||
"error_no", ret);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -410,54 +431,59 @@ int ObMediumCompactionScheduleFunc::init_parallel_range(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
int64_t expected_task_count = 0;
|
||||
const int64_t tablet_size = medium_info.storage_schema_.get_tablet_size();
|
||||
const ObSSTable *first_sstable = static_cast<const ObSSTable *>(result.handle_.get_table(0));
|
||||
if (OB_ISNULL(first_sstable)) {
|
||||
if (OB_ISNULL(tablet_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("sstable is unexpected null", K(ret), K(tablet_));
|
||||
LOG_WARN("unexpected null tablet", K(ret), KP(tablet_));
|
||||
} else {
|
||||
const int64_t macro_block_cnt = first_sstable->get_meta().get_macro_info().get_data_block_ids().count();
|
||||
int64_t inc_row_cnt = 0;
|
||||
for (int64_t i = 0; i < result.handle_.get_count(); ++i) {
|
||||
inc_row_cnt += static_cast<const ObSSTable*>(result.handle_.get_table(i))->get_meta().get_row_count();
|
||||
}
|
||||
if ((0 == macro_block_cnt && inc_row_cnt > SCHEDULE_RANGE_ROW_COUNT_THRESHOLD)
|
||||
|| (first_sstable->get_meta().get_row_count() >= SCHEDULE_RANGE_ROW_COUNT_THRESHOLD
|
||||
&& inc_row_cnt >= first_sstable->get_meta().get_row_count() * SCHEDULE_RANGE_INC_ROW_COUNT_PERCENRAGE_THRESHOLD)) {
|
||||
if (OB_FAIL(ObParallelMergeCtx::get_concurrent_cnt(tablet_size, macro_block_cnt, expected_task_count))) {
|
||||
STORAGE_LOG(WARN, "failed to get concurrent cnt", K(ret), K(tablet_size), K(expected_task_count),
|
||||
KPC(first_sstable));
|
||||
int64_t expected_task_count = 0;
|
||||
const int64_t tablet_size = medium_info.storage_schema_.get_tablet_size();
|
||||
const ObSSTable *first_sstable = static_cast<const ObSSTable *>(result.handle_.get_table(0));
|
||||
if (OB_ISNULL(first_sstable)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("sstable is unexpected null", K(ret), K(tablet_));
|
||||
} else {
|
||||
const int64_t macro_block_cnt = first_sstable->get_meta().get_macro_info().get_data_block_ids().count();
|
||||
int64_t inc_row_cnt = 0;
|
||||
for (int64_t i = 0; i < result.handle_.get_count(); ++i) {
|
||||
inc_row_cnt += static_cast<const ObSSTable*>(result.handle_.get_table(i))->get_meta().get_row_count();
|
||||
}
|
||||
if ((0 == macro_block_cnt && inc_row_cnt > SCHEDULE_RANGE_ROW_COUNT_THRESHOLD)
|
||||
|| (first_sstable->get_meta().get_row_count() >= SCHEDULE_RANGE_ROW_COUNT_THRESHOLD
|
||||
&& inc_row_cnt >= first_sstable->get_meta().get_row_count() * SCHEDULE_RANGE_INC_ROW_COUNT_PERCENRAGE_THRESHOLD)) {
|
||||
if (OB_FAIL(ObParallelMergeCtx::get_concurrent_cnt(tablet_size, macro_block_cnt, expected_task_count))) {
|
||||
STORAGE_LOG(WARN, "failed to get concurrent cnt", K(ret), K(tablet_size), K(expected_task_count),
|
||||
KPC(first_sstable));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (expected_task_count <= 1) {
|
||||
medium_info.clear_parallel_range();
|
||||
} else {
|
||||
ObTableStoreIterator table_iter;
|
||||
ObArrayArray<ObStoreRange> range_array;
|
||||
ObPartitionMultiRangeSpliter range_spliter;
|
||||
ObSEArray<ObStoreRange, 1> input_range_array;
|
||||
ObStoreRange range;
|
||||
range.set_start_key(ObStoreRowkey::MIN_STORE_ROWKEY);
|
||||
range.set_end_key(ObStoreRowkey::MAX_STORE_ROWKEY);
|
||||
const bool is_major = medium_info.is_major_compaction();
|
||||
if (OB_FAIL(prepare_iter(result, table_iter))) {
|
||||
LOG_WARN("failed to get table iter", K(ret), K(range_array));
|
||||
} else if (OB_FAIL(input_range_array.push_back(range))) {
|
||||
LOG_WARN("failed to push back range", K(ret), K(range));
|
||||
} else if (OB_FAIL(range_spliter.get_split_multi_ranges(
|
||||
input_range_array,
|
||||
expected_task_count,
|
||||
tablet_.get_index_read_info(),
|
||||
table_iter,
|
||||
allocator_,
|
||||
range_array))) {
|
||||
LOG_WARN("failed to get split multi range", K(ret), K(range_array));
|
||||
} else if (OB_FAIL(medium_info.gene_parallel_info(allocator_, range_array))) {
|
||||
LOG_WARN("failed to get parallel ranges", K(ret), K(range_array));
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (expected_task_count <= 1) {
|
||||
medium_info.clear_parallel_range();
|
||||
} else {
|
||||
ObTableStoreIterator table_iter;
|
||||
ObArrayArray<ObStoreRange> range_array;
|
||||
ObPartitionMultiRangeSpliter range_spliter;
|
||||
ObSEArray<ObStoreRange, 1> input_range_array;
|
||||
ObStoreRange range;
|
||||
range.set_start_key(ObStoreRowkey::MIN_STORE_ROWKEY);
|
||||
range.set_end_key(ObStoreRowkey::MAX_STORE_ROWKEY);
|
||||
const bool is_major = medium_info.is_major_compaction();
|
||||
if (OB_FAIL(prepare_iter(result, table_iter))) {
|
||||
LOG_WARN("failed to get table iter", K(ret), K(range_array));
|
||||
} else if (OB_FAIL(input_range_array.push_back(range))) {
|
||||
LOG_WARN("failed to push back range", K(ret), K(range));
|
||||
} else if (OB_FAIL(range_spliter.get_split_multi_ranges(
|
||||
input_range_array,
|
||||
expected_task_count,
|
||||
tablet_->get_index_read_info(),
|
||||
table_iter,
|
||||
allocator_,
|
||||
range_array))) {
|
||||
LOG_WARN("failed to get split multi range", K(ret), K(range_array));
|
||||
} else if (OB_FAIL(medium_info.gene_parallel_info(allocator_, range_array))) {
|
||||
LOG_WARN("failed to get parallel ranges", K(ret), K(range_array));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -524,39 +550,44 @@ int ObMediumCompactionScheduleFunc::prepare_medium_info(
|
||||
ObMediumCompactionInfo &medium_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTableStoreIterator table_iter;
|
||||
medium_info.cluster_id_ = GCONF.cluster_id; // set cluster id
|
||||
if (OB_ISNULL(tablet_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected null tablet", K(ret), KP(tablet_));
|
||||
} else {
|
||||
ObTableStoreIterator table_iter;
|
||||
medium_info.cluster_id_ = GCONF.cluster_id; // set cluster id
|
||||
|
||||
if (medium_info.is_medium_compaction()) {
|
||||
ObStorageSchema tmp_storage_schema;
|
||||
bool use_storage_schema_on_tablet = true;
|
||||
if (medium_info.medium_snapshot_ > tablet_.get_snapshot_version()) {
|
||||
ObSEArray<ObITable*, MAX_MEMSTORE_CNT> memtables;
|
||||
if (OB_FAIL(tablet_.get_table_store().get_memtables(memtables, true/*need_active*/))) {
|
||||
LOG_WARN("failed to get memtables", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(ObMediumCompactionScheduleFunc::get_latest_storage_schema_from_memtable(
|
||||
allocator_, memtables, tmp_storage_schema))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
ret = OB_SUCCESS; // clear errno
|
||||
if (medium_info.is_medium_compaction()) {
|
||||
ObStorageSchema tmp_storage_schema;
|
||||
bool use_storage_schema_on_tablet = true;
|
||||
if (medium_info.medium_snapshot_ > tablet_->get_snapshot_version()) {
|
||||
ObSEArray<ObITable*, MAX_MEMSTORE_CNT> memtables;
|
||||
if (OB_FAIL(tablet_->get_table_store().get_memtables(memtables, true/*need_active*/))) {
|
||||
LOG_WARN("failed to get memtables", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(ObMediumCompactionScheduleFunc::get_latest_storage_schema_from_memtable(
|
||||
allocator_, memtables, tmp_storage_schema))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
ret = OB_SUCCESS; // clear errno
|
||||
} else {
|
||||
LOG_WARN("failed to get storage schema from memtable", K(ret));
|
||||
}
|
||||
} else {
|
||||
LOG_WARN("failed to get storage schema from memtable", K(ret));
|
||||
use_storage_schema_on_tablet = false;
|
||||
}
|
||||
} else {
|
||||
use_storage_schema_on_tablet = false;
|
||||
}
|
||||
|
||||
if (FAILEDx(medium_info.save_storage_schema(
|
||||
allocator_,
|
||||
use_storage_schema_on_tablet ? tablet_->get_storage_schema() : tmp_storage_schema))) {
|
||||
LOG_WARN("failed to save storage schema", K(ret), K(use_storage_schema_on_tablet), K(tmp_storage_schema));
|
||||
}
|
||||
}
|
||||
|
||||
if (FAILEDx(medium_info.save_storage_schema(
|
||||
allocator_,
|
||||
use_storage_schema_on_tablet ? tablet_.get_storage_schema() : tmp_storage_schema))) {
|
||||
LOG_WARN("failed to save storage schema", K(ret), K(use_storage_schema_on_tablet), K(tmp_storage_schema));
|
||||
if (FAILEDx(init_parallel_range(result, medium_info))) {
|
||||
LOG_WARN("failed to init parallel range", K(ret), K(medium_info));
|
||||
} else {
|
||||
LOG_INFO("success to prepare medium info", K(ret), K(medium_info));
|
||||
}
|
||||
}
|
||||
if (FAILEDx(init_parallel_range(result, medium_info))) {
|
||||
LOG_WARN("failed to init parallel range", K(ret), K(medium_info));
|
||||
} else {
|
||||
LOG_INFO("success to prepare medium info", K(ret), K(medium_info));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -671,7 +702,10 @@ int ObMediumCompactionScheduleFunc::submit_medium_clog(
|
||||
return ret;
|
||||
}
|
||||
#endif
|
||||
if (OB_FAIL(tablet_.submit_medium_compaction_clog(medium_info, allocator_))) {
|
||||
if (OB_ISNULL(tablet_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected null tablet", K(ret), KP(tablet_));
|
||||
} else if (OB_FAIL(tablet_->submit_medium_compaction_clog(medium_info, allocator_))) {
|
||||
LOG_WARN("failed to submit medium compaction clog", K(ret), K(medium_info));
|
||||
} else {
|
||||
LOG_INFO("success to submit medium compaction clog", K(ret), KPC(this), K(medium_info));
|
||||
@ -805,33 +839,43 @@ int ObMediumCompactionScheduleFunc::check_medium_checksum_table(
|
||||
}
|
||||
|
||||
// for Leader, clean wait_check_medium_scn
|
||||
// may change tablet_ to the new tablet
|
||||
int ObMediumCompactionScheduleFunc::check_medium_finish(const ObLSLocality &ls_locality)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObLSID &ls_id = ls_.get_ls_id();
|
||||
const ObTabletID &tablet_id = tablet_.get_tablet_meta().tablet_id_;
|
||||
const int64_t wait_check_medium_scn = tablet_.get_medium_compaction_info_list().get_wait_check_medium_scn();
|
||||
bool merge_finish = false;
|
||||
|
||||
if (0 == wait_check_medium_scn) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(check_medium_meta_table(wait_check_medium_scn, ls_id, tablet_id, ls_locality, merge_finish))) {
|
||||
LOG_WARN("failed to check inner table", K(ret), KPC(this));
|
||||
} else if (!merge_finish) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(check_medium_checksum_table(wait_check_medium_scn, ls_id, tablet_id))) { // check checksum
|
||||
LOG_WARN("failed to check checksum", K(ret), K(wait_check_medium_scn), KPC(this));
|
||||
if (OB_ISNULL(tablet_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected null tablet", K(ret), KP(tablet_));
|
||||
} else {
|
||||
const ObMediumCompactionInfo::ObCompactionType compaction_type = tablet_.get_medium_compaction_info_list().get_last_compaction_type();
|
||||
FLOG_INFO("check medium compaction info", K(ret), K(ls_id), K(tablet_id), K(compaction_type));
|
||||
const ObLSID &ls_id = ls_.get_ls_id();
|
||||
const ObTabletID &tablet_id = tablet_->get_tablet_meta().tablet_id_;
|
||||
const int64_t wait_check_medium_scn = tablet_->get_medium_compaction_info_list().get_wait_check_medium_scn();
|
||||
bool merge_finish = false;
|
||||
|
||||
// clear wait_check_medium_scn on Tablet
|
||||
ObTabletHandle unused_handle;
|
||||
if (OB_FAIL(ls_.update_medium_compaction_info(tablet_id, unused_handle))) {
|
||||
LOG_WARN("failed to update medium compaction info", K(ret), K(ls_id), K(tablet_id));
|
||||
if (0 == wait_check_medium_scn) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(check_medium_meta_table(wait_check_medium_scn, ls_id, tablet_id, ls_locality, merge_finish))) {
|
||||
LOG_WARN("failed to check inner table", K(ret), KPC(this));
|
||||
} else if (!merge_finish) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(check_medium_checksum_table(wait_check_medium_scn, ls_id, tablet_id))) { // check checksum
|
||||
LOG_WARN("failed to check checksum", K(ret), K(wait_check_medium_scn), KPC(this));
|
||||
} else {
|
||||
const ObMediumCompactionInfo::ObCompactionType compaction_type = tablet_->get_medium_compaction_info_list().get_last_compaction_type();
|
||||
FLOG_INFO("check medium compaction info", K(ret), K(ls_id), K(tablet_id), K(compaction_type));
|
||||
|
||||
// clear wait_check_medium_scn on Tablet
|
||||
ObTabletHandle new_handle;
|
||||
if (OB_FAIL(ls_.update_medium_compaction_info(tablet_id, new_handle))) {
|
||||
LOG_WARN("failed to update medium compaction info", K(ret), K(ls_id), K(tablet_id));
|
||||
} else if (!new_handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid tablet handle", K(ret), K(ls_id), K(new_handle));
|
||||
} else {
|
||||
tablet_ = new_handle.get_obj();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -25,7 +25,7 @@ namespace compaction
|
||||
class ObMediumCompactionScheduleFunc
|
||||
{
|
||||
public:
|
||||
ObMediumCompactionScheduleFunc(ObLS &ls, ObTablet &tablet)
|
||||
ObMediumCompactionScheduleFunc(ObLS &ls, ObTablet *tablet)
|
||||
: allocator_("MediumSchedule"),
|
||||
ls_(ls),
|
||||
tablet_(tablet),
|
||||
@ -141,7 +141,7 @@ protected:
|
||||
private:
|
||||
ObArenaAllocator allocator_;
|
||||
ObLS &ls_;
|
||||
ObTablet &tablet_;
|
||||
ObTablet *tablet_;
|
||||
bool filters_inited_;
|
||||
share::ObTabletReplicaFilterHolder filters_;
|
||||
};
|
||||
|
@ -982,7 +982,7 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
|
||||
} else if (tablet_id.is_special_merge_tablet()) { // data tablet
|
||||
// do nothing
|
||||
} else {
|
||||
ObMediumCompactionScheduleFunc func(ls, *tablet);
|
||||
ObMediumCompactionScheduleFunc func(ls, tablet);
|
||||
ObITable *latest_major = tablet->get_table_store().get_major_sstables().get_boundary_table(true/*last*/);
|
||||
if (OB_NOT_NULL(latest_major) && latest_major->get_snapshot_version() >= merge_version) {
|
||||
tablet_merge_finish = true;
|
||||
@ -998,11 +998,15 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
|
||||
}
|
||||
LOG_DEBUG("schedule tablet medium", K(ret), K(ls_id), K(tablet_id), K(tablet_merge_finish),
|
||||
KPC(latest_major), K(merge_version));
|
||||
bool could_schedule_next_medium = true;
|
||||
bool check_medium_finish = false;
|
||||
if (!is_leader || OB_ISNULL(latest_major)) {
|
||||
// follower or no major: do nothing
|
||||
could_schedule_next_medium = false;
|
||||
} else if (tablet->get_medium_compaction_info_list().need_check_finish()) { // need check finished
|
||||
if (OB_TMP_FAIL(func.check_medium_finish(ls_locality))) {
|
||||
LOG_WARN("failed to check medium finish", K(tmp_ret), K(ls_id), K(tablet_id));
|
||||
} else if (FALSE_IT(check_medium_finish = true)) {
|
||||
} else if (ObTimeUtility::fast_current_time() <
|
||||
tablet->get_medium_compaction_info_list().get_wait_check_medium_scn() + WAIT_MEDIUM_CHECK_THRESHOLD) {
|
||||
// need wait 10 mins before schedule meta major
|
||||
@ -1011,8 +1015,9 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
|
||||
LOG_WARN("failed to schedule tablet merge", K(tmp_ret), K(ls_id), K(tablet_id));
|
||||
}
|
||||
}
|
||||
} else if (could_major_merge
|
||||
&& (!tablet_merge_finish || enable_adaptive_compaction_)
|
||||
}
|
||||
if (could_schedule_next_medium && could_major_merge
|
||||
&& (!tablet_merge_finish || enable_adaptive_compaction_ || check_medium_finish)
|
||||
&& OB_TMP_FAIL(func.schedule_next_medium_for_leader(
|
||||
tablet_merge_finish ? 0 : merge_version, schedule_stats_))) { // schedule another round
|
||||
LOG_WARN("failed to schedule next medium", K(tmp_ret), K(ls_id), K(tablet_id));
|
||||
|
Loading…
x
Reference in New Issue
Block a user