cm新增支持failover触发器时传入静态主id

This commit is contained in:
chen.zhu 2024-03-27 15:01:54 +08:00
parent a7ad16c4e3
commit 6b034ab905
6 changed files with 31 additions and 8 deletions

View File

@ -88,7 +88,7 @@ char *g_eventTriggers[EVENT_COUNT] = {NULL};
static const uint32 MAX_MSG_BUF_POOL_SIZE = 102400;
static const uint32 MAX_MSG_BUF_POOL_COUNT = 200;
static const int32 INVALID_ID = -1;
/* unify log style */
void create_system_call_log(void);
int check_one_instance_status(const char *processName, const char *cmdLine, int *isPhonyDead);
@ -2100,15 +2100,22 @@ void GetEventTrigger()
ParseEventTriggers(eventTriggerString);
}
void ExecuteEventTrigger(const EventTriggerType triggerType)
void ExecuteEventTrigger(const EventTriggerType triggerType, int32 staPrimId)
{
if (g_eventTriggers[triggerType] == NULL) {
return;
}
write_runlog(LOG, "Event trigger %s was triggered.\n", triggerTypeStringMap[triggerType].typeStr);
char execTriggerCmd[MAX_COMMAND_LEN] = {0};
int rc = snprintf_s(execTriggerCmd, MAX_COMMAND_LEN, MAX_COMMAND_LEN - 1,
int rc;
if (staPrimId != INVALID_ID && triggerType == EVENT_FAILOVER) {
rc = snprintf_s(execTriggerCmd, MAX_COMMAND_LEN, MAX_COMMAND_LEN - 1,
SYSTEMQUOTE "%s %d >> %s 2>&1 &" SYSTEMQUOTE, g_eventTriggers[triggerType], staPrimId, system_call_log);
}
else {
rc = snprintf_s(execTriggerCmd, MAX_COMMAND_LEN, MAX_COMMAND_LEN - 1,
SYSTEMQUOTE "%s >> %s 2>&1 &" SYSTEMQUOTE, g_eventTriggers[triggerType], system_call_log);
}
securec_check_intval(rc, (void)rc);
write_runlog(LOG, "event trigger command: \"%s\".\n", execTriggerCmd);
RunCmd(execTriggerCmd);

View File

@ -443,7 +443,7 @@ void GetDnFailoverCommand(char *command, uint32 cmdLen, const char *dataDir, uin
securec_check_intval(rc, (void)rc);
}
static void process_failover_command(const char* dataDir, int instanceType, uint32 instance_id, uint32 term)
static void process_failover_command(const char* dataDir, int instanceType, uint32 instance_id, uint32 term, int32 staPrimId)
{
char command[MAXPGPATH];
errno_t rc;
@ -501,7 +501,7 @@ static void process_failover_command(const char* dataDir, int instanceType, uint
RunCmd(command);
if (instanceType == INSTANCE_TYPE_DATANODE) {
ExecuteEventTrigger(EVENT_FAILOVER);
ExecuteEventTrigger(EVENT_FAILOVER, staPrimId);
}
return;
@ -1410,6 +1410,7 @@ static void MsgCmAgentFailover(const AgentMsgPkg* msg, char *dataPath, const cm_
return;
}
uint32 term = msgTypeFailoverPtr->term;
int32 staPrimId = msgTypeFailoverPtr->staPrimId;
ret = FindInstancePathAndType(
msgTypeFailoverPtr->node, msgTypeFailoverPtr->instanceId, dataPath, &instanceType);
if (ret != 0) {
@ -1419,7 +1420,7 @@ static void MsgCmAgentFailover(const AgentMsgPkg* msg, char *dataPath, const cm_
msgTypeFailoverPtr->instanceId);
return;
}
process_failover_command(dataPath, instanceType, msgTypeFailoverPtr->instanceId, term);
process_failover_command(dataPath, instanceType, msgTypeFailoverPtr->instanceId, term, staPrimId);
}
static void MsgCmAgentBuild(const AgentMsgPkg* msg, char *dataPath, const cm_msg_type* msgTypePtr)

View File

@ -1394,6 +1394,17 @@ static bool InstanceForceFinishRedo(DnArbCtx *ctx)
return false;
}
/*
 * Fill the staPrimId field of an outgoing failover message.
 *
 * If the arbitration condition in ctx records a static-primary index
 * (cond.staticPriIdx != INVALID_INDEX), the instanceId of that member of
 * ctx->roleGroup->instanceMember is stored in the message. Otherwise the
 * field is set to INVALID_INDEX.
 *
 * ctx              - arbitration context; its cond and roleGroup are read.
 * failover_msg_ptr - message to update; only staPrimId is written here.
 *
 * NOTE(review): ExecuteEventTrigger compares the received id against
 * INVALID_ID, while this function stores INVALID_INDEX as the "no static
 * primary" marker. This presumes both constants have the same value -- confirm.
 */
static void SetFailoverMsgStaPriID(DnArbCtx *ctx, cm_to_agent_failover* failover_msg_ptr)
{
    const ArbiCond *arbiCond = &(ctx->cond);

    /* No static primary recorded: publish the sentinel and stop. */
    if (arbiCond->staticPriIdx == INVALID_INDEX) {
        failover_msg_ptr->staPrimId = INVALID_INDEX;
        return;
    }

    const cm_instance_role_status *members = ctx->roleGroup->instanceMember;
    failover_msg_ptr->staPrimId = members[arbiCond->staticPriIdx].instanceId;
}
static bool InstanceForceFailover(DnArbCtx *ctx)
{
bool res = InstanceForceFinishRedo(ctx);
@ -1410,6 +1421,7 @@ static bool InstanceForceFailover(DnArbCtx *ctx)
if (cond->candiIdx == ctx->memIdx && CanFailoverDn(isMajority) &&
cond->redoDone > HALF_COUNT(cond->vaildCount)) {
cm_to_agent_failover failoverMsg;
SetFailoverMsgStaPriID(ctx, &failoverMsg);
send_failover_message(ctx->recvMsgInfo, ctx->node, ctx->instId, ctx->groupIdx, ctx->memIdx, &failoverMsg);
write_runlog(LOG, "[ForceFailover], line %d: Redo done, non force failover message sent to instance %u, "
"requested by cm_ctl, arbitrate_time=%u\n", __LINE__, ctx->instId, cond->maxMemArbiTime);
@ -1764,6 +1776,7 @@ static void SendFailoverMsg(DnArbCtx *ctx, uint32 arbitInterval, bool isStaPrim,
ctx->repGroup->time = 0;
ClearDnArbiCond(ctx->groupIdx, CLEAR_ARBI_TIME);
cm_to_agent_failover failoverMsg;
SetFailoverMsgStaPriID(ctx, &failoverMsg);
if ((!cond->instMainta && !IsSyncListEmpty(ctx->groupIdx, ctx->instId, ctx->maintaMode)) || isStaPrim) {
GroupStatusShow(sfMsg->tyName, ctx->groupIdx, ctx->instId, cond->vaildCount, cond->finishRedo);
send_failover_message(ctx->recvMsgInfo, ctx->node, ctx->instId, ctx->groupIdx, ctx->memIdx, &failoverMsg);
@ -1922,6 +1935,7 @@ static void SendFailoverInQuarmBackup(DnArbCtx *ctx)
cm_to_agent_failover failoverMsg;
if (!cond->instMainta || ctx->localRole->role == INSTANCE_ROLE_PRIMARY) {
GroupStatusShow(sfMsg.tyName, ctx->groupIdx, ctx->instId, cond->vaildCount, cond->finishRedo);
SetFailoverMsgStaPriID(ctx, &failoverMsg);
send_failover_message(ctx->recvMsgInfo, ctx->node, ctx->instId, ctx->groupIdx, ctx->memIdx, &failoverMsg);
ctx->repGroup->lastFailoverDn = ctx->instId;
write_runlog(LOG, "%s, line %d: Failover message has sent to instance %u, %s.\n",

View File

@ -246,7 +246,7 @@ extern pthread_rwlock_t g_datanodesFailoverLock;
extern pthread_rwlock_t g_gtmsFailoverLock;
extern int g_gtmMode;
extern char *g_eventTriggers[EVENT_COUNT];
extern void ExecuteEventTrigger(const EventTriggerType triggerType);
extern void ExecuteEventTrigger(const EventTriggerType triggerType, int32 staPrimId = -1);
extern int node_match_find(const char *node_type, const char *node_port, const char *node_host, const char *node_port1,
const char *node_host1, int *node_index, int *instance_index, int *inode_type);

View File

@ -40,7 +40,7 @@ void *ProcessRecvCmsMsgMain(void *arg);
extern void process_notify_command(const char* data_dir, int instance_type, int role, uint32 term);
extern void process_restart_command(const char* data_dir, int instance_type);
extern int FindInstancePathAndType(uint32 node, uint32 instanceId, char* data_path, int* instance_type);
extern void process_failover_command(const char* dataDir, int instance_type, uint32 instance_id, uint32 term);
extern void process_failover_command(const char* dataDir, int instance_type, uint32 instance_id, uint32 term, int32 staPrimId);
extern void process_rep_most_available_command(const char* dataDir, int instance_type);
extern void process_heartbeat_command(int cluster_status);
#endif

View File

@ -660,6 +660,7 @@ typedef struct cm_to_agent_failover_st {
uint32 instanceId;
int instance_type;
int wait_seconds;
int32 staPrimId;
uint32 term;
} cm_to_agent_failover;