【feature】子事务回滚时锁抢占问题优化

This commit is contained in:
congzhou2603
2023-07-06 15:03:53 +08:00
parent 26ce6b54a9
commit ba860581ca
5 changed files with 141 additions and 50 deletions

View File

@ -2177,8 +2177,24 @@ static TransactionId RecordTransactionAbort(bool isSubXact)
* subxacts, because we already have the child XID array at hand. For
* main xacts, the equivalent happens just after this function returns.
*/
if (isSubXact)
XidCacheRemoveRunningXids(xid, nchildren, children, latestXid);
if (isSubXact) {
if (LWLockConditionalAcquire(ProcArrayLock, LW_EXCLUSIVE)) {
t_thrd.proc->procArrayGroupMemberXid = xid;
t_thrd.proc->procArrayGroupSubXactNXids = nchildren;
t_thrd.proc->procArrayGroupSubXactXids = children;
t_thrd.proc->procArrayGroupSubXactLatestXid = latestXid;
XidCacheRemoveRunningXids(t_thrd.proc, t_thrd.pgxact);
/* clear the group member cache after XidCacheRemoveRunningXids*/
t_thrd.proc->procArrayGroupMemberXid = InvalidTransactionId;
t_thrd.proc->procArrayGroupSubXactNXids = 0;
t_thrd.proc->procArrayGroupSubXactXids = NULL;
t_thrd.proc->procArrayGroupSubXactLatestXid = InvalidTransactionId;
LWLockRelease(ProcArrayLock);
} else {
ProcArrayGroupClearXid(true, t_thrd.proc, InvalidTransactionId, xid, nchildren, children, latestXid);
}
}
/* Reset XactLastRecEnd until the next transaction writes something */
if (!isSubXact)

View File

@ -185,7 +185,10 @@ static TransactionId GetMultiSnapshotOldestXmin();
static inline void ProcArrayEndTransactionInternal(PGPROC* proc, PGXACT* pgxact, TransactionId latestXid,
TransactionId* xid, uint32* nsubxids);
static void ProcArrayGroupClearXid(PGPROC* proc, TransactionId latestXid);
void XidCacheRemoveRunningXids(PGPROC* proc, PGXACT* pgxact);
void ProcArrayGroupClearXid(bool isSubTransaction, PGPROC* proc, TransactionId latestXid,
TransactionId subTranactionXid, int nSubTransactionXids,
TransactionId* subTransactionXids, TransactionId subTransactionLatestXid);
extern bool StreamTopConsumerAmI();
@ -522,7 +525,7 @@ void ProcArrayEndTransaction(PGPROC* proc, TransactionId latestXid, bool isCommi
CalculateLocalLatestSnapshot(false);
LWLockRelease(ProcArrayLock);
} else
ProcArrayGroupClearXid(proc, latestXid);
ProcArrayGroupClearXid(false, proc, latestXid, InvalidTransactionId, 0, NULL, InvalidTransactionId);
} else {
/*
* If we have no XID, we don't need to lock, since we won't affect
@ -603,6 +606,35 @@ static inline void ProcArrayEndTransactionInternal(PGPROC* proc, PGXACT* pgxact,
ResetProcXidCache(proc, true);
}
static inline void ProcInsertIntoGroup(PGPROC* proc, uint32* nextidx) {
while (true) {
*nextidx = pg_atomic_read_u32(&g_instance.proc_base->procArrayGroupFirst);
pg_atomic_write_u32(&proc->procArrayGroupNext, *nextidx);
if (pg_atomic_compare_exchange_u32(
&g_instance.proc_base->procArrayGroupFirst, nextidx, (uint32)proc->pgprocno))
break;
}
}
static inline void ClearProcArrayGroupCache(PGPROC* proc) {
proc->procArrayGroupMember = false;
proc->procArrayGroupMemberXid = InvalidTransactionId;
proc->procArrayGroupSubXactNXids = 0;
proc->procArrayGroupSubXactXids = NULL;
proc->procArrayGroupSubXactLatestXid = InvalidTransactionId;
}
static inline void SetProcArrayGroupCache(PGPROC* proc, TransactionId xid, int nxids,
TransactionId* xids, TransactionId latestXid)
{
proc->procArrayGroupMemberXid = xid;
proc->procArrayGroupSubXactNXids = nxids;
proc->procArrayGroupSubXactXids = xids;
proc->procArrayGroupSubXactLatestXid = latestXid;
}
/*
* ProcArrayGroupClearXid -- group XID clearing
*
@ -610,34 +642,36 @@ static inline void ProcArrayEndTransactionInternal(PGPROC* proc, PGXACT* pgxact,
* commit time, add ourselves to a list of processes that need their XIDs
* cleared. The first process to add itself to the list will acquire
* ProcArrayLock in exclusive mode and perform ProcArrayEndTransactionInternal
* on behalf of all group members. This avoids a great deal of contention
* for transaction group members and XidCacheRemoveRunningXids
* for subtransaction group members. This avoids a great deal of contention
* around ProcArrayLock when many processes are trying to commit at once,
* since the lock need not be repeatedly handed off from one committing
* process to the next.
*/
static void ProcArrayGroupClearXid(PGPROC* proc, TransactionId latestXid)
void ProcArrayGroupClearXid(bool isSubTransaction, PGPROC* proc,
TransactionId latestXid, TransactionId subTranactionXid,
int nSubTransactionXids, TransactionId* subTransactionXids,
TransactionId subTransactionLatestXid)
{
uint32 nextidx;
uint32 wakeidx;
TransactionId xid[PROCARRAY_MAXPROCS];
uint32 nsubxids[PROCARRAY_MAXPROCS];
uint32 index = 0;
uint32 commitcsn[PROCARRAY_MAXPROCS];
CommitSeqNo maxcsn = 0;
bool groupMemberHasTransaction = false;
/* We should definitely have an XID to clear. */
/* Add ourselves to the list of processes needing a group XID clear. */
proc->procArrayGroupMember = true;
proc->procArrayGroupMemberXid = latestXid;
while (true) {
nextidx = pg_atomic_read_u32(&g_instance.proc_base->procArrayGroupFirst);
pg_atomic_write_u32(&proc->procArrayGroupNext, nextidx);
if (pg_atomic_compare_exchange_u32(
&g_instance.proc_base->procArrayGroupFirst, &nextidx, (uint32)proc->pgprocno))
break;
if (isSubTransaction) {
SetProcArrayGroupCache(proc, subTranactionXid, nSubTransactionXids, subTransactionXids, subTransactionLatestXid);
} else {
SetProcArrayGroupCache(proc, latestXid, 0, NULL, InvalidTransactionId);
}
/* add current proc into ProcArrayGroup */
ProcInsertIntoGroup(proc, &nextidx);
/*
* If the list was not empty, the leader will clear our XID. It is
* impossible to have followers without a leader because the first process
@ -685,20 +719,45 @@ static void ProcArrayGroupClearXid(PGPROC* proc, TransactionId latestXid)
while (nextidx != INVALID_PGPROCNO) {
PGPROC* proc_member = g_instance.proc_base_all_procs[nextidx];
PGXACT* pgxact = &g_instance.proc_base_all_xacts[nextidx];
ereport(DEBUG2, (errmsg("handle group member from procArrayGroup, procno = %u, "
"procArrayGroupMemberXid = " XID_FMT ", "
"procArrayGroupSubXactNXids = %d, "
"procArrayGroupSubXactLatestXid = " XID_FMT ", "
"procArrayGroupNext = %u",
proc_member->pgprocno,
proc_member->procArrayGroupMemberXid,
proc_member->procArrayGroupSubXactNXids,
proc_member->procArrayGroupSubXactLatestXid,
proc_member->procArrayGroupNext)));
/*
* If the proc_member is a transaction, perform ProcArrayEndTransactionInternal
* to clear its XID. If the proc_member is a subtransaction,
* perform XidCacheRemoveRunningXids to clear its XIDs and
* its committed subtransaction's XIDS.
*
* proc_member->procArrayGroupSubXactLatestXid !=0 when the group member
* is a subtransaction.
*/
if (proc_member->procArrayGroupSubXactLatestXid != InvalidTransactionId) {
XidCacheRemoveRunningXids(proc_member, pgxact);
} else {
groupMemberHasTransaction = true;
ProcArrayEndTransactionInternal(
proc_member, pgxact, proc_member->procArrayGroupMemberXid, &xid[index], &nsubxids[index]);
}
/* Don't need to update csn each loop, just update once after the loop. */
commitcsn[index] = proc_member->commitCSN;
if (proc_member->commitCSN > maxcsn)
maxcsn = proc_member->commitCSN;
ProcArrayEndTransactionInternal(
proc_member, pgxact, proc_member->procArrayGroupMemberXid, &xid[index], &nsubxids[index]);
/* Move to next proc in list. */
nextidx = pg_atomic_read_u32(&proc_member->procArrayGroupNext);
index++;
}
/* already hold lock, caculate snapshot after last invocation */
CalculateLocalLatestSnapshot(false);
/* Already hold lock, caculate snapshot after last invocation,
* if there is at least one transaction in group.
*/
if (groupMemberHasTransaction) {
CalculateLocalLatestSnapshot(false);
}
/* We're done with the lock now. */
LWLockRelease(ProcArrayLock);
@ -720,7 +779,7 @@ static void ProcArrayGroupClearXid(PGPROC* proc, TransactionId latestXid)
/* ensure all previous writes are visible before follower continues. */
pg_write_barrier();
proc_member->procArrayGroupMember = false;
ClearProcArrayGroupCache(proc_member);
if (proc_member != t_thrd.proc)
PGSemaphoreUnlock(&proc_member->sem);
@ -3848,35 +3907,31 @@ void ProcArrayGetReplicationSlotXmin(TransactionId* xmin, TransactionId* catalog
LWLockRelease(ProcArrayLock);
}
#define XidCacheRemove(i) \
do { \
t_thrd.proc->subxids.xids[i] = t_thrd.proc->subxids.xids[t_thrd.pgxact->nxids - 1]; \
t_thrd.pgxact->nxids--; \
} while (0)
/*
* XidCacheRemoveRunningXids
*
* Remove a bunch of TransactionIds from the list of known-running
* subtransactions for my backend. Both the specified xid and those in
* the xids[] array (of length nxids) are removed from the subxids cache.
* latestXid must be the latest XID among the group.
* latestXid must be the latest XID among the group. We should store the
* required parameters into proc before performing XidCacheRemoveRunningXids,
* including subtransaction xid, the number of committed subtransaction,
* committed substransaction list, the latestXid between its xid and its
* committed subtransactions'.
*
* We don't do any locking here; caller must get the procArrayLock before
* perform XidCacheRemoveRunningXids.
*/
void XidCacheRemoveRunningXids(TransactionId xid, int nxids, const TransactionId* xids, TransactionId latestXid)
void XidCacheRemoveRunningXids(PGPROC* proc, PGXACT* pgxact)
{
int i, j;
TransactionId xid = proc->procArrayGroupMemberXid;
int nxids = proc->procArrayGroupSubXactNXids;
TransactionId* xids = proc->procArrayGroupSubXactXids;
TransactionId latestXid = proc->procArrayGroupSubXactLatestXid;
Assert(TransactionIdIsValid(xid));
/*
* We must hold ProcArrayLock exclusively in order to remove transactions
* from the PGPROC array. (See src/backend/access/transam/README.) It's
* possible this could be relaxed since we know this routine is only used
* to abort subtransactions, but pending closer analysis we'd best be
* conservative.
*/
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
/*
* Under normal circumstances xid and xids[] will be in increasing order,
* as will be the entries in subxids. Scan backwards to avoid O(N^2)
@ -3885,9 +3940,10 @@ void XidCacheRemoveRunningXids(TransactionId xid, int nxids, const TransactionId
for (i = nxids - 1; i >= 0; i--) {
TransactionId anxid = xids[i];
for (j = t_thrd.pgxact->nxids - 1; j >= 0; j--) {
if (TransactionIdEquals(t_thrd.proc->subxids.xids[j], anxid)) {
XidCacheRemove(j);
for (j = pgxact->nxids - 1; j >= 0; j--) {
if (TransactionIdEquals(proc->subxids.xids[j], anxid)) {
proc->subxids.xids[j] = proc->subxids.xids[pgxact->nxids - 1];
pgxact->nxids--;
break;
}
}
@ -3903,9 +3959,10 @@ void XidCacheRemoveRunningXids(TransactionId xid, int nxids, const TransactionId
ereport(WARNING, (errmsg("did not find subXID " XID_FMT " in t_thrd.proc", anxid)));
}
for (j = t_thrd.pgxact->nxids - 1; j >= 0; j--) {
if (TransactionIdEquals(t_thrd.proc->subxids.xids[j], xid)) {
XidCacheRemove(j);
for (j = pgxact->nxids - 1; j >= 0; j--) {
if (TransactionIdEquals(proc->subxids.xids[j], xid)) {
proc->subxids.xids[j] = proc->subxids.xids[pgxact->nxids - 1];
pgxact->nxids--;
break;
}
}
@ -3918,7 +3975,6 @@ void XidCacheRemoveRunningXids(TransactionId xid, int nxids, const TransactionId
if (TransactionIdPrecedes(t_thrd.xact_cxt.ShmemVariableCache->latestCompletedXid, latestXid))
t_thrd.xact_cxt.ShmemVariableCache->latestCompletedXid = latestXid;
LWLockRelease(ProcArrayLock);
}
#ifdef XIDCACHE_DEBUG

View File

@ -905,6 +905,9 @@ void InitProcess(void)
/* Initialize fields for group XID clearing. */
t_thrd.proc->procArrayGroupMember = false;
t_thrd.proc->procArrayGroupMemberXid = InvalidTransactionId;
t_thrd.proc->procArrayGroupSubXactNXids = InvalidTransactionId;
t_thrd.proc->procArrayGroupSubXactXids = NULL;
t_thrd.proc->procArrayGroupSubXactLatestXid = InvalidTransactionId;
pg_atomic_init_u32(&t_thrd.proc->procArrayGroupNext, INVALID_PGPROCNO);
/* Initialize fields for group snapshot getting. */

View File

@ -214,6 +214,18 @@ struct PGPROC {
*/
TransactionId procArrayGroupMemberXid;
int procArrayGroupSubXactNXids;
TransactionId* procArrayGroupSubXactXids;
/*
* lastestXid amoung subtransaction's xid and it's committed children's,
* which can be detemined whether the group member is a subtransaction or
* transaction. procArrayGroupSubXactLatestXid != 0 only when the group
* memeber is subtransaction.
*/
TransactionId procArrayGroupSubXactLatestXid;
/* Support for group snapshot getting. */
bool snapshotGroupMember;
/* next ProcArray group member waiting for snapshot getting */

View File

@ -125,9 +125,13 @@ extern void CancelSingleNodeBackends(Oid databaseOid, Oid userOid, ProcSignalRea
extern int CountUserBackends(Oid roleid);
extern bool CountOtherDBBackends(Oid databaseId, int* nbackends, int* nprepared);
extern void XidCacheRemoveRunningXids(TransactionId xid, int nxids, const TransactionId* xids, TransactionId latestXid);
extern void XidCacheRemoveRunningXids(PGPROC* proc, PGXACT* pgxact);
extern void SetPgXactXidInvalid(void);
extern void ProcArrayGroupClearXid(bool isSubTransaction, PGPROC* proc, TransactionId latestXid,
TransactionId subTranactionXid, int nSubTransactionXids,
TransactionId* subTransactionXids, TransactionId subTransactionLatestXid);
extern void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked);
extern void ProcArrayGetReplicationSlotXmin(TransactionId* xmin, TransactionId* catalog_xmin);