Files
openGauss-server/src/gausskernel/storage/remote/remote_read.cpp
2023-08-17 11:34:12 +08:00

276 lines
9.0 KiB
C++
Executable File

/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* remote_read.cpp
* using simple C API interface
* Don't include any RPC header files.
*
* IDENTIFICATION
* src/gausskernel/storage/remote/remote_read.cpp
*
* ---------------------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include "pgxc/pgxc.h"
#include "postmaster/postmaster.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/remote_read.h"
#include "service/remote_read_client.h"
#include <arpa/inet.h>
/*
 * @Description: translate a remote read error code into a human readable message
 * @IN error_code: remote read error code (REMOTE_READ_* value)
 * @Return: constant string describing the code; "error code unknown" is
 *          returned for any value that has no dedicated case below
 */
const char* RemoteReadErrMsg(int error_code)
{
    /* Every case returns a string literal, so the caller must not free or modify the result. */
    switch (error_code) {
        case REMOTE_READ_OK:
            return "normal";
        case REMOTE_READ_NEED_WAIT:
            return "local LSN larger than remote LSN and wait remote redo timeout";
        case REMOTE_READ_CRC_ERROR:
            return "remote data checksum error, maybe remote data corrupted";
        case REMOTE_READ_RPC_ERROR:
            return "rpc error";
        case REMOTE_READ_SIZE_ERROR:
            return "size error";
        case REMOTE_READ_IO_ERROR:
            return "io error";
        case REMOTE_READ_RPC_TIMEOUT:
            return "timeout";
        case REMOTE_READ_BLCKSZ_NOT_SAME:
            return "BLCKSZ different between remote and local";
        case REMOTE_READ_MEMCPY_ERROR:
            return "memcpy_s error";
        case REMOTE_READ_CONN_ERROR:
            return "remote connect status not ok";
        default:
            return "error code unknown";
    }
}
/*
 * @Description: write the "host@port" of the WAL receiver's connection channel
 *               (walrcv->conn_channel.remotehost / remoteport) into address
 * @IN/OUT address: output buffer; left untouched when the address is NULL,
 *                  address_len is 0, there is no WalRcv shared state, the WAL
 *                  receiver has no pid, or it is not running
 * @IN address_len: size of the output buffer in bytes
 */
void GetPrimaryServiceAddress(char *address, size_t address_len)
{
    if (address == NULL || address_len == 0 || t_thrd.walreceiverfuncs_cxt.WalRcv == NULL) {
        return;
    }

    volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
    bool available = false;
    int rc = 0;

    /*
     * Check the receiver state and read the channel in ONE critical section:
     * pid, isRuning and conn_channel are then observed consistently, instead of
     * being checked under one lock acquisition and used under another.
     */
    SpinLockAcquire(&walrcv->mutex);
    available = (walrcv->pid != 0) && walrcv->isRuning;
    if (available) {
        rc = snprintf_s(address, address_len, (address_len - 1), "%s@%d", walrcv->conn_channel.remotehost,
                        walrcv->conn_channel.remoteport);
    }
    SpinLockRelease(&walrcv->mutex);

    /*
     * The result check is deferred until the lock is released so that any error
     * raised by securec_check_ss never fires while the spinlock is held.
     */
    if (available) {
        securec_check_ss(rc, "\0", "\0");
    }
}
/*
 * Format the remote end of a replication connection as "<remotehost>@<remoteport>".
 *
 * replconninfo:  connection entry whose remotehost/remoteport are printed
 * remoteAddress: output buffer
 * addressLen:    size of remoteAddress, handed to snprintf_s as the destination size
 *
 * The snprintf_s result is validated with securec_check_ss.
 */
static void FormatAddressByReplConn(replconninfo* replconninfo, char* remoteAddress, int addressLen)
{
    /* leave room for the terminating NUL */
    const int maxChars = addressLen - 1;
    int ret = snprintf_s(remoteAddress, addressLen, maxChars, "%s@%d", replconninfo->remotehost,
                         replconninfo->remoteport);
    securec_check_ss(ret, "", "");
}
/**
* in startup, walsnder is not ready. we need to get remote address from replConnArray
* @param firstAddress first address
* @param secondAddress second address
* @param addressLen address length
*/
static void GetRemoteReadAddressFromReplconn(char* firstAddress, char* secondAddress, size_t addressLen)
{
XLogRecPtr fastest_replay = InvalidXLogRecPtr;
XLogRecPtr second_fastest_replay = InvalidXLogRecPtr;
int fastest = 0;
int second_fastest = 0;
for (int i = 0; i < MAX_REPLNODE_NUM; i++) {
if (t_thrd.postmaster_cxt.ReplConnArray[i]) {
char remoteAddress[MAXPGPATH];
FormatAddressByReplConn(t_thrd.postmaster_cxt.ReplConnArray[i], remoteAddress, MAXPGPATH);
XLogRecPtr insertXLogRecPtr = RemoteGetXlogReplayPtr(remoteAddress);
if (XLByteLT(second_fastest_replay, insertXLogRecPtr)) {
if (XLByteLT(fastest_replay, insertXLogRecPtr)) {
/* walsnd_replay is larger than fastest_replay */
second_fastest = fastest;
second_fastest_replay = fastest_replay;
fastest = i;
fastest_replay = insertXLogRecPtr;
} else {
/* walsnd_replay is in the range (second_fastest_replay, fastest_replay] */
second_fastest = i;
second_fastest_replay = insertXLogRecPtr;
}
}
}
}
/* find fastest replay standby */
if (!XLogRecPtrIsInvalid(fastest_replay)) {
FormatAddressByReplConn(t_thrd.postmaster_cxt.ReplConnArray[fastest], firstAddress, MAXPGPATH);
}
/* find second fastest replay standby */
if (!XLogRecPtrIsInvalid(second_fastest_replay)) {
FormatAddressByReplConn(t_thrd.postmaster_cxt.ReplConnArray[second_fastest], secondAddress, MAXPGPATH);
}
}
/*
 * @Description: get the address(es), formatted as "ip@port", of the node(s) to
 *               use for remote read. Buffers are only written when an address
 *               is found.
 * @IN/OUT firstAddress: first address; nothing is done when it is NULL
 * @IN/OUT secondAddress: second address; only passed on in multi-standby
 *                        mode when the current server mode is PRIMARY_MODE
 * @IN addressLen: size in bytes of each address buffer; nothing is done when 0
 */
void GetRemoteReadAddress(char* firstAddress, char* secondAddress, size_t addressLen)
{
    char ip[MAX_IPADDR_LEN] = {0};
    char port[MAX_IPADDR_LEN] = {0};
    errno_t rc = EOK;

    /* make sure firstAddress is usable */
    if (firstAddress == NULL || addressLen == 0) {
        return;
    }

    volatile HaShmemData* hashmdata = t_thrd.postmaster_cxt.HaShmData;
    ServerMode serverMode = hashmdata->current_mode;

    if (IS_DN_DUMMY_STANDYS_MODE()) {
        if (serverMode == PRIMARY_MODE && !IsPrimaryStandByReadyToRemoteRead()) {
            return;
        }

        if (t_thrd.postmaster_cxt.ReplConnArray[1]) {
            rc = snprintf_s(firstAddress, addressLen, (addressLen - 1),
                            "%s@%d", t_thrd.postmaster_cxt.ReplConnArray[1]->remotehost,
                            t_thrd.postmaster_cxt.ReplConnArray[1]->remoteport);
            securec_check_ss(rc, "", "");
        }
    } else if (IS_DN_MULTI_STANDYS_MODE()) {
        if (serverMode == PRIMARY_MODE) {
            /* address format: ip@port */
            if (RecoveryInProgress()) {
                /* during recovery, walsender is not valid so we can't get standby info from walsender */
                GetRemoteReadAddressFromReplconn(firstAddress, secondAddress, addressLen);
            } else {
                GetFastestReplayStandByServiceAddress(firstAddress, secondAddress, addressLen);
            }
        } else if (serverMode == STANDBY_MODE) {
            GetPrimaryServiceAddress(firstAddress, addressLen);
            if (firstAddress[0] != '\0') {
                GetIPAndPort(firstAddress, ip, port, MAX_IPADDR_LEN);
                if (ip[0] == '\0' || port[0] == '\0') {
                    /*
                     * The address could not be split (ip/port are still the
                     * zero-initialized buffers). GetIPAndPort tokenizes
                     * firstAddress in place, so report "no address" instead of
                     * rebuilding a bare "@" from two empty strings.
                     */
                    firstAddress[0] = '\0';
                } else {
                    rc = snprintf_s(firstAddress, addressLen, (addressLen - 1), "%s@%s", ip, port);
                    securec_check_ss(rc, "", "");
                }
            }
        }
    }
}
/*
 * @Description: split an "ip@port" string into its two parts
 * @IN/OUT address: string to split; it is tokenized in place with strtok_r, so
 *                  the '@' separator is overwritten and the caller's string is
 *                  modified
 * @OUT ip: receives the part before '@'
 * @OUT port: receives the part after '@'
 * @IN len: size in bytes of both the ip and the port buffer
 *
 * ip and port are left untouched when any pointer is NULL, len is 0, either
 * part is missing, or strlen(ip part) + strlen(port part) + 1 is not smaller
 * than len.
 */
void GetIPAndPort(char* address, char* ip, char* port, size_t len)
{
    if (address == NULL || ip == NULL || port == NULL || len == 0) {
        return;
    }

    char* savePtr = NULL;
    char* ipToken = strtok_r(address, "@", &savePtr);
    char* portToken = strtok_r(NULL, "@", &savePtr);

    if (ipToken == NULL || ipToken[0] == '\0' || portToken == NULL || portToken[0] == '\0') {
        return;
    }

    if (strlen(ipToken) + strlen(portToken) + 1 >= len) {
        return;
    }

    /*
     * Copy with the caller supplied buffer size: the check above guarantees
     * that each token (plus its terminator) fits into len bytes.
     */
    errno_t rc = strcpy_s(ip, len, ipToken);
    securec_check(rc, "", "");
    rc = strcpy_s(port, len, portToken);
    securec_check(rc, "", "");
}
/*
* @Description: have remote node to read
* @Return: true if have remote node
*/
bool CanRemoteRead()
{
volatile HaShmemData* hashmdata = t_thrd.postmaster_cxt.HaShmData;
ServerMode serveMode = hashmdata->current_mode;
if (IsRemoteReadModeOn() && !IS_DN_WITHOUT_STANDBYS_MODE() && IS_PGXC_DATANODE && serveMode != NORMAL_MODE &&
serveMode != PENDING_MODE && serveMode != STANDBY_MODE) {
return true;
}
return false;
}
/*
 * @Description: check whether remote read is enabled
 * @Return: true if remote_read_mode is anything other than REMOTE_READ_OFF
 */
bool IsRemoteReadModeOn()
{
    const bool modeIsOff = (g_instance.attr.attr_storage.remote_read_mode == REMOTE_READ_OFF);
    return !modeIsOff;
}
/*
 * @Description: switch remote_read_mode to REMOTE_READ_OFF
 * @Return: the remote_read_mode value that was in effect before the switch,
 *          so the caller can restore it later
 */
int SetRemoteReadModeOffAndGetOldMode()
{
    /* remember the current setting before overwriting it */
    const int previousMode = g_instance.attr.attr_storage.remote_read_mode;

    g_instance.attr.attr_storage.remote_read_mode = REMOTE_READ_OFF;

    return previousMode;
}
/*
 * @Description: set remote_read_mode
 * @IN mode: new remote_read_mode value; stored as is, without validation
 */
void SetRemoteReadMode(int mode)
{
    g_instance.attr.attr_storage.remote_read_mode = mode;
}