!4788 【资源池化】【bugfix】【MES worker线程池化】解决CBGetHandle产生的问题
Merge pull request !4788 from 董宁/dev2_fix3
This commit is contained in:
@ -969,7 +969,7 @@ static void *CBGetHandle(unsigned int *db_handle_index, dms_session_type_e sessi
|
||||
int index = fs_cxt->quickFetchIndex;
|
||||
fs_cxt->fake_sessions[index] = true;
|
||||
fs_cxt->quickFetchIndex++;
|
||||
if (fs_cxt->quickFetchIndex >= NUM_DMS_CALLBACK_PROCS) {
|
||||
if (fs_cxt->quickFetchIndex >= fs_cxt->fake_session_cnt) {
|
||||
fs_cxt->quickFetchIndex = 0;
|
||||
}
|
||||
SpinLockRelease(&fs_cxt->lock);
|
||||
@ -980,8 +980,8 @@ static void *CBGetHandle(unsigned int *db_handle_index, dms_session_type_e sessi
|
||||
int start_index = fs_cxt->quickFetchIndex;
|
||||
int cur_index = 0;
|
||||
bool found = false;
|
||||
for (int i = 0; i < NUM_DMS_CALLBACK_PROCS; i++) {
|
||||
cur_index = (start_index + i) % NUM_DMS_CALLBACK_PROCS;
|
||||
for (int i = 0; i < fs_cxt->fake_session_cnt; i++) {
|
||||
cur_index = (start_index + i) % fs_cxt->fake_session_cnt;
|
||||
if (!fs_cxt->fake_sessions[cur_index]) {
|
||||
found = true;
|
||||
break;
|
||||
@ -994,7 +994,7 @@ static void *CBGetHandle(unsigned int *db_handle_index, dms_session_type_e sessi
|
||||
}
|
||||
|
||||
fs_cxt->quickFetchIndex = cur_index + 1;
|
||||
if (fs_cxt->quickFetchIndex >= NUM_DMS_CALLBACK_PROCS) {
|
||||
if (fs_cxt->quickFetchIndex >= fs_cxt->fake_session_cnt) {
|
||||
fs_cxt->quickFetchIndex = 0;
|
||||
}
|
||||
SpinLockRelease(&fs_cxt->lock);
|
||||
|
@ -368,8 +368,10 @@ static void SetWorkThreadpoolConfig(dms_profile_t *profile)
|
||||
return;
|
||||
}
|
||||
|
||||
char* replStr = NULL;
|
||||
replStr = pstrdup(attr);
|
||||
profile->mes_task_worker_max_cnt = (unsigned int)pg_strtoint32(replStr);
|
||||
profile->enable_mes_task_threadpool = true;
|
||||
profile->mes_task_worker_max_cnt = g_instance.attr.attr_storage.dms_attr.work_thread_pool_max_cnt;
|
||||
}
|
||||
|
||||
static void setDMSProfile(dms_profile_t* profile)
|
||||
|
@ -242,6 +242,7 @@ static void knl_g_dms_init(knl_g_dms_context *dms_cxt)
|
||||
ss_fake_seesion_context_t *fs_cxt = &g_instance.dms_cxt.SSFakeSessionCxt;
|
||||
SpinLockInit(&fs_cxt->lock);
|
||||
fs_cxt->fake_sessions = NULL;
|
||||
fs_cxt->fake_session_cnt = 0;
|
||||
fs_cxt->quickFetchIndex = 0;
|
||||
fs_cxt->session_start = 0;
|
||||
}
|
||||
|
@ -439,6 +439,7 @@ void InitProcGlobal(void)
|
||||
size_t size = NUM_DMS_CALLBACK_PROCS * sizeof(bool);
|
||||
g_instance.dms_cxt.SSFakeSessionCxt.fake_sessions =
|
||||
(bool*)CACHELINEALIGN(palloc0(size + PG_CACHE_LINE_SIZE));
|
||||
g_instance.dms_cxt.SSFakeSessionCxt.fake_session_cnt = NUM_DMS_CALLBACK_PROCS;
|
||||
}
|
||||
} else if (i < g_instance.shmem_cxt.MaxConnections + thread_pool_stream_proc_num + AUXILIARY_BACKENDS +
|
||||
g_instance.attr.attr_sql.job_queue_processes + 1 + NUM_DCF_CALLBACK_PROCS + NUM_CMAGENT_PROCS + \
|
||||
|
@ -36,6 +36,7 @@
|
||||
typedef struct st_ss_fake_seesion_context {
|
||||
slock_t lock;
|
||||
bool *fake_sessions;
|
||||
uint32 fake_session_cnt;
|
||||
uint32 quickFetchIndex;
|
||||
uint32 session_start;
|
||||
} ss_fake_seesion_context_t;
|
||||
|
Reference in New Issue
Block a user