!3323 1. 适配DMS DRC允许不一致;2. 适配DRC按需回收能力

Merge pull request !3323 from 刘博文/3dev
This commit is contained in:
opengauss-bot
2023-05-18 11:42:43 +00:00
committed by Gitee
10 changed files with 53 additions and 159 deletions

View File

@ -109,7 +109,6 @@ int ss_dms_func_init()
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_buf_res_rebuild_drc_parallel));
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_is_recovery_session));
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(drc_get_page_master_id));
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_release_page_batch));
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_register_ssl_decrypt_pwd));
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_set_ssl_param));
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_get_ssl_param));
@ -285,11 +284,6 @@ int drc_get_page_master_id(char pageid[DMS_PAGEID_SIZE], unsigned char *master_i
return g_ss_dms_func.drc_get_page_master_id(pageid, master_id);
}
int dms_release_page_batch(dms_context_t *dms_ctx, dcs_batch_buf_t *owner_map, unsigned int *owner_count)
{
return g_ss_dms_func.dms_release_page_batch(dms_ctx, owner_map, owner_count);
}
int dms_register_ssl_decrypt_pwd(dms_decrypt_pwd_t cb_func)
{
return g_ss_dms_func.dms_register_ssl_decrypt_pwd(cb_func);

View File

@ -153,44 +153,13 @@ void MarkReadHint(int buf_id, char persistence, bool extend, const XLogPhyBlock
void ClearReadHint(int buf_id, bool buf_deleted)
{
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_id);
if (buf_ctrl->state & BUF_BEING_RELEASED) {
BufferDesc *buf_desc = GetBufferDescriptor(buf_id);
RelFileNode rnode = buf_desc->tag.rnode;
ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d/%d %d-%d] buf:%d marked as NOT being"
" released during release owner, old_val:%u", rnode.spcNode, rnode.dbNode, rnode.relNode,
rnode.bucketNode, rnode.opt, buf_desc->tag.forkNum, buf_desc->tag.blockNum,
buf_desc->buf_id, BUF_BEING_RELEASED)));
}
buf_ctrl->state &=
~(BUF_NEED_LOAD | BUF_IS_LOADED | BUF_LOAD_FAILED | BUF_NEED_TRANSFER | BUF_IS_EXTEND |
BUF_DIRTY_NEED_FLUSH | BUF_BEING_RELEASED);
~(BUF_NEED_LOAD | BUF_IS_LOADED | BUF_LOAD_FAILED | BUF_NEED_TRANSFER | BUF_IS_EXTEND | BUF_DIRTY_NEED_FLUSH);
if (buf_deleted) {
buf_ctrl->state = 0;
}
}
/* this function should be called inside header lock */
void MarkDmsBufBeingReleased(BufferDesc *buf_desc, bool set)
{
RelFileNode rnode = buf_desc->tag.rnode;
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
unsigned int old_val = buf_ctrl->state & BUF_BEING_RELEASED;
if (set) {
buf_ctrl->state |= BUF_BEING_RELEASED;
ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d/%d %d-%d] buf:%d marked as being released during"
" release owner, old_val:%u", rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode,
rnode.opt, buf_desc->tag.forkNum, buf_desc->tag.blockNum, buf_desc->buf_id, old_val)));
} else if (!set && old_val) {
buf_ctrl->state &= ~BUF_BEING_RELEASED;
ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d/%d %d-%d] buf:%d marked as NOT being"
" released during release owner, old_val:%u", rnode.spcNode, rnode.dbNode, rnode.relNode,
rnode.bucketNode, rnode.opt, buf_desc->tag.forkNum, buf_desc->tag.blockNum,
buf_desc->buf_id, old_val)));
}
}
/*
* true: the page is transferred successfully by dms,
* false: the page request is rejected or error, if hold the content_lock,
@ -200,14 +169,6 @@ bool StartReadPage(BufferDesc *buf_desc, LWLockMode mode)
{
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
dms_lock_mode_t req_mode = (mode == LW_SHARED) ? DMS_LOCK_SHARE : DMS_LOCK_EXCLUSIVE;
RelFileNode rnode = buf_desc->tag.rnode;
if (buf_ctrl->state & BUF_BEING_RELEASED) {
ereport(WARNING, (errmsg("[%d/%d/%d/%d/%d %d-%d] buffer is being released in dms_release_owner",
rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
buf_desc->tag.forkNum, buf_desc->tag.blockNum)));
return false;
}
dms_context_t dms_ctx;
InitDmsBufContext(&dms_ctx, buf_desc->tag);
@ -311,6 +272,9 @@ Buffer TerminateReadPage(BufferDesc* buf_desc, ReadBufferMode read_mode, const X
ereport(PANIC, (errmsg("extend page should not be tranferred from DMS, "
"and needs to be loaded from disk!")));
}
if (buf_ctrl->been_loaded == false) {
ereport(PANIC, (errmsg("ctrl not marked loaded before transferring from remote")));
}
#endif
Block bufBlock = BufHdrGetBlock(buf_desc);
@ -327,6 +291,9 @@ Buffer TerminateReadPage(BufferDesc* buf_desc, ReadBufferMode read_mode, const X
CalcSegDmsPhysicalLoc(buf_desc, buffer, !g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy);
}
}
if (BufferIsValid(buffer)) {
buf_ctrl->been_loaded = true;
}
if ((read_mode == RBM_ZERO_AND_LOCK || read_mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
!LWLockHeldByMe(buf_desc->content_lock)) {
@ -433,6 +400,9 @@ Buffer TerminateReadSegPage(BufferDesc *buf_desc, ReadBufferMode read_mode, SegS
SegTerminateBufferIO(buf_desc, false, BM_VALID);
buffer = BufferDescriptorGetBuffer(buf_desc);
}
if (!BufferIsInvalid(buffer)) {
buf_ctrl->been_loaded = true;
}
if ((read_mode == RBM_ZERO_AND_LOCK || read_mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
!LWLockHeldByMe(buf_desc->content_lock)) {
@ -508,15 +478,17 @@ Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, boo
return TerminateReadPage(buf_desc, read_mode, OidIsValid(buf_ctrl->pblk_relno) ? &pblk : NULL);
}
bool DmsReleaseOwner(BufferTag buf_tag, int buf_id, unsigned char* released)
bool DmsReleaseOwner(BufferTag buf_tag, int buf_id)
{
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_id);
if (buf_ctrl->state & BUF_IS_RELPERSISTENT_TEMP) {
return true;
}
unsigned char released = 0;
dms_context_t dms_ctx;
InitDmsBufContext(&dms_ctx, buf_tag);
return (dms_release_owner(&dms_ctx, buf_ctrl, released) == DMS_SUCCESS);
return ((dms_release_owner(&dms_ctx, buf_ctrl, &released) == DMS_SUCCESS) && (released != 0));
}
void BufValidateDrc(BufferDesc *buf_desc)

View File

@ -546,6 +546,15 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_
(void)LWLockAcquire(buf_desc->content_lock, content_mode);
*buf_ctrl = GetDmsBufCtrl(buf_id);
Assert(buf_id >= 0);
if ((*buf_ctrl)->been_loaded == false) {
*buf_ctrl = NULL;
DmsReleaseBuffer(buf_desc->buf_id + 1, is_seg);
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[%u/%u/%u/%d %d-%u] been_loaded marked false, page swapped out and failed to load",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum)));
break;
}
if ((*buf_ctrl)->lock_mode == DMS_LOCK_NULL) {
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[%u/%u/%u/%d %d-%u] lock mode is null, still need to transfer page",
@ -609,7 +618,7 @@ static char* CBGetPage(dms_buf_ctrl_t *buf_ctrl)
return (char *)BufHdrGetBlock(buf_desc);
}
static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsigned int ver)
static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsigned char invld_owner)
{
int buf_id = -1;
BufferTag* tag = (BufferTag *)pageid;
@ -652,16 +661,19 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
break;
}
(void)LWLockAcquire(buf_desc->content_lock, LW_EXCLUSIVE);
buf_ctrl = GetDmsBufCtrl(buf_id);
if (ver == buf_ctrl->ver) {
bool can_invld_owner = (buf_desc->state & (BM_DIRTY | BM_JUST_DIRTIED | BM_PERMANENT)) > 0 ? false : true;
if (!invld_owner || (invld_owner && can_invld_owner)) {
(void)LWLockAcquire(buf_desc->content_lock, LW_EXCLUSIVE);
buf_ctrl = GetDmsBufCtrl(buf_id);
buf_ctrl->lock_mode = (unsigned char)DMS_LOCK_NULL;
} else {
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[CBInvalidatePage] invalid ver:%u, buf_ctrl ver:%u", ver, buf_ctrl->ver)));
LWLockRelease(buf_desc->content_lock);
} else { /* invalidate owner which buffer is dirty/permanent */
ereport(DEBUG1, (errmodule(MOD_DMS),
errmsg("[%d/%d/%d/%d %d-%d] invalidate owner rejected, buffer is dirty/permanent, state = 0x%x",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum, buf_desc->state)));
ret = DMS_ERROR;
}
LWLockRelease(buf_desc->content_lock);
if (IsSegmentBufferID(buf_id)) {
SegReleaseBuffer(buf_id + 1);
@ -1286,7 +1298,7 @@ static int CBConfirmOwner(void *db_handle, char *pageid, unsigned char *lock_mod
}
static int CBConfirmConverting(void *db_handle, char *pageid, unsigned char smon_chk,
unsigned char *lock_mode, unsigned long long *edp_map, unsigned long long *lsn, unsigned int *ver)
unsigned char *lock_mode, unsigned long long *edp_map, unsigned long long *lsn)
{
BufferDesc *buf_desc = NULL;
bool valid;
@ -1317,7 +1329,6 @@ static int CBConfirmConverting(void *db_handle, char *pageid, unsigned char smon
bool is_locked = LWLockConditionalAcquire(buf_desc->io_in_progress_lock, LW_EXCLUSIVE);
if (is_locked) {
buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
*ver = buf_ctrl->ver;
*lock_mode = buf_ctrl->lock_mode;
LWLockRelease(buf_desc->io_in_progress_lock);
break;
@ -1344,7 +1355,6 @@ static int CBConfirmConverting(void *db_handle, char *pageid, unsigned char smon
// without lock
buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
*ver = buf_ctrl->ver;
*lock_mode = buf_ctrl->lock_mode;
SSUnPinBuffer(buf_desc);
@ -1733,7 +1743,7 @@ void DmsInitCallback(dms_callback_t *callback)
callback->get_page = CBGetPage;
callback->set_buf_load_status = CBSetBufLoadStatus;
callback->remove_buf_load_status = CBRemoveBufLoadStatus;
callback->invld_share_copy = CBInvalidatePage;
callback->invalidate_page = CBInvalidatePage;
callback->get_db_handle = CBGetHandle;
callback->display_pageid = CBDisplayBufferTag;
callback->verify_page = CBVerifyPage;

View File

@ -1,2 +1,2 @@
dms_commit_id=384f13a3f0d080c85259b2c3a47def7d3c18c0a6
dms_commit_id=a114a1ad517272a4dceb2b579e90089b6cd62492
dss_commit_id=2db80e7f65b63c8412f97086cbd64340505075aa

View File

@ -711,26 +711,15 @@ static volatile BufferDesc *PageListBufferAlloc(SMgrRelation smgr, char relpersi
*
* Need to lock the buffer header to change its tag.
*/
retry_victim:
buf_state = LockBufHdr(buf);
/* Everything is fine, the buffer is ours, so break */
old_flags = buf_state & BUF_FLAG_MASK;
if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(old_flags & BM_DIRTY) && !(old_flags & BM_IS_META)) {
if (ENABLE_DMS && (old_flags & BM_TAG_VALID)) {
unsigned char released = 0;
bool returned = DmsReleaseOwner(old_tag, buf->buf_id, &released);
if (returned && released) {
if (DmsReleaseOwner(buf->tag, buf->buf_id)) {
ClearReadHint(buf->buf_id, true);
break;
} else if (!returned) {
MarkDmsBufBeingReleased(buf, true);
UnlockBufHdr(buf, buf_state);
pg_usleep(1000L);
goto retry_victim;
} else { /* if returned and !released, we will have to try another victim */
MarkDmsBufBeingReleased(buf, false);
}
} else {
break;
@ -745,9 +734,6 @@ retry_victim:
* we must undo everything we've done and start
* over with a new victim buffer.
*/
if (ENABLE_DMS) { /* between two tries of releasing owner, buffer might be dirtied and got skipped */
MarkDmsBufBeingReleased(buf, false);
}
UnlockBufHdr(buf, buf_state);
BufTableDelete(&new_tag, new_hash);
if ((old_flags & BM_TAG_VALID) && old_partition_lock != new_partition_lock) {
@ -2480,23 +2466,11 @@ found_branch:
* 1. previous attempts to read the buffer must have failed,
* but DRC has been created, so load page directly again
* 2. maybe we have failed previous, and try again in this loop
* 3. if previous read attempt has failed but concurrently getting
* released, a contradiction happens and we panic
*/
if (buf_ctrl->state & BUF_BEING_RELEASED) {
RelFileNode rnode = bufHdr->tag.rnode;
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d/%d %d-%d] previous read"
" attempt has failed but concurrently trying to release owner, contradiction",
rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
bufHdr->tag.forkNum, bufHdr->tag.blockNum)));
pg_usleep(5000L);
continue;
} else {
buf_ctrl->state |= BUF_NEED_LOAD;
}
buf_ctrl->state |= BUF_NEED_LOAD;
}
break;
} while (true);
}while (true);
return TerminateReadPage(bufHdr, mode, pblk);
}
@ -2971,7 +2945,6 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe
/*
* Need to lock the buffer header too in order to change its tag.
*/
retry_victim:
buf_state = LockBufHdr(buf);
/*
* Somebody could have pinned or re-dirtied the buffer while we were
@ -2989,34 +2962,15 @@ retry_victim:
* release owner procedure is in buf header lock, it's not reasonable,
* need to improve.
*/
unsigned char released = 0;
RelFileNode rnode = buf->tag.rnode;
bool returned = DmsReleaseOwner(old_tag, buf->buf_id, &released);
int retry_times = 0;
if (returned && released) {
if (DmsReleaseOwner(old_tag, buf->buf_id)) {
ClearReadHint(buf->buf_id, true);
break;
} else if (!returned) {
MarkDmsBufBeingReleased(buf, true);
UnlockBufHdr(buf, buf_state);
pg_usleep(1000L);
ereport(DEBUG1, (errmodule(MOD_DMS),
errmsg("[%d/%d/%d/%d/%d %d-%d] buf:%d retry release owner for %d times",
rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
buf->tag.forkNum, buf->tag.blockNum, buf->buf_id, ++retry_times)));
goto retry_victim;
} else { /* if returned and !released, we will have to try another victim */
MarkDmsBufBeingReleased(buf, false);
}
} else {
break;
}
}
if (ENABLE_DMS) { /* between two tries of releasing owner, buffer might be dirtied and got skipped */
MarkDmsBufBeingReleased(buf, false);
}
UnlockBufHdr(buf, buf_state);
BufTableDelete(&new_tag, new_hash);
if ((old_flags & BM_TAG_VALID) && old_partition_lock != new_partition_lock) {
@ -3057,6 +3011,7 @@ retry_victim:
if (ENABLE_DMS) {
GetDmsBufCtrl(buf->buf_id)->lock_mode = DMS_LOCK_NULL;
GetDmsBufCtrl(buf->buf_id)->been_loaded = false;
}
if (old_flags & BM_TAG_VALID) {
@ -3173,9 +3128,7 @@ retry:
}
if (ENABLE_DMS && (buf_state & BM_TAG_VALID)) {
unsigned char released = 0;
bool returned = DmsReleaseOwner(old_tag, buf->buf_id, &released);
if (!(returned && released)) {
if (!DmsReleaseOwner(buf->tag, buf->buf_id)) {
UnlockBufHdr(buf, buf_state);
LWLockRelease(old_partition_lock);
pg_usleep(5000);

View File

@ -629,21 +629,9 @@ Buffer ReadBufferFast(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum, Bloc
} else {
/*
* previous attempts to read the buffer must have failed,
* but DRC has been created, so load page directly again;
* but if previous read attempt has failed but concurrently
* getting released, a contradiction happens and we panic.
* but DRC has been created, so load page directly again
*/
Assert(pg_atomic_read_u32(&bufHdr->state) & BM_IO_ERROR);
if (buf_ctrl->state & BUF_BEING_RELEASED) {
ereport(WARNING, (errmodule(MOD_DMS), errmsg(
"[%d/%d/%d/%d/%d %d-%d] buffer:%d is in the process of release owner",
rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
bufHdr->tag.forkNum, bufHdr->tag.blockNum, bufHdr->buf_id)));
pg_usleep(5000L);
continue;
}
buf_ctrl->state |= BUF_NEED_LOAD;
}
@ -796,7 +784,6 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum,
return FoundBufferInHashTable(buf_id, new_partition_lock, foundPtr);
}
retry_victim:
buf_state = LockBufHdr(buf);
old_flags = buf_state & BUF_FLAG_MASK;
@ -807,33 +794,14 @@ retry_victim:
* release owner procedure is in buf header lock, it's not reasonable,
* need to improve.
*/
unsigned char released = 0;
RelFileNode rnode = buf->tag.rnode;
bool returned = DmsReleaseOwner(old_tag, buf->buf_id, &released);
int retry_times = 0;
if (returned && released) {
if (DmsReleaseOwner(old_tag, buf->buf_id)) {
ClearReadHint(buf->buf_id, true);
break;
} else if (!returned) {
MarkDmsBufBeingReleased(buf, true);
UnlockBufHdr(buf, buf_state);
pg_usleep(1000L);
ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d/%d %d-%d] buf:%d retry release owner "
"for %d times", rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
buf->tag.forkNum, buf->tag.blockNum, buf->buf_id, ++retry_times)));
goto retry_victim;
} else { /* if returned and !released, we will have to try another victim */
MarkDmsBufBeingReleased(buf, false);
}
} else {
break;
}
}
if (ENABLE_DMS) { /* between two tries of releasing owner, buffer might be dirtied and got skipped */
MarkDmsBufBeingReleased(buf, false);
}
UnlockBufHdr(buf, buf_state);
BufTableDelete(&new_tag, new_hash);
if (old_flag_valid && old_partition_lock != new_partition_lock) {
@ -851,6 +819,7 @@ retry_victim:
if (ENABLE_DMS) {
GetDmsBufCtrl(buf->buf_id)->lock_mode = DMS_LOCK_NULL;
GetDmsBufCtrl(buf->buf_id)->been_loaded = false;
}
if (old_flag_valid) {

View File

@ -79,6 +79,7 @@ typedef enum en_dms_dr_type {
DMS_DR_TYPE_UNDO = 22,
DMS_DR_TYPE_PROC = 23,
DMS_DR_TYPE_GDV = 24,
DMS_DR_TYPE_SEQVAL = 25,
DMS_DR_TYPE_MAX,
} dms_dr_type_t;
@ -273,10 +274,10 @@ typedef struct st_dms_buf_ctrl {
volatile unsigned char is_edp;
volatile unsigned char force_request; // force to request page from remote
volatile unsigned char need_flush; // for recovery, owner is abort, copy instance should flush before release
volatile unsigned char been_loaded; // first alloc ctrl:FALSE, after successfully loaded: TRUE
unsigned long long edp_scn; // set when become edp, lastest scn when page becomes edp
unsigned long long edp_map; // records edp instance
long long last_ckpt_time; // last time when local edp page is added to group.
unsigned int ver;
#ifdef OPENGAUSS
int buf_id;
unsigned int state;
@ -488,7 +489,7 @@ typedef int(*dms_save_list_stable)(void *db_handle, unsigned long long list_stab
typedef int(*dms_get_dms_status)(void *db_handle);
typedef void(*dms_set_dms_status)(void *db_handle, int status);
typedef int(*dms_confirm_converting)(void *db_handle, char *pageid, unsigned char smon_chk,
unsigned char *lock_mode, unsigned long long *edp_map, unsigned long long *lsn, unsigned int *ver);
unsigned char *lock_mode, unsigned long long *edp_map, unsigned long long *lsn);
typedef int(*dms_confirm_owner)(void *db_handle, char *pageid, unsigned char *lock_mode, unsigned char *is_edp,
unsigned long long *lsn);
typedef int(*dms_flush_copy)(void *db_handle, char *pageid);
@ -526,7 +527,7 @@ typedef unsigned char(*dms_page_is_dirty)(dms_buf_ctrl_t *buf_ctrl);
typedef void(*dms_leave_local_page)(void *db_handle, dms_buf_ctrl_t *buf_ctrl);
typedef void(*dms_get_pageid)(dms_buf_ctrl_t *buf_ctrl, char **pageid, unsigned int *size);
typedef char *(*dms_get_page)(dms_buf_ctrl_t *buf_ctrl);
typedef int (*dms_invalidate_page)(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsigned int ver);
typedef int (*dms_invalidate_page)(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsigned char invld_owner);
typedef void *(*dms_get_db_handle)(unsigned int *db_handle_index);
typedef void (*dms_release_db_handle)(void *db_handle);
typedef void *(*dms_stack_push_cr_cursor)(void *db_handle);
@ -675,7 +676,7 @@ typedef struct st_dms_callback {
dms_leave_local_page leave_local_page;
dms_get_pageid get_pageid;
dms_get_page get_page;
dms_invalidate_page invld_share_copy;
dms_invalidate_page invalidate_page;
dms_get_db_handle get_db_handle;
dms_release_db_handle release_db_handle;
dms_stack_push_cr_cursor stack_push_cr_cursor;
@ -818,7 +819,7 @@ typedef struct st_logger_param {
#define DMS_LOCAL_MINOR_VER_WEIGHT 1000
#define DMS_LOCAL_MAJOR_VERSION 0
#define DMS_LOCAL_MINOR_VERSION 0
#define DMS_LOCAL_VERSION 59
#define DMS_LOCAL_VERSION 60
#ifdef __cplusplus
}

View File

@ -127,8 +127,6 @@
#define BUF_READ_MODE_ZERO_LOCK 0x80
#define BUF_DIRTY_NEED_FLUSH 0x100
#define BUF_ERTO_NEED_MARK_DIRTY 0x200
/* mark buffer whether is being released in DMS DRC */
#define BUF_BEING_RELEASED 0x400
#define SS_BROADCAST_FAILED_RETRYCOUNTS 4
#define SS_BROADCAST_WAIT_INFINITE (0xFFFFFFFF)

View File

@ -61,7 +61,6 @@ typedef struct st_ss_dms_func {
unsigned char for_rebuild);
int (*dms_is_recovery_session)(unsigned int sid);
int (*drc_get_page_master_id)(char pageid[DMS_PAGEID_SIZE], unsigned char *master_id);
int (*dms_release_page_batch)(dms_context_t *dms_ctx, dcs_batch_buf_t *owner_map, unsigned int *owner_count);
int (*dms_register_ssl_decrypt_pwd)(dms_decrypt_pwd_t cb_func);
int (*dms_set_ssl_param)(const char *param_name, const char *param_value);
int (*dms_get_ssl_param)(const char *param_name, char *param_value, unsigned int size);
@ -109,7 +108,6 @@ int dms_buf_res_rebuild_drc_parallel(dms_context_t *dms_ctx, dms_ctrl_info_t *ct
unsigned char for_rebuild);
int dms_is_recovery_session(unsigned int sid);
int drc_get_page_master_id(char pageid[DMS_PAGEID_SIZE], unsigned char *master_id);
int dms_release_page_batch(dms_context_t *dms_ctx, dcs_batch_buf_t *owner_map, unsigned int *owner_count);
int dms_register_ssl_decrypt_pwd(dms_decrypt_pwd_t cb_func);
int dms_set_ssl_param(const char *param_name, const char *param_value);
int dms_get_ssl_param(const char *param_name, char *param_value, unsigned int size);

View File

@ -55,12 +55,11 @@ void MarkReadHint(int buf_id, char persistence, bool extend, const XLogPhyBlock
bool LockModeCompatible(dms_buf_ctrl_t *buf_ctrl, LWLockMode mode);
bool StartReadPage(BufferDesc *buf_desc, LWLockMode mode);
void ClearReadHint(int buf_id, bool buf_deleted = false);
void MarkDmsBufBeingReleased(BufferDesc *buf_desc, bool set);
Buffer TerminateReadPage(BufferDesc* buf_desc, ReadBufferMode read_mode, const XLogPhyBlock *pblk);
Buffer TerminateReadSegPage(BufferDesc *buf_desc, ReadBufferMode read_mode, SegSpace *spc = NULL);
Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, bool *with_io);
Buffer DmsReadSegPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, bool *with_io);
bool DmsReleaseOwner(BufferTag buf_tag, int buf_id, unsigned char* released);
bool DmsReleaseOwner(BufferTag buf_tag, int buf_id);
int SSLockAcquire(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock, bool dontWait,
dms_opengauss_lock_req_type_t reqType = LOCK_NORMAL_MODE);
int SSLockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock);