From b19b08e1378a44ca33a65bea25b5024ea88532dc Mon Sep 17 00:00:00 2001 From: Charles0429 Date: Fri, 9 Feb 2024 04:56:41 +0000 Subject: [PATCH] optimize ddl kv dump scheduler and ddl commit --- src/observer/ob_rpc_processor_simple.cpp | 3 ++- .../compaction/ob_tenant_tablet_scheduler.cpp | 14 +++++++--- src/storage/ddl/ob_ddl_replay_executor.cpp | 2 +- .../ddl/ob_direct_insert_sstable_ctx_new.cpp | 26 ++++++++++++------- .../ddl/ob_direct_insert_sstable_ctx_new.h | 7 ++--- src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp | 24 +++++++++++++++++ src/storage/ddl/ob_tablet_ddl_kv_mgr.h | 1 + 7 files changed, 60 insertions(+), 17 deletions(-) diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index 93a3af766c..6369f8abb4 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -2453,7 +2453,8 @@ int ObRpcRemoteWriteDDLCommitLogP::process() arg_.start_scn_, commit_scn, arg_.table_id_, - arg_.ddl_task_id_))) { + arg_.ddl_task_id_, + false/*is replay*/))) { LOG_WARN("failed to do ddl kv commit", K(ret), K(arg_)); } else { result_ = commit_scn.get_val_for_tx(); diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp index 340bc364c9..7bc48509ab 100644 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp @@ -1177,8 +1177,10 @@ int ObTenantTabletScheduler::schedule_tablet_ddl_major_merge( int ret = OB_SUCCESS; ObDDLTableMergeDagParam param; ObTabletDirectLoadMgrHandle direct_load_mgr_handle; + ObDDLKvMgrHandle ddl_kv_mgr_handle; ObTenantDirectLoadMgr *tenant_direct_load_mgr = MTL(ObTenantDirectLoadMgr *); bool is_major_sstable_exist = false; + bool has_freezed_ddl_kv = false; if (OB_UNLIKELY(!ls_id.is_valid() || !tablet_handle.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(ls_id), K(tablet_handle)); @@ -1200,13 +1202,19 @@ int ObTenantTabletScheduler::schedule_tablet_ddl_major_merge( } else { LOG_WARN("get tablet direct load mgr failed", K(ret), "tablet_id", tablet_handle.get_obj()->get_tablet_meta().tablet_id_); } + } else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) { + LOG_WARN("get ddl kv mgr failed", K(ret)); + } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->check_has_freezed_ddl_kv(has_freezed_ddl_kv))) { + LOG_WARN("check has freezed ddl kv failed", K(ret)); } else if (OB_FAIL(direct_load_mgr_handle.get_full_obj()->prepare_ddl_merge_param(*tablet_handle.get_obj(), param))) { if (OB_EAGAIN != ret) { LOG_WARN("prepare major merge param failed", K(ret), "tablet_id", tablet_handle.get_obj()->get_tablet_meta().tablet_id_); } - } else if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) { - if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) { - LOG_WARN("schedule ddl merge dag failed", K(ret), K(param)); + } else if (has_freezed_ddl_kv || param.is_commit_) { + if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) { + if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) { + LOG_WARN("schedule ddl merge dag failed", K(ret), K(param)); + } } } return ret; diff --git a/src/storage/ddl/ob_ddl_replay_executor.cpp b/src/storage/ddl/ob_ddl_replay_executor.cpp index 0efb76fa5c..ccd79f30b9 100644 --- a/src/storage/ddl/ob_ddl_replay_executor.cpp +++ b/src/storage/ddl/ob_ddl_replay_executor.cpp @@ -428,7 +428,7 @@ int ObDDLCommitReplayExecutor::do_replay_(ObTabletHandle &tablet_handle) //TODO( } else if (OB_ISNULL(data_direct_load_mgr = direct_load_mgr_handle.get_full_obj())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected err", K(ret), K(table_key)); - } else if (OB_FAIL(data_direct_load_mgr->commit(*tablet_handle.get_obj(), log_->get_start_scn(), scn_, false/*wait_major_generate*/))) { + } else if (OB_FAIL(data_direct_load_mgr->commit(*tablet_handle.get_obj(), log_->get_start_scn(), scn_, 0/*unused table_id*/, 0/*unused ddl_task_id*/, true/*is replay*/))) { if (OB_TABLET_NOT_EXIST == ret || OB_TASK_EXPIRED == ret) { ret = OB_SUCCESS; // exit when tablet not exist or task expired } else { diff --git a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp index 287aa3abb3..0037e0d97a 100644 --- a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp +++ b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp @@ -2155,7 +2155,7 @@ int ObTabletFullDirectLoadMgr::close(const int64_t execution_id, const SCN &star ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected err", K(ret), KPC(this)); } else if (OB_FAIL(commit(*tablet_handle.get_obj(), start_scn, commit_scn, - sqc_build_ctx_.build_param_.runtime_only_param_.table_id_, sqc_build_ctx_.build_param_.runtime_only_param_.task_id_))) { + sqc_build_ctx_.build_param_.runtime_only_param_.table_id_, sqc_build_ctx_.build_param_.runtime_only_param_.task_id_, false/*is replay*/))) { LOG_WARN("failed to do ddl kv commit", K(ret), KPC(this)); } @@ -2163,11 +2163,11 @@ int ObTabletFullDirectLoadMgr::close(const int64_t execution_id, const SCN &star } else if (sstable_already_created || is_delay_build_major) { LOG_INFO("sstable had already created, skip waiting for major generated and reporting chksum", K(start_scn), K(commit_scn), K(sstable_already_created), K(is_delay_build_major)); - } else if (OB_FAIL(schedule_merge_task(start_scn, commit_scn, true/*wait_major_generate*/))) { + } else if (OB_FAIL(schedule_merge_task(start_scn, commit_scn, true/*wait_major_generate*/, false/*is_replay*/))) { LOG_WARN("schedule merge task and wait real major generate", K(ret), K(is_remote_write), K(sstable_already_created), K(start_scn), K(commit_scn)); } else if (lob_mgr_handle_.is_valid() && - OB_FAIL(lob_mgr_handle_.get_full_obj()->schedule_merge_task(start_scn, commit_scn, true/*wait_major_generate*/))) { + OB_FAIL(lob_mgr_handle_.get_full_obj()->schedule_merge_task(start_scn, commit_scn, true/*wait_major_generate*/, false/*is_replay*/))) { LOG_WARN("schedule merge task and wait real major generate for lob failed", K(ret), K(is_remote_write), K(sstable_already_created), K(start_scn), K(commit_scn)); } else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, tablet_id_, new_tablet_handle))) { @@ -2403,7 +2403,8 @@ int ObTabletFullDirectLoadMgr::commit( const share::SCN &start_scn, const share::SCN &commit_scn, const uint64_t table_id, - const int64_t ddl_task_id) + const int64_t ddl_task_id, + const bool is_replay) { int ret = OB_SUCCESS; ObDDLKvMgrHandle ddl_kv_mgr_handle; @@ -2435,7 +2436,7 @@ int ObTabletFullDirectLoadMgr::commit( } if (OB_SUCC(ret)) { - if (OB_FAIL(schedule_merge_task(start_scn, commit_scn, false/*wait_major_generate*/))) { + if (OB_FAIL(schedule_merge_task(start_scn, commit_scn, false/*wait_major_generate*/, is_replay))) { LOG_WARN("schedule major merge task failed", K(ret)); } } @@ -2456,22 +2457,26 @@ int ObTabletFullDirectLoadMgr::commit( LOG_ERROR("ls should not be null", K(ret)); } else if (OB_FAIL(ls->get_tablet(lob_tablet_id, lob_tablet_handle, ObTabletCommon::DEFAULT_GET_TABLET_DURATION_US, ObMDSGetTabletMode::READ_WITHOUT_CHECK))) { LOG_WARN("get tablet handle failed", K(ret), K(ls_id), K(lob_tablet_id)); - } else if (OB_FAIL(lob_mgr_handle_.get_full_obj()->commit(*lob_tablet_handle.get_obj(), start_scn, commit_scn))) { + } else if (OB_FAIL(lob_mgr_handle_.get_full_obj()->commit(*lob_tablet_handle.get_obj(), start_scn, commit_scn, table_id, ddl_task_id, is_replay))) { LOG_WARN("commit for lob failed", K(ret), K(start_scn), K(commit_scn)); } } return ret; } -int ObTabletFullDirectLoadMgr::schedule_merge_task(const share::SCN &start_scn, const share::SCN &commit_scn, const bool wait_major_generated) +int ObTabletFullDirectLoadMgr::schedule_merge_task( + const share::SCN &start_scn, + const share::SCN &commit_scn, + const bool wait_major_generated, + const bool is_replay) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret), K(is_inited_)); - } else if (OB_UNLIKELY(!start_scn.is_valid_and_not_min() || !commit_scn.is_valid_and_not_min())) { + } else if (OB_UNLIKELY(!start_scn.is_valid_and_not_min() || !commit_scn.is_valid_and_not_min() || (is_replay && wait_major_generated))) { ret = OB_ERR_SYS; - LOG_WARN("unknown start scn or commit snc", K(ret), K(start_scn), K(commit_scn)); + LOG_WARN("unknown start scn or commit snc", K(ret), K(start_scn), K(commit_scn), K(is_replay), K(wait_major_generated)); } else { const int64_t wait_start_ts = ObTimeUtility::fast_current_time(); while (OB_SUCC(ret)) { @@ -2492,6 +2497,9 @@ int ObTabletFullDirectLoadMgr::schedule_merge_task(const share::SCN &start_scn, LOG_WARN("schedule ddl merge dag failed", K(ret), K(param)); } else { ret = OB_SUCCESS; + if (is_replay) { + break; + } } } else if (!wait_major_generated) { // schedule successfully and no need to wait physical major generates. diff --git a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h index 2272cf5099..2cf4def514 100644 --- a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h +++ b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h @@ -422,8 +422,9 @@ public: ObTablet &tablet, const share::SCN &start_scn, const share::SCN &commit_scn, - const uint64_t table_id = 0, - const int64_t ddl_task_id = 0); // schedule build a major sstable + const uint64_t table_id, + const int64_t ddl_task_id, + const bool is_replay); // schedule build a major sstable void set_commit_scn_nolock(const share::SCN &scn); int set_commit_scn(const share::SCN &scn); @@ -441,7 +442,7 @@ public: INHERIT_TO_STRING_KV("ObTabletDirectLoadMgr", ObTabletDirectLoadMgr, K_(start_scn), K_(commit_scn), K_(execution_id)); private: bool is_started() { return start_scn_.is_valid_and_not_min(); } - int schedule_merge_task(const share::SCN &start_scn, const share::SCN &commit_scn, const bool wait_major_generated); // try wait build major sstable + int schedule_merge_task(const share::SCN &start_scn, const share::SCN &commit_scn, const bool wait_major_generated, const bool is_replay); // try wait build major sstable int cleanup_unlock(); int init_ddl_table_store(const share::SCN &start_scn, const int64_t snapshot_version, const share::SCN &ddl_checkpoint_scn); int update_major_sstable(); diff --git a/src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp b/src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp index e13d17a8c3..4fc874b9ba 100644 --- a/src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp +++ b/src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp @@ -568,6 +568,30 @@ int ObTabletDDLKvMgr::check_has_effective_ddl_kv(bool &has_ddl_kv) return ret; } +int ObTabletDDLKvMgr::check_has_freezed_ddl_kv(bool &has_freezed_ddl_kv) +{ + int ret = OB_SUCCESS; + has_freezed_ddl_kv = false; + ObLatchRGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObTabletDDLKvMgr is not inited", K(ret)); + } else { + for (int64_t pos = head_; !has_freezed_ddl_kv && OB_SUCC(ret) && pos < tail_; ++pos) { + const int64_t idx = get_idx(pos); + ObDDLKVHandle &cur_kv_handle = ddl_kv_handles_[idx]; + ObDDLKV *cur_kv = cur_kv_handle.get_obj(); + if (OB_ISNULL(cur_kv)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ddl kv is null", K(ret), K(ls_id_), K(tablet_id_), KP(cur_kv), K(pos), K(head_), K(tail_)); + } else if (cur_kv->is_freezed()) { + has_freezed_ddl_kv = true; + } + } + } + return ret; +} + int ObTabletDDLKvMgr::alloc_ddl_kv( const share::SCN &start_scn, const int64_t snapshot_version, diff --git a/src/storage/ddl/ob_tablet_ddl_kv_mgr.h b/src/storage/ddl/ob_tablet_ddl_kv_mgr.h index c0aaeb083d..7f0887eafc 100644 --- a/src/storage/ddl/ob_tablet_ddl_kv_mgr.h +++ b/src/storage/ddl/ob_tablet_ddl_kv_mgr.h @@ -55,6 +55,7 @@ public: const share::SCN &freeze_scn = share::SCN::min_scn()); // freeze the active ddl kv, when memtable freeze or ddl commit int release_ddl_kvs(const share::SCN &rec_scn); // release persistent ddl kv, used in ddl merge task for free ddl kv int check_has_effective_ddl_kv(bool &has_ddl_kv); // used in ddl log handler for checkpoint + int check_has_freezed_ddl_kv(bool &has_freezed_ddl_kv); int64_t get_count(); void set_ddl_kv(const int64_t idx, ObDDLKVHandle &kv_handle); //for unittest OB_INLINE void inc_ref() { ATOMIC_INC(&ref_cnt_); }