diff --git a/src/common/backend/utils/misc/guc/guc_storage.cpp b/src/common/backend/utils/misc/guc/guc_storage.cpp index 60858ca48..ff7df7c0c 100755 --- a/src/common/backend/utils/misc/guc/guc_storage.cpp +++ b/src/common/backend/utils/misc/guc/guc_storage.cpp @@ -117,6 +117,7 @@ #include "replication/walreceiver.h" #include "replication/walsender.h" #include "replication/dcf_replication.h" +#include "replication/ss_disaster_cluster.h" #include "storage/buf/bufmgr.h" #include "storage/cucache_mgr.h" #include "storage/smgr/fd.h" @@ -155,6 +156,7 @@ #include "access/ustore/knl_undoworker.h" #include "ddes/dms/ss_init.h" #include "ddes/dms/ss_dms.h" +#include "ddes/dms/ss_transaction.h" #include "storage/dss/dss_log.h" #define atooid(x) ((Oid)strtoul((x), NULL, 10)) @@ -7002,6 +7004,32 @@ static void assign_dcf_log_backup_file_count(int newval, void* extra) static void assign_dcf_flow_control_rto(int newval, void *extra) { + int oldval = u_sess->attr.attr_storage.target_rto; + // used in realtime-build log ctrl + if (ENABLE_DMS && !SS_DISASTER_CLUSTER && t_thrd.proc_cxt.MyProcPid == PostmasterPid) { + // clean realtime-build log ctrl cache, before realtime-build log ctrl enable. + if (g_instance.dms_cxt.dmsInited && SS_PRIMARY_MODE && oldval == 0 && newval > 0) { + g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl = false; + SpinLockInit(&g_instance.dms_cxt.SSRecoveryInfo.sleepTimeSyncLock); + g_instance.dms_cxt.SSRecoveryInfo.globalSleepTime = 0; + errno_t rc = memset_s(g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl, + sizeof(g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl), + 0, + sizeof(g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl)); + securec_check(rc, "", ""); + } + g_instance.attr.attr_storage.dms_attr.realtime_build_target_rto = newval; + // make realtime-build logctrl disable, when recovery_time_target is set to 0; + if (g_instance.dms_cxt.dmsInited && SS_PRIMARY_MODE && oldval > 0 && newval == 0) { + g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl = false; + } + + // notify nodes, start or stop realtime-build log ctrl. + if (g_instance.dms_cxt.dmsInited && SS_PRIMARY_MODE && !SS_IN_REFORM && + ((oldval == 0 && newval > 0) || (oldval > 0 && newval == 0))) { + SSBroadcastRealtimeBuildLogCtrlEnable(true); + } + } if (t_thrd.proc_cxt.MyProcPid == PostmasterPid) { dcf_set_param("DN_FLOW_CONTROL_RTO", std::to_string(newval).c_str()); } diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index faaa31e38..63e59b873 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -1328,6 +1328,12 @@ static int32 CBProcessBroadcast(void *db_handle, dms_broadcast_context_t *broad_ case BCAST_RELOAD_REFORM_CTRL_PAGE: ret = SSReloadReformCtrlPage(len); break; + case BCAST_REALTIME_BUILD_LOG_CTRL_ENABLE: + ret = SSUpdateRealtimeBuildLogCtrl(data, len); + break; + case BCAST_REPORT_REALTIME_BUILD_PTR: + ret = SSGetStandbyRealtimeBuildPtr(data, len); + break; default: ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS] invalid broadcast operate type"))); ret = DMS_ERROR; @@ -1871,7 +1877,7 @@ static void FailoverCleanBackends() /* check and print some thread which no exit. */ int backendNum = SSCountAndPrintChildren(BACKEND_TYPE_NORMAL | BACKEND_TYPE_AUTOVAC); ereport (WARNING, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] there are %d backends can not exit! " - "wait_time = %ds", backendNum, wait_time / FAILOVER_TIME_CONVERT))); + "wait_time = %lds", backendNum, wait_time / FAILOVER_TIME_CONVERT))); if (dms_reform_failed()) { ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] reform failed during clean backends"))); @@ -1883,6 +1889,25 @@ static void FailoverCleanBackends() } } +static void RestartRealtimeBuildCtrl() +{ + if (SS_IN_REFORM && g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl) { + ereport(LOG, (errmsg("[SS reform][On-demand] reform happened, disable realtime build log ctrl, " + "and will make it enable again after reform if needed."))); + } + g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl = false; + SpinLockInit(&g_instance.dms_cxt.SSRecoveryInfo.sleepTimeSyncLock); + g_instance.dms_cxt.SSRecoveryInfo.globalSleepTime = 0; + errno_t rc = memset_s(g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl, + sizeof(g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl), + 0, + sizeof(g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl)); + securec_check(rc, "", ""); + if (SS_PRIMARY_MODE && ENABLE_REALTIME_BUILD_TARGET_RTO) { + SSBroadcastRealtimeBuildLogCtrlEnable(false); + } +} + static int reform_type_str_len = 30; static void ReformTypeToString(dms_reform_type_t reform_type, char* ret_str) { @@ -2011,18 +2036,18 @@ static void CBReformStartNotify(void *db_handle, dms_reform_start_context_t *rs_ ReformTypeToString(reform_info->reform_type, reform_type_str); ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] reform start, role:%d, reform type:SS %s, standby scenario:%d, " - "bitmap_reconnect:%d, reform_ver:%ld.", + "bitmap_reconnect:%llu, reform_ver:%ld.", reform_info->dms_role, reform_type_str, SSPerformingStandbyScenario(), reform_info->bitmap_reconnect, reform_info->reform_ver))); if (reform_info->dms_role == DMS_ROLE_REFORMER) { SSGrantDSSWritePermission(); } - int old_primary = SSGetPrimaryInstId(); SSReadControlFile(old_primary, true); g_instance.dms_cxt.SSReformInfo.old_bitmap = g_instance.dms_cxt.SSReformerControl.list_stable; ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] old cluster node bitmap: %lu", g_instance.dms_cxt.SSReformInfo.old_bitmap))); + g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl = false; if (g_instance.dms_cxt.SSRecoveryInfo.in_failover) { FailoverCleanBackends(); } else if (SSBackendNeedExitScenario()) { @@ -2075,18 +2100,21 @@ static int CBReformDoneNotify(void *db_handle) errmsg("[SS reform] Reform success, instance:%d is running.", g_instance.attr.attr_storage.dms_attr.instance_id))); + if (ENABLE_REALTIME_BUILD_TARGET_RTO && SS_PRIMARY_MODE) { + RestartRealtimeBuildCtrl(); + } /* reform success indicates that reform of primary and standby all complete, then update gaussdb.state */ g_instance.dms_cxt.dms_status = (dms_status_t)DMS_STATUS_IN; SendPostmasterSignal(PMSIGNAL_DMS_REFORM_DONE); g_instance.dms_cxt.SSClusterState = NODESTATE_NORMAL; g_instance.dms_cxt.SSRecoveryInfo.realtime_build_in_reform = false; g_instance.dms_cxt.SSReformInfo.in_reform = false; - - ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] reform done: pmState=%d, SSClusterState=%d, demotion=%d-%d, " - "rec=%d, dmsStatus=%d.", pmState, g_instance.dms_cxt.SSClusterState, - g_instance.demotion, t_thrd.walsender_cxt.WalSndCtl->demotion, - t_thrd.walsender_cxt.WalSndCtl->demotion, t_thrd.xlog_cxt.InRecovery, - g_instance.dms_cxt.dms_status))); + + ereport(LOG, (errmodule(MOD_DMS), + errmsg("[SS reform] reform done: pmState=%d, SSClusterState=%d, demotion=%d-%d, " + "rec=%d, dmsStatus=%d.", pmState, g_instance.dms_cxt.SSClusterState, + g_instance.demotion, t_thrd.walsender_cxt.WalSndCtl->demotion, + t_thrd.xlog_cxt.InRecovery, g_instance.dms_cxt.dms_status))); return GS_SUCCESS; } diff --git a/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp b/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp index 9f96dbdba..e7bd7706e 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp @@ -39,6 +39,7 @@ #include "ddes/dms/ss_dms_recovery.h" #include "ddes/dms/ss_reform_common.h" #include "ddes/dms/ss_transaction.h" +#include "access/ondemand_extreme_rto/dispatcher.h" #include "access/double_write.h" #include "access/twophase.h" #include @@ -293,7 +294,7 @@ XLogRecPtr SSOndemandRequestPrimaryCkptAndGetRedoLsn() if (dms_req_opengauss_immediate_checkpoint(&dms_ctx, (unsigned long long *)&primaryRedoLsn) == GS_SUCCESS) { ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[SS][On-demand] request primary node %d checkpoint success, redoLoc %X/%X", SS_PRIMARY_ID, - (uint32)(primaryRedoLsn << 32), (uint32)primaryRedoLsn))); + (uint32)(primaryRedoLsn >> 32), (uint32)primaryRedoLsn))); return primaryRedoLsn; } ereport(DEBUG1, (errmodule(MOD_DMS), diff --git a/src/gausskernel/ddes/adapter/ss_transaction.cpp b/src/gausskernel/ddes/adapter/ss_transaction.cpp index a83491d28..e57521685 100644 --- a/src/gausskernel/ddes/adapter/ss_transaction.cpp +++ b/src/gausskernel/ddes/adapter/ss_transaction.cpp @@ -28,6 +28,7 @@ #include "storage/procarray.h" #include "storage/buf/bufmgr.h" #include "storage/smgr/segment_internal.h" +#include "access/multi_redo_api.h" #include "ddes/dms/ss_transaction.h" #include "ddes/dms/ss_reform_common.h" #include "ddes/dms/ss_dms_bufmgr.h" @@ -1134,4 +1135,144 @@ bool SSGetOldestXminFromAllStandby(TransactionId xmin, TransactionId xmax, Commi return false; } } while (ret != DMS_SUCCESS); +} + +/* broadcast to standby node update realtime-build logctrl enable */ +void SSBroadcastRealtimeBuildLogCtrlEnable(bool canncelInReform) +{ + dms_context_t dms_ctx; + InitDmsContext(&dms_ctx); + int ret; + SSBroadcastRealtimeBuildLogCtrl logCtrl; + logCtrl.type = BCAST_REALTIME_BUILD_LOG_CTRL_ENABLE; + logCtrl.enableLogCtrl = ENABLE_REALTIME_BUILD_TARGET_RTO; + dms_broadcast_info_t dms_broad_info = { + .data = (char *)&logCtrl, + .len = sizeof(SSBroadcastRealtimeBuildLogCtrl), + .output = NULL, + .output_len = NULL, + .scope = DMS_BROADCAST_ONLINE_LIST, + .inst_map = 0, + .timeout = SS_BROADCAST_WAIT_ONE_SECOND, + .handle_recv_msg = (unsigned char)false, + .check_session_kill = (unsigned char)true + }; + + do { + ret = dms_broadcast_msg(&dms_ctx, &dms_broad_info); + if (ret == DMS_SUCCESS || (canncelInReform && SS_IN_REFORM)) { + break; + } + pg_usleep(5000L); + } while (ret != DMS_SUCCESS); + if (ret == DMS_SUCCESS) { + ereport(LOG, + (errmsg("[SS reform][On-demand] notify standby node update realtime-build log ctrl enable success, " + "enableLogCtrl: %d", logCtrl.enableLogCtrl))); + } +} + +int SSUpdateRealtimeBuildLogCtrl(char* data, uint32 len) +{ + if (unlikely(len != sizeof(SSBroadcastRealtimeBuildLogCtrl))) { + return DMS_ERROR; + } + if (!ENABLE_ONDEMAND_REALTIME_BUILD) { + return DMS_SUCCESS; + } + SSBroadcastRealtimeBuildLogCtrl *logCtrlEnable = (SSBroadcastRealtimeBuildLogCtrl *)data; + g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl = logCtrlEnable->enableLogCtrl; + ereport(LOG, (errmodule(MOD_DMS), + errmsg("[On-demand] Update standby realtime-build log ctrl %s, " + "enableLogCtrl: %s, enable_ondemand_realtime_build: true.", + logCtrlEnable->enableLogCtrl ? "enable" : "disable", + logCtrlEnable->enableLogCtrl ? "true" : "false"))); + return DMS_SUCCESS; +} + +/* report realtime-build ptr to primary */ +bool SSReportRealtimeBuildPtr(XLogRecPtr realtimeBuildPtr) +{ + if (!SS_STANDBY_ENABLE_TARGET_RTO) { + return false; + } + dms_context_t dms_ctx; + InitDmsContext(&dms_ctx); + int ret; + SSBroadcastRealtimeBuildPtr reportMessage; + reportMessage.type = BCAST_REPORT_REALTIME_BUILD_PTR; + reportMessage.realtimeBuildPtr = realtimeBuildPtr; + reportMessage.srcInstId = SS_MY_INST_ID; + dms_broadcast_info_t dms_broad_info = { + .data = (char *)&reportMessage, + .len = sizeof(SSBroadcastRealtimeBuildPtr), + .output = NULL, + .output_len = NULL, + .scope = DMS_BROADCAST_SPECIFY_LIST, + .inst_map = (unsigned long long)1 << SS_PRIMARY_ID, + .timeout = SS_BROADCAST_WAIT_ONE_SECOND, + .handle_recv_msg = (unsigned char)false, + .check_session_kill = (unsigned char)true + }; + + do { + ret = dms_broadcast_msg(&dms_ctx, &dms_broad_info); + if (ret == DMS_SUCCESS) { + break; + } + if (!SS_STANDBY_ENABLE_TARGET_RTO) { + break; + } + pg_usleep(5000L); + } while (ret != DMS_SUCCESS); + return ret == DMS_SUCCESS; +} + +int SSGetStandbyRealtimeBuildPtr(char* data, uint32 len) +{ + if (unlikely(len != sizeof(SSBroadcastRealtimeBuildPtr))) { + return DMS_ERROR; + } + if (!ENABLE_REALTIME_BUILD_TARGET_RTO || !SS_PRIMARY_MODE || SS_IN_REFORM) { + return DMS_SUCCESS; + } + SSBroadcastRealtimeBuildPtr *receiveMessage = (SSBroadcastRealtimeBuildPtr *)data; + XLogRecPtr realtimePtr = receiveMessage->realtimeBuildPtr; + int srcId = receiveMessage->srcInstId; + + if (!g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl) { + g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl = true; + ereport(LOG, (errmodule(MOD_DMS), + errmsg("[SS][On-demand] Get realtime-build ptr from standby inst_id: %d," + "enable realtime-build log ctrl, replayEndRecPtr: %X/%X", + SS_PRIMARY_ID, (uint32)(realtimePtr >> 32), (uint32)realtimePtr))); + } + realtime_build_ctrl_t *rtBuildCtrl = &g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl[srcId]; + + // If the time interval betwen two reply < 100 ms, ignore this reply. + TimestampTz currentTime = GetCurrentTimestamp(); + if (SSLogCtrlCalculateTimeDiff(rtBuildCtrl->replyTime, currentTime) <= MIN_REPLY_MILLISEC_TIME_DIFF) { + return DMS_SUCCESS; + } + rtBuildCtrl->prevReplyTime = rtBuildCtrl->replyTime; + rtBuildCtrl->replyTime = currentTime; + rtBuildCtrl->prevBuildPtr = rtBuildCtrl->realtimeBuildPtr; + rtBuildCtrl->realtimeBuildPtr = realtimePtr; + if (rtBuildCtrl->prevBuildPtr > rtBuildCtrl->realtimeBuildPtr) { + ereport(WARNING, (errmodule(MOD_DMS), + errmsg("[SS][On-demand] Get realtimeBuild lsn from standby node %d is less than prevBuildPtr, " + " prevBuildPtr: %X/%X, realtime-build ptr: %X/%X.", srcId, + (uint32)(rtBuildCtrl->prevBuildPtr >> 32), (uint32)rtBuildCtrl->prevBuildPtr, + (uint32)(rtBuildCtrl->realtimeBuildPtr >> 32), (uint32)rtBuildCtrl->realtimeBuildPtr))); + } else if (rtBuildCtrl->prevBuildPtr != InvalidXLogRecPtr) { + ereport(DEBUG4, (errmodule(MOD_DMS), + errmsg("[SS][On-demand] Get realtimeBuild lsn from standby inst_id %d, " + " prevBuildPtr: %X/%X, realtime-build ptr: %X/%X.", srcId, + (uint32)(rtBuildCtrl->prevBuildPtr >> 32), (uint32)rtBuildCtrl->prevBuildPtr, + (uint32)(rtBuildCtrl->realtimeBuildPtr >> 32), (uint32)rtBuildCtrl->realtimeBuildPtr))); + if (rtBuildCtrl->prevBuildPtr != InvalidXLogRecPtr) { + SSRealtimebuildLogCtrl(srcId); + } + } + return DMS_SUCCESS; } \ No newline at end of file diff --git a/src/gausskernel/process/threadpool/knl_instance.cpp b/src/gausskernel/process/threadpool/knl_instance.cpp index 3d79fe62b..273f5963f 100755 --- a/src/gausskernel/process/threadpool/knl_instance.cpp +++ b/src/gausskernel/process/threadpool/knl_instance.cpp @@ -215,7 +215,14 @@ static void knl_g_dms_init(knl_g_dms_context *dms_cxt) dms_cxt->SSRecoveryInfo.disaster_cluster_promoting = false; dms_cxt->SSRecoveryInfo.dorado_sharestorage_inited = false; dms_cxt->SSRecoveryInfo.ondemand_recovery_pause_status = NOT_PAUSE; - dms_cxt->SSRecoveryInfo.in_ondemand_recovery = false; + dms_cxt->SSRecoveryInfo.enableRealtimeBuildLogCtrl = false; + dms_cxt->SSRecoveryInfo.globalSleepTime = 0; + dms_cxt->SSRecoveryInfo.sleepTimeSyncLock = (slock_t)0; + errno_t rc = memset_s(dms_cxt->SSRecoveryInfo.rtBuildCtrl, + sizeof(dms_cxt->SSRecoveryInfo.rtBuildCtrl), + 0, + sizeof(dms_cxt->SSRecoveryInfo.rtBuildCtrl)); + securec_check(rc, "", ""); dms_cxt->log_timezone = NULL; pg_atomic_init_u32(&dms_cxt->inDmsThreShmemInitCnt, 0); pg_atomic_init_u32(&dms_cxt->inProcExitCnt, 0); diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp index 823e2f7cf..78154bba2 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp @@ -110,6 +110,9 @@ static const int invalid_worker_id = -1; static const int UNDO_START_BLK = 1; static const int UHEAP_UPDATE_UNDO_START_BLK = 2; +// unit: ms +static const int LOG_CTRL_REPORT_TIME_INTERVAL = 200; + struct ControlFileData restoreControlFile; typedef void *(*GetStateFunc)(PageRedoWorker *worker); @@ -176,6 +179,7 @@ void CopyDataFromOldReader(XLogReaderState *newReaderState, const XLogReaderStat void SendSingalToPageWorker(int signal); static void RestoreControlFileForRealtimeBuild(); +static void LogCtrlReportRealtimeBuildPtr(); /* dispatchTable must consistent with RmgrTable */ static const RmgrDispatchData g_dispatchTable[RM_MAX_ID + 1] = { @@ -439,6 +443,9 @@ void HandleStartupInterruptsForExtremeRto() if (SS_STANDBY_FAILOVER && pmState == PM_STARTUP && SS_ONDEMAND_REALTIME_BUILD_NORMAL) { OndemandRealtimeBuildHandleFailover(); } + if (SS_STANDBY_ENABLE_TARGET_RTO) { + LogCtrlReportRealtimeBuildPtr(); + } } static void SetOndemandXLogParseFlagValue(uint32 maxParseBufNum) @@ -2506,4 +2513,32 @@ static void RestoreControlFileForRealtimeBuild() { LWLockRelease(ControlFileLock); } +/* Startup report realtime-build ptr to primary node for realtime-build log ctrl. */ +static void LogCtrlReportRealtimeBuildPtr() +{ + TimestampTz currentTime = GetCurrentTimestamp(); + if (SSLogCtrlCalculateTimeDiff(g_dispatcher->reportTime, currentTime) < LOG_CTRL_REPORT_TIME_INTERVAL) { + return; + } + XLogRecPtr minEnd = t_thrd.shemem_ptr_cxt.XLogCtl->replayEndRecPtr; + if (minEnd == InvalidXLogRecPtr || !SS_STANDBY_ENABLE_TARGET_RTO) { + g_dispatcher->reportTime = currentTime; + return; + } + ereport(DEBUG4, (errmodule(MOD_DMS), + errmsg("[SS][On-demand] send primary node %d realtime-build ptr start, realtime-build %X/%X", + SS_PRIMARY_ID, (uint32)(minEnd >> 32), (uint32)minEnd))); + + if (SSReportRealtimeBuildPtr(minEnd)) { + ereport(DEBUG4, (errmodule(MOD_DMS), + errmsg("[SS][On-demand] send primary node %d realtime-build ptr success, " + "realtime-build ptr: %X/%X", SS_PRIMARY_ID, + (uint32)(minEnd >> 32), (uint32)minEnd))); + } else { + ereport(DEBUG4, (errmodule(MOD_DMS), + errmsg("[SS][On-demand] send primary node %d realtime-build ptr fail, realtime-build ptr: %X/%X", + SS_PRIMARY_ID, (uint32)(minEnd >> 32), (uint32)minEnd))); + } + g_dispatcher->reportTime = GetCurrentTimestamp(); +} } // namespace ondemand_extreme_rto diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp index 2b8f42530..2a4fc1d0f 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp @@ -30,6 +30,7 @@ #include "ddes/dms/ss_dms_bufmgr.h" #include "storage/lock/lwlock.h" #include "catalog/storage_xlog.h" +#include "replication/syncrep.h" void PrintXLogRecParseStateBlockHead(XLogRecParseState* blockState); @@ -772,4 +773,233 @@ bool OndemandAllowBufAccess() return true; } return false; -} \ No newline at end of file +} + +static const int MICROSECONDS_PER_SECONDS = 1000000; +static const int MILLISECONDS_PER_SECONDS = 1000; +static const int MILLISECONDS_PER_MICROSECONDS = 1000; +static const int SHIFT_SPEED = 3; +static const int CALCULATE_INTERVAL_MILLISECONDS = 2000; +static const uint64 MIN_BUILD_SPEED = 1; +static const int NEEDS_LARGE_RANGE = 60; +static const int LONG_LOG_CTRL_SLEEP_MICROSECONDS = 1500000; +static const int SHORT_LOG_CTRL_SLEEP_MICROSECONDS = 1000000; +static const int LOG_CTRL_REPORT_TIME_INTERVAL = 200; + +long SSLogCtrlCalculateTimeDiff(TimestampTz startTime, TimestampTz endTime) +{ + long secToTime; + int microsecToTime; + TimestampDifference(startTime, endTime, &secToTime, µsecToTime); + return secToTime * MILLISECONDS_PER_SECONDS + + microsecToTime / MILLISECONDS_PER_MICROSECONDS; +} + +bool IsRealtimeBuildRtoOverTarget(int srcId) +{ + realtime_build_ctrl_t *rtBuildCtrl = &g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl[srcId]; + if (SS_PRIMARY_ENABLE_TARGET_RTO && + rtBuildCtrl->currentRTO > g_instance.attr.attr_storage.dms_attr.realtime_build_target_rto) { + return true; + } + return false; +} + +static inline uint64 LogCtrlCountBigSpeed(uint64 originSpeed, uint64 curSpeed) +{ + uint64 updateSpeed = (((originSpeed << SHIFT_SPEED) - originSpeed) >> SHIFT_SPEED) + curSpeed; + return updateSpeed; +} + +static bool ReplyMessageCheck(int srcId) +{ + realtime_build_ctrl_t *rtBuildCtrl = &g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl[srcId]; + bool checkResult = true; + if (XLByteLT(rtBuildCtrl->realtimeBuildPtr, rtBuildCtrl->prevBuildPtr) || + rtBuildCtrl->prevReplyTime >= rtBuildCtrl->replyTime) { + checkResult = false; + ereport(WARNING, (errmodule(MOD_RTO_RPO), + errmsg("[On-demand] ReplyMessageCheck to false, rtBuildCtrl->prevBuildPtr: %lu, " + "rtBuildCtrl->realtimeBuildPtr: %lu, " + "rtBuildCtrl->prevReplyTime: %ld, rtBuildCtrl->replyTime: %ld.", + rtBuildCtrl->prevBuildPtr, rtBuildCtrl->realtimeBuildPtr, + rtBuildCtrl->prevReplyTime, rtBuildCtrl->replyTime))); + } + // no record needs to build, rto = 0 + if (XLByteEQ(GetXLogInsertEndRecPtr(), rtBuildCtrl->realtimeBuildPtr)) { + rtBuildCtrl->currentRTO = 0; + checkResult = false; + } + // at lease get tow replies beform calculate rto. + if (rtBuildCtrl->prevReplyTime == 0 || !checkResult) { + rtBuildCtrl->prevCalculateTime = rtBuildCtrl->replyTime; + rtBuildCtrl->periodTotalBuild = 0; + checkResult = false; + } + return checkResult; +} + +static void SSRealtimeBuildCalculateCurrentRTO(int srcId) +{ + if (!SS_NORMAL_PRIMARY || !ReplyMessageCheck(srcId)) { + return; + } + realtime_build_ctrl_t *rtBuildCtrl = &g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl[srcId]; + long millisecTimeDiff = SSLogCtrlCalculateTimeDiff(rtBuildCtrl->prevReplyTime, rtBuildCtrl->replyTime); + if (millisecTimeDiff < LOG_CTRL_REPORT_TIME_INTERVAL) { + Assert(false); + return; + } + + XLogRecPtr buildPtr = rtBuildCtrl->realtimeBuildPtr; + uint64 needBuild = GetXLogInsertEndRecPtr() - buildPtr; + uint64 newBuild = buildPtr - rtBuildCtrl->prevBuildPtr; + uint64 periodTotalBuild = rtBuildCtrl->periodTotalBuild + newBuild; + + long calculateTimeDiff = SSLogCtrlCalculateTimeDiff(rtBuildCtrl->prevCalculateTime, rtBuildCtrl->replyTime); + if (calculateTimeDiff <= 0) { + ereport(LOG, (errmodule(MOD_RTO_RPO), + errmsg("[On-demand] SSRealtimeBuildCalculateCurrentRTO calculateTimeDiff <= 0, " + "rtBuildCtrl->prevCalculateTime %ld, rtBuildCtrl->replyTime %ld", + rtBuildCtrl->prevCalculateTime, rtBuildCtrl->replyTime))); + return; + } + + if ((rtBuildCtrl->buildRate >> SHIFT_SPEED) > 1) { + if (calculateTimeDiff > CALCULATE_INTERVAL_MILLISECONDS || IsRealtimeBuildRtoOverTarget(srcId)) { + rtBuildCtrl->buildRate = LogCtrlCountBigSpeed(rtBuildCtrl->buildRate, + (uint64)(periodTotalBuild / calculateTimeDiff)); + rtBuildCtrl->prevCalculateTime = rtBuildCtrl->replyTime; + } + } else { + rtBuildCtrl->buildRate = (uint64)((newBuild / millisecTimeDiff) << SHIFT_SPEED); + } + if (rtBuildCtrl->prevCalculateTime == rtBuildCtrl->replyTime) { + rtBuildCtrl->periodTotalBuild = 0; + } else { + rtBuildCtrl->periodTotalBuild = periodTotalBuild; + } + + uint64 buildSpeed = (rtBuildCtrl->buildRate >> SHIFT_SPEED); // units: byte/ms + if (buildSpeed == 0) { + buildSpeed = MIN_BUILD_SPEED; + } + + uint64 secRTO = needBuild / buildSpeed / MILLISECONDS_PER_SECONDS; + rtBuildCtrl->currentRTO = secRTO; + + ereport(DEBUG4, (errmodule(MOD_RTO_RPO), + errmsg("[On-demand] The RTO estimated is = : %lu seconds, or %lu microseconds. " + "realtimeBuildPtr is %X/%X, prevBuildPtr is %X/%X, calculateTimeDiff is %ld, " + "needBuild is %lu, buildSpeed is %lu, srcId is %d.", + secRTO, (needBuild / buildSpeed), + (uint32)(rtBuildCtrl->realtimeBuildPtr >> 32), (uint32)rtBuildCtrl->realtimeBuildPtr, + (uint32)(rtBuildCtrl->prevBuildPtr >> 32), (uint32)rtBuildCtrl->prevBuildPtr, + millisecTimeDiff, needBuild, buildSpeed, srcId))); +} + +const int CDF_RANGE = 2; +const int CDF_LEFT = -CDF_RANGE; +const int CDF_RIGHT = CDF_RANGE; +const double CDF_MEAN = 0; +const double CDF_STDDEV = 1.0; + +static inline double GaussianCdf(double x, double mean, double stddev) +{ + return 0.5 * erfc(-(x - mean) / (stddev * sqrt(2))); +} + +static double CalculateSleepTimeByCdf(int64 currentRTO, int64 targetRTO) +{ + double x = ((double)currentRTO - targetRTO / 2) / (double)(targetRTO / 2); + double tx = x * (CDF_RIGHT - CDF_LEFT) - CDF_RANGE; + double range = GaussianCdf(CDF_RIGHT, CDF_MEAN, CDF_STDDEV) - GaussianCdf(CDF_LEFT, CDF_MEAN, CDF_STDDEV); + double fx = GaussianCdf(tx, CDF_MEAN, CDF_STDDEV) - GaussianCdf(CDF_LEFT, CDF_MEAN, CDF_STDDEV); + ereport(DEBUG4, (errmodule(MOD_RTO_RPO), + errmsg("[On-demand] CalculateSleepTimeByCdf currentRTO: %ld, targetRTO: %ld, fx: %lf, " + "range: %lf, fx/range (sleepTime): %lf.", + currentRTO, targetRTO, fx, range, fx / range))); + return fx / range; +} + +#define MIN_LOG_CTRL_ENABLE_RTO (g_instance.attr.attr_storage.dms_attr.realtime_build_target_rto / 2) +static int SSRealtimeBuildCalculateSleepTime(int srcId) +{ + realtime_build_ctrl_t *rtBuildCtrl = &g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl[srcId]; + if (!SS_PRIMARY_ENABLE_TARGET_RTO) { + return 0; + } + int maxSleepTime; + int sleepTime = 0; + if (g_instance.attr.attr_storage.dms_attr.realtime_build_target_rto >= NEEDS_LARGE_RANGE) { + maxSleepTime = LONG_LOG_CTRL_SLEEP_MICROSECONDS; + } else { + maxSleepTime = SHORT_LOG_CTRL_SLEEP_MICROSECONDS; + } + if (rtBuildCtrl->currentRTO < MIN_LOG_CTRL_ENABLE_RTO) { + sleepTime = 0; + ereport(DEBUG4, (errmodule(MOD_RTO_RPO), + errmsg("[On-demand] The RTO estimated is = : %lu seconds. sleeptime is = %d microseconds.", + rtBuildCtrl->currentRTO, sleepTime))); + } else if (rtBuildCtrl->currentRTO >= g_instance.attr.attr_storage.dms_attr.realtime_build_target_rto) { + sleepTime = maxSleepTime; + ereport(DEBUG4, (errmodule(MOD_RTO_RPO), + errmsg("[On-demand] The RTO estimated is = : %lu seconds. sleeptime is = %d microseconds.", + rtBuildCtrl->currentRTO, sleepTime))); + } else { + // method 2 + int64 targetRTO = (int64)g_instance.attr.attr_storage.dms_attr.realtime_build_target_rto; + sleepTime = (int)(CalculateSleepTimeByCdf(rtBuildCtrl->currentRTO, targetRTO) *maxSleepTime); + sleepTime = Min(maxSleepTime, Max(0, sleepTime)); + ereport(DEBUG4, (errmodule(MOD_RTO_RPO), + errmsg("[On-demand] The RTO estimated is = : %lu seconds. sleeptime is = %d microseconds.", + rtBuildCtrl->currentRTO, sleepTime))); + } + return sleepTime; +} + +const int LOG_UPDATA_GAP = SHORT_LOG_CTRL_SLEEP_MICROSECONDS / 2; +void SSRealtimeBuildUpdatGlobalSleepTime(int srcId, int localSleepTime) +{ + int oldSleepTime = g_instance.dms_cxt.SSRecoveryInfo.globalSleepTime; + int newSleepTime = 0; + SpinLockAcquire(&g_instance.dms_cxt.SSRecoveryInfo.sleepTimeSyncLock); + if (SS_PRIMARY_ENABLE_TARGET_RTO) { + g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl[srcId].sleepTime = localSleepTime; + int maxSleepTime = 0; + for (int i = 0; i < DMS_MAX_INSTANCES; i++) { + maxSleepTime = Max(maxSleepTime, g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl[i].sleepTime); + } + g_instance.dms_cxt.SSRecoveryInfo.globalSleepTime = maxSleepTime; + } else { + g_instance.dms_cxt.SSRecoveryInfo.globalSleepTime = 0; + } + newSleepTime = g_instance.dms_cxt.SSRecoveryInfo.globalSleepTime; + SpinLockRelease(&g_instance.dms_cxt.SSRecoveryInfo.sleepTimeSyncLock); + if ((oldSleepTime == 0 && newSleepTime > 0) || + (oldSleepTime > 0 && newSleepTime == 0) || + (oldSleepTime > LOG_UPDATA_GAP && newSleepTime < LOG_UPDATA_GAP) || + (oldSleepTime < LOG_UPDATA_GAP && newSleepTime > LOG_UPDATA_GAP) || + (oldSleepTime != LONG_LOG_CTRL_SLEEP_MICROSECONDS && newSleepTime == LONG_LOG_CTRL_SLEEP_MICROSECONDS) || + (oldSleepTime != SHORT_LOG_CTRL_SLEEP_MICROSECONDS && newSleepTime == SHORT_LOG_CTRL_SLEEP_MICROSECONDS)) { + ereport(LOG, (errmodule(MOD_RTO_RPO), + errmsg("[On-demand] realtime-build log ctl global sleep time update, " + "oldSleepTime: %d microseconds, newSleepTime: %d microseconds.", + oldSleepTime, newSleepTime))); + } +} + +void SSRealtimebuildLogCtrl(int srcId) +{ + if (!SS_PRIMARY_ENABLE_TARGET_RTO) { + return; + } + // step 1:calculate rto for instance: srcId + SSRealtimeBuildCalculateCurrentRTO(srcId); + + // step 2:calculate sleep time for instance: srcId + int localSleepTime = SSRealtimeBuildCalculateSleepTime(srcId); + + // step 3:update global sleep time + SSRealtimeBuildUpdatGlobalSleepTime(srcId, localSleepTime); +} diff --git a/src/gausskernel/storage/replication/syncrep.cpp b/src/gausskernel/storage/replication/syncrep.cpp index ff01ed30b..8993519c7 100755 --- a/src/gausskernel/storage/replication/syncrep.cpp +++ b/src/gausskernel/storage/replication/syncrep.cpp @@ -113,6 +113,7 @@ static int standby_priority_comparator(const void *a, const void *b); static inline void free_sync_standbys_list(List* sync_standbys); static int cmp_lsn(const void *a, const void *b); static bool DelayIntoMostAvaSync(bool checkSyncNum, SyncStandbyNumState state = STANDBIES_EMPTY); +SyncWaitRet SSRealtimeBuildWaitForTime(XLogRecPtr XactCommitLSN); typedef struct TransContext { /* for global */ @@ -228,6 +229,9 @@ bool SynRepWaitCatchup(XLogRecPtr XactCommitLSN) */ SyncWaitRet SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, bool enableHandleCancel) { + if (SS_PRIMARY_ENABLE_TARGET_RTO) { + return SSRealtimeBuildWaitForTime(XactCommitLSN); + } char *new_status = NULL; const char *old_status = NULL; int mode = u_sess->attr.attr_storage.sync_rep_wait_mode; @@ -495,6 +499,58 @@ SyncWaitRet SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, bool enableHandleCancel) return waitStopRes; } +/* + * Used in realtime-build log ctrl. + * threads will wait for a while befor commit if need log ctrl. + */ +SyncWaitRet SSRealtimeBuildWaitForTime(XLogRecPtr XactCommitLSN) +{ + SyncWaitRet waitStopRes = NOT_REQUEST; + + /* + * Only wait on prmiary node, and realtimeBuildLogCtrl enable. + */ + if (!SS_PRIMARY_ENABLE_TARGET_RTO) { + return NOT_REQUEST; + } + + /* Prevent the unexpected cleanups to be influenced by external interruptions */ + HOLD_INTERRUPTS(); + + /* + * Wait for a while, until globalSleepTime timeout. + */ + const int syncSleepTimeInterval = 1000; + int targetSleepTime = g_instance.dms_cxt.SSRecoveryInfo.globalSleepTime; + if (targetSleepTime > 0) { + ereport(DEBUG4, + (errmsg("SSRealtimeBuildWaitForTime sleep %d microseconds before commit.", targetSleepTime))); + } + + if (targetSleepTime == 0) { + return NOT_REQUEST; + } + for (int sleepTime = 0; sleepTime < targetSleepTime; sleepTime += syncSleepTimeInterval) { + int currentSleepTime = g_instance.dms_cxt.SSRecoveryInfo.globalSleepTime; + if (sleepTime >= currentSleepTime) { + waitStopRes = SYNC_COMPLETE; + t_thrd.proc->syncRepState = SYNC_REP_WAIT_COMPLETE; + RESUME_INTERRUPTS(); + return REPSYNCED; + } else if (!SS_PRIMARY_ENABLE_TARGET_RTO) { + t_thrd.proc->syncRepState = SYNC_REP_NOT_WAITING; + RESUME_INTERRUPTS(); + return STOP_WAIT; + } + pg_usleep(syncSleepTimeInterval); + } + + RESUME_INTERRUPTS(); + waitStopRes = SYNC_COMPLETE; + t_thrd.proc->syncRepState = SYNC_REP_WAIT_COMPLETE; + return REPSYNCED; +} + void SyncRepCleanupAtProcExit(void) { if (t_thrd.proc->syncRepLinks.prev || t_thrd.proc->syncRepLinks.next || diff --git a/src/include/access/ondemand_extreme_rto/dispatcher.h b/src/include/access/ondemand_extreme_rto/dispatcher.h index c2e69d89b..2f8399e5a 100644 --- a/src/include/access/ondemand_extreme_rto/dispatcher.h +++ b/src/include/access/ondemand_extreme_rto/dispatcher.h @@ -185,6 +185,7 @@ typedef struct { * control file into standby node's, when standby node shutdown. */ ControlFileData* restoreControlFile; + TimestampTz reportTime; } LogDispatcher; typedef struct { diff --git a/src/include/access/ondemand_extreme_rto/redo_utils.h b/src/include/access/ondemand_extreme_rto/redo_utils.h index 26ed887aa..224291930 100644 --- a/src/include/access/ondemand_extreme_rto/redo_utils.h +++ b/src/include/access/ondemand_extreme_rto/redo_utils.h @@ -32,6 +32,8 @@ typedef enum { PARSE_TYPE_SEG, } XLogRecParseType; +const long MIN_REPLY_MILLISEC_TIME_DIFF = 200; + Size OndemandRecoveryShmemSize(void); void OndemandRecoveryShmemInit(void); void OndemandXlogFileIdCacheInit(void); @@ -57,4 +59,10 @@ bool IsRecParseStateHaveChildState(XLogRecParseState *checkState); void OndemandGlobalXLogMemReleaseIfNeed(RedoMemManager *memctl); bool OndemandAllowBufAccess(); +bool IsRealtimeBuildRtoOverTarget(int srcId); + +// return unit: millisecond +long SSLogCtrlCalculateTimeDiff(TimestampTz startTime, TimestampTz endTime); +void SSRealtimebuildLogCtrl(int srcId); +void SSRealtimeBuildUpdateGlobalSyncLSN(); #endif /* ONDEMAND_EXTREME_RTO_REDO_UTILS_H */ \ No newline at end of file diff --git a/src/include/ddes/dms/ss_common_attr.h b/src/include/ddes/dms/ss_common_attr.h index 4ba2f9680..b7b59ac73 100644 --- a/src/include/ddes/dms/ss_common_attr.h +++ b/src/include/ddes/dms/ss_common_attr.h @@ -184,6 +184,8 @@ typedef enum SSBroadcastOp { BCAST_CHECK_DB_BACKENDS, BCAST_SEND_SNAPSHOT, BCAST_RELOAD_REFORM_CTRL_PAGE, + BCAST_REALTIME_BUILD_LOG_CTRL_ENABLE, + BCAST_REPORT_REALTIME_BUILD_PTR, BCAST_END } SSBroadcastOp; diff --git a/src/include/ddes/dms/ss_dms_recovery.h b/src/include/ddes/dms/ss_dms_recovery.h index 026de8f27..8cd55a0ab 100644 --- a/src/include/ddes/dms/ss_dms_recovery.h +++ b/src/include/ddes/dms/ss_dms_recovery.h @@ -58,6 +58,15 @@ #define SS_ONDEMAND_RECOVERY_TRXN_QUEUE_FULL (ENABLE_DMS && \ g_instance.dms_cxt.SSRecoveryInfo.ondemand_recovery_pause_status == PAUSE_FOR_PRUNE_TRXN_QUEUE) +#define ENABLE_REALTIME_BUILD_TARGET_RTO (ENABLE_DMS && \ + g_instance.attr.attr_storage.dms_attr.realtime_build_target_rto > 0 && \ + !g_instance.attr.attr_storage.ss_enable_dorado && \ + !g_instance.attr.attr_storage.ss_stream_cluster) +#define SS_PRIMARY_ENABLE_TARGET_RTO (ENABLE_REALTIME_BUILD_TARGET_RTO && \ + SS_NORMAL_PRIMARY && g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl > 0) +#define SS_STANDBY_ENABLE_TARGET_RTO (SS_NORMAL_STANDBY && \ + SS_ONDEMAND_REALTIME_BUILD_NORMAL && g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl > 0) + #define REFORM_CTRL_VERSION 1 typedef struct st_reformer_ctrl { uint32 version; @@ -128,13 +137,25 @@ typedef struct ondemand_recovery_stat { uint64 recordItemMemUsed; } ondemand_recovery_stat; +typedef struct realtime_build_log_ctrl { + int64 currentRTO; + TimestampTz prevReplyTime; + TimestampTz prevCalculateTime; + TimestampTz replyTime; + uint64 periodTotalBuild; + uint64 buildRate; + XLogRecPtr prevBuildPtr; + XLogRecPtr realtimeBuildPtr; + int sleepTime; +} realtime_build_ctrl_t; + typedef struct ss_recovery_info { bool recovery_pause_flag; volatile failover_ckpt_status_t failover_ckpt_status; char recovery_xlog_dir[MAXPGPATH]; int recovery_inst_id; volatile SSGlobalClusterState cluster_ondemand_status; - char xlog_list[DMS_MAX_INSTANCE][MAXPGPATH];; + char xlog_list[DMS_MAX_INSTANCE][MAXPGPATH]; LWLock* update_seg_lock; bool new_primary_reset_walbuf_flag; bool ready_to_startup; // when DB start (except failover), the flag will set true @@ -152,6 +173,10 @@ typedef struct ss_recovery_info { bool disaster_cluster_promoting; // standby cluster is promoting volatile ondemand_recovery_pause_status_t ondemand_recovery_pause_status; bool realtime_build_in_reform; // used to avoid starting realtime build during reform + volatile bool enableRealtimeBuildLogCtrl; + slock_t sleepTimeSyncLock; + volatile int globalSleepTime; + realtime_build_ctrl_t rtBuildCtrl[DMS_MAX_INSTANCES]; } ss_recovery_info_t; typedef struct ondemand_htab_ctrl { diff --git a/src/include/ddes/dms/ss_transaction.h b/src/include/ddes/dms/ss_transaction.h index 656f78e23..5e3027420 100644 --- a/src/include/ddes/dms/ss_transaction.h +++ b/src/include/ddes/dms/ss_transaction.h @@ -94,6 +94,17 @@ typedef struct SSBroadcasDbBackendsAck { int count; } SSBroadcastDbBackendsAck; +typedef struct SSBroadcastRealtimeBuildLogCtrl { + SSBroadcastOp type; // must be first + bool enableLogCtrl; +} SSBroadcastRealtimeBuildLogCtrl; + +typedef struct SSBroadcastRealtimeBuildPtr { + SSBroadcastOp type; // must be first + XLogRecPtr realtimeBuildPtr; + int srcInstId; +} SSBroadcastRealtimeBuildPtr; + Snapshot SSGetSnapshotData(Snapshot snapshot); CommitSeqNo SSTransactionIdGetCommitSeqNo(TransactionId transactionId, bool isCommit, bool isMvcc, bool isNest, Snapshot snapshot, bool* sync); @@ -101,6 +112,8 @@ void SSTransactionIdDidCommit(TransactionId transactionId, bool *ret_did_commit) void SSTransactionIdIsInProgress(TransactionId transactionId, bool *in_progress); TransactionId SSMultiXactIdGetUpdateXid(TransactionId xmax, uint16 t_infomask, uint16 t_infomask2); bool SSGetOldestXminFromAllStandby(TransactionId xmin, TransactionId xmax, CommitSeqNo csn); +void SSBroadcastRealtimeBuildLogCtrlEnable(bool canncelInReform); +bool SSReportRealtimeBuildPtr(XLogRecPtr realtimeBuildPtr); int SSGetOldestXmin(char *data, uint32 len, char *output_msg, uint32 *output_msg_len); int SSGetOldestXminAck(SSBroadcastXminAck *ack_data); void SSIsPageHitDms(RelFileNode& node, BlockNumber page, int pagesNum, uint64 *pageMap, int *bitCount); @@ -125,5 +138,7 @@ int SSUpdateLatestSnapshotOfStandby(char *data, uint32 len, char *output_msg, ui int SSReloadReformCtrlPage(uint32 len); void SSRequestAllStandbyReloadReformCtrlPage(); bool SSCanFetchLocalSnapshotTxnRelatedInfo(); +int SSUpdateRealtimeBuildLogCtrl(char* data, uint32 len); +int SSGetStandbyRealtimeBuildPtr(char* data, uint32 len); #endif diff --git a/src/include/knl/knl_guc/knl_instance_attr_storage.h b/src/include/knl/knl_guc/knl_instance_attr_storage.h index fcf5f9491..caaa7c466 100755 --- a/src/include/knl/knl_guc/knl_instance_attr_storage.h +++ b/src/include/knl/knl_guc/knl_instance_attr_storage.h @@ -104,6 +104,7 @@ typedef struct knl_instance_attr_dms { bool enable_ondemand_realtime_build; bool enable_ondemand_recovery; int ondemand_recovery_mem_size; + int realtime_build_target_rto; int instance_id; int recv_msg_pool_size; char* interconnect_url; diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index 488433ffc..223c5d788 100755 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -153,6 +153,7 @@ extern int syncrep_scanner_yylex(syncrep_scanner_YYSTYPE* lvalp, YYLTYPE* llocp, extern void syncrep_scanner_yyerror(const char* message, syncrep_scanner_yyscan_t yyscanner); extern void AtomicUpdateIfGreater(volatile XLogRecPtr* ptr, XLogRecPtr newVal, bool* result); extern bool SyncRepGetSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, XLogRecPtr* replayPtr, bool* am_sync, bool check_am_sync = true); +extern SyncWaitRet SSRealtimeBuildWaitForLSN(XLogRecPtr XactCommitLSN, bool enableHandleCancel); #ifndef ENABLE_MULTIPLE_NODES extern void SetXactLastCommitToSyncedStandby(XLogRecPtr recptr); #endif