This commit is contained in:
james
2024-05-06 15:04:11 +08:00
committed by yaoxin
parent bda4745f9e
commit 25ba275200
9 changed files with 232 additions and 0 deletions

View File

@ -12999,4 +12999,8 @@ AddFuncGroup(
AddFuncGroup(
"query_all_drc_info", 1,
AddBuiltinFunc(_0(2870), _1("query_all_drc_info"), _2(1), _3(true), _4(true), _5(query_all_drc_info), _6(2249), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(27), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('v'), _19(0), _20(1, INT4OID), _21(19, INT4OID, TEXTOID, INT4OID, INT8OID, INT4OID, INT4OID, INT4OID, INT4OID, INT4OID, INT4OID, INT2OID, INT8OID, INT8OID, INT2OID, INT4OID, INT4OID, INT4OID, INT4OID, INT4OID), _22(19, 'i', 'o', 'o', 'o','o','o','o','o','o','o', 'o', 'o', 'o','o','o','o','o','o','o'), _23(19, "TYPE", "RESOURCE_ID", "MASTER_ID", "COPY_INSTS", "CLAIMED_OWNER", "LOCK_MODE", "LAST_EDP", "TYPE", "IN_RECOVERY", "COPY_PROMOTE", "PART_ID", "EDP_MAP", "LSN", "LEN", "RECOVERY_SKIP", "RECYCLING", "CONVERTING_INST_ID", "CONVERTING_CURR_MODE", "CONVERTING_REQ_MODE"), _24(NULL), _25("query_all_drc_info"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33("query all drc info"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0))
),
AddFuncGroup(
"gs_get_recv_locations", 1,
AddBuiltinFunc(_0(2872), _1("gs_get_recv_locations"), _2(0), _3(false), _4(true), _5(gs_get_recv_locations), _6(2249), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(10), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('v'), _19(0), _20(0), _21(4, 25, 25, 25, 25), _22(4, 'o', 'o', 'o', 'o'), _23(4, "received_lsn", "write_lsn", "flush_lsn", "replay_lsn"), _24(NULL), _25("gs_get_recv_locations"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33("statistics: information about WAL locations"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0))
),

View File

@ -81,6 +81,7 @@ const uint32 GRAND_VERSION_NUM = 92929;
* 2.VERSION NUM FOR EACH FEATURE
* Please write indescending order.
********************************************/
const uint32 FLUSH_LSN_VERSION_NUM = 92929;
const uint32 PRIOR_EXPR_VERSION_NUM = 92928;
const uint32 PUBLICATION_DDL_VERSION_NUM = 92921;
const uint32 UPSERT_ALIAS_VERSION_NUM = 92920;

View File

@ -43,6 +43,7 @@
#include <sys/stat.h>
#include <string>
#include "access/xlogreader.h"
#include "access/xlog_internal.h"
#include "access/xlog.h"
#include "access/multi_redo_api.h"
@ -63,6 +64,7 @@
#include "storage/copydir.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lock/lwlock.h"
#include "storage/pmsignal.h"
#include "storage/copydir.h"
#include "storage/procarray.h"
@ -209,6 +211,7 @@ static void ProcessEndXLogMessage(EndXLogMessage *endXLogMessage);
static void ProcessWalHeaderMessage(WalDataMessageHeader *msghdr);
static void ProcessWalDataHeaderMessage(WalDataPageMessageHeader *msghdr);
Datum pg_stat_get_wal_receiver(PG_FUNCTION_ARGS);
Datum gs_get_recv_locations(PG_FUNCTION_ARGS);
/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
static void WalRcvShutdownHandler(SIGNAL_ARGS);
@ -2535,6 +2538,137 @@ Datum pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
return (Datum)0;
}
/*
* Descriptions: Returns activity of walreveiver, including pids and xlog
* locations received from primary o cascading server.
*/
Datum gs_get_recv_locations(PG_FUNCTION_ARGS)
{
#define GS_STAT_GET_WAL_RECEIVER_COLS 4
ReturnSetInfo *rsinfo = (ReturnSetInfo *)fcinfo->resultinfo;
TupleDesc tupdesc = NULL;
Tuplestorestate *tupstore = NULL;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
char location[MAXFNAMELEN * 3] = {0};
XLogRecPtr rcvRedo;
XLogRecPtr rcvWrite;
XLogRecPtr rcvFlush;
XLogRecPtr rcvReceived;
XLogRecPtr localMaxLSN;
pg_crc32 localMaxLsnCrc = 0;
uint32 localMaxLsnLen = 0;
char maxLsnMsg[XLOG_READER_MAX_MSGLENTH] = {0};
TimeLineID tli = 0;
Datum values[GS_STAT_GET_WAL_RECEIVER_COLS];
bool nulls[GS_STAT_GET_WAL_RECEIVER_COLS];
errno_t rc = EOK;
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("set-valued function called in context that cannot accept a set")));
return (Datum)0;
}
if (!(rsinfo->allowedModes & SFRM_Materialize)) {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not allowed in this context")));
}
/* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE){
ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("return type must be a row type")));
}
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);
tupstore = tuplestore_begin_heap(true, false, u_sess->attr.attr_memory.work_mem);
rsinfo->returnMode = SFRM_Materialize;
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupdesc;
(void)MemoryContextSwitchTo(oldcontext);
load_server_mode();
if (t_thrd.xlog_cxt.server_mode != STANDBY_MODE &&
t_thrd.xlog_cxt.server_mode != CASCADE_STANDBY_MODE &&
t_thrd.xlog_cxt.server_mode != MAIN_STANDBY_MODE
) {
return (Datum)0;
}
SpinLockAcquire(&walrcv->mutex);
rcvReceived = walrcv->receiver_received_location;
rcvWrite = walrcv->receiver_write_location;
rcvFlush = walrcv->receiver_flush_location;
SpinLockRelease(&walrcv->mutex);
rc = memset_s(nulls, sizeof(nulls), 0, sizeof(nulls));
securec_check_c(rc, "\0", "\0");
if (!superuser() && !(isOperatoradmin(GetUserId()) && u_sess->attr.attr_security.operation_mode)) {
/*
* Only superusers can see details. Other users only get the pid
* value to know it's a receiver, but no details.
*/
rc = memset_s(&nulls[1], PG_STAT_GET_WAL_RECEIVER_COLS - 1, true, PG_STAT_GET_WAL_RECEIVER_COLS - 1);
securec_check(rc, "\0", "\0");
} else {
rcvRedo = GetXLogReplayRecPtr(NULL, NULL);
/* receiver_received_location */
if (rcvReceived == 0) {
localMaxLSN = FindMaxLSN(t_thrd.proc_cxt.DataDir, maxLsnMsg, XLOG_READER_MAX_MSGLENTH, &localMaxLsnCrc,
&localMaxLsnLen, &tli);
if (XLogRecPtrIsInvalid(localMaxLSN)) {
ereport(WARNING, (errmsg("find local max lsn fail: %s", maxLsnMsg)));
}else if (XLByteLT(localMaxLSN, rcvRedo)) {
localMaxLSN = rcvRedo;
}
rcvReceived = localMaxLSN;
}
rc = snprintf_s(location, sizeof(location), sizeof(location) - 1, "%X/%X", (uint32)(rcvReceived >> 32),
(uint32)rcvReceived);
securec_check_ss(rc, "\0", "\0");
values[0] = CStringGetTextDatum(location);
/* receiver_write_location */
if (rcvWrite == 0) {
rcvWrite = localMaxLSN;
}
rc = snprintf_s(location, sizeof(location), sizeof(location) - 1, "%X/%X", (uint32)(rcvWrite >> 32),
(uint32)rcvWrite);
securec_check_ss(rc, "\0", "\0");
values[1] = CStringGetTextDatum(location);
/* receiver_flush_location */
if (rcvFlush == 0) {
rcvFlush = localMaxLSN;
}
rc = snprintf_s(location, sizeof(location), sizeof(location) - 1, "%X/%X", (uint32)(rcvFlush >> 32),
(uint32)rcvFlush);
securec_check_ss(rc, "\0", "\0");
values[2] = CStringGetTextDatum(location);
rc = snprintf_s(location, sizeof(location), sizeof(location) - 1, "%X/%X", (uint32)(rcvRedo >> 32),
(uint32)rcvRedo);
securec_check_ss(rc, "\0", "\0");
values[3] = CStringGetTextDatum(location);
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
/* clean up and return the tuplestore */
tuplestore_donestoring(tupstore);
return (Datum)0;
}
/*
* Returns activity of ha state, including static connections,local role,
* database state and rebuild reason if database state is unnormal.

View File

@ -0,0 +1 @@
DROP FUNCTION IF EXISTS pg_catalog.gs_get_recv_locations();

View File

@ -0,0 +1 @@
DROP FUNCTION IF EXISTS pg_catalog.gs_get_recv_locations();

View File

@ -0,0 +1,11 @@
DROP FUNCTION IF EXISTS pg_catalog.gs_get_recv_locations() CASCADE;
SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 2872;
CREATE FUNCTION pg_catalog.gs_get_recv_locations(
out received_lsn text,
out write_lsn text,
out flush_lsn text,
out replay_lsn text)
RETURNS record LANGUAGE INTERNAL VOLATILE STRICT as 'gs_get_recv_locations';
comment on function pg_catalog.gs_get_recv_locations() is 'statistics: information about currently wal locations';

View File

@ -0,0 +1,11 @@
DROP FUNCTION IF EXISTS pg_catalog.gs_get_recv_locations() CASCADE;
SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 2872;
CREATE FUNCTION pg_catalog.gs_get_recv_locations(
out received_lsn text,
out write_lsn text,
out flush_lsn text,
out replay_lsn text)
RETURNS record LANGUAGE INTERNAL VOLATILE STRICT as 'gs_get_recv_locations';
comment on function pg_catalog.gs_get_recv_locations() is 'statistics: information about currently wal locations';

View File

@ -8,6 +8,7 @@ data_replication_single/datareplica_cstore_slow_catchup
data_replication_single/datareplica_cstore_failover
data_replication_single/datareplica_cstore_rstore
data_replication_single/datareplica_failover_consistency
data_replication_single/datareplica_with_xlogreplica_status
#data_replication_single/datareplica_forcepagewrite
data_replication_single/datareplica_vacuum
data_replication_single/datareplica_with_xlogreplica

View File

@ -0,0 +1,68 @@
#!/bin/sh
source ./standby_env.sh
function test_1()
{
check_instance
cstore_rawdata_lines=8
#create table
gsql -d $db -p $dn1_primary_port -c "DROP TABLE if exists cstore_copy_t1; create table cstore_copy_t1(c1 int2, c2 int4, c3 int8, c4 char(10), c5 varchar(12),c6 numeric(10,2)) with (orientation = column);"
gsql -d $db -p $dn1_primary_port -c "set enable_data_replicate=off; copy cstore_copy_t1 from '$scripts_dir/data/cstore_copy_t1.data' delimiter '|';"
#test the copy results on dn1_primary
if [ $(gsql -d $db -p $dn1_primary_port -c "select pgxc_pool_reload();select count(1) from cstore_copy_t1;" | grep `expr 1 \* $cstore_rawdata_lines` |wc -l) -eq 1 ]; then
echo "copy success on dn1_primary cstore_copy_t1"
else
echo "copy $failed_keyword on dn1_primary cstore_copy_t1"
exit 1
fi
#test the copy results on dn1_standby
if [ $(gsql -d $db -p $dn1_standby_port -m -c "select count(1) from cstore_copy_t1;" | grep `expr 1 \* $cstore_rawdata_lines` |wc -l) -eq 1 ]; then
echo "copy success on dn1_standby cstore_copy_t1"
else
echo "copy $failed_keyword on dn1_standby cstore_copy_t1"
exit 1
fi
# stop primary
stop_primary
# sleep
sleep 10
#test the copy status on dn1_standby
if [ $(gsql -d $db -p $dn1_standby_port -m -c "select gs_get_recv_locations();" | awk -F'|' 'NR==3 {print $4}') == '0/0' ]; then
echo " dn1_standby recv status is wrong."
exit 1
else
echo " dn1_standby recv status is normal."
fi
stop_standby
sleep 10
start_standby
#test the copy status on dn1_standby
if [ $(gsql -d $db -p $dn1_standby_port -m -c "select gs_get_recv_locations();" | awk -F'|' 'NR==3 {print $4}') == '0/0' ]; then
echo " dn1_standby recv status is wrong."
exit 1
else
echo " dn1_standby recv status is normal."
fi
}
function tear_down()
{
sleep 1
gsql -d $db -p $dn1_primary_port -c "DROP TABLE if exists cstore_copy_t1;"
}
test_1
tear_down