!246 write csn when write commit log

Merge pull request !246 from 吴岳川/master
This commit is contained in:
opengauss-bot
2020-09-25 17:11:41 +08:00
committed by Gitee
7 changed files with 35 additions and 78 deletions

View File

@ -624,26 +624,7 @@ void TransactionIdAsyncCommitTree(TransactionId xid, int nxids, TransactionId* x
if (csn > 0)
CSNLogSetCommitSeqNo(xid, nxids, xids, csn);
} else {
#ifdef ENABLE_MULTIPLE_NODES
/*
* when the transaction does not involve GTM, set csn to commit-in-progress
* first here and set the true csn while procarray clear.
*/
if (useLocalXid || !IsPostmasterEnvironment || GTM_FREE_MODE) {
/* Set CSN to commit-in-progress */
Assert(csn == 0);
CSNLogSetCommitSeqNo(xid,
nxids,
xids,
(COMMITSEQNO_COMMIT_INPROGRESS | t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo));
} else { /* for gtm transactions, set the true csn here just after the clog is set. */
UpdateCSNLogAtTransactionEND(NULL, xid, nxids, xids, csn, true);
}
#else
if (!(useLocalXid || !IsPostmasterEnvironment || GTM_FREE_MODE)) {
UpdateCSNLogAtTransactionEND(NULL, xid, nxids, xids, csn, true);
}
#endif
UpdateCSNLogAtTransactionEND(xid, nxids, xids, csn, true);
}
}

View File

@ -2112,7 +2112,7 @@ void FinishPreparedTransaction(const char* gid, bool isCommit)
abortLibraryLen);
}
ProcArrayRemove(proc, latestXid, isCommit);
ProcArrayRemove(proc, latestXid);
/*
* In case we fail while running the callbacks, mark the gxact invalid so
@ -3401,7 +3401,7 @@ void RemoveStaleTwophaseState(TransactionId xid)
* it to ProcArrayRemove. Don't calculate new local snapshot.
*/
latestCompletedXid = t_thrd.xact_cxt.ShmemVariableCache->latestCompletedXid;
ProcArrayRemove(g_instance.proc_base_all_procs[gxact->pgprocno], latestCompletedXid, false);
ProcArrayRemove(g_instance.proc_base_all_procs[gxact->pgprocno], latestCompletedXid);
/*
* And now we can clean up any files we may have left.

View File

@ -1404,6 +1404,11 @@ static void AtSubStart_ResourceOwner(void)
t_thrd.utils_cxt.CurrentResourceOwner = s->curTransactionOwner;
}
CommitSeqNo GetLocalNextCSN()
{
return pg_atomic_fetch_add_u64(&t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo, 1);
}
/* ----------------------------------------------------------------
* CommitTransaction stuff
* ----------------------------------------------------------------

View File

@ -315,7 +315,7 @@ void ProcArrayAdd(PGPROC* proc)
* the ProcArrayLock only once, and don't damage the content of the PGPROC;
* twophase.c depends on the latter.)
*/
void ProcArrayRemove(PGPROC* proc, TransactionId latestXid, bool isCommit)
void ProcArrayRemove(PGPROC* proc, TransactionId latestXid)
{
ProcArrayStruct* arrayP = g_instance.proc_array_idx;
PGXACT* pgxact = &g_instance.proc_base_all_xacts[proc->pgprocno];
@ -352,19 +352,14 @@ void ProcArrayRemove(PGPROC* proc, TransactionId latestXid, bool isCommit)
arrayP->pgprocnos[arrayP->numProcs - 1] = -1; /* for debugging */
arrayP->numProcs--;
if (isCommit) {
/* Update csn in shared memory after transaction commit. */
csn = UpdateCSNAtTransactionCommit(0);
}
/* Update csn in shared memory after transaction commit. */
csn = UpdateCSNAtTransactionCommit(0);
/* Calc new sanpshot. */
if (TransactionIdIsValid(latestXid))
CalculateLocalLatestSnapshot(false);
LWLockRelease(ProcArrayLock);
/* Update csn log after transaction end. */
if (useLocalXid || !IsPostmasterEnvironment || GTM_FREE_MODE)
UpdateCSNLogAtTransactionEND(proc, pgxact->xid, pgxact->nxids, proc->subxids.xids, csn, isCommit);
/* Free xid cache memory if needed, must after procarray remove */
ResetProcXidCache(proc, false);
proc->commitCSN = 0;
@ -444,12 +439,9 @@ void ProcArrayEndTransaction(PGPROC* proc, TransactionId latestXid, bool isCommi
ProcArrayEndTransactionInternal(proc, pgxact, latestXid, &xid, &nsubxids, &csn, isCommit);
CalculateLocalLatestSnapshot(false);
LWLockRelease(ProcArrayLock);
/* Update csn log after transaction commit. */
if (useLocalXid || !IsPostmasterEnvironment || GTM_FREE_MODE)
UpdateCSNLogAtTransactionEND(proc, xid, nsubxids, proc->subxids.xids, csn, isCommit);
} else
} else {
ProcArrayGroupClearXid(proc, latestXid);
}
} else {
/*
* If we have no XID, we don't need to lock, since we won't affect
@ -521,8 +513,7 @@ static inline void ProcArrayEndTransactionInternal(PGPROC* proc, PGXACT* pgxact,
proc->commitCSN = 0;
pgxact->needToSyncXid = false;
if (!GTM_FREE_MODE && !useLocalXid && IsPostmasterEnvironment)
ResetProcXidCache(proc, true);
ResetProcXidCache(proc, true);
if (csn != NULL)
*csn = result;
@ -651,23 +642,9 @@ static void ProcArrayGroupClearXid(PGPROC* proc, TransactionId latestXid)
proc_member->procArrayGroupMember = false;
/* Update CSN log. */
if (useLocalXid || !IsPostmasterEnvironment || GTM_FREE_MODE) {
CommitSeqNo commitCsn = commitcsn[index];
#ifdef ENABLE_MULTIPLE_NODES
commitCsn = commitCsn ? commitCsn: csn;
#endif
UpdateCSNLogAtTransactionEND(proc_member,
xid[index],
nsubxids[index],
proc_member->subxids.xids,
commitCsn,
commitcsn[index] != COMMITSEQNO_ABORTED);
}
if (proc_member != t_thrd.proc)
if (proc_member != t_thrd.proc) {
PGSemaphoreUnlock(&proc_member->sem);
}
index++;
}
@ -720,37 +697,34 @@ void ProcArrayClearTransaction(PGPROC* proc)
*/
static CommitSeqNo UpdateCSNAtTransactionCommit(CommitSeqNo maxCommitCSN)
{
CommitSeqNo result = 0;
CommitSeqNo result;
/* Update nextCommitSeqNo in Shmem Memory */
if (useLocalXid || !IsPostmasterEnvironment || GTM_FREE_MODE) {
/*
* In LocalXid or gtm-free mode,
* get CSN local and update nextCommitSeqNo
*/
/*
* In LocalXid or gtm-free mode,
* get CSN local and update nextCommitSeqNo
*/
#ifdef ENABLE_MULTIPLE_NODES
result = t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo++;
result = t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo++;
#else
result = GetCommitCsn();
result = GetCommitCsn();
#endif
/* Get CSN and update nextCommitSeqNo to csn+1 */
if (maxCommitCSN) {
result = maxCommitCSN;
} else {
/* Get CSN and update nextCommitSeqNo to csn+1 */
if (maxCommitCSN)
result = maxCommitCSN;
else
result = GetCommitCsn();
result = GetCommitCsn();
}
if (t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo < result + 1)
t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo = result + 1;
if (t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo < result + 1) {
t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo = result + 1;
}
return result;
}
void UpdateCSNLogAtTransactionEND(
PGPROC* proc, TransactionId xid, uint32 nsubxids, TransactionId* subXids, CommitSeqNo csn, bool isCommit)
TransactionId xid, uint32 nsubxids, TransactionId* subXids, CommitSeqNo csn, bool isCommit)
{
Assert(proc != NULL || (!GTM_FREE_MODE && !useLocalXid && IsPostmasterEnvironment));
if (TransactionIdIsNormal(xid) && isCommit) {
Assert(csn >= COMMITSEQNO_FROZEN);
@ -761,10 +735,6 @@ void UpdateCSNLogAtTransactionEND(
CSNLogSetCommitSeqNo(xid, nsubxids, subXids, csn & ~COMMITSEQNO_COMMIT_INPROGRESS);
#endif
}
/* Free xid cache memory if needed */
if (useLocalXid || !IsPostmasterEnvironment || GTM_FREE_MODE)
ResetProcXidCache(proc, true);
}
/*

View File

@ -992,7 +992,7 @@ void ProcReleaseLocks(bool isCommit)
static void RemoveProcFromArray(int code, Datum arg)
{
Assert(t_thrd.proc != NULL);
ProcArrayRemove(t_thrd.proc, InvalidTransactionId, false);
ProcArrayRemove(t_thrd.proc, InvalidTransactionId);
}
/*

View File

@ -386,6 +386,7 @@ extern bool IsInLiveSubtransaction();
extern void ExtendCsnlogForSubtrans(TransactionId parent_xid, int nsub_xid, TransactionId* sub_xids);
extern CommitSeqNo SetXact2CommitInProgress(TransactionId xid, CommitSeqNo csn);
extern void XactGetRelFiles(XLogReaderState* record, ColFileNodeRel** xnodesPtr, int* nrelsPtr);
extern CommitSeqNo GetLocalNextCSN();
extern bool IsMMEngineUsed();
extern bool IsMMEngineUsedInParentTransaction();
extern bool IsPGEngineUsed();

View File

@ -22,7 +22,7 @@
extern Size ProcArrayShmemSize(void);
extern void CreateSharedProcArray(void);
extern void ProcArrayAdd(PGPROC* proc);
extern void ProcArrayRemove(PGPROC* proc, TransactionId latestXid, bool isCommit);
extern void ProcArrayRemove(PGPROC* proc, TransactionId latestXid);
extern Size RingBufferShmemSize(void);
extern void CreateSharedRingBuffer(void);
@ -121,7 +121,7 @@ extern CommitSeqNo getNextCSN();
extern void SyncLocalXidWait(TransactionId xid);
extern void UpdateCSNLogAtTransactionEND(
PGPROC* proc, TransactionId xid, uint32 nsubxids, TransactionId* subXids, CommitSeqNo csn, bool isCommit);
TransactionId xid, uint32 nsubxids, TransactionId* subXids, CommitSeqNo csn, bool isCommit);
#endif /* PROCARRAY_H */