!3656 【资源池化】增加获取page是否在缓存中的接口
Merge pull request !3656 from 阙鸣健/ndp_bugfix
This commit is contained in:
@ -34,7 +34,6 @@ const uint32 NDP_LOCAL_VERSION_NUM = 4;
|
||||
#define BITMAP_SIZE_PER_AU_U64 (BITMAP_SIZE_PER_AU_BYTE / sizeof(uint64))
|
||||
|
||||
#define NDPGETBYTE(x, i) (*((char*)(x) + (int)((i) / BITS_PER_BYTE)))
|
||||
#define NDPGETBITBYTE(x, i) ((((char)(x)) >> (i)) & 0x01)
|
||||
#define NDPCLRBIT(x, i) NDPGETBYTE(x, i) &= ~(0x01 << ((i) % BITS_PER_BYTE))
|
||||
#define NDPSETBIT(x, i) NDPGETBYTE(x, i) |= (0x01 << ((i) % BITS_PER_BYTE))
|
||||
#define NDPGETBIT(x, i) ((NDPGETBYTE(x, i) >> ((i) % BITS_PER_BYTE)) & 0x01)
|
||||
|
||||
@ -166,24 +166,6 @@ void pm_get_pageinfo(NdpScanDesc ndpScan, BlockNumber page, CephObject *object,
|
||||
}
|
||||
}
|
||||
|
||||
bool IsPageHitDms(RelFileNode& node, BlockNumber page)
|
||||
{
|
||||
int bufId = 0;
|
||||
BufferTag newTag;
|
||||
|
||||
INIT_BUFFERTAG(newTag, node, MAIN_FORKNUM, page);
|
||||
uint32 new_hash = BufTableHashCode(&newTag);
|
||||
LWLock *new_partition_lock = BufMappingPartitionLock(new_hash);
|
||||
/* see if the block is in the buffer pool already */
|
||||
(void)LWLockAcquire(new_partition_lock, LW_SHARED);
|
||||
bufId = BufTableLookup(&newTag, new_hash);
|
||||
LWLockRelease(new_partition_lock);
|
||||
if (bufId != -1) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void CopyCLog(int64 pageno, char *pageBuffer)
|
||||
{
|
||||
int slotno;
|
||||
@ -229,17 +211,16 @@ int NdpIoSlot::SetReq(RelFileNode& node, uint16 taskId, uint16 tableId, AuInfo&
|
||||
errno_t rc = memset_s(req.pageMap, BITMAP_SIZE_PER_AU_BYTE, 0, BITMAP_SIZE_PER_AU_BYTE);
|
||||
securec_check(rc, "", "");
|
||||
|
||||
if (SS_STANDBY_MODE) {
|
||||
// SSIsPageHitDms(node, startBlockNum, auinfo.pageNum, req.pageMap, &bitCount);
|
||||
} else {
|
||||
for (uint32 i = startBlockNum, offset = 0; i != startBlockNum + auinfo.pageNum; ++i, ++offset) {
|
||||
bool cached = IsPageHitDms(node, i);
|
||||
if (!cached) {
|
||||
NDPSETBIT(req.pageMap, offset);
|
||||
++bitCount;
|
||||
}
|
||||
for (uint32 i = startBlockNum, offset = 0; i != startBlockNum + auinfo.pageNum; ++i, ++offset) {
|
||||
bool cached = IsPageHitBufferPool(node, MAIN_FORKNUM, i);
|
||||
if (!cached) {
|
||||
NDPSETBIT(req.pageMap, offset);
|
||||
++bitCount;
|
||||
}
|
||||
}
|
||||
if (SS_STANDBY_MODE) {
|
||||
SSIsPageHitDms(node, startBlockNum, auinfo.pageNum, req.pageMap, &bitCount);
|
||||
}
|
||||
return bitCount;
|
||||
}
|
||||
|
||||
|
||||
@ -102,6 +102,7 @@ int ss_dms_func_init()
|
||||
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_request_opengauss_txn_status));
|
||||
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_request_opengauss_txn_snapshot));
|
||||
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_request_opengauss_txn_of_master));
|
||||
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_request_opengauss_page_status));
|
||||
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_register_thread_init));
|
||||
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_release_owner));
|
||||
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_wait_reform));
|
||||
@ -227,6 +228,12 @@ int dms_request_opengauss_txn_of_master(dms_context_t *dms_ctx, dms_opengauss_tx
|
||||
return g_ss_dms_func.dms_request_opengauss_txn_of_master(dms_ctx, dms_txn_swinfo);
|
||||
}
|
||||
|
||||
int dms_request_opengauss_page_status(dms_context_t *dms_ctx, unsigned int page, int page_num,
|
||||
unsigned long int *page_map, int *bit_count)
|
||||
{
|
||||
return g_ss_dms_func.dms_request_opengauss_page_status(dms_ctx, page, page_num, page_map, bit_count);
|
||||
}
|
||||
|
||||
int dms_broadcast_opengauss_ddllock(dms_context_t *dms_ctx, char *data, unsigned int len, unsigned char handle_recv_msg,
|
||||
unsigned int timeout, unsigned char resend_after_reform)
|
||||
{
|
||||
|
||||
@ -277,6 +277,25 @@ static int CBGetTxnStatus(void *db_handle, unsigned long long xid, unsigned char
|
||||
return DMS_SUCCESS;
|
||||
}
|
||||
|
||||
#define NDPGETBYTE(x, i) (*((char*)(x) + (int)((i) / BITS_PER_BYTE)))
|
||||
#define NDPCLRBIT(x, i) NDPGETBYTE(x, i) &= ~(0x01 << ((i) % BITS_PER_BYTE))
|
||||
#define NDPGETBIT(x, i) ((NDPGETBYTE(x, i) >> ((i) % BITS_PER_BYTE)) & 0x01)
|
||||
|
||||
static int CBGetPageStatus(void *db_handle, dms_opengauss_relfilenode_t *rnode, unsigned int page,
|
||||
int pagesNum, dms_opengauss_page_status_result_t *page_result)
|
||||
{
|
||||
for (uint32 i = page, offset = 0; i != page + pagesNum; ++i, ++offset) {
|
||||
if (NDPGETBIT(page_result->page_map, offset)) {
|
||||
bool cached = IsPageHitBufferPool(*(RelFileNode * )(rnode), MAIN_FORKNUM, i);
|
||||
if (cached) {
|
||||
NDPCLRBIT(page_result->page_map, offset);
|
||||
--page_result->bit_count;
|
||||
}
|
||||
}
|
||||
}
|
||||
return DMS_SUCCESS;
|
||||
}
|
||||
|
||||
static int CBGetCurrModeAndLockBuffer(void *db_handle, int buffer, unsigned char lock_mode,
|
||||
unsigned char *curr_mode)
|
||||
{
|
||||
@ -1962,6 +1981,7 @@ void DmsInitCallback(dms_callback_t *callback)
|
||||
callback->opengauss_lock_buffer = CBGetCurrModeAndLockBuffer;
|
||||
callback->get_opengauss_txn_snapshot = CBGetSnapshotData;
|
||||
callback->get_opengauss_txn_of_master = CBGetTxnSwinfo;
|
||||
callback->get_opengauss_page_status = CBGetPageStatus;
|
||||
|
||||
callback->log_output = NULL;
|
||||
|
||||
|
||||
@ -317,6 +317,22 @@ bool SSGetOldestXminFromAllStandby()
|
||||
return true;
|
||||
}
|
||||
|
||||
void SSIsPageHitDms(RelFileNode& node, BlockNumber page, int pagesNum, uint64 *pageMap, int *bitCount)
|
||||
{
|
||||
dms_context_t dms_ctx;
|
||||
InitDmsContext(&dms_ctx);
|
||||
|
||||
dms_ctx.rfn.inst_id = (unsigned char)SS_PRIMARY_ID;
|
||||
dms_ctx.rfn.rnode = *(dms_opengauss_relfilenode_t *)(&node);
|
||||
|
||||
if (dms_request_opengauss_page_status(&dms_ctx, page, pagesNum, pageMap, bitCount) != DMS_SUCCESS) {
|
||||
*bitCount = 0;
|
||||
ereport(DEBUG1, (errmsg("SS get page map failed, buffer_id = %u.", page)));
|
||||
return;
|
||||
}
|
||||
ereport(DEBUG1, (errmsg("SS get page map success, buffer_id = %u.", page)));
|
||||
}
|
||||
|
||||
int SSCheckDbBackends(char *data, uint32 len, char *output_msg, uint32 *output_msg_len)
|
||||
{
|
||||
if (unlikely(len != sizeof(SSBroadcastDbBackends))) {
|
||||
|
||||
@ -1,2 +1,2 @@
|
||||
dms_commit_id=ede9efeda8ed0ab2ad79f119b619c8a063023683
|
||||
dss_commit_id=97deeec25d7ec4ee0c92b5bc0bcc50eaa3ae64af
|
||||
dms_commit_id=a9d399af041ae5491149fdb7c58e5115da3065b2
|
||||
dss_commit_id=97deeec25d7ec4ee0c92b5bc0bcc50eaa3ae64af
|
||||
|
||||
@ -7332,3 +7332,21 @@ void LockTwoLWLock(LWLock *new_partition_lock, LWLock *old_partition_lock)
|
||||
(void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE);
|
||||
}
|
||||
}
|
||||
|
||||
bool IsPageHitBufferPool(RelFileNode& node, ForkNumber forkNum, BlockNumber blockNum)
|
||||
{
|
||||
int bufId = 0;
|
||||
BufferTag newTag;
|
||||
|
||||
INIT_BUFFERTAG(newTag, node, forkNum, blockNum);
|
||||
uint32 new_hash = BufTableHashCode(&newTag);
|
||||
LWLock *new_partition_lock = BufMappingPartitionLock(new_hash);
|
||||
/* see if the block is in the buffer pool already */
|
||||
(void)LWLockAcquire(new_partition_lock, LW_SHARED);
|
||||
bufId = BufTableLookup(&newTag, new_hash);
|
||||
LWLockRelease(new_partition_lock);
|
||||
if (bufId != -1) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -182,6 +182,19 @@ typedef struct st_dms_xid_ctx {
|
||||
unsigned long long scn;
|
||||
} dms_xid_ctx_t;
|
||||
|
||||
typedef struct dms_opengauss_relfilenode {
|
||||
unsigned int spcNode;
|
||||
unsigned int dbNode;
|
||||
unsigned int relNode;
|
||||
signed short bucketNode;
|
||||
unsigned short opt;
|
||||
} dms_opengauss_relfilenode_t;
|
||||
|
||||
typedef struct st_dms_rfn {
|
||||
dms_opengauss_relfilenode_t rnode;
|
||||
unsigned char inst_id;
|
||||
} dms_rfn_t;
|
||||
|
||||
typedef struct st_dms_xmap_ctx {
|
||||
unsigned int xmap;
|
||||
unsigned int dest_id;
|
||||
@ -201,6 +214,7 @@ typedef struct st_dms_context {
|
||||
dms_drid_t lock_id;
|
||||
dms_xid_ctx_t xid_ctx;
|
||||
dms_xmap_ctx_t xmap_ctx;
|
||||
dms_rfn_t rfn;
|
||||
unsigned char edp_inst;
|
||||
};
|
||||
} dms_context_t;
|
||||
@ -242,6 +256,11 @@ typedef struct dms_opengauss_txn_sw_info {
|
||||
unsigned int server_proc_slot; // backend slot of master, used for standby write feature
|
||||
} dms_opengauss_txn_sw_info_t;
|
||||
|
||||
typedef struct st_dms_opengauss_page_status_result {
|
||||
int bit_count;
|
||||
unsigned long int page_map[8];
|
||||
} dms_opengauss_page_status_result_t;
|
||||
|
||||
typedef enum dms_opengauss_lock_req_type {
|
||||
SHARED_INVAL_MSG,
|
||||
DROP_BUF_MSG,
|
||||
@ -452,6 +471,7 @@ typedef enum en_dms_wait_event {
|
||||
DMS_EVT_LATCH_X_REMOTE,
|
||||
DMS_EVT_LATCH_S_REMOTE,
|
||||
DMS_EVT_ONDEMAND_REDO,
|
||||
DMS_EVT_PAGE_STATUS_INFO,
|
||||
|
||||
|
||||
DMS_EVT_COUNT,
|
||||
@ -598,6 +618,8 @@ typedef int(*dms_opengauss_lock_buffer)(void *db_handle, int buffer, unsigned ch
|
||||
typedef int(*dms_get_txn_snapshot)(void *db_handle, unsigned int xmap, dms_txn_snapshot_t *txn_snapshot);
|
||||
typedef int(*dms_get_opengauss_txn_snapshot)(void *db_handle, dms_opengauss_txn_snapshot_t *txn_snapshot);
|
||||
typedef int(*dms_get_opengauss_txn_of_master)(void *db_handle, dms_opengauss_txn_sw_info_t *txn_swinfo);
|
||||
typedef int(*dms_get_opengauss_page_status)(void *db_handle, dms_opengauss_relfilenode_t *rnode, unsigned int page,
|
||||
int page_num, dms_opengauss_page_status_result_t *page_result);
|
||||
typedef void (*dms_log_output)(dms_log_id_t log_type, dms_log_level_t log_level, const char *code_file_name,
|
||||
unsigned int code_line_num, const char *module_name, const char *format, ...);
|
||||
typedef int (*dms_log_flush)(void *db_handle, unsigned long long *lsn);
|
||||
@ -745,6 +767,7 @@ typedef struct st_dms_callback {
|
||||
dms_get_txn_snapshot get_txn_snapshot;
|
||||
dms_get_opengauss_txn_snapshot get_opengauss_txn_snapshot;
|
||||
dms_get_opengauss_txn_of_master get_opengauss_txn_of_master;
|
||||
dms_get_opengauss_page_status get_opengauss_page_status;
|
||||
dms_log_output log_output;
|
||||
dms_log_flush log_flush;
|
||||
dms_process_edp ckpt_edp;
|
||||
@ -856,7 +879,7 @@ typedef enum en_dms_info_id {
|
||||
#define DMS_LOCAL_MINOR_VER_WEIGHT 1000
|
||||
#define DMS_LOCAL_MAJOR_VERSION 0
|
||||
#define DMS_LOCAL_MINOR_VERSION 0
|
||||
#define DMS_LOCAL_VERSION 76
|
||||
#define DMS_LOCAL_VERSION 77
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
||||
@ -53,6 +53,8 @@ typedef struct st_ss_dms_func {
|
||||
dms_opengauss_txn_snapshot_t *dms_txn_snapshot);
|
||||
int (*dms_request_opengauss_txn_of_master)(dms_context_t *dms_ctx,
|
||||
dms_opengauss_txn_sw_info_t *dms_txn_swinfo);
|
||||
int (*dms_request_opengauss_page_status)(dms_context_t *dms_ctx, unsigned int page, int page_num,
|
||||
unsigned long int *page_map, int *bit_count);
|
||||
int (*dms_register_thread_init)(dms_thread_init_t thrd_init);
|
||||
int (*dms_release_owner)(dms_context_t *dms_ctx, dms_buf_ctrl_t *ctrl, unsigned char *released);
|
||||
int (*dms_wait_reform)(unsigned int *has_offline);
|
||||
@ -102,6 +104,8 @@ int dms_request_opengauss_txn_snapshot(dms_context_t *dms_ctx,
|
||||
dms_opengauss_txn_snapshot_t *dms_txn_snapshot);
|
||||
int dms_request_opengauss_txn_of_master(dms_context_t *dms_ctx,
|
||||
dms_opengauss_txn_sw_info_t *dms_txn_swinfo);
|
||||
int dms_request_opengauss_page_status(dms_context_t *dms_ctx, unsigned int page, int page_num,
|
||||
unsigned long int *page_map, int *bit_count);
|
||||
int dms_register_thread_init(dms_thread_init_t thrd_init);
|
||||
int dms_release_owner(dms_context_t *dms_ctx, dms_buf_ctrl_t *ctrl, unsigned char *released);
|
||||
int dms_wait_reform(unsigned int *has_offline);
|
||||
|
||||
@ -96,6 +96,7 @@ TransactionId SSMultiXactIdGetUpdateXid(TransactionId xmax, uint16 t_infomask, u
|
||||
bool SSGetOldestXminFromAllStandby();
|
||||
int SSGetOldestXmin(char *data, uint32 len, char *output_msg, uint32 *output_msg_len);
|
||||
int SSGetOldestXminAck(SSBroadcastXminAck *ack_data);
|
||||
void SSIsPageHitDms(RelFileNode& node, BlockNumber page, int pagesNum, uint64 *pageMap, int *bitCount);
|
||||
void SSSendSharedInvalidMessages(const SharedInvalidationMessage* msgs, int n);
|
||||
void SSBCastDropRelAllBuffer(RelFileNode *rnodes, int rnode_len);
|
||||
void SSBCastDropRelRangeBuffer(RelFileNode node, ForkNumber forkNum, BlockNumber firstDelBlock);
|
||||
|
||||
@ -293,6 +293,7 @@ void InvalidateBuffer(BufferDesc *buf);
|
||||
extern void ReservePrivateRefCountEntry(void);
|
||||
extern PrivateRefCountEntry* NewPrivateRefCountEntry(Buffer buffer);
|
||||
void LockTwoLWLock(LWLock *new_partition_lock, LWLock *old_partition_lock);
|
||||
extern bool IsPageHitBufferPool(RelFileNode& node, ForkNumber forkNum, BlockNumber blockNum);
|
||||
|
||||
extern void InitBufferPool(void);
|
||||
extern void pca_buf_init_ctx();
|
||||
|
||||
Reference in New Issue
Block a user