From 62c19f20fda63d77d6888482838c7ca23f62b7fb Mon Sep 17 00:00:00 2001 From: chendong76 <1209756284@qq.com> Date: Thu, 21 Sep 2023 11:25:26 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E6=8C=89=E9=9C=80=E5=9B=9E?= =?UTF-8?q?=E6=94=BE=E4=B8=AD=EF=BC=8C=E6=9C=AA=E5=8D=87=E4=B8=BB=E5=A4=87?= =?UTF-8?q?=E6=9C=BAlsn=E6=A0=A1=E9=AA=8C=E5=A4=B1=E8=B4=A5=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98;=E5=88=A0=E9=99=A4=E6=8C=89=E9=9C=80?= =?UTF-8?q?=E5=9B=9E=E6=94=BE=E6=94=AF=E6=8C=81=E5=8D=87=E7=BA=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/common/backend/utils/init/globals.cpp | 1 - src/common/backend/utils/init/miscinit.cpp | 11 -- src/common/backend/utils/misc/guc-file.l | 1 - .../backend/utils/misc/guc/guc_storage.cpp | 14 +- .../ddes/adapter/ss_dms_bufmgr.cpp | 21 +-- .../ddes/adapter/ss_dms_callback.cpp | 9 +- .../ddes/adapter/ss_dms_recovery.cpp | 4 +- .../ddes/adapter/ss_reform_common.cpp | 132 ++---------------- .../ddes/adapter/ss_transaction.cpp | 29 ++++ .../process/threadpool/knl_instance.cpp | 1 + .../ondemand_extreme_rto/dispatcher.cpp | 13 +- .../storage/access/transam/xlog.cpp | 19 ++- src/gausskernel/storage/buffer/bufmgr.cpp | 17 ++- src/include/ddes/dms/ss_common_attr.h | 10 +- src/include/ddes/dms/ss_dms_recovery.h | 1 + src/include/ddes/dms/ss_reform_common.h | 2 +- src/include/ddes/dms/ss_transaction.h | 2 + src/include/miscadmin.h | 2 - 18 files changed, 105 insertions(+), 184 deletions(-) diff --git a/src/common/backend/utils/init/globals.cpp b/src/common/backend/utils/init/globals.cpp index 0de9b5986..ef40e5678 100644 --- a/src/common/backend/utils/init/globals.cpp +++ b/src/common/backend/utils/init/globals.cpp @@ -80,7 +80,6 @@ const uint32 GRAND_VERSION_NUM = 92853; * 2.VERSION NUM FOR EACH FEATURE * Please write indescending order. 
********************************************/ -const uint32 ONDEMAND_REDO_VERSION_NUM = 92851; const uint32 SRF_FUSION_VERSION_NUM = 92847; const uint32 INNER_UNIQUE_VERSION_NUM = 92845; const uint32 PARTITION_ENHANCE_VERSION_NUM = 92844; diff --git a/src/common/backend/utils/init/miscinit.cpp b/src/common/backend/utils/init/miscinit.cpp index f9d3fc622..f0c8a652d 100644 --- a/src/common/backend/utils/init/miscinit.cpp +++ b/src/common/backend/utils/init/miscinit.cpp @@ -2040,17 +2040,6 @@ void register_backend_version(uint32 backend_version){ } } -void SSUpgradeFileBeforeCommit() -{ - // upgrade reform control file - if (pg_atomic_read_u32(&WorkingGrandVersionNum) < ONDEMAND_REDO_VERSION_NUM) { - if (SS_PRIMARY_MODE) { - SSReadControlFile(REFORM_CTRL_PAGE); - SSSaveReformerCtrl(true); - } - } -} - /* * Check whether the version contains the backend_version parameter. */ diff --git a/src/common/backend/utils/misc/guc-file.l b/src/common/backend/utils/misc/guc-file.l index fc79beb87..3a7096b9c 100644 --- a/src/common/backend/utils/misc/guc-file.l +++ b/src/common/backend/utils/misc/guc-file.l @@ -330,7 +330,6 @@ ProcessConfigFile(GucContext context) case MASTER_THREAD: { if (strcmp(item->name, "upgrade_mode") == 0) { if (strcmp(pre_value, "0") != 0 && strcmp(post_value, "0") == 0) { - SSUpgradeFileBeforeCommit(); pg_atomic_write_u32(&WorkingGrandVersionNum, GRAND_VERSION_NUM); } } diff --git a/src/common/backend/utils/misc/guc/guc_storage.cpp b/src/common/backend/utils/misc/guc/guc_storage.cpp index 4c78a8ece..aa725b659 100755 --- a/src/common/backend/utils/misc/guc/guc_storage.cpp +++ b/src/common/backend/utils/misc/guc/guc_storage.cpp @@ -213,7 +213,6 @@ static bool check_ss_rdma_work_config(char** newval, void** extra, GucSource sou static bool check_ss_dss_vg_name(char** newval, void** extra, GucSource source); static bool check_ss_dss_conn_path(char** newval, void** extra, GucSource source); static bool check_ss_enable_ssl(bool* newval, void** extra, GucSource 
source); -static bool check_ss_enable_ondemand_recovery(bool* newval, void** extra, GucSource source); static void assign_ss_enable_aio(bool newval, void *extra); #ifdef USE_ASSERT_CHECKING static void assign_ss_enable_verify_page(bool newval, void *extra); @@ -1043,7 +1042,7 @@ static void InitStorageConfigureNamesBool() GUC_SUPERUSER_ONLY}, &g_instance.attr.attr_storage.dms_attr.enable_ondemand_recovery, false, - check_ss_enable_ondemand_recovery, + NULL, NULL, NULL}, @@ -5961,17 +5960,6 @@ static bool check_ss_enable_ssl(bool *newval, void **extra, GucSource source) return true; } -static bool check_ss_enable_ondemand_recovery(bool* newval, void** extra, GucSource source) -{ - if (*newval) { - if (pg_atomic_read_u32(&WorkingGrandVersionNum) < ONDEMAND_REDO_VERSION_NUM) { - ereport(ERROR, (errmsg("Do not allow enable ondemand_recovery if openGauss run in old version."))); - return false; - } - } - return true; -} - #ifdef USE_ASSERT_CHECKING static void assign_ss_enable_verify_page(bool newval, void *extra) { diff --git a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp index 5a5390600..c88576b80 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp @@ -483,16 +483,15 @@ Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, boo } // standby node must notify primary node for prepare lastest page in ondemand recovery - if (SS_STANDBY_ONDEMAND_RECOVERY) { - while (!SSOndemandRequestPrimaryRedo(buf_desc->tag)) { - SSReadControlFile(REFORM_CTRL_PAGE); - if (SS_STANDBY_ONDEMAND_NORMAL) { - break; // ondemand recovery finish, skip - } else if (SS_STANDBY_ONDEMAND_BUILD) { - return 0; // in new reform - } - // still need requset page + while (SS_STANDBY_ONDEMAND_NOT_NORMAL) { + /* in new reform */ + if (unlikely(SS_STANDBY_ONDEMAND_BUILD)) { + return 0; } + if (SSOndemandRequestPrimaryRedo(buf_desc->tag)) { + break; + } + 
SSReadControlFile(REFORM_CTRL_PAGE); } if (!StartReadPage(buf_desc, mode)) { @@ -506,7 +505,9 @@ bool SSOndemandRequestPrimaryRedo(BufferTag tag) dms_context_t dms_ctx; int32 redo_status = ONDEMAND_REDO_INVALID; - if (!SS_STANDBY_ONDEMAND_RECOVERY) { + if (unlikely(SS_STANDBY_ONDEMAND_BUILD)) { + return false; + } else if (SS_STANDBY_ONDEMAND_NORMAL) { return true; } diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index 03db8a14d..0dc91c3a3 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -413,6 +413,7 @@ static int CBSaveStableList(void *db_handle, unsigned long long list_stable, uns unsigned long long list_in, unsigned int save_ctrl) { int primary_id = (int)reformer_id; + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); g_instance.dms_cxt.SSReformerControl.primaryInstId = primary_id; g_instance.dms_cxt.SSReformerControl.list_stable = list_stable; int ret = DMS_ERROR; @@ -422,7 +423,8 @@ static int CBSaveStableList(void *db_handle, unsigned long long list_stable, uns Assert(g_instance.dms_cxt.SSClusterState == NODESTATE_STANDBY_PROMOTED || g_instance.dms_cxt.SSClusterState == NODESTATE_STANDBY_FAILOVER_PROMOTING); } - SSSaveReformerCtrl(); + SSUpdateReformerCtrl(); + LWLockRelease(ControlFileLock); Assert(g_instance.dms_cxt.SSReformerControl.primaryInstId == (int)primary_id); ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS %s] set current instance:%d as primary.", SS_PERFORMING_SWITCHOVER ? 
"switchover" : "reform", primary_id))); @@ -432,6 +434,7 @@ static int CBSaveStableList(void *db_handle, unsigned long long list_stable, uns } ret = DMS_SUCCESS; } else { /* we are on standby */ + LWLockRelease(ControlFileLock); ret = SetPrimaryIdOnStandby(primary_id); } return ret; @@ -1065,6 +1068,9 @@ static int32 CBProcessBroadcast(void *db_handle, dms_broadcast_context_t *broad_ case BCAST_CHECK_DB_BACKENDS: ret = SSCheckDbBackends(data, len, output_msg, output_msg_len); break; + case BCAST_RELOAD_REFORM_CTRL_PAGE: + ret = SSReloadReformCtrlPage(len); + break; default: ereport(WARNING, (errmodule(MOD_DMS), errmsg("invalid broadcast operate type"))); ret = DMS_ERROR; @@ -1790,7 +1796,6 @@ static int CBReformDoneNotify(void *db_handle) g_instance.dms_cxt.SSRecoveryInfo.startup_reform = false; g_instance.dms_cxt.SSRecoveryInfo.restart_failover_flag = false; g_instance.dms_cxt.SSRecoveryInfo.failover_ckpt_status = NOT_ACTIVE; - SSReadControlFile(REFORM_CTRL_PAGE); Assert(g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy == false); ereport(LOG, (errmodule(MOD_DMS), diff --git a/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp b/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp index 72e26b91e..f6b224234 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp @@ -50,8 +50,10 @@ int SSGetPrimaryInstId() void SSSavePrimaryInstId(int id) { + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); g_instance.dms_cxt.SSReformerControl.primaryInstId = id; - SSSaveReformerCtrl(); + SSUpdateReformerCtrl(); + LWLockRelease(ControlFileLock); } void SSWaitStartupExit() diff --git a/src/gausskernel/ddes/adapter/ss_reform_common.cpp b/src/gausskernel/ddes/adapter/ss_reform_common.cpp index addf898e9..061cbdd54 100644 --- a/src/gausskernel/ddes/adapter/ss_reform_common.cpp +++ b/src/gausskernel/ddes/adapter/ss_reform_common.cpp @@ -192,120 +192,13 @@ void SSGetRecoveryXlogPath() securec_check_ss(rc, "", ""); } -static void 
SSSaveOldReformerCtrl() -{ - ss_reformer_ctrl_t new_ctrl = g_instance.dms_cxt.SSReformerControl; - ss_old_reformer_ctrl_t old_ctrl = {new_ctrl.list_stable, new_ctrl.primaryInstId, new_ctrl.crc}; - - int len = sizeof(ss_old_reformer_ctrl_t); - int write_size = (int)BUFFERALIGN(len); - char buffer[write_size] __attribute__((__aligned__(ALIGNOF_BUFFER))) = { 0 }; - char *fname[2]; - int fd = -1; - - errno_t err = memcpy_s(&buffer, write_size, &old_ctrl, len); - securec_check(err, "\0", "\0"); - - INIT_CRC32C(((ss_old_reformer_ctrl_t *)buffer)->crc); - COMP_CRC32C(((ss_old_reformer_ctrl_t *)buffer)->crc, (char *)buffer, offsetof(ss_old_reformer_ctrl_t, crc)); - FIN_CRC32C(((ss_old_reformer_ctrl_t *)buffer)->crc); - - fname[0] = XLOG_CONTROL_FILE_BAK; - fname[1] = XLOG_CONTROL_FILE; - - for (int i = 0; i < BAK_CTRL_FILE_NUM; i++) { - if (i == 0) { - fd = BasicOpenFile(fname[i], O_CREAT | O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR); - } else { - fd = BasicOpenFile(fname[i], O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR); - } - - if (fd < 0) { - ereport(FATAL, (errcode_for_file_access(), errmsg("could not open control file \"%s\": %m", fname[i]))); - } - - SSWriteInstanceControlFile(fd, buffer, REFORM_CTRL_PAGE, write_size); - if (close(fd)) { - ereport(PANIC, (errcode_for_file_access(), errmsg("could not close control file: %m"))); - } - } -} - -static bool SSReadOldReformerCtrl() -{ - ss_reformer_ctrl_t *new_ctrl = &g_instance.dms_cxt.SSReformerControl; - ss_old_reformer_ctrl_t old_ctrl; - pg_crc32c crc; - int fd = -1; - bool retry = false; - char *fname = XLOG_CONTROL_FILE; - -loop: - fd = BasicOpenFile(fname, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR); - if (fd < 0) { - ereport(FATAL, (errcode_for_file_access(), errmsg("could not open control file \"%s\": %m", fname))); - } - - off_t seekpos = (off_t)BLCKSZ * REFORM_CTRL_PAGE; - int len = sizeof(ss_old_reformer_ctrl_t); - - int read_size = (int)BUFFERALIGN(len); - char buffer[read_size] __attribute__((__aligned__(ALIGNOF_BUFFER))); - 
if (pread(fd, buffer, read_size, seekpos) != read_size) { - ereport(PANIC, (errcode_for_file_access(), errmsg("could not read from control file: %m"))); - } - - errno_t rc = memcpy_s(&old_ctrl, len, buffer, len); - securec_check(rc, "", ""); - if (close(fd) < 0) { - ereport(PANIC, (errcode_for_file_access(), errmsg("could not close control file: %m"))); - } - - /* Now check the CRC. */ - INIT_CRC32C(crc); - COMP_CRC32C(crc, (char *)&old_ctrl, offsetof(ss_old_reformer_ctrl_t, crc)); - FIN_CRC32C(crc); - - if (!EQ_CRC32C(crc, old_ctrl.crc)) { - if (retry == false) { - ereport(WARNING, - (errmsg("control file \"%s\" contains incorrect checksum in upgrade mode, try backup file", fname))); - fname = XLOG_CONTROL_FILE_BAK; - retry = true; - goto loop; - } else { - ereport(WARNING, - (errmsg("backup control file \"%s\" contains incorrect checksum in upgrade mode, " - "try again in post-upgrade mode", fname))); - return false; - } - } - - // new params set to initial value - new_ctrl->version = REFORM_CTRL_VERSION; - new_ctrl->recoveryInstId = INVALID_INSTANCEID; - new_ctrl->clusterStatus = CLUSTER_NORMAL; - - // exist param inherit - new_ctrl->primaryInstId = old_ctrl.primaryInstId; - new_ctrl->list_stable = old_ctrl.list_stable; - new_ctrl->crc = old_ctrl.crc; - - return true; -} - -void SSSaveReformerCtrl(bool force) +void SSUpdateReformerCtrl() { int fd = -1; int len; errno_t err = EOK; char *fname[2]; - if ((pg_atomic_read_u32(&WorkingGrandVersionNum) < ONDEMAND_REDO_VERSION_NUM) && !force) { - SSSaveOldReformerCtrl(); - return; - } - len = sizeof(ss_reformer_ctrl_t); int write_size = (int)BUFFERALIGN(len); char buffer[write_size] __attribute__((__aligned__(ALIGNOF_BUFFER))) = { 0 }; @@ -348,24 +241,12 @@ void SSReadControlFile(int id, bool updateDmsCtx) int read_size = 0; int len = 0; fname = XLOG_CONTROL_FILE; - - if ((pg_atomic_read_u32(&WorkingGrandVersionNum) < ONDEMAND_REDO_VERSION_NUM) && (id == REFORM_CTRL_PAGE)) { - if (SSReadOldReformerCtrl()) { -
return; - } - - // maybe primary node already upgrade pg_control file, sleep and try read in lastest mode again - if (SS_STANDBY_MODE) { - pg_usleep(5000000); /* 5 sec */ - goto loop; - } else { - ereport(PANIC, (errmsg("incorrect checksum in control file"))); - } - } + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); loop: fd = BasicOpenFile(fname, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR); if (fd < 0) { + LWLockRelease(ControlFileLock); ereport(FATAL, (errcode_for_file_access(), errmsg("could not open control file \"%s\": %m", fname))); } @@ -380,6 +261,7 @@ loop: read_size = (int)BUFFERALIGN(len); char buffer[read_size] __attribute__((__aligned__(ALIGNOF_BUFFER))); if (pread(fd, buffer, read_size, seekpos) != read_size) { + LWLockRelease(ControlFileLock); ereport(PANIC, (errcode_for_file_access(), errmsg("could not read from control file: %m"))); } @@ -387,6 +269,7 @@ loop: rc = memcpy_s(&g_instance.dms_cxt.SSReformerControl, len, buffer, len); securec_check(rc, "", ""); if (close(fd) < 0) { + LWLockRelease(ControlFileLock); ereport(PANIC, (errcode_for_file_access(), errmsg("could not close control file: %m"))); } @@ -402,9 +285,11 @@ loop: retry = true; goto loop; } else { + LWLockRelease(ControlFileLock); ereport(FATAL, (errmsg("incorrect checksum in control file"))); } } + g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status= g_instance.dms_cxt.SSReformerControl.clusterStatus; } else { ControlFileData* controlFile = NULL; ControlFileData tempControlFile; @@ -417,6 +302,7 @@ loop: rc = memcpy_s(controlFile, (size_t)len, buffer, (size_t)len); securec_check(rc, "", ""); if (close(fd) < 0) { + LWLockRelease(ControlFileLock); ereport(PANIC, (errcode_for_file_access(), errmsg("could not close control file: %m"))); } @@ -432,6 +318,7 @@ loop: retry = true; goto loop; } else { + LWLockRelease(ControlFileLock); ereport(FATAL, (errmsg("incorrect checksum in control file"))); } } @@ -440,6 +327,7 @@ loop: g_instance.dms_cxt.ckptRedo = controlFile->checkPointCopy.redo; } 
} + LWLockRelease(ControlFileLock); } void SSClearSegCache() diff --git a/src/gausskernel/ddes/adapter/ss_transaction.cpp b/src/gausskernel/ddes/adapter/ss_transaction.cpp index 18c28f90d..12158116f 100644 --- a/src/gausskernel/ddes/adapter/ss_transaction.cpp +++ b/src/gausskernel/ddes/adapter/ss_transaction.cpp @@ -28,6 +28,7 @@ #include "storage/buf/bufmgr.h" #include "storage/smgr/segment_internal.h" #include "ddes/dms/ss_transaction.h" +#include "ddes/dms/ss_reform_common.h" #include "ddes/dms/ss_dms_bufmgr.h" #include "storage/sinvaladt.h" @@ -316,6 +317,16 @@ bool SSGetOldestXminFromAllStandby() return true; } +int SSReloadReformCtrlPage(uint32 len) +{ + if (unlikely(len != sizeof(SSBroadcastCmdOnly))) { + return DMS_ERROR; + } + + SSReadControlFile(REFORM_CTRL_PAGE); + return DMS_SUCCESS; +} + int SSCheckDbBackends(char *data, uint32 len, char *output_msg, uint32 *output_msg_len) { if (unlikely(len != sizeof(SSBroadcastDbBackends))) { @@ -367,6 +378,24 @@ bool SSCheckDbBackendsFromAllStandby(Oid dbid) return false; } +void SSRequestAllStandbyReloadReformCtrlPage() +{ + dms_context_t dms_ctx; + InitDmsContext(&dms_ctx); + int ret; + SSBroadcastCmdOnly ssmsg; + ssmsg.type = BCAST_RELOAD_REFORM_CTRL_PAGE; + do { + ret = dms_broadcast_msg(&dms_ctx, (char *)&ssmsg, sizeof(SSBroadcastCmdOnly), + (unsigned char)false, SS_BROADCAST_WAIT_ONE_SECOND); + + if (ret == DMS_SUCCESS) { + return; + } + pg_usleep(5000L); + } while (ret != DMS_SUCCESS); +} + void SSSendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n) { dms_context_t dms_ctx; diff --git a/src/gausskernel/process/threadpool/knl_instance.cpp b/src/gausskernel/process/threadpool/knl_instance.cpp index 4e4da180a..1b010de53 100755 --- a/src/gausskernel/process/threadpool/knl_instance.cpp +++ b/src/gausskernel/process/threadpool/knl_instance.cpp @@ -181,6 +181,7 @@ static void knl_g_dms_init(knl_g_dms_context *dms_cxt) dms_cxt->SSReformInfo.dms_role = DMS_ROLE_UNKNOW; dms_cxt->SSClusterState = 
NODESTATE_NORMAL; dms_cxt->SSRecoveryInfo.recovery_inst_id = INVALID_INSTANCEID; + dms_cxt->SSRecoveryInfo.cluster_ondemand_status= CLUSTER_NORMAL; dms_cxt->SSRecoveryInfo.recovery_pause_flag = true; dms_cxt->SSRecoveryInfo.failover_ckpt_status = NOT_ACTIVE; dms_cxt->SSRecoveryInfo.new_primary_reset_walbuf_flag = false; diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp index 148b31344..3eea22256 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp @@ -77,6 +77,7 @@ #include "utils/atomic.h" #include "pgstat.h" #include "ddes/dms/ss_reform_common.h" +#include "ddes/dms/ss_transaction.h" #ifdef PGXC #include "pgxc/pgxc.h" @@ -1905,14 +1906,18 @@ void WaitRedoFinish() pmState = PM_RUN; write_stderr_with_prefix("[On-demand] LOG: database system is ready to accept connections"); + g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status= CLUSTER_IN_ONDEMAND_REDO; + /* for other nodes in cluster */ + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); + g_instance.dms_cxt.SSReformerControl.clusterStatus = CLUSTER_IN_ONDEMAND_REDO; + SSUpdateReformerCtrl(); + LWLockRelease(ControlFileLock); + SSRequestAllStandbyReloadReformCtrlPage(); + SpinLockAcquire(&t_thrd.shemem_ptr_cxt.XLogCtl->info_lck); t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandBuildDone = true; SpinLockRelease(&t_thrd.shemem_ptr_cxt.XLogCtl->info_lck); - /* for other nodes in cluster */ - g_instance.dms_cxt.SSReformerControl.clusterStatus = CLUSTER_IN_ONDEMAND_RECOVERY; - SSSaveReformerCtrl(); - #ifdef USE_ASSERT_CHECKING XLogRecPtr minStart = MAX_XLOG_REC_PTR; XLogRecPtr minEnd = MAX_XLOG_REC_PTR; diff --git a/src/gausskernel/storage/access/transam/xlog.cpp b/src/gausskernel/storage/access/transam/xlog.cpp index 2c6ced50a..ba0e91e0d 100755 --- 
a/src/gausskernel/storage/access/transam/xlog.cpp +++ b/src/gausskernel/storage/access/transam/xlog.cpp @@ -152,6 +152,7 @@ #include "vectorsonic/vsonichash.h" #include "ddes/dms/ss_reform_common.h" +#include "ddes/dms/ss_transaction.h" #include "ddes/dms/ss_dms_recovery.h" #include "ddes/dms/ss_dms_bufmgr.h" #include "storage/file/fio_device.h" @@ -9494,10 +9495,14 @@ void StartupXLOG(void) t_thrd.xlog_cxt.InRecovery == true) { if (SSOndemandRecoveryExitNormal) { g_instance.dms_cxt.SSRecoveryInfo.in_ondemand_recovery = true; + g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status= CLUSTER_IN_ONDEMAND_BUILD; /* for other nodes in cluster and ondeamnd recovery failed */ + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); g_instance.dms_cxt.SSReformerControl.clusterStatus = CLUSTER_IN_ONDEMAND_BUILD; g_instance.dms_cxt.SSReformerControl.recoveryInstId = g_instance.dms_cxt.SSRecoveryInfo.recovery_inst_id; - SSSaveReformerCtrl(); + SSUpdateReformerCtrl(); + LWLockRelease(ControlFileLock); + SSRequestAllStandbyReloadReformCtrlPage(); SetOndemandExtremeRtoMode(); ereport(LOG, (errmsg("[On-demand] replayed in extreme rto ondemand recovery mode"))); } else { @@ -10060,8 +10065,12 @@ void StartupXLOG(void) ereport(LOG, (errmsg("redo is not required"))); if (SS_IN_ONDEMAND_RECOVERY) { g_instance.dms_cxt.SSRecoveryInfo.in_ondemand_recovery = false; + g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status= CLUSTER_NORMAL; + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); g_instance.dms_cxt.SSReformerControl.clusterStatus = CLUSTER_NORMAL; - SSSaveReformerCtrl(); + SSUpdateReformerCtrl(); + LWLockRelease(ControlFileLock); + SSRequestAllStandbyReloadReformCtrlPage(); } } } @@ -10559,9 +10568,13 @@ void StartupXLOG(void) } if (SS_PRIMARY_MODE) { + g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status= CLUSTER_NORMAL; /* for other nodes in cluster */ + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); g_instance.dms_cxt.SSReformerControl.clusterStatus = CLUSTER_NORMAL; - 
SSSaveReformerCtrl(); + SSUpdateReformerCtrl(); + LWLockRelease(ControlFileLock); + SSRequestAllStandbyReloadReformCtrlPage(); } ereport(LOG, (errmsg("redo done, nextXid: " XID_FMT ", startupMaxXid: " XID_FMT ", recentLocalXmin: " XID_FMT diff --git a/src/gausskernel/storage/buffer/bufmgr.cpp b/src/gausskernel/storage/buffer/bufmgr.cpp index e3789398c..3891ad039 100644 --- a/src/gausskernel/storage/buffer/bufmgr.cpp +++ b/src/gausskernel/storage/buffer/bufmgr.cpp @@ -2415,16 +2415,15 @@ found_branch: /* DMS: Try get page remote */ if (ENABLE_DMS) { // standby node must notify primary node for prepare lastest page in ondemand recovery - if (SS_STANDBY_ONDEMAND_RECOVERY) { - while (!SSOndemandRequestPrimaryRedo(bufHdr->tag)) { - SSReadControlFile(REFORM_CTRL_PAGE); - if (SS_STANDBY_ONDEMAND_NORMAL) { - break; // ondemand recovery finish, skip - } else if (SS_STANDBY_ONDEMAND_BUILD) { - return 0; // in new reform - } - // still need requset page + while (SS_STANDBY_ONDEMAND_NOT_NORMAL) { + /* in new reform */ + if (unlikely(SS_STANDBY_ONDEMAND_BUILD)) { + return 0; } + if (SSOndemandRequestPrimaryRedo(bufHdr->tag)) { + break; + } + SSReadControlFile(REFORM_CTRL_PAGE); } MarkReadHint(bufHdr->buf_id, relpersistence, isExtend, pblk); if (mode != RBM_FOR_REMOTE && relpersistence != RELPERSISTENCE_TEMP && !isLocalBuf) { diff --git a/src/include/ddes/dms/ss_common_attr.h b/src/include/ddes/dms/ss_common_attr.h index f70811c4c..61f144ee3 100644 --- a/src/include/ddes/dms/ss_common_attr.h +++ b/src/include/ddes/dms/ss_common_attr.h @@ -144,16 +144,17 @@ (SS_NORMAL_STANDBY && (g_instance.attr.attr_storage.xlog_file_path != 0)) #define SS_CLUSTER_ONDEMAND_NOT_NORAML \ - (ENABLE_DMS && (g_instance.dms_cxt.SSReformerControl.clusterStatus != CLUSTER_NORMAL)) + (ENABLE_DMS && (g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status!= CLUSTER_NORMAL)) #define SS_CLUSTER_ONDEMAND_BUILD \ - (ENABLE_DMS && (g_instance.dms_cxt.SSReformerControl.clusterStatus == 
CLUSTER_IN_ONDEMAND_BUILD)) + (ENABLE_DMS && (g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status== CLUSTER_IN_ONDEMAND_BUILD)) #define SS_CLUSTER_ONDEMAND_RECOVERY \ - (ENABLE_DMS && (g_instance.dms_cxt.SSReformerControl.clusterStatus == CLUSTER_IN_ONDEMAND_RECOVERY)) + (ENABLE_DMS && (g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status== CLUSTER_IN_ONDEMAND_REDO)) #define SS_CLUSTER_ONDEMAND_NORMAL \ - (ENABLE_DMS && (g_instance.dms_cxt.SSReformerControl.clusterStatus == CLUSTER_NORMAL)) + (ENABLE_DMS && (g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status== CLUSTER_NORMAL)) #define SS_STANDBY_ONDEMAND_BUILD (SS_STANDBY_MODE && SS_CLUSTER_ONDEMAND_BUILD) #define SS_STANDBY_ONDEMAND_RECOVERY (SS_STANDBY_MODE && SS_CLUSTER_ONDEMAND_RECOVERY) #define SS_STANDBY_ONDEMAND_NORMAL (SS_STANDBY_MODE && SS_CLUSTER_ONDEMAND_NORMAL) +#define SS_STANDBY_ONDEMAND_NOT_NORMAL (SS_STANDBY_MODE && SS_CLUSTER_ONDEMAND_NOT_NORAML) /* DMS_BUF_NEED_LOAD */ #define BUF_NEED_LOAD 0x1 @@ -209,6 +210,7 @@ typedef enum SSBroadcastOp { BCAST_DDLLOCKRELEASE, BCAST_DDLLOCKRELEASE_ALL, BCAST_CHECK_DB_BACKENDS, + BCAST_RELOAD_REFORM_CTRL_PAGE, BCAST_END } SSBroadcastOp; diff --git a/src/include/ddes/dms/ss_dms_recovery.h b/src/include/ddes/dms/ss_dms_recovery.h index 98d210b17..08a39145a 100644 --- a/src/include/ddes/dms/ss_dms_recovery.h +++ b/src/include/ddes/dms/ss_dms_recovery.h @@ -75,6 +75,7 @@ typedef struct ss_recovery_info { volatile failover_ckpt_status_t failover_ckpt_status; char recovery_xlog_dir[MAXPGPATH]; int recovery_inst_id; + volatile SSGlobalClusterState cluster_ondemand_status; LWLock* update_seg_lock; bool new_primary_reset_walbuf_flag; bool ready_to_startup; // when DB start (except failover), the flag will set true diff --git a/src/include/ddes/dms/ss_reform_common.h b/src/include/ddes/dms/ss_reform_common.h index a79c1378d..119831923 100644 --- a/src/include/ddes/dms/ss_reform_common.h +++ b/src/include/ddes/dms/ss_reform_common.h @@ -43,7 +43,7 @@ int 
SSReadXlogInternal(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, XL int readLen); XLogReaderState *SSXLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data, Size alignedSize); void SSGetRecoveryXlogPath(); -void SSSaveReformerCtrl(bool force = false); +void SSUpdateReformerCtrl(); void SSReadControlFile(int id, bool updateDmsCtx = false); void SSClearSegCache(); int SSCancelTransactionOfAllStandby(SSBroadcastOp type); diff --git a/src/include/ddes/dms/ss_transaction.h b/src/include/ddes/dms/ss_transaction.h index ccdc67af9..c03bd23e6 100644 --- a/src/include/ddes/dms/ss_transaction.h +++ b/src/include/ddes/dms/ss_transaction.h @@ -111,5 +111,7 @@ int SSProcessDropSegSpace(char *data, uint32 len); int SSCheckDbBackends(char *data, uint32 len, char *output_msg, uint32 *output_msg_len); int SSCheckDbBackendsAck(char *data, unsigned int len); bool SSCheckDbBackendsFromAllStandby(Oid dbid); +int SSReloadReformCtrlPage(uint32 len); +void SSRequestAllStandbyReloadReformCtrlPage(); #endif diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 6aa4be628..9dbce0e30 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -37,7 +37,6 @@ /***************************************************************************** * Backend version and inplace upgrade staffs *****************************************************************************/ -extern const uint32 ONDEMAND_REDO_VERSION_NUM; extern const uint32 SRF_FUSION_VERSION_NUM; extern const uint32 INNER_UNIQUE_VERSION_NUM; extern const uint32 PARTITION_ENHANCE_VERSION_NUM; @@ -132,7 +131,6 @@ extern const uint32 CREATE_TABLE_AS_VERSION_NUM; extern void register_backend_version(uint32 backend_version); extern bool contain_backend_version(uint32 version_number); -extern void SSUpgradeFileBeforeCommit(); #define INPLACE_UPGRADE_PRECOMMIT_VERSION 1