drc validate

This commit is contained in:
bowenliu
2023-01-05 21:22:36 +08:00
parent c5aa3fa255
commit 0c6d9a88a9
7 changed files with 57 additions and 3 deletions

View File

@ -123,6 +123,7 @@ int ss_dms_func_init()
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_pre_uninit));
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_init_logger));
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_refresh_logger));
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_validate_drc));
g_ss_dms_func.inited = true;
return DMS_SUCCESS;
}
@ -319,4 +320,10 @@ int dms_reform_last_failed(void)
void dms_pre_uninit(void)
{
return g_ss_dms_func.dms_pre_uninit();
}
void dms_validate_drc(dms_context_t *dms_ctx, dms_buf_ctrl_t *ctrl, unsigned long long lsn,
unsigned char is_dirty)
{
return g_ss_dms_func.dms_validate_drc(dms_ctx, ctrl, lsn, is_dirty);
}

View File

@ -468,6 +468,20 @@ bool DmsReleaseOwner(BufferTag buf_tag, int buf_id)
return ((dms_release_owner(&dms_ctx, buf_ctrl, &released) == DMS_SUCCESS) && (released != 0));
}
void BufValidateDrc(BufferDesc *buf_desc)
{
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
Assert(buf_ctrl != NULL);
Assert(buf_ctrl->is_edp != 1);
Assert(XLogRecPtrIsValid(g_instance.dms_cxt.ckptRedo));
dms_context_t dms_ctx;
InitDmsBufContext(&dms_ctx, buf_desc->tag);
unsigned long long lsn = (unsigned long long)BufferGetLSN(buf_desc);
bool is_dirty = (buf_desc->state & (BM_DIRTY | BM_JUST_DIRTIED)) > 0 ? true : false;
dms_validate_drc(&dms_ctx, buf_ctrl, lsn, (unsigned char)is_dirty);
}
int32 CheckBuf4Rebuild(BufferDesc *buf_desc)
{
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
@ -610,7 +624,7 @@ void SSRecheckBufferPool()
* BUF_DIRTY_NEED_FLUSH was removed during mark buffer dirty and lsn_on_disk was set during sync buffer
* As BUF_DIRTY_NEED_FLUSH was set only if page lsn is bigger than ckpt redo, it should be removed at this time
* Unfortunately if it is not, mark it dirty again. For lsn_on_disk, if it is still invalid, this means it is
* not flushed. So if it is not dirty, invalidate it again.
* not flushed.
*/
BufferDesc *buf_desc = GetBufferDescriptor(i);
pg_memory_barrier();

View File

@ -1031,6 +1031,31 @@ static int32 CBDrcBufRebuild(void *db_handle)
return GS_SUCCESS;
}
static int32 CBDrcBufValidate(void *db_handle)
{
/* Load Control File */
int src_id = SSGetPrimaryInstId();
SSReadControlFile(src_id, true);
int buf_cnt = 0;
uint32 buf_state;
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS reform]CBDrcBufValidate starts before reform done.")));
for (int i = 0; i < TOTAL_BUFFER_NUM; i++) {
BufferDesc *buf_desc = GetBufferDescriptor(i);
buf_state = LockBufHdr(buf_desc);
if ((buf_state & BM_VALID) || (buf_state & BM_TAG_VALID)) {
BufValidateDrc(buf_desc);
buf_cnt++;
}
UnlockBufHdr(buf_desc, buf_state);
}
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS reform]CBDrcBufValidate %d buffers success.", buf_cnt)));
return GS_SUCCESS;
}
// used for find bufferdesc in dms
static void SSGetBufferDesc(char *pageid, bool *is_valid, BufferDesc** ret_buf_desc)
{
@ -1552,4 +1577,5 @@ void DmsInitCallback(dms_callback_t *callback)
callback->db_is_primary = CBDbIsPrimary;
callback->reform_done_notify = CBReformDoneNotify;
callback->log_wait_flush = CBXLogWaitFlush;
callback->drc_validate = CBDrcBufValidate;
}

View File

@ -1,2 +1,2 @@
dms_commit_id=9e5e5c1f2efc071f8ec3ca884ad9331a3ed257ec
dms_commit_id=d5674265647286594922e5033142cfee9cbfdcfb
dss_commit_id=5444f9b4715bd78c9c6b4757475e07ac6398ed6e

View File

@ -595,6 +595,7 @@ typedef int (*dms_reform_done_notify)(void *db_handle);
typedef int (*dms_log_wait_flush)(void *db_handle, unsigned long long lsn);
typedef int (*dms_wait_ckpt)(void *db_handle);
typedef void (*dms_verify_page)(dms_buf_ctrl_t *buf_ctrl, char *new_page);
typedef int (*dms_drc_validate)(void *db_handle);
typedef struct st_dms_callback {
// used in reform
@ -712,6 +713,8 @@ typedef struct st_dms_callback {
dms_reform_done_notify reform_done_notify;
dms_log_wait_flush log_wait_flush;
dms_wait_ckpt wait_ckpt;
dms_drc_validate drc_validate;
} dms_callback_t;
typedef struct st_dms_instance_net_addr {
@ -779,7 +782,7 @@ typedef struct st_logger_param {
#define DMS_LOCAL_MINOR_VER_WEIGHT 1000
#define DMS_LOCAL_MAJOR_VERSION 0
#define DMS_LOCAL_MINOR_VERSION 0
#define DMS_LOCAL_VERSION 44
#define DMS_LOCAL_VERSION 45
#ifdef __cplusplus
}

View File

@ -76,6 +76,8 @@ typedef struct st_ss_dms_func {
void (*dms_pre_uninit)(void);
int (*dms_init_logger)(logger_param_t *log_param);
void (*dms_refresh_logger)(char *log_field, unsigned long long *value);
void (*dms_validate_drc)(dms_context_t *dms_ctx, dms_buf_ctrl_t *ctrl, unsigned long long lsn,
unsigned char is_dirty);
} ss_dms_func_t;
int ss_dms_func_init();
@ -116,6 +118,7 @@ bool dms_latch_timed_x(dms_context_t *dms_ctx, dms_drlatch_t *dlatch, unsigned i
bool dms_latch_timed_s(dms_context_t *dms_ctx, dms_drlatch_t *dlatch, unsigned int wait_ticks, unsigned char is_force);
void dms_unlatch(dms_context_t *dms_ctx, dms_drlatch_t *dlatch);
void dms_pre_uninit(void);
void dms_validate_drc(dms_context_t *dms_ctx, dms_buf_ctrl_t *ctrl, unsigned long long lsn, unsigned char is_dirty);
#ifdef __cplusplus
}
#endif

View File

@ -73,5 +73,6 @@ void CheckPageNeedSkipInRecovery(Buffer buf);
void SmgrNetPageCheckDiskLSN(BufferDesc* buf_desc, ReadBufferMode read_mode, const XLogPhyBlock *pblk);
void SegNetPageCheckDiskLSN(BufferDesc* buf_desc, ReadBufferMode read_mode, SegSpace *spc);
unsigned int DMSGetProcType4RequestPage();
void BufValidateDrc(BufferDesc *buf_desc);
#endif