fix free csn
This commit is contained in:
@ -624,26 +624,7 @@ void TransactionIdAsyncCommitTree(TransactionId xid, int nxids, TransactionId* x
|
||||
if (csn > 0)
|
||||
CSNLogSetCommitSeqNo(xid, nxids, xids, csn);
|
||||
} else {
|
||||
#ifdef ENABLE_MULTIPLE_NODES
|
||||
/*
|
||||
* when the transaction does not involve GTM, set csn to commit-in-progress
|
||||
* first here and set the true csn while procarray clear.
|
||||
*/
|
||||
if (useLocalXid || !IsPostmasterEnvironment || GTM_FREE_MODE) {
|
||||
/* Set CSN to commit-in-progress */
|
||||
Assert(csn == 0);
|
||||
CSNLogSetCommitSeqNo(xid,
|
||||
nxids,
|
||||
xids,
|
||||
(COMMITSEQNO_COMMIT_INPROGRESS | t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo));
|
||||
} else { /* for gtm transactions, set the true csn here just after the clog is set. */
|
||||
UpdateCSNLogAtTransactionEND(NULL, xid, nxids, xids, csn, true);
|
||||
}
|
||||
#else
|
||||
if (!(useLocalXid || !IsPostmasterEnvironment || GTM_FREE_MODE)) {
|
||||
UpdateCSNLogAtTransactionEND(NULL, xid, nxids, xids, csn, true);
|
||||
}
|
||||
#endif
|
||||
UpdateCSNLogAtTransactionEND(xid, nxids, xids, csn, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -2112,7 +2112,7 @@ void FinishPreparedTransaction(const char* gid, bool isCommit)
|
||||
abortLibraryLen);
|
||||
}
|
||||
|
||||
ProcArrayRemove(proc, latestXid, isCommit);
|
||||
ProcArrayRemove(proc, latestXid);
|
||||
|
||||
/*
|
||||
* In case we fail while running the callbacks, mark the gxact invalid so
|
||||
@ -3401,7 +3401,7 @@ void RemoveStaleTwophaseState(TransactionId xid)
|
||||
* it to ProcArrayRemove. Don't calculate new local snapshot.
|
||||
*/
|
||||
latestCompletedXid = t_thrd.xact_cxt.ShmemVariableCache->latestCompletedXid;
|
||||
ProcArrayRemove(g_instance.proc_base_all_procs[gxact->pgprocno], latestCompletedXid, false);
|
||||
ProcArrayRemove(g_instance.proc_base_all_procs[gxact->pgprocno], latestCompletedXid);
|
||||
|
||||
/*
|
||||
* And now we can clean up any files we may have left.
|
||||
|
||||
@ -1404,6 +1404,11 @@ static void AtSubStart_ResourceOwner(void)
|
||||
t_thrd.utils_cxt.CurrentResourceOwner = s->curTransactionOwner;
|
||||
}
|
||||
|
||||
CommitSeqNo GetLocalNextCSN()
|
||||
{
|
||||
return pg_atomic_fetch_add_u64(&t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo, 1);
|
||||
}
|
||||
|
||||
/* ----------------------------------------------------------------
|
||||
* CommitTransaction stuff
|
||||
* ----------------------------------------------------------------
|
||||
|
||||
@ -315,7 +315,7 @@ void ProcArrayAdd(PGPROC* proc)
|
||||
* the ProcArrayLock only once, and don't damage the content of the PGPROC;
|
||||
* twophase.c depends on the latter.)
|
||||
*/
|
||||
void ProcArrayRemove(PGPROC* proc, TransactionId latestXid, bool isCommit)
|
||||
void ProcArrayRemove(PGPROC* proc, TransactionId latestXid)
|
||||
{
|
||||
ProcArrayStruct* arrayP = g_instance.proc_array_idx;
|
||||
PGXACT* pgxact = &g_instance.proc_base_all_xacts[proc->pgprocno];
|
||||
@ -352,19 +352,14 @@ void ProcArrayRemove(PGPROC* proc, TransactionId latestXid, bool isCommit)
|
||||
arrayP->pgprocnos[arrayP->numProcs - 1] = -1; /* for debugging */
|
||||
arrayP->numProcs--;
|
||||
|
||||
if (isCommit) {
|
||||
/* Update csn in shared memory after transaction commit. */
|
||||
csn = UpdateCSNAtTransactionCommit(0);
|
||||
}
|
||||
/* Update csn in shared memory after transaction commit. */
|
||||
csn = UpdateCSNAtTransactionCommit(0);
|
||||
|
||||
/* Calc new sanpshot. */
|
||||
if (TransactionIdIsValid(latestXid))
|
||||
CalculateLocalLatestSnapshot(false);
|
||||
LWLockRelease(ProcArrayLock);
|
||||
|
||||
/* Update csn log after transaction end. */
|
||||
if (useLocalXid || !IsPostmasterEnvironment || GTM_FREE_MODE)
|
||||
UpdateCSNLogAtTransactionEND(proc, pgxact->xid, pgxact->nxids, proc->subxids.xids, csn, isCommit);
|
||||
/* Free xid cache memory if needed, must after procarray remove */
|
||||
ResetProcXidCache(proc, false);
|
||||
proc->commitCSN = 0;
|
||||
@ -444,12 +439,9 @@ void ProcArrayEndTransaction(PGPROC* proc, TransactionId latestXid, bool isCommi
|
||||
ProcArrayEndTransactionInternal(proc, pgxact, latestXid, &xid, &nsubxids, &csn, isCommit);
|
||||
CalculateLocalLatestSnapshot(false);
|
||||
LWLockRelease(ProcArrayLock);
|
||||
|
||||
/* Update csn log after transaction commit. */
|
||||
if (useLocalXid || !IsPostmasterEnvironment || GTM_FREE_MODE)
|
||||
UpdateCSNLogAtTransactionEND(proc, xid, nsubxids, proc->subxids.xids, csn, isCommit);
|
||||
} else
|
||||
} else {
|
||||
ProcArrayGroupClearXid(proc, latestXid);
|
||||
}
|
||||
} else {
|
||||
/*
|
||||
* If we have no XID, we don't need to lock, since we won't affect
|
||||
@ -521,8 +513,7 @@ static inline void ProcArrayEndTransactionInternal(PGPROC* proc, PGXACT* pgxact,
|
||||
proc->commitCSN = 0;
|
||||
pgxact->needToSyncXid = false;
|
||||
|
||||
if (!GTM_FREE_MODE && !useLocalXid && IsPostmasterEnvironment)
|
||||
ResetProcXidCache(proc, true);
|
||||
ResetProcXidCache(proc, true);
|
||||
|
||||
if (csn != NULL)
|
||||
*csn = result;
|
||||
@ -651,23 +642,9 @@ static void ProcArrayGroupClearXid(PGPROC* proc, TransactionId latestXid)
|
||||
|
||||
proc_member->procArrayGroupMember = false;
|
||||
|
||||
/* Update CSN log. */
|
||||
if (useLocalXid || !IsPostmasterEnvironment || GTM_FREE_MODE) {
|
||||
CommitSeqNo commitCsn = commitcsn[index];
|
||||
|
||||
#ifdef ENABLE_MULTIPLE_NODES
|
||||
commitCsn = commitCsn ? commitCsn: csn;
|
||||
#endif
|
||||
UpdateCSNLogAtTransactionEND(proc_member,
|
||||
xid[index],
|
||||
nsubxids[index],
|
||||
proc_member->subxids.xids,
|
||||
commitCsn,
|
||||
commitcsn[index] != COMMITSEQNO_ABORTED);
|
||||
}
|
||||
|
||||
if (proc_member != t_thrd.proc)
|
||||
if (proc_member != t_thrd.proc) {
|
||||
PGSemaphoreUnlock(&proc_member->sem);
|
||||
}
|
||||
|
||||
index++;
|
||||
}
|
||||
@ -720,37 +697,34 @@ void ProcArrayClearTransaction(PGPROC* proc)
|
||||
*/
|
||||
static CommitSeqNo UpdateCSNAtTransactionCommit(CommitSeqNo maxCommitCSN)
|
||||
{
|
||||
CommitSeqNo result = 0;
|
||||
CommitSeqNo result;
|
||||
|
||||
/* Update nextCommitSeqNo in Shmem Memory */
|
||||
if (useLocalXid || !IsPostmasterEnvironment || GTM_FREE_MODE) {
|
||||
/*
|
||||
* In LocalXid or gtm-free mode,
|
||||
* get CSN local and update nextCommitSeqNo
|
||||
*/
|
||||
/*
|
||||
* In LocalXid or gtm-free mode,
|
||||
* get CSN local and update nextCommitSeqNo
|
||||
*/
|
||||
#ifdef ENABLE_MULTIPLE_NODES
|
||||
result = t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo++;
|
||||
result = t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo++;
|
||||
#else
|
||||
result = GetCommitCsn();
|
||||
result = GetCommitCsn();
|
||||
#endif
|
||||
/* Get CSN and update nextCommitSeqNo to csn+1 */
|
||||
if (maxCommitCSN) {
|
||||
result = maxCommitCSN;
|
||||
} else {
|
||||
/* Get CSN and update nextCommitSeqNo to csn+1 */
|
||||
if (maxCommitCSN)
|
||||
result = maxCommitCSN;
|
||||
else
|
||||
result = GetCommitCsn();
|
||||
result = GetCommitCsn();
|
||||
}
|
||||
|
||||
if (t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo < result + 1)
|
||||
t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo = result + 1;
|
||||
if (t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo < result + 1) {
|
||||
t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo = result + 1;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
void UpdateCSNLogAtTransactionEND(
|
||||
PGPROC* proc, TransactionId xid, uint32 nsubxids, TransactionId* subXids, CommitSeqNo csn, bool isCommit)
|
||||
TransactionId xid, uint32 nsubxids, TransactionId* subXids, CommitSeqNo csn, bool isCommit)
|
||||
{
|
||||
Assert(proc != NULL || (!GTM_FREE_MODE && !useLocalXid && IsPostmasterEnvironment));
|
||||
if (TransactionIdIsNormal(xid) && isCommit) {
|
||||
Assert(csn >= COMMITSEQNO_FROZEN);
|
||||
|
||||
@ -761,10 +735,6 @@ void UpdateCSNLogAtTransactionEND(
|
||||
CSNLogSetCommitSeqNo(xid, nsubxids, subXids, csn & ~COMMITSEQNO_COMMIT_INPROGRESS);
|
||||
#endif
|
||||
}
|
||||
|
||||
/* Free xid cache memory if needed */
|
||||
if (useLocalXid || !IsPostmasterEnvironment || GTM_FREE_MODE)
|
||||
ResetProcXidCache(proc, true);
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
@ -992,7 +992,7 @@ void ProcReleaseLocks(bool isCommit)
|
||||
static void RemoveProcFromArray(int code, Datum arg)
|
||||
{
|
||||
Assert(t_thrd.proc != NULL);
|
||||
ProcArrayRemove(t_thrd.proc, InvalidTransactionId, false);
|
||||
ProcArrayRemove(t_thrd.proc, InvalidTransactionId);
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
@ -386,6 +386,7 @@ extern bool IsInLiveSubtransaction();
|
||||
extern void ExtendCsnlogForSubtrans(TransactionId parent_xid, int nsub_xid, TransactionId* sub_xids);
|
||||
extern CommitSeqNo SetXact2CommitInProgress(TransactionId xid, CommitSeqNo csn);
|
||||
extern void XactGetRelFiles(XLogReaderState* record, ColFileNodeRel** xnodesPtr, int* nrelsPtr);
|
||||
extern CommitSeqNo GetLocalNextCSN();
|
||||
extern bool IsMMEngineUsed();
|
||||
extern bool IsMMEngineUsedInParentTransaction();
|
||||
extern bool IsPGEngineUsed();
|
||||
|
||||
@ -22,7 +22,7 @@
|
||||
extern Size ProcArrayShmemSize(void);
|
||||
extern void CreateSharedProcArray(void);
|
||||
extern void ProcArrayAdd(PGPROC* proc);
|
||||
extern void ProcArrayRemove(PGPROC* proc, TransactionId latestXid, bool isCommit);
|
||||
extern void ProcArrayRemove(PGPROC* proc, TransactionId latestXid);
|
||||
|
||||
extern Size RingBufferShmemSize(void);
|
||||
extern void CreateSharedRingBuffer(void);
|
||||
@ -121,7 +121,7 @@ extern CommitSeqNo getNextCSN();
|
||||
extern void SyncLocalXidWait(TransactionId xid);
|
||||
|
||||
extern void UpdateCSNLogAtTransactionEND(
|
||||
PGPROC* proc, TransactionId xid, uint32 nsubxids, TransactionId* subXids, CommitSeqNo csn, bool isCommit);
|
||||
TransactionId xid, uint32 nsubxids, TransactionId* subXids, CommitSeqNo csn, bool isCommit);
|
||||
|
||||
#endif /* PROCARRAY_H */
|
||||
|
||||
|
||||
Reference in New Issue
Block a user