/*
 * Copyright (c) 2020 Huawei Technologies Co.,Ltd.
 *
 * openGauss is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * instr_unique_sql.cpp
 *   functions for unique SQL
 *
 * IDENTIFICATION
 *   src/gausskernel/cbb/instruments/unique_sql/unique_sql.cpp
 *
 * -------------------------------------------------------------------------
 */
#include "postgres.h"
#include "knl/knl_variable.h"
#include "instruments/instr_unique_sql.h"
#include "instruments/unique_query.h"
#include "utils/atomic.h"
#include "utils/lsyscache.h"
#include "utils/hsearch.h"
#include "access/hash.h"
#include "access/xact.h"
#include "executor/hashjoin.h"
#include "utils/memutils.h"
#include "miscadmin.h"
#include "pgxc/pgxc.h"
#include "pgstat.h"
#include "funcapi.h"
#include "parser/analyze.h"
#include "commands/prepare.h"
#include "libpq/pqformat.h"
#include "libpq/libpq.h"
#include "commands/user.h"

namespace UniqueSq {
void unique_sql_post_parse_analyze(ParseState* pstate, Query* query);
int get_conn_count_from_all_handles(PGXCNodeAllHandles* pgxc_handles, bool is_cn);
PGXCNodeHandle** get_handles_from_all_handles(PGXCNodeAllHandles* pgxc_handles, bool is_cn);
} // namespace UniqueSq

/*
 * Note: for unique sql track type
 *
 * top - only top SQL. For example, when calling a procedure that contains
 *       multiple SQLs in its implementation, only the original "call proc"
 *       SQL is collected by the unique sql module.
 * all - (not currently supported) in the above case, all statements would be recorded
 */
/* unique SQL max hash table size */
const int UNIQUE_SQL_MAX_HASH_SIZE = 1000;

/*
 * CN collects unique sql stat from DNs; each round it sends
 * a "predefined size" batch of unique sql keys to the DN
 */
const int UNIQUE_SQL_IDS_ARRAY_SIZE = 10;

/* max unique sql string length,
 * using GUC: pgstat_track_activity_query_size,
 * as PgBackendStatus::st_activity also uses that GUC parameter
 */
#define UNIQUE_SQL_MAX_LEN (g_instance.attr.attr_common.pgstat_track_activity_query_size + 1)
#define MAX_UINT32 (0xFFFFFFFF)

/*
 * Unique SQL Key - CN + User Oid + Unique SQL ID
 *
 * Using the CN node as one member of the key, for the reasons below:
 * - in the future, Unique SQL purge logic can remove items by CN name.
 * - the same Unique SQL ID may run on different CN nodes, so the stat
 *   information cannot be merged into one single entry
 * - when we want to get unique SQL entry stat from the unique sql view,
 *   the CN pulls stat from DNs, so it gets its own stat on each DN and
 *   no other CN's stat
 * - using the CN node id rather than its name saves key size and network
 *   transfer cost
 * - for one specific CN, the node oids differ across CNs, but node_id
 *   is the same
 */
typedef struct {
    uint32 cn_id;         /* SQL is run on which CN node,
                           * same with node_id in PGXC_NODE */
    Oid user_id;          /* user id */
    uint64 unique_sql_id; /* unique sql id */
} UniqueSQLKey;

typedef struct {
    int64 total_time; /* total time for the unique sql entry */
    int64 min_time;   /* min time for unique sql entry's history events */
    int64 max_time;   /* max time for unique sql entry's history events */
} UniqueSQLElapseTime;

typedef struct UniqueSQLTime {
    int64 TimeInfoArray[TOTAL_TIME_INFO_TYPES];
} UniqueSQLTime;

typedef struct {
    pg_atomic_uint64 returned_rows; /* select SQL returned rows */

    pg_atomic_uint64 tuples_fetched;  /* random IO */
    pg_atomic_uint64 tuples_returned; /* sequential IO */

    pg_atomic_uint64 tuples_inserted; /* inserted tuples counter */
    pg_atomic_uint64 tuples_updated;  /* updated tuples counter */
    pg_atomic_uint64 tuples_deleted;  /* deleted tuples counter */
} UniqueSQLRowActivity;

typedef struct {
    pg_atomic_uint64 blocks_fetched; /* the blocks fetched times */
    pg_atomic_uint64 blocks_hit;     /* the blocks hit times in buffer */
} UniqueSQLCacheIO;

typedef struct {
    pg_atomic_uint64 soft_parse; /* reuse plan counter */
    pg_atomic_uint64 hard_parse; /* newly generated plan counter */
} UniqueSQLParse;

// note: counters are stored as uint64, but the SQL integer type is signed
// 64-bit, so values above INT64_MAX cannot be reported faithfully (open issue)
typedef struct {
    UniqueSQLKey key; /* CN oid + user oid + unique sql id */

    /* alloc extra UNIQUE_SQL_MAX_LEN space to store the unique sql string */
    char* unique_sql; /* unique sql text */

    pg_atomic_uint64 calls;          /* calling times */
    UniqueSQLElapseTime elapse_time; /* elapse time stat in ms */
    UniqueSQLTime timeInfo;

    UniqueSQLRowActivity row_activity; /* row activity */
    UniqueSQLCacheIO cache_io;         /* cache/IO */
    UniqueSQLParse parse;              /* hard/soft parse counter */
    bool is_local;                     /* local sql (run from current node) */
    UniqueSQLWorkMemInfo sort_state;   /* work mem info of sort operation */
    UniqueSQLWorkMemInfo hash_state;   /* work mem info of hash operation */
} UniqueSQL;

/* ---------Thread Local Variable---------- */
/* save prev-hooks */
static post_parse_analyze_hook_type g_prev_post_parse_analyze_hook = NULL;

static uint32 uniqueSQLHashCode(const void* key, Size size)
{
    const UniqueSQLKey* k = (const UniqueSQLKey*)key;

    return hash_uint32((uint32)k->cn_id) ^ hash_uint32((uint32)k->user_id) ^ hash_uint32((uint32)k->unique_sql_id);
}

static int uniqueSQLMatch(const void* key1, const void* key2, Size keysize)
{
    const UniqueSQLKey* k1 = (const UniqueSQLKey*)key1;
    const UniqueSQLKey* k2 = (const UniqueSQLKey*)key2;

    /* note: cn_id is not part of the comparison, only user_id and unique_sql_id */
    if (k1 != NULL && k2 != NULL && k1->user_id == k2->user_id && k1->unique_sql_id == k2->unique_sql_id) {
        return 0;
    } else {
        return 1;
    }
}

static LWLock* LockUniqueSQLHashPartition(uint32 hashCode, LWLockMode lockMode)
{
    LWLock* partitionLock = GetMainLWLockByIndex(FirstUniqueSQLMappingLock + (hashCode % NUM_UNIQUE_SQL_PARTITIONS));
    LWLockAcquire(partitionLock, lockMode);

    return partitionLock;
}

static void UnlockUniqueSQLHashPartition(uint32 hashCode)
{
    LWLock* partitionLock = GetMainLWLockByIndex(FirstUniqueSQLMappingLock + (hashCode % NUM_UNIQUE_SQL_PARTITIONS));
    LWLockRelease(partitionLock);
}
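
/*
 * Illustrative sketch (not part of the build): how a key maps to a partition
 * lock. Assuming NUM_UNIQUE_SQL_PARTITIONS were 64 and uniqueSQLHashCode()
 * returned 0x9E3779B9 for some key, the entry would be protected by
 * GetMainLWLockByIndex(FirstUniqueSQLMappingLock + (0x9E3779B9 % 64)),
 * i.e. partition 57. Readers take the partition lock in LW_SHARED mode and
 * writers in LW_EXCLUSIVE, so lookups on different partitions never block
 * each other.
 */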

/*
 * resetUniqueSQLEntry - reset UniqueSQL entry except key
 */
static void resetUniqueSQLEntry(UniqueSQL* entry)
{
    if (entry != NULL) {
        pg_atomic_write_u64(&(entry->calls), 0);
        entry->unique_sql = NULL;

        // reset elapse time stat
        gs_lock_test_and_set_64(&(entry->elapse_time.total_time), 0);
        gs_lock_test_and_set_64(&(entry->elapse_time.min_time), 0);
        gs_lock_test_and_set_64(&(entry->elapse_time.max_time), 0);

        // reset row activity stat
        pg_atomic_write_u64(&(entry->row_activity.returned_rows), 0);
        pg_atomic_write_u64(&(entry->row_activity.tuples_fetched), 0);
        pg_atomic_write_u64(&(entry->row_activity.tuples_returned), 0);
        pg_atomic_write_u64(&(entry->row_activity.tuples_inserted), 0);
        pg_atomic_write_u64(&(entry->row_activity.tuples_updated), 0);
        pg_atomic_write_u64(&(entry->row_activity.tuples_deleted), 0);

        // cache_io
        pg_atomic_write_u64(&(entry->cache_io.blocks_fetched), 0);
        pg_atomic_write_u64(&(entry->cache_io.blocks_hit), 0);

        // parse info
        pg_atomic_write_u64(&(entry->parse.soft_parse), 0);
        pg_atomic_write_u64(&(entry->parse.hard_parse), 0);

        // sort/hash work_mem info
        pg_atomic_write_u64(&(entry->sort_state.counts), 0);
        gs_lock_test_and_set_64(&(entry->sort_state.total_time), 0);
        gs_lock_test_and_set_64(&(entry->sort_state.used_work_mem), 0);
        pg_atomic_write_u64(&(entry->sort_state.spill_counts), 0);
        pg_atomic_write_u64(&(entry->sort_state.spill_size), 0);

        pg_atomic_write_u64(&(entry->hash_state.counts), 0);
        gs_lock_test_and_set_64(&(entry->hash_state.total_time), 0);
        gs_lock_test_and_set_64(&(entry->hash_state.used_work_mem), 0);
        pg_atomic_write_u64(&(entry->hash_state.spill_counts), 0);
        pg_atomic_write_u64(&(entry->hash_state.spill_size), 0);

        // time info
        for (uint32 idx = 0; idx < TOTAL_TIME_INFO_TYPES; idx++) {
            gs_lock_test_and_set_64(&(entry->timeInfo.TimeInfoArray[idx]), 0);
        }

        entry->is_local = false;
    }
}

/*
 * InitUniqueSQL - init unique sql resources
 *
 * Init resources at postmaster startup; the basic unique SQL
 * objects are created without GUC parameter control.
 */
void InitUniqueSQL()
{
    // init memory context
    g_instance.stat_cxt.UniqueSqlContext = AllocSetContextCreate(g_instance.instance_context,
        "UniqueSQLContext",
        ALLOCSET_DEFAULT_MINSIZE,
        ALLOCSET_DEFAULT_INITSIZE,
        ALLOCSET_DEFAULT_MAXSIZE,
        SHARED_CONTEXT);

    // init unique sql hash table
    HASHCTL ctl;
    errno_t rc;

    rc = memset_s(&ctl, sizeof(ctl), 0, sizeof(ctl));
    securec_check_c(rc, "\0", "\0");

    ctl.hcxt = g_instance.stat_cxt.UniqueSqlContext;
    ctl.keysize = sizeof(UniqueSQLKey);

    // alloc extra space for the normalized query (only CN stores the sql string)
    if (need_normalize_unique_string()) {
        ctl.entrysize = sizeof(UniqueSQL) + UNIQUE_SQL_MAX_LEN;
    } else {
        ctl.entrysize = sizeof(UniqueSQL);
    }

    ctl.hash = uniqueSQLHashCode;
    ctl.match = uniqueSQLMatch;
    ctl.num_partitions = NUM_UNIQUE_SQL_PARTITIONS;

    g_instance.stat_cxt.UniqueSQLHashtbl = hash_create("unique sql hash table",
        UNIQUE_SQL_MAX_HASH_SIZE,
        &ctl,
        HASH_ELEM | HASH_SHRCTX | HASH_FUNCTION | HASH_COMPARE | HASH_PARTITION | HASH_NOEXCEPT);
}
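
/*
 * Entry memory layout sketch, when need_normalize_unique_string() is true:
 *
 *   +----------------------+------------------------------+
 *   | UniqueSQL struct     | UNIQUE_SQL_MAX_LEN bytes of  |
 *   | (key, counters, ...) | normalized SQL text          |
 *   +----------------------+------------------------------+
 *
 * set_unique_sql_string_in_entry() later points entry->unique_sql at the
 * trailing region via "(char*)(entry + 1)", so the text lives inline in the
 * hash entry and needs no separate allocation.
 */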

void instr_unique_sql_register_hook()
{
    // only register the hooks on CN, or in single-node mode
    if ((!IS_PGXC_COORDINATOR) && (!IS_SINGLE_NODE)) {
        return;
    }

    // register hooks
    g_prev_post_parse_analyze_hook = post_parse_analyze_hook;
    post_parse_analyze_hook = UniqueSq::unique_sql_post_parse_analyze;
}

/*
 * UpdateUniqueSQLCalls - update unique SQL calls
 */
static void UpdateUniqueSQLCalls(UniqueSQL* unique_sql)
{
    Assert(u_sess->attr.attr_resource.enable_resource_track &&
           (u_sess->attr.attr_common.instr_unique_sql_count > 0));

    if (unique_sql == NULL) {
        return;
    }

    pg_atomic_fetch_add_u64(&(unique_sql->calls), 1);
}

/*
 * updateMaxValueForAtomicType - the max value lives in an atomic type,
 * so it must be updated with an atomic compare-and-swap loop
 */
static void updateMaxValueForAtomicType(int64 new_val, int64* max)
{
    int64 prev;
    do {
        prev = *max;
    } while (prev < new_val && !gs_compare_and_swap_64(max, prev, new_val));
}

/*
 * updateMinValueForAtomicType - update min value for atomic type
 */
static void updateMinValueForAtomicType(int64 new_val, int64* min)
{
    int64 prev;
    do {
        prev = *min;
    } while ((prev == 0 || prev > new_val) && !gs_compare_and_swap_64(min, prev, new_val));
}
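
/*
 * Sketch of why the CAS loop is race-free (illustrative, assuming
 * gs_compare_and_swap_64(ptr, expected, newval) swaps only when *ptr still
 * equals expected and returns true on success):
 *
 *   thread A: updateMaxValueForAtomicType(10, &max);  // max == 3
 *   thread B: updateMaxValueForAtomicType(7,  &max);  // concurrently
 *
 * If B's CAS lands first (max becomes 7), A's CAS of 3 -> 10 fails, A
 * re-reads prev == 7, still 7 < 10, and retries 7 -> 10. If A lands first,
 * B re-reads prev == 10, the guard 10 < 7 is false, and B exits without
 * writing. Either way the final value is 10, with no lock taken.
 */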

/*
 * UpdateUniqueSQLElapseTime - update the elapse time of the unique sql
 *
 * the elapse end time is calculated in this function
 */
static void UpdateUniqueSQLElapseTime(UniqueSQL* unique_sql, int64 elapse_start)
{
    Assert(u_sess->attr.attr_resource.enable_resource_track &&
           (u_sess->attr.attr_common.instr_unique_sql_count > 0));
    if (unique_sql == NULL || elapse_start == 0) {
        return;
    }

    TimestampTz elapse_time = GetCurrentTimestamp() - elapse_start;

    /* update unique sql's total/max/min time */
    gs_atomic_add_64(&(unique_sql->elapse_time.total_time), elapse_time);
    updateMaxValueForAtomicType(elapse_time, &(unique_sql->elapse_time.max_time));
    updateMinValueForAtomicType(elapse_time, &(unique_sql->elapse_time.min_time));
}

/*
 * UpdateUniqueSQLStatDetail - update row activity or cache/IO
 */
static void UpdateUniqueSQLStatDetail(UniqueSQL* unique_sql, PgStat_TableCounts* agg_table_stat, uint64 returned_rows)
{
    Assert(u_sess->attr.attr_resource.enable_resource_track &&
           (u_sess->attr.attr_common.instr_unique_sql_count > 0));
    if (unique_sql == NULL) {
        return;
    }

    if (agg_table_stat != NULL) {
        // row activity
        pg_atomic_fetch_add_u64(&unique_sql->row_activity.tuples_fetched, agg_table_stat->t_tuples_fetched);
        pg_atomic_fetch_add_u64(&unique_sql->row_activity.tuples_returned, agg_table_stat->t_tuples_returned);

        pg_atomic_fetch_add_u64(&unique_sql->row_activity.tuples_inserted, agg_table_stat->t_tuples_inserted);
        pg_atomic_fetch_add_u64(&unique_sql->row_activity.tuples_updated, agg_table_stat->t_tuples_updated);
        pg_atomic_fetch_add_u64(&unique_sql->row_activity.tuples_deleted, agg_table_stat->t_tuples_deleted);

        // cache_io
        pg_atomic_fetch_add_u64(&unique_sql->cache_io.blocks_fetched, agg_table_stat->t_blocks_fetched);
        pg_atomic_fetch_add_u64(&unique_sql->cache_io.blocks_hit, agg_table_stat->t_blocks_hit);
    }

    if (returned_rows > 0) {
        pg_atomic_fetch_add_u64(&unique_sql->row_activity.returned_rows, returned_rows);
    }
}

/*
 * update hard/soft parse counter info
 *
 * soft parse - reuse plan
 * hard parse - generate new plan
 */
static void UpdateUniqueSQLParse(UniqueSQL* unique_sql)
{
    if (unique_sql == NULL) {
        return;
    }

    if (u_sess->unique_sql_cxt.unique_sql_soft_parse > 0) {
        pg_atomic_fetch_add_u64(&unique_sql->parse.soft_parse, u_sess->unique_sql_cxt.unique_sql_soft_parse);
    }

    if (u_sess->unique_sql_cxt.unique_sql_hard_parse > 0) {
        pg_atomic_fetch_add_u64(&unique_sql->parse.hard_parse, u_sess->unique_sql_cxt.unique_sql_hard_parse);
    }

    ereport(DEBUG1,
        (errmodule(MOD_INSTR),
            errmsg("[UniqueSQL] unique id: %lu, agg soft parse: %lu, hard parse: %lu",
                u_sess->unique_sql_cxt.unique_sql_id,
                u_sess->unique_sql_cxt.unique_sql_soft_parse,
                u_sess->unique_sql_cxt.unique_sql_hard_parse)));

    /* for the parse counter, aggregate & reset the last parse counter when updating unique sql stat */
    UniqueSQLStatCountResetParseCounter();
}

/*
 * update sort/hash work_mem info when the executor run is finished
 */
static void UpdateUniqueSQLSortHashInfo(UniqueSQL* unique_sql)
{
    if (unique_sql == NULL) {
        return;
    }

    if (u_sess->unique_sql_cxt.unique_sql_sort_instr->has_sorthash) {
        /* if the query contains a SORT operation, the unique sql sort info will be updated */
        unique_sql_instr* sort_instr = u_sess->unique_sql_cxt.unique_sql_sort_instr;

        pg_atomic_fetch_add_u64(&unique_sql->sort_state.counts, sort_instr->counts);
        gs_atomic_add_64(&unique_sql->sort_state.total_time, sort_instr->total_time);
        gs_atomic_add_64(&unique_sql->sort_state.used_work_mem, sort_instr->used_work_mem);
        pg_atomic_fetch_add_u64(&unique_sql->sort_state.spill_counts, sort_instr->spill_counts);
        pg_atomic_fetch_add_u64(&unique_sql->sort_state.spill_size, sort_instr->spill_size);

        ereport(DEBUG1,
            (errmodule(MOD_INSTR),
                errmsg("[UniqueSQL] unique id: %lu, sort state updated",
                    u_sess->unique_sql_cxt.unique_sql_id)));

        /* reset the sort counter */
        errno_t rc = memset_s(sort_instr, sizeof(unique_sql_instr), 0, sizeof(unique_sql_instr));
        securec_check(rc, "", "");
    }

    if (u_sess->unique_sql_cxt.unique_sql_hash_instr->has_sorthash) {
        /* if the query contains a HASH operation, the unique sql hash info should be updated */
        unique_sql_instr* hash_instr = u_sess->unique_sql_cxt.unique_sql_hash_instr;

        pg_atomic_fetch_add_u64(&unique_sql->hash_state.counts, hash_instr->counts);
        gs_atomic_add_64(&unique_sql->hash_state.total_time, hash_instr->total_time);
        gs_atomic_add_64(&unique_sql->hash_state.used_work_mem, hash_instr->used_work_mem);
        pg_atomic_fetch_add_u64(&unique_sql->hash_state.spill_counts, hash_instr->spill_counts);
        pg_atomic_fetch_add_u64(&unique_sql->hash_state.spill_size, hash_instr->spill_size);

        ereport(DEBUG1,
            (errmodule(MOD_INSTR),
                errmsg("[UniqueSQL] unique id: %lu, hash state updated",
                    u_sess->unique_sql_cxt.unique_sql_id)));

        /* reset the hash counter */
        errno_t rc = memset_s(hash_instr, sizeof(unique_sql_instr), 0, sizeof(unique_sql_instr));
        securec_check(rc, "", "");
    }
}

/*
 * get hash state from the hashtable, to update unique sql hash info later
 */
void UpdateUniqueSQLHashStats(HashJoinTable hashtable, TimestampTz* start_time)
{
    if (!is_unique_sql_enabled() || !is_local_unique_sql()) {
        return;
    }

    /* the first time we enter the hash operation, init the hash state */
    unique_sql_instr* instr = u_sess->unique_sql_cxt.unique_sql_hash_instr;
    instr->has_sorthash = true;

    if (hashtable == NULL) {
        /* update time info */
        if (*start_time == 0) {
            *start_time = GetCurrentTimestamp();
        } else {
            /* increased by hash exec time */
            instr->total_time += GetCurrentTimestamp() - *start_time;
        }
    } else {
        /* update work mem info */
        instr->counts += 1;
        instr->spill_counts += hashtable->spill_count;

        /* get the space used in KB */
        instr->spill_size += (*hashtable->spill_size + 1023) / 1024;
        instr->used_work_mem += (hashtable->spacePeak + 1023) / 1024;
    }
}
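
/*
 * Call pattern sketch (illustrative): UpdateUniqueSQLHashStats serves two
 * roles. With hashtable == NULL it acts as a timer -- the first call stamps
 * *start_time, a later call accumulates the elapsed time. With a real
 * hashtable it harvests memory/spill figures instead, e.g.:
 *
 *   TimestampTz start = 0;
 *   UpdateUniqueSQLHashStats(NULL, &start);   // before build: start timer
 *   ... build the hash table ...
 *   UpdateUniqueSQLHashStats(NULL, &start);   // after build: add elapsed time
 *   UpdateUniqueSQLHashStats(table, &start);  // record spill/peak-memory stats
 *
 * The exact call sites live in the executor and may differ; this only shows
 * the intended state machine.
 */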

/* UpdateUniqueSQLVecSortStats - parse the vector sort information from the Batchsortstate,
 * used to update the unique sql sort information.
 */
void UpdateUniqueSQLVecSortStats(Batchsortstate* state, uint64 spill_count, TimestampTz* start_time)
{
    if (!is_unique_sql_enabled() || !is_local_unique_sql()) {
        return;
    }
    unique_sql_instr* instr = u_sess->unique_sql_cxt.unique_sql_sort_instr;
    /* the first time we enter the sort executor, init the state */
    instr->has_sorthash = true;

    if (*start_time == 0) {
        *start_time = GetCurrentTimestamp();
    } else if (state != NULL) {
        instr->counts += 1;
        instr->total_time += GetCurrentTimestamp() - *start_time;

        /* update vector sort info of space used, in KB */
        if (state->m_tapeset != NULL) {
            instr->spill_counts += spill_count;
            instr->spill_size += LogicalTapeSetBlocks(state->m_tapeset) * (BLCKSZ / 1024);
        } else {
            instr->used_work_mem += (state->m_allowedMem - state->m_availMem + 1023) / 1024;
        }
    }
}
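
/*
 * Unit conversion sketch: both branches above report kilobytes. A spilled
 * sort is sized from its tape set -- with the standard 8 KB BLCKSZ, 100
 * logical tape blocks become 100 * (8192 / 1024) = 800 KB. An in-memory sort
 * is sized from consumed work_mem, rounded up: (m_allowedMem - m_availMem
 * + 1023) / 1024 turns e.g. a 5000-byte delta into 5 KB rather than
 * truncating it to 4 KB.
 */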

static void mask_unique_sql_str(UniqueSQL* unique_sql)
{
    errno_t rc;

    /* hide password */
    if (unique_sql->unique_sql != NULL) {
        char* mask_str = NULL;
        mask_str = maskPassword(unique_sql->unique_sql);
        if (mask_str != NULL) {
            rc = memset_s(unique_sql->unique_sql, UNIQUE_SQL_MAX_LEN - 1, 0, UNIQUE_SQL_MAX_LEN - 1);
            securec_check(rc, "\0", "\0");

            /* after calling maskPassword, mask_str can be longer than the original string,
             * as the length of the masked password('*..*') is fixed to 'password_min_length' (GUC) */
            size_t valid_mask_len = strlen(mask_str) > (size_t)(UNIQUE_SQL_MAX_LEN - 1)
                                        ? (size_t)(UNIQUE_SQL_MAX_LEN - 1)
                                        : strlen(mask_str);
            rc = memcpy_s(unique_sql->unique_sql, UNIQUE_SQL_MAX_LEN - 1, mask_str, valid_mask_len);
            securec_check(rc, "\0", "\0");
            pfree(mask_str);
        }
    }
}

static void set_unique_sql_string_in_entry(UniqueSQL* entry, Query* query, const char* sql)
{
    errno_t rc = EOK;

    // only CN stores the normalized query string
    if (need_normalize_unique_string()) {
        entry->unique_sql = (char*)(entry + 1);
        rc = memset_s(entry->unique_sql, UNIQUE_SQL_MAX_LEN, 0, UNIQUE_SQL_MAX_LEN);
        securec_check(rc, "\0", "\0");
    } else {
        entry->unique_sql = NULL;
    }

    if (entry->unique_sql != NULL && sql != NULL && query != NULL) {
        entry->is_local = true;
        // generate and store the normalized query string
        if (normalized_unique_querystring(query, sql, entry->unique_sql, UNIQUE_SQL_MAX_LEN - 1)) {
            mask_unique_sql_str(entry);
            entry->unique_sql = trim(entry->unique_sql);
        } else {
            ereport(LOG,
                (errmodule(MOD_INSTR),
                    errmsg("[UniqueSQL] unique id: %lu, normalized "
                           "SQL failed!",
                        u_sess->unique_sql_cxt.unique_sql_id)));
        }
    }
}

static void UpdateUniqueSQLTimeStat(UniqueSQL* entry, int64 timeInfo[])
{
    if (timeInfo != NULL) {
        int idx;

        for (idx = 0; idx < TOTAL_TIME_INFO_TYPES; idx++) {
            entry->timeInfo.TimeInfoArray[idx] += timeInfo[idx];
        }
    }
}

bool isUniqueSQLContextInvalid()
{
    if (u_sess->unique_sql_cxt.unique_sql_id == 0 ||
        g_instance.stat_cxt.UniqueSQLHashtbl == NULL ||
        !OidIsValid(u_sess->unique_sql_cxt.unique_sql_user_id)) {
        return true;
    }

    if (IS_SINGLE_NODE) {
        return false;
    }

    if (!OidIsValid(u_sess->unique_sql_cxt.unique_sql_cn_id)) {
        return true;
    }

    return false;
}

void UpdateSingleNodeByPassUniqueSQLStat(bool isTopLevel)
{
    if (IS_SINGLE_NODE && is_unique_sql_enabled() && isTopLevel) {
        if (IS_UNIQUE_SQL_TRACK_TOP && IsTopUniqueSQL()) {
            UpdateUniqueSQLStat(NULL, NULL, u_sess->unique_sql_cxt.unique_sql_start_time);
        }

        if (u_sess->unique_sql_cxt.unique_sql_start_time != 0) {
            int64 duration = GetCurrentTimestamp() - u_sess->unique_sql_cxt.unique_sql_start_time;
            pgstat_update_responstime_singlenode(
                u_sess->unique_sql_cxt.unique_sql_id, u_sess->unique_sql_cxt.unique_sql_start_time, duration);
        }
    }
}

/*
 * UpdateUniqueSQLStat - update unique sql's stat info
 *
 * 1, find/create the unique sql entry
 * 2, update stat info
 *    - calls
 *    - response time (only input the start elapse time)
 */
void UpdateUniqueSQLStat(
    Query* query, const char* sql, int64 elapse_start_time, PgStat_TableCounts* agg_table_stat, int64 timeInfo[])
{
    /* unique sql id can't be zero */
    if (isUniqueSQLContextInvalid()) {
        ereport(DEBUG2,
            (errmodule(MOD_INSTR),
                errmsg("[UniqueSQL] Failed to update entry - cn id: %u, user id: %u, unique sql id: %lu,",
                    u_sess->unique_sql_cxt.unique_sql_cn_id,
                    u_sess->unique_sql_cxt.unique_sql_user_id,
                    u_sess->unique_sql_cxt.unique_sql_id)));
        return;
    }

    UniqueSQLKey key;
    UniqueSQL* entry = NULL;
    bool found = false;

    key.unique_sql_id = u_sess->unique_sql_cxt.unique_sql_id;
    key.cn_id = u_sess->unique_sql_cxt.unique_sql_cn_id;
    key.user_id = u_sess->unique_sql_cxt.unique_sql_user_id;

    ereport(DEBUG1,
        (errmodule(MOD_INSTR),
            errmsg("[UniqueSQL] update entry - cn id: %u, user id: %u, sql id: %lu,",
                key.cn_id,
                key.user_id,
                key.unique_sql_id)));

    uint32 hashCode = uniqueSQLHashCode(&key, sizeof(key));

    LockUniqueSQLHashPartition(hashCode, LW_SHARED);
    entry = (UniqueSQL*)hash_search(g_instance.stat_cxt.UniqueSQLHashtbl, &key, HASH_FIND, NULL);
    if (entry == NULL) {
        UnlockUniqueSQLHashPartition(hashCode);

        /*
         * Handle the race condition between HASH insert/clean.
         * Consider the scenario below:
         * 1, a user runs SQL S1 for the first time
         * 2, the normalized sql string is inserted into the unique sql hash
         * 3, ----> the unique sql hash table is cleaned (by another session)
         * 4, other SQL stat is inserted/updated, e.g. the soft/hard parse counters
         * 5, then the sql text field would stay empty forever
         *
         * solution:
         * 1, on DN/remote CN (utility statement): no special control
         * 2, on the local CN: if the entry does not exist, only insert a new
         *    entry when query (Query*) is not NULL
         */
        if (is_local_unique_sql() && query == NULL) {
            return;
        }

        LockUniqueSQLHashPartition(hashCode, LW_EXCLUSIVE);
        entry = (UniqueSQL*)hash_search(g_instance.stat_cxt.UniqueSQLHashtbl, &key, HASH_ENTER, &found);
        // out of memory
        if (entry == NULL) {
            UnlockUniqueSQLHashPartition(hashCode);
            return;
        }

        if (!found) {
            /* control the number of unique sql entries by instr_unique_sql_count */
            long totalCount = hash_get_num_entries(g_instance.stat_cxt.UniqueSQLHashtbl);
            if ((is_local_unique_sql()) &&
                totalCount > u_sess->attr.attr_common.instr_unique_sql_count) {
                hash_search(g_instance.stat_cxt.UniqueSQLHashtbl, &key, HASH_REMOVE, NULL);
                UnlockUniqueSQLHashPartition(hashCode);
                ereport(DEBUG2,
                    (errmodule(MOD_INSTR), errmsg("[UniqueSQL] Failed to insert unique sql: reached the entry limit")));
                return;
            } else {
                resetUniqueSQLEntry(entry);
                set_unique_sql_string_in_entry(entry, query, sql);
            }
        }
    }

    if ((IS_PGXC_COORDINATOR || IS_SINGLE_NODE) && elapse_start_time != 0) {
        ereport(DEBUG1,
            (errmodule(MOD_INSTR), errmsg("[UniqueSQL] unique id: %lu, update entry n_calls", key.unique_sql_id)));

        UpdateUniqueSQLCalls(entry);
        UpdateUniqueSQLElapseTime(entry, elapse_start_time);
        UpdateUniqueSQLStatDetail(entry, NULL, u_sess->unique_sql_cxt.unique_sql_returned_rows_counter);
    } else if (IS_PGXC_DATANODE && agg_table_stat != NULL) {
        UpdateUniqueSQLStatDetail(entry, agg_table_stat, 0);
    }
    /* parse info (CN & DN) */
    UpdateUniqueSQLParse(entry);

    // record SQL's time info
    UpdateUniqueSQLTimeStat(entry, timeInfo);

    /* sort & hash info */
    UpdateUniqueSQLSortHashInfo(entry);

    UnlockUniqueSQLHashPartition(hashCode);
}

/*
 * SendUniqueSQLIds - send unique sql ids to a specific DN
 * returns false on failure
 */
static bool SendUniqueSQLIds(
    PGXCNodeHandle* handle, uint32* cn_ids, Oid* user_ids, uint64* query_ids, uint32* slot_indexes, uint count)
{
    /*
     * msg format:
     * 'i' + 4 + 1 + 4 + (4 + 4 + 4 + 8) * count
     * 'i' + msg_len + 's' + count + (slot_index + cn_id + user_oid + unique_sql_id) + (...)
     */
    size_t msg_len = sizeof(uint32) + sizeof(char) + sizeof(uint32) +
                     (sizeof(uint32) + sizeof(uint32) + sizeof(Oid) + sizeof(uint64)) * count;
    int rc;
    uint32 n32;

    if (handle->state != DN_CONNECTION_STATE_IDLE) {
        return false;
    }

    ensure_out_buffer_capacity(1 + msg_len, handle);
    Assert(handle->outBuffer != NULL);
    /* instrumentation */
    handle->outBuffer[handle->outEnd++] = 'i';

    /* message length, not including 'i' */
    msg_len = htonl(msg_len);
    rc = memcpy_s(handle->outBuffer + handle->outEnd, handle->outSize - handle->outEnd, &msg_len, sizeof(uint32));
    securec_check(rc, "\0", "\0");
    handle->outEnd += sizeof(uint32);

    /* unique sql ids to collect stat for */
    handle->outBuffer[handle->outEnd++] = 's';

    n32 = htonl(count);
    rc = memcpy_s(handle->outBuffer + handle->outEnd, handle->outSize - handle->outEnd, &n32, sizeof(uint32));
    securec_check(rc, "\0", "\0");
    handle->outEnd += sizeof(uint32);

    for (uint32 i = 0; i < count; i++) {
        /* slot index */
        rc = memcpy_s(
            handle->outBuffer + handle->outEnd, handle->outSize - handle->outEnd, slot_indexes + i, sizeof(uint32));
        securec_check(rc, "\0", "\0");
        handle->outEnd += sizeof(uint32);

        /* cn id */
        rc = memcpy_s(handle->outBuffer + handle->outEnd, handle->outSize - handle->outEnd, cn_ids + i, sizeof(uint32));
        securec_check(rc, "\0", "\0");
        handle->outEnd += sizeof(uint32);

        /* user id */
        rc = memcpy_s(handle->outBuffer + handle->outEnd, handle->outSize - handle->outEnd, user_ids + i, sizeof(Oid));
        securec_check(rc, "\0", "\0");
        handle->outEnd += sizeof(Oid);

        /* query id */
        rc = memcpy_s(
            handle->outBuffer + handle->outEnd, handle->outSize - handle->outEnd, query_ids + i, sizeof(uint64));
        securec_check(rc, "\0", "\0");
        handle->outEnd += sizeof(uint64);
    }

    handle->state = DN_CONNECTION_STATE_QUERY;
    ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] send unique sql IDs to CN/DN, count: %u", count)));

    return pgxc_node_flush(handle) == 0;
}
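
/*
 * Wire-format sketch for count == 1 (illustrative): the out buffer holds
 *
 *   offset 0    'i'                   1 byte  (message type)
 *   offset 1    msg_len = htonl(29)   4 bytes (everything after 'i')
 *   offset 5    's'                   1 byte  (sub-command)
 *   offset 6    count = htonl(1)      4 bytes
 *   offset 10   slot_index            4 bytes
 *   offset 14   cn_id                 4 bytes
 *   offset 18   user_oid              4 bytes
 *   offset 22   unique_sql_id         8 bytes
 *
 * for 30 bytes total; msg_len = 4 + 1 + 4 + 20 * count = 29. All multi-byte
 * fields are expected in network byte order -- the per-entry fields are
 * converted by the caller (see package_and_send_unique_sql_key_to_remote).
 */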

/*
 * AggUniqueSQLStat - update unique sql's stat from DNs
 *
 * unique_sql_array - preallocated unique sql array
 * recv_msg - msg from another CN/DN
 */
static void AggUniqueSQLStat(UniqueSQL* unique_sql_array, int arr_size, StringInfoData* recv_msg)
{
    if (unique_sql_array == NULL || arr_size < 0) {
        return;
    }

    uint32 index = (uint32)pq_getmsgint(recv_msg, sizeof(uint32));
    if (index >= (uint32)arr_size) {
        ereport(LOG,
            (errmodule(MOD_INSTR), errmsg("[UniqueSQL] receive stat: index %u >= array size: %d!", index, arr_size)));
        return;
    }

    uint32 recv_cn_id = (uint32)pq_getmsgint(recv_msg, sizeof(uint32));
    Oid recv_user_id = (Oid)pq_getmsgint(recv_msg, sizeof(uint32));
    uint64 recv_unique_sql_id = (uint64)pq_getmsgint64(recv_msg);
    if (unique_sql_array[index].key.cn_id != recv_cn_id || unique_sql_array[index].key.user_id != recv_user_id ||
        unique_sql_array[index].key.unique_sql_id != recv_unique_sql_id) {
        ereport(ERROR, (errmsg("[UniqueSQL] check unique sql array slot index!")));
    }

    // row activity
    unique_sql_array[index].row_activity.returned_rows += (uint64)pq_getmsgint64(recv_msg);
    unique_sql_array[index].row_activity.tuples_fetched += (uint64)pq_getmsgint64(recv_msg);
    unique_sql_array[index].row_activity.tuples_returned += (uint64)pq_getmsgint64(recv_msg);
    unique_sql_array[index].row_activity.tuples_inserted += (uint64)pq_getmsgint64(recv_msg);
    unique_sql_array[index].row_activity.tuples_updated += (uint64)pq_getmsgint64(recv_msg);
    unique_sql_array[index].row_activity.tuples_deleted += (uint64)pq_getmsgint64(recv_msg);

    // cache io
    unique_sql_array[index].cache_io.blocks_fetched += (uint64)pq_getmsgint64(recv_msg);
    unique_sql_array[index].cache_io.blocks_hit += (uint64)pq_getmsgint64(recv_msg);

    // parse info
    unique_sql_array[index].parse.soft_parse += (uint64)pq_getmsgint64(recv_msg);
    unique_sql_array[index].parse.hard_parse += (uint64)pq_getmsgint64(recv_msg);

    // time info
    for (uint32 idx = 0; idx < TOTAL_TIME_INFO_TYPES; idx++) {
        unique_sql_array[index].timeInfo.TimeInfoArray[idx] += (uint64)pq_getmsgint64(recv_msg);
    }
}

/*
 * @Description: handle response messages from remote (CN/DN)
 */
static void handle_message_from_remote(
    UniqueSQL* unique_sql_array, int arr_size, PGXCNodeHandle* handle, bool* hasError, bool* isFinished)
{
    Assert(handle != NULL);
    char* msg = NULL;
    int len = 0;
    char msgType = get_message(handle, &len, &msg);
    switch (msgType) {
        case '\0': /* message is not complete */
        case 'A':  /* NotificationResponse */
        case 'S':  /* SetCommandComplete */
            break;
        case 'E': /* error message */
            *hasError = true;
            break;
        case 'r': {
            if (len <= 0) {
                ereport(ERROR,
                    (errmodule(MOD_INSTR),
                        errmsg("[UniqueSQL] invalid 'r' message. node: %s", handle->remoteNodeName)));
                break;
            }
            StringInfoData recvMsg;
            initStringInfo(&recvMsg);
            appendBinaryStringInfo(&recvMsg, msg, len);
            AggUniqueSQLStat(unique_sql_array, arr_size, &recvMsg);
            pq_getmsgend(&recvMsg);
            pfree(recvMsg.data);
            break;
        }
        case 'f':
            ereport(DEBUG2,
                (errmodule(MOD_INSTR),
                    errmsg("[UniqueSQL] current CN/DN stat is finished. node: %s", handle->remoteNodeName)));
            *isFinished = true;
            break;
        case 'Z':
            if (*hasError) {
                ereport(LOG,
                    (errmsg("[UniqueSQL] receive stat: get error message. node %s, %s",
                        handle->remoteNodeName,
                        handle->error ? handle->error : "")));
                *isFinished = true;
            }
            break;
        default:
            ereport(LOG,
                (errmodule(MOD_INSTR),
                    errmsg("[UniqueSQL] receive stat: unexpected message type %c, node: %s",
                        msgType,
                        handle->remoteNodeName)));
            *hasError = true;
            *isFinished = true;
            break;
    }
}
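
/*
 * Typical reply sequence per node (sketch, inferred from the handlers above):
 * one 'r' message per requested unique sql key, each echoing the slot index
 * and key followed by the counters in exactly the order AggUniqueSQLStat
 * consumes them, then an 'f' marking the node as finished. An 'E' followed
 * by 'Z' (ready-for-query) signals failure instead. The sending side of this
 * format is package_unique_sql_msg_on_remote further below.
 */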

/**
 * @Description: get unique sql stat from remote nodes (CN/DN)
 * @return - true if successful, else false
 */
static bool RecvUniqueSQLStat(UniqueSQL* unique_sql_array, int arr_size, PGXCNodeAllHandles* pgxc_handles, bool is_cn)
{
    ereport(DEBUG1,
        (errmodule(MOD_INSTR),
            errmsg("[UniqueSQL] try to receive unique sql stat from remote: %s", is_cn ? "CN" : "DN")));

    int conn_count;
    PGXCNodeHandle** handles = NULL;

    conn_count = UniqueSq::get_conn_count_from_all_handles(pgxc_handles, is_cn);
    handles = UniqueSq::get_handles_from_all_handles(pgxc_handles, is_cn);
    if (conn_count > 0 && handles == NULL) {
        ereport(
            LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] receive stat: invalid remote connection handle array")));
        return false;
    }

    for (int i = 0; i < conn_count; i++) {
        PGXCNodeHandle* handle = handles[i];

        if (handle == NULL) {
            ereport(LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] receive stat: invalid remote connection handle")));
            return false;
        }

        bool hasError = false;
        bool isFinished = false;
        while (!pgxc_node_receive(1, &handle, NULL)) {
            handle_message_from_remote(unique_sql_array, arr_size, handle, &hasError, &isFinished);
            if (isFinished) {
                break;
            }
        }
        handle->state = DN_CONNECTION_STATE_IDLE;

        if (hasError) {
            ereport(LOG,
                (errmodule(MOD_INSTR),
                    errmsg("[UniqueSQL] receive stat: fetch failed, node: %s", handle->remoteNodeName)));
            return false;
        }
    }
    return true;
}

/**
 * @Description: wrapper method to get the CN/DN count from PGXCNodeAllHandles
 */
int UniqueSq::get_conn_count_from_all_handles(PGXCNodeAllHandles* pgxc_handles, bool is_cn)
{
    if (pgxc_handles != NULL) {
        return is_cn ? pgxc_handles->co_conn_count : pgxc_handles->dn_conn_count;
    }
    return 0;
}

/**
 * @Description: wrapper method to get the CN/DN handles from PGXCNodeAllHandles
 */
PGXCNodeHandle** UniqueSq::get_handles_from_all_handles(PGXCNodeAllHandles* pgxc_handles, bool is_cn)
{
    if (pgxc_handles != NULL) {
        return is_cn ? pgxc_handles->coord_handles : pgxc_handles->datanode_handles;
    }
    return NULL;
}

/**
 * @Description: send unique sql keys to remote CNs/DNs
 * @return - true if the keys were sent to the remote nodes successfully
 */
static bool SendUniqueSQLIDsToRemote(PGXCNodeAllHandles* pgxc_handles, uint32* cn_ids, Oid* user_ids, uint64* query_ids,
    uint32* slot_indexes, uint count, bool is_cn)
{
    int conn_count;
    PGXCNodeHandle** handles = NULL;

    conn_count = UniqueSq::get_conn_count_from_all_handles(pgxc_handles, is_cn);
    handles = UniqueSq::get_handles_from_all_handles(pgxc_handles, is_cn);
    if (conn_count > 0 && handles == NULL) {
        ereport(LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] send key: invalid remote connection handle array")));
        return false;
    }

    for (int i = 0; i < conn_count; i++) {
        PGXCNodeHandle* handle = handles[i];
        if (handle == NULL) {
            ereport(LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] send key: invalid remote connection handle")));
            return false;
        }

        handle->state = DN_CONNECTION_STATE_IDLE;
        if (!SendUniqueSQLIds(handle, cn_ids, user_ids, query_ids, slot_indexes, count)) {
            /* connection issue */
            ereport(
                LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] send key: failed, node: %s", handle->remoteNodeName)));
            return false;
        }
    }

    return true;
}

/*
 * @Description: get unique sql stat from CNs/DNs
 * including:
 * - Cache/IO
 * - Row activities
 * - etc.
 *
 * @return - false on failure
 */
static bool GetUniqueSQLStatFromRemote(UniqueSQL* unique_sql_array, int arr_size, PGXCNodeAllHandles* pgxc_handles,
    uint32* cn_ids, Oid* user_ids, uint64* query_ids, uint32* slot_indexes, uint count)
{
    Assert(pgxc_handles);
    Assert(unique_sql_array);

    /* 1, send all unique sql ids to the CNs/DNs */
    if (!SendUniqueSQLIDsToRemote(pgxc_handles, cn_ids, user_ids, query_ids, slot_indexes, count, true) ||
        !SendUniqueSQLIDsToRemote(pgxc_handles, cn_ids, user_ids, query_ids, slot_indexes, count, false)) {
        return false;
    }

    /* 2, receive the CN/DN results and merge them into the unique sql entries */
    if (RecvUniqueSQLStat(unique_sql_array, arr_size, pgxc_handles, true) &&
        RecvUniqueSQLStat(unique_sql_array, arr_size, pgxc_handles, false)) {
        return true;
    }
    return false;
}

/**
 * @Description: copy a unique sql entry from the hash table to the pre-allocated buffer
 * @return - void
 */
static void copy_unique_sql_entry(UniqueSQL* unique_sql_array, int num, int i, UniqueSQL* entry)
{
    errno_t rc = EOK;

    unique_sql_array[i].key.cn_id = entry->key.cn_id;
    unique_sql_array[i].key.user_id = entry->key.user_id;
    unique_sql_array[i].key.unique_sql_id = entry->key.unique_sql_id;

    unique_sql_array[i].calls = entry->calls;

    if (need_normalize_unique_string() && entry->unique_sql != NULL) {
        unique_sql_array[i].unique_sql = (char*)(unique_sql_array + num) + i * UNIQUE_SQL_MAX_LEN;
        rc = memcpy_s(unique_sql_array[i].unique_sql, UNIQUE_SQL_MAX_LEN, entry->unique_sql, strlen(entry->unique_sql));
        securec_check(rc, "\0", "\0");
    } else {
        unique_sql_array[i].unique_sql = NULL;
    }

    // elapse time
    unique_sql_array[i].elapse_time.total_time = entry->elapse_time.total_time;
    unique_sql_array[i].elapse_time.min_time = entry->elapse_time.min_time;
    unique_sql_array[i].elapse_time.max_time = entry->elapse_time.max_time;

    // row activity
    unique_sql_array[i].row_activity.returned_rows = entry->row_activity.returned_rows;
    unique_sql_array[i].row_activity.tuples_fetched = entry->row_activity.tuples_fetched;
    unique_sql_array[i].row_activity.tuples_returned = entry->row_activity.tuples_returned;
    unique_sql_array[i].row_activity.tuples_inserted = entry->row_activity.tuples_inserted;
    unique_sql_array[i].row_activity.tuples_updated = entry->row_activity.tuples_updated;
    unique_sql_array[i].row_activity.tuples_deleted = entry->row_activity.tuples_deleted;

    // Cache/IO
    unique_sql_array[i].cache_io.blocks_fetched = entry->cache_io.blocks_fetched;
    unique_sql_array[i].cache_io.blocks_hit = entry->cache_io.blocks_hit;

    // parse info
    unique_sql_array[i].parse.soft_parse = entry->parse.soft_parse;
    unique_sql_array[i].parse.hard_parse = entry->parse.hard_parse;

    // sort & hash work mem info
    unique_sql_array[i].sort_state.counts = entry->sort_state.counts;
    unique_sql_array[i].sort_state.total_time = entry->sort_state.total_time;
    unique_sql_array[i].sort_state.used_work_mem = entry->sort_state.used_work_mem;
    unique_sql_array[i].sort_state.spill_counts = entry->sort_state.spill_counts;
    unique_sql_array[i].sort_state.spill_size = entry->sort_state.spill_size;

    unique_sql_array[i].hash_state.counts = entry->hash_state.counts;
    unique_sql_array[i].hash_state.total_time = entry->hash_state.total_time;
    unique_sql_array[i].hash_state.used_work_mem = entry->hash_state.used_work_mem;
    unique_sql_array[i].hash_state.spill_counts = entry->hash_state.spill_counts;
    unique_sql_array[i].hash_state.spill_size = entry->hash_state.spill_size;

    // time info
    rc = memcpy_s(&unique_sql_array[i].timeInfo, sizeof(UniqueSQLTime), &entry->timeInfo, sizeof(UniqueSQLTime));
    securec_check(rc, "\0", "\0");

    /* if local, more information will be fetched from the CNs/DNs */
    unique_sql_array[i].is_local = entry->is_local;
}
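
/*
 * Snapshot buffer layout sketch (see GetUniqueSQLStat below): all entries are
 * allocated as one block, structs first and sql texts after,
 *
 *   [entry_0][entry_1]...[entry_num-1] | [sql_0][sql_1]...[sql_num-1]
 *
 * so entry i's text slot starts at (char*)(unique_sql_array + num)
 * + i * UNIQUE_SQL_MAX_LEN. The slots are zeroed by palloc0, which is why
 * copying strlen() bytes above still leaves a NUL-terminated string.
 */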

/**
 * @Description: send unique sql keys to the remote nodes
 * @return - false on failure
 */
static bool package_and_send_unique_sql_key_to_remote(
    UniqueSQL* unique_sql_array, int num, PGXCNodeAllHandles* pgxc_handles)
{
    /* key = CN id + user id + query id */
    uint32 cn_ids[UNIQUE_SQL_IDS_ARRAY_SIZE] = {0};
    Oid user_ids[UNIQUE_SQL_IDS_ARRAY_SIZE] = {0};
    uint64 query_ids[UNIQUE_SQL_IDS_ARRAY_SIZE] = {0};
    uint32 slot_indexes[UNIQUE_SQL_IDS_ARRAY_SIZE] = {0};

    /* only send unique sql ids to remote CNs/DNs if the entry is local */
    int array_idx = 0;
    int rc = 0;
    const int N32_BIT = 32;

    for (int i = 0; i < num; i++) {
        if (!unique_sql_array[i].is_local) {
            continue;
        }

        int index = array_idx++;
        index = index % UNIQUE_SQL_IDS_ARRAY_SIZE;

        uint32 n32 = 0;
        cn_ids[index] = htonl(unique_sql_array[i].key.cn_id);

        /* -- unique sql id -- */
        /* high order half */
        n32 = (uint32)(unique_sql_array[i].key.unique_sql_id >> N32_BIT);
        n32 = htonl(n32);
        rc = memcpy_s(query_ids + index, sizeof(uint32), &n32, sizeof(uint32));
        securec_check(rc, "\0", "\0");

        /* low order half */
        n32 = (uint32)unique_sql_array[i].key.unique_sql_id;
        n32 = htonl(n32);
        rc = memcpy_s((unsigned char*)(query_ids + index) + sizeof(uint32), sizeof(uint32), &n32, sizeof(uint32));
        securec_check(rc, "\0", "\0");

        user_ids[index] = htonl(unique_sql_array[i].key.user_id);

        /* store the original array index */
        slot_indexes[index] = htonl(i);

        /* send a batch either when 10 unique sql ids have accumulated
         * or at the end of unique_sql_array
         */
        if ((index == UNIQUE_SQL_IDS_ARRAY_SIZE - 1)) {
            if (!GetUniqueSQLStatFromRemote(unique_sql_array,
                    num,
                    pgxc_handles,
                    cn_ids,
                    user_ids,
                    query_ids,
                    slot_indexes,
                    UNIQUE_SQL_IDS_ARRAY_SIZE)) {
                return false;
            }
        } else if (i == num - 1) {
            if (!GetUniqueSQLStatFromRemote(
                    unique_sql_array, num, pgxc_handles, cn_ids, user_ids, query_ids, slot_indexes, index + 1)) {
                return false;
            }
        }
    }
    return true;
}
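
/*
 * Sketch of the 64-bit big-endian split above: htonl only handles 32 bits,
 * so unique_sql_id is sent as two converted halves. For example, with
 * unique_sql_id = 0x1122334455667788 on a little-endian host:
 *
 *   high half 0x11223344 -> htonl -> bytes 11 22 33 44 at offset 0
 *   low  half 0x55667788 -> htonl -> bytes 55 66 77 88 at offset 4
 *
 * giving the full big-endian byte sequence 11 22 33 44 55 66 77 88, which
 * the receiver can read back with pq_getmsgint64().
 */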

/**
 * @Description: get unique sql stat from the CNs and DNs
 * @in unique_sql_array - local pre-allocated unique sql array
 * @in num - unique sql entry count on the local CN
 * @return - true on success, false otherwise
 */
static bool get_unique_sql_stat_from_remote(UniqueSQL* unique_sql_array, int num)
{
    /*
     * get row activity and cache/io from the DN nodes:
     * 1, each round, send 10 unique sql ids to the DN;
     * 2, the DN replies with the unique sql's cache/IO and
     *    row activities.
     * First get all node connections.
     */
    List* cn_list = GetAllCoordNodes();
    PGXCNodeAllHandles* pgxc_handles = NULL;

    PG_TRY();
    {
        pgxc_handles = get_handles(NULL, cn_list, false);
    }
    PG_CATCH();
    {
        release_pgxc_handles(pgxc_handles);
        list_free_ext(cn_list);

        PG_RE_THROW();
    }
    PG_END_TRY();
    Assert(pgxc_handles != NULL);

    if (!package_and_send_unique_sql_key_to_remote(unique_sql_array, num, pgxc_handles)) {
        release_pgxc_handles(pgxc_handles);
        list_free_ext(cn_list);

        ereport(ERROR,
            (errmodule(MOD_INSTR), errmsg("[UniqueSQL] failed to send/recv data while getting stat from remote!")));

        return false;
    }

    /* free connections */
    release_pgxc_handles(pgxc_handles);
    list_free_ext(cn_list);

    return true;
}

/*
 * @Description: get unique sql stat and save it to a pre-allocated array
 * @out num - valid unique sql entry count
 * @return - the pre-allocated buffer
 */
void* GetUniqueSQLStat(long* num)
{
    // pgstat_track_counts must be enabled to track row activity
    if (!is_unique_sql_enabled()) {
        ereport(LOG,
            (errmsg("[UniqueSQL] GUC parameter 'enable_resource_track' "
                    "or 'instr_unique_sql_count' is zero, unique sql view will be empty!")));
        *num = 0;
        return NULL;
    }
    if (g_instance.stat_cxt.UniqueSQLHashtbl == NULL) {
        *num = 0;
        return NULL;
    }
    int i = 0;
    HASH_SEQ_STATUS hash_seq;
    UniqueSQL* unique_sql_array = NULL;
    UniqueSQL* entry = NULL;

    for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) {
        LWLockAcquire(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i), LW_SHARED);
    }

    *num = hash_get_num_entries(g_instance.stat_cxt.UniqueSQLHashtbl);
    if (*num > 0) {
        /* memory format: [entry_1] [entry_2] [entry_3] ... | [sql_1] [sql_2] [sql_3] ... */
        int unique_sql_str_len = (IS_PGXC_COORDINATOR || IS_SINGLE_NODE) ? UNIQUE_SQL_MAX_LEN : 0;
        long array_size = *num * (sizeof(UniqueSQL) + unique_sql_str_len);
        if (array_size / (*num) != (long)(sizeof(UniqueSQL) + unique_sql_str_len)) {
            ereport(ERROR, (errmsg("[UniqueSQL] palloc0 size overflow!")));
        }
        unique_sql_array = (UniqueSQL*)palloc0_noexcept(array_size);
        if (unique_sql_array == NULL) {
            for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) {
                LWLockRelease(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i));
            }

            ereport(ERROR, (errmsg("[UniqueSQL] palloc0 error when querying unique sql stat!")));
        }

        i = 0;
        hash_seq_init(&hash_seq, g_instance.stat_cxt.UniqueSQLHashtbl);
        while ((entry = (UniqueSQL*)hash_seq_search(&hash_seq)) != NULL && i < *num) {
            /* DN - copy all entries; CN - only copy local unique sql */
            if (IS_PGXC_DATANODE || (IS_PGXC_COORDINATOR && entry->is_local)) {
                copy_unique_sql_entry(unique_sql_array, *num, i, entry);
                i++;
            }
        }

        *num = i;
    }

    for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) {
        LWLockRelease(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i));
    }

    if (*num == 0) {
        return NULL;
    }

    if (IS_PGXC_DATANODE) {
        return unique_sql_array;
    }

    if (!get_unique_sql_stat_from_remote(unique_sql_array, *num)) {
        *num = 0;
    }
    return unique_sql_array;
}

static void create_tuple_entry(TupleDesc tupdesc)
{
    int num = 0;
    int i = 0;

    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "node_name", NAMEOID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "node_id", INT4OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "user_name", NAMEOID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "user_id", OIDOID, -1, 0);

    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "unique_sql_id", INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "query", TEXTOID, -1, 0);

    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_calls", INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "min_elapse_time", INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "max_elapse_time", INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "total_elapse_time", INT8OID, -1, 0);

    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_returned_rows", INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_tuples_fetched", INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_tuples_returned", INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_tuples_inserted", INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_tuples_updated", INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_tuples_deleted", INT8OID, -1, 0);

    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_blocks_fetched", INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_blocks_hit", INT8OID, -1, 0);

    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_soft_parse", INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_hard_parse", INT8OID, -1, 0);

    for (num = 0; num < TOTAL_TIME_INFO_TYPES; num++) {
        TupleDescInitEntry(tupdesc, (AttrNumber)++i, TimeInfoTypeName[num], INT8OID, -1, 0);
    }

    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "sort_count", INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "sort_time", INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "sort_mem_used", INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "sort_spill_count", INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "sort_spill_size", INT8OID, -1, 0);

    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "hash_count", INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "hash_time", INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "hash_mem_used", INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "hash_spill_count", INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber)++i, "hash_spill_size", INT8OID, -1, 0);
}

static void set_tuple_cn_node_name(UniqueSQL* unique_sql, Datum* values, int* i)
{
    // cn node name
    if (IS_PGXC_COORDINATOR || IS_SINGLE_NODE) {
        char* node_name = get_pgxc_node_name_by_node_id(unique_sql->key.cn_id, false);
        if (node_name != NULL) {
            values[(*i)++] = DirectFunctionCall1(namein, CStringGetDatum(node_name));
            pfree(node_name);
        } else {
#ifdef ENABLE_MULTIPLE_NODES
            values[(*i)++] = DirectFunctionCall1(namein, CStringGetDatum("*REMOVED_NODE*"));
#else
            values[(*i)++] = DirectFunctionCall1(namein, CStringGetDatum(g_instance.attr.attr_common.PGXCNodeName));
#endif
        }
    } else {
        values[(*i)++] = DirectFunctionCall1(namein, CStringGetDatum(""));
    }
}

static void set_tuple_user_name(UniqueSQL* unique_sql, Datum* values, int* i)
{
    char user_name[NAMEDATALEN] = {0};

    // user name
    if (IS_PGXC_COORDINATOR || IS_SINGLE_NODE) {
        if (GetRoleName(unique_sql->key.user_id, user_name, sizeof(user_name)) != NULL) {
            values[(*i)++] = DirectFunctionCall1(namein, CStringGetDatum(user_name));
        } else {
            values[(*i)++] = DirectFunctionCall1(namein, CStringGetDatum("*REMOVED_USER*"));
        }
    } else {
        values[(*i)++] = DirectFunctionCall1(namein, CStringGetDatum(""));
    }
}

static void set_tuple_unique_sql(UniqueSQL* unique_sql, Datum* values, int* i)
{
    if (unique_sql->unique_sql != NULL) {
        values[(*i)++] = CStringGetTextDatum(unique_sql->unique_sql);
    } else {
        values[(*i)++] = CStringGetTextDatum("");
    }
}

static void set_tuple_value(UniqueSQL* unique_sql, Datum* values, bool* nulls, int arr_size)
{
    int i = 0;
    int num = 0;

    set_tuple_cn_node_name(unique_sql, values, &i);
    values[i++] = UInt32GetDatum(unique_sql->key.cn_id);
    set_tuple_user_name(unique_sql, values, &i);

    // basic info
    values[i++] = ObjectIdGetDatum(unique_sql->key.user_id);
    values[i++] = Int64GetDatum(unique_sql->key.unique_sql_id);

    set_tuple_unique_sql(unique_sql, values, &i);
    values[i++] = Int64GetDatum(unique_sql->calls);

    // response time
    values[i++] = Int64GetDatum(unique_sql->elapse_time.min_time);
    values[i++] = Int64GetDatum(unique_sql->elapse_time.max_time);
    values[i++] = Int64GetDatum(unique_sql->elapse_time.total_time);

    // row activity
    values[i++] = Int64GetDatum(unique_sql->row_activity.returned_rows);
    values[i++] = Int64GetDatum(unique_sql->row_activity.tuples_fetched);
    values[i++] = Int64GetDatum(unique_sql->row_activity.tuples_returned);
    values[i++] = Int64GetDatum(unique_sql->row_activity.tuples_inserted);
    values[i++] = Int64GetDatum(unique_sql->row_activity.tuples_updated);
    values[i++] = Int64GetDatum(unique_sql->row_activity.tuples_deleted);

    // cache/IO
    values[i++] = Int64GetDatum(unique_sql->cache_io.blocks_fetched);
    values[i++] = Int64GetDatum(unique_sql->cache_io.blocks_hit);

    // parse info
    values[i++] = Int64GetDatum(unique_sql->parse.soft_parse);
    values[i++] = Int64GetDatum(unique_sql->parse.hard_parse);

    // time info
    for (num = 0; num < TOTAL_TIME_INFO_TYPES; num++) {
        values[i++] = Int64GetDatum(unique_sql->timeInfo.TimeInfoArray[num]);
    }

    // sort & hash work mem info
    values[i++] = Int64GetDatum(unique_sql->sort_state.counts);
    values[i++] = Int64GetDatum(unique_sql->sort_state.total_time);
    values[i++] = Int64GetDatum(unique_sql->sort_state.used_work_mem);
    values[i++] = Int64GetDatum(unique_sql->sort_state.spill_counts);
    values[i++] = Int64GetDatum(unique_sql->sort_state.spill_size);

    values[i++] = Int64GetDatum(unique_sql->hash_state.counts);
    values[i++] = Int64GetDatum(unique_sql->hash_state.total_time);
    values[i++] = Int64GetDatum(unique_sql->hash_state.used_work_mem);
    values[i++] = Int64GetDatum(unique_sql->hash_state.spill_counts);
    values[i++] = Int64GetDatum(unique_sql->hash_state.spill_size);

    Assert(arr_size == i);
}

/*
 * get_instr_unique_sql - C function to get unique sql stat info
 */
Datum get_instr_unique_sql(PG_FUNCTION_ARGS)
{
    FuncCallContext* funcctx = NULL;
    long num = 0;

#define INSTRUMENTS_UNIQUE_SQL_ATTRNUM (30 + TOTAL_TIME_INFO_TYPES)

    if (!superuser()) {
        ereport(
            ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("only system admin can query unique sql view"))));
    }

    if (SRF_IS_FIRSTCALL()) {
        MemoryContext oldcontext = NULL;
        TupleDesc tupdesc = NULL;

        funcctx = SRF_FIRSTCALL_INIT();
        oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

        tupdesc = CreateTemplateTupleDesc(INSTRUMENTS_UNIQUE_SQL_ATTRNUM, false);
        create_tuple_entry(tupdesc);

        funcctx->tuple_desc = BlessTupleDesc(tupdesc);
        funcctx->user_fctx = GetUniqueSQLStat(&num);
        funcctx->max_calls = num;
        MemoryContextSwitchTo(oldcontext);

        if (funcctx->max_calls == 0) {
            if (funcctx->user_fctx) {
                pfree_ext(funcctx->user_fctx);
            }
            SRF_RETURN_DONE(funcctx);
        }
    }

    funcctx = SRF_PERCALL_SETUP();
    if (funcctx->call_cntr < funcctx->max_calls) {
        Datum values[INSTRUMENTS_UNIQUE_SQL_ATTRNUM];
        bool nulls[INSTRUMENTS_UNIQUE_SQL_ATTRNUM] = {false};
        HeapTuple tuple = NULL;
        int rc = 0;

        UniqueSQL* unique_sql = (UniqueSQL*)funcctx->user_fctx + funcctx->call_cntr;

        rc = memset_s(values, sizeof(values), 0, sizeof(values));
        securec_check(rc, "\0", "\0");
        rc = memset_s(nulls, sizeof(nulls), 0, sizeof(nulls));
        securec_check(rc, "\0", "\0");

        set_tuple_value(unique_sql, values, nulls, INSTRUMENTS_UNIQUE_SQL_ATTRNUM);
        tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
        SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
    } else {
        if (funcctx->user_fctx) {
            pfree_ext(funcctx->user_fctx);
        }
        SRF_RETURN_DONE(funcctx);
    }
}
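
/*
 * Usage sketch (assuming this SRF is exposed to SQL as get_instr_unique_sql();
 * the catalog registration itself lives elsewhere):
 *
 *   SELECT node_name, unique_sql_id, query, n_calls, total_elapse_time
 *   FROM get_instr_unique_sql()
 *   ORDER BY total_elapse_time DESC
 *   LIMIT 10;
 *
 * Only a superuser may call it; on a CN the first call triggers the remote
 * stat collection above before any rows are returned.
 */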
|
|
|
|
/*
 * GenerateUniqueSQLInfo - generate unique sql info,
 * such as the unique query id and the normalized unique sql string
 */
void GenerateUniqueSQLInfo(const char* sql, Query* query)
{
    /* won't record unique sql info during aborted transaction state.
     *
     * the below scenario also won't record unique sql stat:
     * - begin;
     * - select * from not_exist_relation;  (causes aborted state)
     * - rollback;
     *
     * the rollback runs successfully, but is ignored as well: currently
     * we have the limitation that system relations cannot be accessed
     * in aborted state.
     * refer to the assert in method "relation_open"
     */
    if (sql == NULL || query == NULL || g_instance.stat_cxt.UniqueSQLHashtbl == NULL || !is_local_unique_sql() ||
        IsAbortedTransactionBlockState()) {
        return;
    }

    if (IS_UNIQUE_SQL_TRACK_TOP && IsTopUniqueSQL()) {
        ereport(DEBUG1,
            (errmodule(MOD_INSTR),
                errmsg("[UniqueSQL] unique id: %lu, already has top SQL", u_sess->unique_sql_cxt.unique_sql_id)));
        return;
    }

    const char* current_sql = NULL;
    if (u_sess->unique_sql_cxt.is_multi_unique_sql) {
        current_sql = u_sess->unique_sql_cxt.curr_single_unique_sql;
    } else {
        current_sql = sql;
    }

    if (current_sql == NULL) {
        return;
    }

    u_sess->unique_sql_cxt.unique_sql_id = generate_unique_queryid(query, current_sql);
    query->uniqueSQLId = u_sess->unique_sql_cxt.unique_sql_id;

    if (!OidIsValid(u_sess->unique_sql_cxt.unique_sql_cn_id)) {
        Oid node_oid = get_pgxc_nodeoid(g_instance.attr.attr_common.PGXCNodeName);
        u_sess->unique_sql_cxt.unique_sql_cn_id = get_pgxc_node_id(node_oid);
    }

    u_sess->unique_sql_cxt.unique_sql_user_id = GetUserId();

    ereport(DEBUG1,
        (errmodule(MOD_INSTR),
            errmsg("[UniqueSQL] generate unique id: %lu, cn id: %u, user id: %u",
                u_sess->unique_sql_cxt.unique_sql_id,
                u_sess->unique_sql_cxt.unique_sql_cn_id,
                u_sess->unique_sql_cxt.unique_sql_user_id)));

    UpdateUniqueSQLStat(query, current_sql, 0);

    /* if track top is enabled, only the TOP SQL generates a unique sql id */
    if (IS_UNIQUE_SQL_TRACK_TOP) {
        SetIsTopUniqueSQL(true);
    }
}

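/*
 * Note on the track-top flow above: the first statement that reaches
 * GenerateUniqueSQLInfo sets is_top_unique_sql to true, so any nested
 * statement analyzed while it is still running returns early at the
 * IS_UNIQUE_SQL_TRACK_TOP && IsTopUniqueSQL() check and keeps running
 * under the top statement's unique sql id.
 */
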
/*
 * unique_sql_post_parse_analyze - generate unique sql id
 */
void UniqueSq::unique_sql_post_parse_analyze(ParseState* pstate, Query* query)
{
    Assert(IS_PGXC_COORDINATOR || IS_SINGLE_NODE);

    /* generate unique sql id */
    if (is_unique_sql_enabled() && pstate != NULL) {
        GenerateUniqueSQLInfo(pstate->p_sourcetext, query);
    }

    if (g_prev_post_parse_analyze_hook != NULL) {
        g_prev_post_parse_analyze_hook(pstate, query);
    }
}

/*
 * get Query from query_list, and then set unique sql id into the session
 */
static void SetLocalUniqueSQLId(List* query_list)
{
    if (query_list == NULL || IsAbortedTransactionBlockState()) {
        return;
    }

    Query* query = NULL;
    if (list_length(query_list) > 0) {
        if (IsA(linitial(query_list), Query)) {
            query = (Query*)linitial(query_list);
            if (query != NULL) {
                u_sess->unique_sql_cxt.unique_sql_id = query->uniqueSQLId;

                if (!OidIsValid(u_sess->unique_sql_cxt.unique_sql_cn_id)) {
                    Oid node_oid = get_pgxc_nodeoid(g_instance.attr.attr_common.PGXCNodeName);
                    u_sess->unique_sql_cxt.unique_sql_cn_id = get_pgxc_node_id(node_oid);
                }

                u_sess->unique_sql_cxt.unique_sql_user_id = GetUserId();

                /* a BE message can only carry one SQL at a time,
                 * so set top to true; then n_calls can be updated
                 * in PortalRun
                 */
                SetIsTopUniqueSQL(true);
            }
        }
    }
}

/*
 * set current prepared statement's unique sql id
 *
 * 1, get portal by name
 * 2, fetch prepared statement from local prepared statement cache
 * 3, get prepared statement's query list
 * 4, for utility stmt:
 *    there should be only one Query in the list (ToDo: need confirm)
 * 5, for other stmt:
 *    all Queries' unique sql ids should be the same
 */
void SetUniqueSQLIdFromPortal(Portal portal, CachedPlanSource* unnamed_psrc)
{
    if (!is_unique_sql_enabled() || !is_local_unique_sql() || portal == NULL) {
        return;
    }

    List* query_list = NULL;
    if (portal->prepStmtName && portal->prepStmtName[0] != '\0') {
        /* for named prepared statement */
        PreparedStatement* pstmt = FetchPreparedStatement(portal->prepStmtName, true);
        if (pstmt != NULL && pstmt->plansource != NULL) {
            query_list = pstmt->plansource->query_list;
        }
    } else if (unnamed_psrc != NULL) {
        /* for unnamed prepared statement */
        query_list = unnamed_psrc->query_list;
    }

    SetLocalUniqueSQLId(query_list);
    ereport(DEBUG1,
        (errmodule(MOD_INSTR),
            errmsg("[UniqueSQL] unique id: %lu, set unique sql id for PBE", u_sess->unique_sql_cxt.unique_sql_id)));
}

/*
 * set unique sql id from lightProxy::runMsg & bind
 */
void SetUniqueSQLIdFromCachedPlanSource(CachedPlanSource* cplan)
{
    if (!is_unique_sql_enabled() || !is_local_unique_sql() || cplan == NULL) {
        return;
    }

    SetLocalUniqueSQLId(cplan->query_list);

    ereport(DEBUG1,
        (errmodule(MOD_INSTR),
            errmsg("[UniqueSQL] unique id: %lu, set unique sql id for PBE", u_sess->unique_sql_cxt.unique_sql_id)));
}

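/*
 * Note: both PBE entry points above reuse the unique sql id that was
 * stamped on the cached Query (query->uniqueSQLId) during parse analysis;
 * SetLocalUniqueSQLId only copies it back into the session context.
 */
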
static void package_unique_sql_msg_on_remote(StringInfoData* buf, UniqueSQL* entry, uint32 recv_slot_index,
    uint32 recv_cn_id, Oid recv_user_id, uint64 recv_unique_sql_id)
{
    /* now we send unique sql stat one by one; later maybe we can
     * send a batch of unique sql stat.
     * Send one unique sql stat each time; message format (byte widths
     * match the pq_send* calls below):
     *   1   + 4          + 4     + 4       + 8             + 6*8          +
     *   'r' + slot index + cn id + user id + unique sql id + row activity +
     *
     *   2*8      + 2*8        + TOTAL_TIME_INFO_TYPES*8 + 5*8        + 5*8
     *   cache/IO + parse info + time info               + sort state + hash state
     */
    pq_beginmessage(buf, 'r');

    /* send back slot indexes(from original CN) */
    pq_sendint(buf, recv_slot_index, sizeof(uint32));

    /* key */
    pq_sendint(buf, recv_cn_id, sizeof(uint32));
    pq_sendint(buf, recv_user_id, sizeof(uint32));
    pq_sendint64(buf, recv_unique_sql_id);

    /* row activity */
    pq_sendint64(buf, entry->row_activity.returned_rows);
    pq_sendint64(buf, entry->row_activity.tuples_fetched);
    pq_sendint64(buf, entry->row_activity.tuples_returned);
    pq_sendint64(buf, entry->row_activity.tuples_inserted);
    pq_sendint64(buf, entry->row_activity.tuples_updated);
    pq_sendint64(buf, entry->row_activity.tuples_deleted);

    /* Cache/IO */
    pq_sendint64(buf, entry->cache_io.blocks_fetched);
    pq_sendint64(buf, entry->cache_io.blocks_hit);

    /* parse info */
    pq_sendint64(buf, entry->parse.soft_parse);
    pq_sendint64(buf, entry->parse.hard_parse);

    /* time Info */
    for (uint32 idx = 0; idx < TOTAL_TIME_INFO_TYPES; idx++) {
        pq_sendint64(buf, entry->timeInfo.TimeInfoArray[idx]);
    }

    /* sort hash work mem info */
    pq_sendint64(buf, entry->sort_state.counts);
    pq_sendint64(buf, entry->sort_state.total_time);
    pq_sendint64(buf, entry->sort_state.used_work_mem);
    pq_sendint64(buf, entry->sort_state.spill_counts);
    pq_sendint64(buf, entry->sort_state.spill_size);

    pq_sendint64(buf, entry->hash_state.counts);
    pq_sendint64(buf, entry->hash_state.total_time);
    pq_sendint64(buf, entry->hash_state.used_work_mem);
    pq_sendint64(buf, entry->hash_state.spill_counts);
    pq_sendint64(buf, entry->hash_state.spill_size);

    /* ... */
    pq_endmessage(buf);
}

/*
 * ReplyUniqueSQLsStat - DN replies the stat info for the requested unique SQL IDs
 */
void ReplyUniqueSQLsStat(StringInfo msg, uint32 count)
{
    ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] get unique sql count: %u", count)));

    if (count > 0) {
        StringInfoData buf;
        for (uint32 i = 0; i < count; i++) {
            uint32 recv_slot_index = (uint32)pq_getmsgint(msg, sizeof(uint32));
            uint32 recv_cn_id = (uint32)pq_getmsgint(msg, sizeof(uint32));
            Oid recv_user_id = (Oid)pq_getmsgint(msg, sizeof(uint32));
            uint64 recv_unique_sql_id = (uint64)pq_getmsgint64(msg);

            ereport(DEBUG1,
                (errmodule(MOD_INSTR),
                    errmsg("[UniqueSQL] get cn_id: %u, user id: %u, unique sql id: %lu",
                        recv_cn_id,
                        recv_user_id,
                        recv_unique_sql_id)));

            // get unique sql entry from HTAB
            UniqueSQLKey key = {0, 0};
            UniqueSQL* entry = NULL;
            key.unique_sql_id = recv_unique_sql_id;
            key.user_id = recv_user_id;
            key.cn_id = recv_cn_id;

            uint32 hashCode = uniqueSQLHashCode(&key, sizeof(key));

            LockUniqueSQLHashPartition(hashCode, LW_SHARED);
            entry = (UniqueSQL*)hash_search(g_instance.stat_cxt.UniqueSQLHashtbl, &key, HASH_FIND, NULL);
            if (entry != NULL) {
                package_unique_sql_msg_on_remote(
                    &buf, entry, recv_slot_index, recv_cn_id, recv_user_id, recv_unique_sql_id);
            }
            UnlockUniqueSQLHashPartition(hashCode);
        }

        pq_beginmessage(&buf, 'f');
        pq_endmessage(&buf);
        pq_flush();

        if (buf.data) {
            pfree(buf.data);
        }
    } else {
        ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid unique sql count: %u.", count)));
    }
}

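/*
 * Note on the reply protocol above: for each requested key the DN sends one
 * 'r' message if a matching entry exists in its local hash table (keys with
 * no local entry are silently skipped), then a final 'f' message marks the
 * end of the reply so the requesting CN knows the batch is complete.
 */
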
/*
 * PrintPgStatTableCounter - print current PgStat_TableCounts
 *
 * L - last total stat counter after exit pgstat_report_stat
 * T - the time when entering pgstat_report_stat
 * C - current sql row stat counter
 * for debug purposes
 */
void PrintPgStatTableCounter(char type, PgStat_TableCounts* stat)
{
    if (!is_unique_sql_enabled() || !IS_PGXC_DATANODE) {
        return;
    }

    ereport(DEBUG1,
        (errmodule(MOD_INSTR),
            errmsg("[UniqueSQL] unique id: %lu, row stat counter[%c] - "
                   "Returned: %ld, Fetched: %ld, "
                   "Inserted: %ld, Updated: %ld, Deleted: %ld, "
                   "Blocks_Fetched: %ld, Blocks_Hit: %ld",
                u_sess->unique_sql_cxt.unique_sql_id,
                type,
                stat->t_tuples_returned,
                stat->t_tuples_fetched,
                stat->t_tuples_inserted,
                stat->t_tuples_updated,
                stat->t_tuples_deleted,
                stat->t_blocks_fetched,
                stat->t_blocks_hit)));
}

/*
 * UpdateUniqueSQLStatOnRemote - on dn/cn, update unique sql stat,
 * including row activity, Cache/IO, parse counter, etc.
 *
 * main unique sql update entry:
 * CN -> CN/DN (DDL)
 * CN -> DN (DML)
 *
 * need to consider below scenarios:
 * 1, simple query (pgstat_report_stat cleans pgStatTabList
 *    at its call frequency)
 * 2, transaction block, such as:
 *    begin;
 *    insert xxxx;
 *    delete XXX;
 *    commit;
 *
 * main implementation logic:
 * - update last stat counter
 * - new sql enters
 * - current sql stat counter = current stat counter - last stat counter
 * - update unique sql HTAB stat
 * - update last stat counter
 * - ...
 * Notes:
 * we reuse pgStatTabList, as each query's row activity is stored in it.
 */
void UpdateUniqueSQLStatOnRemote()
{
    Assert(need_update_unique_sql_row_stat());
    Assert(u_sess->attr.attr_resource.enable_resource_track &&
           (u_sess->attr.attr_common.instr_unique_sql_count > 0));

    ereport(DEBUG1,
        (errmodule(MOD_INSTR),
            errmsg("[UniqueSQL] unique id: %lu, update "
                   "unique sql stat on remote",
                u_sess->unique_sql_cxt.unique_sql_id)));

    /*
     * Some SQLs don't have a unique sql id on the DN,
     * for example:
     *   on CN: gsql - create table xxxx (generates unique sql ID)
     *   on DN this becomes:
     *     START TRANSACTION ISOLATION LEVEL read committed READ WRITE
     *     create table xxx (with unique SQL ID)
     *     PREPARE TRANSACTION 'xxx'
     *     COMMIT PREPARED 'xxx'
     * we now only collect the stat counter for the create statement,
     * so we still need to update the last stat counter for
     * 'START TRANSACTION...'
     */
    if (u_sess->unique_sql_cxt.unique_sql_id != 0) {
        if (CalcSQLRowStatCounter(
                u_sess->unique_sql_cxt.last_stat_counter, u_sess->unique_sql_cxt.current_table_counter)) {
            UpdateUniqueSQLStat(NULL, NULL, 0, u_sess->unique_sql_cxt.current_table_counter);
        } else {
            /* update parse info */
            UpdateUniqueSQLStat(NULL, NULL, 0, NULL);
        }
    }
}

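/*
 * Worked example of the delta logic above: if the last stat counter holds
 * tuples_inserted = 10 and after the current statement the session-wide
 * counter reads 13, CalcSQLRowStatCounter yields a delta of 3, and that
 * delta is what gets attributed to the current unique sql id in the HTAB.
 */
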
/* is unique sql enabled
 *
 * status = enable_resource_track + unique sql on/off GUC parameter
 */
bool is_unique_sql_enabled()
{
    return u_sess->attr.attr_resource.enable_resource_track && (u_sess->attr.attr_common.instr_unique_sql_count > 0);
}

/* by default, instr_unique_sql_count is 0.
 * when running "set instr_unique_sql_count = 100", no unique sql id is
 * generated at first (the GUC is still 0, so unique sql is disabled), but
 * by the time the statement reaches UpdateUniqueSQLStat,
 * is_unique_sql_enabled() already returns true, so the assert
 * unique_sql_id != 0 in UpdateUniqueSQLStat would fail.
 *
 * so in this case, we won't call UpdateUniqueSQLStat
 */
bool IsNeedUpdateUniqueSQLStat(Portal portal)
{
    if (u_sess->unique_sql_cxt.unique_sql_id == 0) {
        if (portal != NULL && portal->stmts != NULL && list_length(portal->stmts) == 1 &&
            nodeTag(linitial(portal->stmts)) == T_VariableSetStmt) {
            VariableSetStmt* stmt = (VariableSetStmt*)(linitial(portal->stmts));
            if (stmt->name && strcmp(stmt->name, "instr_unique_sql_count") == 0) {
                return false;
            }
        }
    }

    return true;
}

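/*
 * Example of the edge case above: with instr_unique_sql_count = 0, the
 * statement "set instr_unique_sql_count = 100;" is parsed while unique sql
 * is still disabled (unique_sql_id stays 0) but executes with it enabled;
 * IsNeedUpdateUniqueSQLStat detects exactly this statement and skips the
 * stat update.
 */
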
/* setter and getter for is_top_unique_sql */
void SetIsTopUniqueSQL(bool value)
{
    u_sess->unique_sql_cxt.is_top_unique_sql = value;
}

bool IsTopUniqueSQL()
{
    return u_sess->unique_sql_cxt.is_top_unique_sql;
}

/* return GUC unique sql tracking type */
int GetUniqueSQLTrackType()
{
    return u_sess->attr.attr_common.unique_sql_track_type;
}

/* if this is a CN and the connection is not from another CN, it is a local
 * unique SQL; otherwise it is a remote unique SQL
 */
bool is_local_unique_sql()
{
    if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) {
        return true;
    }

    if (IS_SINGLE_NODE) {
        return true;
    }

    return false;
}

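/*
 * Summary of the locality checks here and in the two helpers below: a CN
 * receiving a query directly from a client (or a single-node instance) owns
 * the "local" side and normalizes the query string and generates the unique
 * sql id, while a DN, or a CN reached via another CN, only accumulates row
 * stat locally and reports it back on request.
 */
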
bool need_update_unique_sql_row_stat()
{
    if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) {
        return false;
    }

    return true;
}

bool need_normalize_unique_string()
{
    if (IS_PGXC_COORDINATOR || IS_SINGLE_NODE) {
        return true;
    }

    return false;
}

enum CleanType { INVALIDTYPE = 0, ALL, BY_GUC, BY_USERID, BY_CNID };

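/*
 * The clean types map onto the second argument of reset_unique_sql():
 * ALL drops every entry, BY_USERID/BY_CNID drop entries whose key matches
 * the given value, and BY_GUC is reserved for the WLM worker path, where
 * it is rewritten into BY_CNID for the local node (see reset_unique_sql).
 */
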
/* WDR takes snapshots of unique sql; a reset operation
 * removes unique sql entries from the hash table.
 * if a reset operation happens between two snapshots,
 * we cannot generate a report spanning those two snapshots.
 */
static void UpdateUniqueSQLValidStatTimestamp()
{
    gs_lock_test_and_set_64(&g_instance.stat_cxt.NodeStatResetTime, GetCurrentTimestamp());
}

static void CleanupAllUniqueSqlEntry()
{
    UniqueSQL* entry = NULL;
    HASH_SEQ_STATUS hash_seq;
    int i;
    for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) {
        LWLockAcquire(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i), LW_EXCLUSIVE);
    }

    /* remove entry, need update valid stat timestamp */
    UpdateUniqueSQLValidStatTimestamp();
    hash_seq_init(&hash_seq, g_instance.stat_cxt.UniqueSQLHashtbl);
    while ((entry = (UniqueSQL*)hash_seq_search(&hash_seq)) != NULL) {
        hash_search(g_instance.stat_cxt.UniqueSQLHashtbl, &entry->key, HASH_REMOVE, NULL);
    }
    for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) {
        LWLockRelease(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i));
    }
}

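/*
 * Locking note for CleanupAllUniqueSqlEntry: every partition lock is taken
 * exclusively, in index order, before the sequential scan, so no concurrent
 * backend can touch entries while the full purge runs; acquiring the locks
 * in a fixed order also avoids deadlocks with other multi-partition lockers.
 */
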
static List* GetRemoveEntryList(int cleanType, uint64 cleanValue)
{
    List* entryList = NIL;
    UniqueSQL* entry = NULL;
    HASH_SEQ_STATUS hash_seq;
    int i;
    for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) {
        LWLockAcquire(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i), LW_SHARED);
    }

    hash_seq_init(&hash_seq, g_instance.stat_cxt.UniqueSQLHashtbl);
    while ((entry = (UniqueSQL*)hash_seq_search(&hash_seq)) != NULL) {
        if ((cleanType == BY_USERID && entry->key.user_id == (Oid)cleanValue) ||
            (cleanType == BY_CNID && entry->key.cn_id == (uint32)cleanValue)) {
            UniqueSQLKey* key = (UniqueSQLKey*)palloc0_noexcept(sizeof(UniqueSQLKey));
            if (key == NULL) {
                for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) {
                    LWLockRelease(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i));
                }
                if (entryList != NIL) {
                    list_free(entryList);
                }

                ereport(ERROR, (errmodule(MOD_INSTR), errmsg("out of memory during allocating list element.")));
            }

            key->cn_id = entry->key.cn_id;
            key->user_id = entry->key.user_id;
            key->unique_sql_id = entry->key.unique_sql_id;
            entryList = lappend(entryList, key);
        }
    }

    for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) {
        LWLockRelease(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i));
    }

    return entryList;
}

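/*
 * GetRemoveEntryList only collects matching keys under shared partition
 * locks; the actual removal happens afterwards in CleanupInstrUniqueSqlEntry,
 * entry by entry, under an exclusive lock on the owning partition only, so
 * the exclusive locks are held for as short a time as possible.
 */
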
static void CleanupInstrUniqueSqlEntry(int cleanType, uint64 cleanValue)
{
    ListCell* cell = NULL;
    List* removeList = NIL;

    if (g_instance.stat_cxt.UniqueSQLHashtbl == NULL) {
        ereport(LOG, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("unique sql hashtable is NULL"))));
        return;
    }

    if (cleanType == ALL) {
        CleanupAllUniqueSqlEntry();
    } else {
        removeList = GetRemoveEntryList(cleanType, cleanValue);
        if (removeList != NIL) {
            foreach (cell, removeList) {
                UniqueSQLKey* key = (UniqueSQLKey*)lfirst(cell);
                uint32 hashCode = uniqueSQLHashCode(key, sizeof(UniqueSQLKey));
                LockUniqueSQLHashPartition(hashCode, LW_EXCLUSIVE);

                /* remove entry, need update valid stat timestamp */
                UpdateUniqueSQLValidStatTimestamp();
                hash_search(g_instance.stat_cxt.UniqueSQLHashtbl, key, HASH_REMOVE, NULL);
                UnlockUniqueSQLHashPartition(hashCode);
            }
            list_free(removeList);
        }
    }
}

static int CheckParameter(const char* global, const char* cleanType, int64 value, bool* isGlobal)
{
    int cleantype = INVALIDTYPE;

    if (!is_unique_sql_enabled()) {
        ereport(LOG, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("unique sql is disabled"))));
        return INVALIDTYPE;
    }

    if (strcasecmp(global, "GLOBAL") == 0) {
        *isGlobal = true;
        if (!IS_PGXC_COORDINATOR && !IS_SINGLE_NODE) {
            ereport(ERROR,
                (errcode(ERRCODE_WARNING), (errmsg("Cleanup global unique sql info is only supported on CN nodes."))));
        }
    } else if (strcasecmp(global, "LOCAL") == 0) {
        *isGlobal = false;
    } else {
        ereport(ERROR,
            (errcode(ERRCODE_WARNING),
                (errmsg("First parameter is wrong. USAGE: [GLOBAL/LOCAL],[ALL/BY_USERID/BY_CNID],[VALUE]"))));
    }

    if (strcasecmp(cleanType, "ALL") == 0) {
        cleantype = ALL;
    } else if (strcasecmp(cleanType, "BY_GUC") == 0) {
        if (AmWLMWorkerProcess()) {
            cleantype = BY_GUC;
        } else {
            ereport(WARNING, (errmsg("[UniqueSQL] BY_GUC is only used by GUC setting, use [ALL/BY_USERID/BY_CNID]")));
            return INVALIDTYPE;
        }
    } else if (strcasecmp(cleanType, "BY_USERID") == 0) {
        if (value <= 0 || value > MAX_UINT32) {
            ereport(WARNING,
                (errmsg("[UniqueSQL] third parameter of reset unique sql is out of range with BY_USERID")));
            return INVALIDTYPE;
        }
        cleantype = BY_USERID;
    } else if (strcasecmp(cleanType, "BY_CNID") == 0) {
        if (value <= INT_MIN || value > MAX_UINT32) {
            ereport(WARNING, (errmsg("[UniqueSQL] third parameter of reset unique sql is out of range with BY_CNID")));
            return INVALIDTYPE;
        }
        cleantype = BY_CNID;
    } else {
        ereport(ERROR,
            (errcode(ERRCODE_WARNING),
                (errmsg("Second parameter is wrong. USAGE:[GLOBAL/LOCAL],[ALL/BY_USERID/BY_CNID],[VALUE]"))));
    }
    return cleantype;
}

/* Reset unique sql stat info for the current database */
Datum reset_unique_sql(PG_FUNCTION_ARGS)
{
    int cleantype;
    ParallelFunctionState* state = NULL;
    bool isGlobal = false;
    char* global = text_to_cstring(PG_GETARG_TEXT_PP(0));
    char* cleanType = text_to_cstring(PG_GETARG_TEXT_PP(1));
    int64 cleanValue = PG_GETARG_INT64(2);
    if (!superuser()) {
        ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("only system admin can reset unique sql"))));
    }

    cleantype = CheckParameter(global, cleanType, cleanValue, &isGlobal);
    if (cleantype == INVALIDTYPE) {
        PG_RETURN_INT64(0);
    }

    ereport(LOG,
        (errmodule(MOD_INSTR),
            errmsg("[UniqueSQL] clean unique sql, "
                   "clean scope: %s, clean type: %s, clean value: %lu!",
                global,
                cleanType,
                (uint64)cleanValue)));
    if (cleantype == BY_GUC) {
        cleantype = BY_CNID;
        cleanType = "BY_CNID";
        Oid node_oid = get_pgxc_nodeoid(g_instance.attr.attr_common.PGXCNodeName);
        cleanValue = get_pgxc_node_id(node_oid);
    }

    CleanupInstrUniqueSqlEntry(cleantype, cleanValue);
    if (isGlobal && !IS_SINGLE_NODE) {
        StringInfoData buf;
        ExecNodes* exec_nodes = (ExecNodes*)makeNode(ExecNodes);
        exec_nodes->baselocatortype = LOCATOR_TYPE_HASH;
        exec_nodes->accesstype = RELATION_ACCESS_READ;
        exec_nodes->primarynodelist = NIL;
        exec_nodes->en_expr = NULL;
        exec_nodes->en_relid = InvalidOid;
        exec_nodes->nodeList = NIL;

        initStringInfo(&buf);
        appendStringInfo(&buf, "SELECT pg_catalog.reset_unique_sql('LOCAL','%s',%ld);", cleanType, cleanValue);
        state = RemoteFunctionResultHandler(buf.data, exec_nodes, NULL, true, EXEC_ON_ALL_NODES, true);
        FreeParallelFunctionState(state);
        pfree_ext(buf.data);
    }
    ereport(LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] clean unique sql finished.")));
    PG_RETURN_INT64(1);
}

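/*
 * Illustrative calls (argument values are examples only):
 *   select reset_unique_sql('GLOBAL', 'ALL', 0);        -- purge on all nodes
 *   select reset_unique_sql('LOCAL', 'BY_USERID', 10);  -- purge one user's entries locally
 * a GLOBAL call cleans the local hash table first, then fans the same call
 * out to every other node with the 'LOCAL' scope.
 */
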
/*
 * Reset session variables before/after each unique sql
 */
void ResetCurrentUniqueSQL(bool need_reset_cn_id)
{
    u_sess->unique_sql_cxt.unique_sql_id = 0;
    u_sess->unique_sql_cxt.unique_sql_user_id = InvalidOid;
    if (need_reset_cn_id) {
        u_sess->unique_sql_cxt.unique_sql_cn_id = InvalidOid;
    }

    /* multi query only used in exec_simple_query */
    u_sess->unique_sql_cxt.is_multi_unique_sql = false;
    u_sess->unique_sql_cxt.curr_single_unique_sql = NULL;
}