!76 CM适配共享存储1130patch

Merge pull request !76 from yewk/master_01
This commit is contained in:
opengauss-bot 2022-12-09 01:27:01 +00:00 committed by Gitee
commit 91ef6e7c26
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
44 changed files with 1211 additions and 716 deletions

View File

@ -78,7 +78,7 @@ function update_dcc_dependency() {
fi
if [ "x${THIRD_BIN_PATH}" != "x" ]; then
local dccHome="${THIRD_BIN_PATH}/kernel/component/dcc"
local dcc_home="${THIRD_BIN_PATH}/kernel/component/dcc"
if [ -d "${dcc_home}" ]; then
echo "We well get dcc lib from 3rd[${dcc_home}]."

View File

@ -20,6 +20,7 @@
*
* -------------------------------------------------------------------------
*/
#include <limits.h>
#include "cm/cm_c.h"
#include "cm/cm_elog.h"
#include "cm_disk_rw.h"
@ -33,8 +34,24 @@ static DDB_ROLE g_notifySd = DDB_ROLE_UNKNOWN;
static DDB_ROLE g_dbRole = DDB_ROLE_FOLLOWER;
static uint32 g_cmServerNum = 0;
static int64 g_waitForTime = 0;
static volatile int64 g_waitForChangeTime = 0;
static volatile int64 g_notifyBeginSec = 0;
const uint32 ONE_PRIMARY_ONE_STANDBY = 2;
static DdbArbiCon *g_arbiCon = NULL;
static const time_t MAX_VALID_LOCK_TIME = 125;
static const time_t BASE_VALID_LOCK_TIME = 1;
static const uint32 DEFAULT_CMD_TIME_OUT = 2;
typedef enum en_persist_cmd_type {
CMD_LOCK = 0,
CMD_FORCE_LOCK = 1,
} PERSIST_CMD_TYPE;
typedef struct SdArbitrateDataSt {
uint32 lockNotRefreshTimes;
time_t lockFailBeginTime;
time_t lockTime;
DDB_ROLE lastDdbRole;
} SdArbitrateData;
static status_t SdLoadApi(const DrvApiInfo *apiInfo);
@ -184,14 +201,15 @@ static void DrvNotifySd(DDB_ROLE dbRole)
return;
}
if (g_dbRole != dbRole && g_cmServerNum > ONE_PRIMARY_ONE_STANDBY) {
if (dbRole == DDB_ROLE_FOLLOWER) {
(void)pthread_rwlock_wrlock(&g_notifySdLock);
g_waitForChangeTime = g_waitForTime;
g_notifySd = DDB_ROLE_FOLLOWER;
ddbNotiStatusFun(DDB_ROLE_FOLLOWER);
(void)pthread_rwlock_unlock(&g_notifySdLock);
}
if (g_dbRole != dbRole) {
struct timespec checkBegin = {0, 0};
(void)clock_gettime(CLOCK_MONOTONIC, &checkBegin);
(void)pthread_rwlock_wrlock(&g_notifySdLock);
g_notifyBeginSec = checkBegin.tv_sec;
g_notifySd = dbRole;
ddbNotiStatusFun(dbRole);
(void)pthread_rwlock_unlock(&g_notifySdLock);
write_runlog(LOG, "receive notify msg, it has set ddb role, dbRole is [%d: %d], g_waitForTime is %ld, "
"g_cmServerNum is %u.\n", (int32)dbRole, (int32)g_dbRole, g_waitForTime, g_cmServerNum);
}
@ -248,47 +266,287 @@ static void NotifyDdbRole(DDB_ROLE *lastDdbRole)
*lastDdbRole = g_dbRole;
}
static uint32 GetForceLockTimeOutCfg()
{
uint32 curForceLockTimeOut = g_arbiCon->arbiCfg->haHeartBeatTimeOut;
if (curForceLockTimeOut < DEFAULT_CMD_TIME_OUT) {
curForceLockTimeOut = DEFAULT_CMD_TIME_OUT;
}
return curForceLockTimeOut;
}
int ExecuteLockCmd(const char *cmd)
{
int ret = system(cmd);
if (ret == -1) {
write_runlog(ERROR, "Fail to execute command %s, and errno=%d.\n", cmd, errno);
return -1;
}
write_runlog(DEBUG1, "execute command %s exit code %d.\n", cmd, ret);
if (WIFEXITED(ret)) {
return WEXITSTATUS(ret);
}
write_runlog(ERROR, "Fail to execute command %s, script exit code %d.\n", cmd, ret);
return -1;
}
static bool CheckDemoteDdbRole(SdArbitrateData *sdArbitrateData)
{
if (sdArbitrateData->lastDdbRole != DDB_ROLE_LEADER) {
sdArbitrateData->lockFailBeginTime = 0;
return true;
}
uint32 forceLockTimeOut = GetForceLockTimeOutCfg();
uint32 exeCmdTwiceTimeOut = DEFAULT_CMD_TIME_OUT + DEFAULT_CMD_TIME_OUT;
if (exeCmdTwiceTimeOut >= forceLockTimeOut) {
sdArbitrateData->lockFailBeginTime = 0;
return true;
}
// try to avoid execute cmd timeout, so use diff time to demote cms primary
uint32 diffTimeOut = forceLockTimeOut - exeCmdTwiceTimeOut;
struct timespec time = {0, 0};
(void)clock_gettime(CLOCK_MONOTONIC, &time);
time_t minusTime = time.tv_sec - sdArbitrateData->lockFailBeginTime;
if (minusTime >= diffTimeOut) {
write_runlog(LOG,
"CheckDemoteDdbRole: CMS primary will demote for current time %ld, lockFailBeginTime %ld, diffTimeOut "
"%u.\n",
time.tv_sec,
sdArbitrateData->lockFailBeginTime,
diffTimeOut);
return true;
}
write_runlog(LOG,
"CheckDemoteDdbRole: CMS primary demote will wait %u seconds for current time %ld, lockFailBeginTime %ld, "
"diffTimeOut %u.\n",
(uint32)(diffTimeOut - minusTime),
time.tv_sec,
sdArbitrateData->lockFailBeginTime,
diffTimeOut);
return false;
}
static void CmNormalArbitrate(const char *lockCmd, SdArbitrateData *sdArbitrateData)
{
int lockRst = ExecuteLockCmd(lockCmd);
write_runlog(DEBUG1,
"CmNormalArbitrate: execute lock cmd %s result %d. lockTime %ld, lockNotRefreshTimes %u, lockFailBeginTime "
"%ld\n", lockCmd, lockRst, sdArbitrateData->lockTime, sdArbitrateData->lockNotRefreshTimes,
sdArbitrateData->lockFailBeginTime);
if (lockRst == 0) {
// get lock success, notify cmserver to primary
sdArbitrateData->lockTime = 0;
sdArbitrateData->lockNotRefreshTimes = 0;
sdArbitrateData->lockFailBeginTime = 0;
g_dbRole = DDB_ROLE_LEADER;
NotifyDdbRole(&sdArbitrateData->lastDdbRole);
return;
}
if (lockRst >= BASE_VALID_LOCK_TIME && lockRst <= MAX_VALID_LOCK_TIME) {
g_dbRole = DDB_ROLE_FOLLOWER;
sdArbitrateData->lockFailBeginTime = 0;
// get lock failed, check lock time if refreshed by other process
if (sdArbitrateData->lockTime != lockRst) {
sdArbitrateData->lockTime = lockRst;
sdArbitrateData->lockNotRefreshTimes = 0;
} else {
const uint32 defaultNotRefreshTimes = 2;
uint32 curForceLockTimeOut = GetForceLockTimeOutCfg();
int logLevel = sdArbitrateData->lockNotRefreshTimes >= defaultNotRefreshTimes ? LOG : DEBUG1;
write_runlog(logLevel,
"CmNormalArbitrate: other cmserver maybe lost lock for %u times, current lock time %ld\n",
sdArbitrateData->lockNotRefreshTimes, sdArbitrateData->lockTime);
if (sdArbitrateData->lockNotRefreshTimes < curForceLockTimeOut) {
++sdArbitrateData->lockNotRefreshTimes;
} else {
sdArbitrateData->lockNotRefreshTimes = 0;
char forceLockCmd[MAX_PATH_LEN] = {0};
errno_t rc = snprintf_s(forceLockCmd, MAX_PATH_LEN, MAX_PATH_LEN - 1,
"timeout -s SIGKILL %us cm_persist %s %ld %lu %d %ld > /dev/null 2>&1",
DEFAULT_CMD_TIME_OUT, g_cmsArbitrateDiskHandler.scsiDev, g_cmsArbitrateDiskHandler.instId,
g_cmsArbitrateDiskHandler.offset, (int)CMD_FORCE_LOCK, sdArbitrateData->lockTime);
securec_check_intval(rc, (void)rc);
lockRst = ExecuteLockCmd(forceLockCmd);
g_dbRole = ((lockRst == 0) ? DDB_ROLE_LEADER : DDB_ROLE_FOLLOWER);
write_runlog(LOG, "CmNormalArbitrate: exe force lock cmd %s result %d, curForceLockTime %u.\n",
forceLockCmd, lockRst, curForceLockTimeOut);
}
}
NotifyDdbRole(&sdArbitrateData->lastDdbRole);
return;
}
// get lock time failed
sdArbitrateData->lockTime = 0;
sdArbitrateData->lockNotRefreshTimes = 0;
g_dbRole = CheckDemoteDdbRole(sdArbitrateData) ? DDB_ROLE_FOLLOWER : DDB_ROLE_LEADER;
NotifyDdbRole(&sdArbitrateData->lastDdbRole);
}
static bool CheckResetTime()
{
struct timespec checkEnd = {0, 0};
(void)clock_gettime(CLOCK_MONOTONIC, &checkEnd);
int64 diffSeconds = checkEnd.tv_sec - g_notifyBeginSec;
if (diffSeconds <= g_waitForTime) {
write_runlog(DEBUG1,
"CheckResetTime: current time %ld, g_notifyBeginSec %ld, g_waitTime %ld, cannot reset g_notifySd.\n",
(long int)checkEnd.tv_sec, g_notifyBeginSec, g_waitForTime);
return false;
}
return true;
}
static bool CheckSdDemote(SdArbitrateData *sdArbitrateData)
{
if (g_notifySd != DDB_ROLE_FOLLOWER) {
return false;
}
if (g_dbRole == DDB_ROLE_LEADER) {
g_dbRole = DDB_ROLE_FOLLOWER;
NotifyDdbRole(&sdArbitrateData->lastDdbRole);
return true;
}
if (!CheckResetTime()) {
return true;
}
write_runlog(LOG,
"CheckSdDemote: will reset g_notifySd from %u to %u after wait %ld time.\n",
(uint32)g_notifySd,
(uint32)DDB_ROLE_UNKNOWN,
g_waitForTime);
g_notifySd = DDB_ROLE_UNKNOWN;
return false;
}
static status_t ExePromoteCmd(const char *lockCmd, SdArbitrateData *sdArbitrateData)
{
int st = ExecuteLockCmd(lockCmd);
if (st != 0) {
write_runlog(WARNING, "ExePromoteCmd: Execute get lock cmd %s failed, lockResult %d!\n", lockCmd, st);
if (st >= BASE_VALID_LOCK_TIME && st <= MAX_VALID_LOCK_TIME) {
char forceLockCmd[MAX_PATH_LEN] = {0};
errno_t rc = snprintf_s(forceLockCmd,
MAX_PATH_LEN,
MAX_PATH_LEN - 1,
"timeout -s SIGKILL %us cm_persist %s %ld %lu %d %ld > /dev/null 2>&1",
DEFAULT_CMD_TIME_OUT,
g_cmsArbitrateDiskHandler.scsiDev,
g_cmsArbitrateDiskHandler.instId,
g_cmsArbitrateDiskHandler.offset,
(int)CMD_FORCE_LOCK,
st);
securec_check_intval(rc, (void)rc);
st = ExecuteLockCmd(forceLockCmd);
if (st != 0) {
write_runlog(WARNING, "ExePromoteCmd: Execute force lock cmd %s failed, result %d!\n", lockCmd, st);
return CM_ERROR;
}
return CM_SUCCESS;
}
return CM_ERROR;
}
sdArbitrateData->lockTime = 0;
sdArbitrateData->lockNotRefreshTimes = 0;
sdArbitrateData->lockFailBeginTime = 0;
write_runlog(DEBUG1, "ExePromoteCmd: Execute get lock cmd %s success!\n", lockCmd);
return CM_SUCCESS;
}
static bool CheckSdPromote(const char *lockCmd, SdArbitrateData *sdArbitrateData)
{
if (g_notifySd != DDB_ROLE_LEADER) {
return false;
}
if (g_dbRole != DDB_ROLE_LEADER) {
(void)ExePromoteCmd(lockCmd, sdArbitrateData);
g_dbRole = DDB_ROLE_LEADER;
NotifyDdbRole(&sdArbitrateData->lastDdbRole);
return true;
}
(void)ExePromoteCmd(lockCmd, sdArbitrateData);
if (!CheckResetTime()) {
return true;
}
write_runlog(LOG,
"CheckSdPromote: will reset g_notifySd from %u to %u after wait %ld seconds.\n",
(uint32)g_notifySd,
(uint32)DDB_ROLE_UNKNOWN,
g_waitForTime);
g_notifySd = DDB_ROLE_UNKNOWN;
return false;
}
static bool HaveNotifySd(const char *lockCmd, SdArbitrateData *sdArbitrateData)
{
bool res = false;
(void)pthread_rwlock_wrlock(&g_notifySdLock);
if (g_notifySd == DDB_ROLE_FOLLOWER) {
res = CheckSdDemote(sdArbitrateData);
} else if (g_notifySd == DDB_ROLE_LEADER) {
res = CheckSdPromote(lockCmd, sdArbitrateData);
}
(void)pthread_rwlock_unlock(&g_notifySdLock);
return res;
}
static void *GetShareDiskLockMain(void *arg)
{
thread_name = "GetShareDiskLockMain";
write_runlog(LOG, "Starting get share disk lock thread.\n");
char cmd[MAX_PATH_LEN] = {0};
errno_t rc = snprintf_s(cmd,
char lockCmd[MAX_PATH_LEN] = {0};
errno_t rc = snprintf_s(lockCmd,
MAX_PATH_LEN,
MAX_PATH_LEN - 1,
"cm_persist %s %ld %lu > /dev/null 2>&1",
"timeout -s SIGKILL %us cm_persist %s %ld %lu %d > /dev/null 2>&1",
DEFAULT_CMD_TIME_OUT,
g_cmsArbitrateDiskHandler.scsiDev,
g_cmsArbitrateDiskHandler.instId,
g_cmsArbitrateDiskHandler.offset);
g_cmsArbitrateDiskHandler.offset,
(int)CMD_LOCK);
securec_check_intval(rc, (void)rc);
int st;
DDB_ROLE lastDdbRole = DDB_ROLE_UNKNOWN;
SdArbitrateData sdArbitrateData;
rc = memset_s(&sdArbitrateData, sizeof(SdArbitrateData), 0, sizeof(SdArbitrateData));
securec_check_errno(rc, (void)rc);
struct timespec checkBegin = {0, 0};
struct timespec checkEnd = {0, 0};
uint32 twoSec = 2;
for (;;) {
if (g_cmServerNum > ONE_PRIMARY_ONE_STANDBY) {
(void)pthread_rwlock_wrlock(&g_notifySdLock);
if (g_notifySd == DDB_ROLE_FOLLOWER && g_waitForChangeTime > 0) {
g_dbRole = DDB_ROLE_FOLLOWER;
NotifyDdbRole(&lastDdbRole);
g_waitForChangeTime--;
(void)pthread_rwlock_unlock(&g_notifySdLock);
write_runlog(DEBUG1,
"GetShareDiskLockMain: current ddbRole is %d, g_waitForChangeTime is %ld.\n",
(int)g_dbRole,
g_waitForChangeTime);
(void)sleep(1);
continue;
(void)clock_gettime(CLOCK_MONOTONIC, &checkBegin);
if (!HaveNotifySd(lockCmd, &sdArbitrateData)) {
if (sdArbitrateData.lockFailBeginTime == 0) {
sdArbitrateData.lockFailBeginTime = checkBegin.tv_sec;
}
g_notifySd = DDB_ROLE_UNKNOWN;
(void)pthread_rwlock_unlock(&g_notifySdLock);
CmNormalArbitrate(lockCmd, &sdArbitrateData);
}
(void)clock_gettime(CLOCK_MONOTONIC, &checkEnd);
uint32 second = (uint32)(checkEnd.tv_sec - checkBegin.tv_sec);
int64 nanosecond = checkEnd.tv_nsec - checkBegin.tv_nsec;
if (second > twoSec) {
write_runlog(
LOG, "it takes %u seconds %ld nanoseconds to cmserver share disk arbitrate.\n", second, nanosecond);
} else {
(void)sleep(1);
}
st = system(cmd);
g_dbRole = (st == 0) ? DDB_ROLE_LEADER : DDB_ROLE_FOLLOWER;
NotifyDdbRole(&lastDdbRole);
(void)sleep(1);
}
return NULL;
}
@ -310,6 +568,7 @@ static status_t CreateShareDiskThread(const DrvApiInfo *apiInfo)
write_runlog(ERROR, "Failed to start get share disk lock thread.\n");
return CM_ERROR;
}
g_arbiCon = apiInfo->cmsArbiCon;
pthread_t thrId;
int32 res = pthread_create(&thrId, NULL, GetShareDiskLockMain, NULL);
if (res != 0) {
@ -350,8 +609,8 @@ static status_t SdLoadApi(const DrvApiInfo *apiInfo)
if (st != CM_SUCCESS) {
return st;
}
st = CreateShareDiskThread(apiInfo);
return st;
return CreateShareDiskThread(apiInfo);
}
DdbDriver *DrvSdGet(void)

View File

@ -189,10 +189,10 @@ status_t InitVotingDisk(const char *votingDiskPath)
return CM_SUCCESS;
}
VotingDiskStatus GetNodeHeartbeatStat(uint32 nodeIndex, uint32 diskTimeout)
VotingDiskStatus GetNodeHeartbeatStat(uint32 nodeIndex, uint32 diskTimeout, int logLevel)
{
if (nodeIndex >= VOTING_DISK_MAX_NODE_NUM) {
write_runlog(ERROR, "[%s] node index %u exceeds max node number of voting disk .\n", __FUNCTION__, nodeIndex);
write_runlog(logLevel, "[%s] node index %u exceeds max node of voting disk .\n", __FUNCTION__, nodeIndex);
return VOTING_DISK_STATUS_UNAVAIL;
}
if (diskTimeout == 0) {
@ -211,7 +211,7 @@ VotingDiskStatus GetNodeHeartbeatStat(uint32 nodeIndex, uint32 diskTimeout)
time_t curTime = time(NULL);
if (IsRhbTimeout(g_heartbeat[nodeIndex], curTime, (int)diskTimeout)) {
write_runlog(ERROR, "[%s] nodeIndex %u heartbeat timeout, diskTimeout=%u, nodeTime: %s\n",
write_runlog(logLevel, "[%s] nodeIndex %u heartbeat timeout, diskTimeout=%u, nodeTime: %s\n",
__FUNCTION__, nodeIndex, diskTimeout, timeBuf);
return VOTING_DISK_STATUS_UNAVAIL;
}

View File

@ -602,6 +602,20 @@ int CheckDatanodeStatus(const char *dataDir, int *role)
return 0;
}
static bool IsConnBadButNotPhonyDead(const char *errMsg, int conResult)
{
if (strstr(errMsg, "too many clients already")) {
write_runlog(LOG, "need to change conn pool number, conn result is %d.\n", conResult);
return true;
}
if (strstr(errMsg, "failed to request snapshot")) {
write_runlog(LOG, "failed to request snapshot, not phony dead, conn result is %d.\n", conResult);
return true;
}
return false;
}
int CheckDnStausPhonyDead(int dnId, int agentCheckTimeInterval)
{
int agentConnectDb = 5;
@ -624,8 +638,7 @@ int CheckDnStausPhonyDead(int dnId, int agentCheckTimeInterval)
if (!IsConnOk(tmpDNConn)) {
write_runlog(ERROR, "get connect failed for dn(%s) phony dead check, errmsg is %s\n",
pidPath, ErrorMessage(tmpDNConn));
if (strstr(ErrorMessage(tmpDNConn), "too many clients already")) {
write_runlog(LOG, "need to change conn pool number, conn result is %d.\n", Status(tmpDNConn));
if (IsConnBadButNotPhonyDead(ErrorMessage(tmpDNConn), Status(tmpDNConn))) {
close_and_reset_connection(tmpDNConn);
return 0;
}

View File

@ -57,7 +57,7 @@ int SystemExecute(const char *scriptPath, const char *oper, uint32 timeout)
return -1;
}
void StartOneResInst(const CmResConfList *conf)
status_t StartOneResInst(const CmResConfList *conf)
{
char oper[MAX_OPTION_LEN] = {0};
int ret = snprintf_s(oper, MAX_OPTION_LEN, MAX_OPTION_LEN - 1, "-start %u %s", conf->resInstanceId, conf->arg);
@ -66,9 +66,15 @@ void StartOneResInst(const CmResConfList *conf)
ret = SystemExecute(conf->script, oper, (uint32)conf->checkInfo.timeOut);
if (ret == 0) {
write_runlog(LOG, "StartOneResInst: run start script (%s %s) successfully.\n", conf->script, oper);
} else if (ret == CUS_RES_START_FAIL_DEPEND_NOT_ALIVE) {
write_runlog(LOG, "StartOneResInst: res(%s) inst(%u) can't do restart, cause depend resource inst not alive.\n",
conf->resName, conf->cmInstanceId);
return CM_ERROR;
} else {
write_runlog(ERROR, "StartOneResInst: run start script (%s %s) failed, ret=%d.\n", conf->script, oper, ret);
}
return CM_SUCCESS;
}
void StopOneResInst(const CmResConfList *conf)
@ -93,6 +99,13 @@ void OneResInstShutdown(const CmResConfList *oneResConf)
}
}
void OneResInstClean(const CmResConfList *oneResConf)
{
if (CheckOneResInst(oneResConf) != CUS_RES_CHECK_STAT_OFFLINE) {
(void)CleanOneResInst(oneResConf);
}
}
status_t RegOneResInst(const CmResConfList *conf, uint32 destInstId)
{
char oper[MAX_OPTION_LEN] = {0};
@ -171,10 +184,20 @@ status_t CleanOneResInst(const CmResConfList *conf)
return CM_SUCCESS;
}
static inline void CleanOneInstCheckCount(CmResConfList *resConf)
{
if (resConf->checkInfo.startCount != 0) {
write_runlog(LOG, "res(%s) inst(%u) restart times will clean.\n", resConf->resName, resConf->cmInstanceId);
}
resConf->checkInfo.startCount = 0;
resConf->checkInfo.startTime = 0;
resConf->checkInfo.brokeTime = 0;
}
void StopAllResInst()
{
for (uint32 i = 0; i < GetLocalResConfCount(); ++i) {
(void)CleanOneResInst(&g_resConf[i]);
OneResInstClean(&g_resConf[i]);
}
}
@ -214,8 +237,7 @@ static void ManualStopLocalResInst(CmResConfList *conf)
write_runlog(ERROR, "manual stop res(%s) inst(%u) failed, ret=%d.\n", conf->resName, conf->resInstanceId, ret);
} else {
write_runlog(LOG, "manual stop res(%s) inst(%u) success.\n", conf->resName, conf->resInstanceId);
conf->checkInfo.startCount = 0;
conf->checkInfo.startTime = 0;
CleanOneInstCheckCount(conf);
}
}
@ -231,29 +253,29 @@ bool IsInstManualStopped(uint32 instId)
return false;
}
static bool CanCusInstDoStart(const CmResConfList *conf)
static bool CanCusInstDoRestart(const CmResConfList *conf)
{
ResIsregStatus stat = IsregOneResInst(conf, conf->resInstanceId);
if ((stat == CM_RES_ISREG_REG) || (stat == CM_RES_ISREG_NOT_SUPPORT)) {
return true;
}
write_runlog(LOG, "cur inst(%u) isreg stat=(%u), can't do start.\n", conf->cmInstanceId, (uint32)stat);
write_runlog(LOG, "cur inst(%u) isreg stat=(%u), can't do restart.\n", conf->cmInstanceId, (uint32)stat);
return false;
}
static inline void RestartOneResInst(CmResConfList *conf)
static inline status_t RestartOneResInst(CmResConfList *conf)
{
write_runlog(LOG, "res(%s) inst(%u) need restart.\n", conf->resName, conf->cmInstanceId);
(void)CleanOneResInst(conf);
if (CanCusInstDoStart(conf)) {
StartOneResInst(conf);
}
CM_RETURN_IFERR(StartOneResInst(conf));
return CM_SUCCESS;
}
static void ProcessOfflineInstance(CmResConfList *conf)
{
if (conf->checkInfo.restartTimes == -1) {
RestartOneResInst(conf);
if (CanCusInstDoRestart(conf)) {
(void)RestartOneResInst(conf);
}
return;
}
if (conf->checkInfo.brokeTime == 0) {
@ -261,7 +283,9 @@ static void ProcessOfflineInstance(CmResConfList *conf)
return;
}
if (conf->checkInfo.startCount >= conf->checkInfo.restartTimes) {
write_runlog(LOG, "[CLIENT] res(%s) inst(%u) get out from cluster.\n", conf->resName, conf->resInstanceId);
write_runlog(LOG, "res(%s) inst(%u) is offline, but restart times (%d) >= limit (%d), can't do restart again, "
"will do manually stop.\n", conf->resName, conf->resInstanceId, conf->checkInfo.startCount,
conf->checkInfo.restartTimes);
ManualStopLocalResInst(conf);
return;
}
@ -275,18 +299,14 @@ static void ProcessOfflineInstance(CmResConfList *conf)
conf->resName, conf->resInstanceId, conf->checkInfo.startTime, conf->checkInfo.restartPeriod);
return;
}
RestartOneResInst(conf);
if (!CanCusInstDoRestart(conf)) {
return;
}
CM_RETVOID_IFERR(RestartOneResInst(conf));
conf->checkInfo.startCount++;
conf->checkInfo.startTime = time(NULL);
write_runlog(DEBUG1, "[CLIENT] res(%s) inst(%u) startCount=%d, startTime=%ld.\n",
conf->resName, conf->resInstanceId, conf->checkInfo.startCount, conf->checkInfo.startTime);
}
static inline void CleanOneInstCheckCount(CmResConfList *resConf)
{
resConf->checkInfo.startCount = 0;
resConf->checkInfo.startTime = 0;
resConf->checkInfo.brokeTime = 0;
write_runlog(LOG, "res(%s) inst(%u) has been restart (%d) times, restart more than (%d) time will manually stop.\n",
conf->resName, conf->cmInstanceId, conf->checkInfo.startCount, conf->checkInfo.restartTimes);
}
static inline bool NeedStopResInst(const char *resName, uint32 cmInstId)
@ -353,7 +373,7 @@ void StopResourceCheck()
OneResInstShutdown(&g_resConf[i]);
}
if (CmFileExist(g_cmManualStartPath) || !IsOneResInstWork(g_resConf[i].resName, g_resConf[i].cmInstanceId)) {
(void)CleanOneResInst(&g_resConf[i]);
OneResInstClean(&g_resConf[i]);
}
}
}

View File

@ -368,7 +368,7 @@ static void ProcessRegResInst(const CmsNotifyAgentRegMsg *recvMsg)
write_runlog(LOG, "local res inst[%s:%u] has been reg.\n", recvMsg->resName, recvMsg->resInstId);
} else if ((isreg == CM_RES_ISREG_UNREG) || (isreg == CM_RES_ISREG_PENDING) || (isreg == CM_RES_ISREG_UNKNOWN)) {
write_runlog(LOG, "before reg res inst, need clean res inst first.\n");
if (CleanOneResInst(local) == CM_SUCCESS) {
if ((CheckOneResInst(local) == CUS_RES_CHECK_STAT_OFFLINE) || (CleanOneResInst(local) == CM_SUCCESS)) {
(void)RegOneResInst(local, recvMsg->resInstId);
}
} else if (isreg == CM_RES_ISREG_NOT_SUPPORT) {

View File

@ -242,7 +242,7 @@ void *ConnectAgentMain(void *arg)
ConnectCreate(g_agentConnect);
if (g_agentConnect->isClosed) {
write_runlog(ERROR, "cm_client connect to cm_agent failed, retry.\n");
CmUsleep(CLIENT_CHECK_CONN_INTERVAL);
(void)usleep(CLIENT_CHECK_CONN_INTERVAL);
continue;
}
g_needReconnect = false;
@ -273,7 +273,7 @@ void *ConnectAgentMain(void *arg)
continue;
}
}
CmUsleep(CLIENT_CHECK_CONN_INTERVAL);
(void)usleep(CLIENT_CHECK_CONN_INTERVAL);
}
return NULL;
@ -325,7 +325,7 @@ void SendOneMsgToAgent()
if (CmClientSendMsg(msgPkg.msgPtr, msgPkg.msgLen) != CM_SUCCESS) {
write_runlog(ERROR, "client send msg to agent failed!\n");
g_needReconnect = true;
CmUsleep(CLIENT_CHECK_CONN_INTERVAL);
(void)usleep(CLIENT_CHECK_CONN_INTERVAL);
}
free(msgPkg.msgPtr);
}
@ -341,7 +341,7 @@ void *SendMsgToAgentMain(void *arg)
break;
}
if (g_agentConnect->isClosed) {
CmUsleep(CLIENT_SEND_CHECK_INTERVAL);
(void)usleep(CLIENT_SEND_CHECK_INTERVAL);
continue;
}
@ -489,12 +489,12 @@ void *RecvMsgFromAgentMain(void *arg)
break;
}
if (g_agentConnect->isClosed) {
CmUsleep(CLIENT_CHECK_CONN_INTERVAL);
(void)usleep(CLIENT_CHECK_CONN_INTERVAL);
continue;
}
if (RecvMsgFromAgent() != CM_SUCCESS) {
g_needReconnect = true;
CmUsleep(CLIENT_CHECK_CONN_INTERVAL);
(void)usleep(CLIENT_CHECK_CONN_INTERVAL);
}
}

View File

@ -1952,6 +1952,7 @@ void CmKeyEventInit(void)
g_cmKeyEventType[KEY_EVENT_RECOVER] = "KEY_EVENT_RECOVER";
g_cmKeyEventType[KEY_EVENT_REFRESH_OBS_DELETE_TEXT] = "KEY_EVENT_REFRESH_OBS_DELETE_TEXT";
g_cmKeyEventType[KEY_EVENT_DROP_CN_OBS_XLOG] = "KEY_EVENT_DROP_CN_OBS_XLOG";
g_cmKeyEventType[KEY_EVENT_RES_ARBITRATE] = "KEY_EVENT_RES_ARBITRATE";
}
void CreateKeyEventLogFile(const char *sysLogPath)

View File

@ -89,69 +89,65 @@ char *gs_getenv_r(const char *name)
uint64 GetMonotonicTimeMs()
{
static const uint32 CM_NSEC_COUNT_PER_MS = 1000000;
static const uint32 CM_MS_COUNT_PER_SEC = 1000;
struct timespec ts;
(void)clock_gettime(CLOCK_MONOTONIC, &ts);
return (uint64)ts.tv_sec * CM_MS_COUNT_PER_SEC + (uint64)ts.tv_nsec / CM_NSEC_COUNT_PER_MS;
}
void CMPrioMutexInit(CMPrioMutex &mutex)
void CMFairMutexInit(CMFairMutex &mutex)
{
(void)pthread_mutex_init(&mutex.lock, NULL);
(void)pthread_mutex_init(&mutex.innerLock, NULL);
(void)pthread_cond_init(&mutex.cond, NULL);
mutex.highPrioCount = 0;
mutex.curPrio = CMMutexPrio::CM_MUTEX_PRIO_NONE;
mutex.readerCount = 0;
mutex.writerCount = 0;
mutex.curType = CMFairMutexType::CM_MUTEX_NODE;
}
int CMPrioMutexLock(CMPrioMutex &mutex, CMMutexPrio prio)
int CMFairMutexLock(CMFairMutex &mutex, CMFairMutexType type)
{
if (prio == CMMutexPrio::CM_MUTEX_PRIO_HIGH) {
(void)pthread_mutex_lock(&mutex.innerLock);
mutex.highPrioCount++;
(void)pthread_mutex_unlock(&mutex.innerLock);
int ret = pthread_mutex_lock(&mutex.lock);
if (ret == 0) {
mutex.curPrio = CMMutexPrio::CM_MUTEX_PRIO_HIGH;
} else {
(void)pthread_mutex_lock(&mutex.innerLock);
mutex.highPrioCount--;
(void)pthread_mutex_unlock(&mutex.innerLock);
(void)pthread_cond_broadcast(&mutex.cond);
}
return ret;
struct timespec ts;
uint32* count1 = NULL;
uint32* count2 = NULL;
const int LOCK_WAIT_TIME = 2;
if (type == CMFairMutexType::CM_MUTEX_READ) {
count1 = &mutex.readerCount;
count2 = &mutex.writerCount;
} else {
count1 = &mutex.writerCount;
count2 = &mutex.readerCount;
}
(void)pthread_mutex_lock(&mutex.innerLock);
(*count1)++;
while (true) {
if (mutex.highPrioCount == 0) {
if (type != mutex.curType || *count2 == 0) {
int ret = pthread_mutex_trylock(&mutex.lock);
if (ret == 0) {
mutex.curPrio = CMMutexPrio::CM_MUTEX_PRIO_NORMAL;
return 0;
} else if (ret != EBUSY) {
return ret;
mutex.curType = type;
(*count1)--;
break;
}
}
(void)pthread_mutex_lock(&mutex.innerLock);
(void)pthread_cond_wait(&mutex.cond, &mutex.innerLock);
(void)pthread_mutex_unlock(&mutex.innerLock);
(void)clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += LOCK_WAIT_TIME;
(void)pthread_cond_timedwait(&mutex.cond, &mutex.innerLock, &ts);
}
(void)pthread_mutex_unlock(&mutex.innerLock);
return 0;
}
void CMPrioMutexUnLock(CMPrioMutex &mutex)
void CMFairMutexUnLock(CMFairMutex &mutex)
{
if (mutex.curPrio == CMMutexPrio::CM_MUTEX_PRIO_HIGH) {
(void)pthread_mutex_lock(&mutex.innerLock);
mutex.highPrioCount--;
(void)pthread_mutex_unlock(&mutex.innerLock);
}
mutex.curPrio = CMMutexPrio::CM_MUTEX_PRIO_NONE;
(void)pthread_mutex_lock(&mutex.innerLock);
(void)pthread_mutex_unlock(&mutex.lock);
(void)pthread_mutex_unlock(&mutex.innerLock);
(void)pthread_cond_broadcast(&mutex.cond);
}

View File

@ -82,6 +82,7 @@ void HandleRhbAck(CmRhbStatAck *ack)
(void)printf("Network stat('Y' means connected, otherwise 'N'):\n");
char *rs = GetRhbSimple((time_t *)ack->hbs, MAX_RHB_NUM, ack->hwl, ack->baseTime, ack->timeout);
CM_RETURN_IF_NULL(rs);
(void)printf("%s\n", rs);
free(rs);
}

View File

@ -460,7 +460,6 @@ static int BalanceResultReq(int &timePass, bool waitBalance, int &sendCheckCount
write_runlog(ERROR,
"switchover command timeout!\n\n"
"HINT: Maybe the switchover action is continually running in the background.\n"
"You can wait for a while and check the status of current cluster using "
"\"cm_ctl query -Cv\".\n");
CMPQfinish(CmServer_conn);
CmServer_conn = NULL;

View File

@ -20,29 +20,59 @@
*
* -------------------------------------------------------------------------
*/
#include "share_disk_lock_api.h"
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <ctype.h>
#include "share_disk_lock_api.h"
static const int ARGS_NUM = 4;
static const int ARGS_FIVE_NUM = 5;
static const int ARGS_SIX_NUM = 6;
static const int CMDLOCKTIME_NO = 5;
static const int CMDTYPE_NO = 4;
static const int OFFSET_NO = 3;
static const int INSTANCEID_NO = 2;
static const int DEVICE_NO = 1;
static const int DECIMAL_BASE = 10;
int ExeLockCmd(diskLrwHandler *handler)
{
status_t ret = CmDiskLockS(&handler->headerLock, handler->scsiDev, handler->fd);
if (ret == CM_SUCCESS) {
return 0;
}
time_t lockTime = LOCKR_LOCK_TIME(handler->headerLock);
if (lockTime <= 0) {
return -1;
}
// system function execute lock cmd result range is [0, 127], 127 and 126 maybe system command failed result
// so get lock time valid range is [1, 125]
// 0:get lock success;-1:get lock failed and get lock time failed;[1,125]:get lock failed but get lock time success
return (int)lockTime;
}
int ExeForceLockCmd(diskLrwHandler *handler, int64 lockTime)
{
return (int)CmDiskLockf(&handler->headerLock, handler->fd, lockTime);
}
typedef enum en_persist_cmd_type {
CMD_LOCK = 0,
CMD_FORCE_LOCK = 1,
} PERSIST_CMD_TYPE;
static void usage()
{
(void)printf(_("cm_persist: get disk lock for the shared storage.\n\n"));
(void)printf(_("Usage:\n"));
(void)printf(_(" cm_persist [DEVICEPATH] [INSTANCE_ID] [OFFSET]\n"));
(void)printf(_(" cm_persist [DEVICEPATH] [INSTANCE_ID] [OFFSET] [CMD_TYPE] [LOCK_TIME]\n"));
(void)printf(_("[DEVICEPATH]: the path of the shared storage\n"));
(void)printf(_("[INSTANCE_ID]: the instanceid of the process\n"));
(void)printf(_("[OFFSET]: get disk lock on storage position\n"));
(void)printf(_("[CMD_TYPE]: cm_persist command type\n"));
(void)printf(_("[LOCK_TIME]: lock time only used when CMD_TYPE is 1\n"));
(void)printf(_("-?, --help show this help, then exit\n"));
}
@ -60,6 +90,33 @@ static status_t GetIntValue(char *input, int64 *value)
return CM_SUCCESS;
}
static int ExePersistCmd(diskLrwHandler *cmsArbitrateDiskHandler, int64 cmdType, int argc, char **argv)
{
int ret = -1;
switch (cmdType) {
case CMD_LOCK:
if (argc != ARGS_FIVE_NUM) {
break;
}
ret = ExeLockCmd(cmsArbitrateDiskHandler);
break;
case CMD_FORCE_LOCK:
if (argc != ARGS_SIX_NUM) {
break;
}
int64 lockTime;
if (GetIntValue(argv[CMDLOCKTIME_NO], &lockTime) != CM_SUCCESS) {
break;
}
ret = ExeForceLockCmd(cmsArbitrateDiskHandler, lockTime);
break;
default:
break;
}
return ret;
}
int main(int argc, char **argv)
{
if (argc > 1) {
@ -69,35 +126,39 @@ int main(int argc, char **argv)
}
}
if (argc != ARGS_NUM) {
if (argc < ARGS_FIVE_NUM) {
(void)printf(_("the num(%d) of parameters input is invalid.\n\n"), argc);
usage();
return 1;
return -1;
}
int64 instanceId;
int64 offset;
int64 cmdType;
if (GetIntValue(argv[INSTANCEID_NO], &instanceId) != CM_SUCCESS) {
return 1;
return -1;
}
if (GetIntValue(argv[OFFSET_NO], &offset) != CM_SUCCESS) {
return 1;
return -1;
}
if (GetIntValue(argv[CMDTYPE_NO], &cmdType) != CM_SUCCESS) {
return -1;
}
diskLrwHandler cmsArbitrateDiskHandler;
if (InitDiskLockHandle(&cmsArbitrateDiskHandler, argv[DEVICE_NO], (uint32)offset, instanceId) != CM_SUCCESS) {
return 1;
return -1;
}
status_t ret = ShareDiskGetDlock(&cmsArbitrateDiskHandler);
int ret = ExePersistCmd(&cmsArbitrateDiskHandler, cmdType, argc, argv);
(void)close(cmsArbitrateDiskHandler.fd);
FREE_AND_RESET(cmsArbitrateDiskHandler.headerLock.buff);
if (ret != CM_SUCCESS) {
if (ret != (int)CM_SUCCESS) {
(void)printf(_("Failed to get disk lock.\n\n"));
return 1;
return ret;
}
(void)printf(_("Success to get disk lock.\n\n"));
return 0;
return ret;
}

View File

@ -29,6 +29,16 @@
const int BLOCK_NUMS = 3;
const uint32 LOCK_BLOCK_NUMS = 2;
const time_t MAX_VALID_LOCK_TIME = 125;
const time_t BASE_VALID_LOCK_TIME = 1;
time_t CalcLockTime(time_t lockTime)
{
// system function execute lock cmd result range is [0, 127], 127 and 126 maybe system command failed result
// so get lock time valid range is [1, 125]
// 0:get lock success;-1:get lock failed and get lock time failed;[1,125]:get lock failed but get lock time success
return lockTime % MAX_VALID_LOCK_TIME + BASE_VALID_LOCK_TIME;
}
status_t CmAllocDlock(dlock_t *lock, uint64 lockAddr, int64 instId)
{
@ -141,28 +151,6 @@ status_t CmDiskLockS(dlock_t *lock, const char *scsiDev, int32 fd)
return CM_SUCCESS;
}
status_t CmDiskLockfS(dlock_t *lock, const char *scsiDev)
{
if (lock == NULL|| scsiDev == NULL) {
return CM_ERROR;
}
int32 fd = open(scsiDev, O_RDWR | O_DIRECT | O_SYNC);
if (fd < 0) {
(void)printf(_("CmDiskLockfS Open dev %s failed, errno %d.\n"), scsiDev, errno);
return CM_ERROR;
}
status_t ret = CmDiskLockf(lock, fd);
if (ret != CM_SUCCESS) {
(void)close(fd);
return ret;
}
(void)close(fd);
return CM_SUCCESS;
}
int32 CmDiskLock(dlock_t *lock, int32 fd)
{
uint32 buffLen = LOCK_BLOCK_NUMS * CM_DEF_BLOCK_SIZE;
@ -172,7 +160,7 @@ int32 CmDiskLock(dlock_t *lock, int32 fd)
}
time_t t = time(NULL);
LOCKW_LOCK_TIME(*lock) = t;
LOCKW_LOCK_TIME(*lock) = CalcLockTime(t);
LOCKW_LOCK_CREATE_TIME(*lock) = t;
int32 ret = CmScsi3Caw(fd, lock->lockAddr / CM_DEF_BLOCK_SIZE, lock->lockr, buffLen);
if (ret != (int)CM_SUCCESS) {
@ -223,7 +211,7 @@ int32 CmDiskLock(dlock_t *lock, int32 fd)
return 0;
}
status_t CmDiskLockf(dlock_t *lock, int32 fd)
status_t CmDiskLockf(dlock_t *lock, int32 fd, int64 lockTime)
{
if (lock == NULL || fd < 0) {
return CM_ERROR;
@ -233,6 +221,7 @@ status_t CmDiskLockf(dlock_t *lock, int32 fd)
(void)printf(_("Get lock info from dev failed, fd %d.\n"), fd);
return CM_ERROR;
}
LOCKR_LOCK_TIME(*lock) = lockTime;
int32 ret = CmDiskLock(lock, fd);
if (ret != (int)CM_SUCCESS) {
return CM_ERROR;
@ -256,3 +245,4 @@ status_t CmGetDlockInfo(dlock_t *lock, int32 fd)
}
return CM_SUCCESS;
}

View File

@ -30,10 +30,6 @@
#include "securec.h"
#include "share_disk_lock_api.h"
const int LOCK_WAIT_INTERVAL = 10 * 1000;
const int FORCE_LOCK_TRY_TIMES = 3;
const int LOCK_WAIT_MAX_TIMTS = 500;
status_t ShareDiskHandlerInit(diskLrwHandler *handler)
{
if (CmAllocDlock(&handler->headerLock, handler->offset, handler->instId) != CM_SUCCESS) {
@ -65,46 +61,3 @@ status_t InitDiskLockHandle(diskLrwHandler *sdLrwHandler, const char *scsi_dev,
return CM_SUCCESS;
}
status_t ShareDiskGetDlock(diskLrwHandler *handler)
{
status_t ret = CM_SUCCESS;
int32 times = 0;
int32 delayTimes = 0;
bool hasRefreshLockTime = false;
time_t lockTime = 0;
do {
ret = CmDiskLockS(&handler->headerLock, handler->scsiDev, handler->fd);
if (ret == CM_SUCCESS) {
return CM_SUCCESS;
}
if (lockTime != LOCKR_LOCK_TIME(handler->headerLock)) {
lockTime = LOCKR_LOCK_TIME(handler->headerLock);
delayTimes = 0;
if (hasRefreshLockTime) {
(void)printf(_("Get lock failed for lock time has been refreshed by other process\n"));
return CM_ERROR;
}
hasRefreshLockTime = true;
}
if (lockTime == LOCKR_LOCK_TIME(handler->headerLock)) {
if (delayTimes < LOCK_WAIT_MAX_TIMTS) {
(void)usleep(LOCK_WAIT_INTERVAL);
delayTimes++;
continue;
}
ret = CmDiskLockfS(&handler->headerLock, handler->scsiDev);
if (ret != CM_SUCCESS) {
(void)printf(_("Get lock failed when force %d times\n"), ++times);
return CM_ERROR;
}
(void)printf(_("Get lock success when force %d times.\n"), ++times);
return CM_SUCCESS;
}
(void)usleep(LOCK_WAIT_INTERVAL);
} while (1);
}

View File

@ -265,14 +265,6 @@ static void ReleaseMaxNodeMemory()
FreeDmsValue();
}
static inline char *GetDynamicMem(char *dynamicPtr, size_t *curSize, size_t memSize)
{
size_t tmpCurSize = (*curSize);
char *tmp = dynamicPtr + tmpCurSize;
(*curSize) = tmpCurSize + memSize;
return tmp;
}
static status_t AllocNodeClusterMemory(NodeCluster *nodeCluster, int32 maxNodeNum)
{
size_t memSize = sizeof(uint32) * (uint32)(maxNodeNum);
@ -337,9 +329,9 @@ static bool CheckPoint2PointConn(int32 resIdx1, int32 resIdx2)
return (connRes1 && connRes2);
}
static MaxClusterResStatus GetDiskHeartbeatStat(uint32 nodeIndex, uint32 diskTimeout)
static MaxClusterResStatus GetDiskHeartbeatStat(uint32 nodeIndex, uint32 diskTimeout, int logLevel)
{
VotingDiskStatus stat = GetNodeHeartbeatStat(nodeIndex, diskTimeout);
VotingDiskStatus stat = GetNodeHeartbeatStat(nodeIndex, diskTimeout, logLevel);
if (stat == VOTING_DISK_STATUS_UNAVAIL) {
return MAX_CLUSTER_STATUS_UNAVAIL;
} else if (stat == VOTING_DISK_STATUS_AVAIL) {
@ -351,9 +343,9 @@ static MaxClusterResStatus GetDiskHeartbeatStat(uint32 nodeIndex, uint32 diskTim
static bool IsAllResAvailInNode(int32 resIdx)
{
uint32 nodeIdx = g_clusterRes.map[resIdx].nodeIdx;
MaxClusterResStatus heartbeatStatus = GetDiskHeartbeatStat(nodeIdx, g_diskTimeout);
MaxClusterResStatus heartbeatStatus = GetDiskHeartbeatStat(nodeIdx, g_diskTimeout, DEBUG5);
bool heartbeatRes = IsCurResAvail(resIdx, MAX_CLUSTER_TYPE_VOTE_DISK, heartbeatStatus);
MaxClusterResStatus nodeStatus = GetResNodeStat(g_node[nodeIdx].node);
MaxClusterResStatus nodeStatus = GetResNodeStat(g_node[nodeIdx].node, DEBUG5);
bool nodeRes = IsCurResAvail(resIdx, MAX_CLUSTER_TYPE_RES_STATUS, nodeStatus);
return (heartbeatRes && nodeRes);
}
@ -429,16 +421,10 @@ static int32 FindNodeCluster(int32 startPoint, int32 maxNum, NodeCluster *nodeCl
continue;
}
if (!CheckPoint2PointConn(startPoint, i)) {
write_runlog(LOG, "(index=%d,nodeId=%u) disconnect with (index=%d,nodeId=%u).\n",
startPoint, GetNodeByPoint(startPoint), i, GetNodeByPoint(i));
PrintHbsInfo(startPoint, GetNodeByPoint(startPoint), i, GetNodeByPoint(i), LOG);
continue;
}
for (j = 0; j < maxNum; ++j) {
if (!CheckPoint2PointConn(i, nodeCluster->visNode[j])) {
write_runlog(LOG, "(index=%d,nodeId=%u) disconnect with (index=%d,nodeId=%u).\n",
startPoint, GetNodeByPoint(startPoint), i, GetNodeByPoint(i));
PrintHbsInfo(startPoint, GetNodeByPoint(startPoint), i, GetNodeByPoint(i), LOG);
break;
}
}
@ -492,7 +478,7 @@ static void PrintMaxNodeCluster(const MaxNodeCluster *maxNodeCluster, const char
for (int32 i = 0; i < maxNodeCluster->nodeCluster.clusterNum; ++i) {
StrcatNextNodeStr(clusterStr, MAX_PATH_LEN, maxNodeCluster->nodeCluster.cluster[i]);
}
write_runlog(LOG, "%s the max node cluster: %s.\n", str, clusterStr);
write_runlog(logLevel, "%s the max node cluster: %s.\n", str, clusterStr);
}
static void GetClusterKeyInDdb(char *key, uint32 keyLen)
@ -835,7 +821,7 @@ static void CopyCur2LastMaxNodeCluster(MaxNodeCluster *lastCluster, MaxNodeClust
lastCluster->version = curCluster->version;
SetCurMaxNodeByLast(curCluster, lastCluster);
(void)pthread_rwlock_unlock(&(lastCluster->lock));
PrintMaxNodeCluster(lastCluster, "[CompareCurLastMaxNodeCluster]", FATAL);
PrintMaxNodeCluster(lastCluster, "[CompareCurLastMaxNodeCluster]", DEBUG1);
}
static void AddCurResInCurCluster(int32 resIdx, NodeCluster *curCluster)
@ -889,6 +875,85 @@ static bool8 CanArbitrateMaxCluster(const NodeCluster *lastCluster, NodeCluster
return (bool8)(curCluster->clusterNum > lastCluster->clusterNum);
}
static bool IsNodeInCluster(int32 resIdx, const MaxNodeCluster *nodeCluster)
{
for (int32 i = 0; i < nodeCluster->nodeCluster.clusterNum; ++i) {
if (resIdx == nodeCluster->nodeCluster.cluster[i]) {
return true;
}
}
return false;
}
static void PrintRhbStatus()
{
uint32 hwl = 0;
time_t hbs[MAX_RHB_NUM][MAX_RHB_NUM] = {{0}};
GetRhbStat(hbs, &hwl);
char *rhbStr = GetRhbSimple((time_t *)hbs, MAX_RHB_NUM, hwl, time(NULL), g_agentNetworkTimeout);
CM_RETURN_IF_NULL(rhbStr);
size_t rhbLen = strlen(rhbStr);
if (rhbLen >= MAX_LOG_BUFF_LEN) {
write_runlog(LOG, "rhbStr len(%lu) is exceed max log buff len(%d), can't print network stat.\n",
rhbLen, MAX_LOG_BUFF_LEN);
FREE_AND_RESET(rhbStr);
return;
}
write_runlog(LOG, "Network timeout:%u\n", g_agentNetworkTimeout);
write_runlog(LOG, "Network stat('Y' means connected, otherwise 'N'):\n%s\n", rhbStr);
FREE_AND_RESET(rhbStr);
}
static void PrintKickOutResult(int32 resIdx, const MaxNodeCluster *maxCluster)
{
uint32 nodeIdx = g_clusterRes.map[resIdx].nodeIdx;
MaxClusterResStatus heartbeatStatus = GetDiskHeartbeatStat(nodeIdx, g_diskTimeout, LOG);
if (!IsCurResAvail(resIdx, MAX_CLUSTER_TYPE_VOTE_DISK, heartbeatStatus)) {
write_runlog(LOG, "kick out result: node(%u) disk heartbeat timeout.\n", g_node[nodeIdx].node);
return;
}
MaxClusterResStatus nodeStatus = GetResNodeStat(g_node[nodeIdx].node, LOG);
if (!IsCurResAvail(resIdx, MAX_CLUSTER_TYPE_RES_STATUS, nodeStatus)) {
write_runlog(LOG, "kick out result: node(%u) res inst manual stop or report timeout.\n", g_node[nodeIdx].node);
return;
}
for (int32 i = 0; i < maxCluster->nodeCluster.clusterNum; ++i) {
if (resIdx == maxCluster->nodeCluster.cluster[i]) {
continue;
}
if (!CheckPoint2PointConn(resIdx, maxCluster->nodeCluster.cluster[i])) {
write_runlog(LOG, "kick out result: (index=%d,nodeId=%u) disconnect with (index=%d,nodeId=%u).\n",
resIdx, GetNodeByPoint(resIdx), i, GetNodeByPoint(i));
PrintHbsInfo(resIdx, GetNodeByPoint(resIdx), i, GetNodeByPoint(i), LOG);
continue;
}
}
PrintRhbStatus();
}
static void PrintArbitrateResult(const MaxNodeCluster *lastCluster, const MaxNodeCluster *curCluster)
{
// kick out
for (int32 i = 0; i < lastCluster->nodeCluster.clusterNum; ++i) {
if (!IsNodeInCluster(lastCluster->nodeCluster.cluster[i], curCluster)) {
uint32 nodeIdx = g_clusterRes.map[lastCluster->nodeCluster.cluster[i]].nodeIdx;
WriteKeyEventLog(KEY_EVENT_RES_ARBITRATE, 0, "node(%u) kick out.", g_node[nodeIdx].node);
PrintKickOutResult(lastCluster->nodeCluster.cluster[i], lastCluster);
}
}
// join in
for (int32 i = 0; i < curCluster->nodeCluster.clusterNum; ++i) {
if (!IsNodeInCluster(curCluster->nodeCluster.cluster[i], lastCluster)) {
uint32 nodeIdx = g_clusterRes.map[curCluster->nodeCluster.cluster[i]].nodeIdx;
WriteKeyEventLog(KEY_EVENT_RES_ARBITRATE, 0, "node(%u) join in cluster.", g_node[nodeIdx].node);
}
}
}
static void CompareCurLastMaxNodeCluster(MaxNodeCluster *lastCluster, MaxNodeCluster *curCluster)
{
if (curCluster->nodeCluster.clusterNum <= 0) {
@ -905,6 +970,7 @@ static void CompareCurLastMaxNodeCluster(MaxNodeCluster *lastCluster, MaxNodeClu
}
write_runlog(LOG, "last(%lu) is different from current(%lu), result is %d.\n",
lastCluster->version, curCluster->version, result);
PrintArbitrateResult(lastCluster, curCluster);
// wait for successfully setting cluster to ddb.
status_t st = SetCurClusterToDdb(curCluster);
if (st != CM_SUCCESS) {

View File

@ -127,20 +127,6 @@ static void coordinator_notify_msg_reset(void)
}
}
static void IncrementTermIfCmRestart()
{
if (!IsNeedSyncDdb()) {
return;
}
(void)pthread_rwlock_wrlock(&term_update_rwlock);
int incrementTermSesult = IncrementTermToDdb();
(void)pthread_rwlock_unlock(&term_update_rwlock);
if (incrementTermSesult != 0) {
write_runlog(ERROR, "Incrtement term to ddb failed, %d:node(%u) cm_server role is %s, will to primary.\n",
__LINE__, g_currentNode->node, server_role_to_string(g_HA_status->local_role));
}
}
static void clean_cn_heart_beat(int cmServerCurrentRole, int cm_server_last_role)
{
if ((cmServerCurrentRole != CM_SERVER_PRIMARY) && (cm_server_last_role == CM_SERVER_PRIMARY)) {
@ -159,26 +145,6 @@ static void clean_cn_heart_beat(int cmServerCurrentRole, int cm_server_last_role
}
}
static void CmsSyncStandbyMode()
{
char key[MAX_PATH_LEN] = {0};
char value[MAX_PATH_LEN] = {0};
errno_t rc =
snprintf_s(key, MAX_PATH_LEN, MAX_PATH_LEN - 1, "/%s/CMServer/status_key/sync_standby_mode", pw->pw_name);
securec_check_intval(rc, (void)rc);
DDB_RESULT ddbResult = SUCCESS_GET_VALUE;
status_t st = GetKVFromDDb(key, MAX_PATH_LEN, value, MAX_PATH_LEN, &ddbResult);
if (st != CM_SUCCESS) {
int logLevel = (ddbResult == CAN_NOT_FIND_THE_KEY) ? ERROR : LOG;
write_runlog(logLevel, "failed to get value with key(%s).\n", key);
return;
}
current_cluster_az_status = (synchronous_standby_mode)strtol(value, NULL, 10);
write_runlog(LOG, "setting to %d.\n", current_cluster_az_status);
return;
}
static void CleanSwitchoverCommand()
{
write_runlog(LOG, "cms change to primary, will clean switchover command.\n");
@ -257,8 +223,6 @@ static void check_server_role_changed(int cm_server_role)
(int)cm_server_start_mode, g_node_num);
}
#endif
/* get the ddb key-value of the synchronize-standby-mode */
CmsSyncStandbyMode();
}
}
@ -420,13 +384,21 @@ static void CmsChange2Primary(int32 *cmsDemoteDelayOnConnLess)
if (g_HA_status->local_role == CM_SERVER_PRIMARY) {
return;
}
IncrementTermIfCmRestart();
if (IsNeedSyncDdb()) {
(void)pthread_rwlock_wrlock(&term_update_rwlock);
g_needIncTermToDdbAgain = true;
(void)pthread_rwlock_unlock(&term_update_rwlock);
g_needReloadSyncStandbyMode = true;
}
write_runlog(LOG, "node(%u) cms role is %s, change to primary by ddb, and g_ddbRole is %d.\n",
g_currentNode->node, server_role_to_string(g_HA_status->local_role), (int)g_ddbRole);
g_HA_status->local_role = CM_SERVER_PRIMARY;
*cmsDemoteDelayOnConnLess = cmserver_demote_delay_on_conn_less;
ClearSyncWithDdbFlag();
NotifyDdb(DDB_ROLE_LEADER);
if (g_dbType != DB_SHAREDISK) {
NotifyDdb(DDB_ROLE_LEADER);
}
}
static void PromoteCmsDirect(int32 *cmsDemoteDelayOnConnLess)

View File

@ -781,6 +781,10 @@ uint32 GetPrimaryTerm(const DnArbCtx *ctx)
static bool DnArbitrateInAsync(DnArbCtx *ctx)
{
if (g_needReloadSyncStandbyMode) {
write_runlog(LOG, "line %d: wait to reload sync standby mode ddb value.\n", __LINE__);
return true;
}
int azIndex = GetAzIndex(ctx);
if (azIndex == -1) {
return true;

View File

@ -1072,6 +1072,14 @@ void datanode_instance_arbitrate_for_psd(MsgRecvInfo* recvMsgInfo, const agent_t
return;
}
if (g_needReloadSyncStandbyMode) {
write_runlog(LOG,
"instance(node=%u instanceid=%u) arbitrate will wait to reload sync standby mode ddb value.\n",
node,
instanceId);
return;
}
GetDatanodeDynamicConfigChangeFromDdb(group_index);
(void)pthread_rwlock_wrlock(&(g_instance_group_report_status_ptr[group_index].lk_lock));

View File

@ -313,6 +313,7 @@ void get_paramter_coordinator_heartbeat_timeout()
}
#endif
bool CheckBoolConfigParam(const char* value)
{
if (strcasecmp(value, "on") == 0 || strcasecmp(value, "yes") == 0 || strcasecmp(value, "true") == 0 ||
@ -827,6 +828,31 @@ bool SetOfflineNode()
return false;
}
void CmsSyncStandbyMode()
{
if (!g_needReloadSyncStandbyMode) {
return;
}
char key[MAX_PATH_LEN] = {0};
char value[MAX_PATH_LEN] = {0};
errno_t rc =
snprintf_s(key, MAX_PATH_LEN, MAX_PATH_LEN - 1, "/%s/CMServer/status_key/sync_standby_mode", pw->pw_name);
securec_check_intval(rc, (void)rc);
DDB_RESULT ddbResult = SUCCESS_GET_VALUE;
status_t st = GetKVFromDDb(key, MAX_PATH_LEN, value, MAX_PATH_LEN, &ddbResult);
if (st != CM_SUCCESS) {
g_needReloadSyncStandbyMode = false;
int logLevel = (ddbResult == CAN_NOT_FIND_THE_KEY) ? ERROR : LOG;
write_runlog(logLevel, "failed to get value with key(%s).\n", key);
return;
}
current_cluster_az_status = (synchronous_standby_mode)strtol(value, NULL, 10);
g_needReloadSyncStandbyMode = false;
write_runlog(LOG, "setting current cluster az status to %d.\n", (int)current_cluster_az_status);
return;
}
bool EnableShareDisk()
{
return (g_dnArbitrateMode == SHARE_DISK);

View File

@ -583,9 +583,11 @@ void NotifyCmaDoReg(uint32 destNodeId)
continue;
}
ResIsregStatus isreg = GetIsregStatusByCmInstId(resInfo->resStat[j].cmInstanceId);
if (isreg == CM_RES_ISREG_UNREG || isreg == CM_RES_ISREG_PENDING || isreg == CM_RES_ISREG_INIT) {
if (isreg == CM_RES_ISREG_REG) {
UpdateIsworkList(resInfo->resStat[j].cmInstanceId, RES_INST_WORK_STATUS_AVAIL);
} else if (isreg == CM_RES_ISREG_UNREG || isreg == CM_RES_ISREG_PENDING || isreg == CM_RES_ISREG_INIT) {
SendRegMsgToCma(destNodeId, 1, resInfo->resStat[j].resInstanceId, resInfo->resName);
} else if (isreg == CM_RES_ISREG_REG || isreg == CM_RES_ISREG_NOT_SUPPORT) {
} else if (isreg == CM_RES_ISREG_NOT_SUPPORT && resInfo->resStat[j].status == (uint32)CM_RES_STAT_OFFLINE) {
UpdateIsworkList(resInfo->resStat[j].cmInstanceId, RES_INST_WORK_STATUS_AVAIL);
}
}

View File

@ -23,6 +23,7 @@
*/
#include <map>
#include <sys/eventfd.h>
#include <sys/prctl.h>
#include <sys/epoll.h>
#include "cm/cm_elog.h"
#include "cms_common.h"
@ -38,31 +39,34 @@
#include "cm_util.h"
static const int EPOLL_TIMEOUT = 1000;
static const int SEND_COUNT = 100;
static const uint32 ALL_AGENT_NODE_ID = 0xffffffff;
static const uint32 MAX_MSG_BUF_POOL_SIZE = 102400;
static const uint32 MAX_MSG_BUF_POOL_COUNT = 200;
static const uint32 MAX_MSG_IN_QUE = 100;
struct DdbPreAgentCon {
uint32 connCount;
char conFlag[DDB_MAX_CONNECTIONS];
};
using MapConns = std::map<uint32, CM_Connection*>;
using MapConns = std::map<uint64, CM_Connection*>;
struct TempConns {
MapConns tempConns;
uint32 tempConnSeq;
pthread_mutex_t lock;
};
using CM_Connections = struct CM_Connections_t {
uint32 count;
uint32 max_node_id;
CM_Connection* connections[CM_MAX_CONNECTIONS + MAXLISTEN];
pthread_rwlock_t lock;
} ;
TempConns g_tempConns;
CM_Connections gConns;
DdbPreAgentCon g_preAgentCon = {0};
uint8 g_msgProcFlag[MSG_CM_TYPE_CEIL];
static int g_wakefd = -1;
int32 InitConn()
{
MsgPoolInit(MAX_MSG_BUF_POOL_SIZE, MAX_MSG_BUF_POOL_COUNT);
@ -247,7 +251,6 @@ void AddCMAgentConnection(CM_Connection *con)
errno_t rc = memset_s(g_preAgentCon.conFlag, sizeof(g_preAgentCon.conFlag), 0, sizeof(g_preAgentCon.conFlag));
securec_check_errno(rc, (void)pthread_rwlock_unlock(&gConns.lock));
}
con->connSeq = 0;
(void)pthread_rwlock_unlock(&gConns.lock);
con->notifyCn = setNotifyCnFlagByNodeId(con->port->node_id);
}
@ -255,11 +258,9 @@ void AddCMAgentConnection(CM_Connection *con)
void AddTempConnection(CM_Connection *con)
{
(void)pthread_mutex_lock(&g_tempConns.lock);
g_tempConns.tempConnSeq++;
con->connSeq = g_tempConns.tempConnSeq;
(void)g_tempConns.tempConns.insert(make_pair(con->connSeq, con));
(void)pthread_mutex_unlock(&g_tempConns.lock);
write_runlog(DEBUG5, "AddTempConnection:connSeq=%u\n", con->connSeq);
write_runlog(DEBUG5, "AddTempConnection:connSeq=%lu.\n", con->connSeq);
}
void RemoveTempConnection(CM_Connection *con)
@ -267,7 +268,7 @@ void RemoveTempConnection(CM_Connection *con)
(void)pthread_mutex_lock(&g_tempConns.lock);
(void)g_tempConns.tempConns.erase(con->connSeq);
(void)pthread_mutex_unlock(&g_tempConns.lock);
write_runlog(DEBUG5, "RemoveTempConnection:connSeq=%u\n", con->connSeq);
write_runlog(DEBUG5, "RemoveTempConnection:connSeq=%lu.\n", con->connSeq);
}
CM_Connection* GetTempConnection(uint64 connSeq)
@ -452,20 +453,23 @@ void ConnFree(Port* conn)
free(conn);
}
static void CloseAllConnections(volatile sig_atomic_t& gotConnsClose, int epollHandle)
static void CloseAllConnections(CM_IOThread *thrinfo)
{
CM_Connection* con = NULL;
if (gotConnsClose == 1) {
if (thrinfo->gotConnClose == 1) {
/* left some time, other thread maybe use the mem of conn. */
cm_sleep(1);
bool findepollHandle = false;
(void)pthread_rwlock_wrlock(&gConns.lock);
write_runlog(LOG, "receive signal to close all the agent connections now, conn count is %u.\n", gConns.count);
for (uint32 i = 0; i < gConns.max_node_id + 1; i++) {
if (i % gIOThreads.count != thrinfo->id) {
continue;
}
con = gConns.connections[i];
if (con != NULL && epollHandle == con->epHandle) {
if (con != NULL && thrinfo->epHandle == con->epHandle) {
Assert(con->port->remote_type == CM_AGENT);
EventDel(con->epHandle, con);
@ -479,12 +483,12 @@ static void CloseAllConnections(volatile sig_atomic_t& gotConnsClose, int epollH
}
}
if (gConns.count == 0 || g_HA_status->local_role == CM_SERVER_PRIMARY) {
gotConnsClose = 0;
thrinfo->gotConnClose = 0;
write_runlog(LOG, "reset close conn flag.\n");
}
(void)pthread_rwlock_unlock(&gConns.lock);
if (!findepollHandle) {
write_runlog(LOG, "can't get epollHandle %d.\n", epollHandle);
write_runlog(LOG, "can't get epollHandle %d.\n", thrinfo->epHandle);
}
}
}
@ -538,8 +542,9 @@ void* CM_WorkThreadMain(void* argp)
CM_WorkThread* thrinfo = (CM_WorkThread*)argp;
thread_name = (thrinfo->type == CM_AGENT) ? "NORMAL WORKER" : "CTL WORKER";
MsgPriority pri = (thrinfo->type == CM_AGENT) ? MsgPriNormal : MsgPriHigh;
thread_name = (thrinfo->type == CM_AGENT) ? "AGENT_WORKER" : "CTL_WORKER";
MsgSourceType src = (thrinfo->type == CM_AGENT) ? MsgSrcAgent : MsgSrcCtl;
(void)prctl(PR_SET_NAME, thread_name);
(void)pthread_detach(pthread_self());
@ -548,8 +553,14 @@ void* CM_WorkThreadMain(void* argp)
write_runlog(LOG, "cmserver pool thread %lu starting, \n", thrinfo->tid);
SetCanProcThisMsgFun(CanProcThisMsg);
uint32 msgCount = 0;
uint32 preMsgCount = 0;
uint64 totalWaitTime = 0;
uint64 totalProcTime = 0;
MsgRecvInfo *msg = NULL;
uint64 t0 = GetMonotonicTimeMs();
uint32 ioThreadIdx = thrinfo->id % gIOThreads.count;
CM_IOThread* ioThrInfo = &gIOThreads.threads[ioThreadIdx];
for (;;) {
if (got_stop == true) {
@ -558,14 +569,16 @@ void* CM_WorkThreadMain(void* argp)
continue;
}
uint64 t1 = GetMonotonicTimeMs();
thrinfo->isBusy = false;
do {
msg = (MsgRecvInfo*)(getRecvMsg(pri, 1, argp));
msg = (MsgRecvInfo*)(getRecvMsg((PriMsgQues*)ioThrInfo->recvMsgQue, src, 1, argp));
} while (msg == NULL);
uint64 t2 = GetMonotonicTimeMs();
thrinfo->isBusy = true;
write_runlog(DEBUG5,
"get message from recv que:remote_type:%s,connSeq=%u,agentNodeId=%u,qtype=%c,len=%d.\n",
"get message from recv que:remote_type:%s,connSeq=%lu,agentNodeId=%u,qtype=%c,len=%d.\n",
msg->connID.remoteType == CM_CTL ? "CM_CTL" : "CM_AGENT",
msg->connID.connSeq,
msg->connID.agentNodeId,
@ -573,22 +586,35 @@ void* CM_WorkThreadMain(void* argp)
msg->msg.len);
cm_server_process_msg(msg);
uint64 t3 = GetMonotonicTimeMs();
thrinfo->ProcConnID.remoteType = 0;
thrinfo->ProcConnID.connSeq = 0;
thrinfo->ProcConnID.agentNodeId = 0;
FreeBufFromMsgPool((void *)msg);
msg = NULL;
thrinfo->procMsgCount++;
if (msgCount % (MSG_COUNT_FOR_LOG) == 0 && msgCount >= (MSG_COUNT_FOR_LOG)) {
write_runlog(DEBUG1, "the thread has deal 300 msg at this time.\n");
msgCount = 0;
totalWaitTime += t2 - t1;
totalProcTime += t3 - t2;
if (t3 - t0 > MSG_TIME_FOR_LOG * CM_MS_COUNT_PER_SEC) {
write_runlog(DEBUG5,
"the thread process message:total count:%u,this time:%u,wait time=%lums,proc time=%lums\n",
thrinfo->procMsgCount,
thrinfo->procMsgCount - preMsgCount,
totalWaitTime,
totalProcTime);
totalWaitTime = 0;
totalProcTime = 0;
t0 = t3;
preMsgCount = thrinfo->procMsgCount;
}
}
return thrinfo;
}
void pushMsgToQue(CM_Connection* con)
void pushMsgToQue(CM_IOThread *thrinfo, CM_Connection* con)
{
uint32 totalFreeCount, totalAllocCount, freeCount, allocCount, typeCount;
@ -606,21 +632,17 @@ void pushMsgToQue(CM_Connection* con)
write_runlog(LOG,
"alloc memory for msg failed,totalFreeCount(%u), totalAllocCount(%u). this type(%u) "
"freeCount(%u), allocCount(%u).\n",
totalFreeCount,
totalAllocCount,
allocLen,
freeCount,
allocCount);
totalFreeCount, totalAllocCount, allocLen, freeCount, allocCount);
return;
}
errno_t rc = memset_s(msgInfo, (size_t)allocLen, 0, (size_t)allocLen);
securec_check_errno(rc, (void)rc);
msgInfo->connID.remoteType = con->port->remote_type;
msgInfo->msgProcFlag = 0;
msgInfo->msg = *con->inBuffer;
msgInfo->msg.data = (char*)&msgInfo->data[0];
if (con->inBuffer->len > 0) {
errno_t rc =
memcpy_s(msgInfo->msg.data, (size_t)con->inBuffer->len, con->inBuffer->data, (size_t)con->inBuffer->len);
rc = memcpy_s(msgInfo->msg.data, (size_t)con->inBuffer->len, con->inBuffer->data, (size_t)con->inBuffer->len);
securec_check_errno(rc, (void)rc);
}
msgInfo->connID.connSeq = con->connSeq;
@ -631,7 +653,7 @@ void pushMsgToQue(CM_Connection* con)
}
write_runlog(DEBUG5,
"push message to recv que:remote_type:%s,connSeq=%u,agentNodeId=%u,qtype=%c,len=%d.\n",
"push message to recv que:remote_type:%s,connSeq=%lu,agentNodeId=%u,qtype=%c,len=%d.\n",
msgInfo->connID.remoteType == CM_CTL ? "CM_CTL" : "CM_AGENT",
msgInfo->connID.connSeq,
msgInfo->connID.agentNodeId,
@ -639,11 +661,15 @@ void pushMsgToQue(CM_Connection* con)
msgInfo->msg.len);
// push the message to the queue
uint64 t1 = GetMonotonicTimeMs();
msgInfo->connID.t1 = t1;
if (con->port->remote_type == CM_CTL) {
pushRecvMsg(msgInfo, MsgPriHigh);
pushRecvMsg((PriMsgQues*)thrinfo->recvMsgQue, msgInfo, MsgSrcCtl);
} else {
pushRecvMsg(msgInfo, MsgPriNormal);
pushRecvMsg((PriMsgQues*)thrinfo->recvMsgQue, msgInfo, MsgSrcAgent);
}
uint64 t2 = GetMonotonicTimeMs();
thrinfo->pushRecvQueWaitTime += (uint32)(t2 - t1);
}
static bool checkMsg(CM_Connection* con)
@ -730,7 +756,7 @@ static void CleanConBuffer(CM_Connection *con)
* @param events My Param doc
* @param arg My Param doc
*/
static void cm_server_recv_msg(void* arg)
static void cm_server_recv_msg(CM_IOThread *thrinfo, void* arg)
{
CM_Connection* con = (CM_Connection*)arg;
int qtype = 0;
@ -754,7 +780,7 @@ static void cm_server_recv_msg(void* arg)
if (!checkMsg(con)) {
break;
}
pushMsgToQue(con);
pushMsgToQue(thrinfo, con);
#ifdef KRB5
}
#endif // KRB5
@ -811,18 +837,17 @@ static void cm_server_recv_msg(void* arg)
}
}
static void recvMsg(const volatile sig_atomic_t& gotConnsClose, int fds, struct epoll_event *events)
static void recvMsg(int fds, struct epoll_event *events, CM_IOThread *thrinfo)
{
static uint32 msgCount = 0;
eventfd_t value = 0;
for (int i = 0; i < fds; i++) {
if (gotConnsClose) {
if (thrinfo->gotConnClose) {
return;
}
if (events[i].data.fd == g_wakefd) {
int ret = eventfd_read(g_wakefd, &value);
if (events[i].data.fd == thrinfo->wakefd) {
int ret = eventfd_read(thrinfo->wakefd, &value);
write_runlog(DEBUG5, "eventfd_read ret = %d,value=%lu.\n", ret, value);
continue;
}
@ -832,16 +857,11 @@ static void recvMsg(const volatile sig_atomic_t& gotConnsClose, int fds, struct
/* read event */
if (events[i].events & EPOLLIN) {
if ((con != NULL) && (con->port != NULL)) {
cm_server_recv_msg(con->arg);
msgCount++;
cm_server_recv_msg(thrinfo, con->arg);
thrinfo->recvMsgCount++;
}
}
}
if (msgCount % (MSG_COUNT_FOR_LOG) == 0 && msgCount >= (MSG_COUNT_FOR_LOG)) {
write_runlog(DEBUG1, "the thread has received %d msg(s) at this time.\n", MSG_COUNT_FOR_LOG);
msgCount = 0;
}
}
static CM_Connection *getConnect(const MsgSendInfo* msg)
@ -871,12 +891,49 @@ static CM_Connection *getConnect(const MsgSendInfo* msg)
}
}
write_runlog(DEBUG5, "getConnect:remote_type=%d,connSeq=%u,agentNodeId=%u,msg_type=%d.\n",
write_runlog(DEBUG5, "getConnect:remote_type=%d,connSeq=%lu,agentNodeId=%u,msg_type=%d.\n",
msg->connID.remoteType, msg->connID.connSeq, msg->connID.agentNodeId, msgType);
return con;
}
static inline uint64 GetIOThreadID(const ConnID connID)
{
if (connID.remoteType == CM_AGENT) {
return connID.agentNodeId % gIOThreads.count;
} else if (connID.remoteType == CM_CTL) {
return connID.connSeq % gIOThreads.count;
}
CM_ASSERT(0);
return 0;
}
static void pushMsgToSendQue(MsgSendInfo *msg, MsgSourceType src)
{
if (msg->connID.remoteType == CM_AGENT && msg->connID.agentNodeId == ALL_AGENT_NODE_ID) {
for (uint32 i = 1; i < gIOThreads.count; i++) {
uint32 len = sizeof(MsgSendInfo) + msg->dataSize;
MsgSendInfo *msg_cpy = (MsgSendInfo *)AllocBufFromMsgPool(len);
if (msg_cpy == NULL) {
write_runlog(ERROR, "pushMsgToSendQue:AllocBufFromMsgPool failed,size=%u\n", len);
return;
}
errno_t rc = memcpy_s(msg_cpy, len, msg, len);
securec_check_errno(rc, (void)rc);
CM_IOThread *thrinfo = &gIOThreads.threads[i];
pushSendMsg((PriMsgQues *)thrinfo->sendMsgQue, msg_cpy, src);
}
CM_IOThread *thrinfo = &gIOThreads.threads[0];
pushSendMsg((PriMsgQues *)thrinfo->sendMsgQue, msg, src);
} else {
uint64 id = GetIOThreadID(msg->connID);
CM_IOThread *thrinfo = &gIOThreads.threads[id];
pushSendMsg((PriMsgQues *)thrinfo->sendMsgQue, msg, src);
}
}
static void InnerProcSSLAccept(const MsgSendInfo *msg, CM_Connection *con)
{
status_t status = CM_SUCCESS;
@ -884,7 +941,7 @@ static void InnerProcSSLAccept(const MsgSendInfo *msg, CM_Connection *con)
uint64 now = GetMonotonicTimeMs();
bool retryProc = false;
static const int retrySSLAcceptDetayMs = 10;
write_runlog(LOG, "[InnerProcSSLAccept] now=%lu,procTime=%lu,startTime=%lu,connSeq=%u.\n",
write_runlog(LOG, "[InnerProcSSLAccept] now=%lu,procTime=%lu,startTime=%lu,connSeq=%lu.\n",
now, msg->procTime, connMsg->startConnTime, con->connSeq);
status = cm_cs_ssl_accept(g_ssl_acceptor_fd, &con->port->pipe);
@ -892,9 +949,9 @@ static void InnerProcSSLAccept(const MsgSendInfo *msg, CM_Connection *con)
if (now < connMsg->startConnTime + CM_SSL_IO_TIMEOUT) {
retryProc = true;
status = CM_SUCCESS;
write_runlog(LOG, "[ProcessSslConnRequest]retry ssl connect,connSeq=%u.\n", con->connSeq);
write_runlog(LOG, "[ProcessSslConnRequest]retry ssl connect,connSeq=%lu.\n", con->connSeq);
} else {
write_runlog(ERROR, "[ProcessSslConnRequest]ssl connect timeout,connSeq=%u.\n", con->connSeq);
write_runlog(ERROR, "[ProcessSslConnRequest]ssl connect timeout,connSeq=%lu.\n", con->connSeq);
}
}
@ -909,9 +966,9 @@ static void InnerProcSSLAccept(const MsgSendInfo *msg, CM_Connection *con)
errno_t rc = memcpy_s(nextConnMsg, msgSize, msg, msgSize);
securec_check_errno(rc, (void)rc);
nextConnMsg->procTime = now + retrySSLAcceptDetayMs;
pushSendMsg(nextConnMsg, MsgPriNormal);
pushMsgToSendQue(nextConnMsg, msg->connID.remoteType == CM_AGENT ? MsgSrcAgent : MsgSrcCtl);
write_runlog(LOG,
"[ProcessSslConnRequest]retry ssl connect later,procTime=%lu,connSeq=%u.\n",
"[ProcessSslConnRequest]retry ssl connect later,procTime=%lu,connSeq=%lu.\n",
nextConnMsg->procTime,
con->connSeq);
return;
@ -920,7 +977,7 @@ static void InnerProcSSLAccept(const MsgSendInfo *msg, CM_Connection *con)
(void)EventAdd(con->epHandle, (int)EPOLLIN, con);
if (status != CM_SUCCESS) {
write_runlog(ERROR, "[ProcessSslConnRequest]srv ssl accept failed,connSeq=%u.\n", con->connSeq);
write_runlog(ERROR, "[ProcessSslConnRequest]srv ssl accept failed,connSeq=%lu.\n", con->connSeq);
DisableRemoveConn(con);
return;
}
@ -930,7 +987,7 @@ static void InnerProcSSLAccept(const MsgSendInfo *msg, CM_Connection *con)
AddCMAgentConnection(con);
RemoveTempConnection(con);
}
write_runlog(LOG, "[ProcessSslConnRequest]srv ssl connect success,connSeq=%u.\n", con->connSeq);
write_runlog(LOG, "[ProcessSslConnRequest]srv ssl connect success,connSeq=%lu.\n", con->connSeq);
}
/**
@ -988,7 +1045,9 @@ static int32 AssignCmaConnToThread(CM_Connection *con)
}
/* assign new connection to a work thread by round robin */
if (CMAssignConnToThread(con, &gIOThread) != STATUS_OK) {
uint32 threadID = con->port->node_id % gIOThreads.count;
CM_IOThread *ioThread = &gIOThreads.threads[threadID];
if (CMAssignConnToThread(con, ioThread) != STATUS_OK) {
write_runlog(LOG, "Assign new CM_AGENT connection to worker thread failed, confd is %d.\n", con->fd);
return -1;
}
@ -1000,8 +1059,10 @@ static int32 AssignCmctlConnToThread(CM_Connection *con)
if (con->fd < 0) {
return -1;
}
uint64 threadID = con->connSeq % gIOThreads.count;
CM_IOThread *ioThread = &gIOThreads.threads[threadID];
AddTempConnection(con);
if (CMAssignConnToThread(con, &gIOThread) != STATUS_OK) {
if (CMAssignConnToThread(con, ioThread) != STATUS_OK) {
write_runlog(
LOG, "Assign new connection %d to worker thread failed, confd is %d.\n", con->port->remote_type, con->fd);
return -1;
@ -1035,7 +1096,7 @@ static inline CM_Connection *GetCmConnect(const MsgSendInfo* msg)
CM_Connection *con = getConnect(msg);
if (con == NULL) {
write_runlog(ERROR,
"[sendMsgs]get connection failed:remote_type=%s,connSeq=%u,agentNodeId=%u.\n",
"[sendMsgs]get connection failed:remote_type=%s,connSeq=%lu,agentNodeId=%u.\n",
msg->connID.remoteType == CM_CTL ? "CM_CTL" : "CM_AGENT",
msg->connID.connSeq,
msg->connID.agentNodeId);
@ -1079,7 +1140,7 @@ static void CheckConnectAssignThread(const MsgSendInfo* msg)
AssignConnToThread(*(CM_Connection **)msg->data);
}
void InnerProc(const MsgSendInfo* msg)
void InnerProc(MsgSendInfo* msg)
{
switch (msg->procMethod) {
case PM_REMOVE_CONN:
@ -1095,13 +1156,16 @@ void InnerProc(const MsgSendInfo* msg)
case PM_ASSIGN_CONN:
CheckConnectAssignThread(msg);
break;
default:;
default:
write_runlog(ERROR, "unknown procMethod:%d.\n", (int)msg->procMethod);
}
msg->connID.t9 = GetMonotonicTimeMs();
}
static int sendMsg(const MsgSendInfo *msg, CM_Connection *con)
static int sendMsg(MsgSendInfo *msg, CM_Connection *con)
{
int ret = CmsSendAndFlushMsg(con, msg->msgType, (const char *)&msg->data[0], msg->dataSize, msg->log_level);
msg->connID.t9 = GetMonotonicTimeMs();
if (ret != 0) {
write_runlog(ERROR, "CmsSendAndFlushMsg error.\n");
} else {
@ -1115,11 +1179,15 @@ static int sendMsg(const MsgSendInfo *msg, CM_Connection *con)
return ret;
}
static void sendMsg(const MsgSendInfo *msg)
static void sendMsg(uint32 id, MsgSendInfo *msg)
{
if (msg->connID.remoteType == CM_AGENT && msg->connID.agentNodeId == ALL_AGENT_NODE_ID) {
(void)pthread_rwlock_wrlock(&gConns.lock);
for (uint32 i = 0; i < gConns.max_node_id + 1; i++) {
if (i % gIOThreads.count != id) {
continue;
}
CM_Connection *con = gConns.connections[i];
if (con == NULL) {
continue;
@ -1135,7 +1203,7 @@ static void sendMsg(const MsgSendInfo *msg)
CM_Connection *con = getConnect(msg);
if (con == NULL) {
write_runlog(ERROR,
"[sendMsgs]get connection failed:remote_type=%s,connSeq=%u,agentNodeId=%u.\n",
"[sendMsgs]get connection failed:remote_type=%s,connSeq=%lu,agentNodeId=%u.\n",
msg->connID.remoteType == CM_CTL ? "CM_CTL" : "CM_AGENT",
msg->connID.connSeq,
msg->connID.agentNodeId);
@ -1148,31 +1216,73 @@ static void sendMsg(const MsgSendInfo *msg)
}
}
static void procSendMsg(const MsgSendInfo *msg)
static void procSendMsg(CM_IOThread &thrinfo, MsgSendInfo *msg)
{
static const int log_interval = 5;
static const int expire_time = 7000;
if (msg->procMethod == (int)PM_NONE) {
sendMsg(msg);
thrinfo.sendMsgCount++;
sendMsg(thrinfo.id, msg);
} else {
write_runlog(DEBUG5, "innerProc,method=%d.\n", msg->procMethod);
thrinfo.innerProcCount++;
InnerProc(msg);
}
if (msg->connID.t1 != 0 && msg->connID.t9 != 0 && msg->connID.t9 - msg->connID.t1 > expire_time) {
static volatile time_t pre = 0;
static volatile uint32 discard = 0;
time_t now = time(NULL);
if (now > pre + log_interval) {
write_runlog(WARNING,
"msg_delay:type=%c,procMethod=%d,msgProcFlag=%d,msgType=%d,remoteType=%d,pushRecvQue=%lu,inRecvQue=%lu,"
"getRecvQue=%lu,proc=%lu,pushSendQue=%lu,inSendQue=%lu,getSendQue=%lu,send=%lu,discard=%u\n",
msg->msgType,
(int)msg->procMethod,
(int)msg->msgProcFlag,
msg->dataSize > sizeof(int) ? *((int *)msg->data) : -1,
msg->connID.remoteType,
msg->connID.t2 - msg->connID.t1,
msg->connID.t3 - msg->connID.t2,
msg->connID.t4 - msg->connID.t3,
msg->connID.t5 - msg->connID.t4,
msg->connID.t6 - msg->connID.t5,
msg->connID.t7 - msg->connID.t6,
msg->connID.t8 - msg->connID.t7,
msg->connID.t9 - msg->connID.t8,
discard);
pre = now;
discard = 0;
} else {
++discard;
}
}
}
static void sendMsgs()
static void sendMsgs(CM_IOThread &thrinfo)
{
static uint32 msgCount = 0;
size_t total = getSendMsgCount();
PriMsgQues *sendQue = (PriMsgQues*)thrinfo.sendMsgQue;
size_t total = getMsgCount(sendQue);
size_t procCount = 0;
if (total == 0) {
return;
}
for (;;) {
MsgSendInfo *msg = (MsgSendInfo*)(getSendMsg());
uint64 t1 = GetMonotonicTimeMs();
MsgSendInfo *msg = (MsgSendInfo *)(getSendMsg(sendQue, MsgSrcAgent));
if (msg == NULL) {
write_runlog(DEBUG5, "no message in send que.\n");
uint64 t2 = GetMonotonicTimeMs();
thrinfo.getSendQueWaitTime += (uint32)(t2 - t1);
break;
}
uint64 t2 = GetMonotonicTimeMs();
thrinfo.getSendQueWaitTime += (uint32)(t2 - t1);
write_runlog(DEBUG5,
"get message from send que:remote_type:%s,connSeq=%u,agentNodeId=%u,msgType=%c:%d,len=%u.\n",
"get message from send que:remote_type:%s,connSeq=%lu,agentNodeId=%u,msgType=%c:%d,len=%u.\n",
msg->connID.remoteType == CM_CTL ? "CM_CTL" : "CM_AGENT",
msg->connID.connSeq,
msg->connID.agentNodeId,
@ -1180,14 +1290,10 @@ static void sendMsgs()
msg->dataSize > sizeof(int) ? *((int *)msg->data) : 0, // internal process message's datasize maybe 0
msg->dataSize);
procSendMsg(msg);
procSendMsg(thrinfo, msg);
FreeBufFromMsgPool(msg);
msg = NULL;
if (msgCount++ % SEND_COUNT == 0) {
write_runlog(DEBUG1, "the thread has send %u msg at this time.\n", msgCount);
}
procCount++;
if (procCount >= total) {
break;
@ -1195,11 +1301,14 @@ static void sendMsgs()
}
}
static void WakeSenderFunc(void)
static void WakeSenderFunc(const ConnID connID)
{
uint64 id = GetIOThreadID(connID);
CM_IOThread *thrinfo = &gIOThreads.threads[id];
int wakefd = thrinfo->wakefd;
eventfd_t value = pthread_self();
if (g_wakefd >= 0) {
int ret = eventfd_write(g_wakefd, value);
if (wakefd >= 0) {
int ret = eventfd_write(wakefd, value);
if (ret != 0) {
write_runlog(ERROR, "eventfd_write failed.ret = %d,errno=%d,value=%lu.\n", ret, errno, value);
}
@ -1208,44 +1317,48 @@ static void WakeSenderFunc(void)
}
}
static int CreateWakeupEvent(int epollHandle)
static int CreateWakeupEvent(int epollHandle, int &wakefd)
{
g_wakefd = eventfd(0, 0);
if (g_wakefd < 0) {
wakefd = eventfd(0, 0);
if (wakefd < 0) {
write_runlog(ERROR, "eventfd error :%d.\n", errno);
return CM_ERROR;
}
write_runlog(LOG, "eventfd :%d.\n", g_wakefd);
write_runlog(LOG, "eventfd :%d.\n", wakefd);
struct epoll_event ev = {.events = (uint32)EPOLLIN, {.fd = g_wakefd}};
if (epoll_ctl(epollHandle, EPOLL_CTL_ADD, g_wakefd, &ev) != 0) {
struct epoll_event ev = {.events = (uint32)EPOLLIN, {.fd = wakefd}};
if (epoll_ctl(epollHandle, EPOLL_CTL_ADD, wakefd, &ev) != 0) {
write_runlog(ERROR, "epoll_ctl error :%d.\n", errno);
(void)close(g_wakefd);
g_wakefd = -1;
(void)close(wakefd);
wakefd = -1;
return CM_ERROR;
}
return CM_SUCCESS;
}
void* CM_IOThreadMain(void* argp)
void *CM_IOThreadMain(void *argp)
{
int epollHandle;
struct epoll_event events[MAX_EVENTS];
sigset_t block_sig_set;
CM_IOThread *thrinfo = (CM_IOThread *)argp;
time_t time1 = time(NULL);
CM_IOThread* thrinfo = (CM_IOThread*)argp;
thread_name = "IO_THREAD";
thread_name = "IO_WORKER";
(void)prctl(PR_SET_NAME, thread_name);
epollHandle = thrinfo->epHandle;
if (CreateWakeupEvent(epollHandle) != CM_SUCCESS) {
if (CreateWakeupEvent(epollHandle, thrinfo->wakefd) != CM_SUCCESS) {
return NULL;
}
uint64 epollWait = 0, recvMsgTime = 0, sendMsgTime = 0, count = 0;
(void)pthread_detach(pthread_self());
setBlockSigMask(&block_sig_set);
setWakeSenderFunc(WakeSenderFunc);
int waitTime = EPOLL_TIMEOUT;
volatile sig_atomic_t& gotConnsClose = got_conns_close[thrinfo->id];
write_runlog(LOG, "cmserver pool thread %lu starting, epollfd is %d.\n", thrinfo->tid, epollHandle);
for (;;) {
if (got_stop == 1) {
@ -1254,15 +1367,10 @@ void* CM_IOThreadMain(void* argp)
continue;
}
CloseAllConnections(gotConnsClose, epollHandle);
CloseAllConnections(thrinfo);
thrinfo->isBusy = false;
/* wait for events to happen, 5s timeout */
if (existSendMsg()) {
waitTime = 1;
} else {
waitTime = EPOLL_TIMEOUT;
}
uint64 t2 = GetMonotonicTimeMs();
int fds = epoll_pwait(epollHandle, events, MAX_EVENTS, waitTime, &block_sig_set);
if (fds < 0) {
if (errno != EINTR && errno != EWOULDBLOCK) {
@ -1271,21 +1379,49 @@ void* CM_IOThreadMain(void* argp)
}
}
thrinfo->isBusy = true;
uint64 t3 = GetMonotonicTimeMs();
if (fds > 0) {
recvMsg(gotConnsClose, fds, events);
recvMsg(fds, events, thrinfo);
}
uint64 t4 = GetMonotonicTimeMs();
sendMsgs(*thrinfo);
uint64 t5 = GetMonotonicTimeMs();
sendMsgs();
epollWait += t3 - t2;
recvMsgTime += t4 - t3;
sendMsgTime += t5 - t4;
count++;
time_t time2 = time(NULL);
if (time2 - time1 >= MSG_TIME_FOR_LOG) {
size_t totalRecvMsg = getMsgCount((PriMsgQues *)thrinfo->recvMsgQue);
size_t totalSendMsg = getMsgCount((PriMsgQues *)thrinfo->sendMsgQue);
if (totalRecvMsg >= MAX_MSG_IN_QUE || totalSendMsg >= MAX_MSG_IN_QUE) {
write_runlog(LOG,
"total receive count:%u,send count:%u,innerProc count:%u;recv que size:%lu,send que size:%lu,"
"push send msg wait:%u,get send msg wait:%u,epoll wait=%lu,recv msg=%lu,send msg=%lu,count=%lu\n",
thrinfo->recvMsgCount, thrinfo->sendMsgCount, thrinfo->innerProcCount, totalRecvMsg, totalSendMsg,
thrinfo->pushRecvQueWaitTime / CM_MS_COUNT_PER_SEC,
thrinfo->getSendQueWaitTime / CM_MS_COUNT_PER_SEC,
epollWait, recvMsgTime, sendMsgTime, count);
}
epollWait = recvMsgTime = sendMsgTime = 0;
count = 0;
time1 = time2;
}
}
(void)close(epollHandle);
(void)close(g_wakefd);
g_wakefd = -1;
thrinfo->epHandle = -1;
(void)close(thrinfo->wakefd);
thrinfo->wakefd = -1;
delete (PriMsgQues*)thrinfo->recvMsgQue;
thrinfo->recvMsgQue = NULL;
delete (PriMsgQues*)thrinfo->sendMsgQue;
thrinfo->sendMsgQue = NULL;
return thrinfo;
}
/**
* @brief add/mod an event to epoll
*
@ -1626,9 +1762,7 @@ static int asyncSendMsgInner(const ConnID& connID, uint8 msgProcFlag, char msgty
write_runlog(ERROR, "RespondMsg:AllocBufFromMsgPool failed,size=%u\n", (uint32)(sizeof(MsgSendInfo) + len));
return (int)ERR_ALLOC_MEMORY;
}
msg->connID.remoteType = connID.remoteType;
msg->connID.agentNodeId = connID.agentNodeId;
msg->connID.connSeq = connID.connSeq;
msg->connID = connID;
msg->procTime = 0;
msg->log_level = log_level;
msg->dataSize = (uint32)len;
@ -1641,25 +1775,22 @@ static int asyncSendMsgInner(const ConnID& connID, uint8 msgProcFlag, char msgty
}
write_runlog(DEBUG1,
"push message to send que:remote_type:%s,connSeq=%u,agentNodeId=%u,msgType=%c,len=%u.\n",
"push message to send que:remote_type:%s,connSeq=%lu,agentNodeId=%u,msgType=%c,len=%u.\n",
msg->connID.remoteType == CM_CTL ? "CM_CTL" : "CM_AGENT",
msg->connID.connSeq,
msg->connID.agentNodeId,
msg->msgType,
msg->dataSize);
pushSendMsg(msg, msg->connID.remoteType == CM_AGENT ? MsgPriNormal : MsgPriHigh);
pushMsgToSendQue(msg, msg->connID.remoteType == CM_CTL ? MsgSrcCtl : MsgSrcAgent);
return 0;
}
int RespondMsg(const MsgRecvInfo* recvMsg, char msgtype, const char *s, size_t len, int log_level)
int RespondMsg(MsgRecvInfo* recvMsg, char msgtype, const char *s, size_t len, int log_level)
{
ConnID connID;
connID.remoteType = recvMsg->connID.remoteType;
connID.connSeq = recvMsg->connID.connSeq;
connID.agentNodeId = recvMsg->connID.agentNodeId;
return asyncSendMsgInner(connID, recvMsg->msgProcFlag, msgtype, s, len, log_level);
recvMsg->connID.t5 = GetMonotonicTimeMs();
return asyncSendMsgInner(recvMsg->connID, recvMsg->msgProcFlag, msgtype, s, len, log_level);
}
int SendToAgentMsg(uint agentNodeId, char msgtype, const char *s, size_t len, int log_level)
@ -1687,9 +1818,7 @@ void AsyncProcMsg(const MsgRecvInfo *recvMsg, IOProcMethond procMethod, const ch
write_runlog(ERROR, "[%s] AllocBufFromMsgPool failed.\n", __FUNCTION__);
return;
}
msg->connID.remoteType = recvMsg->connID.remoteType;
msg->connID.connSeq = recvMsg->connID.connSeq;
msg->connID.agentNodeId = recvMsg->connID.agentNodeId;
msg->connID = recvMsg->connID;
msg->procTime = 0;
msg->dataSize = len;
msg->msgType = 0;
@ -1704,12 +1833,12 @@ void AsyncProcMsg(const MsgRecvInfo *recvMsg, IOProcMethond procMethod, const ch
}
write_runlog(logLevel,
"push message to send que:remote_type:%s,connSeq=%u,agentNodeId=%u,procMethod=%d,len=%u.\n",
"push message to send que:remote_type:%s,connSeq=%lu,agentNodeId=%u,procMethod=%d,len=%u.\n",
msg->connID.remoteType == CM_CTL ? "CM_CTL" : "CM_AGENT",
msg->connID.connSeq,
msg->connID.agentNodeId,
(int)msg->procMethod,
msg->dataSize);
pushSendMsg(msg, msg->connID.remoteType == CM_AGENT ? MsgPriNormal : MsgPriHigh);
pushMsgToSendQue(msg, msg->connID.remoteType == CM_CTL ? MsgSrcCtl : MsgSrcAgent);
}

View File

@ -392,7 +392,7 @@ static bool IsCurrentNodeInVoteAZ()
status_t InitDdbArbitrate(DrvApiInfo *drvApiInfo)
{
if (g_dbType != DB_ETCD) {
if (g_dbType != DB_ETCD && g_dbType != DB_SHAREDISK) {
return CM_SUCCESS;
}
@ -909,7 +909,7 @@ DdbConn *GetNextDdbConn()
uint32 idx = 0;
for (uint32 i = 0; i < g_sess->count; ++i) {
idx = (g_sess->curIdx + i) % g_sess->count;
if (g_sess->ddbConn[idx].state == PROCESS_IN_IDLE) {
if (g_sess->ddbConn[idx].state != PROCESS_IN_RUNNING) {
break;
}
}

View File

@ -205,6 +205,8 @@ bool g_getHistoryCnStatusFromDdb = false;
bool g_needIncTermToDdbAgain = false;
bool g_clusterStarting = false;
bool g_isSharedStorageMode = false;
volatile bool g_needReloadSyncStandbyMode = false;
volatile uint32 g_refreshDynamicCfgNum = 0;
bool g_elastic_exist_node = false;
@ -249,7 +251,7 @@ int *g_lastCnDnDisconnectTimes = NULL;
volatile switchover_az_mode cm_switchover_az_mode = AUTOSWITCHOVER_AZ;
volatile logic_cluster_restart_mode cm_logic_cluster_restart_mode = INITIAL_LOGIC_CLUSTER_RESTART;
CM_IOThread gIOThread;
CM_IOThreads gIOThreads;
CM_WorkThreads gWorkThreads;
CM_HAThreads gHAThreads;
CM_MonitorThread gMonitorThread;
@ -370,47 +372,6 @@ void initazArray(char azArray[][CM_AZ_NAME])
}
}
bool is_majority_reelection_exceptional_condition(synchronous_standby_mode current_datanode_sync_mode,
int dynamic_primary_count, int staticPrimaryIndex, bool static_primary_and_dynamic_primary_differs)
{
/*
* Skip majority re-election if we are in minority mode,
* Skip majority re-election if we are not in a multi-active standby cluster
* Skip majority re-election if we are doing AZ auto-switchover
*/
if (cm_arbitration_mode == MINORITY_ARBITRATION || !g_multi_az_cluster || switchoverAZInProgress) {
return true;
}
/* Skip majority re-election if we don't have all AZ's available to us */
if ((current_cluster_az_status >= AnyAz1 && current_cluster_az_status <= FirstAz2) ||
(current_datanode_sync_mode >= AnyAz1 && current_datanode_sync_mode <= FirstAz2)) {
return true;
}
/*
* Skip majority re-election if we are just promoted to CMS primary,
* as we attempt to keep the original configuration of datanode primary/standbys
* Skip majority re-election if we don't have a static primary, which should not happen
*/
if (arbitration_majority_reelection_timeout > 0 || staticPrimaryIndex < 0) {
return true;
}
/*
* Currently we don't have interactive communication between CM server and datanodes,
* and we cannot failover to a datanode whose dynamic role is already a primary.
* Because of these constraint, we cannot deal with multiple-dynamic-primary scenarios correctly.
* Leave that to partition locking later on.
*/
const int onePrimary = 1;
if (dynamic_primary_count > onePrimary || static_primary_and_dynamic_primary_differs) {
return true;
}
return false;
}
maintenance_mode getMaintenanceMode(const uint32 &group_index)
{
maintenance_mode mode = MAINTENANCE_MODE_NONE;

View File

@ -53,8 +53,8 @@ volatile sig_atomic_t g_SetReplaceCnStatus = 0;
/* main thread exit after HA thread close connection */
volatile sig_atomic_t ha_connection_closed = 0;
volatile sig_atomic_t got_conns_close[CM_IO_THREAD_COUNT] = {0};
pid_t cm_agent = 0;
uint64 gConnSeq = 0;
const char* g_progname;
static char g_appPath[MAXPGPATH] = {0};
@ -140,8 +140,8 @@ static void stop_signal_reaper(int arg)
static void close_all_agent_connections(int arg)
{
for (int i = 0; i < CM_IO_THREAD_COUNT; i++) {
got_conns_close[i] = 1;
for (uint32 i = 0; i < gIOThreads.count; i++) {
gIOThreads.threads[i].gotConnClose = 1;
}
}
@ -1594,7 +1594,7 @@ static int32 CmSetConnState(CM_Connection *con)
errno_t rc = memset_s(&recvMsg, sizeof(MsgRecvInfo), 0, sizeof(MsgRecvInfo));
securec_check_errno(rc, (void)rc);
recvMsg.connID.agentNodeId = con->port->node_id;
recvMsg.connID.connSeq = 0;
recvMsg.connID.connSeq = con->connSeq;
recvMsg.connID.remoteType = con->port->remote_type;
AsyncProcMsg(&recvMsg, PM_ASSIGN_CONN, (const char *)&con, sizeof(CM_Connection *));
} else {
@ -1866,6 +1866,7 @@ static CM_Connection* makeConnection(int fd, Port* port)
listenCon->fd = fd;
listenCon->port = port;
listenCon->inBuffer = CM_makeStringInfo();
listenCon->connSeq = gConnSeq++;
Assert(listenCon->inBuffer != NULL);
@ -2620,22 +2621,41 @@ int main(int argc, char** argv)
}
}
status = CM_CreateWorkThreadPool(cm_thread_count);
if (status < 0) {
write_runlog(ERROR, "Create Threads Pool failed!\n");
CloseAllDdbSession();
FreeNotifyMsg();
return -1;
// worker: total [5,1000] s 5 6 10 32 50 100 200 500 1000
// IO worker a = s/3 1 2 3 10 16 33 66 166 333
// clt worker b = (s-a)/2 2 2 3 11 17 33 66 167 333
// agent worker c = s-a-b 2 2 4 11 17 34 68 167 334
// node num <32 <32 >32 >32
// agent worker <=4 >4 <=4 >4
// ctl worker 2 2 2 4
const uint32 workerCountPerNode = 3;
uint32 totalWorker = cm_thread_count;
if ((uint32)cm_thread_count > g_node_num * workerCountPerNode) {
totalWorker = g_node_num * workerCountPerNode;
}
status = CM_CreateIOThread();
uint32 ioWorkerCount = totalWorker / 3;
uint32 cltWorkerCount = (totalWorker - ioWorkerCount) / 2;
uint32 agentWorkerCount = (totalWorker - ioWorkerCount) - cltWorkerCount;
status = CM_CreateIOThreadPool(ioWorkerCount);
if (status < 0) {
write_runlog(ERROR, "Create IOThreads failed!\n");
CloseAllDdbSession();
FreeNotifyMsg();
return -1;
}
status = CM_CreateWorkThreadPool(cltWorkerCount, agentWorkerCount);
if (status < 0) {
write_runlog(ERROR, "Create Threads Pool failed!\n");
CloseAllDdbSession();
FreeNotifyMsg();
return -1;
}
g_inMaintainMode = IsMaintainFileExist();
status = CM_CreateHA();

View File

@ -22,44 +22,25 @@
* -------------------------------------------------------------------------
*/
#include <queue>
#include <pthread.h>
#include "elog.h"
#include "cm_c.h"
#include "cm_util.h"
#include "cm_msg_buf_pool.h"
#include "cms_msg_que.h"
using MsgQueType = std::deque<const char *>;
using MsgQuePtr = MsgQueType*;
struct PriMsgQues {
MsgQuePtr Ques[MSG_PRI_COUNT];
pthread_mutex_t msg_lock;
pthread_cond_t msg_cond;
CMPrioMutex prioLock;
};
static PriMsgQues g_recvMsgQues;
static PriMsgQues g_sendMsgQues;
static wakeSenderFuncType wakeSenderFunc = NULL;
static CanProcThisMsgFunType CanProcThisMsgFun = NULL;
static void InitMsgQue(PriMsgQues &que)
void InitMsgQue(PriMsgQues &que)
{
for (int i = 0; i < (int)MSG_PRI_COUNT; i++) {
que.Ques[i] = NULL;
for (int i = 0; i < (int)MSG_SRC_COUNT; i++) {
CMFairMutexInit(que.ques[i].fairLock);
}
(void)pthread_mutex_init(&que.msg_lock, NULL);
(void)pthread_cond_init(&que.msg_cond, NULL);
CMPrioMutexInit(que.prioLock);
}
void InitMsgQue()
{
InitMsgQue(g_recvMsgQues);
InitMsgQue(g_sendMsgQues);
(void)pthread_mutex_init(&que.msgLock, NULL);
(void)pthread_cond_init(&que.msgCond, NULL);
}
void setWakeSenderFunc(wakeSenderFuncType func)
@ -71,147 +52,147 @@ void SetCanProcThisMsgFun(CanProcThisMsgFunType func)
{
CanProcThisMsgFun = func;
}
static void pushToQue(PriMsgQues *priQue, MsgPriority pri, const char *msg)
size_t getMsgCount(PriMsgQues *priQue)
{
if (priQue->Ques[pri] == NULL) {
priQue->Ques[pri] = new MsgQueType;
if (priQue->Ques[pri] == NULL) {
write_runlog(ERROR, "pushToQue:out of memory.\n");
return;
}
}
priQue->Ques[pri]->push_back(msg);
}
size_t count = 0;
void pushRecvMsg(const MsgRecvInfo *msg, MsgPriority pri)
{
Assert(pri >= 0 && pri < MSG_PRI_COUNT);
PriMsgQues *priQue = &g_recvMsgQues;
(void)CMPrioMutexLock(priQue->prioLock, CMMutexPrio::CM_MUTEX_PRIO_HIGH);
pushToQue(priQue, pri, (const char*)msg);
CMPrioMutexUnLock(priQue->prioLock);
(void)pthread_cond_broadcast(&priQue->msg_cond);
}
const MsgRecvInfo *getRecvMsg(MsgPriority pri, uint32 waitTime, void *threadInfo)
{
Assert(pri >= 0 && pri < MSG_PRI_COUNT);
PriMsgQues *priQue = &g_recvMsgQues;
const MsgRecvInfo *msg = NULL;
struct timespec tv;
(void)CMPrioMutexLock(priQue->prioLock, CMMutexPrio::CM_MUTEX_PRIO_NORMAL);
for (int i = 0; i < (int)pri + 1; i++) {
MsgQuePtr que = priQue->Ques[i];
if (que == NULL) {
continue;
}
MsgQueType::iterator it = que->begin();
for (; it != que->end(); ++it) {
if (CanProcThisMsgFun == NULL || CanProcThisMsgFun(threadInfo, *it)) {
msg = (const MsgRecvInfo *)*it;
(void)que->erase(it);
break;
}
}
if (msg != NULL) {
break;
}
for (int i = 0; i < (int)MSG_SRC_COUNT; i++) {
MsgQuePtr que = &priQue->ques[i].que;
count += que->size();
}
CMPrioMutexUnLock(priQue->prioLock);
if (msg == NULL && waitTime > 0) {
(void)clock_gettime(CLOCK_REALTIME, &tv);
tv.tv_sec = tv.tv_sec + (long long)waitTime;
(void)pthread_mutex_lock(&priQue->msg_lock);
(void)pthread_cond_timedwait(&priQue->msg_cond, &priQue->msg_lock, &tv);
(void)pthread_mutex_unlock(&priQue->msg_lock);
}
return msg;
return count;
}
void pushSendMsg(const MsgSendInfo *msg, MsgPriority pri)
bool existMsg(const PriMsgQues *priQue)
{
Assert(pri >= 0 && pri < MSG_PRI_COUNT);
PriMsgQues *priQue = &g_sendMsgQues;
(void)CMPrioMutexLock(priQue->prioLock, CMMutexPrio::CM_MUTEX_PRIO_NORMAL);
pushToQue(priQue, pri, (const char*)msg);
CMPrioMutexUnLock(priQue->prioLock);
if (wakeSenderFunc != NULL) {
wakeSenderFunc();
}
}
const MsgSendInfo *getSendMsg()
{
const MsgSendInfo *msg = NULL;
PriMsgQues *priQue = &g_sendMsgQues;
uint64 now = GetMonotonicTimeMs();
(void)CMPrioMutexLock(priQue->prioLock, CMMutexPrio::CM_MUTEX_PRIO_HIGH);
for (int i = 0; i < (int)MSG_PRI_COUNT; i++) {
MsgQuePtr que = priQue->Ques[i];
if (que == NULL) {
continue;
}
MsgQueType::iterator it = que->begin();
for (; it != que->end(); ++it) {
const MsgSendInfo* sendMsg = (MsgSendInfo*)(*it);
if (sendMsg->procTime == 0 || sendMsg->procTime <= now) {
msg = (const MsgSendInfo *)(*it);
(void)que->erase(it);
break;
}
}
if (msg != NULL) {
break;
}
}
CMPrioMutexUnLock(priQue->prioLock);
return msg;
}
bool existSendMsg()
{
PriMsgQues *priQue = &g_sendMsgQues;
for (int i = 0; i < (int)MSG_PRI_COUNT; i++) {
MsgQuePtr que = priQue->Ques[i];
if (que != NULL) {
if (!que->empty()) {
return true;
}
for (int i = 0; i < (int)MSG_SRC_COUNT; i++) {
ConstMsgQuePtr que = &priQue->ques[i].que;
if (!que->empty()) {
return true;
}
}
return false;
}
size_t getSendMsgCount()
void pushRecvMsg(PriMsgQues *priQue, MsgRecvInfo *msg, MsgSourceType src)
{
PriMsgQues *priQue = &g_sendMsgQues;
size_t count = 0;
Assert(src >= 0 && src < MSG_SRC_COUNT);
(void)CMPrioMutexLock(priQue->prioLock, CMMutexPrio::CM_MUTEX_PRIO_HIGH);
for (int i = 0; i < (int)MSG_PRI_COUNT; i++) {
MsgQuePtr que = priQue->Ques[i];
if (que != NULL) {
count += que->size();
}
}
CMPrioMutexUnLock(priQue->prioLock);
(void)CMFairMutexLock(priQue->ques[src].fairLock, CMFairMutexType::CM_MUTEX_WRITE);
msg->connID.t2 = GetMonotonicTimeMs();
priQue->ques[src].que.push_back((const char *)msg);
CMFairMutexUnLock(priQue->ques[src].fairLock);
return count;
(void)pthread_cond_broadcast(&priQue->msgCond);
}
static const MsgRecvInfo *getRecvMsgInner(PriMsgQues *priQue, MsgSourceType src, void *threadInfo)
{
Assert(src >= 0 && src < MSG_SRC_COUNT);
MsgRecvInfo *msg = NULL;
uint64 t3 = GetMonotonicTimeMs();
if (!existMsg(priQue)) {
return NULL;
}
for (int i = 0; i < (int)MSG_SRC_COUNT; i++) {
MsgQuePtr que = &priQue->ques[src].que;
(void)CMFairMutexLock(priQue->ques[src].fairLock, CMFairMutexType::CM_MUTEX_READ);
MsgQueType::iterator it = que->begin();
for (; it != que->end(); ++it) {
if (CanProcThisMsgFun == NULL || CanProcThisMsgFun(threadInfo, *it)) {
msg = (MsgRecvInfo *)*it;
(void)que->erase(it);
msg->connID.t3 = t3;
msg->connID.t4 = GetMonotonicTimeMs();
break;
}
}
CMFairMutexUnLock(priQue->ques[src].fairLock);
if (msg != NULL) {
break;
}
src = (src == MsgSrcAgent) ? MsgSrcCtl : MsgSrcAgent; // switch src type;
}
return msg;
}
const MsgRecvInfo *getRecvMsg(PriMsgQues *priQue, MsgSourceType src, uint32 waitTime, void *threadInfo)
{
struct timespec tv;
if (priQue == NULL) {
return NULL;
}
const MsgRecvInfo* msg = getRecvMsgInner(priQue, src, threadInfo);
if (msg == NULL && waitTime > 0) {
(void)clock_gettime(CLOCK_REALTIME, &tv);
tv.tv_sec = tv.tv_sec + (long long)waitTime;
(void)pthread_mutex_lock(&priQue->msgLock);
(void)pthread_cond_timedwait(&priQue->msgCond, &priQue->msgLock, &tv);
(void)pthread_mutex_unlock(&priQue->msgLock);
}
return msg;
}
void pushSendMsg(PriMsgQues *priQue, MsgSendInfo *msg, MsgSourceType src)
{
Assert(src >= 0 && src < MSG_SRC_COUNT);
ConnID connID = msg->connID;
(void)CMFairMutexLock(priQue->ques[src].fairLock, CMFairMutexType::CM_MUTEX_WRITE);
priQue->ques[src].que.push_back((const char*)msg);
msg->connID.t6 = GetMonotonicTimeMs();
CMFairMutexUnLock(priQue->ques[src].fairLock);
if (wakeSenderFunc != NULL) {
wakeSenderFunc(connID);
}
}
const MsgSendInfo *getSendMsg(PriMsgQues *priQue, MsgSourceType src)
{
const MsgSendInfo *msg = NULL;
if (!existMsg(priQue)) {
return NULL;
}
uint64 now = GetMonotonicTimeMs();
for (int i = 0; i < (int)MSG_SRC_COUNT; i++) {
MsgQuePtr que = &priQue->ques[src].que;
(void)CMFairMutexLock(priQue->ques[src].fairLock, CMFairMutexType::CM_MUTEX_READ);
MsgQueType::iterator it = que->begin();
for (; it != que->end(); ++it) {
MsgSendInfo *sendMsg = (MsgSendInfo *)(*it);
if (sendMsg->procTime == 0 || sendMsg->procTime <= now) {
msg = sendMsg;
(void)que->erase(it);
sendMsg->connID.t7 = now;
sendMsg->connID.t8 = GetMonotonicTimeMs();
break;
}
}
CMFairMutexUnLock(priQue->ques[src].fairLock);
if (msg != NULL) {
break;
}
src = (src == MsgSrcAgent) ? MsgSrcCtl : MsgSrcAgent;
}
return msg;
}
bool existSendMsg(const PriMsgQues *priQue)
{
return existMsg(priQue);
}

View File

@ -550,11 +550,6 @@ void process_ctl_to_cm_switchover_full_msg(
cm_to_ctl_command_ack msgSwitchoverFullAck = {0};
msgSwitchoverFullAck.msg_type = MSG_CM_CTL_SWITCHOVER_FULL_ACK;
if (CheckEnableFlag()) {
msgSwitchoverFullAck.command_result = CM_INVALID_COMMAND;
(void)RespondMsg(recvMsgInfo, 'S', (char *)(&msgSwitchoverFullAck), sizeof(cm_to_ctl_command_ack));
return;
}
if (backup_open != CLUSTER_PRIMARY) {
msgSwitchoverFullAck.msg_type = MSG_CM_CTL_BACKUP_OPEN;
(void)RespondMsg(recvMsgInfo, 'S', (char *)(&msgSwitchoverFullAck), sizeof(cm_to_ctl_command_ack));
@ -928,11 +923,6 @@ void ProcessCtlToCmSwitchoverFullCheckMsg(MsgRecvInfo* recvMsgInfo)
cm_to_ctl_switchover_full_check_ack msgSwitchoverFullCheckAck;
msgSwitchoverFullCheckAck.msg_type = MSG_CM_CTL_SWITCHOVER_FULL_CHECK_ACK;
if (CheckEnableFlag()) {
msgSwitchoverFullCheckAck.switchoverDone = INVALID_COMMAND;
(void)RespondMsg(recvMsgInfo, 'S', (char*)(&msgSwitchoverFullCheckAck), sizeof(msgSwitchoverFullCheckAck));
return;
}
int32 switchoverDone = GetSwitchoverDone("[ProcessCtlToCmSwitchoverFullCheckMsg]");
msgSwitchoverFullCheckAck.switchoverDone = switchoverDone;
@ -958,11 +948,6 @@ void ProcessCtlToCmSwitchoverAzCheckMsg(MsgRecvInfo* recvMsgInfo)
cm_to_ctl_switchover_az_check_ack msgSwitchoverAZCheckAck;
msgSwitchoverAZCheckAck.msg_type = MSG_CM_CTL_SWITCHOVER_AZ_CHECK_ACK;
if (CheckEnableFlag()) {
msgSwitchoverAZCheckAck.switchoverDone = INVALID_COMMAND;
(void)RespondMsg(recvMsgInfo, 'S', (char *)(&msgSwitchoverAZCheckAck), sizeof(msgSwitchoverAZCheckAck));
return;
}
int32 switchoverDone = GetSwitchoverDone("[ProcessCtlToCmSwitchoverAzCheckMsg]");
msgSwitchoverAZCheckAck.switchoverDone = switchoverDone;
@ -2153,11 +2138,6 @@ void ProcessCtlToCmSwitchoverAllMsg(MsgRecvInfo* recvMsgInfo, const ctl_to_cm_sw
cm_to_ctl_command_ack msgSwitchoverAllAck = { 0 };
msgSwitchoverAllAck.msg_type = MSG_CM_CTL_SWITCHOVER_ALL_ACK;
if (CheckEnableFlag()) {
msgSwitchoverAllAck.command_result = CM_INVALID_COMMAND;
(void)RespondMsg(recvMsgInfo, 'S', (char *)(&msgSwitchoverAllAck), sizeof(cm_to_ctl_command_ack));
return;
}
if (backup_open != CLUSTER_PRIMARY) {
msgSwitchoverAllAck.msg_type = MSG_CM_CTL_BACKUP_OPEN;
(void)RespondMsg(recvMsgInfo, 'S', (char *)(&msgSwitchoverAllAck), sizeof(cm_to_ctl_command_ack));

View File

@ -217,16 +217,16 @@ static const char* GetClusterResStatStr(MaxClusterResStatus stat)
return "";
}
MaxClusterResStatus GetResNodeStat(uint32 nodeId)
MaxClusterResStatus GetResNodeStat(uint32 nodeId, int logLevel)
{
uint32 ind = FindNodeReportResInterByNodeId(nodeId);
if (ind < g_node_num) {
if (IsReportTimeout(g_resNodeStat[ind].reportInter)) {
write_runlog(LOG, "recv node(%u) agent report res status msg timeout.\n", nodeId);
write_runlog(logLevel, "recv node(%u) agent report res status msg timeout.\n", nodeId);
return MAX_CLUSTER_STATUS_UNAVAIL;
}
if (g_resNodeStat[ind].isAvail != MAX_CLUSTER_STATUS_AVAIL) {
write_runlog(LOG, "res node(%u) stat is (%s).\n", nodeId, GetClusterResStatStr(g_resNodeStat[ind].isAvail));
write_runlog(logLevel, "node(%u) stat (%s).\n", nodeId, GetClusterResStatStr(g_resNodeStat[ind].isAvail));
}
return g_resNodeStat[ind].isAvail;
} else {
@ -432,7 +432,6 @@ void ReleaseResLockOwner(const char *resName, uint32 instId)
return;
}
bool isSuccess = true;
for (uint32 i = 0; i < kvCount; ++i) {
if (kvs[i].key[0] == '\0' || kvs[i].value[0] == '\0') {
break;
@ -441,18 +440,11 @@ void ReleaseResLockOwner(const char *resName, uint32 instId)
continue;
}
if (DelKeyInDdb(kvs[i].key, (uint32)strlen(kvs[i].key)) != CM_SUCCESS) {
write_runlog(ERROR, "[CLIENT] ddb del failed. key=%s, value=%s.\n", kvs[i].key, kvs[i].value);
isSuccess = false;
write_runlog(ERROR, "[CLIENT] release lock failed. key=%s, value=%s.\n", kvs[i].key, kvs[i].value);
} else {
write_runlog(LOG, "[CLIENT] ddb del success. key=%s, value=%s.\n", kvs[i].key, kvs[i].value);
write_runlog(LOG, "[CLIENT] release lock success. key=%s, value=%s.\n", kvs[i].key, kvs[i].value);
}
}
if (isSuccess) {
write_runlog(LOG, "[CLIENT] res(%s) inst(%u) release all lock success.\n", resName, instId);
} else {
write_runlog(ERROR, "[CLIENT] res(%s) inst(%u) release all lock failed.\n", resName, instId);
}
}
static ClientError CmResLock(const CmaToCmsResLock *lockMsg)

View File

@ -25,6 +25,7 @@
#include "cms_ddb.h"
#include "cms_global_params.h"
#include "cms_write_dynamic_config.h"
#include "cms_common.h"
static bool IsCnStatusParameterValid(const char *cnId, const char *cnStatus)
{
@ -193,6 +194,7 @@ void* SyncDynamicInfoFromDdb(void* arg)
if (IsDdbHealth(DDB_PRE_CONN)) {
write_runlog(DEBUG1, "will sync instance info from ddb. \n");
CmsSyncStandbyMode();
if ((cm_arbitration_mode == MINORITY_ARBITRATION || cm_server_start_mode == MINORITY_START) &&
g_multi_az_cluster) {
write_runlog(LOG,

View File

@ -194,9 +194,9 @@ int CM_CreateMonitorStopNode(void)
return 0;
}
int CM_CreateIOThread(void)
int CM_CreateIOThread(CM_IOThread &ioThread, uint32 id)
{
errno_t rc = memset_s(&gIOThread, sizeof(CM_IOThread), 0, sizeof(CM_IOThread));
errno_t rc = memset_s(&ioThread, sizeof(CM_IOThread), 0, sizeof(CM_IOThread));
securec_check_errno(rc, (void)rc);
/* create epoll fd, MAX_EVENTS just a HINT */
@ -206,41 +206,51 @@ int CM_CreateIOThread(void)
return -1;
}
gIOThread.epHandle = epollFd;
gIOThread.type = 0;
gIOThread.isBusy = false;
ioThread.epHandle = epollFd;
ioThread.id = id;
ioThread.isBusy = false;
ioThread.recvMsgQue = new PriMsgQues;
ioThread.sendMsgQue = new PriMsgQues;
InitMsgQue(*((PriMsgQues *)ioThread.sendMsgQue));
InitMsgQue(*((PriMsgQues *)ioThread.recvMsgQue));
if (pthread_create(&(gIOThread.tid), NULL, CM_IOThreadMain, &gIOThread) != 0) {
if (pthread_create(&(ioThread.tid), NULL, CM_IOThreadMain, &ioThread) != 0) {
return -1;
}
return 0;
}
int CM_CreateIOThreadPool(uint32 thrCount)
{
int err;
for (uint32 i = 0; i < thrCount; i++) {
err = CM_CreateIOThread(gIOThreads.threads[i], i);
if (err != 0) {
return err;
}
gIOThreads.count++;
}
int CM_CreateWorkThreadPool(int thrCount)
return 0;
}
static int createWorkerThread(uint32 thrCount, int type)
{
CM_WorkThread* thrinfo = NULL;
int err;
errno_t rc = 0;
int trueThrCount = thrCount;
int err;
/* include cm_ctl, so g_node_num need add. */
int ctlThreadNum = GetCtlThreadNum();
if (thrCount > ((int)g_node_num + ctlThreadNum)) {
trueThrCount = ((int)g_node_num + ctlThreadNum);
}
if (trueThrCount <= ctlThreadNum) {
trueThrCount += ctlThreadNum;
}
for (uint32 i = 0; i < thrCount; i++) {
uint32 thread_idx = gWorkThreads.count;
thrinfo = &(gWorkThreads.threads[thread_idx]);
for (int i = 0; i < trueThrCount; i++) {
thrinfo = &(gWorkThreads.threads[i]);
rc = memset_s(&gWorkThreads.threads[i], sizeof(CM_WorkThread), 0, sizeof(CM_WorkThread));
rc = memset_s(thrinfo, sizeof(CM_WorkThread), 0, sizeof(CM_WorkThread));
securec_check_errno(rc, (void)rc);
thrinfo->type = (i >= trueThrCount - ctlThreadNum) ? CM_CTL : CM_AGENT;
thrinfo->type = type;
thrinfo->isBusy = false;
thrinfo->id = i;
if ((err = pthread_create(&thrinfo->tid, NULL, CM_WorkThreadMain, thrinfo)) != 0) {
write_runlog(ERROR, "Failed to create a new CM_WorkThreadMain %d: %d\n", err, errno);
@ -249,6 +259,21 @@ int CM_CreateWorkThreadPool(int thrCount)
gWorkThreads.count++;
}
return 0;
}
int CM_CreateWorkThreadPool(uint32 ctlWorkerCount, uint32 agentWorkerCount)
{
gWorkThreads.count = 0;
if (createWorkerThread(ctlWorkerCount, CM_CTL) != 0) {
return -1;
}
if (createWorkerThread(agentWorkerCount, CM_AGENT) != 0) {
return -1;
}
return 0;
}

View File

@ -61,7 +61,7 @@ typedef enum DDB_ROLE {
DDB_ROLE_CEIL,
} DDB_ROLE;
typedef enum { PROCESS_IN_RUNNING = 0, PROCESS_IN_IDLE } PROCESS_STATE;
typedef enum { PROCESS_IN_INIT = 0, PROCESS_IN_RUNNING, PROCESS_IN_IDLE } PROCESS_STATE;
typedef enum { DDB_PRE_CONN = 0, DDB_HEAL_COUNT } DDB_CHECK_MOD;

View File

@ -44,8 +44,8 @@ status_t InitVotingDiskHandler(const char *scsiDev, uint32 offset);
status_t InitVotingDisk(const char *votingDiskPath);
status_t UpdateAllNodeHeartBeat();
void ResetVotingdiskHeartBeat();
VotingDiskStatus GetNodeHeartbeatStat(uint32 nodeIndex, uint32 diskTimeout);
VotingDiskStatus GetNodeHeartbeatStat(uint32 nodeIndex, uint32 diskTimeout, int logLevel);
status_t AllocVotingDiskMem();
void FreeVotingDiskMem();
#endif
#endif

View File

@ -28,7 +28,7 @@
#include "cm_misc.h"
int SystemExecute(const char *scriptPath, const char *oper, uint32 timeout);
void StartOneResInst(const CmResConfList *conf);
status_t StartOneResInst(const CmResConfList *conf);
void StopOneResInst(const CmResConfList *conf);
void OneResInstShutdown(const CmResConfList *oneResConf);
status_t RegOneResInst(const CmResConfList *conf, uint32 destInstId);

View File

@ -49,6 +49,7 @@ typedef enum KeyEventTypeEn {
KEY_EVENT_RECOVER = 16,
KEY_EVENT_REFRESH_OBS_DELETE_TEXT = 17,
KEY_EVENT_DROP_CN_OBS_XLOG = 18,
KEY_EVENT_RES_ARBITRATE = 19,
KEY_EVENT_TYPE_CEIL, // new event types should be added before this.
} KeyEventType;
@ -70,4 +71,4 @@ void write_runlog(int elevel, const char *fmt, ...) __attribute__((format(printf
void write_stderr(const char *fmt, ...) __attribute__((format(PG_PRINTF_ATTRIBUTE, 1, 2)));
void WriteKeyEventLog(KeyEventType keyEventType, uint32 instanceId, const char *fmt, ...);
#endif
#endif

View File

@ -50,6 +50,8 @@ const uint32 CM_MAX_VIP_COUNT = 16;
#define CUS_RES_CHECK_STAT_UNKNOWN 2
#define CUS_RES_CHECK_STAT_TIMEOUT 137
#define CUS_RES_START_FAIL_DEPEND_NOT_ALIVE 6
#define RES_INSTANCE_ID_MIN 20000
#define RES_INSTANCE_ID_MAX 30000

View File

@ -102,9 +102,8 @@ status_t CmAllocDlock(dlock_t *lock, uint64 lockAddr, int64 instId);
void CmInitDlock(dlock_t *lock, uint64 lockAddr, int64 instId); // init lockw header and body
void CmInitDlockHeader(dlock_t *lock, uint64 lockAddr, int64 instId); // init lockw and lockr header
status_t CmDiskLockS(dlock_t *lock, const char *scsiDev, int32 fd);
status_t CmDiskLockfS(dlock_t *lock, const char *scsiDev);
int32 CmDiskLock(dlock_t *lock, int32 fd);
status_t CmDiskLockf(dlock_t *lock, int32 fd);
status_t CmDiskLockf(dlock_t *lock, int32 fd, int64 lockTime);
status_t CmGetDlockInfo(dlock_t *lock, int32 fd);
#endif

View File

@ -46,5 +46,4 @@ typedef struct _DISK_LRW_HANDLER {
} diskLrwHandler;
status_t InitDiskLockHandle(diskLrwHandler *sdLrwHandler, const char *scsi_dev, uint32 offset, int64 instId);
status_t ShareDiskGetDlock(diskLrwHandler *handler);
#endif

View File

@ -83,19 +83,14 @@ typedef struct CM_Connection_t {
gss_buffer_desc gss_outbuf; /* GSS output token */
#endif // KRB5
NotifyCn_t notifyCn;
uint32 connSeq;
uint64 connSeq;
} CM_Connection;
typedef struct CM_Connections_t {
uint32 count;
uint32 max_node_id;
CM_Connection* connections[CM_MAX_CONNECTIONS + MAXLISTEN];
pthread_rwlock_t lock;
} CM_Connections;
typedef struct CM_WorkThread_t {
pthread_t tid;
uint32 id;
int type;
uint32 procMsgCount;
ConnID ProcConnID; // which connection is processing now;
volatile bool isBusy;
} CM_WorkThread;
@ -124,12 +119,25 @@ typedef struct CM_MonitorNodeStopThread_t {
typedef struct {
pthread_t tid;
int type;
uint32 id;
int epHandle;
int wakefd;
volatile bool isBusy;
volatile int gotConnClose;
void* recvMsgQue;
void* sendMsgQue;
uint32 pushRecvQueWaitTime;
uint32 getSendQueWaitTime;
uint32 recvMsgCount;
uint32 sendMsgCount;
uint32 innerProcCount;
} CM_IOThread;
typedef struct CM_IOThreads_t {
uint32 count;
CM_IOThread threads[CM_MAX_THREADS];
} CM_IOThreads;
typedef enum CM_ThreadStatus_e {
CM_THREAD_STARTING,
CM_THREAD_RUNNING,
@ -222,7 +230,6 @@ typedef struct CM_ConnDdbInfo_t {
#define INSTANCE_ARBITRATE_DELAY_NO_SET 0
#define INSTANCE_ARBITRATE_DELAY_HAVE_SET 1
#define CM_IO_THREAD_COUNT 1
constexpr int NO_NEED_TO_SET_PARAM = -1;
typedef struct DatanodeDynamicStatusT {
@ -315,7 +322,6 @@ extern volatile sig_atomic_t got_stop;
extern volatile sig_atomic_t g_gotParameterReload;
extern volatile sig_atomic_t g_SetReplaceCnStatus;
extern volatile sig_atomic_t ha_connection_closed;
extern volatile sig_atomic_t got_conns_close[CM_IO_THREAD_COUNT];
extern char g_replaceCnStatusFile[MAX_PATH_LEN];
void ProcessStartupPacket(int epollFd, void* arg);

View File

@ -56,6 +56,7 @@ void GetDelayArbitTimeFromConf();
void GetBackupOpenConfig();
void GetDelayArbitClusterTimeFromConf();
void GetDnArbitrateMode();
void CmsSyncStandbyMode();
bool EnableShareDisk();

View File

@ -34,7 +34,7 @@
#endif // KRB5
#define CM_SERVER_PACKET_ERROR_MSG 128
#define MSG_COUNT_FOR_LOG 300
#define MSG_TIME_FOR_LOG 5
#include "cms_msg_que.h"
enum IOProcMethond {
@ -71,7 +71,7 @@ void EventDel(int epollFd, CM_Connection* con);
void CMPerformAuthentication(CM_Connection* con);
int ReadCommand(CM_Connection *con, const char *str);
int get_authentication_type(const char* config_file);
int RespondMsg(const MsgRecvInfo* recvMsg, char msgtype, const char *s, size_t len, int log_level = LOG);
int RespondMsg(MsgRecvInfo* recvMsg, char msgtype, const char *s, size_t len, int log_level = LOG);
int SendToAgentMsg(uint agentNodeId, char msgtype, const char *s, size_t len, int log_level = LOG);
int BroadcastMsg(char msgtype, const char *s, size_t len, int log_level = LOG);
void AsyncProcMsg(const MsgRecvInfo* recvMsg, IOProcMethond procMethod, const char *s, uint32 len);

View File

@ -289,7 +289,7 @@ extern pthread_rwlock_t g_sendQueueRwlock;
extern pthread_rwlock_t g_recvQueueRwlock;
extern synchronous_standby_mode current_cluster_az_status;
extern CM_WorkThreads gWorkThreads;
extern CM_IOThread gIOThread;
extern CM_IOThreads gIOThreads;
extern CM_HAThreads gHAThreads;
extern CM_MonitorThread gMonitorThread;
extern CM_MonitorNodeStopThread gMonitorNodeStopThread;
@ -411,6 +411,7 @@ extern bool g_getHistoryDnStatusFromDdb;
extern bool g_getHistoryCnStatusFromDdb;
extern volatile uint32 g_refreshDynamicCfgNum;
extern bool g_needIncTermToDdbAgain;
extern volatile bool g_needReloadSyncStandbyMode;
extern bool g_instance_status_for_cm_server_pending[CM_PRIMARY_STANDBY_NUM];
extern bool g_clusterStarting;
/* thread count of thread pool */

View File

@ -24,54 +24,84 @@
#ifndef CMS_MSG_QUE_H
#define CMS_MSG_QUE_H
#include <queue>
#include <pthread.h>
#include "c.h"
#include "stringinfo.h"
#include "cm_util.h"
enum MsgPriority {
MsgPriHigh = 0,
MsgPriNormal,
MsgPriLow,
MSG_PRI_COUNT
enum MsgSourceType {
MsgSrcAgent = 0,
MsgSrcCtl,
MSG_SRC_COUNT
};
struct ConnID {
int32 remoteType; // CM_AGENT,CM_CTL
uint32 connSeq;
uint64 connSeq;
uint32 agentNodeId;
uint64 t1;
uint64 t2;
uint64 t3;
uint64 t4;
uint64 t5;
uint64 t6;
uint64 t7;
uint64 t8;
uint64 t9;
uint64 t10;
};
struct MsgSendInfo {
ConnID connID;
ConnID connID;
int32 log_level;
uint64 procTime;
uint8 msgProcFlag;
char msgType;
char procMethod;
char reserved; // for alignment
char reserved; // for alignment
uint32 dataSize;
uint64 data[0];
};
struct MsgRecvInfo {
ConnID connID;
ConnID connID;
uint8 msgProcFlag;
uint8 reserved1;
uint8 reserved2;
uint8 reserved3;
CM_StringInfoData msg;
CM_StringInfoData msg;
uint64 data[0];
};
using MsgQueType = std::deque<const char *>;
using MsgQuePtr = MsgQueType*;
using ConstMsgQuePtr = const MsgQueType*;
typedef void (*wakeSenderFuncType)(void);
struct MsgQue {
MsgQueType que;
CMFairMutex fairLock;
};
struct PriMsgQues {
MsgQue ques[MSG_SRC_COUNT];
pthread_mutex_t msgLock;
pthread_cond_t msgCond;
};
typedef void (*wakeSenderFuncType)(const ConnID connID);
typedef bool (*CanProcThisMsgFunType)(void *threadInfo, const char *msgData);
void InitMsgQue(PriMsgQues &que);
size_t getMsgCount(PriMsgQues *priQue);
void pushRecvMsg(const MsgRecvInfo* msg, MsgPriority pri);
const MsgRecvInfo *getRecvMsg(MsgPriority pri, uint32 waitTime, void *threadInfo);
void pushSendMsg(const MsgSendInfo *msg, MsgPriority pri);
const MsgSendInfo *getSendMsg();
size_t getSendMsgCount();
bool existSendMsg();
void pushRecvMsg(PriMsgQues* priQue, MsgRecvInfo* msg, MsgSourceType src);
const MsgRecvInfo *getRecvMsg(PriMsgQues *priQue, MsgSourceType src, uint32 waitTime, void *threadInfo);
bool existRecvMsg();
void pushSendMsg(PriMsgQues *priQue, MsgSendInfo *msg, MsgSourceType src);
const MsgSendInfo *getSendMsg(PriMsgQues *priQue, MsgSourceType src);
bool existSendMsg(const PriMsgQues *priQue);
void setWakeSenderFunc(wakeSenderFuncType func);
void SetCanProcThisMsgFun(CanProcThisMsgFunType func);

View File

@ -126,16 +126,7 @@ uint32 GetResStatReportInter(uint32 nodeId);
int GetCurAz();
uint32 GetPrimaryDnIndex(void);
status_t InitNodeReportResStatInter();
MaxClusterResStatus GetResNodeStat(uint32 nodeId);
inline bool CheckEnableFlag()
{
if (IsBoolCmParamTrue(g_enableDcf)) {
write_runlog(ERROR, "switchover is not support on dcf mode.\n");
return true;
}
return false;
}
MaxClusterResStatus GetResNodeStat(uint32 nodeId, int logLevel);
void ProcessCtlToCmReloadMsg(MsgRecvInfo* recvMsgInfo);
void ProcessCtlToCmExecDccCmdMsg(MsgRecvInfo* recvMsgInfo, ExecDdbCmdMsg *msg);

View File

@ -30,8 +30,8 @@
int CM_CreateHA(void);
int CM_CreateMonitor(void);
int CM_CreateMonitorStopNode(void);
int CM_CreateWorkThreadPool(int thrCount);
int CM_CreateIOThread();
int CM_CreateWorkThreadPool(uint32 ctlWorkerCount, uint32 agentWorkerCount);
int CM_CreateIOThreadPool(uint32 thrCount);
void CreateDnGroupStatusCheckAndArbitrateThread(void);
void CreateDealGlobalBarrierThread(void);

View File

@ -28,27 +28,31 @@
#include <pthread.h>
#include "c.h"
const int CM_NSEC_COUNT_PER_MS = 1000000;
const int CM_MS_COUNT_PER_SEC = 1000;
int CmMkdirP(char *path, unsigned int omode);
char *gs_getenv_r(const char *name);
uint64 GetMonotonicTimeMs();
enum class CMMutexPrio {
CM_MUTEX_PRIO_NONE,
CM_MUTEX_PRIO_NORMAL,
CM_MUTEX_PRIO_HIGH,
enum class CMFairMutexType {
CM_MUTEX_NODE,
CM_MUTEX_READ,
CM_MUTEX_WRITE,
};
using CMPrioMutex = struct CMPrioMutexSt {
using CMFairMutex = struct CMFairMutexSt {
pthread_mutex_t lock;
pthread_mutex_t innerLock;
pthread_cond_t cond;
uint32 highPrioCount;
CMMutexPrio curPrio;
uint32 readerCount;
uint32 writerCount;
CMFairMutexType curType;
};
void CMPrioMutexInit(CMPrioMutex &mutex);
int CMPrioMutexLock(CMPrioMutex &mutex, CMMutexPrio prio);
void CMPrioMutexUnLock(CMPrioMutex &mutex);
void CMFairMutexInit(CMFairMutex &mutex);
int CMFairMutexLock(CMFairMutex &mutex, CMFairMutexType type);
void CMFairMutexUnLock(CMFairMutex &mutex);
char *GetDynamicMem(char *dynamicPtr, size_t *curSize, size_t memSize);
#endif // CM_UTIL_H