Files
openGauss-server/src/gausskernel/storage/replication/libpqwalreceiver.cpp
2024-09-23 20:34:45 +08:00

1737 lines
69 KiB
C++
Executable File

/* -------------------------------------------------------------------------
*
* libpqwalreceiver.cpp
*
* This file contains the libpq-specific parts of walreceiver. It's
* loaded as a dynamic module to avoid linking the main server binary with
* libpq.
*
* Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
* Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
*
*
* IDENTIFICATION
* src/gausskernel/storage/replication/libpqwalreceiver.cpp
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include <sys/time.h>
#include "libpq/libpq-int.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "replication/walreceiver.h"
#include "replication/walsender_private.h"
#include "replication/libpqwalreceiver.h"
#include "replication/ss_disaster_cluster.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "utils/guc.h"
#ifdef HAVE_NETINET_TCP_H
#include <netinet/tcp.h>
#endif
#include <arpa/inet.h>
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
#include "utils/int8.h"
#include "utils/pg_lsn.h"
#include "utils/builtins.h"
/* Prototypes for private functions */
static bool libpq_select(int timeout_ms);
static PGresult *libpqrcv_PQexec(const char *query);
static void ha_set_conn_channel(void);
static void ha_add_disconnect_count(void);
static void ha_set_port_to_remote(PGconn *dummy_conn, int ha_port);
static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
extern void SetDataRcvDummyStandbySyncPercent(int percent);
#define AmWalReceiverForDummyStandby() \
(t_thrd.walreceiver_cxt.AmWalReceiverForFailover && !t_thrd.walreceiver_cxt.AmWalReceiverForStandby)
#ifndef ENABLE_MULTIPLE_NODES
/*
* Identify remote az should be same with local for a cascade standby.
*/
static void IdentifyRemoteAvailableZone(void)
{
if (!t_thrd.xlog_cxt.is_cascade_standby) {
return;
}
volatile WalRcvData* walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
int nRet = 0;
PGresult* res = NULL;
/* Send query and get available zone of the remote server. */
res = libpqrcv_PQexec("IDENTIFY_AZ");
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
PQclear(res);
ereport(ERROR,
(errcode(ERRCODE_INVALID_STATUS),
errmsg("could not receive the ongoing az infomation from "
"the remote server: %s",
PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn))));
}
/* check remote az */
char remoteAZname[NAMEDATALEN];
nRet = snprintf_s(remoteAZname, NAMEDATALEN, NAMEDATALEN -1, "%s", PQgetvalue(res, 0, 0));
securec_check_ss(nRet, "", "");
if (strcmp(remoteAZname, g_instance.attr.attr_storage.available_zone) != 0) {
PQclear(res);
SpinLockAcquire(&walrcv->mutex);
walrcv->conn_errno = REPL_INFO_ERROR;
SpinLockRelease(&walrcv->mutex);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("the remote available zone should be same with local, remote is %s, local is %s",
remoteAZname, g_instance.attr.attr_storage.available_zone)));
}
PQclear(res);
}
#endif
/*
* Establish the connection to the primary server for XLOG streaming
*/
bool libpqrcv_connect_for_TLI(TimeLineID *timeLineID, char *conninfo)
{
char conninfoRepl[MAXCONNINFO + 75] = {0};
TimeLineID remoteTli;
PGresult *res = NULL;
int nRet = 0;
/*
* Connect using deliberately undocumented parameter: replication. The
* database name is ignored by the server in replication mode, but specify
* "replication" for .pgpass lookup.
*/
if (!dummyStandbyMode) {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("dummyStandbyMode should be true")));
}
nRet = snprintf_s(conninfoRepl, sizeof(conninfoRepl), sizeof(conninfoRepl) - 1,
"%s dbname=replication replication=true "
"fallback_application_name=dummystandby connect_timeout=%d enable_ce=1",
conninfo, u_sess->attr.attr_storage.wal_receiver_connect_timeout);
securec_check_ss(nRet, "", "");
ereport(LOG, (errmsg("Connecting to primary :%s", conninfo)));
t_thrd.libwalreceiver_cxt.streamConn = PQconnectdb(conninfoRepl);
if (PQstatus(t_thrd.libwalreceiver_cxt.streamConn) != CONNECTION_OK) {
char *errMsg = PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn);
char *subErrMsg = "can not accept connection in standby mode";
ereport(LOG, (errmsg("wal receiver could not connect to the primary server,the connection info :%s : %s",
conninfo, PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn))));
if (errMsg != NULL && strstr(errMsg, subErrMsg) != NULL) {
clean_failover_host_conninfo_for_dummy();
}
PQfinish(t_thrd.libwalreceiver_cxt.streamConn);
t_thrd.libwalreceiver_cxt.streamConn = NULL;
return false;
}
ereport(LOG, (errmsg("Connected to primary :%s success.", conninfo)));
/*
* Get the system identifier and timeline ID as a DataRow message from the
* primary server.
*/
res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
PQclear(res);
ereport(LOG, (errmsg("could not receive database system identifier and timeline ID from "
"the primary server: %s",
PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn))));
PQfinish(t_thrd.libwalreceiver_cxt.streamConn);
t_thrd.libwalreceiver_cxt.streamConn = NULL;
return false;
}
if (PQnfields(res) < 3 || PQntuples(res) != 1) {
int ntuples = PQntuples(res);
int nfields = PQnfields(res);
PQclear(res);
ereport(
LOG,
(errmsg("invalid response from primary server"),
errdetail("Could not identify system: Got %d rows and %d fields, expected %d rows and %d or more fields.",
ntuples, nfields, 1, 3)));
PQfinish(t_thrd.libwalreceiver_cxt.streamConn);
t_thrd.libwalreceiver_cxt.streamConn = NULL;
return false;
}
remoteTli = pg_strtoint32(PQgetvalue(res, 0, 1));
/*
* Confirm that the current timeline of the primary is the same as the
* recovery target timeline.
*/
PQclear(res);
*timeLineID = t_thrd.xlog_cxt.ThisTimeLineID = remoteTli;
libpqrcv_disconnect();
return true;
}
#ifndef ENABLE_MULTIPLE_NODES
#define IS_PRIMARY_NORMAL(servermode) (servermode == PRIMARY_MODE)
#else
#define IS_PRIMARY_NORMAL(servermode) ((servermode == PRIMARY_MODE) || (servermode == NORMAL_MODE))
#endif
static bool CheckRemoteServerSharedStorage(ServerMode remoteMode, PGresult* res)
{
if (IS_SHARED_STORAGE_PRIMARY_CLUSTER_STANDBY_MODE && !IS_SHARED_STORAGE_CASCADE_STANDBY_MODE) {
if (!IS_PRIMARY_NORMAL(remoteMode)) {
PQclear(res);
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("for standby the mode of the remote server must be primary, current is %s",
wal_get_role_string(remoteMode, true))));
return false;
}
} else if (IS_SHARED_STORAGE_CASCADE_STANDBY_MODE) {
if (remoteMode != MAIN_STANDBY_MODE) {
PQclear(res);
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("for main-standby cluster standby the mode of the remote server must be "
"main standby, current is %s", wal_get_role_string(remoteMode, true))));
return false;
}
} else if (IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE) {
if (!IS_PRIMARY_NORMAL(remoteMode) && remoteMode != MAIN_STANDBY_MODE) {
PQclear(res);
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("for standby cluster standby the mode of the remote server must be "
"primary or main standby, current is %s", wal_get_role_string(remoteMode, true))));
return false;
}
}
return true;
}
static bool CheckSSRemoteServerMode(ServerMode remoteMode, PGresult* res)
{
if (SS_DISASTER_MAIN_STANDBY_NODE) {
if (remoteMode != PRIMARY_MODE) {
PQclear(res);
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("when ss disaster, main standby of standby cluster of the remote server must be primary, current is %s",
wal_get_role_string(remoteMode, true))));
return false;
}
}
return true;
}
void StartRemoteStreaming(const LibpqrcvConnectParam *options)
{
Assert(t_thrd.libwalreceiver_cxt.streamConn != NULL);
StringInfoData cmd;
initStringInfo(&cmd);
appendStringInfoString(&cmd, "START_REPLICATION");
if (!t_thrd.walreceiver_cxt.AmWalReceiverForFailover && options->slotname != NULL) {
appendStringInfo(&cmd, " SLOT \"%s\"", options->slotname);
}
if (options->logical) {
appendStringInfo(&cmd, " LOGICAL");
}
appendStringInfo(&cmd, " %X/%X", (uint32)(options->startpoint >> 32), (uint32)(options->startpoint));
if (options->logical) {
char *pubnames_literal = NULL;
appendStringInfoString(&cmd, " (");
appendStringInfo(&cmd, "proto_version '%u'", options->protoVersion);
char *pubnames_str =
stringlist_to_identifierstr(t_thrd.libwalreceiver_cxt.streamConn, options->publicationNames);
pubnames_literal = PQescapeLiteral(t_thrd.libwalreceiver_cxt.streamConn, pubnames_str, strlen(pubnames_str));
appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
PQfreemem(pubnames_literal);
pfree(pubnames_str);
if (options->binary && PQserverVersion(t_thrd.libwalreceiver_cxt.streamConn) >= 90204) {
appendStringInfoString(&cmd, ", binary 'true'");
ereport(DEBUG5, (errmsg("append binary true")));
}
if (options->useSnapshot) {
appendStringInfoString(&cmd, ", usesnapshot 'true'");
}
appendStringInfoChar(&cmd, ')');
}
PGresult *res = libpqrcv_PQexec(cmd.data);
pfree(cmd.data);
if (PQresultStatus(res) != PGRES_COPY_BOTH) {
PQclear(res);
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS), errmsg("could not start WAL streaming: %s",
PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn))));
}
PQclear(res);
}
void CreateRemoteReplicationSlot(XLogRecPtr startpoint, const char* slotname, bool isLogical, XLogRecPtr *lsn,
bool useSnapshot, CommitSeqNo *csn)
{
Assert(t_thrd.libwalreceiver_cxt.streamConn != NULL);
StringInfoData cmd;
initStringInfo(&cmd);
appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
if (isLogical) {
appendStringInfoString(&cmd, " LOGICAL pgoutput");
} else {
appendStringInfo(&cmd, " PHYSICAL %X/%X", (uint32)(startpoint >> 32), (uint32)(startpoint));
}
if (useSnapshot) {
appendStringInfoString(&cmd, " USE_SNAPSHOT");
}
PGresult *res = libpqrcv_PQexec(cmd.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
PQclear(res);
pfree(cmd.data);
ereport(ERROR,
(errcode(ERRCODE_INVALID_STATUS), errmsg("could not create replication slot %s : %s", slotname,
PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn))));
}
if (lsn) {
*lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid, CStringGetDatum(PQgetvalue(res, 0, 1))));
}
if (useSnapshot && csn) {
*csn = (CommitSeqNo)DatumGetInt64(
DirectFunctionCall1Coll(int8in, InvalidOid, CStringGetDatum(PQgetvalue(res, 0, 4))));
}
pfree(cmd.data);
PQclear(res);
}
/* checkRemote: don't check the result is checkRemote is false, just let remote do some init */
void IdentifyRemoteSystem(bool checkRemote)
{
Assert(t_thrd.libwalreceiver_cxt.streamConn != NULL);
char *remoteSysid = NULL;
char localSysid[32] = {0};
TimeLineID remoteTli;
TimeLineID localTli;
volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
PGresult *res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
PQclear(res);
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
errmsg("could not receive database system identifier and timeline ID from "
"the remote server: %s",
PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn))));
}
if (PQnfields(res) != 4 || PQntuples(res) != 1) {
int num_tuples = PQntuples(res);
int num_fields = PQnfields(res);
PQclear(res);
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS), errmsg("invalid response from remote server"),
errdetail("Could not identify system: Got %d rows and %d fields, expected %d rows and %d or more fields.",
num_tuples, num_fields, 1, 4)));
}
if (!checkRemote) {
PQclear(res);
return;
}
remoteSysid = PQgetvalue(res, 0, 0);
remoteTli = pg_strtoint32(PQgetvalue(res, 0, 1));
/*
* Confirm that the system identifier of the primary is the same as ours.
*/
int nRet = snprintf_s(localSysid, sizeof(localSysid), sizeof(localSysid) - 1, UINT64_FORMAT, GetSystemIdentifier());
securec_check_ss(nRet, "", "");
if (strcmp(remoteSysid, localSysid) != 0) {
if (dummyStandbyMode) {
/* delete local xlog. */
ProcessWSRmXLog();
if (g_instance.attr.attr_storage.enable_mix_replication) {
while (true) {
if (!ws_dummy_data_writer_use_file) {
CloseWSDataFileOnDummyStandby();
break;
} else {
pg_usleep(100000); /* sleep 0.1 s */
}
}
ProcessWSRmData();
}
// only walreceiver set standby_sysid.datareceiver do not set again.
int rc = memcpy_s(localSysid, sizeof(localSysid), remoteSysid, sizeof(localSysid));
securec_check(rc, "", "");
sync_system_identifier = strtoul(remoteSysid, 0, 10);
ereport(LOG, (errmsg("DummyStandby system identifier differs between the primary"),
errdetail("The primary's identifier is %s, the standby's identifier is %s.sync_system_identifier=%lu",
remoteSysid, localSysid, sync_system_identifier)));
} else {
remoteSysid = pstrdup(remoteSysid);
PQclear(res);
/*
* If the system id is different,
* then set error message in WalRcv and rebuild reason in HaShmData.
*/
SpinLockAcquire(&walrcv->mutex);
if (AmWalReceiverForDummyStandby()) {
walrcv->dummyStandbyConnectFailed = true;
}
SpinLockRelease(&walrcv->mutex);
ha_set_rebuild_connerror(SYSTEMID_REBUILD, REPL_INFO_ERROR);
if (!t_thrd.xlog_cxt.is_cascade_standby) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_STATUS),
errmsg("database system identifier differs between the primary and standby"),
errdetail("The primary's identifier is %s, the standby's identifier is %s.",
remoteSysid,
localSysid)));
} else {
ereport(ERROR,
(errcode(ERRCODE_INVALID_STATUS),
errmsg("database system identifier differs between the standby and cascade standby"),
errdetail("The standby's identifier is %s, the cascade standby's identifier is %s.",
remoteSysid,
localSysid)));
}
}
}
/*
* Confirm that the current timeline of the primary is the same as the
* recovery target timeline.
*/
if (dummyStandbyMode) {
localTli = remoteTli;
} else {
localTli = GetRecoveryTargetTLI();
}
PQclear(res);
if (t_thrd.walreceiver_cxt.AmWalReceiverForFailover) {
t_thrd.xlog_cxt.ThisTimeLineID = localTli;
} else {
if (remoteTli != localTli) {
/*
* If the timeline id different,
* then set error message in WalRcv and rebuild reason in HaShmData.
*/
ha_set_rebuild_connerror(TIMELINE_REBUILD, REPL_INFO_ERROR);
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
errmsg("timeline %u of the primary does not match recovery target timeline %u", remoteTli,
localTli)));
}
t_thrd.xlog_cxt.ThisTimeLineID = remoteTli;
}
}
/* identify remote mode, should do this after connect success. */
ServerMode IdentifyRemoteMode()
{
Assert(t_thrd.libwalreceiver_cxt.streamConn != NULL);
volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
PGresult *res = libpqrcv_PQexec("IDENTIFY_MODE");
ServerMode remoteMode = UNKNOWN_MODE;
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
PQclear(res);
ereport(ERROR,
(errcode(ERRCODE_INVALID_STATUS), errmsg("could not receive the ongoing mode infomation from "
"the remote server: %s",
PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn))));
}
if (PQnfields(res) != 1 || PQntuples(res) != 1) {
int num_tuples = PQntuples(res);
int num_fields = PQnfields(res);
PQclear(res);
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS), errmsg("invalid response from remote server"),
errdetail("Expected 1 tuple with 1 fields, got %d tuples with %d fields.", num_tuples,
num_fields)));
}
remoteMode = (ServerMode)pg_strtoint32(PQgetvalue(res, 0, 0));
if (walrcv->conn_target != REPCONNTARGET_PUBLICATION &&
!t_thrd.walreceiver_cxt.AmWalReceiverForFailover &&
(!IS_PRIMARY_NORMAL(remoteMode)) &&
/* remoteMode of cascade standby is a standby */
!t_thrd.xlog_cxt.is_cascade_standby && !IS_SHARED_STORAGE_MODE && !SS_DISASTER_CLUSTER) {
PQclear(res);
if (dummyStandbyMode) {
clean_failover_host_conninfo_for_dummy();
}
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
errmsg("the mode of the remote server must be primary, current is %s",
wal_get_role_string(remoteMode, true))));
}
if (t_thrd.postmaster_cxt.HaShmData->is_cascade_standby && remoteMode != STANDBY_MODE &&
!IS_SHARED_STORAGE_MODE && !SS_DISASTER_CLUSTER) {
PQclear(res);
SpinLockAcquire(&walrcv->mutex);
walrcv->conn_errno = REPL_INFO_ERROR;
SpinLockRelease(&walrcv->mutex);
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("for cascade standby the mode of the remote server must be standby, current is %s",
wal_get_role_string(remoteMode, true))));
}
if (IS_SHARED_STORAGE_MODE) {
if (!CheckRemoteServerSharedStorage(remoteMode, res)) {
return UNKNOWN_MODE;
}
}
if (SS_DISASTER_CLUSTER && !CheckSSRemoteServerMode(remoteMode, res)) {
return UNKNOWN_MODE;
}
PQclear(res);
return remoteMode;
}
/* identify remote version, should do this after connect success. */
static int32 IdentifyRemoteVersion()
{
Assert(t_thrd.libwalreceiver_cxt.streamConn != NULL);
const int versionFields = 3;
uint32 remoteSversion;
uint32 localSversion;
char *remotePversion = NULL;
char *localPversion = NULL;
uint32 remoteTerm;
uint32 localTerm;
volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
PGresult *res = libpqrcv_PQexec("IDENTIFY_VERSION");
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
PQclear(res);
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
errmsg("could not receive database system version and protocol version from "
"the remote server: %s",
PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn))));
}
if (PQnfields(res) != versionFields || PQntuples(res) != 1) {
int ntuples = PQntuples(res);
int nfields = PQnfields(res);
PQclear(res);
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS), errmsg("invalid response from remote server"),
errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.", ntuples, nfields)));
}
remoteSversion = pg_strtoint32(PQgetvalue(res, 0, 0));
localSversion = PG_VERSION_NUM;
remotePversion = PQgetvalue(res, 0, 1);
localPversion = pstrdup(PG_PROTOCOL_VERSION);
remoteTerm = pg_strtoint32(PQgetvalue(res, 0, 2));
localTerm = Max(g_instance.comm_cxt.localinfo_cxt.term_from_file, g_instance.comm_cxt.localinfo_cxt.term_from_xlog);
ereport(LOG, (errmsg("remote term[%u], local term[%u]", remoteTerm, localTerm)));
if (localPversion == NULL) {
PQclear(res);
if (t_thrd.role != APPLY_WORKER) {
ha_set_rebuild_connerror(VERSION_REBUILD, REPL_INFO_ERROR);
}
ereport(ERROR,
(errcode(ERRCODE_INVALID_STATUS),
errmsg("could not get the local protocal version, make sure the PG_PROTOCOL_VERSION is defined")));
}
if (!IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE && !SS_DISASTER_MAIN_STANDBY_NODE) {
if (walrcv->conn_target != REPCONNTARGET_DUMMYSTANDBY && (localTerm == 0 || localTerm > remoteTerm) &&
!AM_HADR_WAL_RECEIVER) {
PQclear(res);
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
errmsg("invalid local term or remote term smaller than local. remote term[%u], local term[%u]",
remoteTerm, localTerm)));
}
}
/*
* If the version of the remote server is not the same as the local's, then set error
* message in WalRcv and rebuild reason in HaShmData
*/
if (remoteSversion != localSversion || strncmp(remotePversion, localPversion, strlen(PG_PROTOCOL_VERSION)) != 0) {
PQclear(res);
if (t_thrd.role != APPLY_WORKER) {
ha_set_rebuild_connerror(VERSION_REBUILD, REPL_INFO_ERROR);
}
if (remoteSversion != localSversion) {
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
errmsg("database system version is different between the remote and local"),
errdetail("The remote's system version is %u, the local's system version is %u.",
remoteSversion, localSversion)));
} else {
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
errmsg("the remote protocal version %s is not the same as the local protocal version %s.",
remotePversion, localPversion)));
}
}
PQclear(res);
pfree_ext(localPversion);
return remoteTerm;
}
static void NotifyWalSendForMainStandby(uint32 localTerm, uint32 remoteTerm)
{
for (int i = 0; i < g_instance.attr.attr_storage.max_wal_senders; i++) {
volatile WalSnd *walsnd = &t_thrd.walsender_cxt.WalSndCtl->walsnds[i];
if (walsnd->pid == 0)
continue;
SpinLockAcquire(&walsnd->mutex);
if (IS_MULTI_DISASTER_RECOVER_MODE && localTerm != remoteTerm) {
walsnd->isTermChanged = true;
}
walsnd->sendKeepalive = true;
SpinLockRelease(&walsnd->mutex);
}
}
/*
* Establish the connection to the primary server for XLOG streaming
*/
bool libpqrcv_connect(char *conninfo, XLogRecPtr *startpoint, char *slotname, int channel_identifier)
{
char conninfoRepl[MAXCONNINFO + 75];
PGresult *res = NULL;
char cmd[1024];
char *remoteRecCrc = NULL;
pg_crc32 recCrc = 0;
XLogRecPtr localRec;
pg_crc32 localRecCrc = 0;
uint32 remoteTerm;
uint32 localTerm;
ServerMode remoteMode = UNKNOWN_MODE;
int haveXlog = 0;
volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
int count = 0;
int nRet = 0;
errno_t rc = EOK;
XLogRecPtr remoteMaxLsn = InvalidXLogRecPtr;
pg_crc32 remoteMaxLsnCrc = 0;
char *remoteMaxLsnStr = NULL;
char *remoteMaxLsnCrcStr = NULL;
uint32 hi, lo;
char uuidOption[MAXCONNINFO] = {0};
if (u_sess->attr.attr_storage.repl_auth_mode == REPL_AUTH_UUID &&
u_sess->attr.attr_storage.repl_uuid != NULL && strlen(u_sess->attr.attr_storage.repl_uuid) > 0) {
nRet = snprintf_s(uuidOption, sizeof(uuidOption), sizeof(uuidOption) - 1,
"options='-c repl_uuid=%s'", u_sess->attr.attr_storage.repl_uuid);
securec_check_ss(nRet, "\0", "\0");
}
/*
* Connect using deliberately undocumented parameter: replication. The
* database name is ignored by the server in replication mode, but specify
* "replication" for .pgpass lookup.
*
* For normal standbys(i.e. not hadr standby or Dorado standby), UUID is passed as startup option
* if uuid auth mode is required.
*/
if (dummyStandbyMode) {
nRet = snprintf_s(conninfoRepl, sizeof(conninfoRepl), sizeof(conninfoRepl) - 1,
"%s dbname=replication replication=true "
"fallback_application_name=dummystandby "
"connect_timeout=%d %s",
conninfo, u_sess->attr.attr_storage.wal_receiver_connect_timeout, uuidOption);
#ifndef ENABLE_MULTIPLE_NODES
} else if (AM_HADR_WAL_RECEIVER) {
#else
} else if (AM_HADR_WAL_RECEIVER || AM_HADR_CN_WAL_RECEIVER) {
#endif
char passwd[MAXPGPATH] = {'\0'};
char username[MAXPGPATH] = {'\0'};
GetPasswordForHadrStreamingReplication(username, passwd);
nRet = snprintf_s(conninfoRepl, sizeof(conninfoRepl), sizeof(conninfoRepl) - 1,
"%s dbname=postgres replication=%s "
"fallback_application_name=hadr_%s "
"connect_timeout=%d user=%s password=%s",
conninfo, AM_HADR_WAL_RECEIVER ? "hadr_main_standby" : "hadr_standby_cn",
(u_sess->attr.attr_common.application_name &&
strlen(u_sess->attr.attr_common.application_name) > 0)
? u_sess->attr.attr_common.application_name
: "walreceiver",
u_sess->attr.attr_storage.wal_receiver_connect_timeout, username, passwd);
rc = memset_s(passwd, MAXPGPATH, 0, MAXPGPATH);
securec_check(rc, "\0", "\0");
} else if (IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE ||
SS_DORADO_MAIN_STANDBY_NODE) {
nRet = snprintf_s(conninfoRepl, sizeof(conninfoRepl), sizeof(conninfoRepl) - 1,
"%s dbname=postgres replication=standby_cluster "
"fallback_application_name=%s_hass "
"connect_timeout=%d",
conninfo,
(u_sess->attr.attr_common.application_name &&
strlen(u_sess->attr.attr_common.application_name) > 0)
? u_sess->attr.attr_common.application_name
: "walreceiver",
u_sess->attr.attr_storage.wal_receiver_connect_timeout);
} else {
nRet = snprintf_s(conninfoRepl, sizeof(conninfoRepl), sizeof(conninfoRepl) - 1,
"%s dbname=replication replication=true "
"fallback_application_name=%s "
"connect_timeout=%d %s",
conninfo,
(u_sess->attr.attr_common.application_name &&
strlen(u_sess->attr.attr_common.application_name) > 0)
? u_sess->attr.attr_common.application_name
: "walreceiver",
u_sess->attr.attr_storage.wal_receiver_connect_timeout,
uuidOption);
}
securec_check_ss(nRet, "", "");
#ifndef ENABLE_MULTIPLE_NODES
if (AM_HADR_WAL_RECEIVER) {
#else
if (AM_HADR_WAL_RECEIVER || AM_HADR_CN_WAL_RECEIVER) {
#endif
char *tmp = NULL;
char *tok = NULL;
char printConnInfo[MAXCONNINFO] = {0};
char* copyConnInfo = NULL;
copyConnInfo = pstrdup(conninfoRepl);
if (copyConnInfo == NULL) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_STATUS), errmsg("could not get walreceiver conncetion info.")));
}
tok = strtok_s(copyConnInfo, " ", &tmp);
if (tok == NULL) {
if (copyConnInfo != NULL) {
rc = memset_s(copyConnInfo, sizeof(copyConnInfo), 0, sizeof(copyConnInfo));
securec_check(rc, "\0", "\0");
pfree_ext(copyConnInfo);
}
ereport(ERROR,
(errcode(ERRCODE_INVALID_STATUS), errmsg("could not parse walreceiver conncetion info string.")));
}
while (strncmp(tok, "password=", strlen("password=")) != 0) {
nRet = strcat_s(printConnInfo, MAXCONNINFO - 1, tok);
securec_check_ss(nRet, "", "");
nRet = strcat_s(printConnInfo, MAXCONNINFO - 1, " ");
securec_check_ss(nRet, "", "");
tok = strtok_s(NULL, " ", &tmp);
}
ereport(LOG, (errmsg("Connecting to remote server :%s", printConnInfo)));
if (copyConnInfo != NULL) {
rc = memset_s(copyConnInfo, sizeof(copyConnInfo), 0, sizeof(copyConnInfo));
securec_check(rc, "\0", "\0");
pfree_ext(copyConnInfo);
}
} else {
ereport(LOG, (errmsg("Connecting to remote server :%s", conninfoRepl)));
}
retry:
/* 1. try to connect to primary */
t_thrd.libwalreceiver_cxt.streamConn = PQconnectdb(conninfoRepl);
if (PQstatus(t_thrd.libwalreceiver_cxt.streamConn) != CONNECTION_OK) {
/* If startupxlog shut down walreceiver, we need not to retry. */
if (++count < u_sess->attr.attr_storage.wal_receiver_connect_retries && !WalRcvIsShutdown()) {
ereport(
LOG,
(errmsg("retry: %d, walreceiver could not connect to the remote server,the connection info :%s : %s",
count, conninfo, PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn))));
libpqrcv_disconnect();
goto retry;
}
ha_set_rebuild_connerror(CONNECT_REBUILD, CHANNEL_ERROR);
if (AmWalReceiverForDummyStandby()) {
SpinLockAcquire(&walrcv->mutex);
walrcv->dummyStandbyConnectFailed = true;
SpinLockRelease(&walrcv->mutex);
}
ha_add_disconnect_count();
ereport(WARNING, (errcode(ERRCODE_CONNECTION_TIMED_OUT),
errmsg("walreceiver could not connect to the remote server,the connection info :%s : %s",
conninfo, PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn))));
libpqrcv_disconnect();
ereport(ERROR, (errcode(ERRCODE_CONNECTION_TIMED_OUT),
errmsg("walreceiver could not connect and shutting down")));
}
ereport(LOG, (errmsg("Connected to remote server :%s success.", conninfo)));
#ifndef ENABLE_MULTIPLE_NODES
if (AM_HADR_WAL_RECEIVER) {
#else
if (AM_HADR_WAL_RECEIVER || AM_HADR_CN_WAL_RECEIVER) {
#endif
rc = memset_s(conninfoRepl, MAXCONNINFO + 75, 0, MAXCONNINFO + 75);
securec_check(rc, "\0", "\0");
}
/* 2. identify version */
remoteTerm = IdentifyRemoteVersion();
localTerm = Max(g_instance.comm_cxt.localinfo_cxt.term_from_file, g_instance.comm_cxt.localinfo_cxt.term_from_xlog);
/* If connect to primary or standby (for failover), check remote role */
if (!t_thrd.walreceiver_cxt.AmWalReceiverForFailover || t_thrd.walreceiver_cxt.AmWalReceiverForStandby) {
/* Send query and get server mode of the remote server. */
remoteMode = IdentifyRemoteMode();
if (remoteMode == UNKNOWN_MODE) {
return false;
}
}
#ifndef ENABLE_MULTIPLE_NODES
if (g_instance.attr.attr_storage.enable_availablezone &&
t_thrd.xlog_cxt.is_cascade_standby && !t_thrd.postmaster_cxt.HaShmData->is_cross_region) {
IdentifyRemoteAvailableZone();
}
#endif
if (IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE) {
if (walrcv->conn_target != REPCONNTARGET_DUMMYSTANDBY && (localTerm == 0 || localTerm > remoteTerm)) {
PQclear(res);
ha_set_rebuild_connerror(WALSEGMENT_REBUILD, REPL_INFO_ERROR);
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
errmsg("shared storage: invalid local term or remote term smaller than local."
"remote term[%u], local term[%u]", remoteTerm, localTerm)));
return false;
}
}
/*
* Get the system identifier and timeline ID as a DataRow message from the
* primary server.
*/
IdentifyRemoteSystem(true);
if (dummyStandbyMode) {
char msgBuf[XLOG_READER_MAX_MSGLENTH] = {0};
/* find the max lsn by xlog file, if i'm dummystandby or standby for failover */
localRec = FindMaxLSN(t_thrd.proc_cxt.DataDir, msgBuf, XLOG_READER_MAX_MSGLENTH, &localRecCrc);
} else {
SpinLockAcquire(&walrcv->mutex);
localRecCrc = walrcv->latestRecordCrc;
localRec = walrcv->latestValidRecord;
SpinLockRelease(&walrcv->mutex);
ereport(LOG,
(errmsg("local request lsn/crc [%X/%X, %u]", (uint32)(localRec >> 32), (uint32)localRec, localRecCrc)));
if (!XRecOffIsValid(localRec)) {
ereport(PANIC,
(errmsg("Invalid xlog offset at %X/%X. Please check xlog files or rebuild the primary/standby "
"relationship.",
(uint32)(localRec >> 32), (uint32)localRec)));
}
}
/* if dummystandby has no xlog, dont check crc */
if (!(dummyStandbyMode && XLogRecPtrIsInvalid(localRec))) {
nRet = snprintf_s(cmd, sizeof(cmd), sizeof(cmd) - 1, "IDENTIFY_CONSISTENCE %X/%X", (uint32)(localRec >> 32),
(uint32)localRec);
securec_check_ss(nRet, "", "");
res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
PQclear(res);
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
errmsg("failed to identify consistence at %X/%X: %s", (uint32)(localRec >> 32),
(uint32)localRec, PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn))));
return false;
}
/*
* Indentify consistence is motified when importing cm-ha enhancement code.
* To support greyupgrade, msg with 1 row of 2 and 3 cols is handled to
* go through two different logics. Will remove later.
*/
if ((PQnfields(res) != 3 && PQnfields(res) != 2) || PQntuples(res) != 1) {
int ntuples = PQntuples(res);
int nfields = PQnfields(res);
PQclear(res);
ha_set_rebuild_connerror(WALSEGMENT_REBUILD, REPL_INFO_ERROR);
ereport(ERROR,
(errcode(ERRCODE_INVALID_STATUS), errmsg("invalid response from primary server"),
errdetail("Could not identify system: Got %d rows and %d fields, expected 1 row and 2 or 3 fields",
ntuples, nfields)));
return false;
}
if (PQnfields(res) == 3) {
/* col1: crc of standby rec. */
remoteRecCrc = PQgetvalue(res, 0, 0);
if (remoteRecCrc && sscanf_s(remoteRecCrc, "%8X", &recCrc) != 1) {
PQclear(res);
ha_set_rebuild_connerror(WALSEGMENT_REBUILD, REPL_INFO_ERROR);
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
errmsg("could not parse remote record's crc, remoteRecCrc=%s recCrc=%u",
(remoteRecCrc[0] != 0) ? remoteRecCrc : "NULL", (uint32)recCrc)));
return false;
}
/* col2: max lsn of dummy standby. */
remoteMaxLsnStr = PQgetvalue(res, 0, 1);
if (sscanf_s(remoteMaxLsnStr, "%X/%X", &hi, &lo) != 2) {
PQclear(res);
ha_set_rebuild_connerror(WALSEGMENT_REBUILD, REPL_INFO_ERROR);
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS), errmsg("could not parse remoteMaxLsn")));
return false;
}
remoteMaxLsn = ((uint64)hi << 32) | lo;
/* col3: crc of max lsn of dummy standby. */
remoteMaxLsnCrcStr = PQgetvalue(res, 0, 2);
if (remoteMaxLsnCrcStr && sscanf_s(remoteMaxLsnCrcStr, "%8X", &remoteMaxLsnCrc) != 1) {
PQclear(res);
ha_set_rebuild_connerror(WALSEGMENT_REBUILD, REPL_INFO_ERROR);
ereport(ERROR,
(errcode(ERRCODE_INVALID_STATUS),
errmsg("could not parse remote max record's crc, remoteMaxLsnCrc=%s maxLsnCrc=%u",
(remoteMaxLsnCrcStr[0] != 0) ? remoteMaxLsnCrcStr : "NULL", (uint32)remoteMaxLsnCrc)));
return false;
}
PQclear(res);
if (!t_thrd.walreceiver_cxt.AmWalReceiverForFailover) {
/* including recCrc == 0, which means local has more xlog */
if (recCrc != localRecCrc) {
/* dummy standby connect to primary */
if (dummyStandbyMode) {
if (recCrc == IGNORE_REC_CRC) {
ereport(LOG, (errmsg("receive ignore reccrc, rm xlog.")));
ProcessWSRmXLog();
} else {
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
errmsg("dummystandby's local request lsn[%X/%X] 's crc "
"mismatched with remote server"
"crc(local, remote):[%u,%u].",
(uint32)(localRec >> 32), (uint32)localRec, localRecCrc, recCrc)));
}
} else {
/*
* standby connect to primary
* Direct check Primary and Standby, trigger build.
*/
if (t_thrd.postmaster_cxt.HaShmData->is_cross_region &&
t_thrd.postmaster_cxt.HaShmData->is_cascade_standby &&
recCrc == 0 && XLByteLT(remoteMaxLsn, localRec)) {
/* The cascaded standby instance of the disaster recovery cluster
has more logs than the main standby instance, the build is not triggered */
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
errmsg("cascaded standby's local request lsn[%X/%X] 's crc mismatched with "
"main standby crc(local, remote):[%u,%u], remote max lsn is [%X/%X].",
(uint32)(localRec >> 32), (uint32)localRec, localRecCrc, recCrc,
(uint32)(remoteMaxLsn >> 32), (uint32)remoteMaxLsn)));
return false;
} else if (!SS_STREAM_CLUSTER) {
ha_set_rebuild_connerror(WALSEGMENT_REBUILD, REPL_INFO_ERROR);
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
errmsg("standby's local request lsn[%X/%X] 's crc mismatched with remote server "
"crc(local, remote):[%u,%u].",
(uint32)(localRec >> 32), (uint32)localRec, localRecCrc, recCrc)));
}
}
}
} else if (t_thrd.walreceiver_cxt.AmWalReceiverForStandby) {
/* standby connect to standby */
bool crcvalid = false;
/* local xlog must be more than(or equal to) remote, and last crc must be matched */
if (XLByteLE(remoteMaxLsn, localRec) &&
remoteMaxLsnCrc == GetXlogRecordCrc(remoteMaxLsn, crcvalid, XLogPageRead, 0)) {
ereport(LOG, (errmsg("crc check on remote standby success, local standby "
"will promote to primary")));
SetWalRcvDummyStandbySyncPercent(SYNC_DUMMY_STANDBY_END);
/* also set datareceiver to continue failover */
SetDataRcvDummyStandbySyncPercent(SYNC_DUMMY_STANDBY_END);
haveXlog = false;
} else {
SpinLockAcquire(&walrcv->mutex);
walrcv->conn_errno = REPL_INFO_ERROR;
SpinLockRelease(&walrcv->mutex);
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
errmsg("crc of %X/%X is different across remote and local standby, "
"standby promote failed",
(uint32)(remoteMaxLsn >> 32), (uint32)remoteMaxLsn)));
}
} else {
/* standby connect to dummystandby */
if (recCrc != 0 && recCrc != localRecCrc) {
/* FUTURE CASE::
* When standby failover, its xlog is not the same with secondary
* standby, walreceiver will ereport ERROR, or the standby
* promoting will hang, and if the primary is pending, cm server
* will not arbitrate the primary.
*/
SpinLockAcquire(&walrcv->mutex);
if (AmWalReceiverForDummyStandby()) {
walrcv->dummyStandbyConnectFailed = true;
}
SpinLockRelease(&walrcv->mutex);
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
errmsg("invalid crc on secondary standby, has xlog, standby promote failed, "
"(local, remote) = (%u, %u) on %X/%X",
localRecCrc, recCrc, (uint32)(localRec >> 32), (uint32)localRec)));
} else if (recCrc == 0) {
bool crcvalid = false;
/*
* Standby ------>(lsn is 10)
* DummyStandby --->(lsn is 5)
* recCrc of lsn == 10 of dummy is 0
* We should check crc of lsn is 5 of standby
*/
if (XLByteEQ(remoteMaxLsn, InvalidXLogRecPtr) ||
(XLByteLT(remoteMaxLsn, localRec) &&
remoteMaxLsnCrc == GetXlogRecordCrc(remoteMaxLsn, crcvalid, XLogPageRead, 0))) {
ereport(LOG, (errmsg("invalid crc on secondary standby, no xlog, standby "
"will promote primary")));
SetWalRcvDummyStandbySyncPercent(SYNC_DUMMY_STANDBY_END);
haveXlog = false;
} else {
SpinLockAcquire(&walrcv->mutex);
if (AmWalReceiverForDummyStandby()) {
walrcv->dummyStandbyConnectFailed = true;
}
SpinLockRelease(&walrcv->mutex);
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
errmsg("crc of %X/%X is different across dummy and standby, "
"standby promote failed",
(uint32)(remoteMaxLsn >> 32), (uint32)remoteMaxLsn)));
}
}
}
} else {
char *primary_reccrc = PQgetvalue(res, 0, 0);
bool havexlog = pg_strtoint32(PQgetvalue(res, 0, 1));
if (primary_reccrc && sscanf_s(primary_reccrc, "%8X", &recCrc) != 1) {
PQclear(res);
ha_set_rebuild_connerror(WALSEGMENT_REBUILD, REPL_INFO_ERROR);
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
errmsg("could not parse primary record's crc,primary_reccrc=%s reccrc=%u",
(primary_reccrc[0] != 0) ? primary_reccrc : "NULL", (uint32)recCrc)));
return false;
}
PQclear(res);
if (0 == recCrc && t_thrd.walreceiver_cxt.AmWalReceiverForFailover) {
if (0 == havexlog) {
ereport(LOG, (errmsg("invalid crc on secondary standby, no xlog, standby "
"will promoting primary")));
SetWalRcvDummyStandbySyncPercent(SYNC_DUMMY_STANDBY_END);
return true;
} else {
/* FUTURE CASE::
* When standby failover, its xlog is not the same with secondary
* standby, walreceiver will ereport ERROR, or the standby
* promoting will hang, and if the primary is pending, cm server
* will not arbitrate the primary.
*/
ereport(ERROR,
(errcode(ERRCODE_INVALID_STATUS), errmsg("invalid crc on secondary standby, has xlog, "
"standby promote failed")));
}
} else if (recCrc != walrcv->latestRecordCrc) {
ha_set_rebuild_connerror(WALSEGMENT_REBUILD, REPL_INFO_ERROR);
ereport(ERROR,
(errcode(ERRCODE_INVALID_STATUS),
errmsg("standby_rec=%x/%x "
"standby latest record's crc %u and primary corresponding record's crc %u not matched",
(uint32)(walrcv->latestValidRecord >> 32), (uint32)walrcv->latestValidRecord,
walrcv->latestRecordCrc, recCrc)));
}
}
}
if (t_thrd.walreceiver_cxt.AmWalReceiverForFailover) {
if (t_thrd.walreceiver_cxt.AmWalReceiverForStandby) {
/* failover to standby */
if (!haveXlog) {
return true;
}
} else {
/* failover to dummy */
ha_set_port_to_remote(t_thrd.libwalreceiver_cxt.streamConn, channel_identifier);
if (recCrc == 0 && !haveXlog) {
return true;
}
}
} else {
ha_set_port_to_remote(t_thrd.libwalreceiver_cxt.streamConn, channel_identifier);
}
/* Create replication slot if need */
if (AM_HADR_WAL_RECEIVER && slotname != NULL) {
rc = strcat_s(slotname, NAMEDATALEN - 1, "_hadr");
securec_check(rc, "", "");
} else if (IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE && slotname != NULL) {
rc = strcat_s(slotname, NAMEDATALEN - 1, "_hass");
securec_check(rc, "", "");
}
if (!t_thrd.walreceiver_cxt.AmWalReceiverForFailover && slotname != NULL) {
CreateRemoteReplicationSlot(*startpoint, slotname, false, NULL);
}
/* Start streaming from the point requested by startup process */
LibpqrcvConnectParam options;
rc = memset_s(&options, sizeof(LibpqrcvConnectParam), 0, sizeof(LibpqrcvConnectParam));
securec_check(rc, "", "");
options.slotname = slotname;
options.startpoint = *startpoint;
options.logical = false;
StartRemoteStreaming(&options);
ereport(LOG,
(errmsg("streaming replication successfully connected to primary, the connection is %s, start from %X/%X ",
conninfo, (uint32)(*startpoint >> 32), (uint32)(*startpoint))));
if (t_thrd.postmaster_cxt.HaShmData->is_hadr_main_standby) {
NotifyWalSendForMainStandby(localTerm, remoteTerm);
}
if (!t_thrd.walreceiver_cxt.AmWalReceiverForFailover) {
SpinLockAcquire(&walrcv->mutex);
walrcv->peer_role = remoteMode;
SpinLockRelease(&walrcv->mutex);
}
volatile HaShmemData *hashmdata = t_thrd.postmaster_cxt.HaShmData;
SpinLockAcquire(&hashmdata->mutex);
hashmdata->disconnect_count[hashmdata->current_repl] = 0;
hashmdata->prev_repl = hashmdata->current_repl;
SpinLockRelease(&hashmdata->mutex);
/*
* If the streaming replication successfully connected to primary,
* then clean the rebuild reason in HaShmData.
*/
ha_set_rebuild_connerror(NONE_REBUILD, NONE_ERROR);
/* Save the current connect channel info in WalRcv */
ha_set_conn_channel();
return true;
}
/*
* Wait until we can read WAL stream, or timeout.
*
* Returns true if data has become available for reading, false if timed out
* or interrupted by signal.
*
* This is based on pqSocketCheck.
*/
static bool libpq_select(int timeout_ms)
{
int ret;
Assert(t_thrd.libwalreceiver_cxt.streamConn != NULL);
if (PQsocket(t_thrd.libwalreceiver_cxt.streamConn) < 0) {
ereport(ERROR, (errcode_for_socket_access(), errmsg("socket not open")));
}
/* We use poll(2) if available, otherwise select(2) */
{
#ifdef HAVE_POLL
struct pollfd input_fd;
input_fd.fd = PQsocket(t_thrd.libwalreceiver_cxt.streamConn);
input_fd.events = POLLIN | POLLERR;
input_fd.revents = 0;
ret = poll(&input_fd, 1, timeout_ms);
#else /* !HAVE_POLL */
fd_set input_mask;
struct timeval timeout = { 0, 0 };
struct timeval *ptr_timeout = NULL;
FD_ZERO(&input_mask);
FD_SET(PQsocket(t_thrd.libwalreceiver_cxt.streamConn), &input_mask);
if (timeout_ms < 0) {
ptr_timeout = NULL;
} else {
timeout.tv_sec = timeout_ms / 1000;
timeout.tv_usec = (timeout_ms % 1000) * 1000;
ptr_timeout = &timeout;
}
ret = select(PQsocket(t_thrd.libwalreceiver_cxt.streamConn) + 1, &input_mask, NULL, NULL, ptr_timeout);
#endif /* HAVE_POLL */
}
if (ret == 0 || (ret < 0 && errno == EINTR)) {
return false;
}
if (ret < 0) {
ereport(ERROR, (errcode_for_socket_access(), errmsg("select() failed: %m")));
}
return true;
}
/*
* Send a query and wait for the results by using the asynchronous libpq
* functions and the backend version of select().
*
* We must not use the regular blocking libpq functions like PQexec()
* since they are uninterruptible by signals on some platforms, such as
* Windows.
*
* We must also not use vanilla select() here since it cannot handle the
* signal emulation layer on Windows.
*
* The function is modeled on PQexec() in libpq, but only implements
* those parts that are in use in the walreceiver.
*
* Queries are always executed on the connection in streamConn.
*/
static PGresult *libpqrcv_PQexec(const char *query)
{
PGresult *result = NULL;
PGresult *lastResult = NULL;
/*
* PQexec() silently discards any prior query results on the connection.
* This is not required for walreceiver since it's expected that walsender
* won't generate any such junk results.
*/
/*
* Submit a query. Since we don't use non-blocking mode, this also can
* block. But its risk is relatively small, so we ignore that for now.
*/
if (!PQsendQuery(t_thrd.libwalreceiver_cxt.streamConn, query)) {
return NULL;
}
for (;;) {
/*
* Receive data until PQgetResult is ready to get the result without
* blocking.
*/
while (PQisBusy(t_thrd.libwalreceiver_cxt.streamConn)) {
CHECK_FOR_INTERRUPTS();
/*
* We don't need to break down the sleep into smaller increments,
* and check for interrupts after each nap, since we can just
* elog(FATAL) within SIGTERM signal handler if the signal arrives
* in the middle of establishment of replication connection.
*/
if (!libpq_select(-1)) {
pqClearAsyncResult(t_thrd.libwalreceiver_cxt.streamConn);
continue; /* interrupted */
}
if (PQconsumeInput(t_thrd.libwalreceiver_cxt.streamConn) == 0) {
pqClearAsyncResult(t_thrd.libwalreceiver_cxt.streamConn);
PQclear(lastResult);
return NULL; /* trouble */
}
}
/*
* Emulate the PQexec()'s behavior of returning the last result when
* there are many. Since walsender will never generate multiple
* results, we skip the concatenation of error messages.
*/
result = PQgetResult(t_thrd.libwalreceiver_cxt.streamConn);
if (result == NULL) {
break; /* query is complete */
}
PQclear(lastResult);
lastResult = result;
if (PQresultStatus(lastResult) == PGRES_COPY_IN || PQresultStatus(lastResult) == PGRES_COPY_OUT ||
PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
PQstatus(t_thrd.libwalreceiver_cxt.streamConn) == CONNECTION_BAD) {
break;
}
}
return lastResult;
}
void libpqrcv_check_conninfo(const char *conninfo)
{
PQconninfoOption *opts = PQconninfoParse(conninfo, NULL);
if (opts == NULL) {
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("invalid connection string syntax")));
}
PQconninfoFree(opts);
}
/*
* Disconnect connection to primary, if any.
*/
void libpqrcv_disconnect(void)
{
PQfinish(t_thrd.libwalreceiver_cxt.streamConn);
t_thrd.libwalreceiver_cxt.streamConn = NULL;
if (t_thrd.libwalreceiver_cxt.recvBuf != NULL) {
PQfreemem(t_thrd.libwalreceiver_cxt.recvBuf);
t_thrd.libwalreceiver_cxt.recvBuf = NULL;
}
}
/*
* Receive a message available from XLOG stream, blocking for
* maximum of 'timeout' ms.
*
* Returns:
*
* True if data was received. *type, *buffer and *len are set to
* the type of the received data, buffer holding it, and length,
* respectively.
*
* False if no data was available within timeout, or wait was interrupted
* by signal.
*
* The buffer returned is only valid until the next call of this function or
* libpq_connect/disconnect.
*
* ereports on error.
*/
bool libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
{
int rawlen;
volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
if (t_thrd.libwalreceiver_cxt.recvBuf != NULL) {
PQfreemem(t_thrd.libwalreceiver_cxt.recvBuf);
}
t_thrd.libwalreceiver_cxt.recvBuf = NULL;
/* Try to receive a CopyData message */
rawlen = PQgetCopyData(t_thrd.libwalreceiver_cxt.streamConn, &t_thrd.libwalreceiver_cxt.recvBuf, 1);
if (rawlen == 0) {
/*
* No data available yet. If the caller requested to block, wait for
* more data to arrive.
*/
if (timeout > 0) {
if (!libpq_select(timeout)) {
return false;
}
}
if (PQconsumeInput(t_thrd.libwalreceiver_cxt.streamConn) == 0) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_STATUS), errmsg("could not receive data from WAL streaming: %s",
PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn))));
}
/* Now that we've consumed some input, try again */
rawlen = PQgetCopyData(t_thrd.libwalreceiver_cxt.streamConn, &t_thrd.libwalreceiver_cxt.recvBuf, 1);
if (rawlen == 0) {
return false;
}
}
if (rawlen == -1) { /* end-of-streaming or error */
PGresult *res = NULL;
const char *sqlstate = NULL;
int retcode = 0;
res = PQgetResult(t_thrd.libwalreceiver_cxt.streamConn);
if (PQresultStatus(res) == PGRES_COMMAND_OK) {
PQclear(res);
/* Verify that there are no more results */
res = PQgetResult(t_thrd.libwalreceiver_cxt.streamConn);
if (res != NULL) {
PQclear(res);
ereport(ERROR,
(errmsg("unexpected result after CommandComplete: %s",
PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn))));
}
*len = -1;
return false;
}
sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
if (sqlstate && strlen(sqlstate) == 5) {
retcode = MAKE_SQLSTATE(sqlstate[0], sqlstate[1], sqlstate[2], sqlstate[3], sqlstate[4]);
}
if (retcode == ERRCODE_UNDEFINED_FILE) {
/* in SS dorado standby cluster, the walreceiver not need to receive wal */
if (t_thrd.role != APPLY_WORKER && !SS_DORADO_STANDBY_CLUSTER) {
ha_set_rebuild_connerror(WALSEGMENT_REBUILD, REPL_INFO_ERROR);
}
SpinLockAcquire(&walrcv->mutex);
walrcv->ntries++;
SpinLockRelease(&walrcv->mutex);
}
PQclear(res);
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS), errmsg("could not receive data from WAL stream: %s",
PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn))));
}
if (rawlen < -1) {
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS), errmsg("could not receive data from WAL stream: %s",
PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn))));
}
/* Return received messages to caller */
if (type == NULL) {
*buffer = t_thrd.libwalreceiver_cxt.recvBuf;
*len = rawlen;
return true;
}
*type = *((unsigned char *)t_thrd.libwalreceiver_cxt.recvBuf);
if ((IS_SHARED_STORAGE_MODE || SS_DORADO_CLUSTER) && !AM_HADR_WAL_RECEIVER && *type == 'w') {
*len = 0;
return false;
}
*buffer = t_thrd.libwalreceiver_cxt.recvBuf + sizeof(*type);
*len = rawlen - sizeof(*type);
return true;
}
/*
* Send a message to XLOG stream.
*
* ereports on error.
*/
void libpqrcv_send(const char *buffer, int nbytes)
{
if (PQputCopyData(t_thrd.libwalreceiver_cxt.streamConn, buffer, nbytes) <= 0 ||
PQflush(t_thrd.libwalreceiver_cxt.streamConn))
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS), errmsg("could not send data to WAL stream: %s",
PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn))));
}
/*
* Convert tuple query result to tuplestore.
*/
static void libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes)
{
int tupn;
int coln;
int nfields = PQnfields(pgres);
HeapTuple tuple;
AttInMetadata *attinmeta;
MemoryContext rowcontext;
MemoryContext oldcontext;
/* Make sure we got expected number of fields. */
if (nfields != nRetTypes)
ereport(ERROR,
(errmsg("invalid query responser"), errdetail("Expected %d fields, got %d fields.", nRetTypes, nfields)));
walres->tuplestore = tuplestore_begin_heap(true, false, u_sess->attr.attr_memory.work_mem);
/* Create tuple descriptor corresponding to expected result. */
walres->tupledesc = CreateTemplateTupleDesc(nRetTypes, false);
for (coln = 0; coln < nRetTypes; coln++)
TupleDescInitEntry(walres->tupledesc, (AttrNumber)coln + 1, PQfname(pgres, coln), retTypes[coln], -1, 0);
attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
/* No point in doing anything here if there were no tuples returned. */
if (PQntuples(pgres) == 0)
return;
/* Create temporary context for local allocations. */
rowcontext = AllocSetContextCreate(CurrentMemoryContext, "libpqrcv query result context", ALLOCSET_DEFAULT_SIZES);
/* Process returned rows. */
for (tupn = 0; tupn < PQntuples(pgres); tupn++) {
char *cstrs[MaxTupleAttributeNumber];
CHECK_FOR_INTERRUPTS();
/* Do the allocations in temporary context. */
oldcontext = MemoryContextSwitchTo(rowcontext);
/*
* Fill cstrs with null-terminated strings of column values.
*/
for (coln = 0; coln < nfields; coln++) {
if (PQgetisnull(pgres, tupn, coln))
cstrs[coln] = NULL;
else
cstrs[coln] = PQgetvalue(pgres, tupn, coln);
}
/* Convert row to a tuple, and add it to the tuplestore */
tuple = BuildTupleFromCStrings(attinmeta, cstrs);
tuplestore_puttuple(walres->tuplestore, tuple);
/* Clean up */
MemoryContextSwitchTo(oldcontext);
MemoryContextReset(rowcontext);
}
MemoryContextDelete(rowcontext);
}
/*
* Public interface for sending generic queries (and commands).
*
* This can only be called from process connected to database.
*/
WalRcvExecResult* libpqrcv_exec(const char *query, const int nRetTypes, const Oid *retTypes)
{
PGresult *pgres = NULL;
WalRcvExecResult *walres = (WalRcvExecResult *)palloc0(sizeof(WalRcvExecResult));
if (u_sess->proc_cxt.MyDatabaseId == InvalidOid)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("the query interface requires a database connection")));
pgres = libpqrcv_PQexec(query);
switch (PQresultStatus(pgres)) {
case PGRES_SINGLE_TUPLE:
case PGRES_TUPLES_OK:
walres->status = WALRCV_OK_TUPLES;
libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
break;
case PGRES_COPY_IN:
walres->status = WALRCV_OK_COPY_IN;
break;
case PGRES_COPY_OUT:
walres->status = WALRCV_OK_COPY_OUT;
break;
case PGRES_COPY_BOTH:
walres->status = WALRCV_OK_COPY_BOTH;
break;
case PGRES_COMMAND_OK:
walres->status = WALRCV_OK_COMMAND;
break;
/* Empty query is considered error. */
case PGRES_EMPTY_QUERY:
walres->status = WALRCV_ERROR;
walres->err = _("empty query");
break;
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
case PGRES_BAD_RESPONSE:
walres->status = WALRCV_ERROR;
walres->err = pstrdup(PQerrorMessage(t_thrd.libwalreceiver_cxt.streamConn));
walres->sqlstate = MAKE_SQLSTATE(t_thrd.libwalreceiver_cxt.streamConn->last_sqlstate[0],
t_thrd.libwalreceiver_cxt.streamConn->last_sqlstate[1],
t_thrd.libwalreceiver_cxt.streamConn->last_sqlstate[2],
t_thrd.libwalreceiver_cxt.streamConn->last_sqlstate[3],
t_thrd.libwalreceiver_cxt.streamConn->last_sqlstate[4]);
break;
}
PQclear(pgres);
return walres;
}
void HaSetRebuildRepInfoError(HaRebuildReason reason)
{
if (NONE_REBUILD == reason) {
ha_set_rebuild_connerror(NONE_REBUILD, NONE_ERROR);
} else if (DCF_LOG_LOSS_REBUILD == reason) {
ha_set_rebuild_connerror(DCF_LOG_LOSS_REBUILD, DCF_LOG_ERROR);
} else {
/* Assert the left is REPL_INFO_ERROR */
ha_set_rebuild_connerror(reason, REPL_INFO_ERROR);
}
}
void SetObsRebuildReason(HaRebuildReason reason)
{
ha_set_rebuild_connerror(reason, REPL_INFO_ERROR);
}
/*
* Get the current channel info from streamConn, then save it in WalRcv.
*/
static void ha_set_conn_channel()
{
struct sockaddr *laddr = (struct sockaddr *)PQLocalSockaddr(t_thrd.libwalreceiver_cxt.streamConn);
struct sockaddr *raddr = (struct sockaddr *)PQRemoteSockaddr(t_thrd.libwalreceiver_cxt.streamConn);
char local_ip[IP_LEN] = {0};
char remote_ip[IP_LEN] = {0};
volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
errno_t rc = 0;
char *result = NULL;
if (laddr == NULL || raddr == NULL) {
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
errmsg("sockaddr is NULL, because there is no connection to primary")));
return;
}
if (laddr->sa_family == AF_INET6) {
result = inet_net_ntop(AF_INET6, &((struct sockaddr_in6 *)laddr)->sin6_addr, 128, local_ip, IP_LEN);
if (result == NULL) {
ereport(WARNING, (errmsg("inet_net_ntop failed, error: %d", EAFNOSUPPORT)));
}
} else if (laddr->sa_family == AF_INET) {
result = inet_net_ntop(AF_INET, &((struct sockaddr_in *)laddr)->sin_addr, 32, local_ip, IP_LEN);
if (result == NULL) {
ereport(WARNING, (errmsg("inet_net_ntop failed, error: %d", EAFNOSUPPORT)));
}
}
if (raddr->sa_family == AF_INET6) {
result = inet_net_ntop(AF_INET6, &((struct sockaddr_in6 *)raddr)->sin6_addr, 128, remote_ip, IP_LEN);
if (result == NULL) {
ereport(WARNING, (errmsg("inet_net_ntop failed, error: %d", EAFNOSUPPORT)));
}
} else if (raddr->sa_family == AF_INET) {
result = inet_net_ntop(AF_INET, &((struct sockaddr_in *)raddr)->sin_addr, 32, remote_ip, IP_LEN);
if (result == NULL) {
ereport(WARNING, (errmsg("inet_net_ntop failed, error: %d", EAFNOSUPPORT)));
}
}
SpinLockAcquire(&walrcv->mutex);
rc = strncpy_s((char *)walrcv->conn_channel.localhost, sizeof(walrcv->conn_channel.localhost), local_ip,
IP_LEN - 1);
securec_check(rc, "", "");
walrcv->conn_channel.localhost[IP_LEN - 1] = '\0';
if (laddr->sa_family == AF_INET6) {
walrcv->conn_channel.localport = ntohs(((struct sockaddr_in6 *)laddr)->sin6_port);
} else if (laddr->sa_family == AF_INET) {
walrcv->conn_channel.localport = ntohs(((struct sockaddr_in *)laddr)->sin_port);
}
rc = strncpy_s((char *)walrcv->conn_channel.remotehost, sizeof(walrcv->conn_channel.remotehost), remote_ip,
IP_LEN - 1);
securec_check(rc, "", "");
walrcv->conn_channel.remotehost[IP_LEN - 1] = '\0';
if (raddr->sa_family == AF_INET6) {
walrcv->conn_channel.remoteport = ntohs(((struct sockaddr_in6 *)raddr)->sin6_port);
} else if (raddr->sa_family == AF_INET) {
walrcv->conn_channel.remoteport = ntohs(((struct sockaddr_in *)raddr)->sin_port);
}
SpinLockRelease(&walrcv->mutex);
}
/* Add disconnect_count of the current repl */
static void ha_add_disconnect_count()
{
volatile HaShmemData *hashmdata = t_thrd.postmaster_cxt.HaShmData;
SpinLockAcquire(&hashmdata->mutex);
hashmdata->disconnect_count[hashmdata->current_repl] += 1;
SpinLockRelease(&hashmdata->mutex);
}
static void ha_set_port_to_remote(PGconn *dummy_conn, int ha_port)
{
int nRet = 0;
char cmd[64];
PGresult *res = NULL;
if (ha_port == 0 || dummy_conn == NULL) {
ereport(WARNING, (errcode(ERRCODE_INVALID_STATUS), errmsg("could not set channel identifier, "
"local port or connection is invalid")));
return;
}
/*
* using localport for channel identifier,
* make primary can using ip and port to find out the channel
*/
nRet = snprintf_s(cmd, sizeof(cmd), sizeof(cmd) - 1, "IDENTIFY_CHANNEL %d", ha_port);
securec_check_ss(nRet, "", "");
res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
PQclear(res);
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS), errmsg("could not set channel identifier, localport %d : %s",
ha_port, PQerrorMessage(dummy_conn))));
}
PQclear(res);
return;
}
/*
* Given a List of strings, return it as single comma separated
* string, quoting identifiers as needed.
*
* This is essentially the reverse of SplitIdentifierString.
*
* The caller should free the result.
*/
static char *stringlist_to_identifierstr(PGconn *conn, List *strings)
{
ListCell *lc;
StringInfoData res;
bool first = true;
initStringInfo(&res);
foreach (lc, strings) {
char *val = strVal(lfirst(lc));
char *val_escaped = NULL;
if (first) {
first = false;
} else {
appendStringInfoChar(&res, ',');
}
val_escaped = PQescapeIdentifier(conn, val, strlen(val));
if (!val_escaped) {
free(res.data);
return NULL;
}
appendStringInfoString(&res, val_escaped);
PQfreemem(val_escaped);
}
return res.data;
}