dms推点以及适配opgs

This commit is contained in:
chen-chao666
2024-11-15 15:45:33 +08:00
parent 0b7220f465
commit 6df9bf6d6b
5 changed files with 83 additions and 17 deletions

View File

@ -98,6 +98,7 @@ void TransformLockTagToDmsLatch(dms_drlatch_t* dlatch, const LOCKTAG locktag)
dlatch->drid.parent = locktag.locktag_field3;
dlatch->drid.part = locktag.locktag_field4;
dlatch->drid.uid = locktag.locktag_field5;
dlatch->handle = NULL;
}
static void CalcSegDmsPhysicalLoc(BufferDesc* buf_desc, Buffer buffer, bool check_standby)

View File

@ -617,7 +617,8 @@ static unsigned long long CBGetGlobalLSN(void *db_handle)
return GetInsertRecPtr();
}
static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_t **buf_ctrl)
static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_t **buf_ctrl,
unsigned long long seq)
{
bool is_seg;
int ret = DMS_SUCCESS;
@ -714,6 +715,7 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_
}
*buf_ctrl = GetDmsBufCtrl(buf_id);
Assert(buf_id >= 0);
if ((*buf_ctrl)->been_loaded == false) {
*buf_ctrl = NULL;
LWLockRelease(buf_desc->content_lock);
@ -724,6 +726,19 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_
tag->forkNum, tag->blockNum)));
break;
}
if (seq <= (*buf_ctrl)->seq) {
*buf_ctrl = NULL;
ret = DMS_ERROR;
LWLockRelease(buf_desc->content_lock);
DmsReleaseBuffer(buf_desc->buf_id + 1, is_seg);
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[SS page][%u/%u/%u/%d %d-%u] message expired",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum)));
break;
}
if ((*buf_ctrl)->lock_mode == DMS_LOCK_NULL) {
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[SS page][%u/%u/%u/%d %d-%u] lock mode is null, still need to transfer page",
@ -750,10 +765,10 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_
}
static int CBEnterLocalPage(void *db_handle, char pageid[DMS_PAGEID_SIZE], dms_lock_mode_t mode,
dms_buf_ctrl_t **buf_ctrl)
dms_buf_ctrl_t **buf_ctrl, unsigned long long seq)
{
BufferTag *tag = (BufferTag *)pageid;
return tryEnterLocalPage(tag, mode, buf_ctrl);
return tryEnterLocalPage(tag, mode, buf_ctrl, seq);
}
static unsigned char CBPageDirty(dms_buf_ctrl_t *buf_ctrl)
@ -794,7 +809,8 @@ static char* CBGetPage(dms_buf_ctrl_t *buf_ctrl)
return (char *)BufHdrGetBlock(buf_desc);
}
static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsigned char invld_owner)
static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsigned char invld_owner,
unsigned long long seq)
{
int buf_id = -1;
BufferTag* tag = (BufferTag *)pageid;
@ -844,6 +860,15 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
break;
}
if (seq <= buf_ctrl->seq) {
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS page][%d/%d/%d/%d %d-%d] expired message", tag->rnode.spcNode, tag->rnode.dbNode,
tag->rnode.relNode, tag->rnode.bucketNode, tag->forkNum, tag->blockNum)));
UnlockBufHdr(buf_desc, buf_state);
ret = DMS_ERROR;
break;
}
/* For aio (flush disk not finished), dirty, in dirty queue, dirty need flush, can't recycle */
if (buf_desc->extra->aio_in_progress || (buf_state & BM_DIRTY) || (buf_state & BM_JUST_DIRTIED) ||
XLogRecPtrIsValid(pg_atomic_read_u64(&buf_desc->extra->rec_lsn)) ||
@ -1392,6 +1417,8 @@ static int32 SSBufRebuildOneDrcInternal(BufferDesc *buf_desc, unsigned char thre
Assert(buf_ctrl->is_edp != 1);
Assert(XLogRecPtrIsValid(g_instance.dms_cxt.ckptRedo));
#endif
buf_ctrl->seq = 0;
dms_context_t dms_ctx;
InitDmsBufContext(&dms_ctx, buf_desc->tag);
dms_ctrl_info_t ctrl_info = { 0 };
@ -1475,24 +1502,39 @@ static int32 CBBufRebuildDrcInternal(int begin, int len, unsigned char thread_in
*/
const int dms_invalid_thread_index = 255;
const int dms_invalid_thread_num = 255;
static int32 CBBufRebuildDrcParallel(void* db_handle, unsigned char thread_index, unsigned char thread_num)
static void CBAllocBufRangeForThread(unsigned char thread_index, unsigned char thread_num,
int* buf_begin, int* buf_num)
{
Assert((thread_index == dms_invalid_thread_index && thread_num == dms_invalid_thread_num) ||
(thread_index != dms_invalid_thread_index && thread_num != dms_invalid_thread_num &&
thread_index < thread_num));
int buf_num = TOTAL_BUFFER_NUM / thread_num;
int buf_begin = thread_index * buf_num;
int num = TOTAL_BUFFER_NUM / thread_num;
int begin = thread_index * num;
if (thread_index == thread_num - 1) {
buf_num = TOTAL_BUFFER_NUM - buf_begin;
num = TOTAL_BUFFER_NUM - begin;
}
if (thread_index == dms_invalid_thread_index && thread_num == dms_invalid_thread_num) {
buf_begin = 0;
buf_num = TOTAL_BUFFER_NUM;
begin = 0;
num = TOTAL_BUFFER_NUM;
}
*buf_begin = begin;
*buf_num = num;
}
static int32 CBBufRebuildDrcParallel(void* db_handle, unsigned char thread_index, unsigned char thread_num)
{
int buf_begin = 0;
int buf_num = 0;
CBAllocBufRangeForThread(thread_index, thread_num, &buf_begin, &buf_num);
return CBBufRebuildDrcInternal(buf_begin, buf_num, thread_index);
}
static int32 CBRcyClean(void* db_handle, unsigned char thread_index, unsigned char thread_num)
{
}
static int32 CBDrcBufValidate(void *db_handle)
{
/* Load Control File */
@ -2255,6 +2297,7 @@ int CBOndemandRedoPageForStandby(void *block_key, int32 *redo_status)
return GS_SUCCESS;
}
void CBGetBufInfo(char* resid, stat_buf_info_t *buf_info)
{
BufferTag tag;
@ -2285,6 +2328,19 @@ int CBDoCheckpointImmediately(unsigned long long *ckpt_lsn)
return GS_SUCCESS;
}
int CBBufCtrlRcyClean(void *db_handle, unsigned char thread_index, unsigned char thread_num)
{
int buf_begin = 0;
int buf_num = 0;
CBAllocBufRangeForThread(thread_index, thread_num, &buf_begin, &buf_num);
int buf_end = buf_begin + buf_num - 1;
for (int i = buf_begin; i <= buf_end; i++) {
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(i);
buf_ctrl->in_rcy = false;
}
return GS_SUCCESS;
}
void DmsInitCallback(dms_callback_t *callback)
{
// used in reform
@ -2304,6 +2360,7 @@ void DmsInitCallback(dms_callback_t *callback)
callback->reform_start_notify = CBReformStartNotify;
callback->reform_set_dms_role = CBReformSetDmsRole;
callback->opengauss_ondemand_redo_buffer = CBOndemandRedoPageForStandby;
callback->dms_ctl_rcy_clean_parallel = CBRcyClean;
callback->inc_and_get_srsn = CBIncAndGetSrsn;
callback->get_page_hash_val = CBPageHashCode;
@ -2353,4 +2410,5 @@ void DmsInitCallback(dms_callback_t *callback)
callback->buf_ctrl_recycle = CBBufCtrlRecycle;
callback->dms_thread_deinit = DmsThreadDeinit;
callback->opengauss_do_ckpt_immediate = CBDoCheckpointImmediately;
callback->dms_ctl_rcy_clean_parallel = CBBufCtrlRcyClean;
}

View File

@ -412,7 +412,7 @@ static void setDMSProfile(dms_profile_t* profile)
profile->max_session_cnt = DMS_MAX_SESSIONS;
profile->time_stat_enabled = TRUE;
profile->pipe_type = convertInterconnectType();
profile->conn_created_during_init = TRUE;
profile->conn_created_during_init = FALSE;
setRdmaWorkConfig(profile);
setScrlConfig(profile);
SetOckLogPath(dms_attr, profile->ock_log_path);

View File

@ -1,3 +1,3 @@
dms_commit_id=57edf641c1feb77c543cf5492dc70e9a3278b41a
dms_commit_id=b276d85d2026178604c0646b61f1e86b1d2220ee
dss_commit_id=4629ef977cd3d78b36d5aad50a8ecacf8c4015a4
cbb_commit_id=ca573ccfc2edb6da4c287cdc7eab7a03d8944901
cbb_commit_id=82f64d003809e61635ad112998b979b39c89a460

View File

@ -35,7 +35,7 @@ extern "C" {
#define DMS_LOCAL_MINOR_VER_WEIGHT 1000
#define DMS_LOCAL_MAJOR_VERSION 0
#define DMS_LOCAL_MINOR_VERSION 0
#define DMS_LOCAL_VERSION 170
#define DMS_LOCAL_VERSION 172
#define DMS_SUCCESS 0
#define DMS_ERROR (-1)
@ -260,10 +260,12 @@ typedef struct st_dms_cr_assist_t {
typedef struct st_dms_drlock {
dms_drid_t drid;
void *handle;
} dms_drlock_t;
typedef struct st_dms_drlatch {
dms_drid_t drid;
dms_drid_t drid;
void *handle;
} dms_drlatch_t;
typedef struct st_dms_xid_ctx {
@ -448,6 +450,7 @@ typedef struct st_dms_buf_ctrl
unsigned long long edp_map; // records edp instance
long long last_ckpt_time; // last time when local edp page is added to group.
volatile unsigned int lock_ss_read; // concurrency control for rebuild/confirm
unsigned long long seq; // for dms page swap message-sequence
#ifdef OPENGAUSS
int buf_id;
unsigned int state;
@ -851,14 +854,15 @@ typedef unsigned long long(*dms_get_global_lsn)(void *db_handle);
typedef void(*dms_get_global_flushed_lfn)(void *db_handle, unsigned char *node_id, unsigned long long *node_lfn,
unsigned long long *node_data, unsigned int len);
typedef int(*dms_read_local_page4transfer)(void *db_handle, char pageid[DMS_PAGEID_SIZE],
dms_lock_mode_t mode, dms_buf_ctrl_t **buf_ctrl);
dms_lock_mode_t mode, dms_buf_ctrl_t **buf_ctrl, unsigned long long seq);
typedef int(*dms_try_read_local_page)(void *db_handle, char pageid[DMS_PAGEID_SIZE],
dms_lock_mode_t mode, dms_buf_ctrl_t **buf_ctrl);
typedef unsigned char(*dms_page_is_dirty)(dms_buf_ctrl_t *buf_ctrl);
typedef void(*dms_leave_local_page)(void *db_handle, dms_buf_ctrl_t *buf_ctrl);
typedef void(*dms_get_pageid)(dms_buf_ctrl_t *buf_ctrl, char **pageid, unsigned int *size);
typedef char *(*dms_get_page)(dms_buf_ctrl_t *buf_ctrl);
typedef int (*dms_invalidate_page)(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsigned char invld_owner);
typedef int (*dms_invalidate_page)(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsigned char invld_owner,
unsigned long long seq);
typedef void *(*dms_get_db_handle)(unsigned int *db_handle_index, dms_session_type_e session_type);
typedef void (*dms_release_db_handle)(void *db_handle);
typedef char *(*dms_get_wxid_from_cr_cursor)(void *cr_cursor);
@ -987,6 +991,7 @@ typedef int (*dms_standby_resume_server)(void *db_handle);
typedef int (*dms_start_lrpl)(void *db_handle, int is_reformer);
typedef int (*dms_stop_lrpl)(void *db_handle, int is_reformer);
typedef int (*dms_az_switchover_demote_phase1)(void *db_handle);
typedef int (*dms_az_switchover_demote_stop_ckpt)(void *db_handle);
typedef int (*dms_az_switchover_demote_update_node_ctrl)(void *db_handle, unsigned long long online_list);
typedef int (*dms_az_switchover_demote_change_role)(void *db_handle);
typedef int (*dms_az_switchover_demote_approve)(void *db_handle);
@ -1184,6 +1189,7 @@ typedef struct st_dms_callback {
// for az switchover and az failover
dms_az_switchover_demote_phase1 az_switchover_demote_phase1;
dms_az_switchover_demote_stop_ckpt az_switchover_demote_stop_ckpt;
dms_az_switchover_demote_update_node_ctrl az_switchover_demote_update_node_ctrl;
dms_az_switchover_demote_change_role az_switchover_demote_change_role;
dms_az_switchover_demote_approve az_switchover_demote_approve;
@ -1260,6 +1266,7 @@ typedef struct st_dms_profile {
unsigned int max_alive_time_for_abnormal_status;
unsigned char enable_dyn_trace;
unsigned char enable_reform_trace;
unsigned long long drc_buf_size;
} dms_profile_t;
typedef struct st_logger_param {