From ba047ea0d9ff06c04ed873173b89c546dbcc19b5 Mon Sep 17 00:00:00 2001 From: suz-yang Date: Tue, 23 Apr 2024 10:40:17 +0000 Subject: [PATCH] fix start_scn to make scn_range continuous in migrate phase --- src/storage/ddl/ob_ddl_merge_task.cpp | 79 ++++++++++++++++++++++++--- 1 file changed, 71 insertions(+), 8 deletions(-) diff --git a/src/storage/ddl/ob_ddl_merge_task.cpp b/src/storage/ddl/ob_ddl_merge_task.cpp index 455ef5ac09..5d9f8a7857 100644 --- a/src/storage/ddl/ob_ddl_merge_task.cpp +++ b/src/storage/ddl/ob_ddl_merge_task.cpp @@ -488,6 +488,42 @@ int ObDDLTableMergeTask::merge_full_direct_load_ddl_kvs(ObLSHandle &ls_handle, O return ret; } +static int refine_incremental_direct_load_merge_param(const ObTablet &tablet, + ObTabletDDLParam &ddl_param, + bool &need_check_tablet) +{ +#define PRINT_TS_WRAPPER(x) (ObPrintTableStore(*(x.get_member()))) + + int ret = OB_SUCCESS; + need_check_tablet = false; + ObITable *last_table = nullptr; + ObTabletMemberWrapper table_store_wrapper; + if (OB_FAIL(tablet.fetch_table_store(table_store_wrapper))) { + LOG_WARN("fail to fetch table store", K(ret)); + } else if (OB_UNLIKELY(!table_store_wrapper.get_member()->is_valid())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("Table store not valid", K(ret), K(table_store_wrapper)); + } else if (OB_ISNULL(last_table = + table_store_wrapper.get_member()->get_minor_sstables().get_boundary_table(true/*last*/))) { + // no minor sstable, skip to cut memtable's boundary + } else if (ddl_param.table_key_.scn_range_.start_scn_ > last_table->get_end_scn()) { + need_check_tablet = true; + } else if (ddl_param.table_key_.scn_range_.start_scn_ < last_table->get_end_scn() + && !tablet.get_tablet_meta().tablet_id_.is_special_merge_tablet()) { + // fix start_scn to make scn_range continuous in migrate phase + if (ddl_param.table_key_.scn_range_.end_scn_ <= last_table->get_end_scn()) { + ret = OB_NO_NEED_MERGE; + LOG_WARN("No need mini merge memtable which is covered by existing sstable", + K(ret), K(ddl_param), KPC(last_table), K(PRINT_TS_WRAPPER(table_store_wrapper)), K(tablet)); + } else { + ddl_param.table_key_.scn_range_.start_scn_ = last_table->get_end_scn(); + FLOG_INFO("Fix mini merge result scn range", K(ret), K(ddl_param), KPC(last_table), + K(PRINT_TS_WRAPPER(table_store_wrapper)), K(tablet)); + } + } + return ret; +} + int ObDDLTableMergeTask::merge_incremental_direct_load_ddl_kvs(ObLSHandle &ls_handle, ObTablet &tablet) { int ret = OB_SUCCESS; @@ -496,13 +532,15 @@ int ObDDLTableMergeTask::merge_incremental_direct_load_ddl_kvs(ObLSHandle &ls_ha ObTableStoreIterator ddl_table_iter; ObTabletDDLParam ddl_param; ObTableHandleV2 compacted_sstable_handle; + const SCN &clog_checkpoint_scn = tablet.get_clog_checkpoint_scn(); + bool need_check_tablet = false; if (OB_UNLIKELY(frozen_ddl_kvs_.count() != 1)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected frozen ddl kvs", K(ret), K(merge_param_), K(frozen_ddl_kvs_)); } else if (OB_ISNULL(ddl_kv = frozen_ddl_kvs_.at(0).get_obj())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected invalid ddlkv handle", K(ret), K(merge_param_), K(frozen_ddl_kvs_)); - } else if (ddl_kv->get_end_scn() <= tablet.get_tablet_meta().clog_checkpoint_scn_) { + } else if (ddl_kv->get_end_scn() <= clog_checkpoint_scn) { // do nothing } else { ddl_param.direct_load_type_ = merge_param_.direct_load_type_; @@ -516,13 +554,38 @@ int ObDDLTableMergeTask::merge_incremental_direct_load_ddl_kvs(ObLSHandle &ls_ha ddl_param.table_key_.table_type_ = ObITable::MINI_SSTABLE; ddl_param.snapshot_version_ = ddl_kv->get_snapshot_version(); ddl_param.trans_id_ = ddl_kv->get_trans_id(); - if (OB_FAIL(ObTabletDDLUtil::compact_ddl_kv(*ls_handle.get_ls(), - tablet, - ddl_table_iter, - frozen_ddl_kvs_, - ddl_param, - allocator, - compacted_sstable_handle))) { + if (OB_FAIL(refine_incremental_direct_load_merge_param(tablet, ddl_param, need_check_tablet))) { + if (OB_NO_NEED_MERGE != ret) { + LOG_WARN("fail to refine incremental direct load merge param", K(ret), K(tablet), + K(ddl_param), K(frozen_ddl_kvs_)); + } else { + ret = OB_SUCCESS; + } + } else if (OB_UNLIKELY(need_check_tablet)) { + ret = OB_EAGAIN; + int tmp_ret = OB_SUCCESS; + ObTabletHandle tmp_tablet_handle; + if (OB_TMP_FAIL(ls_handle.get_ls()->get_tablet(merge_param_.tablet_id_, + tmp_tablet_handle, + 0/*timeout_us*/, + ObMDSGetTabletMode::READ_WITHOUT_CHECK))) { + LOG_WARN("failed to get tablet", K(tmp_ret), K(merge_param_)); + } else if (OB_UNLIKELY(!tmp_tablet_handle.is_valid())) { + tmp_ret = OB_ERR_UNEXPECTED; + LOG_WARN("get invalid tablet", K(tmp_ret), K(merge_param_)); + } else if (tmp_tablet_handle.get_obj()->get_clog_checkpoint_scn() != clog_checkpoint_scn) { + // do nothing, just retry the merge task + } else { + LOG_ERROR("Unexpected uncontinuous scn_range in mini merge", K(ret), K(clog_checkpoint_scn), + K(ddl_param), K(frozen_ddl_kvs_), K(tablet), KPC(tmp_tablet_handle.get_obj())); + } + } else if (OB_FAIL(ObTabletDDLUtil::compact_ddl_kv(*ls_handle.get_ls(), + tablet, + ddl_table_iter, + frozen_ddl_kvs_, + ddl_param, + allocator, + compacted_sstable_handle))) { LOG_WARN("compact sstables failed", K(ret), K(ddl_param), K(frozen_ddl_kvs_)); } }