diff --git a/deps/oblib/src/lib/stat/ob_latch_define.cpp b/deps/oblib/src/lib/stat/ob_latch_define.cpp index 6c1acc1408..ad17ffa3a5 100644 --- a/deps/oblib/src/lib/stat/ob_latch_define.cpp +++ b/deps/oblib/src/lib/stat/ob_latch_define.cpp @@ -22,7 +22,7 @@ const ObLatchDesc OB_LATCHES[] = { #undef LATCH_DEF }; -static_assert(ARRAYSIZEOF(OB_LATCHES) == 312, "DO NOT delete latch defination"); +static_assert(ARRAYSIZEOF(OB_LATCHES) == 313, "DO NOT delete latch defination"); static_assert(ObLatchIds::LATCH_END == ARRAYSIZEOF(OB_LATCHES) - 1, "update id of LATCH_END before adding your defination"); } diff --git a/deps/oblib/src/lib/stat/ob_latch_define.h b/deps/oblib/src/lib/stat/ob_latch_define.h index 7f7e148db1..dcb41f1229 100644 --- a/deps/oblib/src/lib/stat/ob_latch_define.h +++ b/deps/oblib/src/lib/stat/ob_latch_define.h @@ -324,8 +324,8 @@ LATCH_DEF(LOG_EXTERNAL_STORAGE_HANDLER_RW_LOCK, 308, "log external storage handl LATCH_DEF(LOG_EXTERNAL_STORAGE_HANDLER_LOCK, 309, "log external storage handler spin lock", LATCH_FIFO, 2000, 0, LOG_EXTERNAL_STORAGE_HANDLER_WAIT, "log external storage handler spin lock") LATCH_DEF(PL_DEBUG_RUNTIMEINFO_LOCK, 310, "PL DEBUG RuntimeInfo lock", LATCH_FIFO, 2000, 0, PL_DEBUG_RUNTIME_INFO_WAIT, "PL DEBUG RuntimeInfo lock") - -LATCH_DEF(LATCH_END, 311, "latch end", LATCH_FIFO, 2000, 0, WAIT_EVENT_END, "latch end") +LATCH_DEF(MDS_TABLE_HANDLER_LOCK, 311, "mds table handler lock", LATCH_READ_PREFER, 2000, 0, MDS_TABLE_HANDLER_WAIT, "mds table handler lock") +LATCH_DEF(LATCH_END, 312, "latch end", LATCH_FIFO, 2000, 0, WAIT_EVENT_END, "latch end") #endif #ifndef OB_LATCH_DEFINE_H_ diff --git a/deps/oblib/src/lib/wait_event/ob_wait_event.h b/deps/oblib/src/lib/wait_event/ob_wait_event.h index 0d2895a022..cc61bab2f4 100644 --- a/deps/oblib/src/lib/wait_event/ob_wait_event.h +++ b/deps/oblib/src/lib/wait_event/ob_wait_event.h @@ -368,6 +368,7 @@ WAIT_EVENT_DEF(GC_HANDLER_WAIT, 16054, "gc handler lock wait", "", "", "", CONCU 
WAIT_EVENT_DEF(FREEZE_THREAD_POOL_WAIT, 16055, "freeze thread pool wait", "", "", "", CONCURRENCY, "FREEZE_THREAD_POOL_WAIT", true) WAIT_EVENT_DEF(DDL_EXECUTE_LOCK_WAIT, 16056, "ddl execute lock wait", "", "", "", CONCURRENCY, "DDL_EXECUTE_LOCK_WAIT", true) WAIT_EVENT_DEF(DUP_TABLET_LOCK_WAIT, 16057, "dup tablet lock wait", "", "", "", CONCURRENCY, "DUP_TABLET_LOCK_WAIT", true) +WAIT_EVENT_DEF(MDS_TABLE_HANDLER_WAIT, 16058, "mds table handler wait", "", "", "", CONCURRENCY, "MDS_TABLE_HANDLER_WAIT", true) // WAIT_EVENT_DEF(TENANT_MGR_TENANT_BUCKET_LOCK_WAIT, 16056, "tenant mgr tenant bucket lock wait", "", "", "", CONCURRENCY, "TENANT_MGR_TENANT_BUCKET_LOCK_WAIT", true) diff --git a/src/storage/multi_data_source/buffer_ctx.cpp b/src/storage/multi_data_source/buffer_ctx.cpp index 1cb0a0764f..11e0cfe7f0 100644 --- a/src/storage/multi_data_source/buffer_ctx.cpp +++ b/src/storage/multi_data_source/buffer_ctx.cpp @@ -36,7 +36,7 @@ int BufferCtxNode::serialize(char *buf, const int64_t buf_len, int64_t &pos) con MDS_TG(10_ms); if (OB_NOT_NULL(ctx_)) { // 序列化时,如果ctx不为空,那么其类型必须是有效的,这里防御一下,否则反序列化的报错会增加排查难度 - OB_ASSERT(ctx_->get_binding_type_id() != INVALID_VALUE); + MDS_ASSERT(ctx_->get_binding_type_id() != INVALID_VALUE); int64_t type_id = ctx_->get_binding_type_id(); if (MDS_FAIL(serialization::encode(buf, buf_len, pos, type_id))) { MDS_LOG(ERROR, "serialize buffer ctx id failed", KR(ret), K(type_id)); diff --git a/src/storage/multi_data_source/buffer_ctx.h b/src/storage/multi_data_source/buffer_ctx.h index c1d5a30df7..6f177243b1 100644 --- a/src/storage/multi_data_source/buffer_ctx.h +++ b/src/storage/multi_data_source/buffer_ctx.h @@ -63,7 +63,7 @@ class BufferCtxNode { public: BufferCtxNode() : ctx_(nullptr) {} - void set_ctx(BufferCtx *ctx) { OB_ASSERT(ctx_ == nullptr); ctx_ = ctx; }// 预期不应该出现覆盖的情况 + void set_ctx(BufferCtx *ctx) { MDS_ASSERT(ctx_ == nullptr); ctx_ = ctx; }// 预期不应该出现覆盖的情况 const BufferCtx *get_ctx() const { return ctx_; } void destroy_ctx(); void 
on_redo(const share::SCN &redo_scn) { ctx_->on_redo(redo_scn); } diff --git a/src/storage/multi_data_source/mds_ctx.cpp b/src/storage/multi_data_source/mds_ctx.cpp index 0a4409452b..9e3d52ec9a 100644 --- a/src/storage/multi_data_source/mds_ctx.cpp +++ b/src/storage/multi_data_source/mds_ctx.cpp @@ -35,7 +35,7 @@ MdsCtx::~MdsCtx() MdsWLockGuard lg(lock_); list_empty = write_list_.empty(); if (!list_empty) { - OB_ASSERT(state_ != TwoPhaseCommitState::ON_COMMIT);// if decided, list is empty + MDS_ASSERT(state_ != TwoPhaseCommitState::ON_COMMIT);// if decided, list is empty MDS_LOG_RET(INFO, OB_SUCCESS, "nodes not commit or abort when mds ctx destroyed", K(*this)); } } @@ -79,7 +79,7 @@ void MdsCtx::record_written_node(ListNode *node) void MdsCtx::on_redo(const share::SCN &redo_scn) { - OB_ASSERT(writer_.writer_type_ == WriterType::TRANSACTION);// can only called by TRANS, or must call single_log_commit() + MDS_ASSERT(writer_.writer_type_ == WriterType::TRANSACTION);// can only called by TRANS, or must call single_log_commit() do_while_retry_with_lock_until_success_for_all_([this, redo_scn]() {// if failed on any node, will release lock and try from first node again return for_each_node_try_([redo_scn](MdsNode &node) {// the operation tried on each node bool try_success = true; @@ -93,11 +93,13 @@ void MdsCtx::on_redo(const share::SCN &redo_scn) void MdsCtx::before_prepare() { - OB_ASSERT(writer_.writer_type_ == WriterType::TRANSACTION);// can only called by TRANS, or must call single_log_commit() + MDS_ASSERT(writer_.writer_type_ == WriterType::TRANSACTION);// can only called by TRANS, or must call single_log_commit() do_while_retry_with_lock_until_success_for_all_([this]() {// if failed on any node, will release lock and try from first node again return for_each_node_try_([](MdsNode &node) {// the operation tried on each node bool try_success = true; - if (node.status_.get_state() < TwoPhaseCommitState::BEFORE_PREPARE) {// avoid try lock + if 
(node.status_.get_state() == TwoPhaseCommitState::ON_PREPARE) {// due to force majeure + // do nothing, just accept it + } else if (node.status_.get_state() < TwoPhaseCommitState::BEFORE_PREPARE) {// avoid try lock try_success = node.try_before_prepare(); } return try_success; @@ -107,7 +109,7 @@ void MdsCtx::before_prepare() void MdsCtx::on_prepare(const share::SCN &prepare_version) { - OB_ASSERT(writer_.writer_type_ == WriterType::TRANSACTION);// can only called by TRANS, or must call single_log_commit() + MDS_ASSERT(writer_.writer_type_ == WriterType::TRANSACTION);// can only called by TRANS, or must call single_log_commit() do_while_retry_with_lock_until_success_for_all_([this, prepare_version]() {// if failed on any node, will release lock and try from first node again return for_each_node_try_([prepare_version](MdsNode &node) {// the operation tried on each node bool try_success = true; @@ -121,7 +123,7 @@ void MdsCtx::on_prepare(const share::SCN &prepare_version) void MdsCtx::on_commit(const share::SCN &commit_version, const share::SCN &commit_scn) { - OB_ASSERT(writer_.writer_type_ == WriterType::TRANSACTION);// can only called by TRANS, or must call single_log_commit() + MDS_ASSERT(writer_.writer_type_ == WriterType::TRANSACTION);// can only called by TRANS, or must call single_log_commit() do_while_retry_with_lock_until_success_for_all_([this, commit_version, commit_scn]() {// if failed on any node, will release lock and try from first node again return for_each_node_fetch_to_try_([commit_version, commit_scn](MdsNode &node) {// the operation tried on each node, if failed, the fetched node will be insert to head again to rollback bool try_success = true; @@ -161,7 +163,7 @@ void MdsCtx::single_log_commit(const share::SCN commit_version, const share::SCN void MdsCtx::single_log_abort() { - OB_ASSERT(writer_.writer_type_ != WriterType::TRANSACTION);// TRANSACTION use two-phase-commit + MDS_ASSERT(writer_.writer_type_ != WriterType::TRANSACTION);// 
TRANSACTION use two-phase-commit do_while_retry_with_lock_until_success_for_all_([this]() {// if failed on any node, will release lock and try from first node again return for_each_node_fetch_to_try_([](MdsNode &node) {// the operation tried on each node, if failed, the fetched node will be insert to head again to rollback bool try_success = true; diff --git a/src/storage/multi_data_source/mds_ctx.h b/src/storage/multi_data_source/mds_ctx.h index de5b86e5de..fcac52f7ef 100644 --- a/src/storage/multi_data_source/mds_ctx.h +++ b/src/storage/multi_data_source/mds_ctx.h @@ -70,13 +70,18 @@ private: int64_t try_times = 0; do { MdsWLockGuard lg(lock_); - operate_all_nodes_succeed = op(); - if (OB_LIKELY(operate_all_nodes_succeed)) { - if (new_state != TwoPhaseCommitState::STATE_END) { - check_and_advance_two_phase_commit(state_, new_state); + if (state_ == TwoPhaseCommitState::ON_PREPARE && new_state == TwoPhaseCommitState::BEFORE_PREPARE) {// due to force majeure + // do nothing, just accept it + operate_all_nodes_succeed = true; + } else { + operate_all_nodes_succeed = op(); + if (OB_LIKELY(operate_all_nodes_succeed)) { + if (new_state != TwoPhaseCommitState::STATE_END) { + check_and_advance_two_phase_commit(state_, new_state); + } + } else if (OB_UNLIKELY((++try_times % 10000) == 0)) { + MDS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "do-while retry too much times", K(try_times), K(*this)); } - } else if (OB_UNLIKELY((++try_times % 10000) == 0)) { - MDS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "do-while retry too much times", K(try_times), K(*this)); } } while (!operate_all_nodes_succeed && ({PAUSE(); true;}));// keep trying lock until success, while-try is for avoid thread deadlock } diff --git a/src/storage/multi_data_source/mds_node.cpp b/src/storage/multi_data_source/mds_node.cpp index f5e9fe81eb..88f65be9bd 100644 --- a/src/storage/multi_data_source/mds_node.cpp +++ b/src/storage/multi_data_source/mds_node.cpp @@ -63,10 +63,10 @@ void 
MdsNodeStatus::advance(TwoPhaseCommitState new_stat)// ATOMIC MdsNodeStatus old_node_status, new_node_status; do { old_node_status.union_.value_ = ATOMIC_LOAD(&union_.value_); - OB_ASSERT(new_stat >= old_node_status.union_.field_.state_ && + MDS_ASSERT(new_stat >= old_node_status.union_.field_.state_ && new_stat < TwoPhaseCommitState::STATE_END && new_stat > TwoPhaseCommitState::STATE_INIT); - OB_ASSERT(STATE_CHECK_ALLOWED_MAP[(int)old_node_status.union_.field_.state_][(int)new_stat] == true); + MDS_ASSERT(STATE_CHECK_ALLOWED_MAP[(int)old_node_status.union_.field_.state_][(int)new_stat] == true); new_node_status.union_.value_ = old_node_status.union_.value_; new_node_status.union_.field_.state_ = new_stat; } while (!ATOMIC_BCAS(&union_.value_, old_node_status.union_.value_, new_node_status.union_.value_)); diff --git a/src/storage/multi_data_source/mds_node.ipp b/src/storage/multi_data_source/mds_node.ipp index 85485583c7..f8d2e8ebc9 100644 --- a/src/storage/multi_data_source/mds_node.ipp +++ b/src/storage/multi_data_source/mds_node.ipp @@ -167,7 +167,7 @@ template void UserMdsNode::before_prepare_() { if (status_.union_.field_.state_ == TwoPhaseCommitState::BEFORE_PREPARE) {// reentrant - OB_ASSERT(trans_version_.is_min()); + MDS_ASSERT(trans_version_.is_min()); } else { status_.advance(TwoPhaseCommitState::BEFORE_PREPARE); trans_version_.set_min();// block all read operation @@ -192,7 +192,7 @@ void UserMdsNode::on_prepare_(const share::SCN &prepare_version) { if (status_.union_.field_.state_ == TwoPhaseCommitState::ON_PREPARE) {// reentrant if (is_valid_scn_(prepare_version)) { - OB_ASSERT(trans_version_ == prepare_version); + MDS_ASSERT(trans_version_ == prepare_version); } } else { status_.advance(TwoPhaseCommitState::ON_PREPARE); @@ -226,8 +226,8 @@ void UserMdsNode::on_commit_(const share::SCN &commit_version, K(*this), K(commit_version), K(commit_scn)); } else { if (status_.union_.field_.state_ == TwoPhaseCommitState::ON_COMMIT) {// reentrant - 
OB_ASSERT(trans_version_ == commit_version); - OB_ASSERT(end_scn_ == commit_scn); + MDS_ASSERT(trans_version_ == commit_version); + MDS_ASSERT(end_scn_ == commit_scn); } else { status_.advance(TwoPhaseCommitState::ON_COMMIT); trans_version_ = commit_version; diff --git a/src/storage/multi_data_source/mds_row.ipp b/src/storage/multi_data_source/mds_row.ipp index 5ea190c4f0..3e15f23872 100644 --- a/src/storage/multi_data_source/mds_row.ipp +++ b/src/storage/multi_data_source/mds_row.ipp @@ -396,7 +396,7 @@ int MdsRow::get_with_read_wrapper_(READ_OP &&read_operation, }; for_each_node_([&](const UserMdsNode &node) { bool can_read = false; - OB_ASSERT(!node.is_aborted_());// should not meet aborted node, cause aborted node deleted immediately + MDS_ASSERT(!node.is_aborted_());// should not meet aborted node, cause aborted node deleted immediately if (OB_FAIL(check_node_op(node, can_read))) { MDS_LOG_GET(WARN, "check node can read meet failed"); } else if (can_read) { @@ -537,19 +537,19 @@ int MdsRow::scan_dump_node_from_tail_to_head(DUMP_OP &&op, [this, &op, &ret, &dump_kv, flush_scn, for_flush, mds_table_id, mds_unit_id, &has_meet_undump_node](const UserMdsNode &node) { bool need_break = false; MDS_TG(5_ms); - OB_ASSERT(!node.is_aborted_());// should not see aborted node, cause it is deleted immediatly + MDS_ASSERT(!node.is_aborted_());// should not see aborted node, cause it is deleted immediatly if (!node.is_dumped_()) { has_meet_undump_node = true;// this is a barrier, mark it to defense } if (!node.is_committed_()) { if (node.redo_scn_.is_valid() && !flush_scn.is_max()) { - OB_ASSERT(node.redo_scn_ > flush_scn);// defense + MDS_ASSERT(node.redo_scn_ > flush_scn);// defense } need_break = true; } else if (node.is_dumped_() && for_flush) {// just skip it // all nodes before first undumped node should be dumped status,(reverse scan order) // and all nodes after first undumped node should be undemped status(reverse scan order) - OB_ASSERT(has_meet_undump_node == 
false);// defense + MDS_ASSERT(has_meet_undump_node == false);// defense } else if (!check_node_scn_beflow_flush(node, flush_scn)) { need_break = true; } else if (MDS_FAIL(dump_kv.v_.init(mds_table_id, @@ -602,7 +602,7 @@ void MdsRow::node_abort_callback_(ListNodeBase *node) } return ret; }); - OB_ASSERT(has_meet_input_arg_node); + MDS_ASSERT(has_meet_input_arg_node); } #undef PRINT_WRAPPER } diff --git a/src/storage/multi_data_source/mds_table_base.cpp b/src/storage/multi_data_source/mds_table_base.cpp index e55ff7bcda..5ff27daba4 100644 --- a/src/storage/multi_data_source/mds_table_base.cpp +++ b/src/storage/multi_data_source/mds_table_base.cpp @@ -90,8 +90,12 @@ int MdsTableBase::register_to_mds_table_mgr() void MdsTableBase::mark_removed_from_t3m(ObTabletPointer *pointer) { + MDS_TG(1_ms); + int ret = OB_SUCCESS; if (ATOMIC_LOAD(&debug_info_.remove_ts_) != 0) { MDS_LOG_RET(WARN, OB_ERR_UNEXPECTED, "this MdsTable has been marked removed", K(*this)); + } else if (MDS_FAIL(unregister_from_mds_table_mgr())) { + MDS_LOG(WARN, "unregister from mds_table_mgr failed", KR(ret), K(*this)); } else { debug_info_.do_remove_tablet_pointer_ = pointer; debug_info_.remove_trace_id_ = *ObCurTraceId::get_trace_id(); @@ -133,7 +137,20 @@ int MdsTableBase::unregister_from_mds_table_mgr() MDS_LOG(INFO, "no need unregister from mds_table_mgr cause invalid id", KR(ret), K(*this)); } else if (MDS_FAIL(mgr_handle_.get_mds_table_mgr()->unregister_from_mds_table_mgr(this))) { MDS_LOG(ERROR, "fail to unregister mds table", K(*this)); + } + return ret; +} + +int MdsTableBase::unregister_from_removed_recorder() +{ + int ret = OB_SUCCESS; + MDS_TG(1_ms); + if (!mgr_handle_.is_valid()) { + MDS_LOG(INFO, "no need unregister from mds_table_mgr cause invalid mds_table_mgr", KR(ret), K(*this)); + } else if (!ls_id_.is_valid() || !tablet_id_.is_valid()) { + MDS_LOG(INFO, "no need unregister from mds_table_mgr cause invalid id", KR(ret), K(*this)); } else { + 
mgr_handle_.get_mds_table_mgr()->unregister_from_removed_mds_table_recorder(this); mgr_handle_.reset(); } return ret; diff --git a/src/storage/multi_data_source/mds_table_base.h b/src/storage/multi_data_source/mds_table_base.h index 106c2604d5..8798db6125 100644 --- a/src/storage/multi_data_source/mds_table_base.h +++ b/src/storage/multi_data_source/mds_table_base.h @@ -159,7 +159,7 @@ public: share::ObLSID get_ls_id() const; int64_t get_node_cnt() const; virtual share::SCN get_rec_scn(); - virtual int dump_status() const = 0; + virtual int operate(const ObFunction &operation) = 0; virtual int flush(share::SCN need_advanced_rec_scn_lower_limit) = 0; virtual ObTabletID get_tablet_id() const; virtual bool is_flushing() const; @@ -178,7 +178,8 @@ protected: void try_decline_rec_scn(const share::SCN scn); int get_ls_max_consequent_callbacked_scn_(share::SCN &max_consequent_callbacked_scn) const; int register_to_mds_table_mgr(); - int unregister_from_mds_table_mgr(); + int unregister_from_mds_table_mgr();// call when marked deleted or released directly + int unregister_from_removed_recorder();// call when marked deleted int merge(const int64_t construct_sequence, const share::SCN &flushing_scn); template void report_rec_scn_event_(const char (&event_str)[N], diff --git a/src/storage/multi_data_source/mds_table_handle.h b/src/storage/multi_data_source/mds_table_handle.h index 12baec1133..e03e40fa04 100644 --- a/src/storage/multi_data_source/mds_table_handle.h +++ b/src/storage/multi_data_source/mds_table_handle.h @@ -110,7 +110,6 @@ public: int get_ref_cnt(int64_t &ref_cnt) const; int get_node_cnt(int64_t &valid_cnt) const; int get_rec_scn(share::SCN &rec_scn) const; - int dump_status() const; bool is_valid() const; void reset(); MdsTableBase *get_mds_table_ptr() { return p_mds_table_base_.ptr(); } diff --git a/src/storage/multi_data_source/mds_table_handle.ipp b/src/storage/multi_data_source/mds_table_handle.ipp index b8c40f1508..924e761c04 100644 --- 
a/src/storage/multi_data_source/mds_table_handle.ipp +++ b/src/storage/multi_data_source/mds_table_handle.ipp @@ -549,14 +549,6 @@ inline int MdsTableHandle::get_rec_scn(share::SCN &rec_scn) const return ret; } -inline int MdsTableHandle::dump_status() const -{ - int ret = OB_SUCCESS; - CHECK_MDS_TABLE_INIT(); - p_mds_table_base_->dump_status(); - return ret; -} - inline int MdsTableHandle::get_node_cnt(int64_t &valid_cnt) const { int ret = OB_SUCCESS; diff --git a/src/storage/multi_data_source/mds_table_handler.cpp b/src/storage/multi_data_source/mds_table_handler.cpp index 76aea2b270..e62b059b72 100644 --- a/src/storage/multi_data_source/mds_table_handler.cpp +++ b/src/storage/multi_data_source/mds_table_handler.cpp @@ -141,7 +141,7 @@ int ObMdsTableHandler::try_release_nodes_below(const share::SCN &scn) MDS_TG(5_ms); MdsTableHandle mds_table_handle; { - MdsWLockGuard guard(lock_); + MdsRLockGuard guard(lock_); CLICK(); mds_table_handle = mds_table_handle_; } @@ -174,7 +174,7 @@ ObMdsTableHandler &ObMdsTableHandler::operator=(const ObMdsTableHandler &rhs)// void ObMdsTableHandler::mark_removed_from_t3m(ObTabletPointer *pointer) { int ret = OB_SUCCESS; - MdsWLockGuard guard(lock_); + MdsRLockGuard guard(lock_); if (mds_table_handle_.is_valid()) { if (OB_FAIL(mds_table_handle_.mark_removed_from_t3m(pointer))) { MDS_LOG(WARN, "fail to unregister_from_mds_table_mgr", K(*this)); diff --git a/src/storage/multi_data_source/mds_table_handler.h b/src/storage/multi_data_source/mds_table_handler.h index 69e22379bb..1e1107ae99 100644 --- a/src/storage/multi_data_source/mds_table_handler.h +++ b/src/storage/multi_data_source/mds_table_handler.h @@ -15,7 +15,9 @@ namespace mds class ObMdsTableHandler { public: - ObMdsTableHandler() : is_written_(false) {} + ObMdsTableHandler() + : is_written_(false), + lock_(common::ObLatchIds::MDS_TABLE_HANDLER_LOCK) {} ~ObMdsTableHandler(); ObMdsTableHandler(const ObMdsTableHandler &) = delete; ObMdsTableHandler &operator=(const 
ObMdsTableHandler &);// value sematic for tablet ponter deep copy @@ -36,6 +38,19 @@ private: MdsTableHandle mds_table_handle_;// primary handle, all other handles are copied from this one bool is_written_; mutable MdsLock lock_; + // lock_ is defined as a read-preferred lock. This is necessary because a write-preferred lock cannot avoid the following thread deadlock: + // thread A is doing flush: it acquires MdsTableMgr's bucket lock first, then creates a DAG; creating the DAG reads tablet_status, which locks mds_table_handler with RLockGuard. + // thread B is doing mark_removed_from_t3m: it locks mds_table_handler with RLockGuard, then acquires MdsTableMgr's bucket lock to remove the mds_table. + // So far so good: both A and B lock mds_table_handler with RLockGuard, so there is no wait relationship between A and B. + // Now suppose there is another thread C that locks mds_table_handler with WLockGuard, that this lock operation happens after B's but before A's, and that lock_ is write-preferred. + // This is what may happen: + // 1. thread A holds MdsTableMgr's bucket lock and tries to lock mds_table_handler with RLockGuard, but hangs there, because lock_ is write-preferred and thread C is trying to lock it with WLockGuard. + // 2. thread B holds mds_table_handler's RLock and tries to acquire MdsTableMgr's bucket lock, but hangs there, because thread A is holding it. + // 3. thread C tries to lock mds_table_handler with WLockGuard, but will never get it, because the read count on lock_ will never drop to 0. + + // So lock_ must be read-preferred. If you don't understand why, just keep it in mind. + // You may wonder why it is so complicated; the reason is complicated too, and cannot be explained in a simple way. + // Better not to refactor this in an "elegant way"; I strongly suggest you just accept it. 
}; } diff --git a/src/storage/multi_data_source/mds_table_impl.h b/src/storage/multi_data_source/mds_table_impl.h index eaaed43706..b72e2162d4 100644 --- a/src/storage/multi_data_source/mds_table_impl.h +++ b/src/storage/multi_data_source/mds_table_impl.h @@ -139,7 +139,7 @@ public: ObFunction &for_each_op, const int64_t mds_construct_sequence, const bool for_flush) const override; - virtual int dump_status() const override; + virtual int operate(const ObFunction &operation) override; int calculate_flush_scn_and_need_dumped_nodes_cnt_(share::SCN need_advanced_rec_scn_lower_limit, share::SCN &flush_scn, int64_t &need_dumped_nodes_cnt); diff --git a/src/storage/multi_data_source/mds_table_impl.ipp b/src/storage/multi_data_source/mds_table_impl.ipp index b3dab031c0..5643bcc2c4 100644 --- a/src/storage/multi_data_source/mds_table_impl.ipp +++ b/src/storage/multi_data_source/mds_table_impl.ipp @@ -65,8 +65,14 @@ MdsTableImpl::MdsTableImpl() : MdsTableBase::MdsTableBase() template MdsTableImpl::~MdsTableImpl() { int ret = OB_SUCCESS; - if (OB_FAIL(unregister_from_mds_table_mgr())) { - MDS_LOG(ERROR, "fail to unregister from mds table mgr", K(*this)); + if (!is_removed_from_t3m()) { + if (OB_FAIL(unregister_from_mds_table_mgr())) { + MDS_LOG(ERROR, "fail to unregister from mds table mgr", K(*this)); + ret = OB_SUCCESS; + } + } + if (OB_FAIL(unregister_from_removed_recorder())) { + MDS_LOG(ERROR, "fail to unregister from removed_recorder", K(*this)); } MDS_LOG(INFO, "mds table destructed", K(*this)); } @@ -1209,7 +1215,7 @@ template int MdsTableImpl::is_locked_by_others(bool &is_locked, const MdsWriter &self) const { auto read_op_wrapper = [&is_locked, &self](const UserMdsNode &node) -> int { - OB_ASSERT(node.status_.union_.field_.state_ != TwoPhaseCommitState::ON_ABORT); + MDS_ASSERT(node.status_.union_.field_.state_ != TwoPhaseCommitState::ON_ABORT); if (node.status_.union_.field_.state_ == TwoPhaseCommitState::ON_COMMIT) {// no lock on decided node is_locked = false; } 
else if (node.status_.union_.field_.writer_type_ == self.writer_type_ && @@ -1328,7 +1334,7 @@ int MdsTableImpl::is_locked_by_others(const Key &key, const MdsWriter &self) const { auto read_op_wrapper = [&is_locked, &self](const UserMdsNode &node) -> int { - OB_ASSERT(node.status_.union_.field_.state_ != TwoPhaseCommitState::ON_ABORT); + MDS_ASSERT(node.status_.union_.field_.state_ != TwoPhaseCommitState::ON_ABORT); if (node.status_.union_.field_.state_ == TwoPhaseCommitState::ON_COMMIT) { is_locked = false; } else if (node.status_.union_.field_.writer_type_ == self.writer_type_ && @@ -1369,7 +1375,7 @@ struct RecycleNodeOp MDS_TG(1_ms); { CLICK(); - OB_ASSERT(!mds_node.is_aborted_());// should not see aborted node cause cause it is deleted immediatly + MDS_ASSERT(!mds_node.is_aborted_());// should not see aborted node cause cause it is deleted immediatly if (mds_node.is_committed_()) { if (mds_node.end_scn_ == share::SCN::max_scn()) {// must has an associated valid end LOG to commit/abort ret = OB_ERR_UNEXPECTED; @@ -1471,13 +1477,16 @@ int MdsTableImpl::forcely_reset_mds_table(const char *reason) } template -inline int MdsTableImpl::dump_status() const +inline int MdsTableImpl::operate(const ObFunction &operation) { - #define PRINT_WRAPPER K(*this) + #define PRINT_WRAPPER KR(ret), K(*this) + int ret = OB_SUCCESS; MDS_TG(10_ms); MdsWLockGuard lg(lock_); - MDS_LOG_NONE(INFO, "dump mds table status"); - return OB_SUCCESS; + if (OB_FAIL(operation(*this))) { + MDS_LOG_NONE(INFO, "fail to apply upper layer operation"); + } + return ret; #undef PRINT_WRAPPER } diff --git a/src/storage/multi_data_source/mds_table_mgr.cpp b/src/storage/multi_data_source/mds_table_mgr.cpp index b8fac4e337..919ffe4ce3 100644 --- a/src/storage/multi_data_source/mds_table_mgr.cpp +++ b/src/storage/multi_data_source/mds_table_mgr.cpp @@ -29,6 +29,18 @@ using namespace share; namespace storage { namespace mds { +void RemovedMdsTableRecorder::record(MdsTableBase *mds_table) +{ + SpinWLockGuard 
guard(lock_); + removed_mds_table_list_.append(mds_table); +} + +void RemovedMdsTableRecorder::del(MdsTableBase *mds_table) +{ + SpinWLockGuard guard(lock_); + removed_mds_table_list_.del(mds_table); +} + int ObMdsTableMgr::init(ObLS *ls) { int ret = OB_SUCCESS; @@ -79,55 +91,10 @@ int ObMdsTableMgr::register_to_mds_table_mgr(MdsTableBase *p_mds_table) ret = OB_INVALID_ARGUMENT; MDS_LOG(ERROR, "invalid mdstable handle", KR(ret), KP(p_mds_table)); } else if (FALSE_IT(tablet_id = p_mds_table->get_tablet_id())) { + } else if (OB_FAIL(mds_table_map_.insert(tablet_id, p_mds_table))) { + MDS_LOG(ERROR, "fail to insert mds table to map", KR(ret), KPC(p_mds_table)); } else { - int op_result = OB_SUCCESS; - auto op = [p_mds_table, &op_result](const ObTabletID &id, List &list) {// with map's bucket lock protected - if (list.empty()) { - list.insert_into_head(p_mds_table); - } else { - MdsTableBase *head_mds_table = static_cast(list.list_head_); - if (!head_mds_table->is_removed_from_t3m()) { - op_result = OB_ENTRY_EXIST; - MDS_LOG_RET(INFO, op_result, "register meet conflct", K(id), K(list), KPC(p_mds_table)); - } else { - list.insert_into_head(p_mds_table); - } - } - return true; - }; - bool need_break = false; - List tmp_list; - tmp_list.append(p_mds_table); - while (!need_break) { - if (OB_FAIL(mds_table_map_.insert(tablet_id, tmp_list))) { - if (OB_ENTRY_EXIST == ret) { - ret = OB_SUCCESS; - if (OB_FAIL(mds_table_map_.operate(tablet_id, op))) { - if (OB_ENTRY_NOT_EXIST == ret) {// meet concurrent erase - ret = OB_SUCCESS; - } else {// meet errors can't handle, no need retry - need_break = true; - MDS_LOG(WARN, "operate list failed and ret is not ENTRY_NOT_EXIST", KR(ret), K(tablet_id)); - } - } else {// insert to existing list success, no need retry - need_break = true; - } - } else { - need_break = true;// meet errors can't handle, no need retry - MDS_LOG(WARN, "insert list failed and ret is not ENTRY_EXIST", KR(ret), K(tablet_id)); - } - } else { - need_break = true;// 
insert new list success, no need retry - } - }; - if (OB_FAIL(ret)) { - // do nothing - } else { - ret = op_result; - if (OB_SUCC(ret)) { - MDS_LOG(INFO, "register success", KR(ret), KPC(p_mds_table)); - } - } + MDS_LOG(INFO, "register success", KR(ret), KPC(p_mds_table)); } return ret; } @@ -136,31 +103,32 @@ int ObMdsTableMgr::unregister_from_mds_table_mgr(MdsTableBase *p_mds_table) { int ret = OB_SUCCESS; common::ObTabletID tablet_id; - auto op = [p_mds_table](const ObTabletID &id, List &list) {// with map's bucket lock protected - if (list.check_node_exist(p_mds_table)) { - list.del(p_mds_table); - } else { - MDS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "mds table not in list", KPC(p_mds_table), K(id), K(list)); - ob_abort(); + auto op = [p_mds_table, this](const ObTabletID &id, MdsTableBase *&p_mds_table_in_map) {// with map's bucket ock protected + if (p_mds_table != p_mds_table_in_map) { + MDS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, + "erased mds table is not same with mds table in map, but shared same tablet id", + K(id), KPC(p_mds_table), KPC(p_mds_table_in_map)); } - return list.empty(); + removed_mds_table_recorder_.record(p_mds_table); + return true;// always erase it }; if (OB_ISNULL(p_mds_table)) { ret = OB_INVALID_ARGUMENT; MDS_LOG(ERROR, "invalid mdstable handle", KR(ret), KP(p_mds_table)); } else if (FALSE_IT(tablet_id = p_mds_table->get_tablet_id())) { } else if (OB_FAIL(mds_table_map_.erase_if(tablet_id, op))) { - if (OB_EAGAIN == ret) { - ret = OB_SUCCESS; - } else { - MDS_LOG(WARN, "fail to erase kv", KR(ret), K(tablet_id)); - } + MDS_LOG(WARN, "fail to erase kv", KR(ret), K(tablet_id)); } else { MDS_LOG(INFO, "unregister success", KR(ret), KPC(p_mds_table)); } return ret; } +void ObMdsTableMgr::unregister_from_removed_mds_table_recorder(MdsTableBase *p_mds_table) +{ + removed_mds_table_recorder_.del(p_mds_table); +} + int ObMdsTableMgr::first_scan_to_get_min_rec_scn_(share::SCN &min_rec_scn) { MDS_TG(10_s); @@ -184,7 +152,14 @@ int 
ObMdsTableMgr::first_scan_to_get_min_rec_scn_(share::SCN &min_rec_scn) // true means iterating the next mds table return true; }; - if (OB_FAIL(mds_table_map_.for_each(get_max_min_rec_scn_op))) { + auto get_min_rec_scn_op = + [&min_rec_scn, &scan_cnt](const common::ObTabletID &, MdsTableBase *&mds_table) { + share::SCN rec_scn = mds_table->get_rec_scn(); + min_rec_scn = std::min(rec_scn, min_rec_scn); + ++scan_cnt; + return true;// true means iterating the next mds table + }; + if (OB_FAIL(mds_table_map_.for_each(get_min_rec_scn_op))) { MDS_LOG(WARN, "fail to do map for_each", KR(ret), K(*this), K(scan_cnt), K(min_rec_scn)); } return ret; @@ -196,22 +171,14 @@ int ObMdsTableMgr::second_scan_to_do_flush_(share::SCN do_flush_limit_scn) int ret = OB_SUCCESS; int64_t scan_mds_table_cnt = 0; auto flush_op = [do_flush_limit_scn, &scan_mds_table_cnt](const common::ObTabletID &tablet_id, - List &mds_table_list) { + MdsTableBase *&mds_table) { int tmp_ret = OB_SUCCESS; - if (mds_table_list.empty()) { - MDS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "meet empty mds table list", K(tablet_id), K(scan_mds_table_cnt)); + if (OB_TMP_FAIL(mds_table->flush(do_flush_limit_scn))) { + MDS_LOG_RET(WARN, ret, "flush mds table failed", KR(tmp_ret), K(tablet_id), K(scan_mds_table_cnt)); } else { - MdsTableBase *mds_table = static_cast(mds_table_list.list_head_); - if (mds_table->is_removed_from_t3m()) { - // jsut skip it - } else if (OB_TMP_FAIL(mds_table->flush(do_flush_limit_scn))) { - MDS_LOG_RET(WARN, ret, "flush mds table failed", KR(tmp_ret), K(tablet_id), K(scan_mds_table_cnt)); - } else { - ++scan_mds_table_cnt; - } + ++scan_mds_table_cnt; } - // true means iterating the next mds table - return true; + return true;// true means iterating the next mds table }; if (OB_FAIL(mds_table_map_.for_each(flush_op))) { MDS_LOG(WARN, "fail to do map for_each", KR(ret), K(*this), K(scan_mds_table_cnt), K(do_flush_limit_scn)); @@ -258,6 +225,10 @@ int ObMdsTableMgr::flush(SCN recycle_scn, bool 
need_freeze) freezing_scn_.atomic_store(recycle_scn); } } + if (freezing_scn_ > max_consequent_callbacked_scn) { + freezing_scn_ = max_consequent_callbacked_scn; + MDS_LOG_FREEZE(INFO, "freezing_scn decline to max_consequent_callbacked_scn"); + } if (min_rec_scn <= freezing_scn_) { if (MDS_FAIL(second_scan_to_do_flush_(freezing_scn_))) { MDS_LOG_FREEZE(WARN, "fail to do flush"); diff --git a/src/storage/multi_data_source/mds_table_mgr.h b/src/storage/multi_data_source/mds_table_mgr.h index e89fbaea11..666b8117ee 100644 --- a/src/storage/multi_data_source/mds_table_mgr.h +++ b/src/storage/multi_data_source/mds_table_mgr.h @@ -13,6 +13,7 @@ #ifndef SHARE_STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_MGR_H #define SHARE_STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_MGR_H +#include "lib/lock/ob_small_spin_lock.h" #include "lib/lock/ob_tc_rwlock.h" #include "meta_programming/ob_type_traits.h" #include "storage/checkpoint/ob_common_checkpoint.h" @@ -26,9 +27,26 @@ class ObTabletHandle; namespace mds { class MdsTableFreezeGuard; class MdsTableBase; + +class RemovedMdsTableRecorder// tablet leak will resulting mds table leak, we must prove it +{ +public: + RemovedMdsTableRecorder() = default; + void record(MdsTableBase *mds_table); + void del(MdsTableBase *mds_table); + template + void for_each(OP &&op) { + SpinRLockGuard guard(lock_); + removed_mds_table_list_.for_each_node_from_head_to_tail_until_true(op); + } +private: + SpinRWLock lock_; + mds::List removed_mds_table_list_;// this list is for record those removed mds tables from t3m, but not destroed yet +}; + class ObMdsTableMgr final : public checkpoint::ObCommonCheckpoint { - using MdsTableMap = common::ObLinearHashMap>; + using MdsTableMap = common::ObLinearHashMap; friend MdsTableFreezeGuard; public: ObMdsTableMgr() @@ -37,23 +55,44 @@ public: freezing_scn_(share::SCN::min_scn()), ref_cnt_(0), ls_(nullptr), - mds_table_map_() {} + mds_table_map_(), + removed_mds_table_recorder_() {} ~ObMdsTableMgr() { destroy(); } int init(ObLS *ls); 
int reset(); void offline(); void destroy(); - int register_to_mds_table_mgr(MdsTableBase *p_mds_table); - int unregister_from_mds_table_mgr(MdsTableBase *p_mds_table); - template &))> - int for_each_mds_table_list(OP &&op) { - return mds_table_map_.for_each(op); + int register_to_mds_table_mgr(MdsTableBase *p_mds_table);// called when a new mds table is created + int unregister_from_mds_table_mgr(MdsTableBase *p_mds_table);// called when the tablet pointer is removed from the t3m map; records the mds table in removed_mds_table_recorder + void unregister_from_removed_mds_table_recorder(MdsTableBase *p_mds_table);// called when the mds table is released (tablet pointer released); deletes it from removed_mds_table_recorder + template // if op returns a failure code, the for-each stops + int for_each_removed_mds_table(OP &&op) { + int ret = OB_SUCCESS; + auto op_wrapper = [&op, &ret](const MdsTableBase &mds_table) -> bool { + bool break_flag = false;// false means keep iterating to the next mds_table + if (OB_FAIL(op(const_cast(mds_table)))) { + break_flag = true; + } + return break_flag; + }; + removed_mds_table_recorder_.for_each(op_wrapper); + return ret; + } + template // if op returns a failure code, the for-each stops + int for_each_in_t3m_mds_table(OP &&op) { + auto op_wrapper = [&op](const common::ObTabletID &k, MdsTableBase* &v) -> bool { + bool keep_iterating = true;// true means keep iterating to the next mds_table + int ret = OB_SUCCESS; + if (OB_FAIL(op(*v))) { + keep_iterating = false; + } + return keep_iterating; + }; + return mds_table_map_.for_each(op_wrapper); } - DECLARE_TO_STRING; - public: // derived from ObCommonCheckpoint share::SCN get_freezing_scn() const; virtual share::SCN get_rec_scn() override; @@ -77,6 +116,7 @@ private: int64_t ref_cnt_; ObLS *ls_; MdsTableMap mds_table_map_; + RemovedMdsTableRecorder removed_mds_table_recorder_; }; class MdsTableFreezeGuard diff --git a/src/storage/multi_data_source/runtime_utility/common_define.h b/src/storage/multi_data_source/runtime_utility/common_define.h index 5559568802..96986a8779 100644 --- 
a/src/storage/multi_data_source/runtime_utility/common_define.h +++ b/src/storage/multi_data_source/runtime_utility/common_define.h @@ -9,6 +9,20 @@ #include "src/share/scn.h" #include "src/share/ob_occam_time_guard.h" +#ifdef OB_BUILD_RPM + #define MDS_ASSERT(x) (void)(x) +#else + #define MDS_ASSERT(x) \ + do{ \ + bool v=(x); \ + if(OB_UNLIKELY(!(v))) { \ + _OB_LOG_RET(ERROR, oceanbase::common::OB_ERROR, "assert fail, exp=%s", #x); \ + BACKTRACE_RET(ERROR, oceanbase::common::OB_ERROR, 1, "assert fail"); \ + ob_abort(); \ + } \ + } while(false) +#endif + namespace oceanbase { namespace storage @@ -90,7 +104,7 @@ static constexpr bool STATE_CHECK_ALLOWED_MAP[static_cast(TwoPhaseCommitSta static inline void check_and_advance_two_phase_commit(TwoPhaseCommitState &state, TwoPhaseCommitState new_state) { - OB_ASSERT(STATE_CHECK_ALLOWED_MAP[(int)state][(int)new_state] == true); + MDS_ASSERT(STATE_CHECK_ALLOWED_MAP[(int)state][(int)new_state] == true); state = new_state; } @@ -168,6 +182,7 @@ enum LogPhase FLUSH, GC, FREEZE, + NOTICE, NONE }; @@ -212,6 +227,9 @@ do {\ case mds::LogPhase::FREEZE:\ oceanbase::common::databuff_printf(joined_info, joined_length, pos, "[FREEZE]%s", info);\ break;\ + case mds::LogPhase::NOTICE:\ + oceanbase::common::databuff_printf(joined_info, joined_length, pos, "[NOTICE]%s", info);\ + break;\ case mds::LogPhase::NONE:\ oceanbase::common::databuff_printf(joined_info, joined_length, pos, "[NONE]%s", info);\ break;\ @@ -232,8 +250,9 @@ do {\ #define MDS_LOG_SCAN(level, info, args...) _MDS_LOG_PHASE(level, mds::LogPhase::SCAN, info, ##args) #define MDS_LOG_FLUSH(level, info, args...) _MDS_LOG_PHASE(level, mds::LogPhase::FLUSH, info, ##args) #define MDS_LOG_GC(level, info, args...) _MDS_LOG_PHASE(level, mds::LogPhase::GC, info, ##args) -#define MDS_LOG_NONE(level, info, args...) _MDS_LOG_PHASE(level, mds::LogPhase::NONE, info, ##args) #define MDS_LOG_FREEZE(level, info, args...) 
_MDS_LOG_PHASE(level, mds::LogPhase::FREEZE, info, ##args) +#define MDS_LOG_NOTICE(level, info, args...) _MDS_LOG_PHASE(level, mds::LogPhase::NOTICE, info, ##args) +#define MDS_LOG_NONE(level, info, args...) _MDS_LOG_PHASE(level, mds::LogPhase::NONE, info, ##args) // flag is needed to rollback logic #define MDS_FAIL_FLAG(stmt, flag) (CLICK_FAIL(stmt) || FALSE_IT(flag = true)) diff --git a/src/storage/multi_data_source/runtime_utility/list_helper.h b/src/storage/multi_data_source/runtime_utility/list_helper.h index e16809aa26..ab803b2b33 100644 --- a/src/storage/multi_data_source/runtime_utility/list_helper.h +++ b/src/storage/multi_data_source/runtime_utility/list_helper.h @@ -4,6 +4,7 @@ #include "lib/ob_errno.h" #include "lib/utility/ob_macro_utils.h" #include "common_define.h" +#include "lib/utility/utility.h" #include "src/share/ob_delegate.h" namespace oceanbase @@ -58,26 +59,26 @@ struct ListBase// erase List Type void check_invariance_() const { #ifdef UNITTEST_DEBUG if (OB_NOT_NULL(list_head_)) { - OB_ASSERT(OB_NOT_NULL(list_tail_)); - OB_ASSERT(OB_ISNULL(list_head_->prev_)); - OB_ASSERT(OB_ISNULL(list_tail_->next_)); + MDS_ASSERT(OB_NOT_NULL(list_tail_)); + MDS_ASSERT(OB_ISNULL(list_head_->prev_)); + MDS_ASSERT(OB_ISNULL(list_tail_->next_)); // can reach tail from head const ListNodeBase *iter = list_head_; while (iter != list_tail_) { iter = iter->next_; } - OB_ASSERT(iter == list_tail_); + MDS_ASSERT(iter == list_tail_); } if (OB_NOT_NULL(list_tail_)) { - OB_ASSERT(OB_NOT_NULL(list_tail_)); - OB_ASSERT(OB_ISNULL(list_head_->prev_)); - OB_ASSERT(OB_ISNULL(list_tail_->next_)); + MDS_ASSERT(OB_NOT_NULL(list_tail_)); + MDS_ASSERT(OB_ISNULL(list_head_->prev_)); + MDS_ASSERT(OB_ISNULL(list_tail_->next_)); // can reach head from tail const ListNodeBase *iter = list_tail_; while (iter != list_head_) { iter = iter->prev_; } - OB_ASSERT(iter == list_head_); + MDS_ASSERT(iter == list_head_); } #endif } @@ -104,14 +105,14 @@ struct ListBase// erase List Type void 
append(ListNodeBase *new_node) { new_node->reset(); if (OB_ISNULL(list_head_)) {// insert into head - OB_ASSERT(OB_ISNULL(list_tail_)); + MDS_ASSERT(OB_ISNULL(list_tail_)); list_head_ = new_node; list_tail_ = new_node; } else {// insert after tail - OB_ASSERT(OB_ISNULL(list_tail_->next_)); - OB_ASSERT(OB_NOT_NULL(new_node)); - OB_ASSERT(OB_ISNULL(new_node->prev_)); - OB_ASSERT(OB_ISNULL(new_node->next_)); + MDS_ASSERT(OB_ISNULL(list_tail_->next_)); + MDS_ASSERT(OB_NOT_NULL(new_node)); + MDS_ASSERT(OB_ISNULL(new_node->prev_)); + MDS_ASSERT(OB_ISNULL(new_node->next_)); list_tail_->next_ = new_node; new_node->prev_ = list_tail_; new_node->next_ = nullptr; @@ -120,11 +121,11 @@ struct ListBase// erase List Type check_invariance_(); } void del(ListNodeBase *list_node) { - OB_ASSERT(OB_NOT_NULL(list_node)); + MDS_ASSERT(OB_NOT_NULL(list_node)); ListNodeBase *before = list_node->prev_; ListNodeBase *after = list_node->next_; if (OB_ISNULL(before)) {// the first node - OB_ASSERT(list_head_ == list_node); + MDS_ASSERT(list_head_ == list_node); list_head_ = after; if (OB_NOT_NULL(list_head_)) { list_head_->prev_ = nullptr; @@ -133,7 +134,7 @@ struct ListBase// erase List Type before->next_ = list_node->next_; } if (OB_ISNULL(after)) {// the last node - OB_ASSERT(list_tail_ == list_node); + MDS_ASSERT(list_tail_ == list_node); list_tail_ = before; if (OB_NOT_NULL(list_tail_)) { list_tail_->next_ = nullptr; @@ -207,16 +208,16 @@ public: return head; } void insert_into_head(ListNode *new_node) { - OB_ASSERT(OB_NOT_NULL(new_node)); + MDS_ASSERT(OB_NOT_NULL(new_node)); new_node->reset(); if (OB_ISNULL(list_head_)) {// insert into head - OB_ASSERT(OB_ISNULL(list_tail_)); + MDS_ASSERT(OB_ISNULL(list_tail_)); list_head_ = new_node; list_tail_ = new_node; } else {// insert before head - OB_ASSERT(OB_ISNULL(list_head_->prev_)); - OB_ASSERT(OB_ISNULL(new_node->prev_)); - OB_ASSERT(OB_ISNULL(new_node->next_)); + MDS_ASSERT(OB_ISNULL(list_head_->prev_)); + 
MDS_ASSERT(OB_ISNULL(new_node->prev_)); + MDS_ASSERT(OB_ISNULL(new_node->next_)); list_head_->prev_ = new_node; new_node->next_ = list_head_; new_node->prev_ = nullptr; @@ -251,9 +252,9 @@ public: SortedList &operator=(const SortedList &) = delete; SortedList &operator=(SortedList &&) = delete; void insert(ListNode *new_node) { - OB_ASSERT(OB_NOT_NULL(new_node)); - OB_ASSERT(OB_ISNULL(new_node->next_)); - OB_ASSERT(OB_ISNULL(new_node->prev_)); + MDS_ASSERT(OB_NOT_NULL(new_node)); + MDS_ASSERT(OB_ISNULL(new_node->next_)); + MDS_ASSERT(OB_ISNULL(new_node->prev_)); if (ListBase::empty()) { ListBase::list_head_ = new_node; ListBase::list_tail_ = new_node; @@ -276,7 +277,7 @@ public: new_node->prev_ = ListBase::list_tail_; ListBase::list_tail_ = new_node; } else if (OB_ISNULL(next_node->prev_)) {// insert to head - OB_ASSERT(ListBase::list_head_ == next_node); + MDS_ASSERT(ListBase::list_head_ == next_node); ListBase::list_head_->prev_ = new_node; new_node->next_ = ListBase::list_head_; ListBase::list_head_ = new_node; @@ -291,12 +292,12 @@ public: } T &get_head() { T *data = nullptr; - OB_ASSERT(OB_NOT_NULL(data = dynamic_cast(ListBase::list_head_))); + MDS_ASSERT(OB_NOT_NULL(data = dynamic_cast(ListBase::list_head_))); return *data; } T &get_tail() { T *data = nullptr; - OB_ASSERT(OB_NOT_NULL(data = dynamic_cast(ListBase::list_tail_))); + MDS_ASSERT(OB_NOT_NULL(data = dynamic_cast(ListBase::list_tail_))); return *data; } }; diff --git a/src/storage/multi_data_source/runtime_utility/mds_factory.cpp b/src/storage/multi_data_source/runtime_utility/mds_factory.cpp index 629d4707e6..ea47627703 100644 --- a/src/storage/multi_data_source/runtime_utility/mds_factory.cpp +++ b/src/storage/multi_data_source/runtime_utility/mds_factory.cpp @@ -58,7 +58,7 @@ int deepcopy(const transaction::ObTransID &trans_id, using ImplType = GET_CTX_TYPE_BY_TUPLE_IDX(IDX); ImplType *p_impl = nullptr; const ImplType *p_old_impl_ctx = dynamic_cast(&old_ctx); - 
OB_ASSERT(OB_NOT_NULL(p_old_impl_ctx)); + MDS_ASSERT(OB_NOT_NULL(p_old_impl_ctx)); const ImplType &old_impl_ctx = *p_old_impl_ctx; set_mds_mem_check_thread_local_info(MdsWriter(trans_id), typeid(ImplType).name(), alloc_file, alloc_func, line); if (CLICK() && diff --git a/src/storage/multi_data_source/runtime_utility/mds_tenant_service.cpp b/src/storage/multi_data_source/runtime_utility/mds_tenant_service.cpp index 1d9e749854..79433ddb7d 100644 --- a/src/storage/multi_data_source/runtime_utility/mds_tenant_service.cpp +++ b/src/storage/multi_data_source/runtime_utility/mds_tenant_service.cpp @@ -17,6 +17,7 @@ #include "lib/string/ob_string_holder.h" #include "lib/time/ob_time_utility.h" #include "lib/utility/utility.h" +#include "ob_clock_generator.h" #include "share/rc/ob_tenant_base.h" #include "storage/meta_mem/ob_tablet_map_key.h" #include "storage/meta_mem/ob_tenant_meta_mem_mgr.h" @@ -256,10 +257,10 @@ void ObTenantMdsTimer::try_recycle_mds_table_task() void ObTenantMdsTimer::dump_special_mds_table_status_task() { - #define PRINT_WRAPPER KR(ret), KPC(this) + #define PRINT_WRAPPER KR(ret) MDS_TG(1_s); ObCurTraceId::init(GCONF.self_addr_); - ObTenantMdsService::for_each_ls_in_tenant([this](ObLS &ls) -> int { + ObTenantMdsService::for_each_ls_in_tenant([](ObLS &ls) -> int { int ret = OB_SUCCESS; MDS_TG(1_s); MdsTableMgrHandle mds_table_mge_handle; @@ -268,24 +269,25 @@ void ObTenantMdsTimer::dump_special_mds_table_status_task() MDS_LOG_NONE(WARN, "fail to get mds table mgr"); } else if (FALSE_IT(ls_mds_freezing_scn = mds_table_mge_handle.get_mds_table_mgr()->get_freezing_scn())) { } else { - ObTenantMdsService::for_each_mds_table_in_ls(ls, [this, ls_mds_freezing_scn](ObTablet &tablet) -> int { - int ret = OB_SUCCESS; - MDS_TG(1_s); - MdsTableHandle mds_table_handle; - share::SCN rec_scn; - const ObTablet::ObTabletPointerHandle &pointer_handle = tablet.get_pointer_handle(); - ObMetaPointer *resource_ptr = pointer_handle.get_resource_ptr(); - ObTabletPointer 
*tablet_pointer = dynamic_cast(resource_ptr); - if (OB_FAIL(tablet_pointer->get_mds_table(mds_table_handle))) { - if (OB_ENTRY_NOT_EXIST != ret) { - MDS_LOG_NONE(WARN, "fail to get mds table", K(tablet.get_tablet_meta().tablet_id_)); + (void)mds_table_mge_handle.get_mds_table_mgr()->for_each_in_t3m_mds_table([ls_mds_freezing_scn](MdsTableBase &mds_table) -> int {// with hash map bucket's lock protected + (void) mds_table.operate([ls_mds_freezing_scn](MdsTableBase &mds_table)-> int {// with MdsTable's lock protected + int ret = OB_SUCCESS; + if (mds_table.get_rec_scn() <= ls_mds_freezing_scn) { + MDS_LOG_NOTICE(WARN, "dump rec_scn lagging freeze_scn mds_table", K(ls_mds_freezing_scn), K(mds_table)); } - } else if (OB_FAIL(mds_table_handle.get_rec_scn(rec_scn))) { - MDS_LOG_NONE(WARN, "fail to get mds table rec_scn", K(tablet.get_tablet_meta().tablet_id_)); - } else if (rec_scn <= ls_mds_freezing_scn) { - mds_table_handle.dump_status(); - } - return OB_SUCCESS;// keep doing ignore error + return OB_SUCCESS;// keep iterating + }); + return OB_SUCCESS;// keep iterating + }); + (void)mds_table_mge_handle.get_mds_table_mgr()->for_each_removed_mds_table([](MdsTableBase &mds_table) -> int { + (void) mds_table.operate([](MdsTableBase &mds_table)-> int {// with MdsTable's lock protected + int ret = OB_SUCCESS; + if (ObClockGenerator::getClock() - mds_table.get_removed_from_t3m_ts() > 1_min) { + MDS_LOG_NOTICE(WARN, "dump maybe leaked mds_table", K(mds_table)); + } + return OB_SUCCESS;// keep iterating + }); + return OB_SUCCESS;// keep iterating }); }; return OB_SUCCESS;// keep doing ignore error @@ -378,51 +380,18 @@ int ObTenantMdsService::for_each_mds_table_in_ls(ObLS &ls, const ObFunction ids_in_t3m_array; if (MDS_FAIL(ls.get_mds_table_mgr(mgr_handle))) { MDS_LOG_NONE(WARN, "fail to get mds table mgr"); - } else if (MDS_FAIL(mgr_handle.get_mds_table_mgr()->for_each_mds_table_list( - [&mds_table_total_num, &ids_in_t3m_array, &ls](const ObTabletID &tablet_id, List 
&mds_table_list) -> bool {// with map's bucket lock protected + } else if (MDS_FAIL(mgr_handle.get_mds_table_mgr()->for_each_in_t3m_mds_table( + [&mds_table_total_num, &ids_in_t3m_array, &ls](MdsTableBase &mds_table) -> int {// with map's bucket lock protected MDS_TG(1_s); int ret = OB_SUCCESS; - if (mds_table_list.empty()) { - ret = OB_ERR_UNEXPECTED; - MDS_LOG_NONE(ERROR, "meet empty mds_table_list", K(tablet_id)); - } else { - MdsTableBase *p_mds_table = static_cast(mds_table_list.list_head_); - if (!p_mds_table->is_removed_from_t3m()) { - MDS_LOG_NONE(TRACE, "process with mds_table", KPC(p_mds_table)); - if (MDS_FAIL(ids_in_t3m_array.push_back(p_mds_table->get_tablet_id()))) { - MDS_LOG_NONE(WARN, "fail to push array", KPC(p_mds_table)); - } - } else { - MDS_LOG_NONE(WARN, "consider mds_table leaked if this log keep printing", KPC(p_mds_table)); - } - ++mds_table_total_num; - MdsTableBase *iter = static_cast(p_mds_table->next_); - while (OB_NOT_NULL(iter)) { - if (ObTimeUtility::fast_current_time() - iter->get_removed_from_t3m_ts() > 5_min) { - MDS_LOG_NONE(WARN, "consider mds_table leaked if this log keep printing", KPC(iter)); - } - ++mds_table_total_num; - iter = static_cast(iter->next_); - } + if (MDS_FAIL(ids_in_t3m_array.push_back(mds_table.get_tablet_id()))) { + MDS_LOG_NONE(WARN, "fail to push array"); } - return true;// keep iter + return ret; } ))) { MDS_LOG_NONE(WARN, "fail to scan mds_table"); } else { - MDS_LOG_NONE(INFO, "succ to scan mds_table"); - constexpr int64_t PRINT_SIZE = 128; - // print those in t3m array ids, and if ids too many, print random part of them - if (ids_in_t3m_array.count() < PRINT_SIZE) { - MDS_LOG_NONE(INFO, "dump tablet ids in t3m", K(ids_in_t3m_array)); - } else { - int64_t random_start = rand() % ids_in_t3m_array.count(); - ObSEArray print_array; - for (int64_t idx = 0; idx < PRINT_SIZE; ++idx) { - print_array.push_back(ids_in_t3m_array[(random_start + idx) % ids_in_t3m_array.count()]); - } - MDS_LOG_NONE(INFO, "dump 
random part tablet ids in t3m", K(print_array), K(ids_in_t3m_array.count()), K(PRINT_SIZE)); - } for (int64_t idx = 0; idx < ids_in_t3m_array.count(); ++idx) {// ignore ret ObTabletHandle tablet_handle; if (OB_FAIL(ls.get_tablet(ids_in_t3m_array[idx], tablet_handle, 1_s, ObMDSGetTabletMode::READ_WITHOUT_CHECK))) { diff --git a/unittest/storage/multi_data_source/example_user_data_define.h b/unittest/storage/multi_data_source/example_user_data_define.h index f2455f30bf..97aeba977a 100644 --- a/unittest/storage/multi_data_source/example_user_data_define.h +++ b/unittest/storage/multi_data_source/example_user_data_define.h @@ -6,6 +6,7 @@ #include "lib/utility/serialization.h" #include "meta_programming/ob_meta_serialization.h" #include "common_define.h" + namespace oceanbase { namespace unittest { struct ExampleUserKey { diff --git a/unittest/storage/multi_data_source/test_mds_dump_kv.cpp b/unittest/storage/multi_data_source/test_mds_dump_kv.cpp index 45239c5967..5518f511ae 100644 --- a/unittest/storage/multi_data_source/test_mds_dump_kv.cpp +++ b/unittest/storage/multi_data_source/test_mds_dump_kv.cpp @@ -95,7 +95,7 @@ void TestMdsDumpKV::test_dump_node_convert_and_serialize_and_compare() ObArenaAllocator allocator; ASSERT_EQ(OB_SUCCESS, dump_node2.deserialize(allocator, buffer, buf_len, pos)); - OB_ASSERT(dump_node2.crc_check_number_ == dump_node2.generate_hash()); + MDS_ASSERT(dump_node2.crc_check_number_ == dump_node2.generate_hash()); ASSERT_EQ(dump_node2.generate_hash(), dump_node2.crc_check_number_); ASSERT_EQ(dump_node2.crc_check_number_, dump_node.crc_check_number_); UserMdsNode user_mds_node2; diff --git a/unittest/storage/multi_data_source/test_mds_table.cpp b/unittest/storage/multi_data_source/test_mds_table.cpp index af2342fe50..6b41a59cf7 100644 --- a/unittest/storage/multi_data_source/test_mds_table.cpp +++ b/unittest/storage/multi_data_source/test_mds_table.cpp @@ -139,7 +139,7 @@ void TestMdsTable::replay() { share::SCN recorde_scn = mock_scn(0); 
unit.single_row_.v_.sorted_list_.for_each_node_from_tail_to_head_until_true([&](const UserMdsNode &node) -> int { if (!node.is_aborted_()) { - OB_ASSERT(node.redo_scn_ > recorde_scn); + MDS_ASSERT(node.redo_scn_ > recorde_scn); recorde_scn = node.redo_scn_; } return OB_SUCCESS; @@ -383,8 +383,8 @@ void TestMdsTable::test_flush() { ASSERT_EQ(mock_scn(199), mds_table_.p_mds_table_base_->flushing_scn_);// 2. 实际上以199为版本号进行flush动作 ASSERT_EQ(OB_SUCCESS, mds_table_.for_each_unit_from_small_key_to_big_from_old_node_to_new_to_dump( [&idx](const MdsDumpKV &kv) -> int {// 2. 转储时扫描mds table - OB_ASSERT(kv.v_.end_scn_ < mock_scn(199));// 扫描时看不到199版本以上的提交 - OB_ASSERT(idx < 10); + MDS_ASSERT(kv.v_.end_scn_ < mock_scn(199));// 扫描时看不到199版本以上的提交 + MDS_ASSERT(idx < 10); MDS_LOG(INFO, "print dump node kv", K(kv)); return OB_SUCCESS; }, 0, true)