diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 809382880a2..d5cf0fa02b0 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -70,7 +70,7 @@ static void pg_decode_truncate(LogicalDecodingContext *ctx, int nrelations, Relation relations[], ReorderBufferChange *change); static bool pg_decode_filter(LogicalDecodingContext *ctx, - RepOriginId origin_id); + ReplOriginId origin_id); static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, @@ -461,11 +461,11 @@ pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid, static bool pg_decode_filter(LogicalDecodingContext *ctx, - RepOriginId origin_id) + ReplOriginId origin_id) { TestDecodingData *data = ctx->output_plugin_private; - if (data->only_local && origin_id != InvalidRepOriginId) + if (data->only_local && origin_id != InvalidReplOriginId) return true; return false; } diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index f36bf9462fa..6dc49108997 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -1100,7 +1100,7 @@ typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx, output plugin. typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx, - RepOriginId origin_id); + ReplOriginId origin_id); The ctx parameter has the same contents as for the other callbacks. No information but the origin is diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index 010f4ccbb78..4f53d3035cc 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -331,7 +331,7 @@ xact_desc_stats(StringInfo buf, const char *label, } static void -xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id) +xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, ReplOriginId origin_id) { xl_xact_parsed_commit parsed; @@ -367,7 +367,7 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId } static void -xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec, RepOriginId origin_id) +xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec, ReplOriginId origin_id) { xl_xact_parsed_abort parsed; @@ -394,7 +394,7 @@ xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec, RepOriginId or } static void -xact_desc_prepare(StringInfo buf, uint8 info, xl_xact_prepare *xlrec, RepOriginId origin_id) +xact_desc_prepare(StringInfo buf, uint8 info, xl_xact_prepare *xlrec, ReplOriginId origin_id) { xl_xact_parsed_prepare parsed; @@ -417,7 +417,7 @@ xact_desc_prepare(StringInfo buf, uint8 info, xl_xact_prepare *xlrec, RepOriginI * Check if the replication origin has been set in this record in the same * way as PrepareRedoAdd(). */ - if (origin_id != InvalidRepOriginId) + if (origin_id != InvalidReplOriginId) appendStringInfo(buf, "; origin: node %u, lsn %X/%08X, at %s", origin_id, LSN_FORMAT_ARGS(parsed.origin_lsn), diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c index 082b564da8f..6fa2178f1dd 100644 --- a/src/backend/access/transam/commit_ts.c +++ b/src/backend/access/transam/commit_ts.c @@ -54,11 +54,11 @@ typedef struct CommitTimestampEntry { TimestampTz time; - RepOriginId nodeid; + ReplOriginId nodeid; } CommitTimestampEntry; #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \ - sizeof(RepOriginId)) + sizeof(ReplOriginId)) #define COMMIT_TS_XACTS_PER_PAGE \ (BLCKSZ / SizeOfCommitTimestampEntry) @@ -110,9 +110,9 @@ bool track_commit_timestamp; static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, - RepOriginId nodeid, int64 pageno); + ReplOriginId nodeid, int64 pageno); static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, - RepOriginId nodeid, int slotno); + ReplOriginId nodeid, int slotno); static void error_commit_ts_disabled(void); static bool CommitTsPagePrecedes(int64 page1, int64 page2); static void ActivateCommitTs(void); @@ -138,7 +138,7 @@ static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid); void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, - RepOriginId nodeid) + ReplOriginId nodeid) { int i; TransactionId headxid; @@ -219,7 +219,7 @@ TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, - RepOriginId nodeid, int64 pageno) + ReplOriginId nodeid, int64 pageno) { LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno); int slotno; @@ -245,7 +245,7 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids, */ static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, - RepOriginId nodeid, int slotno) + ReplOriginId nodeid, int slotno) { int entryno = TransactionIdToCTsEntry(xid); CommitTimestampEntry entry; @@ -270,7 +270,7 @@ TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, */ bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, - RepOriginId *nodeid) + ReplOriginId *nodeid) { int64 pageno = TransactionIdToCTsPage(xid); int entryno = TransactionIdToCTsEntry(xid); @@ -327,7 +327,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, { *ts = 0; if (nodeid) - *nodeid = InvalidRepOriginId; + *nodeid = InvalidReplOriginId; return false; } @@ -355,7 +355,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, * as NULL if not wanted. */ TransactionId -GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid) +GetLatestCommitTsData(TimestampTz *ts, ReplOriginId *nodeid) { TransactionId xid; @@ -418,7 +418,7 @@ Datum pg_last_committed_xact(PG_FUNCTION_ARGS) { TransactionId xid; - RepOriginId nodeid; + ReplOriginId nodeid; TimestampTz ts; Datum values[3]; bool nulls[3]; @@ -462,7 +462,7 @@ Datum pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS) { TransactionId xid = PG_GETARG_TRANSACTIONID(0); - RepOriginId nodeid; + ReplOriginId nodeid; TimestampTz ts; Datum values[2]; bool nulls[2]; @@ -568,7 +568,7 @@ CommitTsShmemInit(void) commitTsShared->xidLastCommit = InvalidTransactionId; TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time); - commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId; + commitTsShared->dataLastCommit.nodeid = InvalidReplOriginId; commitTsShared->commitTsActive = false; } else @@ -763,7 +763,7 @@ DeactivateCommitTs(void) commitTsShared->commitTsActive = false; commitTsShared->xidLastCommit = InvalidTransactionId; TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time); - commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId; + commitTsShared->dataLastCommit.nodeid = InvalidReplOriginId; TransamVariables->oldestCommitTsXid = InvalidTransactionId; TransamVariables->newestCommitTsXid = InvalidTransactionId; diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index e50abb331cc..71dd3a9a7ef 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1157,7 +1157,7 @@ EndPrepare(GlobalTransaction gxact) Assert(hdr->magic == TWOPHASE_MAGIC); hdr->total_len = records.total_len + sizeof(pg_crc32c); - replorigin = (replorigin_session_origin != InvalidRepOriginId && + replorigin = (replorigin_session_origin != InvalidReplOriginId && replorigin_session_origin != DoNotReplicateId); if (replorigin) @@ -1924,7 +1924,7 @@ restoreTwoPhaseData(void) continue; PrepareRedoAdd(fxid, buf, InvalidXLogRecPtr, - InvalidXLogRecPtr, InvalidRepOriginId); + InvalidXLogRecPtr, InvalidReplOriginId); } } LWLockRelease(TwoPhaseStateLock); @@ -2330,7 +2330,7 @@ RecordTransactionCommitPrepared(TransactionId xid, * Are we using the replication origins feature? Or, in other words, are * we replaying remote actions? */ - replorigin = (replorigin_session_origin != InvalidRepOriginId && + replorigin = (replorigin_session_origin != InvalidReplOriginId && replorigin_session_origin != DoNotReplicateId); /* Load the injection point before entering the critical section */ @@ -2445,7 +2445,7 @@ RecordTransactionAbortPrepared(TransactionId xid, * Are we using the replication origins feature? Or, in other words, are * we replaying remote actions? */ - replorigin = (replorigin_session_origin != InvalidRepOriginId && + replorigin = (replorigin_session_origin != InvalidReplOriginId && replorigin_session_origin != DoNotReplicateId); /* @@ -2506,7 +2506,7 @@ RecordTransactionAbortPrepared(TransactionId xid, void PrepareRedoAdd(FullTransactionId fxid, char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn, - RepOriginId origin_id) + ReplOriginId origin_id) { TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf; char *bufptr; @@ -2595,7 +2595,7 @@ PrepareRedoAdd(FullTransactionId fxid, char *buf, Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; - if (origin_id != InvalidRepOriginId) + if (origin_id != InvalidReplOriginId) { /* recover apply progress */ replorigin_advance(origin_id, hdr->origin_lsn, end_lsn, diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index c857e23552f..12a1505b539 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1413,7 +1413,7 @@ RecordTransactionCommit(void) * Are we using the replication origins feature? Or, in other words, * are we replaying remote actions? */ - replorigin = (replorigin_session_origin != InvalidRepOriginId && + replorigin = (replorigin_session_origin != InvalidReplOriginId && replorigin_session_origin != DoNotReplicateId); /* @@ -1810,7 +1810,7 @@ RecordTransactionAbort(bool isSubXact) * Are we using the replication origins feature? Or, in other words, are * we replaying remote actions? */ - replorigin = (replorigin_session_origin != InvalidRepOriginId && + replorigin = (replorigin_session_origin != InvalidReplOriginId && replorigin_session_origin != DoNotReplicateId); /* Fetch the data we need for the abort record */ @@ -5928,7 +5928,7 @@ XactLogCommitRecord(TimestampTz commit_time, } /* dump transaction origin information */ - if (replorigin_session_origin != InvalidRepOriginId) + if (replorigin_session_origin != InvalidReplOriginId) { xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN; @@ -6081,7 +6081,7 @@ XactLogAbortRecord(TimestampTz abort_time, * Dump transaction origin information. We need this during recovery to * update the replication origin progress. */ - if (replorigin_session_origin != InvalidRepOriginId) + if (replorigin_session_origin != InvalidReplOriginId) { xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN; @@ -6152,7 +6152,7 @@ static void xact_redo_commit(xl_xact_parsed_commit *parsed, TransactionId xid, XLogRecPtr lsn, - RepOriginId origin_id) + ReplOriginId origin_id) { TransactionId max_xid; TimestampTz commit_time; @@ -6165,7 +6165,7 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, AdvanceNextFullTransactionIdPastXid(max_xid); Assert(((parsed->xinfo & XACT_XINFO_HAS_ORIGIN) == 0) == - (origin_id == InvalidRepOriginId)); + (origin_id == InvalidReplOriginId)); if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) commit_time = parsed->origin_timestamp; @@ -6304,7 +6304,7 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, */ static void xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid, - XLogRecPtr lsn, RepOriginId origin_id) + XLogRecPtr lsn, ReplOriginId origin_id) { TransactionId max_xid; diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index f928bc7c0ef..e2db6adc382 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -115,7 +115,7 @@ static uint8 curinsert_flags = 0; static XLogRecData hdr_rdt; static char *hdr_scratch = NULL; -#define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char)) +#define SizeOfXlogOrigin (sizeof(ReplOriginId) + sizeof(char)) #define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char)) #define HEADER_SCRATCH_SIZE \ @@ -861,7 +861,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, /* followed by the record's origin, if any */ if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) && - replorigin_session_origin != InvalidRepOriginId) + replorigin_session_origin != InvalidReplOriginId) { *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN; memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin)); diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index c60aa9a51e9..03ada8aa0c5 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -1707,7 +1707,7 @@ DecodeXLogRecord(XLogReaderState *state, decoded->header = *record; decoded->lsn = lsn; decoded->next = NULL; - decoded->record_origin = InvalidRepOriginId; + decoded->record_origin = InvalidReplOriginId; decoded->toplevel_xid = InvalidTransactionId; decoded->main_data = NULL; decoded->main_data_len = 0; @@ -1747,7 +1747,7 @@ DecodeXLogRecord(XLogReaderState *state, } else if (block_id == XLR_BLOCK_ID_ORIGIN) { - COPY_HEADER_FIELD(&decoded->record_origin, sizeof(RepOriginId)); + COPY_HEADER_FIELD(&decoded->record_origin, sizeof(ReplOriginId)); } else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID) { diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index d6674f20fc2..0b3c8499b49 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1897,7 +1897,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, */ if (XLogRecPtrIsValid(opts.lsn)) { - RepOriginId originid; + ReplOriginId originid; char originname[NAMEDATALEN]; XLogRecPtr remote_lsn; diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 72f2bff7708..743b1ee2b28 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -480,7 +480,7 @@ update_most_recent_deletion_info(TupleTableSlot *scanslot, TransactionId oldestxmin, TransactionId *delete_xid, TimestampTz *delete_time, - RepOriginId *delete_origin) + ReplOriginId *delete_origin) { BufferHeapTupleTableSlot *hslot; HeapTuple tuple; @@ -488,7 +488,7 @@ update_most_recent_deletion_info(TupleTableSlot *scanslot, bool recently_dead = false; TransactionId xmax; TimestampTz localts; - RepOriginId localorigin; + ReplOriginId localorigin; hslot = (BufferHeapTupleTableSlot *) scanslot; @@ -562,7 +562,7 @@ bool RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, - RepOriginId *delete_origin, + ReplOriginId *delete_origin, TimestampTz *delete_time) { TupleTableSlot *scanslot; @@ -574,7 +574,7 @@ RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot, Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor)); *delete_xid = InvalidTransactionId; - *delete_origin = InvalidRepOriginId; + *delete_origin = InvalidReplOriginId; *delete_time = 0; /* @@ -632,7 +632,7 @@ RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, - RepOriginId *delete_origin, + ReplOriginId *delete_origin, TimestampTz *delete_time) { Relation idxrel; @@ -649,7 +649,7 @@ RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid, *delete_xid = InvalidTransactionId; *delete_time = 0; - *delete_origin = InvalidRepOriginId; + *delete_origin = InvalidReplOriginId; isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid); diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 5ebd2353fed..2f5d74180dc 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -864,7 +864,7 @@ ParallelApplyWorkerMain(Datum main_arg) shm_mq *mq; shm_mq_handle *mqh; shm_mq_handle *error_mqh; - RepOriginId originid; + ReplOriginId originid; int worker_slot = DatumGetInt32(main_arg); char originname[NAMEDATALEN]; diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 722514149fe..ca71a81c7bf 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -42,7 +42,7 @@ static void errdetail_apply_conflict(EState *estate, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid, TransactionId localxmin, - RepOriginId localorigin, + ReplOriginId localorigin, TimestampTz localts, StringInfo err_msg); static void get_tuple_desc(EState *estate, ResultRelInfo *relinfo, ConflictType type, char **key_desc, @@ -61,7 +61,7 @@ static char *build_index_value_desc(EState *estate, Relation localrel, */ bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, - RepOriginId *localorigin, TimestampTz *localts) + ReplOriginId *localorigin, TimestampTz *localts) { Datum xminDatum; bool isnull; @@ -77,7 +77,7 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, */ if (!track_commit_timestamp) { - *localorigin = InvalidRepOriginId; + *localorigin = InvalidReplOriginId; *localts = 0; return false; } @@ -253,7 +253,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid, TransactionId localxmin, - RepOriginId localorigin, TimestampTz localts, + ReplOriginId localorigin, TimestampTz localts, StringInfo err_msg) { StringInfoData err_detail; @@ -292,7 +292,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, if (localts) { - if (localorigin == InvalidRepOriginId) + if (localorigin == InvalidReplOriginId) appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified locally in transaction %u at %s"), get_rel_name(indexoid), localxmin, timestamptz_to_str(localts)); @@ -323,7 +323,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, break; case CT_UPDATE_ORIGIN_DIFFERS: - if (localorigin == InvalidRepOriginId) + if (localorigin == InvalidReplOriginId) appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s"), localxmin, timestamptz_to_str(localts)); else if (replorigin_by_oid(localorigin, true, &origin_name)) @@ -350,7 +350,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, if (localts) { - if (localorigin == InvalidRepOriginId) + if (localorigin == InvalidReplOriginId) appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s"), localxmin, timestamptz_to_str(localts)); else if (replorigin_by_oid(localorigin, true, &origin_name)) @@ -377,7 +377,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, break; case CT_DELETE_ORIGIN_DIFFERS: - if (localorigin == InvalidRepOriginId) + if (localorigin == InvalidReplOriginId) appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s"), localxmin, timestamptz_to_str(localts)); else if (replorigin_by_oid(localorigin, true, &origin_name)) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index e25dd6bc366..32af1249610 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -66,7 +66,7 @@ static inline bool FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid, const char *gid); static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, Oid txn_dbid, - RepOriginId origin_id); + ReplOriginId origin_id); /* * Take every XLogReadRecord()ed record and perform the actions required to @@ -566,7 +566,7 @@ FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid, } static inline bool -FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) +FilterByOrigin(LogicalDecodingContext *ctx, ReplOriginId origin_id) { if (ctx->callbacks.filter_by_origin_cb == NULL) return false; @@ -584,7 +584,7 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) XLogReaderState *r = buf->record; TransactionId xid = XLogRecGetXid(r); uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; - RepOriginId origin_id = XLogRecGetOrigin(r); + ReplOriginId origin_id = XLogRecGetOrigin(r); Snapshot snapshot = NULL; xl_logical_message *message; @@ -665,7 +665,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, { XLogRecPtr origin_lsn = InvalidXLogRecPtr; TimestampTz commit_time = parsed->xact_time; - RepOriginId origin_id = XLogRecGetOrigin(buf->record); + ReplOriginId origin_id = XLogRecGetOrigin(buf->record); int i; if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) @@ -761,7 +761,7 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, SnapBuild *builder = ctx->snapshot_builder; XLogRecPtr origin_lsn = parsed->origin_lsn; TimestampTz prepare_time = parsed->xact_time; - RepOriginId origin_id = XLogRecGetOrigin(buf->record); + ReplOriginId origin_id = XLogRecGetOrigin(buf->record); int i; TransactionId xid = parsed->twophase_xid; @@ -837,7 +837,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, int i; XLogRecPtr origin_lsn = InvalidXLogRecPtr; TimestampTz abort_time = parsed->xact_time; - RepOriginId origin_id = XLogRecGetOrigin(buf->record); + ReplOriginId origin_id = XLogRecGetOrigin(buf->record); bool skip_xact; if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) @@ -1289,7 +1289,7 @@ DecodeXLogTuple(char *data, Size len, HeapTuple tuple) */ static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - Oid txn_dbid, RepOriginId origin_id) + Oid txn_dbid, ReplOriginId origin_id) { if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) || diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index b0ef1a12520..85060d19a49 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1187,7 +1187,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid, } bool -filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) +filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, ReplOriginId origin_id) { LogicalErrorCallbackState state; ErrorContextCallback errcallback; diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 13808a4674b..d91be9ebc18 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -111,7 +111,7 @@ typedef struct ReplicationState /* * Local identifier for the remote node. */ - RepOriginId roident; + ReplOriginId roident; /* * Location of the latest commit from the remote side. @@ -149,7 +149,7 @@ typedef struct ReplicationState */ typedef struct ReplicationStateOnDisk { - RepOriginId roident; + ReplOriginId roident; XLogRecPtr remote_lsn; } ReplicationStateOnDisk; @@ -163,7 +163,7 @@ typedef struct ReplicationStateCtl } ReplicationStateCtl; /* external variables */ -RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */ +ReplOriginId replorigin_session_origin = InvalidReplOriginId; /* assumed identity */ XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr; TimestampTz replorigin_session_origin_timestamp = 0; @@ -225,7 +225,7 @@ IsReservedOriginName(const char *name) * * Returns InvalidOid if the node isn't known yet and missing_ok is true. */ -RepOriginId +ReplOriginId replorigin_by_name(const char *roname, bool missing_ok) { Form_pg_replication_origin ident; @@ -256,7 +256,7 @@ replorigin_by_name(const char *roname, bool missing_ok) * * Needs to be called in a transaction. */ -RepOriginId +ReplOriginId replorigin_create(const char *roname) { Oid roident; @@ -369,7 +369,7 @@ replorigin_create(const char *roname) * Helper function to drop a replication origin. */ static void -replorigin_state_clear(RepOriginId roident, bool nowait) +replorigin_state_clear(ReplOriginId roident, bool nowait) { int i; @@ -426,7 +426,7 @@ restart: } /* then clear the in-memory slot */ - state->roident = InvalidRepOriginId; + state->roident = InvalidReplOriginId; state->remote_lsn = InvalidXLogRecPtr; state->local_lsn = InvalidXLogRecPtr; break; @@ -444,7 +444,7 @@ restart: void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait) { - RepOriginId roident; + ReplOriginId roident; Relation rel; HeapTuple tuple; @@ -496,13 +496,13 @@ replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait) * Returns true if the origin is known, false otherwise. */ bool -replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname) +replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname) { HeapTuple tuple; Form_pg_replication_origin ric; Assert(OidIsValid((Oid) roident)); - Assert(roident != InvalidRepOriginId); + Assert(roident != InvalidReplOriginId); Assert(roident != DoNotReplicateId); tuple = SearchSysCache1(REPLORIGIDENT, @@ -656,7 +656,7 @@ CheckPointReplicationOrigin(void) ReplicationState *curstate = &replication_states[i]; XLogRecPtr local_lsn; - if (curstate->roident == InvalidRepOriginId) + if (curstate->roident == InvalidReplOriginId) continue; /* zero, to avoid uninitialized padding bytes */ @@ -884,7 +884,7 @@ replorigin_redo(XLogReaderState *record) if (state->roident == xlrec->node_id) { /* reset entry */ - state->roident = InvalidRepOriginId; + state->roident = InvalidReplOriginId; state->remote_lsn = InvalidXLogRecPtr; state->local_lsn = InvalidXLogRecPtr; break; @@ -913,7 +913,7 @@ replorigin_redo(XLogReaderState *record) * unless running in recovery. */ void -replorigin_advance(RepOriginId node, +replorigin_advance(ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log) { @@ -921,7 +921,7 @@ replorigin_advance(RepOriginId node, ReplicationState *replication_state = NULL; ReplicationState *free_state = NULL; - Assert(node != InvalidRepOriginId); + Assert(node != InvalidReplOriginId); /* we don't track DoNotReplicateId */ if (node == DoNotReplicateId) @@ -946,7 +946,7 @@ replorigin_advance(RepOriginId node, ReplicationState *curstate = &replication_states[i]; /* remember where to insert if necessary */ - if (curstate->roident == InvalidRepOriginId && + if (curstate->roident == InvalidReplOriginId && free_state == NULL) { free_state = curstate; @@ -997,7 +997,7 @@ replorigin_advance(RepOriginId node, replication_state->roident = node; } - Assert(replication_state->roident != InvalidRepOriginId); + Assert(replication_state->roident != InvalidReplOriginId); /* * If somebody "forcefully" sets this slot, WAL log it, so it's durable @@ -1042,7 +1042,7 @@ replorigin_advance(RepOriginId node, XLogRecPtr -replorigin_get_progress(RepOriginId node, bool flush) +replorigin_get_progress(ReplOriginId node, bool flush) { int i; XLogRecPtr local_lsn = InvalidXLogRecPtr; @@ -1141,7 +1141,7 @@ ReplicationOriginExitCleanup(int code, Datum arg) * acquired_by = PID of the first process. */ void -replorigin_session_setup(RepOriginId node, int acquired_by) +replorigin_session_setup(ReplOriginId node, int acquired_by) { static bool registered_cleanup; int i; @@ -1172,7 +1172,7 @@ replorigin_session_setup(RepOriginId node, int acquired_by) ReplicationState *curstate = &replication_states[i]; /* remember where to insert if necessary */ - if (curstate->roident == InvalidRepOriginId && + if (curstate->roident == InvalidReplOriginId && free_slot == -1) { free_slot = i; @@ -1239,7 +1239,7 @@ replorigin_session_setup(RepOriginId node, int acquired_by) } - Assert(session_replication_state->roident != InvalidRepOriginId); + Assert(session_replication_state->roident != InvalidReplOriginId); if (acquired_by == 0) { @@ -1308,7 +1308,7 @@ void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit) { Assert(session_replication_state != NULL); - Assert(session_replication_state->roident != InvalidRepOriginId); + Assert(session_replication_state->roident != InvalidReplOriginId); LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE); if (session_replication_state->local_lsn < local_commit) @@ -1358,7 +1358,7 @@ Datum pg_replication_origin_create(PG_FUNCTION_ARGS) { char *name; - RepOriginId roident; + ReplOriginId roident; replorigin_check_prerequisites(false, false); @@ -1418,7 +1418,7 @@ Datum pg_replication_origin_oid(PG_FUNCTION_ARGS) { char *name; - RepOriginId roident; + ReplOriginId roident; replorigin_check_prerequisites(false, false); @@ -1439,7 +1439,7 @@ Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS) { char *name; - RepOriginId origin; + ReplOriginId origin; int pid; replorigin_check_prerequisites(true, false); @@ -1466,7 +1466,7 @@ pg_replication_origin_session_reset(PG_FUNCTION_ARGS) replorigin_session_reset(); - replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin = InvalidReplOriginId; replorigin_session_origin_lsn = InvalidXLogRecPtr; replorigin_session_origin_timestamp = 0; @@ -1481,7 +1481,7 @@ pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS) { replorigin_check_prerequisites(false, false); - PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId); + PG_RETURN_BOOL(replorigin_session_origin != InvalidReplOriginId); } @@ -1548,7 +1548,7 @@ pg_replication_origin_advance(PG_FUNCTION_ARGS) { text *name = PG_GETARG_TEXT_PP(0); XLogRecPtr remote_commit = PG_GETARG_LSN(1); - RepOriginId node; + ReplOriginId node; replorigin_check_prerequisites(true, false); @@ -1583,7 +1583,7 @@ pg_replication_origin_progress(PG_FUNCTION_ARGS) { char *name; bool flush; - RepOriginId roident; + ReplOriginId roident; XLogRecPtr remote_lsn = InvalidXLogRecPtr; replorigin_check_prerequisites(true, true); @@ -1633,7 +1633,7 @@ pg_show_replication_origin_status(PG_FUNCTION_ARGS) state = &replication_states[i]; /* unused slot, nothing to display */ - if (state->roident == InvalidRepOriginId) + if (state->roident == InvalidReplOriginId) continue; memset(values, 0, sizeof(values)); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index a0293f6ec7c..d84fa120b9f 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2824,7 +2824,7 @@ ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, - RepOriginId origin_id, XLogRecPtr origin_lsn) + ReplOriginId origin_id, XLogRecPtr origin_lsn) { Snapshot snapshot_now; CommandId command_id = FirstCommandId; @@ -2884,7 +2884,7 @@ void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, - RepOriginId origin_id, XLogRecPtr origin_lsn) + ReplOriginId origin_id, XLogRecPtr origin_lsn) { ReorderBufferTXN *txn; @@ -2907,7 +2907,7 @@ bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, - RepOriginId origin_id, XLogRecPtr origin_lsn) + ReplOriginId origin_id, XLogRecPtr origin_lsn) { ReorderBufferTXN *txn; @@ -3001,7 +3001,7 @@ void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, - TimestampTz commit_time, RepOriginId origin_id, + TimestampTz commit_time, ReplOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit) { ReorderBufferTXN *txn; diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 67e57520386..c3e24ca4305 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -323,7 +323,7 @@ ProcessSyncingTablesForSync(XLogRecPtr current_lsn) * This is needed to allow the origin to be dropped. */ replorigin_session_reset(); - replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin = InvalidReplOriginId; replorigin_session_origin_lsn = InvalidXLogRecPtr; replorigin_session_origin_timestamp = 0; @@ -1226,7 +1226,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) AclResult aclresult; WalRcvExecResult *res; char originname[NAMEDATALEN]; - RepOriginId originid; + ReplOriginId originid; UserContext ucxt; bool must_use_password; bool run_as_owner; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ad281e7069b..c66ca54d003 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -608,7 +608,7 @@ static bool FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, TupleTableSlot *remoteslot, TransactionId *delete_xid, - RepOriginId *delete_origin, + ReplOriginId *delete_origin, TimestampTz *delete_time); static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, @@ -3268,7 +3268,7 @@ IsIndexUsableForFindingDeletedTuple(Oid localindexoid, static bool FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, TupleTableSlot *remoteslot, - TransactionId *delete_xid, RepOriginId *delete_origin, + TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time) { TransactionId oldestxmin; @@ -5627,7 +5627,7 @@ run_apply_worker(void) XLogRecPtr origin_startpos = InvalidXLogRecPtr; char *slotname = NULL; WalRcvStreamOptions options; - RepOriginId originid; + ReplOriginId originid; TimeLineID startpointTLI; char *err; bool must_use_password; @@ -5874,7 +5874,7 @@ InitializeLogRepWorker(void) static void replorigin_reset(int code, Datum arg) { - replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin = InvalidReplOriginId; replorigin_session_origin_lsn = InvalidXLogRecPtr; replorigin_session_origin_timestamp = 0; } diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 9ee8949e040..e016f64e0b3 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -59,7 +59,7 @@ static void pgoutput_message(LogicalDecodingContext *ctx, bool transactional, const char *prefix, Size sz, const char *message); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, - RepOriginId origin_id); + ReplOriginId origin_id); static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn); static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, @@ -89,7 +89,7 @@ static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_repl_origin(LogicalDecodingContext *ctx, - RepOriginId origin_id, XLogRecPtr origin_lsn, + ReplOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); /* @@ -609,7 +609,7 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) static void pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { - bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + bool send_replication_origin = txn->origin_id != InvalidReplOriginId; PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; Assert(txndata); @@ -663,7 +663,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { - bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + bool send_replication_origin = txn->origin_id != InvalidReplOriginId; OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin_prepare(ctx->out, txn); @@ -1767,11 +1767,11 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, */ static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, - RepOriginId origin_id) + ReplOriginId origin_id) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; - if (data->publish_no_origin && origin_id != InvalidRepOriginId) + if (data->publish_no_origin && origin_id != InvalidReplOriginId) return true; return false; @@ -1841,7 +1841,7 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; - bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + bool send_replication_origin = txn->origin_id != InvalidReplOriginId; /* we can't nest streaming of transactions */ Assert(!data->in_streaming); @@ -2457,7 +2457,7 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) /* Send Replication origin */ static void -send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, +send_repl_origin(LogicalDecodingContext *ctx, ReplOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin) { if (send_origin) diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c index 8953a17753e..697143aec44 100644 --- a/src/backend/utils/adt/pg_upgrade_support.c +++ b/src/backend/utils/adt/pg_upgrade_support.c @@ -372,7 +372,7 @@ binary_upgrade_replorigin_advance(PG_FUNCTION_ARGS) Oid subid; char *subname; char originname[NAMEDATALEN]; - RepOriginId node; + ReplOriginId node; XLogRecPtr remote_commit; CHECK_IS_BINARY_UPGRADE; diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index c8ace1c7732..2bc84505aab 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -1103,7 +1103,7 @@ check_subscriber(const struct LogicalRepInfo *dbinfo) bool failed = false; int max_lrworkers; - int max_reporigins; + int max_replorigins; int max_wprocs; pg_log_info("checking settings on subscriber"); @@ -1142,7 +1142,7 @@ check_subscriber(const struct LogicalRepInfo *dbinfo) disconnect_database(conn, true); } - max_reporigins = atoi(PQgetvalue(res, 0, 0)); + max_replorigins = atoi(PQgetvalue(res, 0, 0)); max_lrworkers = atoi(PQgetvalue(res, 1, 0)); max_wprocs = atoi(PQgetvalue(res, 2, 0)); if (strcmp(PQgetvalue(res, 3, 0), "") != 0) @@ -1150,7 +1150,7 @@ check_subscriber(const struct LogicalRepInfo *dbinfo) pg_log_debug("subscriber: max_logical_replication_workers: %d", max_lrworkers); - pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins); + pg_log_debug("subscriber: max_active_replication_origins: %d", max_replorigins); pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs); if (primary_slot_name) pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name); @@ -1159,10 +1159,10 @@ check_subscriber(const struct LogicalRepInfo *dbinfo) disconnect_database(conn, false); - if (max_reporigins < num_dbs) + if (max_replorigins < num_dbs) { pg_log_error("subscriber requires %d active replication origins, but only %d remain", - num_dbs, max_reporigins); + num_dbs, max_replorigins); pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.", "max_active_replication_origins", num_dbs); failed = true; diff --git a/src/bin/pg_upgrade/t/004_subscription.pl b/src/bin/pg_upgrade/t/004_subscription.pl index 3a8c8b88976..f68821df2a3 100644 --- a/src/bin/pg_upgrade/t/004_subscription.pl +++ b/src/bin/pg_upgrade/t/004_subscription.pl @@ -175,9 +175,9 @@ $old_sub->safe_psql('postgres', ); my $sub_oid = $old_sub->safe_psql('postgres', "SELECT oid FROM pg_subscription WHERE subname = 'regress_sub3'"); -my $reporigin = 'pg_' . qq($sub_oid); +my $replorigin = 'pg_' . qq($sub_oid); $old_sub->safe_psql('postgres', - "SELECT pg_replication_origin_drop('$reporigin')"); + "SELECT pg_replication_origin_drop('$replorigin')"); $old_sub->stop; diff --git a/src/include/access/commit_ts.h b/src/include/access/commit_ts.h index bc3b81687b1..49ee21cd5d2 100644 --- a/src/include/access/commit_ts.h +++ b/src/include/access/commit_ts.h @@ -21,11 +21,11 @@ extern PGDLLIMPORT bool track_commit_timestamp; extern void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, - RepOriginId nodeid); + ReplOriginId nodeid); extern bool TransactionIdGetCommitTsData(TransactionId xid, - TimestampTz *ts, RepOriginId *nodeid); + TimestampTz *ts, ReplOriginId *nodeid); extern TransactionId GetLatestCommitTsData(TimestampTz *ts, - RepOriginId *nodeid); + ReplOriginId *nodeid); extern Size CommitTsShmemSize(void); extern void CommitTsShmemInit(void); diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index d6059919c85..e312514ba87 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -58,7 +58,7 @@ extern void FinishPreparedTransaction(const char *gid, bool isCommit); extern void PrepareRedoAdd(FullTransactionId fxid, char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn, - RepOriginId origin_id); + ReplOriginId origin_id); extern void PrepareRedoRemove(TransactionId xid, bool giveWarning); extern void restoreTwoPhaseData(void); extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h index 1dfed85c0a3..f896dbe149f 100644 --- a/src/include/access/xlogdefs.h +++ b/src/include/access/xlogdefs.h @@ -66,7 +66,7 @@ typedef uint32 TimeLineID; * Replication origin id - this is located in this file to avoid having to * include origin.h in a bunch of xlog related places. */ -typedef uint16 RepOriginId; +typedef uint16 ReplOriginId; /* * This chunk of hackery attempts to determine which file sync methods diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 9b63b6aff75..80f1a548e08 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -163,7 +163,7 @@ typedef struct DecodedXLogRecord XLogRecPtr lsn; /* location */ XLogRecPtr next_lsn; /* location of next record */ XLogRecord header; /* header */ - RepOriginId record_origin; + ReplOriginId record_origin; TransactionId toplevel_xid; /* XID of top-level transaction */ char *main_data; /* record's main data portion */ uint32 main_data_len; /* main data portion's length */ diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 5929aabc353..55a7d930d26 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -769,13 +769,13 @@ extern bool RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, - RepOriginId *delete_origin, + ReplOriginId *delete_origin, TimestampTz *delete_time); extern bool RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, - RepOriginId *delete_origin, + ReplOriginId *delete_origin, TimestampTz *delete_time); extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot); diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index 2de7b624eb2..1cade336c91 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -74,14 +74,14 @@ typedef struct ConflictTupleInfo * occurred */ TransactionId xmin; /* transaction ID of the modification causing * the conflict */ - RepOriginId origin; /* origin identifier of the modification */ + ReplOriginId origin; /* origin identifier of the modification */ TimestampTz ts; /* timestamp of when the modification on the * conflicting local row occurred */ } ConflictTupleInfo; extern bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, - RepOriginId *localorigin, + ReplOriginId *localorigin, TimestampTz *localts); extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 5b43e181135..7f03537bda7 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -144,7 +144,7 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid, const char *gid); -extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); +extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, ReplOriginId origin_id); extern void ResetLogicalStreamingState(void); extern void UpdateDecodingStats(LogicalDecodingContext *ctx); diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h index 1da77363955..63e198e29a8 100644 --- a/src/include/replication/origin.h +++ b/src/include/replication/origin.h @@ -18,19 +18,19 @@ typedef struct xl_replorigin_set { XLogRecPtr remote_lsn; - RepOriginId node_id; + ReplOriginId node_id; bool force; } xl_replorigin_set; typedef struct xl_replorigin_drop { - RepOriginId node_id; + ReplOriginId node_id; } xl_replorigin_drop; #define XLOG_REPLORIGIN_SET 0x00 #define XLOG_REPLORIGIN_DROP 0x10 -#define InvalidRepOriginId 0 +#define InvalidReplOriginId 0 #define DoNotReplicateId PG_UINT16_MAX /* @@ -40,7 +40,7 @@ typedef struct xl_replorigin_drop */ #define MAX_RONAME_LEN 512 -extern PGDLLIMPORT RepOriginId replorigin_session_origin; +extern PGDLLIMPORT ReplOriginId replorigin_session_origin; extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn; extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp; @@ -48,22 +48,22 @@ extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp; extern PGDLLIMPORT int max_active_replication_origins; /* API for querying & manipulating replication origins */ -extern RepOriginId replorigin_by_name(const char *roname, bool missing_ok); -extern RepOriginId replorigin_create(const char *roname); +extern ReplOriginId replorigin_by_name(const char *roname, bool missing_ok); +extern ReplOriginId replorigin_create(const char *roname); extern void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait); -extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok, +extern bool replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname); /* API for querying & manipulating replication progress tracking */ -extern void replorigin_advance(RepOriginId node, +extern void replorigin_advance(ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log); -extern XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush); +extern XLogRecPtr replorigin_get_progress(ReplOriginId node, bool flush); extern void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit); -extern void replorigin_session_setup(RepOriginId node, int acquired_by); +extern void replorigin_session_setup(ReplOriginId node, int acquired_by); extern void replorigin_session_reset(void); extern XLogRecPtr replorigin_session_get_progress(bool flush); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index c7981a56209..842fcde67f9 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -94,7 +94,7 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, * Filter changes by origin. */ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx, - RepOriginId origin_id); + ReplOriginId origin_id); /* * Called to shutdown an output plugin. diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 314e35592c0..2d717a9e152 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -83,7 +83,7 @@ typedef struct ReorderBufferChange /* Transaction this change belongs to. */ struct ReorderBufferTXN *txn; - RepOriginId origin_id; + ReplOriginId origin_id; /* * Context data for the change. Which part of the union is valid depends @@ -347,7 +347,7 @@ typedef struct ReorderBufferTXN XLogRecPtr restart_decoding_lsn; /* origin of the change that caused this transaction */ - RepOriginId origin_id; + ReplOriginId origin_id; XLogRecPtr origin_lsn; /* @@ -724,12 +724,12 @@ extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Size message_size, const char *message); extern void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, - TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); + TimestampTz commit_time, ReplOriginId origin_id, XLogRecPtr origin_lsn); extern void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, - RepOriginId origin_id, XLogRecPtr origin_lsn, + ReplOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit); extern void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn); @@ -768,7 +768,7 @@ extern bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid extern bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, - RepOriginId origin_id, XLogRecPtr origin_lsn); + ReplOriginId origin_id, XLogRecPtr origin_lsn); extern void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid); extern void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid); extern ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *rb); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index ddbe4c64971..7ad16c8ad23 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2575,8 +2575,8 @@ ReorderBufferTupleCidEnt ReorderBufferTupleCidKey ReorderBufferUpdateProgressTxnCB ReorderTuple -RepOriginId ReparameterizeForeignPathByChild_function +ReplOriginId ReplaceVarsFromTargetList_context ReplaceVarsNoMatchOption ReplaceWrapOption