support pagerepair during primary startup

This commit is contained in:
wuyuechuan
2023-08-14 10:33:53 +08:00
parent 583782210c
commit 96942b2ca0
3 changed files with 112 additions and 12 deletions

View File

@ -238,6 +238,53 @@ int GetRemoteConnInfo(char* remoteAddress, char* remoteReadConnInfo, int len)
return errCode;
}
/**
* get connection
* @param remoteAddress ip@port
* @return connection if success or else nullptr
*/
static PGconn* RemoteGetConnection(char* remoteAddress)
{
char remoteReadConnInfo[MAXPGPATH];
int errCode = GetRemoteConnInfo(remoteAddress, remoteReadConnInfo, MAXPGPATH);
if (errCode != REMOTE_READ_OK) {
return nullptr;
}
PGconn* conGet = RemoteReadGetConn(remoteReadConnInfo);
if (conGet == nullptr) {
errCode = REMOTE_READ_RPC_ERROR;
return nullptr;
}
/* need to close by caller */
return conGet;
}
uint64 RemoteGetXlogReplayPtr(char* remoteAddress)
{
PGconn* conGet = RemoteGetConnection(remoteAddress);
if (conGet == nullptr) {
return InvalidXLogRecPtr;
}
PGresult* res = PQexec(conGet, "SELECT lsn::varchar from pg_last_xlog_replay_location()");
if (PQresultStatus(res) != PGRES_TUPLES_OK || PQgetisnull(res, 0, 0)) {
PQclear(res);
res = nullptr;
PQfinish(conGet);
conGet = nullptr;
return InvalidXLogRecPtr;
}
uint32 hi = 0;
uint32 lo = 0;
/* get remote lsn location */
if (sscanf_s(PQgetvalue(res, 0, 0), "%X/%X", &hi, &lo) != 2) {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse log location \"%s\"", PQgetvalue(res, 0, 0))));
}
PQclear(res);
PQfinish(conGet);
return (((uint64)hi) << 32) | lo;
}
/*
* @Description: remote read page

View File

@ -30,6 +30,7 @@
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/remote_read.h"
#include "service/remote_read_client.h"
#include <arpa/inet.h>
/*
@ -102,6 +103,62 @@ void GetPrimaryServiceAddress(char *address, size_t address_len)
SpinLockRelease(&walrcv->mutex);
}
static void FormatAddressByReplConn(replconninfo* replconninfo, char* remoteAddress, int addressLen)
{
int rc = snprintf_s(remoteAddress, addressLen, (addressLen - 1), "%s@%d",
replconninfo->remotehost,
replconninfo->remoteport);
securec_check_ss(rc, "", "");
}
/**
* in startup, walsnder is not ready. we need to get remote address from replConnArray
* @param firstAddress first address
* @param secondAddress second address
* @param addressLen address length
*/
static void GetRemoteReadAddressFromReplconn(char* firstAddress, char* secondAddress, size_t addressLen)
{
XLogRecPtr fastest_replay = InvalidXLogRecPtr;
XLogRecPtr second_fastest_replay = InvalidXLogRecPtr;
int fastest = 0;
int second_fastest = 0;
for (int i = 0; i < MAX_REPLNODE_NUM; i++) {
if (t_thrd.postmaster_cxt.ReplConnArray[i]) {
char remoteAddress[MAXPGPATH];
FormatAddressByReplConn(t_thrd.postmaster_cxt.ReplConnArray[i], remoteAddress, MAXPGPATH);
XLogRecPtr insertXLogRecPtr = RemoteGetXlogReplayPtr(remoteAddress);
if (XLByteLT(second_fastest_replay, insertXLogRecPtr)) {
if (XLByteLT(fastest_replay, insertXLogRecPtr)) {
/* walsnd_replay is larger than fastest_replay */
second_fastest = fastest;
second_fastest_replay = fastest_replay;
fastest = i;
fastest_replay = insertXLogRecPtr;
} else {
/* walsnd_replay is in the range (second_fastest_replay, fastest_replay] */
second_fastest = i;
second_fastest_replay = insertXLogRecPtr;
}
}
}
}
/* find fastest replay standby */
if (!XLogRecPtrIsInvalid(fastest_replay)) {
FormatAddressByReplConn(t_thrd.postmaster_cxt.ReplConnArray[fastest], firstAddress, MAXPGPATH);
}
/* find second fastest replay standby */
if (!XLogRecPtrIsInvalid(second_fastest_replay)) {
FormatAddressByReplConn(t_thrd.postmaster_cxt.ReplConnArray[second_fastest], secondAddress, MAXPGPATH);
}
}
/*
* @Description: get remote address
* @IN/OUT first_address: first address
@ -134,18 +191,13 @@ void GetRemoteReadAddress(char* firstAddress, char* secondAddress, size_t addres
}
} else if (IS_DN_MULTI_STANDYS_MODE()) {
if (serverMode == PRIMARY_MODE) {
GetFastestReplayStandByServiceAddress(firstAddress, secondAddress, addressLen);
if (firstAddress[0] != '\0') {
GetIPAndPort(firstAddress, ip, port, MAX_IPADDR_LEN);
rc = snprintf_s(firstAddress, addressLen, (addressLen - 1), "%s@%s", ip, port);
securec_check_ss(rc, "", "");
}
if (secondAddress[0] != '\0') {
GetIPAndPort(secondAddress, ip, port, MAX_IPADDR_LEN);
rc = snprintf_s(secondAddress, addressLen, (addressLen - 1), "%s@%s", ip, port);
securec_check_ss(rc, "", "");
}
/* address format: ip@port */
if (RecoveryInProgress()) {
/* during recovery, walsnder is not valid so we cant get standby info from walsnder */
GetRemoteReadAddressFromReplconn(firstAddress, secondAddress, addressLen);
} else {
GetFastestReplayStandByServiceAddress(firstAddress, secondAddress, addressLen);
}
} else if (serverMode == STANDBY_MODE) {
GetPrimaryServiceAddress(firstAddress, addressLen);
if (firstAddress[0] != '\0') {

View File

@ -43,6 +43,7 @@ extern int RemoteGetPage(char* remote_address, RepairBlockKey *key, uint32 block
extern int RemoteGetFile(char* remoteAddress, RemoteReadFileKey *key, uint64 lsn, uint32 size, char* pageData,
XLogRecPtr *remote_lsn, uint32 *remote_size, int timeout);
extern int RemoteGetFileSize(char* remoteAddress, RemoteReadFileKey *key, uint64 lsn, int64 *size, int timeout);
extern uint64 RemoteGetXlogReplayPtr(char* remoteAddress);
#endif /* REMOTE_READ_CLIENT_H */