enhance log and code

This commit is contained in:
shirley_zhengx
2024-05-13 17:32:58 +08:00
committed by yaoxin
parent 8f1e170229
commit 4da722eb3c
12 changed files with 473 additions and 206 deletions

View File

@ -125,7 +125,7 @@ void SSInitXminInfo()
xmin_info->snap_cache = HeapMemInitHash("DMS snapshot xmin cache", 60, 30000, &ctl,
HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
if (xmin_info->snap_cache == NULL) {
ereport(FATAL, (errmodule(MOD_DMS), errmsg("could not initialize shared xmin_info hash table")));
ereport(FATAL, (errmodule(MOD_DMS), errmsg("[SS] could not initialize shared xmin_info hash table")));
}
}
@ -137,7 +137,7 @@ void DmsAuxiliaryMain(void)
(void)sigdelset(&t_thrd.libpq_cxt.BlockSig, SIGQUIT);
if (sigsetjmp(localSigjmpBuf, 1) != 0) {
ereport(WARNING, (errmodule(MOD_DMS), errmsg("dms auxiliary thread exception occured.")));
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS] dms auxiliary thread exception occured.")));
dms_auxiliary_handle_exception();
}

View File

@ -103,7 +103,7 @@ static void CalcSegDmsPhysicalLoc(BufferDesc* buf_desc, Buffer buffer, bool chec
{
if (IsSegmentFileNode(buf_desc->tag.rnode)) {
SegmentCheck(!IsSegmentPhysicalRelNode(buf_desc->tag.rnode));
ereport(WARNING, (errmsg("buffer:%d is segdata page, bufdesc seginfo is empty", buffer)));
ereport(WARNING, (errmsg("[SS] buffer:%d is segdata page, bufdesc seginfo is empty", buffer)));
SegPageLocation loc = seg_get_physical_location(buf_desc->tag.rnode, buf_desc->tag.forkNum,
buf_desc->tag.blockNum, check_standby);
SegmentCheck(loc.blocknum != InvalidBlockNumber);
@ -130,7 +130,7 @@ bool LockModeCompatible(dms_buf_ctrl_t *buf_ctrl, LWLockMode mode)
compatible = true;
}
} else {
AssertEreport(0, MOD_DMS, "lock mode value is wrong");
AssertEreport(0, MOD_DMS, "[SS] lock mode value is wrong");
}
return compatible;
@ -251,7 +251,7 @@ void SmgrNetPageCheckDiskLSN(BufferDesc *buf_desc, ReadBufferMode read_mode, con
}
if (rdStatus == SMGR_RD_CRC_ERROR) {
ereport(WARNING, (errmsg("[%d/%d/%d/%d/%d %d-%d] read from disk error, maybe buffer in flush",
ereport(WARNING, (errmsg("[SS][%d/%d/%d/%d/%d %d-%d] read from disk error, maybe buffer in flush",
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
(int)buf_desc->tag.rnode.bucketNode, (int)buf_desc->tag.rnode.opt, buf_desc->tag.forkNum,
buf_desc->tag.blockNum)));
@ -266,7 +266,7 @@ void SmgrNetPageCheckDiskLSN(BufferDesc *buf_desc, ReadBufferMode read_mode, con
if (!RecoveryInProgress() && !SS_IN_ONDEMAND_RECOVERY) {
elevel = PANIC;
}
ereport(elevel, (errmsg("[%d/%d/%d/%d/%d %d-%d] memory lsn(0x%llx) is less than disk lsn(0x%llx)",
ereport(elevel, (errmsg("[SS][%d/%d/%d/%d/%d %d-%d] memory lsn(0x%llx) is less than disk lsn(0x%llx)",
rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
buf_desc->tag.forkNum, buf_desc->tag.blockNum,
(unsigned long long)lsn_on_mem, (unsigned long long)lsn_on_disk)));
@ -282,17 +282,17 @@ Buffer TerminateReadPage(BufferDesc* buf_desc, ReadBufferMode read_mode, const X
Buffer buffer;
if (buf_ctrl->state & BUF_NEED_LOAD) {
if (g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy && AmDmsReformProcProcess()) {
ereport(PANIC, (errmsg("SS In flush copy, can't read from disk!")));
ereport(PANIC, (errmsg("[SS] In flush copy, can't read from disk!")));
}
buffer = ReadBuffer_common_for_dms(read_mode, buf_desc, pblk);
} else {
#ifdef USE_ASSERT_CHECKING
if (buf_ctrl->state & BUF_IS_EXTEND) {
ereport(PANIC, (errmsg("extend page should not be tranferred from DMS, "
ereport(PANIC, (errmsg("[SS] extend page should not be tranferred from DMS, "
"and needs to be loaded from disk!")));
}
if (buf_ctrl->been_loaded == false) {
ereport(PANIC, (errmsg("ctrl not marked loaded before transferring from remote")));
ereport(PANIC, (errmsg("[SS] ctrl not marked loaded before transferring from remote")));
}
#endif
@ -409,7 +409,7 @@ void SegNetPageCheckDiskLSN(BufferDesc *buf_desc, ReadBufferMode read_mode, SegS
/* maybe some pages are not protected by WAL-Logged */
if ((lsn_on_mem != InvalidXLogRecPtr) && (lsn_on_disk > lsn_on_mem)) {
RelFileNode rnode = buf_desc->tag.rnode;
ereport(PANIC, (errmsg("[%d/%d/%d/%d/%d %d-%d] memory lsn(0x%llx) is less than disk lsn(0x%llx)",
ereport(PANIC, (errmsg("[SS][%d/%d/%d/%d/%d %d-%d] memory lsn(0x%llx) is less than disk lsn(0x%llx)",
rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
buf_desc->tag.forkNum, buf_desc->tag.blockNum,
(unsigned long long)lsn_on_mem, (unsigned long long)lsn_on_disk)));
@ -535,8 +535,8 @@ bool SSOndemandRequestPrimaryRedo(BufferTag tag)
}
ereport(DEBUG1,
(errmsg("[On-demand] Start request primary node redo page, spc/db/rel/bucket "
"fork-block: %u/%u/%u/%d %d-%u", tag.rnode.spcNode, tag.rnode.dbNode,
(errmsg("[SS][On-demand][%u/%u/%u/%d %d-%u] Start request primary node redo page.",
tag.rnode.spcNode, tag.rnode.dbNode,
tag.rnode.relNode, tag.rnode.bucketNode, tag.forkNum, tag.blockNum)));
InitDmsContext(&dms_ctx);
dms_ctx.xmap_ctx.dest_id = (unsigned int)SS_PRIMARY_ID;
@ -544,32 +544,32 @@ bool SSOndemandRequestPrimaryRedo(BufferTag tag)
(unsigned int)sizeof(BufferTag), &redo_status) != DMS_SUCCESS) {
SSReadControlFile(REFORM_CTRL_PAGE);
ereport(LOG,
(errmsg("[On-demand] Request primary node redo page timeout, spc/db/rel/bucket "
"fork-block: %u/%u/%u/%d %d-%u", tag.rnode.spcNode, tag.rnode.dbNode,
(errmsg("[SS][On-demand][%u/%u/%u/%d %d-%u] Request primary node redo page timeout.",
tag.rnode.spcNode, tag.rnode.dbNode,
tag.rnode.relNode, tag.rnode.bucketNode, tag.forkNum, tag.blockNum)));
return false;
}
if (redo_status == ONDEMAND_REDO_DONE) {
ereport(DEBUG1,
(errmsg("[On-demand] Request primary node redo page done, spc/db/rel/bucket "
"fork-block: %u/%u/%u/%d %d-%u", tag.rnode.spcNode, tag.rnode.dbNode,
(errmsg("[SS][On-demand][%u/%u/%u/%d %d-%u] Request primary node redo page done.",
tag.rnode.spcNode, tag.rnode.dbNode,
tag.rnode.relNode, tag.rnode.bucketNode, tag.forkNum, tag.blockNum)));
} else if (redo_status == ONDEMAND_REDO_SKIP) {
ereport(DEBUG1,
(errmsg("[On-demand] Primary node is not in ondemand recovery now and "
(errmsg("[SS][On-demand] Primary node is not in ondemand recovery now and "
"ignore this redo request, so refresh reform control file")));
SSReadControlFile(REFORM_CTRL_PAGE);
} else if (redo_status == ONDEMAND_REDO_ERROR) {
ereport(PANIC,
(errmsg("[On-demand] Error happend in request primary node redo page, read buffer crash, "
"spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u", tag.rnode.spcNode, tag.rnode.dbNode,
(errmsg("[SS][On-demand][%u/%u/%u/%d %d-%u] Error happend in request primary node redo page, "
"read buffer crash.", tag.rnode.spcNode, tag.rnode.dbNode,
tag.rnode.relNode, tag.rnode.bucketNode, tag.forkNum, tag.blockNum)));
return false;
} else if (redo_status == ONDEMAND_REDO_FAIL) {
ereport(WARNING,
(errmsg("[On-demand] Error happend in request primary node redo page, buffer is invalid, "
"spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u", tag.rnode.spcNode, tag.rnode.dbNode,
(errmsg("[SS][On-demand][%u/%u/%u/%d %d-%u] Error happend in request primary node redo page, "
"buffer is invalid", tag.rnode.spcNode, tag.rnode.dbNode,
tag.rnode.relNode, tag.rnode.bucketNode, tag.forkNum, tag.blockNum)));
return false;
}
@ -655,7 +655,7 @@ int SSLockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
int ret = dms_broadcast_opengauss_ddllock(&dms_ctx, (char *)&ssmsg, sizeof(SSBroadcastDDLLock),
(unsigned char)false, SS_BROADCAST_WAIT_FIVE_SECONDS, (unsigned char)LOCK_NORMAL_MODE);
if (ret != DMS_SUCCESS) {
ereport(WARNING, (errmsg("SS broadcast DDLLockRelease request failed!")));
ereport(WARNING, (errmsg("[SS broadcast] DDLLockRelease request failed!")));
}
t_thrd.postgres_cxt.whereToSendOutput = output_backup;
@ -674,7 +674,7 @@ void SSLockReleaseAll()
int ret = dms_broadcast_opengauss_ddllock(&dms_ctx, (char *)&ssmsg, sizeof(SSBroadcastCmdOnly),
(unsigned char)false, SS_BROADCAST_WAIT_FIVE_SECONDS, (unsigned char)LOCK_RELEASE_SELF);
if (ret != DMS_SUCCESS) {
ereport(DEBUG1, (errmsg("SS broadcast DDLLockReleaseAll request failed!")));
ereport(DEBUG1, (errmsg("[SS broadcast] DDLLockReleaseAll request failed!")));
}
t_thrd.postgres_cxt.whereToSendOutput = output_backup;
@ -697,7 +697,7 @@ void SSLockAcquireAll()
LOCK *lock = proclock->tag.myLock;
int ret = SSLockAcquire(&(lock->tag), AccessExclusiveLock, false, false, LOCK_REACQUIRE);
if (ret) {
ereport(WARNING, (errmodule(MOD_DMS), errmsg("SS Broadcast LockAcquire when reform finished failed")));
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS Broadcast] LockAcquire when reform finished failed")));
}
}
}
@ -756,7 +756,7 @@ void SSRecheckBufferPool()
mode = PANIC;
#endif
ereport(mode,
(errmsg("Buffer was not flushed or replayed, spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u, page lsn (0x%llx)",
(errmsg("[SS][%u/%u/%u/%d %d-%u] Buffer was not flushed or replayed, page lsn (0x%llx)",
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum, (unsigned long long)pagelsn)));
}
@ -777,7 +777,7 @@ bool CheckPageNeedSkipInRecovery(Buffer buf, uint64 xlogLsn)
securec_check(err, "\0", "\0");
int ret = dms_recovery_page_need_skip(pageid, xlogLsn, (unsigned char *)&skip);
if (ret != DMS_SUCCESS) {
ereport(PANIC, (errmsg("DMS Internal error happened during recovery, errno %d", ret)));
ereport(PANIC, (errmsg("[SS] DMS Internal error happened during recovery, errno %d", ret)));
}
return skip;
@ -854,7 +854,7 @@ bool SSSegRead(SMgrRelation reln, ForkNumber forknum, char *buffer)
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf - 1);
if (buf_ctrl->seg_fileno != EXTENT_INVALID && (buf_ctrl->seg_fileno != buf_desc->extra->seg_fileno ||
buf_ctrl->seg_blockno != buf_desc->extra->seg_blockno)) {
ereport(PANIC, (errmsg("It seemd physical location of drc not match with buf desc!")));
ereport(PANIC, (errmsg("[SS] It seemd physical location of drc not match with buf desc!")));
}
seg_physical_read(reln->seg_space, fakenode, forknum, buf_desc->extra->seg_blockno, (char *)buffer);
@ -949,8 +949,8 @@ bool SSHelpFlushBufferIfNeed(BufferDesc* buf_desc)
XLogRecPtr pagelsn = BufferGetLSN(buf_desc);
if (!SS_IN_REFORM && !SS_IN_ONDEMAND_RECOVERY) {
ereport(PANIC,
(errmsg("[SS] this buffer should not exist with BUF_DIRTY_NEED_FLUSH but not in reform, "
"spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u, page lsn (0x%llx), seg info:%u-%u",
(errmsg("[SS][%u/%u/%u/%d %d-%u] this buffer should not exist with BUF_DIRTY_NEED_FLUSH "
"but not in reform, page lsn (0x%llx), seg info:%u-%u",
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum,
(unsigned long long)pagelsn, (unsigned int)buf_desc->extra->seg_fileno,
@ -959,13 +959,12 @@ bool SSHelpFlushBufferIfNeed(BufferDesc* buf_desc)
bool in_flush_copy = SS_IN_FLUSHCOPY;
bool in_recovery = !g_instance.dms_cxt.SSRecoveryInfo.recovery_pause_flag;
ereport(LOG,
(errmsg("[SS flush copy] ready to flush buffer with need flush, "
"spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u, page lsn (0x%llx), seg info:%u-%u, reform phase "
"is in flush_copy:%d, in recovery:%d",
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum,
(unsigned long long)pagelsn, (unsigned int)buf_desc->extra->seg_fileno, buf_desc->extra->seg_blockno,
in_flush_copy, in_recovery)));
(errmsg("[SS][%u/%u/%u/%d %d-%u] flush copy: ready to flush buffer with need flush, "
"page lsn (0x%llx), seg info:%u-%u, reform phase is in flush_copy:%d, in recovery:%d ",
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum,
(unsigned long long)pagelsn, (unsigned int)buf_desc->extra->seg_fileno, buf_desc->extra->seg_blockno,
in_flush_copy, in_recovery)));
if (IsSegmentBufferID(buf_desc->buf_id)) {
return SSTrySegFlushBuffer(buf_desc);
} else {

View File

@ -67,13 +67,16 @@ void SSWakeupRecovery(void)
return;
}
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] wait pagewriter thread start")));
while (pg_atomic_read_u32(&g_instance.ckpt_cxt_ctl->current_page_writer_count) != thread_num) {
/* No need to wait for the pagewriter thread to start, because there is no xlog that needs recovery when the db starts. */
if (!RecoveryInProgress()) {
need_recovery = false;
break;
}
pg_usleep(REFORM_WAIT_TIME);
}
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] wait pagewriter thread start: succsess")));
if (need_recovery) {
g_instance.dms_cxt.SSRecoveryInfo.recovery_pause_flag = false;
@ -124,7 +127,7 @@ static CommitSeqNo TransactionWaitCommittingCSN(dms_opengauss_xid_csn_t *xid_csn
while (COMMITSEQNO_IS_COMMITTING(csn)) {
if (looped && isCommit) {
ereport(DEBUG1,
(errmodule(MOD_DMS), errmsg("committed SS xid %lu's csn %lu"
(errmodule(MOD_DMS), errmsg("[SS] committed SS xid %lu's csn %lu"
"is changed to FROZEN after lockwait.", xid, csn)));
CSNLogSetCommitSeqNo(xid, 0, NULL, COMMITSEQNO_FROZEN);
SetLatestFetchState(xid, COMMITSEQNO_FROZEN);
@ -132,7 +135,7 @@ static CommitSeqNo TransactionWaitCommittingCSN(dms_opengauss_xid_csn_t *xid_csn
return COMMITSEQNO_FROZEN;
} else if (looped && !isCommit) {
ereport(DEBUG1, (errmodule(MOD_DMS),
errmsg("SS XID %lu's csn %lu is changed to ABORT after lockwait.", xid, csn)));
errmsg("[SS] SS XID %lu's csn %lu is changed to ABORT after lockwait.", xid, csn)));
/* recheck if transaction id is finished */
RecheckXidFinish(xid, csn);
CSNLogSetCommitSeqNo(xid, 0, NULL, COMMITSEQNO_ABORTED);
@ -146,7 +149,7 @@ static CommitSeqNo TransactionWaitCommittingCSN(dms_opengauss_xid_csn_t *xid_csn
if (latestCSN >= snapshotcsn) {
ereport(DEBUG1,
(errmodule(MOD_DMS), errmsg(
"snapshotcsn %lu < csn %lu stored in CSNLog, TXN invisible, no need to sync wait, XID %lu",
"[SS] snapshotcsn %lu < csn %lu stored in CSNLog, TXN invisible, no need to sync wait, XID %lu",
snapshotcsn,
latestCSN,
xid)));
@ -338,7 +341,7 @@ static int CBGetCurrModeAndLockBuffer(void *db_handle, int buffer, unsigned char
Assert(*curr_mode == LW_EXCLUSIVE || *curr_mode == LW_SHARED);
LockBuffer((Buffer)buffer, lock_mode); // BUFFER_LOCK_UNLOCK, BUFFER_LOCK_SHARE or BUFFER_LOCK_EXCLUSIVE
ereport(LOG, (errmodule(MOD_DMS),
errmsg("SS lock buf success, buffer=%d, mode=%hhu, curr_mode=%hhu", buffer, lock_mode, *curr_mode)));
errmsg("[SS] SS lock buf success, buffer=%d, mode=%hhu, curr_mode=%hhu", buffer, lock_mode, *curr_mode)));
return DMS_SUCCESS;
}
@ -353,7 +356,7 @@ static void SSHandleReformFailDuringDemote(DemoteMode demote_mode)
{
ereport(WARNING,
(errmodule(MOD_DMS),
errmsg("[SS switchover] Failure in %s primary demote, pmState=%d, need reform rcy.",
errmsg("[SS reform][SS switchover] Failure in %s primary demote, pmState=%d, need reform rcy.",
DemoteModeDesc(demote_mode), pmState)));
/*
@ -364,7 +367,7 @@ static void SSHandleReformFailDuringDemote(DemoteMode demote_mode)
if (CheckpointInProgress() || pmState >= PM_SHUTDOWN) {
ereport(WARNING,
(errmodule(MOD_DMS),
errmsg("[SS switchover DFX] reform failed after shutdown ckpt has started, exit now")));
errmsg("[SS reform][SS switchover] reform failed after shutdown ckpt has started, exit now")));
_exit(0);
}
@ -382,7 +385,7 @@ static int CBSwitchoverDemote(void *db_handle)
SpinLockAcquire(&t_thrd.walsender_cxt.WalSndCtl->mutex);
if (t_thrd.walsender_cxt.WalSndCtl->demotion > NoDemote) {
SpinLockRelease(&t_thrd.walsender_cxt.WalSndCtl->mutex);
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS switchover] master is doing switchover,"
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform][SS switchover] master is doing switchover,"
" probably standby already requested switchover.")));
return DMS_SUCCESS;
}
@ -395,7 +398,7 @@ static int CBSwitchoverDemote(void *db_handle)
SpinLockRelease(&t_thrd.walsender_cxt.WalSndCtl->mutex);
ereport(LOG,
(errmodule(MOD_DMS), errmsg("[SS switchover] Recv %s demote request from DMS reformer.",
(errmodule(MOD_DMS), errmsg("[SS reform][SS switchover] Recv %s demote request from DMS reformer.",
DemoteModeDesc(demote_mode))));
SendPostmasterSignal(PMSIGNAL_DEMOTE_PRIMARY);
@ -405,8 +408,8 @@ static int CBSwitchoverDemote(void *db_handle)
if (pmState == PM_RUN && g_instance.dms_cxt.SSClusterState == NODESTATE_PROMOTE_APPROVE) {
SSResetDemoteReqType();
ereport(LOG,
(errmodule(MOD_DMS), errmsg("[SS switchover] Success in %s primary demote, running as standby,"
" waiting for reformer setting new role.", DemoteModeDesc(demote_mode))));
(errmodule(MOD_DMS), errmsg("[SS reform][SS switchover] Success in %s primary demote, running as"
"standby, waiting for reformer setting new role.", DemoteModeDesc(demote_mode))));
return DMS_SUCCESS;
} else {
if (ntries >= WAIT_DEMOTE || dms_reform_failed()) {
@ -431,7 +434,7 @@ static int CBSwitchoverPromote(void *db_handle, unsigned char origPrimaryId)
t_thrd.shemem_ptr_cxt.XLogCtl->SharedRecoveryInProgress = true;
t_thrd.shemem_ptr_cxt.ControlFile->state = DB_IN_CRASH_RECOVERY;
pg_memory_barrier();
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS switchover] Starting to promote standby.")));
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform][SS switchover] Starting to promote standby.")));
SSNotifySwitchoverPromote();
@ -441,12 +444,12 @@ static int CBSwitchoverPromote(void *db_handle, unsigned char origPrimaryId)
/* flush control file primary id in advance to save new standby's waiting time */
SSSavePrimaryInstId(SS_MY_INST_ID);
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS switchover] Standby promote: success, set new primary:%d.", SS_MY_INST_ID)));
errmsg("[SS reform][SS switchover] Standby promote: success, set new primary:%d.", SS_MY_INST_ID)));
return DMS_SUCCESS;
} else {
if (ntries >= WAIT_PROMOTE || dms_reform_failed()) {
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[SS switchover] Standby promote timeout, please try again later.")));
errmsg("[SS reform][SS switchover] Standby promote timeout, please try again later.")));
SSWaitStartupExit();
return DMS_ERROR;
}
@ -464,7 +467,7 @@ static void CBSwitchoverResult(void *db_handle, int result)
{
if (result == DMS_SUCCESS) {
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS switchover] Switchover success, letting reformer update roles.")));
errmsg("[SS reform][SS switchover] Switchover success, letting reformer update roles.")));
return;
} else {
/* abort and restore state */
@ -472,7 +475,8 @@ static void CBSwitchoverResult(void *db_handle, int result)
if (SS_DISASTER_STANDBY_CLUSTER) {
g_instance.dms_cxt.SSReformInfo.in_reform = false;
}
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS switchover] Switchover failed, errno: %d.", result)));
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS reform][SS switchover] Switchover failed,"
"errno: %d.", result)));
}
}
@ -536,6 +540,7 @@ static int CBSaveStableList(void *db_handle, unsigned long long list_stable, uns
SSUpdateReformerCtrl();
LWLockRelease(ControlFileLock);
Assert(g_instance.dms_cxt.SSReformerControl.primaryInstId == (int)primary_id);
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS %s] set current instance:%d as primary, list_stable:%llu.",
SS_PERFORMING_SWITCHOVER ? "switchover" : "reform", primary_id, list_stable)));
ret = DMS_SUCCESS;
@ -599,7 +604,7 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_
BlockNumber spc_nblocks = spc_size(spc, relfilenode.relNode, tag->forkNum);
if (tag->blockNum >= spc_nblocks) {
ereport(PANIC, (errmodule(MOD_DMS),
errmsg("unexpected blocknum %u >= spc nblocks %u", tag->blockNum, spc_nblocks)));
errmsg("[SS] unexpected blocknum %u >= spc nblocks %u", tag->blockNum, spc_nblocks)));
}
}
#endif
@ -640,7 +645,7 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_
if (!(pg_atomic_read_u64(&buf_desc->state) & BM_VALID)) {
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[%d/%d/%d/%d %d-%d] try enter page failed, buffer is not valid, state = 0x%lx",
errmsg("[SS page][%d/%d/%d/%d %d-%d] try enter page failed, buffer is not valid, state = 0x%lx",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum, buf_desc->state)));
DmsReleaseBuffer(buf_desc->buf_id + 1, is_seg);
@ -651,7 +656,7 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_
if (pg_atomic_read_u64(&buf_desc->state) & BM_IO_ERROR) {
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[%d/%d/%d/%d %d-%d] try enter page failed, buffer is io error, state = 0x%lx",
errmsg("[SS page][%d/%d/%d/%d %d-%d] try enter page failed, buffer is io error, state = 0x%lx",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum, buf_desc->state)));
DmsReleaseBuffer(buf_desc->buf_id + 1, is_seg);
@ -678,14 +683,14 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_
LWLockRelease(buf_desc->content_lock);
DmsReleaseBuffer(buf_desc->buf_id + 1, is_seg);
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[%u/%u/%u/%d %d-%u] been_loaded marked false, page swapped out and failed to load",
errmsg("[SS page][%u/%u/%u/%d %d-%u] been_loaded marked false, page swapped out and failed to load",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum)));
break;
}
if ((*buf_ctrl)->lock_mode == DMS_LOCK_NULL) {
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[%u/%u/%u/%d %d-%u] lock mode is null, still need to transfer page",
errmsg("[SS page][%u/%u/%u/%d %d-%u] lock mode is null, still need to transfer page",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum)));
} else if (buf_desc->extra->seg_fileno != EXTENT_INVALID) {
@ -784,9 +789,9 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
if (!(buf_state & BM_VALID) || (buf_state & BM_IO_ERROR)) {
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[%d/%d/%d/%d %d-%d] invalidate page, buffer is not valid or io error, state = 0x%lx",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum, buf_desc->state)));
errmsg("[SS page][%d/%d/%d/%d %d-%d] invalidate page, buffer is not valid or io error, "
"state = 0x%lx", tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode,
tag->rnode.bucketNode, tag->forkNum, tag->blockNum, buf_desc->state)));
UnlockBufHdr(buf_desc, buf_state);
buf_ctrl->lock_mode = (unsigned char)DMS_LOCK_NULL;
buf_ctrl->seg_fileno = EXTENT_INVALID;
@ -800,9 +805,9 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
XLogRecPtrIsValid(pg_atomic_read_u64(&buf_desc->extra->rec_lsn)) ||
(buf_ctrl->state & BUF_DIRTY_NEED_FLUSH)) {
ereport(DEBUG1, (errmodule(MOD_DMS),
errmsg("[%d/%d/%d/%d %d-%d] invalidate owner rejected, buffer is dirty/permanent, state = 0x%lx",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum, buf_desc->state)));
errmsg("[SS page][%d/%d/%d/%d %d-%d] invalidate owner rejected, buffer is dirty/permanent, "
"state = 0x%lx", tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode,
tag->rnode.bucketNode, tag->forkNum, tag->blockNum, buf_desc->state)));
ret = DMS_ERROR;
} else {
buf_ctrl->lock_mode = (unsigned char)DMS_LOCK_NULL;
@ -839,9 +844,9 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
if ((!(pg_atomic_read_u64(&buf_desc->state) & BM_VALID)) ||
(pg_atomic_read_u64(&buf_desc->state) & BM_IO_ERROR)) {
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[%d/%d/%d/%d %d-%d] invalidate page, buffer is not valid or io error, state = 0x%lx",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum, buf_desc->state)));
errmsg("[SS page][%d/%d/%d/%d %d-%d] invalidate page, buffer is not valid or io error, "
"state = 0x%lx", tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode,
tag->rnode.bucketNode, tag->forkNum, tag->blockNum, buf_desc->state)));
DmsReleaseBuffer(buf_id + 1, IsSegmentBufferID(buf_id));
buf_ctrl->lock_mode = (unsigned char)DMS_LOCK_NULL;
buf_ctrl->seg_fileno = EXTENT_INVALID;
@ -852,8 +857,8 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
get_lock = SSLWLockAcquireTimeout(buf_desc->content_lock, LW_EXCLUSIVE);
if (!get_lock) {
ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] request LWLock timeout, "
"buf_id:%d, lwlock:%p",
ereport(WARNING, (errmodule(MOD_DMS), errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] "
"request LWLock timeout, buf_id:%d, lwlock:%p",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum, buf_id, buf_desc->content_lock))));
ret = GS_TIMEOUT;
@ -878,7 +883,7 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
ErrorData* edata = CopyErrorData();
FlushErrorState();
FreeErrorData(edata);
ereport(WARNING, (errmsg("[CBInvalidatePage] Error happend, spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u",
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS][%d/%d/%d/%d %d-%d] CBInvalidatePage: Error happend.",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode, tag->forkNum,
tag->blockNum)));
ReleaseResource();
@ -912,7 +917,7 @@ static void CBVerifyPage(dms_buf_ctrl_t *buf_ctrl, char *new_page)
buf_desc->extra->seg_blockno = buf_ctrl->seg_blockno;
} else if (buf_desc->extra->seg_fileno != buf_ctrl->seg_fileno ||
buf_desc->extra->seg_blockno != buf_ctrl->seg_blockno) {
ereport(PANIC, (errmsg("[%u/%u/%u/%d/%d %d-%u] location mismatch, seg_fileno:%d, seg_blockno:%u",
ereport(PANIC, (errmodule(MOD_DMS), errmsg("[SS page][%u/%u/%u/%d/%d %d-%u] location mismatch, seg_fileno:%d, seg_blockno:%u",
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
buf_desc->tag.rnode.bucketNode, buf_desc->tag.rnode.opt, buf_desc->tag.forkNum,
buf_desc->tag.blockNum, buf_desc->extra->seg_fileno, buf_desc->extra->seg_blockno)));
@ -934,7 +939,7 @@ static void CBVerifyPage(dms_buf_ctrl_t *buf_ctrl, char *new_page)
XLogRecPtr lsn_now = PageGetLSN(new_page);
if ((lsn_now != InvalidXLogRecPtr) && (lsn_past > lsn_now)) {
RelFileNode rnode = buf_desc->tag.rnode;
ereport(PANIC, (errmsg("[%d/%d/%d/%d/%d %d-%d] now lsn(0x%llx) is less than past lsn(0x%llx)",
ereport(PANIC, (errmodule(MOD_DMS), errmsg("[SS page][%d/%d/%d/%d/%d %d-%d] now lsn(0x%llx) is less than past lsn(0x%llx)",
rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
buf_desc->tag.forkNum, buf_desc->tag.blockNum,
(unsigned long long)lsn_now, (unsigned long long)lsn_past)));
@ -1112,7 +1117,7 @@ static void CBMemReset(void *context)
static int32 CBProcessLockAcquire(char *data, uint32 len)
{
if (unlikely(len != sizeof(SSBroadcastDDLLock))) {
ereport(DEBUG1, (errmsg("invalid broadcast ddl lock message")));
ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[SS] invalid broadcast ddl lock message")));
return DMS_ERROR;
}
@ -1127,7 +1132,7 @@ static int32 CBProcessLockAcquire(char *data, uint32 len)
{
t_thrd.int_cxt.InterruptHoldoffCount = saveInterruptHoldoffCount;
res = LOCKACQUIRE_NOT_AVAIL;
ereport(WARNING, (errmsg("SS Standby process DDLLockAccquire got in PG_CATCH")));
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS] Standby process DDLLockAccquire got in PG_CATCH")));
if (t_thrd.role == DMS_WORKER) {
FlushErrorState();
}
@ -1135,7 +1140,7 @@ static int32 CBProcessLockAcquire(char *data, uint32 len)
PG_END_TRY();
if (!(ssmsg->dontWait) && res == LOCKACQUIRE_NOT_AVAIL) {
ereport(WARNING, (errmsg("SS process DDLLockAccquire request failed!")));
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS] process DDLLockAccquire request failed!")));
return DMS_ERROR;
}
return DMS_SUCCESS;
@ -1144,7 +1149,7 @@ static int32 CBProcessLockAcquire(char *data, uint32 len)
static int32 CBProcessLockRelease(char *data, uint32 len)
{
if (unlikely(len != sizeof(SSBroadcastDDLLock))) {
ereport(DEBUG1, (errmsg("invalid lock release message")));
ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[SS lock] invalid lock release message")));
return DMS_ERROR;
}
@ -1159,7 +1164,7 @@ static int32 CBProcessLockRelease(char *data, uint32 len)
{
t_thrd.int_cxt.InterruptHoldoffCount = saveInterruptHoldoffCount;
res = DMS_ERROR;
ereport(WARNING, (errmsg("SS process DDLLockRelease request failed!")));
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS lock] process DDLLockRelease request failed!")));
if (t_thrd.role == DMS_WORKER) {
FlushErrorState();
}
@ -1187,7 +1192,7 @@ static int32 CBProcessReleaseAllLock(uint32 len)
{
t_thrd.int_cxt.InterruptHoldoffCount = saveInterruptHoldoffCount;
res = DMS_ERROR;
ereport(WARNING, (errmsg("SS process DDLLockReleaseAll request failed!")));
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS lock] process DDLLockReleaseAll request failed!")));
if (t_thrd.role == DMS_WORKER) {
FlushErrorState();
}
@ -1249,7 +1254,7 @@ static int32 CBProcessBroadcast(void *db_handle, dms_broadcast_context_t *broad_
ret = SSReloadReformCtrlPage(len);
break;
default:
ereport(WARNING, (errmodule(MOD_DMS), errmsg("invalid broadcast operate type")));
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS] invalid broadcast operate type")));
ret = DMS_ERROR;
break;
}
@ -1278,7 +1283,7 @@ static int32 CBProcessBroadcastAck(void *db_handle, dms_broadcast_context_t *bro
ret = SSCheckDbBackendsAck(data, len);
break;
default:
ereport(WARNING, (errmodule(MOD_DMS), errmsg("invalid broadcast ack type")));
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS] invalid broadcast ack type")));
ret = DMS_ERROR;
}
return ret;
@ -1292,6 +1297,7 @@ static int CBGetDmsStatus(void *db_handle)
static void CBSetDmsStatus(void *db_handle, int dms_status)
{
g_instance.dms_cxt.dms_status = (dms_status_t)dms_status;
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] set dms status: dmsStatus=%d.", g_instance.dms_cxt.dms_status)));
}
static int32 SSBufRebuildOneDrcInternal(BufferDesc *buf_desc, unsigned char thread_index)
@ -1316,7 +1322,7 @@ static int32 SSBufRebuildOneDrcInternal(BufferDesc *buf_desc, unsigned char thre
ctrl_info.is_dirty = SSBufferIsDirty(buf_desc);
int ret = dms_buf_res_rebuild_drc_parallel(&dms_ctx, &ctrl_info, thread_index);
if (ret != DMS_SUCCESS) {
ereport(WARNING, (errmsg("[%u/%u/%u/%d %d-%u][SS reform] rebuild page failed.",
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS reform][%u/%u/%u/%d %d-%u] rebuild page: failed.",
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum)));
return ret;
@ -1343,7 +1349,7 @@ static int SSBufRebuildOneDrc(int index, unsigned char thread_index)
buf_ctrl->lock_mode = DMS_LOCK_NULL;
LWLockRelease((LWLock*)buf_ctrl->ctrl_lock);
need_rebuild = false;
ereport(DEBUG5, (errmsg("[%u/%u/%u/%d %d-%u][SS reform] no need rebuild, set lock_mode NULL.",
ereport(DEBUG5, (errmodule(MOD_DMS), errmsg("[SS reform][%u/%u/%u/%d %d-%u] no need rebuild, set lock_mode NULL.",
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum)));
}
@ -1372,14 +1378,14 @@ static int32 CBBufRebuildDrcInternal(int begin, int len, unsigned char thread_in
return ret;
}
}
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS reform] rebuild buf thread_index:%d, buf_if start from:%d to:%d, max_buf_id:%d",
(int)thread_index, begin, end, (TOTAL_BUFFER_NUM - 1))));
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] rebuild page: success."
"rebuild buf thread_index:%d, buf_if start from:%d to:%d, max_buf_id:%d.",
(int)thread_index, begin, end, (TOTAL_BUFFER_NUM - 1))));
return GS_SUCCESS;
}
/*
* as you can see, thread_num represets the number of thread. thread_index reprsents the n-th thread, begin from 0.
* As you can see, thread_num represents the number of threads. thread_index represents the n-th thread, beginning from 0.
* special case:
* when parallel disable, rebuild phase still call this function,
* do you think thread_num is 1, and thread_index is 0 ?
@ -1414,7 +1420,7 @@ static int32 CBDrcBufValidate(void *db_handle)
uint64 buf_state;
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS reform]CBDrcBufValidate starts before reform done.")));
errmsg("[SS reform] CBDrcBufValidate starts before reform done.")));
for (int i = 0; i < TOTAL_BUFFER_NUM; i++) {
BufferDesc *buf_desc = GetBufferDescriptor(i);
buf_state = LockBufHdr(buf_desc);
@ -1426,7 +1432,7 @@ static int32 CBDrcBufValidate(void *db_handle)
}
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS reform]CBDrcBufValidate %d buffers success.", buf_cnt)));
errmsg("[SS reform] CBDrcBufValidate %d buffers success.", buf_cnt)));
return GS_SUCCESS;
}
@ -1447,7 +1453,7 @@ static BufferDesc* SSGetBufferDesc(char *pageid)
BlockNumber spc_nblocks = spc_size(spc, relfilenode.relNode, tag->forkNum);
if (tag->blockNum >= spc_nblocks) {
ereport(PANIC, (errmodule(MOD_DMS),
errmsg("unexpected blocknum %u >= spc nblocks %u", tag->blockNum, spc_nblocks)));
errmsg("[SS] unexpected blocknum %u >= spc nblocks %u", tag->blockNum, spc_nblocks)));
}
}
#endif
@ -1520,16 +1526,22 @@ static int CBGetStableList(void *db_handle, unsigned long long *list_stable, uns
/*
 * Mark this node as ready to start up and log that fact.
 *
 * db_handle is not used by this function.
 * Always returns GS_SUCCESS.
 */
static int CBStartup(void *db_handle)
{
    g_instance.dms_cxt.SSRecoveryInfo.ready_to_startup = true;

    ereport(LOG,
        (errmodule(MOD_DMS),
            errmsg("[SS reform] initialize startup: Node %d set ready_to_startup to true.", SS_MY_INST_ID)));

    return GS_SUCCESS;
}
static int CBRecoveryStandby(void *db_handle, int inst_id)
{
Assert(inst_id == g_instance.attr.attr_storage.dms_attr.instance_id);
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] Recovery as standby")));
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] recovery: Recovery as standby start")));
if (!SSRecoveryNodes()) {
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS reform] Recovery failed")));
/*
* Because which can process failed condition is dms reform proc, so no errors can occurs here.
* If setting error is accept, That database exits before dms-reform maybe happen.
*/
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS reform] recovery: Fail")));
return GS_ERROR;
}
@ -1538,18 +1550,24 @@ static int CBRecoveryStandby(void *db_handle, int inst_id)
static int CBRecoveryPrimary(void *db_handle, int inst_id)
{
char* type_string = NULL;
type_string = SSGetLogHeaderTypeStr();
Assert(g_instance.dms_cxt.SSReformerControl.primaryInstId == inst_id ||
g_instance.dms_cxt.SSReformerControl.primaryInstId == -1);
g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy = false;
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] Recovery as primary, will replay xlog from inst:%d",
g_instance.dms_cxt.SSReformerControl.primaryInstId)));
ereport(LOG, (errmodule(MOD_DMS), errmsg("%s recovery: Recovery as primary start, will replay xlog "
"from inst:%d", type_string, g_instance.dms_cxt.SSReformerControl.primaryInstId)));
/* Release my own lock before recovery */
SSLockReleaseAll();
SSWakeupRecovery();
if (!SSRecoveryNodes()) {
g_instance.dms_cxt.SSRecoveryInfo.recovery_pause_flag = true;
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS reform] Recovery failed")));
ereport(WARNING, (errmodule(MOD_DMS), errmsg("%s recovery: Failed. pmstate=%d, SSClusterState=%d, "
"demotion=%d-%d, rec=%d", type_string, pmState, g_instance.dms_cxt.SSClusterState,
g_instance.demotion, t_thrd.walsender_cxt.WalSndCtl->demotion, t_thrd.xlog_cxt.InRecovery)));
return GS_ERROR;
}
@ -1559,10 +1577,16 @@ static int CBRecoveryPrimary(void *db_handle, int inst_id)
static int CBFlushCopy(void *db_handle, char *pageid)
{
// only 1) primary restart 2) failover need flush_copy
/*
* flush_copy is needed on only two occasions:
* 1) primary restart: SS_REFORM_REFORMER, dms_status is not DMS_STATUS_IN.
* 2) failover
*/
if (SS_REFORM_REFORMER && g_instance.dms_cxt.dms_status == DMS_STATUS_IN && !SS_STANDBY_FAILOVER) {
return GS_SUCCESS;
}
/* Keep a space at the literal boundary; otherwise the log reads "occursonly". */
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] flush copy start: This step is occurs "
    "only in primary restart and flush copy.")));
if (SS_REFORM_REFORMER && !g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy) {
g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy = true;
@ -1584,16 +1608,12 @@ static int CBFlushCopy(void *db_handle, char *pageid)
ErrorData* edata = CopyErrorData();
FlushErrorState();
FreeErrorData(edata);
ereport(PANIC, (errmsg("[SS flush copy] Error happend, spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u",
ereport(PANIC, (errmodule(MOD_DMS), errmsg("[SS reform][%u/%u/%u/%d %d-%u] flush copy: Error happend",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum)));
}
PG_END_TRY();
/**
* when remote DB instance reboot, this round reform fail
* primary node may fail to get page from remote node which reboot, this phase should return fail
*/
if (BufferIsInvalid(buffer)) {
if (dms_reform_failed()) {
SSWaitStartupExit();
@ -1602,7 +1622,11 @@ static int CBFlushCopy(void *db_handle, char *pageid)
Assert(0);
}
}
/*
* when remote DB instance reboot, this round reform fail
* primary node may fail to get page from remote node which reboot, this phase should return fail
*/
Assert(XLogRecPtrIsValid(g_instance.dms_cxt.ckptRedo));
LockBuffer(buffer, BUFFER_LOCK_SHARE);
if (t_thrd.dms_cxt.flush_copy_get_page_failed) {
@ -1615,11 +1639,12 @@ static int CBFlushCopy(void *db_handle, char *pageid)
if (XLByteLT(g_instance.dms_cxt.ckptRedo, pagelsn)) {
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buffer - 1);
buf_ctrl->state |= BUF_DIRTY_NEED_FLUSH;
ereport(LOG, (errmsg("[SS] Mark need flush in flush copy, spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u, page lsn (0x%llx)",
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform][%u/%u/%u/%d %d-%u] mark need flush in flush copy:"
"page lsn (0x%llx), buf_ctrl.state: %lu",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum, (unsigned long long)pagelsn)));
tag->forkNum, tag->blockNum, (unsigned long long)pagelsn, buf_ctrl->state)));
} else {
ereport(LOG, (errmsg("[SS] ready to flush copy, spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u, page lsn (0x%llx)",
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform][%u/%u/%u/%d %d-%u] ready to flush copy, page lsn (0x%llx)",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum, (unsigned long long)pagelsn)));
}
@ -1632,10 +1657,13 @@ static void SSFailoverPromoteNotify()
{
if (g_instance.dms_cxt.SSRecoveryInfo.startup_reform) {
g_instance.dms_cxt.SSRecoveryInfo.restart_failover_flag = true;
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS failover] do failover when DB restart.")));
/* Space after the colon so the concatenated literals do not run together in the log. */
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] SSFailoverPromoteNotify: "
    "set restart_failover_flag to %s when DB restart.",
    g_instance.dms_cxt.SSRecoveryInfo.restart_failover_flag ? "true" : "false")));
} else {
SendPostmasterSignal(PMSIGNAL_DMS_FAILOVER_STARTUP);
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS failover] do failover when DB alive")));
/* Space after the colon so the concatenated literals do not run together in the log. */
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] SSFailoverPromoteNotify: "
    "send signal to PM to initialize startup thread when DB alive")));
}
}
@ -1643,13 +1671,22 @@ static int CBFailoverPromote(void *db_handle)
{
SSClearSegCache();
SSFailoverPromoteNotify();
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] wait startup thread start.")));
long max_wait_time = 30000000L;
long wait_time = 0;
while (true) {
if (SS_STANDBY_FAILOVER && g_instance.pid_cxt.StartupPID != 0) {
ereport(LOG, (errmodule(MOD_DMS), errmsg("startup thread success.")));
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] startup thread success.")));
return GS_SUCCESS;
}
if ((wait_time % max_wait_time) == 0 && wait_time != 0) {
/* Without the trailing space the two literals concatenate to "tostart". */
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] wait startup thread to "
    "start successfully for %ld us.", wait_time)));
}
pg_usleep(REFORM_WAIT_TIME);
wait_time += REFORM_WAIT_TIME;
}
}
@ -1669,14 +1706,14 @@ static void CBReformSetDmsRole(void *db_handle, unsigned int reformer_id)
ss_reform_info_t *reform_info = &g_instance.dms_cxt.SSReformInfo;
dms_role_t new_dms_role = reformer_id == (unsigned int)SS_MY_INST_ID ? DMS_ROLE_REFORMER : DMS_ROLE_PARTNER;
if (new_dms_role == DMS_ROLE_REFORMER) {
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS switchover]begin to set currrent DSS as primary")));
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform][SS switchover] begin to set currrent DSS as primary")));
SSGrantDSSWritePermission();
g_instance.dms_cxt.SSClusterState = NODESTATE_STANDBY_PROMOTING;
}
reform_info->dms_role = new_dms_role;
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS switchover]role and lock switched, updated inst:%d with role:%d success",
errmsg("[SS reform][SS switchover]role and lock switched, updated inst:%d with role:%d success",
SS_MY_INST_ID, reform_info->dms_role)));
/* we need change ha cur mode for switchover in ss double cluster here */
if (SS_DISASTER_CLUSTER) {
@ -1690,23 +1727,40 @@ static void ReformCleanBackends()
if (!g_instance.dms_cxt.SSRecoveryInfo.startup_reform) {
SendPostmasterSignal(PMSIGNAL_DMS_REFORM);
}
long max_wait_time = 60000000L;
long wait_time = 0;
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] wait backends to exit")));
while (true) {
if (dms_reform_failed()) {
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS reform]reform failed during caneling backends")));
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS reform] reform failed during caneling backends")));
return;
}
if (g_instance.dms_cxt.SSRecoveryInfo.reform_ready || g_instance.dms_cxt.SSRecoveryInfo.startup_reform) {
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform]reform ready, backends have been terminated")));
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] reform ready, backends have been terminated successfully."
" pmState=%d, SSClusterState=%d, demotion=%d-%d, rec=%d",
pmState, g_instance.dms_cxt.SSClusterState, g_instance.demotion,
t_thrd.walsender_cxt.WalSndCtl->demotion, t_thrd.xlog_cxt.InRecovery)));
return;
}
if (wait_time > max_wait_time) {
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS reform] reform failed, backends can not exit")));
/* check and print some thread which no exit. */
SSCountAndPrintChildren(BACKEND_TYPE_NORMAL | BACKEND_TYPE_AUTOVAC);
SSProcessForceExit();
}
pg_usleep(REFORM_WAIT_TIME);
wait_time += REFORM_WAIT_TIME;
}
}
static void FailoverCleanBackends()
{
if (g_instance.dms_cxt.SSRecoveryInfo.startup_reform) {
/* Space after the colon so the concatenated literals do not run together in the log. */
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] FailoverCleanBackends: "
    "no need to clean backends.")));
return;
}
@ -1725,18 +1779,21 @@ static void FailoverCleanBackends()
SendPostmasterSignal(PMSIGNAL_DMS_FAILOVER_TERM_BACKENDS);
long max_wait_time = 30000000L;
long wait_time = 0;
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] wait backends to exit")));
while (true) {
if (g_instance.dms_cxt.SSRecoveryInfo.no_backend_left && !CheckpointInProgress()) {
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS failover] backends exit successfully")));
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] backends exit successfully")));
break;
}
if (wait_time > max_wait_time) {
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS failover] failover failed, backends can not exit")));
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] failover failed, backends can not exit")));
/* check and print some thread which no exit. */
SSCountAndPrintChildren(BACKEND_TYPE_NORMAL | BACKEND_TYPE_AUTOVAC);
_exit(0);
}
if (dms_reform_failed()) {
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS failover] reform failed during clean backends")));
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] reform failed during clean backends")));
return;
}
@ -1803,7 +1860,7 @@ static void FailoverStartNotify(dms_reform_start_context_t *rs_cxt)
g_instance.dms_cxt.SSRecoveryInfo.recovery_pause_flag = true;
if (rs_cxt->role == DMS_ROLE_REFORMER) {
g_instance.dms_cxt.dw_init = false;
// variable set order: SharedRecoveryInProgress -> failover_ckpt_status -> dms_role
/* variable set order: SharedRecoveryInProgress -> failover_ckpt_status -> dms_role */
volatile XLogCtlData *xlogctl = t_thrd.shemem_ptr_cxt.XLogCtl;
SpinLockAcquire(&xlogctl->info_lck);
xlogctl->IsRecoveryDone = false;
@ -1813,18 +1870,41 @@ static void FailoverStartNotify(dms_reform_start_context_t *rs_cxt)
pg_memory_barrier();
g_instance.dms_cxt.SSRecoveryInfo.failover_ckpt_status = NOT_ALLOW_CKPT;
g_instance.dms_cxt.SSClusterState = NODESTATE_STANDBY_FAILOVER_PROMOTING;
/* Backends should exit in here, this step should be bring forward and not in CBFailoverPromote */
pmState = PM_WAIT_BACKENDS;
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS failover] failover trigger.")));
/*
* single cluster: SET PM_WAIT_BACKENDS in check PMSIGNAL_DMS_FAILOVER_TERM_BACKENDS.
* standby cluster of dual cluster: Backends should exit in here, this step should be
* bring forward and not in CBFailoverPromote.
*/
if (SS_DORADO_STANDBY_CLUSTER) {
pmState = PM_WAIT_BACKENDS;
}
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] failover trigger.")));
}
/* SET PM_WAIT_BACKENDS in check PMSIGNAL_DMS_FAILOVER_TERM_BACKENDS */
/* Tagged with MOD_DMS like the "failover trigger" log just above, so module-level log filtering catches it. */
ereport(LOG, (errmodule(MOD_DMS),
    errmsg("[SS reform][SS failover] starts, pmState=%d, SSClusterState=%d, demotion=%d-%d, rec=%d",
        pmState, g_instance.dms_cxt.SSClusterState, g_instance.demotion,
        t_thrd.walsender_cxt.WalSndCtl->demotion, t_thrd.xlog_cxt.InRecovery)));
}
}
static void CBReformStartNotify(void *db_handle, dms_reform_start_context_t *rs_cxt)
{
ereport(LOG, (errmsg("[SS Reform] starts, pmState=%d, SSClusterState=%d, demotion=%d-%d, rec=%d",
pmState, g_instance.dms_cxt.SSClusterState, g_instance.demotion,
t_thrd.walsender_cxt.WalSndCtl->demotion, t_thrd.xlog_cxt.InRecovery)));
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS Reform] reform start enter: pmState=%d, SSClusterState=%d, demotion=%d-%d, rec=%d",
pmState, g_instance.dms_cxt.SSClusterState, g_instance.demotion,
t_thrd.walsender_cxt.WalSndCtl->demotion, t_thrd.xlog_cxt.InRecovery)));
/*
* The db may be stopping (gs_ctl or cm_ctl stop) when the next round of reform starts due to a misjudgement.
* If the db stop process is blocked or too slow, the next round of reform may be triggered. In theory, this
* should not happen, so the db has to exit by force.
*/
if (g_instance.status >= FastShutdown) {
ereport(WARNING,
(errmodule(MOD_DMS),
errmsg("[SS reform] reform starts concurrencely when db is stoping, db exit by force now")));
_exit(0);
}
SSHandleStartupWhenReformStart(rs_cxt);
ss_reform_info_t *reform_info = &g_instance.dms_cxt.SSReformInfo;
reform_info->is_hashmap_constructed = false;
@ -1848,9 +1928,9 @@ static void CBReformStartNotify(void *db_handle, dms_reform_start_context_t *rs_
ReformTypeToString(reform_info->reform_type, reform_type_str);
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS reform] reform start, role:%d, reform type:SS %s, standby scenario:%d, "
"reform_ver:%ld.",
"bitmap_reconnect:%d, reform_ver:%ld.",
reform_info->dms_role, reform_type_str, SSPerformingStandbyScenario(),
reform_info->reform_ver)));
reform_info->bitmap_reconnect, reform_info->reform_ver)));
if (reform_info->dms_role == DMS_ROLE_REFORMER) {
SSGrantDSSWritePermission();
}
@ -1858,12 +1938,14 @@ static void CBReformStartNotify(void *db_handle, dms_reform_start_context_t *rs_
int old_primary = SSGetPrimaryInstId();
SSReadControlFile(old_primary, true);
g_instance.dms_cxt.SSReformInfo.old_bitmap = g_instance.dms_cxt.SSReformerControl.list_stable;
ereport(LOG, (errmsg("[SS reform] old cluster node bitmap: %lu", g_instance.dms_cxt.SSReformInfo.old_bitmap)));
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] old cluster node bitmap: %lu", g_instance.dms_cxt.SSReformInfo.old_bitmap)));
if (g_instance.dms_cxt.SSRecoveryInfo.in_failover) {
FailoverCleanBackends();
} else if (SSBackendNeedExitScenario()) {
ReformCleanBackends();
} else {
ProcessNoCleanBackendsScenario();
}
if (SS_DISASTER_CLUSTER && reform_info->reform_type != DMS_REFORM_TYPE_FOR_SWITCHOVER_OPENGAUSS) {
@ -1876,8 +1958,8 @@ static int CBReformDoneNotify(void *db_handle)
if (g_instance.dms_cxt.SSRecoveryInfo.in_failover) {
g_instance.dms_cxt.SSRecoveryInfo.in_failover = false;
if (SS_REFORM_REFORMER) {
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS failover] failover success, instance:%d become primary.",
g_instance.attr.attr_storage.dms_attr.instance_id)));
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] failover success, instance:%d"
" become primary.", g_instance.attr.attr_storage.dms_attr.instance_id)));
}
}
@ -1891,13 +1973,14 @@ static int CBReformDoneNotify(void *db_handle)
g_instance.dms_cxt.SSRecoveryInfo.failover_ckpt_status = NOT_ACTIVE;
Assert(g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy == false);
g_instance.dms_cxt.SSReformInfo.new_bitmap = g_instance.dms_cxt.SSReformerControl.list_stable;
ereport(LOG, (errmsg("[SS reform] new cluster node bitmap: %lu", g_instance.dms_cxt.SSReformInfo.new_bitmap)));
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] new cluster node bitmap: %lu",
g_instance.dms_cxt.SSReformInfo.new_bitmap)));
g_instance.dms_cxt.SSReformInfo.reform_end_time = GetCurrentTimestamp();
g_instance.dms_cxt.SSReformInfo.reform_success = true;
ereport(LOG,
(errmodule(MOD_DMS),
errmsg("[SS reform/SS switchover/SS failover] Reform success, instance:%d is running.",
errmsg("[SS reform] Reform success, instance:%d is running.",
g_instance.attr.attr_storage.dms_attr.instance_id)));
/* reform success indicates that reform of primary and standby all complete, then update gaussdb.state */
@ -1906,6 +1989,21 @@ static int CBReformDoneNotify(void *db_handle)
g_instance.dms_cxt.SSClusterState = NODESTATE_NORMAL;
g_instance.dms_cxt.SSRecoveryInfo.realtime_build_in_reform = false;
g_instance.dms_cxt.SSReformInfo.in_reform = false;
/*
* Only two kinds of condition:
* 1. Primary or standby restarts in single node mode; other nodes in the cluster are set to PM_WAIT_REFORM.
* 2. In failover, a standby that is not being promoted to primary is set to PM_WAIT_BACKENDS.
*/
if (pmState == PM_WAIT_REFORM ||
(SS_PERFORMING_FAILOVER && SS_STANDBY_MODE && pmState == PM_WAIT_BACKENDS)) {
pmState = PM_RUN;
}
/*
 * Exactly one argument per conversion: demotion=%d-%d takes g_instance.demotion and
 * WalSndCtl->demotion, rec=%d takes InRecovery, dmsStatus=%d takes dms_status.
 */
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] reform done: pmState=%d, SSClusterState=%d, demotion=%d-%d, "
    "rec=%d, dmsStatus=%d.", pmState, g_instance.dms_cxt.SSClusterState,
    g_instance.demotion, t_thrd.walsender_cxt.WalSndCtl->demotion,
    t_thrd.xlog_cxt.InRecovery, g_instance.dms_cxt.dms_status)));
return GS_SUCCESS;
}
@ -1918,7 +2016,7 @@ static int CBXLogWaitFlush(void *db_handle, unsigned long long lsn)
static int CBDBCheckLock(void *db_handle)
{
if (t_thrd.storage_cxt.num_held_lwlocks > 0) {
ereport(PANIC, (errmsg("hold lock, lock address:%p, lock mode:%u",
ereport(PANIC, (errmodule(MOD_DMS), errmsg("[SS lock] hold lock, lock address:%p, lock mode:%u",
t_thrd.storage_cxt.held_lwlocks[0].lock, t_thrd.storage_cxt.held_lwlocks[0].mode)));
return GS_ERROR;
}
@ -1980,7 +2078,8 @@ void DmsCallbackThreadShmemInit(unsigned char need_startup, char **reg_data)
ALLOCSET_DEFAULT_MAXSIZE);
/* create timer with thread safe */
if (gs_signal_createtimer() < 0) {
ereport(FATAL, (errmsg("create timer fail at thread : %lu", t_thrd.proc_cxt.MyProcPid)));
ereport(FATAL, (errmodule(MOD_DMS), errmsg("[SS] create timer fail at thread : %lu",
t_thrd.proc_cxt.MyProcPid)));
}
CreateLocalSysDBCache();
InitShmemForDmsCallBack();
@ -2002,7 +2101,7 @@ int CBOndemandRedoPageForStandby(void *block_key, int32 *redo_status)
Assert(SS_PRIMARY_MODE);
// do nothing if not in ondemand recovery
if (!SS_IN_ONDEMAND_RECOVERY) {
ereport(DEBUG1, (errmsg("[On-demand] Ignore standby redo page request, spc/db/rel/bucket "
ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[SS][On-demand] Ignore standby redo page request, spc/db/rel/bucket "
"fork-block: %u/%u/%u/%d %d-%u", tag->rnode.spcNode, tag->rnode.dbNode,
tag->rnode.relNode, tag->rnode.bucketNode, tag->forkNum, tag->blockNum)));
*redo_status = ONDEMAND_REDO_SKIP;
@ -2027,9 +2126,9 @@ int CBOndemandRedoPageForStandby(void *block_key, int32 *redo_status)
t_thrd.int_cxt.InterruptHoldoffCount = saveInterruptHoldoffCount;
/* Save error info */
ErrorData* edata = CopyErrorData();
ereport(WARNING,
(errmsg("[On-demand] Error happend when primary redo page for standby, spc/db/rel/bucket "
"fork-block: %u/%u/%u/%d %d-%u", tag->rnode.spcNode, tag->rnode.dbNode,
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[SS][On-demand][%u/%u/%u/%d %d-%u] Error happend when primary redo page for standby.",
tag->rnode.spcNode, tag->rnode.dbNode,
tag->rnode.relNode, tag->rnode.bucketNode, tag->forkNum, tag->blockNum),
errdetail("%s", edata->detail)));
FlushErrorState();
@ -2038,8 +2137,8 @@ int CBOndemandRedoPageForStandby(void *block_key, int32 *redo_status)
}
PG_END_TRY();
ereport(DEBUG1, (errmsg("[On-demand] Redo page for standby done, spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u, "
"redo status: %d", tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode,
ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[SS][On-demand][%u/%u/%u/%d %d-%u] Redo page for standby done. redo status: %d.",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode,
tag->rnode.bucketNode, tag->forkNum, tag->blockNum, *redo_status)));
return GS_SUCCESS;;
}

View File

@ -34,6 +34,7 @@
#include "postmaster/postmaster.h"
#include "storage/file/fio_device.h"
#include "replication/ss_disaster_cluster.h"
#include "replication/walsender_private.h"
#include "ddes/dms/ss_dms_bufmgr.h"
#include "ddes/dms/ss_dms_recovery.h"
#include "ddes/dms/ss_reform_common.h"
@ -78,6 +79,9 @@ void SSSavePrimaryInstId(int id)
bool SSRecoveryNodes()
{
bool result = false;
long max_wait_time = 300000000L;
long wait_time = 0;
while (true) {
/** why use lock:
* time1 startup thread: update IsRecoveryDone, not finish UpdateControlFile
@ -110,12 +114,25 @@ bool SSRecoveryNodes()
}
if (dms_reform_failed()) {
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] reform fail in recovery, pmState=%d, SSClusterState=%d,"
"demotion=%d-%d, rec=%d", pmState, g_instance.dms_cxt.SSClusterState, g_instance.demotion,
t_thrd.walsender_cxt.WalSndCtl->demotion, t_thrd.shemem_ptr_cxt.XLogCtl->IsRecoveryDone)));
SSWaitStartupExit();
result = false;
break;
}
if ((wait_time % max_wait_time) == 0 && wait_time != 0) {
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS reform] wait recovery to"
" apply done for %ld us.", wait_time)));
}
pg_usleep(REFORM_WAIT_TIME);
wait_time += REFORM_WAIT_TIME;
}
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] recovery success, pmState=%d, SSClusterState=%d, demotion=%d-%d, rec=%d",
pmState, g_instance.dms_cxt.SSClusterState, g_instance.demotion,
t_thrd.walsender_cxt.WalSndCtl->demotion, t_thrd.shemem_ptr_cxt.XLogCtl->IsRecoveryDone)));
return result;
}
@ -179,7 +196,7 @@ void SSInitReformerControlPages(void)
FIN_CRC32C(g_instance.dms_cxt.SSReformerControl.crc);
if (sizeof(ss_reformer_ctrl_t) > PG_CONTROL_SIZE) {
ereport(PANIC, (errmsg("sizeof(ControlFileData) is larger than PG_CONTROL_SIZE; fix either one")));
ereport(PANIC, (errmsg("[SS] sizeof(ControlFileData) is larger than PG_CONTROL_SIZE; fix either one")));
}
errorno = memset_s(buffer, PG_CONTROL_SIZE, 0, PG_CONTROL_SIZE);
@ -191,18 +208,23 @@ void SSInitReformerControlPages(void)
fd = BasicOpenFile(XLOG_CONTROL_FILE, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, S_IRUSR | S_IWUSR);
if (fd < 0) {
ereport(PANIC,
(errcode_for_file_access(), errmsg("could not create control file \"%s\": %m", XLOG_CONTROL_FILE)));
(errcode_for_file_access(), errmsg("[SS] could not create control file \"%s\": %m", XLOG_CONTROL_FILE)));
}
SSWriteInstanceControlFile(fd, buffer, REFORM_CTRL_PAGE, PG_CONTROL_SIZE);
if (close(fd)) {
ereport(PANIC, (errcode_for_file_access(), errmsg("could not close control file: %m")));
ereport(PANIC, (errcode_for_file_access(), errmsg("[SS] could not close control file: %m")));
}
}
void SShandle_promote_signal()
{
if (pmState == PM_WAIT_BACKENDS) {
/*
* SS_ONDEMAND_REALTIME_BUILD_DISABLED represent 2 conditions
* 1. SS_PRIMARY_MODE
* 2. SS_STANDBY_MODE and not ENABLE_ONDEMAND_REALTIME_BUILD
*/
if (SS_ONDEMAND_REALTIME_BUILD_DISABLED) {
g_instance.pid_cxt.StartupPID = initialize_util_thread(STARTUP);
}
@ -210,7 +232,8 @@ void SShandle_promote_signal()
pmState = PM_STARTUP;
}
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS failover] begin startup.")));
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] initialize startup thread start,"
"pmState: %d.", pmState)));
}
@ -242,7 +265,7 @@ void ss_failover_dw_init_internal()
dw_ext_init();
dw_init();
g_instance.dms_cxt.finishedRecoverOldPrimaryDWFile = false;
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS failover] dw init finish")));
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] dw init finish")));
}
void ss_failover_dw_init()
@ -262,26 +285,26 @@ XLogRecPtr SSOndemandRequestPrimaryCkptAndGetRedoLsn()
XLogRecPtr primaryRedoLsn = InvalidXLogRecPtr;
ereport(DEBUG1, (errmodule(MOD_DMS),
errmsg("[On-demand] start request primary node %d do checkpoint", SS_PRIMARY_ID)));
errmsg("[SS][On-demand] start request primary node %d do checkpoint", SS_PRIMARY_ID)));
if (SS_ONDEMAND_REALTIME_BUILD_NORMAL) {
dms_context_t dms_ctx;
InitDmsContext(&dms_ctx);
dms_ctx.xmap_ctx.dest_id = (unsigned int)SS_PRIMARY_ID;
if (dms_req_opengauss_immediate_checkpoint(&dms_ctx, (unsigned long long *)&primaryRedoLsn) == GS_SUCCESS) {
ereport(DEBUG1, (errmodule(MOD_DMS),
errmsg("[On-demand] request primary node %d checkpoint success, redoLoc %X/%X", SS_PRIMARY_ID,
errmsg("[SS][On-demand] request primary node %d checkpoint success, redoLoc %X/%X", SS_PRIMARY_ID,
(uint32)(primaryRedoLsn << 32), (uint32)primaryRedoLsn)));
return primaryRedoLsn;
}
ereport(DEBUG1, (errmodule(MOD_DMS),
errmsg("[On-demand] request primary node %d checkpoint failed", SS_PRIMARY_ID)));
errmsg("[SS][On-demand] request primary node %d checkpoint failed", SS_PRIMARY_ID)));
}
// read from DMS failed, so read from DSS
SSReadControlFile(SS_PRIMARY_ID, true);
primaryRedoLsn = g_instance.dms_cxt.ckptRedo;
ereport(DEBUG1, (errmodule(MOD_DMS),
errmsg("[On-demand] read primary node %d checkpoint loc in control file, redoLoc %X/%X", SS_PRIMARY_ID,
errmsg("[SS][On-demand] read primary node %d checkpoint loc in control file, redoLoc %X/%X", SS_PRIMARY_ID,
(uint32)(primaryRedoLsn << 32), (uint32)primaryRedoLsn)));
return primaryRedoLsn;
}
@ -321,7 +344,7 @@ void OndemandRealtimeBuildHandleFailover()
pg_usleep(REFORM_WAIT_TIME);
}
g_instance.dms_cxt.SSRecoveryInfo.ondemand_realtime_build_status = BUILD_TO_REDO;
ereport(LOG, (errmodule(MOD_DMS), errmsg("[On-demand] Node:%d receive failover signal, "
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS][On-demand] Node:%d receive failover signal, "
"close realtime build and start ondemand build, set status to BUILD_TO_REDO.", SS_MY_INST_ID)));
}
@ -335,7 +358,7 @@ XLogRedoAction SSCheckInitPageXLog(XLogReaderState *record, uint8 block_id, Redo
RedoBufferTag blockinfo;
if (!XLogRecGetBlockTag(record, block_id, &(blockinfo.rnode), &(blockinfo.forknum), &(blockinfo.blkno),
&(blockinfo.pblk))) {
ereport(PANIC, (errmsg("[SS redo] failed to locate backup block with ID %d", block_id)));
ereport(PANIC, (errmsg("[SS redo] failed to locate backup block with ID %d.", block_id)));
}
bool skip = false;
char pageid[DMS_PAGEID_SIZE] = {0};
@ -352,7 +375,7 @@ XLogRedoAction SSCheckInitPageXLog(XLogReaderState *record, uint8 block_id, Redo
Buffer buf = XLogReadBufferExtended(blockinfo.rnode, blockinfo.forknum, blockinfo.blkno,
RBM_NORMAL_NO_LOG, pblk, tde);
if (BufferIsInvalid(buf)) {
ereport(PANIC, (errmsg("[SS redo][%u/%u/%u/%d %d-%u] buffer should be found",
ereport(PANIC, (errmsg("[SS redo][%u/%u/%u/%d %d-%u] buffer should be found.",
blockinfo.rnode.spcNode, blockinfo.rnode.dbNode, blockinfo.rnode.relNode,
blockinfo.rnode.bucketNode, blockinfo.forknum, blockinfo.blkno)));
}
@ -435,6 +458,7 @@ bool SSPageReplayNeedSkip(RedoBufferInfo *bufferinfo, XLogRecPtr xlogLsn)
xlogLsn, pageLsn)));
}
#endif
if (XLByteLE(xlogLsn, pageLsn)) {
bufferinfo->buf = buf;
bufferinfo->lsn = xlogLsn;

View File

@ -576,19 +576,29 @@ static bool DMSReformCheckStartup()
bool DMSWaitInitStartup()
{
ereport(LOG, (errmsg("[SS reform] Node:%d first-round reform wait to initialize startup thread.", SS_MY_INST_ID)));
g_instance.dms_cxt.dms_status = (dms_status_t)DMS_STATUS_JOIN;
/* Trailing space keeps the two concatenated literals from running together ("thread.dms_status"). */
ereport(LOG, (errmsg("[SS reform][db sync wait] Node:%d first-round reform wait to initialize startup thread. "
    "dms_status:%d", SS_MY_INST_ID, g_instance.dms_cxt.dms_status)));
long max_wait_time = 300000000L;
long wait_time = 0;
while (g_instance.pid_cxt.StartupPID == 0) {
(void)DMSReformCheckStartup();
if (dms_reform_last_failed()) {
return false;
}
if ((wait_time % max_wait_time) == 0 && wait_time != 0) {
ereport(WARNING, (errmsg("[SS reform][db sync wait ] Node:%d wait startup thread "
"to initialize for %ld us.", SS_MY_INST_ID, wait_time)));
}
pg_usleep(REFORM_WAIT_TIME);
wait_time += REFORM_WAIT_TIME;
}
if (g_instance.pid_cxt.StartupPID != 0) {
ereport(LOG, (errmsg("[SS reform] Node:%d initialize startup thread success.", SS_MY_INST_ID)));
ereport(LOG, (errmsg("[SS reform][db sync wait] Node:%d initialize startup thread success.", SS_MY_INST_ID)));
}
return true;

View File

@ -36,6 +36,7 @@
#include "storage/file/fio_device.h"
#include "storage/smgr/segment_internal.h"
#include "replication/walreceiver.h"
#include "replication/walsender_private.h"
#include "replication/ss_disaster_cluster.h"
/*
@ -92,8 +93,8 @@ int SSXLogFileOpenAnyTLI(XLogSegNo segno, int emode, uint32 sources, char* xlog_
}
if (!FILE_POSSIBLY_DELETED(errno)) {
ereport(PANIC, (errcode_for_file_access(), errmsg("could not open file \"%s\" (log segment %s): %m", path,
XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, segno))));
ereport(PANIC, (errcode_for_file_access(), errmsg("[SS] could not open file \"%s\" (log segment %s): %m,"
" %s", path, XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, segno), TRANSLATE_ERRNO)));
}
}
@ -104,7 +105,7 @@ int SSXLogFileOpenAnyTLI(XLogSegNo segno, int emode, uint32 sources, char* xlog_
securec_check_ss(errorno, "", "");
errno = ENOENT;
ereport(emode, (errcode_for_file_access(), errmsg("could not open file \"%s\" (log segment %s): %m", path,
ereport(emode, (errcode_for_file_access(), errmsg("[SS] could not open file \"%s\" (log segment %s): %m", path,
XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, segno))));
return -1;
@ -129,7 +130,7 @@ int SSReadXlogInternal(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, XL
* That preReadStartPtr is InvalidXlogPreReadStartPtr has three kinds of occasions.
*/
if (xlogreader->preReadStartPtr == InvalidXlogPreReadStartPtr && SS_DISASTER_CLUSTER) {
ereport(LOG, (errmsg("In ss disaster cluster mode, preReadStartPtr is 0.")));
ereport(LOG, (errmsg("[SS] In ss disaster cluster mode, preReadStartPtr is 0.")));
}
// pre-reading for dss
@ -231,7 +232,7 @@ void SSDisasterGetXlogPathList()
DIR* dssdir = opendir(g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name);
if (dssdir == NULL) {
ereport(PANIC, (errcode_for_file_access(), errmsg("Error opening dssdir %s",
ereport(PANIC, (errcode_for_file_access(), errmsg("[SS] Error opening dssdir %s",
g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name)));
}
@ -283,12 +284,14 @@ void SSUpdateReformerCtrl()
}
if (fd < 0) {
ereport(FATAL, (errcode_for_file_access(), errmsg("could not open control file \"%s\": %s", fname[i], TRANSLATE_ERRNO)));
ereport(FATAL, (errcode_for_file_access(), errmsg("[SS] could not open control file \"%s\": %s",
fname[i], TRANSLATE_ERRNO)));
}
SSWriteInstanceControlFile(fd, buffer, REFORM_CTRL_PAGE, write_size);
if (close(fd)) {
ereport(PANIC, (errcode_for_file_access(), errmsg("could not close control file: %s", TRANSLATE_ERRNO)));
ereport(PANIC, (errcode_for_file_access(), errmsg("[SS] could not close control file: %s",
TRANSLATE_ERRNO)));
}
}
}
@ -309,7 +312,7 @@ loop:
fd = BasicOpenFile(fname, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
if (fd < 0) {
LWLockRelease(ControlFileLock);
ereport(FATAL, (errcode_for_file_access(), errmsg("could not open control file \"%s\": %m", fname)));
ereport(FATAL, (errcode_for_file_access(), errmsg("[SS] could not open control file \"%s\": %m", fname)));
}
off_t seekpos = (off_t)BLCKSZ * id;
@ -324,7 +327,7 @@ loop:
char buffer[read_size] __attribute__((__aligned__(ALIGNOF_BUFFER)));
if (pread(fd, buffer, read_size, seekpos) != read_size) {
LWLockRelease(ControlFileLock);
ereport(PANIC, (errcode_for_file_access(), errmsg("could not read from control file: %m")));
ereport(PANIC, (errcode_for_file_access(), errmsg("[SS] could not read from control file: %m")));
}
if (id == REFORM_CTRL_PAGE) {
@ -332,7 +335,7 @@ loop:
securec_check(rc, "", "");
if (close(fd) < 0) {
LWLockRelease(ControlFileLock);
ereport(PANIC, (errcode_for_file_access(), errmsg("could not close control file: %m")));
ereport(PANIC, (errcode_for_file_access(), errmsg("[SS] could not close control file: %m")));
}
/* Now check the CRC. */
@ -342,13 +345,13 @@ loop:
if (!EQ_CRC32C(crc, g_instance.dms_cxt.SSReformerControl.crc)) {
if (retry == false) {
ereport(WARNING, (errmsg("control file \"%s\" contains incorrect checksum, try backup file", fname)));
ereport(WARNING, (errmsg("[SS] control file \"%s\" contains incorrect checksum, try backup file", fname)));
fname = XLOG_CONTROL_FILE_BAK;
retry = true;
goto loop;
} else {
LWLockRelease(ControlFileLock);
ereport(FATAL, (errmsg("incorrect checksum in control file")));
ereport(FATAL, (errmsg("[SS] incorrect checksum in control file")));
}
}
g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status = g_instance.dms_cxt.SSReformerControl.clusterStatus;
@ -365,7 +368,7 @@ loop:
securec_check(rc, "", "");
if (close(fd) < 0) {
LWLockRelease(ControlFileLock);
ereport(PANIC, (errcode_for_file_access(), errmsg("could not close control file: %m")));
ereport(PANIC, (errcode_for_file_access(), errmsg("[SS] could not close control file: %m")));
}
/* Now check the CRC. */
@ -375,13 +378,13 @@ loop:
if (!EQ_CRC32C(crc, controlFile->crc)) {
if (retry == false) {
ereport(WARNING, (errmsg("control file \"%s\" contains incorrect checksum, try backup file", fname)));
ereport(WARNING, (errmsg("[SS] control file \"%s\" contains incorrect checksum, try backup file", fname)));
fname = XLOG_CONTROL_FILE_BAK;
retry = true;
goto loop;
} else {
LWLockRelease(ControlFileLock);
ereport(FATAL, (errmsg("incorrect checksum in control file")));
ereport(FATAL, (errmsg("[SS] incorrect checksum in control file")));
}
}
@ -395,14 +398,14 @@ loop:
void SSClearSegCache()
{
(void)LWLockAcquire(ShmemIndexLock, LW_EXCLUSIVE);
HeapMemResetHash(t_thrd.storage_cxt.SegSpcCache, "Shared Seg Spc hash by request");
HeapMemResetHash(t_thrd.storage_cxt.SegSpcCache, "[SS] Shared Seg Spc hash by request");
LWLockRelease(ShmemIndexLock);
}
void SSStandbySetLibpqswConninfo()
{
if (strlen(g_instance.dms_cxt.dmsInstAddr[g_instance.dms_cxt.SSReformerControl.primaryInstId]) == 0) {
ereport(WARNING, (errmsg("Failed to get ip of primary node!")));
ereport(WARNING, (errmsg("[SS] Failed to get ip of primary node!")));
return;
}
@ -423,7 +426,7 @@ void SSStandbySetLibpqswConninfo()
}
if (replIdx == -1) {
ereport(WARNING, (errmsg("Failed to get replconninfo of primary node, check the replconninfo config!")));
ereport(WARNING, (errmsg("[SS] Failed to get replconninfo of primary node, check the replconninfo config!")));
return;
}
@ -450,7 +453,7 @@ static void SSReadClusterRunMode()
fd = BasicOpenFile(fname, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
if (fd < 0) {
LWLockRelease(ControlFileLock);
ereport(FATAL, (errcode_for_file_access(), errmsg("could not open control file \"%s\": %m", fname)));
ereport(FATAL, (errcode_for_file_access(), errmsg("[SS] could not open control file \"%s\": %m", fname)));
}
off_t seekpos = (off_t)BLCKSZ * REFORM_CTRL_PAGE;
@ -460,14 +463,14 @@ static void SSReadClusterRunMode()
if (pread(fd, buffer, read_size, seekpos) != read_size) {
(void)close(fd);
LWLockRelease(ControlFileLock);
ereport(PANIC, (errcode_for_file_access(), errmsg("could not read from control file: %m")));
ereport(PANIC, (errcode_for_file_access(), errmsg("[SS] could not read from control file: %m")));
}
g_instance.dms_cxt.SSReformerControl.clusterRunMode = ((ss_reformer_ctrl_t*)buffer)->clusterRunMode;
if (close(fd) < 0) {
LWLockRelease(ControlFileLock);
ereport(PANIC, (errcode_for_file_access(), errmsg("could not close control file: %m")));
ereport(PANIC, (errcode_for_file_access(), errmsg("[SS] could not close control file: %m")));
}
LWLockRelease(ControlFileLock);
}
@ -478,7 +481,7 @@ void SSDisasterRefreshMode()
SSUpdateReformerCtrl();
LWLockRelease(ControlFileLock);
ereport(LOG, (errmsg("SSDisasterRefreshMode change control file cluster run mode to: %d",
ereport(LOG, (errmsg("[SS] SSDisasterRefreshMode change control file cluster run mode to: %d",
g_instance.dms_cxt.SSReformerControl.clusterRunMode)));
}
@ -491,7 +494,7 @@ void SSDisasterUpdateHAmode()
} else if (SS_DISASTER_STANDBY_CLUSTER) {
t_thrd.postmaster_cxt.HaShmData->current_mode = STANDBY_MODE;
}
ereport(LOG, (errmsg("SSDisasterUpdateHAmode change Ha current mode to: %d",
ereport(LOG, (errmsg("[SS] SSDisasterUpdateHAmode change Ha current mode to: %d",
t_thrd.postmaster_cxt.HaShmData->current_mode)));
}
}
@ -511,15 +514,21 @@ bool SSPerformingStandbyScenario()
void SSGrantDSSWritePermission(void)
{
/*
 * dss_set_server_status_wrapper may loop forever here. The database has been seen to block at this
 * point with neither the failure log nor the success log printed, so dss_set_server_status_wrapper
 * is strongly suspected of causing the database to hang.
 */
ereport(LOG, (errmodule(MOD_DMS), (errmsg("[SS reform] set dss server status as primary start."))));
while (dss_set_server_status_wrapper() != GS_SUCCESS) {
pg_usleep(REFORM_WAIT_LONG);
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("Failed to set DSS as primary, vgname: \"%s\", socketpath: \"%s\"",
errmsg("[SS reform] Failed to set DSS as primary, vgname: \"%s\", socketpath: \"%s\"",
g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name,
g_instance.attr.attr_storage.dss_attr.ss_dss_conn_path),
errhint("Check vgname and socketpath and restart later.")));
}
ereport(LOG, (errmsg("set dss server status as primary")));
ereport(LOG, (errmodule(MOD_DMS), (errmsg("[SS reform] set dss server status as primary: success"))));
}
bool SSPrimaryRestartScenario()
@ -533,9 +542,12 @@ bool SSPrimaryRestartScenario()
return false;
}
/* PRIMARY_CLUSTER and single cluster
* 1)standby scenario 2)primary restart -- no need exit
* 3)switchover 4)failover -- need exit
/*
* PRIMARY_CLUSTER and single cluster
* 1)standby scenario
* 2)primary restart -- no need exit
* 3)switchover
* 4)failover -- need exit
* STANDBY_CLUSTER
* all scenario -- need exit
*/
@ -550,7 +562,8 @@ bool SSBackendNeedExitScenario()
}
if (g_instance.attr.attr_sql.enableRemoteExcute) {
ereport(LOG, (errmsg("remote execute is enabled, all backends need exit to ensure complete transaction!")));
ereport(LOG, (errmsg("[SS] remote execute is enabled, all backends need exit to ensure"
"complete transaction!")));
return true;
}
@ -561,14 +574,14 @@ bool SSBackendNeedExitScenario()
return true;
}
static void SSProcessForceExit()
void SSProcessForceExit()
{
int log_level = WARNING;
#ifdef USE_ASSERT_CHECKING
log_level = PANIC;
#endif
ereport(log_level, (errmodule(MOD_DMS),
errmsg("exit now")));
errmsg("[SS reform] db exit directly by force now")));
_exit(0);
}
@ -585,12 +598,14 @@ void SSWaitStartupExit(bool send_signal)
pg_memory_barrier();
}
SendPostmasterSignal(PMSIGNAL_DMS_TERM_STARTUP);
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS reform] send terminate startup thread signal to PM.")));
}
if (SS_IN_REFORM && dms_reform_failed()) {
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[SS reform] reform failed")));
errmsg("[SS reform] reform failed.")));
}
#ifdef USE_ASSERT_CHECKING
ereport(LOG, (errmodule(MOD_DMS),
@ -610,8 +625,8 @@ void SSWaitStartupExit(bool send_signal)
if (g_instance.dms_cxt.SSRecoveryInfo.recovery_trapped_in_page_request) {
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[SS reform] pageredo or startup thread are trapped in page request "
"during recovery phase, need exit")));
errmsg("[SS reform] recovery_trapped_in_page_request: Thread pageredo or startup are trapped"
"in page request during recovery phase, db exit directly by force now.")));
_exit(0);
}
#ifndef USE_ASSERT_CHECKING
@ -635,19 +650,19 @@ void SSHandleStartupWhenReformStart(dms_reform_start_context_t *rs_cxt)
rs_cxt->role != DMS_ROLE_REFORMER) {
g_instance.dms_cxt.SSRecoveryInfo.realtime_build_in_reform = true;
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS reform][On-demand] reform start phase, stop ondemand realtime build before switchover.")));
errmsg("[SS reform][On-demand] reform start phase: stop ondemand realtime build before switchover.")));
SSWaitStartupExit(true);
return;
} else {
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS reform][On-demand] start phase, ondemand realtime build is enable, no need wait startup thread exit.")));
errmsg("[SS reform][On-demand] reform start phase: ondemand realtime build is enable, no need wait startup thread exit.")));
return;
}
}
if (SS_DISASTER_MAIN_STANDBY_NODE) {
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS reform] start phase, standby cluster main node "
errmsg("[SS reform][Dual cluster] reform start phase: standby cluster main node "
"no need wait startup thread exit.")));
return;
}
@ -658,8 +673,53 @@ void SSHandleStartupWhenReformStart(dms_reform_start_context_t *rs_cxt)
SSWaitStartupExit(false);
} else {
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[SS reform] start phase, last round reform version:%ld, startup wait version:%ld",
errmsg("[SS reform] reform start phase, last round reform version:%ld, startup wait version:%ld",
reform_info->reform_ver, reform_info->reform_ver_startup_wait)));
SSProcessForceExit();
}
}
}
/*
 * Return the log-message prefix that matches the current reform state.
 *
 * Outside of reform the prefix is "[SS]". During reform the prefix is chosen
 * from g_instance.dms_cxt.SSReformInfo.reform_type; any reform type not listed
 * below also falls back to "[SS]".
 *
 * The returned pointer refers to a string literal: callers must neither modify
 * nor free it.
 */
char* SSGetLogHeaderTypeStr()
{
    if (!SS_IN_REFORM) {
        return "[SS]";
    }

    switch (g_instance.dms_cxt.SSReformInfo.reform_type) {
        case DMS_REFORM_TYPE_FOR_NORMAL_OPENGAUSS:
            return "[SS reform]";
        case DMS_REFORM_TYPE_FOR_FAILOVER_OPENGAUSS:
            return "[SS reform][SS failover]";
        case DMS_REFORM_TYPE_FOR_SWITCHOVER_OPENGAUSS:
            /* Spelled "switchover" so it matches the other switchover log headers. */
            return "[SS reform][SS switchover]";
        default:
            return "[SS]";
    }
}
void ProcessNoCleanBackendsScenario() {
/*
* PM_WAIT_REFORM indicates that this node don't do actually thing during reform phase.
* what only will be done currently is waiting this round of reform to finish.
* 1. primary restart, standby need set PM_WAIT_REFORM
* 2. standby restart, primary need set PM_WAIT_REFORM
* 3. SS_PERFORMING_SWITCHOVER, primary and standby all return. Because primary need PM_RUN to
* do ProcessDemoteRequest.
*/
if (SS_PERFORMING_SWITCHOVER) {
return;
}
if (((uint64)(0x1 << SS_MY_INST_ID) & g_instance.dms_cxt.SSReformInfo.bitmap_reconnect) == 0 &&
!g_instance.dms_cxt.SSRecoveryInfo.startup_reform &&
g_instance.dms_cxt.SSReformInfo.reform_type != DMS_REFORM_TYPE_FOR_FULL_CLEAN) {
pmState = PM_WAIT_REFORM;
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] no clean backends, pmState=%d, SSClusterState=%d, "
"demotion=%d-%d, rec=%d", pmState, g_instance.dms_cxt.SSClusterState, g_instance.demotion,
t_thrd.walsender_cxt.WalSndCtl->demotion, t_thrd.xlog_cxt.InRecovery)));
}
}

View File

@ -38,6 +38,7 @@
#include "securec.h"
#include "storage/procarray.h"
#include "replication/replicainternal.h"
#include "replication/walsender_private.h"
#include "storage/smgr/fd.h"
#include "access/csnlog.h"
#include "access/twophase.h"
@ -79,5 +80,8 @@ void SSHandleSwitchoverPromote()
Assert(g_instance.pid_cxt.StartupPID != 0);
pmState = PM_STARTUP;
}
ereport(LOG, (errmsg("[SS reform][SS swithover] swithover promote: pmState=%d, SSClusterState=%d, "
"demotion=%d-%d, rec=%d", pmState, g_instance.dms_cxt.SSClusterState, g_instance.demotion,
t_thrd.walsender_cxt.WalSndCtl->demotion, t_thrd.xlog_cxt.InRecovery)));
return;
}

View File

@ -597,6 +597,7 @@ PMStateInfo pmStateDescription[] = {{PM_INIT, "PM_INIT"},
{PM_WAIT_BACKUP, "PM_WAIT_BACKUP"},
{PM_WAIT_READONLY, "PM_WAIT_READONLY"},
{PM_WAIT_BACKENDS, "PM_WAIT_BACKENDS"},
{PM_WAIT_REFORM, "PM_WAIT_REFORM"},
{PM_SHUTDOWN, "PM_SHUTDOWN"},
{PM_SHUTDOWN_2, "PM_SHUTDOWN_2"},
{PM_WAIT_DEAD_END, "PM_WAIT_DEAD_END"},
@ -3062,7 +3063,7 @@ int PostmasterMain(int argc, char* argv[])
SSDisasterRefreshMode();
}
int src_id = g_instance.dms_cxt.SSReformerControl.primaryInstId;
ereport(LOG, (errmsg("[SS reform] node%d starts, found cluster PRIMARY:%d",
ereport(LOG, (errmsg("[SS reform][db] Node:%d starts, found cluster PRIMARY:%d",
g_instance.attr.attr_storage.dms_attr.instance_id, src_id)));
Assert(src_id >= 0 && src_id <= DMS_MAX_INSTANCE - 1);
@ -3071,10 +3072,10 @@ int PostmasterMain(int argc, char* argv[])
while (g_instance.dms_cxt.SSReformerControl.list_stable == 0) {
pg_usleep(SLEEP_ONE_SEC);
SSReadControlFile(REFORM_CTRL_PAGE);
ereport(WARNING, (errmsg("[SS reform] node%d waiting for PRIMARY:%d to finish 1st reform",
ereport(WARNING, (errmsg("[SS reform][db wait] Node:%d waiting for PRIMARY:%d to finish 1st reform",
g_instance.attr.attr_storage.dms_attr.instance_id, src_id)));
}
ereport(LOG, (errmsg("[SS reform] Success: node:%d wait for PRIMARY:%d to finish 1st reform",
ereport(LOG, (errmsg("[SS reform][db wait] Node:%d wait for PRIMARY:%d to finish 1st reform: success",
g_instance.attr.attr_storage.dms_attr.instance_id, src_id)));
}
}
@ -3943,21 +3944,21 @@ static int ServerLoop(void)
(void)gs_signal_unblock_sigusr2();
if (ENABLE_DMS && g_instance.dms_cxt.SSRecoveryInfo.startup_reform && !startup_reform_finish) {
ereport(LOG, (errmsg("[SS reform] Node:%d first-round reform start wait.", SS_MY_INST_ID)));
ereport(LOG, (errmsg("[SS reform][db sync wait] Node:%d first-round reform start wait.", SS_MY_INST_ID)));
if (!DMSWaitReform()) {
ereport(WARNING, (errmsg("[SS reform] Node:%d first-round reform failed, shutdown now",
ereport(WARNING, (errmsg("[SS reform][db sync wait] Node:%d first-round reform failed, shutdown now",
SS_MY_INST_ID)));
(void)gs_signal_send(PostmasterPid, SIGTERM);
startup_reform_finish = true;
} else {
ereport(LOG, (errmsg("[SS reform] Node:%d first-round reform success.", SS_MY_INST_ID)));
ereport(LOG, (errmsg("[SS reform][db sync wait] Node:%d first-round reform success.", SS_MY_INST_ID)));
startup_reform_finish = true;
}
}
/**
* Standby node start StartUp thread to start ondemand realtime build,
* after reform.
/*
* Standby node initializes StartUp thread to start ondemand realtime build
* after reform.
*/
if (ENABLE_ONDEMAND_REALTIME_BUILD && SS_ONDEMAND_REALTIME_BUILD_DISABLED &&
SS_NORMAL_STANDBY && SS_CLUSTER_ONDEMAND_NORMAL && pmState == PM_RUN &&
@ -6869,7 +6870,7 @@ dms_demote:
signal_child(g_instance.pid_cxt.CBMWriterPID, SIGTERM);
}
ereport(LOG, (errmsg("[SS switchover] primary demoting: "
ereport(LOG, (errmsg("[SS reform][SS switchover] primary demoting: "
"killed threads, waiting for backends die")));
pmState = PM_WAIT_BACKENDS;
} else {
@ -9713,6 +9714,11 @@ static void handle_recovery_started()
}
pmState = PM_RECOVERY;
if (ENABLE_DMS) {
ereport(LOG, (errmsg("[SS reform][db startupxlog]recovery start, pmState=%d, SSClusterState=%d,"
"demotion=%d-%d, rec=%d", pmState, g_instance.dms_cxt.SSClusterState, g_instance.demotion,
t_thrd.walsender_cxt.WalSndCtl->demotion, t_thrd.shemem_ptr_cxt.XLogCtl->IsRecoveryDone)));
}
}
}
@ -10509,6 +10515,9 @@ static void sigusr1_handler(SIGNAL_ARGS)
}
pmState = PM_WAIT_BACKENDS;
ereport(LOG, (errmsg("[SS reform][SS switchover] clean backends, pmState=%d, SSClusterState=%d, "
"demotion=%d-%d, rec=%d", pmState, g_instance.dms_cxt.SSClusterState, g_instance.demotion,
t_thrd.walsender_cxt.WalSndCtl->demotion, t_thrd.xlog_cxt.InRecovery)));
if (ENABLE_THREAD_POOL) {
g_threadPoolControler->EnableAdjustPool();
}
@ -10584,22 +10593,26 @@ static void sigusr1_handler(SIGNAL_ARGS)
signal_child(g_instance.pid_cxt.CBMWriterPID, SIGTERM);
}
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] terminate backends success")));
ereport(LOG, (errmsg("[SS reform] clean backends: pmState=%d, SSClusterState=%d, demotion=%d-%d, rec=%d",
pmState, g_instance.dms_cxt.SSClusterState, g_instance.demotion,
t_thrd.walsender_cxt.WalSndCtl->demotion, t_thrd.xlog_cxt.InRecovery)));
g_instance.dms_cxt.SSRecoveryInfo.reform_ready = true;
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] clean backens: set reform_ready to true suceessfully")));
}
if (ENABLE_DMS && CheckPostmasterSignal(PMSIGNAL_DMS_FAILOVER_TERM_BACKENDS)) {
// failover trriger,cm show promoting in failover node, show starting in normal standby node.
if (SS_STANDBY_FAILOVER) {
PMUpdateDBState(PROMOTING_STATE, get_cur_mode(), get_cur_repl_num());
} else {
PMUpdateDBState(STARTING_STATE, get_cur_mode(), get_cur_repl_num());
}
t_thrd.dms_cxt.CloseAllSessionsFailed = false;
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS failover] kill backends begin.")));
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] kill backends begin.")));
if (ENABLE_THREAD_POOL) {
g_threadPoolControler->CloseAllSessions();
if (t_thrd.dms_cxt.CloseAllSessionsFailed) {
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS failover] failover failed,"
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS reform][SS failover] failover failed,"
"threadpool sessions closed failed.")));
_exit(0);
}
@ -10641,10 +10654,14 @@ static void sigusr1_handler(SIGNAL_ARGS)
if (g_instance.pid_cxt.WalWriterAuxiliaryPID != 0)
signal_child(g_instance.pid_cxt.WalWriterAuxiliaryPID, SIGTERM);
if (SS_STANDBY_FAILOVER) {
if (SS_PERFORMING_FAILOVER) {
pmState = PM_WAIT_BACKENDS;
}
ereport(LOG, (errmsg("[SS reform][SS failover] clean backends, pmState=%d, SSClusterState=%d, "
"demotion=%d-%d, rec=%d", pmState, g_instance.dms_cxt.SSClusterState, g_instance.demotion,
t_thrd.walsender_cxt.WalSndCtl->demotion, t_thrd.xlog_cxt.InRecovery)));
if (ENABLE_THREAD_POOL) {
g_threadPoolControler->EnableAdjustPool();
}
@ -15381,3 +15398,40 @@ void SSOndemandProcExitIfStayWaitBackends()
_exit(0);
}
}
int SSCountAndPrintChildren(int target)
{
Dlelem* curr = NULL;
int cnt = 0;
for (curr = DLGetHead(g_instance.backend_list); curr; curr = DLGetSucc(curr)) {
Backend* bp = (Backend*)DLE_VAL(curr);
/*
* Since target == BACKEND_TYPE_ALL is the most common case, we test
* it first and avoid touching shared memory for every child.
*/
if (target != BACKEND_TYPE_ALL) {
int child = 0;
if (bp->is_autovacuum)
child = BACKEND_TYPE_AUTOVAC;
else if (IsPostmasterChildWalSender(bp->child_slot))
child = BACKEND_TYPE_WALSND;
else if (IsPostmasterChildDataSender(bp->child_slot))
child = BACKEND_TYPE_DATASND;
else
child = BACKEND_TYPE_NORMAL;
if (!((unsigned int)target & (unsigned int)child))
continue;
}
cnt++;
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[SS reform][SS failover] print thread no exiting, thread id:%lu, thread role:%d",
bp->pid, bp->role)));
}
return cnt;
}

View File

@ -606,6 +606,12 @@ BlockNumber eg_try_alloc_extent(SegExtentGroup *seg, BlockNumber preassigned_blo
SegmentCheck(map_group->free_page < map_group->page_count);
for (int j = map_group->free_page; j < map_group->page_count; j++) {
result = eg_alloc_extent_from_map(seg, curr_map_bno, target_file_size, i, j);
/*
 * eg_alloc_extent_from_map returns InvalidBlockNumber in two cases:
 * 1. map_page->free_bits == 0, so the next map_page has to be searched.
 * 2. A free map and a free bit were found but target_file_size > 0, that is, the block to be
 * allocated > seg->segfile->total_blocks, so the segment physical file has to be extended.
 */
if (result != InvalidBlockNumber) {
return result;
}

View File

@ -75,6 +75,7 @@
#define SS_STANDBY_FAILOVER (ENABLE_DMS && g_instance.dms_cxt.SSClusterState == NODESTATE_STANDBY_FAILOVER_PROMOTING)
/* reform type */
#define SS_PRIMARY_NORMAL_REFORM \
(SS_REFORM_REFORMER && (g_instance.dms_cxt.SSReformInfo.reform_type == DMS_REFORM_TYPE_FOR_NORMAL_OPENGAUSS))
@ -82,6 +83,11 @@
(ENABLE_DMS && SS_IN_REFORM && \
(g_instance.dms_cxt.SSReformInfo.reform_type == DMS_REFORM_TYPE_FOR_SWITCHOVER_OPENGAUSS))
#define SS_PERFORMING_FAILOVER \
(ENABLE_DMS && SS_IN_REFORM && \
(g_instance.dms_cxt.SSReformInfo.reform_type == DMS_REFORM_TYPE_FOR_FAILOVER_OPENGAUSS))
/* clusterstate for switchover */
#define SS_STANDBY_PROMOTING \
(ENABLE_DMS && (g_instance.dms_cxt.SSClusterState == NODESTATE_STANDBY_PROMOTING))
@ -96,8 +102,6 @@
(ENABLE_DMS && (g_instance.dms_cxt.SSClusterState == NODESTATE_STANDBY_WAITING || \
g_instance.dms_cxt.SSClusterState == NODESTATE_STANDBY_REDIRECT))
/* Mode in dorado hyperreplication and dms enabled as follow */
#define SS_CLUSTER_ONDEMAND_NOT_NORAML \
(ENABLE_DMS && (g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status != CLUSTER_NORMAL))
#define SS_CLUSTER_ONDEMAND_BUILD \

View File

@ -30,6 +30,8 @@
#define REFORM_WAIT_TIME 10000 /* 0.01 sec */
#define REFORM_WAIT_LONG 100000 /* 0.1 sec */
#define WAIT_REFORM_CTRL_REFRESH_TRIES 1000
#define BACKEND_TYPE_NORMAL 0x0001 /* normal backend */
#define BACKEND_TYPE_AUTOVAC 0x0002 /* autovacuum worker process */
#define WAIT_PMSTATE_UPDATE_TRIES 100
@ -61,4 +63,7 @@ void SSGrantDSSWritePermission(void);
bool SSPrimaryRestartScenario();
bool SSBackendNeedExitScenario();
void SSWaitStartupExit(bool send_signal = true);
void SSHandleStartupWhenReformStart(dms_reform_start_context_t *rs_cxt);
void SSHandleStartupWhenReformStart(dms_reform_start_context_t *rs_cxt);
char* SSGetLogHeaderTypeStr();
void ProcessNoCleanBackendsScenario();
void SSProcessForceExit();

View File

@ -104,6 +104,7 @@ typedef enum {
PM_WAIT_BACKUP, /* waiting for online backup mode to end */
PM_WAIT_READONLY, /* waiting for read only backends to exit */
PM_WAIT_BACKENDS, /* waiting for live backends to exit */
PM_WAIT_REFORM, /* waiting for dms reform to finish */
PM_SHUTDOWN, /* waiting for checkpointer to do shutdown
* ckpt */
PM_SHUTDOWN_2, /* waiting for archiver and walsenders to
@ -262,4 +263,5 @@ extern void InitShmemForDmsCallBack();
extern void SignalTermAllBackEnd();
extern void SSRestartFailoverPromote();
extern void SIGBUS_handler(SIGNAL_ARGS);
extern int SSCountAndPrintChildren(int target);
#endif /* _POSTMASTER_H */