From 5f633d44f415bc55679d0c32c65bae37234dcff3 Mon Sep 17 00:00:00 2001 From: yewk Date: Mon, 18 Sep 2023 20:50:58 +0800 Subject: [PATCH] =?UTF-8?q?=E9=80=82=E9=85=8D=E6=96=B0mes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cm_agent/cma_mes.cpp | 91 +++++++++++++++++++++------------------- 1 file changed, 49 insertions(+), 42 deletions(-) diff --git a/src/cm_agent/cma_mes.cpp b/src/cm_agent/cma_mes.cpp index c4d3ef7..641000a 100644 --- a/src/cm_agent/cma_mes.cpp +++ b/src/cm_agent/cma_mes.cpp @@ -20,9 +20,9 @@ * * ------------------------------------------------------------------------- */ -#include "cma_mes.h" +#include "mes_interface.h" -#include "mes.h" +#include "cm_debug.h" #include "cm_config.h" #include "cm_elog.h" @@ -37,6 +37,14 @@ #define AGENT_RHB_BUFF_POOL_SIZE (1024) #define AGENT_RHB_CHECK_SID (0) +typedef struct CmaMesMsgHeadT { + uint32 version; + uint32 cmd; // command + char reserved[64]; + uint32 bufSize; + char buf[0]; +} CmaMesMsgHead; // total size is 76 + static pthread_t g_rhbThread; static const uint32 PASSWD_MAX_LEN = 64; @@ -123,9 +131,6 @@ static void InitTaskCmdGroup(mes_profile_t *pf) pf->task_group[MES_TASK_GROUP_TWO] = 0; pf->task_group[MES_TASK_GROUP_THREE] = 0; - for (uint8 i = (uint8)RHB_MSG_BEGIN; i < (uint8)RHB_MSG_CEIL; i++) { - mes_set_command_task_group(i, MES_TASK_GROUP_ZERO); - } } static void InitBuffPool(mes_profile_t *pf) @@ -147,7 +152,7 @@ static void initPfile(mes_profile_t *pf, const RhbCtx *ctx) pf->mes_elapsed_switch = 0; pf->inst_cnt = ctx->instCount; - error_t rc = memcpy_s( + errno_t rc = memcpy_s( pf->inst_net_addr, sizeof(mes_addr_t) * MES_MAX_INSTANCES, ctx->instAddrs, sizeof(mes_addr_t) * MAX_RHB_NUM); securec_check_errno(rc, (void)rc); @@ -218,7 +223,7 @@ static void LogCallBack(int logType, int logLevel, const char *codeFileName, uns va_end(ap); } -typedef void (*CmMesMsgProc)(mes_message_t *mgs); +typedef void (*CmMesMsgProc)(mes_msg_t *mgs); typedef struct ProcessorFunc_ { RhbMsgCmd cmd; @@ -242,35 +247,45 @@ void GetHbs(time_t *hbs, unsigned int *hwl) securec_check_errno(rc, (void)rc); } -void CmaHdlRhbReq(mes_message_t *msg) +void CmaHdlRhbReq(mes_msg_t *msg) { - write_runlog(DEBUG1, "[RHB] receive a hb msg from inst[%hhu]!\n", msg->head->src_inst); - if (msg->head->src_inst < g_curNodeHb.hwl) { - g_curNodeHb.hbs[msg->head->src_inst] = time(NULL); + write_runlog(DEBUG1, "[RHB] receive a hb msg from inst[%hhu]!\n", msg->src_inst); + if (msg->src_inst < g_curNodeHb.hwl) { + g_curNodeHb.hbs[msg->src_inst] = time(NULL); } } -void CmaHdlRhbAck(mes_message_t *msg) -{ - mes_notify_broadcast_msg_recv_and_release(msg); -} - static const ProcessorFunc g_processors[RHB_MSG_CEIL] = { {RHB_MSG_HB_BC, CmaHdlRhbReq, CM_FALSE, "handle cma rhb broadcast message"}, }; -void MesMsgProc(uint32 workThread, mes_message_t *msg) +void MesMsgProc(unsigned int work_idx, ruid_type ruid, mes_msg_t *msg) { - mes_message_head_t *head = msg->head; - if (head->cmd >= (uint8)RHB_MSG_CEIL) { - write_runlog(ERROR, "unknow cmd(%hhu) from inst:[%hhu], size:[%hu]!\n", head->cmd, head->src_inst, head->size); - return; - } + do { + if (msg == NULL || msg->buffer == NULL) { + write_runlog(ERROR, "invaild msg, when msg or buffer is null.\n"); + break; + } - const ProcessorFunc *processor = &g_processors[head->cmd]; + if (msg->size < sizeof(CmaMesMsgHead)) { + write_runlog(ERROR, "unknown msg head from inst:[%u], size:[%u].\n", msg->src_inst, msg->size); + break; + } - processor->proc(msg); - mes_release_message_buf(msg); + CmaMesMsgHead *head = (CmaMesMsgHead *)msg->buffer; + if (head->cmd >= (uint32)RHB_MSG_CEIL) { + write_runlog(ERROR, "unknow cmd(%hhu) from inst:[%hhu], size:[%hu]!\n", + head->cmd, msg->src_inst, head->bufSize); + break; + } + + const ProcessorFunc *processor = &g_processors[head->cmd]; + + CM_ASSERT(processor->proc != NULL); + + processor->proc(msg); + } while (0); + mes_release_msg(msg); } status_t CmaRhbInit(const RhbCtx *ctx) @@ -314,10 +329,6 @@ status_t CmaRhbInit(const RhbCtx *ctx) write_runlog(WARNING, "mes ssl not enable!.\n"); } - for (uint32 i = (uint32)RHB_MSG_BEGIN; i < (uint32)RHB_MSG_CEIL; i++) { - mes_set_msg_enqueue((uint32)g_processors[i].cmd, (uint32)g_processors[i].isEnqueue); - } - status_t ret = (status_t)mes_init(&pf); if (ret != CM_SUCCESS) { write_runlog(ERROR, "mes init failed!.\n"); @@ -328,10 +339,11 @@ status_t CmaRhbInit(const RhbCtx *ctx) return CM_SUCCESS; } -static void InitMsgHead(mes_message_head_t *head, const RhbCtx *ctx) +static void InitMsgHead(CmaMesMsgHead *head, const RhbCtx *ctx) { - MES_INIT_MESSAGE_HEAD(head, RHB_MSG_HB_BC, 0, ctx->instId, 0, ctx->sid, 0xFFFF); - head->size = sizeof(mes_message_head_t); + head->version = 0; + head->cmd = (uint32)RHB_MSG_HB_BC; + head->bufSize = 0; } static void checkMesSslCertExpire() @@ -369,11 +381,9 @@ void *CmaRhbMain(void *args) (void)atexit(CmaRhbUnInit); write_runlog(LOG, "RHB check is ready to work!\n"); - mes_message_head_t head = {0}; + CmaMesMsgHead head = {0}; InitMsgHead(&head, &ctx); - uint64 succInsts = 0; - - uint64 bcInsts = ctx.instMap & (~((uint64)0x1 << (ctx.instId))); + int32 ret = 0; int itv = 0; struct timespec curTime = {0, 0}; struct timespec lastTime = {0, 0}; @@ -391,12 +401,9 @@ void *CmaRhbMain(void *args) } write_runlog(DEBUG1, "RHB broadcast hb to all nodes.!\n"); - mes_broadcast(ctx.sid, bcInsts, &head, &succInsts); - if (bcInsts != succInsts) { - write_runlog(DEBUG1, - "bc not all success, send idx:[%llu], success status:[%llu]!\n", - (long long unsigned int)bcInsts, - (long long unsigned int)succInsts); + ret = mes_broadcast(0, (char*)&head, sizeof(CmaMesMsgHead)); + if (ret != 0) { + write_runlog(DEBUG1, "bc not all success, ret=%d.\n", ret); } const int printItv = 5;