diff --git a/src/common/backend/catalog/builtin_funcs.ini b/src/common/backend/catalog/builtin_funcs.ini index 2ccf0e06e..2886bbc26 100644 --- a/src/common/backend/catalog/builtin_funcs.ini +++ b/src/common/backend/catalog/builtin_funcs.ini @@ -12999,4 +12999,8 @@ AddFuncGroup( AddFuncGroup( "query_all_drc_info", 1, AddBuiltinFunc(_0(2870), _1("query_all_drc_info"), _2(1), _3(true), _4(true), _5(query_all_drc_info), _6(2249), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(27), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('v'), _19(0), _20(1, INT4OID), _21(19, INT4OID, TEXTOID, INT4OID, INT8OID, INT4OID, INT4OID, INT4OID, INT4OID, INT4OID, INT4OID, INT2OID, INT8OID, INT8OID, INT2OID, INT4OID, INT4OID, INT4OID, INT4OID, INT4OID), _22(19, 'i', 'o', 'o', 'o','o','o','o','o','o','o', 'o', 'o', 'o','o','o','o','o','o','o'), _23(19, "TYPE", "RESOURCE_ID", "MASTER_ID", "COPY_INSTS", "CLAIMED_OWNER", "LOCK_MODE", "LAST_EDP", "TYPE", "IN_RECOVERY", "COPY_PROMOTE", "PART_ID", "EDP_MAP", "LSN", "LEN", "RECOVERY_SKIP", "RECYCLING", "CONVERTING_INST_ID", "CONVERTING_CURR_MODE", "CONVERTING_REQ_MODE"), _24(NULL), _25("query_all_drc_info"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33("query all drc info"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) + ), + AddFuncGroup( + "gs_get_recv_locations", 1, + AddBuiltinFunc(_0(2872), _1("gs_get_recv_locations"), _2(0), _3(false), _4(true), _5(gs_get_recv_locations), _6(2249), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(10), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('v'), _19(0), _20(0), _21(4, 25, 25, 25, 25), _22(4, 'o', 'o', 'o', 'o'), _23(4, "received_lsn", "write_lsn", "flush_lsn", "replay_lsn"), _24(NULL), _25("gs_get_recv_locations"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33("statistics: information about WAL locations"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) ), \ No newline at end of file diff --git a/src/common/backend/utils/init/globals.cpp b/src/common/backend/utils/init/globals.cpp index b90100333..e813f923e 100644 --- a/src/common/backend/utils/init/globals.cpp +++ b/src/common/backend/utils/init/globals.cpp @@ -81,6 +81,7 @@ const uint32 GRAND_VERSION_NUM = 92929; * 2.VERSION NUM FOR EACH FEATURE * Please write indescending order. ********************************************/ +const uint32 FLUSH_LSN_VERSION_NUM = 92929; const uint32 PRIOR_EXPR_VERSION_NUM = 92928; const uint32 PUBLICATION_DDL_VERSION_NUM = 92921; const uint32 UPSERT_ALIAS_VERSION_NUM = 92920; diff --git a/src/gausskernel/storage/replication/walreceiver.cpp b/src/gausskernel/storage/replication/walreceiver.cpp index e82b1cba6..29ca19d9a 100755 --- a/src/gausskernel/storage/replication/walreceiver.cpp +++ b/src/gausskernel/storage/replication/walreceiver.cpp @@ -43,6 +43,7 @@ #include #include +#include "access/xlogreader.h" #include "access/xlog_internal.h" #include "access/xlog.h" #include "access/multi_redo_api.h" @@ -63,6 +64,7 @@ #include "storage/copydir.h" #include "storage/ipc.h" #include "storage/latch.h" +#include "storage/lock/lwlock.h" #include "storage/pmsignal.h" #include "storage/copydir.h" #include "storage/procarray.h" @@ -209,6 +211,7 @@ static void ProcessEndXLogMessage(EndXLogMessage *endXLogMessage); static void ProcessWalHeaderMessage(WalDataMessageHeader *msghdr); static void ProcessWalDataHeaderMessage(WalDataPageMessageHeader *msghdr); Datum pg_stat_get_wal_receiver(PG_FUNCTION_ARGS); +Datum gs_get_recv_locations(PG_FUNCTION_ARGS); /* Signal handlers */ static void WalRcvSigHupHandler(SIGNAL_ARGS); static void WalRcvShutdownHandler(SIGNAL_ARGS); @@ -2535,6 +2538,137 @@ Datum pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) return (Datum)0; } +/* + * Descriptions: Returns activity of walreveiver, including pids and xlog + * locations received from primary o cascading server. + */ +Datum gs_get_recv_locations(PG_FUNCTION_ARGS) +{ +#define GS_STAT_GET_WAL_RECEIVER_COLS 4 + ReturnSetInfo *rsinfo = (ReturnSetInfo *)fcinfo->resultinfo; + TupleDesc tupdesc = NULL; + Tuplestorestate *tupstore = NULL; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + + volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv; + + char location[MAXFNAMELEN * 3] = {0}; + + XLogRecPtr rcvRedo; + XLogRecPtr rcvWrite; + XLogRecPtr rcvFlush; + XLogRecPtr rcvReceived; + XLogRecPtr localMaxLSN; + pg_crc32 localMaxLsnCrc = 0; + uint32 localMaxLsnLen = 0; + char maxLsnMsg[XLOG_READER_MAX_MSGLENTH] = {0}; + TimeLineID tli = 0; + + Datum values[GS_STAT_GET_WAL_RECEIVER_COLS]; + bool nulls[GS_STAT_GET_WAL_RECEIVER_COLS]; + errno_t rc = EOK; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + return (Datum)0; + } + if (!(rsinfo->allowedModes & SFRM_Materialize)) { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not allowed in this context"))); + } + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE){ + ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("return type must be a row type"))); + } + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, u_sess->attr.attr_memory.work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + (void)MemoryContextSwitchTo(oldcontext); + + load_server_mode(); + if (t_thrd.xlog_cxt.server_mode != STANDBY_MODE && + t_thrd.xlog_cxt.server_mode != CASCADE_STANDBY_MODE && + t_thrd.xlog_cxt.server_mode != MAIN_STANDBY_MODE + ) { + return (Datum)0; + } + + SpinLockAcquire(&walrcv->mutex); + rcvReceived = walrcv->receiver_received_location; + rcvWrite = walrcv->receiver_write_location; + rcvFlush = walrcv->receiver_flush_location; + SpinLockRelease(&walrcv->mutex); + + rc = memset_s(nulls, sizeof(nulls), 0, sizeof(nulls)); + securec_check_c(rc, "\0", "\0"); + + if (!superuser() && !(isOperatoradmin(GetUserId()) && u_sess->attr.attr_security.operation_mode)) { + /* + * Only superusers can see details. Other users only get the pid + * value to know it's a receiver, but no details. + */ + rc = memset_s(&nulls[1], PG_STAT_GET_WAL_RECEIVER_COLS - 1, true, PG_STAT_GET_WAL_RECEIVER_COLS - 1); + securec_check(rc, "\0", "\0"); + } else { + + rcvRedo = GetXLogReplayRecPtr(NULL, NULL); + /* receiver_received_location */ + if (rcvReceived == 0) { + localMaxLSN = FindMaxLSN(t_thrd.proc_cxt.DataDir, maxLsnMsg, XLOG_READER_MAX_MSGLENTH, &localMaxLsnCrc, + &localMaxLsnLen, &tli); + if (XLogRecPtrIsInvalid(localMaxLSN)) { + ereport(WARNING, (errmsg("find local max lsn fail: %s", maxLsnMsg))); + }else if (XLByteLT(localMaxLSN, rcvRedo)) { + localMaxLSN = rcvRedo; + } + rcvReceived = localMaxLSN; + } + rc = snprintf_s(location, sizeof(location), sizeof(location) - 1, "%X/%X", (uint32)(rcvReceived >> 32), + (uint32)rcvReceived); + securec_check_ss(rc, "\0", "\0"); + values[0] = CStringGetTextDatum(location); + + /* receiver_write_location */ + if (rcvWrite == 0) { + rcvWrite = localMaxLSN; + } + rc = snprintf_s(location, sizeof(location), sizeof(location) - 1, "%X/%X", (uint32)(rcvWrite >> 32), + (uint32)rcvWrite); + securec_check_ss(rc, "\0", "\0"); + values[1] = CStringGetTextDatum(location); + + /* receiver_flush_location */ + if (rcvFlush == 0) { + rcvFlush = localMaxLSN; + } + rc = snprintf_s(location, sizeof(location), sizeof(location) - 1, "%X/%X", (uint32)(rcvFlush >> 32), + (uint32)rcvFlush); + securec_check_ss(rc, "\0", "\0"); + values[2] = CStringGetTextDatum(location); + + rc = snprintf_s(location, sizeof(location), sizeof(location) - 1, "%X/%X", (uint32)(rcvRedo >> 32), + (uint32)rcvRedo); + securec_check_ss(rc, "\0", "\0"); + values[3] = CStringGetTextDatum(location); + } + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); + + return (Datum)0; +} + /* * Returns activity of ha state, including static connections,local role, * database state and rebuild reason if database state is unnormal. diff --git a/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback-post_catalog_maindb_92_929.sql b/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback-post_catalog_maindb_92_929.sql new file mode 100644 index 000000000..39320f2cb --- /dev/null +++ b/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback-post_catalog_maindb_92_929.sql @@ -0,0 +1 @@ +DROP FUNCTION IF EXISTS pg_catalog.gs_get_recv_locations(); \ No newline at end of file diff --git a/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback-post_catalog_otherdb_92_929.sql b/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback-post_catalog_otherdb_92_929.sql new file mode 100644 index 000000000..39320f2cb --- /dev/null +++ b/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback-post_catalog_otherdb_92_929.sql @@ -0,0 +1 @@ +DROP FUNCTION IF EXISTS pg_catalog.gs_get_recv_locations(); \ No newline at end of file diff --git a/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_92_929.sql b/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_92_929.sql new file mode 100644 index 000000000..bccda4027 --- /dev/null +++ b/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_92_929.sql @@ -0,0 +1,11 @@ +DROP FUNCTION IF EXISTS pg_catalog.gs_get_recv_locations() CASCADE; + +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 2872; + +CREATE FUNCTION pg_catalog.gs_get_recv_locations( + out received_lsn text, + out write_lsn text, + out flush_lsn text, + out replay_lsn text) +RETURNS record LANGUAGE INTERNAL VOLATILE STRICT as 'gs_get_recv_locations'; +comment on function pg_catalog.gs_get_recv_locations() is 'statistics: information about currently wal locations'; \ No newline at end of file diff --git a/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_92_929.sql b/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_92_929.sql new file mode 100644 index 000000000..bccda4027 --- /dev/null +++ b/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_92_929.sql @@ -0,0 +1,11 @@ +DROP FUNCTION IF EXISTS pg_catalog.gs_get_recv_locations() CASCADE; + +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 2872; + +CREATE FUNCTION pg_catalog.gs_get_recv_locations( + out received_lsn text, + out write_lsn text, + out flush_lsn text, + out replay_lsn text) +RETURNS record LANGUAGE INTERNAL VOLATILE STRICT as 'gs_get_recv_locations'; +comment on function pg_catalog.gs_get_recv_locations() is 'statistics: information about currently wal locations'; \ No newline at end of file diff --git a/src/test/ha/ha_schedule_single b/src/test/ha/ha_schedule_single index dfb55c130..407394e6c 100644 --- a/src/test/ha/ha_schedule_single +++ b/src/test/ha/ha_schedule_single @@ -8,6 +8,7 @@ data_replication_single/datareplica_cstore_slow_catchup data_replication_single/datareplica_cstore_failover data_replication_single/datareplica_cstore_rstore data_replication_single/datareplica_failover_consistency +data_replication_single/datareplica_with_xlogreplica_status #data_replication_single/datareplica_forcepagewrite data_replication_single/datareplica_vacuum data_replication_single/datareplica_with_xlogreplica diff --git a/src/test/ha/testcase/data_replication_single/datareplica_with_xlogreplica_status.sh b/src/test/ha/testcase/data_replication_single/datareplica_with_xlogreplica_status.sh new file mode 100644 index 000000000..c8ba9989b --- /dev/null +++ b/src/test/ha/testcase/data_replication_single/datareplica_with_xlogreplica_status.sh @@ -0,0 +1,68 @@ +#!/bin/sh + +source ./standby_env.sh + +function test_1() +{ +check_instance + +cstore_rawdata_lines=8 + +#create table + +gsql -d $db -p $dn1_primary_port -c "DROP TABLE if exists cstore_copy_t1; create table cstore_copy_t1(c1 int2, c2 int4, c3 int8, c4 char(10), c5 varchar(12),c6 numeric(10,2)) with (orientation = column);" + +gsql -d $db -p $dn1_primary_port -c "set enable_data_replicate=off; copy cstore_copy_t1 from '$scripts_dir/data/cstore_copy_t1.data' delimiter '|';" + +#test the copy results on dn1_primary +if [ $(gsql -d $db -p $dn1_primary_port -c "select pgxc_pool_reload();select count(1) from cstore_copy_t1;" | grep `expr 1 \* $cstore_rawdata_lines` |wc -l) -eq 1 ]; then + echo "copy success on dn1_primary cstore_copy_t1" +else + echo "copy $failed_keyword on dn1_primary cstore_copy_t1" + exit 1 +fi + +#test the copy results on dn1_standby +if [ $(gsql -d $db -p $dn1_standby_port -m -c "select count(1) from cstore_copy_t1;" | grep `expr 1 \* $cstore_rawdata_lines` |wc -l) -eq 1 ]; then + echo "copy success on dn1_standby cstore_copy_t1" +else + echo "copy $failed_keyword on dn1_standby cstore_copy_t1" + exit 1 +fi +# stop primary +stop_primary +# sleep +sleep 10 + +#test the copy status on dn1_standby +if [ $(gsql -d $db -p $dn1_standby_port -m -c "select gs_get_recv_locations();" | awk -F'|' 'NR==3 {print $4}') == '0/0' ]; then + echo " dn1_standby recv status is wrong." + exit 1 +else + echo " dn1_standby recv status is normal." +fi + +stop_standby + +sleep 10 + +start_standby + + #test the copy status on dn1_standby +if [ $(gsql -d $db -p $dn1_standby_port -m -c "select gs_get_recv_locations();" | awk -F'|' 'NR==3 {print $4}') == '0/0' ]; then + echo " dn1_standby recv status is wrong." + exit 1 +else + echo " dn1_standby recv status is normal." +fi + +} + +function tear_down() +{ +sleep 1 +gsql -d $db -p $dn1_primary_port -c "DROP TABLE if exists cstore_copy_t1;" +} + +test_1 +tear_down \ No newline at end of file