Merge pull request !198 from zhangxubo/5.0.0
This commit is contained in:
opengauss_bot 2024-04-23 09:30:02 +00:00 committed by Gitee
commit 2c01dc47b6
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
12 changed files with 18 additions and 92 deletions

View File

@ -88,7 +88,7 @@ char *g_eventTriggers[EVENT_COUNT] = {NULL};
static const uint32 MAX_MSG_BUF_POOL_SIZE = 102400;
static const uint32 MAX_MSG_BUF_POOL_COUNT = 200;
static const int32 INVALID_ID = -1;
/* unify log style */
void create_system_call_log(void);
int check_one_instance_status(const char *processName, const char *cmdLine, int *isPhonyDead);
@ -2100,22 +2100,15 @@ void GetEventTrigger()
ParseEventTriggers(eventTriggerString);
}
void ExecuteEventTrigger(const EventTriggerType triggerType, int32 staPrimId)
void ExecuteEventTrigger(const EventTriggerType triggerType)
{
if (g_eventTriggers[triggerType] == NULL) {
return;
}
write_runlog(LOG, "Event trigger %s was triggered.\n", triggerTypeStringMap[triggerType].typeStr);
char execTriggerCmd[MAX_COMMAND_LEN] = {0};
int rc;
if (staPrimId != INVALID_ID && triggerType == EVENT_FAILOVER) {
rc = snprintf_s(execTriggerCmd, MAX_COMMAND_LEN, MAX_COMMAND_LEN - 1,
SYSTEMQUOTE "%s %d >> %s 2>&1 &" SYSTEMQUOTE, g_eventTriggers[triggerType], staPrimId, system_call_log);
}
else {
rc = snprintf_s(execTriggerCmd, MAX_COMMAND_LEN, MAX_COMMAND_LEN - 1,
int rc = snprintf_s(execTriggerCmd, MAX_COMMAND_LEN, MAX_COMMAND_LEN - 1,
SYSTEMQUOTE "%s >> %s 2>&1 &" SYSTEMQUOTE, g_eventTriggers[triggerType], system_call_log);
}
securec_check_intval(rc, (void)rc);
write_runlog(LOG, "event trigger command: \"%s\".\n", execTriggerCmd);
RunCmd(execTriggerCmd);

View File

@ -443,7 +443,7 @@ void GetDnFailoverCommand(char *command, uint32 cmdLen, const char *dataDir, uin
securec_check_intval(rc, (void)rc);
}
static void process_failover_command(const char* dataDir, int instanceType, uint32 instance_id, uint32 term, int32 staPrimId)
static void process_failover_command(const char* dataDir, int instanceType, uint32 instance_id, uint32 term)
{
char command[MAXPGPATH];
errno_t rc;
@ -501,7 +501,7 @@ static void process_failover_command(const char* dataDir, int instanceType, uint
RunCmd(command);
if (instanceType == INSTANCE_TYPE_DATANODE) {
ExecuteEventTrigger(EVENT_FAILOVER, staPrimId);
ExecuteEventTrigger(EVENT_FAILOVER);
}
return;
@ -1410,7 +1410,6 @@ static void MsgCmAgentFailover(const AgentMsgPkg* msg, char *dataPath, const cm_
return;
}
uint32 term = msgTypeFailoverPtr->term;
int32 staPrimId = msgTypeFailoverPtr->staPrimId;
ret = FindInstancePathAndType(
msgTypeFailoverPtr->node, msgTypeFailoverPtr->instanceId, dataPath, &instanceType);
if (ret != 0) {
@ -1420,7 +1419,7 @@ static void MsgCmAgentFailover(const AgentMsgPkg* msg, char *dataPath, const cm_
msgTypeFailoverPtr->instanceId);
return;
}
process_failover_command(dataPath, instanceType, msgTypeFailoverPtr->instanceId, term, staPrimId);
process_failover_command(dataPath, instanceType, msgTypeFailoverPtr->instanceId, term);
}
static void MsgCmAgentBuild(const AgentMsgPkg* msg, char *dataPath, const cm_msg_type* msgTypePtr)

View File

@ -1322,7 +1322,6 @@ status_t IsReachableIP(char *ip)
if (ip == nullptr) {
return CM_ERROR;
}
char cmd[MAXPGPATH] = {0};
int rc = snprintf_s(cmd, MAXPGPATH, MAXPGPATH - 1, "timeout 2 ping -c 2 %s > /dev/null 2>&1", ip);
securec_check_intval(rc, (void)rc);

View File

@ -74,7 +74,6 @@ agent_network_timeout = 6
dn_arbitrate_mode = quorum
agent_fault_timeout = 60
third_party_gateway_ip = '' # used in 2 nodes cluster for ddb role arbitration with network isolation,
# support multiple IP addresses separated by commas, like '172.0.0.1,172.0.0.2'
# when cms_enable_failover_on2nodes is true.
# default ''. if cms_enable_failover_on2nodes is true, this param must be configured.
cms_enable_failover_on2nodes = false # used in 2 nodes cluster. if true, will use third_party_gateway_ip as an arbitrator,

View File

@ -71,7 +71,6 @@ agent_network_timeout = 6
dn_arbitrate_mode = quorum
delay_arbitrate_max_cluster_timeout = 300 # When resources are in the startup process, delay arbitration of the maximum cluster.
third_party_gateway_ip = '' # used in 2 nodes cluster for ddb role arbitration with network isolation,
# support multiple IP addresses separated by commas, like '172.0.0.1,172.0.0.2'
# when cms_enable_failover_on2nodes is true.
# default ''. if cms_enable_failover_on2nodes is true, this param must be configured.
cms_enable_failover_on2nodes = false # used in 2 nodes cluster. if true, will use third_party_gateway_ip as an arbitrator,

View File

@ -71,7 +71,6 @@ ddb_log_suppress_enable = 1 # Indicates whether to enable the log s
ddb_election_timeout = 3 # DCC election timeout interval [1S,600S]
share_disk_path = ''
third_party_gateway_ip = '' # used in 2 nodes cluster for ddb role arbitration with network isolation,
# support multiple IP addresses separated by commas, like '172.0.0.1,172.0.0.2'
# when cms_enable_failover_on2nodes is true.
# default ''. if cms_enable_failover_on2nodes is true, this param must be configured.
cms_enable_failover_on2nodes = false # used in 2 nodes cluster. if true, will use third_party_gateway_ip as an arbitrator,

View File

@ -1394,17 +1394,6 @@ static bool InstanceForceFinishRedo(DnArbCtx *ctx)
return false;
}
/*
 * Fill the staPrimId field of a failover message from the arbitration context.
 *
 * ctx:              arbitration context; ctx->cond.staticPriIdx selects a member
 *                   of ctx->roleGroup->instanceMember.
 * failover_msg_ptr: message whose staPrimId field is written.
 *
 * When staticPriIdx is INVALID_INDEX, staPrimId is set to INVALID_INDEX;
 * otherwise it receives the instanceId of the selected member.
 */
static void SetFailoverMsgStaPriID(DnArbCtx *ctx, cm_to_agent_failover* failover_msg_ptr)
{
    ArbiCond *arbiCond = &(ctx->cond);

    /* No static primary recorded: mark the field as invalid and stop. */
    if (arbiCond->staticPriIdx == INVALID_INDEX) {
        failover_msg_ptr->staPrimId = INVALID_INDEX;
        return;
    }

    cm_instance_role_status *members = ctx->roleGroup->instanceMember;
    failover_msg_ptr->staPrimId = members[arbiCond->staticPriIdx].instanceId;
}
static bool InstanceForceFailover(DnArbCtx *ctx)
{
bool res = InstanceForceFinishRedo(ctx);
@ -1421,7 +1410,6 @@ static bool InstanceForceFailover(DnArbCtx *ctx)
if (cond->candiIdx == ctx->memIdx && CanFailoverDn(isMajority) &&
cond->redoDone > HALF_COUNT(cond->vaildCount)) {
cm_to_agent_failover failoverMsg;
SetFailoverMsgStaPriID(ctx, &failoverMsg);
send_failover_message(ctx->recvMsgInfo, ctx->node, ctx->instId, ctx->groupIdx, ctx->memIdx, &failoverMsg);
write_runlog(LOG, "[ForceFailover], line %d: Redo done, non force failover message sent to instance %u, "
"requested by cm_ctl, arbitrate_time=%u\n", __LINE__, ctx->instId, cond->maxMemArbiTime);
@ -1776,7 +1764,6 @@ static void SendFailoverMsg(DnArbCtx *ctx, uint32 arbitInterval, bool isStaPrim,
ctx->repGroup->time = 0;
ClearDnArbiCond(ctx->groupIdx, CLEAR_ARBI_TIME);
cm_to_agent_failover failoverMsg;
SetFailoverMsgStaPriID(ctx, &failoverMsg);
if ((!cond->instMainta && !IsSyncListEmpty(ctx->groupIdx, ctx->instId, ctx->maintaMode)) || isStaPrim) {
GroupStatusShow(sfMsg->tyName, ctx->groupIdx, ctx->instId, cond->vaildCount, cond->finishRedo);
send_failover_message(ctx->recvMsgInfo, ctx->node, ctx->instId, ctx->groupIdx, ctx->memIdx, &failoverMsg);
@ -1935,7 +1922,6 @@ static void SendFailoverInQuarmBackup(DnArbCtx *ctx)
cm_to_agent_failover failoverMsg;
if (!cond->instMainta || ctx->localRole->role == INSTANCE_ROLE_PRIMARY) {
GroupStatusShow(sfMsg.tyName, ctx->groupIdx, ctx->instId, cond->vaildCount, cond->finishRedo);
SetFailoverMsgStaPriID(ctx, &failoverMsg);
send_failover_message(ctx->recvMsgInfo, ctx->node, ctx->instId, ctx->groupIdx, ctx->memIdx, &failoverMsg);
ctx->repGroup->lastFailoverDn = ctx->instId;
write_runlog(LOG, "%s, line %d: Failover message has sent to instance %u, %s.\n",

View File

@ -352,19 +352,10 @@ void GetTwoNodesArbitrateParams(void) {
}
}
if (g_paramsOn2Nodes.cmsEnableFailoverOn2Nodes) {
char tmpIp[MAXPGPATH];
strcpy_s(tmpIp, MAXPGPATH, g_paramsOn2Nodes.thirdPartyGatewayIp);
char *saveptr = NULL;
char *token = strtok_r(tmpIp, ",", &saveptr);
while (token != NULL) {
if (!IsIPAddrValid(token)) {
write_runlog(ERROR, "parameter \"cms_enable_failover_on2nodes\" is true, "
"but parameter \"third_party_gateway_ip\" is invalid, please check!\n");
exit(1);
}
token = strtok_r(NULL, ",", &saveptr);
}
if (g_paramsOn2Nodes.cmsEnableFailoverOn2Nodes == true && !IsIPAddrValid(g_paramsOn2Nodes.thirdPartyGatewayIp)) {
write_runlog(ERROR, "parameter \"cms_enable_failover_on2nodes\" is true, "
"but parameter \"third_party_gateway_ip\" is invalid, please check!\n");
exit(1);
}
g_paramsOn2Nodes.cmsNetworkIsolationTimeout = (uint32)get_int_value_from_config(configDir,

View File

@ -1047,52 +1047,15 @@ static inline void DdbSetDdbWorkMode(ddb_work_mode workMode, unsigned int voteNu
}
}
/*
 * Check that every address in a comma-separated IP list has the expected
 * reachability, as reported by IsReachableIP().
 *
 * ip:        comma-separated list of addresses; not modified by this function.
 * reachable: the state every address is expected to be in.
 *
 * Returns CM_SUCCESS only when the list holds at least one address and
 *   - reachable is true  and every address is reachable, or
 *   - reachable is false and no address is reachable.
 * Returns CM_ERROR when ip is null, when the list contains no address, or as
 * soon as one address is found in the opposite state (the remaining addresses
 * are then not probed).
 *
 * NOTE(review): the list is copied into a CM_IP_LENGTH buffer here, while
 * GetTwoNodesArbitrateParams copies the same parameter into a MAXPGPATH
 * buffer - confirm CM_IP_LENGTH is large enough for a multi-address list.
 */
static status_t CheckAllIpStatus(char *ip, bool reachable)
{
    if (ip == nullptr) {
        return CM_ERROR;
    }

    /* strtok_r writes into the string it splits, so work on a private copy. */
    char ipList[CM_IP_LENGTH];
    int rc = strcpy_s(ipList, CM_IP_LENGTH, ip);
    securec_check_errno(rc, (void)rc);

    char *next = NULL;
    bool sawAddress = false;
    for (char *addr = strtok_r(ipList, ",", &next); addr != NULL; addr = strtok_r(NULL, ",", &next)) {
        bool isUp = (IsReachableIP(addr) == CM_SUCCESS);
        if (isUp != reachable) {
            /* One address in the wrong state is enough to fail the whole list. */
            return CM_ERROR;
        }
        sawAddress = true;
    }

    /* A list with no address at all is treated as a failure. */
    return sawAddress ? CM_SUCCESS : CM_ERROR;
}
static void DdbMinorityWorkModeSetInMajority()
{
uint32 minVoteNum = 1;
if (CheckAllIpStatus(g_paramsOn2Nodes.thirdPartyGatewayIp, true) == CM_SUCCESS) {
// all third party gateway is reachable, setting a small vote num to make sure current node works as primary.
if (IsReachableIP(g_paramsOn2Nodes.thirdPartyGatewayIp) == CM_SUCCESS) {
// third party gateway is reachable, setting a small vote num to make sure current node works as primary.
write_runlog(LOG, "promote node to primary\n");
DdbSetDdbWorkMode(DDB_WORK_MODE_MINORITY, minVoteNum, 0);
} else {
// not all third party gateway is reachable, setting a big vote num to make sure current node works as standby.
// third party gateway is not reachable, setting a big vote num to make sure current node works as standby.
minVoteNum += MAX_VOTE_NUM;
DdbSetDdbWorkMode(DDB_WORK_MODE_MINORITY, minVoteNum, 1);
@ -1116,11 +1079,10 @@ static void DdbMinorityWorkModeSetInMajority()
static void DdbMinorityWorkModeSetInMinority()
{
uint32 minVoteNum = 1;
if (CheckAllIpStatus(g_paramsOn2Nodes.thirdPartyGatewayIp, true) == CM_SUCCESS && g_bigVoteNumInMinorityMode == 1) {
if (IsReachableIP(g_paramsOn2Nodes.thirdPartyGatewayIp) == CM_SUCCESS && g_bigVoteNumInMinorityMode == 1) {
write_runlog(LOG, "reset minority work mode and become primary.\n");
DdbSetDdbWorkMode(DDB_WORK_MODE_MINORITY, minVoteNum, 0);
} else if (CheckAllIpStatus(g_paramsOn2Nodes.thirdPartyGatewayIp, false) == CM_SUCCESS && g_bigVoteNumInMinorityMode == 0) {
// every third party gateway is not reachable, setting a big vote num to make sure current node works as standby.
} else if (IsReachableIP(g_paramsOn2Nodes.thirdPartyGatewayIp) != CM_SUCCESS && g_bigVoteNumInMinorityMode == 0) {
minVoteNum += MAX_VOTE_NUM;
write_runlog(LOG, "reset minority work mode and become standby.\n");
DdbSetDdbWorkMode(DDB_WORK_MODE_MINORITY, minVoteNum, 1);
@ -1137,7 +1099,7 @@ static void DdbMinorityWorkModeSetInMinority()
static void DdbMinorityWorkModeSetInStartup()
{
uint32 minVoteNum = 1;
if (CheckAllIpStatus(g_paramsOn2Nodes.thirdPartyGatewayIp, true) == CM_SUCCESS) {
if (IsReachableIP(g_paramsOn2Nodes.thirdPartyGatewayIp) == CM_SUCCESS) {
write_runlog(LOG, "start up with minority work mode and minVoteNum: %d.\n", minVoteNum);
DdbSetDdbWorkMode(DDB_WORK_MODE_MINORITY, minVoteNum, 0);
} else {

View File

@ -246,7 +246,7 @@ extern pthread_rwlock_t g_datanodesFailoverLock;
extern pthread_rwlock_t g_gtmsFailoverLock;
extern int g_gtmMode;
extern char *g_eventTriggers[EVENT_COUNT];
extern void ExecuteEventTrigger(const EventTriggerType triggerType, int32 staPrimId = -1);
extern void ExecuteEventTrigger(const EventTriggerType triggerType);
extern int node_match_find(const char *node_type, const char *node_port, const char *node_host, const char *node_port1,
const char *node_host1, int *node_index, int *instance_index, int *inode_type);

View File

@ -40,7 +40,7 @@ void *ProcessRecvCmsMsgMain(void *arg);
extern void process_notify_command(const char* data_dir, int instance_type, int role, uint32 term);
extern void process_restart_command(const char* data_dir, int instance_type);
extern int FindInstancePathAndType(uint32 node, uint32 instanceId, char* data_path, int* instance_type);
extern void process_failover_command(const char* dataDir, int instance_type, uint32 instance_id, uint32 term, int32 staPrimId);
extern void process_failover_command(const char* dataDir, int instance_type, uint32 instance_id, uint32 term);
extern void process_rep_most_available_command(const char* dataDir, int instance_type);
extern void process_heartbeat_command(int cluster_status);
#endif

View File

@ -660,7 +660,6 @@ typedef struct cm_to_agent_failover_st {
uint32 instanceId;
int instance_type;
int wait_seconds;
int32 staPrimId;
uint32 term;
} cm_to_agent_failover;