diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index f15ae1caf..148b4fb81 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -1451,21 +1451,31 @@ static int32 CBBufRebuildDrcInternal(int begin, int len, unsigned char thread_in */ const int dms_invalid_thread_index = 255; const int dms_invalid_thread_num = 255; -static int32 CBBufRebuildDrcParallel(void* db_handle, unsigned char thread_index, unsigned char thread_num) +static void CBAllocBufRangeForThread(unsigned char thread_index, unsigned char thread_num, + int* buf_begin, int* buf_num) { Assert((thread_index == dms_invalid_thread_index && thread_num == dms_invalid_thread_num) || (thread_index != dms_invalid_thread_index && thread_num != dms_invalid_thread_num && thread_index < thread_num)); - int buf_num = TOTAL_BUFFER_NUM / thread_num; - int buf_begin = thread_index * buf_num; + int num = TOTAL_BUFFER_NUM / thread_num; + int begin = thread_index * num; if (thread_index == thread_num - 1) { - buf_num = TOTAL_BUFFER_NUM - buf_begin; + num = TOTAL_BUFFER_NUM - begin; } if (thread_index == dms_invalid_thread_index && thread_num == dms_invalid_thread_num) { - buf_begin = 0; - buf_num = TOTAL_BUFFER_NUM; + begin = 0; + num = TOTAL_BUFFER_NUM; } + *buf_begin = begin; + *buf_num = num; +} + +static int32 CBBufRebuildDrcParallel(void* db_handle, unsigned char thread_index, unsigned char thread_num) +{ + int buf_begin = 0; + int buf_num = 0; + CBAllocBufRangeForThread(thread_index, thread_num, &buf_begin, &buf_num); return CBBufRebuildDrcInternal(buf_begin, buf_num, thread_index); } @@ -2240,6 +2250,19 @@ int CBDoCheckpointImmediately(unsigned long long *ckpt_lsn) return GS_SUCCESS; } +int CBBufCtrlRcyClean(void *db_handle, unsigned char thread_index, unsigned char thread_num) +{ + int buf_begin = 0; + int buf_num = 0; + CBAllocBufRangeForThread(thread_index, thread_num, &buf_begin, &buf_num); + int buf_end = buf_begin + buf_num - 1; + for (int i = buf_begin; i <= buf_end; i++) { + dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(i); + buf_ctrl->in_rcy = false; + } + return GS_SUCCESS; +} + void DmsInitCallback(dms_callback_t *callback) { // used in reform @@ -2308,4 +2331,5 @@ void DmsInitCallback(dms_callback_t *callback) callback->buf_ctrl_recycle = CBBufCtrlRecycle; callback->dms_thread_deinit = DmsThreadDeinit; callback->opengauss_do_ckpt_immediate = CBDoCheckpointImmediately; + callback->dms_ctl_rcy_clean_parallel = CBBufCtrlRcyClean; } diff --git a/src/gausskernel/ddes/adapter/ss_init.cpp b/src/gausskernel/ddes/adapter/ss_init.cpp index d982ec374..4adb31ee2 100644 --- a/src/gausskernel/ddes/adapter/ss_init.cpp +++ b/src/gausskernel/ddes/adapter/ss_init.cpp @@ -407,7 +407,7 @@ static void setDMSProfile(dms_profile_t* profile) profile->max_session_cnt = DMS_MAX_SESSIONS; profile->time_stat_enabled = TRUE; profile->pipe_type = convertInterconnectType(); - profile->conn_created_during_init = TRUE; + profile->conn_created_during_init = false; setRdmaWorkConfig(profile); setScrlConfig(profile); SetOckLogPath(dms_attr, profile->ock_log_path); diff --git a/src/gausskernel/ddes/ddes_commit_id b/src/gausskernel/ddes/ddes_commit_id index da57c1e16..f4097434f 100644 --- a/src/gausskernel/ddes/ddes_commit_id +++ b/src/gausskernel/ddes/ddes_commit_id @@ -1,3 +1,3 @@ -dms_commit_id=6de342c050a9ff2ac5cb5b462a699e46c88bd156 +dms_commit_id=402eef8afae247c9592959c77fcf136aac9e2dd9 dss_commit_id=621eb9d6aac34726db404446511be2de9ae32a3f cbb_commit_id=2ea0e4ea6349f00ca85793480ee1ced952c3c8c7