diff --git a/build_make.sh b/build_make.sh index 197ae47..96ae49d 100644 --- a/build_make.sh +++ b/build_make.sh @@ -78,7 +78,7 @@ function update_dcc_dependency() { fi if [ "x${THIRD_BIN_PATH}" != "x" ]; then - local dccHome="${THIRD_BIN_PATH}/kernel/component/dcc" + local dcc_home="${THIRD_BIN_PATH}/kernel/component/dcc" if [ -d "${dcc_home}" ]; then echo "We well get dcc lib from 3rd[${dcc_home}]." diff --git a/src/cm_adapter/cm_sharedisk_adapter/cm_ddb_sharedisk.cpp b/src/cm_adapter/cm_sharedisk_adapter/cm_ddb_sharedisk.cpp index 2031374..031e50d 100644 --- a/src/cm_adapter/cm_sharedisk_adapter/cm_ddb_sharedisk.cpp +++ b/src/cm_adapter/cm_sharedisk_adapter/cm_ddb_sharedisk.cpp @@ -20,6 +20,7 @@ * * ------------------------------------------------------------------------- */ +#include #include "cm/cm_c.h" #include "cm/cm_elog.h" #include "cm_disk_rw.h" @@ -33,8 +34,24 @@ static DDB_ROLE g_notifySd = DDB_ROLE_UNKNOWN; static DDB_ROLE g_dbRole = DDB_ROLE_FOLLOWER; static uint32 g_cmServerNum = 0; static int64 g_waitForTime = 0; -static volatile int64 g_waitForChangeTime = 0; +static volatile int64 g_notifyBeginSec = 0; const uint32 ONE_PRIMARY_ONE_STANDBY = 2; +static DdbArbiCon *g_arbiCon = NULL; +static const time_t MAX_VALID_LOCK_TIME = 125; +static const time_t BASE_VALID_LOCK_TIME = 1; +static const uint32 DEFAULT_CMD_TIME_OUT = 2; + +typedef enum en_persist_cmd_type { + CMD_LOCK = 0, + CMD_FORCE_LOCK = 1, +} PERSIST_CMD_TYPE; + +typedef struct SdArbitrateDataSt { + uint32 lockNotRefreshTimes; + time_t lockFailBeginTime; + time_t lockTime; + DDB_ROLE lastDdbRole; +} SdArbitrateData; static status_t SdLoadApi(const DrvApiInfo *apiInfo); @@ -184,14 +201,15 @@ static void DrvNotifySd(DDB_ROLE dbRole) return; } - if (g_dbRole != dbRole && g_cmServerNum > ONE_PRIMARY_ONE_STANDBY) { - if (dbRole == DDB_ROLE_FOLLOWER) { - (void)pthread_rwlock_wrlock(&g_notifySdLock); - g_waitForChangeTime = g_waitForTime; - g_notifySd = DDB_ROLE_FOLLOWER; - ddbNotiStatusFun(DDB_ROLE_FOLLOWER); - (void)pthread_rwlock_unlock(&g_notifySdLock); - } + if (g_dbRole != dbRole) { + struct timespec checkBegin = {0, 0}; + (void)clock_gettime(CLOCK_MONOTONIC, &checkBegin); + + (void)pthread_rwlock_wrlock(&g_notifySdLock); + g_notifyBeginSec = checkBegin.tv_sec; + g_notifySd = dbRole; + ddbNotiStatusFun(dbRole); + (void)pthread_rwlock_unlock(&g_notifySdLock); write_runlog(LOG, "receive notify msg, it has set ddb role, dbRole is [%d: %d], g_waitForTime is %ld, " "g_cmServerNum is %u.\n", (int32)dbRole, (int32)g_dbRole, g_waitForTime, g_cmServerNum); } @@ -248,47 +266,287 @@ static void NotifyDdbRole(DDB_ROLE *lastDdbRole) *lastDdbRole = g_dbRole; } +static uint32 GetForceLockTimeOutCfg() +{ + uint32 curForceLockTimeOut = g_arbiCon->arbiCfg->haHeartBeatTimeOut; + if (curForceLockTimeOut < DEFAULT_CMD_TIME_OUT) { + curForceLockTimeOut = DEFAULT_CMD_TIME_OUT; + } + return curForceLockTimeOut; +} + +int ExecuteLockCmd(const char *cmd) +{ + int ret = system(cmd); + if (ret == -1) { + write_runlog(ERROR, "Fail to execute command %s, and errno=%d.\n", cmd, errno); + return -1; + } + + write_runlog(DEBUG1, "execute command %s exit code %d.\n", cmd, ret); + if (WIFEXITED(ret)) { + return WEXITSTATUS(ret); + } + + write_runlog(ERROR, "Fail to execute command %s, script exit code %d.\n", cmd, ret); + return -1; +} + +static bool CheckDemoteDdbRole(SdArbitrateData *sdArbitrateData) +{ + if (sdArbitrateData->lastDdbRole != DDB_ROLE_LEADER) { + sdArbitrateData->lockFailBeginTime = 0; + return true; + } + + uint32 forceLockTimeOut = GetForceLockTimeOutCfg(); + uint32 exeCmdTwiceTimeOut = DEFAULT_CMD_TIME_OUT + DEFAULT_CMD_TIME_OUT; + if (exeCmdTwiceTimeOut >= forceLockTimeOut) { + sdArbitrateData->lockFailBeginTime = 0; + return true; + } + + // try to avoid execute cmd timeout, so use diff time to demote cms primary + uint32 diffTimeOut = forceLockTimeOut - exeCmdTwiceTimeOut; + struct timespec time = {0, 0}; + (void)clock_gettime(CLOCK_MONOTONIC, &time); + time_t minusTime = time.tv_sec - sdArbitrateData->lockFailBeginTime; + if (minusTime >= diffTimeOut) { + write_runlog(LOG, + "CheckDemoteDdbRole: CMS primary will demote for current time %ld, lockFailBeginTime %ld, diffTimeOut " + "%u.\n", + time.tv_sec, + sdArbitrateData->lockFailBeginTime, + diffTimeOut); + return true; + } + write_runlog(LOG, + "CheckDemoteDdbRole: CMS primary demote will wait %u seconds for current time %ld, lockFailBeginTime %ld, " + "diffTimeOut %u.\n", + (uint32)(diffTimeOut - minusTime), + time.tv_sec, + sdArbitrateData->lockFailBeginTime, + diffTimeOut); + + return false; +} + +static void CmNormalArbitrate(const char *lockCmd, SdArbitrateData *sdArbitrateData) +{ + int lockRst = ExecuteLockCmd(lockCmd); + + write_runlog(DEBUG1, + "CmNormalArbitrate: execute lock cmd %s result %d. lockTime %ld, lockNotRefreshTimes %u, lockFailBeginTime " + "%ld\n", lockCmd, lockRst, sdArbitrateData->lockTime, sdArbitrateData->lockNotRefreshTimes, + sdArbitrateData->lockFailBeginTime); + if (lockRst == 0) { + // get lock success, notify cmserver to primary + sdArbitrateData->lockTime = 0; + sdArbitrateData->lockNotRefreshTimes = 0; + sdArbitrateData->lockFailBeginTime = 0; + g_dbRole = DDB_ROLE_LEADER; + NotifyDdbRole(&sdArbitrateData->lastDdbRole); + return; + } + + if (lockRst >= BASE_VALID_LOCK_TIME && lockRst <= MAX_VALID_LOCK_TIME) { + g_dbRole = DDB_ROLE_FOLLOWER; + sdArbitrateData->lockFailBeginTime = 0; + // get lock failed, check lock time if refreshed by other process + if (sdArbitrateData->lockTime != lockRst) { + sdArbitrateData->lockTime = lockRst; + sdArbitrateData->lockNotRefreshTimes = 0; + } else { + const uint32 defaultNotRefreshTimes = 2; + uint32 curForceLockTimeOut = GetForceLockTimeOutCfg(); + int logLevel = sdArbitrateData->lockNotRefreshTimes >= defaultNotRefreshTimes ? LOG : DEBUG1; + write_runlog(logLevel, + "CmNormalArbitrate: other cmserver maybe lost lock for %u times, current lock time %ld\n", + sdArbitrateData->lockNotRefreshTimes, sdArbitrateData->lockTime); + if (sdArbitrateData->lockNotRefreshTimes < curForceLockTimeOut) { + ++sdArbitrateData->lockNotRefreshTimes; + } else { + sdArbitrateData->lockNotRefreshTimes = 0; + char forceLockCmd[MAX_PATH_LEN] = {0}; + errno_t rc = snprintf_s(forceLockCmd, MAX_PATH_LEN, MAX_PATH_LEN - 1, + "timeout -s SIGKILL %us cm_persist %s %ld %lu %d %ld > /dev/null 2>&1", + DEFAULT_CMD_TIME_OUT, g_cmsArbitrateDiskHandler.scsiDev, g_cmsArbitrateDiskHandler.instId, + g_cmsArbitrateDiskHandler.offset, (int)CMD_FORCE_LOCK, sdArbitrateData->lockTime); + securec_check_intval(rc, (void)rc); + lockRst = ExecuteLockCmd(forceLockCmd); + g_dbRole = ((lockRst == 0) ? DDB_ROLE_LEADER : DDB_ROLE_FOLLOWER); + write_runlog(LOG, "CmNormalArbitrate: exe force lock cmd %s result %d, curForceLockTime %u.\n", + forceLockCmd, lockRst, curForceLockTimeOut); + } + } + NotifyDdbRole(&sdArbitrateData->lastDdbRole); + return; + } + + // get lock time failed + sdArbitrateData->lockTime = 0; + sdArbitrateData->lockNotRefreshTimes = 0; + g_dbRole = CheckDemoteDdbRole(sdArbitrateData) ? DDB_ROLE_FOLLOWER : DDB_ROLE_LEADER; + NotifyDdbRole(&sdArbitrateData->lastDdbRole); +} + +static bool CheckResetTime() +{ + struct timespec checkEnd = {0, 0}; + (void)clock_gettime(CLOCK_MONOTONIC, &checkEnd); + int64 diffSeconds = checkEnd.tv_sec - g_notifyBeginSec; + if (diffSeconds <= g_waitForTime) { + write_runlog(DEBUG1, + "CheckResetTime: current time %ld, g_notifyBeginSec %ld, g_waitTime %ld, cannot reset g_notifySd.\n", + (long int)checkEnd.tv_sec, g_notifyBeginSec, g_waitForTime); + return false; + } + + return true; +} + +static bool CheckSdDemote(SdArbitrateData *sdArbitrateData) +{ + if (g_notifySd != DDB_ROLE_FOLLOWER) { + return false; + } + + if (g_dbRole == DDB_ROLE_LEADER) { + g_dbRole = DDB_ROLE_FOLLOWER; + NotifyDdbRole(&sdArbitrateData->lastDdbRole); + return true; + } + + if (!CheckResetTime()) { + return true; + } + + write_runlog(LOG, + "CheckSdDemote: will reset g_notifySd from %u to %u after wait %ld time.\n", + (uint32)g_notifySd, + (uint32)DDB_ROLE_UNKNOWN, + g_waitForTime); + g_notifySd = DDB_ROLE_UNKNOWN; + return false; +} + +static status_t ExePromoteCmd(const char *lockCmd, SdArbitrateData *sdArbitrateData) +{ + int st = ExecuteLockCmd(lockCmd); + if (st != 0) { + write_runlog(WARNING, "ExePromoteCmd: Execute get lock cmd %s failed, lockResult %d!\n", lockCmd, st); + if (st >= BASE_VALID_LOCK_TIME && st <= MAX_VALID_LOCK_TIME) { + char forceLockCmd[MAX_PATH_LEN] = {0}; + errno_t rc = snprintf_s(forceLockCmd, + MAX_PATH_LEN, + MAX_PATH_LEN - 1, + "timeout -s SIGKILL %us cm_persist %s %ld %lu %d %ld > /dev/null 2>&1", + DEFAULT_CMD_TIME_OUT, + g_cmsArbitrateDiskHandler.scsiDev, + g_cmsArbitrateDiskHandler.instId, + g_cmsArbitrateDiskHandler.offset, + (int)CMD_FORCE_LOCK, + st); + securec_check_intval(rc, (void)rc); + st = ExecuteLockCmd(forceLockCmd); + if (st != 0) { + write_runlog(WARNING, "ExePromoteCmd: Execute force lock cmd %s failed, result %d!\n", lockCmd, st); + return CM_ERROR; + } + return CM_SUCCESS; + } + + return CM_ERROR; + } + + sdArbitrateData->lockTime = 0; + sdArbitrateData->lockNotRefreshTimes = 0; + sdArbitrateData->lockFailBeginTime = 0; + write_runlog(DEBUG1, "ExePromoteCmd: Execute get lock cmd %s success!\n", lockCmd); + return CM_SUCCESS; +} + +static bool CheckSdPromote(const char *lockCmd, SdArbitrateData *sdArbitrateData) +{ + if (g_notifySd != DDB_ROLE_LEADER) { + return false; + } + + if (g_dbRole != DDB_ROLE_LEADER) { + (void)ExePromoteCmd(lockCmd, sdArbitrateData); + g_dbRole = DDB_ROLE_LEADER; + NotifyDdbRole(&sdArbitrateData->lastDdbRole); + return true; + } + + (void)ExePromoteCmd(lockCmd, sdArbitrateData); + + if (!CheckResetTime()) { + return true; + } + + write_runlog(LOG, + "CheckSdPromote: will reset g_notifySd from %u to %u after wait %ld seconds.\n", + (uint32)g_notifySd, + (uint32)DDB_ROLE_UNKNOWN, + g_waitForTime); + g_notifySd = DDB_ROLE_UNKNOWN; + return false; +} + +static bool HaveNotifySd(const char *lockCmd, SdArbitrateData *sdArbitrateData) +{ + bool res = false; + (void)pthread_rwlock_wrlock(&g_notifySdLock); + if (g_notifySd == DDB_ROLE_FOLLOWER) { + res = CheckSdDemote(sdArbitrateData); + } else if (g_notifySd == DDB_ROLE_LEADER) { + res = CheckSdPromote(lockCmd, sdArbitrateData); + } + (void)pthread_rwlock_unlock(&g_notifySdLock); + return res; +} + static void *GetShareDiskLockMain(void *arg) { thread_name = "GetShareDiskLockMain"; write_runlog(LOG, "Starting get share disk lock thread.\n"); - char cmd[MAX_PATH_LEN] = {0}; - errno_t rc = snprintf_s(cmd, + char lockCmd[MAX_PATH_LEN] = {0}; + errno_t rc = snprintf_s(lockCmd, MAX_PATH_LEN, MAX_PATH_LEN - 1, - "cm_persist %s %ld %lu > /dev/null 2>&1", + "timeout -s SIGKILL %us cm_persist %s %ld %lu %d > /dev/null 2>&1", + DEFAULT_CMD_TIME_OUT, g_cmsArbitrateDiskHandler.scsiDev, g_cmsArbitrateDiskHandler.instId, - g_cmsArbitrateDiskHandler.offset); + g_cmsArbitrateDiskHandler.offset, + (int)CMD_LOCK); securec_check_intval(rc, (void)rc); - int st; - DDB_ROLE lastDdbRole = DDB_ROLE_UNKNOWN; + + SdArbitrateData sdArbitrateData; + rc = memset_s(&sdArbitrateData, sizeof(SdArbitrateData), 0, sizeof(SdArbitrateData)); + securec_check_errno(rc, (void)rc); + struct timespec checkBegin = {0, 0}; + struct timespec checkEnd = {0, 0}; + uint32 twoSec = 2; for (;;) { - if (g_cmServerNum > ONE_PRIMARY_ONE_STANDBY) { - (void)pthread_rwlock_wrlock(&g_notifySdLock); - if (g_notifySd == DDB_ROLE_FOLLOWER && g_waitForChangeTime > 0) { - g_dbRole = DDB_ROLE_FOLLOWER; - NotifyDdbRole(&lastDdbRole); - g_waitForChangeTime--; - (void)pthread_rwlock_unlock(&g_notifySdLock); - write_runlog(DEBUG1, - "GetShareDiskLockMain: current ddbRole is %d, g_waitForChangeTime is %ld.\n", - (int)g_dbRole, - g_waitForChangeTime); - (void)sleep(1); - continue; + (void)clock_gettime(CLOCK_MONOTONIC, &checkBegin); + if (!HaveNotifySd(lockCmd, &sdArbitrateData)) { + if (sdArbitrateData.lockFailBeginTime == 0) { + sdArbitrateData.lockFailBeginTime = checkBegin.tv_sec; } - - g_notifySd = DDB_ROLE_UNKNOWN; - (void)pthread_rwlock_unlock(&g_notifySdLock); + CmNormalArbitrate(lockCmd, &sdArbitrateData); + } + (void)clock_gettime(CLOCK_MONOTONIC, &checkEnd); + uint32 second = (uint32)(checkEnd.tv_sec - checkBegin.tv_sec); + int64 nanosecond = checkEnd.tv_nsec - checkBegin.tv_nsec; + if (second > twoSec) { + write_runlog( + LOG, "it takes %u seconds %ld nanoseconds to cmserver share disk arbitrate.\n", second, nanosecond); + } else { + (void)sleep(1); } - - st = system(cmd); - g_dbRole = (st == 0) ? DDB_ROLE_LEADER : DDB_ROLE_FOLLOWER; - NotifyDdbRole(&lastDdbRole); - - (void)sleep(1); } return NULL; } @@ -310,6 +568,7 @@ static status_t CreateShareDiskThread(const DrvApiInfo *apiInfo) write_runlog(ERROR, "Failed to start get share disk lock thread.\n"); return CM_ERROR; } + g_arbiCon = apiInfo->cmsArbiCon; pthread_t thrId; int32 res = pthread_create(&thrId, NULL, GetShareDiskLockMain, NULL); if (res != 0) { @@ -350,8 +609,8 @@ static status_t SdLoadApi(const DrvApiInfo *apiInfo) if (st != CM_SUCCESS) { return st; } - st = CreateShareDiskThread(apiInfo); - return st; + + return CreateShareDiskThread(apiInfo); } DdbDriver *DrvSdGet(void) diff --git a/src/cm_adapter/cm_sharediskapi/cm_voting_disk.cpp b/src/cm_adapter/cm_sharediskapi/cm_voting_disk.cpp index 8af6273..d3f8545 100644 --- a/src/cm_adapter/cm_sharediskapi/cm_voting_disk.cpp +++ b/src/cm_adapter/cm_sharediskapi/cm_voting_disk.cpp @@ -189,10 +189,10 @@ status_t InitVotingDisk(const char *votingDiskPath) return CM_SUCCESS; } -VotingDiskStatus GetNodeHeartbeatStat(uint32 nodeIndex, uint32 diskTimeout) +VotingDiskStatus GetNodeHeartbeatStat(uint32 nodeIndex, uint32 diskTimeout, int logLevel) { if (nodeIndex >= VOTING_DISK_MAX_NODE_NUM) { - write_runlog(ERROR, "[%s] node index %u exceeds max node number of voting disk .\n", __FUNCTION__, nodeIndex); + write_runlog(logLevel, "[%s] node index %u exceeds max node of voting disk .\n", __FUNCTION__, nodeIndex); return VOTING_DISK_STATUS_UNAVAIL; } if (diskTimeout == 0) { @@ -211,7 +211,7 @@ VotingDiskStatus GetNodeHeartbeatStat(uint32 nodeIndex, uint32 diskTimeout) time_t curTime = time(NULL); if (IsRhbTimeout(g_heartbeat[nodeIndex], curTime, (int)diskTimeout)) { - write_runlog(ERROR, "[%s] nodeIndex %u heartbeat timeout, diskTimeout=%u, nodeTime: %s\n", + write_runlog(logLevel, "[%s] nodeIndex %u heartbeat timeout, diskTimeout=%u, nodeTime: %s\n", __FUNCTION__, nodeIndex, diskTimeout, timeBuf); return VOTING_DISK_STATUS_UNAVAIL; } diff --git a/src/cm_agent/client_adpts/libpq/cma_datanode.cpp b/src/cm_agent/client_adpts/libpq/cma_datanode.cpp index 9b6f083..403db20 100644 --- a/src/cm_agent/client_adpts/libpq/cma_datanode.cpp +++ b/src/cm_agent/client_adpts/libpq/cma_datanode.cpp @@ -602,6 +602,20 @@ int CheckDatanodeStatus(const char *dataDir, int *role) return 0; } +static bool IsConnBadButNotPhonyDead(const char *errMsg, int conResult) +{ + if (strstr(errMsg, "too many clients already")) { + write_runlog(LOG, "need to change conn pool number, conn result is %d.\n", conResult); + return true; + } + if (strstr(errMsg, "failed to request snapshot")) { + write_runlog(LOG, "failed to request snapshot, not phony dead, conn result is %d.\n", conResult); + return true; + } + + return false; +} + int CheckDnStausPhonyDead(int dnId, int agentCheckTimeInterval) { int agentConnectDb = 5; @@ -624,8 +638,7 @@ int CheckDnStausPhonyDead(int dnId, int agentCheckTimeInterval) if (!IsConnOk(tmpDNConn)) { write_runlog(ERROR, "get connect failed for dn(%s) phony dead check, errmsg is %s\n", pidPath, ErrorMessage(tmpDNConn)); - if (strstr(ErrorMessage(tmpDNConn), "too many clients already")) { - write_runlog(LOG, "need to change conn pool number, conn result is %d.\n", Status(tmpDNConn)); + if (IsConnBadButNotPhonyDead(ErrorMessage(tmpDNConn), Status(tmpDNConn))) { close_and_reset_connection(tmpDNConn); return 0; } diff --git a/src/cm_agent/cma_instance_management_res.cpp b/src/cm_agent/cma_instance_management_res.cpp index 2127e6b..0ae3b04 100644 --- a/src/cm_agent/cma_instance_management_res.cpp +++ b/src/cm_agent/cma_instance_management_res.cpp @@ -57,7 +57,7 @@ int SystemExecute(const char *scriptPath, const char *oper, uint32 timeout) return -1; } -void StartOneResInst(const CmResConfList *conf) +status_t StartOneResInst(const CmResConfList *conf) { char oper[MAX_OPTION_LEN] = {0}; int ret = snprintf_s(oper, MAX_OPTION_LEN, MAX_OPTION_LEN - 1, "-start %u %s", conf->resInstanceId, conf->arg); @@ -66,9 +66,15 @@ void StartOneResInst(const CmResConfList *conf) ret = SystemExecute(conf->script, oper, (uint32)conf->checkInfo.timeOut); if (ret == 0) { write_runlog(LOG, "StartOneResInst: run start script (%s %s) successfully.\n", conf->script, oper); + } else if (ret == CUS_RES_START_FAIL_DEPEND_NOT_ALIVE) { + write_runlog(LOG, "StartOneResInst: res(%s) inst(%u) can't do restart, cause depend resource inst not alive.\n", + conf->resName, conf->cmInstanceId); + return CM_ERROR; } else { write_runlog(ERROR, "StartOneResInst: run start script (%s %s) failed, ret=%d.\n", conf->script, oper, ret); } + + return CM_SUCCESS; } void StopOneResInst(const CmResConfList *conf) @@ -93,6 +99,13 @@ void OneResInstShutdown(const CmResConfList *oneResConf) } } +void OneResInstClean(const CmResConfList *oneResConf) +{ + if (CheckOneResInst(oneResConf) != CUS_RES_CHECK_STAT_OFFLINE) { + (void)CleanOneResInst(oneResConf); + } +} + status_t RegOneResInst(const CmResConfList *conf, uint32 destInstId) { char oper[MAX_OPTION_LEN] = {0}; @@ -171,10 +184,20 @@ status_t CleanOneResInst(const CmResConfList *conf) return CM_SUCCESS; } +static inline void CleanOneInstCheckCount(CmResConfList *resConf) +{ + if (resConf->checkInfo.startCount != 0) { + write_runlog(LOG, "res(%s) inst(%u) restart times will clean.\n", resConf->resName, resConf->cmInstanceId); + } + resConf->checkInfo.startCount = 0; + resConf->checkInfo.startTime = 0; + resConf->checkInfo.brokeTime = 0; +} + void StopAllResInst() { for (uint32 i = 0; i < GetLocalResConfCount(); ++i) { - (void)CleanOneResInst(&g_resConf[i]); + OneResInstClean(&g_resConf[i]); } } @@ -214,8 +237,7 @@ static void ManualStopLocalResInst(CmResConfList *conf) write_runlog(ERROR, "manual stop res(%s) inst(%u) failed, ret=%d.\n", conf->resName, conf->resInstanceId, ret); } else { write_runlog(LOG, "manual stop res(%s) inst(%u) success.\n", conf->resName, conf->resInstanceId); - conf->checkInfo.startCount = 0; - conf->checkInfo.startTime = 0; + CleanOneInstCheckCount(conf); } } @@ -231,29 +253,29 @@ bool IsInstManualStopped(uint32 instId) return false; } -static bool CanCusInstDoStart(const CmResConfList *conf) +static bool CanCusInstDoRestart(const CmResConfList *conf) { ResIsregStatus stat = IsregOneResInst(conf, conf->resInstanceId); if ((stat == CM_RES_ISREG_REG) || (stat == CM_RES_ISREG_NOT_SUPPORT)) { return true; } - write_runlog(LOG, "cur inst(%u) isreg stat=(%u), can't do start.\n", conf->cmInstanceId, (uint32)stat); + write_runlog(LOG, "cur inst(%u) isreg stat=(%u), can't do restart.\n", conf->cmInstanceId, (uint32)stat); return false; } -static inline void RestartOneResInst(CmResConfList *conf) +static inline status_t RestartOneResInst(CmResConfList *conf) { - write_runlog(LOG, "res(%s) inst(%u) need restart.\n", conf->resName, conf->cmInstanceId); (void)CleanOneResInst(conf); - if (CanCusInstDoStart(conf)) { - StartOneResInst(conf); - } + CM_RETURN_IFERR(StartOneResInst(conf)); + return CM_SUCCESS; } static void ProcessOfflineInstance(CmResConfList *conf) { if (conf->checkInfo.restartTimes == -1) { - RestartOneResInst(conf); + if (CanCusInstDoRestart(conf)) { + (void)RestartOneResInst(conf); + } return; } if (conf->checkInfo.brokeTime == 0) { @@ -261,7 +283,9 @@ static void ProcessOfflineInstance(CmResConfList *conf) return; } if (conf->checkInfo.startCount >= conf->checkInfo.restartTimes) { - write_runlog(LOG, "[CLIENT] res(%s) inst(%u) get out from cluster.\n", conf->resName, conf->resInstanceId); + write_runlog(LOG, "res(%s) inst(%u) is offline, but restart times (%d) >= limit (%d), can't do restart again, " + "will do manually stop.\n", conf->resName, conf->resInstanceId, conf->checkInfo.startCount, + conf->checkInfo.restartTimes); ManualStopLocalResInst(conf); return; } @@ -275,18 +299,14 @@ static void ProcessOfflineInstance(CmResConfList *conf) conf->resName, conf->resInstanceId, conf->checkInfo.startTime, conf->checkInfo.restartPeriod); return; } - RestartOneResInst(conf); + if (!CanCusInstDoRestart(conf)) { + return; + } + CM_RETVOID_IFERR(RestartOneResInst(conf)); conf->checkInfo.startCount++; conf->checkInfo.startTime = time(NULL); - write_runlog(DEBUG1, "[CLIENT] res(%s) inst(%u) startCount=%d, startTime=%ld.\n", - conf->resName, conf->resInstanceId, conf->checkInfo.startCount, conf->checkInfo.startTime); -} - -static inline void CleanOneInstCheckCount(CmResConfList *resConf) -{ - resConf->checkInfo.startCount = 0; - resConf->checkInfo.startTime = 0; - resConf->checkInfo.brokeTime = 0; + write_runlog(LOG, "res(%s) inst(%u) has been restart (%d) times, restart more than (%d) time will manually stop.\n", + conf->resName, conf->cmInstanceId, conf->checkInfo.startCount, conf->checkInfo.restartTimes); } static inline bool NeedStopResInst(const char *resName, uint32 cmInstId) @@ -353,7 +373,7 @@ void StopResourceCheck() OneResInstShutdown(&g_resConf[i]); } if (CmFileExist(g_cmManualStartPath) || !IsOneResInstWork(g_resConf[i].resName, g_resConf[i].cmInstanceId)) { - (void)CleanOneResInst(&g_resConf[i]); + OneResInstClean(&g_resConf[i]); } } } diff --git a/src/cm_agent/cma_process_messages_client.cpp b/src/cm_agent/cma_process_messages_client.cpp index 3631231..914c380 100644 --- a/src/cm_agent/cma_process_messages_client.cpp +++ b/src/cm_agent/cma_process_messages_client.cpp @@ -368,7 +368,7 @@ static void ProcessRegResInst(const CmsNotifyAgentRegMsg *recvMsg) write_runlog(LOG, "local res inst[%s:%u] has been reg.\n", recvMsg->resName, recvMsg->resInstId); } else if ((isreg == CM_RES_ISREG_UNREG) || (isreg == CM_RES_ISREG_PENDING) || (isreg == CM_RES_ISREG_UNKNOWN)) { write_runlog(LOG, "before reg res inst, need clean res inst first.\n"); - if (CleanOneResInst(local) == CM_SUCCESS) { + if ((CheckOneResInst(local) == CUS_RES_CHECK_STAT_OFFLINE) || (CleanOneResInst(local) == CM_SUCCESS)) { (void)RegOneResInst(local, recvMsg->resInstId); } } else if (isreg == CM_RES_ISREG_NOT_SUPPORT) { diff --git a/src/cm_client/cm_client.cpp b/src/cm_client/cm_client.cpp index 06036cb..e37f1b5 100644 --- a/src/cm_client/cm_client.cpp +++ b/src/cm_client/cm_client.cpp @@ -242,7 +242,7 @@ void *ConnectAgentMain(void *arg) ConnectCreate(g_agentConnect); if (g_agentConnect->isClosed) { write_runlog(ERROR, "cm_client connect to cm_agent failed, retry.\n"); - CmUsleep(CLIENT_CHECK_CONN_INTERVAL); + (void)usleep(CLIENT_CHECK_CONN_INTERVAL); continue; } g_needReconnect = false; @@ -273,7 +273,7 @@ void *ConnectAgentMain(void *arg) continue; } } - CmUsleep(CLIENT_CHECK_CONN_INTERVAL); + (void)usleep(CLIENT_CHECK_CONN_INTERVAL); } return NULL; @@ -325,7 +325,7 @@ void SendOneMsgToAgent() if (CmClientSendMsg(msgPkg.msgPtr, msgPkg.msgLen) != CM_SUCCESS) { write_runlog(ERROR, "client send msg to agent failed!\n"); g_needReconnect = true; - CmUsleep(CLIENT_CHECK_CONN_INTERVAL); + (void)usleep(CLIENT_CHECK_CONN_INTERVAL); } free(msgPkg.msgPtr); } @@ -341,7 +341,7 @@ void *SendMsgToAgentMain(void *arg) break; } if (g_agentConnect->isClosed) { - CmUsleep(CLIENT_SEND_CHECK_INTERVAL); + (void)usleep(CLIENT_SEND_CHECK_INTERVAL); continue; } @@ -489,12 +489,12 @@ void *RecvMsgFromAgentMain(void *arg) break; } if (g_agentConnect->isClosed) { - CmUsleep(CLIENT_CHECK_CONN_INTERVAL); + (void)usleep(CLIENT_CHECK_CONN_INTERVAL); continue; } if (RecvMsgFromAgent() != CM_SUCCESS) { g_needReconnect = true; - CmUsleep(CLIENT_CHECK_CONN_INTERVAL); + (void)usleep(CLIENT_CHECK_CONN_INTERVAL); } } diff --git a/src/cm_common/cm_elog.cpp b/src/cm_common/cm_elog.cpp index 0a6243f..d82a119 100644 --- a/src/cm_common/cm_elog.cpp +++ b/src/cm_common/cm_elog.cpp @@ -1952,6 +1952,7 @@ void CmKeyEventInit(void) g_cmKeyEventType[KEY_EVENT_RECOVER] = "KEY_EVENT_RECOVER"; g_cmKeyEventType[KEY_EVENT_REFRESH_OBS_DELETE_TEXT] = "KEY_EVENT_REFRESH_OBS_DELETE_TEXT"; g_cmKeyEventType[KEY_EVENT_DROP_CN_OBS_XLOG] = "KEY_EVENT_DROP_CN_OBS_XLOG"; + g_cmKeyEventType[KEY_EVENT_RES_ARBITRATE] = "KEY_EVENT_RES_ARBITRATE"; } void CreateKeyEventLogFile(const char *sysLogPath) diff --git a/src/cm_common/cm_util.cpp b/src/cm_common/cm_util.cpp index 95cb2b0..a4b800a 100644 --- a/src/cm_common/cm_util.cpp +++ b/src/cm_common/cm_util.cpp @@ -89,69 +89,65 @@ char *gs_getenv_r(const char *name) uint64 GetMonotonicTimeMs() { - static const uint32 CM_NSEC_COUNT_PER_MS = 1000000; - static const uint32 CM_MS_COUNT_PER_SEC = 1000; - struct timespec ts; (void)clock_gettime(CLOCK_MONOTONIC, &ts); return (uint64)ts.tv_sec * CM_MS_COUNT_PER_SEC + (uint64)ts.tv_nsec / CM_NSEC_COUNT_PER_MS; } -void CMPrioMutexInit(CMPrioMutex &mutex) +void CMFairMutexInit(CMFairMutex &mutex) { (void)pthread_mutex_init(&mutex.lock, NULL); (void)pthread_mutex_init(&mutex.innerLock, NULL); (void)pthread_cond_init(&mutex.cond, NULL); - mutex.highPrioCount = 0; - mutex.curPrio = CMMutexPrio::CM_MUTEX_PRIO_NONE; + mutex.readerCount = 0; + mutex.writerCount = 0; + mutex.curType = CMFairMutexType::CM_MUTEX_NODE; } - -int CMPrioMutexLock(CMPrioMutex &mutex, CMMutexPrio prio) + +int CMFairMutexLock(CMFairMutex &mutex, CMFairMutexType type) { - if (prio == CMMutexPrio::CM_MUTEX_PRIO_HIGH) { - (void)pthread_mutex_lock(&mutex.innerLock); - mutex.highPrioCount++; - (void)pthread_mutex_unlock(&mutex.innerLock); - int ret = pthread_mutex_lock(&mutex.lock); - if (ret == 0) { - mutex.curPrio = CMMutexPrio::CM_MUTEX_PRIO_HIGH; - } else { - (void)pthread_mutex_lock(&mutex.innerLock); - mutex.highPrioCount--; - (void)pthread_mutex_unlock(&mutex.innerLock); - (void)pthread_cond_broadcast(&mutex.cond); - } - - return ret; + struct timespec ts; + uint32* count1 = NULL; + uint32* count2 = NULL; + const int LOCK_WAIT_TIME = 2; + + if (type == CMFairMutexType::CM_MUTEX_READ) { + count1 = &mutex.readerCount; + count2 = &mutex.writerCount; + } else { + count1 = &mutex.writerCount; + count2 = &mutex.readerCount; } - + + (void)pthread_mutex_lock(&mutex.innerLock); + (*count1)++; + while (true) { - if (mutex.highPrioCount == 0) { + if (type != mutex.curType || *count2 == 0) { int ret = pthread_mutex_trylock(&mutex.lock); if (ret == 0) { - mutex.curPrio = CMMutexPrio::CM_MUTEX_PRIO_NORMAL; - return 0; - } else if (ret != EBUSY) { - return ret; + mutex.curType = type; + (*count1)--; + break; } } - - (void)pthread_mutex_lock(&mutex.innerLock); - (void)pthread_cond_wait(&mutex.cond, &mutex.innerLock); - (void)pthread_mutex_unlock(&mutex.innerLock); + + (void)clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += LOCK_WAIT_TIME; + + (void)pthread_cond_timedwait(&mutex.cond, &mutex.innerLock, &ts); } + + (void)pthread_mutex_unlock(&mutex.innerLock); + + return 0; } - -void CMPrioMutexUnLock(CMPrioMutex &mutex) + +void CMFairMutexUnLock(CMFairMutex &mutex) { - if (mutex.curPrio == CMMutexPrio::CM_MUTEX_PRIO_HIGH) { - (void)pthread_mutex_lock(&mutex.innerLock); - mutex.highPrioCount--; - (void)pthread_mutex_unlock(&mutex.innerLock); - } - - mutex.curPrio = CMMutexPrio::CM_MUTEX_PRIO_NONE; + (void)pthread_mutex_lock(&mutex.innerLock); (void)pthread_mutex_unlock(&mutex.lock); + (void)pthread_mutex_unlock(&mutex.innerLock); (void)pthread_cond_broadcast(&mutex.cond); } diff --git a/src/cm_ctl/ctl_show.cpp b/src/cm_ctl/ctl_show.cpp index 96c9108..7e389c1 100644 --- a/src/cm_ctl/ctl_show.cpp +++ b/src/cm_ctl/ctl_show.cpp @@ -82,6 +82,7 @@ void HandleRhbAck(CmRhbStatAck *ack) (void)printf("Network stat('Y' means connected, otherwise 'N'):\n"); char *rs = GetRhbSimple((time_t *)ack->hbs, MAX_RHB_NUM, ack->hwl, ack->baseTime, ack->timeout); + CM_RETURN_IF_NULL(rs); (void)printf("%s\n", rs); free(rs); } diff --git a/src/cm_ctl/ctl_switchover.cpp b/src/cm_ctl/ctl_switchover.cpp index f46e571..f223747 100644 --- a/src/cm_ctl/ctl_switchover.cpp +++ b/src/cm_ctl/ctl_switchover.cpp @@ -460,7 +460,6 @@ static int BalanceResultReq(int &timePass, bool waitBalance, int &sendCheckCount write_runlog(ERROR, "switchover command timeout!\n\n" "HINT: Maybe the switchover action is continually running in the background.\n" - "You can wait for a while and check the status of current cluster using " "\"cm_ctl query -Cv\".\n"); CMPQfinish(CmServer_conn); CmServer_conn = NULL; diff --git a/src/cm_persist/cm_persist.cpp b/src/cm_persist/cm_persist.cpp index 0eab90c..a1e20bf 100644 --- a/src/cm_persist/cm_persist.cpp +++ b/src/cm_persist/cm_persist.cpp @@ -20,29 +20,59 @@ * * ------------------------------------------------------------------------- */ -#include "share_disk_lock_api.h" - #include #include #include #include #include +#include "share_disk_lock_api.h" - -static const int ARGS_NUM = 4; +static const int ARGS_FIVE_NUM = 5; +static const int ARGS_SIX_NUM = 6; +static const int CMDLOCKTIME_NO = 5; +static const int CMDTYPE_NO = 4; static const int OFFSET_NO = 3; static const int INSTANCEID_NO = 2; static const int DEVICE_NO = 1; static const int DECIMAL_BASE = 10; + +int ExeLockCmd(diskLrwHandler *handler) +{ + status_t ret = CmDiskLockS(&handler->headerLock, handler->scsiDev, handler->fd); + if (ret == CM_SUCCESS) { + return 0; + } + time_t lockTime = LOCKR_LOCK_TIME(handler->headerLock); + if (lockTime <= 0) { + return -1; + } + + // system function execute lock cmd result range is [0, 127], 127 and 126 maybe system command failed result + // so get lock time valid range is [1, 125] + // 0:get lock success;-1:get lock failed and get lock time failed;[1,125]:get lock failed but get lock time success + return (int)lockTime; +} +int ExeForceLockCmd(diskLrwHandler *handler, int64 lockTime) +{ + return (int)CmDiskLockf(&handler->headerLock, handler->fd, lockTime); +} + +typedef enum en_persist_cmd_type { + CMD_LOCK = 0, + CMD_FORCE_LOCK = 1, +} PERSIST_CMD_TYPE; + static void usage() { (void)printf(_("cm_persist: get disk lock for the shared storage.\n\n")); (void)printf(_("Usage:\n")); - (void)printf(_(" cm_persist [DEVICEPATH] [INSTANCE_ID] [OFFSET]\n")); + (void)printf(_(" cm_persist [DEVICEPATH] [INSTANCE_ID] [OFFSET] [CMD_TYPE] [LOCK_TIME]\n")); (void)printf(_("[DEVICEPATH]: the path of the shared storage\n")); (void)printf(_("[INSTANCE_ID]: the instanceid of the process\n")); (void)printf(_("[OFFSET]: get disk lock on storage position\n")); + (void)printf(_("[CMD_TYPE]: cm_persist command type\n")); + (void)printf(_("[LOCK_TIME]: lock time only used when CMD_TYPE is 1\n")); (void)printf(_("-?, --help show this help, then exit\n")); } @@ -60,6 +90,33 @@ static status_t GetIntValue(char *input, int64 *value) return CM_SUCCESS; } +static int ExePersistCmd(diskLrwHandler *cmsArbitrateDiskHandler, int64 cmdType, int argc, char **argv) +{ + int ret = -1; + switch (cmdType) { + case CMD_LOCK: + if (argc != ARGS_FIVE_NUM) { + break; + } + ret = ExeLockCmd(cmsArbitrateDiskHandler); + break; + case CMD_FORCE_LOCK: + if (argc != ARGS_SIX_NUM) { + break; + } + int64 lockTime; + if (GetIntValue(argv[CMDLOCKTIME_NO], &lockTime) != CM_SUCCESS) { + break; + } + ret = ExeForceLockCmd(cmsArbitrateDiskHandler, lockTime); + break; + default: + break; + } + + return ret; +} + int main(int argc, char **argv) { if (argc > 1) { @@ -69,35 +126,39 @@ int main(int argc, char **argv) } } - if (argc != ARGS_NUM) { + if (argc < ARGS_FIVE_NUM) { (void)printf(_("the num(%d) of parameters input is invalid.\n\n"), argc); usage(); - return 1; + return -1; } int64 instanceId; int64 offset; + int64 cmdType; if (GetIntValue(argv[INSTANCEID_NO], &instanceId) != CM_SUCCESS) { - return 1; + return -1; } if (GetIntValue(argv[OFFSET_NO], &offset) != CM_SUCCESS) { - return 1; + return -1; + } + if (GetIntValue(argv[CMDTYPE_NO], &cmdType) != CM_SUCCESS) { + return -1; } diskLrwHandler cmsArbitrateDiskHandler; if (InitDiskLockHandle(&cmsArbitrateDiskHandler, argv[DEVICE_NO], (uint32)offset, instanceId) != CM_SUCCESS) { - return 1; + return -1; } - status_t ret = ShareDiskGetDlock(&cmsArbitrateDiskHandler); + int ret = ExePersistCmd(&cmsArbitrateDiskHandler, cmdType, argc, argv); (void)close(cmsArbitrateDiskHandler.fd); FREE_AND_RESET(cmsArbitrateDiskHandler.headerLock.buff); - if (ret != CM_SUCCESS) { + if (ret != (int)CM_SUCCESS) { (void)printf(_("Failed to get disk lock.\n\n")); - return 1; + return ret; } - + (void)printf(_("Success to get disk lock.\n\n")); - return 0; + return ret; } diff --git a/src/cm_persist/share_disk_lock.cpp b/src/cm_persist/share_disk_lock.cpp index fad86c3..2235557 100644 --- a/src/cm_persist/share_disk_lock.cpp +++ b/src/cm_persist/share_disk_lock.cpp @@ -29,6 +29,16 @@ const int BLOCK_NUMS = 3; const uint32 LOCK_BLOCK_NUMS = 2; +const time_t MAX_VALID_LOCK_TIME = 125; +const time_t BASE_VALID_LOCK_TIME = 1; + +time_t CalcLockTime(time_t lockTime) +{ + // system function execute lock cmd result range is [0, 127], 127 and 126 maybe system command failed result + // so get lock time valid range is [1, 125] + // 0:get lock success;-1:get lock failed and get lock time failed;[1,125]:get lock failed but get lock time success + return lockTime % MAX_VALID_LOCK_TIME + BASE_VALID_LOCK_TIME; +} status_t CmAllocDlock(dlock_t *lock, uint64 lockAddr, int64 instId) { @@ -141,28 +151,6 @@ status_t CmDiskLockS(dlock_t *lock, const char *scsiDev, int32 fd) return CM_SUCCESS; } -status_t CmDiskLockfS(dlock_t *lock, const char *scsiDev) -{ - if (lock == NULL|| scsiDev == NULL) { - return CM_ERROR; - } - - int32 fd = open(scsiDev, O_RDWR | O_DIRECT | O_SYNC); - if (fd < 0) { - (void)printf(_("CmDiskLockfS Open dev %s failed, errno %d.\n"), scsiDev, errno); - return CM_ERROR; - } - - status_t ret = CmDiskLockf(lock, fd); - if (ret != CM_SUCCESS) { - (void)close(fd); - return ret; - } - - (void)close(fd); - return CM_SUCCESS; -} - int32 CmDiskLock(dlock_t *lock, int32 fd) { uint32 buffLen = LOCK_BLOCK_NUMS * CM_DEF_BLOCK_SIZE; @@ -172,7 +160,7 @@ int32 CmDiskLock(dlock_t *lock, int32 fd) } time_t t = time(NULL); - LOCKW_LOCK_TIME(*lock) = t; + LOCKW_LOCK_TIME(*lock) = CalcLockTime(t); LOCKW_LOCK_CREATE_TIME(*lock) = t; int32 ret = CmScsi3Caw(fd, lock->lockAddr / CM_DEF_BLOCK_SIZE, lock->lockr, buffLen); if (ret != (int)CM_SUCCESS) { @@ -223,7 +211,7 @@ int32 CmDiskLock(dlock_t *lock, int32 fd) return 0; } -status_t CmDiskLockf(dlock_t *lock, int32 fd) +status_t CmDiskLockf(dlock_t *lock, int32 fd, int64 lockTime) { if (lock == NULL || fd < 0) { return CM_ERROR; @@ -233,6 +221,7 @@ status_t CmDiskLockf(dlock_t *lock, int32 fd) (void)printf(_("Get lock info from dev failed, fd %d.\n"), fd); return CM_ERROR; } + LOCKR_LOCK_TIME(*lock) = lockTime; int32 ret = CmDiskLock(lock, fd); if (ret != (int)CM_SUCCESS) { return CM_ERROR; @@ -256,3 +245,4 @@ status_t CmGetDlockInfo(dlock_t *lock, int32 fd) } return CM_SUCCESS; } + diff --git a/src/cm_persist/share_disk_lock_api.cpp b/src/cm_persist/share_disk_lock_api.cpp index 16d443b..3a04158 100644 --- a/src/cm_persist/share_disk_lock_api.cpp +++ b/src/cm_persist/share_disk_lock_api.cpp @@ -30,10 +30,6 @@ #include "securec.h" #include "share_disk_lock_api.h" -const int LOCK_WAIT_INTERVAL = 10 * 1000; -const int FORCE_LOCK_TRY_TIMES = 3; -const int LOCK_WAIT_MAX_TIMTS = 500; - status_t ShareDiskHandlerInit(diskLrwHandler *handler) { if (CmAllocDlock(&handler->headerLock, handler->offset, handler->instId) != CM_SUCCESS) { @@ -65,46 +61,3 @@ status_t InitDiskLockHandle(diskLrwHandler *sdLrwHandler, const char *scsi_dev, return CM_SUCCESS; } -status_t ShareDiskGetDlock(diskLrwHandler *handler) -{ - status_t ret = CM_SUCCESS; - int32 times = 0; - int32 delayTimes = 0; - bool hasRefreshLockTime = false; - - time_t lockTime = 0; - do { - ret = CmDiskLockS(&handler->headerLock, handler->scsiDev, handler->fd); - if (ret == CM_SUCCESS) { - return CM_SUCCESS; - } - if (lockTime != LOCKR_LOCK_TIME(handler->headerLock)) { - lockTime = LOCKR_LOCK_TIME(handler->headerLock); - delayTimes = 0; - if (hasRefreshLockTime) { - (void)printf(_("Get lock failed for lock time has been refreshed by other process\n")); - return CM_ERROR; - } - hasRefreshLockTime = true; - } - - if (lockTime == LOCKR_LOCK_TIME(handler->headerLock)) { - if (delayTimes < LOCK_WAIT_MAX_TIMTS) { - (void)usleep(LOCK_WAIT_INTERVAL); - delayTimes++; - continue; - } - - ret = CmDiskLockfS(&handler->headerLock, handler->scsiDev); - if (ret != CM_SUCCESS) { - (void)printf(_("Get lock failed when force %d times\n"), ++times); - return CM_ERROR; - } - (void)printf(_("Get lock success when force %d times.\n"), ++times); - return CM_SUCCESS; - } - - (void)usleep(LOCK_WAIT_INTERVAL); - } while (1); -} - diff --git a/src/cm_server/cms_arbitrate_cluster.cpp b/src/cm_server/cms_arbitrate_cluster.cpp index 950ec05..72bec1b 100644 --- a/src/cm_server/cms_arbitrate_cluster.cpp +++ b/src/cm_server/cms_arbitrate_cluster.cpp @@ -265,14 +265,6 @@ static void ReleaseMaxNodeMemory() FreeDmsValue(); } -static inline char *GetDynamicMem(char *dynamicPtr, size_t *curSize, size_t memSize) -{ - size_t tmpCurSize = (*curSize); - char *tmp = dynamicPtr + tmpCurSize; - (*curSize) = tmpCurSize + memSize; - return tmp; -} - static status_t AllocNodeClusterMemory(NodeCluster *nodeCluster, int32 maxNodeNum) { size_t memSize = sizeof(uint32) * (uint32)(maxNodeNum); @@ -337,9 +329,9 @@ static bool CheckPoint2PointConn(int32 resIdx1, int32 resIdx2) return (connRes1 && connRes2); } -static MaxClusterResStatus GetDiskHeartbeatStat(uint32 nodeIndex, uint32 diskTimeout) +static MaxClusterResStatus GetDiskHeartbeatStat(uint32 nodeIndex, uint32 diskTimeout, int logLevel) { - VotingDiskStatus stat = GetNodeHeartbeatStat(nodeIndex, diskTimeout); + VotingDiskStatus stat = GetNodeHeartbeatStat(nodeIndex, diskTimeout, logLevel); if (stat == VOTING_DISK_STATUS_UNAVAIL) { return MAX_CLUSTER_STATUS_UNAVAIL; } else if (stat == VOTING_DISK_STATUS_AVAIL) { @@ -351,9 +343,9 @@ static MaxClusterResStatus GetDiskHeartbeatStat(uint32 nodeIndex, uint32 diskTim static bool IsAllResAvailInNode(int32 resIdx) { uint32 nodeIdx = g_clusterRes.map[resIdx].nodeIdx; - MaxClusterResStatus heartbeatStatus = GetDiskHeartbeatStat(nodeIdx, g_diskTimeout); + MaxClusterResStatus heartbeatStatus = GetDiskHeartbeatStat(nodeIdx, g_diskTimeout, DEBUG5); bool heartbeatRes = IsCurResAvail(resIdx, MAX_CLUSTER_TYPE_VOTE_DISK, heartbeatStatus); - MaxClusterResStatus nodeStatus = GetResNodeStat(g_node[nodeIdx].node); + MaxClusterResStatus nodeStatus = GetResNodeStat(g_node[nodeIdx].node, DEBUG5); bool nodeRes = IsCurResAvail(resIdx, MAX_CLUSTER_TYPE_RES_STATUS, nodeStatus); return (heartbeatRes && nodeRes); } @@ -429,16 +421,10 @@ static int32 FindNodeCluster(int32 startPoint, int32 maxNum, NodeCluster *nodeCl continue; } if (!CheckPoint2PointConn(startPoint, i)) { - write_runlog(LOG, "(index=%d,nodeId=%u) disconnect with (index=%d,nodeId=%u).\n", - startPoint, GetNodeByPoint(startPoint), i, GetNodeByPoint(i)); - PrintHbsInfo(startPoint, GetNodeByPoint(startPoint), i, GetNodeByPoint(i), LOG); continue; } for (j = 0; j < maxNum; ++j) { if (!CheckPoint2PointConn(i, nodeCluster->visNode[j])) { - write_runlog(LOG, "(index=%d,nodeId=%u) disconnect with (index=%d,nodeId=%u).\n", - startPoint, GetNodeByPoint(startPoint), i, GetNodeByPoint(i)); - PrintHbsInfo(startPoint, GetNodeByPoint(startPoint), i, GetNodeByPoint(i), LOG); break; } } @@ -492,7 +478,7 @@ static void PrintMaxNodeCluster(const MaxNodeCluster *maxNodeCluster, const char for (int32 i = 0; i < maxNodeCluster->nodeCluster.clusterNum; ++i) { StrcatNextNodeStr(clusterStr, MAX_PATH_LEN, maxNodeCluster->nodeCluster.cluster[i]); } - write_runlog(LOG, "%s the max node cluster: %s.\n", str, clusterStr); + write_runlog(logLevel, "%s the max node cluster: %s.\n", str, clusterStr); } static void GetClusterKeyInDdb(char *key, uint32 keyLen) @@ -835,7 +821,7 @@ static void CopyCur2LastMaxNodeCluster(MaxNodeCluster *lastCluster, MaxNodeClust lastCluster->version = curCluster->version; SetCurMaxNodeByLast(curCluster, lastCluster); (void)pthread_rwlock_unlock(&(lastCluster->lock)); - PrintMaxNodeCluster(lastCluster, "[CompareCurLastMaxNodeCluster]", FATAL); + PrintMaxNodeCluster(lastCluster, "[CompareCurLastMaxNodeCluster]", DEBUG1); } static void AddCurResInCurCluster(int32 resIdx, NodeCluster *curCluster) @@ -889,6 +875,85 @@ static bool8 CanArbitrateMaxCluster(const NodeCluster *lastCluster, NodeCluster return (bool8)(curCluster->clusterNum > lastCluster->clusterNum); } +static bool IsNodeInCluster(int32 resIdx, const MaxNodeCluster *nodeCluster) +{ + for (int32 i = 0; i < nodeCluster->nodeCluster.clusterNum; ++i) { + if (resIdx == nodeCluster->nodeCluster.cluster[i]) { + return true; + } + } + return false; +} + +static void PrintRhbStatus() +{ + uint32 hwl = 0; + time_t hbs[MAX_RHB_NUM][MAX_RHB_NUM] = {{0}}; + GetRhbStat(hbs, &hwl); + char *rhbStr = GetRhbSimple((time_t *)hbs, MAX_RHB_NUM, hwl, time(NULL), g_agentNetworkTimeout); + CM_RETURN_IF_NULL(rhbStr); + size_t rhbLen = strlen(rhbStr); + if (rhbLen >= MAX_LOG_BUFF_LEN) { + write_runlog(LOG, "rhbStr len(%lu) is exceed max log buff len(%d), can't print network stat.\n", + rhbLen, MAX_LOG_BUFF_LEN); + FREE_AND_RESET(rhbStr); + return; + } + write_runlog(LOG, "Network timeout:%u\n", g_agentNetworkTimeout); + write_runlog(LOG, "Network stat('Y' means connected, otherwise 'N'):\n%s\n", rhbStr); + FREE_AND_RESET(rhbStr); +} + +static void PrintKickOutResult(int32 resIdx, const MaxNodeCluster *maxCluster) +{ + uint32 nodeIdx = g_clusterRes.map[resIdx].nodeIdx; + + MaxClusterResStatus heartbeatStatus = GetDiskHeartbeatStat(nodeIdx, g_diskTimeout, LOG); + if (!IsCurResAvail(resIdx, MAX_CLUSTER_TYPE_VOTE_DISK, heartbeatStatus)) { + write_runlog(LOG, "kick out result: node(%u) disk heartbeat timeout.\n", g_node[nodeIdx].node); + return; + } + + MaxClusterResStatus nodeStatus = GetResNodeStat(g_node[nodeIdx].node, LOG); + if (!IsCurResAvail(resIdx, MAX_CLUSTER_TYPE_RES_STATUS, nodeStatus)) { + write_runlog(LOG, "kick out result: node(%u) res inst manual stop or report timeout.\n", g_node[nodeIdx].node); + return; + } + + for (int32 i = 0; i < maxCluster->nodeCluster.clusterNum; ++i) { + if (resIdx == maxCluster->nodeCluster.cluster[i]) { + continue; + } + if (!CheckPoint2PointConn(resIdx, maxCluster->nodeCluster.cluster[i])) { + write_runlog(LOG, "kick out result: (index=%d,nodeId=%u) disconnect with (index=%d,nodeId=%u).\n", + resIdx, GetNodeByPoint(resIdx), i, GetNodeByPoint(i)); + PrintHbsInfo(resIdx, GetNodeByPoint(resIdx), i, GetNodeByPoint(i), LOG); + continue; + } + } + PrintRhbStatus(); +} + +static void PrintArbitrateResult(const MaxNodeCluster *lastCluster, const MaxNodeCluster *curCluster) +{ + // kick out + for (int32 i = 0; i < lastCluster->nodeCluster.clusterNum; ++i) { + if (!IsNodeInCluster(lastCluster->nodeCluster.cluster[i], curCluster)) { + uint32 nodeIdx = g_clusterRes.map[lastCluster->nodeCluster.cluster[i]].nodeIdx; + WriteKeyEventLog(KEY_EVENT_RES_ARBITRATE, 0, "node(%u) kick out.", g_node[nodeIdx].node); + PrintKickOutResult(lastCluster->nodeCluster.cluster[i], lastCluster); + } + } + + // join in + for (int32 i = 0; i < curCluster->nodeCluster.clusterNum; ++i) { + if (!IsNodeInCluster(curCluster->nodeCluster.cluster[i], lastCluster)) { + uint32 nodeIdx = g_clusterRes.map[curCluster->nodeCluster.cluster[i]].nodeIdx; + WriteKeyEventLog(KEY_EVENT_RES_ARBITRATE, 0, "node(%u) join in cluster.", g_node[nodeIdx].node); + } + } +} + static void CompareCurLastMaxNodeCluster(MaxNodeCluster *lastCluster, MaxNodeCluster *curCluster) { if (curCluster->nodeCluster.clusterNum <= 0) { @@ -905,6 +970,7 @@ static void CompareCurLastMaxNodeCluster(MaxNodeCluster *lastCluster, MaxNodeClu } write_runlog(LOG, "last(%lu) is different from current(%lu), result is %d.\n", lastCluster->version, curCluster->version, result); + PrintArbitrateResult(lastCluster, curCluster); // wait for successfully setting cluster to ddb. status_t st = SetCurClusterToDdb(curCluster); if (st != CM_SUCCESS) { diff --git a/src/cm_server/cms_arbitrate_cms.cpp b/src/cm_server/cms_arbitrate_cms.cpp index 253d5d9..f9c4900 100644 --- a/src/cm_server/cms_arbitrate_cms.cpp +++ b/src/cm_server/cms_arbitrate_cms.cpp @@ -127,20 +127,6 @@ static void coordinator_notify_msg_reset(void) } } -static void IncrementTermIfCmRestart() -{ - if (!IsNeedSyncDdb()) { - return; - } - (void)pthread_rwlock_wrlock(&term_update_rwlock); - int incrementTermSesult = IncrementTermToDdb(); - (void)pthread_rwlock_unlock(&term_update_rwlock); - if (incrementTermSesult != 0) { - write_runlog(ERROR, "Incrtement term to ddb failed, %d:node(%u) cm_server role is %s, will to primary.\n", - __LINE__, g_currentNode->node, server_role_to_string(g_HA_status->local_role)); - } -} - static void clean_cn_heart_beat(int cmServerCurrentRole, int cm_server_last_role) { if ((cmServerCurrentRole != CM_SERVER_PRIMARY) && (cm_server_last_role == CM_SERVER_PRIMARY)) { @@ -159,26 +145,6 @@ static void clean_cn_heart_beat(int cmServerCurrentRole, int cm_server_last_role } } -static void CmsSyncStandbyMode() -{ - char key[MAX_PATH_LEN] = {0}; - char value[MAX_PATH_LEN] = {0}; - errno_t rc = - snprintf_s(key, MAX_PATH_LEN, MAX_PATH_LEN - 1, "/%s/CMServer/status_key/sync_standby_mode", pw->pw_name); - securec_check_intval(rc, (void)rc); - - DDB_RESULT ddbResult = SUCCESS_GET_VALUE; - status_t st = GetKVFromDDb(key, MAX_PATH_LEN, value, MAX_PATH_LEN, &ddbResult); - if (st != CM_SUCCESS) { - int logLevel = (ddbResult == CAN_NOT_FIND_THE_KEY) ? ERROR : LOG; - write_runlog(logLevel, "failed to get value with key(%s).\n", key); - return; - } - current_cluster_az_status = (synchronous_standby_mode)strtol(value, NULL, 10); - write_runlog(LOG, "setting to %d.\n", current_cluster_az_status); - return; -} - static void CleanSwitchoverCommand() { write_runlog(LOG, "cms change to primary, will clean switchover command.\n"); @@ -257,8 +223,6 @@ static void check_server_role_changed(int cm_server_role) (int)cm_server_start_mode, g_node_num); } #endif - /* get the ddb key-value of the synchronize-standby-mode */ - CmsSyncStandbyMode(); } } @@ -420,13 +384,21 @@ static void CmsChange2Primary(int32 *cmsDemoteDelayOnConnLess) if (g_HA_status->local_role == CM_SERVER_PRIMARY) { return; } - IncrementTermIfCmRestart(); + if (IsNeedSyncDdb()) { + (void)pthread_rwlock_wrlock(&term_update_rwlock); + g_needIncTermToDdbAgain = true; + (void)pthread_rwlock_unlock(&term_update_rwlock); + g_needReloadSyncStandbyMode = true; + } + write_runlog(LOG, "node(%u) cms role is %s, change to primary by ddb, and g_ddbRole is %d.\n", g_currentNode->node, server_role_to_string(g_HA_status->local_role), (int)g_ddbRole); g_HA_status->local_role = CM_SERVER_PRIMARY; *cmsDemoteDelayOnConnLess = cmserver_demote_delay_on_conn_less; ClearSyncWithDdbFlag(); - NotifyDdb(DDB_ROLE_LEADER); + if (g_dbType != DB_SHAREDISK) { + NotifyDdb(DDB_ROLE_LEADER); + } } static void PromoteCmsDirect(int32 *cmsDemoteDelayOnConnLess) diff --git a/src/cm_server/cms_arbitrate_datanode_pms.cpp b/src/cm_server/cms_arbitrate_datanode_pms.cpp index e1e5faf..7c3d282 100644 --- a/src/cm_server/cms_arbitrate_datanode_pms.cpp +++ b/src/cm_server/cms_arbitrate_datanode_pms.cpp @@ -781,6 +781,10 @@ uint32 GetPrimaryTerm(const DnArbCtx *ctx) static bool DnArbitrateInAsync(DnArbCtx *ctx) { + if (g_needReloadSyncStandbyMode) { + write_runlog(LOG, "line %d: wait to reload sync standby mode ddb value.\n", __LINE__); + return true; + } int azIndex = GetAzIndex(ctx); if (azIndex == -1) { return true; diff --git a/src/cm_server/cms_arbitrate_datanode_psd.cpp b/src/cm_server/cms_arbitrate_datanode_psd.cpp index 1fd30b8..64fb1db 100644 --- a/src/cm_server/cms_arbitrate_datanode_psd.cpp +++ b/src/cm_server/cms_arbitrate_datanode_psd.cpp @@ -1072,6 +1072,14 @@ void datanode_instance_arbitrate_for_psd(MsgRecvInfo* recvMsgInfo, const agent_t return; } + if (g_needReloadSyncStandbyMode) { + write_runlog(LOG, + "instance(node=%u instanceid=%u) arbitrate will wait to reload sync standby mode ddb value.\n", + node, + instanceId); + return; + } + GetDatanodeDynamicConfigChangeFromDdb(group_index); (void)pthread_rwlock_wrlock(&(g_instance_group_report_status_ptr[group_index].lk_lock)); diff --git a/src/cm_server/cms_common.cpp b/src/cm_server/cms_common.cpp index 7552701..815870b 100644 --- a/src/cm_server/cms_common.cpp +++ b/src/cm_server/cms_common.cpp @@ -313,6 +313,7 @@ void get_paramter_coordinator_heartbeat_timeout() } #endif + bool CheckBoolConfigParam(const char* value) { if (strcasecmp(value, "on") == 0 || strcasecmp(value, "yes") == 0 || strcasecmp(value, "true") == 0 || @@ -827,6 +828,31 @@ bool SetOfflineNode() return false; } +void CmsSyncStandbyMode() +{ + if (!g_needReloadSyncStandbyMode) { + return; + } + char key[MAX_PATH_LEN] = {0}; + char value[MAX_PATH_LEN] = {0}; + errno_t rc = + snprintf_s(key, MAX_PATH_LEN, MAX_PATH_LEN - 1, "/%s/CMServer/status_key/sync_standby_mode", pw->pw_name); + securec_check_intval(rc, (void)rc); + + DDB_RESULT ddbResult = SUCCESS_GET_VALUE; + status_t st = GetKVFromDDb(key, MAX_PATH_LEN, value, MAX_PATH_LEN, &ddbResult); + if (st != CM_SUCCESS) { + g_needReloadSyncStandbyMode = false; + int logLevel = (ddbResult == CAN_NOT_FIND_THE_KEY) ? ERROR : LOG; + write_runlog(logLevel, "failed to get value with key(%s).\n", key); + return; + } + current_cluster_az_status = (synchronous_standby_mode)strtol(value, NULL, 10); + g_needReloadSyncStandbyMode = false; + write_runlog(LOG, "setting current cluster az status to %d.\n", (int)current_cluster_az_status); + return; +} + bool EnableShareDisk() { return (g_dnArbitrateMode == SHARE_DISK); diff --git a/src/cm_server/cms_common_res.cpp b/src/cm_server/cms_common_res.cpp index df126e3..0ce7568 100644 --- a/src/cm_server/cms_common_res.cpp +++ b/src/cm_server/cms_common_res.cpp @@ -583,9 +583,11 @@ void NotifyCmaDoReg(uint32 destNodeId) continue; } ResIsregStatus isreg = GetIsregStatusByCmInstId(resInfo->resStat[j].cmInstanceId); - if (isreg == CM_RES_ISREG_UNREG || isreg == CM_RES_ISREG_PENDING || isreg == CM_RES_ISREG_INIT) { + if (isreg == CM_RES_ISREG_REG) { + UpdateIsworkList(resInfo->resStat[j].cmInstanceId, RES_INST_WORK_STATUS_AVAIL); + } else if (isreg == CM_RES_ISREG_UNREG || isreg == CM_RES_ISREG_PENDING || isreg == CM_RES_ISREG_INIT) { SendRegMsgToCma(destNodeId, 1, resInfo->resStat[j].resInstanceId, resInfo->resName); - } else if (isreg == CM_RES_ISREG_REG || isreg == CM_RES_ISREG_NOT_SUPPORT) { + } else if (isreg == CM_RES_ISREG_NOT_SUPPORT && resInfo->resStat[j].status == (uint32)CM_RES_STAT_OFFLINE) { UpdateIsworkList(resInfo->resStat[j].cmInstanceId, RES_INST_WORK_STATUS_AVAIL); } } diff --git a/src/cm_server/cms_conn.cpp b/src/cm_server/cms_conn.cpp index a122f34..7de8132 100644 --- a/src/cm_server/cms_conn.cpp +++ b/src/cm_server/cms_conn.cpp @@ -23,6 +23,7 @@ */ #include #include +#include #include #include "cm/cm_elog.h" #include "cms_common.h" @@ -38,31 +39,34 @@ #include "cm_util.h" static const int EPOLL_TIMEOUT = 1000; -static const int SEND_COUNT = 100; static const uint32 ALL_AGENT_NODE_ID = 0xffffffff; static const uint32 MAX_MSG_BUF_POOL_SIZE = 102400; static const uint32 MAX_MSG_BUF_POOL_COUNT = 200; +static const uint32 MAX_MSG_IN_QUE = 100; struct DdbPreAgentCon { uint32 connCount; char conFlag[DDB_MAX_CONNECTIONS]; }; -using MapConns = std::map; +using MapConns = std::map; struct TempConns { MapConns tempConns; - uint32 tempConnSeq; pthread_mutex_t lock; }; +using CM_Connections = struct CM_Connections_t { + uint32 count; + uint32 max_node_id; + CM_Connection* connections[CM_MAX_CONNECTIONS + MAXLISTEN]; + pthread_rwlock_t lock; +} ; TempConns g_tempConns; CM_Connections gConns; DdbPreAgentCon g_preAgentCon = {0}; uint8 g_msgProcFlag[MSG_CM_TYPE_CEIL]; -static int g_wakefd = -1; - int32 InitConn() { MsgPoolInit(MAX_MSG_BUF_POOL_SIZE, MAX_MSG_BUF_POOL_COUNT); @@ -247,7 +251,6 @@ void AddCMAgentConnection(CM_Connection *con) errno_t rc = memset_s(g_preAgentCon.conFlag, sizeof(g_preAgentCon.conFlag), 0, sizeof(g_preAgentCon.conFlag)); securec_check_errno(rc, (void)pthread_rwlock_unlock(&gConns.lock)); } - con->connSeq = 0; (void)pthread_rwlock_unlock(&gConns.lock); con->notifyCn = setNotifyCnFlagByNodeId(con->port->node_id); } @@ -255,11 +258,9 @@ void AddCMAgentConnection(CM_Connection *con) void AddTempConnection(CM_Connection *con) { (void)pthread_mutex_lock(&g_tempConns.lock); - g_tempConns.tempConnSeq++; - con->connSeq = g_tempConns.tempConnSeq; (void)g_tempConns.tempConns.insert(make_pair(con->connSeq, con)); (void)pthread_mutex_unlock(&g_tempConns.lock); - write_runlog(DEBUG5, "AddTempConnection:connSeq=%u\n", con->connSeq); + write_runlog(DEBUG5, "AddTempConnection:connSeq=%lu.\n", con->connSeq); } void RemoveTempConnection(CM_Connection *con) @@ -267,7 +268,7 @@ void RemoveTempConnection(CM_Connection *con) (void)pthread_mutex_lock(&g_tempConns.lock); (void)g_tempConns.tempConns.erase(con->connSeq); (void)pthread_mutex_unlock(&g_tempConns.lock); - write_runlog(DEBUG5, "RemoveTempConnection:connSeq=%u\n", con->connSeq); + write_runlog(DEBUG5, "RemoveTempConnection:connSeq=%lu.\n", con->connSeq); } CM_Connection* GetTempConnection(uint64 connSeq) @@ -452,20 +453,23 @@ void ConnFree(Port* conn) free(conn); } - -static void CloseAllConnections(volatile sig_atomic_t& gotConnsClose, int epollHandle) +static void CloseAllConnections(CM_IOThread *thrinfo) { CM_Connection* con = NULL; - if (gotConnsClose == 1) { + if (thrinfo->gotConnClose == 1) { /* left some time, other thread maybe use the mem of conn. */ cm_sleep(1); bool findepollHandle = false; (void)pthread_rwlock_wrlock(&gConns.lock); write_runlog(LOG, "receive signal to close all the agent connections now, conn count is %u.\n", gConns.count); for (uint32 i = 0; i < gConns.max_node_id + 1; i++) { + if (i % gIOThreads.count != thrinfo->id) { + continue; + } + con = gConns.connections[i]; - if (con != NULL && epollHandle == con->epHandle) { + if (con != NULL && thrinfo->epHandle == con->epHandle) { Assert(con->port->remote_type == CM_AGENT); EventDel(con->epHandle, con); @@ -479,12 +483,12 @@ static void CloseAllConnections(volatile sig_atomic_t& gotConnsClose, int epollH } } if (gConns.count == 0 || g_HA_status->local_role == CM_SERVER_PRIMARY) { - gotConnsClose = 0; + thrinfo->gotConnClose = 0; write_runlog(LOG, "reset close conn flag.\n"); } (void)pthread_rwlock_unlock(&gConns.lock); if (!findepollHandle) { - write_runlog(LOG, "can't get epollHandle %d.\n", epollHandle); + write_runlog(LOG, "can't get epollHandle %d.\n", thrinfo->epHandle); } } } @@ -538,8 +542,9 @@ void* CM_WorkThreadMain(void* argp) CM_WorkThread* thrinfo = (CM_WorkThread*)argp; - thread_name = (thrinfo->type == CM_AGENT) ? "NORMAL WORKER" : "CTL WORKER"; - MsgPriority pri = (thrinfo->type == CM_AGENT) ? MsgPriNormal : MsgPriHigh; + thread_name = (thrinfo->type == CM_AGENT) ? "AGENT_WORKER" : "CTL_WORKER"; + MsgSourceType src = (thrinfo->type == CM_AGENT) ? MsgSrcAgent : MsgSrcCtl; + (void)prctl(PR_SET_NAME, thread_name); (void)pthread_detach(pthread_self()); @@ -548,8 +553,14 @@ void* CM_WorkThreadMain(void* argp) write_runlog(LOG, "cmserver pool thread %lu starting, \n", thrinfo->tid); SetCanProcThisMsgFun(CanProcThisMsg); - uint32 msgCount = 0; + uint32 preMsgCount = 0; + uint64 totalWaitTime = 0; + uint64 totalProcTime = 0; MsgRecvInfo *msg = NULL; + uint64 t0 = GetMonotonicTimeMs(); + + uint32 ioThreadIdx = thrinfo->id % gIOThreads.count; + CM_IOThread* ioThrInfo = &gIOThreads.threads[ioThreadIdx]; for (;;) { if (got_stop == true) { @@ -558,14 +569,16 @@ void* CM_WorkThreadMain(void* argp) continue; } + uint64 t1 = GetMonotonicTimeMs(); thrinfo->isBusy = false; do { - msg = (MsgRecvInfo*)(getRecvMsg(pri, 1, argp)); + msg = (MsgRecvInfo*)(getRecvMsg((PriMsgQues*)ioThrInfo->recvMsgQue, src, 1, argp)); } while (msg == NULL); + uint64 t2 = GetMonotonicTimeMs(); thrinfo->isBusy = true; write_runlog(DEBUG5, - "get message from recv que:remote_type:%s,connSeq=%u,agentNodeId=%u,qtype=%c,len=%d.\n", + "get message from recv que:remote_type:%s,connSeq=%lu,agentNodeId=%u,qtype=%c,len=%d.\n", msg->connID.remoteType == CM_CTL ? "CM_CTL" : "CM_AGENT", msg->connID.connSeq, msg->connID.agentNodeId, @@ -573,22 +586,35 @@ void* CM_WorkThreadMain(void* argp) msg->msg.len); cm_server_process_msg(msg); + uint64 t3 = GetMonotonicTimeMs(); thrinfo->ProcConnID.remoteType = 0; thrinfo->ProcConnID.connSeq = 0; thrinfo->ProcConnID.agentNodeId = 0; FreeBufFromMsgPool((void *)msg); msg = NULL; + thrinfo->procMsgCount++; - if (msgCount % (MSG_COUNT_FOR_LOG) == 0 && msgCount >= (MSG_COUNT_FOR_LOG)) { - write_runlog(DEBUG1, "the thread has deal 300 msg at this time.\n"); - msgCount = 0; + totalWaitTime += t2 - t1; + totalProcTime += t3 - t2; + + if (t3 - t0 > MSG_TIME_FOR_LOG * CM_MS_COUNT_PER_SEC) { + write_runlog(DEBUG5, + "the thread process message:total count:%u,this time:%u,wait time=%lums,proc time=%lums\n", + thrinfo->procMsgCount, + thrinfo->procMsgCount - preMsgCount, + totalWaitTime, + totalProcTime); + totalWaitTime = 0; + totalProcTime = 0; + t0 = t3; + preMsgCount = thrinfo->procMsgCount; } } return thrinfo; } -void pushMsgToQue(CM_Connection* con) +void pushMsgToQue(CM_IOThread *thrinfo, CM_Connection* con) { uint32 totalFreeCount, totalAllocCount, freeCount, allocCount, typeCount; @@ -606,21 +632,17 @@ void pushMsgToQue(CM_Connection* con) write_runlog(LOG, "alloc memory for msg failed,totalFreeCount(%u), totalAllocCount(%u). this type(%u) " "freeCount(%u), allocCount(%u).\n", - totalFreeCount, - totalAllocCount, - allocLen, - freeCount, - allocCount); + totalFreeCount, totalAllocCount, allocLen, freeCount, allocCount); return; } - + errno_t rc = memset_s(msgInfo, (size_t)allocLen, 0, (size_t)allocLen); + securec_check_errno(rc, (void)rc); msgInfo->connID.remoteType = con->port->remote_type; msgInfo->msgProcFlag = 0; msgInfo->msg = *con->inBuffer; msgInfo->msg.data = (char*)&msgInfo->data[0]; if (con->inBuffer->len > 0) { - errno_t rc = - memcpy_s(msgInfo->msg.data, (size_t)con->inBuffer->len, con->inBuffer->data, (size_t)con->inBuffer->len); + rc = memcpy_s(msgInfo->msg.data, (size_t)con->inBuffer->len, con->inBuffer->data, (size_t)con->inBuffer->len); securec_check_errno(rc, (void)rc); } msgInfo->connID.connSeq = con->connSeq; @@ -631,7 +653,7 @@ void pushMsgToQue(CM_Connection* con) } write_runlog(DEBUG5, - "push message to recv que:remote_type:%s,connSeq=%u,agentNodeId=%u,qtype=%c,len=%d.\n", + "push message to recv que:remote_type:%s,connSeq=%lu,agentNodeId=%u,qtype=%c,len=%d.\n", msgInfo->connID.remoteType == CM_CTL ? "CM_CTL" : "CM_AGENT", msgInfo->connID.connSeq, msgInfo->connID.agentNodeId, @@ -639,11 +661,15 @@ void pushMsgToQue(CM_Connection* con) msgInfo->msg.len); // push the message to the queue + uint64 t1 = GetMonotonicTimeMs(); + msgInfo->connID.t1 = t1; if (con->port->remote_type == CM_CTL) { - pushRecvMsg(msgInfo, MsgPriHigh); + pushRecvMsg((PriMsgQues*)thrinfo->recvMsgQue, msgInfo, MsgSrcCtl); } else { - pushRecvMsg(msgInfo, MsgPriNormal); + pushRecvMsg((PriMsgQues*)thrinfo->recvMsgQue, msgInfo, MsgSrcAgent); } + uint64 t2 = GetMonotonicTimeMs(); + thrinfo->pushRecvQueWaitTime += (uint32)(t2 - t1); } static bool checkMsg(CM_Connection* con) @@ -730,7 +756,7 @@ static void CleanConBuffer(CM_Connection *con) * @param events My Param doc * @param arg My Param doc */ -static void cm_server_recv_msg(void* arg) +static void cm_server_recv_msg(CM_IOThread *thrinfo, void* arg) { CM_Connection* con = (CM_Connection*)arg; int qtype = 0; @@ -754,7 +780,7 @@ static void cm_server_recv_msg(void* arg) if (!checkMsg(con)) { break; } - pushMsgToQue(con); + pushMsgToQue(thrinfo, con); #ifdef KRB5 } #endif // KRB5 @@ -811,18 +837,17 @@ static void cm_server_recv_msg(void* arg) } } -static void recvMsg(const volatile sig_atomic_t& gotConnsClose, int fds, struct epoll_event *events) +static void recvMsg(int fds, struct epoll_event *events, CM_IOThread *thrinfo) { - static uint32 msgCount = 0; eventfd_t value = 0; for (int i = 0; i < fds; i++) { - if (gotConnsClose) { + if (thrinfo->gotConnClose) { return; } - if (events[i].data.fd == g_wakefd) { - int ret = eventfd_read(g_wakefd, &value); + if (events[i].data.fd == thrinfo->wakefd) { + int ret = eventfd_read(thrinfo->wakefd, &value); write_runlog(DEBUG5, "eventfd_read ret = %d,value=%lu.\n", ret, value); continue; } @@ -832,16 +857,11 @@ static void recvMsg(const volatile sig_atomic_t& gotConnsClose, int fds, struct /* read event */ if (events[i].events & EPOLLIN) { if ((con != NULL) && (con->port != NULL)) { - cm_server_recv_msg(con->arg); - msgCount++; + cm_server_recv_msg(thrinfo, con->arg); + thrinfo->recvMsgCount++; } } } - - if (msgCount % (MSG_COUNT_FOR_LOG) == 0 && msgCount >= (MSG_COUNT_FOR_LOG)) { - write_runlog(DEBUG1, "the thread has received %d msg(s) at this time.\n", MSG_COUNT_FOR_LOG); - msgCount = 0; - } } static CM_Connection *getConnect(const MsgSendInfo* msg) @@ -871,12 +891,49 @@ static CM_Connection *getConnect(const MsgSendInfo* msg) } } - write_runlog(DEBUG5, "getConnect:remote_type=%d,connSeq=%u,agentNodeId=%u,msg_type=%d.\n", + write_runlog(DEBUG5, "getConnect:remote_type=%d,connSeq=%lu,agentNodeId=%u,msg_type=%d.\n", msg->connID.remoteType, msg->connID.connSeq, msg->connID.agentNodeId, msgType); return con; } +static inline uint64 GetIOThreadID(const ConnID connID) +{ + if (connID.remoteType == CM_AGENT) { + return connID.agentNodeId % gIOThreads.count; + } else if (connID.remoteType == CM_CTL) { + return connID.connSeq % gIOThreads.count; + } + + CM_ASSERT(0); + return 0; +} + +static void pushMsgToSendQue(MsgSendInfo *msg, MsgSourceType src) +{ + if (msg->connID.remoteType == CM_AGENT && msg->connID.agentNodeId == ALL_AGENT_NODE_ID) { + for (uint32 i = 1; i < gIOThreads.count; i++) { + uint32 len = sizeof(MsgSendInfo) + msg->dataSize; + MsgSendInfo *msg_cpy = (MsgSendInfo *)AllocBufFromMsgPool(len); + if (msg_cpy == NULL) { + write_runlog(ERROR, "pushMsgToSendQue:AllocBufFromMsgPool failed,size=%u\n", len); + return; + } + errno_t rc = memcpy_s(msg_cpy, len, msg, len); + securec_check_errno(rc, (void)rc); + CM_IOThread *thrinfo = &gIOThreads.threads[i]; + pushSendMsg((PriMsgQues *)thrinfo->sendMsgQue, msg_cpy, src); + } + + CM_IOThread *thrinfo = &gIOThreads.threads[0]; + pushSendMsg((PriMsgQues *)thrinfo->sendMsgQue, msg, src); + } else { + uint64 id = GetIOThreadID(msg->connID); + CM_IOThread *thrinfo = &gIOThreads.threads[id]; + pushSendMsg((PriMsgQues *)thrinfo->sendMsgQue, msg, src); + } +} + static void InnerProcSSLAccept(const MsgSendInfo *msg, CM_Connection *con) { status_t status = CM_SUCCESS; @@ -884,7 +941,7 @@ static void InnerProcSSLAccept(const MsgSendInfo *msg, CM_Connection *con) uint64 now = GetMonotonicTimeMs(); bool retryProc = false; static const int retrySSLAcceptDetayMs = 10; - write_runlog(LOG, "[InnerProcSSLAccept] now=%lu,procTime=%lu,startTime=%lu,connSeq=%u.\n", + write_runlog(LOG, "[InnerProcSSLAccept] now=%lu,procTime=%lu,startTime=%lu,connSeq=%lu.\n", now, msg->procTime, connMsg->startConnTime, con->connSeq); status = cm_cs_ssl_accept(g_ssl_acceptor_fd, &con->port->pipe); @@ -892,9 +949,9 @@ static void InnerProcSSLAccept(const MsgSendInfo *msg, CM_Connection *con) if (now < connMsg->startConnTime + CM_SSL_IO_TIMEOUT) { retryProc = true; status = CM_SUCCESS; - write_runlog(LOG, "[ProcessSslConnRequest]retry ssl connect,connSeq=%u.\n", con->connSeq); + write_runlog(LOG, "[ProcessSslConnRequest]retry ssl connect,connSeq=%lu.\n", con->connSeq); } else { - write_runlog(ERROR, "[ProcessSslConnRequest]ssl connect timeout,connSeq=%u.\n", con->connSeq); + write_runlog(ERROR, "[ProcessSslConnRequest]ssl connect timeout,connSeq=%lu.\n", con->connSeq); } } @@ -909,9 +966,9 @@ static void InnerProcSSLAccept(const MsgSendInfo *msg, CM_Connection *con) errno_t rc = memcpy_s(nextConnMsg, msgSize, msg, msgSize); securec_check_errno(rc, (void)rc); nextConnMsg->procTime = now + retrySSLAcceptDetayMs; - pushSendMsg(nextConnMsg, MsgPriNormal); + pushMsgToSendQue(nextConnMsg, msg->connID.remoteType == CM_AGENT ? MsgSrcAgent : MsgSrcCtl); write_runlog(LOG, - "[ProcessSslConnRequest]retry ssl connect later,procTime=%lu,connSeq=%u.\n", + "[ProcessSslConnRequest]retry ssl connect later,procTime=%lu,connSeq=%lu.\n", nextConnMsg->procTime, con->connSeq); return; @@ -920,7 +977,7 @@ static void InnerProcSSLAccept(const MsgSendInfo *msg, CM_Connection *con) (void)EventAdd(con->epHandle, (int)EPOLLIN, con); if (status != CM_SUCCESS) { - write_runlog(ERROR, "[ProcessSslConnRequest]srv ssl accept failed,connSeq=%u.\n", con->connSeq); + write_runlog(ERROR, "[ProcessSslConnRequest]srv ssl accept failed,connSeq=%lu.\n", con->connSeq); DisableRemoveConn(con); return; } @@ -930,7 +987,7 @@ static void InnerProcSSLAccept(const MsgSendInfo *msg, CM_Connection *con) AddCMAgentConnection(con); RemoveTempConnection(con); } - write_runlog(LOG, "[ProcessSslConnRequest]srv ssl connect success,connSeq=%u.\n", con->connSeq); + write_runlog(LOG, "[ProcessSslConnRequest]srv ssl connect success,connSeq=%lu.\n", con->connSeq); } /** @@ -988,7 +1045,9 @@ static int32 AssignCmaConnToThread(CM_Connection *con) } /* assign new connection to a work thread by round robin */ - if (CMAssignConnToThread(con, &gIOThread) != STATUS_OK) { + uint32 threadID = con->port->node_id % gIOThreads.count; + CM_IOThread *ioThread = &gIOThreads.threads[threadID]; + if (CMAssignConnToThread(con, ioThread) != STATUS_OK) { write_runlog(LOG, "Assign new CM_AGENT connection to worker thread failed, confd is %d.\n", con->fd); return -1; } @@ -1000,8 +1059,10 @@ static int32 AssignCmctlConnToThread(CM_Connection *con) if (con->fd < 0) { return -1; } + uint64 threadID = con->connSeq % gIOThreads.count; + CM_IOThread *ioThread = &gIOThreads.threads[threadID]; AddTempConnection(con); - if (CMAssignConnToThread(con, &gIOThread) != STATUS_OK) { + if (CMAssignConnToThread(con, ioThread) != STATUS_OK) { write_runlog( LOG, "Assign new connection %d to worker thread failed, confd is %d.\n", con->port->remote_type, con->fd); return -1; @@ -1035,7 +1096,7 @@ static inline CM_Connection *GetCmConnect(const MsgSendInfo* msg) CM_Connection *con = getConnect(msg); if (con == NULL) { write_runlog(ERROR, - "[sendMsgs]get connection failed:remote_type=%s,connSeq=%u,agentNodeId=%u.\n", + "[sendMsgs]get connection failed:remote_type=%s,connSeq=%lu,agentNodeId=%u.\n", msg->connID.remoteType == CM_CTL ? "CM_CTL" : "CM_AGENT", msg->connID.connSeq, msg->connID.agentNodeId); @@ -1079,7 +1140,7 @@ static void CheckConnectAssignThread(const MsgSendInfo* msg) AssignConnToThread(*(CM_Connection **)msg->data); } -void InnerProc(const MsgSendInfo* msg) +void InnerProc(MsgSendInfo* msg) { switch (msg->procMethod) { case PM_REMOVE_CONN: @@ -1095,13 +1156,16 @@ void InnerProc(const MsgSendInfo* msg) case PM_ASSIGN_CONN: CheckConnectAssignThread(msg); break; - default:; + default: + write_runlog(ERROR, "unknown procMethod:%d.\n", (int)msg->procMethod); } + msg->connID.t9 = GetMonotonicTimeMs(); } -static int sendMsg(const MsgSendInfo *msg, CM_Connection *con) +static int sendMsg(MsgSendInfo *msg, CM_Connection *con) { int ret = CmsSendAndFlushMsg(con, msg->msgType, (const char *)&msg->data[0], msg->dataSize, msg->log_level); + msg->connID.t9 = GetMonotonicTimeMs(); if (ret != 0) { write_runlog(ERROR, "CmsSendAndFlushMsg error.\n"); } else { @@ -1115,11 +1179,15 @@ static int sendMsg(const MsgSendInfo *msg, CM_Connection *con) return ret; } -static void sendMsg(const MsgSendInfo *msg) +static void sendMsg(uint32 id, MsgSendInfo *msg) { if (msg->connID.remoteType == CM_AGENT && msg->connID.agentNodeId == ALL_AGENT_NODE_ID) { (void)pthread_rwlock_wrlock(&gConns.lock); for (uint32 i = 0; i < gConns.max_node_id + 1; i++) { + if (i % gIOThreads.count != id) { + continue; + } + CM_Connection *con = gConns.connections[i]; if (con == NULL) { continue; @@ -1135,7 +1203,7 @@ static void sendMsg(const MsgSendInfo *msg) CM_Connection *con = getConnect(msg); if (con == NULL) { write_runlog(ERROR, - "[sendMsgs]get connection failed:remote_type=%s,connSeq=%u,agentNodeId=%u.\n", + "[sendMsgs]get connection failed:remote_type=%s,connSeq=%lu,agentNodeId=%u.\n", msg->connID.remoteType == CM_CTL ? "CM_CTL" : "CM_AGENT", msg->connID.connSeq, msg->connID.agentNodeId); @@ -1148,31 +1216,73 @@ static void sendMsg(const MsgSendInfo *msg) } } -static void procSendMsg(const MsgSendInfo *msg) +static void procSendMsg(CM_IOThread &thrinfo, MsgSendInfo *msg) { + static const int log_interval = 5; + static const int expire_time = 7000; if (msg->procMethod == (int)PM_NONE) { - sendMsg(msg); + thrinfo.sendMsgCount++; + sendMsg(thrinfo.id, msg); } else { write_runlog(DEBUG5, "innerProc,method=%d.\n", msg->procMethod); + thrinfo.innerProcCount++; InnerProc(msg); } + + if (msg->connID.t1 != 0 && msg->connID.t9 != 0 && msg->connID.t9 - msg->connID.t1 > expire_time) { + static volatile time_t pre = 0; + static volatile uint32 discard = 0; + time_t now = time(NULL); + if (now > pre + log_interval) { + write_runlog(WARNING, + "msg_delay:type=%c,procMethod=%d,msgProcFlag=%d,msgType=%d,remoteType=%d,pushRecvQue=%lu,inRecvQue=%lu," + "getRecvQue=%lu,proc=%lu,pushSendQue=%lu,inSendQue=%lu,getSendQue=%lu,send=%lu,discard=%u\n", + msg->msgType, + (int)msg->procMethod, + (int)msg->msgProcFlag, + msg->dataSize > sizeof(int) ? *((int *)msg->data) : -1, + msg->connID.remoteType, + msg->connID.t2 - msg->connID.t1, + msg->connID.t3 - msg->connID.t2, + msg->connID.t4 - msg->connID.t3, + msg->connID.t5 - msg->connID.t4, + msg->connID.t6 - msg->connID.t5, + msg->connID.t7 - msg->connID.t6, + msg->connID.t8 - msg->connID.t7, + msg->connID.t9 - msg->connID.t8, + discard); + pre = now; + discard = 0; + } else { + ++discard; + } + } } -static void sendMsgs() +static void sendMsgs(CM_IOThread &thrinfo) { - static uint32 msgCount = 0; - size_t total = getSendMsgCount(); + PriMsgQues *sendQue = (PriMsgQues*)thrinfo.sendMsgQue; + size_t total = getMsgCount(sendQue); size_t procCount = 0; + if (total == 0) { + return; + } + for (;;) { - MsgSendInfo *msg = (MsgSendInfo*)(getSendMsg()); + uint64 t1 = GetMonotonicTimeMs(); + MsgSendInfo *msg = (MsgSendInfo *)(getSendMsg(sendQue, MsgSrcAgent)); if (msg == NULL) { - write_runlog(DEBUG5, "no message in send que.\n"); + uint64 t2 = GetMonotonicTimeMs(); + thrinfo.getSendQueWaitTime += (uint32)(t2 - t1); break; } + uint64 t2 = GetMonotonicTimeMs(); + thrinfo.getSendQueWaitTime += (uint32)(t2 - t1); + write_runlog(DEBUG5, - "get message from send que:remote_type:%s,connSeq=%u,agentNodeId=%u,msgType=%c:%d,len=%u.\n", + "get message from send que:remote_type:%s,connSeq=%lu,agentNodeId=%u,msgType=%c:%d,len=%u.\n", msg->connID.remoteType == CM_CTL ? "CM_CTL" : "CM_AGENT", msg->connID.connSeq, msg->connID.agentNodeId, @@ -1180,14 +1290,10 @@ static void sendMsgs() msg->dataSize > sizeof(int) ? *((int *)msg->data) : 0, // internal process message's datasize maybe 0 msg->dataSize); - procSendMsg(msg); + procSendMsg(thrinfo, msg); FreeBufFromMsgPool(msg); msg = NULL; - if (msgCount++ % SEND_COUNT == 0) { - write_runlog(DEBUG1, "the thread has send %u msg at this time.\n", msgCount); - } - procCount++; if (procCount >= total) { break; @@ -1195,11 +1301,14 @@ static void sendMsgs() } } -static void WakeSenderFunc(void) +static void WakeSenderFunc(const ConnID connID) { + uint64 id = GetIOThreadID(connID); + CM_IOThread *thrinfo = &gIOThreads.threads[id]; + int wakefd = thrinfo->wakefd; eventfd_t value = pthread_self(); - if (g_wakefd >= 0) { - int ret = eventfd_write(g_wakefd, value); + if (wakefd >= 0) { + int ret = eventfd_write(wakefd, value); if (ret != 0) { write_runlog(ERROR, "eventfd_write failed.ret = %d,errno=%d,value=%lu.\n", ret, errno, value); } @@ -1208,44 +1317,48 @@ static void WakeSenderFunc(void) } } -static int CreateWakeupEvent(int epollHandle) +static int CreateWakeupEvent(int epollHandle, int &wakefd) { - g_wakefd = eventfd(0, 0); - if (g_wakefd < 0) { + wakefd = eventfd(0, 0); + if (wakefd < 0) { write_runlog(ERROR, "eventfd error :%d.\n", errno); return CM_ERROR; } - write_runlog(LOG, "eventfd :%d.\n", g_wakefd); + write_runlog(LOG, "eventfd :%d.\n", wakefd); - struct epoll_event ev = {.events = (uint32)EPOLLIN, {.fd = g_wakefd}}; - if (epoll_ctl(epollHandle, EPOLL_CTL_ADD, g_wakefd, &ev) != 0) { + struct epoll_event ev = {.events = (uint32)EPOLLIN, {.fd = wakefd}}; + if (epoll_ctl(epollHandle, EPOLL_CTL_ADD, wakefd, &ev) != 0) { write_runlog(ERROR, "epoll_ctl error :%d.\n", errno); - (void)close(g_wakefd); - g_wakefd = -1; + (void)close(wakefd); + wakefd = -1; return CM_ERROR; } return CM_SUCCESS; } -void* CM_IOThreadMain(void* argp) +void *CM_IOThreadMain(void *argp) { int epollHandle; struct epoll_event events[MAX_EVENTS]; sigset_t block_sig_set; + CM_IOThread *thrinfo = (CM_IOThread *)argp; + time_t time1 = time(NULL); - CM_IOThread* thrinfo = (CM_IOThread*)argp; - thread_name = "IO_THREAD"; + thread_name = "IO_WORKER"; + (void)prctl(PR_SET_NAME, thread_name); epollHandle = thrinfo->epHandle; - if (CreateWakeupEvent(epollHandle) != CM_SUCCESS) { + if (CreateWakeupEvent(epollHandle, thrinfo->wakefd) != CM_SUCCESS) { return NULL; } + + uint64 epollWait = 0, recvMsgTime = 0, sendMsgTime = 0, count = 0; + (void)pthread_detach(pthread_self()); setBlockSigMask(&block_sig_set); setWakeSenderFunc(WakeSenderFunc); int waitTime = EPOLL_TIMEOUT; - volatile sig_atomic_t& gotConnsClose = got_conns_close[thrinfo->id]; write_runlog(LOG, "cmserver pool thread %lu starting, epollfd is %d.\n", thrinfo->tid, epollHandle); for (;;) { if (got_stop == 1) { @@ -1254,15 +1367,10 @@ void* CM_IOThreadMain(void* argp) continue; } - CloseAllConnections(gotConnsClose, epollHandle); + CloseAllConnections(thrinfo); thrinfo->isBusy = false; - /* wait for events to happen, 5s timeout */ - if (existSendMsg()) { - waitTime = 1; - } else { - waitTime = EPOLL_TIMEOUT; - } + uint64 t2 = GetMonotonicTimeMs(); int fds = epoll_pwait(epollHandle, events, MAX_EVENTS, waitTime, &block_sig_set); if (fds < 0) { if (errno != EINTR && errno != EWOULDBLOCK) { @@ -1271,21 +1379,49 @@ void* CM_IOThreadMain(void* argp) } } thrinfo->isBusy = true; + uint64 t3 = GetMonotonicTimeMs(); if (fds > 0) { - recvMsg(gotConnsClose, fds, events); + recvMsg(fds, events, thrinfo); } + uint64 t4 = GetMonotonicTimeMs(); + sendMsgs(*thrinfo); + uint64 t5 = GetMonotonicTimeMs(); - sendMsgs(); + epollWait += t3 - t2; + recvMsgTime += t4 - t3; + sendMsgTime += t5 - t4; + count++; + time_t time2 = time(NULL); + if (time2 - time1 >= MSG_TIME_FOR_LOG) { + size_t totalRecvMsg = getMsgCount((PriMsgQues *)thrinfo->recvMsgQue); + size_t totalSendMsg = getMsgCount((PriMsgQues *)thrinfo->sendMsgQue); + if (totalRecvMsg >= MAX_MSG_IN_QUE || totalSendMsg >= MAX_MSG_IN_QUE) { + write_runlog(LOG, + "total receive count:%u,send count:%u,innerProc count:%u;recv que size:%lu,send que size:%lu," + "push send msg wait:%u,get send msg wait:%u,epoll wait=%lu,recv msg=%lu,send msg=%lu,count=%lu\n", + thrinfo->recvMsgCount, thrinfo->sendMsgCount, thrinfo->innerProcCount, totalRecvMsg, totalSendMsg, + thrinfo->pushRecvQueWaitTime / CM_MS_COUNT_PER_SEC, + thrinfo->getSendQueWaitTime / CM_MS_COUNT_PER_SEC, + epollWait, recvMsgTime, sendMsgTime, count); + } + epollWait = recvMsgTime = sendMsgTime = 0; + count = 0; + time1 = time2; + } } (void)close(epollHandle); - (void)close(g_wakefd); - g_wakefd = -1; + thrinfo->epHandle = -1; + (void)close(thrinfo->wakefd); + thrinfo->wakefd = -1; + delete (PriMsgQues*)thrinfo->recvMsgQue; + thrinfo->recvMsgQue = NULL; + delete (PriMsgQues*)thrinfo->sendMsgQue; + thrinfo->sendMsgQue = NULL; return thrinfo; } - /** * @brief add/mod an event to epoll * @@ -1626,9 +1762,7 @@ static int asyncSendMsgInner(const ConnID& connID, uint8 msgProcFlag, char msgty write_runlog(ERROR, "RespondMsg:AllocBufFromMsgPool failed,size=%u\n", (uint32)(sizeof(MsgSendInfo) + len)); return (int)ERR_ALLOC_MEMORY; } - msg->connID.remoteType = connID.remoteType; - msg->connID.agentNodeId = connID.agentNodeId; - msg->connID.connSeq = connID.connSeq; + msg->connID = connID; msg->procTime = 0; msg->log_level = log_level; msg->dataSize = (uint32)len; @@ -1641,25 +1775,22 @@ static int asyncSendMsgInner(const ConnID& connID, uint8 msgProcFlag, char msgty } write_runlog(DEBUG1, - "push message to send que:remote_type:%s,connSeq=%u,agentNodeId=%u,msgType=%c,len=%u.\n", + "push message to send que:remote_type:%s,connSeq=%lu,agentNodeId=%u,msgType=%c,len=%u.\n", msg->connID.remoteType == CM_CTL ? "CM_CTL" : "CM_AGENT", msg->connID.connSeq, msg->connID.agentNodeId, msg->msgType, msg->dataSize); - pushSendMsg(msg, msg->connID.remoteType == CM_AGENT ? MsgPriNormal : MsgPriHigh); + pushMsgToSendQue(msg, msg->connID.remoteType == CM_CTL ? MsgSrcCtl : MsgSrcAgent); return 0; } -int RespondMsg(const MsgRecvInfo* recvMsg, char msgtype, const char *s, size_t len, int log_level) +int RespondMsg(MsgRecvInfo* recvMsg, char msgtype, const char *s, size_t len, int log_level) { - ConnID connID; - connID.remoteType = recvMsg->connID.remoteType; - connID.connSeq = recvMsg->connID.connSeq; - connID.agentNodeId = recvMsg->connID.agentNodeId; - return asyncSendMsgInner(connID, recvMsg->msgProcFlag, msgtype, s, len, log_level); + recvMsg->connID.t5 = GetMonotonicTimeMs(); + return asyncSendMsgInner(recvMsg->connID, recvMsg->msgProcFlag, msgtype, s, len, log_level); } int SendToAgentMsg(uint agentNodeId, char msgtype, const char *s, size_t len, int log_level) @@ -1687,9 +1818,7 @@ void AsyncProcMsg(const MsgRecvInfo *recvMsg, IOProcMethond procMethod, const ch write_runlog(ERROR, "[%s] AllocBufFromMsgPool failed.\n", __FUNCTION__); return; } - msg->connID.remoteType = recvMsg->connID.remoteType; - msg->connID.connSeq = recvMsg->connID.connSeq; - msg->connID.agentNodeId = recvMsg->connID.agentNodeId; + msg->connID = recvMsg->connID; msg->procTime = 0; msg->dataSize = len; msg->msgType = 0; @@ -1704,12 +1833,12 @@ void AsyncProcMsg(const MsgRecvInfo *recvMsg, IOProcMethond procMethod, const ch } write_runlog(logLevel, - "push message to send que:remote_type:%s,connSeq=%u,agentNodeId=%u,procMethod=%d,len=%u.\n", + "push message to send que:remote_type:%s,connSeq=%lu,agentNodeId=%u,procMethod=%d,len=%u.\n", msg->connID.remoteType == CM_CTL ? "CM_CTL" : "CM_AGENT", msg->connID.connSeq, msg->connID.agentNodeId, (int)msg->procMethod, msg->dataSize); - pushSendMsg(msg, msg->connID.remoteType == CM_AGENT ? MsgPriNormal : MsgPriHigh); + pushMsgToSendQue(msg, msg->connID.remoteType == CM_CTL ? MsgSrcCtl : MsgSrcAgent); } diff --git a/src/cm_server/cms_ddb_adapter.cpp b/src/cm_server/cms_ddb_adapter.cpp index 3e8682f..92383a3 100644 --- a/src/cm_server/cms_ddb_adapter.cpp +++ b/src/cm_server/cms_ddb_adapter.cpp @@ -392,7 +392,7 @@ static bool IsCurrentNodeInVoteAZ() status_t InitDdbArbitrate(DrvApiInfo *drvApiInfo) { - if (g_dbType != DB_ETCD) { + if (g_dbType != DB_ETCD && g_dbType != DB_SHAREDISK) { return CM_SUCCESS; } @@ -909,7 +909,7 @@ DdbConn *GetNextDdbConn() uint32 idx = 0; for (uint32 i = 0; i < g_sess->count; ++i) { idx = (g_sess->curIdx + i) % g_sess->count; - if (g_sess->ddbConn[idx].state == PROCESS_IN_IDLE) { + if (g_sess->ddbConn[idx].state != PROCESS_IN_RUNNING) { break; } } diff --git a/src/cm_server/cms_global_params.cpp b/src/cm_server/cms_global_params.cpp index fc16549..72879e7 100644 --- a/src/cm_server/cms_global_params.cpp +++ b/src/cm_server/cms_global_params.cpp @@ -205,6 +205,8 @@ bool g_getHistoryCnStatusFromDdb = false; bool g_needIncTermToDdbAgain = false; bool g_clusterStarting = false; bool g_isSharedStorageMode = false; +volatile bool g_needReloadSyncStandbyMode = false; + volatile uint32 g_refreshDynamicCfgNum = 0; bool g_elastic_exist_node = false; @@ -249,7 +251,7 @@ int *g_lastCnDnDisconnectTimes = NULL; volatile switchover_az_mode cm_switchover_az_mode = AUTOSWITCHOVER_AZ; volatile logic_cluster_restart_mode cm_logic_cluster_restart_mode = INITIAL_LOGIC_CLUSTER_RESTART; -CM_IOThread gIOThread; +CM_IOThreads gIOThreads; CM_WorkThreads gWorkThreads; CM_HAThreads gHAThreads; CM_MonitorThread gMonitorThread; @@ -370,47 +372,6 @@ void initazArray(char azArray[][CM_AZ_NAME]) } } -bool is_majority_reelection_exceptional_condition(synchronous_standby_mode current_datanode_sync_mode, - int dynamic_primary_count, int staticPrimaryIndex, bool static_primary_and_dynamic_primary_differs) -{ - /* - * Skip majority re-election if we are in minority mode, - * Skip majority re-election if we are not in a multi-active standby cluster - * Skip majority re-election if we are doing AZ auto-switchover - */ - if (cm_arbitration_mode == MINORITY_ARBITRATION || !g_multi_az_cluster || switchoverAZInProgress) { - return true; - } - - /* Skip majority re-election if we don't have all AZ's available to us */ - if ((current_cluster_az_status >= AnyAz1 && current_cluster_az_status <= FirstAz2) || - (current_datanode_sync_mode >= AnyAz1 && current_datanode_sync_mode <= FirstAz2)) { - return true; - } - - /* - * Skip majority re-election if we are just promoted to CMS primary, - * as we attempt to keep the original configuration of datanode primary/standbys - * Skip majority re-election if we don't have a static primary, which should not happen - */ - if (arbitration_majority_reelection_timeout > 0 || staticPrimaryIndex < 0) { - return true; - } - - /* - * Currently we don't have interactive communication between CM server and datanodes, - * and we cannot failover to a datanode whose dynamic role is already a primary. - * Because of these constraint, we cannot deal with multiple-dynamic-primary scenarios correctly. - * Leave that to partition locking later on. - */ - const int onePrimary = 1; - if (dynamic_primary_count > onePrimary || static_primary_and_dynamic_primary_differs) { - return true; - } - - return false; -} - maintenance_mode getMaintenanceMode(const uint32 &group_index) { maintenance_mode mode = MAINTENANCE_MODE_NONE; diff --git a/src/cm_server/cms_main.cpp b/src/cm_server/cms_main.cpp index b8b0cf8..e0a4d13 100644 --- a/src/cm_server/cms_main.cpp +++ b/src/cm_server/cms_main.cpp @@ -53,8 +53,8 @@ volatile sig_atomic_t g_SetReplaceCnStatus = 0; /* main thread exit after HA thread close connection */ volatile sig_atomic_t ha_connection_closed = 0; -volatile sig_atomic_t got_conns_close[CM_IO_THREAD_COUNT] = {0}; pid_t cm_agent = 0; +uint64 gConnSeq = 0; const char* g_progname; static char g_appPath[MAXPGPATH] = {0}; @@ -140,8 +140,8 @@ static void stop_signal_reaper(int arg) static void close_all_agent_connections(int arg) { - for (int i = 0; i < CM_IO_THREAD_COUNT; i++) { - got_conns_close[i] = 1; + for (uint32 i = 0; i < gIOThreads.count; i++) { + gIOThreads.threads[i].gotConnClose = 1; } } @@ -1594,7 +1594,7 @@ static int32 CmSetConnState(CM_Connection *con) errno_t rc = memset_s(&recvMsg, sizeof(MsgRecvInfo), 0, sizeof(MsgRecvInfo)); securec_check_errno(rc, (void)rc); recvMsg.connID.agentNodeId = con->port->node_id; - recvMsg.connID.connSeq = 0; + recvMsg.connID.connSeq = con->connSeq; recvMsg.connID.remoteType = con->port->remote_type; AsyncProcMsg(&recvMsg, PM_ASSIGN_CONN, (const char *)&con, sizeof(CM_Connection *)); } else { @@ -1866,6 +1866,7 @@ static CM_Connection* makeConnection(int fd, Port* port) listenCon->fd = fd; listenCon->port = port; listenCon->inBuffer = CM_makeStringInfo(); + listenCon->connSeq = gConnSeq++; Assert(listenCon->inBuffer != NULL); @@ -2620,22 +2621,41 @@ int main(int argc, char** argv) } } - status = CM_CreateWorkThreadPool(cm_thread_count); - if (status < 0) { - write_runlog(ERROR, "Create Threads Pool failed!\n"); - CloseAllDdbSession(); - FreeNotifyMsg(); - return -1; + // worker: total [5,1000] s 5 6 10 32 50 100 200 500 1000 + // IO worker a = s/3 1 2 3 10 16 33 66 166 333 + // clt worker b = (s-a)/2 2 2 3 11 17 33 66 167 333 + // agent worker c = s-a-b 2 2 4 11 17 34 68 167 334 + + // node num <32 <32 >32 >32 + // agent worker <=4 >4 <=4 >4 + // ctl worker 2 2 2 4 + + const uint32 workerCountPerNode = 3; + uint32 totalWorker = cm_thread_count; + if ((uint32)cm_thread_count > g_node_num * workerCountPerNode) { + totalWorker = g_node_num * workerCountPerNode; } - - status = CM_CreateIOThread(); + + uint32 ioWorkerCount = totalWorker / 3; + uint32 cltWorkerCount = (totalWorker - ioWorkerCount) / 2; + uint32 agentWorkerCount = (totalWorker - ioWorkerCount) - cltWorkerCount; + + status = CM_CreateIOThreadPool(ioWorkerCount); if (status < 0) { write_runlog(ERROR, "Create IOThreads failed!\n"); CloseAllDdbSession(); FreeNotifyMsg(); return -1; } - + + status = CM_CreateWorkThreadPool(cltWorkerCount, agentWorkerCount); + if (status < 0) { + write_runlog(ERROR, "Create Threads Pool failed!\n"); + CloseAllDdbSession(); + FreeNotifyMsg(); + return -1; + } + g_inMaintainMode = IsMaintainFileExist(); status = CM_CreateHA(); diff --git a/src/cm_server/cms_msg_que.cpp b/src/cm_server/cms_msg_que.cpp index 4beb025..f777aab 100644 --- a/src/cm_server/cms_msg_que.cpp +++ b/src/cm_server/cms_msg_que.cpp @@ -22,44 +22,25 @@ * ------------------------------------------------------------------------- */ -#include -#include #include "elog.h" #include "cm_c.h" #include "cm_util.h" +#include "cm_msg_buf_pool.h" + #include "cms_msg_que.h" -using MsgQueType = std::deque; -using MsgQuePtr = MsgQueType*; - -struct PriMsgQues { - MsgQuePtr Ques[MSG_PRI_COUNT]; - pthread_mutex_t msg_lock; - pthread_cond_t msg_cond; - CMPrioMutex prioLock; -}; - -static PriMsgQues g_recvMsgQues; -static PriMsgQues g_sendMsgQues; - static wakeSenderFuncType wakeSenderFunc = NULL; static CanProcThisMsgFunType CanProcThisMsgFun = NULL; -static void InitMsgQue(PriMsgQues &que) +void InitMsgQue(PriMsgQues &que) { - for (int i = 0; i < (int)MSG_PRI_COUNT; i++) { - que.Ques[i] = NULL; + for (int i = 0; i < (int)MSG_SRC_COUNT; i++) { + CMFairMutexInit(que.ques[i].fairLock); } - (void)pthread_mutex_init(&que.msg_lock, NULL); - (void)pthread_cond_init(&que.msg_cond, NULL); - CMPrioMutexInit(que.prioLock); -} -void InitMsgQue() -{ - InitMsgQue(g_recvMsgQues); - InitMsgQue(g_sendMsgQues); + (void)pthread_mutex_init(&que.msgLock, NULL); + (void)pthread_cond_init(&que.msgCond, NULL); } void setWakeSenderFunc(wakeSenderFuncType func) @@ -71,147 +52,147 @@ void SetCanProcThisMsgFun(CanProcThisMsgFunType func) { CanProcThisMsgFun = func; } -static void pushToQue(PriMsgQues *priQue, MsgPriority pri, const char *msg) + +size_t getMsgCount(PriMsgQues *priQue) { - if (priQue->Ques[pri] == NULL) { - priQue->Ques[pri] = new MsgQueType; - if (priQue->Ques[pri] == NULL) { - write_runlog(ERROR, "pushToQue:out of memory.\n"); - return; - } - } - priQue->Ques[pri]->push_back(msg); -} + size_t count = 0; -void pushRecvMsg(const MsgRecvInfo *msg, MsgPriority pri) -{ - Assert(pri >= 0 && pri < MSG_PRI_COUNT); - PriMsgQues *priQue = &g_recvMsgQues; - - (void)CMPrioMutexLock(priQue->prioLock, CMMutexPrio::CM_MUTEX_PRIO_HIGH); - pushToQue(priQue, pri, (const char*)msg); - CMPrioMutexUnLock(priQue->prioLock); - - (void)pthread_cond_broadcast(&priQue->msg_cond); -} - -const MsgRecvInfo *getRecvMsg(MsgPriority pri, uint32 waitTime, void *threadInfo) -{ - Assert(pri >= 0 && pri < MSG_PRI_COUNT); - PriMsgQues *priQue = &g_recvMsgQues; - const MsgRecvInfo *msg = NULL; - struct timespec tv; - - (void)CMPrioMutexLock(priQue->prioLock, CMMutexPrio::CM_MUTEX_PRIO_NORMAL); - - for (int i = 0; i < (int)pri + 1; i++) { - MsgQuePtr que = priQue->Ques[i]; - if (que == NULL) { - continue; - } - - MsgQueType::iterator it = que->begin(); - for (; it != que->end(); ++it) { - if (CanProcThisMsgFun == NULL || CanProcThisMsgFun(threadInfo, *it)) { - msg = (const MsgRecvInfo *)*it; - (void)que->erase(it); - break; - } - } - - if (msg != NULL) { - break; - } + for (int i = 0; i < (int)MSG_SRC_COUNT; i++) { + MsgQuePtr que = &priQue->ques[i].que; + count += que->size(); } - CMPrioMutexUnLock(priQue->prioLock); - - if (msg == NULL && waitTime > 0) { - (void)clock_gettime(CLOCK_REALTIME, &tv); - tv.tv_sec = tv.tv_sec + (long long)waitTime; - (void)pthread_mutex_lock(&priQue->msg_lock); - (void)pthread_cond_timedwait(&priQue->msg_cond, &priQue->msg_lock, &tv); - (void)pthread_mutex_unlock(&priQue->msg_lock); - } - - return msg; + return count; } -void pushSendMsg(const MsgSendInfo *msg, MsgPriority pri) +bool existMsg(const PriMsgQues *priQue) { - Assert(pri >= 0 && pri < MSG_PRI_COUNT); - PriMsgQues *priQue = &g_sendMsgQues; - - (void)CMPrioMutexLock(priQue->prioLock, CMMutexPrio::CM_MUTEX_PRIO_NORMAL); - pushToQue(priQue, pri, (const char*)msg); - CMPrioMutexUnLock(priQue->prioLock); - - if (wakeSenderFunc != NULL) { - wakeSenderFunc(); - } -} - -const MsgSendInfo *getSendMsg() -{ - const MsgSendInfo *msg = NULL; - PriMsgQues *priQue = &g_sendMsgQues; - - uint64 now = GetMonotonicTimeMs(); - (void)CMPrioMutexLock(priQue->prioLock, CMMutexPrio::CM_MUTEX_PRIO_HIGH); - - for (int i = 0; i < (int)MSG_PRI_COUNT; i++) { - MsgQuePtr que = priQue->Ques[i]; - if (que == NULL) { - continue; - } - - MsgQueType::iterator it = que->begin(); - for (; it != que->end(); ++it) { - const MsgSendInfo* sendMsg = (MsgSendInfo*)(*it); - if (sendMsg->procTime == 0 || sendMsg->procTime <= now) { - msg = (const MsgSendInfo *)(*it); - (void)que->erase(it); - break; - } - } - - if (msg != NULL) { - break; - } - } - - CMPrioMutexUnLock(priQue->prioLock); - - return msg; -} -bool existSendMsg() -{ - PriMsgQues *priQue = &g_sendMsgQues; - for (int i = 0; i < (int)MSG_PRI_COUNT; i++) { - MsgQuePtr que = priQue->Ques[i]; - if (que != NULL) { - if (!que->empty()) { - return true; - } + for (int i = 0; i < (int)MSG_SRC_COUNT; i++) { + ConstMsgQuePtr que = &priQue->ques[i].que; + if (!que->empty()) { + return true; } } return false; } -size_t getSendMsgCount() +void pushRecvMsg(PriMsgQues *priQue, MsgRecvInfo *msg, MsgSourceType src) { - PriMsgQues *priQue = &g_sendMsgQues; - size_t count = 0; + Assert(src >= 0 && src < MSG_SRC_COUNT); - (void)CMPrioMutexLock(priQue->prioLock, CMMutexPrio::CM_MUTEX_PRIO_HIGH); - for (int i = 0; i < (int)MSG_PRI_COUNT; i++) { - MsgQuePtr que = priQue->Ques[i]; - if (que != NULL) { - count += que->size(); - } - } - CMPrioMutexUnLock(priQue->prioLock); + (void)CMFairMutexLock(priQue->ques[src].fairLock, CMFairMutexType::CM_MUTEX_WRITE); + msg->connID.t2 = GetMonotonicTimeMs(); + priQue->ques[src].que.push_back((const char *)msg); + CMFairMutexUnLock(priQue->ques[src].fairLock); - return count; + (void)pthread_cond_broadcast(&priQue->msgCond); } + +static const MsgRecvInfo *getRecvMsgInner(PriMsgQues *priQue, MsgSourceType src, void *threadInfo) +{ + Assert(src >= 0 && src < MSG_SRC_COUNT); + MsgRecvInfo *msg = NULL; + uint64 t3 = GetMonotonicTimeMs(); + + if (!existMsg(priQue)) { + return NULL; + } + + for (int i = 0; i < (int)MSG_SRC_COUNT; i++) { + MsgQuePtr que = &priQue->ques[src].que; + (void)CMFairMutexLock(priQue->ques[src].fairLock, CMFairMutexType::CM_MUTEX_READ); + MsgQueType::iterator it = que->begin(); + for (; it != que->end(); ++it) { + if (CanProcThisMsgFun == NULL || CanProcThisMsgFun(threadInfo, *it)) { + msg = (MsgRecvInfo *)*it; + (void)que->erase(it); + msg->connID.t3 = t3; + msg->connID.t4 = GetMonotonicTimeMs(); + break; + } + } + CMFairMutexUnLock(priQue->ques[src].fairLock); + + if (msg != NULL) { + break; + } + src = (src == MsgSrcAgent) ? MsgSrcCtl : MsgSrcAgent; // switch src type; + } + + return msg; +} + +const MsgRecvInfo *getRecvMsg(PriMsgQues *priQue, MsgSourceType src, uint32 waitTime, void *threadInfo) +{ + struct timespec tv; + if (priQue == NULL) { + return NULL; + } + + const MsgRecvInfo* msg = getRecvMsgInner(priQue, src, threadInfo); + + if (msg == NULL && waitTime > 0) { + (void)clock_gettime(CLOCK_REALTIME, &tv); + tv.tv_sec = tv.tv_sec + (long long)waitTime; + (void)pthread_mutex_lock(&priQue->msgLock); + (void)pthread_cond_timedwait(&priQue->msgCond, &priQue->msgLock, &tv); + (void)pthread_mutex_unlock(&priQue->msgLock); + } + + return msg; +} + +void pushSendMsg(PriMsgQues *priQue, MsgSendInfo *msg, MsgSourceType src) +{ + Assert(src >= 0 && src < MSG_SRC_COUNT); + ConnID connID = msg->connID; + (void)CMFairMutexLock(priQue->ques[src].fairLock, CMFairMutexType::CM_MUTEX_WRITE); + priQue->ques[src].que.push_back((const char*)msg); + msg->connID.t6 = GetMonotonicTimeMs(); + CMFairMutexUnLock(priQue->ques[src].fairLock); + + if (wakeSenderFunc != NULL) { + wakeSenderFunc(connID); + } +} + +const MsgSendInfo *getSendMsg(PriMsgQues *priQue, MsgSourceType src) +{ + const MsgSendInfo *msg = NULL; + + if (!existMsg(priQue)) { + return NULL; + } + + uint64 now = GetMonotonicTimeMs(); + for (int i = 0; i < (int)MSG_SRC_COUNT; i++) { + MsgQuePtr que = &priQue->ques[src].que; + (void)CMFairMutexLock(priQue->ques[src].fairLock, CMFairMutexType::CM_MUTEX_READ); + MsgQueType::iterator it = que->begin(); + for (; it != que->end(); ++it) { + MsgSendInfo *sendMsg = (MsgSendInfo *)(*it); + if (sendMsg->procTime == 0 || sendMsg->procTime <= now) { + msg = sendMsg; + (void)que->erase(it); + sendMsg->connID.t7 = now; + sendMsg->connID.t8 = GetMonotonicTimeMs(); + break; + } + } + CMFairMutexUnLock(priQue->ques[src].fairLock); + + if (msg != NULL) { + break; + } + src = (src == MsgSrcAgent) ? MsgSrcCtl : MsgSrcAgent; + } + + return msg; +} + +bool existSendMsg(const PriMsgQues *priQue) +{ + return existMsg(priQue); +} + diff --git a/src/cm_server/cms_process_messages_ctl.cpp b/src/cm_server/cms_process_messages_ctl.cpp index 1381e65..a9fc31d 100644 --- a/src/cm_server/cms_process_messages_ctl.cpp +++ b/src/cm_server/cms_process_messages_ctl.cpp @@ -550,11 +550,6 @@ void process_ctl_to_cm_switchover_full_msg( cm_to_ctl_command_ack msgSwitchoverFullAck = {0}; msgSwitchoverFullAck.msg_type = MSG_CM_CTL_SWITCHOVER_FULL_ACK; - if (CheckEnableFlag()) { - msgSwitchoverFullAck.command_result = CM_INVALID_COMMAND; - (void)RespondMsg(recvMsgInfo, 'S', (char *)(&msgSwitchoverFullAck), sizeof(cm_to_ctl_command_ack)); - return; - } if (backup_open != CLUSTER_PRIMARY) { msgSwitchoverFullAck.msg_type = MSG_CM_CTL_BACKUP_OPEN; (void)RespondMsg(recvMsgInfo, 'S', (char *)(&msgSwitchoverFullAck), sizeof(cm_to_ctl_command_ack)); @@ -928,11 +923,6 @@ void ProcessCtlToCmSwitchoverFullCheckMsg(MsgRecvInfo* recvMsgInfo) cm_to_ctl_switchover_full_check_ack msgSwitchoverFullCheckAck; msgSwitchoverFullCheckAck.msg_type = MSG_CM_CTL_SWITCHOVER_FULL_CHECK_ACK; - if (CheckEnableFlag()) { - msgSwitchoverFullCheckAck.switchoverDone = INVALID_COMMAND; - (void)RespondMsg(recvMsgInfo, 'S', (char*)(&msgSwitchoverFullCheckAck), sizeof(msgSwitchoverFullCheckAck)); - return; - } int32 switchoverDone = GetSwitchoverDone("[ProcessCtlToCmSwitchoverFullCheckMsg]"); msgSwitchoverFullCheckAck.switchoverDone = switchoverDone; @@ -958,11 +948,6 @@ void ProcessCtlToCmSwitchoverAzCheckMsg(MsgRecvInfo* recvMsgInfo) cm_to_ctl_switchover_az_check_ack msgSwitchoverAZCheckAck; msgSwitchoverAZCheckAck.msg_type = MSG_CM_CTL_SWITCHOVER_AZ_CHECK_ACK; - if (CheckEnableFlag()) { - msgSwitchoverAZCheckAck.switchoverDone = INVALID_COMMAND; - (void)RespondMsg(recvMsgInfo, 'S', (char *)(&msgSwitchoverAZCheckAck), sizeof(msgSwitchoverAZCheckAck)); - return; - } int32 switchoverDone = GetSwitchoverDone("[ProcessCtlToCmSwitchoverAzCheckMsg]"); msgSwitchoverAZCheckAck.switchoverDone = switchoverDone; @@ -2153,11 +2138,6 @@ void ProcessCtlToCmSwitchoverAllMsg(MsgRecvInfo* recvMsgInfo, const ctl_to_cm_sw cm_to_ctl_command_ack msgSwitchoverAllAck = { 0 }; msgSwitchoverAllAck.msg_type = MSG_CM_CTL_SWITCHOVER_ALL_ACK; - if (CheckEnableFlag()) { - msgSwitchoverAllAck.command_result = CM_INVALID_COMMAND; - (void)RespondMsg(recvMsgInfo, 'S', (char *)(&msgSwitchoverAllAck), sizeof(cm_to_ctl_command_ack)); - return; - } if (backup_open != CLUSTER_PRIMARY) { msgSwitchoverAllAck.msg_type = MSG_CM_CTL_BACKUP_OPEN; (void)RespondMsg(recvMsgInfo, 'S', (char *)(&msgSwitchoverAllAck), sizeof(cm_to_ctl_command_ack)); diff --git a/src/cm_server/cms_process_messages_res.cpp b/src/cm_server/cms_process_messages_res.cpp index 05f063f..dfad408 100644 --- a/src/cm_server/cms_process_messages_res.cpp +++ b/src/cm_server/cms_process_messages_res.cpp @@ -217,16 +217,16 @@ static const char* GetClusterResStatStr(MaxClusterResStatus stat) return ""; } -MaxClusterResStatus GetResNodeStat(uint32 nodeId) +MaxClusterResStatus GetResNodeStat(uint32 nodeId, int logLevel) { uint32 ind = FindNodeReportResInterByNodeId(nodeId); if (ind < g_node_num) { if (IsReportTimeout(g_resNodeStat[ind].reportInter)) { - write_runlog(LOG, "recv node(%u) agent report res status msg timeout.\n", nodeId); + write_runlog(logLevel, "recv node(%u) agent report res status msg timeout.\n", nodeId); return MAX_CLUSTER_STATUS_UNAVAIL; } if (g_resNodeStat[ind].isAvail != MAX_CLUSTER_STATUS_AVAIL) { - write_runlog(LOG, "res node(%u) stat is (%s).\n", nodeId, GetClusterResStatStr(g_resNodeStat[ind].isAvail)); + write_runlog(logLevel, "node(%u) stat (%s).\n", nodeId, GetClusterResStatStr(g_resNodeStat[ind].isAvail)); } return g_resNodeStat[ind].isAvail; } else { @@ -432,7 +432,6 @@ void ReleaseResLockOwner(const char *resName, uint32 instId) return; } - bool isSuccess = true; for (uint32 i = 0; i < kvCount; ++i) { if (kvs[i].key[0] == '\0' || kvs[i].value[0] == '\0') { break; @@ -441,18 +440,11 @@ void ReleaseResLockOwner(const char *resName, uint32 instId) continue; } if (DelKeyInDdb(kvs[i].key, (uint32)strlen(kvs[i].key)) != CM_SUCCESS) { - write_runlog(ERROR, "[CLIENT] ddb del failed. key=%s, value=%s.\n", kvs[i].key, kvs[i].value); - isSuccess = false; + write_runlog(ERROR, "[CLIENT] release lock failed. key=%s, value=%s.\n", kvs[i].key, kvs[i].value); } else { - write_runlog(LOG, "[CLIENT] ddb del success. key=%s, value=%s.\n", kvs[i].key, kvs[i].value); + write_runlog(LOG, "[CLIENT] release lock success. key=%s, value=%s.\n", kvs[i].key, kvs[i].value); } } - - if (isSuccess) { - write_runlog(LOG, "[CLIENT] res(%s) inst(%u) release all lock success.\n", resName, instId); - } else { - write_runlog(ERROR, "[CLIENT] res(%s) inst(%u) release all lock failed.\n", resName, instId); - } } static ClientError CmResLock(const CmaToCmsResLock *lockMsg) diff --git a/src/cm_server/cms_sync_dynamic_info.cpp b/src/cm_server/cms_sync_dynamic_info.cpp index de973d6..624cd43 100644 --- a/src/cm_server/cms_sync_dynamic_info.cpp +++ b/src/cm_server/cms_sync_dynamic_info.cpp @@ -25,6 +25,7 @@ #include "cms_ddb.h" #include "cms_global_params.h" #include "cms_write_dynamic_config.h" +#include "cms_common.h" static bool IsCnStatusParameterValid(const char *cnId, const char *cnStatus) { @@ -193,6 +194,7 @@ void* SyncDynamicInfoFromDdb(void* arg) if (IsDdbHealth(DDB_PRE_CONN)) { write_runlog(DEBUG1, "will sync instance info from ddb. \n"); + CmsSyncStandbyMode(); if ((cm_arbitration_mode == MINORITY_ARBITRATION || cm_server_start_mode == MINORITY_START) && g_multi_az_cluster) { write_runlog(LOG, diff --git a/src/cm_server/cms_threads.cpp b/src/cm_server/cms_threads.cpp index 375d5f5..fdd3405 100644 --- a/src/cm_server/cms_threads.cpp +++ b/src/cm_server/cms_threads.cpp @@ -194,9 +194,9 @@ int CM_CreateMonitorStopNode(void) return 0; } -int CM_CreateIOThread(void) +int CM_CreateIOThread(CM_IOThread &ioThread, uint32 id) { - errno_t rc = memset_s(&gIOThread, sizeof(CM_IOThread), 0, sizeof(CM_IOThread)); + errno_t rc = memset_s(&ioThread, sizeof(CM_IOThread), 0, sizeof(CM_IOThread)); securec_check_errno(rc, (void)rc); /* create epoll fd, MAX_EVENTS just a HINT */ @@ -206,41 +206,51 @@ int CM_CreateIOThread(void) return -1; } - gIOThread.epHandle = epollFd; - gIOThread.type = 0; - gIOThread.isBusy = false; + ioThread.epHandle = epollFd; + ioThread.id = id; + ioThread.isBusy = false; + ioThread.recvMsgQue = new PriMsgQues; + ioThread.sendMsgQue = new PriMsgQues; + InitMsgQue(*((PriMsgQues *)ioThread.sendMsgQue)); + InitMsgQue(*((PriMsgQues *)ioThread.recvMsgQue)); - if (pthread_create(&(gIOThread.tid), NULL, CM_IOThreadMain, &gIOThread) != 0) { + if (pthread_create(&(ioThread.tid), NULL, CM_IOThreadMain, &ioThread) != 0) { return -1; } + return 0; } +int CM_CreateIOThreadPool(uint32 thrCount) +{ + int err; + for (uint32 i = 0; i < thrCount; i++) { + err = CM_CreateIOThread(gIOThreads.threads[i], i); + if (err != 0) { + return err; + } + gIOThreads.count++; + } -int CM_CreateWorkThreadPool(int thrCount) + return 0; +} + +static int createWorkerThread(uint32 thrCount, int type) { CM_WorkThread* thrinfo = NULL; - int err; errno_t rc = 0; - int trueThrCount = thrCount; + int err; - /* include cm_ctl, so g_node_num need add. */ - int ctlThreadNum = GetCtlThreadNum(); - if (thrCount > ((int)g_node_num + ctlThreadNum)) { - trueThrCount = ((int)g_node_num + ctlThreadNum); - } - if (trueThrCount <= ctlThreadNum) { - trueThrCount += ctlThreadNum; - } + for (uint32 i = 0; i < thrCount; i++) { + uint32 thread_idx = gWorkThreads.count; + thrinfo = &(gWorkThreads.threads[thread_idx]); - for (int i = 0; i < trueThrCount; i++) { - thrinfo = &(gWorkThreads.threads[i]); - - rc = memset_s(&gWorkThreads.threads[i], sizeof(CM_WorkThread), 0, sizeof(CM_WorkThread)); + rc = memset_s(thrinfo, sizeof(CM_WorkThread), 0, sizeof(CM_WorkThread)); securec_check_errno(rc, (void)rc); - thrinfo->type = (i >= trueThrCount - ctlThreadNum) ? CM_CTL : CM_AGENT; + thrinfo->type = type; thrinfo->isBusy = false; + thrinfo->id = i; if ((err = pthread_create(&thrinfo->tid, NULL, CM_WorkThreadMain, thrinfo)) != 0) { write_runlog(ERROR, "Failed to create a new CM_WorkThreadMain %d: %d\n", err, errno); @@ -249,6 +259,21 @@ int CM_CreateWorkThreadPool(int thrCount) gWorkThreads.count++; } + + return 0; +} + +int CM_CreateWorkThreadPool(uint32 ctlWorkerCount, uint32 agentWorkerCount) +{ + gWorkThreads.count = 0; + if (createWorkerThread(ctlWorkerCount, CM_CTL) != 0) { + return -1; + } + + if (createWorkerThread(agentWorkerCount, CM_AGENT) != 0) { + return -1; + } + return 0; } diff --git a/src/include/cm/cm_adapter/cm_ddb_adapter.h b/src/include/cm/cm_adapter/cm_ddb_adapter.h index 3456c8f..8079854 100644 --- a/src/include/cm/cm_adapter/cm_ddb_adapter.h +++ b/src/include/cm/cm_adapter/cm_ddb_adapter.h @@ -61,7 +61,7 @@ typedef enum DDB_ROLE { DDB_ROLE_CEIL, } DDB_ROLE; -typedef enum { PROCESS_IN_RUNNING = 0, PROCESS_IN_IDLE } PROCESS_STATE; +typedef enum { PROCESS_IN_INIT = 0, PROCESS_IN_RUNNING, PROCESS_IN_IDLE } PROCESS_STATE; typedef enum { DDB_PRE_CONN = 0, DDB_HEAL_COUNT } DDB_CHECK_MOD; diff --git a/src/include/cm/cm_adapter/cm_sharedisk/cm_voting_disk.h b/src/include/cm/cm_adapter/cm_sharedisk/cm_voting_disk.h index 8f6ebe0..c32816c 100644 --- a/src/include/cm/cm_adapter/cm_sharedisk/cm_voting_disk.h +++ b/src/include/cm/cm_adapter/cm_sharedisk/cm_voting_disk.h @@ -44,8 +44,8 @@ status_t InitVotingDiskHandler(const char *scsiDev, uint32 offset); status_t InitVotingDisk(const char *votingDiskPath); status_t UpdateAllNodeHeartBeat(); void ResetVotingdiskHeartBeat(); -VotingDiskStatus GetNodeHeartbeatStat(uint32 nodeIndex, uint32 diskTimeout); +VotingDiskStatus GetNodeHeartbeatStat(uint32 nodeIndex, uint32 diskTimeout, int logLevel); status_t AllocVotingDiskMem(); void FreeVotingDiskMem(); -#endif \ No newline at end of file +#endif diff --git a/src/include/cm/cm_agent/cma_instance_management_res.h b/src/include/cm/cm_agent/cma_instance_management_res.h index 74064f0..262680a 100644 --- a/src/include/cm/cm_agent/cma_instance_management_res.h +++ b/src/include/cm/cm_agent/cma_instance_management_res.h @@ -28,7 +28,7 @@ #include "cm_misc.h" int SystemExecute(const char *scriptPath, const char *oper, uint32 timeout); -void StartOneResInst(const CmResConfList *conf); +status_t StartOneResInst(const CmResConfList *conf); void StopOneResInst(const CmResConfList *conf); void OneResInstShutdown(const CmResConfList *oneResConf); status_t RegOneResInst(const CmResConfList *conf, uint32 destInstId); diff --git a/src/include/cm/cm_elog.h b/src/include/cm/cm_elog.h index 3d561fb..8d5b305 100644 --- a/src/include/cm/cm_elog.h +++ b/src/include/cm/cm_elog.h @@ -49,6 +49,7 @@ typedef enum KeyEventTypeEn { KEY_EVENT_RECOVER = 16, KEY_EVENT_REFRESH_OBS_DELETE_TEXT = 17, KEY_EVENT_DROP_CN_OBS_XLOG = 18, + KEY_EVENT_RES_ARBITRATE = 19, KEY_EVENT_TYPE_CEIL, // new event types should be added before this. } KeyEventType; @@ -70,4 +71,4 @@ void write_runlog(int elevel, const char *fmt, ...) __attribute__((format(printf void write_stderr(const char *fmt, ...) __attribute__((format(PG_PRINTF_ATTRIBUTE, 1, 2))); void WriteKeyEventLog(KeyEventType keyEventType, uint32 instanceId, const char *fmt, ...); -#endif \ No newline at end of file +#endif diff --git a/src/include/cm/cm_misc_res.h b/src/include/cm/cm_misc_res.h index 90c7805..c22b937 100644 --- a/src/include/cm/cm_misc_res.h +++ b/src/include/cm/cm_misc_res.h @@ -50,6 +50,8 @@ const uint32 CM_MAX_VIP_COUNT = 16; #define CUS_RES_CHECK_STAT_UNKNOWN 2 #define CUS_RES_CHECK_STAT_TIMEOUT 137 +#define CUS_RES_START_FAIL_DEPEND_NOT_ALIVE 6 + #define RES_INSTANCE_ID_MIN 20000 #define RES_INSTANCE_ID_MAX 30000 diff --git a/src/include/cm/cm_persist/share_disk_lock.h b/src/include/cm/cm_persist/share_disk_lock.h index bea2206..18c64f6 100644 --- a/src/include/cm/cm_persist/share_disk_lock.h +++ b/src/include/cm/cm_persist/share_disk_lock.h @@ -102,9 +102,8 @@ status_t CmAllocDlock(dlock_t *lock, uint64 lockAddr, int64 instId); void CmInitDlock(dlock_t *lock, uint64 lockAddr, int64 instId); // init lockw header and body void CmInitDlockHeader(dlock_t *lock, uint64 lockAddr, int64 instId); // init lockw and lockr header status_t CmDiskLockS(dlock_t *lock, const char *scsiDev, int32 fd); -status_t CmDiskLockfS(dlock_t *lock, const char *scsiDev); int32 CmDiskLock(dlock_t *lock, int32 fd); -status_t CmDiskLockf(dlock_t *lock, int32 fd); +status_t CmDiskLockf(dlock_t *lock, int32 fd, int64 lockTime); status_t CmGetDlockInfo(dlock_t *lock, int32 fd); #endif diff --git a/src/include/cm/cm_persist/share_disk_lock_api.h b/src/include/cm/cm_persist/share_disk_lock_api.h index 32fed90..29f600c 100644 --- a/src/include/cm/cm_persist/share_disk_lock_api.h +++ b/src/include/cm/cm_persist/share_disk_lock_api.h @@ -46,5 +46,4 @@ typedef struct _DISK_LRW_HANDLER { } diskLrwHandler; status_t InitDiskLockHandle(diskLrwHandler *sdLrwHandler, const char *scsi_dev, uint32 offset, int64 instId); -status_t ShareDiskGetDlock(diskLrwHandler *handler); #endif diff --git a/src/include/cm/cm_server/cm_server.h b/src/include/cm/cm_server/cm_server.h index d88db4d..ab26695 100644 --- a/src/include/cm/cm_server/cm_server.h +++ b/src/include/cm/cm_server/cm_server.h @@ -83,19 +83,14 @@ typedef struct CM_Connection_t { gss_buffer_desc gss_outbuf; /* GSS output token */ #endif // KRB5 NotifyCn_t notifyCn; - uint32 connSeq; + uint64 connSeq; } CM_Connection; -typedef struct CM_Connections_t { - uint32 count; - uint32 max_node_id; - CM_Connection* connections[CM_MAX_CONNECTIONS + MAXLISTEN]; - pthread_rwlock_t lock; -} CM_Connections; - typedef struct CM_WorkThread_t { pthread_t tid; + uint32 id; int type; + uint32 procMsgCount; ConnID ProcConnID; // which connection is processing now; volatile bool isBusy; } CM_WorkThread; @@ -124,12 +119,25 @@ typedef struct CM_MonitorNodeStopThread_t { typedef struct { pthread_t tid; - int type; uint32 id; int epHandle; + int wakefd; volatile bool isBusy; + volatile int gotConnClose; + void* recvMsgQue; + void* sendMsgQue; + uint32 pushRecvQueWaitTime; + uint32 getSendQueWaitTime; + uint32 recvMsgCount; + uint32 sendMsgCount; + uint32 innerProcCount; } CM_IOThread; +typedef struct CM_IOThreads_t { + uint32 count; + CM_IOThread threads[CM_MAX_THREADS]; +} CM_IOThreads; + typedef enum CM_ThreadStatus_e { CM_THREAD_STARTING, CM_THREAD_RUNNING, @@ -222,7 +230,6 @@ typedef struct CM_ConnDdbInfo_t { #define INSTANCE_ARBITRATE_DELAY_NO_SET 0 #define INSTANCE_ARBITRATE_DELAY_HAVE_SET 1 -#define CM_IO_THREAD_COUNT 1 constexpr int NO_NEED_TO_SET_PARAM = -1; typedef struct DatanodeDynamicStatusT { @@ -315,7 +322,6 @@ extern volatile sig_atomic_t got_stop; extern volatile sig_atomic_t g_gotParameterReload; extern volatile sig_atomic_t g_SetReplaceCnStatus; extern volatile sig_atomic_t ha_connection_closed; -extern volatile sig_atomic_t got_conns_close[CM_IO_THREAD_COUNT]; extern char g_replaceCnStatusFile[MAX_PATH_LEN]; void ProcessStartupPacket(int epollFd, void* arg); diff --git a/src/include/cm/cm_server/cms_common.h b/src/include/cm/cm_server/cms_common.h index a0aefa5..445f060 100644 --- a/src/include/cm/cm_server/cms_common.h +++ b/src/include/cm/cm_server/cms_common.h @@ -56,6 +56,7 @@ void GetDelayArbitTimeFromConf(); void GetBackupOpenConfig(); void GetDelayArbitClusterTimeFromConf(); void GetDnArbitrateMode(); +void CmsSyncStandbyMode(); bool EnableShareDisk(); diff --git a/src/include/cm/cm_server/cms_conn.h b/src/include/cm/cm_server/cms_conn.h index f1ff972..81a651c 100644 --- a/src/include/cm/cm_server/cms_conn.h +++ b/src/include/cm/cm_server/cms_conn.h @@ -34,7 +34,7 @@ #endif // KRB5 #define CM_SERVER_PACKET_ERROR_MSG 128 -#define MSG_COUNT_FOR_LOG 300 +#define MSG_TIME_FOR_LOG 5 #include "cms_msg_que.h" enum IOProcMethond { @@ -71,7 +71,7 @@ void EventDel(int epollFd, CM_Connection* con); void CMPerformAuthentication(CM_Connection* con); int ReadCommand(CM_Connection *con, const char *str); int get_authentication_type(const char* config_file); -int RespondMsg(const MsgRecvInfo* recvMsg, char msgtype, const char *s, size_t len, int log_level = LOG); +int RespondMsg(MsgRecvInfo* recvMsg, char msgtype, const char *s, size_t len, int log_level = LOG); int SendToAgentMsg(uint agentNodeId, char msgtype, const char *s, size_t len, int log_level = LOG); int BroadcastMsg(char msgtype, const char *s, size_t len, int log_level = LOG); void AsyncProcMsg(const MsgRecvInfo* recvMsg, IOProcMethond procMethod, const char *s, uint32 len); diff --git a/src/include/cm/cm_server/cms_global_params.h b/src/include/cm/cm_server/cms_global_params.h index 8ec6be5..ad8fbbd 100644 --- a/src/include/cm/cm_server/cms_global_params.h +++ b/src/include/cm/cm_server/cms_global_params.h @@ -289,7 +289,7 @@ extern pthread_rwlock_t g_sendQueueRwlock; extern pthread_rwlock_t g_recvQueueRwlock; extern synchronous_standby_mode current_cluster_az_status; extern CM_WorkThreads gWorkThreads; -extern CM_IOThread gIOThread; +extern CM_IOThreads gIOThreads; extern CM_HAThreads gHAThreads; extern CM_MonitorThread gMonitorThread; extern CM_MonitorNodeStopThread gMonitorNodeStopThread; @@ -411,6 +411,7 @@ extern bool g_getHistoryDnStatusFromDdb; extern bool g_getHistoryCnStatusFromDdb; extern volatile uint32 g_refreshDynamicCfgNum; extern bool g_needIncTermToDdbAgain; +extern volatile bool g_needReloadSyncStandbyMode; extern bool g_instance_status_for_cm_server_pending[CM_PRIMARY_STANDBY_NUM]; extern bool g_clusterStarting; /* thread count of thread pool */ diff --git a/src/include/cm/cm_server/cms_msg_que.h b/src/include/cm/cm_server/cms_msg_que.h index 0336f2c..0b72beb 100644 --- a/src/include/cm/cm_server/cms_msg_que.h +++ b/src/include/cm/cm_server/cms_msg_que.h @@ -24,54 +24,84 @@ #ifndef CMS_MSG_QUE_H #define CMS_MSG_QUE_H +#include +#include #include "c.h" #include "stringinfo.h" +#include "cm_util.h" -enum MsgPriority { - MsgPriHigh = 0, - MsgPriNormal, - MsgPriLow, - MSG_PRI_COUNT +enum MsgSourceType { + MsgSrcAgent = 0, + MsgSrcCtl, + MSG_SRC_COUNT }; struct ConnID { int32 remoteType; // CM_AGENT,CM_CTL - uint32 connSeq; + uint64 connSeq; uint32 agentNodeId; + uint64 t1; + uint64 t2; + uint64 t3; + uint64 t4; + uint64 t5; + uint64 t6; + uint64 t7; + uint64 t8; + uint64 t9; + uint64 t10; }; struct MsgSendInfo { - ConnID connID; + ConnID connID; int32 log_level; uint64 procTime; uint8 msgProcFlag; char msgType; char procMethod; - char reserved; // for alignment + char reserved; // for alignment uint32 dataSize; uint64 data[0]; }; struct MsgRecvInfo { - ConnID connID; + ConnID connID; uint8 msgProcFlag; uint8 reserved1; uint8 reserved2; uint8 reserved3; - CM_StringInfoData msg; + CM_StringInfoData msg; uint64 data[0]; }; +using MsgQueType = std::deque; +using MsgQuePtr = MsgQueType*; +using ConstMsgQuePtr = const MsgQueType*; -typedef void (*wakeSenderFuncType)(void); +struct MsgQue { + MsgQueType que; + CMFairMutex fairLock; +}; + +struct PriMsgQues { + MsgQue ques[MSG_SRC_COUNT]; + pthread_mutex_t msgLock; + pthread_cond_t msgCond; +}; + +typedef void (*wakeSenderFuncType)(const ConnID connID); typedef bool (*CanProcThisMsgFunType)(void *threadInfo, const char *msgData); +void InitMsgQue(PriMsgQues &que); +size_t getMsgCount(PriMsgQues *priQue); -void pushRecvMsg(const MsgRecvInfo* msg, MsgPriority pri); -const MsgRecvInfo *getRecvMsg(MsgPriority pri, uint32 waitTime, void *threadInfo); -void pushSendMsg(const MsgSendInfo *msg, MsgPriority pri); -const MsgSendInfo *getSendMsg(); -size_t getSendMsgCount(); -bool existSendMsg(); +void pushRecvMsg(PriMsgQues* priQue, MsgRecvInfo* msg, MsgSourceType src); +const MsgRecvInfo *getRecvMsg(PriMsgQues *priQue, MsgSourceType src, uint32 waitTime, void *threadInfo); +bool existRecvMsg(); + +void pushSendMsg(PriMsgQues *priQue, MsgSendInfo *msg, MsgSourceType src); +const MsgSendInfo *getSendMsg(PriMsgQues *priQue, MsgSourceType src); + +bool existSendMsg(const PriMsgQues *priQue); void setWakeSenderFunc(wakeSenderFuncType func); void SetCanProcThisMsgFun(CanProcThisMsgFunType func); diff --git a/src/include/cm/cm_server/cms_process_messages.h b/src/include/cm/cm_server/cms_process_messages.h index 0a1e59b..d6395fc 100644 --- a/src/include/cm/cm_server/cms_process_messages.h +++ b/src/include/cm/cm_server/cms_process_messages.h @@ -126,16 +126,7 @@ uint32 GetResStatReportInter(uint32 nodeId); int GetCurAz(); uint32 GetPrimaryDnIndex(void); status_t InitNodeReportResStatInter(); -MaxClusterResStatus GetResNodeStat(uint32 nodeId); - -inline bool CheckEnableFlag() -{ - if (IsBoolCmParamTrue(g_enableDcf)) { - write_runlog(ERROR, "switchover is not support on dcf mode.\n"); - return true; - } - return false; -} +MaxClusterResStatus GetResNodeStat(uint32 nodeId, int logLevel); void ProcessCtlToCmReloadMsg(MsgRecvInfo* recvMsgInfo); void ProcessCtlToCmExecDccCmdMsg(MsgRecvInfo* recvMsgInfo, ExecDdbCmdMsg *msg); diff --git a/src/include/cm/cm_server/cms_threads.h b/src/include/cm/cm_server/cms_threads.h index adf081d..52bd893 100644 --- a/src/include/cm/cm_server/cms_threads.h +++ b/src/include/cm/cm_server/cms_threads.h @@ -30,8 +30,8 @@ int CM_CreateHA(void); int CM_CreateMonitor(void); int CM_CreateMonitorStopNode(void); -int CM_CreateWorkThreadPool(int thrCount); -int CM_CreateIOThread(); +int CM_CreateWorkThreadPool(uint32 ctlWorkerCount, uint32 agentWorkerCount); +int CM_CreateIOThreadPool(uint32 thrCount); void CreateDnGroupStatusCheckAndArbitrateThread(void); void CreateDealGlobalBarrierThread(void); diff --git a/src/include/cm/cm_util.h b/src/include/cm/cm_util.h index a875f9e..cea1cd4 100644 --- a/src/include/cm/cm_util.h +++ b/src/include/cm/cm_util.h @@ -28,27 +28,31 @@ #include #include "c.h" +const int CM_NSEC_COUNT_PER_MS = 1000000; +const int CM_MS_COUNT_PER_SEC = 1000; + int CmMkdirP(char *path, unsigned int omode); char *gs_getenv_r(const char *name); uint64 GetMonotonicTimeMs(); -enum class CMMutexPrio { - CM_MUTEX_PRIO_NONE, - CM_MUTEX_PRIO_NORMAL, - CM_MUTEX_PRIO_HIGH, +enum class CMFairMutexType { + CM_MUTEX_NODE, + CM_MUTEX_READ, + CM_MUTEX_WRITE, }; - -using CMPrioMutex = struct CMPrioMutexSt { + +using CMFairMutex = struct CMFairMutexSt { pthread_mutex_t lock; pthread_mutex_t innerLock; pthread_cond_t cond; - uint32 highPrioCount; - CMMutexPrio curPrio; + uint32 readerCount; + uint32 writerCount; + CMFairMutexType curType; }; - -void CMPrioMutexInit(CMPrioMutex &mutex); -int CMPrioMutexLock(CMPrioMutex &mutex, CMMutexPrio prio); -void CMPrioMutexUnLock(CMPrioMutex &mutex); + +void CMFairMutexInit(CMFairMutex &mutex); +int CMFairMutexLock(CMFairMutex &mutex, CMFairMutexType type); +void CMFairMutexUnLock(CMFairMutex &mutex); char *GetDynamicMem(char *dynamicPtr, size_t *curSize, size_t memSize); #endif // CM_UTIL_H