add identify available zone command for cascade standby
We add a column for identify mode before for identify az, which not compatible with distributed gaussdb. Therefore, we remove the added column in identify mode, and add another command identify az to make sure a cascade standby and its upstream both in same available zone.
This commit is contained in:
@ -781,7 +781,7 @@ static ServerMode get_remote_mode(PGconn* conn_get)
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
|
||||
goto exit;
|
||||
}
|
||||
if (PQnfields(res) != 2 || PQntuples(res) != 1) {
|
||||
if (PQnfields(res) != 1 || PQntuples(res) != 1) {
|
||||
goto exit;
|
||||
}
|
||||
primary_mode = (ServerMode)pg_atoi((const char*)PQgetvalue(res, 0, 0), 4, 0);
|
||||
|
||||
@ -1490,7 +1490,7 @@ static void DataRcvStreamConnect(char* conninfo)
|
||||
"the primary server: %s",
|
||||
PQerrorMessage(t_thrd.datareceiver_cxt.dataStreamingConn))));
|
||||
}
|
||||
if (PQnfields(res) != 2 || PQntuples(res) != 1) {
|
||||
if (PQnfields(res) != 1 || PQntuples(res) != 1) {
|
||||
int ntuples = PQntuples(res);
|
||||
int nfields = PQnfields(res);
|
||||
|
||||
@ -1498,7 +1498,7 @@ static void DataRcvStreamConnect(char* conninfo)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_STATUS),
|
||||
errmsg("invalid response from primary server"),
|
||||
errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.", ntuples, nfields)));
|
||||
errdetail("Expected 1 tuple with 1 fields, got %d tuples with %d fields.", ntuples, nfields)));
|
||||
}
|
||||
primary_mode = (ServerMode)pg_strtoint32(PQgetvalue(res, 0, 0));
|
||||
if (primary_mode != PRIMARY_MODE) {
|
||||
|
||||
@ -58,6 +58,52 @@ extern void SetDataRcvDummyStandbySyncPercent(int percent);
|
||||
#define AmWalReceiverForDummyStandby() \
|
||||
(t_thrd.walreceiver_cxt.AmWalReceiverForFailover && !t_thrd.walreceiver_cxt.AmWalReceiverForStandby)
|
||||
|
||||
#ifndef ENABLE_MULTIPLE_NODES
|
||||
/*
|
||||
* Identify remote az should be same with local for a cascade standby.
|
||||
*/
|
||||
static void IdentifyRemoteAvailableZone(void)
|
||||
{
|
||||
if (!t_thrd.xlog_cxt.is_cascade_standby) {
|
||||
return;
|
||||
}
|
||||
|
||||
volatile WalRcvData* walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
|
||||
int nRet = 0;
|
||||
PGresult* res = NULL;
|
||||
|
||||
/* Send query and get available zone of the remote server. */
|
||||
res = libpqrcv_PQexec("IDENTIFY_AZ");
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
|
||||
PQclear(res);
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_STATUS),
|
||||
errmsg("could not receive the ongoing az infomation from "
|
||||
"the remote server: %s",
|
||||
PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn))));
|
||||
}
|
||||
|
||||
/* check remote az */
|
||||
char remoteAZname[NAMEDATALEN];
|
||||
nRet = snprintf_s(remoteAZname, NAMEDATALEN, NAMEDATALEN -1, "%s", PQgetvalue(res, 0, 0));
|
||||
securec_check_ss(nRet, "", "");
|
||||
|
||||
if (strcmp(remoteAZname, g_instance.attr.attr_storage.available_zone) != 0) {
|
||||
PQclear(res);
|
||||
|
||||
SpinLockAcquire(&walrcv->mutex);
|
||||
walrcv->conn_errno = REPL_INFO_ERROR;
|
||||
SpinLockRelease(&walrcv->mutex);
|
||||
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INTERNAL_ERROR),
|
||||
errmsg("the remote available zone should be same with local, remote is %s, local is %s",
|
||||
remoteAZname, g_instance.attr.attr_storage.available_zone)));
|
||||
}
|
||||
PQclear(res);
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Establish the connection to the primary server for XLOG streaming
|
||||
*/
|
||||
@ -351,7 +397,7 @@ retry:
|
||||
PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn))));
|
||||
return false;
|
||||
}
|
||||
if (PQnfields(res) != 2 || PQntuples(res) != 1) {
|
||||
if (PQnfields(res) != 1 || PQntuples(res) != 1) {
|
||||
int num_tuples = PQntuples(res);
|
||||
int num_fields = PQnfields(res);
|
||||
|
||||
@ -360,7 +406,7 @@ retry:
|
||||
(errcode(ERRCODE_INVALID_STATUS),
|
||||
errmsg("invalid response from remote server"),
|
||||
errdetail(
|
||||
"Expected 1 tuple with 2 fields, got %d tuples with %d fields.", num_tuples, num_fields)));
|
||||
"Expected 1 tuple with 1 fields, got %d tuples with %d fields.", num_tuples, num_fields)));
|
||||
return false;
|
||||
}
|
||||
remoteMode = (ServerMode)pg_strtoint32(PQgetvalue(res, 0, 0));
|
||||
@ -394,28 +440,15 @@ retry:
|
||||
return false;
|
||||
}
|
||||
|
||||
/* check remote az */
|
||||
char remoteAZname[NAMEDATALEN];
|
||||
nRet = snprintf_s(remoteAZname, NAMEDATALEN, NAMEDATALEN -1, "%s", PQgetvalue(res, 0, 1));
|
||||
securec_check_ss(nRet, "", "");
|
||||
if (t_thrd.xlog_cxt.is_cascade_standby &&
|
||||
(strcmp(remoteAZname, g_instance.attr.attr_storage.available_zone) != 0)) {
|
||||
PQclear(res);
|
||||
|
||||
SpinLockAcquire(&walrcv->mutex);
|
||||
walrcv->conn_errno = REPL_INFO_ERROR;
|
||||
SpinLockRelease(&walrcv->mutex);
|
||||
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INTERNAL_ERROR),
|
||||
errmsg("the remote available zone should be same with local, remote is %s, local is %s",
|
||||
remoteAZname, g_instance.attr.attr_storage.available_zone)));
|
||||
return false;
|
||||
}
|
||||
|
||||
PQclear(res);
|
||||
}
|
||||
|
||||
#ifndef ENABLE_MULTIPLE_NODES
|
||||
if (t_thrd.xlog_cxt.is_cascade_standby) {
|
||||
IdentifyRemoteAvailableZone();
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Get the system identifier and timeline ID as a DataRow message from the
|
||||
* primary server.
|
||||
|
||||
@ -81,6 +81,7 @@
|
||||
%token K_IDENTIFY_MAXLSN
|
||||
%token K_IDENTIFY_CONSISTENCE
|
||||
%token K_IDENTIFY_CHANNEL
|
||||
%token K_IDENTIFY_AZ
|
||||
%token K_LABEL
|
||||
%token K_PROGRESS
|
||||
%token K_FAST
|
||||
@ -97,7 +98,7 @@
|
||||
%token K_SLOT
|
||||
|
||||
%type <node> command
|
||||
%type <node> base_backup start_replication start_data_replication fetch_mot_checkpoint start_logical_replication identify_system identify_version identify_mode identify_consistence create_replication_slot drop_replication_slot identify_maxlsn identify_channel
|
||||
%type <node> base_backup start_replication start_data_replication fetch_mot_checkpoint start_logical_replication identify_system identify_version identify_mode identify_consistence create_replication_slot drop_replication_slot identify_maxlsn identify_channel identify_az
|
||||
%type <list> base_backup_opt_list
|
||||
%type <defelt> base_backup_opt
|
||||
%type <list> plugin_options plugin_opt_list
|
||||
@ -130,6 +131,7 @@ command:
|
||||
| drop_replication_slot
|
||||
| identify_maxlsn
|
||||
| identify_channel
|
||||
| identify_az
|
||||
;
|
||||
|
||||
/*
|
||||
@ -202,6 +204,16 @@ identify_channel:
|
||||
}
|
||||
;
|
||||
|
||||
/*
|
||||
* IDENTIFY_AZ
|
||||
*/
|
||||
identify_az:
|
||||
K_IDENTIFY_AZ
|
||||
{
|
||||
$$ = (Node *) makeNode(IdentifyAZCmd);
|
||||
}
|
||||
;
|
||||
|
||||
/*
|
||||
* BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT] [TABLESPACE_MAP]
|
||||
*/
|
||||
|
||||
@ -91,6 +91,7 @@ IDENTIFY_MODE { return K_IDENTIFY_MODE; }
|
||||
IDENTIFY_MAXLSN { return K_IDENTIFY_MAXLSN; }
|
||||
IDENTIFY_CONSISTENCE { return K_IDENTIFY_CONSISTENCE; }
|
||||
IDENTIFY_CHANNEL { return K_IDENTIFY_CHANNEL; }
|
||||
IDENTIFY_AZ { return K_IDENTIFY_AZ; }
|
||||
LABEL { return K_LABEL; }
|
||||
NOWAIT { return K_NOWAIT; }
|
||||
PROGRESS { return K_PROGRESS; }
|
||||
|
||||
@ -677,7 +677,7 @@ void IdentifyMode(void)
|
||||
|
||||
/* Send a RowDescription message */
|
||||
pq_beginmessage(&buf, 'T');
|
||||
pq_sendint16(&buf, 2); /* 2 fields */
|
||||
pq_sendint16(&buf, 1); /* 1 fields */
|
||||
|
||||
/* first field */
|
||||
pq_sendstring(&buf, "smode"); /* col name */
|
||||
@ -687,8 +687,34 @@ void IdentifyMode(void)
|
||||
pq_sendint16(&buf, 4); /* typlen */
|
||||
pq_sendint32(&buf, 0); /* typmod */
|
||||
pq_sendint16(&buf, 0); /* format code */
|
||||
pq_endmessage_noblock(&buf);
|
||||
|
||||
/* second field */
|
||||
/* Send a DataRow message */
|
||||
pq_beginmessage(&buf, 'D');
|
||||
pq_sendint16(&buf, 1); /* # of columns */
|
||||
pq_sendint32(&buf, strlen(smode)); /* col1 len */
|
||||
pq_sendbytes(&buf, (char*)smode, strlen(smode));
|
||||
pq_endmessage_noblock(&buf);
|
||||
|
||||
/* Send CommandComplete and ReadyForQuery messages */
|
||||
EndCommand_noblock("SELECT", DestRemote);
|
||||
ReadyForQuery_noblock(DestRemote, u_sess->attr.attr_storage.wal_sender_timeout);
|
||||
/* ReadyForQuery did pq_flush_if_available for us */
|
||||
}
|
||||
|
||||
#ifndef ENABLE_MULTIPLE_NODES
|
||||
/*
|
||||
* IDENTIFY_AZ
|
||||
*/
|
||||
void IdentifyAvailableZone(void)
|
||||
{
|
||||
StringInfoData buf;
|
||||
|
||||
/* Send a RowDescription message */
|
||||
pq_beginmessage(&buf, 'T');
|
||||
pq_sendint16(&buf, 1); /* 1 fields */
|
||||
|
||||
/* first field */
|
||||
pq_sendstring(&buf, "azname"); /* col name */
|
||||
pq_sendint32(&buf, 0); /* table oid */
|
||||
pq_sendint16(&buf, 0); /* attnum */
|
||||
@ -700,12 +726,9 @@ void IdentifyMode(void)
|
||||
|
||||
/* Send a DataRow message */
|
||||
pq_beginmessage(&buf, 'D');
|
||||
pq_sendint16(&buf, 2); /* # of columns */
|
||||
pq_sendint32(&buf, strlen(smode)); /* col1 len */
|
||||
pq_sendbytes(&buf, (char*)smode, strlen(smode));
|
||||
|
||||
pq_sendint16(&buf, 1); /* # of columns */
|
||||
char* azname = g_instance.attr.attr_storage.available_zone;
|
||||
pq_sendint32(&buf, strlen(azname)); /* col2 len */
|
||||
pq_sendint32(&buf, strlen(azname)); /* col1 len */
|
||||
pq_sendbytes(&buf, (char*)azname, strlen(azname));
|
||||
pq_endmessage_noblock(&buf);
|
||||
|
||||
@ -714,6 +737,7 @@ void IdentifyMode(void)
|
||||
ReadyForQuery_noblock(DestRemote, u_sess->attr.attr_storage.wal_sender_timeout);
|
||||
/* ReadyForQuery did pq_flush_if_available for us */
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* IDENTIFY_MAXLSN
|
||||
@ -1763,6 +1787,11 @@ static bool HandleWalReplicationCommand(const char* cmd_string)
|
||||
IdentifyChannel((IdentifyChannelCmd*)cmd_node);
|
||||
break;
|
||||
|
||||
#ifndef ENABLE_MULTIPLE_NODES
|
||||
case T_IdentifyAZCmd:
|
||||
IdentifyAvailableZone();
|
||||
break;
|
||||
#endif
|
||||
case T_BaseBackupCmd:
|
||||
MarkPostmasterChildNormal();
|
||||
SetWalSndPeerMode(STANDBY_MODE);
|
||||
|
||||
@ -513,6 +513,9 @@ typedef enum NodeTag {
|
||||
T_IdentifyMaxLsnCmd,
|
||||
T_IdentifyConsistenceCmd,
|
||||
T_IdentifyChannelCmd,
|
||||
#ifndef ENABLE_MULTIPLE_NODES
|
||||
T_IdentifyAZCmd,
|
||||
#endif
|
||||
T_BaseBackupCmd,
|
||||
T_CreateReplicationSlotCmd,
|
||||
T_DropReplicationSlotCmd,
|
||||
|
||||
@ -49,6 +49,12 @@ typedef struct IdentifyChannelCmd {
|
||||
int channel_identifier;
|
||||
} IdentifyChannelCmd;
|
||||
|
||||
#ifndef ENABLE_MULTIPLE_NODES
|
||||
typedef struct IdentifyAZCmd {
|
||||
NodeTag type;
|
||||
} IdentifyAZCmd;
|
||||
#endif
|
||||
|
||||
/* ----------------------
|
||||
* BASE_BACKUP command
|
||||
* ----------------------
|
||||
|
||||
Reference in New Issue
Block a user