@ -223,6 +223,8 @@ THR_LOCAL bool redo_oldversion_xlog = false;
|
||||
|
||||
static const int ONE_SECOND = 1000000;
|
||||
|
||||
const int UWAL_SLEEP_TIME = 10000;
|
||||
|
||||
/*
|
||||
* XLOGfileslop is the maximum number of preallocated future XLOG segments.
|
||||
* When we are done with an old XLOG segment file, we will recycle it as a
|
||||
@ -3094,13 +3096,21 @@ static void XLogWriteUwal(XLogRecPtr UwalWriteRqst)
|
||||
pgstat_report_waitevent(WAIT_EVENT_WAL_WRITE);
|
||||
INSTR_TIME_SET_CURRENT(startTime);
|
||||
if (g_instance.attr.attr_storage.uwal_async_append_switch) {
|
||||
while ((ret = GsUwalWriteAsync(&t_thrd.xlog_cxt.uwalInfo.id, nbytes, from, infos)) ==
|
||||
REPLICA_APPEND_LOG_FAIL)
|
||||
;
|
||||
ret = GsUwalWriteAsync(&t_thrd.xlog_cxt.uwalInfo.id, nbytes, from, infos);
|
||||
while (ret == REPLICA_APPEND_LOG_FAIL || ret == U_CAPACITY_NOT_ENOUGH) {
|
||||
if (ret == U_CAPACITY_NOT_ENOUGH) {
|
||||
usleep(UWAL_SLEEP_TIME);
|
||||
}
|
||||
ret = GsUwalWriteAsync(&t_thrd.xlog_cxt.uwalInfo.id, nbytes, from, infos);
|
||||
}
|
||||
} else {
|
||||
while ((ret = GsUwalWrite(&t_thrd.xlog_cxt.uwalInfo.id, nbytes, from, infos)) ==
|
||||
REPLICA_APPEND_LOG_FAIL)
|
||||
;
|
||||
ret = GsUwalWrite(&t_thrd.xlog_cxt.uwalInfo.id, nbytes, from, infos);
|
||||
while (ret == REPLICA_APPEND_LOG_FAIL || ret == U_CAPACITY_NOT_ENOUGH) {
|
||||
if (ret == U_CAPACITY_NOT_ENOUGH) {
|
||||
usleep(UWAL_SLEEP_TIME);
|
||||
}
|
||||
ret = GsUwalWrite(&t_thrd.xlog_cxt.uwalInfo.id, nbytes, from, infos);
|
||||
}
|
||||
}
|
||||
INSTR_TIME_SET_CURRENT(endTime);
|
||||
INSTR_TIME_SUBTRACT(endTime, startTime);
|
||||
|
@ -336,6 +336,41 @@ static bool GsUwalParseConfig(cJSON *uwalConfJSON)
|
||||
securec_check(rc, "\0", "\0");
|
||||
}
|
||||
|
||||
cJSON *replinodesJSON = cJSON_GetObjectItem(uwalConfJSON, "uwal_replinodes");
|
||||
if (cJSON_IsArray(replinodesJSON)) {
|
||||
int arrLen = cJSON_GetArraySize(replinodesJSON);
|
||||
for (int i = 0; i < arrLen; i++) {
|
||||
cJSON *subObj = cJSON_GetArrayItem(replinodesJSON, i);
|
||||
if (nullptr == subObj) {
|
||||
continue;
|
||||
} else {
|
||||
cJSON *subIdJSON = cJSON_GetObjectItem(subObj, "id");
|
||||
if (subIdJSON == nullptr) {
|
||||
ereport(ERROR, (errmsg("No item uwal_nodeid in uwal_replinodes")));
|
||||
return false;
|
||||
}
|
||||
if (subIdJSON->valueint < 0 || subIdJSON->valueint >= MAX_GAUSS_NODE) {
|
||||
ereport(ERROR, (errmsg("uwal_nodeid out of range [0, 7]")));
|
||||
return false;
|
||||
}
|
||||
int nodeId = subIdJSON->valueint;
|
||||
cJSON *subProtocolJSON = cJSON_GetObjectItem(subObj, "protocol");
|
||||
if (subProtocolJSON == nullptr) {
|
||||
ereport(WARNING, (errmsg("No item protocol in uwal_replinodes, use the default protocol tcp")));
|
||||
rc = strcpy_s(g_uwalConfig.repliNodes[nodeId], UWAL_PROTOCOL_LEN, "tcp");
|
||||
securec_check(rc, "\0", "\0");
|
||||
} else if (strcasecmp(subProtocolJSON->valuestring, "rdma") && strcasecmp(subProtocolJSON->valuestring, "tcp")) {
|
||||
ereport(WARNING, (errmsg("protocol only support tcp and rdma, use the default protocol tcp")));
|
||||
rc = strcpy_s(g_uwalConfig.repliNodes[nodeId], UWAL_PROTOCOL_LEN, "tcp");
|
||||
securec_check(rc, "\0", "\0");
|
||||
} else {
|
||||
rc = strcpy_s(g_uwalConfig.repliNodes[nodeId], UWAL_PROTOCOL_LEN, subProtocolJSON->valuestring);
|
||||
securec_check(rc, "\0", "\0");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ParseUwalCpuBindInfo(uwalConfJSON);
|
||||
return true;
|
||||
}
|
||||
@ -717,7 +752,7 @@ int GsUwalWalSenderNotify(bool exceptSelf)
|
||||
netInfo.ipv4Addr = ock_uwal_ipv4_inet_to_int((char *)replConnInfo->remoteuwalhost);
|
||||
netInfo.port = replConnInfo->remoteuwalport;
|
||||
netInfo.protocol = NET_PROTOCOL_TCP;
|
||||
if (!strcasecmp(g_uwalConfig.protocol, "rdma")) {
|
||||
if (!strcasecmp(g_uwalConfig.protocol, "rdma") && !strcasecmp(g_uwalConfig.repliNodes[standbyStateInfo.nodeId], "rdma")) {
|
||||
netInfo.protocol = NET_PROTOCOL_RDMA;
|
||||
}
|
||||
|
||||
@ -785,7 +820,7 @@ int GsUwalWalReceiverNotify(bool isConnectedToPrimary)
|
||||
netInfo.ipv4Addr = ock_uwal_ipv4_inet_to_int((char *)replConnInfo->remotehost);
|
||||
netInfo.port = replConnInfo->remoteuwalport;
|
||||
netInfo.protocol = NET_PROTOCOL_TCP;
|
||||
if (!strcasecmp(g_uwalConfig.protocol, "rdma")) {
|
||||
if (!strcasecmp(g_uwalConfig.protocol, "rdma") && !strcasecmp(g_uwalConfig.repliNodes[primaryStateInfo.nodeId], "rdma")) {
|
||||
netInfo.protocol = NET_PROTOCOL_RDMA;
|
||||
}
|
||||
|
||||
|
@ -83,6 +83,7 @@ typedef struct {
|
||||
bool bindCpuSwitch;
|
||||
int bindCpuNum;
|
||||
int bindCpuStart;
|
||||
char repliNodes[MAX_NODE_NUM][UWAL_PROTOCOL_LEN];
|
||||
} UwalConfig;
|
||||
|
||||
int gs_uwal_load_symbols();
|
||||
|
Reference in New Issue
Block a user