!6677 【feature】资源池化实时构建适配流控

Merge pull request !6677 from 周聪/dev_realtime_build_log_ctrl_pr
This commit is contained in:
opengauss_bot
2024-11-19 09:34:49 +00:00
committed by Gitee
15 changed files with 592 additions and 13 deletions

View File

@ -117,6 +117,7 @@
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/dcf_replication.h"
#include "replication/ss_disaster_cluster.h"
#include "storage/buf/bufmgr.h"
#include "storage/cucache_mgr.h"
#include "storage/smgr/fd.h"
@ -155,6 +156,7 @@
#include "access/ustore/knl_undoworker.h"
#include "ddes/dms/ss_init.h"
#include "ddes/dms/ss_dms.h"
#include "ddes/dms/ss_transaction.h"
#include "storage/dss/dss_log.h"
#define atooid(x) ((Oid)strtoul((x), NULL, 10))
@ -7002,6 +7004,32 @@ static void assign_dcf_log_backup_file_count(int newval, void* extra)
static void assign_dcf_flow_control_rto(int newval, void *extra)
{
int oldval = u_sess->attr.attr_storage.target_rto;
// used in realtime-build log ctrl
if (ENABLE_DMS && !SS_DISASTER_CLUSTER && t_thrd.proc_cxt.MyProcPid == PostmasterPid) {
// clean realtime-build log ctrl cache, before realtime-build log ctrl enable.
if (g_instance.dms_cxt.dmsInited && SS_PRIMARY_MODE && oldval == 0 && newval > 0) {
g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl = false;
SpinLockInit(&g_instance.dms_cxt.SSRecoveryInfo.sleepTimeSyncLock);
g_instance.dms_cxt.SSRecoveryInfo.globalSleepTime = 0;
errno_t rc = memset_s(g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl,
sizeof(g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl),
0,
sizeof(g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl));
securec_check(rc, "", "");
}
g_instance.attr.attr_storage.dms_attr.realtime_build_target_rto = newval;
// make realtime-build logctrl disable, when recovery_time_target is set to 0;
if (g_instance.dms_cxt.dmsInited && SS_PRIMARY_MODE && oldval > 0 && newval == 0) {
g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl = false;
}
// notify nodes, start or stop realtime-build log ctrl.
if (g_instance.dms_cxt.dmsInited && SS_PRIMARY_MODE && !SS_IN_REFORM &&
((oldval == 0 && newval > 0) || (oldval > 0 && newval == 0))) {
SSBroadcastRealtimeBuildLogCtrlEnable(true);
}
}
if (t_thrd.proc_cxt.MyProcPid == PostmasterPid) {
dcf_set_param("DN_FLOW_CONTROL_RTO", std::to_string(newval).c_str());
}

View File

@ -1328,6 +1328,12 @@ static int32 CBProcessBroadcast(void *db_handle, dms_broadcast_context_t *broad_
case BCAST_RELOAD_REFORM_CTRL_PAGE:
ret = SSReloadReformCtrlPage(len);
break;
case BCAST_REALTIME_BUILD_LOG_CTRL_ENABLE:
ret = SSUpdateRealtimeBuildLogCtrl(data, len);
break;
case BCAST_REPORT_REALTIME_BUILD_PTR:
ret = SSGetStandbyRealtimeBuildPtr(data, len);
break;
default:
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS] invalid broadcast operate type")));
ret = DMS_ERROR;
@ -1871,7 +1877,7 @@ static void FailoverCleanBackends()
/* check and print some thread which no exit. */
int backendNum = SSCountAndPrintChildren(BACKEND_TYPE_NORMAL | BACKEND_TYPE_AUTOVAC);
ereport (WARNING, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] there are %d backends can not exit! "
"wait_time = %ds", backendNum, wait_time / FAILOVER_TIME_CONVERT)));
"wait_time = %lds", backendNum, wait_time / FAILOVER_TIME_CONVERT)));
if (dms_reform_failed()) {
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] reform failed during clean backends")));
@ -1883,6 +1889,25 @@ static void FailoverCleanBackends()
}
}
static void RestartRealtimeBuildCtrl()
{
if (SS_IN_REFORM && g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl) {
ereport(LOG, (errmsg("[SS reform][On-demand] reform happened, disable realtime build log ctrl, "
"and will make it enable again after reform if needed.")));
}
g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl = false;
SpinLockInit(&g_instance.dms_cxt.SSRecoveryInfo.sleepTimeSyncLock);
g_instance.dms_cxt.SSRecoveryInfo.globalSleepTime = 0;
errno_t rc = memset_s(g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl,
sizeof(g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl),
0,
sizeof(g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl));
securec_check(rc, "", "");
if (SS_PRIMARY_MODE && ENABLE_REALTIME_BUILD_TARGET_RTO) {
SSBroadcastRealtimeBuildLogCtrlEnable(false);
}
}
static int reform_type_str_len = 30;
static void ReformTypeToString(dms_reform_type_t reform_type, char* ret_str)
{
@ -2011,18 +2036,18 @@ static void CBReformStartNotify(void *db_handle, dms_reform_start_context_t *rs_
ReformTypeToString(reform_info->reform_type, reform_type_str);
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS reform] reform start, role:%d, reform type:SS %s, standby scenario:%d, "
"bitmap_reconnect:%d, reform_ver:%ld.",
"bitmap_reconnect:%llu, reform_ver:%ld.",
reform_info->dms_role, reform_type_str, SSPerformingStandbyScenario(),
reform_info->bitmap_reconnect, reform_info->reform_ver)));
if (reform_info->dms_role == DMS_ROLE_REFORMER) {
SSGrantDSSWritePermission();
}
int old_primary = SSGetPrimaryInstId();
SSReadControlFile(old_primary, true);
g_instance.dms_cxt.SSReformInfo.old_bitmap = g_instance.dms_cxt.SSReformerControl.list_stable;
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] old cluster node bitmap: %lu", g_instance.dms_cxt.SSReformInfo.old_bitmap)));
g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl = false;
if (g_instance.dms_cxt.SSRecoveryInfo.in_failover) {
FailoverCleanBackends();
} else if (SSBackendNeedExitScenario()) {
@ -2075,18 +2100,21 @@ static int CBReformDoneNotify(void *db_handle)
errmsg("[SS reform] Reform success, instance:%d is running.",
g_instance.attr.attr_storage.dms_attr.instance_id)));
if (ENABLE_REALTIME_BUILD_TARGET_RTO && SS_PRIMARY_MODE) {
RestartRealtimeBuildCtrl();
}
/* reform success indicates that reform of primary and standby all complete, then update gaussdb.state */
g_instance.dms_cxt.dms_status = (dms_status_t)DMS_STATUS_IN;
SendPostmasterSignal(PMSIGNAL_DMS_REFORM_DONE);
g_instance.dms_cxt.SSClusterState = NODESTATE_NORMAL;
g_instance.dms_cxt.SSRecoveryInfo.realtime_build_in_reform = false;
g_instance.dms_cxt.SSReformInfo.in_reform = false;
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] reform done: pmState=%d, SSClusterState=%d, demotion=%d-%d, "
"rec=%d, dmsStatus=%d.", pmState, g_instance.dms_cxt.SSClusterState,
g_instance.demotion, t_thrd.walsender_cxt.WalSndCtl->demotion,
t_thrd.walsender_cxt.WalSndCtl->demotion, t_thrd.xlog_cxt.InRecovery,
g_instance.dms_cxt.dms_status)));
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS reform] reform done: pmState=%d, SSClusterState=%d, demotion=%d-%d, "
"rec=%d, dmsStatus=%d.", pmState, g_instance.dms_cxt.SSClusterState,
g_instance.demotion, t_thrd.walsender_cxt.WalSndCtl->demotion,
t_thrd.xlog_cxt.InRecovery, g_instance.dms_cxt.dms_status)));
return GS_SUCCESS;
}

View File

@ -39,6 +39,7 @@
#include "ddes/dms/ss_dms_recovery.h"
#include "ddes/dms/ss_reform_common.h"
#include "ddes/dms/ss_transaction.h"
#include "access/ondemand_extreme_rto/dispatcher.h"
#include "access/double_write.h"
#include "access/twophase.h"
#include <sys/types.h>
@ -293,7 +294,7 @@ XLogRecPtr SSOndemandRequestPrimaryCkptAndGetRedoLsn()
if (dms_req_opengauss_immediate_checkpoint(&dms_ctx, (unsigned long long *)&primaryRedoLsn) == GS_SUCCESS) {
ereport(DEBUG1, (errmodule(MOD_DMS),
errmsg("[SS][On-demand] request primary node %d checkpoint success, redoLoc %X/%X", SS_PRIMARY_ID,
(uint32)(primaryRedoLsn << 32), (uint32)primaryRedoLsn)));
(uint32)(primaryRedoLsn >> 32), (uint32)primaryRedoLsn)));
return primaryRedoLsn;
}
ereport(DEBUG1, (errmodule(MOD_DMS),

View File

@ -28,6 +28,7 @@
#include "storage/procarray.h"
#include "storage/buf/bufmgr.h"
#include "storage/smgr/segment_internal.h"
#include "access/multi_redo_api.h"
#include "ddes/dms/ss_transaction.h"
#include "ddes/dms/ss_reform_common.h"
#include "ddes/dms/ss_dms_bufmgr.h"
@ -1134,4 +1135,144 @@ bool SSGetOldestXminFromAllStandby(TransactionId xmin, TransactionId xmax, Commi
return false;
}
} while (ret != DMS_SUCCESS);
}
/* broadcast to standby node update realtime-build logctrl enable */
void SSBroadcastRealtimeBuildLogCtrlEnable(bool canncelInReform)
{
dms_context_t dms_ctx;
InitDmsContext(&dms_ctx);
int ret;
SSBroadcastRealtimeBuildLogCtrl logCtrl;
logCtrl.type = BCAST_REALTIME_BUILD_LOG_CTRL_ENABLE;
logCtrl.enableLogCtrl = ENABLE_REALTIME_BUILD_TARGET_RTO;
dms_broadcast_info_t dms_broad_info = {
.data = (char *)&logCtrl,
.len = sizeof(SSBroadcastRealtimeBuildLogCtrl),
.output = NULL,
.output_len = NULL,
.scope = DMS_BROADCAST_ONLINE_LIST,
.inst_map = 0,
.timeout = SS_BROADCAST_WAIT_ONE_SECOND,
.handle_recv_msg = (unsigned char)false,
.check_session_kill = (unsigned char)true
};
do {
ret = dms_broadcast_msg(&dms_ctx, &dms_broad_info);
if (ret == DMS_SUCCESS || (canncelInReform && SS_IN_REFORM)) {
break;
}
pg_usleep(5000L);
} while (ret != DMS_SUCCESS);
if (ret == DMS_SUCCESS) {
ereport(LOG,
(errmsg("[SS reform][On-demand] notify standby node update realtime-build log ctrl enable success, "
"enableLogCtrl: %d", logCtrl.enableLogCtrl)));
}
}
int SSUpdateRealtimeBuildLogCtrl(char* data, uint32 len)
{
if (unlikely(len != sizeof(SSBroadcastRealtimeBuildLogCtrl))) {
return DMS_ERROR;
}
if (!ENABLE_ONDEMAND_REALTIME_BUILD) {
return DMS_SUCCESS;
}
SSBroadcastRealtimeBuildLogCtrl *logCtrlEnable = (SSBroadcastRealtimeBuildLogCtrl *)data;
g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl = logCtrlEnable->enableLogCtrl;
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[On-demand] Update standby realtime-build log ctrl %s, "
"enableLogCtrl: %s, enable_ondemand_realtime_build: true.",
logCtrlEnable->enableLogCtrl ? "enable" : "disable",
logCtrlEnable->enableLogCtrl ? "true" : "false")));
return DMS_SUCCESS;
}
/* report realtime-build ptr to primary */
bool SSReportRealtimeBuildPtr(XLogRecPtr realtimeBuildPtr)
{
if (!SS_STANDBY_ENABLE_TARGET_RTO) {
return false;
}
dms_context_t dms_ctx;
InitDmsContext(&dms_ctx);
int ret;
SSBroadcastRealtimeBuildPtr reportMessage;
reportMessage.type = BCAST_REPORT_REALTIME_BUILD_PTR;
reportMessage.realtimeBuildPtr = realtimeBuildPtr;
reportMessage.srcInstId = SS_MY_INST_ID;
dms_broadcast_info_t dms_broad_info = {
.data = (char *)&reportMessage,
.len = sizeof(SSBroadcastRealtimeBuildPtr),
.output = NULL,
.output_len = NULL,
.scope = DMS_BROADCAST_SPECIFY_LIST,
.inst_map = (unsigned long long)1 << SS_PRIMARY_ID,
.timeout = SS_BROADCAST_WAIT_ONE_SECOND,
.handle_recv_msg = (unsigned char)false,
.check_session_kill = (unsigned char)true
};
do {
ret = dms_broadcast_msg(&dms_ctx, &dms_broad_info);
if (ret == DMS_SUCCESS) {
break;
}
if (!SS_STANDBY_ENABLE_TARGET_RTO) {
break;
}
pg_usleep(5000L);
} while (ret != DMS_SUCCESS);
return ret == DMS_SUCCESS;
}
int SSGetStandbyRealtimeBuildPtr(char* data, uint32 len)
{
if (unlikely(len != sizeof(SSBroadcastRealtimeBuildPtr))) {
return DMS_ERROR;
}
if (!ENABLE_REALTIME_BUILD_TARGET_RTO || !SS_PRIMARY_MODE || SS_IN_REFORM) {
return DMS_SUCCESS;
}
SSBroadcastRealtimeBuildPtr *receiveMessage = (SSBroadcastRealtimeBuildPtr *)data;
XLogRecPtr realtimePtr = receiveMessage->realtimeBuildPtr;
int srcId = receiveMessage->srcInstId;
if (!g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl) {
g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl = true;
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS][On-demand] Get realtime-build ptr from standby inst_id: %d,"
"enable realtime-build log ctrl, replayEndRecPtr: %X/%X",
SS_PRIMARY_ID, (uint32)(realtimePtr >> 32), (uint32)realtimePtr)));
}
realtime_build_ctrl_t *rtBuildCtrl = &g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl[srcId];
// If the time interval betwen two reply < 100 ms, ignore this reply.
TimestampTz currentTime = GetCurrentTimestamp();
if (SSLogCtrlCalculateTimeDiff(rtBuildCtrl->replyTime, currentTime) <= MIN_REPLY_MILLISEC_TIME_DIFF) {
return DMS_SUCCESS;
}
rtBuildCtrl->prevReplyTime = rtBuildCtrl->replyTime;
rtBuildCtrl->replyTime = currentTime;
rtBuildCtrl->prevBuildPtr = rtBuildCtrl->realtimeBuildPtr;
rtBuildCtrl->realtimeBuildPtr = realtimePtr;
if (rtBuildCtrl->prevBuildPtr > rtBuildCtrl->realtimeBuildPtr) {
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[SS][On-demand] Get realtimeBuild lsn from standby node %d is less than prevBuildPtr, "
" prevBuildPtr: %X/%X, realtime-build ptr: %X/%X.", srcId,
(uint32)(rtBuildCtrl->prevBuildPtr >> 32), (uint32)rtBuildCtrl->prevBuildPtr,
(uint32)(rtBuildCtrl->realtimeBuildPtr >> 32), (uint32)rtBuildCtrl->realtimeBuildPtr)));
} else if (rtBuildCtrl->prevBuildPtr != InvalidXLogRecPtr) {
ereport(DEBUG4, (errmodule(MOD_DMS),
errmsg("[SS][On-demand] Get realtimeBuild lsn from standby inst_id %d, "
" prevBuildPtr: %X/%X, realtime-build ptr: %X/%X.", srcId,
(uint32)(rtBuildCtrl->prevBuildPtr >> 32), (uint32)rtBuildCtrl->prevBuildPtr,
(uint32)(rtBuildCtrl->realtimeBuildPtr >> 32), (uint32)rtBuildCtrl->realtimeBuildPtr)));
if (rtBuildCtrl->prevBuildPtr != InvalidXLogRecPtr) {
SSRealtimebuildLogCtrl(srcId);
}
}
return DMS_SUCCESS;
}

View File

@ -215,7 +215,14 @@ static void knl_g_dms_init(knl_g_dms_context *dms_cxt)
dms_cxt->SSRecoveryInfo.disaster_cluster_promoting = false;
dms_cxt->SSRecoveryInfo.dorado_sharestorage_inited = false;
dms_cxt->SSRecoveryInfo.ondemand_recovery_pause_status = NOT_PAUSE;
dms_cxt->SSRecoveryInfo.in_ondemand_recovery = false;
dms_cxt->SSRecoveryInfo.enableRealtimeBuildLogCtrl = false;
dms_cxt->SSRecoveryInfo.globalSleepTime = 0;
dms_cxt->SSRecoveryInfo.sleepTimeSyncLock = (slock_t)0;
errno_t rc = memset_s(dms_cxt->SSRecoveryInfo.rtBuildCtrl,
sizeof(dms_cxt->SSRecoveryInfo.rtBuildCtrl),
0,
sizeof(dms_cxt->SSRecoveryInfo.rtBuildCtrl));
securec_check(rc, "", "");
dms_cxt->log_timezone = NULL;
pg_atomic_init_u32(&dms_cxt->inDmsThreShmemInitCnt, 0);
pg_atomic_init_u32(&dms_cxt->inProcExitCnt, 0);

View File

@ -110,6 +110,9 @@ static const int invalid_worker_id = -1;
static const int UNDO_START_BLK = 1;
static const int UHEAP_UPDATE_UNDO_START_BLK = 2;
// unit: ms
static const int LOG_CTRL_REPORT_TIME_INTERVAL = 200;
struct ControlFileData restoreControlFile;
typedef void *(*GetStateFunc)(PageRedoWorker *worker);
@ -176,6 +179,7 @@ void CopyDataFromOldReader(XLogReaderState *newReaderState, const XLogReaderStat
void SendSingalToPageWorker(int signal);
static void RestoreControlFileForRealtimeBuild();
static void LogCtrlReportRealtimeBuildPtr();
/* dispatchTable must consistent with RmgrTable */
static const RmgrDispatchData g_dispatchTable[RM_MAX_ID + 1] = {
@ -439,6 +443,9 @@ void HandleStartupInterruptsForExtremeRto()
if (SS_STANDBY_FAILOVER && pmState == PM_STARTUP && SS_ONDEMAND_REALTIME_BUILD_NORMAL) {
OndemandRealtimeBuildHandleFailover();
}
if (SS_STANDBY_ENABLE_TARGET_RTO) {
LogCtrlReportRealtimeBuildPtr();
}
}
static void SetOndemandXLogParseFlagValue(uint32 maxParseBufNum)
@ -2506,4 +2513,32 @@ static void RestoreControlFileForRealtimeBuild() {
LWLockRelease(ControlFileLock);
}
/* Startup report realtime-build ptr to primary node for realtime-build log ctrl. */
static void LogCtrlReportRealtimeBuildPtr()
{
TimestampTz currentTime = GetCurrentTimestamp();
if (SSLogCtrlCalculateTimeDiff(g_dispatcher->reportTime, currentTime) < LOG_CTRL_REPORT_TIME_INTERVAL) {
return;
}
XLogRecPtr minEnd = t_thrd.shemem_ptr_cxt.XLogCtl->replayEndRecPtr;
if (minEnd == InvalidXLogRecPtr || !SS_STANDBY_ENABLE_TARGET_RTO) {
g_dispatcher->reportTime = currentTime;
return;
}
ereport(DEBUG4, (errmodule(MOD_DMS),
errmsg("[SS][On-demand] send primary node %d realtime-build ptr start, realtime-build %X/%X",
SS_PRIMARY_ID, (uint32)(minEnd >> 32), (uint32)minEnd)));
if (SSReportRealtimeBuildPtr(minEnd)) {
ereport(DEBUG4, (errmodule(MOD_DMS),
errmsg("[SS][On-demand] send primary node %d realtime-build ptr success, "
"realtime-build ptr: %X/%X", SS_PRIMARY_ID,
(uint32)(minEnd >> 32), (uint32)minEnd)));
} else {
ereport(DEBUG4, (errmodule(MOD_DMS),
errmsg("[SS][On-demand] send primary node %d realtime-build ptr fail, realtime-build ptr: %X/%X",
SS_PRIMARY_ID, (uint32)(minEnd >> 32), (uint32)minEnd)));
}
g_dispatcher->reportTime = GetCurrentTimestamp();
}
} // namespace ondemand_extreme_rto

View File

@ -30,6 +30,7 @@
#include "ddes/dms/ss_dms_bufmgr.h"
#include "storage/lock/lwlock.h"
#include "catalog/storage_xlog.h"
#include "replication/syncrep.h"
void PrintXLogRecParseStateBlockHead(XLogRecParseState* blockState);
@ -772,4 +773,233 @@ bool OndemandAllowBufAccess()
return true;
}
return false;
}
}
static const int MICROSECONDS_PER_SECONDS = 1000000;
static const int MILLISECONDS_PER_SECONDS = 1000;
static const int MILLISECONDS_PER_MICROSECONDS = 1000;
static const int SHIFT_SPEED = 3;
static const int CALCULATE_INTERVAL_MILLISECONDS = 2000;
static const uint64 MIN_BUILD_SPEED = 1;
static const int NEEDS_LARGE_RANGE = 60;
static const int LONG_LOG_CTRL_SLEEP_MICROSECONDS = 1500000;
static const int SHORT_LOG_CTRL_SLEEP_MICROSECONDS = 1000000;
static const int LOG_CTRL_REPORT_TIME_INTERVAL = 200;
long SSLogCtrlCalculateTimeDiff(TimestampTz startTime, TimestampTz endTime)
{
long secToTime;
int microsecToTime;
TimestampDifference(startTime, endTime, &secToTime, &microsecToTime);
return secToTime * MILLISECONDS_PER_SECONDS +
microsecToTime / MILLISECONDS_PER_MICROSECONDS;
}
bool IsRealtimeBuildRtoOverTarget(int srcId)
{
realtime_build_ctrl_t *rtBuildCtrl = &g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl[srcId];
if (SS_PRIMARY_ENABLE_TARGET_RTO &&
rtBuildCtrl->currentRTO > g_instance.attr.attr_storage.dms_attr.realtime_build_target_rto) {
return true;
}
return false;
}
static inline uint64 LogCtrlCountBigSpeed(uint64 originSpeed, uint64 curSpeed)
{
uint64 updateSpeed = (((originSpeed << SHIFT_SPEED) - originSpeed) >> SHIFT_SPEED) + curSpeed;
return updateSpeed;
}
static bool ReplyMessageCheck(int srcId)
{
realtime_build_ctrl_t *rtBuildCtrl = &g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl[srcId];
bool checkResult = true;
if (XLByteLT(rtBuildCtrl->realtimeBuildPtr, rtBuildCtrl->prevBuildPtr) ||
rtBuildCtrl->prevReplyTime >= rtBuildCtrl->replyTime) {
checkResult = false;
ereport(WARNING, (errmodule(MOD_RTO_RPO),
errmsg("[On-demand] ReplyMessageCheck to false, rtBuildCtrl->prevBuildPtr: %lu, "
"rtBuildCtrl->realtimeBuildPtr: %lu, "
"rtBuildCtrl->prevReplyTime: %ld, rtBuildCtrl->replyTime: %ld.",
rtBuildCtrl->prevBuildPtr, rtBuildCtrl->realtimeBuildPtr,
rtBuildCtrl->prevReplyTime, rtBuildCtrl->replyTime)));
}
// no record needs to build, rto = 0
if (XLByteEQ(GetXLogInsertEndRecPtr(), rtBuildCtrl->realtimeBuildPtr)) {
rtBuildCtrl->currentRTO = 0;
checkResult = false;
}
// at lease get tow replies beform calculate rto.
if (rtBuildCtrl->prevReplyTime == 0 || !checkResult) {
rtBuildCtrl->prevCalculateTime = rtBuildCtrl->replyTime;
rtBuildCtrl->periodTotalBuild = 0;
checkResult = false;
}
return checkResult;
}
static void SSRealtimeBuildCalculateCurrentRTO(int srcId)
{
if (!SS_NORMAL_PRIMARY || !ReplyMessageCheck(srcId)) {
return;
}
realtime_build_ctrl_t *rtBuildCtrl = &g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl[srcId];
long millisecTimeDiff = SSLogCtrlCalculateTimeDiff(rtBuildCtrl->prevReplyTime, rtBuildCtrl->replyTime);
if (millisecTimeDiff < LOG_CTRL_REPORT_TIME_INTERVAL) {
Assert(false);
return;
}
XLogRecPtr buildPtr = rtBuildCtrl->realtimeBuildPtr;
uint64 needBuild = GetXLogInsertEndRecPtr() - buildPtr;
uint64 newBuild = buildPtr - rtBuildCtrl->prevBuildPtr;
uint64 periodTotalBuild = rtBuildCtrl->periodTotalBuild + newBuild;
long calculateTimeDiff = SSLogCtrlCalculateTimeDiff(rtBuildCtrl->prevCalculateTime, rtBuildCtrl->replyTime);
if (calculateTimeDiff <= 0) {
ereport(LOG, (errmodule(MOD_RTO_RPO),
errmsg("[On-demand] SSRealtimeBuildCalculateCurrentRTO calculateTimeDiff <= 0, "
"rtBuildCtrl->prevCalculateTime %ld, rtBuildCtrl->replyTime %ld",
rtBuildCtrl->prevCalculateTime, rtBuildCtrl->replyTime)));
return;
}
if ((rtBuildCtrl->buildRate >> SHIFT_SPEED) > 1) {
if (calculateTimeDiff > CALCULATE_INTERVAL_MILLISECONDS || IsRealtimeBuildRtoOverTarget(srcId)) {
rtBuildCtrl->buildRate = LogCtrlCountBigSpeed(rtBuildCtrl->buildRate,
(uint64)(periodTotalBuild / calculateTimeDiff));
rtBuildCtrl->prevCalculateTime = rtBuildCtrl->replyTime;
}
} else {
rtBuildCtrl->buildRate = (uint64)((newBuild / millisecTimeDiff) << SHIFT_SPEED);
}
if (rtBuildCtrl->prevCalculateTime == rtBuildCtrl->replyTime) {
rtBuildCtrl->periodTotalBuild = 0;
} else {
rtBuildCtrl->periodTotalBuild = periodTotalBuild;
}
uint64 buildSpeed = (rtBuildCtrl->buildRate >> SHIFT_SPEED); // units: byte/ms
if (buildSpeed == 0) {
buildSpeed = MIN_BUILD_SPEED;
}
uint64 secRTO = needBuild / buildSpeed / MILLISECONDS_PER_SECONDS;
rtBuildCtrl->currentRTO = secRTO;
ereport(DEBUG4, (errmodule(MOD_RTO_RPO),
errmsg("[On-demand] The RTO estimated is = : %lu seconds, or %lu microseconds. "
"realtimeBuildPtr is %X/%X, prevBuildPtr is %X/%X, calculateTimeDiff is %ld, "
"needBuild is %lu, buildSpeed is %lu, srcId is %d.",
secRTO, (needBuild / buildSpeed),
(uint32)(rtBuildCtrl->realtimeBuildPtr >> 32), (uint32)rtBuildCtrl->realtimeBuildPtr,
(uint32)(rtBuildCtrl->prevBuildPtr >> 32), (uint32)rtBuildCtrl->prevBuildPtr,
millisecTimeDiff, needBuild, buildSpeed, srcId)));
}
const int CDF_RANGE = 2;
const int CDF_LEFT = -CDF_RANGE;
const int CDF_RIGHT = CDF_RANGE;
const double CDF_MEAN = 0;
const double CDF_STDDEV = 1.0;
static inline double GaussianCdf(double x, double mean, double stddev)
{
return 0.5 * erfc(-(x - mean) / (stddev * sqrt(2)));
}
static double CalculateSleepTimeByCdf(int64 currentRTO, int64 targetRTO)
{
double x = ((double)currentRTO - targetRTO / 2) / (double)(targetRTO / 2);
double tx = x * (CDF_RIGHT - CDF_LEFT) - CDF_RANGE;
double range = GaussianCdf(CDF_RIGHT, CDF_MEAN, CDF_STDDEV) - GaussianCdf(CDF_LEFT, CDF_MEAN, CDF_STDDEV);
double fx = GaussianCdf(tx, CDF_MEAN, CDF_STDDEV) - GaussianCdf(CDF_LEFT, CDF_MEAN, CDF_STDDEV);
ereport(DEBUG4, (errmodule(MOD_RTO_RPO),
errmsg("[On-demand] CalculateSleepTimeByCdf currentRTO: %ld, targetRTO: %ld, fx: %lf, "
"range: %lf, fx/range (sleepTime): %lf.",
currentRTO, targetRTO, fx, range, fx / range)));
return fx / range;
}
#define MIN_LOG_CTRL_ENABLE_RTO (g_instance.attr.attr_storage.dms_attr.realtime_build_target_rto / 2)
static int SSRealtimeBuildCalculateSleepTime(int srcId)
{
realtime_build_ctrl_t *rtBuildCtrl = &g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl[srcId];
if (!SS_PRIMARY_ENABLE_TARGET_RTO) {
return 0;
}
int maxSleepTime;
int sleepTime = 0;
if (g_instance.attr.attr_storage.dms_attr.realtime_build_target_rto >= NEEDS_LARGE_RANGE) {
maxSleepTime = LONG_LOG_CTRL_SLEEP_MICROSECONDS;
} else {
maxSleepTime = SHORT_LOG_CTRL_SLEEP_MICROSECONDS;
}
if (rtBuildCtrl->currentRTO < MIN_LOG_CTRL_ENABLE_RTO) {
sleepTime = 0;
ereport(DEBUG4, (errmodule(MOD_RTO_RPO),
errmsg("[On-demand] The RTO estimated is = : %lu seconds. sleeptime is = %d microseconds.",
rtBuildCtrl->currentRTO, sleepTime)));
} else if (rtBuildCtrl->currentRTO >= g_instance.attr.attr_storage.dms_attr.realtime_build_target_rto) {
sleepTime = maxSleepTime;
ereport(DEBUG4, (errmodule(MOD_RTO_RPO),
errmsg("[On-demand] The RTO estimated is = : %lu seconds. sleeptime is = %d microseconds.",
rtBuildCtrl->currentRTO, sleepTime)));
} else {
// method 2
int64 targetRTO = (int64)g_instance.attr.attr_storage.dms_attr.realtime_build_target_rto;
sleepTime = (int)(CalculateSleepTimeByCdf(rtBuildCtrl->currentRTO, targetRTO) *maxSleepTime);
sleepTime = Min(maxSleepTime, Max(0, sleepTime));
ereport(DEBUG4, (errmodule(MOD_RTO_RPO),
errmsg("[On-demand] The RTO estimated is = : %lu seconds. sleeptime is = %d microseconds.",
rtBuildCtrl->currentRTO, sleepTime)));
}
return sleepTime;
}
const int LOG_UPDATA_GAP = SHORT_LOG_CTRL_SLEEP_MICROSECONDS / 2;
void SSRealtimeBuildUpdatGlobalSleepTime(int srcId, int localSleepTime)
{
int oldSleepTime = g_instance.dms_cxt.SSRecoveryInfo.globalSleepTime;
int newSleepTime = 0;
SpinLockAcquire(&g_instance.dms_cxt.SSRecoveryInfo.sleepTimeSyncLock);
if (SS_PRIMARY_ENABLE_TARGET_RTO) {
g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl[srcId].sleepTime = localSleepTime;
int maxSleepTime = 0;
for (int i = 0; i < DMS_MAX_INSTANCES; i++) {
maxSleepTime = Max(maxSleepTime, g_instance.dms_cxt.SSRecoveryInfo.rtBuildCtrl[i].sleepTime);
}
g_instance.dms_cxt.SSRecoveryInfo.globalSleepTime = maxSleepTime;
} else {
g_instance.dms_cxt.SSRecoveryInfo.globalSleepTime = 0;
}
newSleepTime = g_instance.dms_cxt.SSRecoveryInfo.globalSleepTime;
SpinLockRelease(&g_instance.dms_cxt.SSRecoveryInfo.sleepTimeSyncLock);
if ((oldSleepTime == 0 && newSleepTime > 0) ||
(oldSleepTime > 0 && newSleepTime == 0) ||
(oldSleepTime > LOG_UPDATA_GAP && newSleepTime < LOG_UPDATA_GAP) ||
(oldSleepTime < LOG_UPDATA_GAP && newSleepTime > LOG_UPDATA_GAP) ||
(oldSleepTime != LONG_LOG_CTRL_SLEEP_MICROSECONDS && newSleepTime == LONG_LOG_CTRL_SLEEP_MICROSECONDS) ||
(oldSleepTime != SHORT_LOG_CTRL_SLEEP_MICROSECONDS && newSleepTime == SHORT_LOG_CTRL_SLEEP_MICROSECONDS)) {
ereport(LOG, (errmodule(MOD_RTO_RPO),
errmsg("[On-demand] realtime-build log ctl global sleep time update, "
"oldSleepTime: %d microseconds, newSleepTime: %d microseconds.",
oldSleepTime, newSleepTime)));
}
}
void SSRealtimebuildLogCtrl(int srcId)
{
if (!SS_PRIMARY_ENABLE_TARGET_RTO) {
return;
}
// step 1:calculate rto for instance: srcId
SSRealtimeBuildCalculateCurrentRTO(srcId);
// step 2:calculate sleep time for instance: srcId
int localSleepTime = SSRealtimeBuildCalculateSleepTime(srcId);
// step 3:update global sleep time
SSRealtimeBuildUpdatGlobalSleepTime(srcId, localSleepTime);
}

View File

@ -113,6 +113,7 @@ static int standby_priority_comparator(const void *a, const void *b);
static inline void free_sync_standbys_list(List* sync_standbys);
static int cmp_lsn(const void *a, const void *b);
static bool DelayIntoMostAvaSync(bool checkSyncNum, SyncStandbyNumState state = STANDBIES_EMPTY);
SyncWaitRet SSRealtimeBuildWaitForTime(XLogRecPtr XactCommitLSN);
typedef struct TransContext {
/* for global */
@ -228,6 +229,9 @@ bool SynRepWaitCatchup(XLogRecPtr XactCommitLSN)
*/
SyncWaitRet SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, bool enableHandleCancel)
{
if (SS_PRIMARY_ENABLE_TARGET_RTO) {
return SSRealtimeBuildWaitForTime(XactCommitLSN);
}
char *new_status = NULL;
const char *old_status = NULL;
int mode = u_sess->attr.attr_storage.sync_rep_wait_mode;
@ -495,6 +499,58 @@ SyncWaitRet SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, bool enableHandleCancel)
return waitStopRes;
}
/*
* Used in realtime-build log ctrl.
* threads will wait for a while befor commit if need log ctrl.
*/
SyncWaitRet SSRealtimeBuildWaitForTime(XLogRecPtr XactCommitLSN)
{
SyncWaitRet waitStopRes = NOT_REQUEST;
/*
* Only wait on prmiary node, and realtimeBuildLogCtrl enable.
*/
if (!SS_PRIMARY_ENABLE_TARGET_RTO) {
return NOT_REQUEST;
}
/* Prevent the unexpected cleanups to be influenced by external interruptions */
HOLD_INTERRUPTS();
/*
* Wait for a while, until globalSleepTime timeout.
*/
const int syncSleepTimeInterval = 1000;
int targetSleepTime = g_instance.dms_cxt.SSRecoveryInfo.globalSleepTime;
if (targetSleepTime > 0) {
ereport(DEBUG4,
(errmsg("SSRealtimeBuildWaitForTime sleep %d microseconds before commit.", targetSleepTime)));
}
if (targetSleepTime == 0) {
return NOT_REQUEST;
}
for (int sleepTime = 0; sleepTime < targetSleepTime; sleepTime += syncSleepTimeInterval) {
int currentSleepTime = g_instance.dms_cxt.SSRecoveryInfo.globalSleepTime;
if (sleepTime >= currentSleepTime) {
waitStopRes = SYNC_COMPLETE;
t_thrd.proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
RESUME_INTERRUPTS();
return REPSYNCED;
} else if (!SS_PRIMARY_ENABLE_TARGET_RTO) {
t_thrd.proc->syncRepState = SYNC_REP_NOT_WAITING;
RESUME_INTERRUPTS();
return STOP_WAIT;
}
pg_usleep(syncSleepTimeInterval);
}
RESUME_INTERRUPTS();
waitStopRes = SYNC_COMPLETE;
t_thrd.proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
return REPSYNCED;
}
void SyncRepCleanupAtProcExit(void)
{
if (t_thrd.proc->syncRepLinks.prev || t_thrd.proc->syncRepLinks.next ||

View File

@ -185,6 +185,7 @@ typedef struct {
* control file into standby node's, when standby node shutdown.
*/
ControlFileData* restoreControlFile;
TimestampTz reportTime;
} LogDispatcher;
typedef struct {

View File

@ -32,6 +32,8 @@ typedef enum {
PARSE_TYPE_SEG,
} XLogRecParseType;
const long MIN_REPLY_MILLISEC_TIME_DIFF = 200;
Size OndemandRecoveryShmemSize(void);
void OndemandRecoveryShmemInit(void);
void OndemandXlogFileIdCacheInit(void);
@ -57,4 +59,10 @@ bool IsRecParseStateHaveChildState(XLogRecParseState *checkState);
void OndemandGlobalXLogMemReleaseIfNeed(RedoMemManager *memctl);
bool OndemandAllowBufAccess();
bool IsRealtimeBuildRtoOverTarget(int srcId);
// return unit: millisecond
long SSLogCtrlCalculateTimeDiff(TimestampTz startTime, TimestampTz endTime);
void SSRealtimebuildLogCtrl(int srcId);
void SSRealtimeBuildUpdateGlobalSyncLSN();
#endif /* ONDEMAND_EXTREME_RTO_REDO_UTILS_H */

View File

@ -184,6 +184,8 @@ typedef enum SSBroadcastOp {
BCAST_CHECK_DB_BACKENDS,
BCAST_SEND_SNAPSHOT,
BCAST_RELOAD_REFORM_CTRL_PAGE,
BCAST_REALTIME_BUILD_LOG_CTRL_ENABLE,
BCAST_REPORT_REALTIME_BUILD_PTR,
BCAST_END
} SSBroadcastOp;

View File

@ -58,6 +58,15 @@
#define SS_ONDEMAND_RECOVERY_TRXN_QUEUE_FULL (ENABLE_DMS && \
g_instance.dms_cxt.SSRecoveryInfo.ondemand_recovery_pause_status == PAUSE_FOR_PRUNE_TRXN_QUEUE)
#define ENABLE_REALTIME_BUILD_TARGET_RTO (ENABLE_DMS && \
g_instance.attr.attr_storage.dms_attr.realtime_build_target_rto > 0 && \
!g_instance.attr.attr_storage.ss_enable_dorado && \
!g_instance.attr.attr_storage.ss_stream_cluster)
#define SS_PRIMARY_ENABLE_TARGET_RTO (ENABLE_REALTIME_BUILD_TARGET_RTO && \
SS_NORMAL_PRIMARY && g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl > 0)
#define SS_STANDBY_ENABLE_TARGET_RTO (SS_NORMAL_STANDBY && \
SS_ONDEMAND_REALTIME_BUILD_NORMAL && g_instance.dms_cxt.SSRecoveryInfo.enableRealtimeBuildLogCtrl > 0)
#define REFORM_CTRL_VERSION 1
typedef struct st_reformer_ctrl {
uint32 version;
@ -128,13 +137,25 @@ typedef struct ondemand_recovery_stat {
uint64 recordItemMemUsed;
} ondemand_recovery_stat;
typedef struct realtime_build_log_ctrl {
int64 currentRTO;
TimestampTz prevReplyTime;
TimestampTz prevCalculateTime;
TimestampTz replyTime;
uint64 periodTotalBuild;
uint64 buildRate;
XLogRecPtr prevBuildPtr;
XLogRecPtr realtimeBuildPtr;
int sleepTime;
} realtime_build_ctrl_t;
typedef struct ss_recovery_info {
bool recovery_pause_flag;
volatile failover_ckpt_status_t failover_ckpt_status;
char recovery_xlog_dir[MAXPGPATH];
int recovery_inst_id;
volatile SSGlobalClusterState cluster_ondemand_status;
char xlog_list[DMS_MAX_INSTANCE][MAXPGPATH];;
char xlog_list[DMS_MAX_INSTANCE][MAXPGPATH];
LWLock* update_seg_lock;
bool new_primary_reset_walbuf_flag;
bool ready_to_startup; // when DB start (except failover), the flag will set true
@ -152,6 +173,10 @@ typedef struct ss_recovery_info {
bool disaster_cluster_promoting; // standby cluster is promoting
volatile ondemand_recovery_pause_status_t ondemand_recovery_pause_status;
bool realtime_build_in_reform; // used to avoid starting realtime build during reform
volatile bool enableRealtimeBuildLogCtrl;
slock_t sleepTimeSyncLock;
volatile int globalSleepTime;
realtime_build_ctrl_t rtBuildCtrl[DMS_MAX_INSTANCES];
} ss_recovery_info_t;
typedef struct ondemand_htab_ctrl {

View File

@ -94,6 +94,17 @@ typedef struct SSBroadcasDbBackendsAck {
int count;
} SSBroadcastDbBackendsAck;
typedef struct SSBroadcastRealtimeBuildLogCtrl {
SSBroadcastOp type; // must be first
bool enableLogCtrl;
} SSBroadcastRealtimeBuildLogCtrl;
typedef struct SSBroadcastRealtimeBuildPtr {
SSBroadcastOp type; // must be first
XLogRecPtr realtimeBuildPtr;
int srcInstId;
} SSBroadcastRealtimeBuildPtr;
Snapshot SSGetSnapshotData(Snapshot snapshot);
CommitSeqNo SSTransactionIdGetCommitSeqNo(TransactionId transactionId, bool isCommit, bool isMvcc, bool isNest,
Snapshot snapshot, bool* sync);
@ -101,6 +112,8 @@ void SSTransactionIdDidCommit(TransactionId transactionId, bool *ret_did_commit)
void SSTransactionIdIsInProgress(TransactionId transactionId, bool *in_progress);
TransactionId SSMultiXactIdGetUpdateXid(TransactionId xmax, uint16 t_infomask, uint16 t_infomask2);
bool SSGetOldestXminFromAllStandby(TransactionId xmin, TransactionId xmax, CommitSeqNo csn);
void SSBroadcastRealtimeBuildLogCtrlEnable(bool canncelInReform);
bool SSReportRealtimeBuildPtr(XLogRecPtr realtimeBuildPtr);
int SSGetOldestXmin(char *data, uint32 len, char *output_msg, uint32 *output_msg_len);
int SSGetOldestXminAck(SSBroadcastXminAck *ack_data);
void SSIsPageHitDms(RelFileNode& node, BlockNumber page, int pagesNum, uint64 *pageMap, int *bitCount);
@ -125,5 +138,7 @@ int SSUpdateLatestSnapshotOfStandby(char *data, uint32 len, char *output_msg, ui
int SSReloadReformCtrlPage(uint32 len);
void SSRequestAllStandbyReloadReformCtrlPage();
bool SSCanFetchLocalSnapshotTxnRelatedInfo();
int SSUpdateRealtimeBuildLogCtrl(char* data, uint32 len);
int SSGetStandbyRealtimeBuildPtr(char* data, uint32 len);
#endif

View File

@ -104,6 +104,7 @@ typedef struct knl_instance_attr_dms {
bool enable_ondemand_realtime_build;
bool enable_ondemand_recovery;
int ondemand_recovery_mem_size;
int realtime_build_target_rto;
int instance_id;
int recv_msg_pool_size;
char* interconnect_url;

View File

@ -153,6 +153,7 @@ extern int syncrep_scanner_yylex(syncrep_scanner_YYSTYPE* lvalp, YYLTYPE* llocp,
extern void syncrep_scanner_yyerror(const char* message, syncrep_scanner_yyscan_t yyscanner);
extern void AtomicUpdateIfGreater(volatile XLogRecPtr* ptr, XLogRecPtr newVal, bool* result);
extern bool SyncRepGetSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, XLogRecPtr* replayPtr, bool* am_sync, bool check_am_sync = true);
extern SyncWaitRet SSRealtimeBuildWaitForLSN(XLogRecPtr XactCommitLSN, bool enableHandleCancel);
#ifndef ENABLE_MULTIPLE_NODES
extern void SetXactLastCommitToSyncedStandby(XLogRecPtr recptr);
#endif