From a886daf3298ab828f5bb732ff596c956c1fb36dc Mon Sep 17 00:00:00 2001 From: LiHeng Date: Sun, 8 Nov 2020 20:20:40 +0800 Subject: [PATCH] add identify available zone command for cascade standby We add a column for identify mode before for identify az, which not compatible with distributed gaussdb. Therefore, we remove the added column in identify mode, and add another command identify az to make sure a cascade standby and its upstream both in same available zone. --- src/bin/pg_ctl/pg_build.cpp | 2 +- .../storage/replication/datareceiver.cpp | 4 +- .../storage/replication/libpqwalreceiver.cpp | 75 +++++++++++++------ .../storage/replication/repl_gram.y | 14 +++- .../storage/replication/repl_scanner.l | 1 + .../storage/replication/walsender.cpp | 43 +++++++++-- src/include/nodes/nodes.h | 3 + src/include/nodes/replnodes.h | 6 ++ 8 files changed, 116 insertions(+), 32 deletions(-) diff --git a/src/bin/pg_ctl/pg_build.cpp b/src/bin/pg_ctl/pg_build.cpp index cffb07d78..c4c57e0bc 100755 --- a/src/bin/pg_ctl/pg_build.cpp +++ b/src/bin/pg_ctl/pg_build.cpp @@ -781,7 +781,7 @@ static ServerMode get_remote_mode(PGconn* conn_get) if (PQresultStatus(res) != PGRES_TUPLES_OK) { goto exit; } - if (PQnfields(res) != 2 || PQntuples(res) != 1) { + if (PQnfields(res) != 1 || PQntuples(res) != 1) { goto exit; } primary_mode = (ServerMode)pg_atoi((const char*)PQgetvalue(res, 0, 0), 4, 0); diff --git a/src/gausskernel/storage/replication/datareceiver.cpp b/src/gausskernel/storage/replication/datareceiver.cpp index b77795b51..7d9283625 100755 --- a/src/gausskernel/storage/replication/datareceiver.cpp +++ b/src/gausskernel/storage/replication/datareceiver.cpp @@ -1490,7 +1490,7 @@ static void DataRcvStreamConnect(char* conninfo) "the primary server: %s", PQerrorMessage(t_thrd.datareceiver_cxt.dataStreamingConn)))); } - if (PQnfields(res) != 2 || PQntuples(res) != 1) { + if (PQnfields(res) != 1 || PQntuples(res) != 1) { int ntuples = PQntuples(res); int nfields = PQnfields(res); @@ -1498,7 +1498,7 @@ static void DataRcvStreamConnect(char* conninfo) ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS), errmsg("invalid response from primary server"), - errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.", ntuples, nfields))); + errdetail("Expected 1 tuple with 1 fields, got %d tuples with %d fields.", ntuples, nfields))); } primary_mode = (ServerMode)pg_strtoint32(PQgetvalue(res, 0, 0)); if (primary_mode != PRIMARY_MODE) { diff --git a/src/gausskernel/storage/replication/libpqwalreceiver.cpp b/src/gausskernel/storage/replication/libpqwalreceiver.cpp index 5eeb6ea08..49bb1f34f 100755 --- a/src/gausskernel/storage/replication/libpqwalreceiver.cpp +++ b/src/gausskernel/storage/replication/libpqwalreceiver.cpp @@ -58,6 +58,52 @@ extern void SetDataRcvDummyStandbySyncPercent(int percent); #define AmWalReceiverForDummyStandby() \ (t_thrd.walreceiver_cxt.AmWalReceiverForFailover && !t_thrd.walreceiver_cxt.AmWalReceiverForStandby) +#ifndef ENABLE_MULTIPLE_NODES +/* + * Identify remote az should be same with local for a cascade standby. + */ +static void IdentifyRemoteAvailableZone(void) +{ + if (!t_thrd.xlog_cxt.is_cascade_standby) { + return; + } + + volatile WalRcvData* walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv; + int nRet = 0; + PGresult* res = NULL; + + /* Send query and get available zone of the remote server. */ + res = libpqrcv_PQexec("IDENTIFY_AZ"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) { + PQclear(res); + ereport(ERROR, + (errcode(ERRCODE_INVALID_STATUS), + errmsg("could not receive the ongoing az infomation from " + "the remote server: %s", + PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn)))); + } + + /* check remote az */ + char remoteAZname[NAMEDATALEN]; + nRet = snprintf_s(remoteAZname, NAMEDATALEN, NAMEDATALEN -1, "%s", PQgetvalue(res, 0, 0)); + securec_check_ss(nRet, "", ""); + + if (strcmp(remoteAZname, g_instance.attr.attr_storage.available_zone) != 0) { + PQclear(res); + + SpinLockAcquire(&walrcv->mutex); + walrcv->conn_errno = REPL_INFO_ERROR; + SpinLockRelease(&walrcv->mutex); + + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("the remote available zone should be same with local, remote is %s, local is %s", + remoteAZname, g_instance.attr.attr_storage.available_zone))); + } + PQclear(res); +} +#endif + /* * Establish the connection to the primary server for XLOG streaming */ @@ -351,7 +397,7 @@ retry: PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn)))); return false; } - if (PQnfields(res) != 2 || PQntuples(res) != 1) { + if (PQnfields(res) != 1 || PQntuples(res) != 1) { int num_tuples = PQntuples(res); int num_fields = PQnfields(res); @@ -360,7 +406,7 @@ retry: (errcode(ERRCODE_INVALID_STATUS), errmsg("invalid response from remote server"), errdetail( - "Expected 1 tuple with 2 fields, got %d tuples with %d fields.", num_tuples, num_fields))); + "Expected 1 tuple with 1 fields, got %d tuples with %d fields.", num_tuples, num_fields))); return false; } remoteMode = (ServerMode)pg_strtoint32(PQgetvalue(res, 0, 0)); @@ -394,28 +440,15 @@ retry: return false; } - /* check remote az */ - char remoteAZname[NAMEDATALEN]; - nRet = snprintf_s(remoteAZname, NAMEDATALEN, NAMEDATALEN -1, "%s", PQgetvalue(res, 0, 1)); - securec_check_ss(nRet, "", ""); - if (t_thrd.xlog_cxt.is_cascade_standby && - (strcmp(remoteAZname, g_instance.attr.attr_storage.available_zone) != 0)) { - PQclear(res); - - SpinLockAcquire(&walrcv->mutex); - walrcv->conn_errno = REPL_INFO_ERROR; - SpinLockRelease(&walrcv->mutex); - - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("the remote available zone should be same with local, remote is %s, local is %s", - remoteAZname, g_instance.attr.attr_storage.available_zone))); - return false; - } - PQclear(res); } +#ifndef ENABLE_MULTIPLE_NODES + if (t_thrd.xlog_cxt.is_cascade_standby) { + IdentifyRemoteAvailableZone(); + } +#endif + /* * Get the system identifier and timeline ID as a DataRow message from the * primary server. diff --git a/src/gausskernel/storage/replication/repl_gram.y b/src/gausskernel/storage/replication/repl_gram.y index 7b8da37b7..3425f8954 100755 --- a/src/gausskernel/storage/replication/repl_gram.y +++ b/src/gausskernel/storage/replication/repl_gram.y @@ -81,6 +81,7 @@ %token K_IDENTIFY_MAXLSN %token K_IDENTIFY_CONSISTENCE %token K_IDENTIFY_CHANNEL +%token K_IDENTIFY_AZ %token K_LABEL %token K_PROGRESS %token K_FAST @@ -97,7 +98,7 @@ %token K_SLOT %type command -%type base_backup start_replication start_data_replication fetch_mot_checkpoint start_logical_replication identify_system identify_version identify_mode identify_consistence create_replication_slot drop_replication_slot identify_maxlsn identify_channel +%type base_backup start_replication start_data_replication fetch_mot_checkpoint start_logical_replication identify_system identify_version identify_mode identify_consistence create_replication_slot drop_replication_slot identify_maxlsn identify_channel identify_az %type base_backup_opt_list %type base_backup_opt %type plugin_options plugin_opt_list @@ -130,6 +131,7 @@ command: | drop_replication_slot | identify_maxlsn | identify_channel + | identify_az ; /* @@ -202,6 +204,16 @@ identify_channel: } ; +/* + * IDENTIFY_AZ + */ +identify_az: + K_IDENTIFY_AZ + { + $$ = (Node *) makeNode(IdentifyAZCmd); + } + ; + /* * BASE_BACKUP [LABEL '