From 96942b2ca07f3de075a256e73c79aaa87bec1783 Mon Sep 17 00:00:00 2001 From: wuyuechuan Date: Mon, 14 Aug 2023 10:33:53 +0800 Subject: [PATCH] support pagerepair during primary startup --- .../cbb/grpc/remote_read_client.cpp | 47 ++++++++++++ .../storage/remote/remote_read.cpp | 76 ++++++++++++++++--- src/include/service/remote_read_client.h | 1 + 3 files changed, 112 insertions(+), 12 deletions(-) diff --git a/src/gausskernel/cbb/grpc/remote_read_client.cpp b/src/gausskernel/cbb/grpc/remote_read_client.cpp index a07b4cae7..0459d3129 100755 --- a/src/gausskernel/cbb/grpc/remote_read_client.cpp +++ b/src/gausskernel/cbb/grpc/remote_read_client.cpp @@ -238,6 +238,53 @@ int GetRemoteConnInfo(char* remoteAddress, char* remoteReadConnInfo, int len) return errCode; } +/** + * get connection + * @param remoteAddress ip@port + * @return connection if success or else nullptr + */ +static PGconn* RemoteGetConnection(char* remoteAddress) +{ + char remoteReadConnInfo[MAXPGPATH]; + int errCode = GetRemoteConnInfo(remoteAddress, remoteReadConnInfo, MAXPGPATH); + if (errCode != REMOTE_READ_OK) { + return nullptr; + } + PGconn* conGet = RemoteReadGetConn(remoteReadConnInfo); + if (conGet == nullptr) { + errCode = REMOTE_READ_RPC_ERROR; + return nullptr; + } + /* need to close by caller */ + return conGet; +} + +uint64 RemoteGetXlogReplayPtr(char* remoteAddress) +{ + PGconn* conGet = RemoteGetConnection(remoteAddress); + if (conGet == nullptr) { + return InvalidXLogRecPtr; + } + PGresult* res = PQexec(conGet, "SELECT lsn::varchar from pg_last_xlog_replay_location()"); + if (PQresultStatus(res) != PGRES_TUPLES_OK || PQgetisnull(res, 0, 0)) { + PQclear(res); + res = nullptr; + PQfinish(conGet); + conGet = nullptr; + return InvalidXLogRecPtr; + } + uint32 hi = 0; + uint32 lo = 0; + /* get remote lsn location */ + if (sscanf_s(PQgetvalue(res, 0, 0), "%X/%X", &hi, &lo) != 2) { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse log location \"%s\"", PQgetvalue(res, 0, 0)))); + } + PQclear(res); + PQfinish(conGet); + return (((uint64)hi) << 32) | lo; +} + /* * @Description: remote read page diff --git a/src/gausskernel/storage/remote/remote_read.cpp b/src/gausskernel/storage/remote/remote_read.cpp index 505e159a7..1fd6dd115 100755 --- a/src/gausskernel/storage/remote/remote_read.cpp +++ b/src/gausskernel/storage/remote/remote_read.cpp @@ -30,6 +30,7 @@ #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/remote_read.h" +#include "service/remote_read_client.h" #include /* @@ -102,6 +103,62 @@ void GetPrimaryServiceAddress(char *address, size_t address_len) SpinLockRelease(&walrcv->mutex); } + +static void FormatAddressByReplConn(replconninfo* replconninfo, char* remoteAddress, int addressLen) +{ + int rc = snprintf_s(remoteAddress, addressLen, (addressLen - 1), "%s@%d", + replconninfo->remotehost, + replconninfo->remoteport); + securec_check_ss(rc, "", ""); +} + +/** + * in startup, walsnder is not ready. we need to get remote address from replConnArray + * @param firstAddress first address + * @param secondAddress second address + * @param addressLen address length + */ +static void GetRemoteReadAddressFromReplconn(char* firstAddress, char* secondAddress, size_t addressLen) +{ + XLogRecPtr fastest_replay = InvalidXLogRecPtr; + XLogRecPtr second_fastest_replay = InvalidXLogRecPtr; + int fastest = 0; + int second_fastest = 0; + for (int i = 0; i < MAX_REPLNODE_NUM; i++) { + if (t_thrd.postmaster_cxt.ReplConnArray[i]) { + char remoteAddress[MAXPGPATH]; + FormatAddressByReplConn(t_thrd.postmaster_cxt.ReplConnArray[i], remoteAddress, MAXPGPATH); + XLogRecPtr insertXLogRecPtr = RemoteGetXlogReplayPtr(remoteAddress); + if (XLByteLT(second_fastest_replay, insertXLogRecPtr)) { + if (XLByteLT(fastest_replay, insertXLogRecPtr)) { + /* walsnd_replay is larger than fastest_replay */ + second_fastest = fastest; + second_fastest_replay = fastest_replay; + + fastest = i; + fastest_replay = insertXLogRecPtr; + } else { + /* walsnd_replay is in the range (second_fastest_replay, fastest_replay] */ + second_fastest = i; + second_fastest_replay = insertXLogRecPtr; + } + } + } + } + + /* find fastest replay standby */ + if (!XLogRecPtrIsInvalid(fastest_replay)) { + FormatAddressByReplConn(t_thrd.postmaster_cxt.ReplConnArray[fastest], firstAddress, MAXPGPATH); + + } + + /* find second fastest replay standby */ + if (!XLogRecPtrIsInvalid(second_fastest_replay)) { + FormatAddressByReplConn(t_thrd.postmaster_cxt.ReplConnArray[second_fastest], secondAddress, MAXPGPATH); + } + +} + /* * @Description: get remote address * @IN/OUT first_address: first address @@ -134,18 +191,13 @@ void GetRemoteReadAddress(char* firstAddress, char* secondAddress, size_t addres } } else if (IS_DN_MULTI_STANDYS_MODE()) { if (serverMode == PRIMARY_MODE) { - GetFastestReplayStandByServiceAddress(firstAddress, secondAddress, addressLen); - if (firstAddress[0] != '\0') { - GetIPAndPort(firstAddress, ip, port, MAX_IPADDR_LEN); - rc = snprintf_s(firstAddress, addressLen, (addressLen - 1), "%s@%s", ip, port); - securec_check_ss(rc, "", ""); - } - - if (secondAddress[0] != '\0') { - GetIPAndPort(secondAddress, ip, port, MAX_IPADDR_LEN); - rc = snprintf_s(secondAddress, addressLen, (addressLen - 1), "%s@%s", ip, port); - securec_check_ss(rc, "", ""); - } + /* address format: ip@port */ + if (RecoveryInProgress()) { + /* during recovery, walsnder is not valid so we cant get standby info from walsnder */ + GetRemoteReadAddressFromReplconn(firstAddress, secondAddress, addressLen); + } else { + GetFastestReplayStandByServiceAddress(firstAddress, secondAddress, addressLen); + } } else if (serverMode == STANDBY_MODE) { GetPrimaryServiceAddress(firstAddress, addressLen); if (firstAddress[0] != '\0') { diff --git a/src/include/service/remote_read_client.h b/src/include/service/remote_read_client.h index 032e119ec..9689bd324 100755 --- a/src/include/service/remote_read_client.h +++ b/src/include/service/remote_read_client.h @@ -43,6 +43,7 @@ extern int RemoteGetPage(char* remote_address, RepairBlockKey *key, uint32 block extern int RemoteGetFile(char* remoteAddress, RemoteReadFileKey *key, uint64 lsn, uint32 size, char* pageData, XLogRecPtr *remote_lsn, uint32 *remote_size, int timeout); extern int RemoteGetFileSize(char* remoteAddress, RemoteReadFileKey *key, uint64 lsn, int64 *size, int timeout); +extern uint64 RemoteGetXlogReplayPtr(char* remoteAddress); #endif /* REMOTE_READ_CLIENT_H */