最大集群仲裁,校验agent网络连通性过程原子化

This commit is contained in:
yewk 2023-04-17 11:22:03 +08:00 committed by zhang_xubo
parent c0a00dd73c
commit 8f9ef9afac
4 changed files with 67 additions and 38 deletions

View File

@ -80,6 +80,13 @@ typedef enum MaxClusterStatEn {
MAX_CLUSTER_EXCLUDE,
} MaxClusterStat;
typedef struct CurCmRhbStatSt {
uint32 hwl;
time_t baseTime;
time_t hbs[MAX_RHB_NUM][MAX_RHB_NUM];
} CurCmRhbStat;
static CurCmRhbStat g_curRhbStat = {0};
static const int32 CHECK_DELAY_IN_ROLE_CHANGING = 10;
static MaxNodeCluster g_curCluster = {{0}};
@ -320,6 +327,23 @@ static status_t InitMaxNodeCluster(MaxNodeCluster *maxNodeCluster)
return CM_SUCCESS;
}
static MaxClusterResStatus GetNodesConnStatByRhb(int idx1, int idx2, int timeout)
{
if (timeout == 0) {
return MAX_CLUSTER_STATUS_AVAIL;
}
if (g_curRhbStat.hbs[idx1][idx2] == 0 || g_curRhbStat.hbs[idx2][idx1] == 0) {
return MAX_CLUSTER_STATUS_INIT;
}
if (IsRhbTimeout(g_curRhbStat.hbs[idx1][idx2], g_curRhbStat.baseTime, timeout) ||
IsRhbTimeout(g_curRhbStat.hbs[idx2][idx1], g_curRhbStat.baseTime, timeout)) {
return MAX_CLUSTER_STATUS_UNAVAIL;
}
return MAX_CLUSTER_STATUS_AVAIL;
}
static bool CheckPoint2PointConn(int32 resIdx1, int32 resIdx2)
{
MaxClusterResStatus connStatus = GetNodesConnStatByRhb(resIdx1, resIdx2, (int)g_agentNetworkTimeout);
@ -454,6 +478,8 @@ static void FindMaxNodeCluster(MaxNodeCluster *maxCluster)
{
NodeCluster *nodeCluster = &(maxCluster->nodeCluster);
nodeCluster->clusterNum = -1;
g_curRhbStat.baseTime = time(NULL);
GetRhbStat(g_curRhbStat.hbs, &g_curRhbStat.hwl);
// assume that all meet the conditions.
for (int32 i = nodeCluster->maxNodeNum - 1; i >= 0; --i) {
if (!IsAllResAvailInNode(i)) {
@ -885,23 +911,35 @@ static bool IsNodeInCluster(int32 resIdx, const MaxNodeCluster *nodeCluster)
return false;
}
static void PrintRhbStatus()
static void PrintOneRhbLine(time_t *timeArr)
{
uint32 hwl = 0;
time_t hbs[MAX_RHB_NUM][MAX_RHB_NUM] = {{0}};
GetRhbStat(hbs, &hwl);
char *rhbStr = GetRhbSimple((time_t *)hbs, MAX_RHB_NUM, hwl, time(NULL), g_agentNetworkTimeout);
CM_RETURN_IF_NULL(rhbStr);
size_t rhbLen = strlen(rhbStr);
if (rhbLen >= MAX_LOG_BUFF_LEN) {
write_runlog(LOG, "rhbStr len(%lu) is exceed max log buff len(%d), can't print network stat.\n",
rhbLen, MAX_LOG_BUFF_LEN);
FREE_AND_RESET(rhbStr);
return;
int ret;
errno_t rc;
char rhbStr[MAX_PATH_LEN] = {0};
const uint32 maxInfoLen = TIME_STR_MAX_LEN + 1;
for (uint32 i = 0; i < g_curRhbStat.hwl; ++i) {
char info[maxInfoLen] = {0};
char timeBuf[TIME_STR_MAX_LEN] = {0};
GetTimeStr(timeArr[i], timeBuf, TIME_STR_MAX_LEN);
ret = snprintf_s(info, maxInfoLen, maxInfoLen - 1, "%s|", timeBuf);
securec_check_intval(ret, (void)ret);
rc = strncat_s(rhbStr, MAX_PATH_LEN, info, strlen(info));
securec_check_errno(rc, (void)rc);
}
write_runlog(LOG, "[RHB] hb infos: |%s\n", rhbStr);
}
static void PrintAllRhbStatus()
{
char timeBuf[TIME_STR_MAX_LEN] = {0};
GetTimeStr(g_curRhbStat.baseTime, timeBuf, TIME_STR_MAX_LEN);
write_runlog(LOG, "Network timeout:%u\n", g_agentNetworkTimeout);
write_runlog(LOG, "Network stat('Y' means connected, otherwise 'N'):\n%s\n", rhbStr);
FREE_AND_RESET(rhbStr);
write_runlog(LOG, "Network base_time:%s\n", timeBuf);
for (uint32 i = 0; i < g_curRhbStat.hwl; ++i) {
PrintOneRhbLine(&g_curRhbStat.hbs[i][0]);
}
}
static void PrintKickOutResult(int32 resIdx, const MaxNodeCluster *maxCluster)
@ -927,11 +965,10 @@ static void PrintKickOutResult(int32 resIdx, const MaxNodeCluster *maxCluster)
if (!CheckPoint2PointConn(resIdx, maxCluster->nodeCluster.cluster[i])) {
write_runlog(LOG, "kick out result: (index=%d,nodeId=%u) disconnect with (index=%d,nodeId=%u).\n",
resIdx, GetNodeByPoint(resIdx), i, GetNodeByPoint(i));
PrintHbsInfo(resIdx, GetNodeByPoint(resIdx), i, GetNodeByPoint(i), LOG);
continue;
}
}
PrintRhbStatus();
PrintAllRhbStatus();
}
static void PrintArbitrateResult(const MaxNodeCluster *lastCluster, const MaxNodeCluster *curCluster)
@ -1028,6 +1065,8 @@ void *MaxNodeClusterArbitrateMain(void *arg)
write_runlog(FATAL, "Alloc voting disk memory failed!\n");
exit(-1);
}
g_curRhbStat.baseTime = time(NULL);
GetRhbStat(g_curRhbStat.hbs, &g_curRhbStat.hwl);
for (;;) {
if (got_stop) {
g_threadProcessStatus = THREAD_PROCESS_STOP;

View File

@ -1727,7 +1727,7 @@ static int cm_server_process_startup_packet(int epollfd, CM_Connection* con, CM_
if ((con->port->user_name != NULL) && strncmp(con->port->user_name, pw->pw_name, SP_USER - 1)) {
write_runlog(WARNING, "invalid connection\n");
if (CmsSendAndFlushMsg(con, 'E', "invalid connection", CM_SERVER_PACKET_ERROR_MSG) != 0) {
if (CmsSendAndFlushMsg(con, 'E', "invalid connection", sizeof("invalid connection")) != 0) {
RemoveConnAfterSendMsgFailed(con);
write_runlog(ERROR, "[%s][line:%d] CmsSendAndFlushMsg fail.\n", __FUNCTION__, __LINE__);
}

View File

@ -95,31 +95,13 @@ void GetRhbStat(time_t hbs[MAX_RHB_NUM][MAX_RHB_NUM], unsigned int *hwl)
securec_check_errno(rc, (void)rc);
}
MaxClusterResStatus GetNodesConnStatByRhb(int resIdx1, int resIdx2, int timeout)
{
if (timeout == 0) {
return MAX_CLUSTER_STATUS_AVAIL;
}
if (g_hbs[resIdx1][resIdx2] == 0 || g_hbs[resIdx2][resIdx1] == 0) {
return MAX_CLUSTER_STATUS_INIT;
}
time_t curTime = time(NULL);
if (IsRhbTimeout(g_hbs[resIdx1][resIdx2], curTime, timeout) ||
IsRhbTimeout(g_hbs[resIdx2][resIdx1], curTime, timeout)) {
return MAX_CLUSTER_STATUS_UNAVAIL;
}
return MAX_CLUSTER_STATUS_AVAIL;
}
void ResetNodeConnStat()
{
errno_t rc = memset_s(g_hbs, sizeof(g_hbs), 0, sizeof(g_hbs));
securec_check_errno(rc, (void)rc);
}
void PrintOneHbInfo(int resIdx1, uint32 nodeId1, int resIdx2, uint32 nodeId2, int logLevel)
static void PrintOneHbInfo(int resIdx1, uint32 nodeId1, int resIdx2, uint32 nodeId2, int logLevel)
{
struct tm result;
GetLocalTime(&g_hbs[resIdx1][resIdx2], &result);
@ -135,3 +117,10 @@ void PrintHbsInfo(int resIdx1, uint32 nodeId1, int resIdx2, uint32 nodeId2, int
PrintOneHbInfo(resIdx1, nodeId1, resIdx2, nodeId2, logLevel);
PrintOneHbInfo(resIdx2, nodeId2, resIdx1, nodeId1, logLevel);
}
void GetTimeStr(time_t baseTime, char *timeStr, uint32 strLen)
{
struct tm result;
GetLocalTime(&baseTime, &result);
(void)strftime(timeStr, strLen, "%Y-%m-%d %H:%M:%S", &result);
}

View File

@ -26,13 +26,14 @@
#define CMS_RHB_H
#include <time.h>
#include "cms_arbitrate_cluster.h"
#define TIME_STR_MAX_LEN 20
void InitDbListsByStaticConfig();
void RefreshNodeRhbInfo(unsigned int nodeId, const time_t *hbs, unsigned int hwl);
MaxClusterResStatus GetNodesConnStatByRhb(int resIdx1, int resIdx2, int timeout);
void GetRhbStat(time_t hbs[MAX_RHB_NUM][MAX_RHB_NUM], unsigned int *hwl);
void ResetNodeConnStat();
void PrintHbsInfo(int resIdx1, uint32 nodeId1, int resIdx2, uint32 nodeId2, int logLevel);
void GetTimeStr(time_t baseTime, char *timeStr, uint32 strLen);
#endif