From c63724ab3299d04db4dfa88e277198cab95532f1 Mon Sep 17 00:00:00 2001 From: looplocked Date: Wed, 29 Jul 2020 16:53:06 +0800 Subject: [PATCH] merge logical replication code --- contrib/mppdb_decoding/mppdb_decoding.cpp | 2 - contrib/test_decoding/test_decoding.cpp | 2 - src/bin/gs_guc/cluster_guc.conf | 6 + src/common/backend/utils/error/elog.cpp | 4 + src/common/backend/utils/misc/guc.cpp | 25 +-- .../backend/utils/misc/postgresql.conf.sample | 6 + .../process/postmaster/postmaster.cpp | 6 +- .../storage/access/transam/transam.cpp | 2 - .../storage/replication/dataqueue.cpp | 25 +-- .../storage/replication/datarcvwriter.cpp | 8 +- .../storage/replication/datareceiver.cpp | 85 +++++----- .../storage/replication/datasyncrep.cpp | 4 +- .../storage/replication/logical/decode.cpp | 5 +- .../replication/logical/logicalfuncs.cpp | 2 +- .../replication/logical/reorderbuffer.cpp | 31 ++-- .../storage/replication/logical/snapbuild.cpp | 17 +- .../storage/replication/slotfuncs.cpp | 10 -- .../storage/replication/walreceiver.cpp | 73 +++++---- .../storage/replication/walsender.cpp | 149 +++++++++++++++--- .../knl/knl_guc/knl_instance_attr_storage.h | 3 +- src/include/knl/knl_thread.h | 3 + src/include/replication/logicalfuncs.h | 1 + src/include/replication/reorderbuffer.h | 2 +- src/test/regress/expected/rangefuncs.out | 3 +- .../regress/output/recovery_2pc_tools.source | 1 + 25 files changed, 312 insertions(+), 163 deletions(-) diff --git a/contrib/mppdb_decoding/mppdb_decoding.cpp b/contrib/mppdb_decoding/mppdb_decoding.cpp index 3d77e5d91..f28a909fd 100755 --- a/contrib/mppdb_decoding/mppdb_decoding.cpp +++ b/contrib/mppdb_decoding/mppdb_decoding.cpp @@ -50,8 +50,6 @@ PG_MODULE_MAGIC; /* These must be available to pg_dlsym() */ -extern "C" PG_FUNCTION_INFO_V1(_PG_init); -extern "C" PG_FUNCTION_INFO_V1(_PG_output_plugin_init); extern "C" void _PG_init(void); extern "C" void _PG_output_plugin_init(OutputPluginCallbacks* cb); diff --git a/contrib/test_decoding/test_decoding.cpp b/contrib/test_decoding/test_decoding.cpp index 28eeaed0e..b341453c2 100644 --- a/contrib/test_decoding/test_decoding.cpp +++ b/contrib/test_decoding/test_decoding.cpp @@ -33,8 +33,6 @@ PG_MODULE_MAGIC; /* These must be available to pg_dlsym() */ -extern "C" PG_FUNCTION_INFO_V1(_PG_init); -extern "C" PG_FUNCTION_INFO_V1(_PG_output_plugin_init); extern "C" void _PG_init(void); extern "C" void _PG_output_plugin_init(OutputPluginCallbacks* cb); diff --git a/src/bin/gs_guc/cluster_guc.conf b/src/bin/gs_guc/cluster_guc.conf index 86863b7cd..b9ad7f149 100644 --- a/src/bin/gs_guc/cluster_guc.conf +++ b/src/bin/gs_guc/cluster_guc.conf @@ -303,6 +303,10 @@ max_process_memory|int|2097152,2147483647|kB|NULL| session_statistics_memory|int|5120,1073741823|kB|NULL| session_history_memory|int|10240,1073741823|kB|NULL| max_query_retry_times|int|0,20|NULL|NULL| +max_replication_slots|int|0,262143|NULL|NULL| +enable_slot_log|bool|0,0|NULL|NULL| +max_changes_in_memory|int|1,2147483647|NULL|NULL| +max_cached_tuplebufs|int|1,2147483647|NULL|NULL| max_stack_depth|int|100,2147483647|kB|NULL| max_standby_archive_delay|int|-1,2147483647|ms|'-1' means to permit backup machine waits until the query of conflict is completed.| max_standby_streaming_delay|int|-1,2147483647|ms|NULL| @@ -519,6 +523,7 @@ numa_distribute_mode|string|0,0|NULL|NULL| defer_csn_cleanup_time|int|0,2147483647|ms|NULL| tcp_recv_timeout|int|0,86400|s|Specify the receiving timeouts until reporting an error.| max_inner_tool_connections|int|1,8388607|NULL|NULL| +max_keep_log_seg|int|0,2147483647|NULL|NULL| [gtm] nodename|string|0,0|NULL|Name of this GTM/GTM-Standby.| port|int|1,65535|NULL|Listen Port of GTM or GTM standby server.| @@ -658,6 +663,7 @@ cstore_buffers|int|16384,1073741823|kB|NULL| udfworkermemhardlimit|int|0,2147483647|kB|Sets the hard memory limit to be used for fenced UDF.| wal_buffers|int|-1,262143|kB|Every time a transaction is committed, the contents of WAL buffers are written to disk, it is set to a large value will not bring significant performance gains. If you set it to hundreds of megabytes, you may have written to the disk to improve performance on the server a lot of real-time transaction commits. According to experience, the default value is sufficient for most situations.| max_wal_senders|int|0,8388607|NULL|Check whether the new value of max_wal_senders is less than max_connections and wal_level is archive or hot_standby, otherwise the gaussdb will start failed.| +max_replication_slots|int|0,262143|NULL|NULL| autovacuum_freeze_max_age|int64|100000,576460752303423487|NULL|NULL| autovacuum_max_workers|int|0,8388607|NULL|NULL| track_activity_query_size|int|100,102400|NULL|NULL| diff --git a/src/common/backend/utils/error/elog.cpp b/src/common/backend/utils/error/elog.cpp index d65b4bd89..749b7ddce 100644 --- a/src/common/backend/utils/error/elog.cpp +++ b/src/common/backend/utils/error/elog.cpp @@ -92,6 +92,7 @@ #include #include "tcop/stmt_retry.h" +#include "replication/walsender.h" #undef _ #define _(x) err_gettext(x) @@ -3300,6 +3301,9 @@ static void send_message_to_frontend(ErrorData* edata) if (edata->elevel == FATAL) t_thrd.log_cxt.flush_message_immediately = true; } + if (AM_WAL_DB_SENDER) { + ReadyForQuery((CommandDest)t_thrd.postgres_cxt.whereToSendOutput); + } } /* diff --git a/src/common/backend/utils/misc/guc.cpp b/src/common/backend/utils/misc/guc.cpp index cfa110e6a..ddd4a2e65 100644 --- a/src/common/backend/utils/misc/guc.cpp +++ b/src/common/backend/utils/misc/guc.cpp @@ -3560,8 +3560,6 @@ static void init_configure_names_bool() NULL }, #endif -#ifdef ENABLE_MULTIPLE_NODES - { { "enable_slot_log", @@ -3576,7 +3574,6 @@ static void init_configure_names_bool() NULL, NULL }, -#endif #ifdef ENABLE_MULTIPLE_NODES { { @@ -6435,7 +6432,6 @@ static void init_configure_names_int() NULL, NULL }, -#ifdef ENABLE_MULTIPLE_NODES { /* see max_connections */ { @@ -6453,7 +6449,6 @@ static void init_configure_names_int() NULL, NULL }, -#endif { { "recovery_time_target", @@ -8804,7 +8799,6 @@ static void init_configure_names_int() NULL, NULL }, -#ifdef ENABLE_MULTIPLE_NODES { { "max_changes_in_memory", @@ -8821,8 +8815,6 @@ static void init_configure_names_int() NULL, NULL }, -#endif -#ifdef ENABLE_MULTIPLE_NODES { { "max_cached_tuplebufs", @@ -8839,7 +8831,6 @@ static void init_configure_names_int() NULL, NULL }, -#endif { { "table_skewness_warning_rows", @@ -9075,6 +9066,22 @@ static void init_configure_names_int() NULL, NULL }, + { + { + "max_keep_log_seg", + PGC_SUSET, + WAL, + gettext_noop("Sets the threshold for implementing logical replication flow control."), + NULL + }, + &g_instance.attr.attr_storage.max_keep_log_seg, + 0, + 0, + INT_MAX, + NULL, + NULL, + NULL + }, /* End-of-list marker */ { { diff --git a/src/common/backend/utils/misc/postgresql.conf.sample b/src/common/backend/utils/misc/postgresql.conf.sample index 732d146c5..e44025b79 100755 --- a/src/common/backend/utils/misc/postgresql.conf.sample +++ b/src/common/backend/utils/misc/postgresql.conf.sample @@ -258,6 +258,12 @@ max_wal_senders = 4 # max number of walsender processes # (change requires restart) wal_keep_segments = 16 # in logfile segments, 16MB each; 0 disables #wal_sender_timeout = 6s # in milliseconds; 0 disables +enable_slot_log = off +max_replication_slots = 8 # max number of replication slots.i + # The value is classically set to 8. + # (change requires restart) +#max_changes_in_memory = 4096 +#max_cached_tuplebufs = 8192 #replconninfo1 = '' # replication connection information used to connect primary on standby, or standby on primary, # or connect primary or standby on secondary diff --git a/src/gausskernel/process/postmaster/postmaster.cpp b/src/gausskernel/process/postmaster/postmaster.cpp index 5ca52500c..95dcfea0e 100755 --- a/src/gausskernel/process/postmaster/postmaster.cpp +++ b/src/gausskernel/process/postmaster/postmaster.cpp @@ -10207,8 +10207,12 @@ Datum disable_conn(PG_FUNCTION_ARGS) ereport( ERROR, (errcode(ERRCODE_INVALID_ATTRIBUTE), errmsg("Invalid null pointer attribute for disable_conn()"))); } - char* host; + if (!superuser()) { + ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser account to perform disable_conn()"))); + } + char* host; const char* disconn_mode = TextDatumGetCString(arg0); ValidateName(disconn_mode); diff --git a/src/gausskernel/storage/access/transam/transam.cpp b/src/gausskernel/storage/access/transam/transam.cpp index a8ea8e94a..39b6e9b93 100755 --- a/src/gausskernel/storage/access/transam/transam.cpp +++ b/src/gausskernel/storage/access/transam/transam.cpp @@ -35,8 +35,6 @@ #include "utils/builtins.h" #endif -extern bool IsPostmasterEnvironment; - static CLogXidStatus TransactionLogFetch(TransactionId transactionId); #ifdef PGXC diff --git a/src/gausskernel/storage/replication/dataqueue.cpp b/src/gausskernel/storage/replication/dataqueue.cpp index 904c9edf7..4704b01ea 100755 --- a/src/gausskernel/storage/replication/dataqueue.cpp +++ b/src/gausskernel/storage/replication/dataqueue.cpp @@ -254,7 +254,7 @@ DataQueuePtr PushToSenderQueue(const RelFileNode& rnode, BlockNumber blockNum, S /* structure the data_header. */ RelFileNodeRelCopy(data_header.rnode, rnode); data_header.blocknum = blockNum; - data_header.attid = attid | ((uint32)(rnode.bucketNode + 1) << 16); + data_header.attid = (int)((uint32)attid | ((uint32)(rnode.bucketNode + 1) << 16)); data_header.type = type; data_header.data_size = data_len; @@ -703,7 +703,7 @@ void PushCUToDataQueue(Relation rel, int col, const char* mem, _in_ uint64 offse /* read current bcm block */ cuSliceOffset = offset; - curBcmBlock = CSTORE_OFFSET_TO_BCMBLOCK(cuSliceOffset); + curBcmBlock = (BlockNumber)CSTORE_OFFSET_TO_BCMBLOCK(cuSliceOffset); nextBcmBlock = curBcmBlock; BCM_CStore_pin(rel, col, cuSliceOffset, &bcmbuffer); LockBuffer(bcmbuffer, BUFFER_LOCK_EXCLUSIVE); @@ -724,7 +724,7 @@ void PushCUToDataQueue(Relation rel, int col, const char* mem, _in_ uint64 offse BCMSetStatusBit(rel, cuBlock, bcmbuffer, NOTSYNCED, col); cuSliceOffset += ALIGNOF_CUSIZE; - nextBcmBlock = CSTORE_OFFSET_TO_BCMBLOCK(cuSliceOffset); + nextBcmBlock = (BlockNumber)CSTORE_OFFSET_TO_BCMBLOCK(cuSliceOffset); } while (cuSliceOffset < offset + size); UnlockReleaseBuffer(bcmbuffer); @@ -883,7 +883,8 @@ static void PushToBCMElementArray(const RelFileNode& rnode, BlockNumber blockNum RelFileNodeRelCopy(t_thrd.dataqueue_cxt.BCMElementArray[array_index].rnode, rnode); t_thrd.dataqueue_cxt.BCMElementArray[array_index].blocknum = blockNum; - t_thrd.dataqueue_cxt.BCMElementArray[array_index].attid = attid | ((uint32)(rnode.bucketNode + 1) << 16); + t_thrd.dataqueue_cxt.BCMElementArray[array_index].attid = + (int)((uint32)attid | ((uint32)(rnode.bucketNode + 1) << 16)); t_thrd.dataqueue_cxt.BCMElementArray[array_index].type = type; t_thrd.dataqueue_cxt.BCMElementArray[array_index].offset = offset; t_thrd.dataqueue_cxt.BCMElementArray[array_index].data_size = data_len; @@ -955,7 +956,7 @@ static void ClearBCMStatus(uint32 first, uint32 end) bcmhdr.blocknum, bcmhdr.offset, bcmhdr.data_size, - GETATTID(bcmhdr.attid)))); + (int)GETATTID((uint32)bcmhdr.attid)))); } ereport(DEBUG5, @@ -967,7 +968,7 @@ static void ClearBCMStatus(uint32 first, uint32 end) bcmhdr.blocknum, bcmhdr.offset, bcmhdr.data_size, - GETATTID(bcmhdr.attid)))); + (int)GETATTID((uint32)bcmhdr.attid)))); if (bcmhdr.type == ROW_STORE) { Buffer buffer; @@ -1000,7 +1001,7 @@ static void ClearBCMStatus(uint32 first, uint32 end) cuSliceOffset = bcmhdr.offset; curBcmBlock = CSTORE_OFFSET_TO_BCMBLOCK(cuSliceOffset); nextBcmBlock = curBcmBlock; - BCM_CStore_pin(relation, GETATTID(bcmhdr.attid), cuSliceOffset, &bcmbuffer); + BCM_CStore_pin(relation, (int)GETATTID((uint32)bcmhdr.attid), cuSliceOffset, &bcmbuffer); LockBuffer(bcmbuffer, BUFFER_LOCK_EXCLUSIVE); do { @@ -1011,12 +1012,12 @@ static void ClearBCMStatus(uint32 first, uint32 end) /* release last bcm block and read in the next one */ UnlockReleaseBuffer(bcmbuffer); - BCM_CStore_pin(relation, GETATTID(bcmhdr.attid), cuSliceOffset, &bcmbuffer); + BCM_CStore_pin(relation, (int)GETATTID((uint32)bcmhdr.attid), cuSliceOffset, &bcmbuffer); LockBuffer(bcmbuffer, BUFFER_LOCK_EXCLUSIVE); } cuBlock = CSTORE_OFFSET_TO_CSTOREBLOCK(cuSliceOffset); - BCMSetStatusBit(relation, cuBlock, bcmbuffer, SYNCED, GETATTID(bcmhdr.attid)); + BCMSetStatusBit(relation, cuBlock, bcmbuffer, SYNCED, (int)GETATTID((uint32)bcmhdr.attid)); cuSliceOffset += ALIGNOF_CUSIZE; nextBcmBlock = CSTORE_OFFSET_TO_BCMBLOCK(cuSliceOffset); @@ -1025,16 +1026,16 @@ static void ClearBCMStatus(uint32 first, uint32 end) UnlockReleaseBuffer(bcmbuffer); /* record one log_cu_bcm xlog */ - BCMLogCU(relation, bcmhdr.offset, GETATTID(bcmhdr.attid), SYNCED, cuUnitCount); + BCMLogCU(relation, bcmhdr.offset, (int)GETATTID((uint32)bcmhdr.attid), SYNCED, cuUnitCount); if (u_sess->attr.attr_storage.HaModuleDebug) { ereport(LOG, - (errmsg("HA-ClearBCMStatus: rnode %u/%u/%u, col %d, blockno %lu " + (errmsg("HA-ClearBCMStatus: rnode %u/%u/%u, col %u, blockno %lu " "cuUnitCount %u, status %u", relation->rd_node.spcNode, relation->rd_node.dbNode, relation->rd_node.relNode, - GETATTID(bcmhdr.attid), + GETATTID((uint)bcmhdr.attid), bcmhdr.offset / ALIGNOF_CUSIZE, bcmhdr.data_size / ALIGNOF_CUSIZE, SYNCED))); diff --git a/src/gausskernel/storage/replication/datarcvwriter.cpp b/src/gausskernel/storage/replication/datarcvwriter.cpp index 7108d1514..b071c6393 100755 --- a/src/gausskernel/storage/replication/datarcvwriter.cpp +++ b/src/gausskernel/storage/replication/datarcvwriter.cpp @@ -449,7 +449,7 @@ uint32 DoDataWrite(char* buf, uint32 nbytes) errorno = memcpy_s((void*)&datahdr, headerlen, buf, headerlen); securec_check(errorno, "", ""); RelFileNodeCopy(curnode, datahdr.rnode, GETBUCKETID(datahdr.attid)); - curattid = GETATTID(datahdr.attid); + curattid = (int)GETATTID((uint32)datahdr.attid); buf += headerlen; if (!g_instance.attr.attr_storage.enable_mix_replication) { @@ -475,7 +475,7 @@ uint32 DoDataWrite(char* buf, uint32 nbytes) datahdr.data_size, datahdr.queue_offset.queueid, datahdr.queue_offset.queueoff, - GETATTID(datahdr.attid)))); + (int)GETATTID((uint32)datahdr.attid)))); #ifdef DATA_DEBUG INIT_CRC32(crc); @@ -666,12 +666,12 @@ uint32 DoDataWrite(char* buf, uint32 nbytes) if (u_sess->attr.attr_storage.HaModuleDebug) { check_cu_block(buf, datahdr.data_size); ereport(LOG, - (errmsg("HA-DoDataWrite: rnode %u/%u/%u, col %d, " + (errmsg("HA-DoDataWrite: rnode %u/%u/%u, col %u, " "blockno %lu, cuUnitCount %u", datahdr.rnode.spcNode, datahdr.rnode.dbNode, datahdr.rnode.relNode, - GETATTID(datahdr.attid), + GETATTID((uint)datahdr.attid), datahdr.offset / ALIGNOF_CUSIZE, datahdr.data_size / ALIGNOF_CUSIZE))); } diff --git a/src/gausskernel/storage/replication/datareceiver.cpp b/src/gausskernel/storage/replication/datareceiver.cpp index b4e1931c3..7ee213721 100755 --- a/src/gausskernel/storage/replication/datareceiver.cpp +++ b/src/gausskernel/storage/replication/datareceiver.cpp @@ -150,6 +150,47 @@ static void ReadDummyFile(HTAB* relFileNodeTab, uint32 readFileNum, int* bufferS static bool ReadDummySlice(FILE* readFileFd, char* buf, uint32 length, char* path); static int ParseDataHeader(HTAB* relFileNodeTab, char* bufPtr, uint32 nbytes); +void DataReceiverPing(bool* ping_ptr, TimestampTz* last_recv_timestamp_ptr) +{ + /* + * We didn't receive anything new. If we haven't heard anything + * from the server for more than wal_receiver_timeout / 2, + * ping the server. Also, if it's been longer than + * wal_receiver_status_interval since the last update we sent, + * send a status update to the master anyway, to report any + * progress in applying WAL. + */ + bool requestReply = false; + + /* + * Check if time since last receive from master has reached the + * configured limit. + */ + if (u_sess->attr.attr_storage.wal_receiver_timeout > 0) { + TimestampTz nowtime = GetCurrentTimestamp(); + TimestampTz timeout = 0; + + timeout = TimestampTzPlusMilliseconds( + *last_recv_timestamp_ptr, u_sess->attr.attr_storage.wal_receiver_timeout / 2); + + /* + * We didn't receive anything new, for half of receiver + * replication timeout. Ping the server. + */ + if (nowtime >= timeout) { + if (!(*ping_ptr)) { + requestReply = true; + *ping_ptr = true; + *last_recv_timestamp_ptr = nowtime; + } else { + ereport(ERROR, + (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("terminating datareceiver due to timeout"))); + } + } + } + DataRcvSendReply(requestReply, requestReply); +} + /* Main entry point for datareceiver process */ void DataReceiverMain(void) { @@ -319,42 +360,7 @@ void DataReceiverMain(void) /* Let the master know that we received some data. */ DataRcvSendReply(false, false); } else { - /* - * We didn't receive anything new. If we haven't heard anything - * from the server for more than wal_receiver_timeout / 2, - * ping the server. Also, if it's been longer than - * wal_receiver_status_interval since the last update we sent, - * send a status update to the master anyway, to report any - * progress in applying WAL. - */ - bool requestReply = false; - - /* - * Check if time since last receive from master has reached the - * configured limit. - */ - if (u_sess->attr.attr_storage.wal_receiver_timeout > 0) { - TimestampTz nowtime = GetCurrentTimestamp(); - TimestampTz timeout = 0; - - timeout = TimestampTzPlusMilliseconds( - last_recv_timestamp, u_sess->attr.attr_storage.wal_receiver_timeout / 2); - /* - * We didn't receive anything new, for half of receiver - * replication timeout. Ping the server. - */ - if (nowtime >= timeout) { - if (!ping_sent) { - requestReply = true; - ping_sent = true; - last_recv_timestamp = nowtime; - } else { - ereport(ERROR, - (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("terminating datareceiver due to timeout"))); - } - } - } - DataRcvSendReply(requestReply, requestReply); + DataReceiverPing(&ping_sent, &last_recv_timestamp); } } } @@ -905,7 +911,6 @@ static void ReadDummyFile(HTAB* relFileNodeTab, uint32 readFileNum, int* bufferS } char* elementBuf = NULL; elementBuf = (char*)palloc(nbytes); - /* * 2. then, read the data which according to nbytes; */ @@ -972,7 +977,7 @@ static int ParseDataHeader(HTAB* relFileNodeTab, char* bufPtr, uint32 nbytes) elementKey = (RelFileNodeKey*)palloc(sizeof(RelFileNodeKey)); /* Fill the key during this read */ RelFileNodeCopy(elementKey->relfilenode, dataInfo.rnode, GETBUCKETID(dataInfo.attid)); - elementKey->columnid = GETATTID(dataInfo.attid); + elementKey->columnid = (int)GETATTID((uint)dataInfo.attid); if (u_sess->attr.attr_storage.HaModuleDebug) { ereport(LOG, @@ -1272,11 +1277,11 @@ static void DataRcvReceive(char* buf, Size nbytes) buf += headerlen; if (u_sess->attr.attr_storage.HaModuleDebug) { ereport(LOG, - (errmsg("DataRcvReceive element info: %u, %u, %u, %d ", + (errmsg("DataRcvReceive element info: %u, %u, %u, %u ", dataelemheader.rnode.dbNode, dataelemheader.rnode.spcNode, dataelemheader.rnode.relNode, - GETATTID(dataelemheader.attid)))); + GETATTID((uint)dataelemheader.attid)))); } cursegno = dataelemheader.blocknum / ((BlockNumber)RELSEG_SIZE); diff --git a/src/gausskernel/storage/replication/datasyncrep.cpp b/src/gausskernel/storage/replication/datasyncrep.cpp index b04c5875f..dddc695d8 100755 --- a/src/gausskernel/storage/replication/datasyncrep.cpp +++ b/src/gausskernel/storage/replication/datasyncrep.cpp @@ -85,8 +85,7 @@ void WaitForDataSync(void) } /* - * Set our wait offset so DataSender will know when to wake us, and add - * ourselves to the queue. + * Set our wait offset so DataSender will know when to wake us, and add ourselves to the queue. */ t_thrd.proc->dataSyncRepState = SYNC_REP_WAITING; DataSyncRepQueueInsert(); @@ -102,7 +101,6 @@ void WaitForDataSync(void) t_thrd.dataqueue_cxt.DataSenderQueue->use_tail2.queueoff, t_thrd.proc->waitDataSyncPoint.queueid, t_thrd.proc->waitDataSyncPoint.queueoff))); - WaitState oldStatus = pgstat_report_waitstatus(STATE_WAIT_DATASYNC); /* diff --git a/src/gausskernel/storage/replication/logical/decode.cpp b/src/gausskernel/storage/replication/logical/decode.cpp index b184179be..7f8fceede 100644 --- a/src/gausskernel/storage/replication/logical/decode.cpp +++ b/src/gausskernel/storage/replication/logical/decode.cpp @@ -348,6 +348,7 @@ static void DecodeStandbyOp(LogicalDecodingContext* ctx, XLogRecordBuffer* buf) SnapBuild* builder = ctx->snapshot_builder; XLogReaderState* r = buf->record; uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; + XLogRecPtr lsn = buf->endptr; ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); @@ -364,10 +365,12 @@ static void DecodeStandbyOp(LogicalDecodingContext* ctx, XLogRecordBuffer* buf) * while shutdown checkpoints just know that no non-prepared * transactions are in progress. */ - ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid); + ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid, lsn); } break; case XLOG_STANDBY_LOCK: case XLOG_STANDBY_CSN: + case XLOG_STANDBY_CSN_COMMITTING: + case XLOG_STANDBY_CSN_ABORTED: break; default: ereport(ERROR, diff --git a/src/gausskernel/storage/replication/logical/logicalfuncs.cpp b/src/gausskernel/storage/replication/logical/logicalfuncs.cpp index a5f277ecf..67db95fec 100755 --- a/src/gausskernel/storage/replication/logical/logicalfuncs.cpp +++ b/src/gausskernel/storage/replication/logical/logicalfuncs.cpp @@ -102,7 +102,7 @@ static void LogicalOutputWrite(LogicalDecodingContext* ctx, XLogRecPtr lsn, Tran p->returned_rows++; } -static void check_permissions(void) +void check_permissions(void) { if (!superuser() && !has_rolreplication(GetUserId())) ereport(ERROR, diff --git a/src/gausskernel/storage/replication/logical/reorderbuffer.cpp b/src/gausskernel/storage/replication/logical/reorderbuffer.cpp index 1abd118ec..5a0600196 100755 --- a/src/gausskernel/storage/replication/logical/reorderbuffer.cpp +++ b/src/gausskernel/storage/replication/logical/reorderbuffer.cpp @@ -179,7 +179,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer* rb, ReorderBufferTXN* txn); static void ReorderBufferSerializeChange(ReorderBuffer* rb, ReorderBufferTXN* txn, int fd, ReorderBufferChange* change); static Size ReorderBufferRestoreChanges(ReorderBuffer* rb, ReorderBufferTXN* txn, int* fd, XLogSegNo* segno); static void ReorderBufferRestoreChange(ReorderBuffer* rb, ReorderBufferTXN* txn, char* change); -static void ReorderBufferRestoreCleanup(ReorderBuffer* rb, ReorderBufferTXN* txn); +static void ReorderBufferRestoreCleanup(ReorderBuffer* rb, ReorderBufferTXN* txn, XLogRecPtr lsn); static void ReorderBufferFreeSnap(ReorderBuffer* rb, Snapshot snap); static Snapshot ReorderBufferCopySnap(ReorderBuffer* rb, Snapshot orig_snap, ReorderBufferTXN* txn, CommandId cid); @@ -1011,7 +1011,7 @@ static void ReorderBufferIterTXNFinish(ReorderBuffer* rb, ReorderBufferIterTXNSt * Cleanup the contents of a transaction, usually after the transaction * committed or aborted. */ -static void ReorderBufferCleanupTXN(ReorderBuffer* rb, ReorderBufferTXN* txn) +static void ReorderBufferCleanupTXN(ReorderBuffer* rb, ReorderBufferTXN* txn, XLogRecPtr lsn = InvalidXLogRecPtr) { bool found = false; dlist_mutable_iter iter; @@ -1031,7 +1031,7 @@ static void ReorderBufferCleanupTXN(ReorderBuffer* rb, ReorderBufferTXN* txn) Assert(subtxn->is_known_as_subxact); Assert(subtxn->nsubtxns == 0); - ReorderBufferCleanupTXN(rb, subtxn); + ReorderBufferCleanupTXN(rb, subtxn, lsn); } /* cleanup changes in the toplevel txn */ @@ -1080,7 +1080,7 @@ static void ReorderBufferCleanupTXN(ReorderBuffer* rb, ReorderBufferTXN* txn) /* remove entries spilled to disk */ if (txn->serialized) - ReorderBufferRestoreCleanup(rb, txn); + ReorderBufferRestoreCleanup(rb, txn, lsn); /* deallocate */ ReorderBufferReturnTXN(rb, txn); @@ -1393,6 +1393,7 @@ void ReorderBufferCommit(ReorderBuffer* rb, TransactionId xid, XLogRecPtr commit if (snapshot_now->copied) { ReorderBufferFreeSnap(rb, snapshot_now); + snapshot_now = NULL; snapshot_now = ReorderBufferCopySnap(rb, change->data.snapshot, txn, command_id); } else if (change->data.snapshot->copied) { /* @@ -1484,7 +1485,7 @@ void ReorderBufferCommit(ReorderBuffer* rb, TransactionId xid, XLogRecPtr commit TeardownHistoricSnapshot(true); - if (snapshot_now->copied) + if (snapshot_now != NULL && snapshot_now->copied) ReorderBufferFreeSnap(rb, snapshot_now); if (subtxn_started) @@ -1541,7 +1542,7 @@ void ReorderBufferAbort(ReorderBuffer* rb, TransactionId xid, XLogRecPtr lsn) * NB: These really have to be transactions that have aborted due to a server * crash/immediate restart, as we don't deal with invalidations here. */ -void ReorderBufferAbortOld(ReorderBuffer* rb, TransactionId oldestRunningXid) +void ReorderBufferAbortOld(ReorderBuffer* rb, TransactionId oldestRunningXid, XLogRecPtr lsn) { dlist_mutable_iter it; @@ -1562,7 +1563,7 @@ void ReorderBufferAbortOld(ReorderBuffer* rb, TransactionId oldestRunningXid) ereport(DEBUG2, (errmsg("aborting old transaction %lu", txn->xid))); /* remove potential on-disk data, and deallocate this tx */ - ReorderBufferCleanupTXN(rb, txn); + ReorderBufferCleanupTXN(rb, txn, lsn); } else return; } @@ -1804,9 +1805,12 @@ bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer* rb, TransactionId xid) return false; /* a known subtxn? operate on top-level txn instead */ - if (txn->is_known_as_subxact) + if (txn->is_known_as_subxact) { txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false, NULL, InvalidXLogRecPtr, false); - + if (txn == NULL) { + return false; + } + } return txn->base_snapshot != NULL; } @@ -2259,15 +2263,17 @@ static void ReorderBufferRestoreChange(ReorderBuffer* rb, ReorderBufferTXN* txn, /* * Remove all on-disk stored for the passed in transaction. */ -static void ReorderBufferRestoreCleanup(ReorderBuffer* rb, ReorderBufferTXN* txn) +static void ReorderBufferRestoreCleanup(ReorderBuffer* rb, ReorderBufferTXN* txn, XLogRecPtr lsn = InvalidXLogRecPtr) { XLogSegNo first; XLogSegNo cur; XLogSegNo last; ReplicationSlot* slot = NULL; int rc = 0; + if (txn->final_lsn == InvalidXLogRecPtr) { + txn->final_lsn = lsn; + } Assert(!XLByteEQ(txn->first_lsn, InvalidXLogRecPtr)); - Assert(!XLByteEQ(txn->final_lsn, InvalidXLogRecPtr)); first = (txn->first_lsn) / XLogSegSize; last = (txn->final_lsn) / XLogSegSize; @@ -2426,6 +2432,9 @@ static void ReorderBufferToastAppendChunk( chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull)); Assert(!isnull); + if (isnull) { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("fail to get toast chunk"))); + } /* calculate size so we can allocate the right size at once later */ if (!VARATT_IS_EXTENDED(chunk)) { diff --git a/src/gausskernel/storage/replication/logical/snapbuild.cpp b/src/gausskernel/storage/replication/logical/snapbuild.cpp index cacaa7c20..a9a86eb6d 100755 --- a/src/gausskernel/storage/replication/logical/snapbuild.cpp +++ b/src/gausskernel/storage/replication/logical/snapbuild.cpp @@ -253,12 +253,6 @@ struct SnapBuild { #define MAX_SIZE_T_NUM 4294967295 #endif -/* - * Starting a transaction -- which we need to do while exporting a snapshot -- - * removes knowledge about the previously used resowner, so we save it here. - */ -ResourceOwner g_saved_resource_owner_during_export = NULL; - /* ->committed manipulation */ static void SnapBuildPurgeCommittedTxn(SnapBuild* builder); @@ -542,6 +536,9 @@ const char* SnapBuildExportSnapshot(SnapBuild* builder) const int newxcnt = 0; int maxcnt = GetMaxSnapshotXidCount(); + if (u_sess->utils_cxt.FirstSnapshotSet) { + return NULL; + } if (builder->state != SNAPBUILD_CONSISTENT) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -561,10 +558,10 @@ const char* SnapBuildExportSnapshot(SnapBuild* builder) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot export a snapshot from within a transaction"))); - if (g_saved_resource_owner_during_export) + if (t_thrd.logical_cxt.SavedResourceOwnerDuringExport) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("can only export one snapshot at a time"))); - g_saved_resource_owner_during_export = t_thrd.utils_cxt.CurrentResourceOwner; + t_thrd.logical_cxt.SavedResourceOwnerDuringExport = t_thrd.utils_cxt.CurrentResourceOwner; t_thrd.logical_cxt.ExportInProgress = true; StartTransactionCommand(); @@ -627,8 +624,8 @@ void SnapBuildClearExportedSnapshot() /* make sure nothing could have ever happened */ AbortCurrentTransaction(); - t_thrd.utils_cxt.CurrentResourceOwner = g_saved_resource_owner_during_export; - g_saved_resource_owner_during_export = NULL; + t_thrd.utils_cxt.CurrentResourceOwner = t_thrd.logical_cxt.SavedResourceOwnerDuringExport; + t_thrd.logical_cxt.SavedResourceOwnerDuringExport = NULL; t_thrd.logical_cxt.ExportInProgress = false; } diff --git a/src/gausskernel/storage/replication/slotfuncs.cpp b/src/gausskernel/storage/replication/slotfuncs.cpp index 2366930e3..1f290a041 100644 --- a/src/gausskernel/storage/replication/slotfuncs.cpp +++ b/src/gausskernel/storage/replication/slotfuncs.cpp @@ -85,8 +85,6 @@ void log_slot_advance(const ReplicationSlotPersistentData* slotInfo) if (g_instance.attr.attr_storage.max_wal_senders > 0) WalSndWakeup(); END_CRIT_SECTION(); - if (u_sess->attr.attr_storage.guc_synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH) - SyncRepWaitForLSN(Ptr); } void log_slot_drop(const char* name) @@ -177,14 +175,6 @@ Size GetAllLogicalSlot(LogicalPersistentData *&LogicalSlot) return size; } -static void check_permissions(void) -{ - if (!superuser() && !has_rolreplication(GetUserId())) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - (errmsg("must be system admin or replication role to use replication slots")))); -} - /* * SQL function for creating a new physical (streaming replication) * replication slot. diff --git a/src/gausskernel/storage/replication/walreceiver.cpp b/src/gausskernel/storage/replication/walreceiver.cpp index 795a1bab5..4db61e6f4 100755 --- a/src/gausskernel/storage/replication/walreceiver.cpp +++ b/src/gausskernel/storage/replication/walreceiver.cpp @@ -124,6 +124,7 @@ const char* g_reserve_param[RESERVE_SIZE] = {"application_name", static void EnableWalRcvImmediateExit(void); static void DisableWalRcvImmediateExit(void); static void WalRcvDie(int code, Datum arg); +static void XLogWalRcvDataPageReplication(char* buf, Size len); static void XLogWalRcvProcessMsg(unsigned char type, char* buf, Size len); static void XLogWalRcvReceive(char* buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvReceiveInBuf(char* buf, Size nbytes, XLogRecPtr recptr); @@ -809,6 +810,44 @@ bool WalRcvIsShutdown(void) return t_thrd.walreceiver_cxt.got_SIGTERM; } +static void XLogWalRcvDataPageReplication(char* buf, Size len) +{ + WalDataPageMessageHeader msghdr; + Assert(true == g_instance.attr.attr_storage.enable_mix_replication); + + if (!g_instance.attr.attr_storage.enable_mix_replication) { + ereport(PANIC, (errmsg("WAL streaming isn't employed to sync all the replication data log."))); + } + if (len < sizeof(WalDataPageMessageHeader)) { + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("invalid wal data page message received from primary"))); + } + + /* memcpy is required here for alignment reasons */ + error_t rc = memcpy_s(&msghdr, sizeof(WalDataPageMessageHeader), buf, sizeof(WalDataPageMessageHeader)); + securec_check(rc, "\0", "\0"); + + ProcessWalDataHeaderMessage(&msghdr); + + buf += sizeof(WalDataPageMessageHeader); + len -= sizeof(WalDataPageMessageHeader); + + if (len > WS_MAX_DATA_QUEUE_SIZE) { + Assert(false); + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal( + "unexpected wal data size %lu bytes exceeds the max receiving data queue size %u bytes", + len, + WS_MAX_DATA_QUEUE_SIZE))); + } + if (u_sess->attr.attr_storage.HaModuleDebug) { + WSDataRcvCheck(buf, len); + } + WalDataRcvReceive(buf, len, 0); +} + /* * Accept the message from XLOG stream, and process it. */ @@ -858,39 +897,7 @@ static void XLogWalRcvProcessMsg(unsigned char type, char* buf, Size len) } case 'd': /* Data page replication for the logical xlog */ { - WalDataPageMessageHeader msghdr; - - Assert(true == g_instance.attr.attr_storage.enable_mix_replication); - - if (!g_instance.attr.attr_storage.enable_mix_replication) - ereport(PANIC, (errmsg("WAL streaming isn't employed to sync all the replication data log."))); - - if (len < sizeof(WalDataPageMessageHeader)) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("invalid wal data page message received from primary"))); - - /* memcpy is required here for alignment reasons */ - errorno = memcpy_s(&msghdr, sizeof(WalDataPageMessageHeader), buf, sizeof(WalDataPageMessageHeader)); - securec_check(errorno, "\0", "\0"); - - ProcessWalDataHeaderMessage(&msghdr); - - buf += sizeof(WalDataPageMessageHeader); - len -= sizeof(WalDataPageMessageHeader); - - if (len > WS_MAX_DATA_QUEUE_SIZE) { - Assert(false); - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal( - "unexpected wal data size %lu bytes exceeds the max receiving data queue size %u bytes", - len, - WS_MAX_DATA_QUEUE_SIZE))); - } - if (u_sess->attr.attr_storage.HaModuleDebug) - WSDataRcvCheck(buf, len); - WalDataRcvReceive(buf, len, 0); + XLogWalRcvDataPageReplication(buf, len); break; } case 'k': /* Keepalive */ diff --git a/src/gausskernel/storage/replication/walsender.cpp b/src/gausskernel/storage/replication/walsender.cpp index 63e497ba3..ed3ba47a5 100755 --- a/src/gausskernel/storage/replication/walsender.cpp +++ b/src/gausskernel/storage/replication/walsender.cpp @@ -111,6 +111,7 @@ static bool WalSndCaughtUp = false; bool WalSegmemtRemovedhappened = false; volatile bool bSyncStat = false; volatile bool bSyncStatStatBefore = false; +long g_logical_slot_sleep_time = 0; #define AmWalSenderToDummyStandby() (t_thrd.walsender_cxt.MyWalSnd->sendRole == SNDROLE_PRIMARY_DUMMYSTANDBY) #define AmWalSenderOnDummyStandby() (t_thrd.walsender_cxt.MyWalSnd->sendRole == SNDROLE_DUMMYSTANDBY_STANDBY) @@ -163,6 +164,8 @@ static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); static void ProcessStandbySwitchRequestMessage(void); static void ProcessRepliesIfAny(void); +static bool LogicalSlotSleepFlag(void); +static void do_actual_sleep(volatile WalSnd* walsnd); static void LogCtrlCountSleepLimit(void); static void LogCtrlSleep(void); static void LogCtrlCalculateCurrentRTO(StandbyReplyMessage* reply); @@ -447,9 +450,13 @@ static void WalSndHandshake(void) } break; case 'X': + case 'c': /* standby is closing the connection */ proc_exit(0); /* fall-through */ + case 'P': + /* standby is closing the connection */ + break; case EOF: /* standby disconnected unexpectedly */ ereport( @@ -1573,6 +1580,43 @@ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc) return RecentFlushPtr; } +/* + * Check cmdString format. + */ +bool cmdStringCheck(const char* cmd_string) +{ + const int maxStack = 100; + char charStack[maxStack]; + int stackLen = 0; + for (int i = 0; cmd_string[i] != '\0'; i++) { + if (cmd_string[i] == '\"') { + if (stackLen > 0 && charStack[stackLen - 1] == '\"') { + stackLen--; + } else { + charStack[stackLen++] = '\"'; + } + } else if (cmd_string[i] == '\'') { + if (stackLen > 0 && charStack[stackLen - 1] == '\'') { + stackLen--; + } else { + charStack[stackLen++] = '\''; + } + } else if (cmd_string[i] == '(') { + charStack[stackLen++] = '('; + } else if (cmd_string[i] == ')') { + if (stackLen > 0 && charStack[stackLen - 1] == '(') { + stackLen--; + } else { + return false; + } + } + } + if (stackLen == 0) { + return true; + } + return false; +} + /* * Execute an incoming replication command. */ @@ -1593,6 +1637,11 @@ static bool HandleWalReplicationCommand(const char* cmd_string) ereport(LOG, (errmsg("received wal replication command: %s", cmd_string))); + if (cmdStringCheck(cmd_string) == false) { + ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), + (errmsg_internal("replication command, syntax error.")))); + } + cmd_context = AllocSetContextCreate(CurrentMemoryContext, "Replication command context", ALLOCSET_DEFAULT_MINSIZE, @@ -1729,6 +1778,7 @@ static void ProcessRepliesIfAny(void) /* * 'X' means that the standby is closing down the socket. */ + case 'c': case 'X': proc_exit(0); /* fall-through */ @@ -1878,6 +1928,74 @@ char* remote_role_to_string(int role) return "UNKNOW"; } +/* + * When flush_lsn exceeds min_restart_lsn by a margin of max_keep_log_seg, + * walsender stream limitation is triggered. + */ +static bool LogicalSlotSleepFlag(void) +{ + const int xlog_offset = 24; + const int sleep_time_unit = 100000; + int64 max_keep_log_seg = (int64)g_instance.attr.attr_storage.max_keep_log_seg; + if (max_keep_log_seg <= 0) { + return false; + } + XLogRecPtr min_restart_lsn = InvalidXLogRecPtr; + for (int i = 0; i < g_instance.attr.attr_storage.max_replication_slots; i++) { + ReplicationSlot* s = &t_thrd.slot_cxt.ReplicationSlotCtl->replication_slots[i]; + + if (s->in_use && s->data.database != InvalidOid) { + min_restart_lsn = min_restart_lsn == InvalidXLogRecPtr ? s->data.restart_lsn : + Min(min_restart_lsn, s->data.restart_lsn); + } + } + if (min_restart_lsn == InvalidXLogRecPtr) { + return false; + } + XLogRecPtr flush_lsn = GetFlushRecPtr(); + if (((flush_lsn - min_restart_lsn) >> xlog_offset) > (uint64)max_keep_log_seg) { + g_logical_slot_sleep_time += sleep_time_unit; + if (g_logical_slot_sleep_time > MICROSECONDS_PER_SECONDS) { + g_logical_slot_sleep_time = MICROSECONDS_PER_SECONDS; + } + ereport(LOG, + (errmsg("flush_lsn %X/%X exceed min_restart_lsn %X/%X by threshold %ld, sleep time increase by 0.1s.\n", + (uint32)(flush_lsn >> 32), + (uint32)flush_lsn, + (uint32)(min_restart_lsn >> 32), + (uint32)min_restart_lsn, + max_keep_log_seg))); + return true; + } else { + g_logical_slot_sleep_time = 0; + } + return false; +} + +static void do_actual_sleep(volatile WalSnd* walsnd) +{ + bool logical_slot_sleep_flag = LogicalSlotSleepFlag(); + /* try to control log sent rate so that standby can flush and apply log under RTO seconds */ + if (walsnd->state == WALSNDSTATE_STREAMING && IS_PGXC_DATANODE) { + if (u_sess->attr.attr_storage.target_rto > 0) { + if (walsnd->log_ctrl.sleep_count % walsnd->log_ctrl.sleep_count_limit == 0) { + LogCtrlCalculateSleepTime(); + LogCtrlCountSleepLimit(); + } + LogCtrlSleep(); + if (logical_slot_sleep_flag && + g_logical_slot_sleep_time > t_thrd.walsender_cxt.MyWalSnd->log_ctrl.sleep_time) { + pg_usleep(g_logical_slot_sleep_time - t_thrd.walsender_cxt.MyWalSnd->log_ctrl.sleep_time); + } + } else { + if (logical_slot_sleep_flag) { + pg_usleep(g_logical_slot_sleep_time); + } + } + } + walsnd->log_ctrl.sleep_count++; +} + /* * Regular reply from standby advising of WAL positions on standby server. */ @@ -1945,27 +2063,20 @@ static void ProcessStandbyReplyMessage(void) } /* - * calculate RTO every sleep_count_limit. + * Only sleep when local role is not WAL_DB_SENDER. */ - if (IS_PGXC_DATANODE && walsnd->log_ctrl.sleep_count % walsnd->log_ctrl.sleep_count_limit == 0) { - LogCtrlCalculateCurrentRTO(&reply); - walsnd->log_ctrl.prev_reply_time = reply.sendTime; - walsnd->log_ctrl.prev_flush = reply.flush; - walsnd->log_ctrl.prev_apply = reply.apply; - } - - /* try to control log sent rate so that standby can flush and apply log under RTO seconds */ - if (walsnd->state == WALSNDSTATE_STREAMING && u_sess->attr.attr_storage.target_rto > 0 && IS_PGXC_DATANODE) { - if (walsnd->log_ctrl.sleep_count % walsnd->log_ctrl.sleep_count_limit == 0) { - LogCtrlCalculateSleepTime(); - LogCtrlCountSleepLimit(); + if (!AM_WAL_DB_SENDER) { + if (IS_PGXC_DATANODE && walsnd->log_ctrl.sleep_count % walsnd->log_ctrl.sleep_count_limit == 0) { + LogCtrlCalculateCurrentRTO(&reply); + walsnd->log_ctrl.prev_reply_time = reply.sendTime; + walsnd->log_ctrl.prev_flush = reply.flush; + walsnd->log_ctrl.prev_apply = reply.apply; } - LogCtrlSleep(); + do_actual_sleep(walsnd); } - walsnd->log_ctrl.sleep_count++; - - if (!AM_WAL_STANDBY_SENDER) + if (!AM_WAL_STANDBY_SENDER) { SyncRepReleaseWaiters(); + } /* * Advance our local xmin horizon when the client confirmed a flush. @@ -2693,7 +2804,7 @@ static int WalSndLoop(WalSndSendDataCallback send_data) if (sync_config_needed) { if (t_thrd.walsender_cxt.walsender_shutdown_requested) { - if (!SendConfigFile(t_thrd.walsender_cxt.gucconf_file)) + if (!AM_WAL_DB_SENDER && !SendConfigFile(t_thrd.walsender_cxt.gucconf_file)) ereport(LOG, (errmsg("failed to send config to the peer when walsender shutdown."))); sync_config_needed = false; } else { @@ -2703,7 +2814,7 @@ static int WalSndLoop(WalSndSendDataCallback send_data) sync_config_needed = false; /* begin send file to standby */ if (t_thrd.walsender_cxt.MyWalSnd && t_thrd.walsender_cxt.MyWalSnd->peer_state != BUILDING_STATE) { - if (!SendConfigFile(t_thrd.walsender_cxt.gucconf_file)) + if (!AM_WAL_DB_SENDER && !SendConfigFile(t_thrd.walsender_cxt.gucconf_file)) sync_config_needed = true; else last_syncconf_timestamp = nowtime; diff --git a/src/include/knl/knl_guc/knl_instance_attr_storage.h b/src/include/knl/knl_guc/knl_instance_attr_storage.h index 281997e6f..fc9805f80 100755 --- a/src/include/knl/knl_guc/knl_instance_attr_storage.h +++ b/src/include/knl/knl_guc/knl_instance_attr_storage.h @@ -76,10 +76,11 @@ typedef struct knl_instance_attr_storage { int recovery_redo_workers_per_paser_worker; int pagewriter_thread_num; int real_recovery_parallelism; - int batch_redo_num; + int batch_redo_num; int remote_read_mode; int advance_xlog_file_num; int gtm_option; + int max_keep_log_seg; } knl_instance_attr_storage; #endif /* SRC_INCLUDE_KNL_KNL_INSTANCE_ATTR_STORAGE_H_ */ diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index a823451ed..944fd5b95 100644 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -74,6 +74,8 @@ #define RESERVE_SIZE 32 +typedef struct ResourceOwnerData* ResourceOwner; + typedef struct knl_t_codegen_context { void* thr_codegen_obj; @@ -2176,6 +2178,7 @@ typedef struct knl_t_logical_context { uint64 sendSegNo; uint32 sendOff; bool ExportInProgress; + ResourceOwner SavedResourceOwnerDuringExport; } knl_t_logical_context; typedef struct knl_t_dataqueue_context { diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h index 27c9431ce..4b41b15aa 100644 --- a/src/include/replication/logicalfuncs.h +++ b/src/include/replication/logicalfuncs.h @@ -25,6 +25,7 @@ extern Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS); extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS); extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS); extern Datum pg_logical_peek_changes(PG_FUNCTION_ARGS); +extern void check_permissions(void); #endif diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index ec148bb9b..76441b92b 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -355,7 +355,7 @@ void ReorderBufferCommit(ReorderBuffer*, TransactionId, XLogRecPtr commit_lsn, X void ReorderBufferAssignChild(ReorderBuffer*, TransactionId, TransactionId, XLogRecPtr commit_lsn); void ReorderBufferCommitChild(ReorderBuffer*, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn); void ReorderBufferAbort(ReorderBuffer*, TransactionId, XLogRecPtr lsn); -void ReorderBufferAbortOld(ReorderBuffer*, TransactionId xid); +void ReorderBufferAbortOld(ReorderBuffer*, TransactionId xid, XLogRecPtr lsn); void ReorderBufferForget(ReorderBuffer*, TransactionId, XLogRecPtr lsn); void ReorderBufferSetBaseSnapshot(ReorderBuffer*, TransactionId, XLogRecPtr lsn, struct SnapshotData* snap); diff --git a/src/test/regress/expected/rangefuncs.out b/src/test/regress/expected/rangefuncs.out index 2ffc0a576..253635246 100644 --- a/src/test/regress/expected/rangefuncs.out +++ b/src/test/regress/expected/rangefuncs.out @@ -65,6 +65,7 @@ SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%' ORDER BY name; enableSeparationOfDuty | off enable_seqscan | on enable_show_any_tuples | off + enable_slot_log | off enable_sonic_hashagg | on enable_sonic_hashjoin | on enable_sonic_optspill | on @@ -78,7 +79,7 @@ SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%' ORDER BY name; enable_vector_engine | on enable_wdr_snapshot | off enable_xlog_prune | on -(77 rows) +(78 rows) CREATE TABLE foo2(fooid int, f2 int); INSERT INTO foo2 VALUES(1, 11); diff --git a/src/test/regress/output/recovery_2pc_tools.source b/src/test/regress/output/recovery_2pc_tools.source index 1c835d8a6..cbec567e8 100644 --- a/src/test/regress/output/recovery_2pc_tools.source +++ b/src/test/regress/output/recovery_2pc_tools.source @@ -340,6 +340,7 @@ select name,vartype,unit,min_val,max_val from pg_settings where name <> 'qunit_c max_function_args | integer | | 666 | 666 max_identifier_length | integer | | 63 | 63 max_index_keys | integer | | 32 | 32 + max_keep_log_seg | integer | | 0 | 2147483647 max_loaded_cudesc | integer | | 100 | 1073741823 max_locks_per_transaction | integer | | 10 | 2147483647 max_pred_locks_per_transaction | integer | | 10 | 2147483647