新增query_page_distribution_info系统函数, 推进DMS commit_id

This commit is contained in:
佘兴彬
2023-07-21 11:43:10 +08:00
parent f6052e4804
commit d7cb7b3982
12 changed files with 263 additions and 3 deletions

View File

@ -12916,3 +12916,7 @@ AddFuncGroup(
"query_node_reform_info", 1,
AddBuiltinFunc(_0(2867), _1("query_node_reform_info"), _2(3), _3(true), _4(true), _5(query_node_reform_info), _6(2249), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(64), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('v'), _19(0), _20(0), _21(10, INT4OID, TEXTOID, TEXTOID, TEXTOID, BOOLOID, TEXTOID, TEXTOID, INT4OID, TEXTOID, TEXTOID), _22(10,'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o'), _23(10, "reform_node_id", "reform_type", "reform_start_time", "reform_end_time", "is_reform_success", "redo_start_time", "redo_end_time", "xlog_total_bytes", "hashmap_construct_time", "action"), _24(NULL), _25("query_node_reform_info"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(false), _32(false), _33("query node reform information"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0))
),
/*
 * query_page_distribution_info: builtin registered under OID 2866.
 * Declares three IN arguments (relname text, fork int4, blockno int4) and eight OUT columns
 * (instance_id, is_master, is_owner, is_copy, lock_mode, mem_lsn, disk_lsn, is_dirty).
 * NOTE(review): mem_lsn/disk_lsn are declared OIDOID here; confirm that is wide enough for the LSN
 * values the C implementation stores into these columns.
 */
AddFuncGroup(
"query_page_distribution_info", 1,
AddBuiltinFunc(_0(2866), _1("query_page_distribution_info"), _2(3), _3(true), _4(true), _5(query_page_distribution_info), _6(2249), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(64), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('v'), _19(0), _20(3, TEXTOID, INT4OID, INT4OID), _21(11, TEXTOID, INT4OID, INT4OID, INT4OID, BOOLOID, BOOLOID, BOOLOID, TEXTOID, OIDOID, OIDOID, BOOLOID), _22(11, 'i', 'i', 'i', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o'), _23(11, "relname", "fork", "blockno", "instance_id", "is_master", "is_owner", "is_copy", "lock_mode", "mem_lsn", "disk_lsn", "is_dirty"), _24(NULL), _25("query_page_distribution_info"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33("statistics: query page distribution information "), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0))
),

View File

@ -62,6 +62,8 @@
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/buf/buf_internals.h"
#include "storage/buf/bufmgr.h"
#include "storage/buf/bufpage.h"
#include "workload/cpwlm.h"
#include "workload/workload.h"
#include "pgxc/pgxcnode.h"
@ -14899,7 +14901,6 @@ Datum gs_get_index_status(PG_FUNCTION_ARGS)
}
SRF_RETURN_DONE(funcctx);
}
#endif
TupleDesc create_query_node_reform_info_tupdesc()
@ -15063,3 +15064,199 @@ Datum query_node_reform_info(PG_FUNCTION_ARGS)
}
SRF_RETURN_DONE(funcctx);
}
/*
 * buftag_get_buf_info
 *      Report the state of the local buffer that caches the page identified by `tag`.
 *
 * tag:      buffer tag (relfilenode, fork, block) to look up in the buffer mapping table.
 * buf_info: output; always zeroed first, then filled only when the page is found locally.
 *
 * The lookup and all reads of the buffer descriptor happen while the buffer-mapping
 * partition lock is held in shared mode. When the page is not in the local buffer pool an
 * INFO message is emitted and buf_info is left zeroed.
 */
void buftag_get_buf_info(BufferTag tag, stat_buf_info_t *buf_info)
{
    errno_t err = memset_s(buf_info, sizeof(stat_buf_info_t), 0, sizeof(stat_buf_info_t));
    securec_check(err, "\0", "\0");

    uint32 hash_code = BufTableHashCode(&tag);
    LWLock *lock = BufMappingPartitionLock(hash_code);
    (void)LWLockAcquire(lock, LW_SHARED);

    int buf_id = BufTableLookup(&tag, hash_code);
    if (buf_id < 0) {
        /* Not cached locally: release the partition lock before reporting. */
        LWLockRelease(lock);
        ereport(INFO, (errmsg("buffer does not exist in local buffer pool ")));
        return;
    }

    BufferDesc *buf_desc = GetBufferDescriptor(buf_id);
    buf_info->rec_lsn = buf_desc->extra->rec_lsn;
    buf_info->lsn_on_disk = buf_desc->extra->lsn_on_disk;
    buf_info->aio_in_progress = buf_desc->extra->aio_in_progress;
    buf_info->dirty_queue_loc = buf_desc->extra->dirty_queue_loc;
    buf_info->mem_lsn = BufferGetLSN(buf_desc);

    dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
    buf_info->lock_mode = buf_ctrl->lock_mode;

    /* Echo the tag back as the resource id of the reported buffer. */
    err = memcpy_s(buf_info->data, DMS_RESID_SIZE, &tag, sizeof(BufferTag));
    securec_check(err, "", "");

    LWLockRelease(lock);
}
/*
 * relname_get_relfilenode
 *      Resolve a (possibly schema-qualified) relation name to its physical RelFileNode.
 *
 * relname: relation name as text; parsed with textToQualifiedNameList.
 *
 * The relation is opened and closed under AccessShareLock; the returned value is a copy of
 * the smgr relfilenode taken while the relation was open. Raises ERROR if the relation
 * cannot be opened.
 */
RelFileNode relname_get_relfilenode(text* relname)
{
    RangeVar* relrv = makeRangeVarFromNameList(textToQualifiedNameList(relname));
    Relation rel = relation_openrv(relrv, AccessShareLock);
    if (rel == NULL) {
        /* ERROR does not return, so no (uninitialized) value is handed back from this path. */
        ereport(ERROR, (errmsg("Open relation failed!")));
    }

    RelationOpenSmgr(rel);
    RelFileNode rnode = rel->rd_smgr->smgr_rnode.node;
    relation_close(rel, AccessShareLock);
    return rnode;
}
/*
 * create_query_page_distribution_info_tupdesc
 *      Build and bless the eight-column result descriptor used by
 *      query_page_distribution_info.
 *
 * NOTE(review): mem_lsn and disk_lsn are typed OIDOID, while the producer fills them with
 * UInt64GetDatum; confirm whether a 64-bit type is intended (this would also have to match
 * the function's catalog declaration).
 */
TupleDesc create_query_page_distribution_info_tupdesc()
{
    static const struct {
        const char *name;
        Oid type;
    } result_columns[] = {
        {"instance_id", INT4OID},
        {"is_master", BOOLOID},
        {"is_owner", BOOLOID},
        {"is_copy", BOOLOID},
        {"lock_mode", TEXTOID},
        {"mem_lsn", OIDOID},
        {"disk_lsn", OIDOID},
        {"is_dirty", BOOLOID},
    };
    const int ncolumns = (int)(sizeof(result_columns) / sizeof(result_columns[0]));

    TupleDesc tupdesc = CreateTemplateTupleDesc(ncolumns, false);
    for (int idx = 0; idx < ncolumns; idx++) {
        /* Attribute numbers are 1-based. */
        TupleDescInitEntry(tupdesc, (AttrNumber)(idx + 1), result_columns[idx].name,
            result_columns[idx].type, -1, 0);
    }
    BlessTupleDesc(tupdesc);
    return tupdesc;
}
/*
 * compute_copy_insts_count
 *      Count the bits set in `bitmap` among bit positions [0, DMS_MAX_INSTANCES).
 *
 * bitmap: one bit per instance; bit i set means instance i is counted.
 * Returns the number of set bits in that range.
 */
int compute_copy_insts_count(uint64 bitmap)
{
    int set_bits = 0;
    uint8 inst_id = 0;
    while (inst_id < DMS_MAX_INSTANCES) {
        set_bits += (int)((bitmap >> inst_id) & (uint64)1);
        inst_id++;
    }
    return set_bits;
}
/*
 * Iteration state for query_page_distribution_info: allocated on the first call, stored in
 * funcctx->user_fctx, and read back on every subsequent call of the set-returning function.
 */
typedef struct st_dms_iterate {
/* DRC information for the queried page, filled once on the first call */
stat_drc_info_t *drc_info;
/* instance id at which the next call resumes its scan (last reported instance + 1) */
uint8 iterate_idx;
} dms_iterate_t;
/*
 * query_page_distribution_info_internal
 *      Set-returning worker behind query_page_distribution_info.
 *
 * relname: name of the relation that owns the page.
 * fork:    fork number of the page; must be in [0, MAX_FORKNUM).
 * blockno: block number of the page.
 *
 * First call: resolves the relation to a buffer tag, captures the local buffer state,
 * queries DRC through get_drc_info and stores the result in the multi-call memory context.
 * Every call: scans instance ids upward from where the previous call stopped and returns
 * one row for the next instance that holds a copy of the page, is its claimed owner, or is
 * its master.
 *
 * Raises ERROR for an out-of-range fork number or when the DRC query fails; emits INFO and
 * returns no rows when DRC has no entry for the page.
 */
Datum query_page_distribution_info_internal(text* relname, ForkNumber fork, BlockNumber blockno, PG_FUNCTION_ARGS)
{
    /* A negative fork number is never a valid buffer-page fork; reject it explicitly. */
    if (fork < 0) {
        ereport(ERROR, (errmsg("[SS] forknumber must not be negative!")));
    }
    if (fork >= MAX_FORKNUM) {
        ereport(ERROR, (errmsg("[SS] forknumber must be less than MAX_FORKNUM(4)!")));
    }

    FuncCallContext *funcctx = NULL;
    if (SRF_IS_FIRSTCALL()) {
        RelFileNode rnode = relname_get_relfilenode(relname);
        BufferTag tag;
        INIT_BUFFERTAG(tag, rnode, fork, blockno);
        stat_buf_info_t buf_info;
        buftag_get_buf_info(tag, &buf_info);

        funcctx = SRF_FIRSTCALL_INIT();
        /* Everything that must survive across calls is allocated in the multi-call context. */
        MemoryContext oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

        stat_drc_info_t *drc_info = (stat_drc_info_t*)palloc0(sizeof(stat_drc_info_t));
        InitDmsBufContext(&drc_info->dms_ctx, tag);
        drc_info->claimed_owner = CM_INVALID_ID8;
        drc_info->buf_info[0] = buf_info;
        errno_t rc = memcpy_s(drc_info->data, DMS_PAGEID_SIZE, &tag, sizeof(BufferTag));
        securec_check(rc, "\0", "\0");

        int is_found = 0;
        int ret = get_drc_info(&is_found, drc_info);
        if (ret != DMS_SUCCESS) {
            ereport(ERROR, (errmsg("[SS] some errors occurred while querying DRC!")));
        }
        if (!is_found) {
            ereport(INFO, (errmsg("[SS] could not find a DRC entry in DRC for page (%u/%u/%u/%d/%d %d-%u)!",
                rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt, fork, blockno)));
        }

        /*
         * Upper bound on the number of rows: one per copy instance, one for the claimed owner
         * (if any) and one for the master. Instances filling several roles make this an
         * over-estimate; the per-call scan below ends the result set early in that case.
         */
        int count = compute_copy_insts_count(drc_info->copy_insts);
        if (drc_info->claimed_owner != CM_INVALID_ID8) {
            count++;
        }

        dms_iterate_t *iterate = (dms_iterate_t*)palloc0(sizeof(dms_iterate_t));
        iterate->drc_info = drc_info;
        iterate->iterate_idx = 0;

        funcctx->user_fctx = (void*)iterate;
        funcctx->tuple_desc = create_query_page_distribution_info_tupdesc();
        funcctx->max_calls = (!is_found) ? 0 : (count + 1);
        MemoryContextSwitchTo(oldcontext);
    }

    funcctx = SRF_PERCALL_SETUP();
    if (funcctx->call_cntr < funcctx->max_calls) {
        Datum values[8];
        bool nulls[8] = {false};
        bool ret_tup = false;
        dms_iterate_t *iterate = (dms_iterate_t*)funcctx->user_fctx;
        stat_drc_info_t *drc_info = iterate->drc_info;

        for (uint8 i = iterate->iterate_idx; i < DMS_MAX_INSTANCES; i++) {
            uint64 inst_bit = (uint64)1 << i;
            bool is_copy = (drc_info->copy_insts & inst_bit) != 0;
            if (!is_copy && drc_info->claimed_owner != i && drc_info->master_id != i) {
                continue; /* instance i plays no role for this page */
            }

            ret_tup = true;
            values[0] = UInt8GetDatum(i);                                /* instance id */
            values[1] = BoolGetDatum(drc_info->master_id == i);          /* is master? */
            values[2] = BoolGetDatum(drc_info->claimed_owner == i);      /* is owner? */
            values[3] = BoolGetDatum(is_copy);                           /* is copy? */
            if (is_copy) {
                /* Clear the bit once this copy instance has been reported. */
                drc_info->copy_insts = drc_info->copy_insts & ~inst_bit;
            }

            if (drc_info->buf_info[i].lock_mode == 1) {                  /* lock mode */
                values[4] = CStringGetTextDatum("Share lock");
            } else if (drc_info->buf_info[i].lock_mode == 2) {
                values[4] = CStringGetTextDatum("Exclusive lock");
            } else {
                values[4] = CStringGetTextDatum("No lock");
            }

            /*
             * NOTE(review): these are 64-bit datums, but the result descriptor types the two
             * columns as OIDOID; confirm the intended column width.
             */
            values[5] = UInt64GetDatum((uint64)drc_info->buf_info[i].mem_lsn);      /* mem lsn */
            values[6] = UInt64GetDatum((uint64)drc_info->buf_info[i].lsn_on_disk);  /* disk lsn */

            /* Dirty when the dirty-queue location is neither PG_UINT64_MAX nor 0. */
            values[7] = BoolGetDatum(drc_info->buf_info[i].dirty_queue_loc != PG_UINT64_MAX &&
                drc_info->buf_info[i].dirty_queue_loc != 0);

            iterate->iterate_idx = i + 1;
            break;
        }

        if (ret_tup) {
            HeapTuple tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
            if (tuple != NULL) {
                SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
            }
        }
        /* No further instance to report: fall through and finish the result set. */
    }
    SRF_RETURN_DONE(funcctx);
}
/*
 * query_page_distribution_info
 *      SQL-callable entry point: query_page_distribution_info(relname text, fork int4, blockno int4).
 *
 * Only usable when DMS is enabled and the node is not running in standby mode; raises ERROR
 * otherwise. Argument decoding is done here; row production is delegated to
 * query_page_distribution_info_internal.
 */
Datum query_page_distribution_info(PG_FUNCTION_ARGS)
{
    if (!ENABLE_DMS) {
        ereport(ERROR, (errmsg("[SS] cannot query query_page_distribution_info without shared storage deployment!")));
    }
    if (SS_STANDBY_MODE) {
        ereport(ERROR, (errmsg("[SS] cannot query query_page_distribution_info at Standby node while DMS enabled!")));
    }

    text* relname = PG_GETARG_TEXT_PP(0);
    /* Both numeric arguments are declared int4, so they must be fetched as 32-bit datums. */
    ForkNumber fork = (ForkNumber)PG_GETARG_INT32(1);
    BlockNumber blockno = (BlockNumber)PG_GETARG_UINT32(2);
    return query_page_distribution_info_internal(relname, fork, blockno, fcinfo);
}

View File

@ -75,12 +75,13 @@ bool will_shutdown = false;
* NEXT | 92899 | ? | ?
*
********************************************/
const uint32 GRAND_VERSION_NUM = 92911;
const uint32 GRAND_VERSION_NUM = 92912;
/********************************************
* 2.VERSION NUM FOR EACH FEATURE
* Please write in descending order.
********************************************/
const uint32 PAGE_DIST_VERSION_NUM = 92912;
const uint32 NODE_REFORM_INFO_VERSION_NUM = 92911;
const uint32 GB18030_2022_VERSION_NUM = 92908;
const uint32 PARAM_MARK_VERSION_NUM = 92907;

View File

@ -129,6 +129,8 @@ int ss_dms_func_init()
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_reform_req_opengauss_ondemand_redo_buffer));
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_get_mes_max_watting_rooms));
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_send_opengauss_oldest_xmin));
SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_get_drc_info));
g_ss_dms_func.inited = true;
return DMS_SUCCESS;
}
@ -358,3 +360,8 @@ int dms_send_opengauss_oldest_xmin(dms_context_t *dms_ctx, unsigned long long ol
{
return g_ss_dms_func.dms_send_opengauss_oldest_xmin(dms_ctx, oldest_xmin, dest_id);
}
/*
 * get_drc_info
 *      Forward a DRC query to the dms_get_drc_info entry of the DMS function table.
 *
 * is_found: output flag passed through to the library call.
 * drc_info: in/out DRC information passed through to the library call.
 * Returns the library call's status unchanged.
 */
int get_drc_info(int* is_found, stat_drc_info_t* drc_info)
{
    int status = g_ss_dms_func.dms_get_drc_info(is_found, drc_info);
    return status;
}

View File

@ -47,6 +47,7 @@
#include "ddes/dms/ss_dms_bufmgr.h"
#include "storage/file/fio_device.h"
#include "storage/buf/bufmgr.h"
#include "storage/buf/buf_internals.h"
/*
* Wake up startup process to replay WAL, or to notice that
@ -2045,6 +2046,14 @@ int CBOndemandRedoPageForStandby(void *block_key, int32 *redo_status)
return GS_SUCCESS;;
}
/*
 * CBGetBufInfo
 *      DMS callback (registered as callback->get_buf_info): report local buffer state for a
 *      page resource id.
 *
 * resid:    resource id bytes; the leading sizeof(BufferTag) bytes are read as a BufferTag.
 * buf_info: output, filled by buftag_get_buf_info.
 */
void CBGetBufInfo(char* resid, stat_buf_info_t *buf_info)
{
    BufferTag tag;
    /*
     * The destination is a BufferTag, so both the capacity and the byte count given to
     * memcpy_s must be sizeof(BufferTag); using DMS_RESID_SIZE would write past `tag`
     * whenever the resource id buffer is larger than a BufferTag.
     */
    errno_t err = memcpy_s(&tag, sizeof(BufferTag), resid, sizeof(BufferTag));
    securec_check(err, "\0", "\0");
    buftag_get_buf_info(tag, buf_info);
}
void DmsInitCallback(dms_callback_t *callback)
{
// used in reform
@ -2109,4 +2118,6 @@ void DmsInitCallback(dms_callback_t *callback)
callback->cache_msg = CBCacheMsg;
callback->need_flush = CBMarkNeedFlush;
callback->update_node_oldest_xmin = CBUpdateNodeOldestXmin;
callback->get_buf_info = CBGetBufInfo;
}

View File

@ -0,0 +1 @@
-- Drop by the full argument signature: the function takes (text, int4, int4), and an empty
-- argument list would only match a zero-argument function, leaving this one in place.
DROP FUNCTION IF EXISTS pg_catalog.query_page_distribution_info(text, int4, int4) CASCADE;

View File

@ -0,0 +1 @@
-- Drop by the full argument signature: the function takes (text, int4, int4), and an empty
-- argument list would only match a zero-argument function, leaving this one in place.
DROP FUNCTION IF EXISTS pg_catalog.query_page_distribution_info(text, int4, int4) CASCADE;

View File

@ -0,0 +1,17 @@
-- Remove any previous definition, addressed by its full (text, int4, int4) signature; an
-- empty argument list would only match a zero-argument function.
DROP FUNCTION IF EXISTS pg_catalog.query_page_distribution_info(text, int4, int4) CASCADE;
SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 2866;
-- OUT column names and types follow the built-in catalog entry and the C result descriptor:
-- the first column is instance_id, and mem_lsn/disk_lsn are declared as oid there.
CREATE FUNCTION pg_catalog.query_page_distribution_info
(
text,
int4,
int4,
OUT instance_id int4,
OUT is_master boolean,
OUT is_owner boolean,
OUT is_copy boolean,
OUT lock_mode text,
OUT mem_lsn oid,
OUT disk_lsn oid,
OUT is_dirty boolean
)
RETURNS SETOF record LANGUAGE INTERNAL as 'query_page_distribution_info';

View File

@ -0,0 +1,17 @@
-- Remove any previous definition, addressed by its full (text, int4, int4) signature; an
-- empty argument list would only match a zero-argument function.
DROP FUNCTION IF EXISTS pg_catalog.query_page_distribution_info(text, int4, int4) CASCADE;
SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 2866;
-- OUT column names and types follow the built-in catalog entry and the C result descriptor:
-- the first column is instance_id, and mem_lsn/disk_lsn are declared as oid there.
CREATE FUNCTION pg_catalog.query_page_distribution_info
(
text,
int4,
int4,
OUT instance_id int4,
OUT is_master boolean,
OUT is_owner boolean,
OUT is_copy boolean,
OUT lock_mode text,
OUT mem_lsn oid,
OUT disk_lsn oid,
OUT is_dirty boolean
)
RETURNS SETOF record LANGUAGE INTERNAL as 'query_page_distribution_info';

View File

@ -882,7 +882,6 @@ typedef struct st_dms_callback {
//for shared storage backup
dms_set_inst_behavior set_inst_behavior;
dms_db_prepare db_prepare;
dms_get_buf_info get_buf_info;
} dms_callback_t;

View File

@ -32,6 +32,7 @@ extern "C" {
#endif
#define SS_LIBDMS_NAME "libdms.so"
#define CM_INVALID_ID8 0xff
typedef struct st_ss_dms_func {
bool inited;
@ -85,6 +86,7 @@ typedef struct st_ss_dms_func {
int *redo_status);
unsigned int (*dms_get_mes_max_watting_rooms)(void);
int (*dms_send_opengauss_oldest_xmin)(dms_context_t *dms_ctx, unsigned long long oldest_xmin, unsigned char dest_id);
int (*dms_get_drc_info)(int* is_found, stat_drc_info_t* drc_info);
} ss_dms_func_t;
int ss_dms_func_init();
@ -134,6 +136,8 @@ int dms_reform_req_opengauss_ondemand_redo_buffer(dms_context_t *dms_ctx, void *
unsigned int dms_get_mes_max_watting_rooms(void);
int dms_send_opengauss_oldest_xmin(dms_context_t *dms_ctx, unsigned long long oldest_xmin, unsigned char dest_id);
/*
 * Query DRC information through the dms_get_drc_info symbol loaded from the DMS library.
 * is_found and drc_info are passed through to the library; the library's status is returned.
 */
int get_drc_info(int* is_found, stat_drc_info_t* drc_info);
#ifdef __cplusplus
}
#endif

View File

@ -87,4 +87,5 @@ void SSUnPinBuffer(BufferDesc* buf_desc);
bool SSOndemandRequestPrimaryRedo(BufferTag tag);
bool SSLWLockAcquireTimeout(LWLock* lock, LWLockMode mode);
bool SSWaitIOTimeout(BufferDesc *buf);
/*
 * Zero *buf_info, then fill it from the local buffer caching the page identified by tag;
 * buf_info stays zeroed when the page is not in the local buffer pool.
 */
void buftag_get_buf_info(BufferTag tag, stat_buf_info_t *buf_info);
#endif