修改适配GSC开启后备机对全局失效消息的处理

This commit is contained in:
arcoalien@qq.com
2023-01-09 12:24:55 +08:00
parent d2cbd2bfc0
commit 7946661d31
9 changed files with 245 additions and 26 deletions

View File

@ -87,12 +87,12 @@ void TransformLockTagToDmsLatch(dms_drlatch_t* dlatch, const LOCKTAG locktag)
locktag.locktag_field3, locktag.locktag_field4, locktag.locktag_field5); locktag.locktag_field3, locktag.locktag_field4, locktag.locktag_field5);
} }
static void CalcSegDmsPhysicalLoc(BufferDesc* buf_desc, Buffer buffer) static void CalcSegDmsPhysicalLoc(BufferDesc* buf_desc, Buffer buffer, bool check_standby)
{ {
if (IsSegmentFileNode(buf_desc->tag.rnode)) { if (IsSegmentFileNode(buf_desc->tag.rnode)) {
SegmentCheck(!IsSegmentPhysicalRelNode(buf_desc->tag.rnode)); SegmentCheck(!IsSegmentPhysicalRelNode(buf_desc->tag.rnode));
SegPageLocation loc = seg_get_physical_location(buf_desc->tag.rnode, buf_desc->tag.forkNum, SegPageLocation loc = seg_get_physical_location(buf_desc->tag.rnode, buf_desc->tag.forkNum,
buf_desc->tag.blockNum); buf_desc->tag.blockNum, check_standby);
SegmentCheck(loc.blocknum != InvalidBlockNumber); SegmentCheck(loc.blocknum != InvalidBlockNumber);
ereport(DEBUG1, (errmsg("buffer:%d is segdata page, bufdesc seginfo is empty, calc segfileno:%d, segblkno:%u", ereport(DEBUG1, (errmsg("buffer:%d is segdata page, bufdesc seginfo is empty, calc segfileno:%d, segblkno:%u",
@ -185,7 +185,38 @@ bool StartReadPage(BufferDesc *buf_desc, LWLockMode mode)
} }
#ifdef USE_ASSERT_CHECKING #ifdef USE_ASSERT_CHECKING
static void SmgrNetPageCheckDiskLSN(BufferDesc *buf_desc, ReadBufferMode read_mode, const XLogPhyBlock *pblk) static SMGR_READ_STATUS SmgrNetPageCheckRead(Oid spcNode, Oid dbNode, Oid relNode, ForkNumber forkNum,
BlockNumber blockNo, char *blockbuf)
{
SMGR_READ_STATUS rdStatus;
SegSpace* spc = spc_open(spcNode, dbNode, false);
int count = 3;
SegmentCheck(spc);
RelFileNode fakenode = {
.spcNode = spc->spcNode,
.dbNode = spc->dbNode,
.relNode = relNode,
.bucketNode = SegmentBktId,
.opt = 0
};
RETRY:
seg_physical_read(spc, fakenode, forkNum, blockNo, (char *)blockbuf);
if (PageIsVerified((Page)blockbuf, blockNo)) {
rdStatus = SMGR_RD_OK;
} else {
rdStatus = SMGR_RD_CRC_ERROR;
}
if (rdStatus == SMGR_RD_CRC_ERROR && count > 0) {
count--;
goto RETRY;
}
return rdStatus;
}
void SmgrNetPageCheckDiskLSN(BufferDesc *buf_desc, ReadBufferMode read_mode, const XLogPhyBlock *pblk)
{ {
/* /*
* prerequisite is that the page that initialized to zero in memory should be flush to disk * prerequisite is that the page that initialized to zero in memory should be flush to disk
@ -194,13 +225,34 @@ static void SmgrNetPageCheckDiskLSN(BufferDesc *buf_desc, ReadBufferMode read_mo
IsSegmentBufferID(buf_desc->buf_id)) && (read_mode == RBM_NORMAL)) { IsSegmentBufferID(buf_desc->buf_id)) && (read_mode == RBM_NORMAL)) {
char *origin_buf = (char *)palloc(BLCKSZ + ALIGNOF_BUFFER); char *origin_buf = (char *)palloc(BLCKSZ + ALIGNOF_BUFFER);
char *temp_buf = (char *)BUFFERALIGN(origin_buf); char *temp_buf = (char *)BUFFERALIGN(origin_buf);
ReadBuffer_common_for_check(read_mode, buf_desc, pblk, temp_buf); SMgrRelation smgr = smgropen(buf_desc->tag.rnode, InvalidBackendId);
SMGR_READ_STATUS rdStatus;
if (pblk != NULL) {
rdStatus = SmgrNetPageCheckRead(smgr->smgr_rnode.node.spcNode, smgr->smgr_rnode.node.dbNode, pblk->relNode,
buf_desc->tag.forkNum, pblk->block, (char *)temp_buf);
} else if (buf_desc->seg_fileno != EXTENT_INVALID) {
rdStatus = SmgrNetPageCheckRead(smgr->smgr_rnode.node.spcNode, smgr->smgr_rnode.node.dbNode,
buf_desc->seg_fileno, buf_desc->tag.forkNum, buf_desc->seg_blockno, (char *)temp_buf);
} else {
rdStatus = smgrread(smgr, buf_desc->tag.forkNum, buf_desc->tag.blockNum, (char *)temp_buf);
}
if (rdStatus == SMGR_RD_CRC_ERROR) {
ereport(PANIC, (errmsg("[%d/%d/%d/%d/%d %d-%d] read from disk error, maybe buffer in flush",
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
(int)buf_desc->tag.rnode.bucketNode, (int)buf_desc->tag.rnode.opt, buf_desc->tag.forkNum,
buf_desc->tag.blockNum)));
}
XLogRecPtr lsn_on_disk = PageGetLSN(temp_buf); XLogRecPtr lsn_on_disk = PageGetLSN(temp_buf);
XLogRecPtr lsn_on_mem = PageGetLSN(BufHdrGetBlock(buf_desc)); XLogRecPtr lsn_on_mem = PageGetLSN(BufHdrGetBlock(buf_desc));
/* maybe some pages are not protected by WAL-Logged */ /* maybe some pages are not protected by WAL-Logged */
if ((lsn_on_mem != InvalidXLogRecPtr) && (lsn_on_disk > lsn_on_mem)) { if ((lsn_on_mem != InvalidXLogRecPtr) && (lsn_on_disk > lsn_on_mem)) {
RelFileNode rnode = buf_desc->tag.rnode; RelFileNode rnode = buf_desc->tag.rnode;
ereport(PANIC, (errmsg("[%d/%d/%d/%d/%d %d-%d] memory lsn(0x%llx) is less than disk lsn(0x%llx)", int elevel = WARNING;
if (!RecoveryInProgress()) {
elevel = PANIC;
}
ereport(elevel, (errmsg("[%d/%d/%d/%d/%d %d-%d] memory lsn(0x%llx) is less than disk lsn(0x%llx)",
rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt, rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
buf_desc->tag.forkNum, buf_desc->tag.blockNum, buf_desc->tag.forkNum, buf_desc->tag.blockNum,
(unsigned long long)lsn_on_mem, (unsigned long long)lsn_on_disk))); (unsigned long long)lsn_on_mem, (unsigned long long)lsn_on_disk)));
@ -237,8 +289,8 @@ Buffer TerminateReadPage(BufferDesc* buf_desc, ReadBufferMode read_mode, const X
TerminateBufferIO(buf_desc, false, BM_VALID); TerminateBufferIO(buf_desc, false, BM_VALID);
buffer = BufferDescriptorGetBuffer(buf_desc); buffer = BufferDescriptorGetBuffer(buf_desc);
if (!RecoveryInProgress()) { if (!RecoveryInProgress() || g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy) {
CalcSegDmsPhysicalLoc(buf_desc, buffer); CalcSegDmsPhysicalLoc(buf_desc, buffer, !g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy);
} }
} }
@ -304,7 +356,7 @@ static bool DmsStartBufferIO(BufferDesc *buf_desc, LWLockMode mode)
} }
#ifdef USE_ASSERT_CHECKING #ifdef USE_ASSERT_CHECKING
static void SegNetPageCheckDiskLSN(BufferDesc *buf_desc, ReadBufferMode read_mode, SegSpace *spc) void SegNetPageCheckDiskLSN(BufferDesc *buf_desc, ReadBufferMode read_mode, SegSpace *spc)
{ {
/* /*
* prequisite is that the page that initialized to zero in memory should be flushed to disk, * prequisite is that the page that initialized to zero in memory should be flushed to disk,

View File

@ -652,8 +652,21 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
PG_CATCH(); PG_CATCH();
{ {
t_thrd.int_cxt.InterruptHoldoffCount = saveInterruptHoldoffCount; t_thrd.int_cxt.InterruptHoldoffCount = saveInterruptHoldoffCount;
/* Save error info */
ErrorData* edata = CopyErrorData();
FlushErrorState();
FreeErrorData(edata);
ereport(WARNING, (errmsg("[CBInvalidatePage] Error happend, spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode, tag->forkNum,
tag->blockNum)));
ReleaseResource(); ReleaseResource();
ret = DMS_ERROR;
} }
if (ret == DMS_SUCCESS) {
Assert(buf_ctrl->lock_mode == DMS_LOCK_NULL);
}
PG_END_TRY(); PG_END_TRY();
return ret; return ret;
} }
@ -1213,7 +1226,6 @@ static int CBRecoveryPrimary(void *db_handle, int inst_id)
g_instance.dms_cxt.SSReformerControl.primaryInstId == -1); g_instance.dms_cxt.SSReformerControl.primaryInstId == -1);
g_instance.dms_cxt.SSRecoveryInfo.skip_redo_replay = false; g_instance.dms_cxt.SSRecoveryInfo.skip_redo_replay = false;
g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy = false; g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy = false;
t_thrd.xlog_cxt.LocalRecoveryInProgress = true;
ereport(LOG, (errmsg("[SS reform] Recovery as primary, will replay xlog from inst:%d", ereport(LOG, (errmsg("[SS reform] Recovery as primary, will replay xlog from inst:%d",
g_instance.dms_cxt.SSReformerControl.primaryInstId))); g_instance.dms_cxt.SSReformerControl.primaryInstId)));
@ -1229,16 +1241,18 @@ static int CBFlushCopy(void *db_handle, char *pageid)
{ {
if (SS_REFORM_REFORMER && !g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy) { if (SS_REFORM_REFORMER && !g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy) {
g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy = true; g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy = true;
smgrcloseall();
} }
BufferTag* tag = (BufferTag*)pageid; BufferTag* tag = (BufferTag*)pageid;
Buffer buffer; Buffer buffer;
SegSpace *spc = NULL;
uint32 saveInterruptHoldoffCount = t_thrd.int_cxt.InterruptHoldoffCount; uint32 saveInterruptHoldoffCount = t_thrd.int_cxt.InterruptHoldoffCount;
PG_TRY(); PG_TRY();
{ {
if (IsSegmentPhysicalRelNode(tag->rnode)) { if (IsSegmentPhysicalRelNode(tag->rnode)) {
SegSpace *spc = spc_open(tag->rnode.spcNode, tag->rnode.dbNode, true, false); spc = spc_open(tag->rnode.spcNode, tag->rnode.dbNode, false, false);
buffer = ReadBufferFast(spc, tag->rnode, tag->forkNum, tag->blockNum, RBM_NORMAL); buffer = ReadBufferFast(spc, tag->rnode, tag->forkNum, tag->blockNum, RBM_NORMAL);
} else { } else {
buffer = ReadBufferWithoutRelcache(tag->rnode, tag->forkNum, tag->blockNum, RBM_NORMAL, NULL, NULL); buffer = ReadBufferWithoutRelcache(tag->rnode, tag->forkNum, tag->blockNum, RBM_NORMAL, NULL, NULL);
@ -1270,8 +1284,17 @@ static int CBFlushCopy(void *db_handle, char *pageid)
} }
Assert(XLogRecPtrIsValid(g_instance.dms_cxt.ckptRedo)); Assert(XLogRecPtrIsValid(g_instance.dms_cxt.ckptRedo));
LockBuffer(buffer, BUFFER_LOCK_SHARE);
BufferDesc* buf_desc = GetBufferDescriptor(buffer - 1); BufferDesc* buf_desc = GetBufferDescriptor(buffer - 1);
XLogRecPtr pagelsn = BufferGetLSN(buf_desc); XLogRecPtr pagelsn = BufferGetLSN(buf_desc);
#ifdef USE_ASSERT_CHECKING
if (IsSegmentPhysicalRelNode(buf_desc->tag.rnode)) {
SegNetPageCheckDiskLSN(buf_desc, RBM_NORMAL, spc);
} else {
SmgrNetPageCheckDiskLSN(buf_desc, RBM_NORMAL, NULL);
}
#endif
if (XLByteLT(g_instance.dms_cxt.ckptRedo, pagelsn)) { if (XLByteLT(g_instance.dms_cxt.ckptRedo, pagelsn)) {
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buffer - 1); dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buffer - 1);
buf_ctrl->state |= BUF_DIRTY_NEED_FLUSH; buf_ctrl->state |= BUF_DIRTY_NEED_FLUSH;
@ -1283,7 +1306,7 @@ static int CBFlushCopy(void *db_handle, char *pageid)
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode, tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum, (unsigned long long)pagelsn))); tag->forkNum, tag->blockNum, (unsigned long long)pagelsn)));
} }
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer); ReleaseBuffer(buffer);
return GS_SUCCESS; return GS_SUCCESS;
} }
@ -1320,7 +1343,6 @@ static void CBReformStartNotify(void *db_handle, dms_role_t role, unsigned char
g_instance.dms_cxt.SSClusterState = NODESTATE_NORMAL; g_instance.dms_cxt.SSClusterState = NODESTATE_NORMAL;
g_instance.dms_cxt.SSRecoveryInfo.reform_ready = false; g_instance.dms_cxt.SSRecoveryInfo.reform_ready = false;
g_instance.dms_cxt.resetSyscache = true; g_instance.dms_cxt.resetSyscache = true;
t_thrd.xlog_cxt.LocalRecoveryInProgress = false;
if (ss_reform_type == DMS_REFORM_TYPE_FOR_FAILOVER_OPENGAUSS) { if (ss_reform_type == DMS_REFORM_TYPE_FOR_FAILOVER_OPENGAUSS) {
g_instance.dms_cxt.SSRecoveryInfo.in_failover = true; g_instance.dms_cxt.SSRecoveryInfo.in_failover = true;
} }

View File

@ -29,6 +29,9 @@
#include "storage/smgr/segment_internal.h" #include "storage/smgr/segment_internal.h"
#include "ddes/dms/ss_transaction.h" #include "ddes/dms/ss_transaction.h"
#include "ddes/dms/ss_dms_bufmgr.h" #include "ddes/dms/ss_dms_bufmgr.h"
#include "storage/sinvaladt.h"
void SSStandbyGlobalInvalidSharedInvalidMessages(const SharedInvalidationMessage* msg, Oid tsid);
Snapshot SSGetSnapshotData(Snapshot snapshot) Snapshot SSGetSnapshotData(Snapshot snapshot)
{ {
@ -372,6 +375,8 @@ void SSSendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n)
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
SharedInvalidationMessage *msg = (SharedInvalidationMessage *)(msgs + i); SharedInvalidationMessage *msg = (SharedInvalidationMessage *)(msgs + i);
SSBroadcastSI ssmsg; SSBroadcastSI ssmsg;
ssmsg.tablespaceid = u_sess->proc_cxt.MyDatabaseTableSpace;
Assert(ssmsg.tablespaceid != InvalidOid);
ssmsg.type = BCAST_SI; ssmsg.type = BCAST_SI;
if (msg->id >= SHAREDINVALFUNC_ID) { if (msg->id >= SHAREDINVALFUNC_ID) {
errno_t rc = errno_t rc =
@ -383,8 +388,8 @@ void SSSendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n)
} }
int backup_output = t_thrd.postgres_cxt.whereToSendOutput; int backup_output = t_thrd.postgres_cxt.whereToSendOutput;
t_thrd.postgres_cxt.whereToSendOutput = DestNone; t_thrd.postgres_cxt.whereToSendOutput = DestNone;
int ret = dms_broadcast_msg(&dms_ctx, (char *)&ssmsg, sizeof(SSBroadcastSI), (unsigned char)false, int ret = dms_broadcast_opengauss_ddllock(&dms_ctx, (char *)&ssmsg, sizeof(SSBroadcastSI),
SS_BROADCAST_WAIT_FIVE_SECONDS); (unsigned char)false, SS_BROADCAST_WAIT_FIVE_SECONDS, (unsigned char)SHARED_INVAL_MSG);
if (ret != DMS_SUCCESS) { if (ret != DMS_SUCCESS) {
ereport(DEBUG1, (errmsg("SS broadcast SI msg failed!"))); ereport(DEBUG1, (errmsg("SS broadcast SI msg failed!")));
} }
@ -411,8 +416,8 @@ void SSBCastDropRelAllBuffer(RelFileNode *rnodes, int rnode_len)
int output_backup = t_thrd.postgres_cxt.whereToSendOutput; int output_backup = t_thrd.postgres_cxt.whereToSendOutput;
t_thrd.postgres_cxt.whereToSendOutput = DestNone; t_thrd.postgres_cxt.whereToSendOutput = DestNone;
int ret = dms_broadcast_msg(&dms_ctx, (char *)msg, sizeof(SSBroadcastDropRelAllBuffer) + bytes, int ret = dms_broadcast_opengauss_ddllock(&dms_ctx, (char *)msg, sizeof(SSBroadcastDropRelAllBuffer) + bytes,
(unsigned char)false, SS_BROADCAST_WAIT_FIVE_SECONDS); (unsigned char)false, SS_BROADCAST_WAIT_FIVE_SECONDS, (unsigned char)DROP_BUF_MSG);
if (ret != DMS_SUCCESS) { if (ret != DMS_SUCCESS) {
ereport(DEBUG1, (errmsg("SS broadcast drop rel all buffer msg failed, rnode=[%d/%d/%d/%d]", ereport(DEBUG1, (errmsg("SS broadcast drop rel all buffer msg failed, rnode=[%d/%d/%d/%d]",
rnodes->spcNode, rnodes->dbNode, rnodes->relNode, rnodes->bucketNode))); rnodes->spcNode, rnodes->dbNode, rnodes->relNode, rnodes->bucketNode)));
@ -434,8 +439,8 @@ void SSBCastDropRelRangeBuffer(RelFileNode node, ForkNumber forkNum, BlockNumber
int output_backup = t_thrd.postgres_cxt.whereToSendOutput; int output_backup = t_thrd.postgres_cxt.whereToSendOutput;
t_thrd.postgres_cxt.whereToSendOutput = DestNone; t_thrd.postgres_cxt.whereToSendOutput = DestNone;
int ret = dms_broadcast_msg(&dms_ctx, (char *)msg, sizeof(SSBroadcastDropRelRangeBuffer), int ret = dms_broadcast_opengauss_ddllock(&dms_ctx, (char *)msg, sizeof(SSBroadcastDropRelRangeBuffer),
(unsigned char)false, SS_BROADCAST_WAIT_FIVE_SECONDS); (unsigned char)false, SS_BROADCAST_WAIT_FIVE_SECONDS, (unsigned char)DROP_BUF_MSG);
if (ret != DMS_SUCCESS) { if (ret != DMS_SUCCESS) {
ereport(DEBUG1, (errmsg("SS broadcast drop rel range buffer msg failed, rnode=[%d/%d/%d/%d]," ereport(DEBUG1, (errmsg("SS broadcast drop rel range buffer msg failed, rnode=[%d/%d/%d/%d],"
"firstDelBlock=%u", node.spcNode, node.dbNode, node.relNode, node.bucketNode, firstDelBlock))); "firstDelBlock=%u", node.spcNode, node.dbNode, node.relNode, node.bucketNode, firstDelBlock)));
@ -455,8 +460,8 @@ void SSBCastDropDBAllBuffer(Oid dbid)
int output_backup = t_thrd.postgres_cxt.whereToSendOutput; int output_backup = t_thrd.postgres_cxt.whereToSendOutput;
t_thrd.postgres_cxt.whereToSendOutput = DestNone; t_thrd.postgres_cxt.whereToSendOutput = DestNone;
int ret = dms_broadcast_msg(&dms_ctx, (char *)msg, sizeof(SSBroadcastDropDBAllBuffer), int ret = dms_broadcast_opengauss_ddllock(&dms_ctx, (char *)msg, sizeof(SSBroadcastDropDBAllBuffer),
(unsigned char)false, SS_BROADCAST_WAIT_FIVE_SECONDS); (unsigned char)false, SS_BROADCAST_WAIT_FIVE_SECONDS, (unsigned char)DROP_BUF_MSG);
if (ret != DMS_SUCCESS) { if (ret != DMS_SUCCESS) {
ereport(DEBUG1, (errmsg("SS broadcast drop db all buffer msg failed, db=%d", dbid))); ereport(DEBUG1, (errmsg("SS broadcast drop db all buffer msg failed, db=%d", dbid)));
} }
@ -476,8 +481,8 @@ void SSBCastDropSegSpace(Oid spcNode, Oid dbNode)
int output_backup = t_thrd.postgres_cxt.whereToSendOutput; int output_backup = t_thrd.postgres_cxt.whereToSendOutput;
t_thrd.postgres_cxt.whereToSendOutput = DestNone; t_thrd.postgres_cxt.whereToSendOutput = DestNone;
int ret = dms_broadcast_msg(&dms_ctx, (char *)msg, sizeof(SSBroadcastDropSegSpace), int ret = dms_broadcast_opengauss_ddllock(&dms_ctx, (char *)msg, sizeof(SSBroadcastDropSegSpace),
(unsigned char)false, SS_BROADCAST_WAIT_FIVE_SECONDS); (unsigned char)false, SS_BROADCAST_WAIT_FIVE_SECONDS, (unsigned char)DROP_BUF_MSG);
if (ret != DMS_SUCCESS) { if (ret != DMS_SUCCESS) {
ereport(DEBUG1, (errmsg("SS broadcast drop seg space msg failed, spc=%d, db=%d", spcNode, dbNode))); ereport(DEBUG1, (errmsg("SS broadcast drop seg space msg failed, spc=%d, db=%d", spcNode, dbNode)));
} }
@ -493,7 +498,13 @@ int SSProcessSharedInvalMsg(char *data, uint32 len)
SSBroadcastSI* ssmsg = (SSBroadcastSI *)data; SSBroadcastSI* ssmsg = (SSBroadcastSI *)data;
/* process msg one by one */ /* process msg one by one */
SendSharedInvalidMessages(&(ssmsg->msg), 1); if (EnableGlobalSysCache()) {
SSStandbyGlobalInvalidSharedInvalidMessages(&(ssmsg->msg), ssmsg->tablespaceid);
}
SIInsertDataEntries(&(ssmsg->msg), 1);
if (ENABLE_GPC && g_instance.plan_cache != NULL) {
g_instance.plan_cache->InvalMsg(&(ssmsg->msg), 1);
}
return DMS_SUCCESS; return DMS_SUCCESS;
} }
@ -590,3 +601,122 @@ int SSProcessDropSegSpace(char *data, uint32 len)
return DMS_SUCCESS; return DMS_SUCCESS;
} }
/*
 * Forward one catalog-tuple invalidation to the global system cache.
 *
 * id         - cache id handed to InvalidTuples()
 * db_id      - database the message refers to; InvalidOid selects the shared GSC entry
 * hash_value - hash value handed to InvalidTuples()
 * reset      - reset flag handed to InvalidTuples()
 *
 * When db_id names a database for which FindTempGSCEntry() returns NULL,
 * nothing is invalidated.
 */
static void SSFlushGlobalByInvalidMsg(int8 id, Oid db_id, uint32 hash_value, bool reset)
{
    /* No database id: operate on the shared entry's system table cache. */
    if (db_id == InvalidOid) {
        GlobalSysTabCache *shared_systab = g_instance.global_sysdbcache.GetSharedGSCEntry()->m_systabCache;
        shared_systab->InvalidTuples(id, hash_value, reset);
        return;
    }

    /* Per-database entry: look it up, invalidate, then hand the entry back. */
    GlobalSysDBCacheEntry *db_entry = g_instance.global_sysdbcache.FindTempGSCEntry(db_id);
    if (db_entry != NULL) {
        db_entry->m_systabCache->InvalidTuples(id, hash_value, reset);
        g_instance.global_sysdbcache.ReleaseTempGSCEntry(db_entry);
    }
}
static void SSInvalidateGlobalCatalog(const SharedInvalidationMessage* msg)
{
for (int8 cache_id = 0; cache_id < SysCacheSize; cache_id++) {
if (cacheinfo[cache_id].reloid == msg->cat.catId) {
SSFlushGlobalByInvalidMsg(cache_id, msg->cat.dbId, 0, true);
}
}
}
static void SSInvalidateRelCache(const SharedInvalidationMessage* msg)
{
if (msg->rc.dbId == InvalidOid) {
GlobalTabDefCache *global_systab = g_instance.global_sysdbcache.GetSharedGSCEntry()->m_tabdefCache;
global_systab->Invalidate(msg->rc.dbId, msg->rc.relId);
} else {
GlobalSysDBCacheEntry *entry = g_instance.global_sysdbcache.FindTempGSCEntry(msg->rc.dbId);
if (entry == NULL) {
return;
}
entry->m_tabdefCache->Invalidate(msg->rc.dbId, msg->rc.relId);
g_instance.global_sysdbcache.ReleaseTempGSCEntry(entry);
}
}
/*
 * Handle a relmap invalidation message: reload the relation-map file and
 * push the fresh copy into the global relmap cache.
 *
 * msg  - relmap invalidation message; rm.dbId == InvalidOid selects the
 *        shared map, any other value the map of that database.
 * tsid - tablespace oid used together with rm.dbId to build the database path.
 *
 * u_sess->proc_cxt.DatabasePath is redirected to the freshly built path
 * around the file load (when it is unset or differs) and is restored to its
 * previous value before that path is freed, so the session never keeps a
 * pointer to released memory.
 *
 * If no per-database GSC entry is found, the loaded map is simply discarded.
 */
static void SSInvalidateRelmap(const SharedInvalidationMessage* msg, Oid tsid)
{
    bool shared = (msg->rm.dbId == InvalidOid);
    char *database_path = GetDatabasePath(msg->rm.dbId, tsid);

    /* Scratch buffer, over-allocated so the map can be buffer-aligned; allocated before taking the lock. */
    char *unaligned_buf = (char *)palloc0(sizeof(RelMapFile) + ALIGNOF_BUFFER);
    RelMapFile *new_relmap = (RelMapFile *)BUFFERALIGN(unaligned_buf);

    /* Remember the session's path so it can be put back once the reload is done. */
    char *saved_path = u_sess->proc_cxt.DatabasePath;

    LWLockAcquire(RelationMappingLock, LW_EXCLUSIVE);

    /* First reload the relmap file, then update the global relmap cache. */
    if (saved_path == NULL || strcmp(saved_path, database_path) != 0) {
        u_sess->proc_cxt.DatabasePath = database_path;
    }
    load_relmap_file(shared, new_relmap);

    if (shared) {
        GlobalSysDBCacheEntry *global_db = g_instance.global_sysdbcache.GetSharedGSCEntry();
        global_db->m_relmapCache->UpdateBy(new_relmap);
    } else {
        GlobalSysDBCacheEntry *entry = g_instance.global_sysdbcache.FindTempGSCEntry(msg->rm.dbId);
        if (entry != NULL) {
            entry->m_relmapCache->UpdateBy(new_relmap);
            g_instance.global_sysdbcache.ReleaseTempGSCEntry(entry);
        }
    }

    LWLockRelease(RelationMappingLock);

    /* database_path is about to be freed: do not leave the session pointing at it. */
    u_sess->proc_cxt.DatabasePath = saved_path;
    pfree(unaligned_buf);
    pfree(database_path);
}
static void SSInvalidatePartCache(const SharedInvalidationMessage* msg)
{
GlobalSysDBCacheEntry *entry = g_instance.global_sysdbcache.FindTempGSCEntry(msg->pc.dbId);
if (entry == NULL) {
return;
}
entry->m_partdefCache->Invalidate(msg->pc.dbId, msg->pc.partId);
g_instance.global_sysdbcache.ReleaseTempGSCEntry(entry);
}
/*
 * Handle a catcache invalidation message: the cache id, database id and
 * hash value carried in msg->cc are forwarded with the reset flag cleared.
 */
static void SSInvalidateCatCache(const SharedInvalidationMessage* msg)
{
    const bool reset = false;

    SSFlushGlobalByInvalidMsg(msg->cc.id, msg->cc.dbId, msg->cc.hashValue, reset);
}
/*
 * Apply one shared-invalidation message to the global system cache.
 *
 * msg  - the invalidation message; msg->id selects the handler.
 * tsid - tablespace oid, used only for relmap messages.
 *
 * Asserts that the global system cache is enabled. SMGR, HBKT-SMGR and FUNC
 * messages are accepted but need no work here. Any other non-negative id is
 * treated as a catcache id; any other negative id is reported at FATAL level.
 */
void SSStandbyGlobalInvalidSharedInvalidMessages(const SharedInvalidationMessage* msg, Oid tsid)
{
    Assert(EnableGlobalSysCache());

    switch (msg->id) {
        case SHAREDINVALCATALOG_ID:
            /* whole system catalog */
            SSInvalidateGlobalCatalog(msg);
            break;

        case SHAREDINVALRELCACHE_ID:
            /* single relation definition */
            SSInvalidateRelCache(msg);
            break;

        case SHAREDINVALRELMAP_ID:
            /* relation mapping file */
            SSInvalidateRelmap(msg, tsid);
            break;

        case SHAREDINVALPARTCACHE_ID:
            /* single partition definition */
            SSInvalidatePartCache(msg);
            break;

        case SHAREDINVALSMGR_ID:
        case SHAREDINVALHBKTSMGR_ID:
        case SHAREDINVALFUNC_ID:
            /* nothing to do in the global system cache for these */
            break;

        default:
            if (msg->id >= 0) {
                /* non-negative ids are catcache ids */
                SSInvalidateCatCache(msg);
            } else {
                ereport(FATAL, (errmsg("unrecognized SI message ID: %d", msg->id)));
            }
            break;
    }
}

View File

@ -1,2 +1,2 @@
dms_commit_id=1f7a138cbdd57becb8c26a16688d5dfd9d71b029 dms_commit_id=9e5e5c1f2efc071f8ec3ca884ad9331a3ed257ec
dss_commit_id=5444f9b4715bd78c9c6b4757475e07ac6398ed6e dss_commit_id=5444f9b4715bd78c9c6b4757475e07ac6398ed6e

View File

@ -39,6 +39,7 @@
#include "vectorsonic/vsonichash.h" #include "vectorsonic/vsonichash.h"
#include "storage/procarray.h" #include "storage/procarray.h"
#include "ddes/dms/ss_transaction.h" #include "ddes/dms/ss_transaction.h"
#include "ddes/dms/ss_dms_bufmgr.h"
/* /*
* This code manages relations that reside on segment-page storage. It implements functions used for smgr.cpp. * This code manages relations that reside on segment-page storage. It implements functions used for smgr.cpp.
* *
@ -368,7 +369,7 @@ SegPageLocation seg_logic_to_physic_mapping(SMgrRelation reln, SegmentHead *seg_
BlockNumber blocknum; BlockNumber blocknum;
/* Recovery thread should use physical location to read data directly. */ /* Recovery thread should use physical location to read data directly. */
if (RecoveryInProgress() && !CurrentThreadIsWorker()) { if (RecoveryInProgress() && !CurrentThreadIsWorker() && !SS_IN_FLUSHCOPY) {
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("recovery is in progress"), ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("recovery is in progress"),
errhint("cannot do segment address translation during recovery"))); errhint("cannot do segment address translation during recovery")));
} }
@ -1481,6 +1482,13 @@ void seg_extend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, cha
XLogRecPtr xlog_rec = XLogInsert(RM_SEGPAGE_ID, XLOG_SEG_SEGMENT_EXTEND, SegmentBktId); XLogRecPtr xlog_rec = XLogInsert(RM_SEGPAGE_ID, XLOG_SEG_SEGMENT_EXTEND, SegmentBktId);
END_CRIT_SECTION(); END_CRIT_SECTION();
#ifdef USE_ASSERT_CHECKING
if (ENABLE_DMS) {
SegNetPageCheckDiskLSN(GetBufferDescriptor(seg_buffer - 1), RBM_NORMAL, NULL);
SmgrNetPageCheckDiskLSN(buf_desc, RBM_NORMAL, NULL);
}
#endif
PageSetLSN(BufferGetPage(seg_buffer), xlog_rec); PageSetLSN(BufferGetPage(seg_buffer), xlog_rec);
PageSetLSN(buffer, xlog_rec); PageSetLSN(buffer, xlog_rec);
} }

View File

@ -230,6 +230,8 @@ typedef struct dms_opengauss_txn_snapshot {
} dms_opengauss_txn_snapshot_t; } dms_opengauss_txn_snapshot_t;
typedef enum dms_opengauss_lock_req_type { typedef enum dms_opengauss_lock_req_type {
SHARED_INVAL_MSG,
DROP_BUF_MSG,
LOCK_NORMAL_MODE, LOCK_NORMAL_MODE,
LOCK_RELEASE_SELF, LOCK_RELEASE_SELF,
LOCK_REACQUIRE, LOCK_REACQUIRE,
@ -772,7 +774,7 @@ typedef struct st_logger_param {
#define DMS_LOCAL_MINOR_VER_WEIGHT 1000 #define DMS_LOCAL_MINOR_VER_WEIGHT 1000
#define DMS_LOCAL_MAJOR_VERSION 0 #define DMS_LOCAL_MAJOR_VERSION 0
#define DMS_LOCAL_MINOR_VERSION 0 #define DMS_LOCAL_MINOR_VERSION 0
#define DMS_LOCAL_VERSION 43 #define DMS_LOCAL_VERSION 44
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -67,6 +67,8 @@
#define SS_IN_REFORM (ENABLE_DMS && g_instance.dms_cxt.SSReformInfo.in_reform == true) #define SS_IN_REFORM (ENABLE_DMS && g_instance.dms_cxt.SSReformInfo.in_reform == true)
#define SS_IN_FLUSHCOPY (ENABLE_DMS && g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy == true)
#define SS_STANDBY_FAILOVER (((g_instance.dms_cxt.SSClusterState == NODESTATE_NORMAL) \ #define SS_STANDBY_FAILOVER (((g_instance.dms_cxt.SSClusterState == NODESTATE_NORMAL) \
|| (g_instance.dms_cxt.SSClusterState == NODESTATE_STANDBY_FAILOVER_PROMOTING)) \ || (g_instance.dms_cxt.SSClusterState == NODESTATE_STANDBY_FAILOVER_PROMOTING)) \
&& (g_instance.dms_cxt.SSReformerControl.primaryInstId != SS_MY_INST_ID) \ && (g_instance.dms_cxt.SSReformerControl.primaryInstId != SS_MY_INST_ID) \

View File

@ -70,5 +70,7 @@ void SSCheckBufferIfNeedMarkDirty(Buffer buf);
void SSRecheckBufferPool(); void SSRecheckBufferPool();
void TransformLockTagToDmsLatch(dms_drlatch_t* dlatch, const LOCKTAG locktag); void TransformLockTagToDmsLatch(dms_drlatch_t* dlatch, const LOCKTAG locktag);
void CheckPageNeedSkipInRecovery(Buffer buf); void CheckPageNeedSkipInRecovery(Buffer buf);
void SmgrNetPageCheckDiskLSN(BufferDesc* buf_desc, ReadBufferMode read_mode, const XLogPhyBlock *pblk);
void SegNetPageCheckDiskLSN(BufferDesc* buf_desc, ReadBufferMode read_mode, SegSpace *spc);
#endif #endif

View File

@ -44,6 +44,7 @@ typedef struct SSBroadcastXminAck {
typedef struct SSBroadcastSI { typedef struct SSBroadcastSI {
SSBroadcastOp type; // must be first SSBroadcastOp type; // must be first
Oid tablespaceid;
SharedInvalidationMessage msg; SharedInvalidationMessage msg;
} SSBroadcastSI; } SSBroadcastSI;