diff --git a/src/bin/gs_guc/cluster_guc.conf b/src/bin/gs_guc/cluster_guc.conf index e1acd980a..823394594 100755 --- a/src/bin/gs_guc/cluster_guc.conf +++ b/src/bin/gs_guc/cluster_guc.conf @@ -787,6 +787,7 @@ ss_txnstatus_cache_size|int|0,524288|NULL|NULL| subscription_conflict_resolution|enum|error,apply_remote,keep_local|NULL|NULL| time_record_level|int|0,10|NULL|NULL| ss_enable_dorado|bool|0,0|NULL|NULL| +ss_stream_cluster|bool|0,0|NULL|NULL| enable_uwal|bool|0,0|NULL|NULL| uwal_config|string|0,0|NULL|GUC to control paxos cluster| uwal_disk_size|int64|8589934592,4398046511104|NULL|NULL| diff --git a/src/bin/pg_ctl/backup.cpp b/src/bin/pg_ctl/backup.cpp index e4a7bec89..458c3bb23 100755 --- a/src/bin/pg_ctl/backup.cpp +++ b/src/bin/pg_ctl/backup.cpp @@ -1026,12 +1026,22 @@ static bool ReceiveAndUnpackTarFile(PGconn* conn, PGresult* res, int rownum) continue; } - /* pg_control will be written into pages of each interconnect nodes in stanby cluster corresponding to */ + /* pg_control will be written into pages of each interconnect nodes in dorado stanby cluster corresponding to */ if (ss_instance_config.dss.enable_dss && strcmp(filename, pg_control_file) == 0) { pg_log(PG_WARNING, _("file size %d. \n"), r); - int node; - for (node = 0; node < ss_instance_config.dss.interNodeNum; node++) { - off_t seekpos = (off_t)BLCKSZ * node; + if (ss_instance_config.dss.enable_dorado) { + for (int node = 0; node < ss_instance_config.dss.interNodeNum; node++) { + off_t seekpos = (off_t)BLCKSZ * node; + fseek(file, seekpos, SEEK_SET); + if (fwrite(copybuf, r, 1, file) != 1) { + pg_log(PG_WARNING, _("could not write to file \"%s\": %s\n"), filename, strerror(errno)); + DisconnectConnection(); + FREE_AND_RESET(copybuf); + return false; + } + } + } else { + off_t seekpos = (off_t)BLCKSZ * ss_instance_config.dss.instance_id; fseek(file, seekpos, SEEK_SET); if (fwrite(copybuf, r, 1, file) != 1) { pg_log(PG_WARNING, _("could not write to file \"%s\": %s\n"), filename, strerror(errno)); diff --git a/src/common/backend/utils/misc/guc/guc_storage.cpp b/src/common/backend/utils/misc/guc/guc_storage.cpp index 5ee440dee..99f532ca0 100755 --- a/src/common/backend/utils/misc/guc/guc_storage.cpp +++ b/src/common/backend/utils/misc/guc/guc_storage.cpp @@ -221,6 +221,7 @@ static bool check_ss_dss_conn_path(char** newval, void** extra, GucSource source static bool check_ss_enable_ssl(bool* newval, void** extra, GucSource source); static bool check_normal_cluster_replication_config_para(char** newval, void** extra, GucSource source); static bool check_ss_cluster_replication_control_para(bool* newval, void** extra, GucSource source); +static bool check_ss_cluster_disaster_control_para(bool* newval, void** extra, GucSource source); #ifdef USE_ASSERT_CHECKING static void assign_ss_enable_verify_page(bool newval, void *extra); @@ -1171,6 +1172,7 @@ static void InitStorageConfigureNamesBool() NULL, NULL, NULL}, + {{"ss_enable_dorado", PGC_POSTMASTER, NODE_SINGLENODE, @@ -1184,6 +1186,19 @@ static void InitStorageConfigureNamesBool() NULL, NULL}, + {{"ss_stream_cluster", + PGC_POSTMASTER, + NODE_SINGLENODE, + WAL, + gettext_noop("Use to enabel disaster in ss disaster recovery cluster mode."), + NULL, + GUC_SUPERUSER_ONLY}, + &g_instance.attr.attr_storage.ss_stream_cluster, + false, + check_ss_cluster_disaster_control_para, + NULL, + NULL}, + #ifdef USE_ASSERT_CHECKING {{"enable_hashbucket", PGC_SUSET, @@ -6351,6 +6366,13 @@ static bool check_normal_cluster_replication_config_para(char** newval, void** e return false; } + if (g_instance.attr.attr_storage.ss_stream_cluster) { + ereport(ERROR, (errmsg("Do not allow both enable normal cluster replication " + "and disaster replication with \"ss_stream_cluster\" = %d", \ + g_instance.attr.attr_storage.ss_stream_cluster))); + return false; + } + return true; } @@ -6360,6 +6382,13 @@ static bool check_ss_cluster_replication_control_para(bool* newval, void** extra return true; } + if (*newval && g_instance.attr.attr_storage.ss_stream_cluster) { + ereport(ERROR, (errmsg("Do not allow both enable ss cluster replication " + "and disaster replication with \"ss_stream_cluster\" = %d", \ + g_instance.attr.attr_storage.ss_stream_cluster))); + return false; + } + if (g_instance.attr.attr_storage.xlog_file_path != NULL) { ereport(ERROR, (errmsg("Do not allow both enable ss cluster replication " "and normal cluster repliction with \"xlog_file_path\" = %s", \ @@ -6370,6 +6399,29 @@ static bool check_ss_cluster_replication_control_para(bool* newval, void** extra return true; } +static bool check_ss_cluster_disaster_control_para(bool* newval, void** extra, GucSource source) +{ + if (!(*newval)) { + return true; + } + + if (g_instance.attr.attr_storage.ss_enable_dorado) { + ereport(ERROR, (errmsg("Do not allow both enable ss cluster replication " + "and dorado replication with \"ss_enable_dorado\" = %d", \ + g_instance.attr.attr_storage.ss_enable_dorado))); + return false; + } + + if (g_instance.attr.attr_storage.xlog_file_path != NULL) { + ereport(ERROR, (errmsg("Do not allow both enable ss cluster replication " + "and normal cluster replication with \"xlog_file_path\" = %s", \ + g_instance.attr.attr_storage.xlog_file_path))); + return false; + } + + return true; +} + extern bool check_special_character(char c); static bool check_ss_ock_log_path(char **newval, void **extra, GucSource source) diff --git a/src/common/backend/utils/misc/postgresql_single.conf.sample b/src/common/backend/utils/misc/postgresql_single.conf.sample index a3add0a10..92df4843d 100644 --- a/src/common/backend/utils/misc/postgresql_single.conf.sample +++ b/src/common/backend/utils/misc/postgresql_single.conf.sample @@ -853,6 +853,7 @@ job_queue_processes = 10 # Number of concurrent jobs, optional: [0..1000] #ss_enable_ondemand_recovery = off #ss_ondemand_recovery_mem_size = 4GB # min: 1GB, max: 100GB #ss_enable_dorado = off +#ss_stream_cluster = off #enable_segment = off #ss_work_thread_pool_attr = '' diff --git a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp index b0742971c..499a4fede 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp @@ -26,7 +26,7 @@ #include "storage/proc.h" #include "storage/buf/bufmgr.h" #include "storage/smgr/segment.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" #include "utils/resowner.h" #include "ddes/dms/ss_dms_bufmgr.h" #include "ddes/dms/ss_reform_common.h" @@ -735,12 +735,12 @@ dms_session_e DMSGetProcType4RequestPage() // proc type used in DMS request page if (AmDmsReformProcProcess() || (AmPageRedoProcess() && !SS_ONDEMAND_BUILD_DONE) || (AmStartupProcess() && !SS_ONDEMAND_BUILD_DONE)) { - /* When xlog_file_path is not null and enable_dms is set on, main standby always is in recovery. + /* When SS double cluster, main standby always is in recovery. * When pmState is PM_HOT_STANDBY, this case indicates main standby support to read only. So here * DMS_SESSION_RECOVER_HOT_STANDBY will be returned, it indicates that normal threads can access * page in recovery state. */ - if (SS_REPLICATION_MAIN_STANBY_NODE && pmState == PM_HOT_STANDBY) { + if (SS_DISASTER_MAIN_STANDBY_NODE && pmState == PM_HOT_STANDBY) { return DMS_SESSION_RECOVER_HOT_STANDBY; } else { return DMS_SESSION_RECOVER; diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index 4ae9469ed..b6af66f96 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -41,7 +41,7 @@ #include "storage/sinvaladt.h" #include "replication/walsender_private.h" #include "replication/walreceiver.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" #include "ddes/dms/ss_switchover.h" #include "ddes/dms/ss_reform_common.h" #include "ddes/dms/ss_dms_bufmgr.h" @@ -61,7 +61,7 @@ void SSWakeupRecovery(void) /* need make sure pagewriter started first */ bool need_recovery = true; - if (SS_REPLICATION_STANDBY_CLUSTER) { + if (SS_DISASTER_MAIN_STANDBY_NODE) { g_instance.dms_cxt.SSRecoveryInfo.recovery_pause_flag = false; return; } @@ -211,8 +211,12 @@ static int CBGetTxnCSN(void *db_handle, dms_opengauss_xid_csn_t *csn_req, dms_op static int CBGetSnapshotData(void *db_handle, dms_opengauss_txn_snapshot_t *txn_snapshot, uint8 inst_id) { - /* SS_REPLICATION_MAIN_STANBY_NODE always is in recovery progress, but it can acquire snapshot*/ - if (RecoveryInProgress() && !(SS_NORMAL_PRIMARY && SS_REPLICATION_MAIN_STANBY_NODE)) { + /* SS_MAIN_STANDBY_NODE always is in recovery progress, but it can acquire snapshot*/ + if (RecoveryInProgress() && !(SS_NORMAL_PRIMARY && SS_DISASTER_MAIN_STANDBY_NODE)) { + return DMS_ERROR; + } + + if (!SSCanFetchLocalSnapshotTxnRelatedInfo()) { return DMS_ERROR; } @@ -438,7 +442,7 @@ static void CBSwitchoverResult(void *db_handle, int result) } else { /* abort and restore state */ g_instance.dms_cxt.SSClusterState = NODESTATE_NORMAL; - if (SS_REPLICATION_STANDBY_CLUSTER) { + if (SS_DISASTER_STANDBY_CLUSTER) { g_instance.dms_cxt.SSReformInfo.in_reform = false; } ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS switchover] Switchover failed, errno: %d.", result))); @@ -1755,6 +1759,10 @@ static void CBReformSetDmsRole(void *db_handle, unsigned int reformer_id) ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS switchover]role and lock switched, updated inst:%d with role:%d success", SS_MY_INST_ID, reform_info->dms_role))); + /* we need change ha cur mode for switchover in ss double cluster here */ + if (SS_DISASTER_CLUSTER) { + SSDisasterUpdateHAmode(); + } } static void ReformCleanBackends() @@ -1931,8 +1939,8 @@ static void CBReformStartNotify(void *db_handle, dms_reform_start_context_t *rs_ ReformCleanBackends(); } - if (SS_REPLICATION_DORADO_CLUSTER) { - SSDoradoUpdateHAmode(); + if (SS_DISASTER_CLUSTER && reform_info->reform_type != DMS_REFORM_TYPE_FOR_SWITCHOVER_OPENGAUSS) { + SSDisasterUpdateHAmode(); } } @@ -1946,8 +1954,8 @@ static int CBReformDoneNotify(void *db_handle) } } - if (SS_REPLICATION_DORADO_CLUSTER) { - SSDoradoUpdateHAmode(); + if (SS_DISASTER_CLUSTER) { + SSDisasterUpdateHAmode(); } /* SSClusterState and in_reform must be set atomically */ @@ -2046,7 +2054,7 @@ static int CBUpdateNodeOldestXmin(void *db_handle, uint8 inst_id, unsigned long void DmsCallbackThreadShmemInit(unsigned char need_startup, char **reg_data) { /* in dorado mode, we need to wait sharestorageinit finished */ - while (!g_instance.dms_cxt.SSRecoveryInfo.dorado_sharestorage_inited && SS_REPLICATION_DORADO_CLUSTER) { + while (!g_instance.dms_cxt.SSRecoveryInfo.dorado_sharestorage_inited && SS_DORADO_CLUSTER) { pg_usleep(REFORM_WAIT_TIME); } IsUnderPostmaster = true; diff --git a/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp b/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp index d965542ce..42fb31b29 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp @@ -33,7 +33,7 @@ #include "storage/smgr/segment.h" #include "postmaster/postmaster.h" #include "storage/file/fio_device.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" #include "ddes/dms/ss_dms_bufmgr.h" #include "ddes/dms/ss_dms_recovery.h" #include "ddes/dms/ss_reform_common.h" @@ -128,7 +128,7 @@ bool SSRecoveryNodes() * recovery phase could be regarded successful in hot_standby thus set pmState = PM_HOT_STANDBY, which * indicate database systerm is ready to accept read only connections. */ - if (SS_REPLICATION_MAIN_STANBY_NODE && pmState == PM_HOT_STANDBY) { + if (SS_DISASTER_MAIN_STANDBY_NODE && pmState == PM_HOT_STANDBY) { result = true; break; } @@ -149,7 +149,7 @@ bool SSRecoveryApplyDelay() return false; } - if (SS_REPLICATION_STANDBY_CLUSTER) { + if (SS_DISASTER_STANDBY_CLUSTER) { return true; } diff --git a/src/gausskernel/ddes/adapter/ss_reform_common.cpp b/src/gausskernel/ddes/adapter/ss_reform_common.cpp index 9a6175e4b..a9ad4fcec 100644 --- a/src/gausskernel/ddes/adapter/ss_reform_common.cpp +++ b/src/gausskernel/ddes/adapter/ss_reform_common.cpp @@ -36,7 +36,7 @@ #include "storage/file/fio_device.h" #include "storage/smgr/segment_internal.h" #include "replication/walreceiver.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" /* * Add xlog reader private structure for page read. @@ -47,23 +47,6 @@ typedef struct XLogPageReadPrivate { bool randAccess; } XLogPageReadPrivate; -std::vector SSGetAllStableNodeId() -{ - std::vector posList; - int pos = 0; - uint64 stableInstId = g_instance.dms_cxt.SSReformerControl.list_stable; - while (stableInstId) { - uint64 res = stableInstId & 0x01; - if (res) { - posList.emplace_back(pos); - } - pos++; - stableInstId = stableInstId >> 1; - } - - return posList; -} - int SSXLogFileOpenAnyTLI(XLogSegNo segno, int emode, uint32 sources, char* xlog_path) { char path[MAXPGPATH]; @@ -101,10 +84,10 @@ int SSXLogFileOpenAnyTLI(XLogSegNo segno, int emode, uint32 sources, char* xlog_ } /* - * When SS_REPLICATION_DORADO_CLUSTER enabled, current xlog dictionary may be not the correct dictionary, + * When SS_DORADO_CLUSTER enabled, current xlog dictionary may be not the correct dictionary, * because all xlog dictionaries are in the same LUN, we need loop over other dictionaries. */ - if (fd < 0 && SS_REPLICATION_DORADO_CLUSTER) { + if (fd < 0 && SS_DORADO_CLUSTER) { return -1; } @@ -147,8 +130,8 @@ int SSReadXlogInternal(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, XL /* * That preReadStartPtr is InvalidXlogPreReadStartPtr has three kinds of occasions. */ - if (xlogreader->preReadStartPtr == InvalidXlogPreReadStartPtr && SS_REPLICATION_MAIN_STANBY_NODE) { - ereport(LOG, (errmsg("In ss replication daorao cluster mode, preReadStartPtr is 0."))); + if (xlogreader->preReadStartPtr == InvalidXlogPreReadStartPtr && SS_DISASTER_CLUSTER) { + ereport(LOG, (errmsg("In ss disaster cluster mode, preReadStartPtr is 0."))); } // pre-reading for dss @@ -207,7 +190,43 @@ void SSGetRecoveryXlogPath() securec_check_ss(rc, "", ""); } -void SSDoradoGetXlogPathList() +char* SSGetNextXLogPath(TimeLineID tli, XLogRecPtr startptr) +{ + char path[MAXPGPATH]; + char fileName[MAXPGPATH]; + char temp[MAXPGPATH]; + struct stat buffer; + errno_t rc = EOK; + XLogSegNo segno; + XLByteToSeg(startptr, segno); + rc = snprintf_s(fileName, MAXPGPATH, MAXPGPATH - 1, "%08X%08X%08X", tli, + (uint32)((segno) / XLogSegmentsPerXLogId), + (uint32)((segno) % XLogSegmentsPerXLogId)); + securec_check_ss_c(rc, "\0", "\0"); + + for (int i = 1; i < DMS_MAX_INSTANCE; i++) { + if (g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i][0] == '\0') { + ereport(LOG, (errmsg("No valid next xlog file path"))); + break; + } + rc = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%s", g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i], fileName); + securec_check_ss_c(rc, "\0", "\0"); + + if (stat(path, &buffer) == 0) { + rc = strcpy_s(temp, sizeof(temp), g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i]); + securec_check_c(rc, "\0", "\0"); + rc = strcpy_s(g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i], MAXPGPATH, g_instance.dms_cxt.SSRecoveryInfo.xlog_list[0]); + securec_check_c(rc, "\0", "\0"); + rc = strcpy_s(g_instance.dms_cxt.SSRecoveryInfo.xlog_list[0], MAXPGPATH, temp); + securec_check_c(rc, "\0", "\0"); + break; + } + } + + return g_instance.dms_cxt.SSRecoveryInfo.xlog_list[0]; +} + +void SSDisasterGetXlogPathList() { errno_t rc = EOK; for (int i = 0; i < DMS_MAX_INSTANCE; i++) { @@ -242,7 +261,7 @@ void SSDoradoGetXlogPathList() void SSUpdateReformerCtrl() { - Assert(SS_PRIMARY_MODE); + Assert(SS_PRIMARY_MODE || SS_DISASTER_CLUSTER); int fd = -1; int len; errno_t err = EOK; @@ -459,25 +478,26 @@ static void SSReadClusterRunMode() LWLockRelease(ControlFileLock); } -void SSDoradoRefreshMode() +void SSDisasterRefreshMode() { LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); + SSUpdateReformerCtrl(); LWLockRelease(ControlFileLock); - ereport(LOG, (errmsg("SSDoradoRefreshMode change control file cluster run mode to: %d", + ereport(LOG, (errmsg("SSDisasterRefreshMode change control file cluster run mode to: %d", g_instance.dms_cxt.SSReformerControl.clusterRunMode))); } -void SSDoradoUpdateHAmode() +void SSDisasterUpdateHAmode() { SSReadClusterRunMode(); - if (SS_REFORM_REFORMER && SS_REPLICATION_DORADO_CLUSTER) { - if (SS_REPLICATION_PRIMARY_CLUSTER) { + if (SS_REFORM_REFORMER) { + if (SS_DISASTER_PRIMARY_CLUSTER) { t_thrd.postmaster_cxt.HaShmData->current_mode = PRIMARY_MODE; - } else if (SS_REPLICATION_STANDBY_CLUSTER) { + } else if (SS_DISASTER_STANDBY_CLUSTER) { t_thrd.postmaster_cxt.HaShmData->current_mode = STANDBY_MODE; } - ereport(LOG, (errmsg("SSDoradoUpdateHAmode change control file cluster run mode to: %d", + ereport(LOG, (errmsg("SSDisasterUpdateHAmode change Ha current mode to: %d", t_thrd.postmaster_cxt.HaShmData->current_mode))); } } diff --git a/src/gausskernel/ddes/adapter/ss_transaction.cpp b/src/gausskernel/ddes/adapter/ss_transaction.cpp index cc25d03ae..a8b42f46a 100644 --- a/src/gausskernel/ddes/adapter/ss_transaction.cpp +++ b/src/gausskernel/ddes/adapter/ss_transaction.cpp @@ -32,6 +32,7 @@ #include "ddes/dms/ss_dms_bufmgr.h" #include "storage/sinvaladt.h" #include "replication/libpqsw.h" +#include "replication/walsender.h" static inline void txnstatusNetworkStats(uint64 timeDiff); static inline void txnstatusHashStats(uint64 timeDiff); @@ -60,6 +61,10 @@ static Snapshot SSGetSnapshotDataFromMaster(Snapshot snapshot) dms_ctx.xmap_ctx.dest_id = (unsigned int)SS_PRIMARY_ID; if (dms_request_opengauss_txn_snapshot(&dms_ctx, &dms_snapshot) == DMS_SUCCESS) { break; + } + + if (AM_WAL_SENDER && SS_IN_REFORM) { + return NULL; } pg_usleep(USECS_PER_SEC); diff --git a/src/gausskernel/po/zh_CN.po b/src/gausskernel/po/zh_CN.po index 90ec18bba..832c80457 100644 --- a/src/gausskernel/po/zh_CN.po +++ b/src/gausskernel/po/zh_CN.po @@ -46917,8 +46917,8 @@ msgstr "获取主节点replconninfo失败, 请检查replconninfo配置!" #: ddes/adapter/ss_reform_common.cpp:442 #, c-format -msgid "zatest: SSDoradoRefreshMode change control file cluster run mode to: %d" -msgstr "zatest: SSDoradoRefreshMode将控制文件集群运行模式更改为:%d" +msgid "zatest: SSDisasterRefreshMode change control file cluster run mode to: %d" +msgstr "zatest: SSDisasterRefreshMode将控制文件集群运行模式更改为:%d" #: ddes/adapter/ss_switchover.cpp:59 # @@ -102966,8 +102966,8 @@ msgstr "SEG_STORE_EXTEND_EXTENT%s:分段扩展一个范围!\n" #: storage/smgr/segstore.cpp:374 #, c-format -msgid "can segment address translation when role is SS_REPLICATION_MAIN_STANBY_NODE" -msgstr "当角色为SS_REPLICATION_MAIN_STANBY_NODE时, 可以进行段地址转换吗" +msgid "can segment address translation when role is SS_DISASTER_STANDBY_CLUSTER" +msgstr "当角色为SS_DISASTER_STANDBY_CLUSTER时, 可以进行段地址转换吗" #: storage/smgr/segstore.cpp:378 #, c-format diff --git a/src/gausskernel/process/postmaster/checkpointer.cpp b/src/gausskernel/process/postmaster/checkpointer.cpp index 1d811a88c..fffe0a476 100755 --- a/src/gausskernel/process/postmaster/checkpointer.cpp +++ b/src/gausskernel/process/postmaster/checkpointer.cpp @@ -47,7 +47,7 @@ #include "postmaster/bgwriter.h" #include "postmaster/pagewriter.h" #include "replication/syncrep.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" #include "storage/buf/bufmgr.h" #include "storage/ipc.h" #include "storage/lock/lwlock.h" @@ -543,7 +543,7 @@ void CheckpointerMain(void) } else { CheckPointBuffers(flags, true); } - } else if (!do_restartpoint && !SS_REPLICATION_STANDBY_CLUSTER) { + } else if (!do_restartpoint && !SS_DISASTER_STANDBY_CLUSTER) { CreateCheckPoint(flags); ckpt_performed = true; if (!bgwriter_first_startup && CheckFpwBeforeFirstCkpt()) { diff --git a/src/gausskernel/process/postmaster/postmaster.cpp b/src/gausskernel/process/postmaster/postmaster.cpp index fd7545aaa..4f05c3382 100644 --- a/src/gausskernel/process/postmaster/postmaster.cpp +++ b/src/gausskernel/process/postmaster/postmaster.cpp @@ -153,7 +153,7 @@ #include "replication/dcf_replication.h" #include "replication/logicallauncher.h" #include "replication/logicalworker.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" #include "postmaster/bgwriter.h" #include "postmaster/cbmwriter.h" #include "postmaster/startup.h" @@ -3052,10 +3052,10 @@ int PostmasterMain(int argc, char* argv[]) if (g_instance.attr.attr_storage.dms_attr.enable_dms) { /* load primary id and reform stable list from control file */ SSReadControlFile(REFORM_CTRL_PAGE); - if (SS_REPLICATION_DORADO_CLUSTER) { + if (SS_DISASTER_CLUSTER) { /* fresh ss dorado cluster run mode */ g_instance.dms_cxt.SSReformerControl.clusterRunMode = ss_dorado_mode; - SSDoradoRefreshMode(); + SSDisasterRefreshMode(); } int src_id = g_instance.dms_cxt.SSReformerControl.primaryInstId; ereport(LOG, (errmsg("[SS reform] node%d starts, found cluster PRIMARY:%d", @@ -3838,7 +3838,7 @@ static int ServerLoop(void) fd_set rmask; int selres; - if (t_thrd.postmaster_cxt.HaShmData->current_mode != NORMAL_MODE || IS_SHARED_STORAGE_MODE || SS_REPLICATION_DORADO_CLUSTER) { + if (t_thrd.postmaster_cxt.HaShmData->current_mode != NORMAL_MODE || IS_SHARED_STORAGE_MODE || SS_DISASTER_CLUSTER) { check_and_reset_ha_listen_port(); #ifdef HAVE_POLL @@ -4159,7 +4159,7 @@ static int ServerLoop(void) * one. But this is needed only in normal operation (else we cannot * be writing any new WAL). */ - if (g_instance.pid_cxt.WalWriterPID == 0 && pmState == PM_RUN && !SS_REPLICATION_STANDBY_CLUSTER) { + if (g_instance.pid_cxt.WalWriterPID == 0 && pmState == PM_RUN && !SS_DORADO_STANDBY_CLUSTER) { g_instance.pid_cxt.WalWriterPID = initialize_util_thread(WALWRITER); if (g_instance.attr.attr_storage.enable_uwal) { if (t_thrd.postmaster_cxt.audit_standby_switchover || t_thrd.postmaster_cxt.audit_primary_failover) { @@ -6742,7 +6742,7 @@ dms_demote: signal_child(g_instance.pid_cxt.StatementPID, SIGTERM); } - if (g_instance.pid_cxt.StartupPID != 0 && (SS_REPLICATION_STANDBY_CLUSTER)) { + if (g_instance.pid_cxt.StartupPID != 0 && (SS_DISASTER_STANDBY_CLUSTER)) { signal_child(g_instance.pid_cxt.StartupPID, SIGTERM); } @@ -6751,7 +6751,7 @@ dms_demote: signal_child(g_instance.pid_cxt.WalWriterPID, SIGTERM); StopAliveBuildSender(); - if (g_instance.pid_cxt.WalReceiverPID != 0 && SS_REPLICATION_STANDBY_CLUSTER) { + if (g_instance.pid_cxt.WalReceiverPID != 0 && SS_DISASTER_STANDBY_CLUSTER) { signal_child(g_instance.pid_cxt.WalReceiverPID, SIGTERM); } @@ -7058,7 +7058,7 @@ static void reaper(SIGNAL_ARGS) } } - if (g_instance.pid_cxt.WalWriterPID == 0 && !SS_REPLICATION_STANDBY_CLUSTER) { + if (g_instance.pid_cxt.WalWriterPID == 0 && !SS_DORADO_STANDBY_CLUSTER) { g_instance.pid_cxt.WalWriterPID = initialize_util_thread(WALWRITER); if (g_instance.attr.attr_storage.enable_uwal) { if (t_thrd.postmaster_cxt.audit_standby_switchover || @@ -10122,14 +10122,14 @@ static void sigusr1_handler(SIGNAL_ARGS) * update cluster_run_mode from pg_control file, * in case failover has been performed between two dorado cluster. */ - if (SS_REPLICATION_DORADO_CLUSTER) { + if (SS_DISASTER_CLUSTER) { SSReadControlFile(REFORM_CTRL_PAGE); } - if (SS_REPLICATION_MAIN_STANBY_NODE) { + if (SS_DISASTER_MAIN_STANDBY_NODE) { ereport(LOG, (errmsg("Failover between two dorado cluster start, change current run mode and dssserver mode to primary_cluster"))); g_instance.dms_cxt.SSReformerControl.clusterRunMode = RUN_MODE_PRIMARY; - SSDoradoRefreshMode(); + SSDisasterRefreshMode(); while (dss_set_server_status_wrapper() != GS_SUCCESS) { pg_usleep(REFORM_WAIT_LONG); ereport(WARNING, (errmodule(MOD_DMS), @@ -10261,9 +10261,9 @@ static void sigusr1_handler(SIGNAL_ARGS) if (CheckPostmasterSignal(PMSIGNAL_START_WALRECEIVER) && g_instance.pid_cxt.WalReceiverPID == 0 && (pmState == PM_STARTUP || pmState == PM_RECOVERY || pmState == PM_HOT_STANDBY || pmState == PM_WAIT_READONLY) && g_instance.status == NoShutdown && - (!ENABLE_DMS || SS_REPLICATION_DORADO_CLUSTER)) { - /* when SS_REPLICATION_DORADO_CLUSTER enabled, don't start walrecwrite */ - if (g_instance.pid_cxt.WalRcvWriterPID == 0 && !SS_REPLICATION_DORADO_CLUSTER) { + (!ENABLE_DMS || SS_DORADO_CLUSTER || SS_DISASTER_CLUSTER)) { + /* when SS_DORADO_CLUSTER enabled, don't start walrecwrite */ + if (g_instance.pid_cxt.WalRcvWriterPID == 0 && !SS_DORADO_CLUSTER) { g_instance.pid_cxt.WalRcvWriterPID = initialize_util_thread(WALRECWRITE); SetWalRcvWriterPID(g_instance.pid_cxt.WalRcvWriterPID); } @@ -10357,7 +10357,7 @@ static void sigusr1_handler(SIGNAL_ARGS) /* shut down all backends and autovac workers */ (void)SignalSomeChildren(SIGTERM, BACKEND_TYPE_NORMAL | BACKEND_TYPE_AUTOVAC); - if (g_instance.pid_cxt.PgStatPID != 0 && SS_REPLICATION_STANDBY_CLUSTER) { + if (g_instance.pid_cxt.PgStatPID != 0 && SS_DISASTER_STANDBY_CLUSTER) { signal_child(g_instance.pid_cxt.PgStatPID, SIGQUIT); } @@ -10393,23 +10393,23 @@ static void sigusr1_handler(SIGNAL_ARGS) signal_child(g_instance.pid_cxt.WLMCollectPID, SIGTERM); } - if (g_instance.pid_cxt.UndoLauncherPID != 0 && (SS_REPLICATION_STANDBY_CLUSTER)) { + if (g_instance.pid_cxt.UndoLauncherPID != 0 && (SS_DISASTER_STANDBY_CLUSTER)) { signal_child(g_instance.pid_cxt.UndoLauncherPID, SIGTERM); } #ifndef ENABLE_MULTIPLE_NODES - if (g_instance.pid_cxt.ApplyLauncerPID != 0 && (SS_REPLICATION_STANDBY_CLUSTER)) { + if (g_instance.pid_cxt.ApplyLauncerPID != 0 && (SS_DISASTER_STANDBY_CLUSTER)) { signal_child(g_instance.pid_cxt.ApplyLauncerPID, SIGTERM); } #endif - if (g_instance.pid_cxt.GlobalStatsPID != 0 && (SS_REPLICATION_STANDBY_CLUSTER)) { + if (g_instance.pid_cxt.GlobalStatsPID != 0 && (SS_DISASTER_STANDBY_CLUSTER)) { signal_child(g_instance.pid_cxt.GlobalStatsPID, SIGTERM); } - if (g_instance.pid_cxt.UndoRecyclerPID != 0 && (SS_REPLICATION_STANDBY_CLUSTER)) { + if (g_instance.pid_cxt.UndoRecyclerPID != 0 && (SS_DISASTER_STANDBY_CLUSTER)) { signal_child(g_instance.pid_cxt.UndoRecyclerPID, SIGTERM); } - if (g_instance.pid_cxt.FaultMonitorPID != 0 && (SS_REPLICATION_STANDBY_CLUSTER)) { + if (g_instance.pid_cxt.FaultMonitorPID != 0 && (SS_DISASTER_STANDBY_CLUSTER)) { signal_child(g_instance.pid_cxt.FaultMonitorPID, SIGTERM); } @@ -10527,7 +10527,7 @@ static void sigusr1_handler(SIGNAL_ARGS) if (g_instance.pid_cxt.AutoVacPID != 0) signal_child(g_instance.pid_cxt.AutoVacPID, SIGTERM); - if (g_instance.pid_cxt.PgStatPID != 0 && SS_REPLICATION_STANDBY_CLUSTER) { + if (g_instance.pid_cxt.PgStatPID != 0 && SS_DISASTER_STANDBY_CLUSTER) { signal_child(g_instance.pid_cxt.PgStatPID, SIGQUIT); } @@ -12859,7 +12859,7 @@ const char* wal_get_db_state_string(DbState db_state) static ServerMode get_cur_mode(void) { - if (SS_REPLICATION_MAIN_STANBY_NODE) { + if (SS_DISASTER_MAIN_STANDBY_NODE) { return STANDBY_MODE; } else if (ENABLE_DMS) { return SS_OFFICIAL_PRIMARY ? PRIMARY_MODE : STANDBY_MODE; @@ -15200,7 +15200,7 @@ void InitShmemForDmsCallBack() const char *GetSSServerMode(ServerMode mode) { - if (SS_REPLICATION_DORADO_CLUSTER) { + if (SS_DISASTER_CLUSTER) { if (SS_OFFICIAL_PRIMARY && mode == PRIMARY_MODE) { return "Primary"; } diff --git a/src/gausskernel/process/postmaster/startup.cpp b/src/gausskernel/process/postmaster/startup.cpp index 65d06d7be..9e8a14d1f 100755 --- a/src/gausskernel/process/postmaster/startup.cpp +++ b/src/gausskernel/process/postmaster/startup.cpp @@ -29,7 +29,7 @@ #include "postmaster/startup.h" #include "postmaster/postmaster.h" #include "replication/shared_storage_walreceiver.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" #include "storage/ipc.h" #include "storage/latch.h" #include "storage/pmsignal.h" @@ -232,7 +232,7 @@ void HandleStartupProcInterrupts(void) * Check if we were requested to exit without finishing recovery. */ if (t_thrd.startup_cxt.shutdown_requested && SmartShutdown != g_instance.status) { - if (t_thrd.xlog_cxt.StandbyModeRequested && SS_REPLICATION_MAIN_STANBY_NODE) { + if (t_thrd.xlog_cxt.StandbyModeRequested && SS_DISASTER_MAIN_STANDBY_NODE) { ereport(LOG, (errmsg("dorado standby cluster switchover shutdown startup\n"))); if (!IsExtremeRedo()) { DisownLatch(&t_thrd.shemem_ptr_cxt.XLogCtl->recoveryWakeupLatch); @@ -285,7 +285,7 @@ static void StartupReleaseAllLocks(int code, Datum arg) void DeleteDisConnFileInClusterStandby() { - if (!(IS_SHARED_STORAGE_MODE || SS_REPLICATION_DORADO_CLUSTER)) { + if (!(IS_SHARED_STORAGE_MODE || SS_DORADO_CLUSTER)) { return; } diff --git a/src/gausskernel/storage/access/transam/extreme_rto/page_redo.cpp b/src/gausskernel/storage/access/transam/extreme_rto/page_redo.cpp index 292da0091..83bead554 100755 --- a/src/gausskernel/storage/access/transam/extreme_rto/page_redo.cpp +++ b/src/gausskernel/storage/access/transam/extreme_rto/page_redo.cpp @@ -73,7 +73,7 @@ #include "access/multi_redo_api.h" #include "replication/walreceiver.h" #include "replication/datareceiver.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" #include "pgxc/barrier.h" #include "storage/file/fio_device.h" #include "utils/timestamp.h" @@ -972,7 +972,7 @@ void PageManagerProcSegFullSyncState(XLogRecParseState *parseState) void PageManagerProcSegPipeLineSyncState(XLogRecParseState *parseState) { - if (!SS_REPLICATION_MAIN_STANBY_NODE) { + if (!SS_DISASTER_STANDBY_CLUSTER) { WaitCurrentPipeLineRedoWorkersQueueEmpty(); } MemoryContext oldCtx = MemoryContextSwitchTo(g_redoWorker->oldCtx); diff --git a/src/gausskernel/storage/access/transam/extreme_rto/xlog_read.cpp b/src/gausskernel/storage/access/transam/extreme_rto/xlog_read.cpp index bd0063ccb..b9e093c06 100644 --- a/src/gausskernel/storage/access/transam/extreme_rto/xlog_read.cpp +++ b/src/gausskernel/storage/access/transam/extreme_rto/xlog_read.cpp @@ -28,8 +28,8 @@ #include "ddes/dms/ss_reform_common.h" #include "replication/walreceiver.h" #include "replication/dcf_replication.h" +#include "replication/ss_disaster_cluster.h" #include "replication/shared_storage_walreceiver.h" -#include "replication/ss_cluster_replication.h" #include "storage/ipc.h" namespace extreme_rto { @@ -551,15 +551,11 @@ int ParallelXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, for (;;) { uint32 readSource = pg_atomic_read_u32(&(g_recordbuffer->readSource)); - if (readSource & XLOG_FROM_STREAM && !SS_REPLICATION_DORADO_CLUSTER) { + if (readSource & XLOG_FROM_STREAM && !SS_DISASTER_STANDBY_CLUSTER) { readLen = ParallelXLogReadWorkBufRead(xlogreader, targetPagePtr, reqLen, targetRecPtr, readTLI); } else { if (ENABLE_DMS && ENABLE_DSS) { - /* - * when ss_enable_dorado = on, traversing xlog directory to read xlog record. - * Loop doesn't quit until it find valid xlog record. - */ - if (SS_REPLICATION_DORADO_CLUSTER) { + if (SS_DORADO_CLUSTER) { for (int i = 0; i < DMS_MAX_INSTANCE; i++) { if (g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i][0] == '\0') { break; @@ -1005,16 +1001,16 @@ XLogRecord *XLogParallelReadNextRecord(XLogReaderState *xlogreader) latestRecordCrc = record->xl_crc; latestRecordLen = record->xl_tot_len; ADD_ABNORMAL_POSITION(9); - if (SS_REPLICATION_DORADO_CLUSTER) { + if (SS_DORADO_CLUSTER) { t_thrd.xlog_cxt.ssXlogReadFailedTimes = 0; } /* Great, got a record */ return record; } else { - if (SS_REPLICATION_DORADO_CLUSTER) { + if (SS_DORADO_CLUSTER) { t_thrd.xlog_cxt.ssXlogReadFailedTimes++; - /* In SS_REPLICATION_DORADO_CLUSTER mode, loop back to retry. */ + /* In SS_DISASTER_STANDBY_CLUSTER mode, loop back to retry. */ xlogreader->preReadStartPtr = InvalidXlogPreReadStartPtr; } diff --git a/src/gausskernel/storage/access/transam/xlog.cpp b/src/gausskernel/storage/access/transam/xlog.cpp index b6d1ba2ad..e7b813bd6 100755 --- a/src/gausskernel/storage/access/transam/xlog.cpp +++ b/src/gausskernel/storage/access/transam/xlog.cpp @@ -92,7 +92,7 @@ #include "replication/reorderbuffer.h" #include "replication/replicainternal.h" #include "replication/shared_storage_walreceiver.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" #include "replication/slot.h" #include "replication/snapbuild.h" #include "replication/syncrep.h" @@ -273,7 +273,6 @@ static void remove_xlogtemp_files(void); static bool validate_parse_delay_ddl_file(DelayDDLRange *delayRange); static bool write_delay_ddl_file(const DelayDDLRange &delayRange, bool onErrDelete); extern void CalculateLocalLatestSnapshot(bool forceCalc); -extern std::vector SSGetAllStableNodeId(); extern int ock_uwal_append(IN UwalAppendParam *param, OUT uint64_t *offset, OUT void *result); /* @@ -667,7 +666,7 @@ static XLogRecPtr XLogInsertRecordGroup(XLogRecData *rdata, XLogRecPtr fpw_lsn) ereport(ERROR, (errcode(ERRCODE_CASE_NOT_FOUND), errmsg("cannot make new WAL entries during recovery"))); } - if (SS_REPLICATION_STANDBY_CLUSTER) { + if (SS_DORADO_STANDBY_CLUSTER) { ereport(FATAL, (errmsg("SS dorado standby cluster cannot insert XLOG entries"))); } @@ -1111,7 +1110,7 @@ static XLogRecPtr XLogInsertRecordSingle(XLogRecData *rdata, XLogRecPtr fpw_lsn) ereport(ERROR, (errcode(ERRCODE_CASE_NOT_FOUND), errmsg("cannot make new WAL entries during recovery"))); } - if (SS_REPLICATION_STANDBY_CLUSTER) { + if (SS_DORADO_STANDBY_CLUSTER) { ereport(FATAL, (errmsg("SS dorado standby cluster cannot insert XLOG entries"))); } @@ -3699,7 +3698,7 @@ bool XLogBackgroundFlush(void) return false; } - if (SS_REPLICATION_STANDBY_CLUSTER) { + if (SS_DORADO_STANDBY_CLUSTER) { return false; } @@ -4026,7 +4025,7 @@ static int XLogFileInitInternal(XLogSegNo logsegno, bool *use_existent, bool use } } - if (SS_REPLICATION_STANDBY_CLUSTER) { + if (SS_DORADO_STANDBY_CLUSTER) { ereport(FATAL, (errmsg("SS dorado standby cluster cannot init xlog files due to DSS"))); } @@ -4489,7 +4488,7 @@ static int XLogFileOpenInternal(XLogSegNo segno, const char *xlog_dir) (uint32)((segno) / XLogSegmentsPerXLogId), (uint32)((segno) % XLogSegmentsPerXLogId)); securec_check_ss(errorno, "", ""); - if (SS_REPLICATION_DORADO_CLUSTER) { + if (SS_DORADO_CLUSTER) { fd = SSErgodicOpenXlogFile(segno, O_RDWR | PG_BINARY | (unsigned int)get_sync_bit(u_sess->attr.attr_storage.sync_method), S_IRUSR | S_IWUSR); } else { @@ -4607,38 +4606,6 @@ retry: return fd; } - - /* - * When SS_REPLICATION_DORADO_CLUSTER enabled, current xlog dictionary may be not the correct dictionary, - * because all xlog dictionaries are in the same LUN, we need loop over other dictionaries. - * Do we need source == XLOG_FROM_STREAM? - */ - if (SS_REPLICATION_DORADO_CLUSTER && source == XLOG_FROM_STREAM) { - std::vector nodeList = SSGetAllStableNodeId(); // stable node list, - Assert(!nodeList.empty()); - char xlogPath[MAXPGPATH]; - char *dssdir = g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name; - for (auto elem : nodeList) { - if (elem == g_instance.dms_cxt.SSReformerControl.recoveryInstId) { - continue; - } - - /* try to read from other xlog dictionary */ - errorno = snprintf_s(xlogPath, MAXPGPATH, MAXPGPATH - 1, "%s/pg_xlog%d", dssdir, elem); - securec_check_ss(errorno, "", ""); - - errorno = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%08X%08X%08X", xlogPath, tli, - (uint32)((segno) / XLogSegmentsPerXLogId), (uint32)((segno) % XLogSegmentsPerXLogId)); - securec_check_ss(errorno, "", ""); - - fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); - if (fd < 0) { - continue; - } - ereport(LOG, (errmsg("find xlog file in path : \"%s\"", path))); - goto retry; - } - } if (!FILE_POSSIBLY_DELETED(errno) || !notfoundOk) { /* unexpected failure? */ ereport(PANIC, (errcode_for_file_access(), errmsg("could not open file \"%s\" (log segment %s): %s", path, @@ -5159,7 +5126,7 @@ static void ExecuteRecoveryCommand(char *command, char *commandName, bool failOn static void PreallocXlogFiles(XLogRecPtr endptr) { /* In ss repplication dorado cluster, standby cluster doesn't need to preallocate xlog files */ - if (SS_REPLICATION_STANDBY_CLUSTER) { + if (SS_DORADO_STANDBY_CLUSTER) { return; } XLogSegNo _logSegNo; @@ -5308,7 +5275,7 @@ static void UpdateLastRemovedPtr(const char *filename) */ static void RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr endptr) { - if (SS_REPLICATION_STANDBY_CLUSTER) { + if (SS_DORADO_STANDBY_CLUSTER) { return; } @@ -5594,7 +5561,7 @@ static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, in for (;;) { char *errormsg = NULL; - if (SS_REPLICATION_DORADO_CLUSTER) { + if (SS_DORADO_CLUSTER) { record = SSXLogReadRecordErgodic(xlogreader, RecPtr, &errormsg); } else { record = XLogReadRecord(xlogreader, RecPtr, &errormsg); @@ -5650,13 +5617,13 @@ static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, in latestValidRecord = t_thrd.xlog_cxt.ReadRecPtr; latestRecordCrc = record->xl_crc; latestRecordLen = record->xl_tot_len; - if (SS_REPLICATION_DORADO_CLUSTER) { + if (SS_DORADO_CLUSTER) { t_thrd.xlog_cxt.ssXlogReadFailedTimes = 0; } /* Great, got a record */ return record; } else { - if (SS_REPLICATION_DORADO_CLUSTER) { + if (SS_DORADO_CLUSTER) { t_thrd.xlog_cxt.ssXlogReadFailedTimes++; xlogreader->preReadStartPtr = InvalidXlogPreReadStartPtr; @@ -6442,7 +6409,7 @@ void UpdateControlFile(void) if (fd < 0) { ereport(FATAL, (errcode_for_file_access(), errmsg("could not open control file \"%s\": %s", fname[i], TRANSLATE_ERRNO))); } - + off_t seekpos = GetControlPageByInstanceId(); (void)lseek(fd, seekpos, SEEK_SET); @@ -9322,7 +9289,7 @@ void StartupXLOG(void) ereport(LOG, (errmsg("[On-demand]: Ondemand recovery do not finish in last reform, " "reading control file of original primary:%d", src_id))); SSOndemandRecoveryExitNormal = false; - } else if (SS_REPLICATION_DORADO_CLUSTER) { + } else if (SS_DORADO_CLUSTER) { src_id = SSGetPrimaryInstId(); } else { if (SS_STANDBY_FAILOVER || SS_STANDBY_PROMOTING) { @@ -9535,8 +9502,8 @@ void StartupXLOG(void) if (ENABLE_DMS && ENABLE_DSS) { SSGetRecoveryXlogPath(); - if (SS_REPLICATION_DORADO_CLUSTER) { - SSDoradoGetXlogPathList(); + if (SS_DISASTER_CLUSTER) { + SSDisasterGetXlogPathList(); } xlogreader = SSXLogReaderAllocate(&SSXLogPageRead, &readprivate, ALIGNOF_BUFFER); close_readFile_if_open(); @@ -10027,7 +9994,7 @@ void StartupXLOG(void) } } - if (SS_PRIMARY_MODE || SS_REPLICATION_MAIN_STANBY_NODE) { + if (SS_PRIMARY_MODE || SS_DISASTER_MAIN_STANDBY_NODE) { LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); SSUpdateReformerCtrl(); LWLockRelease(ControlFileLock); @@ -10690,7 +10657,7 @@ void StartupXLOG(void) EndOfLog = t_thrd.xlog_cxt.EndRecPtr; XLByteToPrevSeg(EndOfLog, endLogSegNo); - if (!SS_REPLICATION_STANDBY_CLUSTER && + if (!SS_DISASTER_STANDBY_CLUSTER && ((ENABLE_DMS && SS_STANDBY_FAILOVER) || SS_STANDBY_PROMOTING)) { bool use_existent = true; (void)XLogFileInit(endLogSegNo, &use_existent, true); @@ -11124,11 +11091,11 @@ void StartupXLOG(void) } #endif - if (ENABLE_DMS && ENABLE_REFORM && !SS_PRIMARY_DEMOTED && !SS_REPLICATION_STANDBY_CLUSTER) { + if (ENABLE_DMS && ENABLE_REFORM && !SS_PRIMARY_DEMOTED && !SS_DISASTER_STANDBY_CLUSTER) { StartupWaitReform(); } - if (SS_REPLICATION_MAIN_STANBY_NODE) { + if (SS_DISASTER_MAIN_STANDBY_NODE) { SendPostmasterSignal(PMSIGNAL_UPDATE_PROMOTING); } } @@ -11850,7 +11817,7 @@ void ShutdownXLOG(int code, Datum arg) { if (SS_PRIMARY_DEMOTING) { ereport(LOG, (errmsg("[SS switchover] primary demote: doing shutdown checkpoint"))); - if (SS_REPLICATION_STANDBY_CLUSTER) { + if (SS_DISASTER_STANDBY_CLUSTER) { CreateRestartPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE); } else { CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE); @@ -11883,7 +11850,7 @@ void ShutdownXLOG(int code, Datum arg) } if (g_instance.wal_cxt.upgradeSwitchMode != ExtremelyFast) { - if (SS_REPLICATION_STANDBY_CLUSTER) { + if (SS_DISASTER_STANDBY_CLUSTER) { CreateRestartPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE); } else { CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE); @@ -12104,7 +12071,7 @@ void CreateCheckPoint(int flags) (errcode(ERRCODE_INVALID_TRANSACTION_STATE), errmsg("can't create a checkpoint during recovery"))); } - if (SS_REPLICATION_STANDBY_CLUSTER) { + if (SS_DORADO_STANDBY_CLUSTER) { ereport(ERROR, (errcode(ERRCODE_INVALID_TRANSACTION_STATE), errmsg("can't create a checkpoint on SS dorado standby cluster"))); } @@ -13636,7 +13603,7 @@ static XlogKeeper* KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo, XLogRecPtr * while dms and dss enable, t_thrd.xlog_cxt.server_mode only is normal_mode, we do additional * check for dms and dss enabling. */ - if ((!ENABLE_DMS && t_thrd.xlog_cxt.server_mode == PRIMARY_MODE) || SS_REPLICATION_PRIMARY_NODE) { + if ((!ENABLE_DMS && t_thrd.xlog_cxt.server_mode == PRIMARY_MODE) || SS_DISASTER_PRIMARY_NODE) { if (WalSndInProgress(SNDROLE_PRIMARY_BUILDSTANDBY) || pg_atomic_read_u32(&g_instance.comm_cxt.current_gsrewind_count) > 0) { /* segno = 1 show all file should be keep */ @@ -13829,7 +13796,7 @@ static void XLogReportParameters(void) * to keep them up-to-date to avoid confusion. */ if ((g_instance.attr.attr_storage.wal_level != t_thrd.shemem_ptr_cxt.ControlFile->wal_level || XLogIsNeeded()) - && !SS_REPLICATION_STANDBY_CLUSTER) { + && !SS_DISASTER_STANDBY_CLUSTER) { xl_parameter_change xlrec; XLogRecPtr recptr; @@ -18024,7 +17991,7 @@ bool CheckForFailoverTrigger(void) */ ResetSlotLSNEndRecovery(slotname); - if (!SS_REPLICATION_MAIN_STANBY_NODE) { + if (!SS_DISASTER_MAIN_STANDBY_NODE) { SendPostmasterSignal(PMSIGNAL_UPDATE_PROMOTING); } ret = true; @@ -19593,7 +19560,7 @@ void NormalClusterDoradoStorageInit() void ShareStorageInit() { - if (SS_REPLICATION_DORADO_CLUSTER) { + if (SS_DORADO_CLUSTER) { SSClusterDoradoStorageInit(); } else if (IS_SHARED_STORAGE_MODE) { NormalClusterDoradoStorageInit(); @@ -20090,7 +20057,7 @@ retry: t_thrd.xlog_cxt.RedoDone = IsRedoDonePromoting(); pg_memory_barrier(); - if (SS_REPLICATION_MAIN_STANBY_NODE && WalRcvIsDone() && CheckForFailoverTrigger()) { + if (SS_DORADO_MAIN_STANDBY_NODE && WalRcvIsDone() && CheckForFailoverTrigger()) { ProcTxnWorkLoad(true); ereport(LOG, (errmsg("RecPtr(%X/%X), receivedUpto(%X/%X).", (uint32)(RecPtr >> 32), (uint32)RecPtr, (uint32)(t_thrd.xlog_cxt.receivedUpto >> 32), @@ -20130,7 +20097,7 @@ retry: if (randAccess) { t_thrd.xlog_cxt.curFileTLI = 0; } - if (SS_REPLICATION_MAIN_STANBY_NODE && CheckForFailoverTrigger()) { + if (SS_DORADO_MAIN_STANDBY_NODE && CheckForFailoverTrigger()) { goto triggered; } /* @@ -20292,13 +20259,428 @@ triggered: return -1; } +static int SSStreamReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, + XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI, char* xlog_path) +{ + /* Load reader private data */ + XLogPageReadPrivate *readprivate = (XLogPageReadPrivate*)xlogreader->private_data; + bool fetching_ckpt = readprivate->fetching_ckpt; + int emode = readprivate->emode; + XLogRecPtr RecPtr = targetPagePtr; + uint32 targetPageOff; + bool processtrxn = false; + bool randAccess = readprivate->randAccess; + XLogCtlData *xlogctl = t_thrd.shemem_ptr_cxt.XLogCtl; + XLogSegNo replayedSegNo; + uint32 actualBytes; + bool havedata = false; + + targetPageOff = targetPagePtr % XLogSegSize; + + if (t_thrd.xlog_cxt.readFile >= 0 && !XLByteInSeg(targetPagePtr, t_thrd.xlog_cxt.readSegNo)) { + /* + * Request a restartpoint if we've replayed too much xlog since the + * last one. + */ + if (t_thrd.xlog_cxt.StandbyModeRequested && + (t_thrd.xlog_cxt.bgwriterLaunched || t_thrd.xlog_cxt.pagewriter_launched) && !dummyStandbyMode) { + if (get_real_recovery_parallelism() > 1) { + XLByteToSeg(GetXLogReplayRecPtr(NULL), replayedSegNo); + } else { + replayedSegNo = t_thrd.xlog_cxt.readSegNo; + } + if (XLogCheckpointNeeded(replayedSegNo)) { + (void)GetRedoRecPtr(); + if (XLogCheckpointNeeded(replayedSegNo)) { + RequestCheckpoint(CHECKPOINT_CAUSE_XLOG); + } + } + } + + close(t_thrd.xlog_cxt.readFile); + t_thrd.xlog_cxt.readFile = -1; + t_thrd.xlog_cxt.readSource = 0; + } + + t_thrd.xlog_cxt.readOff = targetPageOff; + t_thrd.xlog_cxt.readLen = XLOG_BLCKSZ; + XLByteToSeg(targetPagePtr, t_thrd.xlog_cxt.readSegNo); + XLByteAdvance(RecPtr, reqLen); + + XLogRecPtr expectedRecPtr = RecPtr; + if (RecPtr % XLogSegSize == 0) { + XLByteAdvance(expectedRecPtr, SizeOfXLogLongPHD); + } else if (RecPtr % XLOG_BLCKSZ == 0) { + XLByteAdvance(expectedRecPtr, SizeOfXLogShortPHD); + } + +retry: + /* See if we need to retrieve more data */ + /* In ss dorado replication, we don't start walrecwrite thread, so t_thrd.xlog_cxt.receivedUpto = 0 */ + if (t_thrd.xlog_cxt.readFile < 0 || + (t_thrd.xlog_cxt.readSource == XLOG_FROM_STREAM && XLByteLT(t_thrd.xlog_cxt.receivedUpto, RecPtr))) { + if (t_thrd.xlog_cxt.StandbyMode && t_thrd.xlog_cxt.startup_processing) { + for (;;) { + /* + * Need to check here also for the case where consistency level is + * already reached without replaying any record i.e. just after reading + * of checkpoint data it has reached to minRecoveryPoint. Also + * whenever we are going to loop in the data receive from master + * node its bettwe we check if consistency level has reached. So + * instead of keeping in all places before ReadRecord, we can keep + * here in centralised location. + */ + ProcTxnWorkLoad(false); + + CheckRecoveryConsistency(); + if (WalRcvInProgress()) { + + if (t_thrd.xlog_cxt.failedSources & XLOG_FROM_STREAM) { + ProcTxnWorkLoad(true); + ereport(LOG, (errmsg("read from stream failed, request xlog receivedupto at %X/%X.", + (uint32)(t_thrd.xlog_cxt.receivedUpto >> 32), + (uint32)t_thrd.xlog_cxt.receivedUpto))); + ShutdownWalRcv(); + continue; + } + + /* + * Walreceiver is active, so see if new data has arrived. + * + * We only advance XLogReceiptTime when we obtain fresh + * WAL from walreceiver and observe that we had already + * processed everything before the most recent "chunk" + * that it flushed to disk. In steady state where we are + * keeping up with the incoming data, XLogReceiptTime will + * be updated on each cycle. When we are behind, + * XLogReceiptTime will not advance, so the grace time + * alloted to conflicting queries will decrease. + */ + if (RecPtr % XLogSegSize == 0) { + XLByteAdvance(expectedRecPtr, SizeOfXLogLongPHD); + } else if (RecPtr % XLOG_BLCKSZ == 0) { + XLByteAdvance(expectedRecPtr, SizeOfXLogShortPHD); + } + + if (XLByteLT(expectedRecPtr, t_thrd.xlog_cxt.receivedUpto)) { + havedata = true; + } else { + XLogRecPtr latestChunkStart; + + t_thrd.xlog_cxt.receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart); + if (XLByteLT(expectedRecPtr, t_thrd.xlog_cxt.receivedUpto)) { + havedata = true; + if (!XLByteLT(RecPtr, latestChunkStart)) { + t_thrd.xlog_cxt.XLogReceiptTime = GetCurrentTimestamp(); + SetCurrentChunkStartTime(t_thrd.xlog_cxt.XLogReceiptTime); + } + } else { + havedata = false; + } + } + if (havedata) { + /* + * Great, streamed far enough. Open the file if it's + * not open already. Use XLOG_FROM_STREAM so that + * source info is set correctly and XLogReceiptTime + * isn't changed. + */ + if (t_thrd.xlog_cxt.readFile < 0) { + t_thrd.xlog_cxt.readFile = SSXLogFileOpenAnyTLI(t_thrd.xlog_cxt.readSegNo, emode, XLOG_FROM_STREAM, SS_XLOGDIR); + Assert(t_thrd.xlog_cxt.readFile >= 0); + } else { + t_thrd.xlog_cxt.readSource = XLOG_FROM_STREAM; + t_thrd.xlog_cxt.XLogReceiptSource = XLOG_FROM_STREAM; + } + break; + } + + t_thrd.xlog_cxt.RedoDone = IsRedoDonePromoting(); + pg_memory_barrier(); + + if (WalRcvIsDone() && (CheckForSwitchoverTrigger() || CheckForFailoverTrigger())) { + if (t_thrd.xlog_cxt.is_cascade_standby && t_thrd.xlog_cxt.server_mode == STANDBY_MODE) { + HandleCascadeStandbyPromote(fetching_ckpt ? &t_thrd.xlog_cxt.RedoStartLSN : &targetRecPtr); + continue; + } + goto retry; + } + if (!processtrxn) { + ProcTxnWorkLoad(true); + processtrxn = true; + goto retry; + } + + /* + * Wait for more WAL to arrive, or timeout to be reached + */ + WaitLatch(&t_thrd.shemem_ptr_cxt.XLogCtl->recoveryWakeupLatch, WL_LATCH_SET | WL_TIMEOUT, 1000L); + processtrxn = false; + ResetLatch(&t_thrd.shemem_ptr_cxt.XLogCtl->recoveryWakeupLatch); + } else { + /* + * Until walreceiver manages to reconnect, poll the + * archive. + */ + if (t_thrd.xlog_cxt.readFile >= 0) { + close(t_thrd.xlog_cxt.readFile); + t_thrd.xlog_cxt.readFile = -1; + } + /* Reset curFileTLI if random fetch. */ + if (randAccess) { + t_thrd.xlog_cxt.curFileTLI = 0; + } + + /* + * Try to restore the file from archive, or read an + * existing file from pg_xlog. + */ + uint32 sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG; + ereport(DEBUG5, (errmsg("failedSources: %u", t_thrd.xlog_cxt.failedSources))); + if (!(sources & ~t_thrd.xlog_cxt.failedSources)) { + /* + * We've exhausted all options for retrieving the + * file. Retry. + */ + t_thrd.xlog_cxt.failedSources = 0; + + /* + * Before we sleep, re-scan for possible new timelines + * if we were requested to recover to the latest + * timeline. + */ + if (t_thrd.xlog_cxt.recoveryTargetIsLatest) { + if (rescanLatestTimeLine()) { + continue; + } + } + + if (t_thrd.startup_cxt.shutdown_requested) { + ereport(LOG, (errmsg("startup shutdown"))); + proc_exit(0); + } + + if (!xlogctl->IsRecoveryDone) { + g_instance.comm_cxt.predo_cxt.redoPf.redo_done_time = GetCurrentTimestamp(); + g_instance.comm_cxt.predo_cxt.redoPf.recovery_done_ptr = t_thrd.xlog_cxt.ReadRecPtr; + ereport(LOG, (errmodule(MOD_REDO), errcode(ERRCODE_LOG), + errmsg("XLogPageRead IsRecoveryDone is set true," + "ReadRecPtr:%X/%X, EndRecPtr:%X/%X", + (uint32)(t_thrd.xlog_cxt.ReadRecPtr >> 32), + (uint32)(t_thrd.xlog_cxt.ReadRecPtr), + (uint32)(t_thrd.xlog_cxt.EndRecPtr >> 32), + (uint32)(t_thrd.xlog_cxt.EndRecPtr)))); + parallel_recovery::redo_dump_all_stats(); + } + + /* + * signal postmaster to update local redo end + * point to gaussdb state file. + */ + ProcTxnWorkLoad(true); + if (!xlogctl->IsRecoveryDone) { + SendPostmasterSignal(PMSIGNAL_LOCAL_RECOVERY_DONE); + if (SS_PERFORMING_SWITCHOVER) { + g_instance.dms_cxt.SSClusterState = NODESTATE_STANDBY_PROMOTED; + } + } + + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->IsRecoveryDone = true; + SpinLockRelease(&xlogctl->info_lck); + static uint64 printFrequency = 0; + if (!(IS_SHARED_STORAGE_MODE) || + pg_atomic_read_u32(&t_thrd.walreceiverfuncs_cxt.WalRcv->rcvDoneFromShareStorage)) { + knl_g_set_redo_finish_status(REDO_FINISH_STATUS_LOCAL | REDO_FINISH_STATUS_CM); + if ((printFrequency & 0xFF) == 0) { + ereport(LOG, (errmodule(MOD_REDO), errcode(ERRCODE_LOG), + errmsg("XLogPageRead set redo finish status," + "ReadRecPtr:%X/%X, EndRecPtr:%X/%X", + (uint32)(t_thrd.xlog_cxt.ReadRecPtr >> 32), + (uint32)(t_thrd.xlog_cxt.ReadRecPtr), + (uint32)(t_thrd.xlog_cxt.EndRecPtr >> 32), + (uint32)(t_thrd.xlog_cxt.EndRecPtr)))); + } + printFrequency++; + + pg_usleep(50000L); + } + /* + * If primary_conninfo is set, launch walreceiver to + * try to stream the missing WAL, before retrying to + * restore from archive/pg_xlog. + * + * If fetching_ckpt is TRUE, RecPtr points to the + * initial checkpoint location. In that case, we use + * RedoStartLSN as the streaming start position + * instead of RecPtr, so that when we later jump + * backwards to start redo at RedoStartLSN, we will + * have the logs streamed already. + */ + load_server_mode(); + + /* Get xlog and data from DummyStandby */ + if (CheckForFailoverTrigger()) { + goto triggered; + } + + ProcTxnWorkLoad(false); + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv; + CheckMaxPageFlushLSN(targetRecPtr); + rename_recovery_conf_for_roach(); + ereport(LOG, (errmsg("request xlog stream at %X/%X.", + fetching_ckpt ? (uint32)(t_thrd.xlog_cxt.RedoStartLSN >> 32) + : (uint32)(targetRecPtr >> 32), + fetching_ckpt ? (uint32)t_thrd.xlog_cxt.RedoStartLSN + : (uint32)targetRecPtr))); + ShutdownWalRcv(); + t_thrd.xlog_cxt.receivedUpto = 0; + SpinLockAcquire(&walrcv->mutex); + walrcv->receivedUpto = 0; + SpinLockRelease(&walrcv->mutex); + + RequestXLogStreaming(fetching_ckpt ? &t_thrd.xlog_cxt.RedoStartLSN : &targetRecPtr, + t_thrd.xlog_cxt.PrimaryConnInfo, REPCONNTARGET_PRIMARY, + u_sess->attr.attr_storage.PrimarySlotName); + continue; + } + /* Don't try to read from a source that just failed */ + sources &= ~t_thrd.xlog_cxt.failedSources; + t_thrd.xlog_cxt.readFile = SSXLogFileOpenAnyTLI(t_thrd.xlog_cxt.readSegNo, emode, sources, xlog_path); + if (t_thrd.xlog_cxt.readFile >= 0) { + break; + } + + ereport(DEBUG5, (errmsg("do not find any more files.sources=%u failedSources=%u", sources, + t_thrd.xlog_cxt.failedSources))); + /* + * Nope, not found in archive and/or pg_xlog. + */ + t_thrd.xlog_cxt.failedSources |= sources; + + /* + * Check to see if the trigger file exists. Note that we + * do this only after failure, so when you create the + * trigger file, we still finish replaying as much as we + * can from archive and pg_xlog before failover. + */ + if (CheckForFailoverTrigger() || CheckForSwitchoverTrigger()) { + XLogRecPtr receivedUpto = GetWalRcvWriteRecPtr(NULL); + XLogRecPtr EndRecPtrTemp = t_thrd.xlog_cxt.EndRecPtr; + XLByteAdvance(EndRecPtrTemp, SizeOfXLogRecord); + if (XLByteLT(EndRecPtrTemp, receivedUpto) && !FORCE_FINISH_ENABLED && + t_thrd.xlog_cxt.currentRetryTimes++ < g_retryTimes) { + ereport(WARNING, (errmsg("there are some received xlog have not been redo " + "the tail of last redo lsn:%X/%X, received lsn:%X/%X, retry %d times", + (uint32)(EndRecPtrTemp >> 32), (uint32)EndRecPtrTemp, + (uint32)(receivedUpto >> 32), (uint32)receivedUpto, + t_thrd.xlog_cxt.currentRetryTimes))); + return -1; + } + ereport(LOG, + (errmsg("read record failed when promoting, current lsn (%X/%X), received lsn(%X/%X)," + "sources[%u], failedSources[%u], readSource[%u], readFile[%d], readId[%u]," + "readSeg[%u], readOff[%u], readLen[%u]", + (uint32)(RecPtr >> 32), (uint32)RecPtr, + (uint32)(t_thrd.xlog_cxt.receivedUpto >> 32), + (uint32)t_thrd.xlog_cxt.receivedUpto, sources, t_thrd.xlog_cxt.failedSources, + t_thrd.xlog_cxt.readSource, t_thrd.xlog_cxt.readFile, + (uint32)(t_thrd.xlog_cxt.readSegNo >> 32), (uint32)t_thrd.xlog_cxt.readSegNo, + t_thrd.xlog_cxt.readOff, t_thrd.xlog_cxt.readLen))); + goto triggered; + } + } + + /* + * This possibly-long loop needs to handle interrupts of + * startup process. + */ + RedoInterruptCallBack(); + } + } else { + /* In archive or crash recovery. */ + if (t_thrd.xlog_cxt.readFile < 0) { + uint32 sources; + + /* Reset curFileTLI if random fetch. */ + if (randAccess) { + t_thrd.xlog_cxt.curFileTLI = 0; + } + + sources = XLOG_FROM_PG_XLOG; + if (t_thrd.xlog_cxt.InArchiveRecovery) { + sources |= XLOG_FROM_ARCHIVE; + } + + t_thrd.xlog_cxt.readFile = SSXLogFileOpenAnyTLI(t_thrd.xlog_cxt.readSegNo, emode, sources, SS_XLOGDIR); + + if (t_thrd.xlog_cxt.readFile < 0) { + return -1; + } + } + } + } + + if (t_thrd.xlog_cxt.readSource == XLOG_FROM_STREAM) { + if ((targetPagePtr / XLOG_BLCKSZ) != (t_thrd.xlog_cxt.receivedUpto / XLOG_BLCKSZ)) { + t_thrd.xlog_cxt.readLen = XLOG_BLCKSZ; + } else { + t_thrd.xlog_cxt.readLen = t_thrd.xlog_cxt.receivedUpto % XLogSegSize - targetPageOff; + } + } else { + t_thrd.xlog_cxt.readLen = XLOG_BLCKSZ; + } + + /* Read the requested page */ + t_thrd.xlog_cxt.readOff = targetPageOff; + + actualBytes = (uint32)pread(t_thrd.xlog_cxt.readFile, readBuf, t_thrd.xlog_cxt.readLen, t_thrd.xlog_cxt.readOff); + if (actualBytes != t_thrd.xlog_cxt.readLen) { + ereport(LOG, (errmsg("%s read failed", xlog_path))); + goto next_record_is_invalid; + } + + Assert(targetPageOff == t_thrd.xlog_cxt.readOff); + Assert((uint32)reqLen <= t_thrd.xlog_cxt.readLen); + *readTLI = t_thrd.xlog_cxt.curFileTLI; + + return t_thrd.xlog_cxt.readLen; + +next_record_is_invalid: + t_thrd.xlog_cxt.failedSources |= t_thrd.xlog_cxt.readSource; + + if (t_thrd.xlog_cxt.readFile >= 0) { + close(t_thrd.xlog_cxt.readFile); + } + t_thrd.xlog_cxt.readFile = -1; + t_thrd.xlog_cxt.readLen = 0; + t_thrd.xlog_cxt.readSource = 0; + + return -1; + +triggered: + if (t_thrd.xlog_cxt.readFile >= 0) { + close(t_thrd.xlog_cxt.readFile); + } + t_thrd.xlog_cxt.readFile = -1; + t_thrd.xlog_cxt.readLen = 0; + t_thrd.xlog_cxt.readSource = 0; + t_thrd.xlog_cxt.recoveryTriggered = true; + + return -1; +} + int SSXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI, char* xlog_path) -{ - int read_len; - if (SS_REPLICATION_DORADO_CLUSTER) { +{ + int read_len; + if (SS_DORADO_CLUSTER) { read_len = SSDoradoReadXLog(xlogreader, targetPagePtr, reqLen, targetRecPtr, readBuf, readTLI, xlog_path); + } else if (SS_STREAM_MAIN_STANDBY_NODE) { + read_len = SSStreamReadXLog(xlogreader, targetPagePtr, reqLen, targetRecPtr, + readBuf, readTLI, g_instance.dms_cxt.SSRecoveryInfo.recovery_xlog_dir); } else { read_len = SSReadXLog(xlogreader, targetPagePtr, reqLen, targetRecPtr, readBuf, readTLI, g_instance.dms_cxt.SSRecoveryInfo.recovery_xlog_dir); diff --git a/src/gausskernel/storage/access/transam/xloginsert.cpp b/src/gausskernel/storage/access/transam/xloginsert.cpp index 69edb9273..4e27cda69 100755 --- a/src/gausskernel/storage/access/transam/xloginsert.cpp +++ b/src/gausskernel/storage/access/transam/xloginsert.cpp @@ -35,7 +35,7 @@ #include "utils/guc.h" #include "pg_trace.h" #include "replication/logical.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" #include "pgstat.h" #include "access/ustore/knl_upage.h" @@ -96,7 +96,7 @@ void XLogBeginInsert(void) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("XLogBeginInsert was already called"))); - if (SS_REPLICATION_STANDBY_CLUSTER) { + if (SS_DISASTER_STANDBY_CLUSTER) { ereport(LOG, (errmsg("SS dorado standby cluster cannot insert XLOG entries"))); return; } diff --git a/src/gausskernel/storage/buffer/bufmgr.cpp b/src/gausskernel/storage/buffer/bufmgr.cpp index b986b1a0d..3ffda0e6a 100644 --- a/src/gausskernel/storage/buffer/bufmgr.cpp +++ b/src/gausskernel/storage/buffer/bufmgr.cpp @@ -74,7 +74,7 @@ #include "utils/evp_cipher.h" #include "replication/walsender_private.h" #include "replication/walsender.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" #include "workload/workload.h" #include "utils/builtins.h" #include "catalog/pg_namespace.h" @@ -1783,7 +1783,7 @@ Buffer ReadBufferExtended(Relation reln, ForkNumber fork_num, BlockNumber block_ { /* In ss replication dorado cluster mode, it is not supported that standby read in extreme rto. */ if (IsDefaultExtremeRtoMode() && RecoveryInProgress() && IsExtremeRtoRunning() && is_exrto_standby_read_worker() && - !SS_REPLICATION_DORADO_CLUSTER) { + !SS_DISASTER_STANDBY_CLUSTER) { return standby_read_buf(reln, fork_num, block_num, mode, strategy); } else { return buffer_read_extended_internal(reln, fork_num, block_num, mode, strategy); @@ -2939,7 +2939,7 @@ void PageCheckIfCanEliminate(BufferDesc *buf, uint64 *oldFlags, bool *needGetLoc #ifdef USE_ASSERT_CHECKING void PageCheckWhenChosedElimination(const BufferDesc *buf, uint64 oldFlags) { - if (SS_REFORM_REFORMER || SS_REPLICATION_MAIN_STANBY_NODE) { + if (SS_REFORM_REFORMER || SS_DISASTER_STANDBY_CLUSTER) { return; } diff --git a/src/gausskernel/storage/file/fd.cpp b/src/gausskernel/storage/file/fd.cpp index e80e07894..67e351ca9 100755 --- a/src/gausskernel/storage/file/fd.cpp +++ b/src/gausskernel/storage/file/fd.cpp @@ -871,7 +871,7 @@ tryAgain: /* -* When SS_REPLICATION_DORADO_CLUSTER enabled, current xlog dictionary may be not the correct dictionary, +* When SS_DORADO_CLUSTER enabled, current xlog dictionary may be not the correct dictionary, * because all xlog dictionaries are in the same LUN, we need loop over other dictionaries. */ int SSErgodicOpenXlogFile(XLogSegNo segno, int fileFlags, int fileMode) diff --git a/src/gausskernel/storage/ipc/procarray.cpp b/src/gausskernel/storage/ipc/procarray.cpp index 12f40b9b5..312e80e1d 100755 --- a/src/gausskernel/storage/ipc/procarray.cpp +++ b/src/gausskernel/storage/ipc/procarray.cpp @@ -124,7 +124,7 @@ #include "ddes/dms/ss_common_attr.h" #include "ddes/dms/ss_transaction.h" #include "ddes/dms/ss_reform_common.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" #ifdef ENABLE_UT #define static @@ -2378,7 +2378,7 @@ GROUP_GET_SNAPSHOT: } /* Check whether there's a standby requiring an older xmin when dms is enabled. */ - if (SS_NORMAL_PRIMARY && SS_REPLICATION_MAIN_STANBY_NODE) { + if (SS_NORMAL_PRIMARY && SS_DISASTER_MAIN_STANDBY_NODE) { uint64 global_xmin = SSGetGlobalOldestXmin(u_sess->utils_cxt.RecentGlobalXmin); u_sess->utils_cxt.RecentGlobalXmin = global_xmin; } diff --git a/src/gausskernel/storage/replication/basebackup.cpp b/src/gausskernel/storage/replication/basebackup.cpp index 9432b444f..ea2e3b8ac 100755 --- a/src/gausskernel/storage/replication/basebackup.cpp +++ b/src/gausskernel/storage/replication/basebackup.cpp @@ -31,7 +31,7 @@ #include "replication/dcf_data.h" #include "replication/walsender.h" #include "replication/walsender_private.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" #include "replication/slot.h" #include "access/xlog.h" #include "storage/cfs/cfs_converter.h" @@ -2243,7 +2243,7 @@ static bool sendFile(char *readfilename, char *tarfilename, struct stat *statbuf ereport(ERROR, (errcode_for_file_access(), errmsg("could not read file \"%s\": %m", readfilename))); } } - if (g_instance.attr.attr_storage.enableIncrementalCheckpoint && isNeedCheck && !SS_REPLICATION_DORADO_CLUSTER) { + if (g_instance.attr.attr_storage.enableIncrementalCheckpoint && isNeedCheck && !SS_DISASTER_CLUSTER) { uint32 segSize; GET_SEG_SIZE(undoFileType, segSize); /* len and cnt must be integer multiple of BLCKSZ. */ diff --git a/src/gausskernel/storage/replication/dataqueue.cpp b/src/gausskernel/storage/replication/dataqueue.cpp index 716b2e956..1b376c7ad 100644 --- a/src/gausskernel/storage/replication/dataqueue.cpp +++ b/src/gausskernel/storage/replication/dataqueue.cpp @@ -37,7 +37,7 @@ #include "replication/syncrep.h" #include "replication/walsender.h" #include "replication/walreceiver.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" #include "storage/lock/lwlock.h" #include "storage/proc.h" #include "storage/shmem.h" @@ -304,7 +304,7 @@ DataQueuePtr PushToSenderQueue(const RelFileNode &rnode, BlockNumber blockNum, S LWLockRelease(DataSyncRepLock); if (g_instance.attr.attr_storage.max_wal_senders > 0) { - if (t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone && !(IS_SHARED_STORAGE_MODE || SS_REPLICATION_DORADO_CLUSTER)) { + if (t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone && !(IS_SHARED_STORAGE_MODE || SS_DORADO_CLUSTER)) { ereport( LOG, (errmsg("failed to push rnode %u/%u/%u blockno %u into data-queue becuase sync_master_standalone " diff --git a/src/gausskernel/storage/replication/datasyncrep.cpp b/src/gausskernel/storage/replication/datasyncrep.cpp index c7acda72f..2f852d614 100644 --- a/src/gausskernel/storage/replication/datasyncrep.cpp +++ b/src/gausskernel/storage/replication/datasyncrep.cpp @@ -34,7 +34,7 @@ #include "replication/datasender_private.h" #include "replication/walsender_private.h" #include "replication/shared_storage_walreceiver.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" #include "replication/syncrep.h" #include "storage/cu.h" #include "storage/pmsignal.h" @@ -186,7 +186,7 @@ void WaitForDataSync(void) /* * if we modify the syncmode dynamically, we'll stop wait */ - if ((t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone && !(IS_SHARED_STORAGE_MODE || SS_REPLICATION_DORADO_CLUSTER)) || + if ((t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone && !(IS_SHARED_STORAGE_MODE || SS_DORADO_CLUSTER)) || u_sess->attr.attr_storage.guc_synchronous_commit <= SYNCHRONOUS_COMMIT_LOCAL_FLUSH) { ereport(WARNING, (errmsg("canceling wait for synchronous replication due to syncmaster standalone."), diff --git a/src/gausskernel/storage/replication/libpqwalreceiver.cpp b/src/gausskernel/storage/replication/libpqwalreceiver.cpp index 736755914..2006a7e36 100755 --- a/src/gausskernel/storage/replication/libpqwalreceiver.cpp +++ b/src/gausskernel/storage/replication/libpqwalreceiver.cpp @@ -28,7 +28,7 @@ #include "replication/walreceiver.h" #include "replication/walsender_private.h" #include "replication/libpqwalreceiver.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" #include "storage/pmsignal.h" #include "storage/proc.h" #include "utils/guc.h" @@ -239,11 +239,11 @@ static bool CheckRemoteServerSharedStorage(ServerMode remoteMode, PGresult* res) static bool CheckSSRemoteServerMode(ServerMode remoteMode, PGresult* res) { - if (SS_REPLICATION_MAIN_STANBY_NODE) { + if (SS_DISASTER_MAIN_STANDBY_NODE) { if (remoteMode != PRIMARY_MODE) { PQclear(res); ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("when ss replication, main standby of standby cluster of the remote server must be primary, current is %s", + errmsg("when ss disaster, main standby of standby cluster of the remote server must be primary, current is %s", wal_get_role_string(remoteMode, true)))); return false; } @@ -493,7 +493,7 @@ ServerMode IdentifyRemoteMode() !t_thrd.walreceiver_cxt.AmWalReceiverForFailover && (!IS_PRIMARY_NORMAL(remoteMode)) && /* remoteMode of cascade standby is a standby */ - !t_thrd.xlog_cxt.is_cascade_standby && !(IS_SHARED_STORAGE_MODE || SS_REPLICATION_DORADO_CLUSTER)) { + !t_thrd.xlog_cxt.is_cascade_standby && !(IS_SHARED_STORAGE_MODE || SS_DORADO_CLUSTER)) { PQclear(res); if (dummyStandbyMode) { @@ -506,7 +506,7 @@ ServerMode IdentifyRemoteMode() } if (t_thrd.postmaster_cxt.HaShmData->is_cascade_standby && remoteMode != STANDBY_MODE && - !(IS_SHARED_STORAGE_MODE || SS_REPLICATION_DORADO_CLUSTER)) { + !(IS_SHARED_STORAGE_MODE || SS_DORADO_CLUSTER)) { PQclear(res); SpinLockAcquire(&walrcv->mutex); @@ -524,7 +524,7 @@ ServerMode IdentifyRemoteMode() } } - if (SS_REPLICATION_DORADO_CLUSTER && !CheckSSRemoteServerMode(remoteMode, res)) { + if (SS_DORADO_CLUSTER && !CheckSSRemoteServerMode(remoteMode, res)) { return UNKNOWN_MODE; } @@ -577,7 +577,7 @@ static int32 IdentifyRemoteVersion() (errcode(ERRCODE_INVALID_STATUS), errmsg("could not get the local protocal version, make sure the PG_PROTOCOL_VERSION is defined"))); } - if (!IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE && !SS_REPLICATION_MAIN_STANBY_NODE) { + if (!IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE && !SS_DORADO_MAIN_STANDBY_NODE) { if (walrcv->conn_target != REPCONNTARGET_DUMMYSTANDBY && (localTerm == 0 || localTerm > remoteTerm) && !AM_HADR_WAL_RECEIVER) { PQclear(res); @@ -700,7 +700,7 @@ bool libpqrcv_connect(char *conninfo, XLogRecPtr *startpoint, char *slotname, in rc = memset_s(passwd, MAXPGPATH, 0, MAXPGPATH); securec_check(rc, "\0", "\0"); } else if (IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE || - SS_REPLICATION_MAIN_STANBY_NODE) { + SS_DORADO_MAIN_STANDBY_NODE) { nRet = snprintf_s(conninfoRepl, sizeof(conninfoRepl), sizeof(conninfoRepl) - 1, "%s dbname=postgres replication=standby_cluster " "fallback_application_name=%s_hass " @@ -1417,7 +1417,7 @@ bool libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) } *type = *((unsigned char *)t_thrd.libwalreceiver_cxt.recvBuf); - if ((IS_SHARED_STORAGE_MODE || SS_REPLICATION_DORADO_CLUSTER) && !AM_HADR_WAL_RECEIVER && *type == 'w') { + if ((IS_SHARED_STORAGE_MODE || SS_DORADO_CLUSTER) && !AM_HADR_WAL_RECEIVER && *type == 'w') { *len = 0; return false; } diff --git a/src/gausskernel/storage/replication/shared_storage_walreceiver.cpp b/src/gausskernel/storage/replication/shared_storage_walreceiver.cpp index dbc14eb8a..7a8ce2a26 100644 --- a/src/gausskernel/storage/replication/shared_storage_walreceiver.cpp +++ b/src/gausskernel/storage/replication/shared_storage_walreceiver.cpp @@ -30,7 +30,7 @@ #include "miscadmin.h" #include "replication/walreceiver.h" #include "replication/shared_storage_walreceiver.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" #include "replication/slot.h" #include "storage/pmsignal.h" #include "storage/proc.h" @@ -369,7 +369,7 @@ bool shared_storage_connect(char *conninfo, XLogRecPtr *startpoint, char *slotna walrcv->peer_state = NORMAL_STATE; walrcv->isFirstTimeAccessStorage = true; - if (SS_REPLICATION_DORADO_CLUSTER) { + if (SS_DORADO_CLUSTER) { libpgConnected = libpqrcv_connect(conninfo, startpoint, slotname, channel_identifier); return libpgConnected; } @@ -438,7 +438,7 @@ bool shared_storage_receive(int timeout, unsigned char *type, char **buffer, int * When ss cluster replication enabled, no xlog will receive, so return false directly. * Xlog will replicated by Dorado synchronous replication. */ - if (SS_REPLICATION_DORADO_CLUSTER) { + if (SS_DORADO_CLUSTER) { return false; } @@ -447,7 +447,7 @@ bool shared_storage_receive(int timeout, unsigned char *type, char **buffer, int void shared_storage_send(const char *buffer, int nbytes) { - if (IS_SHARED_STORAGE_STANBY_MODE || SS_REPLICATION_DORADO_CLUSTER) { + if (IS_SHARED_STORAGE_STANBY_MODE || SS_DORADO_CLUSTER) { if (t_thrd.libwalreceiver_cxt.streamConn) libpqrcv_send(buffer, nbytes); } diff --git a/src/gausskernel/storage/replication/ss_cluster_replication.cpp b/src/gausskernel/storage/replication/ss_cluster_replication.cpp index 5f5e035f3..dbc68cbbe 100644 --- a/src/gausskernel/storage/replication/ss_cluster_replication.cpp +++ b/src/gausskernel/storage/replication/ss_cluster_replication.cpp @@ -22,7 +22,7 @@ * --------------------------------------------------------------------------------------- */ -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" #include "access/xlog_internal.h" #include "storage/file/fio_device.h" #include "storage/smgr/fd.h" @@ -174,7 +174,7 @@ void SSClusterDoradoStorageInit() void UpdateSSDoradoCtlInfoAndSync() { - if (!SS_REPLICATION_PRIMARY_NODE) { + if (!SS_DORADO_PRIMARY_NODE) { return; } @@ -202,7 +202,7 @@ void InitSSDoradoCtlInfo(ShareStorageXLogCtl *ctlInfo, uint64 sysidentifier) void CheckSSDoradoCtlInfo(XLogRecPtr localEnd) { - if (!SS_REPLICATION_DORADO_CLUSTER) { + if (!SS_DORADO_CLUSTER) { return; } diff --git a/src/gausskernel/storage/replication/syncrep.cpp b/src/gausskernel/storage/replication/syncrep.cpp index 5fb2124fd..faf481c4f 100755 --- a/src/gausskernel/storage/replication/syncrep.cpp +++ b/src/gausskernel/storage/replication/syncrep.cpp @@ -56,7 +56,7 @@ #include "replication/walsender.h" #include "replication/walsender_private.h" #include "replication/shared_storage_walreceiver.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" #include "storage/pmsignal.h" #include "storage/proc.h" #include "tcop/tcopprot.h" @@ -218,7 +218,7 @@ SyncWaitRet SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, bool enableHandleCancel) * sync replication standby names defined. Note that those standbys don't * need to be connected. */ - if (ENABLE_DMS || !u_sess->attr.attr_storage.enable_stream_replication || !SyncRepRequested() || + if ((ENABLE_DMS && !SS_STREAM_CLUSTER) || !u_sess->attr.attr_storage.enable_stream_replication || !SyncRepRequested() || !SyncStandbysDefined() || (t_thrd.postmaster_cxt.HaShmData->current_mode == NORMAL_MODE)) return NOT_REQUEST; @@ -250,7 +250,7 @@ SyncWaitRet SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, bool enableHandleCancel) return REPSYNCED; } if (t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone && !DelayIntoMostAvaSync(false) && - !IS_SHARED_STORAGE_MODE && !SS_REPLICATION_DORADO_CLUSTER) { + !IS_SHARED_STORAGE_MODE && !SS_DORADO_CLUSTER) { LWLockRelease(SyncRepLock); RESUME_INTERRUPTS(); return STAND_ALONE; diff --git a/src/gausskernel/storage/replication/walreceiver.cpp b/src/gausskernel/storage/replication/walreceiver.cpp index ef06d3885..15d9a91e6 100755 --- a/src/gausskernel/storage/replication/walreceiver.cpp +++ b/src/gausskernel/storage/replication/walreceiver.cpp @@ -59,7 +59,7 @@ #include "replication/walsender.h" #include "replication/walsender_private.h" #include "replication/dcf_replication.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" #include "storage/copydir.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -424,7 +424,7 @@ void WalRcvrProcessData(TimestampTz *last_recv_timestamp, bool *ping_sent) } /* walrec in dorado replication mode not need walrecwrite */ - if (!SS_REPLICATION_MAIN_STANBY_NODE && !WalRcvWriterInProgress()) + if (!SS_DORADO_MAIN_STANDBY_NODE && !WalRcvWriterInProgress()) ereport(FATAL, (errmsg("terminating walreceiver process due to the death of walrcvwriter"))); #ifndef ENABLE_MULTIPLE_NODES /* For Paxos, receive wal should be done by send log callback function */ @@ -506,7 +506,7 @@ void WalReceiverMain(void) int nRet = 0; errno_t rc = 0; - if (SS_REPLICATION_MAIN_STANBY_NODE) { + if (SS_DISASTER_MAIN_STANDBY_NODE) { ereport(LOG, (errmsg("walreceiver thread started for main standby"))); } else { Assert(ENABLE_DSS == false); @@ -613,7 +613,7 @@ void WalReceiverMain(void) int walreplindex = hashmdata->current_repl; SpinLockRelease(&hashmdata->mutex); - if (!IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE && !SS_REPLICATION_MAIN_STANBY_NODE) { + if (!IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE && !SS_DISASTER_MAIN_STANDBY_NODE) { replConnInfo = t_thrd.postmaster_cxt.ReplConnArray[walreplindex]; } else if (walreplindex >= MAX_REPLNODE_NUM) { replConnInfo = t_thrd.postmaster_cxt.CrossClusterReplConnArray[walreplindex - MAX_REPLNODE_NUM]; @@ -883,7 +883,7 @@ bool HasBuildReason() static void rcvAllXlog() { - if (SS_REPLICATION_DORADO_CLUSTER) { + if (SS_DORADO_CLUSTER) { return; } @@ -924,7 +924,7 @@ static void WalRcvDie(int code, Datum arg) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv; - if ((IS_SHARED_STORAGE_MODE || SS_REPLICATION_DORADO_CLUSTER) && !t_thrd.walreceiver_cxt.termChanged) { + if ((IS_SHARED_STORAGE_MODE || SS_DORADO_CLUSTER) && !t_thrd.walreceiver_cxt.termChanged) { SpinLockAcquire(&walrcv->mutex); walrcv->walRcvState = WALRCV_STOPPING; SpinLockRelease(&walrcv->mutex); @@ -1177,10 +1177,10 @@ static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) { #ifndef ENABLE_MULTIPLE_NODES if (IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE || AM_HADR_WAL_RECEIVER - || SS_REPLICATION_DORADO_CLUSTER) { + || SS_DISASTER_CLUSTER) { #else if (IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE || AM_HADR_WAL_RECEIVER - || SS_REPLICATION_DORADO_CLUSTER || AM_HADR_CN_WAL_RECEIVER) { + || SS_DISASTER_CLUSTER || AM_HADR_CN_WAL_RECEIVER) { #endif break; } @@ -1623,7 +1623,7 @@ void XLogWalRcvSendReply(bool force, bool requestReply) ReadShareStorageCtlInfo(ctlInfo); receivePtr = ctlInfo->insertHead; AlignFreeShareStorageCtl(ctlInfo); - } else if (SS_REPLICATION_DORADO_CLUSTER) { + } else if (SS_DORADO_CLUSTER) { ReadSSDoradoCtlInfoFile(); receivePtr = g_instance.xlog_cxt.ssReplicationXLogCtl->insertHead; writePtr = g_instance.xlog_cxt.ssReplicationXLogCtl->insertHead; @@ -1673,7 +1673,7 @@ void XLogWalRcvSendReply(bool force, bool requestReply) t_thrd.walreceiver_cxt.reply_message->replyRequested = requestReply; SpinLockAcquire(&hashmdata->mutex); - if (!IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE && !SS_REPLICATION_MAIN_STANBY_NODE) + if (!IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE && !SS_DORADO_MAIN_STANDBY_NODE) t_thrd.walreceiver_cxt.reply_message->peer_role = hashmdata->current_mode; else t_thrd.walreceiver_cxt.reply_message->peer_role = STANDBY_CLUSTER_MODE; @@ -2414,7 +2414,7 @@ Datum pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) sndReplay = sendLocFix; } - if (SS_REPLICATION_DORADO_CLUSTER) { + if (SS_DORADO_CLUSTER) { ReadSSDoradoCtlInfoFile(); XLogRecPtr sendLocFix = rcvReceived; ShareStorageXLogCtl *ctlInfo = g_instance.xlog_cxt.ssReplicationXLogCtl; diff --git a/src/gausskernel/storage/replication/walreceiverfuncs.cpp b/src/gausskernel/storage/replication/walreceiverfuncs.cpp index 7bd1802f3..d0f234b83 100755 --- a/src/gausskernel/storage/replication/walreceiverfuncs.cpp +++ b/src/gausskernel/storage/replication/walreceiverfuncs.cpp @@ -36,7 +36,7 @@ #include "replication/replicainternal.h" #include "replication/walreceiver.h" #include "replication/slot.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" #include "replication/dcf_replication.h" #include "replication/walsender_private.h" #include "storage/pmsignal.h" @@ -260,7 +260,7 @@ static void SetWalRcvConninfo(ReplConnTarget conn_target) SpinLockRelease(&walrcv->mutex); ereport(LOG, (errmsg("wal receiver try to connect to %s index %d .", walrcv->conninfo, useIndex))); SpinLockAcquire(&hashmdata->mutex); - if (!IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE && !SS_REPLICATION_MAIN_STANBY_NODE) + if (!IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE && !SS_DISASTER_MAIN_STANDBY_NODE) hashmdata->current_repl = useIndex; else hashmdata->current_repl = MAX_REPLNODE_NUM + useIndex; @@ -418,7 +418,7 @@ static void set_rcv_slot_name(const char *slotname) SpinLockAcquire(&hashmdata->mutex); replIdx = hashmdata->current_repl; SpinLockRelease(&hashmdata->mutex); - if ((IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE || SS_REPLICATION_MAIN_STANBY_NODE) + if ((IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE || SS_DISASTER_MAIN_STANDBY_NODE) && replIdx >= MAX_REPLNODE_NUM) { replIdx = replIdx - MAX_REPLNODE_NUM; } @@ -897,7 +897,7 @@ ReplConnInfo *GetRepConnArray(int *cur_idx) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid replication node index:%d", *cur_idx))); } - if (!IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE && !SS_REPLICATION_MAIN_STANBY_NODE) + if (!IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE && !SS_DISASTER_MAIN_STANDBY_NODE) replConnInfoArray = &t_thrd.postmaster_cxt.ReplConnArray[0]; else replConnInfoArray = &t_thrd.postmaster_cxt.CrossClusterReplConnArray[0]; diff --git a/src/gausskernel/storage/replication/walsender.cpp b/src/gausskernel/storage/replication/walsender.cpp index 5ce7ba8be..f36c4d882 100755 --- a/src/gausskernel/storage/replication/walsender.cpp +++ b/src/gausskernel/storage/replication/walsender.cpp @@ -87,7 +87,8 @@ #include "replication/parallel_decode.h" #include "replication/parallel_decode_worker.h" #include "replication/parallel_reorderbuffer.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" +#include "ddes/dms/ss_reform_common.h" #include "storage/buf/bufmgr.h" #include "storage/smgr/fd.h" #include "storage/ipc.h" @@ -139,8 +140,8 @@ static int g_appname_extra_len = 3; /* [+]+\0 */ #define AmWalSenderToStandby() (t_thrd.walsender_cxt.MyWalSnd->sendRole == SNDROLE_PRIMARY_STANDBY) #define USE_PHYSICAL_XLOG_SEND \ - (AM_WAL_HADR_SENDER || !SS_REPLICATION_DORADO_CLUSTER || !IS_SHARED_STORAGE_MODE || (walsnd->sendRole == SNDROLE_PRIMARY_BUILDSTANDBY)) -#define USE_SYNC_REP_FLUSH_PTR (AM_WAL_HADR_SENDER && (!IS_SHARED_STORAGE_MODE && !SS_REPLICATION_DORADO_CLUSTER)) + (AM_WAL_HADR_SENDER || !SS_DORADO_CLUSTER || !IS_SHARED_STORAGE_MODE || (walsnd->sendRole == SNDROLE_PRIMARY_BUILDSTANDBY)) +#define USE_SYNC_REP_FLUSH_PTR (AM_WAL_HADR_SENDER && (!IS_SHARED_STORAGE_MODE && !SS_DORADO_CLUSTER)) /* Statistics for log control */ static const int MICROSECONDS_PER_SECONDS = 1000000; @@ -172,15 +173,20 @@ static void WalSndLastCycleHandler(SIGNAL_ARGS); static void IdentifyCommand(Node* cmd_node, ReplicationCxt* repCxt, const char *cmd_string); static void HandleWalReplicationCommand(const char *cmd_string, ReplicationCxt* repCxt); -typedef void (*WalSndSendDataCallback)(void); -static int WalSndLoop(WalSndSendDataCallback send_data); +typedef void (*WalSndSendDataCallback)(char*); +static int WalSndLoop(WalSndSendDataCallback send_data, char* xlogPath = NULL); static void InitWalSnd(void); static void WalSndHandshake(void); static void WalSndKill(int code, Datum arg); -static void XLogSendPhysical(void); -static void XLogSendUwalLSN(void); -static void XLogSendLogical(void); -static void XLogSendUwalStatus(void); + + +static void XLogSendLSN(char* xlogPath = NULL); +static void XLogSendPhysical(char* xlogPath = NULL); +static void XLogSendUwalLSN(char* xlogPath = NULL); +static void XLogSendLogical(char* xlogPath = NULL); +static void XLogSendUwalStatus(char* xlogPath = NULL); +static void XLogSendParallelLogical(char* xlogPath = NULL); + static void IdentifySystem(void); static void IdentifyVersion(void); static void IdentifyConsistence(IdentifyConsistenceCmd *cmd); @@ -226,7 +232,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc); -static void XLogRead(char *buf, XLogRecPtr startptr, Size count); +static void XLogRead(char *buf, XLogRecPtr startptr, Size count, char* xlogPath = NULL); static void SetWalSndPeerMode(ServerMode mode); static void SetWalSndPeerDbstate(DbState state); @@ -252,7 +258,7 @@ static int WalSndTimeout(); char *DataDir = "."; -static void XLogSendLSN(void) +static void XLogSendLSN(char* xlogPath) { PrimaryKeepaliveMessage keepalive_message; volatile HaShmemData* hashmdata = t_thrd.postmaster_cxt.HaShmData; @@ -508,7 +514,12 @@ int WalSenderMain(void) else { if (USE_PHYSICAL_XLOG_SEND) { if (!g_instance.attr.attr_storage.enable_uwal) { - return WalSndLoop(XLogSendPhysical); + if (SS_DISASTER_CLUSTER && walsnd->sendRole != SNDROLE_PRIMARY_BUILDSTANDBY) { + return WalSndLoop(XLogSendPhysical, + SSGetNextXLogPath(t_thrd.xlog_cxt.ThisTimeLineID, t_thrd.walsender_cxt.sentPtr)); + } else { + return WalSndLoop(XLogSendPhysical); + } } else { return WalSndLoop(XLogSendUwalStatus); } @@ -990,7 +1001,7 @@ static void IdentifyConsistence(IdentifyConsistenceCmd *cmd) */ if (t_thrd.proc && t_thrd.proc->workingVersionNum >= 92060) { /* Don't care max xlog when check with building process */ - if (IsWalSenderToBuild() == false) { + if (!IsWalSenderToBuild()) { if (dummyStandbyMode) { localMaxPtr = FindMaxLSN(t_thrd.proc_cxt.DataDir, msgBuf, XLOG_READER_MAX_MSGLENTH, &localMaxLsnCrc); } else { @@ -2684,7 +2695,7 @@ static void LogCtrlDoActualSleep(volatile WalSnd *walsnd, bool forceUpdate) u_sess->attr.attr_storage.hadr_recovery_point_target > 0) { LogCtrlExecuteSleeping(walsnd, forceUpdate, logical_slot_sleep_flag); } else { - if (logical_slot_sleep_flag && !IS_SHARED_STORAGE_MODE && !SS_REPLICATION_DORADO_CLUSTER) { + if (logical_slot_sleep_flag && !IS_SHARED_STORAGE_MODE && !SS_DORADO_CLUSTER) { pg_usleep(g_logical_slot_sleep_time); } } @@ -2692,7 +2703,7 @@ static void LogCtrlDoActualSleep(volatile WalSnd *walsnd, bool forceUpdate) if (u_sess->attr.attr_storage.target_rto > 0) { LogCtrlExecuteSleeping(walsnd, forceUpdate, logical_slot_sleep_flag); } else { - if (logical_slot_sleep_flag && !IS_SHARED_STORAGE_MODE && !SS_REPLICATION_DORADO_CLUSTER) { + if (logical_slot_sleep_flag && !IS_SHARED_STORAGE_MODE && !SS_DORADO_CLUSTER) { pg_usleep(g_logical_slot_sleep_time); } } @@ -2723,7 +2734,7 @@ static void LogCtrlExecuteSleeping(volatile WalSnd *walsnd, bool forceUpdate, bo } LogCtrlSleep(); if (logicalSlotSleepFlag && g_logical_slot_sleep_time > t_thrd.walsender_cxt.MyWalSnd->log_ctrl.sleep_time && - !IS_SHARED_STORAGE_MODE && !SS_REPLICATION_DORADO_CLUSTER) { + !IS_SHARED_STORAGE_MODE && !SS_DORADO_CLUSTER) { pg_usleep(g_logical_slot_sleep_time - t_thrd.walsender_cxt.MyWalSnd->log_ctrl.sleep_time); } } @@ -2964,7 +2975,7 @@ static void ProcessStandbyReplyMessage(void) * because primary xlog will cover standby xlog by Dorado synchronous replication. * 2. Otherwise, we only need to confirm that standby xlog has been flushed successfully. */ - if (SS_REPLICATION_DORADO_CLUSTER) { + if (SS_DORADO_CLUSTER) { AdvanceReplicationSlot(reply.apply); } else { AdvanceReplicationSlot(reply.flush); @@ -3214,7 +3225,7 @@ static void LogCtrlCountSleepLimit(void) static void LogCtrlSleep(void) { volatile WalSnd *walsnd = t_thrd.walsender_cxt.MyWalSnd; - if (IS_SHARED_STORAGE_MODE || SS_REPLICATION_DORADO_CLUSTER) { + if (IS_SHARED_STORAGE_MODE || SS_DORADO_CLUSTER) { if (walsnd->log_ctrl.sleep_time > MICROSECONDS_PER_SECONDS) { walsnd->log_ctrl.sleep_time = MICROSECONDS_PER_SECONDS; } @@ -3605,7 +3616,7 @@ static void LogCtrlCalculateCurrentRPO(StandbyReplyMessage *reply) if (AM_WAL_HADR_CN_SENDER) { flushPtr = GetFlushRecPtr(); } else if (AM_WAL_SHARE_STORE_SENDER) { - if (SS_REPLICATION_DORADO_CLUSTER) { + if (SS_DORADO_CLUSTER) { flushPtr = g_instance.xlog_cxt.ssReplicationXLogCtl->insertHead; } else { flushPtr = g_instance.xlog_cxt.shareStorageXLogCtl->insertHead; @@ -3900,7 +3911,7 @@ static void WSDataSendInit() } /* Main loop of walsender process */ -static int WalSndLoop(WalSndSendDataCallback send_data) +static int WalSndLoop(WalSndSendDataCallback send_data, char* xlogPath) { bool first_startup = true; bool sync_config_needed = false; @@ -4160,7 +4171,7 @@ static int WalSndLoop(WalSndSendDataCallback send_data) ChooseStartPointForDummyStandby(); if (!pq_is_send_pending()) { - send_data(); + send_data(xlogPath); } else { t_thrd.walsender_cxt.walSndCaughtUp = false; } @@ -4184,7 +4195,7 @@ static int WalSndLoop(WalSndSendDataCallback send_data) */ LogCtrlSleep(); if (!pq_is_send_pending()) - send_data(); + send_data(xlogPath); else t_thrd.walsender_cxt.walSndCaughtUp = false; @@ -4268,15 +4279,16 @@ static int WalSndLoop(WalSndSendDataCallback send_data) */ if (AmWalSenderToDummyStandby() && WalSndInProgress(SNDROLE_PRIMARY_STANDBY)) ; /* nothing to do */ - else - send_data(); + else { + send_data(xlogPath); + } if (t_thrd.walsender_cxt.walSndCaughtUp && !pq_is_send_pending()) { if (dummyStandbyMode || XLByteEQ(t_thrd.walsender_cxt.sentPtr, t_thrd.walsender_cxt.MyWalSnd->flush)) t_thrd.walsender_cxt.walsender_shutdown_requested = true; } - if (IS_SHARED_STORAGE_MODE || SS_REPLICATION_DORADO_CLUSTER) { + if (IS_SHARED_STORAGE_MODE || SS_DORADO_CLUSTER) { t_thrd.walsender_cxt.walsender_shutdown_requested = true; } } @@ -4824,7 +4836,8 @@ static void WalSndShutdown(void) * once, there will always be one descriptor left open until the process ends, but never * more than one. */ -static void XLogRead(char *buf, XLogRecPtr startptr, Size count) + +static void XLogRead(char *buf, XLogRecPtr startptr, Size count, char* xlogPath) { char *p = NULL; XLogRecPtr recptr; @@ -4894,7 +4907,16 @@ retry: } XLByteToSeg(recptr, t_thrd.walsender_cxt.sendSegNo); - XLogFilePath(path, MAXPGPATH, t_thrd.xlog_cxt.ThisTimeLineID, t_thrd.walsender_cxt.sendSegNo); + + if (xlogPath == NULL) { + XLogFilePath(path, MAXPGPATH, t_thrd.xlog_cxt.ThisTimeLineID, t_thrd.walsender_cxt.sendSegNo); + } else { + int nRet = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%08X%08X%08X", + xlogPath, t_thrd.xlog_cxt.ThisTimeLineID, + (uint32)((t_thrd.walsender_cxt.sendSegNo) / XLogSegmentsPerXLogId), + (uint32)((t_thrd.walsender_cxt.sendSegNo) % XLogSegmentsPerXLogId)); + securec_check_ss_c(nRet, "\0", "\0"); + } t_thrd.walsender_cxt.sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); if (t_thrd.walsender_cxt.sendFile < 0) { @@ -5174,7 +5196,7 @@ void LogicalLogHandle(ParallelReorderBuffer *prb, logicalLog *logChange) /* * Get the logical logs in logical queue in turn, and send them after processing. */ -void XLogSendParallelLogical() +static void XLogSendParallelLogical(char* xlogPath) { int slotId = t_thrd.walsender_cxt.LogicalSlot; @@ -5253,7 +5275,7 @@ void XLogSendParallelLogical() /* * Stream out logically decoded data. */ -static void XLogSendLogical(void) +static void XLogSendLogical(char* xlogPath) { XLogRecord *record = NULL; char *errm = NULL; @@ -5309,7 +5331,7 @@ static void XLogSendLogical(void) * If there is no unsent WAL remaining, *caughtup is set to true, otherwise * *caughtup is set to false. */ -static void XLogSendPhysical(void) +static void XLogSendPhysical(char* xlogPath) { XLogRecPtr SendRqstPtr = InvalidXLogRecPtr; XLogRecPtr startptr = InvalidXLogRecPtr; @@ -5427,7 +5449,7 @@ static void XLogSendPhysical(void) t_thrd.walsender_cxt.output_xlog_message[0] = 'C'; XLogCompression(&compressedSize, startptr, nbytes); } else { - XLogRead(t_thrd.walsender_cxt.output_xlog_message + 1 + sizeof(WalDataMessageHeader), startptr, nbytes); + XLogRead(t_thrd.walsender_cxt.output_xlog_message + 1 + sizeof(WalDataMessageHeader), startptr, nbytes, xlogPath); ereport(DEBUG5, (errmsg("conninfo:(%s,%d) start: %X/%X, end: %X/%X, %lu bytes", t_thrd.walsender_cxt.MyWalSnd->wal_sender_channel.localhost, t_thrd.walsender_cxt.MyWalSnd->wal_sender_channel.localport, (uint32)(startptr >> 32), @@ -5516,7 +5538,7 @@ static void XLogSendPhysical(void) return; } -static void XLogSendUwalLSN(void) +static void XLogSendUwalLSN(char* xlogPath) { PrimaryKeepaliveMessage keepalive_message; volatile HaShmemData* hashmdata = t_thrd.postmaster_cxt.HaShmData; @@ -5565,16 +5587,16 @@ static void XLogSendUwalLSN(void) WalSndShutdown(); } -static void XLogSendUwalStatus(void) +static void XLogSendUwalStatus(char* xlogPath) { if (t_thrd.walsender_cxt.MyWalSnd->sendRole == SNDROLE_PRIMARY_BUILDSTANDBY) { - return XLogSendPhysical(); + return XLogSendPhysical(xlogPath); } else if (t_thrd.walsender_cxt.MyWalSnd->state < WALSNDSTATE_UWALCATCHUP) { - return XLogSendPhysical(); + return XLogSendPhysical(xlogPath); } else if (t_thrd.walsender_cxt.MyWalSnd->state < WALSNDSTATE_STREAMING) { return; } else { - return XLogSendUwalLSN(); + return XLogSendUwalLSN(xlogPath); } } @@ -6020,7 +6042,7 @@ bool WalSndAllInProgress(int type) allNum++; } /* in dorado cluster of share storage mode we only need one sender one moment */ - if (SS_REPLICATION_DORADO_CLUSTER) { + if (SS_DISASTER_CLUSTER) { allNum = 1; } @@ -6539,7 +6561,7 @@ Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) AlignFreeShareStorageCtl(ctlInfo); } - if (SS_REPLICATION_DORADO_CLUSTER && !AM_WAL_HADR_SENDER) { + if (SS_DORADO_CLUSTER && !AM_WAL_HADR_SENDER) { ReadSSDoradoCtlInfoFile(); ShareStorageXLogCtl *ctlInfo = g_instance.xlog_cxt.ssReplicationXLogCtl; sentRecPtr = ctlInfo->insertHead; @@ -7148,7 +7170,7 @@ static bool SendConfigFile(char *path) errno_t errorno = EOK; bool read_guc_file_success = true; - if (AmWalSenderToDummyStandby() || AmWalSenderOnDummyStandby()) + if (AmWalSenderToDummyStandby() || AmWalSenderOnDummyStandby() || SS_DISASTER_CLUSTER) return true; if (lstat(path, &statbuf) < 0 || statbuf.st_size == 0) { diff --git a/src/gausskernel/storage/smgr/segment/xlog_atomic_op.cpp b/src/gausskernel/storage/smgr/segment/xlog_atomic_op.cpp index e3b253725..c37285cdd 100644 --- a/src/gausskernel/storage/smgr/segment/xlog_atomic_op.cpp +++ b/src/gausskernel/storage/smgr/segment/xlog_atomic_op.cpp @@ -30,7 +30,7 @@ #include "storage/smgr/segment.h" #include "utils/palloc.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" /* * In segment-page storage, an atomic operation may cross multiple functions, touching multiple pages. For example, @@ -397,7 +397,7 @@ void XLogAtomicOpStart() errhint("cannot make new WAL entries during recovery"))); } - if (SS_REPLICATION_STANDBY_CLUSTER) { + if (SS_DORADO_STANDBY_CLUSTER) { ereport(FATAL, (errmsg("SS dorado standby cluster cannot make new WAL entries"))); } diff --git a/src/gausskernel/storage/smgr/segstore.cpp b/src/gausskernel/storage/smgr/segstore.cpp index 7ff5c9893..6102cce6a 100755 --- a/src/gausskernel/storage/smgr/segstore.cpp +++ b/src/gausskernel/storage/smgr/segstore.cpp @@ -40,7 +40,7 @@ #include "storage/procarray.h" #include "ddes/dms/ss_transaction.h" #include "ddes/dms/ss_dms_bufmgr.h" -#include "replication/ss_cluster_replication.h" +#include "replication/ss_disaster_cluster.h" /* * This code manages relations that reside on segment-page storage. It implements functions used for smgr.cpp. * @@ -370,8 +370,8 @@ SegPageLocation seg_logic_to_physic_mapping(SMgrRelation reln, SegmentHead *seg_ BlockNumber blocknum; /* Recovery thread should use physical location to read data directly. */ - if (SS_REPLICATION_MAIN_STANBY_NODE) { - ereport(DEBUG1, (errmsg("can segment address translation when role is SS_REPLICATION_MAIN_STANBY_NODE"))); + if (SS_DISASTER_MAIN_STANDBY_NODE) { + ereport(DEBUG1, (errmsg("can segment address translation when role is SS_DISASTER_MAIN_STANDBY_NODE"))); } else { if (RecoveryInProgress() && !CurrentThreadIsWorker() && !SS_IN_FLUSHCOPY) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("recovery is in progress"), diff --git a/src/include/ddes/dms/ss_reform_common.h b/src/include/ddes/dms/ss_reform_common.h index b610ed21e..e638ea4b1 100644 --- a/src/include/ddes/dms/ss_reform_common.h +++ b/src/include/ddes/dms/ss_reform_common.h @@ -43,7 +43,8 @@ int SSReadXlogInternal(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, XL int readLen); XLogReaderState *SSXLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data, Size alignedSize); void SSGetRecoveryXlogPath(); -void SSDoradoGetXlogPathList(); +char* SSGetNextXLogPath(TimeLineID tli, XLogRecPtr startptr); +void SSDisasterGetXlogPathList(); void SSUpdateReformerCtrl(); void SSReadControlFile(int id, bool updateDmsCtx = false); void SSClearSegCache(); @@ -51,6 +52,6 @@ int SSCancelTransactionOfAllStandby(SSBroadcastOp type); int SSProcessCancelTransaction(SSBroadcastOp type); int SSXLogFileOpenAnyTLI(XLogSegNo segno, int emode, uint32 sources, char* xlog_path); void SSStandbySetLibpqswConninfo(); -void SSDoradoRefreshMode(); -void SSDoradoUpdateHAmode(); +void SSDisasterRefreshMode(); +void SSDisasterUpdateHAmode(); bool SSPerformingStandbyScenario(); diff --git a/src/include/knl/knl_guc/knl_instance_attr_storage.h b/src/include/knl/knl_guc/knl_instance_attr_storage.h index 7666af0df..5c93de563 100755 --- a/src/include/knl/knl_guc/knl_instance_attr_storage.h +++ b/src/include/knl/knl_guc/knl_instance_attr_storage.h @@ -227,6 +227,7 @@ typedef struct knl_instance_attr_storage { int parallel_recovery_timeout; int parallel_recovery_batch; bool ss_enable_dorado; + bool ss_stream_cluster; bool enable_uwal; char* uwal_config; diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 015552c67..0c57c0e1d 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -342,7 +342,6 @@ extern void ParallelDecodeWorkerMain(void* point); extern void LogicalReadWorkerMain(void* point); extern void ParseProcessRecord(ParallelLogicalDecodingContext *ctx, XLogReaderState *record, ParallelDecodeReaderWorker *worker); -extern void XLogSendParallelLogical(); extern void StartLogicalLogWorkers(char* dbUser, char* dbName, char* slotname, List *options, int parallelDecodeNum); extern void CheckBooleanOption(DefElem *elem, bool *booleanOption, bool defaultValue); extern void CheckIntOption(DefElem *elem, int *intOption, int defaultValue, int minVal, int maxVal); diff --git a/src/include/replication/ss_cluster_replication.h b/src/include/replication/ss_cluster_replication.h deleted file mode 100644 index 8b36b5219..000000000 --- a/src/include/replication/ss_cluster_replication.h +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (c) 2020 Huawei Technologies Co.,Ltd. - * Portions Copyright (c) 2021, openGauss Contributors - * - * Description: openGauss is licensed under Mulan PSL v2. - * You can use this software according to the terms and conditions of the Mulan PSL v2. - * You may obtain a copy of Mulan PSL v2 at: - * - * http://license.coscl.org.cn/MulanPSL2 - * - * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, - * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, - * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. - * See the Mulan PSL v2 for more details. - * --------------------------------------------------------------------------------------- - * - * - * - * IDENTIFICATION - * src/include/replication/ss_cluster_replication.h - * - * --------------------------------------------------------------------------------------- - */ - -#ifndef INCLUDE_REPLICATION_SS_CLUSTER_REPLICATION_H_ -#define INCLUDE_REPLICATION_SS_CLUSTER_REPLICATION_H_ - -#include "postgres.h" -#include "replication/walprotocol.h" -#include "knl/knl_instance.h" -#include - -const uint32 SS_DORADO_CTL_INFO_SIZE = 512; - -#define SS_REPLICATION_DORADO_CLUSTER \ - (ENABLE_DMS && ENABLE_DSS && g_instance.attr.attr_storage.ss_enable_dorado) - -/* Primary Cluster in SS replication */ -#define SS_REPLICATION_PRIMARY_CLUSTER \ - (SS_REPLICATION_DORADO_CLUSTER && (g_instance.dms_cxt.SSReformerControl.clusterRunMode == RUN_MODE_PRIMARY)) - -/* Standby Cluster in SS replication */ -#define SS_REPLICATION_STANDBY_CLUSTER \ - (SS_REPLICATION_DORADO_CLUSTER && (g_instance.dms_cxt.SSReformerControl.clusterRunMode == RUN_MODE_STANDBY)) - -/* Primary node in SS replication, means primary node in main cluster. */ -#define SS_REPLICATION_PRIMARY_NODE \ - (SS_REPLICATION_PRIMARY_CLUSTER && (t_thrd.postmaster_cxt.HaShmData->current_mode == PRIMARY_MODE)) - -/* Standby node in SS replication, means standby node in main cluster. */ -#define SS_REPLICATION_PRIMARY_CLUSTER_STANDBY_NODE \ - (SS_REPLICATION_PRIMARY_CLUSTER && (t_thrd.postmaster_cxt.HaShmData->current_mode == NORMAL_MODE)) - -/* Main standby node in SS replication, means primary node in standby cluster. */ -#define SS_REPLICATION_MAIN_STANBY_NODE \ - (SS_REPLICATION_STANDBY_CLUSTER && (t_thrd.postmaster_cxt.HaShmData->current_mode == STANDBY_MODE)) - -/* Standby node in SS replication, means standby node in standby cluster. */ -#define SS_REPLICATION_STANDBY_CLUSTER_STANDBY_NODE \ - (SS_REPLICATION_STANDBY_CLUSTER && (t_thrd.postmaster_cxt.HaShmData->current_mode == NORMAL_MODE)) - -/* All Standby in SS replication, means nodes other than primary node in primary cluster and standby cluster */ -#define SS_REPLICATION_STANBY_NODE \ - (SS_REPLICATION_DORADO_CLUSTER && (t_thrd.postmaster_cxt.HaShmData->current_mode == NORMAL_MODE)) - - -void SSClusterDoradoStorageInit(); -void InitSSDoradoCtlInfo(ShareStorageXLogCtl *ctlInfo, uint64 sysidentifier); -void UpdateSSDoradoCtlInfoAndSync(); -void ReadSSDoradoCtlInfoFile(); -void CheckSSDoradoCtlInfo(XLogRecPtr localEnd); -#endif // INCLUDE_REPLICATION_SS_CLUSTER_REPLICATION_H_ \ No newline at end of file diff --git a/src/include/replication/ss_disaster_cluster.h b/src/include/replication/ss_disaster_cluster.h new file mode 100644 index 000000000..8f5c4c351 --- /dev/null +++ b/src/include/replication/ss_disaster_cluster.h @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2020 Huawei Technologies Co.,Ltd. + * Portions Copyright (c) 2021, openGauss Contributors + * + * Description: openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * --------------------------------------------------------------------------------------- + * + * + * + * IDENTIFICATION + * src/include/replication/ss_disaster_cluster.h + * + * --------------------------------------------------------------------------------------- + */ + +#ifndef INCLUDE_SS_DISASTER_CLUSTER_H_ +#define INCLUDE_SS_DISASTER_CLUSTER_H_ + +#include "postgres.h" +#include "replication/walprotocol.h" +#include "knl/knl_instance.h" + +/* stream cluster in share storage mode */ +#define SS_STREAM_CLUSTER \ + (ENABLE_DSS && g_instance.attr.attr_storage.ss_stream_cluster) + +/* Primary Cluster in SS disaster */ +#define SS_STREAM_PRIMARY_CLUSTER \ + (SS_STREAM_CLUSTER && (g_instance.dms_cxt.SSReformerControl.clusterRunMode == RUN_MODE_PRIMARY)) + +/* Standby Cluster in SS disaster */ +#define SS_STREAM_STANDBY_CLUSTER \ + (SS_STREAM_CLUSTER && (g_instance.dms_cxt.SSReformerControl.clusterRunMode == RUN_MODE_STANDBY)) + +/* Primary node in SS disaster, means primary node in main cluster. */ +#define SS_STREAM_PRIMARY_NODE \ + (SS_STREAM_PRIMARY_CLUSTER && (t_thrd.postmaster_cxt.HaShmData->current_mode == PRIMARY_MODE)) + +/* Standby node in SS disaster, means standby node in main cluster. */ +#define SS_STREAM_PRIMARY_CLUSTER_STANDBY_NODE \ + (SS_STREAM_PRIMARY_CLUSTER && (t_thrd.postmaster_cxt.HaShmData->current_mode == NORMAL_MODE)) + +/* Main standby node in SS disaster, means primary node in standby cluster. */ +#define SS_STREAM_MAIN_STANDBY_NODE \ + (SS_STREAM_STANDBY_CLUSTER && (t_thrd.postmaster_cxt.HaShmData->current_mode == STANDBY_MODE)) + +/* Standby node in SS disaster, means standby node in standby cluster. */ +#define SS_STREAM_STANDBY_CLUSTER_STANDBY_NODE \ + (SS_STREAM_STANDBY_CLUSTER && (t_thrd.postmaster_cxt.HaShmData->current_mode == NORMAL_MODE)) + +/* All Standby in SS disaster, means nodes other than primary node in primary cluster and standby cluster */ +#define SS_STREAM_STANDBY_NODE \ + (SS_STREAM_CLUSTER && (t_thrd.postmaster_cxt.HaShmData->current_mode == NORMAL_MODE)) + + +/* dorado replication cluster in share storage mode */ +const uint32 SS_DORADO_CTL_INFO_SIZE = 512; + +#define SS_DORADO_CLUSTER \ + (ENABLE_DMS && ENABLE_DSS && g_instance.attr.attr_storage.ss_enable_dorado) + +/* Primary Cluster in SS replication */ +#define SS_DORADO_PRIMARY_CLUSTER \ + (SS_DORADO_CLUSTER && (g_instance.dms_cxt.SSReformerControl.clusterRunMode == RUN_MODE_PRIMARY)) + +/* Standby Cluster in SS replication */ +#define SS_DORADO_STANDBY_CLUSTER \ + (SS_DORADO_CLUSTER && (g_instance.dms_cxt.SSReformerControl.clusterRunMode == RUN_MODE_STANDBY)) + +/* Primary node in SS replication, means primary node in main cluster. */ +#define SS_DORADO_PRIMARY_NODE \ + (SS_DORADO_PRIMARY_CLUSTER && (t_thrd.postmaster_cxt.HaShmData->current_mode == PRIMARY_MODE)) + +/* Standby node in SS replication, means standby node in main cluster. */ +#define SS_DORADO_PRIMARY_CLUSTER_STANDBY_NODE \ + (SS_DORADO_PRIMARY_CLUSTER && (t_thrd.postmaster_cxt.HaShmData->current_mode == NORMAL_MODE)) + +/* Main standby node in SS replication, means primary node in standby cluster. */ +#define SS_DORADO_MAIN_STANDBY_NODE \ + (SS_DORADO_STANDBY_CLUSTER && (t_thrd.postmaster_cxt.HaShmData->current_mode == STANDBY_MODE)) + +/* Standby node in SS replication, means standby node in standby cluster. */ +#define SS_DORADO_STANDBY_CLUSTER_STANDBY_NODE \ + (SS_DORADO_STANDBY_CLUSTER && (t_thrd.postmaster_cxt.HaShmData->current_mode == NORMAL_MODE)) + +/* All Standby in SS replication, means nodes other than primary node in primary cluster and standby cluster */ +#define SS_DORADO_STANDBY_NODE \ + (SS_DORADO_CLUSTER && (t_thrd.postmaster_cxt.HaShmData->current_mode == NORMAL_MODE)) + +/* there are some same logic between ss dorado cluster and ss stream cluster */ +#define SS_DISASTER_CLUSTER (SS_DORADO_CLUSTER || SS_STREAM_CLUSTER) +#define SS_DISASTER_PRIMARY_CLUSTER (SS_DORADO_PRIMARY_CLUSTER || SS_STREAM_PRIMARY_CLUSTER) +#define SS_DISASTER_STANDBY_CLUSTER (SS_DORADO_STANDBY_CLUSTER || SS_STREAM_STANDBY_CLUSTER) +#define SS_DISASTER_PRIMARY_NODE (SS_DORADO_PRIMARY_NODE || SS_STREAM_PRIMARY_NODE) +#define SS_DISASTER_MAIN_STANDBY_NODE (SS_DORADO_MAIN_STANDBY_NODE || SS_STREAM_MAIN_STANDBY_NODE) + +void SSClusterDoradoStorageInit(); +void InitSSDoradoCtlInfo(ShareStorageXLogCtl *ctlInfo, uint64 sysidentifier); +void UpdateSSDoradoCtlInfoAndSync(); +void ReadSSDoradoCtlInfoFile(); +void CheckSSDoradoCtlInfo(XLogRecPtr localEnd); + +#endif // INCLUDE_SS_DISASTER_CLUSTER_H_ \ No newline at end of file diff --git a/src/test/regress/output/recovery_2pc_tools.source b/src/test/regress/output/recovery_2pc_tools.source index 7530f7330..29d73cd6b 100644 --- a/src/test/regress/output/recovery_2pc_tools.source +++ b/src/test/regress/output/recovery_2pc_tools.source @@ -339,6 +339,7 @@ select name,vartype,unit,min_val,max_val from pg_settings where name <> 'qunit_c enable_sort | bool | | | enable_sortgroup_agg | bool | | | ss_enable_dorado | bool | | | + ss_stream_cluster | bool | | | enable_startwith_debug | bool | | | enable_stmt_track | bool | | | enable_stream_replication | bool | | |