From 6df9bf6d6be130819342f981ca64afe5c9d73155 Mon Sep 17 00:00:00 2001 From: chen-chao666 <1790599142@qq.com> Date: Fri, 15 Nov 2024 15:45:33 +0800 Subject: [PATCH] =?UTF-8?q?dms=E6=8E=A8=E7=82=B9=E4=BB=A5=E5=8F=8A?= =?UTF-8?q?=E9=80=82=E9=85=8Dopgs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ddes/adapter/ss_dms_bufmgr.cpp | 1 + .../ddes/adapter/ss_dms_callback.cpp | 78 ++++++++++++++++--- src/gausskernel/ddes/adapter/ss_init.cpp | 2 +- src/gausskernel/ddes/ddes_commit_id | 4 +- src/include/ddes/dms/dms_api.h | 15 +++- 5 files changed, 83 insertions(+), 17 deletions(-) diff --git a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp index 105b1cf82..440304167 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp @@ -98,6 +98,7 @@ void TransformLockTagToDmsLatch(dms_drlatch_t* dlatch, const LOCKTAG locktag) dlatch->drid.parent = locktag.locktag_field3; dlatch->drid.part = locktag.locktag_field4; dlatch->drid.uid = locktag.locktag_field5; + dlatch->handle = NULL; } static void CalcSegDmsPhysicalLoc(BufferDesc* buf_desc, Buffer buffer, bool check_standby) diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index faaa31e38..82db6fee5 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -617,7 +617,8 @@ static unsigned long long CBGetGlobalLSN(void *db_handle) return GetInsertRecPtr(); } -static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_t **buf_ctrl) +static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_t **buf_ctrl, + unsigned long long seq) { bool is_seg; int ret = DMS_SUCCESS; @@ -714,6 +715,7 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_ } *buf_ctrl = GetDmsBufCtrl(buf_id); Assert(buf_id >= 
0); + if ((*buf_ctrl)->been_loaded == false) { *buf_ctrl = NULL; LWLockRelease(buf_desc->content_lock); @@ -724,6 +726,19 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_ tag->forkNum, tag->blockNum))); break; } + + if (seq <= (*buf_ctrl)->seq) { + *buf_ctrl = NULL; + ret = DMS_ERROR; + LWLockRelease(buf_desc->content_lock); + DmsReleaseBuffer(buf_desc->buf_id + 1, is_seg); + ereport(WARNING, (errmodule(MOD_DMS), + errmsg("[SS page][%u/%u/%u/%d %d-%u] message expired", + tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode, + tag->forkNum, tag->blockNum))); + break; + } + if ((*buf_ctrl)->lock_mode == DMS_LOCK_NULL) { ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS page][%u/%u/%u/%d %d-%u] lock mode is null, still need to transfer page", @@ -750,10 +765,10 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_ } static int CBEnterLocalPage(void *db_handle, char pageid[DMS_PAGEID_SIZE], dms_lock_mode_t mode, - dms_buf_ctrl_t **buf_ctrl) + dms_buf_ctrl_t **buf_ctrl, unsigned long long seq) { BufferTag *tag = (BufferTag *)pageid; - return tryEnterLocalPage(tag, mode, buf_ctrl); + return tryEnterLocalPage(tag, mode, buf_ctrl, seq); } static unsigned char CBPageDirty(dms_buf_ctrl_t *buf_ctrl) @@ -794,7 +809,8 @@ static char* CBGetPage(dms_buf_ctrl_t *buf_ctrl) return (char *)BufHdrGetBlock(buf_desc); } -static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsigned char invld_owner) +static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsigned char invld_owner, + unsigned long long seq) { int buf_id = -1; BufferTag* tag = (BufferTag *)pageid; @@ -844,6 +860,15 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig break; } + if (seq <= buf_ctrl->seq) { + ereport(LOG, (errmodule(MOD_DMS), + errmsg("[SS page][%d/%d/%d/%d %d-%d] expired message", tag->rnode.spcNode, tag->rnode.dbNode, + tag->rnode.relNode, 
tag->rnode.bucketNode, tag->forkNum, tag->blockNum))); + UnlockBufHdr(buf_desc, buf_state); + ret = DMS_ERROR; + break; + } + /* For aio (flush disk not finished), dirty, in dirty queue, dirty need flush, can't recycle */ if (buf_desc->extra->aio_in_progress || (buf_state & BM_DIRTY) || (buf_state & BM_JUST_DIRTIED) || XLogRecPtrIsValid(pg_atomic_read_u64(&buf_desc->extra->rec_lsn)) || @@ -1392,6 +1417,8 @@ static int32 SSBufRebuildOneDrcInternal(BufferDesc *buf_desc, unsigned char thre Assert(buf_ctrl->is_edp != 1); Assert(XLogRecPtrIsValid(g_instance.dms_cxt.ckptRedo)); #endif + buf_ctrl->seq = 0; + dms_context_t dms_ctx; InitDmsBufContext(&dms_ctx, buf_desc->tag); dms_ctrl_info_t ctrl_info = { 0 }; @@ -1475,24 +1502,39 @@ static int32 CBBufRebuildDrcInternal(int begin, int len, unsigned char thread_in */ const int dms_invalid_thread_index = 255; const int dms_invalid_thread_num = 255; -static int32 CBBufRebuildDrcParallel(void* db_handle, unsigned char thread_index, unsigned char thread_num) +static void CBAllocBufRangeForThread(unsigned char thread_index, unsigned char thread_num, + int* buf_begin, int* buf_num) { Assert((thread_index == dms_invalid_thread_index && thread_num == dms_invalid_thread_num) || (thread_index != dms_invalid_thread_index && thread_num != dms_invalid_thread_num && thread_index < thread_num)); - int buf_num = TOTAL_BUFFER_NUM / thread_num; - int buf_begin = thread_index * buf_num; + int num = TOTAL_BUFFER_NUM / thread_num; + int begin = thread_index * num; if (thread_index == thread_num - 1) { - buf_num = TOTAL_BUFFER_NUM - buf_begin; + num = TOTAL_BUFFER_NUM - begin; } if (thread_index == dms_invalid_thread_index && thread_num == dms_invalid_thread_num) { - buf_begin = 0; - buf_num = TOTAL_BUFFER_NUM; + begin = 0; + num = TOTAL_BUFFER_NUM; } + *buf_begin = begin; + *buf_num = num; +} + +static int32 CBBufRebuildDrcParallel(void* db_handle, unsigned char thread_index, unsigned char thread_num) +{ + int buf_begin = 0; + int buf_num = 0; + 
CBAllocBufRangeForThread(thread_index, thread_num, &buf_begin, &buf_num); return CBBufRebuildDrcInternal(buf_begin, buf_num, thread_index); } +static int32 CBRcyClean(void* db_handle, unsigned char thread_index, unsigned char thread_num) +{ + return GS_SUCCESS; /* placeholder; DmsInitCallback later rebinds this slot to CBBufCtrlRcyClean */ +} + static int32 CBDrcBufValidate(void *db_handle) { /* Load Control File */ @@ -2255,6 +2297,7 @@ int CBOndemandRedoPageForStandby(void *block_key, int32 *redo_status) return GS_SUCCESS; } + void CBGetBufInfo(char* resid, stat_buf_info_t *buf_info) { BufferTag tag; @@ -2285,6 +2328,19 @@ int CBDoCheckpointImmediately(unsigned long long *ckpt_lsn) return GS_SUCCESS; } +int CBBufCtrlRcyClean(void *db_handle, unsigned char thread_index, unsigned char thread_num) +{ + int buf_begin = 0; + int buf_num = 0; + CBAllocBufRangeForThread(thread_index, thread_num, &buf_begin, &buf_num); + int buf_end = buf_begin + buf_num - 1; + for (int i = buf_begin; i <= buf_end; i++) { + dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(i); + buf_ctrl->in_rcy = false; + } + return GS_SUCCESS; +} + void DmsInitCallback(dms_callback_t *callback) { // used in reform @@ -2304,6 +2360,7 @@ void DmsInitCallback(dms_callback_t *callback) callback->reform_start_notify = CBReformStartNotify; callback->reform_set_dms_role = CBReformSetDmsRole; callback->opengauss_ondemand_redo_buffer = CBOndemandRedoPageForStandby; + callback->dms_ctl_rcy_clean_parallel = CBRcyClean; callback->inc_and_get_srsn = CBIncAndGetSrsn; callback->get_page_hash_val = CBPageHashCode; @@ -2353,4 +2410,5 @@ void DmsInitCallback(dms_callback_t *callback) callback->buf_ctrl_recycle = CBBufCtrlRecycle; callback->dms_thread_deinit = DmsThreadDeinit; callback->opengauss_do_ckpt_immediate = CBDoCheckpointImmediately; + callback->dms_ctl_rcy_clean_parallel = CBBufCtrlRcyClean; } diff --git a/src/gausskernel/ddes/adapter/ss_init.cpp b/src/gausskernel/ddes/adapter/ss_init.cpp index 8c3f86550..f0f4eb062 100644 --- a/src/gausskernel/ddes/adapter/ss_init.cpp +++ b/src/gausskernel/ddes/adapter/ss_init.cpp @@ 
-412,7 +412,7 @@ static void setDMSProfile(dms_profile_t* profile) profile->max_session_cnt = DMS_MAX_SESSIONS; profile->time_stat_enabled = TRUE; profile->pipe_type = convertInterconnectType(); - profile->conn_created_during_init = TRUE; + profile->conn_created_during_init = FALSE; setRdmaWorkConfig(profile); setScrlConfig(profile); SetOckLogPath(dms_attr, profile->ock_log_path); diff --git a/src/gausskernel/ddes/ddes_commit_id b/src/gausskernel/ddes/ddes_commit_id index 27ee618a6..6d522679d 100644 --- a/src/gausskernel/ddes/ddes_commit_id +++ b/src/gausskernel/ddes/ddes_commit_id @@ -1,3 +1,3 @@ -dms_commit_id=57edf641c1feb77c543cf5492dc70e9a3278b41a +dms_commit_id=b276d85d2026178604c0646b61f1e86b1d2220ee dss_commit_id=4629ef977cd3d78b36d5aad50a8ecacf8c4015a4 -cbb_commit_id=ca573ccfc2edb6da4c287cdc7eab7a03d8944901 \ No newline at end of file +cbb_commit_id=82f64d003809e61635ad112998b979b39c89a460 \ No newline at end of file diff --git a/src/include/ddes/dms/dms_api.h b/src/include/ddes/dms/dms_api.h index 09ece1755..2a133e109 100644 --- a/src/include/ddes/dms/dms_api.h +++ b/src/include/ddes/dms/dms_api.h @@ -35,7 +35,7 @@ extern "C" { #define DMS_LOCAL_MINOR_VER_WEIGHT 1000 #define DMS_LOCAL_MAJOR_VERSION 0 #define DMS_LOCAL_MINOR_VERSION 0 -#define DMS_LOCAL_VERSION 170 +#define DMS_LOCAL_VERSION 172 #define DMS_SUCCESS 0 #define DMS_ERROR (-1) @@ -260,10 +260,12 @@ typedef struct st_dms_cr_assist_t { typedef struct st_dms_drlock { dms_drid_t drid; + void *handle; } dms_drlock_t; typedef struct st_dms_drlatch { - dms_drid_t drid; + dms_drid_t drid; + void *handle; } dms_drlatch_t; typedef struct st_dms_xid_ctx { @@ -448,6 +450,7 @@ typedef struct st_dms_buf_ctrl unsigned long long edp_map; // records edp instance long long last_ckpt_time; // last time when local edp page is added to group. 
volatile unsigned int lock_ss_read; // concurrency control for rebuild/confirm + unsigned long long seq; // for dms page swap message-sequence #ifdef OPENGAUSS int buf_id; unsigned int state; @@ -851,14 +854,15 @@ typedef unsigned long long(*dms_get_global_lsn)(void *db_handle); typedef void(*dms_get_global_flushed_lfn)(void *db_handle, unsigned char *node_id, unsigned long long *node_lfn, unsigned long long *node_data, unsigned int len); typedef int(*dms_read_local_page4transfer)(void *db_handle, char pageid[DMS_PAGEID_SIZE], - dms_lock_mode_t mode, dms_buf_ctrl_t **buf_ctrl); + dms_lock_mode_t mode, dms_buf_ctrl_t **buf_ctrl, unsigned long long seq); typedef int(*dms_try_read_local_page)(void *db_handle, char pageid[DMS_PAGEID_SIZE], dms_lock_mode_t mode, dms_buf_ctrl_t **buf_ctrl); typedef unsigned char(*dms_page_is_dirty)(dms_buf_ctrl_t *buf_ctrl); typedef void(*dms_leave_local_page)(void *db_handle, dms_buf_ctrl_t *buf_ctrl); typedef void(*dms_get_pageid)(dms_buf_ctrl_t *buf_ctrl, char **pageid, unsigned int *size); typedef char *(*dms_get_page)(dms_buf_ctrl_t *buf_ctrl); -typedef int (*dms_invalidate_page)(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsigned char invld_owner); +typedef int (*dms_invalidate_page)(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsigned char invld_owner, + unsigned long long seq); typedef void *(*dms_get_db_handle)(unsigned int *db_handle_index, dms_session_type_e session_type); typedef void (*dms_release_db_handle)(void *db_handle); typedef char *(*dms_get_wxid_from_cr_cursor)(void *cr_cursor); @@ -987,6 +991,7 @@ typedef int (*dms_standby_resume_server)(void *db_handle); typedef int (*dms_start_lrpl)(void *db_handle, int is_reformer); typedef int (*dms_stop_lrpl)(void *db_handle, int is_reformer); typedef int (*dms_az_switchover_demote_phase1)(void *db_handle); +typedef int (*dms_az_switchover_demote_stop_ckpt)(void *db_handle); typedef int (*dms_az_switchover_demote_update_node_ctrl)(void *db_handle, unsigned long long 
online_list); typedef int (*dms_az_switchover_demote_change_role)(void *db_handle); typedef int (*dms_az_switchover_demote_approve)(void *db_handle); @@ -1184,6 +1189,7 @@ typedef struct st_dms_callback { // for az switchover and az failover dms_az_switchover_demote_phase1 az_switchover_demote_phase1; + dms_az_switchover_demote_stop_ckpt az_switchover_demote_stop_ckpt; dms_az_switchover_demote_update_node_ctrl az_switchover_demote_update_node_ctrl; dms_az_switchover_demote_change_role az_switchover_demote_change_role; dms_az_switchover_demote_approve az_switchover_demote_approve; @@ -1260,6 +1266,7 @@ typedef struct st_dms_profile { unsigned int max_alive_time_for_abnormal_status; unsigned char enable_dyn_trace; unsigned char enable_reform_trace; + unsigned long long drc_buf_size; } dms_profile_t; typedef struct st_logger_param {