[mysql tablelock][master] placeholder for table lock priority queue

This commit is contained in:
jw-guo
2024-07-23 08:22:25 +00:00
committed by ob-robot
parent efbec60b0b
commit a7553305fc
7 changed files with 88 additions and 10 deletions

View File

@ -49,8 +49,22 @@ public:
public: public:
ObTableLockOp lock_op_; ObTableLockOp lock_op_;
}; };
typedef common::ObDList<ObMemCtxLockOpLinkNode> ObLockNodeList; typedef common::ObDList<ObMemCtxLockOpLinkNode> ObLockNodeList;
class ObMemCtxLockPrioOpLinkNode : public common::ObDLinkBase<ObMemCtxLockPrioOpLinkNode>
{
public:
ObMemCtxLockPrioOpLinkNode()
: prio_op_()
{}
int init(const ObTableLockOp &op_info, const ObTableLockPriority priority);
bool is_valid() const { return prio_op_.is_valid(); }
TO_STRING_KV(K_(prio_op));
public:
ObTableLockPrioOp prio_op_;
};
typedef common::ObDList<ObMemCtxLockPrioOpLinkNode> ObPrioLockNodeList;
class ObLockMemCtx class ObLockMemCtx
{ {
using RWLock = common::SpinRWLock; using RWLock = common::SpinRWLock;

View File

@ -306,6 +306,10 @@ OB_SERIALIZE_MEMBER(ObTableLockInfo,
table_lock_ops_, table_lock_ops_,
max_durable_scn_); max_durable_scn_);
OB_SERIALIZE_MEMBER(ObTableLockPrioOp,
lock_op_,
priority_);
} // tablelock } // tablelock
} // transaction } // transaction
} // oceanbase } // oceanbase

View File

@ -32,6 +32,14 @@ namespace transaction
{ {
namespace tablelock namespace tablelock
{ {
struct ObObjLockPriorityTaskID
{
explicit ObObjLockPriorityTaskID(const int64_t trans_id_value)
: trans_id_value_(trans_id_value) {}
TO_STRING_KV(K_(trans_id_value));
int64_t trans_id_value_;
};
enum class ObTableLockPriority : int8_t enum class ObTableLockPriority : int8_t
{ {
INVALID = -1, INVALID = -1,
@ -651,6 +659,41 @@ public:
share::SCN max_durable_scn_; share::SCN max_durable_scn_;
}; };
struct ObTableLockPrioOp
{
OB_UNIS_VERSION(1);
public:
ObTableLockPrioOp()
: lock_op_(), priority_(ObTableLockPriority::INVALID)
{}
ObTableLockPrioOp(const ObTableLockPriority priority, const ObTableLockOp &lock_op)
: lock_op_(lock_op), priority_(priority)
{}
bool is_valid() const
{ return ObTableLockPriority::INVALID != priority_ && lock_op_.is_valid(); }
public:
TO_STRING_KV(K_(lock_op), K_(priority));
public:
ObTableLockOp lock_op_;
ObTableLockPriority priority_;
};
typedef common::ObSEArray<ObTableLockPrioOp, 10, TransModulePageAllocator> ObTableLockPrioOpArray;
struct ObTableLockPrioArg
{
public:
ObTableLockPrioArg()
: priority_(ObTableLockPriority::INVALID) {}
explicit ObTableLockPrioArg(const ObTableLockPriority &priority)
: priority_(priority) {}
bool is_valid() const { return ObTableLockPriority::INVALID != priority_; }
public:
TO_STRING_KV(K_(priority));
public:
ObTableLockPriority priority_;
};
static inline static inline
bool is_need_retry_unlock_error(int err) bool is_need_retry_unlock_error(int err)
{ {

View File

@ -3390,6 +3390,7 @@ int ObPartTransCtx::submit_redo_active_info_log_()
ObTxLogBlock log_block; ObTxLogBlock log_block;
bool has_redo = false; bool has_redo = false;
ObRedoLogSubmitHelper helper; ObRedoLogSubmitHelper helper;
ObTableLockPrioOpArray prio_op_array;
if (OB_FAIL(submit_redo_if_parallel_logging_())) { if (OB_FAIL(submit_redo_if_parallel_logging_())) {
} else if (OB_FAIL(init_log_block_(log_block))) { } else if (OB_FAIL(init_log_block_(log_block))) {
TRANS_LOG(WARN, "init log block failed", KR(ret), K(*this)); TRANS_LOG(WARN, "init log block failed", KR(ret), K(*this));
@ -3416,7 +3417,8 @@ int ObPartTransCtx::submit_redo_active_info_log_()
last_scn_, exec_info_.max_submitted_seq_no_, last_scn_, exec_info_.max_submitted_seq_no_,
cluster_version_, cluster_version_,
exec_info_.xid_, exec_info_.xid_,
exec_info_.serial_final_seq_no_); exec_info_.serial_final_seq_no_,
prio_op_array);
ObTxLogCb *log_cb = nullptr; ObTxLogCb *log_cb = nullptr;
if (OB_FAIL(prepare_log_cb_(!NEED_FINAL_CB, log_cb))) { if (OB_FAIL(prepare_log_cb_(!NEED_FINAL_CB, log_cb))) {
TRANS_LOG(WARN, "get log cb failed", KR(ret), KP(log_cb), K(*this)); TRANS_LOG(WARN, "get log cb failed", KR(ret), KP(log_cb), K(*this));

View File

@ -300,7 +300,8 @@ OB_TX_SERIALIZE_MEMBER(ObTxActiveInfoLog,
/* 17 */ max_submitted_seq_no_, /* 17 */ max_submitted_seq_no_,
/* 18 */ xid_, /* 18 */ xid_,
/* 19 */ serial_final_seq_no_, /* 19 */ serial_final_seq_no_,
/* 20 */ associated_session_id_); /* 20 */ associated_session_id_,
/* 21 */ prio_op_array_);
OB_TX_SERIALIZE_MEMBER(ObTxCommitInfoLog, OB_TX_SERIALIZE_MEMBER(ObTxCommitInfoLog,
compat_bytes_, compat_bytes_,
@ -365,7 +366,7 @@ int ObTxActiveInfoLog::before_serialize()
TRANS_LOG(WARN, "reset all compat_bytes_ valid failed", K(ret)); TRANS_LOG(WARN, "reset all compat_bytes_ valid failed", K(ret));
} }
} else { } else {
if (OB_FAIL(compat_bytes_.init(19))) { if (OB_FAIL(compat_bytes_.init(21))) {
TRANS_LOG(WARN, "init compat_bytes_ failed", K(ret)); TRANS_LOG(WARN, "init compat_bytes_ failed", K(ret));
} }
} }
@ -390,6 +391,8 @@ int ObTxActiveInfoLog::before_serialize()
TX_NO_NEED_SER(!max_submitted_seq_no_.is_valid(), 17, compat_bytes_); TX_NO_NEED_SER(!max_submitted_seq_no_.is_valid(), 17, compat_bytes_);
TX_NO_NEED_SER(xid_.empty(), 18, compat_bytes_); TX_NO_NEED_SER(xid_.empty(), 18, compat_bytes_);
TX_NO_NEED_SER(!serial_final_seq_no_.is_valid(), 19, compat_bytes_); TX_NO_NEED_SER(!serial_final_seq_no_.is_valid(), 19, compat_bytes_);
TX_NO_NEED_SER(associated_session_id_ == 0, 20, compat_bytes_);
TX_NO_NEED_SER(prio_op_array_.empty(), 21, compat_bytes_);
} }
return ret; return ret;

View File

@ -643,13 +643,15 @@ private:
class ObTxActiveInfoLogTempRef { class ObTxActiveInfoLogTempRef {
public: public:
ObTxActiveInfoLogTempRef() : scheduler_(), app_trace_id_str_(), proposal_leader_(), xid_() {} ObTxActiveInfoLogTempRef()
: scheduler_(), app_trace_id_str_(), proposal_leader_(), xid_(), prio_op_array_() {}
public: public:
common::ObAddr scheduler_; common::ObAddr scheduler_;
common::ObString app_trace_id_str_; common::ObString app_trace_id_str_;
common::ObAddr proposal_leader_; common::ObAddr proposal_leader_;
ObXATransID xid_; ObXATransID xid_;
tablelock::ObTableLockPrioOpArray prio_op_array_;
}; };
class ObTxActiveInfoLog class ObTxActiveInfoLog
@ -663,7 +665,7 @@ public:
proposal_leader_(temp_ref.proposal_leader_), cur_query_start_time_(0), is_sub2pc_(false), proposal_leader_(temp_ref.proposal_leader_), cur_query_start_time_(0), is_sub2pc_(false),
is_dup_tx_(false), tx_expired_time_(0), epoch_(0), last_op_sn_(0), first_seq_no_(), is_dup_tx_(false), tx_expired_time_(0), epoch_(0), last_op_sn_(0), first_seq_no_(),
last_seq_no_(), max_submitted_seq_no_(), serial_final_seq_no_(), cluster_version_(0), last_seq_no_(), max_submitted_seq_no_(), serial_final_seq_no_(), cluster_version_(0),
xid_(temp_ref.xid_) xid_(temp_ref.xid_), prio_op_array_(temp_ref.prio_op_array_)
{ {
before_serialize(); before_serialize();
} }
@ -686,7 +688,8 @@ public:
ObTxSEQ max_submitted_seq_no, ObTxSEQ max_submitted_seq_no,
uint64_t cluster_version, uint64_t cluster_version,
const ObXATransID &xid, const ObXATransID &xid,
ObTxSEQ serial_final_seq_no) ObTxSEQ serial_final_seq_no,
tablelock::ObTableLockPrioOpArray &prio_op_array)
: scheduler_(scheduler), trans_type_(trans_type), session_id_(session_id), : scheduler_(scheduler), trans_type_(trans_type), session_id_(session_id),
associated_session_id_(associated_session_id), associated_session_id_(associated_session_id),
app_trace_id_str_(app_trace_id_str), schema_version_(schema_version), can_elr_(elr), app_trace_id_str_(app_trace_id_str), schema_version_(schema_version), can_elr_(elr),
@ -694,7 +697,7 @@ public:
is_sub2pc_(is_sub2pc), is_dup_tx_(is_dup_tx), tx_expired_time_(tx_expired_time), is_sub2pc_(is_sub2pc), is_dup_tx_(is_dup_tx), tx_expired_time_(tx_expired_time),
epoch_(epoch), last_op_sn_(last_op_sn), first_seq_no_(first_seq_no), last_seq_no_(last_seq_no), epoch_(epoch), last_op_sn_(last_op_sn), first_seq_no_(first_seq_no), last_seq_no_(last_seq_no),
max_submitted_seq_no_(max_submitted_seq_no), serial_final_seq_no_(serial_final_seq_no), max_submitted_seq_no_(max_submitted_seq_no), serial_final_seq_no_(serial_final_seq_no),
cluster_version_(cluster_version), xid_(xid) cluster_version_(cluster_version), xid_(xid), prio_op_array_(prio_op_array)
{ {
before_serialize(); before_serialize();
}; };
@ -719,6 +722,7 @@ public:
ObTxSEQ get_serial_final_seq_no() const { return serial_final_seq_no_; } ObTxSEQ get_serial_final_seq_no() const { return serial_final_seq_no_; }
uint64_t get_cluster_version() const { return cluster_version_; } uint64_t get_cluster_version() const { return cluster_version_; }
const ObXATransID &get_xid() const { return xid_; } const ObXATransID &get_xid() const { return xid_; }
const tablelock::ObTableLockPrioOpArray &get_prio_op_array() const { return prio_op_array_; }
// for ob_admin // for ob_admin
int ob_admin_dump(share::ObAdminMutatorStringArg &arg); int ob_admin_dump(share::ObAdminMutatorStringArg &arg);
@ -743,7 +747,8 @@ public:
K(max_submitted_seq_no_), K(max_submitted_seq_no_),
K(serial_final_seq_no_), K(serial_final_seq_no_),
K(cluster_version_), K(cluster_version_),
K(xid_)); K(xid_),
K(prio_op_array_));
public: public:
int before_serialize(); int before_serialize();
@ -774,6 +779,7 @@ private:
ObTxSEQ serial_final_seq_no_; ObTxSEQ serial_final_seq_no_;
uint64_t cluster_version_; uint64_t cluster_version_;
ObXATransID xid_; ObXATransID xid_;
tablelock::ObTableLockPrioOpArray &prio_op_array_;
}; };
class ObTxCommitInfoLogTempRef class ObTxCommitInfoLogTempRef

View File

@ -63,6 +63,7 @@ ObTxSEQ TEST_SERIAL_FINAL_SEQ_NO = ObTxSEQ(12346, 0);
LSKey TEST_LS_KEY; LSKey TEST_LS_KEY;
ObXATransID TEST_XID; ObXATransID TEST_XID;
ObTxPrevLogType TEST_PREV_LOG_TYPE(ObTxPrevLogType::TypeEnum::TRANSFER_IN); ObTxPrevLogType TEST_PREV_LOG_TYPE(ObTxPrevLogType::TypeEnum::TRANSFER_IN);
tablelock::ObTableLockPrioOpArray TEST_PRIO_OP_ARRAY;
struct OldTestLog struct OldTestLog
@ -216,7 +217,8 @@ TEST_F(TestObTxLog, tx_log_body_except_redo)
TEST_MAX_SUBMITTED_SEQ_NO, TEST_MAX_SUBMITTED_SEQ_NO,
TEST_CLUSTER_VERSION, TEST_CLUSTER_VERSION,
TEST_XID, TEST_XID,
TEST_SERIAL_FINAL_SEQ_NO); TEST_SERIAL_FINAL_SEQ_NO,
TEST_PRIO_OP_ARRAY);
ObTxPrepareLog filll_prepare(TEST_LS_ARRAY, TEST_LOG_OFFSET, TEST_PREV_LOG_TYPE); ObTxPrepareLog filll_prepare(TEST_LS_ARRAY, TEST_LOG_OFFSET, TEST_PREV_LOG_TYPE);
ObTxCommitLog fill_commit(share::SCN::base_scn(), ObTxCommitLog fill_commit(share::SCN::base_scn(),
TEST_CHECKSUM, TEST_CHECKSUM,
@ -596,6 +598,10 @@ TEST_F(TestObTxLog, test_default_log_deserialize)
replay_member_cnt++; replay_member_cnt++;
EXPECT_EQ(fill_active_state.get_serial_final_seq_no(), replay_active_state.get_serial_final_seq_no()); EXPECT_EQ(fill_active_state.get_serial_final_seq_no(), replay_active_state.get_serial_final_seq_no());
replay_member_cnt++; replay_member_cnt++;
EXPECT_EQ(fill_active_state.get_associated_session_id(), replay_active_state.get_associated_session_id());
replay_member_cnt++;
EXPECT_EQ(fill_active_state.get_prio_op_array().count(), replay_active_state.get_prio_op_array().count());
replay_member_cnt++;
EXPECT_EQ(replay_member_cnt, fill_member_cnt); EXPECT_EQ(replay_member_cnt, fill_member_cnt);
ObTxCommitInfoLogTempRef commit_state_temp_ref; ObTxCommitInfoLogTempRef commit_state_temp_ref;