diff --git a/src/storage/ddl/ob_complement_data_task.cpp b/src/storage/ddl/ob_complement_data_task.cpp index dce8617dd5..43e8ce68a1 100644 --- a/src/storage/ddl/ob_complement_data_task.cpp +++ b/src/storage/ddl/ob_complement_data_task.cpp @@ -378,7 +378,6 @@ int ObComplementDataContext::write_start_log(const ObComplementDataParam ¶m) { int ret = OB_SUCCESS; ObITable::TableKey hidden_table_key; - SCN start_scn; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObComplementDataContext not init", K(ret)); @@ -396,8 +395,11 @@ int ObComplementDataContext::write_start_log(const ObComplementDataParam ¶m) ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected err", K(ret), K(MTL_ID())); } else if (OB_FAIL(tenant_direct_load_mgr->open_tablet_direct_load(true, /*is_full_direct_load*/ - param.dest_ls_id_, param.dest_tablet_id_, context_id_, start_scn, tablet_direct_load_mgr_handle_))) { + param.dest_ls_id_, param.dest_tablet_id_, context_id_, start_scn_, tablet_direct_load_mgr_handle_))) { LOG_WARN("write ddl start log failed", K(ret)); + } else if (OB_UNLIKELY(!start_scn_.is_valid_and_not_min())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid start scn", K(ret), K(start_scn_)); } LOG_INFO("complement task start ddl redo success", K(ret), K(param)); } @@ -1367,10 +1369,13 @@ int ObComplementWriteTask::append_row(ObScan *scan) } else if (OB_UNLIKELY(!direct_load_hdl.get_full_obj()->get_start_scn().is_valid_and_not_min())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected err", K(ret), K(direct_load_hdl.get_full_obj()->get_start_scn())); + } else if (OB_UNLIKELY(context_->start_scn_ != direct_load_hdl.get_full_obj()->get_start_scn())) { + ret = OB_TASK_EXPIRED; + LOG_WARN("task expired", K(ret), K(context_->start_scn_), "start_scn", direct_load_hdl.get_full_obj()->get_start_scn()); } else if (OB_FAIL(callback.init(DDL_MB_DATA_TYPE, hidden_table_key, param_->task_id_, - direct_load_hdl.get_full_obj()->get_start_scn(), + context_->start_scn_, param_->data_format_version_, &sstable_redo_writer))) { LOG_WARN("fail to init data callback", K(ret), K(hidden_table_key)); diff --git a/src/storage/ddl/ob_complement_data_task.h b/src/storage/ddl/ob_complement_data_task.h index e7104d9491..8e4ef19b7b 100644 --- a/src/storage/ddl/ob_complement_data_task.h +++ b/src/storage/ddl/ob_complement_data_task.h @@ -121,7 +121,7 @@ public: ObComplementDataContext(): is_inited_(false), is_major_sstable_exist_(false), complement_data_ret_(common::OB_SUCCESS), allocator_("CompleteDataCtx", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), lock_(ObLatchIds::COMPLEMENT_DATA_CONTEXT_LOCK), concurrent_cnt_(0), - data_sstable_redo_writer_(), index_builder_(nullptr), tablet_direct_load_mgr_handle_(), row_scanned_(0), row_inserted_(0), context_id_(0) + data_sstable_redo_writer_(), index_builder_(nullptr), start_scn_(share::SCN::min_scn()), tablet_direct_load_mgr_handle_(), row_scanned_(0), row_inserted_(0), context_id_(0) {} ~ObComplementDataContext() { destroy(); } int init(const ObComplementDataParam ¶m, const blocksstable::ObDataStoreDesc &desc); @@ -133,7 +133,8 @@ public: const share::ObLSID &ls_id, const common::ObTabletID &tablet_id, bool &is_commited); - TO_STRING_KV(K_(is_inited), K_(complement_data_ret), K_(concurrent_cnt), KP_(index_builder), K_(tablet_direct_load_mgr_handle), K_(row_scanned), K_(row_inserted)); + TO_STRING_KV(K_(is_inited), K_(complement_data_ret), K_(concurrent_cnt), KP_(index_builder), + K_(start_scn), K_(tablet_direct_load_mgr_handle), K_(row_scanned), K_(row_inserted)); public: bool is_inited_; bool is_major_sstable_exist_; @@ -143,6 +144,7 @@ public: int64_t concurrent_cnt_; ObDDLRedoLogWriter data_sstable_redo_writer_; blocksstable::ObSSTableIndexBuilder *index_builder_; + share::SCN start_scn_; ObTabletDirectLoadMgrHandle tablet_direct_load_mgr_handle_; int64_t row_scanned_; int64_t row_inserted_; diff --git a/src/storage/ddl/ob_ddl_clog.cpp b/src/storage/ddl/ob_ddl_clog.cpp index f901a7f3f2..ec702cc5f3 100644 --- a/src/storage/ddl/ob_ddl_clog.cpp +++ b/src/storage/ddl/ob_ddl_clog.cpp @@ -244,7 +244,7 @@ int ObDDLMacroBlockClogCb::on_success() const int64_t snapshot_version = redo_info_.table_key_.get_snapshot_version(); const uint64_t data_format_version = redo_info_.data_format_version_; if (OB_FAIL(ObDDLKVPendingGuard::set_macro_block(tablet_handle_.get_obj(), macro_block, - snapshot_version, data_format_version))) { + snapshot_version, data_format_version, direct_load_mgr_handle_))) { LOG_WARN("set macro block into ddl kv failed", K(ret), K(tablet_handle_), K(macro_block), K(snapshot_version), K(data_format_version)); } diff --git a/src/storage/ddl/ob_ddl_replay_executor.cpp b/src/storage/ddl/ob_ddl_replay_executor.cpp index ccd79f30b9..597b239fec 100644 --- a/src/storage/ddl/ob_ddl_replay_executor.cpp +++ b/src/storage/ddl/ob_ddl_replay_executor.cpp @@ -299,35 +299,36 @@ int ObDDLRedoReplayExecutor::do_replay_(ObTabletHandle &tablet_handle) const ObITable::TableKey &table_key = redo_info.table_key_; bool is_major_sstable_exist = false; uint64_t data_format_version = redo_info.data_format_version_; - if (data_format_version <= 0) { - // to upgrade from lower version without `data_format_version` in redo log, - // use data_format_version in start log instead. - ObTenantDirectLoadMgr *tenant_direct_load_mgr = MTL(ObTenantDirectLoadMgr *); - ObTabletDirectLoadMgrHandle direct_load_mgr_handle; - if (OB_ISNULL(tenant_direct_load_mgr)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected err", K(ret)); - } else if (OB_FAIL(tenant_direct_load_mgr->get_tablet_mgr_and_check_major( - ls_->get_ls_id(), - redo_info.table_key_.tablet_id_, - true/* is_full_direct_load */, - direct_load_mgr_handle, - is_major_sstable_exist))) { - if (OB_ENTRY_NOT_EXIST == ret && is_major_sstable_exist) { - ret = OB_SUCCESS; - LOG_INFO("major sstable already exist", K(ret), K(scn_), K(table_key)); - } else { - LOG_WARN("get tablet mgr failed", K(ret), K(table_key)); - } + + // to upgrade from lower version without `data_format_version` in redo log, + // use data_format_version in start log instead. + ObTabletDirectLoadMgrHandle direct_load_mgr_handle; + ObTenantDirectLoadMgr *tenant_direct_load_mgr = MTL(ObTenantDirectLoadMgr *); + if (OB_ISNULL(tenant_direct_load_mgr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected err", K(ret)); + } else if (OB_FAIL(tenant_direct_load_mgr->get_tablet_mgr_and_check_major( + ls_->get_ls_id(), + redo_info.table_key_.tablet_id_, + true/* is_full_direct_load */, + direct_load_mgr_handle, + is_major_sstable_exist))) { + if (OB_ENTRY_NOT_EXIST == ret && is_major_sstable_exist) { + need_replay = false; + ret = OB_SUCCESS; + LOG_INFO("major sstable already exist, ship replay", K(ret), K(scn_), K(table_key)); } else { - data_format_version = direct_load_mgr_handle.get_obj()->get_data_format_version(); + LOG_WARN("get tablet mgr failed", K(ret), K(table_key)); } + } else if (data_format_version <= 0) { + data_format_version = direct_load_mgr_handle.get_obj()->get_data_format_version(); } - if (OB_FAIL(ret)) { - } else if (OB_FAIL(ObDDLKVPendingGuard::set_macro_block(tablet_handle.get_obj(), macro_block, - snapshot_version, data_format_version))) { - LOG_WARN("set macro block into ddl kv failed", K(ret), K(tablet_handle), K(macro_block), - K(snapshot_version), K(data_format_version)); + if (OB_SUCC(ret) && need_replay) { + if (OB_FAIL(ObDDLKVPendingGuard::set_macro_block(tablet_handle.get_obj(), macro_block, + snapshot_version, data_format_version, direct_load_mgr_handle))) { + LOG_WARN("set macro block into ddl kv failed", K(ret), K(tablet_handle), K(macro_block), + K(snapshot_version), K(data_format_version)); + } } } } diff --git a/src/storage/ddl/ob_ddl_struct.cpp b/src/storage/ddl/ob_ddl_struct.cpp index d9ae461acf..a92265a295 100644 --- a/src/storage/ddl/ob_ddl_struct.cpp +++ b/src/storage/ddl/ob_ddl_struct.cpp @@ -162,20 +162,20 @@ void ObDDLKVHandle::reset() } } -ObDDLKVPendingGuard::ObDDLKVPendingGuard(ObTablet *tablet, const SCN &start_scn, const SCN &scn, - const int64_t snapshot_version, const uint64_t data_format_version) +ObDDLKVPendingGuard::ObDDLKVPendingGuard(ObTablet *tablet, const SCN &scn, + ObTabletDirectLoadMgrHandle &direct_load_mgr_handle) : tablet_(tablet), scn_(scn), kv_handle_(), ret_(OB_SUCCESS) { int ret = OB_SUCCESS; ObDDLKV *curr_kv = nullptr; ObDDLKvMgrHandle ddl_kv_mgr_handle; - if (OB_UNLIKELY(nullptr == tablet || !scn.is_valid_and_not_min())) { + if (OB_UNLIKELY(nullptr == tablet || !scn.is_valid_and_not_min() || !direct_load_mgr_handle.is_valid())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), KP(tablet), K(scn)); + LOG_WARN("invalid arguments", K(ret), KP(tablet), K(scn), KPC(direct_load_mgr_handle.get_obj())); } else if (OB_FAIL(tablet->get_ddl_kv_mgr(ddl_kv_mgr_handle))) { LOG_WARN("get ddl kv mgr failed", K(ret)); - } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->get_or_create_ddl_kv(start_scn, scn, - snapshot_version, data_format_version, kv_handle_))) { + } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->get_or_create_ddl_kv( + scn, direct_load_mgr_handle, kv_handle_))) { LOG_WARN("acquire ddl kv failed", K(ret)); } else if (OB_ISNULL(curr_kv = kv_handle_.get_obj())) { ret = OB_ERR_UNEXPECTED; @@ -219,7 +219,8 @@ int ObDDLKVPendingGuard::set_macro_block( ObTablet *tablet, const ObDDLMacroBlock ¯o_block, const int64_t snapshot_version, - const uint64_t data_format_version) + const uint64_t data_format_version, + ObTabletDirectLoadMgrHandle &direct_load_mgr_handle) { int ret = OB_SUCCESS; static const int64_t MAX_RETRY_COUNT = 10; @@ -230,8 +231,7 @@ int ObDDLKVPendingGuard::set_macro_block( int64_t try_count = 0; while ((OB_SUCCESS == ret || OB_EAGAIN == ret) && try_count < MAX_RETRY_COUNT) { ObDDLKV *ddl_kv = nullptr; - ObDDLKVPendingGuard guard(tablet, macro_block.ddl_start_scn_, macro_block.scn_, - snapshot_version, data_format_version); + ObDDLKVPendingGuard guard(tablet, macro_block.scn_, direct_load_mgr_handle); if (OB_FAIL(guard.get_ddl_kv(ddl_kv))) { LOG_WARN("get ddl kv failed", K(ret)); } else if (OB_ISNULL(ddl_kv)) { diff --git a/src/storage/ddl/ob_ddl_struct.h b/src/storage/ddl/ob_ddl_struct.h index c16c0ef111..8029c8fa04 100644 --- a/src/storage/ddl/ob_ddl_struct.h +++ b/src/storage/ddl/ob_ddl_struct.h @@ -82,7 +82,7 @@ private: class ObTablet; - +class ObTabletDirectLoadMgrHandle; class ObDDLKVPendingGuard final { public: @@ -90,10 +90,13 @@ public: ObTablet *tablet, const ObDDLMacroBlock ¯o_block, const int64_t snapshot_version, - const uint64_t data_format_version); + const uint64_t data_format_version, + ObTabletDirectLoadMgrHandle &direct_load_mgr_handle); public: - ObDDLKVPendingGuard(ObTablet *tablet, const share::SCN &start_scn, const share::SCN &scn, - const int64_t snapshot_version, const uint64_t data_format_version); + ObDDLKVPendingGuard( + ObTablet *tablet, + const share::SCN &scn, + ObTabletDirectLoadMgrHandle &direct_load_mgr_handle); ~ObDDLKVPendingGuard(); int get_ret() const { return ret_; } int get_ddl_kv(ObDDLKV *&kv); @@ -101,7 +104,6 @@ public: TO_STRING_KV(KP(tablet_), K(scn_), K(kv_handle_), K(ret_)); private: ObTablet *tablet_; - share::SCN start_scn_; share::SCN scn_; ObDDLKVHandle kv_handle_; int ret_; diff --git a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp index 0037e0d97a..b26dee89b0 100644 --- a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp +++ b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp @@ -1887,6 +1887,19 @@ int ObTabletDirectLoadMgr::wrlock(const int64_t timeout_us, uint32_t &tid) return ret; } +int ObTabletDirectLoadMgr::rdlock(const int64_t timeout_us, uint32_t &tid) +{ + int ret = OB_SUCCESS; + const int64_t abs_timeout_us = timeout_us + ObTimeUtility::current_time(); + if (OB_SUCC(lock_.rdlock(ObLatchIds::TABLET_DIRECT_LOAD_MGR_LOCK, abs_timeout_us))) { + tid = static_cast(GETTID()); + } + if (OB_TIMEOUT == ret) { + ret = OB_EAGAIN; + } + return ret; +} + void ObTabletDirectLoadMgr::unlock(const uint32_t tid) { if (OB_SUCCESS != lock_.unlock(&tid)) { diff --git a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h index 2cf4def514..3d24411059 100644 --- a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h +++ b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h @@ -340,6 +340,7 @@ public: // virtual int get_online_stat_collect_result(); virtual int wrlock(const int64_t timeout_us, uint32_t &lock_tid); + virtual int rdlock(const int64_t timeout_us, uint32_t &lock_tid); virtual void unlock(const uint32_t lock_tid); int prepare_index_builder_if_need(const ObTableSchema &table_schema); virtual int wait_notify(const ObDirectLoadSliceWriter *slice_writer, const share::SCN &start_scn); diff --git a/src/storage/ddl/ob_tablet_ddl_kv.cpp b/src/storage/ddl/ob_tablet_ddl_kv.cpp index 981cba4f9f..75befda724 100644 --- a/src/storage/ddl/ob_tablet_ddl_kv.cpp +++ b/src/storage/ddl/ob_tablet_ddl_kv.cpp @@ -1107,6 +1107,9 @@ int ObDDLKV::set_macro_block( } else if (macro_block.scn_ > freeze_scn_) { ret = OB_EAGAIN; LOG_INFO("this ddl kv is freezed, retry other ddl kv", K(ret), K(ls_id_), K(tablet_id_), K(macro_block), K(freeze_scn_)); + } else if (OB_UNLIKELY(snapshot_version != snapshot_version_ || data_format_version != data_format_version_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected error", K(ret), K(snapshot_version), K(data_format_version), KPC(this)); } else { ObDDLMemtable *ddl_memtable = nullptr; // 1. try find the ddl memtable diff --git a/src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp b/src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp index 4fc874b9ba..25ac79cff9 100644 --- a/src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp +++ b/src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp @@ -336,24 +336,26 @@ int ObTabletDDLKvMgr::get_active_ddl_kv_impl(ObDDLKVHandle &kv_handle) } int ObTabletDDLKvMgr::get_or_create_ddl_kv( - const share::SCN &start_scn, const share::SCN &scn, - const int64_t snapshot_version, - const uint64_t data_format_version, + ObTabletDirectLoadMgrHandle &direct_load_mgr_handle, ObDDLKVHandle &kv_handle) { int ret = OB_SUCCESS; kv_handle.reset(); + uint32_t direct_load_lock_tid = 0; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObTabletDDLKvMgr is not inited", K(ret)); } else if (!scn.is_valid_and_not_min()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(scn)); + } else if (OB_FAIL(direct_load_mgr_handle.get_obj()->rdlock(TRY_LOCK_TIMEOUT/*10s*/, direct_load_lock_tid))) { + // usually use the latest start scn to allocate kv. + LOG_WARN("lock failed", K(ret)); } else { uint32_t lock_tid = 0; // try lock to avoid hang in clog callback if (OB_FAIL(rdlock(TRY_LOCK_TIMEOUT, lock_tid))) { - LOG_WARN("failed to rdlock", K(ret), K(start_scn), KPC(this)); + LOG_WARN("failed to rdlock", K(ret), KPC(this)); } else { try_get_ddl_kv_unlock(scn, kv_handle); } @@ -364,20 +366,25 @@ int ObTabletDDLKvMgr::get_or_create_ddl_kv( if (OB_SUCC(ret) && !kv_handle.is_valid()) { uint32_t lock_tid = 0; // try lock to avoid hang in clog callback if (OB_FAIL(wrlock(TRY_LOCK_TIMEOUT, lock_tid))) { - LOG_WARN("failed to wrlock", K(ret), K(start_scn), KPC(this)); + LOG_WARN("failed to wrlock", K(ret), KPC(this)); } else { try_get_ddl_kv_unlock(scn, kv_handle); if (kv_handle.is_valid()) { // do nothing - } else if (OB_FAIL(alloc_ddl_kv(start_scn, - snapshot_version, data_format_version, kv_handle))) { - LOG_WARN("create ddl kv failed", K(ret)); + } else if (OB_FAIL(alloc_ddl_kv(direct_load_mgr_handle.get_obj()->get_start_scn(), + direct_load_mgr_handle.get_obj()->get_table_key().get_snapshot_version(), + direct_load_mgr_handle.get_obj()->get_data_format_version(), + kv_handle))) { + LOG_WARN("create ddl kv failed", K(ret), KPC(direct_load_mgr_handle.get_obj())); } } if (lock_tid != 0) { unlock(lock_tid); } } + if (direct_load_lock_tid != 0) { + direct_load_mgr_handle.get_obj()->unlock(direct_load_lock_tid); + } return ret; } diff --git a/src/storage/ddl/ob_tablet_ddl_kv_mgr.h b/src/storage/ddl/ob_tablet_ddl_kv_mgr.h index 7f0887eafc..81f5c17ba1 100644 --- a/src/storage/ddl/ob_tablet_ddl_kv_mgr.h +++ b/src/storage/ddl/ob_tablet_ddl_kv_mgr.h @@ -40,10 +40,8 @@ public: int init(const share::ObLSID &ls_id, const common::ObTabletID &tablet_id); // init before memtable mgr int set_max_freeze_scn(const share::SCN &checkpoint_scn); int get_or_create_ddl_kv( - const share::SCN &start_scn, const share::SCN &scn, - const int64_t snapshot_version, - const uint64_t data_format_version, + ObTabletDirectLoadMgrHandle &direct_load_mgr_handle, ObDDLKVHandle &kv_handle); // used in active ddl kv guard int get_freezed_ddl_kv(const share::SCN &freeze_scn, ObDDLKVHandle &kv_handle); // locate ddl kv with exeact freeze log ts int get_ddl_kvs(const bool frozen_only, ObIArray &kv_handle_array); // get all freeze ddl kvs