【资源池化】【bugfix】【MES worker线程池化】解决CBGetHandle产生的问题

This commit is contained in:
dongning12
2024-01-25 15:36:11 +08:00
parent 7e136f5dac
commit 85d03f7285
5 changed files with 10 additions and 5 deletions

View File

@ -969,7 +969,7 @@ static void *CBGetHandle(unsigned int *db_handle_index, dms_session_type_e sessi
int index = fs_cxt->quickFetchIndex; int index = fs_cxt->quickFetchIndex;
fs_cxt->fake_sessions[index] = true; fs_cxt->fake_sessions[index] = true;
fs_cxt->quickFetchIndex++; fs_cxt->quickFetchIndex++;
if (fs_cxt->quickFetchIndex >= NUM_DMS_CALLBACK_PROCS) { if (fs_cxt->quickFetchIndex >= fs_cxt->fake_session_cnt) {
fs_cxt->quickFetchIndex = 0; fs_cxt->quickFetchIndex = 0;
} }
SpinLockRelease(&fs_cxt->lock); SpinLockRelease(&fs_cxt->lock);
@ -980,8 +980,8 @@ static void *CBGetHandle(unsigned int *db_handle_index, dms_session_type_e sessi
int start_index = fs_cxt->quickFetchIndex; int start_index = fs_cxt->quickFetchIndex;
int cur_index = 0; int cur_index = 0;
bool found = false; bool found = false;
for (int i = 0; i < NUM_DMS_CALLBACK_PROCS; i++) { for (int i = 0; i < fs_cxt->fake_session_cnt; i++) {
cur_index = (start_index + i) % NUM_DMS_CALLBACK_PROCS; cur_index = (start_index + i) % fs_cxt->fake_session_cnt;
if (!fs_cxt->fake_sessions[cur_index]) { if (!fs_cxt->fake_sessions[cur_index]) {
found = true; found = true;
break; break;
@ -994,7 +994,7 @@ static void *CBGetHandle(unsigned int *db_handle_index, dms_session_type_e sessi
} }
fs_cxt->quickFetchIndex = cur_index + 1; fs_cxt->quickFetchIndex = cur_index + 1;
if (fs_cxt->quickFetchIndex >= NUM_DMS_CALLBACK_PROCS) { if (fs_cxt->quickFetchIndex >= fs_cxt->fake_session_cnt) {
fs_cxt->quickFetchIndex = 0; fs_cxt->quickFetchIndex = 0;
} }
SpinLockRelease(&fs_cxt->lock); SpinLockRelease(&fs_cxt->lock);

View File

@ -368,8 +368,10 @@ static void SetWorkThreadpoolConfig(dms_profile_t *profile)
return; return;
} }
char* replStr = NULL;
replStr = pstrdup(attr);
profile->mes_task_worker_max_cnt = (unsigned int)pg_strtoint32(replStr);
profile->enable_mes_task_threadpool = true; profile->enable_mes_task_threadpool = true;
profile->mes_task_worker_max_cnt = g_instance.attr.attr_storage.dms_attr.work_thread_pool_max_cnt;
} }
static void setDMSProfile(dms_profile_t* profile) static void setDMSProfile(dms_profile_t* profile)

View File

@ -242,6 +242,7 @@ static void knl_g_dms_init(knl_g_dms_context *dms_cxt)
ss_fake_seesion_context_t *fs_cxt = &g_instance.dms_cxt.SSFakeSessionCxt; ss_fake_seesion_context_t *fs_cxt = &g_instance.dms_cxt.SSFakeSessionCxt;
SpinLockInit(&fs_cxt->lock); SpinLockInit(&fs_cxt->lock);
fs_cxt->fake_sessions = NULL; fs_cxt->fake_sessions = NULL;
fs_cxt->fake_session_cnt = 0;
fs_cxt->quickFetchIndex = 0; fs_cxt->quickFetchIndex = 0;
fs_cxt->session_start = 0; fs_cxt->session_start = 0;
} }

View File

@ -439,6 +439,7 @@ void InitProcGlobal(void)
size_t size = NUM_DMS_CALLBACK_PROCS * sizeof(bool); size_t size = NUM_DMS_CALLBACK_PROCS * sizeof(bool);
g_instance.dms_cxt.SSFakeSessionCxt.fake_sessions = g_instance.dms_cxt.SSFakeSessionCxt.fake_sessions =
(bool*)CACHELINEALIGN(palloc0(size + PG_CACHE_LINE_SIZE)); (bool*)CACHELINEALIGN(palloc0(size + PG_CACHE_LINE_SIZE));
g_instance.dms_cxt.SSFakeSessionCxt.fake_session_cnt = NUM_DMS_CALLBACK_PROCS;
} }
} else if (i < g_instance.shmem_cxt.MaxConnections + thread_pool_stream_proc_num + AUXILIARY_BACKENDS + } else if (i < g_instance.shmem_cxt.MaxConnections + thread_pool_stream_proc_num + AUXILIARY_BACKENDS +
g_instance.attr.attr_sql.job_queue_processes + 1 + NUM_DCF_CALLBACK_PROCS + NUM_CMAGENT_PROCS + \ g_instance.attr.attr_sql.job_queue_processes + 1 + NUM_DCF_CALLBACK_PROCS + NUM_CMAGENT_PROCS + \

View File

@ -36,6 +36,7 @@
typedef struct st_ss_fake_seesion_context { typedef struct st_ss_fake_seesion_context {
slock_t lock; slock_t lock;
bool *fake_sessions; bool *fake_sessions;
uint32 fake_session_cnt;
uint32 quickFetchIndex; uint32 quickFetchIndex;
uint32 session_start; uint32 session_start;
} ss_fake_seesion_context_t; } ss_fake_seesion_context_t;