diff --git a/mittest/mtlenv/storage/blocksstable/ob_index_block_data_prepare.h b/mittest/mtlenv/storage/blocksstable/ob_index_block_data_prepare.h index 03ba0554e6..23622f86ec 100644 --- a/mittest/mtlenv/storage/blocksstable/ob_index_block_data_prepare.h +++ b/mittest/mtlenv/storage/blocksstable/ob_index_block_data_prepare.h @@ -536,7 +536,7 @@ void TestIndexBlockDataPrepare::prepare_ddl_kv() share::SCN ddl_start_scn; ddl_start_scn.convert_from_ts(ObTimeUtility::current_time()); - ASSERT_EQ(OB_SUCCESS, ddl_kv_.init(ls_id, tablet_id, ddl_start_scn, sstable_.get_data_version(), ddl_start_scn, 4000)); + ASSERT_EQ(OB_SUCCESS, ddl_kv_.init(*tablet_handle.get_obj(), ddl_start_scn, sstable_.get_data_version(), ddl_start_scn, 4000)); SMART_VAR(ObSSTableSecMetaIterator, meta_iter) { ObDatumRange query_range; diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index 960c94ccb6..d88f8ae0ce 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -2142,7 +2142,7 @@ int ObRpcRemoteWriteDDLCommitLogP::process() commit_scn, is_remote_write))) { LOG_WARN("fail to remote write commit log", K(ret), K(table_key), K_(arg)); - } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_commit(arg_.start_scn_, commit_scn))) { + } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_commit(*tablet_handle.get_obj(), arg_.start_scn_, commit_scn))) { LOG_WARN("failed to do ddl kv commit", K(ret), K(arg_)); } else { result_ = commit_scn.get_val_for_tx(); diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp index e3cb9a33e0..cc102f0aac 100755 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp @@ -901,10 +901,12 @@ int ObTenantTabletScheduler::schedule_tablet_ddl_major_merge(ObTabletHandle &tab } } else if (kv_mgr_handle.is_valid()) { ObDDLTableMergeDagParam param; - if (OB_FAIL(kv_mgr_handle.get_obj()->get_ddl_major_merge_param(tablet_handle.get_obj()->get_tablet_meta(), param))) { + if (OB_FAIL(kv_mgr_handle.get_obj()->get_ddl_major_merge_param(*tablet_handle.get_obj(), param))) { if (OB_EAGAIN != ret) { LOG_WARN("failed to get ddl major merge param", K(ret)); } + } else if (OB_FAIL(kv_mgr_handle.get_obj()->freeze_ddl_kv(*tablet_handle.get_obj()))) { + LOG_WARN("failed to freeze ddl kv", K(ret)); } else if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) { if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) { LOG_WARN("schedule ddl merge dag failed", K(ret), K(param)); diff --git a/src/storage/ddl/ob_ddl_clog.cpp b/src/storage/ddl/ob_ddl_clog.cpp index e81aa86e4c..51330764b2 100644 --- a/src/storage/ddl/ob_ddl_clog.cpp +++ b/src/storage/ddl/ob_ddl_clog.cpp @@ -146,13 +146,14 @@ ObDDLMacroBlockClogCb::ObDDLMacroBlockClogCb() int ObDDLMacroBlockClogCb::init(const share::ObLSID &ls_id, const blocksstable::ObDDLMacroBlockRedoInfo &redo_info, const blocksstable::MacroBlockId ¯o_block_id, + ObTabletHandle &tablet_handle, ObDDLKvMgrHandle &ddl_kv_mgr_handle) { int ret = OB_SUCCESS; if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_WARN("init twice", K(ret)); - } else if (OB_UNLIKELY(!ls_id.is_valid() || !redo_info.is_valid() || !macro_block_id.is_valid() || !ddl_kv_mgr_handle.is_valid())) { + } else if (OB_UNLIKELY(!ls_id.is_valid() || !redo_info.is_valid() || !macro_block_id.is_valid() || !tablet_handle.is_valid() || !ddl_kv_mgr_handle.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(ls_id), K(redo_info), K(macro_block_id)); } else { @@ -163,6 +164,7 @@ int ObDDLMacroBlockClogCb::init(const share::ObLSID &ls_id, redo_info_.start_scn_ = redo_info.start_scn_; ls_id_ = ls_id; macro_block_id_ = macro_block_id; + tablet_handle_ = tablet_handle; ddl_kv_mgr_handle_ = ddl_kv_mgr_handle; } return ret; @@ -184,16 +186,10 @@ int ObDDLMacroBlockClogCb::on_success() { int ret = OB_SUCCESS; ObDDLMacroBlock macro_block; - ObLSHandle ls_handle; - ObTabletHandle tablet_handle; { ObSpinLockGuard data_buffer_guard(data_buffer_lock_); if (is_data_buffer_freed_) { LOG_INFO("data buffer is freed, do not need to callback"); - } else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) { - LOG_WARN("get ls handle failed", K(ret), K(ls_id_)); - } else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, redo_info_.table_key_.get_tablet_id(), tablet_handle))) { - LOG_WARN("get tablet handle failed", K(ret), K(redo_info_.table_key_)); } else if (OB_FAIL(macro_block.block_handle_.set_block_id(macro_block_id_))) { LOG_WARN("set macro block id failed", K(ret), K(macro_block_id_)); } else { @@ -203,8 +199,8 @@ int ObDDLMacroBlockClogCb::on_success() macro_block.buf_ = redo_info_.data_buffer_.ptr(); macro_block.size_ = redo_info_.data_buffer_.length(); macro_block.ddl_start_scn_ = redo_info_.start_scn_; - if (OB_FAIL(ObDDLKVPendingGuard::set_macro_block(tablet_handle.get_obj(), macro_block))) { - LOG_WARN("set macro block into ddl kv failed", K(ret), K(tablet_handle), K(macro_block)); + if (OB_FAIL(ObDDLKVPendingGuard::set_macro_block(tablet_handle_.get_obj(), macro_block))) { + LOG_WARN("set macro block into ddl kv failed", K(ret), K(tablet_handle_), K(macro_block)); } } } diff --git a/src/storage/ddl/ob_ddl_clog.h b/src/storage/ddl/ob_ddl_clog.h index ec7ba7f782..7cb3d5e0bf 100644 --- a/src/storage/ddl/ob_ddl_clog.h +++ b/src/storage/ddl/ob_ddl_clog.h @@ -107,6 +107,7 @@ public: int init(const share::ObLSID &ls_id, const blocksstable::ObDDLMacroBlockRedoInfo &redo_info, const blocksstable::MacroBlockId ¯o_block_id, + ObTabletHandle &tablet_handle, ObDDLKvMgrHandle &ddl_kv_mgr_handle); virtual int on_success() override; virtual int on_failure() override; @@ -124,6 +125,7 @@ private: ObArenaAllocator arena_; ObSpinLock data_buffer_lock_; bool is_data_buffer_freed_; + ObTabletHandle tablet_handle_; ObDDLKvMgrHandle ddl_kv_mgr_handle_; }; diff --git a/src/storage/ddl/ob_ddl_merge_task.cpp b/src/storage/ddl/ob_ddl_merge_task.cpp index 80aa58edec..ff15b3662e 100755 --- a/src/storage/ddl/ob_ddl_merge_task.cpp +++ b/src/storage/ddl/ob_ddl_merge_task.cpp @@ -46,8 +46,7 @@ namespace storage ObDDLTableMergeDag::ObDDLTableMergeDag() : ObIDag(ObDagType::DAG_TYPE_DDL_KV_MERGE), is_inited_(false), - ddl_param_(), - compat_mode_(lib::Worker::CompatMode::INVALID) + ddl_param_() { } @@ -69,20 +68,7 @@ int ObDDLTableMergeDag::init_by_param(const share::ObIDagInitParam *param) LOG_WARN("invalid arguments", K(ret), KP(param)); } else { ddl_param_ = *static_cast(param); - if (OB_FAIL(MTL(ObLSService *)->get_ls(ddl_param_.ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) { - LOG_WARN("failed to get log stream", K(ret), K(ddl_param_)); - } else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, - ddl_param_.tablet_id_, - tablet_handle, - ObMDSGetTabletMode::READ_ALL_COMMITED))) { - LOG_WARN("failed to get tablet", K(ret), K(ddl_param_)); - } else if (OB_ISNULL(tablet = tablet_handle.get_obj())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("tablet is null", K(ret), K(ddl_param_)); - } else { - compat_mode_ = tablet->get_tablet_meta().compat_mode_; - is_inited_ = true; - } + is_inited_ = true; } return ret; } @@ -90,32 +76,9 @@ int ObDDLTableMergeDag::init_by_param(const share::ObIDagInitParam *param) int ObDDLTableMergeDag::create_first_task() { int ret = OB_SUCCESS; - ObLSService *ls_service = MTL(ObLSService *); - ObLSHandle ls_handle; - ObTabletHandle tablet_handle; - ObDDLKvMgrHandle ddl_kv_mgr_handle; ObTablesHandleArray ddl_kvs_handle; ObDDLTableMergeTask *merge_task = nullptr; - if (OB_FAIL(ls_service->get_ls(ddl_param_.ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) { - LOG_WARN("get ls failed", K(ret), K(ddl_param_)); - } else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, - ddl_param_.tablet_id_, - tablet_handle, - ObMDSGetTabletMode::READ_ALL_COMMITED))) { - LOG_WARN("get tablet failed", K(ret), K(ddl_param_)); - } else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) { - if (OB_ENTRY_NOT_EXIST == ret) { - ret = OB_TASK_EXPIRED; - LOG_INFO("ddl kv mgr not exist", K(ret), K(ddl_param_)); - } else { - LOG_WARN("get ddl kv mgr failed", K(ret), K(ddl_param_)); - } - } else if (ddl_param_.start_scn_ < ddl_kv_mgr_handle.get_obj()->get_start_scn()) { - ret = OB_TASK_EXPIRED; - LOG_WARN("ddl task expired, skip it", K(ret), K(ddl_param_), "new_start_scn", ddl_kv_mgr_handle.get_obj()->get_start_scn()); - } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->freeze_ddl_kv())) { - LOG_WARN("ddl kv manager try freeze failed", K(ret), K(ddl_param_)); - } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->get_ddl_kvs(true/*frozen_only*/, ddl_kvs_handle))) { + if (OB_FAIL(ddl_param_.ddl_kv_mgr_handle_.get_obj()->get_ddl_kvs(true/*frozen_only*/, ddl_kvs_handle))) { LOG_WARN("get freezed ddl kv failed", K(ret), K(ddl_param_)); } else if (OB_FAIL(alloc_task(merge_task))) { LOG_WARN("Fail to alloc task", K(ret), K(ddl_param_)); @@ -280,7 +243,7 @@ int ObDDLTableDumpTask::process() } else if (OB_ISNULL(ddl_kv = static_cast(ddl_kv_handle.get_table()))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get ddl kv failed", K(ret)); - } else if (OB_FAIL(ddl_kv->close())) { + } else if (OB_FAIL(ddl_kv->close(*tablet_handle.get_obj()))) { if (OB_EAGAIN != ret) { LOG_WARN("close ddl kv failed", K(ret)); } @@ -393,7 +356,8 @@ int ObDDLTableMergeTask::process() } else if (merge_param_.start_scn_ > SCN::min_scn() && merge_param_.start_scn_ < ddl_param.start_scn_) { ret = OB_TASK_EXPIRED; LOG_INFO("ddl merge task expired, do nothing", K(merge_param_), "new_start_scn", ddl_param.start_scn_); - } else if (OB_FAIL(ObTabletDDLUtil::compact_ddl_sstable(ddl_table_iter, + } else if (OB_FAIL(ObTabletDDLUtil::compact_ddl_sstable(*tablet_handle.get_obj(), + ddl_table_iter, tablet_handle.get_obj()->get_rowkey_read_info(), merge_param_.is_commit_, merge_param_.rec_scn_, @@ -508,8 +472,7 @@ bool ObTabletDDLParam::is_valid() const && data_format_version_ >= 0; } -int ObTabletDDLUtil::prepare_index_data_desc(const share::ObLSID &ls_id, - const ObTabletID &tablet_id, +int ObTabletDDLUtil::prepare_index_data_desc(ObTablet &tablet, const int64_t snapshot_version, const int64_t data_format_version, const ObSSTable *first_ddl_sstable, @@ -518,24 +481,14 @@ int ObTabletDDLUtil::prepare_index_data_desc(const share::ObLSID &ls_id, int ret = OB_SUCCESS; data_desc.reset(); ObLSService *ls_service = MTL(ObLSService *); - ObLSHandle ls_handle; - ObTabletHandle tablet_handle; ObArenaAllocator tmp_arena("DDLIdxDescTmp"); const ObStorageSchema *storage_schema = nullptr; + const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_; + const ObLSID &ls_id = tablet.get_tablet_meta().ls_id_; if (OB_UNLIKELY(!ls_id.is_valid() || !tablet_id.is_valid() || snapshot_version <= 0 || data_format_version < 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(ls_id), K(tablet_id), K(snapshot_version), K(data_format_version)); - } else if (OB_ISNULL(ls_service)) { - ret = OB_ERR_SYS; - LOG_WARN("ls service is null", K(ret), K(ls_id)); - } else if (OB_FAIL(ls_service->get_ls(ls_id, ls_handle, ObLSGetMod::DDL_MOD))) { - LOG_WARN("get ls failed", K(ret), K(ls_id)); - } else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, - tablet_id, - tablet_handle, - ObMDSGetTabletMode::READ_ALL_COMMITED))) { - LOG_WARN("get tablet failed", K(ret)); - } else if (OB_FAIL(tablet_handle.get_obj()->load_storage_schema(tmp_arena, storage_schema))) { + } else if (OB_FAIL(tablet.load_storage_schema(tmp_arena, storage_schema))) { LOG_WARN("fail to get storage schema", K(ret)); } else if (OB_FAIL(data_desc.init_as_index(*storage_schema, ls_id, @@ -586,7 +539,8 @@ int ObTabletDDLUtil::prepare_index_data_desc(const share::ObLSID &ls_id, return ret; } -int ObTabletDDLUtil::create_ddl_sstable(const ObTabletDDLParam &ddl_param, +int ObTabletDDLUtil::create_ddl_sstable(ObTablet &tablet, + const ObTabletDDLParam &ddl_param, const ObIArray &meta_array, const ObSSTable *first_ddl_sstable, common::ObArenaAllocator &allocator, @@ -601,8 +555,7 @@ int ObTabletDDLUtil::create_ddl_sstable(const ObTabletDDLParam &ddl_param, if (OB_UNLIKELY(!ddl_param.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(ddl_param)); - } else if (OB_FAIL(ObTabletDDLUtil::prepare_index_data_desc(ddl_param.ls_id_, - ddl_param.table_key_.tablet_id_, + } else if (OB_FAIL(ObTabletDDLUtil::prepare_index_data_desc(tablet, ddl_param.table_key_.version_range_.snapshot_version_, ddl_param.data_format_version_, first_ddl_sstable, @@ -634,7 +587,7 @@ int ObTabletDDLUtil::create_ddl_sstable(const ObTabletDDLParam &ddl_param, if (OB_SUCC(ret)) { if (OB_FAIL(index_block_rebuilder->close())) { LOG_WARN("close index block rebuilder failed", K(ret)); - } else if (OB_FAIL(ObTabletDDLUtil::create_ddl_sstable(sstable_index_builder, ddl_param, first_ddl_sstable, allocator, sstable))) { + } else if (OB_FAIL(ObTabletDDLUtil::create_ddl_sstable(tablet, sstable_index_builder, ddl_param, first_ddl_sstable, allocator, sstable))) { LOG_WARN("create ddl sstable failed", K(ret), K(ddl_param)); } } @@ -651,34 +604,22 @@ int ObTabletDDLUtil::create_ddl_sstable(const ObTabletDDLParam &ddl_param, return ret; } -int ObTabletDDLUtil::create_ddl_sstable(ObSSTableIndexBuilder *sstable_index_builder, +int ObTabletDDLUtil::create_ddl_sstable(ObTablet &tablet, + ObSSTableIndexBuilder *sstable_index_builder, const ObTabletDDLParam &ddl_param, const ObSSTable *first_ddl_sstable, common::ObArenaAllocator &allocator, blocksstable::ObSSTable &sstable) { int ret = OB_SUCCESS; - ObLSService *ls_service = MTL(ObLSService *); - ObLSHandle ls_handle; - ObTabletHandle tablet_handle; ObArenaAllocator tmp_arena("CreateDDLSstTmp"); const ObStorageSchema *storage_schema = nullptr; SMART_VAR(ObSSTableMergeRes, res) { if (OB_UNLIKELY(nullptr == sstable_index_builder || !ddl_param.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), KP(sstable_index_builder), K(ddl_param)); - } else if (OB_ISNULL(ls_service)) { - ret = OB_ERR_SYS; - LOG_WARN("ls service is null", K(ret), K(ddl_param)); - } else if (OB_FAIL(ls_service->get_ls(ddl_param.ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) { - LOG_WARN("get ls failed", K(ret), K(ddl_param)); - } else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, - ddl_param.table_key_.tablet_id_, - tablet_handle, - ObMDSGetTabletMode::READ_ALL_COMMITED))) { - LOG_WARN("get tablet failed", K(ret), K(ddl_param)); - } else if (OB_FAIL(tablet_handle.get_obj()->load_storage_schema(tmp_arena, storage_schema))) { - LOG_WARN("failed to load storage schema", K(ret), K(tablet_handle)); + } else if (OB_FAIL(tablet.load_storage_schema(tmp_arena, storage_schema))) { + LOG_WARN("failed to load storage schema", K(ret), K(tablet)); } else { int64_t column_count = 0; share::schema::ObTableMode table_mode = storage_schema->get_table_mode_struct(); @@ -709,7 +650,7 @@ int ObTabletDDLUtil::create_ddl_sstable(ObSSTableIndexBuilder *sstable_index_bui LOG_WARN("max_merged_trans_version_ in res is different from ddl snapshot version", K(ret), K(res), K(ddl_param)); } else { - const int64_t create_schema_version_on_tablet = tablet_handle.get_obj()->get_tablet_meta().create_schema_version_; + const int64_t create_schema_version_on_tablet = tablet.get_tablet_meta().create_schema_version_; ObTabletCreateSSTableParam param; param.table_key_ = ddl_param.table_key_; param.table_mode_ = table_mode; @@ -765,7 +706,8 @@ int ObTabletDDLUtil::create_ddl_sstable(ObSSTableIndexBuilder *sstable_index_bui return ret; } -int ObTabletDDLUtil::update_ddl_table_store(const ObTabletDDLParam &ddl_param, +int ObTabletDDLUtil::update_ddl_table_store(ObTablet &tablet, + const ObTabletDDLParam &ddl_param, common::ObArenaAllocator &allocator, blocksstable::ObSSTable &sstable) { @@ -781,19 +723,14 @@ int ObTabletDDLUtil::update_ddl_table_store(const ObTabletDDLParam &ddl_param, const ObStorageSchema *tablet_storage_schema = nullptr; if (OB_FAIL(ls_service->get_ls(ddl_param.ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) { LOG_WARN("get ls failed", K(ret), K(ddl_param)); - } else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, - ddl_param.table_key_.tablet_id_, - tablet_handle, - ObMDSGetTabletMode::READ_ALL_COMMITED))) { - LOG_WARN("get tablet failed", K(ret), K(ddl_param)); - } else if (OB_FAIL(tablet_handle.get_obj()->load_storage_schema(allocator, tablet_storage_schema))) { + } else if (OB_FAIL(tablet.load_storage_schema(allocator, tablet_storage_schema))) { LOG_WARN("fail to load storage schema failed", K(ret)); } else { const bool is_major_sstable = ddl_param.table_key_.is_major_sstable(); const int64_t rebuild_seq = ls_handle.get_ls()->get_rebuild_seq(); - const int64_t snapshot_version = is_major_sstable ? max(ddl_param.snapshot_version_, tablet_handle.get_obj()->get_snapshot_version()) - : tablet_handle.get_obj()->get_snapshot_version(); - const int64_t multi_version_start = is_major_sstable ? max(ddl_param.snapshot_version_, tablet_handle.get_obj()->get_multi_version_start()) + const int64_t snapshot_version = is_major_sstable ? max(ddl_param.snapshot_version_, tablet.get_snapshot_version()) + : tablet.get_snapshot_version(); + const int64_t multi_version_start = is_major_sstable ? max(ddl_param.snapshot_version_, tablet.get_multi_version_start()) : 0; ObTabletHandle new_tablet_handle; ObUpdateTableStoreParam table_store_param(&sstable, @@ -821,7 +758,8 @@ int ObTabletDDLUtil::update_ddl_table_store(const ObTabletDDLParam &ddl_param, return ret; } -int ObTabletDDLUtil::compact_ddl_sstable(ObTableStoreIterator &ddl_sstable_iter, +int ObTabletDDLUtil::compact_ddl_sstable(ObTablet &tablet, + ObTableStoreIterator &ddl_sstable_iter, const ObITableReadInfo &read_info, const bool is_commit, const share::SCN &rec_scn, @@ -867,7 +805,7 @@ int ObTabletDDLUtil::compact_ddl_sstable(ObTableStoreIterator &ddl_sstable_iter, if (TC_REACH_TIME_INTERVAL(10L * 1000L * 1000L)) { LOG_WARN("current ddl sstables not contain all data", K(ddl_sstable_iter), K(ddl_param)); } - } else if (OB_FAIL(meta_tree.init(ddl_param.ls_id_, ddl_param.table_key_, ddl_param.start_scn_, ddl_param.data_format_version_))) { + } else if (OB_FAIL(meta_tree.init(tablet, ddl_param.table_key_, ddl_param.start_scn_, ddl_param.data_format_version_))) { LOG_WARN("init meta tree failed", K(ret), K(ddl_param)); } else if (FALSE_IT(ddl_sstable_iter.resume())) { } else { @@ -957,13 +895,14 @@ int ObTabletDDLUtil::compact_ddl_sstable(ObTableStoreIterator &ddl_sstable_iter, LOG_WARN("build sorted rowkey failed", K(ret)); } else if (OB_FAIL(meta_tree.get_sorted_meta_array(sorted_metas))) { LOG_WARN("get sorted metas failed", K(ret)); - } else if (OB_FAIL(create_ddl_sstable(ddl_param, + } else if (OB_FAIL(create_ddl_sstable(tablet, + ddl_param, sorted_metas, static_cast(first_ddl_sstable), allocator, sstable))) { LOG_WARN("create ddl sstable failed", K(ret)); - } else if (OB_FAIL(update_ddl_table_store(ddl_param, allocator, sstable))) { + } else if (OB_FAIL(update_ddl_table_store(tablet, ddl_param, allocator, sstable))) { LOG_WARN("update ddl table store failed", K(ret)); } else { LOG_INFO("compact ddl sstable success", K(ddl_param)); diff --git a/src/storage/ddl/ob_ddl_merge_task.h b/src/storage/ddl/ob_ddl_merge_task.h index 563d3c448f..f8c8fc5b10 100755 --- a/src/storage/ddl/ob_ddl_merge_task.h +++ b/src/storage/ddl/ob_ddl_merge_task.h @@ -45,20 +45,24 @@ public: tablet_id_(), rec_scn_(share::SCN::min_scn()), is_commit_(false), - start_scn_(share::SCN::min_scn()) + start_scn_(share::SCN::min_scn()), + compat_mode_(lib::Worker::CompatMode::INVALID), + ddl_kv_mgr_handle_() { } bool is_valid() const { - return ls_id_.is_valid() && tablet_id_.is_valid() && start_scn_.is_valid_and_not_min(); + return ls_id_.is_valid() && tablet_id_.is_valid() && start_scn_.is_valid_and_not_min() && ddl_kv_mgr_handle_.is_valid(); } virtual ~ObDDLTableMergeDagParam() = default; - TO_STRING_KV(K_(ls_id), K_(tablet_id), K_(rec_scn), K_(is_commit), K_(start_scn)); + TO_STRING_KV(K_(ls_id), K_(tablet_id), K_(rec_scn), K_(is_commit), K_(start_scn), K_(compat_mode), K_(ddl_kv_mgr_handle)); public: share::ObLSID ls_id_; ObTabletID tablet_id_; share::SCN rec_scn_; bool is_commit_; share::SCN start_scn_; // start log ts at schedule, for skipping expired task + lib::Worker::CompatMode compat_mode_; + ObDDLKvMgrHandle ddl_kv_mgr_handle_; }; class ObDDLTableMergeDag : public share::ObIDag @@ -68,7 +72,7 @@ public: virtual ~ObDDLTableMergeDag(); virtual int init_by_param(const share::ObIDagInitParam *param) override; virtual int create_first_task() override; - INHERIT_TO_STRING_KV("ObIDag", ObIDag, K_(ddl_param), K_(compat_mode)); + INHERIT_TO_STRING_KV("ObIDag", ObIDag, K_(ddl_param)); public: virtual bool operator == (const ObIDag &other) const override; virtual int64_t hash() const override; @@ -77,13 +81,12 @@ public: virtual int fill_dag_key(char *buf, const int64_t buf_len) const override; virtual bool ignore_warning() override; virtual lib::Worker::CompatMode get_compat_mode() const override - { return compat_mode_; } + { return ddl_param_.compat_mode_; } virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; } private: bool is_inited_; ObDDLTableMergeDagParam ddl_param_; - lib::Worker::CompatMode compat_mode_; DISALLOW_COPY_AND_ASSIGN(ObDDLTableMergeDag); }; @@ -141,30 +144,33 @@ public: class ObTabletDDLUtil { public: - static int prepare_index_data_desc(const share::ObLSID &ls_id, - const ObTabletID &tablet_id, + static int prepare_index_data_desc(ObTablet &tablet, const int64_t snapshot_version, const int64_t ddl_format_version, const blocksstable::ObSSTable *first_ddl_sstable, blocksstable::ObDataStoreDesc &data_desc); - static int create_ddl_sstable(const ObTabletDDLParam &ddl_param, + static int create_ddl_sstable(ObTablet &tablet, + const ObTabletDDLParam &ddl_param, const ObIArray &meta_array, const blocksstable::ObSSTable *first_ddl_sstable, common::ObArenaAllocator &allocator, blocksstable::ObSSTable &sstable); - static int create_ddl_sstable(blocksstable::ObSSTableIndexBuilder *sstable_index_builder, + static int create_ddl_sstable(ObTablet &tablet, + blocksstable::ObSSTableIndexBuilder *sstable_index_builder, const ObTabletDDLParam &ddl_param, const blocksstable::ObSSTable *first_ddl_sstable, common::ObArenaAllocator &allocator, blocksstable::ObSSTable &sstable); - static int update_ddl_table_store(const ObTabletDDLParam &ddl_param, + static int update_ddl_table_store(ObTablet &tablet, + const ObTabletDDLParam &ddl_param, common::ObArenaAllocator &allocator, blocksstable::ObSSTable &sstable); - static int compact_ddl_sstable(ObTableStoreIterator &ddl_sstable_iter, + static int compact_ddl_sstable(ObTablet &tablet, + ObTableStoreIterator &ddl_sstable_iter, const ObITableReadInfo &read_info, const bool is_commit, const share::SCN &rec_scn, diff --git a/src/storage/ddl/ob_ddl_redo_log_writer.cpp b/src/storage/ddl/ob_ddl_redo_log_writer.cpp index 810267f44b..ea9b486460 100755 --- a/src/storage/ddl/ob_ddl_redo_log_writer.cpp +++ b/src/storage/ddl/ob_ddl_redo_log_writer.cpp @@ -716,7 +716,7 @@ int ObDDLRedoLogWriter::write( } else if (OB_FAIL(tmp_log.deserialize(buffer, buffer_size, log_start_pos))) { LOG_WARN("fail to deserialize ddl redo log", K(ret)); /* use the ObString data_buffer_ in tmp_log.redo_info_, do not rely on the macro_block_buf in original log*/ - } else if (OB_FAIL(cb->init(ls_id, tmp_log.get_redo_info(), macro_block_id, ddl_kv_mgr_handle))) { + } else if (OB_FAIL(cb->init(ls_id, tmp_log.get_redo_info(), macro_block_id, tablet_handle, ddl_kv_mgr_handle))) { LOG_WARN("init ddl clog callback failed", K(ret)); } else if (OB_FAIL(log_handler->append(buffer, buffer_size, @@ -834,7 +834,7 @@ int ObDDLRedoLogWriter::write_ddl_start_log(ObTabletHandle &tablet_handle, const int64_t saved_snapshot_version = log.get_table_key().get_snapshot_version(); start_scn = scn; // remove ddl sstable if exists and flush ddl start log ts and snapshot version into tablet meta - if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->update_tablet(start_scn, saved_snapshot_version, log.get_data_format_version(), log.get_execution_id(), start_scn))) { + if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->update_tablet(*tablet_handle.get_obj(), start_scn, saved_snapshot_version, log.get_data_format_version(), log.get_execution_id(), start_scn))) { LOG_WARN("clean up ddl sstable failed", K(ret), K(log)); } FLOG_INFO("start ddl kv mgr finished", K(ret), K(start_scn), K(log)); @@ -1220,7 +1220,7 @@ int ObDDLSSTableRedoWriter::end_ddl_redo_and_create_ddl_sstable( if (OB_FAIL(ret)) { } else if (is_remote_write) { LOG_INFO("ddl commit log is written in remote, need wait replay", K(ddl_task_id), K(tablet_id), K(ddl_start_scn), K(commit_scn)); - } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_commit(ddl_start_scn, commit_scn))) { + } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_commit(*tablet_handle.get_obj(), ddl_start_scn, commit_scn))) { if (OB_TASK_EXPIRED == ret) { LOG_INFO("ddl task expired", K(ret), K(ls_id), K(tablet_id), K(ddl_start_scn), "new_ddl_start_scn", ddl_kv_mgr_handle.get_obj()->get_start_scn()); @@ -1229,7 +1229,7 @@ int ObDDLSSTableRedoWriter::end_ddl_redo_and_create_ddl_sstable( } } if (OB_FAIL(ret)) { - } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_merge_success(ddl_start_scn, commit_scn))) { + } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_merge_success(*tablet_handle.get_obj(), ddl_start_scn, commit_scn))) { if (OB_TASK_EXPIRED == ret) { LOG_INFO("ddl task expired, but return success", K(ret), K(ls_id), K(tablet_id), K(ddl_start_scn), "new_ddl_start_scn", diff --git a/src/storage/ddl/ob_ddl_replay_executor.cpp b/src/storage/ddl/ob_ddl_replay_executor.cpp index 2ea59746a5..5b337ee878 100755 --- a/src/storage/ddl/ob_ddl_replay_executor.cpp +++ b/src/storage/ddl/ob_ddl_replay_executor.cpp @@ -273,9 +273,9 @@ int ObDDLCommitReplayExecutor::do_replay_(ObTabletHandle &handle) //TODO(jianyun LOG_WARN("skip replay ddl commit", K(ret), "ls_id", ls_->get_ls_id(), K(handle)); } else if (OB_FAIL(handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) { LOG_WARN("get ddl kv mgr failed", K(ret), K_(scn), KPC_(log)); - } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->set_commit_scn(scn_))) { + } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->set_commit_scn(handle.get_obj()->get_tablet_meta(), scn_))) { LOG_WARN("failed to start prepare", K(ret), KPC_(log), K_(scn)); - } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_commit(log_->get_start_scn(), scn_))) { + } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_commit(*handle.get_obj(), log_->get_start_scn(), scn_))) { if (OB_TABLET_NOT_EXIST == ret || OB_TASK_EXPIRED == ret) { ret = OB_SUCCESS; // exit when tablet not exist or task expired } else { diff --git a/src/storage/ddl/ob_ddl_struct.cpp b/src/storage/ddl/ob_ddl_struct.cpp index 177730c496..e09292aacc 100644 --- a/src/storage/ddl/ob_ddl_struct.cpp +++ b/src/storage/ddl/ob_ddl_struct.cpp @@ -131,7 +131,7 @@ ObDDLKVPendingGuard::ObDDLKVPendingGuard(ObTablet *tablet, const SCN &start_scn, LOG_WARN("invalid arguments", K(ret), KP(tablet), K(scn)); } else if (OB_FAIL(tablet->get_ddl_kv_mgr(ddl_kv_mgr_handle))) { LOG_WARN("get ddl kv mgr failed", K(ret)); - } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->get_or_create_ddl_kv(start_scn, scn, kv_handle_))) { + } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->get_or_create_ddl_kv(*tablet, start_scn, scn, kv_handle_))) { LOG_WARN("acquire ddl kv failed", K(ret)); } else if (OB_ISNULL(curr_kv = static_cast(kv_handle_.get_table()))) { ret = OB_ERR_UNEXPECTED; @@ -186,7 +186,7 @@ int ObDDLKVPendingGuard::set_macro_block(ObTablet *tablet, const ObDDLMacroBlock } else if (OB_ISNULL(ddl_kv)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ddl kv is null", K(ret), KP(ddl_kv), K(guard)); - } else if (OB_FAIL(ddl_kv->set_macro_block(macro_block))) { + } else if (OB_FAIL(ddl_kv->set_macro_block(*tablet, macro_block))) { LOG_WARN("fail to set macro block info", K(ret)); } else { break; diff --git a/src/storage/ddl/ob_tablet_ddl_kv.cpp b/src/storage/ddl/ob_tablet_ddl_kv.cpp index 9d91eded5e..87ee0038ba 100755 --- a/src/storage/ddl/ob_tablet_ddl_kv.cpp +++ b/src/storage/ddl/ob_tablet_ddl_kv.cpp @@ -49,7 +49,7 @@ ObBlockMetaTree::~ObBlockMetaTree() destroy(); } -int ObBlockMetaTree::init(const share::ObLSID &ls_id, +int ObBlockMetaTree::init(ObTablet &tablet, const ObITable::TableKey &table_key, const share::SCN &ddl_start_scn, const int64_t data_format_version) @@ -59,47 +59,36 @@ int ObBlockMetaTree::init(const share::ObLSID &ls_id, if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_WARN("init twice", K(ret)); - } else if (OB_UNLIKELY(!ls_id.is_valid() || !table_key.is_valid() || data_format_version <= 0)) { + } else if (OB_UNLIKELY(!table_key.is_valid() || data_format_version <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(table_key)); } else if (FALSE_IT(arena_.set_attr(mem_attr))) { } else if (OB_FAIL(block_tree_.init())) { LOG_WARN("init block tree failed", K(ret)); - } else if (OB_FAIL(ObTabletDDLUtil::prepare_index_data_desc(ls_id, - table_key.tablet_id_, + } else if (OB_FAIL(ObTabletDDLUtil::prepare_index_data_desc(tablet, table_key.get_snapshot_version(), data_format_version, nullptr, // first ddl sstable data_desc_))) { - LOG_WARN("prepare data store desc failed", K(ret), K(ls_id), K(table_key), K(data_format_version)); + LOG_WARN("prepare data store desc failed", K(ret), K(table_key), K(data_format_version)); } else { is_inited_ = true; } return ret; } -int ObDDLKV::init_sstable_param(const share::ObLSID &ls_id, +int ObDDLKV::init_sstable_param(ObTablet &tablet, const ObITable::TableKey &table_key, const share::SCN &ddl_start_scn, ObTabletCreateSSTableParam &sstable_param) { int ret = OB_SUCCESS; - ObLSService *ls_service = MTL(ObLSService *); - ObLSHandle ls_handle; - ObTabletHandle tablet_handle; const ObStorageSchema *storage_schema_ptr = nullptr; ObArenaAllocator allocator; - if (OB_UNLIKELY(!ls_id.is_valid() || !table_key.is_valid() || !ddl_start_scn.is_valid_and_not_min())) { + if (OB_UNLIKELY(!table_key.is_valid() || !ddl_start_scn.is_valid_and_not_min())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(ls_id), K(table_key), K(ddl_start_scn)); - } else if (OB_FAIL(ls_service->get_ls(ls_id, ls_handle, ObLSGetMod::DDL_MOD))) { - LOG_WARN("get ls failed", K(ret), K(ls_id)); - } else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, - table_key.tablet_id_, - tablet_handle, - ObMDSGetTabletMode::READ_WITHOUT_CHECK))) { - LOG_WARN("get tablet failed", K(ret)); - } else if (OB_FAIL(tablet_handle.get_obj()->load_storage_schema(allocator, storage_schema_ptr))) { + LOG_WARN("invalid argument", K(ret), K(table_key), K(ddl_start_scn)); + } else if (OB_FAIL(tablet.load_storage_schema(allocator, storage_schema_ptr))) { LOG_WARN("load storage schema fail", K(ret), K(table_key)); } else { int64_t column_count = 0; @@ -440,8 +429,7 @@ ObDDLKV::~ObDDLKV() reset(); } -int ObDDLKV::init(const share::ObLSID &ls_id, - const common::ObTabletID &tablet_id, +int ObDDLKV::init(ObTablet &tablet, const SCN &ddl_start_scn, const int64_t snapshot_version, const SCN &last_freezed_scn, @@ -449,6 +437,8 @@ int ObDDLKV::init(const share::ObLSID &ls_id, { int ret = OB_SUCCESS; + const ObLSID &ls_id = tablet.get_tablet_meta().ls_id_; + const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_; if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_WARN("ObDDLKV has been inited twice", K(ret), KP(this)); @@ -472,9 +462,9 @@ int ObDDLKV::init(const share::ObLSID &ls_id, ddl_param.snapshot_version_ = snapshot_version; ddl_param.data_format_version_ = data_format_version; ObTabletCreateSSTableParam sstable_param; - if (OB_FAIL(block_meta_tree_.init(ls_id, ddl_param.table_key_, ddl_start_scn, data_format_version))) { + if (OB_FAIL(block_meta_tree_.init(tablet, ddl_param.table_key_, ddl_start_scn, data_format_version))) { LOG_WARN("init mem index sstable failed", K(ret), K(ddl_param)); - } else if (OB_FAIL(init_sstable_param(ls_id, ddl_param.table_key_, ddl_start_scn, sstable_param))) { + } else if (OB_FAIL(init_sstable_param(tablet, ddl_param.table_key_, ddl_start_scn, sstable_param))) { LOG_WARN("init sstable param failed", K(ret)); } else if (OB_FAIL(ObSSTable::init(sstable_param, &arena_allocator_))) { LOG_WARN("init sstable failed", K(ret)); @@ -513,7 +503,7 @@ void ObDDLKV::reset() arena_allocator_.reset(); } -int ObDDLKV::set_macro_block(const ObDDLMacroBlock ¯o_block) +int ObDDLKV::set_macro_block(ObTablet &tablet, const ObDDLMacroBlock ¯o_block) { int ret = OB_SUCCESS; const int64_t MAX_DDL_BLOCK_COUNT = 10L * 1024L * 1024L * 1024L / OB_SERVER_BLOCK_MGR.get_macro_block_size(); @@ -547,12 +537,11 @@ int ObDDLKV::set_macro_block(const ObDDLMacroBlock ¯o_block) } } if (OB_SUCC(ret) && get_macro_block_cnt() >= freeze_block_count) { - ObDDLTableMergeDagParam param; - param.ls_id_ = ls_id_; - param.tablet_id_ = tablet_id_; - param.start_scn_ = ddl_start_scn_; + ObDDLKvMgrHandle ddl_kv_mgr_handle; int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) { + if (OB_TMP_FAIL(tablet.get_ddl_kv_mgr(ddl_kv_mgr_handle))) { + LOG_WARN("failed to get ddl kv mgr", K(ret)); + } else if (OB_TMP_FAIL(ddl_kv_mgr_handle.get_obj()->schedule_ddl_dump_task(tablet, ddl_start_scn_, SCN::min_scn()))) { LOG_WARN("try schedule ddl merge dag failed when ddl kv is full ", K(tmp_ret), K(ls_id_), K(tablet_id_), K(get_macro_block_cnt())); } @@ -660,7 +649,7 @@ int ObDDLKV::prepare_sstable(const bool need_check/*=true*/) return ret; } -int ObDDLKV::close() +int ObDDLKV::close(ObTablet &tablet) { int ret = OB_SUCCESS; ObArray meta_array; @@ -687,9 +676,9 @@ int ObDDLKV::close() ddl_param.start_scn_ = ddl_start_scn_; ddl_param.snapshot_version_ = snapshot_version_; ddl_param.data_format_version_ = data_format_version_; - if (OB_FAIL(ObTabletDDLUtil::create_ddl_sstable(ddl_param, meta_array, nullptr/*first_ddl_sstable*/, allocator, sstable))) { + if (OB_FAIL(ObTabletDDLUtil::create_ddl_sstable(tablet, ddl_param, meta_array, nullptr/*first_ddl_sstable*/, allocator, sstable))) { LOG_WARN("create ddl sstable failed", K(ret), K(ddl_param)); - } else if (OB_FAIL(ObTabletDDLUtil::update_ddl_table_store(ddl_param, allocator, sstable))) { + } else if (OB_FAIL(ObTabletDDLUtil::update_ddl_table_store(tablet, ddl_param, allocator, sstable))) { LOG_WARN("update ddl table store failed", K(ret), K(ddl_param), K(sstable)); } else { is_closed_ = true; diff --git a/src/storage/ddl/ob_tablet_ddl_kv.h b/src/storage/ddl/ob_tablet_ddl_kv.h index c630b0d3ab..7d165560cd 100644 --- a/src/storage/ddl/ob_tablet_ddl_kv.h +++ b/src/storage/ddl/ob_tablet_ddl_kv.h @@ -49,7 +49,7 @@ class ObBlockMetaTree public: ObBlockMetaTree(); virtual ~ObBlockMetaTree(); - int init(const share::ObLSID &ls_id, + int init(ObTablet &tablet, const ObITable::TableKey &table_key, const share::SCN &ddl_start_scn, const int64_t data_format_version); @@ -115,18 +115,17 @@ public: virtual void inc_ref() override { ATOMIC_AAF(&ref_cnt_, 1); } virtual int64_t dec_ref() override { return ATOMIC_SAF(&ref_cnt_, 1 /* just sub 1 */); } virtual int64_t get_ref() const override { return ObITable::get_ref(); } - int init(const share::ObLSID &ls_id, - const common::ObTabletID &tablet_id, + int init(ObTablet &tablet, const share::SCN &ddl_start_scn, const int64_t snapshot_version, const share::SCN &last_freezed_scn, const int64_t data_format_version); void reset(); - int set_macro_block(const ObDDLMacroBlock ¯o_block); + int set_macro_block(ObTablet &tablet, const ObDDLMacroBlock ¯o_block); int freeze(const share::SCN &freeze_scn); bool is_freezed() const { return ATOMIC_LOAD(&is_freezed_); } - int close(); + int close(ObTablet &tablet); int prepare_sstable(const bool need_check = true); bool is_closed() const { return is_closed_; } share::SCN get_min_scn() const { return min_scn_; } @@ -146,7 +145,7 @@ public: private: int insert_block_meta_tree(const ObDDLMacroHandle ¯o_handle, blocksstable::ObDataMacroBlockMeta *data_macro_meta); - int init_sstable_param(const share::ObLSID &ls_id, + int init_sstable_param(ObTablet &tablet, const ObITable::TableKey &table_key, const share::SCN &ddl_start_scn, ObTabletCreateSSTableParam &sstable_param); diff --git a/src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp b/src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp index a815fad691..8546d5c0ac 100644 --- a/src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp +++ b/src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp @@ -168,7 +168,7 @@ int ObTabletDDLKvMgr::ddl_start(ObTablet &tablet, } if (OB_SUCC(ret) && !checkpoint_scn.is_valid_and_not_min()) { // remove ddl sstable if exists and flush ddl start log ts and snapshot version into tablet meta - if (OB_FAIL(update_tablet(saved_start_scn, saved_snapshot_version, data_format_version, execution_id, saved_start_scn))) { + if (OB_FAIL(update_tablet(tablet, saved_start_scn, saved_snapshot_version, data_format_version, execution_id, saved_start_scn))) { LOG_WARN("clean up ddl sstable failed", K(ret), K(ls_id_), K(tablet_id_)); } } @@ -176,7 +176,7 @@ int ObTabletDDLKvMgr::ddl_start(ObTablet &tablet, return ret; } -int ObTabletDDLKvMgr::ddl_commit(const SCN &start_scn, const SCN &commit_scn) +int ObTabletDDLKvMgr::ddl_commit(ObTablet &tablet, const SCN &start_scn, const SCN &commit_scn) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { @@ -188,14 +188,14 @@ int ObTabletDDLKvMgr::ddl_commit(const SCN &start_scn, const SCN &commit_scn) } else if (start_scn < get_start_scn()) { ret = OB_TASK_EXPIRED; LOG_INFO("skip ddl commit log", K(start_scn), K(*this)); - } else if (OB_FAIL(set_commit_scn(commit_scn))) { + } else if (OB_FAIL(set_commit_scn(tablet.get_tablet_meta(), commit_scn))) { LOG_WARN("failed to set commit scn", K(ret)); - } else if (OB_FAIL(freeze_ddl_kv(commit_scn))) { + } else if (OB_FAIL(freeze_ddl_kv(tablet, commit_scn))) { LOG_WARN("freeze ddl kv failed", K(ret), K(commit_scn)); } else { ret = OB_EAGAIN; while (OB_EAGAIN == ret) { - if (OB_FAIL(update_ddl_major_sstable())) { + if (OB_FAIL(update_ddl_major_sstable(tablet))) { LOG_WARN("update ddl major sstable failed", K(ret)); } if (OB_EAGAIN == ret) { @@ -209,9 +209,12 @@ int ObTabletDDLKvMgr::ddl_commit(const SCN &start_scn, const SCN &commit_scn) param.rec_scn_ = commit_scn; param.is_commit_ = true; param.start_scn_ = start_scn; + param.compat_mode_ = tablet.get_tablet_meta().compat_mode_; const int64_t start_ts = ObTimeUtility::fast_current_time(); - - if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) { + if (OB_FAIL(ret)) { + } else if (OB_FAIL(tablet.get_ddl_kv_mgr(param.ddl_kv_mgr_handle_))) { + LOG_WARN("failed to get ddl kv mgr", K(ret)); + } else if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) { if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) { LOG_WARN("schedule ddl merge dag failed", K(ret), K(param)); } else { @@ -227,10 +230,35 @@ int ObTabletDDLKvMgr::ddl_commit(const SCN &start_scn, const SCN &commit_scn) return ret; } -int ObTabletDDLKvMgr::schedule_ddl_merge_task(const SCN &start_scn, const SCN &commit_scn) +int ObTabletDDLKvMgr::schedule_ddl_dump_task(ObTablet &tablet, const SCN &start_scn, const SCN &rec_scn) +{ + int ret = OB_SUCCESS; + ObDDLTableMergeDagParam param; + param.ls_id_ = ls_id_; + param.tablet_id_ = tablet_id_; + param.rec_scn_ = rec_scn; + param.is_commit_ = false; + param.start_scn_ = start_scn; + param.compat_mode_ = tablet.get_tablet_meta().compat_mode_; + LOG_INFO("schedule ddl dump task", K(param)); + if (OB_UNLIKELY(tablet.get_tablet_meta().tablet_id_ != tablet_id_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("tablet id mismatched", K(ret), K(tablet), KPC(this)); + } else if (OB_FAIL(tablet.get_ddl_kv_mgr(param.ddl_kv_mgr_handle_))) { + LOG_WARN("failed to get ddl kv mgr", K(ret)); + } else if (OB_FAIL(freeze_ddl_kv(tablet))) { + LOG_WARN("ddl kv manager try freeze failed", K(ret), K(param)); + } else if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) { + if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) { + LOG_WARN("schedule ddl merge dag failed", K(ret), K(param)); + } + } + return ret; +} + +int ObTabletDDLKvMgr::schedule_ddl_merge_task(ObTablet &tablet, const SCN &start_scn, const SCN &commit_scn) { int ret = OB_SUCCESS; - ObLSHandle ls_handle; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret), K(is_inited_)); @@ -239,18 +267,29 @@ int ObTabletDDLKvMgr::schedule_ddl_merge_task(const SCN &start_scn, const SCN &c } else if (start_scn < get_start_scn()) { ret = OB_TASK_EXPIRED; LOG_INFO("skip ddl commit log", K(start_scn), K(commit_scn), K(*this)); - } else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) { - LOG_WARN("failed to get log stream", K(ret), K(ls_id_)); } else { - + ObLSHandle ls_handle; + ObTabletHandle tablet_handle; ObDDLTableMergeDagParam param; param.ls_id_ = ls_id_; param.tablet_id_ = tablet_id_; param.rec_scn_ = commit_scn; param.is_commit_ = true; param.start_scn_ = start_scn; - // retry submit dag in case of the previous dag failed - if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) { + param.compat_mode_ = tablet.get_tablet_meta().compat_mode_; + // check ls/tablet state by get_ls/ddl_get_tablet, and retry submit dag in case of the previous dag failed + if (OB_FAIL(tablet.get_ddl_kv_mgr(param.ddl_kv_mgr_handle_))) { + LOG_WARN("failed to get ddl kv", K(ret), K(param)); + } else if (OB_FAIL(freeze_ddl_kv(tablet))) { + LOG_WARN("ddl kv manager try freeze failed", K(ret), K(param)); + } else if (OB_FAIL(MTL(ObLSService *)->get_ls(param.ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) { + LOG_WARN("failed to get log stream", K(ret), K(param)); + } else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, + param.tablet_id_, + tablet_handle, + ObMDSGetTabletMode::READ_ALL_COMMITED))) { + LOG_WARN("failed to get tablet", K(ret), K(param)); + } else if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) { if (OB_SIZE_OVERFLOW == ret || OB_EAGAIN == ret) { ret = OB_EAGAIN; } else { @@ -263,7 +302,7 @@ int ObTabletDDLKvMgr::schedule_ddl_merge_task(const SCN &start_scn, const SCN &c return ret; } -int ObTabletDDLKvMgr::wait_ddl_merge_success(const SCN &start_scn, const SCN &commit_scn) +int ObTabletDDLKvMgr::wait_ddl_merge_success(ObTablet &tablet, const SCN &start_scn, const SCN &commit_scn) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { @@ -283,7 +322,7 @@ int ObTabletDDLKvMgr::wait_ddl_merge_success(const SCN &start_scn, const SCN &co while (OB_SUCC(ret)) { if (OB_FAIL(THIS_WORKER.check_status())) { LOG_WARN("check status failed", K(ret)); - } else if (OB_FAIL(schedule_ddl_merge_task(start_scn, commit_scn))) { + } else if (OB_FAIL(schedule_ddl_merge_task(tablet, start_scn, commit_scn))) { if (OB_EAGAIN == ret) { ob_usleep(100L); // 100us. ret = OB_SUCCESS; // retry @@ -302,18 +341,21 @@ int ObTabletDDLKvMgr::wait_ddl_merge_success(const SCN &start_scn, const SCN &co return ret; } -int ObTabletDDLKvMgr::get_ddl_major_merge_param(const ObTabletMeta &tablet_meta, ObDDLTableMergeDagParam ¶m) +int ObTabletDDLKvMgr::get_ddl_major_merge_param(ObTablet &tablet, ObDDLTableMergeDagParam ¶m) { int ret = OB_SUCCESS; uint32_t lock_tid = 0; - if (OB_FAIL(rdlock(TRY_LOCK_TIMEOUT, lock_tid))) { + if (OB_FAIL(tablet.get_ddl_kv_mgr(param.ddl_kv_mgr_handle_))) { + LOG_WARN("failed to get ddl kv mgr", K(ret)); + } else if (OB_FAIL(rdlock(TRY_LOCK_TIMEOUT, lock_tid))) { LOG_WARN("failed to rdlock", K(ret), KPC(this)); - } else if (can_schedule_major_compaction_nolock(tablet_meta)) { + } else if (can_schedule_major_compaction_nolock(tablet.get_tablet_meta())) { param.ls_id_ = ls_id_; param.tablet_id_ = tablet_id_; - param.rec_scn_ = get_commit_scn(tablet_meta); + param.rec_scn_ = get_commit_scn(tablet.get_tablet_meta()); param.is_commit_ = true; param.start_scn_ = start_scn_; + param.compat_mode_ = tablet.get_tablet_meta().compat_mode_; } else { ret = OB_EAGAIN; } @@ -387,27 +429,18 @@ int ObTabletDDLKvMgr::get_rec_scn(SCN &rec_scn) return ret; } -int ObTabletDDLKvMgr::set_commit_scn(const SCN &commit_scn) +int ObTabletDDLKvMgr::set_commit_scn(const ObTabletMeta &tablet_meta, const SCN &commit_scn) { int ret = OB_SUCCESS; - ObLSHandle ls_handle; - ObTabletHandle tablet_handle; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret), K(is_inited_)); } else if (OB_UNLIKELY(commit_scn <= SCN::min_scn())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(commit_scn)); - } else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) { - LOG_WARN("failed to get log stream", K(ret), K(ls_id_)); - } else if (OB_FAIL(ls_handle.get_ls()->get_tablet(tablet_id_, - tablet_handle, - 0, - ObMDSGetTabletMode::READ_WITHOUT_CHECK))) { - LOG_WARN("get tablet handle failed", K(ret), K(ls_id_), K(tablet_id_)); } else { ObLatchWGuard state_guard(state_lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); - const SCN old_commit_scn = get_commit_scn(tablet_handle.get_obj()->get_tablet_meta()); + const SCN old_commit_scn = get_commit_scn(tablet_meta); if (old_commit_scn.is_valid_and_not_min() && old_commit_scn != commit_scn) { ret = OB_ERR_UNEXPECTED; LOG_WARN("already committed by others", K(ret), K(commit_scn), K(*this)); @@ -679,7 +712,8 @@ void ObTabletDDLKvMgr::unlock(const uint32_t tid) } } -int ObTabletDDLKvMgr::update_tablet(const SCN &start_scn, +int ObTabletDDLKvMgr::update_tablet(ObTablet &tablet, + const SCN &start_scn, const int64_t snapshot_version, const int64_t data_format_version, const int64_t execution_id, @@ -687,7 +721,6 @@ int ObTabletDDLKvMgr::update_tablet(const SCN &start_scn, { int ret = OB_SUCCESS; ObLSHandle ls_handle; - ObTabletHandle tablet_handle; ObArenaAllocator tmp_arena("DDLUpdateTblTmp"); const ObStorageSchema *storage_schema = nullptr; if (OB_UNLIKELY(!is_inited_)) { @@ -698,19 +731,13 @@ int ObTabletDDLKvMgr::update_tablet(const SCN &start_scn, LOG_WARN("invalid argument", K(ret), K(start_scn), K(snapshot_version), K(ddl_checkpoint_scn)); } else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) { LOG_WARN("failed to get log stream", K(ret), K(ls_id_)); - } else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, - tablet_id_, - tablet_handle, - ObMDSGetTabletMode::READ_WITHOUT_CHECK))) { - LOG_WARN("get tablet handle failed", K(ret), K(ls_id_), K(tablet_id_)); - } else if (OB_FAIL(tablet_handle.get_obj()->load_storage_schema(tmp_arena, storage_schema))) { - LOG_WARN("failed to load storage schema", K(ret), K(tablet_handle)); + } else if (OB_FAIL(tablet.load_storage_schema(tmp_arena, storage_schema))) { + LOG_WARN("failed to load storage schema", K(ret), K(tablet)); } else { ObSSTable sstable; const int64_t rebuild_seq = ls_handle.get_ls()->get_rebuild_seq(); ObTabletHandle new_tablet_handle; - - ObUpdateTableStoreParam param(tablet_handle.get_obj()->get_snapshot_version(), + ObUpdateTableStoreParam param(tablet.get_snapshot_version(), ObVersionRange::MIN_VERSION, // multi_version_start storage_schema, rebuild_seq); @@ -720,7 +747,7 @@ int ObTabletDDLKvMgr::update_tablet(const SCN &start_scn, param.ddl_info_.ddl_checkpoint_scn_ = ddl_checkpoint_scn; param.ddl_info_.ddl_execution_id_ = execution_id; param.ddl_info_.data_format_version_ = data_format_version; - if (OB_FAIL(create_empty_ddl_sstable(tmp_arena, sstable))) { + if (OB_FAIL(create_empty_ddl_sstable(tablet, tmp_arena, sstable))) { LOG_WARN("create empty ddl sstable failed", K(ret)); } else if (FALSE_IT(param.sstable_ = &sstable)) { } else if (OB_FAIL(ls_handle.get_ls()->update_tablet_table_store(tablet_id_, param, new_tablet_handle))) { @@ -733,7 +760,7 @@ int ObTabletDDLKvMgr::update_tablet(const SCN &start_scn, return ret; } -int ObTabletDDLKvMgr::create_empty_ddl_sstable(common::ObArenaAllocator &allocator, blocksstable::ObSSTable &sstable) +int ObTabletDDLKvMgr::create_empty_ddl_sstable(ObTablet &tablet, common::ObArenaAllocator &allocator, blocksstable::ObSSTable &sstable) { int ret = OB_SUCCESS; ObTabletDDLParam ddl_param; @@ -744,14 +771,14 @@ int ObTabletDDLKvMgr::create_empty_ddl_sstable(common::ObArenaAllocator &allocat ddl_param.table_key_.scn_range_.start_scn_ = SCN::scn_dec(start_scn_); ddl_param.table_key_.scn_range_.end_scn_ = start_scn_; ObArray empty_meta_array; - if (OB_FAIL(ObTabletDDLUtil::create_ddl_sstable(ddl_param, empty_meta_array, nullptr/*first_ddl_sstable*/, allocator, sstable))) { + if (OB_FAIL(ObTabletDDLUtil::create_ddl_sstable(tablet, ddl_param, empty_meta_array, nullptr/*first_ddl_sstable*/, allocator, sstable))) { LOG_WARN("create empty ddl sstable failed", K(ret)); } } return ret; } -int ObTabletDDLKvMgr::update_ddl_major_sstable() +int ObTabletDDLKvMgr::update_ddl_major_sstable(ObTablet &tablet) { int ret = OB_SUCCESS; ObLSHandle ls_handle; @@ -763,21 +790,16 @@ int ObTabletDDLKvMgr::update_ddl_major_sstable() LOG_WARN("not init", K(ret)); } else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) { LOG_WARN("failed to get log stream", K(ret), K(ls_id_)); - } else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, - tablet_id_, - tablet_handle, - ObMDSGetTabletMode::READ_WITHOUT_CHECK))) { - LOG_WARN("get tablet handle failed", K(ret), K(ls_id_), K(tablet_id_)); - } else if (OB_FAIL(tablet_handle.get_obj()->load_storage_schema(allocator, storage_schema))) { + } else if (OB_FAIL(tablet.load_storage_schema(allocator, storage_schema))) { LOG_WARN("load storage schema failed", K(ret), K(ls_id_), K(tablet_id_)); } else { ObTabletHandle new_tablet_handle; - ObUpdateTableStoreParam param(tablet_handle.get_obj()->get_snapshot_version(), + ObUpdateTableStoreParam param(tablet.get_snapshot_version(), ObVersionRange::MIN_VERSION, // multi_version_start storage_schema, ls_handle.get_ls()->get_rebuild_seq()); param.ddl_info_.keep_old_ddl_sstable_ = true; - param.ddl_info_.ddl_commit_scn_ = get_commit_scn(tablet_handle.get_obj()->get_tablet_meta()); + param.ddl_info_.ddl_commit_scn_ = get_commit_scn(tablet.get_tablet_meta()); if (OB_FAIL(ls_handle.get_ls()->update_tablet_table_store(tablet_id_, param, new_tablet_handle))) { LOG_WARN("failed to update tablet table store", K(ret), K(ls_id_), K(tablet_id_), K(param)); } @@ -881,7 +903,7 @@ int ObTabletDDLKvMgr::get_active_ddl_kv_impl(ObTableHandleV2 &kv_handle) return ret; } -int ObTabletDDLKvMgr::get_or_create_ddl_kv(const SCN &start_scn, const SCN &scn, ObTableHandleV2 &kv_handle) +int ObTabletDDLKvMgr::get_or_create_ddl_kv(ObTablet &tablet, const SCN &start_scn, const SCN &scn, ObTableHandleV2 &kv_handle) { int ret = OB_SUCCESS; kv_handle.reset(); @@ -918,7 +940,7 @@ int ObTabletDDLKvMgr::get_or_create_ddl_kv(const SCN &start_scn, const SCN &scn, try_get_ddl_kv_unlock(scn, kv_handle); if (kv_handle.is_valid()) { // do nothing - } else if (OB_FAIL(alloc_ddl_kv(kv_handle))) { + } else if (OB_FAIL(alloc_ddl_kv(tablet, kv_handle))) { LOG_WARN("create ddl kv failed", K(ret)); } } @@ -948,7 +970,7 @@ void ObTabletDDLKvMgr::try_get_ddl_kv_unlock(const SCN &scn, ObTableHandleV2 &kv } } -int ObTabletDDLKvMgr::freeze_ddl_kv(const SCN &freeze_scn) +int ObTabletDDLKvMgr::freeze_ddl_kv(ObTablet &tablet, const SCN &freeze_scn) { int ret = OB_SUCCESS; ObTableHandleV2 kv_handle; @@ -965,7 +987,7 @@ int ObTabletDDLKvMgr::freeze_ddl_kv(const SCN &freeze_scn) if (OB_SUCC(ret) && !kv_handle.is_valid() && freeze_scn > max_freeze_scn_) { // freeze_scn > 0 only occured when ddl commit // assure there is an alive ddl kv, for waiting pre-logs - if (OB_FAIL(alloc_ddl_kv(kv_handle))) { + if (OB_FAIL(alloc_ddl_kv(tablet, kv_handle))) { LOG_WARN("create ddl kv failed", K(ret)); } } @@ -1115,7 +1137,7 @@ int ObTabletDDLKvMgr::check_has_effective_ddl_kv(bool &has_ddl_kv) return ret; } -int ObTabletDDLKvMgr::alloc_ddl_kv(ObTableHandleV2 &kv_handle) +int ObTabletDDLKvMgr::alloc_ddl_kv(ObTablet &tablet, ObTableHandleV2 &kv_handle) { int ret = OB_SUCCESS; kv_handle.reset(); @@ -1136,8 +1158,7 @@ int ObTabletDDLKvMgr::alloc_ddl_kv(ObTableHandleV2 &kv_handle) } else if (OB_ISNULL(kv = static_cast(tmp_kv_handle.get_table()))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ddl kv is null", K(ret)); - } else if (OB_FAIL(kv->init(ls_id_, - tablet_id_, + } else if (OB_FAIL(kv->init(tablet, start_scn_, table_key_.get_snapshot_version(), max_freeze_scn_, diff --git a/src/storage/ddl/ob_tablet_ddl_kv_mgr.h b/src/storage/ddl/ob_tablet_ddl_kv_mgr.h index a3d36b733d..3f770532cc 100644 --- a/src/storage/ddl/ob_tablet_ddl_kv_mgr.h +++ b/src/storage/ddl/ob_tablet_ddl_kv_mgr.h @@ -38,22 +38,23 @@ public: int init(const share::ObLSID &ls_id, const common::ObTabletID &tablet_id); // init before memtable mgr int ddl_start_nolock(const ObITable::TableKey &table_key, const share::SCN &start_scn, const int64_t data_format_version, const int64_t execution_id, const share::SCN &checkpoint_scn); int ddl_start(ObTablet &tablet, const ObITable::TableKey &table_key, const share::SCN &start_scn, const int64_t data_format_version, const int64_t execution_id, const share::SCN &checkpoint_scn); - int ddl_commit(const share::SCN &start_scn, const share::SCN &commit_scn); // schedule build a major sstable - int schedule_ddl_merge_task(const share::SCN &start_scn, const share::SCN &commit_scn); // try wait build major sstable - int wait_ddl_merge_success(const share::SCN &start_scn, const share::SCN &commit_scn); + int ddl_commit(ObTablet &tablet, const share::SCN &start_scn, const share::SCN &commit_scn); // schedule build a major sstable + int schedule_ddl_dump_task(ObTablet &tablet, const share::SCN &start_scn, const share::SCN &rec_scn); + int schedule_ddl_merge_task(ObTablet &tablet, const share::SCN &start_scn, const share::SCN &commit_scn); // try wait build major sstable + int wait_ddl_merge_success(ObTablet &tablet, const share::SCN &start_scn, const share::SCN &commit_scn); int get_ddl_param(ObTabletDDLParam &ddl_param); - int get_or_create_ddl_kv(const share::SCN &start_scn, const share::SCN &scn, ObTableHandleV2 &kv_handle); // used in active ddl kv guard + int get_or_create_ddl_kv(ObTablet &tablet, const share::SCN &start_scn, const share::SCN &scn, ObTableHandleV2 &kv_handle); // used in active ddl kv guard int get_freezed_ddl_kv(const share::SCN &freeze_scn, ObTableHandleV2 &kv_handle); // locate ddl kv with exeact freeze log ts int get_ddl_kvs(const bool frozen_only, ObTablesHandleArray &kv_handle_array); // get all freeze ddl kvs int get_ddl_kvs_for_query(ObTablet &tablet, ObTablesHandleArray &kv_handle_array); - int freeze_ddl_kv(const share::SCN &freeze_scn = share::SCN::min_scn()); // freeze the active ddl kv, when memtable freeze or ddl commit + int freeze_ddl_kv(ObTablet &tablet, const share::SCN &freeze_scn = share::SCN::min_scn()); // freeze the active ddl kv, when memtable freeze or ddl commit int release_ddl_kvs(const share::SCN &rec_scn); // release persistent ddl kv, used in ddl merge task for free ddl kv int check_has_effective_ddl_kv(bool &has_ddl_kv); // used in ddl log handler for checkpoint int get_ddl_kv_min_scn(share::SCN &min_scn); // for calculate rec_scn of ls share::SCN get_start_scn() const { return start_scn_.atomic_load(); } bool is_started() const { return share::SCN::min_scn() != start_scn_; } void set_commit_scn_nolock(const share::SCN &scn) { commit_scn_ = scn; } - int set_commit_scn(const share::SCN &scn); + int set_commit_scn(const ObTabletMeta &tablet_meta, const share::SCN &scn); share::SCN get_commit_scn(const ObTabletMeta &tablet_meta); int set_commit_success(const share::SCN &start_scn); bool is_commit_success(); @@ -68,14 +69,14 @@ public: int rdlock(const int64_t timeout_us, uint32_t &lock_tid); int wrlock(const int64_t timeout_us, uint32_t &lock_tid); void unlock(const uint32_t lock_tid); - int update_tablet(const share::SCN &start_scn, const int64_t snapshot_version, const int64_t data_format_version, const int64_t execution_id, const share::SCN &ddl_checkpoint_scn); + int update_tablet(ObTablet &tablet, const share::SCN &start_scn, const int64_t snapshot_version, const int64_t data_format_version, const int64_t execution_id, const share::SCN &ddl_checkpoint_scn); int64_t get_count(); OB_INLINE void inc_ref() { ATOMIC_INC(&ref_cnt_); } OB_INLINE int64_t dec_ref() { return ATOMIC_SAF(&ref_cnt_, 1 /* just sub 1 */); } OB_INLINE int64_t get_ref() const { return ATOMIC_LOAD(&ref_cnt_); } OB_INLINE void reset() { destroy(); } bool can_schedule_major_compaction_nolock(const ObTabletMeta &tablet_meta); - int get_ddl_major_merge_param(const ObTabletMeta &tablet_meta, ObDDLTableMergeDagParam &merge_param); + int get_ddl_major_merge_param(ObTablet &tablet, ObDDLTableMergeDagParam &merge_param); int get_rec_scn(share::SCN &rec_scn); TO_STRING_KV(K_(is_inited), K_(success_start_scn), K_(ls_id), K_(tablet_id), K_(table_key), K_(data_format_version), K_(start_scn), K_(commit_scn), K_(max_freeze_scn), @@ -83,14 +84,14 @@ public: private: int64_t get_idx(const int64_t pos) const; - int alloc_ddl_kv(ObTableHandleV2 &kv_handle); + int alloc_ddl_kv(ObTablet &tablet, ObTableHandleV2 &kv_handle); void free_ddl_kv(const int64_t idx); int get_active_ddl_kv_impl(ObTableHandleV2 &kv_handle); void try_get_ddl_kv_unlock(const share::SCN &scn, ObTableHandleV2 &kv_handle); int get_ddl_kvs_unlock(const bool frozen_only, ObTablesHandleArray &kv_handle_array); int64_t get_count_nolock() const; - int update_ddl_major_sstable(); - int create_empty_ddl_sstable(common::ObArenaAllocator &allocator, blocksstable::ObSSTable &sstable); + int update_ddl_major_sstable(ObTablet &tablet); + int create_empty_ddl_sstable(ObTablet &tablet, common::ObArenaAllocator &allocator, blocksstable::ObSSTable &sstable); void cleanup_unlock(); void destroy(); public: diff --git a/src/storage/ls/ob_ls_ddl_log_handler.cpp b/src/storage/ls/ob_ls_ddl_log_handler.cpp index e85a5a3ce1..0c5656c789 100644 --- a/src/storage/ls/ob_ls_ddl_log_handler.cpp +++ b/src/storage/ls/ob_ls_ddl_log_handler.cpp @@ -237,13 +237,12 @@ int ObLSDDLLogHandler::flush(SCN &rec_scn) LOG_WARN("failed to check ddl kv", K(ret)); } else if (has_ddl_kv) { DEBUG_SYNC(BEFORE_DDL_CHECKPOINT); - ObDDLTableMergeDagParam param; - param.ls_id_ = ls_->get_ls_id(); - param.tablet_id_ = ddl_kv_mgr_handle.get_obj()->get_tablet_id(); - param.start_scn_ = ddl_kv_mgr_handle.get_obj()->get_start_scn(); - param.rec_scn_ = rec_scn; - LOG_INFO("schedule ddl merge dag", K(param)); - if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) { + const SCN start_scn = ddl_kv_mgr_handle.get_obj()->get_start_scn(); + const ObTabletID &tablet_id = ddl_kv_mgr_handle.get_obj()->get_tablet_id(); + ObTabletHandle tablet_handle; + if (OB_FAIL(ls_->get_tablet(tablet_id, tablet_handle))) { + LOG_WARN("failed to get tablet", K(ret), K(ls_->get_ls_id()), K(tablet_id)); + } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->schedule_ddl_dump_task(*tablet_handle.get_obj(), start_scn, rec_scn))) { if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) { LOG_WARN("failed to schedule ddl kv merge dag", K(ret)); } else {