diff --git a/src/storage/tablelock/ob_mem_ctx_table_lock.h b/src/storage/tablelock/ob_mem_ctx_table_lock.h index 63a9d90d36..868c2b087d 100644 --- a/src/storage/tablelock/ob_mem_ctx_table_lock.h +++ b/src/storage/tablelock/ob_mem_ctx_table_lock.h @@ -49,8 +49,22 @@ public: public: ObTableLockOp lock_op_; }; - typedef common::ObDList ObLockNodeList; + +class ObMemCtxLockPrioOpLinkNode : public common::ObDLinkBase +{ +public: + ObMemCtxLockPrioOpLinkNode() + : prio_op_() + {} + int init(const ObTableLockOp &op_info, const ObTableLockPriority priority); + bool is_valid() const { return prio_op_.is_valid(); } + TO_STRING_KV(K_(prio_op)); +public: + ObTableLockPrioOp prio_op_; +}; +typedef common::ObDList ObPrioLockNodeList; + class ObLockMemCtx { using RWLock = common::SpinRWLock; diff --git a/src/storage/tablelock/ob_table_lock_common.cpp b/src/storage/tablelock/ob_table_lock_common.cpp index 9ea5e60c3f..ad659429f0 100644 --- a/src/storage/tablelock/ob_table_lock_common.cpp +++ b/src/storage/tablelock/ob_table_lock_common.cpp @@ -306,6 +306,10 @@ OB_SERIALIZE_MEMBER(ObTableLockInfo, table_lock_ops_, max_durable_scn_); +OB_SERIALIZE_MEMBER(ObTableLockPrioOp, + lock_op_, + priority_); + } // tablelock } // transaction } // oceanbase diff --git a/src/storage/tablelock/ob_table_lock_common.h b/src/storage/tablelock/ob_table_lock_common.h index e9c5aaf200..84d213e9f8 100644 --- a/src/storage/tablelock/ob_table_lock_common.h +++ b/src/storage/tablelock/ob_table_lock_common.h @@ -32,6 +32,14 @@ namespace transaction { namespace tablelock { +struct ObObjLockPriorityTaskID +{ + explicit ObObjLockPriorityTaskID(const int64_t trans_id_value) + : trans_id_value_(trans_id_value) {} + TO_STRING_KV(K_(trans_id_value)); + int64_t trans_id_value_; +}; + enum class ObTableLockPriority : int8_t { INVALID = -1, @@ -651,6 +659,41 @@ public: share::SCN max_durable_scn_; }; +struct ObTableLockPrioOp +{ + OB_UNIS_VERSION(1); +public: + ObTableLockPrioOp() + : lock_op_(), priority_(ObTableLockPriority::INVALID) + {} + ObTableLockPrioOp(const ObTableLockPriority priority, const ObTableLockOp &lock_op) + : lock_op_(lock_op), priority_(priority) + {} + bool is_valid() const + { return ObTableLockPriority::INVALID != priority_ && lock_op_.is_valid(); } +public: + TO_STRING_KV(K_(lock_op), K_(priority)); +public: + ObTableLockOp lock_op_; + ObTableLockPriority priority_; +}; + +typedef common::ObSEArray ObTableLockPrioOpArray; + +struct ObTableLockPrioArg +{ +public: + ObTableLockPrioArg() + : priority_(ObTableLockPriority::INVALID) {} + explicit ObTableLockPrioArg(const ObTableLockPriority &priority) + : priority_(priority) {} + bool is_valid() const { return ObTableLockPriority::INVALID != priority_; } +public: + TO_STRING_KV(K_(priority)); +public: + ObTableLockPriority priority_; +}; + static inline bool is_need_retry_unlock_error(int err) { diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 117a414777..bba2f2235d 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -3390,6 +3390,7 @@ int ObPartTransCtx::submit_redo_active_info_log_() ObTxLogBlock log_block; bool has_redo = false; ObRedoLogSubmitHelper helper; + ObTableLockPrioOpArray prio_op_array; if (OB_FAIL(submit_redo_if_parallel_logging_())) { } else if (OB_FAIL(init_log_block_(log_block))) { TRANS_LOG(WARN, "init log block failed", KR(ret), K(*this)); @@ -3416,7 +3417,8 @@ int ObPartTransCtx::submit_redo_active_info_log_() last_scn_, exec_info_.max_submitted_seq_no_, cluster_version_, exec_info_.xid_, - exec_info_.serial_final_seq_no_); + exec_info_.serial_final_seq_no_, + prio_op_array); ObTxLogCb *log_cb = nullptr; if (OB_FAIL(prepare_log_cb_(!NEED_FINAL_CB, log_cb))) { TRANS_LOG(WARN, "get log cb failed", KR(ret), KP(log_cb), K(*this)); diff --git a/src/storage/tx/ob_tx_log.cpp b/src/storage/tx/ob_tx_log.cpp index a9577e7fc7..912b2902c2 100644 --- a/src/storage/tx/ob_tx_log.cpp +++ b/src/storage/tx/ob_tx_log.cpp @@ -300,7 +300,8 @@ OB_TX_SERIALIZE_MEMBER(ObTxActiveInfoLog, /* 17 */ max_submitted_seq_no_, /* 18 */ xid_, /* 19 */ serial_final_seq_no_, - /* 20 */ associated_session_id_); + /* 20 */ associated_session_id_, + /* 21 */ prio_op_array_); OB_TX_SERIALIZE_MEMBER(ObTxCommitInfoLog, compat_bytes_, @@ -365,7 +366,7 @@ int ObTxActiveInfoLog::before_serialize() TRANS_LOG(WARN, "reset all compat_bytes_ valid failed", K(ret)); } } else { - if (OB_FAIL(compat_bytes_.init(19))) { + if (OB_FAIL(compat_bytes_.init(21))) { TRANS_LOG(WARN, "init compat_bytes_ failed", K(ret)); } } @@ -390,6 +391,8 @@ int ObTxActiveInfoLog::before_serialize() TX_NO_NEED_SER(!max_submitted_seq_no_.is_valid(), 17, compat_bytes_); TX_NO_NEED_SER(xid_.empty(), 18, compat_bytes_); TX_NO_NEED_SER(!serial_final_seq_no_.is_valid(), 19, compat_bytes_); + TX_NO_NEED_SER(associated_session_id_ == 0, 20, compat_bytes_); + TX_NO_NEED_SER(prio_op_array_.empty(), 21, compat_bytes_); } return ret; diff --git a/src/storage/tx/ob_tx_log.h b/src/storage/tx/ob_tx_log.h index 92471bdd27..439d8cbfcd 100644 --- a/src/storage/tx/ob_tx_log.h +++ b/src/storage/tx/ob_tx_log.h @@ -643,13 +643,15 @@ private: class ObTxActiveInfoLogTempRef { public: - ObTxActiveInfoLogTempRef() : scheduler_(), app_trace_id_str_(), proposal_leader_(), xid_() {} + ObTxActiveInfoLogTempRef() + : scheduler_(), app_trace_id_str_(), proposal_leader_(), xid_(), prio_op_array_() {} public: common::ObAddr scheduler_; common::ObString app_trace_id_str_; common::ObAddr proposal_leader_; ObXATransID xid_; + tablelock::ObTableLockPrioOpArray prio_op_array_; }; class ObTxActiveInfoLog @@ -663,7 +665,7 @@ public: proposal_leader_(temp_ref.proposal_leader_), cur_query_start_time_(0), is_sub2pc_(false), is_dup_tx_(false), tx_expired_time_(0), epoch_(0), last_op_sn_(0), first_seq_no_(), last_seq_no_(), max_submitted_seq_no_(), serial_final_seq_no_(), cluster_version_(0), - xid_(temp_ref.xid_) + xid_(temp_ref.xid_), prio_op_array_(temp_ref.prio_op_array_) { before_serialize(); } @@ -686,7 +688,8 @@ public: ObTxSEQ max_submitted_seq_no, uint64_t cluster_version, const ObXATransID &xid, - ObTxSEQ serial_final_seq_no) + ObTxSEQ serial_final_seq_no, + tablelock::ObTableLockPrioOpArray &prio_op_array) : scheduler_(scheduler), trans_type_(trans_type), session_id_(session_id), associated_session_id_(associated_session_id), app_trace_id_str_(app_trace_id_str), schema_version_(schema_version), can_elr_(elr), @@ -694,7 +697,7 @@ public: is_sub2pc_(is_sub2pc), is_dup_tx_(is_dup_tx), tx_expired_time_(tx_expired_time), epoch_(epoch), last_op_sn_(last_op_sn), first_seq_no_(first_seq_no), last_seq_no_(last_seq_no), max_submitted_seq_no_(max_submitted_seq_no), serial_final_seq_no_(serial_final_seq_no), - cluster_version_(cluster_version), xid_(xid) + cluster_version_(cluster_version), xid_(xid), prio_op_array_(prio_op_array) { before_serialize(); }; @@ -719,6 +722,7 @@ public: ObTxSEQ get_serial_final_seq_no() const { return serial_final_seq_no_; } uint64_t get_cluster_version() const { return cluster_version_; } const ObXATransID &get_xid() const { return xid_; } + const tablelock::ObTableLockPrioOpArray &get_prio_op_array() const { return prio_op_array_; } // for ob_admin int ob_admin_dump(share::ObAdminMutatorStringArg &arg); @@ -743,7 +747,8 @@ public: K(max_submitted_seq_no_), K(serial_final_seq_no_), K(cluster_version_), - K(xid_)); + K(xid_), + K(prio_op_array_)); public: int before_serialize(); @@ -774,6 +779,7 @@ private: ObTxSEQ serial_final_seq_no_; uint64_t cluster_version_; ObXATransID xid_; + tablelock::ObTableLockPrioOpArray &prio_op_array_; }; class ObTxCommitInfoLogTempRef diff --git a/unittest/storage/tx/test_ob_tx_log.cpp b/unittest/storage/tx/test_ob_tx_log.cpp index 8eebecc637..469a418036 100644 --- a/unittest/storage/tx/test_ob_tx_log.cpp +++ b/unittest/storage/tx/test_ob_tx_log.cpp @@ -63,6 +63,7 @@ ObTxSEQ TEST_SERIAL_FINAL_SEQ_NO = ObTxSEQ(12346, 0); LSKey TEST_LS_KEY; ObXATransID TEST_XID; ObTxPrevLogType TEST_PREV_LOG_TYPE(ObTxPrevLogType::TypeEnum::TRANSFER_IN); +tablelock::ObTableLockPrioOpArray TEST_PRIO_OP_ARRAY; struct OldTestLog @@ -216,7 +217,8 @@ TEST_F(TestObTxLog, tx_log_body_except_redo) TEST_MAX_SUBMITTED_SEQ_NO, TEST_CLUSTER_VERSION, TEST_XID, - TEST_SERIAL_FINAL_SEQ_NO); + TEST_SERIAL_FINAL_SEQ_NO, + TEST_PRIO_OP_ARRAY); ObTxPrepareLog filll_prepare(TEST_LS_ARRAY, TEST_LOG_OFFSET, TEST_PREV_LOG_TYPE); ObTxCommitLog fill_commit(share::SCN::base_scn(), TEST_CHECKSUM, @@ -596,6 +598,10 @@ TEST_F(TestObTxLog, test_default_log_deserialize) replay_member_cnt++; EXPECT_EQ(fill_active_state.get_serial_final_seq_no(), replay_active_state.get_serial_final_seq_no()); replay_member_cnt++; + EXPECT_EQ(fill_active_state.get_associated_session_id(), replay_active_state.get_associated_session_id()); + replay_member_cnt++; + EXPECT_EQ(fill_active_state.get_prio_op_array().count(), replay_active_state.get_prio_op_array().count()); + replay_member_cnt++; EXPECT_EQ(replay_member_cnt, fill_member_cnt); ObTxCommitInfoLogTempRef commit_state_temp_ref;