!2808 [Shared Storage] Fix unexpected RecoveryInProgress state during failover
Merge pull request !2808 from 董宁/ss_fix3_up
@@ -1313,11 +1313,6 @@ static int CBFlushCopy(void *db_handle, char *pageid)

static int CBFailoverPromote(void *db_handle)
{
    Assert(g_instance.dms_cxt.SSClusterState == NODESTATE_NORMAL);
    g_instance.dms_cxt.SSRecoveryInfo.failover_triggered = true;
    g_instance.dms_cxt.SSClusterState = NODESTATE_STANDBY_FAILOVER_PROMOTING;
    ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS failover] failover trigger.")));

    SSTriggerFailover();
    while (true) {
        if (SSFAILOVER_TRIGGER && g_instance.pid_cxt.StartupPID != 0) {
@@ -1338,24 +1333,34 @@ static void CBReformStartNotify(void *db_handle, dms_role_t role, unsigned char
{
    SSReformType ss_reform_type = (SSReformType)reform_type;
    ss_reform_info_t *reform_info = &g_instance.dms_cxt.SSReformInfo;
    reform_info->dms_role = role;
    reform_info->in_reform = true;
    g_instance.dms_cxt.SSClusterState = NODESTATE_NORMAL;
    g_instance.dms_cxt.SSRecoveryInfo.reform_ready = false;
    g_instance.dms_cxt.resetSyscache = true;
    if (ss_reform_type == DMS_REFORM_TYPE_FOR_FAILOVER_OPENGAUSS) {
        g_instance.dms_cxt.SSRecoveryInfo.in_failover = true;
        if (role == DMS_ROLE_REFORMER) {
            // variable set order: SharedRecoveryInProgress -> failover_triggered -> dms_role
            volatile XLogCtlData *xlogctl = t_thrd.shemem_ptr_cxt.XLogCtl;
            SpinLockAcquire(&xlogctl->info_lck);
            xlogctl->IsRecoveryDone = false;
            xlogctl->SharedRecoveryInProgress = true;
            SpinLockRelease(&xlogctl->info_lck);
            t_thrd.shemem_ptr_cxt.ControlFile->state = DB_IN_CRASH_RECOVERY;
            pg_memory_barrier();
            g_instance.dms_cxt.SSRecoveryInfo.failover_triggered = true;
            g_instance.dms_cxt.SSClusterState = NODESTATE_STANDBY_FAILOVER_PROMOTING;
            ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS failover] failover trigger.")));
        }
    }
    reform_info->dms_role = role;
    reform_info->in_reform = true;

    ereport(LOG, (errmodule(MOD_DMS),
        errmsg("[SS reform] dms reform start, role:%d, reform type:%d", role, (int)ss_reform_type)));
    if (reform_info->dms_role == DMS_ROLE_REFORMER) {
        if (dss_set_server_status_wrapper(true) != GS_SUCCESS) {
            ereport(PANIC, (errmodule(MOD_DMS), errmsg("[SS reform] Could not set dssserver flag=read_write")));
        }
        if (!SS_MY_INST_IS_MASTER) {
            // means failover
            g_instance.dms_cxt.SSRecoveryInfo.reclsn_updated = false;
        }
    } else {
        if (dss_set_server_status_wrapper(false) != GS_SUCCESS) {
            ereport(PANIC, (errmodule(MOD_DMS), errmsg("[SS reform] Could not set dssserver flag=read_only")));
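Note: the reformer branch above publishes its recovery state in a fixed order (SharedRecoveryInProgress and the control-file state first, then failover_triggered, with pg_memory_barrier() in between), so any thread that sees the failover trigger is guaranteed to also see recovery-in-progress. Below is a minimal, self-contained sketch of that publication pattern, using std::atomic in place of the spinlock and barrier; all names are illustrative and not part of the openGauss code.

#include <atomic>
#include <cassert>

struct RecoveryState {
    std::atomic<bool> shared_recovery_in_progress{false};
    std::atomic<bool> failover_triggered{false};
};

// Writer (reform start on the reformer): mark recovery-in-progress first,
// then publish the failover trigger.
void publish_failover(RecoveryState &s)
{
    s.shared_recovery_in_progress.store(true, std::memory_order_relaxed);
    // Plays the role of pg_memory_barrier(): everything written above
    // becomes visible before the trigger flag does.
    std::atomic_thread_fence(std::memory_order_release);
    s.failover_triggered.store(true, std::memory_order_relaxed);
}

// Reader (e.g. a check equivalent to SSFAILOVER_TRIGGER): if the trigger is
// observed, the recovery flag written before it must be observed as well.
bool failover_observed(const RecoveryState &s)
{
    if (s.failover_triggered.load(std::memory_order_acquire)) {
        assert(s.shared_recovery_in_progress.load(std::memory_order_relaxed));
        return true;
    }
    return false;
}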
@@ -1382,12 +1387,16 @@ static void CBReformStartNotify(void *db_handle, dms_role_t role, unsigned char

static int CBReformDoneNotify(void *db_handle)
{
    if (g_instance.dms_cxt.SSRecoveryInfo.in_failover) {
        g_instance.dms_cxt.SSRecoveryInfo.in_failover = false;
        if (SS_REFORM_REFORMER) {
            ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS failover] failover success, instance:%d become primary.",
                g_instance.attr.attr_storage.dms_attr.instance_id)));
        }
    }
    /* SSClusterState and in_reform must be set atomically */
    g_instance.dms_cxt.SSClusterState = NODESTATE_NORMAL;
    g_instance.dms_cxt.SSReformInfo.in_reform = false;
    if (g_instance.dms_cxt.SSRecoveryInfo.in_failover) {
        g_instance.dms_cxt.SSRecoveryInfo.in_failover = false;
    }
    g_instance.dms_cxt.SSRecoveryInfo.startup_reform = false;
    g_instance.dms_cxt.SSRecoveryInfo.restart_failover_flag = false;
    ereport(LOG,
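Note: together with CBReformStartNotify above, the reform-done path closes a simple node-state cycle: the reformer moves to NODESTATE_STANDBY_FAILOVER_PROMOTING when a failover reform starts and returns to NODESTATE_NORMAL when the reform finishes, clearing in_reform and in_failover. A rough, hypothetical model of that cycle (the enum and function names are invented for illustration):

#include <cstdio>

enum class NodeState { Normal, StandbyFailoverPromoting };

struct ClusterCtx {
    NodeState state = NodeState::Normal;
    bool in_reform = false;
    bool in_failover = false;
};

// Models the failover branch of the reform-start callback on the reformer.
void on_reform_start_failover(ClusterCtx &c)
{
    c.in_reform = true;
    c.in_failover = true;
    c.state = NodeState::StandbyFailoverPromoting;
}

// Models the reform-done callback: report success, then return to normal.
void on_reform_done(ClusterCtx &c)
{
    if (c.in_failover) {
        c.in_failover = false;
        std::printf("failover success, this instance becomes primary\n");
    }
    c.state = NodeState::Normal;   // reset together with in_reform
    c.in_reform = false;
}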
@@ -72,28 +72,30 @@ void SSWakeupRecovery(void)
bool SSRecoveryNodes()
{
    bool result = false;

    if (t_thrd.shemem_ptr_cxt.XLogCtl->IsRecoveryDone &&
        t_thrd.shemem_ptr_cxt.ControlFile->state == DB_IN_PRODUCTION) {
        result = true;
    } else {
        /* Release my own lock before recovery */
        SSLockReleaseAll();
        SSWakeupRecovery();
        while (true) {
            if (dms_reform_failed()) {
                result = false;
                break;
            }
            if (t_thrd.shemem_ptr_cxt.XLogCtl->IsRecoveryDone &&
                t_thrd.shemem_ptr_cxt.ControlFile->state == DB_IN_PRODUCTION) {
                result = true;
                break;
            }
            pg_usleep(REFORM_WAIT_TIME);
    /* Release my own lock before recovery */
    SSLockReleaseAll();
    SSWakeupRecovery();
    while (true) {
        if (dms_reform_failed()) {
            result = false;
            break;
        }
        /** why use lock:
         * time1 startup thread: update IsRecoveryDone, not finish UpdateControlFile
         * time2 reform_proc: finish reform, think ControlFile is ok
         * time3 DB crash
         * time4 read the checkpoint which created before failover. oops, it is wrong
         */
        LWLockAcquire(ControlFileLock, LW_SHARED);
        if (t_thrd.shemem_ptr_cxt.XLogCtl->IsRecoveryDone &&
            t_thrd.shemem_ptr_cxt.ControlFile->state == DB_IN_PRODUCTION) {
            LWLockRelease(ControlFileLock);
            result = true;
            break;
        }
        LWLockRelease(ControlFileLock);
        pg_usleep(REFORM_WAIT_TIME);
    }

    return result;
}

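Note: the "why use lock" comment above describes the window this change closes: IsRecoveryDone can already be true while UpdateControlFile() has not finished, so a crash at that moment would leave a pre-failover checkpoint in the control file. Below is a minimal, hypothetical model of why the waiter re-checks both conditions under the same lock the writer holds; std::mutex stands in for ControlFileLock and all names are illustrative.

#include <chrono>
#include <mutex>
#include <thread>

struct ControlData {
    std::mutex control_file_lock;   // stands in for ControlFileLock
    bool is_recovery_done = false;  // stands in for XLogCtl->IsRecoveryDone
    bool in_production = false;     // stands in for ControlFile->state == DB_IN_PRODUCTION
};

// Startup side: both facts are published while holding the lock, so a
// lock-holding reader can never see "recovery done" without the matching
// control-file update.
void finish_recovery(ControlData &cd)
{
    std::lock_guard<std::mutex> guard(cd.control_file_lock);
    cd.is_recovery_done = true;
    cd.in_production = true;        // models UpdateControlFile() completing
}

// Reform side: poll both conditions under the same lock, as the new
// SSRecoveryNodes() loop does with LWLockAcquire(ControlFileLock, LW_SHARED).
bool wait_until_in_production(ControlData &cd)
{
    for (;;) {
        {
            std::lock_guard<std::mutex> guard(cd.control_file_lock);
            if (cd.is_recovery_done && cd.in_production) {
                return true;
            }
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));  // ~REFORM_WAIT_TIME
    }
}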
@@ -276,15 +278,6 @@ void SSTriggerFailover()

void SShandle_promote_signal()
{
    volatile XLogCtlData *xlogctl = t_thrd.shemem_ptr_cxt.XLogCtl;
    SpinLockAcquire(&xlogctl->info_lck);
    xlogctl->IsRecoveryDone = false;
    xlogctl->SharedRecoveryInProgress = true;
    SpinLockRelease(&xlogctl->info_lck);

    t_thrd.shemem_ptr_cxt.ControlFile->state = DB_IN_CRASH_RECOVERY;
    pg_memory_barrier();

    if (pmState == PM_WAIT_BACKENDS) {
        g_instance.pid_cxt.StartupPID = initialize_util_thread(STARTUP);
        Assert(g_instance.pid_cxt.StartupPID != 0);
@@ -184,7 +184,6 @@ static void knl_g_dms_init(knl_g_dms_context *dms_cxt)
    dms_cxt->SSRecoveryInfo.failover_triggered = false;
    dms_cxt->SSRecoveryInfo.new_primary_reset_walbuf_flag = false;
    dms_cxt->SSRecoveryInfo.skip_redo_replay = false;
    dms_cxt->SSRecoveryInfo.reclsn_updated = false;
    dms_cxt->SSRecoveryInfo.ready_to_startup = false;
    dms_cxt->SSRecoveryInfo.startup_reform = true;
    dms_cxt->SSRecoveryInfo.restart_failover_flag = false;
@@ -9732,9 +9732,6 @@ void StartupXLOG(void)

    /* init dirty page queue rec lsn to checkpoint.redo */
    update_dirty_page_queue_rec_lsn(checkPoint.redo, true);
    if (ENABLE_DMS) {
        g_instance.dms_cxt.SSRecoveryInfo.reclsn_updated = true;
    }

    /*
     * for gtm environment, we need to set the local csn to next xid to increase.
@@ -10825,11 +10822,10 @@ void StartupXLOG(void)
    if (SSFAILOVER_TRIGGER || SS_STANDBY_PROMOTING) {
        if (SSFAILOVER_TRIGGER) {
            g_instance.dms_cxt.SSRecoveryInfo.failover_triggered = false;
            g_instance.dms_cxt.SSRecoveryInfo.in_failover = false;
            pg_memory_barrier();
        }
        ereport(LOG, (errmodule(MOD_DMS),
            errmsg("[SS switchover/failover] standby promoting: start full checkpoint.")));
            errmsg("[SS switchover/SS failover] standby promoting: start full checkpoint.")));

        RequestCheckpoint(CHECKPOINT_FORCE | CHECKPOINT_IMMEDIATE | CHECKPOINT_WAIT);
        LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
@@ -10839,7 +10835,8 @@ void StartupXLOG(void)
        LWLockRelease(ControlFileLock);
        SSRecheckBufferPool();
        ereport(LOG, (errmodule(MOD_DMS),
            errmsg("[SS switchover/failover] standby promoting: finished start checkpoint.")));
            errmsg("[SS switchover/SS failover] standby promoting: finished full checkpoint"
                "and update control file")));
    }

    NextXidAfterReovery = t_thrd.xact_cxt.ShmemVariableCache->nextXid;
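Note: this is the promoting node's side of the handshake that SSRecoveryNodes() waits on: the failover flags are cleared, a full checkpoint is forced, and only then is the control file switched to DB_IN_PRODUCTION while ControlFileLock is held exclusively. A rough, self-contained sketch of that sequence (the function names and checkpoint stand-in are invented for illustration):

#include <atomic>
#include <mutex>

struct PromoteCtx {
    std::atomic<bool> failover_triggered{true};
    std::mutex control_file_lock;   // stands in for ControlFileLock
    bool is_recovery_done = false;
    bool in_production = false;     // DB_IN_PRODUCTION equivalent
};

// Invented stand-in for RequestCheckpoint(CHECKPOINT_FORCE | CHECKPOINT_IMMEDIATE | CHECKPOINT_WAIT).
void force_full_checkpoint() {}

// Models the promoting node finishing failover in StartupXLOG(): clear the
// trigger, flush everything with a full checkpoint, then flip the control-file
// state under the exclusive lock so waiters never see a "finished" state
// backed by a stale, pre-failover checkpoint.
void finish_promotion(PromoteCtx &p)
{
    p.failover_triggered.store(false, std::memory_order_release);  // ~pg_memory_barrier()
    force_full_checkpoint();

    std::lock_guard<std::mutex> guard(p.control_file_lock);
    p.is_recovery_done = true;
    p.in_production = true;
}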
@@ -69,8 +69,7 @@

#define SS_IN_FLUSHCOPY (ENABLE_DMS && g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy == true)

#define SS_STANDBY_FAILOVER (((g_instance.dms_cxt.SSClusterState == NODESTATE_NORMAL) \
    || (g_instance.dms_cxt.SSClusterState == NODESTATE_STANDBY_FAILOVER_PROMOTING)) \
#define SS_STANDBY_FAILOVER ((g_instance.dms_cxt.SSClusterState == NODESTATE_STANDBY_FAILOVER_PROMOTING) \
    && (g_instance.dms_cxt.SSReformerControl.primaryInstId != SS_MY_INST_ID) \
    && SS_REFORM_REFORMER)

@@ -29,8 +29,7 @@
#define REFORM_CTRL_PAGE DMS_MAX_INSTANCE

#define RECOVERY_WAIT_TIME 10000
#define SSFAILOVER_TRIGGER (ENABLE_DMS && g_instance.dms_cxt.SSRecoveryInfo.failover_triggered == true && \
    g_instance.dms_cxt.SSClusterState == NODESTATE_STANDBY_FAILOVER_PROMOTING)
#define SSFAILOVER_TRIGGER (ENABLE_DMS && g_instance.dms_cxt.SSRecoveryInfo.failover_triggered == true)
#define SSSKIP_REDO_REPLAY (ENABLE_DMS && g_instance.dms_cxt.SSRecoveryInfo.skip_redo_replay == true)
#define SS_BEFORE_RECOVERY (ENABLE_DMS && g_instance.dms_cxt.SSReformInfo.in_reform == true \
    && g_instance.dms_cxt.SSRecoveryInfo.recovery_pause_flag == true)
@@ -54,7 +53,6 @@ typedef struct ss_recovery_info {
    bool skip_redo_replay;
    LWLock* update_seg_lock;
    bool new_primary_reset_walbuf_flag;
    bool reclsn_updated;
    bool ready_to_startup;       // when DB start (except failover), the flag will set true
    bool startup_reform;         // used to judge DB first start, when first reform finshed set false
    bool restart_failover_flag;  // used to indicate do failover when DB start