适配DMS分布式锁性能优化

This commit is contained in:
黄堰蛟
2024-12-26 11:17:36 +08:00
committed by chen-chao6661
parent e9819adae9
commit 7c9f37efa4
7 changed files with 33 additions and 21 deletions

View File

@ -281,19 +281,19 @@ int dms_broadcast_opengauss_ddllock(dms_context_t *dms_ctx, char *data, unsigned
resend_after_reform);
}
bool dms_latch_timed_x(dms_context_t *dms_ctx, dms_drlatch_t *dlatch, unsigned int wait_ticks)
bool dms_latch_timed_x(dms_drlatch_t *dlatch, unsigned int sid, unsigned int wait_ticks, void *dms_stat)
{
return (bool)g_ss_dms_func.dms_latch_timed_x(dms_ctx, dlatch, wait_ticks);
return (bool)g_ss_dms_func.dms_latch_timed_x(dlatch, sid, wait_ticks, dms_stat);
}
bool dms_latch_timed_s(dms_context_t *dms_ctx, dms_drlatch_t *dlatch, unsigned int wait_ticks, unsigned char is_force)
bool dms_latch_timed_s(dms_drlatch_t *dlatch, unsigned int sid, unsigned int wait_ticks, unsigned char is_force, void *dms_stat)
{
return (bool)g_ss_dms_func.dms_latch_timed_s(dms_ctx, dlatch, wait_ticks, is_force);
return (bool)g_ss_dms_func.dms_latch_timed_s(dlatch, sid, wait_ticks, is_force, dms_stat);
}
void dms_unlatch(dms_context_t *dms_ctx, dms_drlatch_t *dlatch)
void dms_unlatch(dms_drlatch_t *dlatch, void *dms_stat)
{
g_ss_dms_func.dms_unlatch(dms_ctx, dlatch);
g_ss_dms_func.dms_unlatch(dlatch, dms_stat);
}
int dms_register_thread_init(dms_thread_init_t thrd_init)

View File

@ -2393,6 +2393,16 @@ int CBBufCtrlRcyClean(void *db_handle, unsigned char thread_index, unsigned char
return GS_SUCCESS;
}
dms_session_e CBDmsGetSessionType(unsigned int sid)
{
return DMSGetProcType4RequestPage();
}
unsigned char CBDmsGetInterceptType(unsigned int sid)
{
return 0;
}
void DmsInitCallback(dms_callback_t *callback)
{
// used in reform
@ -2462,4 +2472,7 @@ void DmsInitCallback(dms_callback_t *callback)
callback->dms_thread_deinit = DmsThreadDeinit;
callback->opengauss_do_ckpt_immediate = CBDoCheckpointImmediately;
callback->dms_ctl_rcy_clean_parallel = CBBufCtrlRcyClean;
callback->get_session_type = CBDmsGetSessionType;
callback->get_intercept_type = CBDmsGetInterceptType;
}

View File

@ -419,6 +419,7 @@ static void setDMSProfile(dms_profile_t* profile)
profile->inst_map = 0;
profile->parallel_thread_num = dms_attr->parallel_thread_num;
profile->max_wait_time = DMS_MSG_MAX_WAIT_TIME;
profile->spin_sleep_time_nsec = SS_SPIN_WAIT_TIME;
if (dms_attr->enable_ssl && g_instance.attr.attr_security.EnableSSL) {
InitDmsSSL();

View File

@ -1,3 +1,3 @@
dms_commit_id=23569ef4b8743de1b290dcda7002a995ee6f0c0d
dss_commit_id=314bbb038e7493998a7e542a050bd97572452b33
cbb_commit_id=627ad39b62cf7c349bda6351bd92b4544c3a4c3f
cbb_commit_id=627ad39b62cf7c349bda6351bd92b4544c3a4c3f

View File

@ -4044,7 +4044,6 @@ int LockWaiterCount(const LOCKTAG *locktag)
static bool SSDmsLockAcquire(LOCALLOCK *locallock, bool dontWait, int waitSec)
{
dms_context_t dms_ctx;
dms_drlatch_t dlatch;
bool ret = true;
bool timeout = true;
@ -4053,8 +4052,8 @@ static bool SSDmsLockAcquire(LOCALLOCK *locallock, bool dontWait, int waitSec)
int needWaitMilliSec = 0;
TimestampTz startTime;
LOCKMODE lockmode;
uint32 sid = (uint32)(t_thrd.proc ? t_thrd.proc->logictid : t_thrd.myLogicTid + GLOBAL_ALL_PROCS);
InitDmsContext(&dms_ctx);
TransformLockTagToDmsLatch(&dlatch, locallock->tag.lock);
needWaitMilliSec = (waitSec == 0) ? u_sess->attr.attr_storage.LockWaitTimeout : waitSec * SEC2MILLISEC;
@ -4065,9 +4064,9 @@ static bool SSDmsLockAcquire(LOCALLOCK *locallock, bool dontWait, int waitSec)
{
do {
if (lockmode < AccessExclusiveLock && SS_NORMAL_STANDBY) {
ret = dms_latch_timed_s(&dms_ctx, &dlatch, SS_ACQUIRE_LOCK_DO_NOT_WAIT, (unsigned char)false);
ret = dms_latch_timed_s(&dlatch, sid, SS_ACQUIRE_LOCK_DO_NOT_WAIT, (unsigned char)false, NULL);
} else if (lockmode >= AccessExclusiveLock && SS_NORMAL_PRIMARY) {
ret = dms_latch_timed_x(&dms_ctx, &dlatch, SS_ACQUIRE_LOCK_DO_NOT_WAIT);
ret = dms_latch_timed_x(&dlatch, sid, SS_ACQUIRE_LOCK_DO_NOT_WAIT, NULL);
} else {
// skip if lockmode do not meet ss lock request, or openGauss in reform process
skipAcquire = true;
@ -4113,16 +4112,14 @@ static bool SSDmsLockAcquire(LOCALLOCK *locallock, bool dontWait, int waitSec)
static void SSDmsLockRelease(LOCALLOCK *locallock)
{
dms_context_t dms_ctx;
dms_drlatch_t dlatch;
if (!locallock->ssLock) {
return;
}
InitDmsContext(&dms_ctx);
TransformLockTagToDmsLatch(&dlatch, locallock->tag.lock);
locallock->ssLock = FALSE;
dms_unlatch(&dms_ctx, &dlatch);
dms_unlatch(&dlatch, NULL);
}

View File

@ -159,6 +159,7 @@
#define DMS_MSG_MAX_WAIT_TIME (10 * 1000) // 10s
#define SS_REFORM_WAIT_TIME (5000) // 5ms
#define SS_WAIT_TIME (20*10000000) // 200s
#define SS_SPIN_WAIT_TIME (200000) // ns
/* length of segment filename like '/1' */
#define SEG_MAINFORK_FILENAME_LEN 2

View File

@ -74,10 +74,10 @@ typedef struct st_ss_dms_func {
int (*dms_broadcast_opengauss_ddllock)(dms_context_t *dms_ctx, char *data, unsigned int len,
unsigned char handle_recv_msg, unsigned int timeout, unsigned char resend_after_reform);
int (*dms_reform_last_failed)(void);
bool (*dms_latch_timed_x)(dms_context_t *dms_ctx, dms_drlatch_t *dlatch, unsigned int wait_ticks);
bool (*dms_latch_timed_s)(dms_context_t *dms_ctx, dms_drlatch_t *dlatch, unsigned int wait_ticks,
unsigned char is_force);
void (*dms_unlatch)(dms_context_t *dms_ctx, dms_drlatch_t *dlatch);
bool (*dms_latch_timed_x)(dms_drlatch_t *dlatch, unsigned int sid, unsigned int wait_ticks, void *dms_stat);
bool (*dms_latch_timed_s)(dms_drlatch_t *dlatch, unsigned int sid, unsigned int wait_ticks,
unsigned char is_force, void *dms_stat);
void (*dms_unlatch)(dms_drlatch_t *dlatch, void *dms_stat);
void (*dms_pre_uninit)(void);
int (*dms_init_logger)(logger_param_t *log_param);
void (*dms_refresh_logger)(char *log_field, unsigned long long *value);
@ -146,9 +146,9 @@ int dms_drc_accessible(unsigned char res_type);
int dms_broadcast_opengauss_ddllock(dms_context_t *dms_ctx, char *data, unsigned int len, unsigned char handle_recv_msg,
unsigned int timeout, unsigned char resend_after_reform);
int dms_reform_last_failed(void);
bool dms_latch_timed_x(dms_context_t *dms_ctx, dms_drlatch_t *dlatch, unsigned int wait_ticks);
bool dms_latch_timed_s(dms_context_t *dms_ctx, dms_drlatch_t *dlatch, unsigned int wait_ticks, unsigned char is_force);
void dms_unlatch(dms_context_t *dms_ctx, dms_drlatch_t *dlatch);
bool dms_latch_timed_x(dms_drlatch_t *dlatch, unsigned int sid, unsigned int wait_ticks, void *dms_stat);
bool dms_latch_timed_s(dms_drlatch_t *dlatch, unsigned int sid, unsigned int wait_ticks, unsigned char is_force, void *dms_stat);
void dms_unlatch(dms_drlatch_t *dlatch, void *dms_stat);
void dms_pre_uninit(void);
void dms_validate_drc(dms_context_t *dms_ctx, dms_buf_ctrl_t *ctrl, unsigned long long lsn, unsigned char is_dirty);
int dms_reform_req_opengauss_ondemand_redo_buffer(dms_context_t *dms_ctx, void *block_key, unsigned int key_len,