diff --git a/contrib/ndpplugin/ndp/ndp_req.h b/contrib/ndpplugin/ndp/ndp_req.h index 9158a6454..c0eb50b15 100644 --- a/contrib/ndpplugin/ndp/ndp_req.h +++ b/contrib/ndpplugin/ndp/ndp_req.h @@ -34,7 +34,6 @@ const uint32 NDP_LOCAL_VERSION_NUM = 4; #define BITMAP_SIZE_PER_AU_U64 (BITMAP_SIZE_PER_AU_BYTE / sizeof(uint64)) #define NDPGETBYTE(x, i) (*((char*)(x) + (int)((i) / BITS_PER_BYTE))) -#define NDPGETBITBYTE(x, i) ((((char)(x)) >> (i)) & 0x01) #define NDPCLRBIT(x, i) NDPGETBYTE(x, i) &= ~(0x01 << ((i) % BITS_PER_BYTE)) #define NDPSETBIT(x, i) NDPGETBYTE(x, i) |= (0x01 << ((i) % BITS_PER_BYTE)) #define NDPGETBIT(x, i) ((NDPGETBYTE(x, i) >> ((i) % BITS_PER_BYTE)) & 0x01) diff --git a/contrib/ndpplugin/ndpam.cpp b/contrib/ndpplugin/ndpam.cpp index ad7b9bffa..aae81a401 100644 --- a/contrib/ndpplugin/ndpam.cpp +++ b/contrib/ndpplugin/ndpam.cpp @@ -166,24 +166,6 @@ void pm_get_pageinfo(NdpScanDesc ndpScan, BlockNumber page, CephObject *object, } } -bool IsPageHitDms(RelFileNode& node, BlockNumber page) -{ - int bufId = 0; - BufferTag newTag; - - INIT_BUFFERTAG(newTag, node, MAIN_FORKNUM, page); - uint32 new_hash = BufTableHashCode(&newTag); - LWLock *new_partition_lock = BufMappingPartitionLock(new_hash); - /* see if the block is in the buffer pool already */ - (void)LWLockAcquire(new_partition_lock, LW_SHARED); - bufId = BufTableLookup(&newTag, new_hash); - LWLockRelease(new_partition_lock); - if (bufId != -1) { - return true; - } - return false; -} - void CopyCLog(int64 pageno, char *pageBuffer) { int slotno; @@ -229,17 +211,16 @@ int NdpIoSlot::SetReq(RelFileNode& node, uint16 taskId, uint16 tableId, AuInfo& errno_t rc = memset_s(req.pageMap, BITMAP_SIZE_PER_AU_BYTE, 0, BITMAP_SIZE_PER_AU_BYTE); securec_check(rc, "", ""); - if (SS_STANDBY_MODE) { - // SSIsPageHitDms(node, startBlockNum, auinfo.pageNum, req.pageMap, &bitCount); - } else { - for (uint32 i = startBlockNum, offset = 0; i != startBlockNum + auinfo.pageNum; ++i, ++offset) { - bool cached = IsPageHitDms(node, i); - if (!cached) { - NDPSETBIT(req.pageMap, offset); - ++bitCount; - } + for (uint32 i = startBlockNum, offset = 0; i != startBlockNum + auinfo.pageNum; ++i, ++offset) { + bool cached = IsPageHitBufferPool(node, MAIN_FORKNUM, i); + if (!cached) { + NDPSETBIT(req.pageMap, offset); + ++bitCount; } } + if (SS_STANDBY_MODE) { + SSIsPageHitDms(node, startBlockNum, auinfo.pageNum, req.pageMap, &bitCount); + } return bitCount; } diff --git a/src/gausskernel/ddes/adapter/ss_dms.cpp b/src/gausskernel/ddes/adapter/ss_dms.cpp index 3987c1b45..251d6e725 100644 --- a/src/gausskernel/ddes/adapter/ss_dms.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms.cpp @@ -102,6 +102,7 @@ int ss_dms_func_init() SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_request_opengauss_txn_status)); SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_request_opengauss_txn_snapshot)); SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_request_opengauss_txn_of_master)); + SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_request_opengauss_page_status)); SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_register_thread_init)); SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_release_owner)); SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_wait_reform)); @@ -227,6 +228,12 @@ int dms_request_opengauss_txn_of_master(dms_context_t *dms_ctx, dms_opengauss_tx return g_ss_dms_func.dms_request_opengauss_txn_of_master(dms_ctx, dms_txn_swinfo); } +int dms_request_opengauss_page_status(dms_context_t *dms_ctx, unsigned int page, int page_num, + unsigned long int *page_map, int *bit_count) +{ + return g_ss_dms_func.dms_request_opengauss_page_status(dms_ctx, page, page_num, page_map, bit_count); +} + int dms_broadcast_opengauss_ddllock(dms_context_t *dms_ctx, char *data, unsigned int len, unsigned char handle_recv_msg, unsigned int timeout, unsigned char resend_after_reform) { diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index 882c461a6..ba293b73d 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -277,6 +277,25 @@ static int CBGetTxnStatus(void *db_handle, unsigned long long xid, unsigned char return DMS_SUCCESS; } +#define NDPGETBYTE(x, i) (*((char*)(x) + (int)((i) / BITS_PER_BYTE))) +#define NDPCLRBIT(x, i) NDPGETBYTE(x, i) &= ~(0x01 << ((i) % BITS_PER_BYTE)) +#define NDPGETBIT(x, i) ((NDPGETBYTE(x, i) >> ((i) % BITS_PER_BYTE)) & 0x01) + +static int CBGetPageStatus(void *db_handle, dms_opengauss_relfilenode_t *rnode, unsigned int page, + int pagesNum, dms_opengauss_page_status_result_t *page_result) +{ + for (uint32 i = page, offset = 0; i != page + pagesNum; ++i, ++offset) { + if (NDPGETBIT(page_result->page_map, offset)) { + bool cached = IsPageHitBufferPool(*(RelFileNode * )(rnode), MAIN_FORKNUM, i); + if (cached) { + NDPCLRBIT(page_result->page_map, offset); + --page_result->bit_count; + } + } + } + return DMS_SUCCESS; +} + static int CBGetCurrModeAndLockBuffer(void *db_handle, int buffer, unsigned char lock_mode, unsigned char *curr_mode) { @@ -1962,6 +1981,7 @@ void DmsInitCallback(dms_callback_t *callback) callback->opengauss_lock_buffer = CBGetCurrModeAndLockBuffer; callback->get_opengauss_txn_snapshot = CBGetSnapshotData; callback->get_opengauss_txn_of_master = CBGetTxnSwinfo; + callback->get_opengauss_page_status = CBGetPageStatus; callback->log_output = NULL; diff --git a/src/gausskernel/ddes/adapter/ss_transaction.cpp b/src/gausskernel/ddes/adapter/ss_transaction.cpp index 2867ab9d8..740085657 100644 --- a/src/gausskernel/ddes/adapter/ss_transaction.cpp +++ b/src/gausskernel/ddes/adapter/ss_transaction.cpp @@ -317,6 +317,22 @@ bool SSGetOldestXminFromAllStandby() return true; } +void SSIsPageHitDms(RelFileNode& node, BlockNumber page, int pagesNum, uint64 *pageMap, int *bitCount) +{ + dms_context_t dms_ctx; + InitDmsContext(&dms_ctx); + + dms_ctx.rfn.inst_id = (unsigned char)SS_PRIMARY_ID; + dms_ctx.rfn.rnode = *(dms_opengauss_relfilenode_t *)(&node); + + if (dms_request_opengauss_page_status(&dms_ctx, page, pagesNum, pageMap, bitCount) != DMS_SUCCESS) { + *bitCount = 0; + ereport(DEBUG1, (errmsg("SS get page map failed, buffer_id = %u.", page))); + return; + } + ereport(DEBUG1, (errmsg("SS get page map success, buffer_id = %u.", page))); +} + int SSCheckDbBackends(char *data, uint32 len, char *output_msg, uint32 *output_msg_len) { if (unlikely(len != sizeof(SSBroadcastDbBackends))) { diff --git a/src/gausskernel/ddes/ddes_commit_id b/src/gausskernel/ddes/ddes_commit_id index cf134156f..7ee3a32b7 100644 --- a/src/gausskernel/ddes/ddes_commit_id +++ b/src/gausskernel/ddes/ddes_commit_id @@ -1,2 +1,2 @@ -dms_commit_id=ede9efeda8ed0ab2ad79f119b619c8a063023683 -dss_commit_id=97deeec25d7ec4ee0c92b5bc0bcc50eaa3ae64af \ No newline at end of file +dms_commit_id=a9d399af041ae5491149fdb7c58e5115da3065b2 +dss_commit_id=97deeec25d7ec4ee0c92b5bc0bcc50eaa3ae64af diff --git a/src/gausskernel/storage/buffer/bufmgr.cpp b/src/gausskernel/storage/buffer/bufmgr.cpp index 8e79236af..a4a8b2242 100644 --- a/src/gausskernel/storage/buffer/bufmgr.cpp +++ b/src/gausskernel/storage/buffer/bufmgr.cpp @@ -7332,3 +7332,21 @@ void LockTwoLWLock(LWLock *new_partition_lock, LWLock *old_partition_lock) (void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE); } } + +bool IsPageHitBufferPool(RelFileNode& node, ForkNumber forkNum, BlockNumber blockNum) +{ + int bufId = 0; + BufferTag newTag; + + INIT_BUFFERTAG(newTag, node, forkNum, blockNum); + uint32 new_hash = BufTableHashCode(&newTag); + LWLock *new_partition_lock = BufMappingPartitionLock(new_hash); + /* see if the block is in the buffer pool already */ + (void)LWLockAcquire(new_partition_lock, LW_SHARED); + bufId = BufTableLookup(&newTag, new_hash); + LWLockRelease(new_partition_lock); + if (bufId != -1) { + return true; + } + return false; +} diff --git a/src/include/ddes/dms/dms_api.h b/src/include/ddes/dms/dms_api.h index 3bfe5f4f5..b0c26edcb 100644 --- a/src/include/ddes/dms/dms_api.h +++ b/src/include/ddes/dms/dms_api.h @@ -182,6 +182,19 @@ typedef struct st_dms_xid_ctx { unsigned long long scn; } dms_xid_ctx_t; +typedef struct dms_opengauss_relfilenode { + unsigned int spcNode; + unsigned int dbNode; + unsigned int relNode; + signed short bucketNode; + unsigned short opt; +} dms_opengauss_relfilenode_t; + +typedef struct st_dms_rfn { + dms_opengauss_relfilenode_t rnode; + unsigned char inst_id; +} dms_rfn_t; + typedef struct st_dms_xmap_ctx { unsigned int xmap; unsigned int dest_id; @@ -201,6 +214,7 @@ typedef struct st_dms_context { dms_drid_t lock_id; dms_xid_ctx_t xid_ctx; dms_xmap_ctx_t xmap_ctx; + dms_rfn_t rfn; unsigned char edp_inst; }; } dms_context_t; @@ -242,6 +256,11 @@ typedef struct dms_opengauss_txn_sw_info { unsigned int server_proc_slot; // backend slot of master, used for standby write feature } dms_opengauss_txn_sw_info_t; +typedef struct st_dms_opengauss_page_status_result { + int bit_count; + unsigned long int page_map[8]; +} dms_opengauss_page_status_result_t; + typedef enum dms_opengauss_lock_req_type { SHARED_INVAL_MSG, DROP_BUF_MSG, @@ -452,6 +471,7 @@ typedef enum en_dms_wait_event { DMS_EVT_LATCH_X_REMOTE, DMS_EVT_LATCH_S_REMOTE, DMS_EVT_ONDEMAND_REDO, + DMS_EVT_PAGE_STATUS_INFO, DMS_EVT_COUNT, @@ -598,6 +618,8 @@ typedef int(*dms_opengauss_lock_buffer)(void *db_handle, int buffer, unsigned ch typedef int(*dms_get_txn_snapshot)(void *db_handle, unsigned int xmap, dms_txn_snapshot_t *txn_snapshot); typedef int(*dms_get_opengauss_txn_snapshot)(void *db_handle, dms_opengauss_txn_snapshot_t *txn_snapshot); typedef int(*dms_get_opengauss_txn_of_master)(void *db_handle, dms_opengauss_txn_sw_info_t *txn_swinfo); +typedef int(*dms_get_opengauss_page_status)(void *db_handle, dms_opengauss_relfilenode_t *rnode, unsigned int page, + int page_num, dms_opengauss_page_status_result_t *page_result); typedef void (*dms_log_output)(dms_log_id_t log_type, dms_log_level_t log_level, const char *code_file_name, unsigned int code_line_num, const char *module_name, const char *format, ...); typedef int (*dms_log_flush)(void *db_handle, unsigned long long *lsn); @@ -745,6 +767,7 @@ typedef struct st_dms_callback { dms_get_txn_snapshot get_txn_snapshot; dms_get_opengauss_txn_snapshot get_opengauss_txn_snapshot; dms_get_opengauss_txn_of_master get_opengauss_txn_of_master; + dms_get_opengauss_page_status get_opengauss_page_status; dms_log_output log_output; dms_log_flush log_flush; dms_process_edp ckpt_edp; @@ -856,7 +879,7 @@ typedef enum en_dms_info_id { #define DMS_LOCAL_MINOR_VER_WEIGHT 1000 #define DMS_LOCAL_MAJOR_VERSION 0 #define DMS_LOCAL_MINOR_VERSION 0 -#define DMS_LOCAL_VERSION 76 +#define DMS_LOCAL_VERSION 77 #ifdef __cplusplus } diff --git a/src/include/ddes/dms/ss_dms.h b/src/include/ddes/dms/ss_dms.h index 534ed7834..37477f398 100644 --- a/src/include/ddes/dms/ss_dms.h +++ b/src/include/ddes/dms/ss_dms.h @@ -53,6 +53,8 @@ typedef struct st_ss_dms_func { dms_opengauss_txn_snapshot_t *dms_txn_snapshot); int (*dms_request_opengauss_txn_of_master)(dms_context_t *dms_ctx, dms_opengauss_txn_sw_info_t *dms_txn_swinfo); + int (*dms_request_opengauss_page_status)(dms_context_t *dms_ctx, unsigned int page, int page_num, + unsigned long int *page_map, int *bit_count); int (*dms_register_thread_init)(dms_thread_init_t thrd_init); int (*dms_release_owner)(dms_context_t *dms_ctx, dms_buf_ctrl_t *ctrl, unsigned char *released); int (*dms_wait_reform)(unsigned int *has_offline); @@ -102,6 +104,8 @@ int dms_request_opengauss_txn_snapshot(dms_context_t *dms_ctx, dms_opengauss_txn_snapshot_t *dms_txn_snapshot); int dms_request_opengauss_txn_of_master(dms_context_t *dms_ctx, dms_opengauss_txn_sw_info_t *dms_txn_swinfo); +int dms_request_opengauss_page_status(dms_context_t *dms_ctx, unsigned int page, int page_num, + unsigned long int *page_map, int *bit_count); int dms_register_thread_init(dms_thread_init_t thrd_init); int dms_release_owner(dms_context_t *dms_ctx, dms_buf_ctrl_t *ctrl, unsigned char *released); int dms_wait_reform(unsigned int *has_offline); diff --git a/src/include/ddes/dms/ss_transaction.h b/src/include/ddes/dms/ss_transaction.h index c11cf595d..721c38bb8 100644 --- a/src/include/ddes/dms/ss_transaction.h +++ b/src/include/ddes/dms/ss_transaction.h @@ -96,6 +96,7 @@ TransactionId SSMultiXactIdGetUpdateXid(TransactionId xmax, uint16 t_infomask, u bool SSGetOldestXminFromAllStandby(); int SSGetOldestXmin(char *data, uint32 len, char *output_msg, uint32 *output_msg_len); int SSGetOldestXminAck(SSBroadcastXminAck *ack_data); +void SSIsPageHitDms(RelFileNode& node, BlockNumber page, int pagesNum, uint64 *pageMap, int *bitCount); void SSSendSharedInvalidMessages(const SharedInvalidationMessage* msgs, int n); void SSBCastDropRelAllBuffer(RelFileNode *rnodes, int rnode_len); void SSBCastDropRelRangeBuffer(RelFileNode node, ForkNumber forkNum, BlockNumber firstDelBlock); diff --git a/src/include/storage/buf/bufmgr.h b/src/include/storage/buf/bufmgr.h index a02c2837d..5581d23f6 100644 --- a/src/include/storage/buf/bufmgr.h +++ b/src/include/storage/buf/bufmgr.h @@ -293,6 +293,7 @@ void InvalidateBuffer(BufferDesc *buf); extern void ReservePrivateRefCountEntry(void); extern PrivateRefCountEntry* NewPrivateRefCountEntry(Buffer buffer); void LockTwoLWLock(LWLock *new_partition_lock, LWLock *old_partition_lock); +extern bool IsPageHitBufferPool(RelFileNode& node, ForkNumber forkNum, BlockNumber blockNum); extern void InitBufferPool(void); extern void pca_buf_init_ctx();