共享存储330相关CM代码回合社区

This commit is contained in:
xue_meng_en 2023-03-06 10:12:21 +08:00
parent d86cf089c2
commit 23593eb013
26 changed files with 632 additions and 283 deletions

View File

@ -777,7 +777,7 @@ status_t FindCacheByMultiLevel(const char *key, char *buff, uint32 buffLen)
(void)pthread_rwlock_unlock(&g_sdCacheList.lk_lock);
if (offset == 0) {
write_runlog(WARNING, "FindCacheByMultiLevel: can't find key %s or buffLen %u is invalid.\n",
write_runlog(DEBUG1, "FindCacheByMultiLevel: can't find key %s or buffLen %u is invalid.\n",
key,
buffLen);
CM_SET_DISKRW_ERROR(ERR_DISKRW_KEY_NOTFOUND, key);

View File

@ -378,19 +378,6 @@ uint64 GetTimeMinus(const struct timeval checkEnd, const struct timeval checkBeg
return (uint64)((checkEnd.tv_sec - checkBegin.tv_sec) * secTomicSec + (checkEnd.tv_usec - checkBegin.tv_usec));
}
static void GetResStatusList()
{
if (!IsCusResExist()) {
write_runlog(DEBUG1, "[CLIENT] no resource config, don't need get res status list.\n");
return;
}
RequestResStatList sendMsg = {0};
sendMsg.msgType = (int)MSG_AGENT_CM_REQUEST_RES_STATUS_LIST;
PushMsgToCmsSendQue((char *)&sendMsg, (uint32)sizeof(RequestResStatList), "get res status");
}
void* ConnCmsPMain(void* arg)
{
(void)clock_gettime(CLOCK_MONOTONIC, &g_serverHeartbeatTime);
@ -413,7 +400,6 @@ void* ConnCmsPMain(void* arg)
(void)clock_gettime(CLOCK_MONOTONIC, &g_serverHeartbeatTime);
have_killed_nodes = false;
g_agentConnCmsSuccess = true;
GetResStatusList();
} else {
/* Firstly: We judge cma connect cms in other node is ok or disconnected.
* If the connection is disconnected, we need to execute the operation of stopping instances.

View File

@ -248,6 +248,11 @@ int CheckOneResInst(const CmResConfList *conf)
(ret != CUS_RES_CHECK_STAT_ABNORMAL)) {
write_runlog(LOG, "CheckOneResInst, run system command(%s %s) special result=%d\n", conf->script, oper, ret);
}
if (ret < 0) {
return CUS_RES_CHECK_STAT_FAILED;
}
return ret;
}
@ -546,9 +551,7 @@ static status_t InitLocalCommConfOfDefRes(const CusResConfJson *resJson, CmResCo
static uint32 GetCmInstId(const CmResConfList *newConf)
{
for (uint32 i = 0; i < CusResCount(); ++i) {
(void)pthread_rwlock_rdlock(&g_resStatus[i].rwlock);
if (strcmp(g_resStatus[i].status.resName, newConf->resName) != 0) {
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
continue;
}
uint32 cmInstId = 0;
@ -558,7 +561,6 @@ static uint32 GetCmInstId(const CmResConfList *newConf)
break;
}
}
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
return cmInstId;
}
return 0;

View File

@ -2044,6 +2044,18 @@ static void SendHbs()
write_runlog(DEBUG5, "push cms msg to send queue, hbs msg.\n");
}
static void GetCusResStatListFromCms()
{
RequestLatestStatList sendMsg = {0};
sendMsg.msgType = (int)MSG_AGENT_CM_GET_LATEST_STATUS_LIST;
for (uint32 i = 0; i < CusResCount(); ++i) {
sendMsg.statVersion[i] = g_resStatus[i].status.version;
}
PushMsgToCmsSendQue((char *)&sendMsg, (uint32)sizeof(RequestLatestStatList), "get res status");
}
static void ReportInstanceStatus()
{
InstancesStatusCheckAndReport();
@ -2053,6 +2065,9 @@ static void ReportInstanceStatus()
GetDoradoIpFromCms();
}
SendHbs();
if (IsCusResExist()) {
GetCusResStatListFromCms();
}
}
void *ProcessSendCmsMsgMain(void *arg)

View File

@ -156,14 +156,11 @@ static uint32 ResInstIdToCmInstId(const char *resName, uint32 resInstId)
write_runlog(ERROR, "[CLIENT] ProcessResStatusList, unknown the res(%s) of client.\n", resName);
return 0;
}
(void)pthread_rwlock_rdlock(&(g_resStatus[index].rwlock));
for (uint32 i = 0; i < g_resStatus[index].status.instanceCount; ++i) {
if (g_resStatus[index].status.resStat[i].resInstanceId == resInstId) {
(void)pthread_rwlock_unlock(&(g_resStatus[index].rwlock));
return g_resStatus[index].status.resStat[i].cmInstanceId;
}
}
(void)pthread_rwlock_unlock(&(g_resStatus[index].rwlock));
return 0;
}
@ -234,12 +231,12 @@ void* ProcessMessageMain(void * const arg)
return NULL;
}
static void UpdateResStatusList(CmResStatList *resStat, const OneResStatList *newStat)
static inline void UpdateResStatusList(CmResStatList *resStat, const OneResStatList *newStat)
{
(void)pthread_rwlock_wrlock(&(resStat->rwlock));
errno_t rc = memcpy_s(&resStat->status, sizeof(OneResStatList), newStat, sizeof(OneResStatList));
(void)pthread_rwlock_unlock(&(resStat->rwlock));
securec_check_errno(rc, (void)rc);
(void)pthread_rwlock_unlock(&(resStat->rwlock));
}
void ProcessResStatusList(const CmsReportResStatList *msg)
@ -298,7 +295,6 @@ void ProcessResLockAckFromCms(const CmsReportLockResult *recvMsg)
return;
}
bool getFlag = false;
(void)pthread_rwlock_rdlock(&(g_resStatus[index].rwlock));
for (uint32 i = 0; i < g_resStatus[index].status.instanceCount; ++i) {
if (g_resStatus[index].status.resStat[i].cmInstanceId == recvMsg->lockOwner) {
sendMsg.result.lockOwner = g_resStatus[index].status.resStat[i].resInstanceId;
@ -306,7 +302,6 @@ void ProcessResLockAckFromCms(const CmsReportLockResult *recvMsg)
break;
}
}
(void)pthread_rwlock_unlock(&(g_resStatus[index].rwlock));
if (!getFlag) {
sendMsg.result.lockOwner = 0;
sendMsg.result.error = (uint32)CM_RES_CLIENT_CANNOT_DO;

View File

@ -57,18 +57,14 @@ void InitIsregCheckVar()
static int GetResInstId(const char *resName, uint32 cmInstId)
{
for (uint32 i = 0; i < CusResCount(); ++i) {
(void)pthread_rwlock_rdlock(&g_resStatus[i].rwlock);
if (strcmp(resName, g_resStatus[i].status.resName) != 0) {
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
continue;
}
for (uint32 j = 0; j < g_resStatus[i].status.instanceCount; ++j) {
if (g_resStatus[i].status.resStat[j].cmInstanceId == cmInstId) {
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
return (int)g_resStatus[i].status.resStat[j].resInstanceId;
}
}
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
write_runlog(ERROR, "can't get res_inst_id, by cm_inst_id(%u).\n", cmInstId);
break;
}

View File

@ -414,12 +414,12 @@ static status_t RecvResStatusListProcess(int isNotifyChange)
if (isNotifyChange == STAT_CHANGED) {
g_agentConnect->callback();
}
write_runlog(LOG, "version=%llu\n", g_clientStatusList->version);
write_runlog(LOG, "resName(%s) version=%llu\n", g_clientStatusList->resName, g_clientStatusList->version);
for (uint32 i = 0; i < g_clientStatusList->instanceCount; ++i) {
write_runlog(LOG, "resName(%s):nodeId(%u),instanceId=%u,isWorkMember=%u,status=%u\n",
g_clientStatusList->resName,
write_runlog(LOG, "nodeId(%u),cmInstId=%u,resInstId=%u,isWork=%u,status=%u\n",
g_clientStatusList->resStat[i].nodeId,
g_clientStatusList->resStat[i].cmInstanceId,
g_clientStatusList->resStat[i].resInstanceId,
g_clientStatusList->resStat[i].isWorkMember,
g_clientStatusList->resStat[i].status);
}

View File

@ -776,7 +776,7 @@ cluster_msg_string cluster_msg_map_string[] = {
{"MSG_CM_AGENT_REPORT_SET_STATUS", MSG_CM_AGENT_REPORT_SET_STATUS},
{"MSG_CM_AGENT_REPORT_RES_DATA", MSG_CM_AGENT_REPORT_RES_DATA},
{"MSG_AGENT_CM_REQUEST_RES_STATUS_LIST", MSG_AGENT_CM_REQUEST_RES_STATUS_LIST},
{"MSG_AGENT_CM_SET_INSTANCE_DATA", MSG_AGENT_CM_SET_INSTANCE_DATA},
{"MSG_AGENT_CM_GET_LATEST_STATUS_LIST", MSG_AGENT_CM_GET_LATEST_STATUS_LIST},
{"MSG_AGENT_CM_SET_RES_DATA", MSG_AGENT_CM_SET_RES_DATA},
{"MSG_AGENT_CM_GET_RES_DATA", MSG_AGENT_CM_GET_RES_DATA},
{"MSG_CLIENT_AGENT_HEARTBEAT", MSG_CLIENT_AGENT_HEARTBEAT},

View File

@ -29,8 +29,10 @@
bool g_enableSharedStorage = false;
CmResStatList g_resStatus[CM_MAX_RES_COUNT] = {{0}};
static uint32 g_resCount = 0;
static bool8 g_isDnSSMode = CM_FALSE;
static uint32 g_resCount = 0;
static uint32 g_resNode[CM_MAX_RES_NODE_COUNT] = {0};
static uint32 g_resNodeCount = 0;
typedef enum IpTypeEn {
IP_TYPE_INIT = 0,
@ -106,6 +108,44 @@ const ResIsregStatusMap g_resIsregStatusMap[] = {
{CM_RES_ISREG_NOT_SUPPORT, "not_support"},
};
static bool8 IsNodeInResNode(uint32 nodeId)
{
for (uint32 i = 0; i < CM_MAX_RES_NODE_COUNT; ++i) {
if (i >= g_resNodeCount) {
return CM_FALSE;
}
if (g_resNode[i] == nodeId) {
return CM_TRUE;
}
}
return CM_FALSE;
}
static void UpdateResNode(uint32 nodeId)
{
if (IsNodeInResNode(nodeId)) {
return;
}
if (g_resNodeCount < CM_MAX_RES_NODE_COUNT) {
g_resNode[g_resNodeCount] = nodeId;
}
++g_resNodeCount;
}
uint32 GetResNodeCount()
{
return g_resNodeCount;
}
uint32 GetResNodeId(uint32 index)
{
if (index >= CM_MAX_RES_NODE_COUNT) {
return 0;
}
return g_resNode[index];
}
const char *GetIsregStatus(int isreg)
{
uint32 size = (uint32)(sizeof(g_resIsregStatusMap) / sizeof(g_resIsregStatusMap[0]));
@ -209,6 +249,7 @@ static status_t InitOneAppResInstStat(const char *resName, const CusResInstConf
{
if (IsNodeIdValid(resInst->nodeId)) {
instStat->nodeId = (uint32)resInst->nodeId;
UpdateResNode(instStat->nodeId);
} else {
write_runlog(ERROR, "[InitResStat] res(%s), nodeId(%d) is invalid.\n", resName, resInst->nodeId);
return CM_ERROR;
@ -288,6 +329,7 @@ static status_t InitAllDnResInstStat(const DnCusResConfJson *dnResJson, OneResSt
for (uint32 k = 0; k < g_node[i].datanodeCount; ++k) {
InitOneDnResInstStatByStaticConfig(&g_node[i].datanode[k], &oneStat->resStat[oneStat->instanceCount]);
oneStat->resStat[oneStat->instanceCount].nodeId = g_node[i].node;
UpdateResNode(oneStat->resStat[oneStat->instanceCount].nodeId);
++oneStat->instanceCount;
}
}
@ -369,13 +411,10 @@ void GetCmConfJsonPath(char *path, uint32 pathLen)
status_t GetGlobalResStatusIndex(const char *resName, uint32 &index)
{
for (uint32 i = 0; i < CusResCount(); ++i) {
(void)pthread_rwlock_rdlock(&(g_resStatus[i].rwlock));
if (strcmp(g_resStatus[i].status.resName, resName) == 0) {
(void)pthread_rwlock_unlock(&(g_resStatus[i].rwlock));
index = i;
return CM_SUCCESS;
}
(void)pthread_rwlock_unlock(&(g_resStatus[i].rwlock));
}
return CM_ERROR;
}
@ -403,14 +442,14 @@ bool IsOneResInstWork(const char *resName, uint32 cmInstId)
bool isWork = false;
CmResStatList *resStat = &g_resStatus[index];
(void)pthread_rwlock_rdlock(&resStat->rwlock);
for (uint32 i = 0; i < resStat->status.instanceCount; ++i) {
if (resStat->status.resStat[i].cmInstanceId == cmInstId) {
(void)pthread_rwlock_rdlock(&resStat->rwlock);
isWork = (resStat->status.resStat[i].isWorkMember == RES_INST_WORK_STATUS_AVAIL);
(void)pthread_rwlock_unlock(&resStat->rwlock);
break;
}
}
(void)pthread_rwlock_unlock(&resStat->rwlock);
return isWork;
}
@ -443,16 +482,13 @@ const char *ReadConfJsonFailStr(int ret)
status_t GetResNameByCmInstId(uint32 instId, char *resName, uint32 nameLen)
{
for (uint32 i = 0; i < CusResCount(); ++i) {
(void)pthread_rwlock_rdlock(&g_resStatus[i].rwlock);
for (uint32 j = 0; j < g_resStatus[i].status.instanceCount; ++j) {
if (g_resStatus[i].status.resStat[j].cmInstanceId == instId) {
errno_t rc = strcpy_s(resName, nameLen, g_resStatus[i].status.resName);
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
securec_check_errno(rc, (void)rc);
return CM_SUCCESS;
}
}
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
}
write_runlog(LOG, "unknown cm_inst_id %u.\n", instId);

View File

@ -648,10 +648,8 @@ static uint32 GetResNameMaxLen()
{
uint32 minLen = (uint32)strlen("res_name") + SPACE_LEN;
for (uint32 i = 0; i < CusResCount(); ++i) {
(void)pthread_rwlock_rdlock(&g_resStatus[i].rwlock);
uint32 curNameLen = (uint32)strlen(g_resStatus[i].status.resName) + SPACE_LEN;
minLen = (minLen > curNameLen) ? minLen : curNameLen;
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
}
return minLen;
}

View File

@ -1782,17 +1782,14 @@ static int start_check_cluster()
static bool IsAllResInstStarted(uint32 nodeId)
{
for (uint32 i = 0; i < CusResCount(); ++i) {
(void)pthread_rwlock_rdlock(&g_resStatus[i].rwlock);
for (uint32 j = 0; j < g_resStatus[i].status.instanceCount; ++j) {
if (g_resStatus[i].status.resStat[j].nodeId != nodeId) {
continue;
}
if (GetResInstStatus(g_resStatus[i].status.resStat[j].cmInstanceId) != CM_RES_STAT_ONLINE) {
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
return false;
}
}
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
}
return true;
}

View File

@ -29,6 +29,7 @@
#include "cms_global_params.h"
#include "cms_ddb_adapter.h"
#include "cms_process_messages.h"
#include "cms_cus_res.h"
#include "cms_common_res.h"
#include "cms_rhb.h"
#include "cms_arbitrate_cluster.h"
@ -791,6 +792,11 @@ void NotifyResRegOrUnreg()
return;
}
if (!CanProcessResStatus()) {
write_runlog(LOG, "[%s], res status list invalid, can't continue.\n", __FUNCTION__);
return;
}
for (uint32 i = 0; i < g_node_num; ++i) {
MaxClusterStat ret = IsNodeInMaxCluster(g_node[i].node);
if (ret == MAX_CLUSTER_INCLUDE) {

View File

@ -90,9 +90,9 @@ void InitIsregVariable()
}
}
g_isregCheckList.nodeCount = g_node_num;
for (uint32 i = 0; i < g_node_num; ++i) {
g_isregCheckList.nodeCheck[i].nodeId = g_node[i].node;
g_isregCheckList.nodeCount = GetResNodeCount();
for (uint32 i = 0; i < g_isregCheckList.nodeCount; ++i) {
g_isregCheckList.nodeCheck[i].nodeId = GetResNodeId(i);
g_isregCheckList.nodeCheck[i].reportInter = 0;
g_isregCheckList.nodeCheck[i].isValid = true;
InitCheckList(g_isregCheckList.nodeCheck[i].nodeId, &g_isregCheckList.nodeCheck[i]);
@ -281,13 +281,16 @@ void UpdateIsworkList(uint32 cmInstId, int newIswork)
ReleaseResLockOwner(g_resStatus[i].status.resName, g_resStatus[i].status.resStat[j].cmInstanceId);
}
if (g_resStatus[i].status.resStat[j].isWorkMember != (uint32)newIswork) {
(void)pthread_rwlock_wrlock(&g_resStatus[i].rwlock);
g_resStatus[i].status.resStat[j].isWorkMember = (uint32)newIswork;
++g_resStatus[i].status.version;
ProcessReportResChangedMsg(false, g_resStatus[i].status);
SaveOneResStatusToDdb(&g_resStatus[i].status);
PrintCusInfoResList(&g_resStatus[i].status, __FUNCTION__);
return;
++(g_resStatus[i].status.version);
OneResStatList resStat = g_resStatus[i].status;
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
ProcessReportResChangedMsg(false, &resStat);
PrintCusInfoResList(&resStat, __FUNCTION__);
}
return;
}
}
}
@ -429,38 +432,39 @@ static status_t SetResStatJsonToDdb(const cJSON *root, const char *resName)
return CM_SUCCESS;
}
void SaveOneResStatusToDdb(const OneResStatList *oneResStat)
status_t SaveOneResStatusToDdb(const OneResStatList *oneResStat)
{
const char *resName = oneResStat->resName;
cJSON *root = cJSON_CreateObject();
if (!cJSON_IsObject(root)) {
write_runlog(ERROR, "create res status json obj failed, save res(%s) status failed.\n", resName);
cJSON_Delete(root);
return;
return CM_ERROR;
}
if (AddAllResInstStatToJson(root, oneResStat) != CM_SUCCESS) {
write_runlog(ERROR, "fill res status json obj failed, save res(%s) status failed.\n", resName);
cJSON_Delete(root);
return;
return CM_ERROR;
}
if (SetResStatJsonToDdb(root, resName) != CM_SUCCESS) {
write_runlog(ERROR, "set res status json obj to ddb failed, save res(%s) status failed.\n", resName);
cJSON_Delete(root);
return;
return CM_ERROR;
}
write_runlog(LOG, "save res(%s) status json to ddb success.\n", resName);
write_runlog(LOG, "save res(%s) version(%llu) status json to ddb success.\n", resName, oneResStat->version);
cJSON_Delete(root);
return CM_SUCCESS;
}
static void ParseAndProcessOneResInst(cJSON *instItem, OneResStatList *resStat)
static status_t ParseAndProcessOneResInst(cJSON *instItem, OneResStatList *resStat)
{
cJSON *tmpObj = cJSON_GetObjectItem(instItem, "cmInstId");
if (!cJSON_IsNumber(tmpObj)) {
write_runlog(ERROR, "get cmInstId from res(%s) status json failed.\n", resStat->resName);
return;
return CM_ERROR;
}
uint32 cmInstId = (uint32)tmpObj->valueint;
for (uint32 i = 0; i < resStat->instanceCount; ++i) {
@ -470,75 +474,89 @@ static void ParseAndProcessOneResInst(cJSON *instItem, OneResStatList *resStat)
tmpObj = cJSON_GetObjectItem(instItem, "isWorkMember");
if (!cJSON_IsNumber(tmpObj)) {
write_runlog(ERROR, "get isWorkMember from res(%s) status json failed.\n", resStat->resName);
return;
return CM_ERROR;
}
resStat->resStat[i].isWorkMember = (uint32)tmpObj->valueint;
tmpObj = cJSON_GetObjectItem(instItem, "status");
if (!cJSON_IsNumber(tmpObj)) {
write_runlog(ERROR, "get status from res(%s) status json failed.\n", resStat->resName);
return;
return CM_ERROR;
}
resStat->resStat[i].status = (uint32)tmpObj->valueint;
}
return CM_SUCCESS;
}
static void UpdateResStatus(OneResStatList *resStat, const cJSON * const resObj)
static status_t UpdateResStatus(OneResStatList *resStat, const cJSON * const resObj)
{
cJSON *versionObj = cJSON_GetObjectItem(resObj, "version");
if (!cJSON_IsString(versionObj)) {
write_runlog(ERROR, "get version from res(%s) status json failed.\n", resStat->resName);
return;
return CM_ERROR;
}
resStat->version = (unsigned long long)CmAtol(versionObj->valuestring, 0);
cJSON *instStatArray = cJSON_GetObjectItem(resObj, "instStatus");
if (!cJSON_IsArray(instStatArray)) {
write_runlog(ERROR, "get instStatus array from res(%s) status json failed.\n", resStat->resName);
return;
return CM_ERROR;
}
cJSON *instItem;
cJSON_ArrayForEach(instItem, instStatArray) {
ParseAndProcessOneResInst(instItem, resStat);
CM_RETURN_IFERR(ParseAndProcessOneResInst(instItem, resStat));
}
return CM_SUCCESS;
}
static cJSON *GetResStatusJson(const char *resName)
status_t GetOneResStatusFromDdb(OneResStatList *resStat)
{
char key[MAX_PATH_LEN] = {0};
char value[MAX_PATH_LEN] = {0};
GetResStatusDdbKey(key, MAX_PATH_LEN, resName);
GetResStatusDdbKey(key, MAX_PATH_LEN, resStat->resName);
DDB_RESULT ddbResult = SUCCESS_GET_VALUE;
if (GetKVFromDDb(key, MAX_PATH_LEN, value, MAX_PATH_LEN, &ddbResult) != CM_SUCCESS) {
if (ddbResult == CAN_NOT_FIND_THE_KEY) {
write_runlog(LOG, "not exit res(%s) status, key:\"%s\" in ddb.\n", resName, key);
write_runlog(LOG, "not exit res(%s) status, key:\"%s\" in ddb.\n", resStat->resName, key);
return CM_SUCCESS;
} else {
write_runlog(ERROR, "get res(%s) status %s from ddb failed: %d.\n", resName, key, (int)ddbResult);
write_runlog(ERROR, "get res(%s) status %s from ddb failed: %d.\n", resStat->resName, key, (int)ddbResult);
return CM_ERROR;
}
return NULL;
}
write_runlog(LOG, "get res(%s) status json str success, str:\"%s\".\n", resName, value);
return cJSON_Parse(value);
}
void GetOneResStatusFromDdb(OneResStatList *resStat)
{
cJSON *root = GetResStatusJson(resStat->resName);
if (!cJSON_IsObject(root)) {
write_runlog(ERROR, "parse res(%s) status json str failed.\n", resStat->resName);
cJSON_Delete(root);
return;
write_runlog(LOG, "get res(%s) status json str success, str:\"%s\".\n", resStat->resName, value);
cJSON *root = cJSON_Parse(value);
if (cJSON_IsObject(root)) {
OneResStatList tmpResStat = (*resStat);
if (UpdateResStatus(&tmpResStat, root) == CM_SUCCESS) {
errno_t rc = memcpy_s(resStat, sizeof(OneResStatList), &tmpResStat, sizeof(OneResStatList));
securec_check_errno(rc, (void)rc);
}
} else {
write_runlog(ERROR, "res(%s) status json str in ddb is irregular.\n", resStat->resName);
}
UpdateResStatus(resStat, root);
cJSON_Delete(root);
return CM_SUCCESS;
}
void GetAllResStatusFromDdb()
status_t GetAllResStatusFromDdb()
{
write_runlog(LOG, "get latest res status from ddb.\n");
for (uint32 i = 0; i < CusResCount(); ++i) {
OneResStatList tmpResStat = g_resStatus[i].status;
CM_RETURN_IFERR(GetOneResStatusFromDdb(&tmpResStat));
PrintCusInfoResList(&tmpResStat, __FUNCTION__);
(void)pthread_rwlock_wrlock(&g_resStatus[i].rwlock);
GetOneResStatusFromDdb(&g_resStatus[i].status);
errno_t rc = memcpy_s(&g_resStatus[i].status, sizeof(OneResStatList), &tmpResStat, sizeof(OneResStatList));
securec_check_errno(rc, (void)rc);
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
PrintCusInfoResList(&g_resStatus[i].status, __FUNCTION__);
}
return CM_SUCCESS;
}
void SendRegMsgToCma(uint32 destNodeId, int resMode, uint32 resInstId, const char *resName)
@ -563,7 +581,6 @@ void SendRegMsgToCma(uint32 destNodeId, int resMode, uint32 resInstId, const cha
void NotifyCmaDoReg(uint32 destNodeId)
{
for (uint32 i = 0; i < CusResCount(); ++i) {
(void)pthread_rwlock_rdlock(&g_resStatus[i].rwlock);
const OneResStatList *resInfo = &g_resStatus[i].status;
for (uint32 j = 0; j < resInfo->instanceCount; ++j) {
if (resInfo->resStat[j].nodeId != destNodeId) {
@ -578,14 +595,12 @@ void NotifyCmaDoReg(uint32 destNodeId)
UpdateIsworkList(resInfo->resStat[j].cmInstanceId, RES_INST_WORK_STATUS_AVAIL);
}
}
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
}
}
void NotifyCmaDoUnreg(uint32 destNodeId)
{
for (uint32 i = 0; i < CusResCount(); ++i) {
(void)pthread_rwlock_rdlock(&g_resStatus[i].rwlock);
const OneResStatList *resInfo = &g_resStatus[i].status;
for (uint32 j = 0; j < g_resStatus[i].status.instanceCount; ++j) {
if (g_resStatus[i].status.resStat[j].nodeId != destNodeId) {
@ -598,6 +613,5 @@ void NotifyCmaDoUnreg(uint32 destNodeId)
UpdateIsworkList(g_resStatus[i].status.resStat[j].cmInstanceId, RES_INST_WORK_STATUS_UNAVAIL);
}
}
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
}
}

View File

@ -942,7 +942,7 @@ static void InnerProcSSLAccept(const MsgSendInfo *msg, CM_Connection *con)
uint64 now = GetMonotonicTimeMs();
bool retryProc = false;
static const int retrySSLAcceptDetayMs = 10;
write_runlog(LOG, "[InnerProcSSLAccept] now=%lu,procTime=%lu,startTime=%lu,connSeq=%lu.\n",
write_runlog(DEBUG5, "[InnerProcSSLAccept] now=%lu,procTime=%lu,startTime=%lu,connSeq=%lu.\n",
now, msg->procTime, connMsg->startConnTime, con->connSeq);
status = cm_cs_ssl_accept(g_ssl_acceptor_fd, &con->port->pipe);
@ -950,7 +950,7 @@ static void InnerProcSSLAccept(const MsgSendInfo *msg, CM_Connection *con)
if (now < connMsg->startConnTime + CM_SSL_IO_TIMEOUT) {
retryProc = true;
status = CM_SUCCESS;
write_runlog(LOG, "[ProcessSslConnRequest]retry ssl connect,connSeq=%lu.\n", con->connSeq);
write_runlog(DEBUG5, "[ProcessSslConnRequest]retry ssl connect,connSeq=%lu.\n", con->connSeq);
} else {
write_runlog(ERROR, "[ProcessSslConnRequest]ssl connect timeout,connSeq=%lu.\n", con->connSeq);
}
@ -968,7 +968,7 @@ static void InnerProcSSLAccept(const MsgSendInfo *msg, CM_Connection *con)
securec_check_errno(rc, (void)rc);
nextConnMsg->procTime = now + retrySSLAcceptDetayMs;
pushMsgToSendQue(nextConnMsg, msg->connID.remoteType == CM_AGENT ? MsgSrcAgent : MsgSrcCtl);
write_runlog(LOG,
write_runlog(DEBUG5,
"[ProcessSslConnRequest]retry ssl connect later,procTime=%lu,connSeq=%lu.\n",
nextConnMsg->procTime,
con->connSeq);
@ -988,7 +988,7 @@ static void InnerProcSSLAccept(const MsgSendInfo *msg, CM_Connection *con)
AddCMAgentConnection(con);
RemoveTempConnection(con);
}
write_runlog(LOG, "[ProcessSslConnRequest]srv ssl connect success,connSeq=%lu.\n", con->connSeq);
write_runlog(DEBUG5, "[ProcessSslConnRequest]srv ssl connect success,connSeq=%lu.\n", con->connSeq);
}
/**

View File

@ -0,0 +1,113 @@
/*
* Copyright (c) 2023 Huawei Technologies Co.,Ltd.
*
* CM is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* cms_cus_res.cpp
*
*
* IDENTIFICATION
* src/cm_server/cms_cus_res.cpp
*
* -------------------------------------------------------------------------
*/
#include "cjson/cJSON.h"
#include "cms_ddb_adapter.h"
#include "cms_global_params.h"
#include "cms_common_res.h"
static ThreadProcessStatus g_resStatListStatus = THREAD_PROCESS_INIT;
bool8 CanProcessResStatus()
{
return (g_resStatListStatus == THREAD_PROCESS_RUNNING) ? CM_TRUE : CM_FALSE;
}
static void SaveLatestResStat(unsigned long long *oldVersion)
{
for (uint32 i = 0; i < CusResCount(); ++i) {
(void)pthread_rwlock_rdlock(&g_resStatus[i].rwlock);
if (oldVersion[i] < g_resStatus[i].status.version) {
OneResStatList tmpResStat = g_resStatus[i].status;
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
if (SaveOneResStatusToDdb(&tmpResStat) == CM_SUCCESS) {
oldVersion[i] = tmpResStat.version;
}
continue;
}
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
}
}
static inline void UpdateOldVersion(unsigned long long *oldVersion)
{
for (uint32 i = 0; i < CusResCount(); ++i) {
(void)pthread_rwlock_rdlock(&g_resStatus[i].rwlock);
oldVersion[i] = g_resStatus[i].status.version;
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
}
}
static inline void GetResStatAndSetThreadStat(ThreadProcessStatus threadStat, unsigned long long *oldVersion)
{
if (GetAllResStatusFromDdb() == CM_SUCCESS) {
UpdateOldVersion(oldVersion);
g_resStatListStatus = threadStat;
} else {
write_runlog(ERROR, "get all res status list failed, can't process cus res.\n");
}
}
void *UpdateResStatusListMain(void *arg)
{
thread_name = "UpdateResStat";
write_runlog(LOG, "UpdateResStatusListMain will start, and threadId is %lu.\n", (uint64)pthread_self());
unsigned long long *oldResVersion = (unsigned long long *)CmMalloc(sizeof(unsigned long long) * CusResCount());
for (;;) {
if (got_stop) {
g_resStatListStatus = THREAD_PROCESS_STOP;
cm_sleep(1);
break;
}
if (g_resStatListStatus == THREAD_PROCESS_INIT) {
ThreadProcessStatus processStat =
(g_HA_status->local_role == CM_SERVER_PRIMARY) ? THREAD_PROCESS_RUNNING : THREAD_PROCESS_READY;
GetResStatAndSetThreadStat(processStat, oldResVersion);
cm_sleep(1);
continue;
}
if (g_HA_status->local_role == CM_SERVER_PRIMARY) {
if (g_resStatListStatus == THREAD_PROCESS_READY) {
GetResStatAndSetThreadStat(THREAD_PROCESS_RUNNING, oldResVersion);
}
if (g_resStatListStatus == THREAD_PROCESS_RUNNING) {
SaveLatestResStat(oldResVersion);
}
} else {
if (g_resStatListStatus != THREAD_PROCESS_READY) {
g_resStatListStatus = THREAD_PROCESS_READY;
}
}
cm_sleep(1);
}
write_runlog(LOG, "UpdateResStatusListMain will exit, and threadId is %lu.\n", (uint64)pthread_self());
free(oldResVersion);
return NULL;
}

View File

@ -2411,6 +2411,20 @@ void ClearResource()
write_runlog(WARNING, "receive exit message, cms has cleared resource, and cms will exit.\n");
}
static status_t InitCusResVariable()
{
uint32 resNodeCount = GetResNodeCount();
if (resNodeCount > CM_MAX_RES_NODE_COUNT || resNodeCount == 0) {
write_runlog(ERROR,
"cus res, not support (%u) node, node count range:(0, %d].\n", resNodeCount, CM_MAX_RES_NODE_COUNT);
return CM_ERROR;
}
InitNodeReportVar();
InitIsregVariable();
return CM_SUCCESS;
}
int main(int argc, char** argv)
{
uid_t uid = getuid();
@ -2619,11 +2633,10 @@ int main(int argc, char** argv)
write_runlog(FATAL, "init res status failed.\n");
return -1;
}
if (IsCusResExist() && InitNodeReportResStatInter() != CM_SUCCESS) {
if (IsCusResExist() && (InitCusResVariable() != CM_SUCCESS)) {
write_runlog(FATAL, "init cus res variable failed.\n");
return -1;
}
InitIsregVariable();
GetAllResStatusFromDdb();
status = CM_CreateMonitor();
if (status < 0) {

View File

@ -26,6 +26,7 @@
#include "cms_ddb.h"
#include "cms_common.h"
#include "cms_common_res.h"
#include "cms_cus_res.h"
#include "cms_global_params.h"
#include "cms_process_messages.h"
#include "cms_write_dynamic_config.h"
@ -818,32 +819,37 @@ static void UpdateCheckInterval(MonitorContext *ctx)
ctx->takeTime = checkEnd.tv_sec;
}
static void SetResStatUnknown(uint32 nodeId)
static void SetResStatUnknown(CmResStatList *resStat, uint32 instIndex)
{
write_runlog(LOG, "nodeId(%u) report res stat heartbeat abnormal, set res status CM_RES_STAT_UNKNOWN.\n", nodeId);
for (uint32 i = 0; i < CusResCount(); ++i) {
(void)pthread_rwlock_wrlock(&g_resStatus[i].rwlock);
for (uint32 j = 0; j < g_resStatus[i].status.instanceCount; ++j) {
if ((g_resStatus[i].status.resStat[j].nodeId == nodeId) &&
(g_resStatus[i].status.resStat[j].status != (uint32)CM_RES_STAT_UNKNOWN)) {
g_resStatus[i].status.resStat[j].status = (uint32)CM_RES_STAT_UNKNOWN;
++g_resStatus[i].status.version;
ProcessReportResChangedMsg(false, g_resStatus[i].status);
SaveOneResStatusToDdb(&g_resStatus[i].status);
}
}
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
if (!CanProcessResStatus()) {
write_runlog(LOG, "[%s], res status list invalid, can't continue.\n", __FUNCTION__);
return;
}
if (resStat->status.resStat[instIndex].status != (uint32)CM_RES_STAT_UNKNOWN) {
(void)pthread_rwlock_wrlock(&resStat->rwlock);
resStat->status.resStat[instIndex].status = (uint32)CM_RES_STAT_UNKNOWN;
++resStat->status.version;
OneResStatList tmpStat = resStat->status;
(void)pthread_rwlock_unlock(&resStat->rwlock);
write_runlog(LOG, "res inst(%u) report invalid status timeout, set its status unknown.\n", instIndex);
PrintCusInfoResList(&tmpStat, __FUNCTION__);
ProcessReportResChangedMsg(false, &resStat->status);
}
}
static void CheckAllResReportByNode()
{
for (uint32 i = 0; i < g_node_num; ++i) {
uint32 inter = GetResStatReportInter(g_node[i].node);
if (inter > g_agentNetworkTimeout) {
SetResStatUnknown(g_node[i].node);
} else {
SetResStatReportInter(g_node[i].node);
for (uint32 i = 0; i < CusResCount(); ++i) {
for (uint32 j = 0; j < g_resStatus[i].status.instanceCount; ++j) {
uint32 cmInstId = g_resStatus[i].status.resStat[j].cmInstanceId;
uint32 inter = GetOneResInstReportInter(g_resStatus[i].status.resName, cmInstId);
if (inter >= g_agentNetworkTimeout) {
SetResStatUnknown(&g_resStatus[i], j);
} else {
IncreaseOneResInstReportInter(g_resStatus[i].status.resName, cmInstId);
}
}
}
}

View File

@ -27,6 +27,8 @@
#include "cms_ddb.h"
#include "cms_az.h"
#include "cms_common.h"
#include "cms_common_res.h"
#include "cms_cus_res.h"
#include "cms_arbitrate_datanode_pms.h"
#include "cms_rhb.h"
#ifdef ENABLE_MULTIPLE_NODES
@ -1443,6 +1445,13 @@ static void MsgKerberosStatus(MsgRecvInfo* recvMsgInfo, int msgType, CmdMsgProc
static void MsgAgentResourceStatus(MsgRecvInfo* recvMsgInfo, int msgType, CmdMsgProc *msgProc)
{
if (!IsCusResExist()) {
return;
}
if (!CanProcessResStatus()) {
write_runlog(LOG, "[%s], res status list invalid, can't continue.\n", __FUNCTION__);
return;
}
ReportResStatus *cmaToCmsResStatusPtr;
PROCESS_MSG_BY_TYPE_WITHOUT_CONN(ReportResStatus, cmaToCmsResStatusPtr, ProcessAgent2CmResStatReportMsg);
}
@ -1507,9 +1516,29 @@ static void MsgSwitchCmd(MsgRecvInfo* recvMsgInfo, int msgType, CmdMsgProc *msgP
static void MsgRequestResStatusList(MsgRecvInfo* recvMsgInfo, int msgType, CmdMsgProc *msgProc)
{
if (!IsCusResExist()) {
return;
}
if (!CanProcessResStatus()) {
write_runlog(LOG, "[%s], res status list invalid, can't continue.\n", __FUNCTION__);
return;
}
ProcessRequestResStatusListMsg(recvMsgInfo);
}
static void MsgLatestResStatusList(MsgRecvInfo* recvMsgInfo, int msgType, CmdMsgProc *msgProc)
{
if (!IsCusResExist()) {
return;
}
if (!CanProcessResStatus()) {
write_runlog(LOG, "[%s], res status list invalid, can't continue.\n", __FUNCTION__);
return;
}
RequestLatestStatList *recvMsg;
PROCESS_MSG_BY_TYPE(RequestLatestStatList, recvMsg, ProcessRequestLatestResStatusListMsg);
}
static void MsgGetSharedStorageInfo(MsgRecvInfo *recvMsgInfo, int msgType, CmdMsgProc *msgProc)
{
ProcessSharedStorageMsg(recvMsgInfo);
@ -1529,6 +1558,9 @@ static void MsgSslConnRequest(MsgRecvInfo* recvMsgInfo, int msgType, CmdMsgProc
static void MsgCmResLock(MsgRecvInfo* recvMsgInfo, int msgType, CmdMsgProc *msgProc)
{
if (!IsCusResExist()) {
return;
}
CmaToCmsResLock *lockMsg = NULL;
PROCESS_MSG_BY_TYPE(CmaToCmsResLock, lockMsg, ProcessCmResLock);
}
@ -1614,6 +1646,9 @@ static void MsgGetFloatIpInfo(MsgRecvInfo *recvMsgInfo, int msgType, CmdMsgProc
static void MsgResIsreg(MsgRecvInfo *recvMsgInfo, int msgType, CmdMsgProc *msgProc)
{
if (!IsCusResExist()) {
return;
}
CmaToCmsIsregMsg *isregMsg;
PROCESS_MSG_BY_TYPE(CmaToCmsIsregMsg, isregMsg, ProcessResIsregMsg);
}
@ -1668,17 +1703,19 @@ static void InitCmAgentCmdProc()
g_cmdProc[MSG_AGENT_CM_ETCD_CURRENT_TIME] = MsgEtcdCurrentTime;
g_cmdProc[MSG_CM_QUERY_INSTANCE_STATUS] = MsgQueryInstanceStatus;
g_cmdProc[MSG_AGENT_CM_KERBEROS_STATUS] = MsgKerberosStatus;
g_cmdProc[MSG_AGENT_CM_RESOURCE_STATUS] = MsgAgentResourceStatus;
g_cmdProc[MSG_AGENT_CM_DISKUSAGE_STATUS] = MsgDiskUsageStatus;
g_cmdProc[MSG_AGENT_CM_DATANODE_INSTANCE_BARRIER] = MsgDatanodeInstanceBarrier;
g_cmdProc[MSG_AGENT_CM_DN_SYNC_LIST] = MsgDnSyncList;
g_cmdProc[MSG_AGENT_CM_REQUEST_RES_STATUS_LIST] = MsgRequestResStatusList;
g_cmdProc[MSG_AGENT_CM_DATANODE_LOCAL_PEER] = MsgDnLocalPeer;
g_cmdProc[MSG_GET_SHARED_STORAGE_INFO] = MsgGetSharedStorageInfo;
g_cmdProc[MSG_CM_RES_LOCK] = MsgCmResLock;
g_cmdProc[MSG_CM_RHB] = MsgCmRhb;
g_cmdProc[MSG_AGENT_CM_FLOAT_IP] = MsgGetFloatIpInfo;
g_cmdProc[MSG_CM_RES_LOCK] = MsgCmResLock;
g_cmdProc[MSG_AGENT_CM_ISREG_REPORT] = MsgResIsreg;
g_cmdProc[MSG_AGENT_CM_RESOURCE_STATUS] = MsgAgentResourceStatus;
g_cmdProc[MSG_AGENT_CM_GET_LATEST_STATUS_LIST] = MsgLatestResStatusList;
g_cmdProc[MSG_AGENT_CM_REQUEST_RES_STATUS_LIST] = MsgRequestResStatusList;
}
static void InitCmClientCmdProc()

View File

@ -710,7 +710,7 @@ void ProcessSslConnRequest(MsgRecvInfo* recvMsgInfo, const AgentToCmConnectReque
return;
}
write_runlog(LOG, "ProcessSslConnRequest, node id: %u.\n", requestMsg->nodeid);
write_runlog(DEBUG5, "ProcessSslConnRequest, node id: %u.\n", requestMsg->nodeid);
if (g_ssl_acceptor_fd == NULL) {
write_runlog(ERROR, "[ProcessSslConnRequest]srv ssl_acceptor_fd null.\n");
RemoveCmagentSslConn(recvMsgInfo);

View File

@ -22,6 +22,7 @@
* -------------------------------------------------------------------------
*/
#include "cms_conn.h"
#include "cms_cus_res.h"
#include "cms_common_res.h"
#include "cms_ddb_adapter.h"
#include "cms_global_params.h"
@ -29,90 +30,188 @@
typedef struct ResStatReportInfoSt {
uint32 nodeId;
uint32 reportInter;
MaxClusterResStatus isAvail; // 0:res inst unavailable, 1:res inst available
} ResStatReportInfo;
static ResStatReportInfo *g_resNodeStat = NULL;
typedef struct OneResStatReportInterSt {
uint32 nodeId;
uint32 cmInstId;
uint32 statReportInter;
} OneResStatReportInter;
void ProcessReportResChangedMsg(bool notifyClient, const OneResStatList &status)
typedef struct ResStatReportInterSt {
char resName[CM_MAX_RES_NAME];
uint32 instCount;
OneResStatReportInter resReport[CM_MAX_RES_INST_COUNT];
} ResStatReportInter;
static ResStatReportInfo *g_resNodeStat = NULL;
static ResStatReportInter *g_resInstReport = NULL;
void ProcessReportResChangedMsg(bool notifyClient, const OneResStatList *status)
{
CmsReportResStatList sendMsg = {0};
sendMsg.msgType = notifyClient ? (int)MSG_CM_AGENT_RES_STATUS_CHANGED : (int)MSG_CM_AGENT_RES_STATUS_LIST;
errno_t rc = memcpy_s(&sendMsg.resList, sizeof(OneResStatList), &status, sizeof(OneResStatList));
errno_t rc = memcpy_s(&sendMsg.resList, sizeof(OneResStatList), status, sizeof(OneResStatList));
securec_check_errno(rc, (void)rc);
(void)BroadcastMsg('S', (char *)(&sendMsg), sizeof(CmsReportResStatList));
}
static bool IsResStatusChanged(uint32 cmInstId, uint32 newStat, CmResStatList *oldResStat)
uint32 ResCheckResultTransToResStat(uint32 recvStat)
{
for (uint32 i = 0; i < oldResStat->status.instanceCount; ++i) {
if (cmInstId != oldResStat->status.resStat[i].cmInstanceId) {
if (recvStat == CUS_RES_CHECK_STAT_ONLINE) {
return (uint32)CM_RES_STAT_ONLINE;
}
if (recvStat == CUS_RES_CHECK_STAT_OFFLINE) {
return (uint32)CM_RES_STAT_OFFLINE;
}
if (recvStat == CUS_RES_CHECK_STAT_TIMEOUT) {
return (uint32)CM_RES_STAT_UNKNOWN;
}
if (recvStat == CUS_RES_CHECK_STAT_ABNORMAL) {
return (uint32)CM_RES_STAT_ONLINE;
}
return (uint32)CM_RES_STAT_UNKNOWN;
}
void IncreaseResReportInterMain(const char *resName, uint32 increaseInstId)
{
if (g_resInstReport == NULL) {
return;
}
for (uint32 i = 0; i < CusResCount(); ++i) {
if (strcmp(g_resInstReport[i].resName, resName) != 0) {
continue;
}
if (newStat != oldResStat->status.resStat[i].status) {
oldResStat->status.resStat[i].status = newStat;
return true;
for (uint32 j = 0; j < g_resInstReport[i].instCount; ++j) {
if (g_resInstReport[i].resReport[j].cmInstId == increaseInstId) {
++(g_resInstReport[i].resReport[j].statReportInter);
break;
}
}
break;
}
}
void IncreaseOneResInstReportInter(const char *resName, uint32 instId)
{
IncreaseResReportInterMain(resName, instId);
}
uint32 GetResReportInterMain(const char *resName, uint32 instId)
{
if (g_resInstReport == NULL) {
return 0;
}
for (uint32 i = 0; i < CusResCount(); ++i) {
if (strcmp(g_resInstReport[i].resName, resName) != 0) {
continue;
}
for (uint32 j = 0; j < g_resInstReport[i].instCount; ++j) {
if (g_resInstReport[i].resReport[j].cmInstId == instId) {
return g_resInstReport[i].resReport[j].statReportInter;
}
}
break;
}
return 0;
}
uint32 GetOneResInstReportInter(const char *resName, uint32 instId)
{
return GetResReportInterMain(resName, instId);
}
static bool IsOneCusResStatValid(uint32 status)
{
if (status == CUS_RES_CHECK_STAT_ONLINE ||
status == CUS_RES_CHECK_STAT_OFFLINE ||
status == CUS_RES_CHECK_STAT_ABNORMAL ||
status == CUS_RES_CHECK_STAT_TIMEOUT) {
return true;
}
return false;
}
static inline int ReportResStatInterCmp(const void *arg1, const void *arg2)
static void CleanResReportInter(const CmResourceStatus *resStat)
{
return (int)((((const ResStatReportInfo *)arg1)->nodeId) - (((const ResStatReportInfo *)arg2)->nodeId));
for (uint32 i = 0; i < CusResCount(); ++i) {
if (strcmp(g_resInstReport[i].resName, resStat->resName) != 0) {
continue;
}
for (uint32 j = 0; j < g_resInstReport[i].instCount; ++j) {
if (g_resInstReport[i].resReport[j].cmInstId != resStat->cmInstanceId) {
continue;
}
if (IsOneCusResStatValid(resStat->status)) {
g_resInstReport[i].resReport[j].statReportInter = 0;
}
break;
}
break;
}
}
status_t InitNodeReportResStatInter()
static bool8 IsNodeCriticalResReportTimeout(uint32 nodeId)
{
g_resNodeStat = (ResStatReportInfo*)malloc(sizeof(ResStatReportInfo) * g_node_num);
if (g_resNodeStat == NULL) {
write_runlog(ERROR, "out of memory, InitNodeReportResStatInter.\n");
return CM_ERROR;
for (uint32 i = 0; i < CusResCount(); ++i) {
for (uint32 j = 0; j < g_resInstReport[i].instCount; ++j) {
if (g_resInstReport[i].resReport[j].nodeId != nodeId) {
continue;
}
if (g_resInstReport[i].resReport[j].statReportInter >= g_agentNetworkTimeout) {
return CM_TRUE;
}
}
}
errno_t rc = memset_s(g_resNodeStat, sizeof(ResStatReportInfo), 0, sizeof(ResStatReportInfo));
securec_check_errno(rc, (void)rc);
for (uint32 i = 0; i < g_node_num; ++i) {
g_resNodeStat[i].nodeId = g_node[i].node;
return CM_FALSE;
}
static void InitResInstReport()
{
g_resInstReport = (ResStatReportInter*)CmMalloc(sizeof(ResStatReportInter) * CusResCount());
errno_t rc;
for (uint32 i = 0; i < CusResCount(); ++i) {
rc = strcpy_s(g_resInstReport[i].resName, CM_MAX_RES_NAME, g_resStatus[i].status.resName);
securec_check_errno(rc, (void)rc);
g_resInstReport[i].instCount = g_resStatus[i].status.instanceCount;
for (uint32 j = 0; j < g_resInstReport[i].instCount; ++j) {
g_resInstReport[i].resReport[j].nodeId = g_resStatus[i].status.resStat[j].nodeId;
g_resInstReport[i].resReport[j].cmInstId = g_resStatus[i].status.resStat[j].cmInstanceId;
g_resInstReport[i].resReport[j].statReportInter = 0;
}
}
}
static void InitResStatReport()
{
g_resNodeStat = (ResStatReportInfo*)CmMalloc(sizeof(ResStatReportInfo) * GetResNodeCount());
for (uint32 i = 0; i < GetResNodeCount(); ++i) {
g_resNodeStat[i].nodeId = GetResNodeId(i);
g_resNodeStat[i].isAvail = MAX_CLUSTER_STATUS_INIT;
}
#undef qsort
qsort(g_resNodeStat, g_node_num, sizeof(ResStatReportInfo), ReportResStatInterCmp);
return CM_SUCCESS;
}
void InitNodeReportVar()
{
InitResInstReport();
InitResStatReport();
}
static uint32 FindNodeReportResInterByNodeId(uint32 nodeId)
{
if (nodeId >= g_node_num || nodeId == 0) {
for (int i = (int)(g_node_num - 1); i >= 0; --i) {
if (g_resNodeStat[i].nodeId == nodeId) {
return (uint32)i;
}
}
return g_node_num;
}
// In most cases, the position is nodeId - 1.
uint32 comInd = nodeId - 1;
if (g_resNodeStat[comInd].nodeId == nodeId) {
return comInd;
}
if (g_resNodeStat[comInd].nodeId > nodeId) {
for (int i = (int)(g_node_num - 1); i >= 0; --i) {
if (g_resNodeStat[i].nodeId == nodeId) {
return (uint32)i;
}
}
return g_node_num;
}
// if g_resNodeStat[comInd].nodeId < nodeId
for (uint32 i = comInd + 1; i < g_node_num; ++i) {
for (uint32 i = 0; i < GetResNodeCount(); ++i) {
if (g_resNodeStat[i].nodeId == nodeId) {
return i;
}
}
return g_node_num;
return GetResNodeCount();
}
static MaxClusterResStatus GetResNodeStatByReport(uint32 stat)
@ -149,35 +248,22 @@ static MaxClusterResStatus IsAllNodeResInstAvail(const OneNodeResourceStatus *no
static inline void WriteGetResNodeStatErrLog(uint32 nodeId)
{
write_runlog(ERROR, "can't find nodeId(%u) in g_resNodeStat.\n", nodeId);
for (uint32 i = 0; i < g_node_num; ++i) {
for (uint32 i = 0; i < GetResNodeCount(); ++i) {
write_runlog(ERROR, "g_resNodeStat[%u].nodeId = %u.\n", i, g_resNodeStat[i].nodeId);
}
}
static bool IsCusResStatValid(const OneNodeResourceStatus *nodeStat)
{
for (uint32 i = 0; i < nodeStat->count; ++i) {
if (nodeStat->status[i].status != CUS_RES_CHECK_STAT_ONLINE &&
nodeStat->status[i].status != CUS_RES_CHECK_STAT_OFFLINE &&
nodeStat->status[i].status != CUS_RES_CHECK_STAT_ABNORMAL &&
nodeStat->status[i].status != CUS_RES_CHECK_STAT_TIMEOUT) {
return false;
}
}
return true;
}
static void RecordResStatReport(const OneNodeResourceStatus *nodeStat)
{
if (g_resNodeStat == NULL) {
write_runlog(ERROR, "[CLIENT] g_resNodeStat is null.\n");
if (g_resNodeStat == NULL || g_resInstReport == NULL) {
write_runlog(ERROR, "g_resNodeStat or g_resInstReport is null.\n");
return;
}
uint32 ind = FindNodeReportResInterByNodeId(nodeStat->node);
if (ind < g_node_num) {
if (IsCusResStatValid(nodeStat)) {
g_resNodeStat[ind].reportInter = 0;
if (ind < GetResNodeCount()) {
for (uint32 i = 0; i < nodeStat->count; ++i) {
CleanResReportInter(&nodeStat->status[i]);
}
g_resNodeStat[ind].isAvail = IsAllNodeResInstAvail(nodeStat, g_resNodeStat[ind].isAvail);
} else {
@ -185,11 +271,6 @@ static void RecordResStatReport(const OneNodeResourceStatus *nodeStat)
}
}
static inline bool IsReportTimeout(uint32 reportInter)
{
return (reportInter >= g_agentNetworkTimeout);
}
static const char* GetClusterResStatStr(MaxClusterResStatus stat)
{
switch (stat) {
@ -210,12 +291,17 @@ static const char* GetClusterResStatStr(MaxClusterResStatus stat)
MaxClusterResStatus GetResNodeStat(uint32 nodeId, int logLevel)
{
if (g_resNodeStat == NULL || g_resInstReport == NULL) {
return MAX_CLUSTER_STATUS_UNKNOWN;
}
if (IsNodeCriticalResReportTimeout(nodeId)) {
write_runlog(logLevel, "recv node(%u) agent report res status msg timeout.\n", nodeId);
return MAX_CLUSTER_STATUS_UNAVAIL;
}
uint32 ind = FindNodeReportResInterByNodeId(nodeId);
if (ind < g_node_num) {
if (IsReportTimeout(g_resNodeStat[ind].reportInter)) {
write_runlog(logLevel, "recv node(%u) agent report res status msg timeout.\n", nodeId);
return MAX_CLUSTER_STATUS_UNAVAIL;
}
if (ind < GetResNodeCount()) {
if (g_resNodeStat[ind].isAvail != MAX_CLUSTER_STATUS_AVAIL) {
write_runlog(logLevel, "node(%u) stat (%s).\n", nodeId, GetClusterResStatStr(g_resNodeStat[ind].isAvail));
}
@ -227,77 +313,55 @@ MaxClusterResStatus GetResNodeStat(uint32 nodeId, int logLevel)
return MAX_CLUSTER_STATUS_UNKNOWN;
}
void SetResStatReportInter(uint32 nodeId)
static bool8 IsResInstStatChange(uint32 cmInstId, uint32 recvStat, CmResStatList *oldResStat, uint32 *changeInd)
{
if (g_resNodeStat == NULL) {
return;
}
uint32 ind = FindNodeReportResInterByNodeId(nodeId);
if (ind < g_node_num) {
++g_resNodeStat[ind].reportInter;
} else {
WriteGetResNodeStatErrLog(nodeId);
for (uint32 i = 0; i < oldResStat->status.instanceCount; ++i) {
if (cmInstId != oldResStat->status.resStat[i].cmInstanceId) {
continue;
}
if (!IsOneCusResStatValid(recvStat)) {
if (oldResStat->status.resStat[i].status != (uint32)CM_RES_STAT_UNKNOWN) {
write_runlog(LOG, "recv inst(%u) invalid res status(%u), exceed (%u)s, will set it unknown.\n",
cmInstId, recvStat, g_agentNetworkTimeout);
}
return CM_FALSE;
}
uint32 newStat = ResCheckResultTransToResStat(recvStat);
uint32 oldStat = oldResStat->status.resStat[i].status;
if (oldStat != newStat) {
write_runlog(LOG, "inst(%u)'s old status(%u) change to new status(%u).\n", cmInstId, oldStat, newStat);
(*changeInd) = i;
return CM_TRUE;
} else {
return CM_FALSE;
}
}
write_runlog(ERROR, "res(%s)'s inst(%u) not exist.\n", oldResStat->status.resName, cmInstId);
return CM_FALSE;
}
uint32 GetResStatReportInter(uint32 nodeId)
static void ProcessOneResInstStatReport(CmResourceStatus *newStat)
{
if (g_resNodeStat == NULL) {
return 0;
}
uint32 ind = FindNodeReportResInterByNodeId(nodeId);
if (ind < g_node_num) {
return g_resNodeStat[ind].reportInter;
}
WriteGetResNodeStatErrLog(nodeId);
return 0;
}
newStat->resName[CM_MAX_RES_NAME - 1] = '\0';
static uint32 GetResInstStat(uint32 recvStat)
{
if (recvStat == CUS_RES_CHECK_STAT_ONLINE) {
return (uint32)CM_RES_STAT_ONLINE;
}
if (recvStat == CUS_RES_CHECK_STAT_OFFLINE) {
return (uint32)CM_RES_STAT_OFFLINE;
}
if (recvStat == CUS_RES_CHECK_STAT_ABNORMAL) {
return (uint32)CM_RES_STAT_ONLINE;
}
return (uint32)CM_RES_STAT_UNKNOWN;
}
static void ProcessOneResInstStatReport(CmResourceStatus *stat)
{
uint32 index = 0;
stat->resName[CM_MAX_RES_NAME - 1] = '\0';
if (GetGlobalResStatusIndex(stat->resName, index) != CM_SUCCESS) {
write_runlog(ERROR, "[CLIENT] %s, unknown the resName(%s).\n", __func__, stat->resName);
if (GetGlobalResStatusIndex(newStat->resName, index) != CM_SUCCESS) {
write_runlog(ERROR, "%s, unknown the resName(%s).\n", __func__, newStat->resName);
return;
}
if (stat->workStatus == RES_INST_WORK_STATUS_UNKNOWN) {
write_runlog(LOG, "[CLIENT] node(%u) had restart.\n", stat->nodeId);
}
uint32 newInstStat = GetResInstStat(stat->status);
CmResStatList *resStat = &g_resStatus[index];
(void)pthread_rwlock_wrlock(&(resStat->rwlock));
bool isChanged = IsResStatusChanged(stat->cmInstanceId, newInstStat, resStat);
uint32 changeInd = 0;
bool8 isChanged = IsResInstStatChange(newStat->cmInstanceId, newStat->status, &g_resStatus[index], &changeInd);
if (isChanged) {
++resStat->status.version;
ProcessReportResChangedMsg(true, resStat->status);
}
(void)pthread_rwlock_unlock(&(resStat->rwlock));
(void)pthread_rwlock_wrlock(&(g_resStatus[index].rwlock));
g_resStatus[index].status.resStat[changeInd].status = ResCheckResultTransToResStat(newStat->status);
++(g_resStatus[index].status.version);
OneResStatList resStat = g_resStatus[index].status;
(void)pthread_rwlock_unlock(&(g_resStatus[index].rwlock));
if (isChanged) {
(void)pthread_rwlock_rdlock(&(resStat->rwlock));
SaveOneResStatusToDdb(&resStat->status);
(void)pthread_rwlock_unlock(&(resStat->rwlock));
PrintCusInfoResList(&resStat->status, __FUNCTION__);
write_runlog(LOG, "[CLIENT] [%u:%u] res(%s) changed\n", stat->nodeId, stat->cmInstanceId, stat->resName);
ProcessReportResChangedMsg(true, &resStat);
PrintCusInfoResList(&resStat, __FUNCTION__);
}
}
@ -319,8 +383,29 @@ void ProcessRequestResStatusListMsg(MsgRecvInfo* recvMsgInfo)
for (uint32 i = 0; i < CusResCount(); ++i) {
(void)pthread_rwlock_rdlock(&(g_resStatus[i].rwlock));
errno_t rc = memcpy_s(&sendMsg.resList, sizeof(OneResStatList), &g_resStatus[i].status, sizeof(OneResStatList));
(void)pthread_rwlock_unlock(&(g_resStatus[i].rwlock));
securec_check_errno(rc, (void)rc);
(void)pthread_rwlock_unlock(&(g_resStatus[i].rwlock));
(void)RespondMsg(recvMsgInfo, 'S', (char*)(&sendMsg), sizeof(CmsReportResStatList));
}
}
void ProcessRequestLatestResStatusListMsg(MsgRecvInfo *recvMsgInfo, RequestLatestStatList *recvMsg)
{
CmsReportResStatList sendMsg = {0};
sendMsg.msgType = (int)MSG_CM_AGENT_RES_STATUS_LIST;
errno_t rc;
for (uint32 i = 0; i < CusResCount(); ++i) {
if (g_resStatus[i].status.version == recvMsg->statVersion[i]) {
continue;
}
(void)pthread_rwlock_rdlock(&(g_resStatus[i].rwlock));
rc = memcpy_s(&sendMsg.resList, sizeof(OneResStatList), &g_resStatus[i].status, sizeof(OneResStatList));
securec_check_errno(rc, (void)rc);
(void)pthread_rwlock_unlock(&(g_resStatus[i].rwlock));
(void)RespondMsg(recvMsgInfo, 'S', (char*)(&sendMsg), sizeof(CmsReportResStatList));
}
}
@ -450,6 +535,10 @@ static ClientError CmResLock(const CmaToCmsResLock *lockMsg)
lockMsg->resName, lockMsg->lockName, lockMsg->cmInstId);
return CM_RES_CLIENT_CANNOT_DO;
}
if (!CanProcessResStatus()) {
write_runlog(LOG, "[%s], res status list invalid, can't continue.\n", __FUNCTION__);
return CM_RES_CLIENT_CANNOT_DO;
}
if (!IsOneResInstWork(lockMsg->resName, lockMsg->cmInstId)) {
write_runlog(ERROR, "[CLIENT] res(%s) inst(%u) has been get out of cluster, can't do lock.\n",
lockMsg->resName, lockMsg->cmInstId);
@ -541,6 +630,11 @@ static ClientError TransLockOwner(const CmaToCmsResLock *lockMsg)
resName, lockName, curLockOwner, resInstId);
return CM_RES_CLIENT_CANNOT_DO;
}
if (!CanProcessResStatus()) {
write_runlog(LOG, "[%s], res status list invalid, can't continue.\n", __FUNCTION__);
return CM_RES_CLIENT_CANNOT_DO;
}
if (!IsOneResInstWork(resName, newLockOwner)) {
write_runlog(LOG, "[CLIENT] res(%s) inst(%u) get out of cluster, can't be lockOwner.\n", resName, newLockOwner);
return CM_RES_CLIENT_CANNOT_DO;
@ -602,8 +696,8 @@ static inline void CopyResStatusToSendMsg(OneResStatList *sendStat, CmResStatLis
{
(void)pthread_rwlock_rdlock(&saveStat->rwlock);
errno_t rc = memcpy_s(sendStat, sizeof(OneResStatList), &saveStat->status, sizeof(OneResStatList));
(void)pthread_rwlock_unlock(&saveStat->rwlock);
securec_check_errno(rc, (void)rc);
(void)pthread_rwlock_unlock(&saveStat->rwlock);
}
void ProcessResInstanceStatusMsg(MsgRecvInfo* recvMsgInfo, const CmsToCtlGroupResStatus *queryStatusPtr)
@ -630,19 +724,16 @@ void ProcessQueryOneResInst(MsgRecvInfo* recvMsgInfo, const QueryOneResInstStat
uint32 destInstId = queryMsg->instId;
for (uint32 i = 0; i < CusResCount(); ++i) {
(void)pthread_rwlock_rdlock(&g_resStatus[i].rwlock);
for (uint32 j = 0; j < g_resStatus[i].status.instanceCount; ++j) {
if (g_resStatus[i].status.resStat[j].cmInstanceId != destInstId) {
continue;
}
errno_t rc = memcpy_s(&ackMsg.instStat, sizeof(CmResStatInfo),
&g_resStatus[i].status.resStat[j], sizeof(CmResStatInfo));
(void)pthread_rwlock_rdlock(&g_resStatus[i].rwlock);
ackMsg.instStat = g_resStatus[i].status.resStat[j];
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
securec_check_errno(rc, (void)rc);
(void)RespondMsg(recvMsgInfo, 'S', (char*)&(ackMsg), sizeof(ackMsg), DEBUG5);
return;
}
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
}
write_runlog(ERROR, "unknown res instId(%u).\n", destInstId);
}
@ -690,7 +781,7 @@ void ResetResNodeStat()
if (g_resNodeStat == NULL) {
return;
}
for (uint32 i = 0; i < g_node_num; ++i) {
for (uint32 i = 0; i < GetResNodeCount(); ++i) {
g_resNodeStat[i].isAvail = MAX_CLUSTER_STATUS_INIT;
}
}

View File

@ -34,6 +34,7 @@
#include "cms_write_dynamic_config.h"
#include "cms_barrier_check.h"
#include "cms_arbitrate_cluster.h"
#include "cms_cus_res.h"
#include "cms_threads.h"
static const int GET_DORADO_IP_TIMES = 3;
@ -465,6 +466,19 @@ static void CreateArbitrateClusterThread()
}
}
static void CreateUpdateResStatusListThread()
{
int err;
pthread_t thrId;
if (!IsCusResExist()) {
return;
}
if ((err = pthread_create(&thrId, NULL, UpdateResStatusListMain, NULL)) != 0) {
write_runlog(ERROR, "Failed to create CreateArbitrateClusterThread: error %d\n", err);
}
}
status_t CmsCreateThreads()
{
#ifdef ENABLE_MULTIPLE_NODES
@ -494,6 +508,7 @@ status_t CmsCreateThreads()
CreateDealGlobalBarrierThread();
CreateDoradoCheckThread();
CreateArbitrateClusterThread();
CreateUpdateResStatusListThread();
return CM_SUCCESS;
}

View File

@ -50,6 +50,7 @@ const uint32 CM_MAX_VIP_COUNT = 16;
#define CUS_RES_CHECK_STAT_UNKNOWN 2
#define CUS_RES_CHECK_STAT_ABNORMAL 3
#define CUS_RES_CHECK_STAT_TIMEOUT 137
#define CUS_RES_CHECK_STAT_FAILED 255 // -1
#define CUS_RES_START_FAIL_DEPEND_NOT_ALIVE 6
@ -115,8 +116,7 @@ bool IsCusResExist();
const char *GetIsregStatus(int isreg);
void PrintCusInfoResList(const OneResStatList *status, const char *info);
bool8 IsDatanodeSSMode();
const char* ResConfDefValue(const char *param);
int ResConfMinValue(const char *param);
int ResConfMaxValue(const char *param);
uint32 GetResNodeCount();
uint32 GetResNodeId(uint32 index);
#endif // CM_CM_MISC_RES_H

View File

@ -211,7 +211,7 @@ typedef enum CM_MessageType_st {
MSG_CM_AGENT_REPORT_RES_DATA = 131,
MSG_AGENT_CM_REQUEST_RES_STATUS_LIST = 132,
MSG_AGENT_CM_SET_INSTANCE_DATA = 133,
MSG_AGENT_CM_GET_LATEST_STATUS_LIST = 133,
MSG_AGENT_CM_SET_RES_DATA = 134,
MSG_AGENT_CM_GET_RES_DATA = 135,
@ -2079,12 +2079,10 @@ typedef struct RequestResStatListSt {
int msgType;
} RequestResStatList;
typedef struct CmaToCmsRegResultSt {
int32 msgType;
char resName[CM_MAX_RES_NAME];
uint32 cmInstId;
uint32 workStat; // 0:unreg 1:reg
} CmaToCmsRegResult;
typedef struct RequestLatestStatListSt {
int msgType;
unsigned long long statVersion[CM_MAX_RES_COUNT];
} RequestLatestStatList;
typedef struct ResInstIsregSt {
uint32 cmInstId;

View File

@ -41,9 +41,9 @@ bool IsCmInstIdInCheckList(uint32 nodeId, uint32 cmInstId);
bool IsRecvCheckListMiss(uint32 nodeId, uint32 *checkList, uint32 checkCount);
bool IsRecvIsregStatValid(int stat);
void SaveOneResStatusToDdb(const OneResStatList *oneResStat);
void GetOneResStatusFromDdb(OneResStatList *resStat);
void GetAllResStatusFromDdb();
status_t SaveOneResStatusToDdb(const OneResStatList *oneResStat);
status_t GetOneResStatusFromDdb(OneResStatList *resStat);
status_t GetAllResStatusFromDdb();
void NotifyCmaDoReg(uint32 destNodeId);
void NotifyCmaDoUnreg(uint32 destNodeId);

View File

@ -0,0 +1,30 @@
/*
* Copyright (c) 2023 Huawei Technologies Co.,Ltd.
*
* CM is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* cms_cus_res.h
*
*
* IDENTIFICATION
* include/cm/cm_server/cms_cus_res.h
*
* -------------------------------------------------------------------------
*/
#ifndef CMS_CUS_RES_H
#define CMS_CUS_RES_H
bool8 CanProcessResStatus();
void *UpdateResStatusListMain(void *arg);
#endif // CMS_CUS_RES_H

View File

@ -120,17 +120,18 @@ void ProcessCtl2CmOneInstanceBarrierQueryMsg(
void ProcessGetDnSyncListMsg(AgentToCmserverDnSyncList *agentDnSyncList);
#endif
void ProcessAgent2CmResStatReportMsg(ReportResStatus *resStatusPtr);
void ProcessReportResChangedMsg(bool notifyClient, const OneResStatList &status);
void SetResStatReportInter(uint32 nodeId);
uint32 GetResStatReportInter(uint32 nodeId);
void ProcessReportResChangedMsg(bool notifyClient, const OneResStatList *status);
void IncreaseOneResInstReportInter(const char *resName, uint32 instId);
uint32 GetOneResInstReportInter(const char *resName, uint32 instId);
int GetCurAz();
uint32 GetPrimaryDnIndex(void);
status_t InitNodeReportResStatInter();
void InitNodeReportVar();
MaxClusterResStatus GetResNodeStat(uint32 nodeId, int logLevel);
void ProcessCtlToCmReloadMsg(MsgRecvInfo* recvMsgInfo);
void ProcessCtlToCmExecDccCmdMsg(MsgRecvInfo* recvMsgInfo, ExecDdbCmdMsg *msg);
void ProcessRequestResStatusListMsg(MsgRecvInfo* recvMsgInfo);
void ProcessRequestLatestResStatusListMsg(MsgRecvInfo *recvMsgInfo, RequestLatestStatList *recvMsg);
void ProcessCltSendOper(MsgRecvInfo* recvMsgInfo, CltSendDdbOper *ddbOper);
void ProcessSslConnRequest(MsgRecvInfo* recvMsgInfo, const AgentToCmConnectRequest *requestMsg);
void ProcessSharedStorageMsg(MsgRecvInfo* recvMsgInfo);