fix start_scn to make scn_range continuous in migrate phase

This commit is contained in:
suz-yang
2024-04-23 10:40:17 +00:00
committed by ob-robot
parent 3dde2243e3
commit ba047ea0d9

View File

@ -488,6 +488,42 @@ int ObDDLTableMergeTask::merge_full_direct_load_ddl_kvs(ObLSHandle &ls_handle, O
return ret;
}
static int refine_incremental_direct_load_merge_param(const ObTablet &tablet,
ObTabletDDLParam &ddl_param,
bool &need_check_tablet)
{
#define PRINT_TS_WRAPPER(x) (ObPrintTableStore(*(x.get_member())))
int ret = OB_SUCCESS;
need_check_tablet = false;
ObITable *last_table = nullptr;
ObTabletMemberWrapper<ObTabletTableStore> table_store_wrapper;
if (OB_FAIL(tablet.fetch_table_store(table_store_wrapper))) {
LOG_WARN("fail to fetch table store", K(ret));
} else if (OB_UNLIKELY(!table_store_wrapper.get_member()->is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Table store not valid", K(ret), K(table_store_wrapper));
} else if (OB_ISNULL(last_table =
table_store_wrapper.get_member()->get_minor_sstables().get_boundary_table(true/*last*/))) {
// no minor sstable, skip to cut memtable's boundary
} else if (ddl_param.table_key_.scn_range_.start_scn_ > last_table->get_end_scn()) {
need_check_tablet = true;
} else if (ddl_param.table_key_.scn_range_.start_scn_ < last_table->get_end_scn()
&& !tablet.get_tablet_meta().tablet_id_.is_special_merge_tablet()) {
// fix start_scn to make scn_range continuous in migrate phase
if (ddl_param.table_key_.scn_range_.end_scn_ <= last_table->get_end_scn()) {
ret = OB_NO_NEED_MERGE;
LOG_WARN("No need mini merge memtable which is covered by existing sstable",
K(ret), K(ddl_param), KPC(last_table), K(PRINT_TS_WRAPPER(table_store_wrapper)), K(tablet));
} else {
ddl_param.table_key_.scn_range_.start_scn_ = last_table->get_end_scn();
FLOG_INFO("Fix mini merge result scn range", K(ret), K(ddl_param), KPC(last_table),
K(PRINT_TS_WRAPPER(table_store_wrapper)), K(tablet));
}
}
return ret;
}
int ObDDLTableMergeTask::merge_incremental_direct_load_ddl_kvs(ObLSHandle &ls_handle, ObTablet &tablet)
{
int ret = OB_SUCCESS;
@ -496,13 +532,15 @@ int ObDDLTableMergeTask::merge_incremental_direct_load_ddl_kvs(ObLSHandle &ls_ha
ObTableStoreIterator ddl_table_iter;
ObTabletDDLParam ddl_param;
ObTableHandleV2 compacted_sstable_handle;
const SCN &clog_checkpoint_scn = tablet.get_clog_checkpoint_scn();
bool need_check_tablet = false;
if (OB_UNLIKELY(frozen_ddl_kvs_.count() != 1)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected frozen ddl kvs", K(ret), K(merge_param_), K(frozen_ddl_kvs_));
} else if (OB_ISNULL(ddl_kv = frozen_ddl_kvs_.at(0).get_obj())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected invalid ddlkv handle", K(ret), K(merge_param_), K(frozen_ddl_kvs_));
} else if (ddl_kv->get_end_scn() <= tablet.get_tablet_meta().clog_checkpoint_scn_) {
} else if (ddl_kv->get_end_scn() <= clog_checkpoint_scn) {
// do nothing
} else {
ddl_param.direct_load_type_ = merge_param_.direct_load_type_;
@ -516,13 +554,38 @@ int ObDDLTableMergeTask::merge_incremental_direct_load_ddl_kvs(ObLSHandle &ls_ha
ddl_param.table_key_.table_type_ = ObITable::MINI_SSTABLE;
ddl_param.snapshot_version_ = ddl_kv->get_snapshot_version();
ddl_param.trans_id_ = ddl_kv->get_trans_id();
if (OB_FAIL(ObTabletDDLUtil::compact_ddl_kv(*ls_handle.get_ls(),
tablet,
ddl_table_iter,
frozen_ddl_kvs_,
ddl_param,
allocator,
compacted_sstable_handle))) {
if (OB_FAIL(refine_incremental_direct_load_merge_param(tablet, ddl_param, need_check_tablet))) {
if (OB_NO_NEED_MERGE != ret) {
LOG_WARN("fail to refine incremental direct load merge param", K(ret), K(tablet),
K(ddl_param), K(frozen_ddl_kvs_));
} else {
ret = OB_SUCCESS;
}
} else if (OB_UNLIKELY(need_check_tablet)) {
ret = OB_EAGAIN;
int tmp_ret = OB_SUCCESS;
ObTabletHandle tmp_tablet_handle;
if (OB_TMP_FAIL(ls_handle.get_ls()->get_tablet(merge_param_.tablet_id_,
tmp_tablet_handle,
0/*timeout_us*/,
ObMDSGetTabletMode::READ_WITHOUT_CHECK))) {
LOG_WARN("failed to get tablet", K(tmp_ret), K(merge_param_));
} else if (OB_UNLIKELY(!tmp_tablet_handle.is_valid())) {
tmp_ret = OB_ERR_UNEXPECTED;
LOG_WARN("get invalid tablet", K(tmp_ret), K(merge_param_));
} else if (tmp_tablet_handle.get_obj()->get_clog_checkpoint_scn() != clog_checkpoint_scn) {
// do nothing, just retry the merge task
} else {
LOG_ERROR("Unexpected uncontinuous scn_range in mini merge", K(ret), K(clog_checkpoint_scn),
K(ddl_param), K(frozen_ddl_kvs_), K(tablet), KPC(tmp_tablet_handle.get_obj()));
}
} else if (OB_FAIL(ObTabletDDLUtil::compact_ddl_kv(*ls_handle.get_ls(),
tablet,
ddl_table_iter,
frozen_ddl_kvs_,
ddl_param,
allocator,
compacted_sstable_handle))) {
LOG_WARN("compact sstables failed", K(ret), K(ddl_param), K(frozen_ddl_kvs_));
}
}