diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index 3182ffe608..ac096f562e 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -1979,19 +1979,29 @@ int ObRpcRemoteWriteDDLRedoLogP::process() MacroBlockId macro_block_id; ObMacroBlockHandle macro_handle; ObMacroBlockWriteInfo write_info; + ObLSService *ls_service = MTL(ObLSService*); + ObLSHandle ls_handle; + ObTabletHandle tablet_handle; + ObDDLKvMgrHandle ddl_kv_mgr_handle; // restruct write_info write_info.buffer_ = arg_.redo_info_.data_buffer_.ptr(); write_info.size_= arg_.redo_info_.data_buffer_.length(); write_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_WRITE); const int64_t io_timeout_ms = max(DDL_FLUSH_MACRO_BLOCK_TIMEOUT / 1000L, GCONF._data_storage_io_timeout / 1000L); - if (OB_FAIL(ObBlockManager::async_write_block(write_info, macro_handle))) { + if (OB_FAIL(ls_service->get_ls(arg_.ls_id_, ls_handle, ObLSGetMod::OBSERVER_MOD))) { + LOG_WARN("get ls failed", K(ret), K(arg_)); + } else if (OB_FAIL(ls_handle.get_ls()->get_tablet(arg_.redo_info_.table_key_.tablet_id_, tablet_handle))) { + LOG_WARN("get tablet failed", K(ret)); + } else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) { + LOG_WARN("get ddl kv manager failed", K(ret)); + } else if (OB_FAIL(ObBlockManager::async_write_block(write_info, macro_handle))) { LOG_WARN("fail to async write block", K(ret), K(write_info), K(macro_handle)); } else if (OB_FAIL(macro_handle.wait(io_timeout_ms))) { LOG_WARN("fail to wait macro block io finish", K(ret)); } else if (OB_FAIL(sstable_redo_writer.init(arg_.ls_id_, arg_.redo_info_.table_key_.tablet_id_))) { LOG_WARN("init sstable redo writer", K(ret), K_(arg)); - } else if (OB_FAIL(sstable_redo_writer.write_redo_log(arg_.redo_info_, macro_handle.get_macro_id(), false/*allow remote write*/))) { + } else if (OB_FAIL(sstable_redo_writer.write_redo_log(arg_.redo_info_, 
macro_handle.get_macro_id(), false/*allow remote write*/, tablet_handle, ddl_kv_mgr_handle))) { LOG_WARN("fail to write macro redo", K(ret), K_(arg)); } else if (OB_FAIL(sstable_redo_writer.wait_redo_log_finish(arg_.redo_info_, macro_handle.get_macro_id()))) { @@ -2032,7 +2042,9 @@ int ObRpcRemoteWriteDDLCommitLogP::process() } else if (FALSE_IT(sstable_redo_writer.set_start_scn(arg_.start_scn_))) { } else { SCN commit_scn; - if (OB_FAIL(sstable_redo_writer.write_commit_log(table_key, + if (OB_FAIL(sstable_redo_writer.write_commit_log(tablet_handle, + ddl_kv_mgr_handle, + table_key, arg_.table_id_, arg_.execution_id_, arg_.ddl_task_id_, diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp index 86c5c96421..c163f00042 100755 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp @@ -763,7 +763,7 @@ int ObTenantTabletScheduler::schedule_tablet_ddl_major_merge(ObTabletHandle &tab } else { ret = OB_SUCCESS; } - } else if (kv_mgr_handle.is_valid() && kv_mgr_handle.get_obj()->can_schedule_major_compaction()) { + } else if (kv_mgr_handle.is_valid() && kv_mgr_handle.get_obj()->can_schedule_major_compaction(tablet_handle.get_obj()->get_tablet_meta())) { ObDDLTableMergeDagParam param; if (OB_FAIL(kv_mgr_handle.get_obj()->get_ddl_major_merge_param(param))) { LOG_WARN("get ddl major merge param failed", K(ret)); diff --git a/src/storage/ddl/ob_complement_data_task.cpp b/src/storage/ddl/ob_complement_data_task.cpp index ac725b5fb4..3a929e72b4 100644 --- a/src/storage/ddl/ob_complement_data_task.cpp +++ b/src/storage/ddl/ob_complement_data_task.cpp @@ -981,6 +981,7 @@ int ObComplementWriteTask::append_row(ObLocalScan &local_scan) ObDatumRow datum_row; int64_t rowkey_column_cnt = 0; const int64_t extra_rowkey_cnt = storage::ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt(); + bool ddl_committed = false; if (OB_UNLIKELY(!is_inited_)) { ret = 
OB_NOT_INIT; LOG_WARN("ObComplementWriteTask is not inited", K(ret)); @@ -1020,7 +1021,7 @@ int ObComplementWriteTask::append_row(ObLocalScan &local_scan) LOG_WARN("the dag of this task is null", K(ret)); } else if (FALSE_IT(sstable_redo_writer.set_start_scn( static_cast(get_dag())->get_context().data_sstable_redo_writer_.get_start_scn()))) { - } else if (OB_FAIL(callback.init(DDL_MB_DATA_TYPE, hidden_table_key, &sstable_redo_writer))) { + } else if (OB_FAIL(callback.init(DDL_MB_DATA_TYPE, hidden_table_key, &sstable_redo_writer, context_->ddl_kv_mgr_handle_))) { LOG_WARN("fail to init data callback", K(ret), K(hidden_table_key)); } else if (OB_FAIL(writer.open(data_desc, macro_start_seq, &callback))) { LOG_WARN("fail to open macro block writer", K(ret), K(data_desc)); @@ -1092,8 +1093,14 @@ int ObComplementWriteTask::append_row(ObLocalScan &local_scan) t2 = ObTimeUtility::current_time(); get_next_row_time += t2 - t1; context_->row_scanned_++; - if (OB_FAIL(writer.append_row(datum_row))) { + if (!ddl_committed && OB_FAIL(writer.append_row(datum_row))) { LOG_WARN("fail to append row to macro block", K(ret), K(datum_row)); + if (OB_TRANS_COMMITED == ret) { + ret = OB_SUCCESS; + ddl_committed = true; + } + } + if (OB_FAIL(ret)) { } else if (OB_ISNULL(checksum_calculator = local_scan.get_checksum_calculator())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("checksum calculator is nullptr", K(ret), KP(checksum_calculator)); @@ -1116,8 +1123,15 @@ int ObComplementWriteTask::append_row(ObLocalScan &local_scan) K(get_next_row_time), K(append_row_time)); ObRowReshapeUtil::free_row_reshape(allocator, reshape_ptr, 1); if (OB_FAIL(ret)) { - } else if (OB_FAIL(writer.close())) { - LOG_WARN("fail to close writer", K(ret)); + } else if (!ddl_committed && OB_FAIL(writer.close())) { + if (OB_TRANS_COMMITED == ret) { + ret = OB_SUCCESS; + ddl_committed = true; + } else { + LOG_WARN("fail to close writer", K(ret)); + } + } + if (OB_FAIL(ret)) { } else if 
(OB_FAIL(local_scan.get_origin_table_checksum(report_col_checksums, report_col_ids))) { LOG_WARN("fail to get origin table columns checksum", K(ret)); } else if (OB_FAIL(ObDDLChecksumOperator::update_checksum(param_->tenant_id_, @@ -1214,8 +1228,6 @@ int ObComplementMergeTask::add_build_hidden_table_sstable() { int ret = OB_SUCCESS; ObLSHandle ls_handle; - ObTablet *tablet = nullptr; - ObTabletHandle tablet_handle; ObITable::TableKey hidden_table_key; SCN commit_scn; if (OB_UNLIKELY(!is_inited_)) { @@ -1228,49 +1240,11 @@ int ObComplementMergeTask::add_build_hidden_table_sstable() LOG_WARN("error unexpected", K(ret), KP(param_), KP(context_)); } else if (OB_FAIL(MTL(ObLSService *)->get_ls(param_->ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) { LOG_WARN("failed to get log stream", K(ret), K(param_->ls_id_)); - } else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, param_->dest_tablet_id_, tablet_handle))) { - LOG_WARN("failed to get tablet", K(ret), K(param_->ls_id_), K(param_->dest_tablet_id_)); - } else if (OB_ISNULL(tablet = tablet_handle.get_obj())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("tablet is null", K(ret), K(param_->dest_tablet_id_)); } else if (OB_FAIL(param_->get_hidden_table_key(hidden_table_key))) { LOG_WARN("fail to get hidden table key", K(ret), K(hidden_table_key)); - } else if (OB_UNLIKELY(!hidden_table_key.is_valid())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("hidden table key is invalid", K(ret), K(hidden_table_key)); - } - - if (OB_FAIL(ret)) { - } else if (OB_FAIL(context_->data_sstable_redo_writer_.write_commit_log(hidden_table_key, - param_->dest_table_id_, - param_->execution_id_, - param_->task_id_, - commit_scn))) { - if (OB_TASK_EXPIRED == ret) { - LOG_INFO("ddl task expired", K(ret), K(hidden_table_key), KPC(param_)); - } else { - LOG_WARN("fail write ddl commit log", K(ret), K(hidden_table_key)); - } - } else { - ObTabletHandle new_tablet_handle; // no use here - ObDDLKvMgrHandle ddl_kv_mgr_handle; - const ObLSID &ls_id = param_->ls_id_; 
- const ObTabletID &tablet_id = tablet->get_tablet_meta().tablet_id_; - const SCN &ddl_start_scn = static_cast(get_dag())->get_context().data_sstable_redo_writer_.get_start_scn(); - const uint64_t table_id = param_->dest_table_id_; - const int64_t ddl_task_id = param_->task_id_; - if (OB_FAIL(tablet->get_ddl_kv_mgr(ddl_kv_mgr_handle))) { - LOG_WARN("get ddl kv manager failed", K(ret)); - } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_commit(ddl_start_scn, - commit_scn, - table_id, - ddl_task_id))) { - LOG_WARN("commit ddl log failed", K(ret), K(ls_id), K(tablet_id), K(commit_scn), K(hidden_table_key), - K(ddl_start_scn), "new_ddl_start_scn", ddl_kv_mgr_handle.get_obj()->get_start_scn()); - } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_merge_success(ddl_start_scn, commit_scn))) { - LOG_WARN("wait ddl merge failed", K(ret), K(ls_id), K(tablet_id), K(hidden_table_key), - K(ddl_start_scn), "new_ddl_start_scn", ddl_kv_mgr_handle.get_obj()->get_start_scn()); - } + } else if (OB_FAIL(context_->data_sstable_redo_writer_.end_ddl_redo_and_create_ddl_sstable( + ls_handle, hidden_table_key, param_->dest_table_id_, param_->execution_id_, param_->task_id_))) { + LOG_WARN("failed to end ddl redo", K(ret)); } return ret; } diff --git a/src/storage/ddl/ob_ddl_clog.cpp b/src/storage/ddl/ob_ddl_clog.cpp index 6ccb584a0a..30c5f43b70 100644 --- a/src/storage/ddl/ob_ddl_clog.cpp +++ b/src/storage/ddl/ob_ddl_clog.cpp @@ -66,22 +66,83 @@ void ObDDLClogCb::try_release() } } +ObDDLStartClogCb::ObDDLStartClogCb() + : is_inited_(false), status_(), lock_tid_(0), ddl_kv_mgr_handle_() +{ +} + +int ObDDLStartClogCb::init(const uint32_t lock_tid, ObDDLKvMgrHandle &ddl_kv_mgr_handle) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(is_inited_)) { + ret = OB_INIT_TWICE; + LOG_WARN("init twice", K(ret)); + } else if (OB_UNLIKELY(0 == lock_tid || !ddl_kv_mgr_handle.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret)); + } else { + lock_tid_ = lock_tid; + 
ddl_kv_mgr_handle_ = ddl_kv_mgr_handle; + is_inited_ = true; + } + return ret; +} + +int ObDDLStartClogCb::on_success() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret)); + } else { + ddl_kv_mgr_handle_.get_obj()->unlock(lock_tid_); + } + status_.set_ret_code(ret); + status_.set_state(STATE_SUCCESS); + try_release(); + return OB_SUCCESS; // force return success +} + +int ObDDLStartClogCb::on_failure() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret)); + } else { + ddl_kv_mgr_handle_.get_obj()->unlock(lock_tid_); + } + status_.set_state(STATE_FAILED); + try_release(); + return OB_SUCCESS; +} + +void ObDDLStartClogCb::try_release() +{ + if (status_.try_set_release_flag()) { + } else { + op_free(this); + } +} + ObDDLMacroBlockClogCb::ObDDLMacroBlockClogCb() : is_inited_(false), status_(), ls_id_(), redo_info_(), macro_block_id_(), - arena_("ddl_clog_cb", OB_MALLOC_BIG_BLOCK_SIZE), data_buffer_lock_(), is_data_buffer_freed_(false) + arena_("ddl_clog_cb", OB_MALLOC_BIG_BLOCK_SIZE), data_buffer_lock_(), is_data_buffer_freed_(false), lock_tid_(0), ddl_kv_mgr_handle_() { } int ObDDLMacroBlockClogCb::init(const share::ObLSID &ls_id, const blocksstable::ObDDLMacroBlockRedoInfo &redo_info, - const blocksstable::MacroBlockId &macro_block_id) + const blocksstable::MacroBlockId &macro_block_id, + const uint32_t lock_tid, + ObDDLKvMgrHandle &ddl_kv_mgr_handle) { int ret = OB_SUCCESS; if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_WARN("init twice", K(ret)); - } else if (OB_UNLIKELY(!ls_id.is_valid() || !redo_info.is_valid() || !macro_block_id.is_valid())) { + } else if (OB_UNLIKELY(!ls_id.is_valid() || !redo_info.is_valid() || !macro_block_id.is_valid() || 0 == lock_tid || !ddl_kv_mgr_handle.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(ls_id), K(redo_info), K(macro_block_id)); } else { @@ -97,6 +158,8 @@ int 
ObDDLMacroBlockClogCb::init(const share::ObLSID &ls_id, redo_info_.start_scn_ = redo_info.start_scn_; ls_id_ = ls_id; macro_block_id_ = macro_block_id; + lock_tid_ = lock_tid; + ddl_kv_mgr_handle_ = ddl_kv_mgr_handle; } } return ret; @@ -120,6 +183,7 @@ int ObDDLMacroBlockClogCb::on_success() ObDDLMacroBlock macro_block; ObLSHandle ls_handle; ObTabletHandle tablet_handle; + ddl_kv_mgr_handle_.get_obj()->unlock(lock_tid_); // unlock first, because set_macro_block need to acquire this lock { ObSpinLockGuard data_buffer_guard(data_buffer_lock_); if (is_data_buffer_freed_) { @@ -150,32 +214,38 @@ int ObDDLMacroBlockClogCb::on_success() int ObDDLMacroBlockClogCb::on_failure() { + ddl_kv_mgr_handle_.get_obj()->unlock(lock_tid_); status_.set_state(STATE_FAILED); try_release(); return OB_SUCCESS; } ObDDLCommitClogCb::ObDDLCommitClogCb() - : is_inited_(false), status_(), ls_id_(), tablet_id_(), start_scn_(SCN::min_scn()) + : is_inited_(false), status_(), ls_id_(), tablet_id_(), start_scn_(SCN::min_scn()), lock_tid_(0), ddl_kv_mgr_handle_() { } int ObDDLCommitClogCb::init(const share::ObLSID &ls_id, const common::ObTabletID &tablet_id, - const share::SCN &start_scn) + const share::SCN &start_scn, + const uint32_t lock_tid, + ObDDLKvMgrHandle &ddl_kv_mgr_handle) { int ret = OB_SUCCESS; if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_WARN("init twice", K(ret)); - } else if (OB_UNLIKELY(!ls_id.is_valid() || !tablet_id.is_valid() || !start_scn.is_valid_and_not_min())) { + } else if (OB_UNLIKELY(!ls_id.is_valid() || !tablet_id.is_valid() || !start_scn.is_valid_and_not_min() + || 0 == lock_tid || !ddl_kv_mgr_handle.is_valid())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(ls_id), K(tablet_id), K(start_scn)); + LOG_WARN("invalid argument", K(ret), K(ls_id), K(tablet_id), K(start_scn), K(lock_tid)); } else { ls_id_ = ls_id; tablet_id_ = tablet_id; start_scn_ = start_scn; + lock_tid_ = lock_tid; + ddl_kv_mgr_handle_ = ddl_kv_mgr_handle; is_inited_ = 
true; } return ret; @@ -184,6 +254,8 @@ int ObDDLCommitClogCb::init(const share::ObLSID &ls_id, int ObDDLCommitClogCb::on_success() { int ret = OB_SUCCESS; + ddl_kv_mgr_handle_.get_obj()->set_commit_scn_nolock(__get_scn()); + ddl_kv_mgr_handle_.get_obj()->unlock(lock_tid_); status_.set_ret_code(ret); status_.set_state(STATE_SUCCESS); try_release(); @@ -192,6 +264,8 @@ int ObDDLCommitClogCb::on_success() int ObDDLCommitClogCb::on_failure() { + int ret = OB_SUCCESS; + ddl_kv_mgr_handle_.get_obj()->unlock(lock_tid_); status_.set_state(STATE_FAILED); try_release(); return OB_SUCCESS; diff --git a/src/storage/ddl/ob_ddl_clog.h b/src/storage/ddl/ob_ddl_clog.h index b4d7db99ea..c52e2ec19c 100644 --- a/src/storage/ddl/ob_ddl_clog.h +++ b/src/storage/ddl/ob_ddl_clog.h @@ -16,6 +16,7 @@ #include "storage/ob_i_table.h" #include "storage/blocksstable/ob_block_sstable_struct.h" #include "storage/blocksstable/ob_index_block_builder.h" +#include "storage/meta_mem/ob_tablet_pointer.h" #include "logservice/ob_append_callback.h" namespace oceanbase @@ -74,6 +75,27 @@ private: ObDDLClogCbStatus status_; }; +class ObDDLStartClogCb : public logservice::AppendCb +{ +public: + ObDDLStartClogCb(); + virtual ~ObDDLStartClogCb() = default; + int init(const uint32_t lock_tid, ObDDLKvMgrHandle &ddl_kv_mgr_handle); + virtual int on_success() override; + virtual int on_failure() override; + inline bool is_success() const { return status_.is_success(); } + inline bool is_failed() const { return status_.is_failed(); } + inline bool is_finished() const { return status_.is_finished(); } + int get_ret_code() const { return status_.get_ret_code(); } + void try_release(); + TO_STRING_KV(K(is_inited_), K(status_), K_(lock_tid)); +private: + bool is_inited_; + ObDDLClogCbStatus status_; + uint32_t lock_tid_; + ObDDLKvMgrHandle ddl_kv_mgr_handle_; +}; + class ObDDLMacroBlockClogCb : public logservice::AppendCb { public: @@ -81,7 +103,9 @@ public: virtual ~ObDDLMacroBlockClogCb() = default; int init(const 
share::ObLSID &ls_id, const blocksstable::ObDDLMacroBlockRedoInfo &redo_info, - const blocksstable::MacroBlockId &macro_block_id); + const blocksstable::MacroBlockId &macro_block_id, + const uint32_t lock_tid, + ObDDLKvMgrHandle &ddl_kv_mgr_handle); virtual int on_success() override; virtual int on_failure() override; inline bool is_success() const { return status_.is_success(); } @@ -98,6 +122,8 @@ private: ObArenaAllocator arena_; ObSpinLock data_buffer_lock_; bool is_data_buffer_freed_; + uint32_t lock_tid_; + ObDDLKvMgrHandle ddl_kv_mgr_handle_; }; class ObDDLCommitClogCb : public logservice::AppendCb @@ -107,7 +133,9 @@ public: virtual ~ObDDLCommitClogCb() = default; int init(const share::ObLSID &ls_id, const common::ObTabletID &tablet_id, - const share::SCN &start_scn); + const share::SCN &start_scn, + const uint32_t lock_tid, + ObDDLKvMgrHandle &ddl_kv_mgr_handle); virtual int on_success() override; virtual int on_failure() override; inline bool is_success() const { return status_.is_success(); } @@ -115,13 +143,15 @@ public: inline bool is_finished() const { return status_.is_finished(); } int get_ret_code() const { return status_.get_ret_code(); } void try_release(); - TO_STRING_KV(K(is_inited_), K(status_), K(ls_id_), K(tablet_id_), K(start_scn_)); + TO_STRING_KV(K(is_inited_), K(status_), K(ls_id_), K(tablet_id_), K(start_scn_), K_(lock_tid)); private: bool is_inited_; ObDDLClogCbStatus status_; share::ObLSID ls_id_; common::ObTabletID tablet_id_; share::SCN start_scn_; + uint32_t lock_tid_; + ObDDLKvMgrHandle ddl_kv_mgr_handle_; }; class ObDDLClogHeader final diff --git a/src/storage/ddl/ob_ddl_merge_task.cpp b/src/storage/ddl/ob_ddl_merge_task.cpp index ebdd674292..a63b6a2614 100644 --- a/src/storage/ddl/ob_ddl_merge_task.cpp +++ b/src/storage/ddl/ob_ddl_merge_task.cpp @@ -480,7 +480,7 @@ int ObTabletDDLUtil::check_data_integrity(const ObTablesHandleArray &ddl_sstable } ObTabletDDLParam::ObTabletDDLParam() - : tenant_id_(0), ls_id_(), table_key_(), 
start_scn_(SCN::min_scn()), snapshot_version_(0), cluster_version_(0) + : tenant_id_(0), ls_id_(), table_key_(), start_scn_(SCN::min_scn()), commit_scn_(SCN::min_scn()), snapshot_version_(0), cluster_version_(0) { } @@ -496,6 +496,7 @@ bool ObTabletDDLParam::is_valid() const && ls_id_.is_valid() && table_key_.is_valid() && start_scn_.is_valid_and_not_min() + && commit_scn_.is_valid() && commit_scn_ != SCN::max_scn() && snapshot_version_ > 0 && cluster_version_ >= 0; } @@ -743,6 +744,7 @@ int ObTabletDDLUtil::update_ddl_table_store(ObSSTableIndexBuilder *sstable_index ddl_param.table_key_.is_major_sstable()); // need report checksum table_store_param.ddl_info_.keep_old_ddl_sstable_ = !ddl_param.table_key_.is_major_sstable(); table_store_param.ddl_info_.ddl_cluster_version_ = ddl_param.cluster_version_; + table_store_param.ddl_info_.ddl_commit_scn_ = ddl_param.commit_scn_; if (OB_FAIL(ls_handle.get_ls()->update_tablet_table_store(ddl_param.table_key_.get_tablet_id(), table_store_param, new_tablet_handle))) { LOG_WARN("failed to update tablet table store", K(ret), K(ddl_param.table_key_), K(table_store_param)); } else { diff --git a/src/storage/ddl/ob_ddl_merge_task.h b/src/storage/ddl/ob_ddl_merge_task.h index 1062593b73..63347e0128 100644 --- a/src/storage/ddl/ob_ddl_merge_task.h +++ b/src/storage/ddl/ob_ddl_merge_task.h @@ -131,12 +131,13 @@ public: ObTabletDDLParam(); ~ObTabletDDLParam(); bool is_valid() const; - TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(table_key), K_(start_scn), K_(snapshot_version), K_(cluster_version)); + TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(table_key), K_(start_scn), K_(commit_scn), K_(snapshot_version), K_(cluster_version)); public: uint64_t tenant_id_; share::ObLSID ls_id_; ObITable::TableKey table_key_; share::SCN start_scn_; + share::SCN commit_scn_; int64_t snapshot_version_; int64_t cluster_version_; }; diff --git a/src/storage/ddl/ob_ddl_redo_log_replayer.cpp b/src/storage/ddl/ob_ddl_redo_log_replayer.cpp index 
63f00dc151..40c66b5ce0 100644 --- a/src/storage/ddl/ob_ddl_redo_log_replayer.cpp +++ b/src/storage/ddl/ob_ddl_redo_log_replayer.cpp @@ -76,7 +76,8 @@ int ObDDLRedoLogReplayer::replay_start(const ObDDLStartLog &log, const SCN &scn) LOG_WARN("need replay but tablet handle is invalid", K(ret), K(need_replay), K(tablet_handle)); } else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle, true/*try_create*/))) { LOG_WARN("create ddl kv mgr failed", K(ret), K(table_key), K(log), K(scn)); - } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_start(table_key, + } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_start(*tablet_handle.get_obj(), + table_key, scn, log.get_cluster_version(), log.get_execution_id(), @@ -183,6 +184,8 @@ int ObDDLRedoLogReplayer::replay_commit(const ObDDLCommitLog &log, const SCN &sc LOG_WARN("need replay but tablet handle is invalid", K(ret), K(need_replay), K(tablet_handle), K(log), K(scn)); } else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) { LOG_WARN("get ddl kv mgr failed", K(ret), K(log), K(scn)); + } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->set_commit_scn(scn))) { + LOG_WARN("failed to start prepare", K(ret), K(log), K(scn)); } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_commit(log.get_start_scn(), scn))) { if (OB_TABLET_NOT_EXIST == ret || OB_TASK_EXPIRED == ret) { ret = OB_SUCCESS; // exit when tablet not exist or task expired diff --git a/src/storage/ddl/ob_ddl_redo_log_writer.cpp b/src/storage/ddl/ob_ddl_redo_log_writer.cpp index 9aef15ba5d..3ae8438d10 100644 --- a/src/storage/ddl/ob_ddl_redo_log_writer.cpp +++ b/src/storage/ddl/ob_ddl_redo_log_writer.cpp @@ -555,6 +555,8 @@ int ObDDLRedoLogWriter::init() } int ObDDLRedoLogWriter::write( + ObTabletHandle &tablet_handle, + ObDDLKvMgrHandle &ddl_kv_mgr_handle, const ObDDLRedoLog &log, const uint64_t tenant_id, const share::ObLSID &ls_id, @@ -578,13 +580,22 @@ int ObDDLRedoLogWriter::write( const bool need_nonblock= false; SCN 
base_scn = SCN::min_scn(); SCN scn; + uint32_t lock_tid = 0; if (!log.is_valid() || nullptr == log_handler || !ls_id.is_valid() || OB_INVALID_TENANT_ID == tenant_id || nullptr == buffer) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(log), K(ls_id), K(tenant_id), KP(buffer)); + } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->rdlock(ObDDLRedoLogHandle::DDL_REDO_LOG_TIMEOUT, lock_tid))) { + LOG_WARN("failed to wrlock", K(ret)); + } else if (ddl_kv_mgr_handle.get_obj()->get_commit_scn_nolock(tablet_handle.get_obj()->get_tablet_meta()).is_valid_and_not_min()) { + ret = OB_TRANS_COMMITED; + LOG_WARN("already commit", K(ret)); + } else if (ddl_kv_mgr_handle.get_obj()->get_start_scn() != log.get_redo_info().start_scn_) { + ret = OB_TASK_EXPIRED; + LOG_WARN("restarted", K(ret)); } else if (OB_ISNULL(cb = op_alloc(ObDDLMacroBlockClogCb))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc memory", K(ret)); - } else if (OB_FAIL(cb->init(ls_id, log.get_redo_info(), macro_block_id))) { + } else if (OB_FAIL(cb->init(ls_id, log.get_redo_info(), macro_block_id, lock_tid, ddl_kv_mgr_handle))) { LOG_WARN("init ddl clog callback failed", K(ret)); } else if (OB_FAIL(base_header.serialize(buffer, buffer_size, pos))) { LOG_WARN("failed to serialize log base header", K(ret)); @@ -603,6 +614,7 @@ int ObDDLRedoLogWriter::write( } else { handle.cb_ = cb; cb = nullptr; + lock_tid = 0; handle.scn_ = scn; LOG_INFO("submit ddl redo log succeed", K(lsn), K(base_scn), K(scn)); } @@ -618,10 +630,14 @@ int ObDDLRedoLogWriter::write( cb = nullptr; } } + if (0 != lock_tid) { + ddl_kv_mgr_handle.get_obj()->unlock(lock_tid); + } return ret; } -int ObDDLRedoLogWriter::write_ddl_start_log(ObDDLKvMgrHandle &ddl_kv_mgr_handle, +int ObDDLRedoLogWriter::write_ddl_start_log(ObTabletHandle &tablet_handle, + ObDDLKvMgrHandle &ddl_kv_mgr_handle, const ObDDLStartLog &log, ObLogHandler *log_handler, SCN &start_scn) @@ -637,19 +653,37 @@ int 
ObDDLRedoLogWriter::write_ddl_start_log(ObDDLKvMgrHandle &ddl_kv_mgr_handle, + log.get_serialize_size(); char buffer[buffer_size]; int64_t pos = 0; - ObDDLClogCb *cb = nullptr; + ObDDLStartClogCb *cb = nullptr; palf::LSN lsn; const bool need_nonblock= false; SCN scn = SCN::min_scn(); bool is_external_consistent = false; ObBucketHashWLockGuard guard(bucket_lock_, log.get_table_key().get_tablet_id().hash()); - if (ddl_kv_mgr_handle.get_obj()->is_execution_id_older(log.get_execution_id())) { + uint32_t lock_tid = 0; + if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wrlock(ObDDLRedoLogHandle::DDL_REDO_LOG_TIMEOUT, lock_tid))) { + LOG_WARN("failed to wrlock", K(ret)); + } else if (ddl_kv_mgr_handle.get_obj()->is_execution_id_older(log.get_execution_id())) { ret = OB_TASK_EXPIRED; LOG_INFO("receive a old execution id, don't do ddl start", K(ret), K(log)); - } else if (OB_ISNULL(cb = op_alloc(ObDDLClogCb))) { + } else if (ddl_kv_mgr_handle.get_obj()->get_commit_scn_nolock(tablet_handle.get_obj()->get_tablet_meta()).is_valid_and_not_min()) { + start_scn = ddl_kv_mgr_handle.get_obj()->get_start_scn(); + if (!start_scn.is_valid_and_not_min()) { + start_scn = tablet_handle.get_obj()->get_tablet_meta().ddl_start_scn_; + } + if (!start_scn.is_valid_and_not_min()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("start scn must be valid after commit", K(ret), K(start_scn)); + } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->set_execution_id_nolock(log.get_execution_id()))) { + LOG_WARN("failed to set execution id", K(ret)); + } else { + LOG_INFO("already committed, use previous start scn", K(ret), K(tablet_handle.get_obj()->get_tablet_meta())); + } + } else if (OB_ISNULL(cb = op_alloc(ObDDLStartClogCb))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc memory", K(ret)); + } else if (OB_FAIL(cb->init(lock_tid, ddl_kv_mgr_handle))) { + LOG_WARN("failed to init cb", K(ret)); } else if (OB_FAIL(base_header.serialize(buffer, buffer_size, pos))) { LOG_WARN("failed to serialize log base 
header", K(ret)); } else if (OB_FAIL(ddl_header.serialize(buffer, buffer_size, pos))) { @@ -669,8 +703,9 @@ int ObDDLRedoLogWriter::write_ddl_start_log(ObDDLKvMgrHandle &ddl_kv_mgr_handle, LOG_INFO("overwrite return to OB_NOT_MASTER"); } } else { - ObDDLClogCb *tmp_cb = cb; + ObDDLStartClogCb *tmp_cb = cb; cb = nullptr; + lock_tid = 0; bool finish = false; const int64_t start_time = ObTimeUtility::current_time(); while (OB_SUCC(ret) && !finish) { @@ -691,7 +726,8 @@ int ObDDLRedoLogWriter::write_ddl_start_log(ObDDLKvMgrHandle &ddl_kv_mgr_handle, } if (OB_SUCC(ret)) { start_scn = scn; - if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_start(log.get_table_key(), + if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_start(*tablet_handle.get_obj(), + log.get_table_key(), start_scn, log.get_cluster_version(), log.get_execution_id(), @@ -707,11 +743,16 @@ int ObDDLRedoLogWriter::write_ddl_start_log(ObDDLKvMgrHandle &ddl_kv_mgr_handle, cb = nullptr; } } + if (0 != lock_tid) { + ddl_kv_mgr_handle.get_obj()->unlock(lock_tid); + } return ret; } template -int ObDDLRedoLogWriter::write_ddl_finish_log(const T &log, +int ObDDLRedoLogWriter::write_ddl_commit_log(ObTabletHandle &tablet_handle, + ObDDLKvMgrHandle &ddl_kv_mgr_handle, + const T &log, const ObDDLClogType clog_type, const share::ObLSID &ls_id, ObLogHandler *log_handler, @@ -735,15 +776,23 @@ int ObDDLRedoLogWriter::write_ddl_finish_log(const T &log, SCN base_scn = SCN::min_scn(); SCN scn = SCN::min_scn(); bool is_external_consistent = false; - if (OB_ISNULL(buffer = static_cast(ob_malloc(buffer_size)))) { + uint32_t lock_tid = 0; + if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wrlock(ObDDLRedoLogHandle::DDL_REDO_LOG_TIMEOUT, lock_tid))) { + LOG_WARN("failed to wrlock", K(ret)); + } else if (ddl_kv_mgr_handle.get_obj()->get_start_scn() != log.get_start_scn()) { + ret = OB_TASK_EXPIRED; + LOG_WARN("restarted", K(ret)); + } else if 
(ddl_kv_mgr_handle.get_obj()->get_commit_scn_nolock(tablet_handle.get_obj()->get_tablet_meta()).is_valid_and_not_min()) { + ret = OB_TRANS_COMMITED; + LOG_WARN("already committed", K(ret), K(log)); + } else if (OB_ISNULL(buffer = static_cast(ob_malloc(buffer_size)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc memory", K(ret)); } else if (OB_ISNULL(cb = op_alloc(ObDDLCommitClogCb))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc memory", K(ret)); - } else if (ObDDLClogType::DDL_COMMIT_LOG == clog_type && - OB_FAIL(cb->init(ls_id, log.get_table_key().tablet_id_, log.get_start_scn()))) { - LOG_WARN("init ddl commit log callback failed", K(ret), K(ls_id), K(log)); + } else if (OB_FAIL(cb->init(ls_id, log.get_table_key().tablet_id_, log.get_start_scn(), lock_tid, ddl_kv_mgr_handle))) { + LOG_WARN("init ddl commit log callback failed", K(ret), K(ls_id), K(log)); } else if (OB_FAIL(base_header.serialize(buffer, buffer_size, pos))) { LOG_WARN("failed to serialize log base header", K(ret)); } else if (OB_FAIL(ddl_header.serialize(buffer, buffer_size, pos))) { @@ -763,6 +812,7 @@ int ObDDLRedoLogWriter::write_ddl_finish_log(const T &log, } else { ObDDLCommitClogCb *tmp_cb = cb; cb = nullptr; + lock_tid = 0; bool need_retry = true; while (need_retry) { if (OB_FAIL(OB_TS_MGR.wait_gts_elapse(MTL_ID(), scn))) { @@ -787,6 +837,9 @@ int ObDDLRedoLogWriter::write_ddl_finish_log(const T &log, ob_free(buffer); buffer = nullptr; } + if (0 != lock_tid) { + ddl_kv_mgr_handle.get_obj()->unlock(lock_tid); + } if (OB_FAIL(ret)) { if (nullptr != cb) { op_free(cb); @@ -891,7 +944,9 @@ void ObDDLCommitLogHandle::reset() } } -int ObDDLMacroBlockRedoWriter::write_macro_redo(const ObDDLMacroBlockRedoInfo &redo_info, +int ObDDLMacroBlockRedoWriter::write_macro_redo(ObTabletHandle &tablet_handle, + ObDDLKvMgrHandle &ddl_kv_mgr_handle, + const ObDDLMacroBlockRedoInfo &redo_info, const share::ObLSID &ls_id, ObLogHandler *log_handler, const blocksstable::MacroBlockId 
&macro_block_id, @@ -907,7 +962,7 @@ int ObDDLMacroBlockRedoWriter::write_macro_redo(const ObDDLMacroBlockRedoInfo &r const uint64_t tenant_id = MTL_ID(); if (OB_FAIL(log.init(redo_info))) { LOG_WARN("fail to init DDLRedoLog", K(ret), K(redo_info)); - } else if (OB_FAIL(ObDDLRedoLogWriter::get_instance().write(log, tenant_id, ls_id, log_handler, macro_block_id, buffer, handle))) { + } else if (OB_FAIL(ObDDLRedoLogWriter::get_instance().write(tablet_handle, ddl_kv_mgr_handle, log, tenant_id, ls_id, log_handler, macro_block_id, buffer, handle))) { LOG_WARN("fail to write ddl redo log item", K(ret)); } } @@ -989,7 +1044,7 @@ int ObDDLSSTableRedoWriter::start_ddl_redo(const ObITable::TableKey &table_key, LOG_WARN("get tablet handle failed", K(ret), K(ls_id_), K(tablet_id_)); } else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle, true/*try_create*/))) { LOG_WARN("create ddl kv mgr failed", K(ret)); - } else if (OB_FAIL(ObDDLRedoLogWriter::get_instance().write_ddl_start_log(ddl_kv_mgr_handle, log, ls->get_log_handler(), tmp_scn))) { + } else if (OB_FAIL(ObDDLRedoLogWriter::get_instance().write_ddl_start_log(tablet_handle, ddl_kv_mgr_handle, log, ls->get_log_handler(), tmp_scn))) { LOG_WARN("fail to write ddl start log", K(ret), K(table_key)); } else if (FALSE_IT(set_start_scn(tmp_scn))) { } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->register_to_tablet(get_start_scn(), ddl_kv_mgr_handle))) { @@ -998,7 +1053,73 @@ int ObDDLSSTableRedoWriter::start_ddl_redo(const ObITable::TableKey &table_key, return ret; } -int ObDDLSSTableRedoWriter::write_redo_log(const ObDDLMacroBlockRedoInfo &redo_info, const blocksstable::MacroBlockId &macro_block_id, const bool allow_remote_write) +int ObDDLSSTableRedoWriter::end_ddl_redo_and_create_ddl_sstable( + ObLSHandle &ls_handle, + const ObITable::TableKey &table_key, + const uint64_t table_id, + const int64_t execution_id, + const int64_t ddl_task_id) +{ + int ret = OB_SUCCESS; + ObTabletHandle tablet_handle; + 
ObDDLKvMgrHandle ddl_kv_mgr_handle; + const ObTabletID &tablet_id = table_key.tablet_id_; + ObLS *ls = nullptr; + ObLSID ls_id; + SCN ddl_start_scn = get_start_scn(); + SCN commit_scn = SCN::min_scn(); + if (OB_ISNULL(ls = ls_handle.get_ls()) || OB_UNLIKELY(!table_key.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid ls", K(ret), K(table_key)); + } else if (OB_FALSE_IT(ls_id = ls->get_ls_id())) { + } else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, tablet_id, tablet_handle))) { + LOG_WARN("get tablet failed", K(ret)); + } else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) { + LOG_WARN("get ddl kv manager failed", K(ret), K(ls_id), K(tablet_id)); + } else if (OB_FAIL(write_commit_log(tablet_handle, ddl_kv_mgr_handle, table_key, table_id, execution_id, ddl_task_id, commit_scn))) { + if (OB_TASK_EXPIRED == ret) { + LOG_INFO("ddl task expired", K(ret), K(table_key), K(table_id), K(execution_id), K(ddl_task_id)); + } else { + LOG_WARN("fail write ddl commit log", K(ret), K(table_key)); + } + } + + if (OB_TRANS_COMMITED == ret) { + commit_scn = ddl_kv_mgr_handle.get_obj()->get_commit_scn(tablet_handle.get_obj()->get_tablet_meta()); + if (!commit_scn.is_valid_and_not_min()) { + ret = OB_EAGAIN; + LOG_WARN("committed on leader but not committed on me, retry", K(ret), K(ddl_start_scn), K(commit_scn), K(table_id), K(execution_id), K(ddl_task_id)); + } else { + ret = OB_SUCCESS; + ddl_start_scn = ddl_kv_mgr_handle.get_obj()->get_start_scn(); + set_start_scn(ddl_start_scn); + } + } + + if (OB_FAIL(ret)) { + } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_commit(ddl_start_scn, + commit_scn, + table_id, + ddl_task_id))) { + if (OB_TASK_EXPIRED == ret) { + LOG_INFO("ddl task expired", K(ret), K(ls_id), K(tablet_id), + K(ddl_start_scn), "new_ddl_start_scn", ddl_kv_mgr_handle.get_obj()->get_start_scn()); + } else { + LOG_WARN("failed to do ddl kv commit", K(ret), K(ddl_start_scn), K(commit_scn)); + } + } else if 
(OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_merge_success(ddl_start_scn, commit_scn))) { + if (OB_TASK_EXPIRED == ret) { + LOG_INFO("ddl task expired, but return success", K(ret), K(ls_id), K(tablet_id), + K(ddl_start_scn), "new_ddl_start_scn", + ddl_kv_mgr_handle.get_obj()->get_start_scn()); + } else { + LOG_WARN("failed to wait ddl merge", K(ret), K(ddl_start_scn)); + } + } + return ret; +} + +int ObDDLSSTableRedoWriter::write_redo_log(const ObDDLMacroBlockRedoInfo &redo_info, const blocksstable::MacroBlockId ¯o_block_id, const bool allow_remote_write, ObTabletHandle &tablet_handle, ObDDLKvMgrHandle &ddl_kv_mgr_handle) { int ret = OB_SUCCESS; ObLSHandle ls_handle; @@ -1019,7 +1140,7 @@ int ObDDLSSTableRedoWriter::write_redo_log(const ObDDLMacroBlockRedoInfo &redo_i ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("allocate memory failed", K(ret), K(BUF_SIZE)); } else if (!remote_write_) { - if (OB_FAIL(ObDDLMacroBlockRedoWriter::write_macro_redo(redo_info, ls->get_ls_id(), ls->get_log_handler(), macro_block_id, buffer_, ddl_redo_handle_))) { + if (OB_FAIL(ObDDLMacroBlockRedoWriter::write_macro_redo(tablet_handle, ddl_kv_mgr_handle, redo_info, ls->get_ls_id(), ls->get_log_handler(), macro_block_id, buffer_, ddl_redo_handle_))) { if (ObDDLUtil::need_remote_write(ret) && allow_remote_write) { if (OB_FAIL(switch_to_remote_write())) { LOG_WARN("fail to switch to remote write", K(ret)); @@ -1060,7 +1181,9 @@ int ObDDLSSTableRedoWriter::wait_redo_log_finish(const ObDDLMacroBlockRedoInfo & return ret; } -int ObDDLSSTableRedoWriter::write_commit_log(const ObITable::TableKey &table_key, +int ObDDLSSTableRedoWriter::write_commit_log(ObTabletHandle &tablet_handle, + ObDDLKvMgrHandle &ddl_kv_mgr_handle, + const ObITable::TableKey &table_key, const int64_t table_id, const int64_t execution_id, const int64_t ddl_task_id, @@ -1095,7 +1218,7 @@ int ObDDLSSTableRedoWriter::write_commit_log(const ObITable::TableKey &table_key ret = OB_ERR_UNEXPECTED; LOG_ERROR("ls should not be null", 
K(ret), K(table_key)); } else if (!remote_write_) { - if (OB_FAIL(ObDDLRedoLogWriter::get_instance().write_ddl_finish_log(log, ObDDLClogType::DDL_COMMIT_LOG, ls_id_, ls->get_log_handler(), handle))) { + if (OB_FAIL(ObDDLRedoLogWriter::get_instance().write_ddl_commit_log(tablet_handle, ddl_kv_mgr_handle, log, ObDDLClogType::DDL_COMMIT_LOG, ls_id_, ls->get_log_handler(), handle))) { if (ObDDLUtil::need_remote_write(ret)) { if (OB_FAIL(switch_to_remote_write())) { LOG_WARN("fail to switch to remote write", K(ret), K(table_key)); @@ -1178,22 +1301,34 @@ ObDDLRedoLogWriterCallback::~ObDDLRedoLogWriterCallback() int ObDDLRedoLogWriterCallback::init(const ObDDLMacroBlockType block_type, const ObITable::TableKey &table_key, - ObDDLSSTableRedoWriter *ddl_writer) + ObDDLSSTableRedoWriter *ddl_writer, + ObDDLKvMgrHandle &ddl_kv_mgr_handle) { int ret = OB_SUCCESS; ObLS *ls = nullptr; ObLSService *ls_service = nullptr; bool is_cache_hit = false; + ObLSHandle ls_handle; + ObTabletHandle tablet_handle; if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_WARN("ObDDLSSTableRedoWriter has been inited twice", K(ret)); - } else if (OB_UNLIKELY(!table_key.is_valid() || nullptr == ddl_writer || DDL_MB_INVALID_TYPE == block_type)) { + } else if (OB_UNLIKELY(!table_key.is_valid() || nullptr == ddl_writer || DDL_MB_INVALID_TYPE == block_type || !ddl_kv_mgr_handle.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(table_key), K(block_type)); + } else if (OB_FAIL(MTL(ObLSService *)->get_ls(ddl_kv_mgr_handle.get_obj()->get_ls_id(), ls_handle, ObLSGetMod::DDL_MOD))) { + LOG_WARN("failed to get log stream", K(ret), KPC(ddl_kv_mgr_handle.get_obj())); + } else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, + ddl_kv_mgr_handle.get_obj()->get_tablet_id(), + tablet_handle, + ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) { + LOG_WARN("get tablet handle failed", K(ret), KPC(ddl_kv_mgr_handle.get_obj())); } else { block_type_ = block_type; table_key_ = 
table_key; ddl_writer_ = ddl_writer; + tablet_handle_ = tablet_handle; + ddl_kv_mgr_handle_ = ddl_kv_mgr_handle; is_inited_ = true; } return ret; @@ -1217,7 +1352,7 @@ int ObDDLRedoLogWriterCallback::write(const ObMacroBlockHandle ¯o_handle, redo_info_.block_type_ = block_type_; redo_info_.logic_id_ = logic_id; redo_info_.start_scn_ = ddl_writer_->get_start_scn(); - if (OB_FAIL(ddl_writer_->write_redo_log(redo_info_, macro_block_id_, true/*allow remote write*/))) { + if (OB_FAIL(ddl_writer_->write_redo_log(redo_info_, macro_block_id_, true/*allow remote write*/, tablet_handle_, ddl_kv_mgr_handle_))) { LOG_WARN("fail to write ddl redo log", K(ret)); } } diff --git a/src/storage/ddl/ob_ddl_redo_log_writer.h b/src/storage/ddl/ob_ddl_redo_log_writer.h index 9cab021739..6365bcf32e 100644 --- a/src/storage/ddl/ob_ddl_redo_log_writer.h +++ b/src/storage/ddl/ob_ddl_redo_log_writer.h @@ -188,19 +188,24 @@ class ObDDLRedoLogWriter final public: static ObDDLRedoLogWriter &get_instance(); int init(); - int write(const ObDDLRedoLog &log, + int write(ObTabletHandle &tablet_handle, + ObDDLKvMgrHandle &ddl_kv_mgr_handle, + const ObDDLRedoLog &log, const uint64_t tenant_id, const share::ObLSID &ls_id, logservice::ObLogHandler *log_handler, const blocksstable::MacroBlockId ¯o_block_id, char *buffer, ObDDLRedoLogHandle &handle); - int write_ddl_start_log(ObDDLKvMgrHandle &ddl_kv_mgr_handle, + int write_ddl_start_log(ObTabletHandle &tablet_handle, + ObDDLKvMgrHandle &ddl_kv_mgr_handle, const ObDDLStartLog &log, logservice::ObLogHandler *log_handler, share::SCN &start_scn); template - int write_ddl_finish_log(const T &log, + int write_ddl_commit_log(ObTabletHandle &tablet_handle, + ObDDLKvMgrHandle &ddl_kv_mgr_handle, + const T &log, const ObDDLClogType clog_type, const share::ObLSID &ls_id, logservice::ObLogHandler *log_handler, @@ -225,7 +230,9 @@ private: class ObDDLMacroBlockRedoWriter final { public: - static int write_macro_redo(const blocksstable::ObDDLMacroBlockRedoInfo 
&redo_info, + static int write_macro_redo(ObTabletHandle &tablet_handle, + ObDDLKvMgrHandle &ddl_kv_mgr_handle, + const blocksstable::ObDDLMacroBlockRedoInfo &redo_info, const share::ObLSID &ls_id, logservice::ObLogHandler *log_handler, const blocksstable::MacroBlockId ¯o_block_id, @@ -250,12 +257,21 @@ public: const int64_t execution_id, const int64_t ddl_cluster_version, ObDDLKvMgrHandle &ddl_kv_mgr_handle); + int end_ddl_redo_and_create_ddl_sstable(ObLSHandle &ls_handle, + const ObITable::TableKey &table_key, + const uint64_t table_id, + const int64_t execution_id, + const int64_t ddl_task_id); int write_redo_log(const blocksstable::ObDDLMacroBlockRedoInfo &redo_info, const blocksstable::MacroBlockId ¯o_block_id, - const bool allow_remote_write); + const bool allow_remote_write, + ObTabletHandle &tablet_handle, + ObDDLKvMgrHandle &ddl_kv_mgr_handle); int wait_redo_log_finish(const blocksstable::ObDDLMacroBlockRedoInfo &redo_info, const blocksstable::MacroBlockId ¯o_block_id); - int write_commit_log(const ObITable::TableKey &table_key, + int write_commit_log(ObTabletHandle &tablet_handle, + ObDDLKvMgrHandle &ddl_kv_mgr_handle, + const ObITable::TableKey &table_key, const int64_t table_id, const int64_t execution_id, const int64_t ddl_task_id, @@ -282,7 +298,7 @@ class ObDDLRedoLogWriterCallback : public blocksstable::ObIMacroBlockFlushCallba public: ObDDLRedoLogWriterCallback(); virtual ~ObDDLRedoLogWriterCallback(); - int init(const blocksstable::ObDDLMacroBlockType block_type, const ObITable::TableKey &table_key, ObDDLSSTableRedoWriter *ddl_writer); + int init(const blocksstable::ObDDLMacroBlockType block_type, const ObITable::TableKey &table_key, ObDDLSSTableRedoWriter *ddl_writer, ObDDLKvMgrHandle &ddl_kv_mgr_handle); int write( const ObMacroBlockHandle ¯o_handle, const blocksstable::ObLogicMacroBlockId &logic_id, @@ -298,6 +314,8 @@ private: blocksstable::MacroBlockId macro_block_id_; ObDDLSSTableRedoWriter *ddl_writer_; char *block_buffer_; + ObTabletHandle 
tablet_handle_; + ObDDLKvMgrHandle ddl_kv_mgr_handle_; }; } // end namespace storage diff --git a/src/storage/ddl/ob_ddl_struct.cpp b/src/storage/ddl/ob_ddl_struct.cpp index 4fe3289780..177730c496 100644 --- a/src/storage/ddl/ob_ddl_struct.cpp +++ b/src/storage/ddl/ob_ddl_struct.cpp @@ -120,7 +120,7 @@ bool ObDDLMacroBlock::is_valid() const } -ObDDLKVPendingGuard::ObDDLKVPendingGuard(ObTablet *tablet, const SCN &scn) +ObDDLKVPendingGuard::ObDDLKVPendingGuard(ObTablet *tablet, const SCN &start_scn, const SCN &scn) : tablet_(tablet), scn_(scn), kv_handle_(), ret_(OB_SUCCESS) { int ret = OB_SUCCESS; @@ -131,7 +131,7 @@ ObDDLKVPendingGuard::ObDDLKVPendingGuard(ObTablet *tablet, const SCN &scn) LOG_WARN("invalid arguments", K(ret), KP(tablet), K(scn)); } else if (OB_FAIL(tablet->get_ddl_kv_mgr(ddl_kv_mgr_handle))) { LOG_WARN("get ddl kv mgr failed", K(ret)); - } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->get_or_create_ddl_kv(scn, kv_handle_))) { + } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->get_or_create_ddl_kv(start_scn, scn, kv_handle_))) { LOG_WARN("acquire ddl kv failed", K(ret)); } else if (OB_ISNULL(curr_kv = static_cast(kv_handle_.get_table()))) { ret = OB_ERR_UNEXPECTED; @@ -180,7 +180,7 @@ int ObDDLKVPendingGuard::set_macro_block(ObTablet *tablet, const ObDDLMacroBlock int64_t try_count = 0; while ((OB_SUCCESS == ret || OB_EAGAIN == ret) && try_count < MAX_RETRY_COUNT) { ObDDLKV *ddl_kv = nullptr; - ObDDLKVPendingGuard guard(tablet, macro_block.scn_); + ObDDLKVPendingGuard guard(tablet, macro_block.ddl_start_scn_, macro_block.scn_); if (OB_FAIL(guard.get_ddl_kv(ddl_kv))) { LOG_WARN("get ddl kv failed", K(ret)); } else if (OB_ISNULL(ddl_kv)) { diff --git a/src/storage/ddl/ob_ddl_struct.h b/src/storage/ddl/ob_ddl_struct.h index 7f5fc50404..70f05e6015 100644 --- a/src/storage/ddl/ob_ddl_struct.h +++ b/src/storage/ddl/ob_ddl_struct.h @@ -70,13 +70,14 @@ class ObDDLKVPendingGuard final public: static int set_macro_block(ObTablet *tablet, const ObDDLMacroBlock 
¯o_block); public: - ObDDLKVPendingGuard(ObTablet *tablet, const share::SCN &scn); + ObDDLKVPendingGuard(ObTablet *tablet, const share::SCN &start_scn, const share::SCN &scn); ~ObDDLKVPendingGuard(); int get_ret() const { return ret_; } int get_ddl_kv(ObDDLKV *&kv); TO_STRING_KV(KP(tablet_), K(scn_), K(kv_handle_), K(ret_)); private: ObTablet *tablet_; + share::SCN start_scn_; share::SCN scn_; ObTableHandleV2 kv_handle_; int ret_; diff --git a/src/storage/ddl/ob_direct_insert_sstable_ctx.cpp b/src/storage/ddl/ob_direct_insert_sstable_ctx.cpp index 364d011213..11b6575e2a 100644 --- a/src/storage/ddl/ob_direct_insert_sstable_ctx.cpp +++ b/src/storage/ddl/ob_direct_insert_sstable_ctx.cpp @@ -233,7 +233,8 @@ ObSSTableInsertSliceWriter::~ObSSTableInsertSliceWriter() } int ObSSTableInsertSliceWriter::init(const ObSSTableInsertSliceParam &slice_param, - const ObTableSchema *table_schema) + const ObTableSchema *table_schema, + ObDDLKvMgrHandle &ddl_kv_mgr_handle) { int ret = OB_SUCCESS; if (IS_INIT) { @@ -249,7 +250,7 @@ int ObSSTableInsertSliceWriter::init(const ObSSTableInsertSliceParam &slice_para K(slice_param.tablet_id_)); } else if (FALSE_IT(sstable_redo_writer_.set_start_scn(slice_param.start_scn_))) { } else if (OB_FAIL(redo_log_writer_callback_.init(DDL_MB_DATA_TYPE, slice_param.table_key_, - &sstable_redo_writer_))) { + &sstable_redo_writer_, ddl_kv_mgr_handle))) { LOG_WARN("fail to init redo log writer callback", KR(ret)); } else if (OB_FAIL(data_desc_.init(*table_schema, slice_param.ls_id_, @@ -513,6 +514,7 @@ int ObSSTableInsertTabletContext::build_sstable_slice( const ObTableSchema *table_schema = nullptr; ObArenaAllocator allocator(lib::ObLabel("PartInsSstTmp")); ObSSTableInsertSliceWriter *sstable_slice_writer = nullptr; + bool ddl_committed = false; if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard( tenant_id, schema_guard, build_param.schema_version_))) { LOG_WARN("get tenant schema failed", K(ret), K(build_param)); @@ -548,7 
+550,7 @@ int ObSSTableInsertTabletContext::build_sstable_slice( } else if (tablet_id != row_tablet_id) { ret = OB_SUCCESS; break; - } else if (OB_FAIL(sstable_slice_writer->append_row(*row_val))) { + } else if (!ddl_committed && OB_FAIL(sstable_slice_writer->append_row(*row_val))) { int tmp_ret = OB_SUCCESS; if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret && table_schema->is_unique_index()) { LOG_USER_ERROR(OB_ERR_PRIMARY_KEY_DUPLICATE, @@ -566,17 +568,26 @@ int ObSSTableInsertTabletContext::build_sstable_slice( task_id, row_tablet_id.id(), GCTX.self_addr(), *GCTX.sql_proxy_, index_key_buffer))) { LOG_WARN("generate index ddl error message", K(tmp_ret), K(ret)); } + } else if (OB_TRANS_COMMITED == ret) { + ret = OB_SUCCESS; + ddl_committed = true; } else { LOG_WARN("macro block writer append row failed", K(ret)); } - } else { + } + if (OB_SUCC(ret)) { LOG_DEBUG("sstable insert op append row", KPC(row_val)); ++affected_rows; } } if (OB_SUCC(ret)) { - if (OB_FAIL(sstable_slice_writer->close())) { - LOG_WARN("close writer failed", K(ret)); + if (!ddl_committed && OB_FAIL(sstable_slice_writer->close())) { + if (OB_TRANS_COMMITED == ret) { + ret = OB_SUCCESS; + ddl_committed = true; + } else { + LOG_WARN("close writer failed", K(ret)); + } } } } @@ -656,7 +667,7 @@ int ObSSTableInsertTabletContext::construct_sstable_slice_writer( if (OB_ISNULL(sstable_slice_writer = OB_NEWx(ObSSTableInsertSliceWriter, (&allocator)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObSSTableInsertSliceWriter", KR(ret)); - } else if (OB_FAIL(sstable_slice_writer->init(slice_param, table_schema))) { + } else if (OB_FAIL(sstable_slice_writer->init(slice_param, table_schema, ddl_kv_mgr_handle_))) { LOG_WARN("fail to init sstable slice writer", KR(ret), K(slice_param)); } if (OB_FAIL(ret)) { @@ -807,6 +818,7 @@ public: ~GetManageTabletIDs() = default; int operator()(common::hash::HashMapPair &entry) { + int ret = ret_code_; // for LOG_WARN if (OB_LIKELY(OB_SUCCESS == ret_code_) && 
OB_SUCCESS != (ret_code_ = tablet_ids_.push_back(entry.first))) { LOG_WARN("push back tablet id failed", K(ret_code_), K(entry.first)); } @@ -829,15 +841,8 @@ int ObSSTableInsertTabletContext::create_sstable_with_clog( share::schema::ObMultiVersionSchemaService *schema_service = nullptr; const share::schema::ObTableSchema *table_schema = nullptr; const uint64_t tenant_id = MTL_ID(); - SCN commit_scn; ObSchemaGetterGuard schema_guard; - if (OB_UNLIKELY(!table_key.is_valid())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(table_key)); - } else if (OB_ISNULL(ls_handle_.get_ls())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("ls is null", K(ret)); - } else if (OB_ISNULL(schema_service = GCTX.schema_service_)) { + if (OB_ISNULL(schema_service = GCTX.schema_service_)) { ret = OB_ERR_SYS; LOG_WARN("schema service is null", K(ret), KP(schema_service)); } else if (OB_FAIL(schema_service->get_tenant_schema_guard(tenant_id, schema_guard))) { @@ -847,51 +852,11 @@ int ObSSTableInsertTabletContext::create_sstable_with_clog( } else if (OB_ISNULL(table_schema)) { ret = OB_TABLE_NOT_EXIST; LOG_WARN("table schema is null", K(ret), K(table_key), KP(table_schema)); - } - - if (OB_FAIL(ret)) { - } else if (OB_FAIL(data_sstable_redo_writer_.write_commit_log(table_key, - table_schema->get_table_id(), - build_param_.execution_id_, - build_param_.ddl_task_id_, - commit_scn))) { - if (OB_TASK_EXPIRED == ret) { - LOG_INFO("ddl task expired", K(ret), K(table_key), K(build_param_)); - } else { - LOG_WARN("fail write ddl commit log", K(ret), K(table_key)); - } } else { DEBUG_SYNC(AFTER_REMOTE_WRITE_DDL_PREPARE_LOG); - ObTabletHandle tablet_handle; - ObDDLKvMgrHandle ddl_kv_mgr_handle; - ObLS *ls = ls_handle_.get_ls(); - const ObLSID &ls_id = ls->get_ls_id(); - const ObTabletID &tablet_id = tablet->get_tablet_meta().tablet_id_; - const SCN &ddl_start_scn = data_sstable_redo_writer_.get_start_scn(); - const uint64_t table_id = table_schema->get_table_id(); - const int64_t 
ddl_task_id = build_param_.ddl_task_id_; - if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle_, tablet_id, tablet_handle))) { - LOG_WARN("get tablet failed", K(ret)); - } else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) { - LOG_WARN("get ddl kv manager failed", K(ret), K(ls_id), K(tablet_id)); - } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_commit(ddl_start_scn, - commit_scn, - table_id, - ddl_task_id))) { - if (OB_TASK_EXPIRED == ret) { - LOG_INFO("ddl task expired", K(ret), K(ls_id), K(tablet_id), - K(ddl_start_scn), "new_ddl_start_scn", ddl_kv_mgr_handle.get_obj()->get_start_scn()); - } else { - LOG_WARN("failed to do ddl kv commit", K(ret), K(ddl_start_scn), K(commit_scn), K(build_param_)); - } - } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_merge_success(ddl_start_scn, commit_scn))) { - if (OB_TASK_EXPIRED == ret) { - LOG_INFO("ddl task expired, but return success", K(ret), K(ls_id), K(tablet_id), - K(ddl_start_scn), "new_ddl_start_scn", - ddl_kv_mgr_handle.get_obj()->get_start_scn(), K(build_param_)); - } else { - LOG_WARN("failed to wait ddl merge", K(ret), K(ddl_start_scn), K(build_param_)); - } + if (OB_FAIL(data_sstable_redo_writer_.end_ddl_redo_and_create_ddl_sstable( + ls_handle_, table_key, table_id, build_param_.execution_id_, build_param_.ddl_task_id_))) { + LOG_WARN("fail create ddl sstable", K(ret), K(table_key)); } } return ret; diff --git a/src/storage/ddl/ob_direct_insert_sstable_ctx.h b/src/storage/ddl/ob_direct_insert_sstable_ctx.h index db3a418246..074d1536d9 100644 --- a/src/storage/ddl/ob_direct_insert_sstable_ctx.h +++ b/src/storage/ddl/ob_direct_insert_sstable_ctx.h @@ -135,7 +135,8 @@ public: ObSSTableInsertSliceWriter(); ~ObSSTableInsertSliceWriter(); int init(const ObSSTableInsertSliceParam &slice_param, - const share::schema::ObTableSchema *table_schema); + const share::schema::ObTableSchema *table_schema, + ObDDLKvMgrHandle &ddl_kv_mgr_handle); int append_row(blocksstable::ObDatumRow 
&datum_row); int append_row(const common::ObNewRow &row_val); int close(); diff --git a/src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp b/src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp index 26afa83785..71e593c67c 100644 --- a/src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp +++ b/src/storage/ddl/ob_tablet_ddl_kv_mgr.cpp @@ -32,8 +32,7 @@ using namespace oceanbase::storage; ObTabletDDLKvMgr::ObTabletDDLKvMgr() : is_inited_(false), success_start_scn_(SCN::min_scn()), ls_id_(), tablet_id_(), table_key_(), cluster_version_(0), start_scn_(SCN::min_scn()), commit_scn_(SCN::min_scn()), max_freeze_scn_(SCN::min_scn()), - table_id_(0), execution_id_(-1), head_(0), tail_(0), lock_(ObLatchIds::TABLET_DDL_KV_MGR_LOCK), ref_cnt_(0), - can_schedule_major_compaction_(false) + table_id_(0), execution_id_(-1), head_(0), tail_(0), lock_(), ref_cnt_(0) { } @@ -47,7 +46,7 @@ void ObTabletDDLKvMgr::destroy() if (is_started()) { LOG_INFO("start destroy ddl kv manager", K(ls_id_), K(tablet_id_), K(start_scn_), K(head_), K(tail_), K(lbt())); } - TCWLockGuard guard(lock_); + ObLatchWGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); ATOMIC_STORE(&ref_cnt_, 0); for (int64_t pos = head_; pos < tail_; ++pos) { const int64_t idx = get_idx(pos); @@ -69,7 +68,6 @@ void ObTabletDDLKvMgr::destroy() execution_id_ = -1; success_start_scn_.set_min(); is_inited_ = false; - can_schedule_major_compaction_ = false; } int ObTabletDDLKvMgr::init(const share::ObLSID &ls_id, const common::ObTabletID &tablet_id) @@ -94,7 +92,8 @@ int ObTabletDDLKvMgr::init(const share::ObLSID &ls_id, const common::ObTabletID // ddl start from checkpoint // keep ddl sstable table -int ObTabletDDLKvMgr::ddl_start(const ObITable::TableKey &table_key, +int ObTabletDDLKvMgr::ddl_start(ObTablet &tablet, + const ObITable::TableKey &table_key, const SCN &start_scn, const int64_t cluster_version, const int64_t execution_id, @@ -115,7 +114,7 @@ int ObTabletDDLKvMgr::ddl_start(const ObITable::TableKey &table_key, ret = OB_ERR_SYS; LOG_WARN("tablet id 
not same", K(ret), K(table_key), K(tablet_id_)); } else { - TCWLockGuard guard(lock_); + ObLatchWGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); if (start_scn_.is_valid_and_not_min()) { if (execution_id >= execution_id_ && start_scn >= start_scn_) { LOG_INFO("execution id changed, need cleanup", K(ls_id_), K(tablet_id_), K(execution_id_), K(execution_id), K(start_scn_), K(start_scn)); @@ -169,6 +168,8 @@ int ObTabletDDLKvMgr::ddl_commit(const SCN &start_scn, } else if (start_scn < start_scn_) { ret = OB_TASK_EXPIRED; LOG_INFO("skip ddl commit log", K(start_scn), K(*this)); + } else if (OB_FAIL(set_commit_scn(commit_scn))) { + LOG_WARN("failed to set commit scn", K(ret)); } else if (OB_FAIL(freeze_ddl_kv(commit_scn))) { LOG_WARN("freeze ddl kv failed", K(ret), K(commit_scn)); } else { @@ -183,7 +184,6 @@ int ObTabletDDLKvMgr::ddl_commit(const SCN &start_scn, } table_id_ = table_id; ddl_task_id_ = ddl_task_id; - commit_scn_ = commit_scn; ObDDLTableMergeDagParam param; param.ls_id_ = ls_id_; @@ -318,6 +318,116 @@ int ObTabletDDLKvMgr::get_ddl_major_merge_param(ObDDLTableMergeDagParam ¶m) return ret; } +int ObTabletDDLKvMgr::get_rec_scn(SCN &rec_scn) +{ + int ret = OB_SUCCESS; + ObLSHandle ls_handle; + ObTabletHandle tablet_handle; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret), K(is_inited_)); + } else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) { + LOG_WARN("failed to get log stream", K(ret), K(ls_id_)); + } else if (OB_FAIL(ls_handle.get_ls()->get_tablet(tablet_id_, + tablet_handle, + ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) { + LOG_WARN("get tablet handle failed", K(ret), K(ls_id_), K(tablet_id_)); + } + + // rec scn of ddl start log + if (OB_SUCC(ret)) { + const ObTabletMeta &tablet_meta = tablet_handle.get_obj()->get_tablet_meta(); + ObLatchRGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); + if (start_scn_.is_valid_and_not_min() && start_scn_ != 
tablet_meta.ddl_start_scn_) { + // has a latest start log and not flushed to tablet meta, keep it + rec_scn = SCN::min(rec_scn, start_scn_); + } + } + + // rec scn of ddl commit log + if (OB_SUCC(ret)) { + const ObTabletMeta &tablet_meta = tablet_handle.get_obj()->get_tablet_meta(); + if (tablet_meta.ddl_commit_scn_.is_valid_and_not_min()) { + // has commit log and already dumped to tablet meta, skip + } else { + const SCN commit_scn = get_commit_scn(tablet_meta); + if (commit_scn.is_valid_and_not_min()) { + // has commit log and not yet dumped to tablet meta + rec_scn = SCN::min(rec_scn, commit_scn); + } else { + // no commit log + } + } + } + + // rec scn of ddl redo + if (OB_SUCC(ret)) { + bool has_ddl_kv = false; + if (OB_FAIL(check_has_effective_ddl_kv(has_ddl_kv))) { + LOG_WARN("failed to check ddl kv", K(ret)); + } else if (has_ddl_kv) { + SCN min_scn; + if (OB_FAIL(get_ddl_kv_min_scn(min_scn))) { + LOG_WARN("fail to get ddl kv min log ts", K(ret)); + } else { + rec_scn = SCN::min(rec_scn, min_scn); + } + } + } + return ret; +} + +int ObTabletDDLKvMgr::set_commit_scn(const SCN &commit_scn) +{ + int ret = OB_SUCCESS; + ObLSHandle ls_handle; + ObTabletHandle tablet_handle; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret), K(is_inited_)); + } else if (OB_UNLIKELY(commit_scn <= SCN::min_scn())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(commit_scn)); + } else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) { + LOG_WARN("failed to get log stream", K(ret), K(ls_id_)); + } else if (OB_FAIL(ls_handle.get_ls()->get_tablet(tablet_id_, + tablet_handle, + ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) { + LOG_WARN("get tablet handle failed", K(ret), K(ls_id_), K(tablet_id_)); + } else { + ObLatchWGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); + const SCN old_commit_scn = get_commit_scn_nolock(tablet_handle.get_obj()->get_tablet_meta()); + if 
(old_commit_scn.is_valid_and_not_min() && old_commit_scn != commit_scn) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("already committed by others", K(ret), K(commit_scn), K(*this)); + } else { + commit_scn_ = commit_scn; + } + } + return ret; +} + +SCN ObTabletDDLKvMgr::get_commit_scn(const ObTabletMeta &tablet_meta) +{ + ObLatchRGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); + return get_commit_scn_nolock(tablet_meta); +} + +SCN ObTabletDDLKvMgr::get_commit_scn_nolock(const ObTabletMeta &tablet_meta) +{ + SCN commit_scn = SCN::min_scn(); + if (tablet_meta.ddl_commit_scn_.is_valid_and_not_min() || commit_scn_.is_valid_and_not_min()) { + if (tablet_meta.ddl_commit_scn_.is_valid_and_not_min()) { + commit_scn = tablet_meta.ddl_commit_scn_; + } else { + commit_scn = commit_scn_; + } + } else { + commit_scn = SCN::min_scn(); + } + return commit_scn; +} + int ObTabletDDLKvMgr::set_commit_success(const SCN &start_scn) { int ret = OB_SUCCESS; @@ -328,7 +438,7 @@ int ObTabletDDLKvMgr::set_commit_success(const SCN &start_scn) ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(start_scn)); } else { - TCWLockGuard guard(lock_); + ObLatchWGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); if (start_scn < start_scn_) { ret = OB_TASK_EXPIRED; LOG_WARN("ddl task expired", K(ret), K(start_scn), K(*this)); @@ -349,9 +459,9 @@ int ObTabletDDLKvMgr::set_commit_success(const SCN &start_scn) return ret; } -bool ObTabletDDLKvMgr::is_commit_success() const +bool ObTabletDDLKvMgr::is_commit_success() { - TCRLockGuard guard(lock_); + ObLatchRGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); return is_commit_success_unlock(); } @@ -360,10 +470,15 @@ bool ObTabletDDLKvMgr::is_commit_success_unlock() const return success_start_scn_ > SCN::min_scn() && success_start_scn_ == start_scn_; } -bool ObTabletDDLKvMgr::can_schedule_major_compaction() const +bool ObTabletDDLKvMgr::can_schedule_major_compaction(const ObTabletMeta &tablet_meta) { - TCRLockGuard guard(lock_); 
- return can_schedule_major_compaction_ && !is_commit_success_unlock(); + ObLatchRGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); + return can_schedule_major_compaction_nolock(tablet_meta); +} + +bool ObTabletDDLKvMgr::can_schedule_major_compaction_nolock(const ObTabletMeta &tablet_meta) +{ + return get_commit_scn_nolock(tablet_meta).is_valid_and_not_min() && !is_commit_success_unlock(); } int ObTabletDDLKvMgr::cleanup() @@ -373,7 +488,7 @@ int ObTabletDDLKvMgr::cleanup() ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else { - TCWLockGuard guard(lock_); + ObLatchWGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); cleanup_unlock(); } return ret; @@ -399,15 +514,40 @@ void ObTabletDDLKvMgr::cleanup_unlock() table_id_ = 0; execution_id_ = -1; success_start_scn_.set_min(); - can_schedule_major_compaction_ = false; } bool ObTabletDDLKvMgr::is_execution_id_older(const int64_t execution_id) { - TCRLockGuard guard(lock_); return execution_id < execution_id_; } +int ObTabletDDLKvMgr::set_execution_id_nolock(const int64_t execution_id) +{ + int ret = OB_SUCCESS; + if (execution_id < execution_id_) { + ret = OB_TASK_EXPIRED; + LOG_WARN("ddl task expired", K(ret), K(execution_id), K(*this)); + } else { + execution_id_ = execution_id; + } + return ret; +} + +int ObTabletDDLKvMgr::set_execution_id(const int64_t execution_id) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret), K(is_inited_)); + } else { + ObLatchWGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); + if (OB_FAIL(set_execution_id_nolock(execution_id))) { + LOG_WARN("failed to set execution id", K(ret)); + } + } + return ret; +} + int ObTabletDDLKvMgr::online() { int ret = OB_SUCCESS; @@ -430,7 +570,8 @@ int ObTabletDDLKvMgr::online() table_key.version_range_.base_version_ = 0; table_key.version_range_.snapshot_version_ = tablet_meta.ddl_snapshot_version_; const SCN &start_scn = tablet_meta.ddl_start_scn_; - if 
(OB_FAIL(ddl_start(table_key, + if (OB_FAIL(ddl_start(*tablet_handle.get_obj(), + table_key, start_scn, tablet_meta.ddl_cluster_version_, tablet_meta.ddl_execution_id_, @@ -463,7 +604,7 @@ int ObTabletDDLKvMgr::register_to_tablet(const SCN &ddl_start_scn, ObDDLKvMgrHan ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) { LOG_WARN("get tablet handle failed", K(ret), K(ls_id_), K(tablet_id_)); } else { - TCWLockGuard guard(lock_); + ObLatchWGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); if (ddl_start_scn < start_scn_) { ret = OB_TASK_EXPIRED; LOG_INFO("ddl task expired", K(ret), K(ls_id_), K(tablet_id_), K(start_scn_), K(ddl_start_scn)); @@ -502,7 +643,7 @@ int ObTabletDDLKvMgr::unregister_from_tablet(const SCN &ddl_start_scn, ObDDLKvMg ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) { LOG_WARN("get tablet handle failed", K(ret), K(ls_id_), K(tablet_id_)); } else { - TCWLockGuard guard(lock_); + ObLatchWGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); if (ddl_start_scn < start_scn_) { ret = OB_TASK_EXPIRED; LOG_INFO("ddl task expired", K(ret), K(ls_id_), K(tablet_id_), K(start_scn_), K(ddl_start_scn)); @@ -523,6 +664,31 @@ int ObTabletDDLKvMgr::unregister_from_tablet(const SCN &ddl_start_scn, ObDDLKvMg return ret; } +int ObTabletDDLKvMgr::rdlock(const int64_t timeout_us, uint32_t &tid) +{ + int ret = OB_SUCCESS; + if (OB_SUCC(lock_.rdlock(ObLatchIds::TABLET_DDL_KV_MGR_LOCK, timeout_us))) { + tid = static_cast(GETTID()); + } + return ret; +} + +int ObTabletDDLKvMgr::wrlock(const int64_t timeout_us, uint32_t &tid) +{ + int ret = OB_SUCCESS; + if (OB_SUCC(lock_.wrlock(ObLatchIds::TABLET_DDL_KV_MGR_LOCK, timeout_us))) { + tid = static_cast(GETTID()); + } + return ret; +} + +void ObTabletDDLKvMgr::unlock(const uint32_t tid) +{ + if (OB_SUCCESS != lock_.unlock(&tid)) { + ob_abort(); + } +} + int ObTabletDDLKvMgr::update_tablet(const SCN &start_scn, const int64_t snapshot_version, const SCN &ddl_checkpoint_scn) { int ret = OB_SUCCESS; @@ -620,16 +786,13 @@ 
int ObTabletDDLKvMgr::update_ddl_major_sstable() ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) { LOG_WARN("get tablet handle failed", K(ret), K(ls_id_), K(tablet_id_)); } else { - { - TCWLockGuard guard(lock_); - can_schedule_major_compaction_ = true; - } ObTabletHandle new_tablet_handle; ObUpdateTableStoreParam param(tablet_handle.get_obj()->get_snapshot_version(), ObVersionRange::MIN_VERSION, // multi_version_start &tablet_handle.get_obj()->get_storage_schema(), ls_handle.get_ls()->get_rebuild_seq()); param.ddl_info_.keep_old_ddl_sstable_ = true; + param.ddl_info_.ddl_commit_scn_ = commit_scn_; if (OB_FAIL(ls_handle.get_ls()->update_tablet_table_store(tablet_id_, param, new_tablet_handle))) { LOG_WARN("failed to update tablet table store", K(ret), K(ls_id_), K(tablet_id_), K(param)); } @@ -647,10 +810,12 @@ int ObTabletDDLKvMgr::get_ddl_param(ObTabletDDLParam &ddl_param) ret = OB_STATE_NOT_MATCH; LOG_WARN("ddl not started", K(ret)); } else { + ObLatchRGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); ddl_param.tenant_id_ = MTL_ID(); ddl_param.ls_id_ = ls_id_; ddl_param.table_key_ = table_key_; ddl_param.start_scn_ = start_scn_; + ddl_param.commit_scn_ = commit_scn_; ddl_param.snapshot_version_ = table_key_.get_snapshot_version(); ddl_param.cluster_version_ = cluster_version_; } @@ -667,7 +832,7 @@ int ObTabletDDLKvMgr::get_freezed_ddl_kv(const SCN &freeze_scn, ObTableHandleV2 LOG_WARN("not init", K(ret), K(is_inited_)); } else { bool found = false; - TCRLockGuard guard(lock_); + ObLatchRGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); for (int64_t i = head_; OB_SUCC(ret) && !found && i < tail_; ++i) { const int64_t idx = get_idx(i); ObTableHandleV2 &cur_kv_handle = ddl_kv_handles_[idx]; @@ -720,7 +885,7 @@ int ObTabletDDLKvMgr::get_active_ddl_kv_impl(ObTableHandleV2 &kv_handle) return ret; } -int ObTabletDDLKvMgr::get_or_create_ddl_kv(const SCN &scn, ObTableHandleV2 &kv_handle) +int ObTabletDDLKvMgr::get_or_create_ddl_kv(const SCN &start_scn, 
const SCN &scn, ObTableHandleV2 &kv_handle) { int ret = OB_SUCCESS; kv_handle.reset(); @@ -731,17 +896,26 @@ int ObTabletDDLKvMgr::get_or_create_ddl_kv(const SCN &scn, ObTableHandleV2 &kv_h ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(scn)); } else { - ObTableHandleV2 tmp_kv_handle; - TCRLockGuard guard(lock_); - try_get_ddl_kv_unlock(scn, kv_handle); + ObLatchRGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); + if (start_scn != start_scn_) { + ret = OB_TASK_EXPIRED; + LOG_WARN("ddl task expired", K(ret), K(start_scn), KPC(this)); + } else { + try_get_ddl_kv_unlock(scn, kv_handle); + } } if (OB_SUCC(ret) && !kv_handle.is_valid()) { - TCWLockGuard guard(lock_); - try_get_ddl_kv_unlock(scn, kv_handle); - if (kv_handle.is_valid()) { - // do nothing - } else if (OB_FAIL(alloc_ddl_kv(kv_handle))) { - LOG_WARN("create ddl kv failed", K(ret)); + ObLatchWGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); + if (start_scn != start_scn_) { + ret = OB_TASK_EXPIRED; + LOG_WARN("ddl task expired", K(ret), K(start_scn), KPC(this)); + } else { + try_get_ddl_kv_unlock(scn, kv_handle); + if (kv_handle.is_valid()) { + // do nothing + } else if (OB_FAIL(alloc_ddl_kv(kv_handle))) { + LOG_WARN("create ddl kv failed", K(ret)); + } } } return ret; @@ -770,7 +944,7 @@ int ObTabletDDLKvMgr::freeze_ddl_kv(const SCN &freeze_scn) { int ret = OB_SUCCESS; ObTableHandleV2 kv_handle; - TCWLockGuard guard(lock_); + ObLatchWGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObTabletDDLKvMgr is not inited", K(ret)); @@ -809,7 +983,7 @@ int ObTabletDDLKvMgr::release_ddl_kvs(const SCN &end_scn) { int ret = OB_SUCCESS; DEBUG_SYNC(BEFORE_RELEASE_DDL_KV); - TCWLockGuard guard(lock_); + ObLatchWGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObTabletDDLKvMgr is not inited", K(ret)); @@ -844,7 +1018,7 @@ int ObTabletDDLKvMgr::release_ddl_kvs(const SCN &end_scn) int 
ObTabletDDLKvMgr::get_ddl_kv_min_scn(SCN &min_scn) { int ret = OB_SUCCESS; - TCRLockGuard guard(lock_); + ObLatchRGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); min_scn = SCN::max_scn(); if (IS_NOT_INIT) { ret = OB_NOT_INIT; @@ -868,6 +1042,7 @@ int ObTabletDDLKvMgr::get_ddl_kvs_unlock(const bool frozen_only, ObTablesHandleA { int ret = OB_SUCCESS; kv_handle_array.reset(); + ObLatchRGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObTabletDDLKvMgr is not inited", K(ret)); @@ -893,7 +1068,7 @@ int ObTabletDDLKvMgr::get_ddl_kvs(const bool frozen_only, ObTablesHandleArray &k { int ret = OB_SUCCESS; kv_handle_array.reset(); - TCRLockGuard guard(lock_); + ObLatchRGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObTabletDDLKvMgr is not inited", K(ret)); @@ -903,15 +1078,15 @@ int ObTabletDDLKvMgr::get_ddl_kvs(const bool frozen_only, ObTablesHandleArray &k return ret; } -int ObTabletDDLKvMgr::get_ddl_kvs_for_query(ObTablesHandleArray &kv_handle_array) +int ObTabletDDLKvMgr::get_ddl_kvs_for_query(ObTablet &tablet, ObTablesHandleArray &kv_handle_array) { int ret = OB_SUCCESS; kv_handle_array.reset(); - TCRLockGuard guard(lock_); + ObLatchRGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObTabletDDLKvMgr is not inited", K(ret)); - } else if (!can_schedule_major_compaction_) { + } else if (!can_schedule_major_compaction_nolock(tablet.get_tablet_meta())) { // do nothing } else if (OB_FAIL(get_ddl_kvs_unlock(true/*frozen_only*/, kv_handle_array))) { LOG_WARN("get ddl kv unlock failed", K(ret)); @@ -922,7 +1097,7 @@ int ObTabletDDLKvMgr::get_ddl_kvs_for_query(ObTablesHandleArray &kv_handle_array int ObTabletDDLKvMgr::check_has_effective_ddl_kv(bool &has_ddl_kv) { int ret = OB_SUCCESS; - TCRLockGuard guard(lock_); + ObLatchRGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK); if (IS_NOT_INIT) { ret = 
OB_NOT_INIT; LOG_WARN("ObTabletDDLKvMgr is not inited", K(ret)); diff --git a/src/storage/ddl/ob_tablet_ddl_kv_mgr.h b/src/storage/ddl/ob_tablet_ddl_kv_mgr.h index 3a39b4a045..b183823de5 100644 --- a/src/storage/ddl/ob_tablet_ddl_kv_mgr.h +++ b/src/storage/ddl/ob_tablet_ddl_kv_mgr.h @@ -36,36 +36,47 @@ public: ObTabletDDLKvMgr(); ~ObTabletDDLKvMgr(); int init(const share::ObLSID &ls_id, const common::ObTabletID &tablet_id); // init before memtable mgr - int ddl_start(const ObITable::TableKey &table_key, const share::SCN &start_scn, const int64_t cluster_version, const int64_t execution_id, const share::SCN &checkpoint_scn); + int ddl_start(ObTablet &tablet, const ObITable::TableKey &table_key, const share::SCN &start_scn, const int64_t cluster_version, const int64_t execution_id, const share::SCN &checkpoint_scn); int ddl_commit(const share::SCN &start_scn, const share::SCN &commit_scn, const uint64_t table_id = 0, const int64_t ddl_task_id = 0); // schedule build a major sstable int schedule_ddl_merge_task(const share::SCN &start_scn, const share::SCN &commit_scn, const bool is_replay); // try wait build major sstable int wait_ddl_merge_success(const share::SCN &start_scn, const share::SCN &commit_scn); int get_ddl_param(ObTabletDDLParam &ddl_param); - int get_or_create_ddl_kv(const share::SCN &scn, ObTableHandleV2 &kv_handle); // used in active ddl kv guard + int get_or_create_ddl_kv(const share::SCN &start_scn, const share::SCN &scn, ObTableHandleV2 &kv_handle); // used in active ddl kv guard int get_freezed_ddl_kv(const share::SCN &freeze_scn, ObTableHandleV2 &kv_handle); // locate ddl kv with exeact freeze log ts int get_ddl_kvs(const bool frozen_only, ObTablesHandleArray &kv_handle_array); // get all freeze ddl kvs - int get_ddl_kvs_for_query(ObTablesHandleArray &kv_handle_array); + int get_ddl_kvs_for_query(ObTablet &tablet, ObTablesHandleArray &kv_handle_array); int freeze_ddl_kv(const share::SCN &freeze_scn = share::SCN::min_scn()); // freeze the active 
ddl kv, when memtable freeze or ddl commit int release_ddl_kvs(const share::SCN &rec_scn); // release persistent ddl kv, used in ddl merge task for free ddl kv int check_has_effective_ddl_kv(bool &has_ddl_kv); // used in ddl log handler for checkpoint int get_ddl_kv_min_scn(share::SCN &min_scn); // for calculate rec_scn of ls share::SCN get_start_scn() const { return start_scn_; } - share::SCN get_commit_scn() const { return commit_scn_; } bool is_started() const { return share::SCN::min_scn() != start_scn_; } + void set_commit_scn_nolock(const share::SCN &scn) { commit_scn_ = scn; } + int set_commit_scn(const share::SCN &scn); + share::SCN get_commit_scn_nolock(const ObTabletMeta &tablet_meta); + share::SCN get_commit_scn(const ObTabletMeta &tablet_meta); int set_commit_success(const share::SCN &start_scn); - bool is_commit_success() const; + bool is_commit_success(); common::ObTabletID get_tablet_id() const { return tablet_id_; } + share::ObLSID get_ls_id() const { return ls_id_; } int cleanup(); int online(); bool is_execution_id_older(const int64_t execution_id); + int set_execution_id_nolock(const int64_t execution_id); + int set_execution_id(const int64_t execution_id); int register_to_tablet(const share::SCN &ddl_start_scn, ObDDLKvMgrHandle &kv_mgr_handle); int unregister_from_tablet(const share::SCN &ddl_start_scn, ObDDLKvMgrHandle &kv_mgr_handle); + int rdlock(const int64_t timeout_us, uint32_t &lock_tid); + int wrlock(const int64_t timeout_us, uint32_t &lock_tid); + void unlock(const uint32_t lock_tid); OB_INLINE void inc_ref() { ATOMIC_INC(&ref_cnt_); } OB_INLINE int64_t dec_ref() { return ATOMIC_SAF(&ref_cnt_, 1 /* just sub 1 */); } OB_INLINE int64_t get_ref() const { return ATOMIC_LOAD(&ref_cnt_); } OB_INLINE void reset() { destroy(); } - bool can_schedule_major_compaction() const; + bool can_schedule_major_compaction(const ObTabletMeta &tablet_meta); + bool can_schedule_major_compaction_nolock(const ObTabletMeta &tablet_meta); int 
get_ddl_major_merge_param(ObDDLTableMergeDagParam &merge_param); + int get_rec_scn(share::SCN &rec_scn); TO_STRING_KV(K_(is_inited), K_(success_start_scn), K_(ls_id), K_(tablet_id), K_(table_key), K_(cluster_version), K_(start_scn), K_(commit_scn), K_(max_freeze_scn), K_(table_id), K_(execution_id), K_(ddl_task_id), K_(head), K_(tail), K_(ref_cnt)); @@ -101,9 +112,8 @@ private: ObTableHandleV2 ddl_kv_handles_[MAX_DDL_KV_CNT_IN_STORAGE]; int64_t head_; int64_t tail_; - common::TCRWLock lock_; + common::ObLatch lock_; volatile int64_t ref_cnt_ CACHE_ALIGNED; - bool can_schedule_major_compaction_; DISALLOW_COPY_AND_ASSIGN(ObTabletDDLKvMgr); }; diff --git a/src/storage/ls/ob_ls_ddl_log_handler.cpp b/src/storage/ls/ob_ls_ddl_log_handler.cpp index cca641b247..dc62ee7be1 100644 --- a/src/storage/ls/ob_ls_ddl_log_handler.cpp +++ b/src/storage/ls/ob_ls_ddl_log_handler.cpp @@ -46,6 +46,7 @@ int ObLSDDLLogHandler::init(ObLS *ls) TCWLockGuard guard(online_lock_); is_online_ = true; ls_ = ls; + last_rec_scn_ = ls_->get_clog_checkpoint_scn(); is_inited_ = true; } return ret; @@ -57,6 +58,7 @@ void ObLSDDLLogHandler::reset() is_online_ = false; is_inited_ = false; ls_ = nullptr; + last_rec_scn_.reset(); ddl_log_replayer_.reset(); } @@ -286,7 +288,6 @@ SCN ObLSDDLLogHandler::get_rec_scn() } else { while (OB_SUCC(ret)) { ObDDLKvMgrHandle ddl_kv_mgr_handle; - SCN min_scn = SCN::max_scn(); if (OB_FAIL(tablet_iter.get_next_ddl_kv_mgr(ddl_kv_mgr_handle))) { if (OB_ITER_END == ret) { ret = OB_SUCCESS; @@ -297,18 +298,17 @@ SCN ObLSDDLLogHandler::get_rec_scn() } else if (OB_UNLIKELY(!ddl_kv_mgr_handle.is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid ddl kv mgr handle", K(ret), K(ddl_kv_mgr_handle)); - } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->check_has_effective_ddl_kv(has_ddl_kv))) { - LOG_WARN("failed to check ddl kv", K(ret)); - } else if (has_ddl_kv) { - if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->get_ddl_kv_min_scn(min_scn))) { - LOG_WARN("fail to get ddl kv min log ts", 
K(ret)); - } else { - rec_scn = SCN::min(rec_scn, min_scn); - } + } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->get_rec_scn(rec_scn))) { + LOG_WARN("failed to get rec scn", K(ret)); } } } - return OB_SUCC(ret) ? rec_scn : SCN::max_scn(); + if (OB_FAIL(ret)) { + rec_scn = SCN::max(last_rec_scn_, ls_->get_clog_checkpoint_scn()); + } else if (!rec_scn.is_max()) { + last_rec_scn_ = SCN::max(last_rec_scn_, rec_scn); + } + return rec_scn; } int ObLSDDLLogHandler::replay_ddl_redo_log_(const char *log_buf, diff --git a/src/storage/ls/ob_ls_ddl_log_handler.h b/src/storage/ls/ob_ls_ddl_log_handler.h index 53e096e0ab..8d3d55829c 100644 --- a/src/storage/ls/ob_ls_ddl_log_handler.h +++ b/src/storage/ls/ob_ls_ddl_log_handler.h @@ -29,7 +29,7 @@ class ObLSDDLLogHandler : public logservice::ObIReplaySubHandler, public logservice::ObICheckpointSubHandler { public: - ObLSDDLLogHandler() : is_inited_(false), is_online_(false), ls_(nullptr) {} + ObLSDDLLogHandler() : is_inited_(false), is_online_(false), ls_(nullptr), last_rec_scn_() {} ~ObLSDDLLogHandler() { reset(); } public: @@ -66,6 +66,7 @@ private: ObLS *ls_; common::TCRWLock online_lock_; ObDDLRedoLogReplayer ddl_log_replayer_; + share::SCN last_rec_scn_; }; } // storage diff --git a/src/storage/ob_storage_struct.cpp b/src/storage/ob_storage_struct.cpp index fa1c3be4b5..38bc5b7dae 100644 --- a/src/storage/ob_storage_struct.cpp +++ b/src/storage/ob_storage_struct.cpp @@ -211,6 +211,7 @@ int ObGetMergeTablesResult::assign(const ObGetMergeTablesResult &src) ObDDLTableStoreParam::ObDDLTableStoreParam() : keep_old_ddl_sstable_(true), ddl_start_scn_(SCN::min_scn()), + ddl_commit_scn_(SCN::min_scn()), ddl_checkpoint_scn_(SCN::min_scn()), ddl_snapshot_version_(0), ddl_execution_id_(-1), @@ -222,6 +223,7 @@ ObDDLTableStoreParam::ObDDLTableStoreParam() bool ObDDLTableStoreParam::is_valid() const { return ddl_start_scn_.is_valid() + && ddl_commit_scn_.is_valid() && ddl_checkpoint_scn_.is_valid() && ddl_snapshot_version_ >= 0 && 
ddl_execution_id_ >= 0 diff --git a/src/storage/ob_storage_struct.h b/src/storage/ob_storage_struct.h index 16566e3e37..fa9d68b30b 100644 --- a/src/storage/ob_storage_struct.h +++ b/src/storage/ob_storage_struct.h @@ -300,11 +300,12 @@ public: ObDDLTableStoreParam(); ~ObDDLTableStoreParam() = default; bool is_valid() const; - TO_STRING_KV(K_(keep_old_ddl_sstable), K_(ddl_start_scn), K_(ddl_checkpoint_scn), + TO_STRING_KV(K_(keep_old_ddl_sstable), K_(ddl_start_scn), K_(ddl_commit_scn), K_(ddl_checkpoint_scn), K_(ddl_snapshot_version), K_(ddl_execution_id), K_(ddl_cluster_version)); public: bool keep_old_ddl_sstable_; share::SCN ddl_start_scn_; + share::SCN ddl_commit_scn_; share::SCN ddl_checkpoint_scn_; int64_t ddl_snapshot_version_; int64_t ddl_execution_id_; diff --git a/src/storage/tablet/ob_tablet.cpp b/src/storage/tablet/ob_tablet.cpp index d9c33c86cc..bcb514a3c0 100644 --- a/src/storage/tablet/ob_tablet.cpp +++ b/src/storage/tablet/ob_tablet.cpp @@ -88,7 +88,7 @@ ObTablet::ObTablet() is_inited_(false) { #if defined(__x86_64__) - static_assert(sizeof(ObTablet) <= 2496, "The size of ObTablet will affect the meta memory manager, and the necessity of adding new fields needs to be considered."); + static_assert(sizeof(ObTablet) <= 2560, "The size of ObTablet will affect the meta memory manager, and the necessity of adding new fields needs to be considered."); #endif } @@ -2178,6 +2178,7 @@ int ObTablet::build_migration_tablet_param(ObMigrationTabletParam &mig_tablet_pa mig_tablet_param.max_serialized_medium_scn_ = tablet_meta_.max_serialized_medium_scn_; mig_tablet_param.ddl_execution_id_ = tablet_meta_.ddl_execution_id_; mig_tablet_param.ddl_cluster_version_ = tablet_meta_.ddl_cluster_version_; + mig_tablet_param.ddl_commit_scn_ = tablet_meta_.ddl_commit_scn_; mig_tablet_param.report_status_.reset(); if (OB_FAIL(mig_tablet_param.storage_schema_.init(mig_tablet_param.allocator_, storage_schema_))) { @@ -2655,7 +2656,8 @@ int ObTablet::start_ddl_if_need() 
table_key.version_range_.base_version_ = 0; table_key.version_range_.snapshot_version_ = tablet_meta_.ddl_snapshot_version_; const SCN &start_scn = tablet_meta_.ddl_start_scn_; - if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_start(table_key, + if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_start(*this, + table_key, start_scn, tablet_meta_.ddl_cluster_version_, tablet_meta_.ddl_execution_id_, diff --git a/src/storage/tablet/ob_tablet_meta.cpp b/src/storage/tablet/ob_tablet_meta.cpp index cced4c2873..1389aad1d5 100644 --- a/src/storage/tablet/ob_tablet_meta.cpp +++ b/src/storage/tablet/ob_tablet_meta.cpp @@ -59,6 +59,7 @@ ObTabletMeta::ObTabletMeta() ddl_execution_id_(-1), ddl_cluster_version_(0), max_serialized_medium_scn_(0), + ddl_commit_scn_(SCN::min_scn()), is_inited_(false) { } @@ -113,6 +114,7 @@ int ObTabletMeta::init( multi_version_start_ = snapshot_version; table_store_flag_ = table_store_flag; ddl_start_scn_.set_min(); + ddl_commit_scn_.set_min(); ddl_snapshot_version_ = 0; max_sync_storage_schema_version_ = max_sync_storage_schema_version; max_serialized_medium_scn_ = max_serialized_medium_scn; @@ -177,6 +179,7 @@ int ObTabletMeta::init( create_scn_ = old_tablet_meta.create_scn_; start_scn_ = old_tablet_meta.start_scn_; ddl_start_scn_ = SCN::max(ddl_info.ddl_start_scn_, old_tablet_meta.ddl_start_scn_); + ddl_commit_scn_ = SCN::max(ddl_info.ddl_commit_scn_, old_tablet_meta.ddl_commit_scn_); clog_checkpoint_scn_ = SCN::max(clog_checkpoint_scn, old_tablet_meta.clog_checkpoint_scn_); compat_mode_ = old_tablet_meta.compat_mode_; ha_status_ = old_tablet_meta.ha_status_; @@ -236,6 +239,7 @@ int ObTabletMeta::init( tx_data_ = param.tx_data_; table_store_flag_ = param.table_store_flag_; ddl_start_scn_ = param.ddl_start_scn_; + ddl_commit_scn_ = param.ddl_commit_scn_; ddl_snapshot_version_ = param.ddl_snapshot_version_; max_sync_storage_schema_version_ = param.max_sync_storage_schema_version_; max_serialized_medium_scn_ = param.max_serialized_medium_scn_; @@ -284,6 
+288,7 @@ int ObTabletMeta::init( tx_data_ = old_tablet_meta.tx_data_; table_store_flag_ = old_tablet_meta.table_store_flag_; ddl_start_scn_ = old_tablet_meta.ddl_start_scn_; + ddl_commit_scn_ = old_tablet_meta.ddl_commit_scn_; ddl_snapshot_version_ = old_tablet_meta.ddl_snapshot_version_; max_sync_storage_schema_version_ = old_tablet_meta.max_sync_storage_schema_version_; max_serialized_medium_scn_ = old_tablet_meta.max_serialized_medium_scn_; @@ -366,6 +371,7 @@ int ObTabletMeta::init( table_store_flag_ = table_store_flag; ddl_checkpoint_scn_ = old_tablet_meta.ddl_checkpoint_scn_; ddl_start_scn_ = old_tablet_meta.ddl_start_scn_; + ddl_commit_scn_ = old_tablet_meta.ddl_commit_scn_; ddl_snapshot_version_ = old_tablet_meta.ddl_snapshot_version_; max_sync_storage_schema_version_ = max_sync_storage_schema_version; max_serialized_medium_scn_ = MAX(old_tablet_meta.max_serialized_medium_scn_, @@ -406,6 +412,7 @@ void ObTabletMeta::reset() ddl_data_.reset(); table_store_flag_.reset(); ddl_start_scn_.set_min(); + ddl_commit_scn_.set_min(); ddl_snapshot_version_ = OB_INVALID_TIMESTAMP; max_sync_storage_schema_version_ = 0; max_serialized_medium_scn_ = 0; @@ -510,6 +517,8 @@ int ObTabletMeta::serialize(char *buf, const int64_t len, int64_t &pos) LOG_WARN("failed to serialize ddl cluster version", K(ret), K(len), K(new_pos), K_(ddl_cluster_version)); } else if (new_pos - pos < length_ && OB_FAIL(serialization::encode_i64(buf, len, new_pos, max_serialized_medium_scn_))) { LOG_WARN("failed to serialize max_serialized_medium_scn", K(ret), K(len), K(new_pos), K_(max_serialized_medium_scn)); + } else if (new_pos - pos < length_ && OB_FAIL(ddl_commit_scn_.fixed_serialize(buf, len, new_pos))) { + LOG_WARN("failed to serialize ddl commit scn", K(ret), K(len), K(new_pos), K_(ddl_commit_scn)); } else if (OB_UNLIKELY(length_ != new_pos - pos)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("tablet meta's length doesn't match standard length", K(ret), K(new_pos), K(pos), K_(length)); @@ -596,6 
+605,8 @@ int ObTabletMeta::deserialize( LOG_WARN("failed to deserialize ddl cluster version", K(ret), K(len), K(new_pos)); } else if (new_pos - pos < length_ && OB_FAIL(serialization::decode_i64(buf, len, new_pos, &max_serialized_medium_scn_))) { LOG_WARN("failed to deserialize max_serialized_medium_scn", K(ret), K(len), K(new_pos)); + } else if (new_pos - pos < length_ && OB_FAIL(ddl_commit_scn_.fixed_deserialize(buf, len, new_pos))) { + LOG_WARN("failed to deserialize ddl commit scn", K(ret), K(len), K(new_pos)); } else if (OB_UNLIKELY(length_ != new_pos - pos)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("tablet's length doesn't match standard length", K(ret), K(new_pos), K(pos), K_(length)); @@ -641,6 +652,7 @@ int64_t ObTabletMeta::get_serialize_size() const size += serialization::encoded_length_i64(ddl_execution_id_); size += serialization::encoded_length_i64(ddl_cluster_version_); size += serialization::encoded_length_i64(max_serialized_medium_scn_); + size += ddl_commit_scn_.get_fixed_serialize_size(); return size; } @@ -732,6 +744,7 @@ int ObTabletMeta::update(const ObMigrationTabletParam &param) ddl_checkpoint_scn_ = param.ddl_checkpoint_scn_; table_store_flag_ = param.table_store_flag_; ddl_start_scn_ = param.ddl_start_scn_; + ddl_commit_scn_ = param.ddl_commit_scn_; ddl_snapshot_version_ = param.ddl_snapshot_version_; max_sync_storage_schema_version_ = param.max_sync_storage_schema_version_; max_serialized_medium_scn_ = param.max_serialized_medium_scn_; @@ -815,6 +828,7 @@ ObMigrationTabletParam::ObMigrationTabletParam() ddl_execution_id_(-1), ddl_cluster_version_(0), max_serialized_medium_scn_(0), + ddl_commit_scn_(SCN::min_scn()), allocator_() { } @@ -915,6 +929,8 @@ int ObMigrationTabletParam::serialize(char *buf, const int64_t len, int64_t &pos LOG_WARN("failed to serialize ddl cluster version", K(ret), K(len), K(new_pos), K_(ddl_cluster_version)); } else if (new_pos - pos < length && OB_FAIL(serialization::encode_i64(buf, len, new_pos,
max_serialized_medium_scn_))) { LOG_WARN("failed to serialize max_serialized_medium_scn", K(ret), K(len), K(new_pos), K_(max_serialized_medium_scn)); + } else if (new_pos - pos < length && OB_FAIL(ddl_commit_scn_.fixed_serialize(buf, len, new_pos))) { + LOG_WARN("failed to serialize ddl commit scn", K(ret), K(len), K(new_pos), K_(ddl_commit_scn)); } else if (OB_UNLIKELY(length != new_pos - pos)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("length doesn't match standard length", K(ret), K(new_pos), K(pos), K(length)); @@ -981,6 +997,8 @@ int ObMigrationTabletParam::deserialize_old(const char *buf, const int64_t len, LOG_WARN("failed to deserialize ddl execution id", K(ret), K(len), K(new_pos)); } else if (OB_FAIL(serialization::decode_i64(buf, len, new_pos, &ddl_cluster_version_))) { LOG_WARN("failed to deserialize ddl cluster version", K(ret), K(len), K(new_pos)); + } else if (OB_FAIL(ddl_commit_scn_.fixed_deserialize(buf, len, new_pos))) { + LOG_WARN("failed to deserialize ddl commit scn", K(ret), K(len), K(new_pos)); } else { // old format compatibility to 4.0, DO NOT add any new member deserialization!!! 
compat_mode_ = static_cast(compat_mode); @@ -1067,6 +1085,8 @@ int ObMigrationTabletParam::deserialize(const char *buf, const int64_t len, int6 LOG_WARN("failed to deserialize ddl cluster version", K(ret), K(len), K(new_pos)); } else if (new_pos - pos < length && OB_FAIL(serialization::decode_i64(buf, len, new_pos, &max_serialized_medium_scn_))) { LOG_WARN("failed to deserialize max sync medium snapshot", K(ret), K(len), K(new_pos)); + } else if (new_pos - pos < length && OB_FAIL(ddl_commit_scn_.fixed_deserialize(buf, len, new_pos))) { + LOG_WARN("failed to deserialize ddl commit scn", K(ret), K(len), K(new_pos)); } else if (OB_UNLIKELY(length != new_pos - pos)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("tablet's length doesn't match standard length", K(ret), K(new_pos), K(pos), K(length), KPC(this)); @@ -1117,6 +1137,7 @@ int64_t ObMigrationTabletParam::get_serialize_size() const size += serialization::encoded_length_i64(ddl_execution_id_); size += serialization::encoded_length_i64(ddl_cluster_version_); size += serialization::encoded_length_i64(max_serialized_medium_scn_); + size += ddl_commit_scn_.get_fixed_serialize_size(); return size; } @@ -1147,6 +1168,7 @@ void ObMigrationTabletParam::reset() ddl_execution_id_ = -1; ddl_cluster_version_ = 0; max_serialized_medium_scn_ = 0; + ddl_commit_scn_.set_min(); allocator_.reset(); } @@ -1180,6 +1202,7 @@ int ObMigrationTabletParam::assign(const ObMigrationTabletParam &param) max_serialized_medium_scn_ = param.max_serialized_medium_scn_; ddl_execution_id_ = param.ddl_execution_id_; ddl_cluster_version_ = param.ddl_cluster_version_; + ddl_commit_scn_ = param.ddl_commit_scn_; if (OB_FAIL(ddl_data_.assign(param.ddl_data_))) { LOG_WARN("failed to assign ddl data", K(ret), K(param), K(*this)); diff --git a/src/storage/tablet/ob_tablet_meta.h b/src/storage/tablet/ob_tablet_meta.h index e8d094c9e5..b433d21a83 100644 --- a/src/storage/tablet/ob_tablet_meta.h +++ b/src/storage/tablet/ob_tablet_meta.h @@ -142,7 +142,8 @@ public:
K_(max_sync_storage_schema_version), K_(max_serialized_medium_scn), K_(ddl_execution_id), - K_(ddl_cluster_version)); + K_(ddl_cluster_version), + K_(ddl_commit_scn)); public: int32_t version_; @@ -175,6 +176,7 @@ public: int64_t ddl_execution_id_; int64_t ddl_cluster_version_; int64_t max_serialized_medium_scn_; // update when serialized medium info + share::SCN ddl_commit_scn_; //ATTENTION : Add a new variable need consider ObMigrationTabletParam // and tablet meta init interface for migration. // yuque : https://yuque.antfin.com/ob/ob-backup/zzwpuh @@ -235,7 +237,8 @@ public: K_(medium_info_list), K_(table_store_flag), K_(max_sync_storage_schema_version), - K_(max_serialized_medium_scn)); + K_(max_serialized_medium_scn), + K_(ddl_commit_scn)); private: int deserialize_old(const char *buf, const int64_t len, int64_t &pos); @@ -273,6 +276,7 @@ public: int64_t ddl_execution_id_; int64_t ddl_cluster_version_; int64_t max_serialized_medium_scn_; + share::SCN ddl_commit_scn_; // Add new serialization member before this line, below members won't serialize common::ObArenaAllocator allocator_; // for storage schema diff --git a/src/storage/tablet/ob_tablet_table_store.cpp b/src/storage/tablet/ob_tablet_table_store.cpp index d2b46097a6..1a6452ad24 100644 --- a/src/storage/tablet/ob_tablet_table_store.cpp +++ b/src/storage/tablet/ob_tablet_table_store.cpp @@ -1111,7 +1111,7 @@ int ObTabletTableStore::pull_ddl_memtables(common::ObIAllocator &allocator) LOG_TRACE("there is no ddl kv mgr in this tablet", K(ret)); ret = OB_SUCCESS; } - } else if (OB_FAIL(kv_mgr_handle.get_obj()->get_ddl_kvs_for_query(ddl_kvs_handle))) { + } else if (OB_FAIL(kv_mgr_handle.get_obj()->get_ddl_kvs_for_query(*tablet_ptr_, ddl_kvs_handle))) { LOG_WARN("failed to get all ddl freeze kvs", K(ret)); } else { SCN ddl_checkpoint_scn = tablet_ptr_->get_tablet_meta().ddl_checkpoint_scn_;