【资源池化】opengauss侧适配DMS的并行逻辑

This commit is contained in:
dongning12
2023-04-17 11:27:40 +08:00
parent f4a28676e9
commit 649b5fda4f
13 changed files with 137 additions and 53 deletions

View File

@ -721,6 +721,7 @@ ss_interconnect_type|string|0,0|NULL|NULL|
ss_log_level|int|0,887|NULL|NULL|
ss_log_backup_file_count|int|0,1024|NULL|NULL|
ss_log_max_file_size|int|1024,4194304|kB|NULL|
ss_parallel_thread_count|int|0,64|NULL|NULL|
ss_instance_id|int|0,63|NULL|NULL|
ss_interconnect_url|string|0,0|NULL|NULL|
ss_rdma_work_config|string|0,0|NULL|NULL|

View File

@ -3567,6 +3567,20 @@ static void InitStorageConfigureNamesInt()
4 * 1024 *1024,
NULL,
assign_ss_log_max_file_size,
NULL},
{{"ss_parallel_thread_count",
PGC_POSTMASTER,
NODE_SINGLENODE,
SHARED_STORAGE_OPTIONS,
gettext_noop("Sets ss reform parallel thread count"),
NULL,
GUC_SUPERUSER_ONLY},
&g_instance.attr.attr_storage.dms_attr.parallel_thread_num,
16,
0,
64,
NULL,
NULL,
NULL},
/* End-of-list marker */
{{NULL,

View File

@ -845,3 +845,4 @@ job_queue_processes = 10 # Number of concurrent jobs, optional: [0..1000]
#ss_log_level = 7
#ss_log_backup_file_count = 10
#ss_log_max_file_size = 10MB
#ss_parallel_thread_count = 16

View File

@ -106,6 +106,7 @@ int ss_dms_func_init()
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_wait_reform));
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_get_event));
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_buf_res_rebuild_drc));
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_buf_res_rebuild_drc_parallel));
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_is_recovery_session));
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(drc_get_page_master_id));
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_release_page_batch));
@ -268,6 +269,12 @@ int dms_buf_res_rebuild_drc(dms_context_t *dms_ctx, dms_buf_ctrl_t *ctrl, unsign
return g_ss_dms_func.dms_buf_res_rebuild_drc(dms_ctx, ctrl, lsn, is_dirty);
}
int dms_buf_res_rebuild_drc_parallel(dms_context_t *dms_ctx, dms_ctrl_info_t *ctrl_info, unsigned char thread_index,
unsigned char for_rebuild)
{
return g_ss_dms_func.dms_buf_res_rebuild_drc_parallel(dms_ctx, ctrl_info, thread_index, for_rebuild);
}
int dms_is_recovery_session(unsigned int sid)
{
return g_ss_dms_func.dms_is_recovery_session(sid);

View File

@ -533,33 +533,6 @@ void BufValidateDrc(BufferDesc *buf_desc)
dms_validate_drc(&dms_ctx, buf_ctrl, lsn, (unsigned char)is_dirty);
}
int32 CheckBuf4Rebuild(BufferDesc *buf_desc)
{
#ifdef USE_ASSERT_CHECKING
if (IsSegmentPhysicalRelNode(buf_desc->tag.rnode)) {
SegNetPageCheckDiskLSN(buf_desc, RBM_NORMAL, NULL);
} else {
SmgrNetPageCheckDiskLSN(buf_desc, RBM_NORMAL, NULL);
}
#endif
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
Assert(buf_ctrl != NULL);
Assert(buf_ctrl->is_edp != 1);
Assert(XLogRecPtrIsValid(g_instance.dms_cxt.ckptRedo));
dms_context_t dms_ctx;
InitDmsBufContext(&dms_ctx, buf_desc->tag);
bool is_dirty = (buf_desc->state & (BM_DIRTY | BM_JUST_DIRTIED)) > 0 ? true : false;
int ret = dms_buf_res_rebuild_drc(&dms_ctx, buf_ctrl, (unsigned long long)BufferGetLSN(buf_desc), is_dirty);
if (ret != DMS_SUCCESS) {
ereport(LOG, (errmsg("Failed to rebuild page, rel:%u/%u/%u/%d, forknum:%d, blocknum:%u.",
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum)));
return ret;
}
return DMS_SUCCESS;
}
int SSLockAcquire(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock, bool dontWait,
dms_opengauss_lock_req_type_t reqType)
{

View File

@ -1065,34 +1065,102 @@ static bool SSCheckBufferIfCanGoRebuild(BufferDesc* buf_desc, uint32 buf_state)
return ret;
}
static int32 CBDrcBufRebuild(void *db_handle)
static int32 SSRebuildBuf(BufferDesc *buf_desc, unsigned char thread_index)
{
/* Load Control File */
int src_id = SSGetPrimaryInstId();
SSReadControlFile(src_id, true);
uint32 buf_state;
for (int i = 0; i < TOTAL_BUFFER_NUM; i++) {
BufferDesc *buf_desc = GetBufferDescriptor(i);
buf_state = LockBufHdr(buf_desc);
if (SSCheckBufferIfCanGoRebuild(buf_desc, buf_state)) {
int ret = CheckBuf4Rebuild(buf_desc);
if (ret != DMS_SUCCESS) {
if (LWLockHeldByMe(buf_desc->io_in_progress_lock)) {
LWLockRelease(buf_desc->io_in_progress_lock);
}
UnlockBufHdr(buf_desc, buf_state);
return ret;
}
}
if (LWLockHeldByMe(buf_desc->io_in_progress_lock)) {
LWLockRelease(buf_desc->io_in_progress_lock);
}
UnlockBufHdr(buf_desc, buf_state);
#ifdef USE_ASSERT_CHECKING
if (IsSegmentPhysicalRelNode(buf_desc->tag.rnode)) {
SegNetPageCheckDiskLSN(buf_desc, RBM_NORMAL, NULL);
} else {
SmgrNetPageCheckDiskLSN(buf_desc, RBM_NORMAL, NULL);
}
#endif
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
Assert(buf_ctrl != NULL);
Assert(buf_ctrl->is_edp != 1);
Assert(XLogRecPtrIsValid(g_instance.dms_cxt.ckptRedo));
dms_context_t dms_ctx;
InitDmsBufContext(&dms_ctx, buf_desc->tag);
dms_ctrl_info_t ctrl_info = { 0 };
ctrl_info.ctrl = *buf_ctrl;
ctrl_info.lsn = (unsigned long long)BufferGetLSN(buf_desc);
ctrl_info.is_dirty = (buf_desc->state & (BM_DIRTY | BM_JUST_DIRTIED)) > 0 ? true : false;
int ret = dms_buf_res_rebuild_drc_parallel(&dms_ctx, &ctrl_info, thread_index, true);
if (ret != DMS_SUCCESS) {
ereport(WARNING, (errmsg("Failed to rebuild page, rel:%u/%u/%u/%d, forknum:%d, blocknum:%u.",
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum)));
return ret;
}
return DMS_SUCCESS;
}
static int32 CBDrcBufRebuildInternal(int begin, int len, unsigned char thread_index)
{
uint32 buf_state;
Assert(begin >= 0 && len > 0 && (begin + len) <= TOTAL_BUFFER_NUM);
for (int i = begin; i < begin + len; i++) {
BufferDesc *buf_desc = GetBufferDescriptor(i);
if (LWLockConditionalAcquire(buf_desc->content_lock, LW_EXCLUSIVE)) {
buf_state = LockBufHdr(buf_desc);
if (buf_state & BM_VALID) {
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
buf_ctrl->lock_mode = DMS_LOCK_NULL;
}
UnlockBufHdr(buf_desc, buf_state);
LWLockRelease(buf_desc->content_lock);
} else {
buf_state = LockBufHdr(buf_desc);
if (SSCheckBufferIfCanGoRebuild(buf_desc, buf_state)) {
int ret = SSRebuildBuf(buf_desc, thread_index);
if (ret != DMS_SUCCESS) {
if (LWLockHeldByMe(buf_desc->io_in_progress_lock)) {
LWLockRelease(buf_desc->io_in_progress_lock);
}
UnlockBufHdr(buf_desc, buf_state);
return ret;
}
}
if (LWLockHeldByMe(buf_desc->io_in_progress_lock)) {
LWLockRelease(buf_desc->io_in_progress_lock);
}
UnlockBufHdr(buf_desc, buf_state);
}
}
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS reform] rebuild buf thread_index:%d, buf_if start from:%d to:%d, max_buf_id:%d",
(int)thread_index, begin, (begin + len - 1), (TOTAL_BUFFER_NUM - 1))));
return GS_SUCCESS;
}
/*
* as you can see, thread_num represets the number of thread. thread_index reprsents the n-th thread, begin from 0.
* special case:
* when parallel disable, rebuild phase still call this function,
* do you think thread_num is 1, and thread_index is 0 ?
* actually thread_num and thread_index are 255. It just a agreement in DMS
*/
const int dms_invalid_thread_index = 255;
const int dms_invalid_thread_num = 255;
static int32 CBDrcBufRebuildParallel(void* db_handle, unsigned char thread_index, unsigned char thread_num,
unsigned char for_rebuild)
{
Assert((thread_index == dms_invalid_thread_index && thread_num == dms_invalid_thread_num) ||
(thread_index != dms_invalid_thread_index && thread_num != dms_invalid_thread_num &&
thread_index < thread_num));
int buf_num = TOTAL_BUFFER_NUM / thread_num;
int buf_begin = thread_index * buf_num;
if (thread_index == thread_num - 1) {
buf_num = TOTAL_BUFFER_NUM - buf_begin;
}
if (thread_index == dms_invalid_thread_index && thread_num == dms_invalid_thread_num) {
buf_begin = 0;
buf_num = TOTAL_BUFFER_NUM;
}
return CBDrcBufRebuildInternal(buf_begin, buf_num, thread_index);
}
static int32 CBDrcBufValidate(void *db_handle)
{
/* Load Control File */
@ -1514,6 +1582,9 @@ static void CBReformStartNotify(void *db_handle, dms_role_t role, unsigned char
}
}
int old_primary = SSGetPrimaryInstId();
SSReadControlFile(old_primary, true);
/* cluster has no transactions during startup reform */
if (!g_instance.dms_cxt.SSRecoveryInfo.startup_reform) {
SendPostmasterSignal(PMSIGNAL_DMS_REFORM);
@ -1643,7 +1714,7 @@ void DmsInitCallback(dms_callback_t *callback)
callback->opengauss_recovery_primary = CBRecoveryPrimary;
callback->get_dms_status = CBGetDmsStatus;
callback->set_dms_status = CBSetDmsStatus;
callback->dms_reform_rebuild_buf_res = CBDrcBufRebuild;
callback->dms_reform_rebuild_parallel = CBDrcBufRebuildParallel;
callback->dms_thread_init = DmsCallbackThreadShmemInit;
callback->confirm_owner = CBConfirmOwner;
callback->confirm_converting = CBConfirmConverting;

View File

@ -371,6 +371,7 @@ static void setDMSProfile(dms_profile_t* profile)
profile->inst_map = 0;
profile->enable_reform = (unsigned char)dms_attr->enable_reform;
profile->load_balance_mode = 1; /* primary-standby */
profile->parallel_thread_num = dms_attr->parallel_thread_num;
if (dms_attr->enable_ssl && g_instance.attr.attr_security.EnableSSL) {
InitDmsSSL();

View File

@ -2435,6 +2435,14 @@ found_branch:
Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID));
do {
if (!DmsCheckBufAccessible()) {
if(LWLockHeldByMe(bufHdr->io_in_progress_lock)) {
TerminateBufferIO(bufHdr, false, 0);
}
pg_usleep(5000L);
continue;
}
bool startio;
if (LWLockHeldByMe(bufHdr->io_in_progress_lock)) {
startio = true;

View File

@ -57,6 +57,8 @@ typedef struct st_ss_dms_func {
void (*dms_get_event)(dms_wait_event_t event_type, unsigned long long *event_cnt, unsigned long long *event_time);
int (*dms_buf_res_rebuild_drc)(dms_context_t *dms_ctx, dms_buf_ctrl_t *ctrl, unsigned long long lsn,
unsigned char is_dirty);
int (*dms_buf_res_rebuild_drc_parallel)(dms_context_t *dms_ctx, dms_ctrl_info_t *ctrl_info, unsigned char thread_index,
unsigned char for_rebuild);
int (*dms_is_recovery_session)(unsigned int sid);
int (*drc_get_page_master_id)(char pageid[DMS_PAGEID_SIZE], unsigned char *master_id);
int (*dms_release_page_batch)(dms_context_t *dms_ctx, dcs_batch_buf_t *owner_map, unsigned int *owner_count);
@ -103,6 +105,8 @@ int dms_wait_reform(unsigned int *has_offline);
void dms_get_event(dms_wait_event_t event_type, unsigned long long *event_cnt, unsigned long long *event_time);
int dms_buf_res_rebuild_drc(dms_context_t *dms_ctx, dms_buf_ctrl_t *ctrl, unsigned long long lsn,
unsigned char is_dirty);
int dms_buf_res_rebuild_drc_parallel(dms_context_t *dms_ctx, dms_ctrl_info_t *ctrl_info, unsigned char thread_index,
unsigned char for_rebuild);
int dms_is_recovery_session(unsigned int sid);
int drc_get_page_master_id(char pageid[DMS_PAGEID_SIZE], unsigned char *master_id);
int dms_release_page_batch(dms_context_t *dms_ctx, dcs_batch_buf_t *owner_map, unsigned int *owner_count);

View File

@ -50,7 +50,7 @@ typedef struct SSBroadcastDDLLock {
void InitDmsBufCtrl(void);
void InitDmsContext(dms_context_t* dmsContext);
void InitDmsBufContext(dms_context_t* dmsBufCxt, BufferTag buftag);
void MarkReadHint(int buf_id, char persistence, bool extend, const XLogPhyBlock *pblk);
bool LockModeCompatible(dms_buf_ctrl_t *buf_ctrl, LWLockMode mode);
bool StartReadPage(BufferDesc *buf_desc, LWLockMode mode);
@ -61,7 +61,6 @@ Buffer TerminateReadSegPage(BufferDesc *buf_desc, ReadBufferMode read_mode, SegS
Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, bool *with_io);
Buffer DmsReadSegPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, bool *with_io);
bool DmsReleaseOwner(BufferTag buf_tag, int buf_id, unsigned char* released);
int32 CheckBuf4Rebuild(BufferDesc* buf_desc);
int SSLockAcquire(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock, bool dontWait,
dms_opengauss_lock_req_type_t reqType = LOCK_NORMAL_MODE);
int SSLockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock);

View File

@ -121,6 +121,7 @@ typedef struct knl_instance_attr_dms {
int32 sslog_level;
int32 sslog_backup_file_count;
int32 sslog_max_file_size; //Unit:KB
int parallel_thread_num;
} knl_instance_attr_dms;
typedef struct knl_instance_attr_storage {

View File

@ -451,6 +451,8 @@ const int MAX_COMPACTION_THREAD_NUM = 10;
#define NUM_DMS_REFORM_CALLLBACK_PROCS (5)
#define NUM_DMS_LSNR_CALLBACK_PROC (1)
#define NUM_DMS_SMON_CALLBACK_PROC (1)
#define NUM_DMS_PARALLEL_CALLBACK_PROC (g_instance.attr.attr_storage.dms_attr.parallel_thread_num <= 1 ? 0 : \
g_instance.attr.attr_storage.dms_attr.parallel_thread_num)
#define NUM_DMS_RDMA_THREAD_CNT (g_instance.attr.attr_storage.dms_attr.work_thread_count * 2)
#define NUM_DMS_CALLBACK_PROCS \
(g_instance.attr.attr_storage.dms_attr.enable_dms ? \
@ -460,6 +462,7 @@ const int MAX_COMPACTION_THREAD_NUM = 10;
NUM_DMS_RDMA_THREAD_CNT) + \
NUM_DMS_LSNR_CALLBACK_PROC + \
NUM_DMS_SMON_CALLBACK_PROC + \
NUM_DMS_PARALLEL_CALLBACK_PROC + \
NUM_DMS_REFORM_CALLLBACK_PROCS ) : 0)
#define GLOBAL_ALL_PROCS \

View File

@ -630,6 +630,7 @@ select name,vartype,unit,min_val,max_val from pg_settings where name <> 'qunit_c
ss_log_max_file_size | integer | kB | 1024 | 4194304
ssl_renegotiation_limit | integer | kB | 0 | 2147483647
ss_ock_log_path | string | | |
ss_parallel_thread_count | integer | | 0 | 64
ss_rdma_work_config | string | | |
ss_recv_msg_pool_size | integer | kB | 1024 | 1048576
ss_scrlock_server_bind_core | string | | |