From e09b341e60c790b850c0c72cfa0cb2530b4393f3 Mon Sep 17 00:00:00 2001 From: fengdeyiji <546976189@qq.com> Date: Mon, 24 Jun 2024 05:40:47 +0000 Subject: [PATCH] [DeadLock] add request stat machine and meesage member occupy --- deps/oblib/src/rpc/ob_lock_wait_node.cpp | 2 +- deps/oblib/src/rpc/ob_lock_wait_node.h | 81 ++++++++++++++++++++++ src/observer/ob_srv_deliver.cpp | 8 +++ src/observer/omt/ob_th_worker.cpp | 3 + src/share/deadlock/ob_deadlock_message.cpp | 2 +- src/share/deadlock/ob_deadlock_message.h | 4 +- src/sql/ob_sql_trans_control.cpp | 3 + src/storage/memtable/ob_lock_wait_mgr.cpp | 11 +++ src/storage/memtable/ob_lock_wait_mgr.h | 8 +++ 9 files changed, 119 insertions(+), 3 deletions(-) diff --git a/deps/oblib/src/rpc/ob_lock_wait_node.cpp b/deps/oblib/src/rpc/ob_lock_wait_node.cpp index 6f32fe964d..a5af11f21e 100644 --- a/deps/oblib/src/rpc/ob_lock_wait_node.cpp +++ b/deps/oblib/src/rpc/ob_lock_wait_node.cpp @@ -18,7 +18,7 @@ namespace rpc { ObLockWaitNode::ObLockWaitNode() : - hold_key_(0), need_wait_(false), addr_(NULL), recv_ts_(0), lock_ts_(0), lock_seq_(0), + hold_key_(0), need_wait_(false), request_stat_(), addr_(NULL), recv_ts_(0), lock_ts_(0), lock_seq_(0), abs_timeout_(0), tablet_id_(common::OB_INVALID_ID), try_lock_times_(0), sessid_(0), holder_sessid_(0), block_sessid_(0), tx_id_(0), holder_tx_id_(0), run_ts_(0), is_standalone_task_(false), last_compact_cnt_(0), total_update_cnt_(0) {} diff --git a/deps/oblib/src/rpc/ob_lock_wait_node.h b/deps/oblib/src/rpc/ob_lock_wait_node.h index eb41ac89e4..22c19688b8 100644 --- a/deps/oblib/src/rpc/ob_lock_wait_node.h +++ b/deps/oblib/src/rpc/ob_lock_wait_node.h @@ -14,8 +14,11 @@ #define OCEANBASE_RPC_OB_LOCK_WAIT_NODE_ #include "lib/hash/ob_fixed_hash2.h" +#include "lib/list/ob_dlist.h" +#include "lib/ob_errno.h" #include "lib/utility/ob_print_utils.h" #include "lib/time/ob_time_utility.h" +#include namespace oceanbase { @@ -26,6 +29,79 @@ class ObLSID; namespace rpc { +struct RequestLockWaitStat { + enum RequestStat : uint8_t { + DEFAULT = 0, + EXECUTE = 1, + START = 2, + CONFLICTED = 3, + END = 4, + INQUEUE = 5, + OUTQUEUE = 6, + SIZE + }; + const char *stat_to_string(RequestStat stat) const { + const char *ret = nullptr; + switch (state_) { + case RequestStat::DEFAULT: + ret = "DEFAULT"; + break; + case RequestStat::EXECUTE: + ret = "EXECUTE"; + break; + case RequestStat::START: + ret = "START"; + break; + case RequestStat::CONFLICTED: + ret = "CONFLICTED"; + break; + case RequestStat::END: + ret = "END"; + break; + case RequestStat::INQUEUE: + ret = "INQUEUE"; + break; + case RequestStat::OUTQUEUE: + ret = "OUTQUEUE"; + break; + default: + ret = "UNKNOWN"; + break; + } + return ret; + } + static constexpr bool ConvertMap[RequestStat::SIZE][RequestStat::SIZE] = { + /*DEFAULT*/ /*EXECUTE*/ /*START*/ /*CONFLICTED*/ /*END*/ /*INQUEUE*/ /*OUTQUEUE*/ + /*DEFAULT*/ { 1, 1, 0, 0, 0, 0, 0},// from DEFAULT, only allowed to EXECUTE + /*EXECUTE*/ { 0, 1, 1, 1, 0, 0, 0},// from EXECUTE, only allowed to START, there maybe switch to CONFLICTED without START in test for some unknown reason + /*START*/ { 0, 1, 1, 1, 1, 0, 0},// from START, only allowed to CONFLICTED/END, there maybe switch to START in test for some unknown reason + /*CONFLICTED*/ { 0, 0, 0, 1, 1, 1, 0},// from CONFLICTED, only allowed to END, there maybe switch to INQUEUE in test for some unknown reason + /*END*/ { 0, 1, 1, 1, 1, 1, 0},// from END, allowed to EXECUTE/START/INQUEUE, there maybe switch to CONFLICTED without START in test for some unknown reason + /*INQUEUE*/ { 0, 0, 0, 0, 0, 1, 1},// from INQUEUE, only allowed to OUTQUEUE + /*OUTQUEUE*/ { 0, 1, 0, 0, 0, 0, 1},// from UTQUEUE, only allowed to EXECUTE + }; + RequestLockWaitStat() : state_(RequestStat::DEFAULT) {} + void advance_to(RequestStat new_stat) { + if (OB_UNLIKELY(!ConvertMap[state_][new_stat])) { +#ifdef OB_BUILD_PACKAGE // serious env, just WARN + DETECT_LOG_RET(WARN, OB_ERR_UNEXPECTED, "stat advance unexpected", K(state_), K(new_stat), KP(this)); +#else // test env, print ERROR and abort if necessary + DETECT_LOG_RET(WARN, OB_ERR_UNEXPECTED, "stat advance unexpected", K(state_), K(new_stat), KP(this), K(lbt())); + if (RequestStat::INQUEUE == state_) { + ob_abort();// this is not expectecd, something is beyond control and break important assumption. + } +#endif + } + state_ = new_stat; + } + int64_t to_string(char *buffer, const int64_t buffer_len) const { + int64_t pos = 0; + databuff_printf(buffer, buffer_len, pos, "%ld(%s)", (int64_t)state_, stat_to_string(state_)); + return pos; + } + RequestStat state_; +}; + struct ObLockWaitNode: public common::SpHashNode { ObLockWaitNode(); @@ -66,8 +142,12 @@ struct ObLockWaitNode: public common::SpHashNode UNUSED(ret); } void set_block_sessid(const uint32_t block_sessid) { block_sessid_ = block_sessid; } + void advance_stat(RequestLockWaitStat::RequestStat new_stat) { + request_stat_.advance_to(new_stat); + } TO_STRING_KV(KP(this), + K_(request_stat), KP_(addr), K_(hash), K_(lock_ts), @@ -91,6 +171,7 @@ struct ObLockWaitNode: public common::SpHashNode uint64_t hold_key_; ObLink retire_link_; bool need_wait_; + RequestLockWaitStat request_stat_; void* addr_; int64_t recv_ts_; int64_t lock_ts_; diff --git a/src/observer/ob_srv_deliver.cpp b/src/observer/ob_srv_deliver.cpp index acbe59b779..a0d70be00a 100644 --- a/src/observer/ob_srv_deliver.cpp +++ b/src/observer/ob_srv_deliver.cpp @@ -769,6 +769,14 @@ int ObSrvDeliver::repost(void* p) int ObSrvDeliver::deliver(rpc::ObRequest &req) { int ret = OB_SUCCESS; + RequestLockWaitStat::RequestStat req_stat = req.lock_wait_node_.request_stat_.state_; + if (OB_UNLIKELY(req_stat == RequestLockWaitStat::RequestStat::INQUEUE)) { +#ifdef OB_BUILD_PACKAGE // serious env, just WARN + LOG_WARN("deliver rpc request in unexpected state", KP(&req), K(req_stat)); +#else + LOG_WARN("deliver rpc request in unexpected state", KP(&req), K(req_stat), K(lbt())); +#endif + } LOG_DEBUG("deliver ob_request:", K(req)); if (ObRequest::OB_RPC == req.get_type()) { if (OB_FAIL(deliver_rpc_request(req))) { diff --git a/src/observer/omt/ob_th_worker.cpp b/src/observer/omt/ob_th_worker.cpp index d0cccefcd5..b2ddaa4686 100644 --- a/src/observer/omt/ob_th_worker.cpp +++ b/src/observer/omt/ob_th_worker.cpp @@ -10,6 +10,8 @@ * See the Mulan PubL v2 for more details. */ +#include "rpc/ob_lock_wait_node.h" +#include "share/rc/ob_tenant_base.h" #define USING_LOG_PREFIX SERVER_OMT #include "ob_th_worker.h" @@ -242,6 +244,7 @@ inline void ObThWorker::process_request(rpc::ObRequest &req) set_req_flag(&req); MTL(memtable::ObLockWaitMgr*)->setup(req.get_lock_wait_node(), req.get_receive_timestamp()); + memtable::advance_tlocal_request_lock_wait_stat(rpc::RequestLockWaitStat::RequestStat::EXECUTE); if (OB_FAIL(procor_.process(req))) { LOG_WARN("process request fail", K(ret)); } diff --git a/src/share/deadlock/ob_deadlock_message.cpp b/src/share/deadlock/ob_deadlock_message.cpp index 529e85e91c..24f5d88715 100644 --- a/src/share/deadlock/ob_deadlock_message.cpp +++ b/src/share/deadlock/ob_deadlock_message.cpp @@ -23,7 +23,7 @@ using namespace common; OB_SERIALIZE_MEMBER(ObDeadLockCollectInfoMessage, dest_key_, collected_info_); OB_SERIALIZE_MEMBER(ObDeadLockNotifyParentMessage, parent_addr_, parent_key_, - src_addr_, src_key_); + src_addr_, src_key_, action_); int ObDeadLockCollectInfoMessage::append(const ObDetectorInnerReportInfo &info) { diff --git a/src/share/deadlock/ob_deadlock_message.h b/src/share/deadlock/ob_deadlock_message.h index 50c309356d..d07d618ee3 100644 --- a/src/share/deadlock/ob_deadlock_message.h +++ b/src/share/deadlock/ob_deadlock_message.h @@ -14,6 +14,7 @@ #define OCEANBASE_SHARE_DEADLOCK_OB_DEADLOCK_COLLECT_INFO_MESSAGE_H #include "ob_deadlock_detector_common_define.h" +#include "lib/string/ob_string_holder.h" namespace oceanbase { @@ -57,12 +58,13 @@ public: const common::ObAddr &get_src_addr() const { return src_addr_; } const UserBinaryKey &get_src_key() const { return src_key_; } bool is_valid() const; - TO_STRING_KV(K_(parent_addr), K_(parent_key), K_(src_addr), K_(src_key)); + TO_STRING_KV(K_(parent_addr), K_(parent_key), K_(src_addr), K_(src_key), K_(action)); private: common::ObAddr parent_addr_; UserBinaryKey parent_key_; common::ObAddr src_addr_; UserBinaryKey src_key_; + ObStringHolder action_; }; } diff --git a/src/sql/ob_sql_trans_control.cpp b/src/sql/ob_sql_trans_control.cpp index fe8918aa68..b4018d7c7b 100644 --- a/src/sql/ob_sql_trans_control.cpp +++ b/src/sql/ob_sql_trans_control.cpp @@ -42,6 +42,7 @@ #include "sql/das/ob_das_dml_ctx_define.h" #include "share/deadlock/ob_deadlock_detector_mgr.h" #include "sql/engine/cmd/ob_table_direct_insert_ctx.h" +#include "storage/memtable/ob_lock_wait_mgr.h" #ifdef CHECK_SESSION #error "redefine macro CHECK_SESSION" @@ -526,6 +527,7 @@ int ObSqlTransControl::decide_trans_read_interface_specs( int ObSqlTransControl::start_stmt(ObExecContext &exec_ctx) { int ret = OB_SUCCESS; + memtable::advance_tlocal_request_lock_wait_stat(rpc::RequestLockWaitStat::RequestStat::START); DISABLE_SQL_MEMLEAK_GUARD; ObSQLSessionInfo *session = GET_MY_SESSION(exec_ctx); ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(exec_ctx); @@ -1261,6 +1263,7 @@ int ObSqlTransControl::end_stmt(ObExecContext &exec_ctx, const bool rollback) if (OB_NOT_NULL(session)) { session->get_trans_result().reset(); } + memtable::advance_tlocal_request_lock_wait_stat(rpc::RequestLockWaitStat::RequestStat::END); return ret; } diff --git a/src/storage/memtable/ob_lock_wait_mgr.cpp b/src/storage/memtable/ob_lock_wait_mgr.cpp index 902c2fa8de..4620b1802d 100644 --- a/src/storage/memtable/ob_lock_wait_mgr.cpp +++ b/src/storage/memtable/ob_lock_wait_mgr.cpp @@ -247,7 +247,11 @@ bool ObLockWaitMgr::post_process(bool need_retry, bool& need_wait) } else { DETECT_LOG(TRACE, "register to deadlock detector success", K(tmp_ret), K(*node)); } + advance_tlocal_request_lock_wait_stat(rpc::RequestLockWaitStat::RequestStat::INQUEUE); wait_succ = wait(node); + if (!wait_succ) { + advance_tlocal_request_lock_wait_stat(rpc::RequestLockWaitStat::RequestStat::OUTQUEUE); + } if (OB_UNLIKELY(!wait_succ && (OB_SUCCESS == tmp_ret))) { (void) ObTransDeadlockDetectorAdapter::unregister_from_deadlock_detector(self_tx_id, ObTransDeadlockDetectorAdapter:: @@ -255,7 +259,11 @@ bool ObLockWaitMgr::post_process(bool need_retry, bool& need_wait) LOCK_WAIT_MGR_WAIT_FAILED); } } else { + advance_tlocal_request_lock_wait_stat(rpc::RequestLockWaitStat::RequestStat::INQUEUE); wait_succ = wait(node); + if (!wait_succ) { + advance_tlocal_request_lock_wait_stat(rpc::RequestLockWaitStat::RequestStat::OUTQUEUE); + } } if (OB_UNLIKELY(!wait_succ)) { TRANS_LOG_RET(WARN, tmp_ret, "fail to wait node", KR(tmp_ret), KPC(node)); @@ -628,6 +636,7 @@ int ObLockWaitMgr::post_lock(const int tmp_ret, holder_tx_id, ls_id); node->set_need_wait(); + advance_tlocal_request_lock_wait_stat(rpc::RequestLockWaitStat::RequestStat::CONFLICTED); } } } @@ -698,6 +707,7 @@ int ObLockWaitMgr::post_lock(const int tmp_ret, ls_id); node->set_need_wait(); node->set_lock_mode(lock_mode); + advance_tlocal_request_lock_wait_stat(rpc::RequestLockWaitStat::RequestStat::CONFLICTED); } } } @@ -718,6 +728,7 @@ int ObLockWaitMgr::repost(Node* node) ObTransDeadlockDetectorAdapter:: UnregisterPath:: LOCK_WAIT_MGR_REPOST); + node->advance_stat(rpc::RequestLockWaitStat::RequestStat::OUTQUEUE); if (OB_FAIL(OBSERVER.get_net_frame().get_deliver().repost((void*)node))) { TRANS_LOG(WARN, "report error", K(ret)); } diff --git a/src/storage/memtable/ob_lock_wait_mgr.h b/src/storage/memtable/ob_lock_wait_mgr.h index 2cb796cb1e..58f34a698f 100644 --- a/src/storage/memtable/ob_lock_wait_mgr.h +++ b/src/storage/memtable/ob_lock_wait_mgr.h @@ -23,6 +23,7 @@ #include "lib/utility/utility.h" #include "ob_memtable_key.h" #include "observer/ob_server_struct.h" +#include "rpc/ob_lock_wait_node.h" #include "rpc/ob_request.h" #include "share/deadlock/ob_deadlock_detector_common_define.h" #include "share/ob_thread_pool.h" @@ -403,6 +404,13 @@ public: bool is_table_lock_hash(const uint64_t hash) { return (hash & ~HASH_MASK) == TABLE_LOCK_FLAG; } }; +inline void advance_tlocal_request_lock_wait_stat(rpc::RequestLockWaitStat::RequestStat new_stat) { + rpc::ObLockWaitNode *node = memtable::ObLockWaitMgr::get_thread_node(); + if (OB_NOT_NULL(node)) { + node->advance_stat(new_stat); + } +} + }; // end namespace memtable }; // end namespace oceanbase