!101 CM集群支持两节点部署

Merge pull request !101 from amoswooo/enmotech_2nodes_cluster
This commit is contained in:
opengauss-bot 2023-03-02 07:48:49 +00:00 committed by Gitee
commit 4326fb8671
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
41 changed files with 884 additions and 21 deletions

View File

@ -640,7 +640,12 @@ status_t DrvExecDccCmd(DrvCon_t session, char *cmdLine, char *output, int *outpu
if (outputLen != NULL) {
*outputLen = static_cast<int>(getText.len);
}
write_runlog(LOG, "Success to exec dcc cmd(%s).\n", cmdLine);
if (g_cmServerNum != ONE_PRIMARY_ONE_STANDBY) {
write_runlog(LOG, "Success to exec dcc cmd(%s).\n", cmdLine);
} else {
write_runlog(DEBUG5, "Success to exec dcc cmd(%s).\n", cmdLine);
}
return CM_SUCCESS;
}
@ -987,6 +992,28 @@ static status_t DrvDccStop(bool *ddbStop)
return CM_ERROR;
}
status_t DrvDccSetWorkMode(DrvCon_t session, unsigned int workMode, unsigned int voteNum)
{
int32 res = 0;
res = srv_dcc_set_work_mode((dcc_work_mode_t)workMode, voteNum);
if (res != CM_SUCCESS) {
write_runlog(ERROR, "set work mode failed. %d \n", res);
return CM_ERROR;
}
return CM_SUCCESS;
}
status_t DrvDccDemoteDdbRole(DrvCon_t session)
{
int32 res = 0;
res = srv_dcc_demote_follower();
if (res != CM_SUCCESS) {
write_runlog(ERROR, "dcc demote follower failed. %d \n", res);
return CM_ERROR;
}
return CM_SUCCESS;
}
static status_t DccLoadApi(const DrvApiInfo *apiInfo)
{
DdbDriver *drv = DrvDccGet();
@ -1011,6 +1038,8 @@ static status_t DccLoadApi(const DrvApiInfo *apiInfo)
drv->setBlocked = DrvDccSetBlocked;
drv->setParam = DrvDccSetParam;
drv->stop = DrvDccStop;
drv->setWorkMode = DrvDccSetWorkMode;
drv->demoteDdbRole = DrvDccDemoteDdbRole;
g_cmServerNum = apiInfo->nodeNum;
status_t st = StartDccProcess(apiInfo);
if (st != CM_SUCCESS) {

View File

@ -544,3 +544,25 @@ status_t DdbStop(DdbConn *ddbConn)
ComputTimeInDdb(&checkBegin, msg, ddbConn->modId);
return st;
}
status_t DdbSetWorkMode(DdbConn *ddbConn, unsigned int workMode, unsigned int voteNum)
{
status_t ret = CM_ERROR;
CM_RETURN_IFERR(CheckDdbSession(ddbConn, __FUNCTION__));
DdbSetRunning(ddbConn);
CHECK_DB_SESSION_AND_STOPPED(ddbConn, CM_ERROR);
ret = ddbConn->drv->setWorkMode(ddbConn->session, workMode, voteNum);
DdbSetIdle(ddbConn);
return ret;
}
status_t DdbDemoteRole2Standby(DdbConn *ddbConn)
{
status_t ret = CM_ERROR;
CM_RETURN_IFERR(CheckDdbSession(ddbConn, __FUNCTION__));
DdbSetRunning(ddbConn);
CHECK_DB_SESSION_AND_STOPPED(ddbConn, CM_ERROR);
ret = ddbConn->drv->demoteDdbRole(ddbConn->session);
DdbSetIdle(ddbConn);
return ret;
}

View File

@ -230,6 +230,12 @@ int DatanodeStatusCheck(DnStatus *dnStatus, uint32 dataNodeIndex, int32 dnProces
/* in case we return 0 without set the db_state. */
reportMsg->local_status.db_state = INSTANCE_HA_STATE_UNKONWN;
if (strcmp(g_dbServiceVip, "") != 0) {
reportMsg->dnVipStatus = IsReachableIP(g_dbServiceVip);
} else {
reportMsg->dnVipStatus = CM_ERROR;
}
if (g_dnConn[dataNodeIndex] == NULL) {
rcs = snprintf_s(gaussdbStatePath, MAXPGPATH, MAXPGPATH - 1, "%s/gaussdb.state", dataPath);
securec_check_intval(rcs, (void)rcs);

View File

@ -44,6 +44,8 @@ agent_rhb_interval = 1000 # the heatbeat of
enable_ssl = on # enable cma to cma ssl
ssl_cert_expire_alert_threshold = 90
ssl_cert_expire_check_interval = 86400
db_service_vip = '' # db primary virtual ip address
# default '' means no virutal ip configured
enable_fence_dn = off #enable fence the datanode when cma cannot connect to any cms.
#if set to on, restart datenode after 30 seconds. otherwise, don't restart datanode.
#default off

View File

@ -42,6 +42,8 @@ agent_rhb_interval = 1000 # the heatbeat of
enable_ssl = on # enable cma to cma ssl
ssl_cert_expire_alert_threshold = 90
ssl_cert_expire_check_interval = 86400
db_service_vip = '' # db primary virtual ip address
# default '' means no virutal ip configured
enable_fence_dn = off #enable fence the datanode when cma cannot connect to any cms.
#if set to on, restart datenode after 30 seconds. otherwise, don't restart datanode.
#default off

View File

@ -45,6 +45,8 @@ agent_rhb_interval = 1000 # the heatbeat of
enable_ssl = off # enable cma to cma ssl
ssl_cert_expire_alert_threshold = 90
ssl_cert_expire_check_interval = 86400
db_service_vip = '' # db primary virtual ip address
# default '' means no virutal ip configured
enable_fence_dn = off #enable fence the datanode when cma cannot connect to any cms.
#if set to on, restart datenode after 30 seconds. otherwise, don't restart datanode.
#default off

View File

@ -386,6 +386,10 @@ void ReloadParametersFromConfigfile()
check_input_for_security(g_unixSocketDirectory);
}
if (get_config_param(configDir, "db_service_vip", g_dbServiceVip, sizeof(g_dbServiceVip)) < 0) {
write_runlog(ERROR, "get_config_param() get db_service_vip fail.\n");
}
log_max_size = get_int_value_from_config(configDir, "log_max_size", 10240);
log_saved_days = (uint32)get_int_value_from_config(configDir, "log_saved_days", 90);
log_max_count = (uint32)get_int_value_from_config(configDir, "log_max_count", 10000);
@ -404,7 +408,8 @@ void ReloadParametersFromConfigfile()
" log_threshold_check_interval=%u, log_max_size=%ld, log_max_count=%u, log_saved_days=%u, upgrade_from=%u,\n"
" enableLogCompress=%s, security_mode=%s, incremental_build=%d, unix_socket_directory=%s, "
#ifndef ENABLE_MULTIPLE_NODES
"enable_e2e_rto=%u, disaster_recovery_type=%d, environment_threshold=%s, enable_fence_dn=%s\n",
"enable_e2e_rto=%u, disaster_recovery_type=%d, environment_threshold=%s, "
"db_service_vip=%s, enable_fence_dn=%s\n",
#else
"enable_e2e_rto=%u, disaster_recovery_type=%d, environment_threshold=%s\n",
#endif
@ -433,6 +438,7 @@ void ReloadParametersFromConfigfile()
g_disasterRecoveryType,
#ifndef ENABLE_MULTIPLE_NODES
g_environmentThreshold,
g_dbServiceVip,
g_enableFenceDn);
#else
g_environmentThreshold);

View File

@ -226,6 +226,7 @@ uint32 g_sslCertExpireCheckInterval = SECONDS_PER_DAY;
uint32 g_cmaRhbItvl = 1000;
CmResConfList g_resConf[CM_MAX_RES_INST_COUNT] = {{{0}}};
#ifndef ENABLE_MULTIPLE_NODES
char g_dbServiceVip[CM_IP_LENGTH] = {0};
char g_enableFenceDn[10] = {0};
#endif
bool g_isPauseArbitration = false;

View File

@ -1428,6 +1428,14 @@ int get_agent_global_params_from_configfile()
get_start_mode(configDir);
get_connection_mode(configDir);
GetStringFromConf(configDir, g_environmentThreshold, sizeof(g_environmentThreshold), "environment_threshold");
GetStringFromConf(configDir, g_dbServiceVip, sizeof(g_dbServiceVip), "db_service_vip");
if (g_dbServiceVip[0] == '\0') {
write_runlog(LOG, "parameter \"db_service_vip\" is not provided, please check!\n");
} else if (!IsIPAddrValid(g_dbServiceVip)) {
write_runlog(ERROR, "value of parameter \"db_service_vip\" is invalid, please check!\n");
return -1;
}
agent_report_interval = get_uint32_value_from_config(configDir, "agent_report_interval", 1);
agent_heartbeat_timeout = get_uint32_value_from_config(configDir, "agent_heartbeat_timeout", 8);
agent_connect_timeout = get_uint32_value_from_config(configDir, "agent_connect_timeout", 1);

View File

@ -46,6 +46,7 @@
#include "common/config/cm_config.h"
#include "cm/cm_cipher.h"
#include "cm/cm_misc.h"
#include <arpa/inet.h>
/*
* ssh connect does not exit automatically when the network is fault,
* this will cause cm_ctl hang for several hours,
@ -64,6 +65,10 @@ conn_option_t g_sslOption;
#define SSL_CONNECT_TIMEOUT (5000)
#define SSL_SOCKET_TIMEOUT (5000)
/* two nodes arch usage */
ArbitrateParamsOn2Nodes g_paramsOn2Nodes = {"", false, false, 20};
static const int VAILD_IP_ADDR = 1;
bool CmFileExist(const char *file_path)
{
int32 ret;
@ -1312,6 +1317,33 @@ bool IsNodeIdValid(int nodeId)
return false;
}
status_t IsReachableIP(char *ip)
{
if (ip == nullptr) {
return CM_ERROR;
}
char cmd[MAXPGPATH] = {0};
int rc = snprintf_s(cmd, MAXPGPATH, MAXPGPATH - 1, "timeout 2 ping -c 2 %s > /dev/null 2>&1", ip);
securec_check_intval(rc, (void)rc);
rc = system(cmd);
return rc == 0 ? CM_SUCCESS : CM_ERROR;
}
bool IsIPAddrValid(const char *ipAddr)
{
if (ipAddr == nullptr) {
return false;
}
unsigned char ipAddrBuf[sizeof(struct in6_addr)];
// return value of function 'inet_pton' is 1 only when valid ip addr
if (inet_pton(AF_INET, ipAddr, &ipAddrBuf) == VAILD_IP_ADDR ||
inet_pton(AF_INET6, ipAddr, &ipAddrBuf) == VAILD_IP_ADDR) {
return true;
}
return false;
}
bool IsNeedCheckFloatIp()
{
if (g_clusterType == SingleInstCluster) {

View File

@ -62,6 +62,7 @@ const char *g_cmaParamInfo[] = {
"enable_gtm_phony_dead_check|int|0,1|NULL|NULL|",
"environment_threshold|string|0,0|NULL|NULL|",
#endif
"db_service_vip|string|0,0|NULL|NULL|",
"event_triggers|string|0,0|NULL|NULL|"
};
@ -122,6 +123,10 @@ const char *g_cmsParamInfo[] = {
"coordinator_heartbeat_timeout|int|0,2592000|NULL|if set 0,the function is disabled|",
"cluster_starting_aribt_delay|int|1,2592000|NULL|NULL|",
#endif
"third_party_gateway_ip|string|0,0|NULL|NULL|",
"cms_enable_failover_on2nodes|bool|0,0|NULL|NULL|",
"cms_enable_db_crash_recovery|bool|0,0|NULL|NULL|",
"cms_network_isolation_timeout|int|10,2147483647|NULL|NULL|",
};
const char *g_valueTypeStr[] = {

View File

@ -73,4 +73,18 @@ disk_timeout = 200
agent_network_timeout = 6
dn_arbitrate_mode = quorum
agent_fault_timeout = 60
third_party_gateway_ip = '' # used in 2 nodes cluster for ddb role arbitration with network isolation,
# when cms_enable_failover_on2nodes is true.
# default ''. if cms_enable_failover_on2nodes is true, this param must be configured.
cms_enable_failover_on2nodes = false # used in 2 nodes cluster. if true, will use third_party_gateway_ip as an arbitrator,
# when network isolation happens.
# default false.
cms_enable_db_crash_recovery = false # used in 2 nodes cluster. when network recovers from isolation,
# if there are two db primary nodes,
# if this param is true, cms will choice a new db primary, and demote the other db.
# if false, cms will choice a new db primary, but manually stop the other db.
# default false
cms_network_isolation_timeout = 20 # cms judges the network is isolated when it finds ddb cluster is not sync with each other nodes,
# after cms_network_isolation_timeout times.
# default 20
############### must leave a new line at the end ###################

View File

@ -70,4 +70,18 @@ disk_timeout = 200
agent_network_timeout = 6
dn_arbitrate_mode = quorum
delay_arbitrate_max_cluster_timeout = 300 # When resources are in the startup process, delay arbitration of the maximum cluster.
third_party_gateway_ip = '' # used in 2 nodes cluster for ddb role arbitration with network isolation,
# when cms_enable_failover_on2nodes is true.
# default ''. if cms_enable_failover_on2nodes is true, this param must be configured.
cms_enable_failover_on2nodes = false # used in 2 nodes cluster. if true, will use third_party_gateway_ip as an arbitrator,
# when network isolation happens.
# default false.
cms_enable_db_crash_recovery = false # used in 2 nodes cluster. when network recovers from isolation,
# if there are two db primary nodes,
# if this param is true, cms will choice a new db primary, and demote the other db.
# if false, cms will choice a new db primary, but manually stop the other db.
# default false
cms_network_isolation_timeout = 20 # cms judges the network is isolated when it finds ddb cluster is not sync with each other nodes,
# after cms_network_isolation_timeout times.
# default 20
############### must leave a new line at the end ###################

View File

@ -70,4 +70,18 @@ ddb_max_log_file_size = 10M # Maximum number of bytes in a log
ddb_log_suppress_enable = 1 # Indicates whether to enable the log suppression function. 1: open 0, close
ddb_election_timeout = 3 # DCC election timeout interval [1S,600S]
share_disk_path = ''
third_party_gateway_ip = '' # used in 2 nodes cluster for ddb role arbitration with network isolation,
# when cms_enable_failover_on2nodes is true.
# default ''. if cms_enable_failover_on2nodes is true, this param must be configured.
cms_enable_failover_on2nodes = false # used in 2 nodes cluster. if true, will use third_party_gateway_ip as an arbitrator,
# when network isolation happens.
# default false.
cms_enable_db_crash_recovery = false # used in 2 nodes cluster. when network recovers from isolation,
# if there are two db primary nodes,
# if this param is true, cms will choice a new db primary, and demote the other db.
# if false, cms will choice a new db primary, but manually stop the other db.
# default false
cms_network_isolation_timeout = 20 # cms judges the network is isolated when it finds ddb cluster is not sync with each other nodes,
# after cms_network_isolation_timeout times.
# default 20
############### must leave a new line at the end ###################

View File

@ -311,6 +311,25 @@ void report_unbalanced_alarm(AlarmType alarmType)
AlarmReporter(UnbalanceAlarmItem, alarmType, &tempAdditionalParam);
}
void ReportClusterDoublePrimaryAlarm(
AlarmType alarmType, AlarmId alarmId, uint32 instanceId, const char* serviceType)
{
AlarmItemInitialize(DoublePrimaryAlarmItem, alarmId, ALM_AS_Normal, NULL);
char instanceInfo[RESERVE_LEN] = {0};
int32 ret = -1;
ret = sprintf_s(instanceInfo, RESERVE_LEN, "%s_%d", serviceType, instanceId);
securec_check_intval(ret, (void)ret);
AlarmAdditionalParam tempAdditionalParam;
/* fill the alarm message */
WriteAlarmAdditionalInfo(&tempAdditionalParam, instanceInfo, "", "", "",
DoublePrimaryAlarmItem, alarmType, instanceInfo);
/* report the alarm */
AlarmReporter(DoublePrimaryAlarmItem, alarmType, &tempAdditionalParam);
}
void report_ddb_fail_alarm(AlarmType alarmType, const char* instanceName, int alarmIndex)
{
Alarm* alarm = GetDdbAlarm(alarmIndex);

View File

@ -579,6 +579,13 @@ static void ArbitratePromote(int32 *cmsDemoteDelayOnConnLess)
return;
}
SetDdbMinority(false);
if (g_dbType == DB_DCC &&
ENABLED_AUTO_FAILOVER_ON2NODES(g_cm_server_num, g_paramsOn2Nodes.cmsEnableFailoverOn2Nodes) &&
g_ddbWorkMode == DDB_WORK_MODE_NONE) {
return;
}
status_t st = CmsRoleChangeWithDdb(cmsDemoteDelayOnConnLess);
if (st == CM_SUCCESS) {
CheckCmsPrimaryAgentConn(cmsDemoteDelayOnConnLess);

View File

@ -28,6 +28,7 @@
#include "cms_arbitrate_datanode_pms_utils.h"
#include "cms_common.h"
#include "cms_disk_check.h"
#include "cms_alarm.h"
#ifdef ENABLE_MULTIPLE_NODES
#include "cms_arbitrate_gtm.h"
#endif
@ -454,6 +455,11 @@ static void SaveDnStatusFromReport(const agent_to_cm_datanode_status_report *age
__LINE__, ctx->localRep->phony_dead_interval, ctx->instId);
}
ctx->localRep->phony_dead_times = agentRep->phony_dead_times;
if (undocumentedVersion == 0) {
ctx->localRep->dnVipStatus = agentRep->dnVipStatus;
} else {
ctx->localRep->dnVipStatus = CM_ERROR;
}
/* cluster streaming standby ignore term */
if (backup_open == CLUSTER_STREAMING_STANDBY) {
ctx->localRep->local_status.term = FirstTerm;
@ -539,9 +545,30 @@ static status_t RestartSmallerTermDynamicPrimary(DnArbCtx *ctx)
uint32 localTerm = ctx->localRep->local_status.term;
if (localTerm < info.maxTerm && ctx->localRep->local_status.local_role == INSTANCE_ROLE_PRIMARY &&
ctx->localRep->local_status.db_state == INSTANCE_HA_STATE_NORMAL && localTerm != InvalidTerm) {
SendRestartMsg(ctx, "[SmallerTerm]");
write_runlog(LOG, "line %d: instance %u local term(%u) is not max term(%u), "
"restart to pending.\n", __LINE__, ctx->instId, localTerm, info.maxTerm);
/*
* stop instance only when
* enable CM cluster auto failover and unable DB cluster auto crash recovery in two node deployment arch
*/
if (ENABLED_AUTO_FAILOVER_ON2NODES(g_cm_server_num, g_paramsOn2Nodes.cmsEnableFailoverOn2Nodes) &&
!g_paramsOn2Nodes.cmsEnableDbCrashRecovery) {
write_runlog(ERROR,
"line %d: split brain failure in db service, instance %u local term(%u) is not max term(%u). "
"Due to auto crash recovery is disabled, will not restart current instance, "
"waiting for manual intervention.\n",
__LINE__, ctx->instId, localTerm, ctx->maxTerm);
ReportClusterDoublePrimaryAlarm(
ALM_AT_Event,
ALM_AI_DbInstanceDoublePrimary,
ctx->instId,
SERVICE_TYPE_DB);
// try to stop fake primary db instance from cms
StopFakePrimaryResourceInstance(ctx);
} else {
SendRestartMsg(ctx, "[SmallerTerm]");
write_runlog(LOG, "line %d: instance %u local term(%u) is not max term(%u), "
"restart to pending.\n", __LINE__, ctx->instId, localTerm, ctx->maxTerm);
}
return CM_ERROR;
}
/* The connection between the CMA and the DN may be abnormal. Need to restart this Primary. */
@ -1442,10 +1469,38 @@ static bool MoreDyPrimary(DnArbCtx *ctx, const char *typeName)
/* restart dn instance */
if (ctx->info.dbRestart) {
GroupStatusShow(typeName, ctx->groupIdx, ctx->instId, ctx->cond.vaildCount, ctx->cond.finishRedo);
SendRestartMsg(ctx, typeName);
write_runlog(LOG, "%s, line %d: more dynamic primary and their term(%u) are the most(%u), "
"send restart msg to instance(%u).\n", typeName, __LINE__, ctx->info.term, ctx->maxTerm, ctx->instId);
ctx->repGroup->arbitrate_status_member[ctx->memIdx].restarting = false;
/*
* stop instance only when
* enable CM cluster auto failover and unable DB cluster auto crash recovery in two node deployment arch
*/
if (ENABLED_AUTO_FAILOVER_ON2NODES(g_cm_server_num, g_paramsOn2Nodes.cmsEnableFailoverOn2Nodes) &&
!g_paramsOn2Nodes.cmsEnableDbCrashRecovery) {
write_runlog(ERROR,
"%s, line %d: split brain failure in db service, more dynamic primary and their term(%u) "
"are the most(%u). Due to auto crash recovery is disabled, no need send restart msg to instance(%u) "
"that had been restarted, waiting for manual intervention.\n",
typeName, __LINE__, ctx->info.term, ctx->maxTerm, ctx->instId);
/* compare local term, local lsn, noidid */
if (XLByteWE_W_TERM(ctx->maxTerm, ctx->cond.maxLsn,
ctx->dnReport[ctx->memIdx].local_status.term, ctx->dnReport[ctx->memIdx].local_status.last_flush_lsn) ||
IsInstanceIdMax(ctx)) {
ReportClusterDoublePrimaryAlarm(
ALM_AT_Event,
ALM_AI_DbInstanceDoublePrimary,
ctx->instId,
SERVICE_TYPE_DB);
/* try to stop fake primary db instance from cms */
StopFakePrimaryResourceInstance(ctx);
}
} else {
SendRestartMsg(ctx, typeName);
write_runlog(LOG, "%s, line %d: more dynamic primary and their term(%u) are the most(%u), "
"send restart msg to instance(%u) that had been restarted.\n",
typeName, __LINE__, ctx->info.term, ctx->maxTerm, ctx->instId);
ctx->repGroup->arbitrate_status_member[ctx->memIdx].restarting = false;
}
return true;
}
@ -1475,10 +1530,37 @@ static bool MoreDyPrimary(DnArbCtx *ctx, const char *typeName)
/* restart dn instance */
if (ctx->repGroup->arbitrate_status_member[ctx->memIdx].restarting) {
GroupStatusShow(typeName, ctx->groupIdx, ctx->instId, ctx->cond.vaildCount, ctx->cond.finishRedo);
SendRestartMsg(ctx, typeName);
write_runlog(LOG, "%s, line %d: more dynamic primary and their term(%u) are the most(%u), "
"send restart msg to instance(%u).\n", typeName, __LINE__, ctx->info.term, ctx->maxTerm, ctx->instId);
ctx->repGroup->arbitrate_status_member[ctx->memIdx].restarting = false;
/*
* stop instance only when
* enable CM cluster auto failover and unable DB cluster auto crash recovery in two node deployment arch
*/
if (ENABLED_AUTO_FAILOVER_ON2NODES(g_cm_server_num, g_paramsOn2Nodes.cmsEnableFailoverOn2Nodes) &&
!g_paramsOn2Nodes.cmsEnableDbCrashRecovery) {
write_runlog(ERROR,
"%s, line %d: split brain failure in db service, more dynamic primary and their term(%u) "
"are the most(%u). Due to auto crash recovery is disabled, no need send restart msg to instance(%u), "
"waiting for manual intervention.\n", typeName, __LINE__, ctx->info.term, ctx->maxTerm, ctx->instId);
/* compare local term, local lsn, noidid */
if (XLByteWE_W_TERM(ctx->maxTerm, ctx->cond.maxLsn,
ctx->dnReport[memIdx].local_status.term, ctx->dnReport[memIdx].local_status.last_flush_lsn) ||
IsInstanceIdMax(ctx)) {
ReportClusterDoublePrimaryAlarm(
ALM_AT_Event,
ALM_AI_DbInstanceDoublePrimary,
ctx->instId,
SERVICE_TYPE_DB);
/* try to stop fake primary db instance from cms */
StopFakePrimaryResourceInstance(ctx);
}
} else {
SendRestartMsg(ctx, typeName);
write_runlog(LOG, "%s, line %d: more dynamic primary and their term(%u) are the most(%u), "
"send restart msg to instance(%u).\n", typeName, __LINE__, ctx->info.term, ctx->maxTerm, ctx->instId);
ctx->repGroup->arbitrate_status_member[ctx->memIdx].restarting = false;
}
return true;
}
return false;
@ -1880,6 +1962,15 @@ static void ArbitrateStandbyInQuarm(DnArbCtx *ctx, const char *str)
str, ctx->instId, ctx->cond.isPrimaryValid, ctx->cond.dyPrimNormalIdx, ctx->cond.vaildPrimIdx);
return;
}
if (ctx->localRep->dnVipStatus == CM_SUCCESS) {
write_runlog(DEBUG1, "%s, instanceId %u dnVipStatus is %s, dyPrimNormalIdx is %d, vaildPrimIdx is %d, "
"not need to arbitrate.\n",
str, ctx->instId, ctx->localRep->dnVipStatus == CM_SUCCESS ? "good" : "bad",
ctx->cond.dyPrimNormalIdx, ctx->cond.vaildPrimIdx);
return;
}
if (ChangeStaticPrimaryRoleInStandby(ctx, str)) {
return;
}
@ -2201,3 +2292,60 @@ void DatanodeInstanceArbitrate(MsgRecvInfo* recvMsgInfo, const agent_to_cm_datan
DnArbitrateInner(&ctx);
(void)pthread_rwlock_unlock(ctx.lock);
}
void StopFakePrimaryResourceInstance(const DnArbCtx *ctx)
{
int ret = -1;
uint32 ii = 0;
uint32 jj = 0;
bool isFound = false;
char command[MAX_PATH_LEN] = {0};
// find fake primary instance's local data path
for (ii = 0; ii < g_node_num; ii++) {
for (jj = 0; jj < g_node[ii].datanodeCount; jj++) {
if (g_node[ii].datanode[jj].datanodeId == ctx->instId) {
isFound = true;
break;
}
}
if (isFound) {
break;
}
}
if (ii >= g_node_num) {
write_runlog(ERROR, "cannot find dn instance: nodeid=%u.\n", g_node[ii].node);
return;
}
// stop fake primary instance
ret = snprintf_s(command, sizeof(command), sizeof(command) - 1,
SYSTEMQUOTE "cm_ctl stop -n %u -D %s" SYSTEMQUOTE, ctx->node, g_node[ii].datanode[jj].datanodeLocalDataPath);
securec_check_intval(ret, (void)ret);
ret = system(command);
if (ret != 0) {
write_runlog(ERROR, "failed to stop db instance with command: \"%s\","
"nodeId=%u, systemReturn=%d, shellReturn=%d, errno=%d.\n",
command, ctx->node, ret, SHELL_RETURN_CODE(ret), errno);
return;
}
write_runlog(LOG, "stop db instance successfully, nodeid: %u, instanceid %u.\n", ctx->node, ctx->instId);
}
// judge whether current instance's id is the max
bool IsInstanceIdMax(const DnArbCtx *ctx)
{
uint32 maxId = ctx->instId;
for (int32 i = 0; i < ctx->dyNorPrim.count; ++i) {
if (!InstanceIsCandicate(ctx, ctx->dyNorPrim.itStatus[i].memIdx, true)) {
continue;
}
maxId = ctx->dyNorPrim.itStatus[i].instId > maxId ? ctx->dyNorPrim.itStatus[i].instId : maxId;
}
return ((maxId == ctx->instId) ? true : false);
}

View File

@ -28,7 +28,8 @@
#include "sys/epoll.h"
#include "cms_ddb_adapter.h"
#include "cms_common.h"
static const int CMS_NETWORK_ISOLATION_TIMES = 20;
static const int BOOL_STR_MAX_LEN = 10;
void ExecSystemSsh(uint32 remoteNodeid, const char *cmd, int *result, const char *resultPath)
{
int rc;
@ -332,6 +333,49 @@ void GetDdbTypeParam(void)
}
write_runlog(LOG, "ddbType is %d.\n", g_dbType);
}
/* Obtain arbitrate params from cm_server.conf in two nodes arch */
void GetTwoNodesArbitrateParams(void) {
get_config_param(configDir, "third_party_gateway_ip", g_paramsOn2Nodes.thirdPartyGatewayIp,
sizeof(g_paramsOn2Nodes.thirdPartyGatewayIp));
char szEnableFailover[BOOL_STR_MAX_LEN] = {0};
get_config_param(configDir, "cms_enable_failover_on2nodes", szEnableFailover, sizeof(szEnableFailover));
if (szEnableFailover[0] == '\0') {
write_runlog(WARNING, "parameter \"cms_enable_failover_on2nodes\" not provided, will use defaule value [%d].\n",
g_paramsOn2Nodes.cmsEnableFailoverOn2Nodes);
} else {
if (!CheckBoolConfigParam(szEnableFailover)) {
write_runlog(WARNING, "invalid value for parameter \" cms_enable_failover_on2nodes \" in %s, "
"will use default value [%d].\n", configDir, g_paramsOn2Nodes.cmsEnableFailoverOn2Nodes);
} else {
g_paramsOn2Nodes.cmsEnableFailoverOn2Nodes = IsBoolCmParamTrue(szEnableFailover) ? true : false;
}
}
if (g_paramsOn2Nodes.cmsEnableFailoverOn2Nodes == true && !IsIPAddrValid(g_paramsOn2Nodes.thirdPartyGatewayIp)) {
write_runlog(ERROR, "parameter \"cms_enable_failover_on2nodes\" is true, "
"but parameter \"third_party_gateway_ip\" is invalid, please check!\n");
exit(1);
}
g_paramsOn2Nodes.cmsNetworkIsolationTimeout = (uint32)get_int_value_from_config(configDir,
"cms_network_isolation_timeout", CMS_NETWORK_ISOLATION_TIMES);
char szCrashRecovery[BOOL_STR_MAX_LEN] = {0};
get_config_param(configDir, "cms_enable_db_crash_recovery", szCrashRecovery, sizeof(szCrashRecovery));
if (szCrashRecovery[0] == '\0') {
write_runlog(WARNING,
"parameter \"cms_enable_db_crash_recovery\" not provided, will use defaule value [%d].\n",
g_paramsOn2Nodes.cmsEnableDbCrashRecovery);
} else {
if (!CheckBoolConfigParam(szCrashRecovery)) {
write_runlog(FATAL, "invalid value for parameter \" cms_enable_db_crash_recovery \" in %s, "
"will use default value [%d].\n", configDir, g_paramsOn2Nodes.cmsEnableDbCrashRecovery);
} else {
g_paramsOn2Nodes.cmsEnableDbCrashRecovery = IsBoolCmParamTrue(szCrashRecovery) ? true : false;
}
}
}
void GetCmsParaFromConfig()
{
@ -361,6 +405,10 @@ void GetCmsParaFromConfig()
GetDdbTypeParam();
GetDelayArbitTimeFromConf();
GetDelayArbitClusterTimeFromConf();
if (g_cm_server_num == CMS_ONE_PRIMARY_ONE_STANDBY) {
GetTwoNodesArbitrateParams();
}
}
void GetDdbArbiCfg(int32 loadWay)

View File

@ -1194,7 +1194,7 @@ int SetFirstTermToDdb()
return 0;
}
int IncrementTermToDdb()
int IncrementTermToDdb(uint32 incTerm)
{
uint32 term = 0;
bool firstStart = false;
@ -1212,7 +1212,7 @@ int IncrementTermToDdb()
return -1;
}
term += CM_INCREMENT_TERM_VALUE;
term += incTerm;
if (SetTermToDdb(term) != 0) {
g_needIncTermToDdbAgain = true;
return -1;

View File

@ -30,6 +30,8 @@
static CM_ConnDdbInfo g_ddbSession = {0};
static DDB_RESULT GetKVInDDb(DdbConn *ddbConn, DrvText *drvKey, DrvText *drvValue, int32 logLevel = -1);
static status_t SetDdbWorkModeInDDb(DdbConn *ddbConn, unsigned int workMode, unsigned int voteNum);
static status_t DemoteDdbRole2StandbyInDDb(DdbConn *ddbConn);
void RestDdbConn(DdbConn *ddbConn, status_t st, const DDB_RESULT *ddbResult)
{
@ -561,6 +563,26 @@ void EtcdIpPortInfoBalance(ServerSocket *server, const char *azNames)
}
}
status_t SetDdbWorkMode(unsigned int workMode, unsigned int voteNum)
{
DdbConn *ddbConn = GetNextDdbConn();
if (ddbConn == NULL) {
write_runlog(ERROR, "%s:%d ddbConn is NULL.\n", __FUNCTION__, __LINE__);
return CM_ERROR;
}
return SetDdbWorkModeInDDb(ddbConn, workMode, voteNum);
}
status_t DemoteDdbRole2Standby()
{
DdbConn *ddbConn = GetNextDdbConn();
if (ddbConn == NULL) {
write_runlog(ERROR, "%s:%d ddbConn is NULL.\n", __FUNCTION__, __LINE__);
return CM_ERROR;
}
return DemoteDdbRole2StandbyInDDb(ddbConn);
}
static status_t InitEtcdServerList(DrvApiInfo *drvApiInfo, const char *azNames)
{
size_t len = (g_etcd_num + 1) * sizeof(ServerSocket);
@ -1027,3 +1049,23 @@ void LoadDdbParamterFromConfig()
}
LoadParamterFromConfigWithPrefixKey(configDir, "ddb_", SetDdbParam);
}
static status_t SetDdbWorkModeInDDb(DdbConn *ddbConn, unsigned int workMode, unsigned int voteNum)
{
if (g_dbType != DB_DCC) {
const char *dbStr = GetDdbToString(g_dbType);
write_runlog(ERROR, "current ddbType is %s, don't support this operation", dbStr);
return CM_ERROR;
}
return DdbSetWorkMode(ddbConn, workMode, voteNum);
}
static status_t DemoteDdbRole2StandbyInDDb(DdbConn *ddbConn)
{
if (g_dbType != DB_DCC) {
const char *dbStr = GetDdbToString(g_dbType);
write_runlog(ERROR, "current ddbType is %s, don't support this operation", dbStr);
return CM_ERROR;
}
return DdbDemoteRole2Standby(ddbConn);
}

View File

@ -71,6 +71,7 @@ ClusterRole backup_open = CLUSTER_PRIMARY;
ClusterInstallType g_clusterInstallType = INSTALL_TYPE_DEFAULT;
Alarm UnbalanceAlarmItem[1];
Alarm ServerSwitchAlarmItem[1];
Alarm DoublePrimaryAlarmItem[1];
synchronous_standby_mode current_cluster_az_status = AnyFirstNo;
volatile cm_start_mode cm_server_start_mode = MAJORITY_START; /* cm_arbitration_mode needs to be deleted. */
@ -155,6 +156,9 @@ uint32 g_sslCertExpireCheckInterval = SECONDS_PER_DAY;
uint32 g_diskTimeout = 200;
uint32 g_agentNetworkTimeout = 6;
DnArbitrateMode g_dnArbitrateMode = QUORUM;
uint32 g_ddbNetworkIsolationTimeout = 20;
ddb_work_mode g_ddbWorkMode = DDB_WORK_MODE_NONE;
uint32 g_bigVoteNumInMinorityMode = 0;
#ifdef ENABLE_MULTIPLE_NODES
uint32 coordinator_heartbeat_timeout = cn_delete_default_time;
int32 g_cmAgentDeleteCn = 30;
@ -255,6 +259,7 @@ CM_IOThreads gIOThreads;
CM_WorkThreads gWorkThreads;
CM_HAThreads gHAThreads;
CM_MonitorThread gMonitorThread;
CM_DdbStatusCheckAndSetThread gDdbCheckThread;
CM_MonitorNodeStopThread gMonitorNodeStopThread;
/*

View File

@ -2691,6 +2691,16 @@ int main(int argc, char** argv)
return -1;
}
if (g_cm_server_num == CMS_ONE_PRIMARY_ONE_STANDBY && g_dbType == DB_DCC) {
status = CM_CreateDdbStatusCheckThread();
if (status < 0) {
write_runlog(ERROR, "CM_CreateDdbStatusCheckThread failed!\n");
CloseAllDdbSession();
FreeNotifyMsg();
return -1;
}
}
GetMultiAzNodeInfo();
g_loopState.count = 1;
st = CmsCreateThreads();

View File

@ -30,12 +30,17 @@
#include "cms_process_messages.h"
#include "cms_write_dynamic_config.h"
#include "cms_arbitrate_cluster.h"
#include "cjson/cJSON.h"
#include "cms_monitor_main.h"
/* cluster unbalance check interval */
const int cluster_unbalance_check_interval = 10;
static int g_cluster_unbalance_check_interval = cluster_unbalance_check_interval;
static const uint32 CHECK_SLEEP_INTERVAL = 5;
static const uint32 MAX_VOTE_NUM = 2;
static const uint32 DDB_STATUS_CHECK_INTERVAL = 2;
static const uint32 CMS_ID_INDEX_ONE = 1;
static const uint32 CMS_ID_INDEX_TWO = 2;
static void RmAllBlackFile(const char *blackFile);
using MonitorContext = struct StMonitorContext {
@ -339,6 +344,10 @@ static void ReloadParametersFromConfigfile()
g_agentNetworkTimeout = get_uint32_value_from_config(configDir, "agent_network_timeout", 6);
GetDnArbitrateMode();
if (g_cm_server_num == CMS_ONE_PRIMARY_ONE_STANDBY) {
GetTwoNodesArbitrateParams();
}
#ifdef ENABLE_MULTIPLE_NODES
write_runlog(LOG,
"reload cm_server parameters:\n"
@ -501,6 +510,15 @@ static void CheckHB()
}
}
static inline void CheckDdbClusterStatusOn2Nodes()
{
if (g_ddbNetworkIsolationTimeout > 0) {
g_ddbNetworkIsolationTimeout--;
} else {
g_ddbNetworkIsolationTimeout = 0;
}
}
#ifdef ENABLE_MULTIPLE_NODES
static void DoCNTimeout(uint32 groupIdx, int memIdx)
{
@ -842,6 +860,311 @@ static void CheckMaxCluster()
CheckMaxClusterHeartbeartValue();
}
static status_t IsPeerCmsReachableOn2Nodes()
{
if (!ENABLED_AUTO_FAILOVER_ON2NODES(g_cm_server_num, g_paramsOn2Nodes.cmsEnableFailoverOn2Nodes)) {
write_runlog(ERROR, "should be called by two node cluster with enabling auto failover only.\n");
return CM_ERROR;
}
// create socket
int socketFd = socket(AF_INET, SOCK_STREAM, 0);
if (socketFd == -1) {
write_runlog(ERROR, "could not create socket.\n");
return CM_ERROR;
}
struct timeval tv = { 0, 0 };
tv.tv_sec = CM_TCP_TIMEOUT;
(void)setsockopt(socketFd, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(tv));
(void)setsockopt(socketFd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(tv));
struct sockaddr_in sockAddr;
status_t ret = CM_SUCCESS;
for (uint32 i = 0; i < g_cm_server_num; i++) {
if (g_node[i].cmServerId == g_currentNode->cmServerId) {
continue;
}
// got peer cms info
sockAddr.sin_addr.s_addr = inet_addr(g_node[i].cmServerLocalHAIP[0]);
sockAddr.sin_family = AF_INET;
sockAddr.sin_port = htons(g_node[i].cmServerLocalHAPort);
// Connect to peer cms dcc ip:port
if (connect(socketFd , (struct sockaddr *)&sockAddr , sizeof(sockAddr)) < 0) {
write_runlog(LOG, "could not connect to peer cms %s:%d.\n",
g_node[i].cmServerLocalHAIP[0], g_node[i].cmServerLocalHAPort);
ret = CM_ERROR;
} else {
write_runlog(DEBUG1, "connect to peer cms %s:%d successfuly.\n",
g_node[i].cmServerLocalHAIP[0], g_node[i].cmServerLocalHAPort);
ret = CM_SUCCESS;
}
break;
}
close(socketFd);
return ret;
}
static status_t GetClusterInfoFromDDb(char *info, int outLen)
{
if (g_inMaintainMode) {
write_runlog(LOG, "in maintain mode, can't do ddb cmd.\n");
return CM_SUCCESS;
}
char cmd[DCC_CMD_MAX_LEN] = {0};
char errMsg[ERR_MSG_LENGTH] = {0};
status_t ret = CM_SUCCESS;
errno_t rc = snprintf_s(cmd, DCC_CMD_MAX_LEN, DCC_CMD_MAX_LEN - 1, " %s", CM_DDB_CLUSTER_INFO_CMD);
securec_check_intval(rc, (void)rc);
ret = DoDdbExecCmd(cmd, info, &outLen, errMsg, DCC_CMD_MAX_OUTPUT_LEN);
if (ret != CM_SUCCESS) {
write_runlog(ERROR, "get ddb cluster info failed. error: %s\n", errMsg);
}
return ret;
}
static status_t IsPeerApplyIndexChanged(cJSON *nodes)
{
static int peerApplyIndex = 0;
int prevPeerApplyIndex = peerApplyIndex;
cJSON *applyIndex = NULL;
if (g_currentNode->cmServerId == CMS_ID_INDEX_ONE) {
applyIndex = cJSON_GetObjectItem(cJSON_GetArrayItem(nodes, 1), "apply_index");
} else if (g_currentNode->cmServerId == CMS_ID_INDEX_TWO) {
applyIndex = cJSON_GetObjectItem(cJSON_GetArrayItem(nodes, 0), "apply_index");
} else {
write_runlog(ERROR, "wrong cm server id: %d\n", g_currentNode->cmServerId);
exit(1);
}
if (applyIndex == NULL) {
write_runlog(ERROR, "cannot parse ddb cluster info {apply_index}.\n");
return CM_ERROR;
}
peerApplyIndex = applyIndex->valueint;
write_runlog(DEBUG5, "prevPeerApplyIndex: %d peerApplyIndex: %d g_ddbNetworkIsolationTimeout: %d\n",
prevPeerApplyIndex, peerApplyIndex, g_ddbNetworkIsolationTimeout);
if (peerApplyIndex == prevPeerApplyIndex) {
return CM_ERROR;
}
return CM_SUCCESS;
}
static status_t IsPeerCmsRolePrimary(cJSON *nodes)
{
int peerCmserverRole = -1;
cJSON *role = NULL;
if (g_currentNode->cmServerId == CMS_ID_INDEX_ONE) {
role = cJSON_GetObjectItem(cJSON_GetArrayItem(nodes, 1), "role");
} else if (g_currentNode->cmServerId == CMS_ID_INDEX_TWO) {
role = cJSON_GetObjectItem(cJSON_GetArrayItem(nodes, 0), "role");
}
if (role == NULL) {
write_runlog(ERROR, "cannot parse ddb cluster info {role}.\n");
return CM_ERROR;
}
peerCmserverRole = strcmp(cJSON_GetStringValue(role), "LEADER") == 0 ? CM_SERVER_PRIMARY : CM_SERVER_STANDBY;
write_runlog(DEBUG5, "peerCmserverRole: %s g_ddbNetworkIsolationTimeout: %d\n",
peerCmserverRole == CM_SERVER_PRIMARY ? "Primary" : "Standby", g_ddbNetworkIsolationTimeout);
if (peerCmserverRole != CM_SERVER_PRIMARY) {
return CM_ERROR;
}
return CM_SUCCESS;
}
static status_t IsDdbLogSyncOn2Nodes(char * info)
{
if (info == NULL) {
return CM_ERROR;
}
write_runlog(DEBUG5, "ddb cluster info: %s\n", info);
cJSON *root = cJSON_Parse(info);
if (root == NULL) {
write_runlog(ERROR, "cannot parse ddb cluster info {root}.\n");
return CM_ERROR;
}
cJSON *stream = cJSON_GetArrayItem(cJSON_GetObjectItem(root, "stream_list"), 0);
if (stream == NULL) {
write_runlog(ERROR, "cannot parse ddb cluster info {stream_list}.\n");
cJSON_Delete(root);
return CM_ERROR;
}
cJSON *nodes = cJSON_GetObjectItem(stream, "nodes");
if (nodes == NULL) {
write_runlog(ERROR, "cannot parse ddb cluster info {nodes}.\n");
cJSON_Delete(root);
return CM_ERROR;
}
switch (g_HA_status->local_role) {
case CM_SERVER_PRIMARY:
return IsPeerApplyIndexChanged(nodes);
case CM_SERVER_STANDBY:
return IsPeerCmsRolePrimary(nodes);
default:
write_runlog(ERROR, "unexpected local_role: %d\n", g_HA_status->local_role);
return CM_ERROR;
}
cJSON_Delete(root);
return CM_SUCCESS;
}
static inline void DdbSetDdbWorkMode(ddb_work_mode workMode, unsigned int voteNum, uint32 isBigVoteNum)
{
if (SetDdbWorkMode(workMode, voteNum) != CM_SUCCESS) {
write_runlog(ERROR, "setting work mode: %d failed with minVoteNum: %d isBigVoteNum: %d\n",
workMode, voteNum, isBigVoteNum);
return;
}
g_ddbWorkMode = workMode;
if (workMode == DDB_WORK_MODE_MINORITY) {
g_bigVoteNumInMinorityMode = isBigVoteNum;
}
}
static void DdbMinorityWorkModeSetInMajority()
{
uint32 minVoteNum = 1;
if (IsReachableIP(g_paramsOn2Nodes.thirdPartyGatewayIp) == CM_SUCCESS) {
// third party gateway is reachable, setting a small vote num to make sure current node works as primary.
write_runlog(LOG, "promote node to primary\n");
DdbSetDdbWorkMode(DDB_WORK_MODE_MINORITY, minVoteNum, 0);
} else {
// third party gateway is not reachable, setting a big vote num to make sure current node works as standby.
minVoteNum += MAX_VOTE_NUM;
DdbSetDdbWorkMode(DDB_WORK_MODE_MINORITY, minVoteNum, 1);
if (g_HA_status->local_role == CM_SERVER_PRIMARY) {
// primary node need to demote to standby
write_runlog(LOG, "demote node to standby\n");
if (DemoteDdbRole2Standby() != CM_SUCCESS) {
write_runlog(ERROR, "demote node to standby failed\n");
return;
}
}
}
write_runlog(LOG, "go into minority work mode with minVoteNum: %d g_bigVoteNumInMinorityMode: %d.\n",
minVoteNum, g_bigVoteNumInMinorityMode);
(void)pthread_rwlock_wrlock(&term_update_rwlock);
IncrementTermToDdb(CM_INCREMENT_BIG_TERM_VALUE);
(void)pthread_rwlock_unlock(&term_update_rwlock);
}
static void DdbMinorityWorkModeSetInMinority()
{
uint32 minVoteNum = 1;
if (IsReachableIP(g_paramsOn2Nodes.thirdPartyGatewayIp) == CM_SUCCESS && g_bigVoteNumInMinorityMode == 1) {
write_runlog(LOG, "reset minority work mode and become primary.\n");
DdbSetDdbWorkMode(DDB_WORK_MODE_MINORITY, minVoteNum, 0);
} else if (IsReachableIP(g_paramsOn2Nodes.thirdPartyGatewayIp) != CM_SUCCESS && g_bigVoteNumInMinorityMode == 0) {
minVoteNum += MAX_VOTE_NUM;
write_runlog(LOG, "reset minority work mode and become standby.\n");
DdbSetDdbWorkMode(DDB_WORK_MODE_MINORITY, minVoteNum, 1);
if (g_HA_status->local_role == CM_SERVER_PRIMARY) {
write_runlog(LOG, "demote node to standby\n");
if (DemoteDdbRole2Standby() != CM_SUCCESS) {
write_runlog(ERROR, "demote node to standby failed\n");
}
}
}
}
static void DdbMinorityWorkModeSetInStartup()
{
uint32 minVoteNum = 1;
if (IsReachableIP(g_paramsOn2Nodes.thirdPartyGatewayIp) == CM_SUCCESS) {
write_runlog(LOG, "start up with minority work mode and minVoteNum: %d.\n", minVoteNum);
DdbSetDdbWorkMode(DDB_WORK_MODE_MINORITY, minVoteNum, 0);
} else {
minVoteNum += MAX_VOTE_NUM;
write_runlog(LOG, "start up with minority work mode and minVoteNum: %d.\n", minVoteNum);
DdbSetDdbWorkMode(DDB_WORK_MODE_MINORITY, minVoteNum, 1);
DemoteDdbRole2Standby();
}
}
static int DdbStatusCheck() {
/*
* start to do dcc cluster status check.
*/
char info[DCC_CMD_MAX_OUTPUT_LEN] = {0};
if (IsPeerCmsReachableOn2Nodes() == CM_SUCCESS ||
(GetClusterInfoFromDDb(info, DCC_CMD_MAX_OUTPUT_LEN) == CM_SUCCESS
&& IsDdbLogSyncOn2Nodes(info) == CM_SUCCESS)) {
/*
* network are good between two nodes. reset g_ddbNetworkIsolationTimeout.
*/
g_ddbNetworkIsolationTimeout = g_paramsOn2Nodes.cmsNetworkIsolationTimeout;
return CM_SUCCESS;
}
return CM_ERROR;
}
static void DoDdbStatusCheckAndSet()
{
/*
* if the cluster are not two nodes cluster, or don't enable auto failover, skip check.
*/
if (!ENABLED_AUTO_FAILOVER_ON2NODES(g_cm_server_num, g_paramsOn2Nodes.cmsEnableFailoverOn2Nodes)) {
cm_sleep(DDB_STATUS_CHECK_INTERVAL);
return;
}
if (DdbStatusCheck() == CM_SUCCESS) {
if (g_ddbWorkMode != DDB_WORK_MODE_MAJORITY) {
write_runlog(LOG, "go into majority work mode.\n");
DdbSetDdbWorkMode(DDB_WORK_MODE_MAJORITY, 0, 0);
}
cm_sleep(DDB_STATUS_CHECK_INTERVAL);
return;
}
if (g_ddbNetworkIsolationTimeout != 0) {
cm_sleep(DDB_STATUS_CHECK_INTERVAL);
return;
}
/*
* go into minority work mode, because:
* 1. we cannot get ddb cluster info sync within g_cms_ddb_log_sync_timeout
* 2. we cannot reach peer node dcc ip:port
*/
switch (g_ddbWorkMode) {
case DDB_WORK_MODE_MAJORITY:
DdbMinorityWorkModeSetInMajority();
break;
case DDB_WORK_MODE_MINORITY:
DdbMinorityWorkModeSetInMinority();
break;
default:
DdbMinorityWorkModeSetInStartup();
}
cm_sleep(DDB_STATUS_CHECK_INTERVAL);
return;
}
static void DoMonitor(MonitorContext *ctx)
{
CheckKerberosHB();
@ -856,6 +1179,16 @@ static void DoMonitor(MonitorContext *ctx)
CheckHB();
if (g_dbType == DB_DCC &&
ENABLED_AUTO_FAILOVER_ON2NODES(g_cm_server_num, g_paramsOn2Nodes.cmsEnableFailoverOn2Nodes)) {
/*
* two nodes cluster and enable auto failover.
* when network isolation happened, cms choice a node as new primary if
* the node can reach the third party gateway.
*/
CheckDdbClusterStatusOn2Nodes();
}
CheckMaxCluster();
CheckRoleChange();
@ -929,6 +1262,22 @@ void *CM_ThreadMonitorMain(void *argp)
return NULL;
}
void *CM_ThreadDdbStatusCheckAndSetMain(void *argp)
{
CM_DdbStatusCheckAndSetThread *pCheckThread = (CM_DdbStatusCheckAndSetThread *)argp;
/* unify log style */
thread_name = "DdbStatusCheck";
write_runlog(LOG, "Starting Ddb Status Check thread\n");
pCheckThread->thread.type = THREAD_TYPE_DDB_STATUS_CHECKER;
for (;;) {
DoDdbStatusCheckAndSet();
}
return NULL;
}
static void GetStopNodes(char *stopAzNodes, size_t len)
{
if (stopAzNodes == NULL || len == 0) {
@ -1365,3 +1714,4 @@ void *CheckGtmModMain(void *arg)
return NULL;
}
#endif

View File

@ -606,7 +606,9 @@ static void SetNodeMsgCurState(InitNodeMsg *nodeMsg, int32 status)
static int ProcessHaStatus2AzStartSyncThr(int count, int groupDnNum, InitNodeMsg *nodeMsg)
{
SetNodeMsgMethodStr(nodeMsg, "[ProcessHaStatus2AzStartSyncThr]");
if ((nodeMsg->norPrimInCurSL + nodeMsg->normalVoteAzPrimary) != 1 || nodeMsg->normStdbInCurSL < count / 2) {
// note that, number of normal standby in sync list should >= groupDnNum/2, and groupDnNum should >= 3
if ((nodeMsg->norPrimInCurSL + nodeMsg->normalVoteAzPrimary) != 1 ||
(g_cm_server_num > CMS_ONE_PRIMARY_ONE_STANDBY && nodeMsg->normStdbInCurSL < count / 2)) {
SetNodeMsgCurState(nodeMsg, CM_STATUS_NEED_REPAIR);
return -1;
}

View File

@ -78,6 +78,22 @@ int CM_CreateMonitor(void)
return 0;
}
/**
* @brief Create a ddb cluster status check Thread object
*
* @return int
*/
int CM_CreateDdbStatusCheckThread(void)
{
CM_DdbStatusCheckAndSetThread* pCheckThread = &gDdbCheckThread;
errno_t rc = memset_s(pCheckThread, sizeof(CM_DdbStatusCheckAndSetThread), 0, sizeof(CM_DdbStatusCheckAndSetThread));
securec_check_errno(rc, (void)rc);
if (pthread_create(&(gDdbCheckThread.thread.tid), NULL, CM_ThreadDdbStatusCheckAndSetMain, pCheckThread) != 0) {
return -1;
}
return 0;
}
#ifdef ENABLE_MULTIPLE_NODES
status_t CmCreateCheckGtmModThread()
{

View File

@ -237,6 +237,8 @@ typedef status_t (*DrvDdbExecCmd)(DrvCon_t session, char *cmdLine, char *value,
typedef status_t (*DrvDdbSetBlocked)(unsigned int setBlock, unsigned int waitTimeoutMs);
typedef status_t (*DrvSetParam)(const char *key, const char *value);
typedef status_t (*DrvStop)(bool *ddbStop);
typedef status_t (*DrvSetWorkMode)(DrvCon_t session, unsigned int workMode, unsigned int voteNum);
typedef status_t (*DrvDemoteDdbRole)(DrvCon_t session);
typedef struct DdbDriverSt {
pthread_rwlock_t lock;
@ -268,6 +270,9 @@ typedef struct DdbDriverSt {
DrvDdbSetBlocked setBlocked; // set ddb block
DrvSetParam setParam; // set ddb param
DrvStop stop;
DrvSetWorkMode setWorkMode; // set ddb work mode
DrvDemoteDdbRole demoteDdbRole;
bool ddbStopped;
} DdbDriver;
@ -311,5 +316,7 @@ status_t DdbExecCmd(DdbConn *ddbConn, char *cmdLine, char *output, int *outputLe
status_t DdbSetBlocked(const DdbConn *ddbConn, unsigned int setBlock, unsigned waitTimeoutMs);
status_t DDbSetParam(const DdbConn *ddbConn, const char *key, const char *value);
status_t DdbStop(DdbConn *ddbConn);
status_t DdbSetWorkMode(DdbConn *ddbConn, unsigned int workMode, unsigned int voteNum);
status_t DdbDemoteRole2Standby(DdbConn *ddbConn);
#endif

View File

@ -285,6 +285,7 @@ extern char g_agentTargetBarrier[BARRIERLEN];
extern char g_environmentThreshold[CM_PATH_LENGTH];
extern char g_doradoIp[CM_IP_LENGTH];
#ifndef ENABLE_MULTIPLE_NODES
extern char g_dbServiceVip[CM_IP_LENGTH];
extern char g_enableFenceDn[10];
#endif
extern uint32 g_diskTimeout;

View File

@ -64,6 +64,7 @@ typedef enum ClientErrorEn {
typedef unsigned char bool8;
#define CMS_ONE_PRIMARY_ONE_STANDBY 2
#define CM_EXIT ((int)-2)
#define CM_FALSE (uint8)0
#define CM_TRUE (uint8)1

View File

@ -40,6 +40,14 @@ using namespace std;
extern "C" {
#endif
// two nodes arch usage
typedef struct st_arbitrate_params_on2nodes {
char thirdPartyGatewayIp[CM_IP_LENGTH];
bool cmsEnableFailoverOn2Nodes;
bool cmsEnableDbCrashRecovery;
uint32 cmsNetworkIsolationTimeout;
} ArbitrateParamsOn2Nodes;
typedef struct st_conn_option {
int connect_timeout; /* ms */
int socket_timeout; /* ms */
@ -189,6 +197,9 @@ typedef struct Instance_t {
extern conn_option_t g_sslOption;
/* two nodes arch usage */
extern ArbitrateParamsOn2Nodes g_paramsOn2Nodes;
/**
* @def SHELL_RETURN_CODE
* @brief Get the shell command return code.
@ -287,5 +298,9 @@ void listen_ip_merge(uint32 ipCnt, const char (*ipListen)[CM_IP_LENGTH], char *r
bool IsNodeIdValid(int nodeId);
void FreeSslOpton();
status_t IsReachableIP(char *ip);
bool IsIPAddrValid(const char *ipAddr);
bool IsNeedCheckFloatIp();
#endif

View File

@ -384,6 +384,9 @@ const int INSTANCE_WALSNDSTATE_UNKNOWN = 6;
#define SSL_ENABLE (1)
#define SSL_DISABLE (2)
#define CM_DDB_CLUSTER_INFO_CMD "--cluster_info"
extern int g_gtmPhonyDeadTimes;
extern int g_dnPhonyDeadTimes[CM_MAX_DATANODE_PER_NODE];
extern int g_cnPhonyDeadTimes;
@ -1193,6 +1196,7 @@ typedef struct agent_to_cm_datanode_status_report_st {
int dn_restart_counts;
int phony_dead_times;
int dn_restart_counts_in_hour;
int dnVipStatus;
} agent_to_cm_datanode_status_report;
typedef struct AgentToCmserverDnSyncListSt {
@ -1499,6 +1503,7 @@ typedef struct cm_instance_datanode_report_status_st {
int phony_dead_times;
int phony_dead_interval;
int dn_restart_counts_in_hour;
int dnVipStatus;
bool is_finish_redo_cmd_sent;
uint64 ckpt_redo_point;
char barrierID[BARRIERLEN];

View File

@ -51,6 +51,8 @@
#define CM_INCREMENT_TERM_VALUE 100
#define CM_INCREMENT_BIG_TERM_VALUE 10000
#define CM_UINT32_MAX 0xFFFFFFFFU
#define CM_MIN_CONN_TO_DDB (2)
@ -113,6 +115,10 @@ typedef struct CM_MonitorThread_t {
CM_WorkThread thread;
} CM_MonitorThread;
typedef struct CM_DdbStatusCheckAndSetThread_t {
CM_WorkThread thread;
} CM_DdbStatusCheckAndSetThread;
typedef struct CM_MonitorNodeStopThread_t {
CM_WorkThread thread;
} CM_MonitorNodeStopThread;
@ -199,6 +205,7 @@ typedef struct CM_ConnDdbInfo_t {
#define THREAD_TYPE_AGENT_SERVER 4
#define THREAD_TYPE_INIT 5
#define THREAD_TYPE_ALARM_CHECKER 6
#define THREAD_TYPE_DDB_STATUS_CHECKER 7
#define MONITOR_CYCLE_TIMER 1000000
#define MONITOR_CYCLE_TIMER_OUT 6000000

View File

@ -41,8 +41,8 @@ extern void ReportReadOnlyAlarm(AlarmType alarmType, const char* instanceName, u
extern void InstanceAlarmItemInitialize(void);
extern void report_phony_dead_alarm(AlarmType alarmType, const char* instanceName, uint32 instanceid);
extern void report_unbalanced_alarm(AlarmType alarmType);
extern void ReportClusterDoublePrimaryAlarm(
AlarmType alarmType, AlarmId alarmId, uint32 instanceId, const char* serviceType);
extern void UnbalanceAlarmItemInitialize(void);
extern void ServerSwitchAlarmItemInitialize(void);
extern void report_server_switch_alarm(AlarmType alarmType, const char* instanceName);

View File

@ -155,4 +155,6 @@ typedef struct DnInstInfo_t {
void DatanodeInstanceArbitrate(MsgRecvInfo* recvMsgInfo, const agent_to_cm_datanode_status_report *agentRep);
bool IsCurrentNodeDorado(uint32 node);
void StopFakePrimaryResourceInstance(const DnArbCtx *ctx);
bool IsInstanceIdMax(const DnArbCtx *ctx);
#endif

View File

@ -45,6 +45,7 @@ int UpdateDynamicConfig();
void UpdateAzNodeInfo();
void GetDdbTypeParam(void);
void GetDdbArbiCfg(int32 loadWay);
void GetTwoNodesArbitrateParams(void);
status_t GetMaintainPath(char *maintainFile, uint32 fileLen);
status_t GetDdbKVFilePath(char *kvFile, uint32 fileLen);
bool IsUpgradeCluster(void);

View File

@ -54,5 +54,5 @@ DDB_RESULT GetHistoryClusterExceptSyncListFromDdb(void);
bool SetGroupExpectSyncList(uint32 groupIndex, const CurrentInstanceStatus *statusInstance);
int SetTermToDdb(uint32 term);
int IncrementTermToDdb(void);
int IncrementTermToDdb(uint32 incTerm = CM_INCREMENT_TERM_VALUE);
#endif

View File

@ -85,5 +85,7 @@ void RestDdbConn(DdbConn *ddbConn, status_t st, const DDB_RESULT *ddbResult);
status_t DoDdbExecCmd(const char *cmd, char *output, int *outputLen, char *errMsg, uint32 maxBufLen);
status_t DoDdbSetBlocked(unsigned int setBlock, unsigned int waitTimeoutMs);
void LoadDdbParamterFromConfig(void);
status_t SetDdbWorkMode(unsigned int workMode, unsigned int voteNum);
status_t DemoteDdbRole2Standby();
#endif

View File

@ -84,6 +84,12 @@ typedef enum GET_HEART_BEAT_FROM_ETCD {
CAN_GET_HEARTBEART
} getHeartBeatFromEtcd;
typedef enum DDB_WORK_MODE_ {
DDB_WORK_MODE_MAJORITY = 0,
DDB_WORK_MODE_MINORITY,
DDB_WORK_MODE_NONE,
} ddb_work_mode;
/* data structures to record instances that are in switchover procedure */
typedef struct switchover_instance_t {
uint32 node;
@ -220,7 +226,7 @@ typedef enum ThreadProcessStatusE {
" -t 60 -O ConnectTimeout=" SSH_CONNECT_TIMEOUT " -O ConnectionAttempts=" SSH_CONNECT_ATTEMPTS \
" -O ServerAliveInterval=" SSH_SERVER_ALIVE_INTERVAL " -O ServerAliveCountMax=" SSH_SERVER_ALIVE_COUNT_MAX " "
#define PSSH_TIMEOUT " -t 30 "
#define SERVICE_TYPE_DB "dn"
#define SWITCHOVER_DEFAULT_WAIT 120 /* It needs an integer multiple of 3, because of sleep(3) */
@ -253,6 +259,9 @@ const int DEFAULT_PHONY_DEAD_EFFECTIVE_TIME = 5;
} \
} while (0)
#define ENABLED_AUTO_FAILOVER_ON2NODES(nodeNum, autoFailover) \
((nodeNum) == CMS_ONE_PRIMARY_ONE_STANDBY && (autoFailover) == true)
bool &GetIsSharedStorageMode();
extern set<int> g_stopNodes;
@ -293,6 +302,7 @@ extern CM_WorkThreads gWorkThreads;
extern CM_IOThreads gIOThreads;
extern CM_HAThreads gHAThreads;
extern CM_MonitorThread gMonitorThread;
extern CM_DdbStatusCheckAndSetThread gDdbCheckThread;
extern CM_MonitorNodeStopThread gMonitorNodeStopThread;
extern cm_fenced_UDF_report_status* g_fenced_UDF_report_status_ptr;
extern pthread_rwlock_t instance_status_rwlock;
@ -306,6 +316,7 @@ extern dynamic_cms_timeline* g_timeline;
extern DynamicNodeReadOnlyInfo *g_dynamicNodeReadOnlyInfo;
extern Alarm UnbalanceAlarmItem[1];
extern Alarm ServerSwitchAlarmItem[1];
extern Alarm DoublePrimaryAlarmItem[1];
extern Alarm* AbnormalDdbAlarmList;
extern volatile logic_cluster_restart_mode cm_logic_cluster_restart_mode;
extern volatile cm_start_mode cm_server_start_mode;
@ -372,6 +383,9 @@ extern uint32 cmserver_self_vote_timeout;
extern uint32 instance_keep_heartbeat_timeout;
extern uint32 g_clusterStartingTimeout;
extern uint32 g_clusterStartingArbitDelay;
extern uint32 g_ddbNetworkIsolationTimeout;
extern ddb_work_mode g_ddbWorkMode;
extern uint32 g_bigVoteNumInMinorityMode;
#ifdef ENABLE_MULTIPLE_NODES
extern uint32 coordinator_heartbeat_timeout;
extern int32 g_cmAgentDeleteCn;

View File

@ -25,6 +25,7 @@
#define CMS_MONITOR_MAIN_H
extern void* CM_ThreadMonitorMain(void* argp);
extern void *CM_ThreadDdbStatusCheckAndSetMain(void *argp);
extern void* CM_ThreadMonitorNodeStopMain(void* argp);
void *CheckBlackList(void *arg);

View File

@ -30,6 +30,7 @@
int CM_CreateHA(void);
int CM_CreateMonitor(void);
int CM_CreateMonitorStopNode(void);
int CM_CreateDdbStatusCheckThread(void);
int CM_CreateWorkThreadPool(uint32 ctlWorkerCount, uint32 agentWorkerCount);
int CM_CreateIOThreadPool(uint32 thrCount);
void CreateDnGroupStatusCheckAndArbitrateThread(void);

View File

@ -88,6 +88,7 @@ typedef enum AlarmId {
ALM_AI_PgxcNodeMismatch = 0x404F0061,
ALM_AI_StreamingDisasterRecoveryCnDisconnected = 0x404F0070,
ALM_AI_StreamingDisasterRecoveryDnDisconnected = 0x404F0071,
ALM_AI_DbInstanceDoublePrimary = 0x404F007A,
ALM_AI_BUTT = 0x7FFFFFFFFFFFFFFF /* force compiler to decide AlarmId as uint64 */
} AlarmId;

View File

@ -54,6 +54,7 @@
1078919265 PgxcNodeMismatch pgxc_node不一致 Coordinator %s pgxc_node mismatch. CN实例%s pgxc_node不一致 critical
1078919280 StreamingDisasterRecoveryCnDisconnected 灾备集群cn断连 In streaming standby cluster,cn is disconnected from cn %s of the primary cluster. 灾备集群CN实例与主集群CN实例%s断连 critical
1078919281 StreamingDisasterRecoveryDnDisconnected 灾备集群dn断连 In streaming standby cluster,dn %s is disconnected from Corresponding Shard dn of the primary cluster. 灾备集群DN实例%s与主集群对应分片DN实例断连 critical
1078919290 DBInstanceDoublePrimary 数据库实例双主 Multi primary db instances have been detected, %s. 本节点的数据库主实例ID:%s critical
#used for om_monitor
alarm_component = /opt/huawei/snas/bin/snas_cm_cmd
alarm_report_interval = 10