diff --git a/src/common/backend/utils/misc/guc.cpp b/src/common/backend/utils/misc/guc.cpp index f6f402dbd..46d6e15fb 100644 --- a/src/common/backend/utils/misc/guc.cpp +++ b/src/common/backend/utils/misc/guc.cpp @@ -9287,7 +9287,7 @@ static void init_configure_names_int() NULL }, &u_sess->attr.attr_sql.max_parallel_workers_per_gather, - 0, + 2, 0, MAX_PARALLEL_WORKER_LIMIT, NULL, diff --git a/src/common/backend/utils/time/combocid.cpp b/src/common/backend/utils/time/combocid.cpp index 0985e43e2..0dcb00376 100755 --- a/src/common/backend/utils/time/combocid.cpp +++ b/src/common/backend/utils/time/combocid.cpp @@ -348,3 +348,39 @@ void StreamTxnContextRestoreComboCid(void* stc) Assert((u_sess->utils_cxt.sizeComboCids > 0 && u_sess->utils_cxt.comboHash) || (u_sess->utils_cxt.sizeComboCids == 0 && !u_sess->utils_cxt.comboHash)); } + +void SerializeComboCIDState(void *parallelCtx) +{ + ParallelInfoContext *ctx = (ParallelInfoContext*)parallelCtx; + ctx->usedComboCids = u_sess->utils_cxt.usedComboCids; + if (ctx->usedComboCids > 0) { + Size comboCidSize = sizeof(ComboCidKeyData) * (uint32)ctx->usedComboCids; + ctx->comboCids = (ComboCidKeyData*)palloc(comboCidSize); + int rc = memcpy_s(ctx->comboCids, comboCidSize, u_sess->utils_cxt.comboCids, comboCidSize); + securec_check(rc, "", ""); + } +} + +/* + * Read the ComboCID state at the specified address and initialize this + * backend with the same ComboCIDs. This is only valid in a backend that + * currently has no ComboCIDs (and only makes sense if the transaction state + * is serialized and restored as well). + */ +void RestoreComboCIDState(void *parallelCtx) +{ + ParallelInfoContext *ctx = (ParallelInfoContext*)parallelCtx; + + Assert(u_sess->utils_cxt.comboCids == NULL); + Assert(u_sess->utils_cxt.comboHash == NULL); + + /* Use GetComboCommandId to restore each ComboCID. */ + for (int i = 0; i < ctx->usedComboCids ; i++) { + CommandId cid = GetComboCommandId(ctx->comboCids[i].cmin, ctx->comboCids[i].cmax); + /* Verify that we got the expected answer. */ + if (cid != (uint32)i) { + ereport(ERROR, (errmsg("unexpected command ID while restoring combo CIDs"))); + } + } +} + diff --git a/src/common/backend/utils/time/snapmgr.cpp b/src/common/backend/utils/time/snapmgr.cpp index 0fbca1687..b594508c7 100755 --- a/src/common/backend/utils/time/snapmgr.cpp +++ b/src/common/backend/utils/time/snapmgr.cpp @@ -1519,7 +1519,7 @@ void SerializeSnapshot(Snapshot snapshot, char *start_address, Size len) * snapshot taken during recovery; all the top-level XIDs are in subxip as * well in that case, so we mustn't lose them. */ - if (snapshot->subxcnt > 0) { + if (serialized_snapshot->subxcnt > 0) { Size subxipoff = sizeof(SerializedSnapshotData) + snapshot->xcnt * sizeof(TransactionId); rc = memcpy_s(((char *)serialized_snapshot + subxipoff), len - subxipoff, snapshot->subxip, @@ -1573,7 +1573,7 @@ Snapshot RestoreSnapshot(char *start_address, Size len) /* Copy SubXIDs, if present. */ if (serialized_snapshot->subxcnt > 0) { - snapshot->subxip = snapshot->xip + serialized_snapshot->xcnt; + snapshot->subxip = ((TransactionId*)(snapshot + 1)) + serialized_snapshot->xcnt; rc = memcpy_s(snapshot->subxip, remainLen, serialized_xids + serialized_snapshot->xcnt, serialized_snapshot->subxcnt * sizeof(TransactionId)); securec_check_c(rc, "", ""); diff --git a/src/gausskernel/process/threadpool/knl_thread.cpp b/src/gausskernel/process/threadpool/knl_thread.cpp index 1c5dc2e02..18625fcca 100755 --- a/src/gausskernel/process/threadpool/knl_thread.cpp +++ b/src/gausskernel/process/threadpool/knl_thread.cpp @@ -360,7 +360,10 @@ static void knl_t_xact_init(knl_t_xact_context* xact_cxt) xact_cxt->PGXCBucketMap = NULL; xact_cxt->PGXCNodeId = -1; - xact_cxt->inheritFileNode = false; + xact_cxt->inheritFileNode = false; + + xact_cxt->nParallelCurrentXids = 0; + xact_cxt->ParallelCurrentXids = NULL; } static void knl_t_mem_init(knl_t_mem_context* mem_cxt) diff --git a/src/gausskernel/runtime/executor/execParallel.cpp b/src/gausskernel/runtime/executor/execParallel.cpp index 42da37633..7f4644d0c 100644 --- a/src/gausskernel/runtime/executor/execParallel.cpp +++ b/src/gausskernel/runtime/executor/execParallel.cpp @@ -278,7 +278,7 @@ ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, EState *estate, } /* Everyone's had a chance to ask for space, so now create the DSM. */ - InitializeParallelDSM(pcxt); + InitializeParallelDSM(pcxt, estate->es_snapshot); knl_u_parallel_context *cxt = (knl_u_parallel_context *)pcxt->seg; /* diff --git a/src/gausskernel/runtime/executor/nodeGather.cpp b/src/gausskernel/runtime/executor/nodeGather.cpp index 6b7eda565..e11625777 100644 --- a/src/gausskernel/runtime/executor/nodeGather.cpp +++ b/src/gausskernel/runtime/executor/nodeGather.cpp @@ -373,6 +373,10 @@ static HeapTuple gather_readnext(GatherState *gatherstate) */ static void ExecShutdownGatherWorkers(GatherState *node) { + /* wait for the workers to finish first */ + if (node->pei != NULL) + ExecParallelFinish(node->pei); + /* Shut down tuple queue readers before shutting down workers. */ if (node->reader != NULL) { for (int i = 0; i < node->nreaders; ++i) @@ -381,10 +385,6 @@ static void ExecShutdownGatherWorkers(GatherState *node) pfree(node->reader); node->reader = NULL; } - - /* Now shut down the workers. */ - if (node->pei != NULL) - ExecParallelFinish(node->pei); } /* ---------------------------------------------------------------- diff --git a/src/gausskernel/storage/access/transam/parallel.cpp b/src/gausskernel/storage/access/transam/parallel.cpp index b40e234b7..0e1c11c34 100644 --- a/src/gausskernel/storage/access/transam/parallel.cpp +++ b/src/gausskernel/storage/access/transam/parallel.cpp @@ -114,10 +114,9 @@ ParallelContext *CreateParallelContext(const char *library_name, const char *fun * copy state and other bookkeeping information that will be needed by * parallel workers into it. */ -void InitializeParallelDSM(ParallelContext *pcxt) +void InitializeParallelDSM(ParallelContext *pcxt, const void *snap) { int i; - Snapshot transaction_snapshot = GetTransactionSnapshot(); Snapshot active_snapshot = GetActiveSnapshot(); /* @@ -154,22 +153,22 @@ void InitializeParallelDSM(ParallelContext *pcxt) /* We can skip the rest of this if we're not budgeting for any workers. */ if (pcxt->nworkers > 0) { /* Serialize combo CID state. */ - cxt->pwCtx->usedComboCids = u_sess->utils_cxt.usedComboCids; - cxt->pwCtx->comboCids = u_sess->utils_cxt.comboCids; - cxt->pwCtx->sizeComboCids = u_sess->utils_cxt.sizeComboCids; - cxt->pwCtx->comboHash = u_sess->utils_cxt.comboHash; + SerializeComboCIDState(cxt->pwCtx); - /* Serialize transaction snapshot and active snapshot. */ - Size tsnaplen = EstimateSnapshotSpace(transaction_snapshot); + /* Serialize active snapshot. */ Size asnaplen = EstimateSnapshotSpace(active_snapshot); - cxt->pwCtx->tsnapspace = (char *)palloc0(tsnaplen); - cxt->pwCtx->tsnapspace_len = tsnaplen; - SerializeSnapshot(transaction_snapshot, cxt->pwCtx->tsnapspace, tsnaplen); cxt->pwCtx->asnapspace = (char *)palloc0(asnaplen); cxt->pwCtx->asnapspace_len = asnaplen; SerializeSnapshot(active_snapshot, cxt->pwCtx->asnapspace, asnaplen); + /* Save transaction snapshot */ + cxt->pwCtx->xmin = ((Snapshot)snap)->xmin; + cxt->pwCtx->xmax = ((Snapshot)snap)->xmax; + cxt->pwCtx->timeline = ((Snapshot)snap)->timeline; + cxt->pwCtx->snapshotcsn = ((Snapshot)snap)->snapshotcsn; + cxt->pwCtx->curcid = ((Snapshot)snap)->curcid; + Size searchPathLen = strlen(u_sess->attr.attr_common.namespace_search_path); cxt->pwCtx->namespace_search_path = (char *)palloc(searchPathLen + 1); int rc = strcpy_s(cxt->pwCtx->namespace_search_path, searchPathLen + 1, @@ -177,13 +176,7 @@ void InitializeParallelDSM(ParallelContext *pcxt) securec_check_c(rc, "", ""); /* Serialize transaction state. */ - cxt->pwCtx->xactIsoLevel = u_sess->utils_cxt.XactIsoLevel; - cxt->pwCtx->xactDeferrable = u_sess->attr.attr_storage.XactDeferrable; - cxt->pwCtx->topTransactionId = GetTopTransactionIdIfAny(); - cxt->pwCtx->currentTransactionId = GetCurrentTransactionIdIfAny(); - cxt->pwCtx->currentCommandId = t_thrd.xact_cxt.currentCommandId; - cxt->pwCtx->nParallelCurrentXids = t_thrd.xact_cxt.nParallelCurrentXids; - cxt->pwCtx->ParallelCurrentXids = t_thrd.xact_cxt.ParallelCurrentXids; + SerializeTransactionState(cxt->pwCtx); /* Serialize relmapper state. */ cxt->pwCtx->active_shared_updates = u_sess->relmap_cxt.active_shared_updates; @@ -988,19 +981,30 @@ void ParallelWorkerMain(Datum main_arg) StartParallelWorkerTransaction(ctx->pwCtx); /* Restore combo CID state. */ - u_sess->utils_cxt.usedComboCids = ctx->pwCtx->usedComboCids; - u_sess->utils_cxt.comboCids = ctx->pwCtx->comboCids; - u_sess->utils_cxt.sizeComboCids = ctx->pwCtx->sizeComboCids; - u_sess->utils_cxt.comboHash = ctx->pwCtx->comboHash; + RestoreComboCIDState(ctx->pwCtx); /* Restore namespace search path */ u_sess->attr.attr_common.namespace_search_path = ctx->pwCtx->namespace_search_path; /* Restore transaction snapshot. */ - RestoreTransactionSnapshot(RestoreSnapshot(ctx->pwCtx->tsnapspace, ctx->pwCtx->tsnapspace_len), - ctx->pwCtx->parallel_master_pgproc); + u_sess->utils_cxt.FirstSnapshotSet = true; + u_sess->utils_cxt.CurrentSnapshot = u_sess->utils_cxt.CurrentSnapshotData; + u_sess->utils_cxt.CurrentSnapshot->xmin = ctx->pwCtx->xmin; + u_sess->utils_cxt.CurrentSnapshot->xmax = ctx->pwCtx->xmax; + u_sess->utils_cxt.CurrentSnapshot->timeline = ctx->pwCtx->timeline; + u_sess->utils_cxt.CurrentSnapshot->snapshotcsn = ctx->pwCtx->snapshotcsn; + u_sess->utils_cxt.CurrentSnapshot->curcid = ctx->pwCtx->curcid; + u_sess->utils_cxt.CurrentSnapshot->active_count = 0; + u_sess->utils_cxt.CurrentSnapshot->regd_count = 0; + u_sess->utils_cxt.CurrentSnapshot->copied = false; + + u_sess->utils_cxt.RecentGlobalXmin = ctx->pwCtx->RecentGlobalXmin; + u_sess->utils_cxt.TransactionXmin = ctx->pwCtx->TransactionXmin; + u_sess->utils_cxt.RecentXmin = ctx->pwCtx->RecentXmin; + /* Restore active snapshot. */ - PushActiveSnapshot(RestoreSnapshot(ctx->pwCtx->asnapspace, ctx->pwCtx->asnapspace_len)); + Snapshot active_snapshot = RestoreSnapshot(ctx->pwCtx->asnapspace, ctx->pwCtx->asnapspace_len); + PushActiveSnapshot(active_snapshot); /* * We've changed which tuples we can see, and must therefore invalidate diff --git a/src/gausskernel/storage/access/transam/xact.cpp b/src/gausskernel/storage/access/transam/xact.cpp index fc68e1998..ac19491ca 100644 --- a/src/gausskernel/storage/access/transam/xact.cpp +++ b/src/gausskernel/storage/access/transam/xact.cpp @@ -1146,6 +1146,17 @@ bool TransactionIdIsCurrentTransactionId(TransactionId xid) if (!TransactionIdIsNormal(xid)) return false; + /* + * In parallel workers, the XIDs we must consider as current are stored in + * ParallelCurrentXids rather than the transaction-state stack. Note that + * the XIDs in this array are sorted numerically rather than according to + * transactionIdPrecedes order. + */ + if (t_thrd.xact_cxt.nParallelCurrentXids > 0) { + return bsearch(&xid, t_thrd.xact_cxt.ParallelCurrentXids, (uint32)t_thrd.xact_cxt.nParallelCurrentXids, + sizeof(TransactionId), xidComparator) != NULL ? true : false; + } + /* * We will return true for the Xid of the current subtransaction, any of * its subcommitted children, any of its parents, or any of their @@ -3004,7 +3015,6 @@ static void CommitTransaction(bool stpCommit) s->maxChildXids = 0; s->storageEngineType = SE_TYPE_UNSPECIFIED; - t_thrd.xact_cxt.XactTopTransactionId = InvalidTransactionId; t_thrd.xact_cxt.nParallelCurrentXids = 0; #ifdef PGXC @@ -3560,6 +3570,8 @@ static void PrepareTransaction(bool stpCommit) s->maxChildXids = 0; s->storageEngineType = SE_TYPE_UNSPECIFIED; + t_thrd.xact_cxt.nParallelCurrentXids = 0; + /* * done with 1st phase commit processing, set current transaction state * back to default @@ -4015,6 +4027,8 @@ static void CleanupTransaction(void) s->maxChildXids = 0; s->storageEngineType = SE_TYPE_UNSPECIFIED; + t_thrd.xact_cxt.nParallelCurrentXids = 0; + /* done with abort processing, set current transaction state back to default */ s->state = TRANS_DEFAULT; #ifdef PGXC @@ -6574,6 +6588,86 @@ void SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts) t_thrd.xact_cxt.stmtStartTimestamp = stmt_ts; } +/* + * SerializeTransactionState + * Write out relevant details of our transaction state that will be + * needed by a parallel worker. + * + * We need to save and restore XactDeferrable, XactIsoLevel, and the XIDs + * associated with this transaction. We emit the XIDs in sorted order for + * the convenience of the receiving process. + */ +void SerializeTransactionState(ParallelInfoContext *cxt) +{ + Assert(cxt != NULL); + int rc; + int i; + int nxids = 0; + Size transSize; + TransactionState s = NULL; + cxt->xactIsoLevel = u_sess->utils_cxt.XactIsoLevel; + cxt->xactDeferrable = u_sess->attr.attr_storage.XactDeferrable; + cxt->topTransactionId = GetTopTransactionIdIfAny(); + cxt->currentTransactionId = GetCurrentTransactionIdIfAny(); + cxt->currentCommandId = t_thrd.xact_cxt.currentCommandId; + cxt->RecentGlobalXmin = u_sess->utils_cxt.RecentGlobalXmin; + cxt->TransactionXmin = u_sess->utils_cxt.TransactionXmin; + cxt->RecentXmin = u_sess->utils_cxt.RecentXmin; + cxt->nParallelCurrentXids = 0; + cxt->ParallelCurrentXids = NULL; + + /* + * If we're running in a parallel worker and launching a parallel worker + * of our own, we can just pass along the information that was passed to + * us. + */ + if (t_thrd.xact_cxt.nParallelCurrentXids > 0) { + cxt->nParallelCurrentXids = t_thrd.xact_cxt.nParallelCurrentXids; + transSize = sizeof(TransactionId) * (uint32)t_thrd.xact_cxt.nParallelCurrentXids; + cxt->ParallelCurrentXids = (TransactionId*)palloc(transSize); + rc = memcpy_s(cxt->ParallelCurrentXids, transSize, t_thrd.xact_cxt.ParallelCurrentXids, transSize); + securec_check_c(rc, "", ""); + return; + } + + /* + * OK, we need to generate a sorted list of XIDs that our workers should + * view as current. First, figure out how many there are. + */ + for (s = CurrentTransactionState; s != NULL; s = s->parent) { + if (TransactionIdIsValid(s->transactionId)) { + nxids++; + } + nxids += s->nChildXids; + } + + if (nxids <= 0) { + return; + } + + /* Copy them to our scratch space. */ + cxt->nParallelCurrentXids = nxids; + transSize = sizeof(TransactionId) * (uint32)nxids; + cxt->ParallelCurrentXids = (TransactionId*)palloc(transSize); + + for (i = 0, s = CurrentTransactionState; s != NULL; s = s->parent) { + if (TransactionIdIsValid(s->transactionId)) { + cxt->ParallelCurrentXids[i++] = s->transactionId; + } + + if (s->childXids != NULL && s->nChildXids > 0) { + rc = memcpy_s(&cxt->ParallelCurrentXids[i], transSize - (i * sizeof(TransactionId)), + s->childXids, (uint32)s->nChildXids * sizeof(TransactionId)); + securec_check_c(rc, "", ""); + i += s->nChildXids; + } + } + Assert(i == nxids); + + /* Sort them. */ + qsort(cxt->ParallelCurrentXids, nxids, sizeof(TransactionId), xidComparator); +} + /* * StartParallelWorkerTransaction * Start a parallel worker transaction, restoring the relevant diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index 13b3f52f8..1775045e6 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -1,68 +1,68 @@ -/* ------------------------------------------------------------------------- - * - * parallel.h - * Infrastructure for launching parallel workers - * - * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group - * Portions Copyright (c) 1994, Regents of the University of California - * - * src/include/access/parallel.h - * - * ------------------------------------------------------------------------- - */ - -#ifndef PARALLEL_H -#define PARALLEL_H - -#include "access/xlogdefs.h" -#include "lib/ilist.h" -#include "postmaster/bgworker.h" -#include "storage/shm_mq.h" - -typedef void (*parallel_worker_main_type)(void *seg); - -typedef struct ParallelWorkerInfo { - BackgroundWorkerHandle *bgwhandle; - shm_mq_handle *error_mqh; - ThreadId pid; -} ParallelWorkerInfo; - -typedef struct ParallelContext { - dlist_node node; - SubTransactionId subid; - int nworkers; - int nworkers_launched; - char *library_name; - char *function_name; - ErrorContextCallback *error_context_stack; - void *seg; - void *private_memory; - ParallelWorkerInfo *worker; - int nknown_attached_workers; - bool *known_attached_workers; -} ParallelContext; - -typedef struct ParallelWorkerContext { - void *seg; -} ParallelWorkerContext; - -#define IsParallelWorker() (t_thrd.bgworker_cxt.ParallelWorkerNumber >= 0) - -extern ParallelContext *CreateParallelContext(const char *library_name, const char *function_name, int nworkers); -extern void InitializeParallelDSM(ParallelContext *pcxt); -extern void ReinitializeParallelDSM(ParallelContext *pcxt); -extern void LaunchParallelWorkers(ParallelContext *pcxt); -extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt); -extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt); -extern void DestroyParallelContext(ParallelContext *pcxt); -extern bool ParallelContextActive(void); - -extern void HandleParallelMessageInterrupt(void); -extern void HandleParallelMessages(void); -extern void AtEOXact_Parallel(bool isCommit); -extern void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId); -extern void ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end); - -extern void ParallelWorkerMain(Datum main_arg); - -#endif /* PARALLEL_H */ +/* ------------------------------------------------------------------------- + * + * parallel.h + * Infrastructure for launching parallel workers + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/parallel.h + * + * ------------------------------------------------------------------------- + */ + +#ifndef PARALLEL_H +#define PARALLEL_H + +#include "access/xlogdefs.h" +#include "lib/ilist.h" +#include "postmaster/bgworker.h" +#include "storage/shm_mq.h" + +typedef void (*parallel_worker_main_type)(void *seg); + +typedef struct ParallelWorkerInfo { + BackgroundWorkerHandle *bgwhandle; + shm_mq_handle *error_mqh; + ThreadId pid; +} ParallelWorkerInfo; + +typedef struct ParallelContext { + dlist_node node; + SubTransactionId subid; + int nworkers; + int nworkers_launched; + char *library_name; + char *function_name; + ErrorContextCallback *error_context_stack; + void *seg; + void *private_memory; + ParallelWorkerInfo *worker; + int nknown_attached_workers; + bool *known_attached_workers; +} ParallelContext; + +typedef struct ParallelWorkerContext { + void *seg; +} ParallelWorkerContext; + +#define IsParallelWorker() (t_thrd.bgworker_cxt.ParallelWorkerNumber >= 0) + +extern ParallelContext *CreateParallelContext(const char *library_name, const char *function_name, int nworkers); +extern void InitializeParallelDSM(ParallelContext *pcxt, const void *snap); +extern void ReinitializeParallelDSM(ParallelContext *pcxt); +extern void LaunchParallelWorkers(ParallelContext *pcxt); +extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt); +extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt); +extern void DestroyParallelContext(ParallelContext *pcxt); +extern bool ParallelContextActive(void); + +extern void HandleParallelMessageInterrupt(void); +extern void HandleParallelMessages(void); +extern void AtEOXact_Parallel(bool isCommit); +extern void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId); +extern void ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end); + +extern void ParallelWorkerMain(Datum main_arg); + +#endif /* PARALLEL_H */ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index fdeaa86de..707c4380e 100755 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -328,6 +328,7 @@ extern void BeginInternalSubTransaction(const char* name); extern void ReleaseCurrentSubTransaction(void); extern void RollbackAndReleaseCurrentSubTransaction(void); extern bool IsSubTransaction(void); +extern void SerializeTransactionState(ParallelInfoContext *cxt); extern void StartParallelWorkerTransaction(ParallelInfoContext *cxt); extern void EndParallelWorkerTransaction(void); extern void SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts); diff --git a/src/include/knl/knl_session.h b/src/include/knl/knl_session.h index 7b1714e75..aacf84b43 100644 --- a/src/include/knl/knl_session.h +++ b/src/include/knl/knl_session.h @@ -2066,8 +2066,6 @@ typedef struct ParallelInfoContext { int pscan_num; ParallelHeapScanDescData **pscan; int usedComboCids; - int sizeComboCids; - HTAB *comboHash; struct ComboCidKeyData *comboCids; char *tsnapspace; Size tsnapspace_len; @@ -2080,6 +2078,15 @@ typedef struct ParallelInfoContext { bool xactDeferrable; TransactionId topTransactionId; TransactionId currentTransactionId; + TransactionId RecentGlobalXmin; + TransactionId TransactionXmin; + TransactionId RecentXmin; + /* CurrentSnapshot */ + TransactionId xmin; + TransactionId xmax; + CommandId curcid; + uint32 timeline; + CommitSeqNo snapshotcsn; CommandId currentCommandId; int nParallelCurrentXids; TransactionId *ParallelCurrentXids; diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index a1a916539..8eb89150f 100644 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -369,7 +369,6 @@ typedef struct knl_t_xact_context { * The XIDs are stored sorted in numerical order (not logical order) to make * lookups as fast as possible. */ - TransactionId XactTopTransactionId; int nParallelCurrentXids; TransactionId *ParallelCurrentXids; diff --git a/src/include/utils/combocid.h b/src/include/utils/combocid.h index 5d0bc3faf..1f22a1093 100644 --- a/src/include/utils/combocid.h +++ b/src/include/utils/combocid.h @@ -22,6 +22,8 @@ extern void AtEOXact_ComboCid(void); +extern void RestoreComboCIDState(void *parallelCtx); +extern void SerializeComboCIDState(void *parallelCtx); extern void StreamTxnContextSaveComboCid(void* stc); extern void StreamTxnContextRestoreComboCid(void* stc);