del useless code
This commit is contained in:
parent
5895ce4113
commit
3eb34fe37e
@ -230,7 +230,6 @@ int ObCOMergePrepareTask::create_schedule_dag(ObCOTabletMergeCtx &ctx)
|
||||
ObCOMergeScheduleDag *schedule_dag = nullptr;
|
||||
ObTabletMergeExecuteDag *minor_exe_dag = nullptr;
|
||||
ObTablet *tablet = ctx.get_tablet();
|
||||
result.create_snapshot_version_ = 0;
|
||||
result.version_range_.multi_version_start_ = ctx.get_tablet()->get_multi_version_start();
|
||||
result.version_range_.base_version_ = 0;
|
||||
result.version_range_.snapshot_version_ = ctx.get_tablet()->get_snapshot_version();
|
||||
|
@ -35,6 +35,7 @@ ObStaticMergeParam::ObStaticMergeParam(ObTabletMergeDagParam &dag_param)
|
||||
is_rebuild_column_store_(false),
|
||||
is_schema_changed_(false),
|
||||
need_parallel_minor_merge_(true),
|
||||
is_tenant_major_merge_(false),
|
||||
merge_level_(MICRO_BLOCK_MERGE_LEVEL),
|
||||
merge_reason_(ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE),
|
||||
sstable_logic_seq_(0),
|
||||
@ -176,7 +177,7 @@ int ObStaticMergeParam::get_basic_info_from_result(
|
||||
scn_range_ = get_merge_table_result.scn_range_;
|
||||
merge_scn_ = scn_range_.end_scn_;
|
||||
snapshot_info_ = get_merge_table_result.snapshot_info_;
|
||||
create_snapshot_version_ = get_merge_table_result.create_snapshot_version_;
|
||||
create_snapshot_version_ = tables_handle_.get_table(0)->get_snapshot_version();
|
||||
if (is_major_merge_type(get_merge_type())) {
|
||||
report_ = GCTX.ob_service_;
|
||||
}
|
||||
@ -997,7 +998,7 @@ int ObBasicTabletMergeCtx::get_medium_compaction_info()
|
||||
static_param_.schema_version_ = medium_info.storage_schema_.schema_version_;
|
||||
static_param_.data_version_ = medium_info.data_version_;
|
||||
static_param_.is_rebuild_column_store_ = (medium_info.medium_merge_reason_ == ObAdaptiveMergePolicy::REBUILD_COLUMN_GROUP);
|
||||
static_param_.dag_param_.is_tenant_major_merge_ = medium_info.is_major_compaction();
|
||||
static_param_.is_tenant_major_merge_ = medium_info.is_major_compaction();
|
||||
static_param_.is_schema_changed_ = medium_info.is_schema_changed_;
|
||||
static_param_.merge_reason_ = (ObAdaptiveMergePolicy::AdaptiveMergeReason)medium_info.medium_merge_reason_;
|
||||
FLOG_INFO("get storage schema to merge", "param", get_dag_param(), KPC_(static_param_.schema), K(medium_info));
|
||||
@ -1083,7 +1084,6 @@ int ObBasicTabletMergeCtx::get_meta_compaction_info()
|
||||
static_param_.schema_version_ = storage_schema->schema_version_;
|
||||
static_param_.data_version_ = DATA_CURRENT_VERSION;
|
||||
static_param_.is_rebuild_column_store_ = false;
|
||||
static_param_.dag_param_.is_tenant_major_merge_ = false;
|
||||
static_param_.is_schema_changed_ = true; // use MACRO_BLOCK_MERGE_LEVEL
|
||||
static_param_.merge_reason_ = ObAdaptiveMergePolicy::TOMBSTONE_SCENE;
|
||||
FLOG_INFO("get storage schema to meta merge", "param", get_dag_param(), KPC_(static_param_.schema));
|
||||
|
@ -54,7 +54,7 @@ public:
|
||||
TO_STRING_KV(K_(dag_param), K_(scn_range), K_(version_range),
|
||||
K_(is_full_merge), K_(concurrent_cnt), K_(merge_level),
|
||||
"merge_reason", ObAdaptiveMergePolicy::merge_reason_to_str(merge_reason_),
|
||||
K_(sstable_logic_seq), K_(tables_handle), K_(is_rebuild_column_store), K_(is_schema_changed),
|
||||
K_(sstable_logic_seq), K_(tables_handle), K_(is_rebuild_column_store), K_(is_schema_changed), K_(is_tenant_major_merge),
|
||||
K_(read_base_version), K_(merge_scn), K_(need_parallel_minor_merge),
|
||||
K_(progressive_merge_round), K_(progressive_merge_step), K_(progressive_merge_num),
|
||||
K_(schema_version), KP_(schema), K_(multi_version_column_descs), K_(ls_handle), K_(snapshot_info), KP_(report));
|
||||
@ -64,6 +64,7 @@ public:
|
||||
bool is_rebuild_column_store_;
|
||||
bool is_schema_changed_;
|
||||
bool need_parallel_minor_merge_;
|
||||
bool is_tenant_major_merge_;
|
||||
ObMergeLevel merge_level_;
|
||||
ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason_;
|
||||
int16_t sstable_logic_seq_;
|
||||
@ -177,12 +178,12 @@ public:
|
||||
CTX_DEFINE_FUNC(var_type, get_dag_param(), var_name)
|
||||
#define STATIC_PARAM_FUNC(var_type, var_name) \
|
||||
CTX_DEFINE_FUNC(var_type, static_param_, var_name)
|
||||
DAG_PARAM_FUNC(bool, is_tenant_major_merge);
|
||||
DAG_PARAM_FUNC(ObMergeType, merge_type);
|
||||
DAG_PARAM_FUNC(const ObLSID &, ls_id);
|
||||
DAG_PARAM_FUNC(const ObTabletID &, tablet_id);
|
||||
DAG_PARAM_FUNC(int64_t, merge_version);
|
||||
DAG_PARAM_FUNC(int64_t, transfer_seq);
|
||||
STATIC_PARAM_FUNC(bool, is_tenant_major_merge);
|
||||
STATIC_PARAM_FUNC(bool, is_full_merge);
|
||||
STATIC_PARAM_FUNC(bool, need_parallel_minor_merge);
|
||||
STATIC_PARAM_FUNC(int64_t, read_base_version);
|
||||
|
@ -83,71 +83,64 @@ int ObMediumCompactionScheduleFunc::choose_medium_snapshot(
|
||||
LOG_WARN("failed to get meta merge tables", K(ret), K(param));
|
||||
}
|
||||
} else {
|
||||
medium_info.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION;
|
||||
medium_info.medium_merge_reason_ = merge_reason;
|
||||
medium_info.medium_snapshot_ = result.version_range_.snapshot_version_;
|
||||
medium_info.set_basic_info(
|
||||
ObMediumCompactionInfo::MEDIUM_COMPACTION,
|
||||
merge_reason,
|
||||
result.version_range_.snapshot_version_);
|
||||
LOG_TRACE("choose_medium_snapshot", K(ret), "ls_id", ls.get_ls_id(),
|
||||
"tablet_id", tablet.get_tablet_meta().tablet_id_, K(result), K(medium_info));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::choose_major_snapshot(
|
||||
const ObMediumCompactionScheduleFunc &func,
|
||||
ObLS &ls,
|
||||
ObTablet &tablet,
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
|
||||
ObArenaAllocator &allocator,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result,
|
||||
int64_t &schema_version)
|
||||
int ObMediumCompactionScheduleFunc::find_valid_freeze_info(
|
||||
ObTablet &tablet,
|
||||
ObArenaAllocator &allocator,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
share::ObFreezeInfo &freeze_info,
|
||||
bool &force_schedule_medium_merge)
|
||||
{
|
||||
UNUSED(merge_reason);
|
||||
int ret = OB_SUCCESS;
|
||||
const ObLSID &ls_id = ls.get_ls_id();
|
||||
force_schedule_medium_merge = false;
|
||||
|
||||
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
||||
share::ObFreezeInfo freeze_info;
|
||||
int64_t schedule_snapshot = 0;
|
||||
bool schedule_with_newer_info = false;
|
||||
const int64_t scheduler_frozen_version = MTL(ObTenantTabletScheduler*)->get_frozen_version();
|
||||
ObTabletMemberWrapper<ObTabletTableStore> table_store_wrapper;
|
||||
ObSSTable *last_major = nullptr;
|
||||
int64_t last_sstable_schema_version = 0;
|
||||
bool schedule_with_newer_info = false;
|
||||
ObMultiVersionSchemaService *schema_service = nullptr;
|
||||
if (OB_FAIL(tablet.fetch_table_store(table_store_wrapper))) {
|
||||
if (OB_ISNULL(schema_service = MTL(ObTenantSchemaService *)->get_schema_service())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("failed to get schema service from MTL", K(ret));
|
||||
} else if (OB_FAIL(tablet.fetch_table_store(table_store_wrapper))) {
|
||||
LOG_WARN("load medium info list fail", K(ret), K(tablet));
|
||||
} else {
|
||||
last_major = static_cast<ObSSTable *>(table_store_wrapper.get_member()->get_major_sstables().get_boundary_table(true/*last*/));
|
||||
if (OB_ISNULL(last_major)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("major sstable is unexpected null", K(ret), KPC(last_major));
|
||||
LOG_WARN("major sstable is unexpected null", K(ret), K(tablet_id), KPC(last_major));
|
||||
} else if (OB_FAIL(last_major->get_frozen_schema_version(last_sstable_schema_version))) {
|
||||
LOG_WARN("failed to get frozen schema version", KR(ret), KPC(last_major));
|
||||
} else {
|
||||
schedule_snapshot = last_major->get_snapshot_version();
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && OB_ISNULL(schema_service = MTL(ObTenantSchemaService *)->get_schema_service())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("failed to get schema service from MTL", K(ret));
|
||||
}
|
||||
|
||||
bool schedule_medium_merge = false;
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(MTL_CALL_FREEZE_INFO_MGR(get_freeze_info_behind_snapshot_version, schedule_snapshot, freeze_info))) {
|
||||
if (OB_ENTRY_NOT_EXIST != ret) {
|
||||
LOG_WARN("failed to get freeze info", K(ret), K(schedule_snapshot), K(ls_id), K(tablet_id));
|
||||
LOG_WARN("failed to get freeze info", K(ret), K(tablet_id), K(schedule_snapshot));
|
||||
} else {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
}
|
||||
} else if (OB_UNLIKELY(freeze_info.schema_version_ <= 0)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("schema version is invalid", K(ret), K(ls_id), K(tablet_id), K(freeze_info));
|
||||
LOG_WARN("schema version is invalid", K(ret), K(freeze_info));
|
||||
} else if (OB_UNLIKELY(freeze_info.schema_version_ < last_sstable_schema_version)) {
|
||||
schedule_medium_merge = true;
|
||||
force_schedule_medium_merge = true;
|
||||
FLOG_INFO("schema version in freeze info is too small, try to schedule medium compaction instead", K(ret),
|
||||
K(ls_id), K(tablet_id), K(last_sstable_schema_version), K(freeze_info));
|
||||
K(tablet_id), K(last_sstable_schema_version), K(freeze_info));
|
||||
break;
|
||||
} else if (OB_FAIL(get_table_schema_to_merge(
|
||||
*schema_service, tablet, freeze_info.schema_version_, medium_info.medium_compat_version_, allocator, medium_info.storage_schema_))) {
|
||||
@ -162,37 +155,57 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot(
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
LOG_WARN("failed to get table schema", K(ret), K(ls_id), K(tablet_id), K(medium_info));
|
||||
LOG_WARN("failed to get table schema", K(ret), K(medium_info));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) { // success to get table schema
|
||||
if (schedule_with_newer_info) {
|
||||
FLOG_INFO("schedule with newer freeze info", K(ret), K(ls_id), K(tablet_id), K(freeze_info));
|
||||
FLOG_INFO("schedule with newer freeze info", K(ret), K(freeze_info));
|
||||
}
|
||||
break;
|
||||
}
|
||||
} // end of while
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (schedule_medium_merge) {
|
||||
int ObMediumCompactionScheduleFunc::choose_major_snapshot(
|
||||
const ObMediumCompactionScheduleFunc &func,
|
||||
ObLS &ls,
|
||||
ObTablet &tablet,
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason &merge_reason,
|
||||
ObArenaAllocator &allocator,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result,
|
||||
int64_t &schema_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObLSID &ls_id = ls.get_ls_id();
|
||||
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
||||
share::ObFreezeInfo freeze_info;
|
||||
bool force_schedule_medium_merge = false;
|
||||
|
||||
if (OB_FAIL(find_valid_freeze_info(tablet, allocator, medium_info, freeze_info, force_schedule_medium_merge))) {
|
||||
LOG_WARN("failed to find valid freeze info", KR(ret));
|
||||
} else if (force_schedule_medium_merge) {
|
||||
if (OB_FAIL(switch_to_choose_medium_snapshot(func, allocator, ls, tablet, freeze_info.frozen_scn_.get_val_for_tx(), medium_info, schema_version))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
LOG_WARN("failed to switch to choose medium snapshot", K(ret), K(tablet));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
medium_info.compaction_type_ = ObMediumCompactionInfo::MAJOR_COMPACTION;
|
||||
medium_info.medium_merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::TENANT_MAJOR;
|
||||
medium_info.medium_snapshot_ = freeze_info.frozen_scn_.get_val_for_tx();
|
||||
medium_info.set_basic_info(
|
||||
ObMediumCompactionInfo::MAJOR_COMPACTION,
|
||||
ObAdaptiveMergePolicy::AdaptiveMergeReason::TENANT_MAJOR,
|
||||
freeze_info.frozen_scn_.get_val_for_tx());
|
||||
schema_version = freeze_info.schema_version_;
|
||||
}
|
||||
|
||||
if (FAILEDx(get_result_for_major(tablet, medium_info, result))) {
|
||||
if (FAILEDx(ObPartitionMergePolicy::get_result_by_snapshot(tablet, medium_info.medium_snapshot_, result))) {
|
||||
LOG_WARN("failed get result for major", K(ret), K(medium_info));
|
||||
} else {
|
||||
LOG_TRACE("choose_major_snapshot", K(ret), "ls_id", ls.get_ls_id(),
|
||||
"tablet_id", tablet.get_tablet_meta().tablet_id_, K(medium_info), K(freeze_info), K(result),
|
||||
K(last_sstable_schema_version), K(medium_info), K(schema_version));
|
||||
K(medium_info), K(schema_version));
|
||||
#ifdef ERRSIM
|
||||
if (tablet.get_tablet_meta().tablet_id_.id() == 1) {
|
||||
ret = OB_E(EventTable::EN_SPECIAL_TABLE_HAVE_LARGER_SCN) ret;
|
||||
@ -228,9 +241,10 @@ int ObMediumCompactionScheduleFunc::switch_to_choose_medium_snapshot(
|
||||
} else if (OB_FAIL(tablet.get_newest_schema_version(schema_version))) {
|
||||
LOG_WARN("fail to choose medium schema version", K(ret), K(tablet));
|
||||
} else {
|
||||
medium_info.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION;
|
||||
medium_info.medium_merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE;
|
||||
medium_info.medium_snapshot_ = medium_snapshot;
|
||||
medium_info.set_basic_info(
|
||||
ObMediumCompactionInfo::MEDIUM_COMPACTION,
|
||||
ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE,
|
||||
medium_snapshot);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -373,8 +387,10 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_primary_cluster(
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
bool schedule_flag = false;
|
||||
if (OB_FAIL(ret) || !schedule_medium_flag) {
|
||||
} else if (MTL(ObTenantTabletScheduler*)->get_inner_table_merged_scn() >= last_major_snapshot_version) {
|
||||
schedule_flag = true;
|
||||
} else if (ObMediumCompactionInfo::MAJOR_COMPACTION == medium_info_list_->get_last_compaction_type()) {
|
||||
// for normal medium, checksum error happened, wait_check_medium_scn_ will never = 0
|
||||
// for major, need select inner_table to check RS status
|
||||
@ -382,12 +398,15 @@ int ObMediumCompactionScheduleFunc::schedule_next_medium_primary_cluster(
|
||||
LOG_WARN("failed to get status from inner tablet", K(ret), KPC(this));
|
||||
} else if (ret_info.could_schedule_next_round(medium_info_list_->get_last_compaction_scn())) {
|
||||
LOG_INFO("success to check RS major checksum validation finished", K(ret), KPC(this), K(ret_info));
|
||||
ret = decide_medium_snapshot(adaptive_merge_reason);
|
||||
schedule_flag = true;
|
||||
} else if (OB_NOT_NULL(schedule_stat_)) {
|
||||
++schedule_stat_->wait_rs_validate_cnt_;
|
||||
LOG_TRACE("cannot schedule next round merge now", K(ret), K(ret_info));
|
||||
}
|
||||
} else {
|
||||
schedule_flag = true;
|
||||
}
|
||||
if (OB_SUCC(ret) && schedule_flag) {
|
||||
ret = decide_medium_snapshot(adaptive_merge_reason);
|
||||
}
|
||||
|
||||
@ -440,52 +459,22 @@ int ObMediumCompactionScheduleFunc::choose_medium_scn_for_user_request(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::check_medium_scn_valid_and_fix(ObMediumCompactionInfo &medium_info)
|
||||
int ObMediumCompactionScheduleFunc::check_frequency(
|
||||
const int64_t max_reserved_snapshot,
|
||||
ObMediumCompactionInfo &medium_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t max_sync_medium_scn = 0;
|
||||
int64_t max_reserved_snapshot = 0;
|
||||
const int64_t snapshot_gc_ts = MTL(ObTenantFreezeInfoMgr*)->get_snapshot_gc_ts();
|
||||
ObTablet *tablet = nullptr;
|
||||
|
||||
if (OB_UNLIKELY(!tablet_handle_.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid tablet_handle", K(ret), K(tablet_handle_));
|
||||
} else if (FALSE_IT(tablet = tablet_handle_.get_obj())) {
|
||||
} else if (OB_FAIL(tablet->get_max_sync_medium_scn(max_sync_medium_scn))) {
|
||||
LOG_WARN("failed to get max sync medium scn", K(ret), KPC(this));
|
||||
} else if (medium_info.medium_snapshot_ <= max_sync_medium_scn) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
} else if (OB_FAIL(get_max_reserved_snapshot(max_reserved_snapshot))) {
|
||||
LOG_WARN("failed to get max reserved snapshot", K(ret), KPC(this));
|
||||
} else if (medium_info.medium_snapshot_ < max_reserved_snapshot || medium_info.medium_snapshot_ >= snapshot_gc_ts) {
|
||||
// chosen medium snapshot is far too old or too new
|
||||
LOG_INFO("chosen medium snapshot is invalid for max_reserved_snapshot", K(ret), KPC(this),
|
||||
K(medium_info), K(max_reserved_snapshot));
|
||||
const share::SCN &weak_read_ts = ls_.get_ls_wrs_handler()->get_ls_weak_read_ts();
|
||||
if (medium_info.medium_snapshot_ == tablet->get_snapshot_version() // no uncommitted sstable
|
||||
&& weak_read_ts.get_val_for_tx() <= max_reserved_snapshot
|
||||
&& weak_read_ts.convert_to_ts() + DEFAULT_SCHEDULE_MEDIUM_INTERVAL < ObTimeUtility::current_time_ns()) {
|
||||
const int64_t snapshot_gc_ts = MTL(ObTenantFreezeInfoMgr*)->get_snapshot_gc_ts();
|
||||
medium_info.medium_snapshot_ = MAX(max_reserved_snapshot, MIN(weak_read_ts.get_val_for_tx(), snapshot_gc_ts));
|
||||
LOG_INFO("use weak_read_ts to schedule medium", K(ret), KPC(this),
|
||||
K(medium_info), K(max_reserved_snapshot), K(weak_read_ts), K(snapshot_gc_ts));
|
||||
} else {
|
||||
ObTablet *tablet = tablet_handle_.get_obj();
|
||||
const int64_t current_time = ObTimeUtility::current_time_ns();
|
||||
if (max_reserved_snapshot < current_time) {
|
||||
const int64_t time_interval = (current_time - max_reserved_snapshot) / 2;
|
||||
const int64_t last_major_snapshot_version = tablet->get_last_major_snapshot_version();
|
||||
if (0 >= last_major_snapshot_version) {
|
||||
LOG_WARN("major sstable should not be empty", K(ret), K(last_major_snapshot_version));
|
||||
} else if (last_major_snapshot_version + time_interval > medium_info.medium_snapshot_) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
}
|
||||
}
|
||||
// check schedule frequently
|
||||
if (OB_SUCC(ret)) {
|
||||
const int64_t current_time = ObTimeUtility::current_time_ns();
|
||||
if (max_reserved_snapshot < current_time) {
|
||||
const int64_t time_interval = (current_time - max_reserved_snapshot) / 2;
|
||||
const int64_t last_major_snapshot_version = tablet->get_last_major_snapshot_version();
|
||||
if (0 >= last_major_snapshot_version) {
|
||||
LOG_WARN("major sstable should not be empty", K(ret), K(last_major_snapshot_version));
|
||||
} else if (last_major_snapshot_version + time_interval > medium_info.medium_snapshot_) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
LOG_DEBUG("schedule medium frequently", K(ret), K(last_major_snapshot_version), K(medium_info), K(time_interval));
|
||||
}
|
||||
LOG_DEBUG("schedule medium frequently", K(ret), K(last_major_snapshot_version), K(medium_info),
|
||||
K(time_interval));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -499,7 +488,6 @@ int ObMediumCompactionScheduleFunc::get_max_reserved_snapshot(int64_t &max_reser
|
||||
ObStorageSnapshotInfo snapshot_info;
|
||||
int64_t last_major_snapshot_version = 0;
|
||||
ObTablet *tablet = nullptr;
|
||||
|
||||
if (OB_UNLIKELY(!tablet_handle_.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid tablet_handle", K(ret), K(tablet_handle_));
|
||||
@ -533,9 +521,10 @@ int ObMediumCompactionScheduleFunc::choose_new_medium_snapshot(
|
||||
&& weak_read_ts_ + DEFAULT_SCHEDULE_MEDIUM_INTERVAL < ObTimeUtility::current_time_ns()) {
|
||||
snapshot_gc_ts = MTL(ObTenantFreezeInfoMgr *)->get_snapshot_gc_ts();
|
||||
// data before weak_read_ts & latest storage schema on memtable is match for schedule medium
|
||||
medium_info.medium_snapshot_ = MIN(weak_read_ts_, snapshot_gc_ts);
|
||||
medium_info.medium_snapshot_ = MAX(max_reserved_snapshot, MIN(weak_read_ts_, snapshot_gc_ts));
|
||||
}
|
||||
if (medium_info.medium_snapshot_ < max_reserved_snapshot) {
|
||||
// may not rewrite medium_snapshot above
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
} else {
|
||||
LOG_INFO("use weak_read_ts to schedule medium", K(ret), KPC(this),
|
||||
@ -609,7 +598,7 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
} else if (is_major || is_user_request(merge_reason)) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(check_medium_scn_valid_and_fix(medium_info))) { // check schedule interval
|
||||
} else if (OB_FAIL(check_frequency(max_reserved_snapshot, medium_info))) { // check schedule interval
|
||||
if (OB_NO_NEED_MERGE != ret) {
|
||||
LOG_WARN("failed to check medium scn valid", K(ret), KPC(this));
|
||||
}
|
||||
@ -644,17 +633,6 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
||||
} else {
|
||||
LOG_WARN("failed to prepare medium info", K(ret), K(result));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
// add schedule suspect info
|
||||
if (OB_TMP_FAIL(ADD_SUSPECT_INFO(MEDIUM_MERGE, ObDiagnoseTabletType::TYPE_MEDIUM_MERGE,
|
||||
ls_.get_ls_id(), tablet_id,
|
||||
ObSuspectInfoType::SUSPECT_SCHEDULE_MEDIUM_FAILED,
|
||||
medium_info.medium_snapshot_,
|
||||
medium_info.storage_schema_.store_column_cnt_,
|
||||
static_cast<int64_t>(ret)))) {
|
||||
LOG_WARN("failed to add suspect info", K(tmp_ret));
|
||||
}
|
||||
}
|
||||
} else if (OB_FAIL(submit_medium_clog(medium_info))) {
|
||||
LOG_WARN("failed to submit medium clog and update inner table", K(ret), KPC(this));
|
||||
} else if (OB_NOT_NULL(schedule_stat_)) {
|
||||
@ -793,49 +771,6 @@ int ObMediumCompactionScheduleFunc::init_parallel_range_and_schema_changed(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::get_result_for_major(
|
||||
ObTablet &tablet,
|
||||
const ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSSTable *base_table = nullptr;
|
||||
ObTabletMemberWrapper<ObTabletTableStore> table_store_wrapper;
|
||||
|
||||
if (OB_FAIL(tablet.fetch_table_store(table_store_wrapper))) {
|
||||
LOG_WARN("fail to fetch table store", K(ret));
|
||||
} else if (OB_UNLIKELY(!table_store_wrapper.get_member()->is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get invalid argument", K(ret), K(medium_info), KPC(table_store_wrapper.get_member()));
|
||||
} else if (OB_ISNULL(base_table = static_cast<ObSSTable*>(
|
||||
table_store_wrapper.get_member()->get_major_sstables().get_boundary_table(true/*last*/)))) {
|
||||
ret = OB_ENTRY_NOT_EXIST;
|
||||
LOG_WARN("major sstable not exist", K(ret), KPC(table_store_wrapper.get_member()));
|
||||
} else if (base_table->get_snapshot_version() >= medium_info.medium_snapshot_) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
} else if (OB_FAIL(result.handle_.add_sstable(base_table, table_store_wrapper.get_meta_handle()))) {
|
||||
LOG_WARN("failed to add table into iterator", K(ret), KP(base_table));
|
||||
} else {
|
||||
const ObSSTableArray &minor_tables = table_store_wrapper.get_member()->get_minor_sstables();
|
||||
bool start_add_table_flag = false;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < minor_tables.count(); ++i) {
|
||||
if (OB_ISNULL(minor_tables[i])) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("table must not null", K(ret), K(i), K(minor_tables));
|
||||
} else if (!start_add_table_flag
|
||||
&& minor_tables[i]->get_upper_trans_version() >= base_table->get_snapshot_version()) {
|
||||
start_add_table_flag = true;
|
||||
}
|
||||
if (OB_SUCC(ret) && start_add_table_flag) {
|
||||
if (OB_FAIL(result.handle_.add_sstable(minor_tables[i], table_store_wrapper.get_meta_handle()))) {
|
||||
LOG_WARN("failed to add table", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::prepare_iter(
|
||||
const ObGetMergeTablesResult &result,
|
||||
ObTableStoreIterator &table_iter)
|
||||
@ -861,7 +796,6 @@ int ObMediumCompactionScheduleFunc::prepare_medium_info(
|
||||
int ret = OB_SUCCESS;
|
||||
medium_info.cluster_id_ = GCONF.cluster_id;
|
||||
medium_info.tenant_id_ = MTL_ID();
|
||||
// get table schema
|
||||
if (OB_UNLIKELY(!tablet_handle_.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid tablet_handle", K(ret), K(tablet_handle_));
|
||||
@ -888,13 +822,8 @@ int ObMediumCompactionScheduleFunc::prepare_medium_info(
|
||||
}
|
||||
if (FAILEDx(init_parallel_range_and_schema_changed(result, medium_info))) {
|
||||
LOG_WARN("failed to init parallel range", K(ret), K(medium_info));
|
||||
} else if (OB_UNLIKELY(result.handle_.empty())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("table handle in result is empty", KR(ret), K(result));
|
||||
} else {
|
||||
medium_info.last_medium_snapshot_ = result.handle_.get_table(0)->get_snapshot_version();
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
LOG_INFO("success to prepare medium info", K(ret), K(medium_info));
|
||||
}
|
||||
return ret;
|
||||
@ -1060,7 +989,7 @@ int ObMediumCompactionScheduleFunc::batch_check_medium_meta_table(
|
||||
const int64_t check_medium_scn = tablet_ls_infos.at(i).get_medium_scn();
|
||||
if (tablet_ls_info.get_ls_id() != ls_id
|
||||
|| tablet_ls_info.get_tablet_id() != tablet_id) {
|
||||
LOG_INFO("tablet_ls_info has been deleted", K(tablet_ls_info));
|
||||
LOG_INFO("tablet_ls_info has been deleted", K(tablet_ls_info), K(tablet_info));
|
||||
} else {
|
||||
if (OB_TMP_FAIL(check_medium_meta_table(check_medium_scn, tablet_info, filters, ls_info_map, merge_finish))) {
|
||||
LOG_WARN("failed to check medium meta table", K(tmp_ret), K(check_medium_scn), K(tablet_info));
|
||||
@ -1310,7 +1239,6 @@ int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge(
|
||||
return ret;
|
||||
}
|
||||
#endif
|
||||
|
||||
const int64_t last_major_snapshot = tablet.get_last_major_snapshot_version();
|
||||
if (MTL(ObTenantTabletScheduler *)->could_major_merge_start() && last_major_snapshot > 0) {
|
||||
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
||||
@ -1336,7 +1264,6 @@ int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge(
|
||||
} else {
|
||||
schedule_flag = true;
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret) || !schedule_flag) {
|
||||
} else {
|
||||
const int64_t major_frozen_snapshot = 0 == input_major_snapshot ? MTL(ObTenantTabletScheduler *)->get_frozen_version() : input_major_snapshot;
|
||||
@ -1412,9 +1339,9 @@ int ObMediumCompactionScheduleFunc::check_need_merge_and_schedule(
|
||||
const ObLSID &ls_id = ls.get_ls_id();
|
||||
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
||||
|
||||
if (OB_UNLIKELY(0 == schedule_scn || !ObMediumCompactionInfo::is_valid_compaction_type(compaction_type))) {
|
||||
if (OB_UNLIKELY(0 == schedule_scn)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(schedule_scn), K(compaction_type));
|
||||
LOG_WARN("invalid argument", K(ret), K(schedule_scn));
|
||||
} else if (OB_FAIL(ObPartitionMergePolicy::check_need_medium_merge(
|
||||
ls,
|
||||
tablet,
|
||||
@ -1428,8 +1355,7 @@ int ObMediumCompactionScheduleFunc::check_need_merge_and_schedule(
|
||||
ls.get_ls_id(),
|
||||
tablet,
|
||||
MEDIUM_MERGE,
|
||||
schedule_scn,
|
||||
ObMediumCompactionInfo::is_major_compaction(compaction_type)))) {
|
||||
schedule_scn))) {
|
||||
if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) {
|
||||
ret = tmp_ret;
|
||||
LOG_WARN("failed to schedule medium merge dag", K(ret), K(ls_id), K(tablet_id));
|
||||
|
@ -97,10 +97,6 @@ protected:
|
||||
ObMediumCompactionInfo &medium_info);
|
||||
int init_schema_changed(
|
||||
ObMediumCompactionInfo &medium_info);
|
||||
static int get_result_for_major(
|
||||
ObTablet &tablet,
|
||||
const ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result);
|
||||
int prepare_iter(
|
||||
const ObGetMergeTablesResult &result,
|
||||
ObTableStoreIterator &table_iter);
|
||||
@ -143,6 +139,12 @@ protected:
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result,
|
||||
int64_t &schema_version);
|
||||
static int find_valid_freeze_info(
|
||||
ObTablet &tablet,
|
||||
ObArenaAllocator &allocator,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
share::ObFreezeInfo &freeze_info,
|
||||
bool &force_schedule_medium_merge);
|
||||
static int switch_to_choose_medium_snapshot(
|
||||
const ObMediumCompactionScheduleFunc &func,
|
||||
ObArenaAllocator &allocator,
|
||||
@ -175,7 +177,9 @@ protected:
|
||||
const int64_t schema_version,
|
||||
uint64_t &table_id);
|
||||
|
||||
int check_medium_scn_valid_and_fix(ObMediumCompactionInfo &medium_info);
|
||||
int check_frequency(
|
||||
const int64_t max_reserved_snapshot,
|
||||
ObMediumCompactionInfo &medium_info);
|
||||
int choose_medium_scn_for_user_request(
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result,
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include "storage/ob_storage_schema.h"
|
||||
#include "lib/container/ob_array_array.h"
|
||||
#include "observer/ob_server_struct.h"
|
||||
#include "storage/compaction/ob_partition_merge_policy.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -180,6 +181,15 @@ public:
|
||||
int assign(ObIAllocator &allocator, const ObMediumCompactionInfo &medium_info);
|
||||
int init(ObIAllocator &allocator, const ObMediumCompactionInfo &medium_info);
|
||||
int init_data_version();
|
||||
void set_basic_info(
|
||||
const ObCompactionType type,
|
||||
const ObAdaptiveMergePolicy::AdaptiveMergeReason merge_reason,
|
||||
const int64_t medium_snapshot)
|
||||
{
|
||||
compaction_type_ = type;
|
||||
medium_merge_reason_ = merge_reason;
|
||||
medium_snapshot_ = medium_snapshot;
|
||||
}
|
||||
int gene_parallel_info(
|
||||
ObIAllocator &allocator,
|
||||
common::ObArrayArray<ObStoreRange> ¶l_range);
|
||||
|
@ -21,6 +21,7 @@
|
||||
#include "storage/tx/ob_trans_define.h"
|
||||
#include "storage/tablet/ob_tablet_obj_load_helper.h"
|
||||
#include "storage/tablet/ob_tablet_service_clog_replay_executor.h"
|
||||
#include "storage/compaction/ob_tenant_tablet_scheduler.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -678,5 +679,17 @@ int ObMediumCompactionInfoList::get_max_sync_medium_scn(int64_t &max_sync_medium
|
||||
return ret;
|
||||
}
|
||||
|
||||
} //namespace compaction
|
||||
bool ObMediumCompactionInfoList::need_check_finish() const
|
||||
{
|
||||
const int64_t wait_check_scn = get_wait_check_medium_scn();
|
||||
bool need_check = (wait_check_scn > 0);
|
||||
#ifndef ERRSIM
|
||||
if (need_check && ObMediumCompactionInfo::MAJOR_COMPACTION == get_last_compaction_type()) {
|
||||
need_check = wait_check_scn > MTL(ObTenantTabletScheduler*)->get_inner_table_merged_scn();
|
||||
}
|
||||
#endif
|
||||
return need_check;
|
||||
}
|
||||
|
||||
} // namespace compaction
|
||||
} // namespace oceanbase
|
||||
|
@ -112,7 +112,7 @@ public:
|
||||
}
|
||||
OB_INLINE const MediumInfoList &get_list() const { return medium_info_list_; }
|
||||
OB_INLINE int64_t get_wait_check_medium_scn() const { return extra_info_.wait_check_flag_ ? extra_info_.last_medium_scn_ : 0; }
|
||||
OB_INLINE bool need_check_finish() const { return get_wait_check_medium_scn() > 0; }
|
||||
bool need_check_finish() const;
|
||||
// check status on serialized medium list
|
||||
OB_INLINE bool could_schedule_next_round(const int64_t last_major_snapshot) const
|
||||
{
|
||||
|
@ -154,9 +154,6 @@ int ObPartitionMergePolicy::get_medium_merge_tables(
|
||||
result.version_range_.snapshot_version_ = param.merge_version_;
|
||||
if (OB_FAIL(get_multi_version_start(param.merge_type_, ls, tablet, result.version_range_, result.snapshot_info_))) {
|
||||
LOG_WARN("failed to get multi version_start", K(ret));
|
||||
} else {
|
||||
result.read_base_version_ = base_table->get_snapshot_version();
|
||||
result.create_snapshot_version_ = base_table->get_snapshot_version();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -1532,7 +1529,6 @@ int ObAdaptiveMergePolicy::add_meta_merge_result(
|
||||
result.version_range_.base_version_ = 0;
|
||||
result.version_range_.multi_version_start_ = table->get_snapshot_version();
|
||||
result.version_range_.snapshot_version_ = table->get_snapshot_version();
|
||||
result.create_snapshot_version_ = table->get_snapshot_version();
|
||||
} else if (update_snapshot_flag) {
|
||||
int64_t max_snapshot = MAX(result.version_range_.snapshot_version_, table->get_max_merged_trans_version());
|
||||
result.version_range_.multi_version_start_ = max_snapshot;
|
||||
|
@ -306,7 +306,6 @@ void ObCompactionParam::estimate_concurrent_count(const compaction::ObMergeType
|
||||
*/
|
||||
ObTabletMergeDagParam::ObTabletMergeDagParam()
|
||||
: skip_get_tablet_(false),
|
||||
is_tenant_major_merge_(false),
|
||||
need_swap_tablet_flag_(false),
|
||||
is_reserve_mode_(false),
|
||||
merge_type_(INVALID_MERGE_TYPE),
|
||||
@ -323,7 +322,6 @@ ObTabletMergeDagParam::ObTabletMergeDagParam(
|
||||
const ObTabletID &tablet_id,
|
||||
const int64_t transfer_seq)
|
||||
: skip_get_tablet_(false),
|
||||
is_tenant_major_merge_(false),
|
||||
need_swap_tablet_flag_(false),
|
||||
is_reserve_mode_(false),
|
||||
merge_type_(merge_type),
|
||||
|
@ -152,10 +152,9 @@ struct ObTabletMergeDagParam : public share::ObIDagInitParam
|
||||
const int64_t transfer_seq);
|
||||
virtual bool is_valid() const override;
|
||||
VIRTUAL_TO_STRING_KV(K_(skip_get_tablet), "merge_type", merge_type_to_str(merge_type_), K_(merge_version),
|
||||
K_(ls_id), K_(tablet_id), K_(is_tenant_major_merge), K_(need_swap_tablet_flag), K_(is_reserve_mode), K_(transfer_seq));
|
||||
K_(ls_id), K_(tablet_id), K_(need_swap_tablet_flag), K_(is_reserve_mode), K_(transfer_seq));
|
||||
|
||||
bool skip_get_tablet_;
|
||||
bool is_tenant_major_merge_;
|
||||
bool need_swap_tablet_flag_;
|
||||
bool is_reserve_mode_;
|
||||
compaction::ObMergeType merge_type_;
|
||||
|
@ -218,6 +218,7 @@ ObTenantTabletScheduler::ObTenantTabletScheduler()
|
||||
gc_sst_tablet_iter_(false/*is_major*/),
|
||||
schedule_tablet_batch_size_(0),
|
||||
error_tablet_cnt_(0),
|
||||
loop_cnt_(0),
|
||||
prohibit_medium_map_(),
|
||||
timer_task_mgr_()
|
||||
{
|
||||
@ -340,10 +341,7 @@ int ObTenantTabletScheduler::reload_tenant_config()
|
||||
} // end of ObTenantConfigGuard
|
||||
if (OB_FAIL(timer_task_mgr_.restart_scheduler_timer_task(merge_schedule_interval))) {
|
||||
LOG_WARN("failed to restart scheduler timer", K(ret));
|
||||
} else {
|
||||
schedule_tablet_batch_size_ = schedule_batch_size;
|
||||
}
|
||||
if (OB_SUCC(ret) && schedule_tablet_batch_size_ != schedule_batch_size) {
|
||||
} else if (schedule_tablet_batch_size_ != schedule_batch_size) {
|
||||
schedule_tablet_batch_size_ = schedule_batch_size;
|
||||
LOG_INFO("succeeded to reload new merge schedule tablet batch cnt", K(schedule_tablet_batch_size_));
|
||||
}
|
||||
@ -557,6 +555,7 @@ int ObTenantTabletScheduler::schedule_merge(const int64_t broadcast_version)
|
||||
if (OB_TMP_FAIL(MTL(ObTenantCompactionProgressMgr *)->add_progress(broadcast_version))) {
|
||||
LOG_WARN("failed to add progress", K(tmp_ret), K(broadcast_version));
|
||||
}
|
||||
loop_cnt_ = 0;
|
||||
clear_error_tablet_cnt();
|
||||
|
||||
schedule_stats_.start_merge(); // set all statistics
|
||||
@ -832,8 +831,7 @@ int ObTenantTabletScheduler::schedule_merge_dag(
|
||||
const ObLSID &ls_id,
|
||||
const storage::ObTablet &tablet,
|
||||
const ObMergeType merge_type,
|
||||
const int64_t &merge_snapshot_version,
|
||||
const bool is_tenant_major_merge)
|
||||
const int64_t &merge_snapshot_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (is_major_merge_type(merge_type) && !tablet.is_row_store()) {
|
||||
@ -842,7 +840,6 @@ int ObTenantTabletScheduler::schedule_merge_dag(
|
||||
param.tablet_id_ = tablet.get_tablet_meta().tablet_id_;
|
||||
param.merge_type_ = merge_type;
|
||||
param.merge_version_ = merge_snapshot_version;
|
||||
param.is_tenant_major_merge_ = is_tenant_major_merge;
|
||||
param.compat_mode_ = tablet.get_tablet_meta().compat_mode_;
|
||||
param.transfer_seq_ = tablet.get_tablet_meta().transfer_info_.transfer_seq_;
|
||||
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_tablet_co_merge_dag_net(param))) {
|
||||
@ -857,7 +854,6 @@ int ObTenantTabletScheduler::schedule_merge_dag(
|
||||
param.tablet_id_ = tablet.get_tablet_meta().tablet_id_;
|
||||
param.merge_type_ = merge_type;
|
||||
param.merge_version_ = merge_snapshot_version;
|
||||
param.is_tenant_major_merge_ = is_tenant_major_merge;
|
||||
param.transfer_seq_ = tablet.get_tablet_meta().transfer_info_.transfer_seq_;
|
||||
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_tablet_merge_dag(param))) {
|
||||
if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) {
|
||||
@ -924,7 +920,6 @@ int ObTenantTabletScheduler::schedule_tablet_meta_merge(
|
||||
dag_param.tablet_id_ = tablet->get_tablet_meta().tablet_id_;
|
||||
dag_param.merge_type_ = META_MAJOR_MERGE;
|
||||
dag_param.merge_version_ = result.merge_version_;
|
||||
dag_param.is_tenant_major_merge_ = false;
|
||||
dag_param.compat_mode_ = tablet->get_tablet_meta().compat_mode_;
|
||||
dag_param.transfer_seq_ = tablet->get_tablet_meta().transfer_info_.transfer_seq_;
|
||||
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_tablet_co_merge_dag_net(dag_param))) {
|
||||
@ -1635,7 +1630,7 @@ int ObTenantTabletScheduler::schedule_all_tablets_medium()
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTenantTabletScheduler has not been inited", K(ret));
|
||||
} else if (OB_FAIL(MTL(ObTenantTabletScheduler*)->get_min_data_version(compat_version))) {
|
||||
} else if (OB_FAIL(get_min_data_version(compat_version))) {
|
||||
LOG_WARN("failed to get min data version", KR(ret));
|
||||
} else if (compat_version < DATA_VERSION_4_1_0_0) {
|
||||
// do nothing, should not loop tablets
|
||||
@ -1721,19 +1716,23 @@ int ObTenantTabletScheduler::after_schedule_tenant_medium(
|
||||
schedule_stats_.check_weak_read_ts_cnt_++;
|
||||
}
|
||||
|
||||
if (medium_ls_tablet_iter_.is_scan_finish() && REACH_TENANT_TIME_INTERVAL(ADD_LOOP_EVENT_INTERVAL)) {
|
||||
ADD_COMPACTION_EVENT(
|
||||
if (medium_ls_tablet_iter_.is_scan_finish()) {
|
||||
loop_cnt_++;
|
||||
if (REACH_TENANT_TIME_INTERVAL(ADD_LOOP_EVENT_INTERVAL)) {
|
||||
ADD_COMPACTION_EVENT(
|
||||
merge_version,
|
||||
ObServerCompactionEvent::SCHEDULER_LOOP,
|
||||
ObTimeUtility::fast_current_time(),
|
||||
"schedule_stats",
|
||||
schedule_stats_);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && medium_ls_tablet_iter_.tenant_merge_finish() && merge_version > merged_version_) {
|
||||
merged_version_ = merge_version;
|
||||
LOG_INFO("all tablet major merge finish", K(merged_version_));
|
||||
LOG_INFO("all tablet major merge finish", K(merged_version_), K_(loop_cnt));
|
||||
loop_cnt_ = 0;
|
||||
DEL_SUSPECT_INFO(MEDIUM_MERGE, UNKNOW_LS_ID, UNKNOW_TABLET_ID, share::ObDiagnoseTabletType::TYPE_MEDIUM_MERGE);
|
||||
if (OB_TMP_FAIL(MTL(ObTenantCompactionProgressMgr *)->update_progress_status(
|
||||
merge_version,
|
||||
@ -1755,7 +1754,7 @@ int ObTenantTabletScheduler::after_schedule_tenant_medium(
|
||||
|
||||
LOG_INFO("finish schedule all tablet merge", K(merge_version), K(schedule_stats_),
|
||||
"tenant_merge_finish", medium_ls_tablet_iter_.tenant_merge_finish(),
|
||||
K(merged_version_));
|
||||
K(merged_version_), "is_scan_all_tablet_finish", medium_ls_tablet_iter_.is_scan_finish());
|
||||
if (medium_ls_tablet_iter_.is_scan_finish()) {
|
||||
schedule_stats_.clear_tablet_cnt();
|
||||
}
|
||||
@ -1787,7 +1786,7 @@ int ObTenantTabletScheduler::try_schedule_tablet_medium_merge(
|
||||
} else if (OB_UNLIKELY(tablet_id.is_ls_inner_tablet())) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_WARN("not supported to schedule medium for ls inner tablet", K(ret), K(tablet_id));
|
||||
} else if (OB_FAIL(MTL(ObTenantTabletScheduler*)->get_min_data_version(compat_version))) {
|
||||
} else if (OB_FAIL(get_min_data_version(compat_version))) {
|
||||
LOG_WARN("failed to get min data version", KR(ret));
|
||||
} else if (compat_version < DATA_VERSION_4_1_0_0) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
|
@ -264,8 +264,7 @@ public:
|
||||
const share::ObLSID &ls_id,
|
||||
const storage::ObTablet &tablet,
|
||||
const ObMergeType merge_type,
|
||||
const int64_t &merge_snapshot_version,
|
||||
const bool is_tenant_major_merge = false);
|
||||
const int64_t &merge_snapshot_version);
|
||||
static int schedule_tablet_ddl_major_merge(
|
||||
ObTabletHandle &tablet_handle);
|
||||
|
||||
@ -278,7 +277,6 @@ public:
|
||||
int schedule_next_round_for_leader(
|
||||
const ObIArray<compaction::ObTabletCheckInfo> &tablet_ls_infos,
|
||||
const ObIArray<compaction::ObTabletCheckInfo> &finish_tablet_ls_infos);
|
||||
|
||||
private:
|
||||
friend struct ObTenantTabletSchedulerTaskMgr;
|
||||
int schedule_next_medium_for_leader(
|
||||
@ -367,6 +365,7 @@ private:
|
||||
ObCompactionScheduleIterator gc_sst_tablet_iter_;
|
||||
int64_t schedule_tablet_batch_size_;
|
||||
int64_t error_tablet_cnt_; // for diagnose
|
||||
int64_t loop_cnt_;
|
||||
ObProhibitScheduleMediumMap prohibit_medium_map_;
|
||||
ObTenantTabletSchedulerTaskMgr timer_task_mgr_;
|
||||
};
|
||||
|
@ -246,12 +246,10 @@ ObGetMergeTablesResult::ObGetMergeTablesResult()
|
||||
: version_range_(),
|
||||
handle_(),
|
||||
merge_version_(),
|
||||
create_snapshot_version_(INVALID_INT_VALUE),
|
||||
update_tablet_directly_(false),
|
||||
schedule_major_(false),
|
||||
is_simplified_(false),
|
||||
scn_range_(),
|
||||
read_base_version_(0),
|
||||
error_location_(nullptr),
|
||||
snapshot_info_()
|
||||
{
|
||||
@ -261,8 +259,7 @@ bool ObGetMergeTablesResult::is_valid() const
|
||||
{
|
||||
return scn_range_.is_valid()
|
||||
&& (is_simplified_ || handle_.get_count() >= 1)
|
||||
&& merge_version_ >= 0
|
||||
&& create_snapshot_version_ >= 0;
|
||||
&& merge_version_ >= 0;
|
||||
}
|
||||
|
||||
void ObGetMergeTablesResult::reset_handle_and_range()
|
||||
@ -283,10 +280,8 @@ void ObGetMergeTablesResult::reset()
|
||||
version_range_.reset();
|
||||
handle_.reset();
|
||||
merge_version_ = ObVersionRange::MIN_VERSION;
|
||||
create_snapshot_version_ = 0;
|
||||
schedule_major_ = false;
|
||||
scn_range_.reset();
|
||||
read_base_version_ = 0;
|
||||
error_location_ = nullptr;
|
||||
is_simplified_ = false;
|
||||
snapshot_info_.reset();
|
||||
@ -301,7 +296,6 @@ int ObGetMergeTablesResult::copy_basic_info(const ObGetMergeTablesResult &src)
|
||||
} else {
|
||||
version_range_ = src.version_range_;
|
||||
merge_version_ = src.merge_version_;
|
||||
create_snapshot_version_ = src.create_snapshot_version_;
|
||||
schedule_major_ = src.schedule_major_;
|
||||
scn_range_ = src.scn_range_;
|
||||
error_location_ = src.error_location_;
|
||||
|
@ -303,15 +303,12 @@ struct ObGetMergeTablesResult
|
||||
common::ObVersionRange version_range_;
|
||||
ObTablesHandleArray handle_;
|
||||
int64_t merge_version_;
|
||||
int64_t create_snapshot_version_;
|
||||
bool update_tablet_directly_;
|
||||
bool schedule_major_;
|
||||
bool is_simplified_;
|
||||
share::ObScnRange scn_range_;
|
||||
int64_t read_base_version_;
|
||||
share::ObDiagnoseLocation *error_location_;
|
||||
ObStorageSnapshotInfo snapshot_info_;
|
||||
static const int64_t INVALID_INT_VALUE = -1;
|
||||
ObGetMergeTablesResult();
|
||||
bool is_valid() const;
|
||||
void reset_handle_and_range();
|
||||
@ -320,8 +317,7 @@ struct ObGetMergeTablesResult
|
||||
int assign(const ObGetMergeTablesResult &src);
|
||||
int copy_basic_info(const ObGetMergeTablesResult &src);
|
||||
TO_STRING_KV(K_(version_range), K_(scn_range), K_(merge_version),
|
||||
K_(create_snapshot_version), K_(handle),
|
||||
K_(update_tablet_directly), K_(schedule_major), K_(read_base_version));
|
||||
K_(handle), K_(update_tablet_directly), K_(schedule_major));
|
||||
};
|
||||
|
||||
OB_INLINE bool is_valid_migrate_status(const ObMigrateStatus &status)
|
||||
@ -329,7 +325,6 @@ OB_INLINE bool is_valid_migrate_status(const ObMigrateStatus &status)
|
||||
return status >= OB_MIGRATE_STATUS_NONE && status < OB_MIGRATE_STATUS_MAX;
|
||||
}
|
||||
|
||||
|
||||
struct ObDDLTableStoreParam final
|
||||
{
|
||||
public:
|
||||
|
@ -554,8 +554,13 @@ int ObAdminParserLogEntry::parse_data_dict_log_()
|
||||
|
||||
int ObAdminParserLogEntry::parse_reserved_snapshot_log_()
|
||||
{
|
||||
//not supported so far, just reserved
|
||||
int ret = OB_NOT_SUPPORTED;
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t update_version = 0;
|
||||
if (OB_FAIL(serialization::decode_i64(buf_, buf_len_, pos_, &update_version))) {
|
||||
LOG_WARN("fail to deserialize update_version", K(ret));
|
||||
} else {
|
||||
fprintf(stdout, " ###<LSReservedSnapshotLog>: snapshot: %ld\n", update_version);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -112,7 +112,6 @@ int TestParallelMinorDag::prepare_merge_result(
|
||||
result.version_range_.snapshot_version_ = 100;
|
||||
result.version_range_.multi_version_start_ = 100;
|
||||
result.merge_version_ = 0;
|
||||
result.create_snapshot_version_ = 0;
|
||||
|
||||
int64_t log_ts = 1;
|
||||
for (int i = 0; OB_SUCC(ret) && i < sstable_cnt; ++i) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user