/* * Copyright (c) 2020 Huawei Technologies Co.,Ltd. * * openGauss is licensed under Mulan PSL v2. * You can use this software according to the terms and conditions of the Mulan PSL v2. * You may obtain a copy of Mulan PSL v2 at: * * http://license.coscl.org.cn/MulanPSL2 * * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PSL v2 for more details. * ------------------------------------------------------------------------- * * instr_unique_sql.cpp * functions for unique SQL * * IDENTIFICATION * src/gausskernel/cbb/instruments/unique_sql/unique_sql.cpp * * ------------------------------------------------------------------------- */ #include "postgres.h" #include "knl/knl_variable.h" #include "instruments/instr_unique_sql.h" #include "instruments/instr_statement.h" #include "instruments/instr_slow_query.h" #include "instruments/unique_query.h" #include "utils/atomic.h" #include "executor/hashjoin.h" #include "utils/lsyscache.h" #include "utils/hsearch.h" #include "access/hash.h" #include "access/xact.h" #include "utils/memutils.h" #include "miscadmin.h" #include "pgxc/pgxc.h" #include "pgstat.h" #include "funcapi.h" #include "parser/analyze.h" #include "commands/prepare.h" #include "libpq/pqformat.h" #include "libpq/libpq.h" #include "commands/user.h" #include "instruments/unique_sql_basic.h" #include "instruments/instr_handle_mgr.h" #include "optimizer/streamplan.h" static bool need_reuse_unique_sql_id(Query *query); namespace UniqueSq { void unique_sql_post_parse_analyze(ParseState* pstate, Query* query); int get_conn_count_from_all_handles(PGXCNodeAllHandles* pgxc_handles, bool is_cn); PGXCNodeHandle** get_handles_from_all_handles(PGXCNodeAllHandles* pgxc_handles, bool is_cn); } // namespace UniqueSq /* * Note: for unique sql track type * * top - only top SQL, for example, when call a proc, there will be * multi-sqls in the proc implementatation, only the original * call proc SQL will be collected by unique sql module. * all - (not currently supported) take above case as example, all statements will be recorded */ /* unique SQL max hash table size */ const int UNIQUE_SQL_MAX_HASH_SIZE = 1000; /* * CN to collect unique sql stat from DN, each time will * send "predefined size" unique sql keys to DN */ const int UNIQUE_SQL_IDS_ARRAY_SIZE = 10; /* used for max unique sql string length, * using GUC: pgstat_track_activity_query_size, * as PgBackendStatus::st_activity also using the GUC parameter */ #define UNIQUE_SQL_MAX_LEN (g_instance.attr.attr_common.pgstat_track_activity_query_size + 1) #define MAX_UINT32 (0xFFFFFFFF) /* for each memory allocating, max unique sql count in the memory */ #define MAX_MEM_UNIQUE_SQL_ENTRY_COUNT 1000.0 #define UNIQUE_SQL_HASH_TBL "unique sql hash table" #define STRING_MAX_LEN 256 typedef struct { List *batch_list; ListCell *curr_cell; } UniqueSQLResults; #ifndef ENABLE_MULTIPLE_NODES /* The mapping of UniqueSQLKey to updated_time */ typedef struct { UniqueSQLKey key; TimestampTz updated_time; /* latest update time for the unique sql entry */ } KeyUpdatedtime; static KeyUpdatedtime* GetSortedEntryList(); static int KeyUpdatedtimeCmp(const void* a, const void* b); static bool AutoRecycleUniqueSQLEntry(); #endif void instr_unique_sql_reset_start_time(); static uint32 uniqueSQLHashCode(const void* key, Size size) { const UniqueSQLKey* k = (const UniqueSQLKey*)key; return hash_uint32((uint32)k->cn_id) ^ hash_uint32((uint32)k->user_id) ^ hash_uint32((uint32)k->unique_sql_id); } static int uniqueSQLMatch(const void* key1, const void* key2, Size keysize) { const UniqueSQLKey* k1 = (const UniqueSQLKey*)key1; const UniqueSQLKey* k2 = (const UniqueSQLKey*)key2; if (k1 != NULL && k2 != NULL && k1->user_id == k2->user_id && k1->unique_sql_id == k2->unique_sql_id) { return 0; } else { return 1; } } static LWLock* LockUniqueSQLHashPartition(uint32 hashCode, LWLockMode lockMode) { LWLock* partitionLock = GetMainLWLockByIndex(FirstUniqueSQLMappingLock + (hashCode % NUM_UNIQUE_SQL_PARTITIONS)); LWLockAcquire(partitionLock, lockMode); return partitionLock; } static void UnlockUniqueSQLHashPartition(uint32 hashCode) { LWLock* partitionLock = GetMainLWLockByIndex(FirstUniqueSQLMappingLock + (hashCode % NUM_UNIQUE_SQL_PARTITIONS)); LWLockRelease(partitionLock); } /* * resetUniqueSQLEntry - reset UniqueSQL entry except key */ static void resetUniqueSQLEntry(UniqueSQL* entry) { if (entry != NULL) { pg_atomic_write_u64(&(entry->calls), 0); entry->unique_sql = NULL; // reset elapse time stat gs_lock_test_and_set_64(&(entry->elapse_time.total_time), 0); gs_lock_test_and_set_64(&(entry->elapse_time.min_time), 0); gs_lock_test_and_set_64(&(entry->elapse_time.max_time), 0); // reset row activity stat pg_atomic_write_u64(&(entry->row_activity.returned_rows), 0); pg_atomic_write_u64(&(entry->row_activity.tuples_fetched), 0); pg_atomic_write_u64(&(entry->row_activity.tuples_returned), 0); pg_atomic_write_u64(&(entry->row_activity.tuples_inserted), 0); pg_atomic_write_u64(&(entry->row_activity.tuples_updated), 0); pg_atomic_write_u64(&(entry->row_activity.tuples_deleted), 0); // cache_io pg_atomic_write_u64(&(entry->cache_io.blocks_fetched), 0); pg_atomic_write_u64(&(entry->cache_io.blocks_hit), 0); // parse info pg_atomic_write_u64(&(entry->parse.soft_parse), 0); pg_atomic_write_u64(&(entry->parse.hard_parse), 0); // sort hash work_mem info pg_atomic_write_u64(&(entry->sort_state.counts), 0); gs_lock_test_and_set_64(&(entry->sort_state.total_time), 0); gs_lock_test_and_set_64(&(entry->sort_state.used_work_mem), 0); pg_atomic_write_u64(&(entry->sort_state.spill_counts), 0); pg_atomic_write_u64(&(entry->sort_state.spill_size), 0); pg_atomic_write_u64(&(entry->hash_state.counts), 0); gs_lock_test_and_set_64(&(entry->hash_state.total_time), 0); gs_lock_test_and_set_64(&(entry->hash_state.used_work_mem), 0); pg_atomic_write_u64(&(entry->hash_state.spill_counts), 0); pg_atomic_write_u64(&(entry->hash_state.spill_size), 0); // time Info for (uint32 idx = 0; idx < TOTAL_TIME_INFO_TYPES; idx++) { gs_lock_test_and_set_64(&(entry->timeInfo.TimeInfoArray[idx]), 0); } // net info for (uint32 idx = 0; idx < TOTAL_NET_INFO_TYPES; idx++) { pg_atomic_write_u64(&(entry->netInfo.netInfoArray[idx]), 0); } entry->is_local = false; } } /* * InitUniqueSQL - init unique sql resources * * Init resource when postmaster startup, * basic unique SQL objects are created * without GUI parameter control. */ void InitUniqueSQL() { // init memory context if (g_instance.stat_cxt.UniqueSqlContext == NULL) { g_instance.stat_cxt.UniqueSqlContext = AllocSetContextCreate(g_instance.instance_context, "UniqueSQLContext", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, SHARED_CONTEXT); } // init unique sql hash table HASHCTL ctl; errno_t rc; rc = memset_s(&ctl, sizeof(ctl), 0, sizeof(ctl)); securec_check_c(rc, "\0", "\0"); ctl.hcxt = g_instance.stat_cxt.UniqueSqlContext; ctl.keysize = sizeof(UniqueSQLKey); // alloc extra space for normalized query(only CN stores sql string) if (need_normalize_unique_string()) { ctl.entrysize = sizeof(UniqueSQL) + UNIQUE_SQL_MAX_LEN; } else { ctl.entrysize = sizeof(UniqueSQL); } ctl.hash = uniqueSQLHashCode; ctl.match = uniqueSQLMatch; ctl.num_partitions = NUM_UNIQUE_SQL_PARTITIONS; g_instance.stat_cxt.UniqueSQLHashtbl = hash_create(UNIQUE_SQL_HASH_TBL, UNIQUE_SQL_MAX_HASH_SIZE, &ctl, HASH_ELEM | HASH_SHRCTX | HASH_FUNCTION | HASH_COMPARE | HASH_PARTITION | HASH_NOEXCEPT); init_builtin_unique_sql(); } void instr_unique_sql_register_hook() { // only register the hooks on CN if ((!IS_PGXC_COORDINATOR) && (!IS_SINGLE_NODE)) { return; } // register hooks t_thrd.statement_cxt.instr_prev_post_parse_analyze_hook = (void *)post_parse_analyze_hook; post_parse_analyze_hook = UniqueSq::unique_sql_post_parse_analyze; } /* * UpdateUniqueSQLCalls - update unique SQL calls */ static void UpdateUniqueSQLCalls(UniqueSQL* unique_sql) { Assert(u_sess->attr.attr_resource.enable_resource_track && (u_sess->attr.attr_common.instr_unique_sql_count > 0)); if (unique_sql == NULL) { return; } pg_atomic_fetch_add_u64(&(unique_sql->calls), 1); } /* * updateMaxValueForAtomicType - using atomic type to store max value, * we need update the max value by using atomic method */ static void updateMaxValueForAtomicType(int64 new_val, int64* max) { int64 prev; do { prev = *max; } while (prev < new_val && !gs_compare_and_swap_64(max, prev, new_val)); } /* * updateMinValueForAtomicType - update ming value for atomic type */ static void updateMinValueForAtomicType(int64 new_val, int64* mix) { int64 prev; do { prev = *mix; } while ((prev == 0 || prev > new_val) && !gs_compare_and_swap_64(mix, prev, new_val)); } /* * UpdateUniqueSQLElapseTime - update elase time of the unique sql * * elapse end time is calculated in this function */ static void UpdateUniqueSQLElapseTime(UniqueSQL* unique_sql, int64 elapse_start) { Assert(u_sess->attr.attr_resource.enable_resource_track && (u_sess->attr.attr_common.instr_unique_sql_count > 0)); if (unique_sql == NULL || elapse_start == 0) { return; } TimestampTz elapse_time = GetCurrentTimestamp() - elapse_start; /* Because the time precision is microseconds, * all actions less than microseconds are recorded as 0. * When the duration is 0, we set the duration to 1 * */ elapse_time = (elapse_time == 0) ? 1 : elapse_time; /* update unique sql's total/max/min time */ gs_atomic_add_64(&(unique_sql->elapse_time.total_time), elapse_time); updateMaxValueForAtomicType(elapse_time, &(unique_sql->elapse_time.max_time)); updateMinValueForAtomicType(elapse_time, &(unique_sql->elapse_time.min_time)); } /* * UpdateUniqueSQLStatDetail - update row activity or cache/IO */ static void UpdateUniqueSQLStatDetail(UniqueSQL* unique_sql, PgStat_TableCounts* agg_table_stat, uint64 returned_rows) { Assert(u_sess->attr.attr_resource.enable_resource_track && (u_sess->attr.attr_common.instr_unique_sql_count > 0)); if (unique_sql == NULL) { return; } if (agg_table_stat != NULL) { // row activity pg_atomic_fetch_add_u64(&unique_sql->row_activity.tuples_fetched, agg_table_stat->t_tuples_fetched); pg_atomic_fetch_add_u64(&unique_sql->row_activity.tuples_returned, agg_table_stat->t_tuples_returned); pg_atomic_fetch_add_u64(&unique_sql->row_activity.tuples_inserted, agg_table_stat->t_tuples_inserted); pg_atomic_fetch_add_u64(&unique_sql->row_activity.tuples_updated, agg_table_stat->t_tuples_updated); pg_atomic_fetch_add_u64(&unique_sql->row_activity.tuples_deleted, agg_table_stat->t_tuples_deleted); // cache_io pg_atomic_fetch_add_u64(&unique_sql->cache_io.blocks_fetched, agg_table_stat->t_blocks_fetched); pg_atomic_fetch_add_u64(&unique_sql->cache_io.blocks_hit, agg_table_stat->t_blocks_hit); #ifdef ENABLE_MULTIPLE_NODES /* SQL: 'START TRANSACTION' should not be passed down to DN directly */ if (IS_PGXC_DATANODE && unique_sql->key.unique_sql_id == START_TRX_UNIQUE_SQL_ID) { ereport(LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] recv 'START TRANSACTION' on DN"))); } #endif } if (returned_rows > 0) { pg_atomic_fetch_add_u64(&unique_sql->row_activity.returned_rows, returned_rows); instr_stmt_report_returned_rows(returned_rows); } } /* * update hard/soft parse counter info * * soft parse - reuse plan * hard parse - generate new plan */ static void UpdateUniqueSQLParse(UniqueSQL* unique_sql) { if (unique_sql == NULL) { return; } if (u_sess->unique_sql_cxt.unique_sql_soft_parse > 0) { pg_atomic_fetch_add_u64(&unique_sql->parse.soft_parse, u_sess->unique_sql_cxt.unique_sql_soft_parse); instr_stmt_report_soft_parse(u_sess->unique_sql_cxt.unique_sql_soft_parse); } if (u_sess->unique_sql_cxt.unique_sql_hard_parse > 0) { pg_atomic_fetch_add_u64(&unique_sql->parse.hard_parse, u_sess->unique_sql_cxt.unique_sql_hard_parse); instr_stmt_report_hard_parse(u_sess->unique_sql_cxt.unique_sql_hard_parse); } ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] unique id: %lu, agg soft parse: %lu, hard parse: %lu", u_sess->unique_sql_cxt.unique_sql_id, u_sess->unique_sql_cxt.unique_sql_soft_parse, u_sess->unique_sql_cxt.unique_sql_hard_parse))); /* for parse counter, when update unique stat, aggregate&reset last parse counter */ UniqueSQLStatCountResetParseCounter(); } /* * update sort/hash work_mem info when executor run finished */ static void UpdateUniqueSQLSortHashInfo(UniqueSQL* unique_sql) { if (unique_sql == NULL) { return; } if (u_sess->unique_sql_cxt.unique_sql_sort_instr->has_sorthash) { /* if query contains SORT operation, the unique sql sort info will be updated */ unique_sql_sorthash_instr* sort_instr = u_sess->unique_sql_cxt.unique_sql_sort_instr; pg_atomic_fetch_add_u64(&unique_sql->sort_state.counts, sort_instr->counts); gs_atomic_add_64(&unique_sql->sort_state.total_time, sort_instr->total_time); gs_atomic_add_64(&unique_sql->sort_state.used_work_mem, sort_instr->used_work_mem); pg_atomic_fetch_add_u64(&unique_sql->sort_state.spill_counts, sort_instr->spill_counts); pg_atomic_fetch_add_u64(&unique_sql->sort_state.spill_size, sort_instr->spill_size); ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] unique id: %lu, sort state updated", u_sess->unique_sql_cxt.unique_sql_id))); /* reset the sort counter */ errno_t rc = memset_s(sort_instr, sizeof(unique_sql_sorthash_instr), 0, sizeof(unique_sql_sorthash_instr)); securec_check(rc, "", ""); } if (u_sess->unique_sql_cxt.unique_sql_hash_instr->has_sorthash) { /* if query contains HASH operation, the unique sql hash info should be updated */ unique_sql_sorthash_instr* hash_instr = u_sess->unique_sql_cxt.unique_sql_hash_instr; pg_atomic_fetch_add_u64(&unique_sql->hash_state.counts, hash_instr->counts); gs_atomic_add_64(&unique_sql->hash_state.total_time, hash_instr->total_time); gs_atomic_add_64(&unique_sql->hash_state.used_work_mem, hash_instr->used_work_mem); pg_atomic_fetch_add_u64(&unique_sql->hash_state.spill_counts, hash_instr->spill_counts); pg_atomic_fetch_add_u64(&unique_sql->hash_state.spill_size, hash_instr->spill_size); ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] unique id: %lu, hash state updated", u_sess->unique_sql_cxt.unique_sql_id))); /* reset the hash counter */ errno_t rc = memset_s(hash_instr, sizeof(unique_sql_sorthash_instr), 0, sizeof(unique_sql_sorthash_instr)); securec_check(rc, "", ""); } } /* * get hash state from hashtable to update unqiue sql hash info later */ void UpdateUniqueSQLHashStats(HashJoinTable hashtable, TimestampTz* start_time) { /* isUniqueSQLContextInvalid happens when the query is explain xxx */ if (!is_unique_sql_enabled() || isUniqueSQLContextInvalid()) { return; } /* the first time enter hash operation, init hash state */ unique_sql_sorthash_instr* instr = u_sess->unique_sql_cxt.unique_sql_hash_instr; instr->has_sorthash = true; /* update time info */ if (*start_time == 0) { *start_time = GetCurrentTimestamp(); } else if (hashtable != NULL) { /* increased by hash exec time */ instr->total_time += GetCurrentTimestamp() - *start_time; /* update work mem info */ instr->counts += 1; instr->spill_counts += hashtable->spill_count; /* get the space used in kbs */ instr->spill_size += (*hashtable->spill_size + 1023) / 1024; instr->used_work_mem += (hashtable->spacePeak + 1023) / 1024; } } /* UpdateUniqueSQLVecSortStats - parse the vector sort information from the Batchsortstate, * used to update for the uniuqe sql sort infomation. */ void UpdateUniqueSQLVecSortStats(Batchsortstate* state, uint64 spill_count, TimestampTz* start_time) { /* isUniqueSQLContextInvalid happens when the query is explain xxx */ if (!is_unique_sql_enabled() || isUniqueSQLContextInvalid()) { return; } unique_sql_sorthash_instr* instr = u_sess->unique_sql_cxt.unique_sql_sort_instr; /* the first time enter sort executor and init the state */ instr->has_sorthash = true; if (*start_time == 0) { *start_time = GetCurrentTimestamp(); } else if (state != NULL) { instr->counts += 1; instr->total_time += GetCurrentTimestamp() - *start_time; /* update vect sort info of space used in kbs */ if (state->m_tapeset != NULL) { instr->spill_counts += spill_count; instr->spill_size += LogicalTapeSetBlocks(state->m_tapeset) * (BLCKSZ / 1024); } else { instr->used_work_mem += (state->m_allowedMem - state->m_availMem + 1023) / 1024; } } } static void set_unique_sql_string_in_entry(UniqueSQL* entry, Query* query, const char* sql, int32 multi_sql_offset) { errno_t rc = EOK; // only CN stores normalized query string if (need_normalize_unique_string()) { entry->unique_sql = (char*)(entry + 1); rc = memset_s(entry->unique_sql, UNIQUE_SQL_MAX_LEN, 0, UNIQUE_SQL_MAX_LEN); securec_check(rc, "\0", "\0"); } else { entry->unique_sql = NULL; } if (entry->unique_sql != NULL && sql != NULL && query != NULL) { entry->is_local = true; // generate and store normalized query string if (normalized_unique_querystring(query, sql, entry->unique_sql, UNIQUE_SQL_MAX_LEN - 1, multi_sql_offset)) { entry->unique_sql = trim(entry->unique_sql); } else { ereport(LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] unique id: %lu, normalized " "SQL failed!", u_sess->unique_sql_cxt.unique_sql_id))); } } } static void UpdateUniqueSQLTimeStat(UniqueSQL* entry, int64 timeInfo[]) { if (timeInfo != NULL) { int idx; for (idx = 0; idx < TOTAL_TIME_INFO_TYPES; idx++) { (void)gs_atomic_add_64(&(entry->timeInfo.TimeInfoArray[idx]), timeInfo[idx]); } } } static void UpdateUniqueSQLNetInfo(UniqueSQL* entry, const uint64* netInfo) { if (netInfo == NULL) return; for (int i = 0; i < TOTAL_NET_INFO_TYPES; i++) { (void)pg_atomic_fetch_add_u64(&(entry->netInfo.netInfoArray[i]), netInfo[i]); } } bool isUniqueSQLContextInvalid() { if (u_sess->unique_sql_cxt.unique_sql_id == 0 || g_instance.stat_cxt.UniqueSQLHashtbl == NULL || !OidIsValid(u_sess->unique_sql_cxt.unique_sql_user_id)) { return true; } if (IS_SINGLE_NODE) { return false; } if (!OidIsValid(u_sess->unique_sql_cxt.unique_sql_cn_id)) { return true; } return false; } void UpdateSingleNodeByPassUniqueSQLStat(bool isTopLevel) { if (IS_SINGLE_NODE && is_unique_sql_enabled() && isTopLevel) { if (IS_UNIQUE_SQL_TRACK_TOP && IsTopUniqueSQL()) { instr_unique_sql_report_elapse_time(u_sess->unique_sql_cxt.unique_sql_start_time); instr_unique_sql_reset_start_time(); } if (u_sess->unique_sql_cxt.unique_sql_start_time != 0) { int64 duration = GetCurrentTimestamp() - u_sess->unique_sql_cxt.unique_sql_start_time; pgstat_update_responstime_singlenode( u_sess->unique_sql_cxt.unique_sql_id, u_sess->unique_sql_cxt.unique_sql_start_time, duration); } } } /* * UpdateUniqueSQLStat - update unique sql's stat info * * 1, find/create unique sql entry * 2, udpate stat info * - calls * - response time(only input the start elapse time) */ void UpdateUniqueSQLStat(Query* query, const char* sql, int64 elapse_start_time, PgStat_TableCounts* agg_table_stat, UniqueSQLStat* sqlStat) { /* unique sql id can't be zero */ if (isUniqueSQLContextInvalid()) { ereport(DEBUG2, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] Failed to update entry - cn id: %u, user id: %u, unique sql id: %lu,", u_sess->unique_sql_cxt.unique_sql_cn_id, u_sess->unique_sql_cxt.unique_sql_user_id, u_sess->unique_sql_cxt.unique_sql_id))); return; } UniqueSQLKey key; UniqueSQL* entry = NULL; bool found = false; key.unique_sql_id = u_sess->unique_sql_cxt.unique_sql_id; key.cn_id = u_sess->unique_sql_cxt.unique_sql_cn_id; key.user_id = u_sess->unique_sql_cxt.unique_sql_user_id; ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] update entry - cn id: %u, user id: %u, sql id: %lu,", key.cn_id, key.user_id, key.unique_sql_id))); uint32 hashCode = uniqueSQLHashCode(&key, sizeof(key)); (void)LockUniqueSQLHashPartition(hashCode, LW_SHARED); entry = (UniqueSQL*)hash_search(g_instance.stat_cxt.UniqueSQLHashtbl, &key, HASH_FIND, NULL); if (entry == NULL) { UnlockUniqueSQLHashPartition(hashCode); /* * Handle race condition between HASH insert/clean * consider below scenario: * 1, user runs SQL S1 firstly. * 2, insert normalized sql string to unique sql hash * 3, ----> clean unique sql hash table(another session) * 4, insert/update other SQL stat, such as, soft/hard parse counter, etc. * 5, then the sql text field will be empty forever. * * solution: * 1, on DN/remote CN(utiliti statement), no special control * 2, on local CN, if entry not existed, only can insert new entry when query(Query *) * is not NULL */ #ifdef ENABLE_MULTIPLE_NODES if (is_local_unique_sql() && query == NULL) { return; } /* control unique sql number by instr_unique_sql_count. */ long totalCount = hash_get_num_entries(g_instance.stat_cxt.UniqueSQLHashtbl); if (totalCount >= u_sess->attr.attr_common.instr_unique_sql_count) { ereport(DEBUG2, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] Failed to insert unique sql for up to limit"))); return; } #else if (is_local_unique_sql() && query == NULL && u_sess->unique_sql_cxt.unique_sql_text == NULL) { return; } /* control unique sql number by instr_unique_sql_count. */ long totalCount = hash_get_num_entries(g_instance.stat_cxt.UniqueSQLHashtbl); if (totalCount >= u_sess->attr.attr_common.instr_unique_sql_count) { if (g_instance.attr.attr_common.enable_auto_clean_unique_sql) { if (!AutoRecycleUniqueSQLEntry()) { return; } } else { ereport(DEBUG2, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] Failed to insert unique sql for up to limit"))); return; } } #endif (void)LockUniqueSQLHashPartition(hashCode, LW_EXCLUSIVE); entry = (UniqueSQL*)hash_search(g_instance.stat_cxt.UniqueSQLHashtbl, &key, HASH_ENTER, &found); // out of memory if (entry == NULL) { UnlockUniqueSQLHashPartition(hashCode); return; } if (!found) { resetUniqueSQLEntry(entry); #ifndef ENABLE_MULTIPLE_NODES if (query == NULL && u_sess->unique_sql_cxt.unique_sql_text != NULL) { entry->unique_sql = (char*)(entry + 1); errno_t rc = memset_s(entry->unique_sql, UNIQUE_SQL_MAX_LEN, 0, UNIQUE_SQL_MAX_LEN); securec_check(rc, "\0", "\0"); entry->is_local = true; int unique_sql_textLen = strlen(u_sess->unique_sql_cxt.unique_sql_text); rc = memcpy_s(entry->unique_sql, UNIQUE_SQL_MAX_LEN, u_sess->unique_sql_cxt.unique_sql_text, unique_sql_textLen); securec_check(rc, "\0", "\0"); } else { set_unique_sql_string_in_entry(entry, query, sql, u_sess->unique_sql_cxt.multi_sql_offset); } #else set_unique_sql_string_in_entry(entry, query, sql, u_sess->unique_sql_cxt.multi_sql_offset); #endif } } if ((IS_PGXC_COORDINATOR || IS_SINGLE_NODE) && elapse_start_time != 0) { ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] unique id: %lu, update entry n_calls", key.unique_sql_id))); UpdateUniqueSQLCalls(entry); UpdateUniqueSQLElapseTime(entry, elapse_start_time); UpdateUniqueSQLStatDetail(entry, NULL, u_sess->unique_sql_cxt.unique_sql_returned_rows_counter); u_sess->unique_sql_cxt.need_update_calls = false; } else if (IS_PGXC_DATANODE && agg_table_stat != NULL) { UpdateUniqueSQLStatDetail(entry, agg_table_stat, 0); instr_stmt_report_unique_sql_info(agg_table_stat, NULL, NULL); } /* parse info(CN & DN) */ UpdateUniqueSQLParse(entry); /* Sort&Hash info */ UpdateUniqueSQLSortHashInfo(entry); if (sqlStat != NULL) { // record SQL's time Info UpdateUniqueSQLTimeStat(entry, sqlStat->timeInfo); /* record SQL's net info */ UpdateUniqueSQLNetInfo(entry, sqlStat->netInfo); // record statement KPI info instr_stmt_report_unique_sql_info(NULL, sqlStat->timeInfo, sqlStat->netInfo); } (void)gs_lock_test_and_set_64(&entry->updated_time, GetCurrentTimestamp()); UnlockUniqueSQLHashPartition(hashCode); } /* * SendUniqueSQLIds - send unique sql ids to specific DN * return false failed */ static bool SendUniqueSQLIds( PGXCNodeHandle* handle, uint32* cn_ids, Oid* user_ids, uint64* query_ids, uint32* slot_indexes, uint count) { /* * msg format: * 'i' + 4 + 1 + 4 + (4 + 4 + 4 + 8) * count * 'i' + msg_len + 's'+ count + (slot_indexes + cn id + user_oid + unique_sql_id) + (...) */ size_t msg_len = sizeof(uint32) + sizeof(char) + sizeof(uint32) + (sizeof(uint32) + sizeof(uint32) + sizeof(Oid) + sizeof(uint64)) * count; int rc; uint32 n32; if (handle->state != DN_CONNECTION_STATE_IDLE) { return false; } ensure_out_buffer_capacity(1 + msg_len, handle); Assert(handle->outBuffer != NULL); /* instrumentation */ handle->outBuffer[handle->outEnd++] = 'i'; /* message length, not including 'i' */ msg_len = htonl(msg_len); rc = memcpy_s(handle->outBuffer + handle->outEnd, handle->outSize - handle->outEnd, &msg_len, sizeof(uint32)); securec_check(rc, "\0", "\0"); handle->outEnd += sizeof(uint32); /* unique sql ids to collect stat */ handle->outBuffer[handle->outEnd++] = 's'; n32 = htonl(count); rc = memcpy_s(handle->outBuffer + handle->outEnd, handle->outSize - handle->outEnd, &n32, sizeof(uint32)); securec_check(rc, "\0", "\0"); handle->outEnd += sizeof(uint32); for (uint32 i = 0; i < count; i++) { /* slot index */ rc = memcpy_s( handle->outBuffer + handle->outEnd, handle->outSize - handle->outEnd, slot_indexes + i, sizeof(uint32)); securec_check(rc, "\0", "\0"); handle->outEnd += sizeof(uint32); /* cn id */ rc = memcpy_s(handle->outBuffer + handle->outEnd, handle->outSize - handle->outEnd, cn_ids + i, sizeof(uint32)); securec_check(rc, "\0", "\0"); handle->outEnd += sizeof(uint32); /* user id */ rc = memcpy_s(handle->outBuffer + handle->outEnd, handle->outSize - handle->outEnd, user_ids + i, sizeof(Oid)); securec_check(rc, "\0", "\0"); handle->outEnd += sizeof(Oid); /* query id */ rc = memcpy_s( handle->outBuffer + handle->outEnd, handle->outSize - handle->outEnd, query_ids + i, sizeof(uint64)); securec_check(rc, "\0", "\0"); handle->outEnd += sizeof(uint64); } handle->state = DN_CONNECTION_STATE_QUERY; ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] send unique sql IDs to CN/DN, count: %u", count))); return pgxc_node_flush(handle) == 0; } /* * AggUniqueSQLStat - update unique sql's stat from DNs * * unique_sql_array - preallocated unique sql array * msg - msg from other CN/DN */ static void AggUniqueSQLStat(UniqueSQL* unique_sql_array, int arr_size, StringInfoData* recv_msg) { if (unique_sql_array == NULL) { return; } uint32 index = (uint32)pq_getmsgint(recv_msg, sizeof(uint32)); if (index >= (uint32)arr_size) { ereport(LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] receive stat: index %u >= array size: %d!", index, arr_size))); return; } uint32 recv_cn_id = (uint32)pq_getmsgint(recv_msg, sizeof(uint32)); Oid recv_user_id = (Oid)pq_getmsgint(recv_msg, sizeof(uint32)); uint64 recv_unique_sql_id = (uint64)pq_getmsgint64(recv_msg); if (unique_sql_array[index].key.cn_id != recv_cn_id || unique_sql_array[index].key.user_id != recv_user_id || unique_sql_array[index].key.unique_sql_id != recv_unique_sql_id) { ereport(ERROR, (errmsg("[UniqueSQL] check unique sql array slot index!"))); } // row activity unique_sql_array[index].row_activity.returned_rows += (uint64)pq_getmsgint64(recv_msg); unique_sql_array[index].row_activity.tuples_fetched += (uint64)pq_getmsgint64(recv_msg); unique_sql_array[index].row_activity.tuples_returned += (uint64)pq_getmsgint64(recv_msg); unique_sql_array[index].row_activity.tuples_inserted += (uint64)pq_getmsgint64(recv_msg); unique_sql_array[index].row_activity.tuples_updated += (uint64)pq_getmsgint64(recv_msg); unique_sql_array[index].row_activity.tuples_deleted += (uint64)pq_getmsgint64(recv_msg); // cache io unique_sql_array[index].cache_io.blocks_fetched += (uint64)pq_getmsgint64(recv_msg); unique_sql_array[index].cache_io.blocks_hit += (uint64)pq_getmsgint64(recv_msg); // parse info unique_sql_array[index].parse.soft_parse += (uint64)pq_getmsgint64(recv_msg); unique_sql_array[index].parse.hard_parse += (uint64)pq_getmsgint64(recv_msg); // time Info for (uint32 idx = 0; idx < TOTAL_TIME_INFO_TYPES; idx++) { unique_sql_array[index].timeInfo.TimeInfoArray[idx] += (uint64)pq_getmsgint64(recv_msg); } // net info for (uint32 idx = 0; idx < TOTAL_NET_INFO_TYPES; idx++) { unique_sql_array[index].netInfo.netInfoArray[idx] += (uint64)pq_getmsgint64(recv_msg); } TimestampTz tmp = pq_getmsgint64(recv_msg); // updated_time if (unique_sql_array[index].updated_time < tmp) { unique_sql_array[index].updated_time = tmp; } // sort&hash info unique_sql_array[index].sort_state.counts += (uint64)pq_getmsgint64(recv_msg); unique_sql_array[index].sort_state.total_time += (uint64)pq_getmsgint64(recv_msg); unique_sql_array[index].sort_state.used_work_mem += (uint64)pq_getmsgint64(recv_msg); unique_sql_array[index].sort_state.spill_counts += (uint64)pq_getmsgint64(recv_msg); unique_sql_array[index].sort_state.spill_size += (uint64)pq_getmsgint64(recv_msg); unique_sql_array[index].hash_state.counts += (uint64)pq_getmsgint64(recv_msg); unique_sql_array[index].hash_state.total_time += (uint64)pq_getmsgint64(recv_msg); unique_sql_array[index].hash_state.used_work_mem += (uint64)pq_getmsgint64(recv_msg); unique_sql_array[index].hash_state.spill_counts += (uint64)pq_getmsgint64(recv_msg); unique_sql_array[index].hash_state.spill_size += (uint64)pq_getmsgint64(recv_msg); } /* * * @Description: handle response messages from remote(CN/DN) * */ static void handle_message_from_remote( UniqueSQL* unique_sql_array, int arr_size, PGXCNodeHandle* handle, bool* hasError, bool* isFinished) { Assert(handle != NULL); char* msg = NULL; int len = 0; char msgType = get_message(handle, &len, &msg); switch (msgType) { case '\0': /* message is not completed */ case 'A': /* NotificationResponse */ case 'S': /* SetCommandComplete */ break; case 'E': /* message is error */ *hasError = true; break; case 'r': { if (len <= 0) { ereport(ERROR, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] invalid 'r' message. node: %s", handle->remoteNodeName))); break; } StringInfoData recvMsg; initStringInfo(&recvMsg); appendBinaryStringInfo(&recvMsg, msg, len); AggUniqueSQLStat(unique_sql_array, arr_size, &recvMsg); pq_getmsgend(&recvMsg); pfree(recvMsg.data); break; } case 'f': ereport(DEBUG2, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] current CN/DN stat is finished. node: %s", handle->remoteNodeName))); *isFinished = true; break; case 'Z': if (*hasError) { ereport(LOG, (errmsg("[UniqueSQL] receive stat: get error message. node %s, %s", handle->remoteNodeName, handle->error ? handle->error : ""))); *isFinished = true; } break; default: ereport(LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] receive stat: unexpected message type %c, node: %s", msgType, handle->remoteNodeName))); *hasError = true; *isFinished = true; break; } } /** * @Description: get unique sql stat from remote node(CN/DN) * @return - true if successful, else false */ static bool RecvUniqueSQLStat(UniqueSQL* unique_sql_array, int arr_size, PGXCNodeAllHandles* pgxc_handles, bool is_cn) { ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] try to receive unique sql stat from remote :%s", is_cn ? "CN" : "DN"))); int conn_count; PGXCNodeHandle** handles = NULL; conn_count = UniqueSq::get_conn_count_from_all_handles(pgxc_handles, is_cn); handles = UniqueSq::get_handles_from_all_handles(pgxc_handles, is_cn); if (conn_count > 0 && handles == NULL) { ereport( LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] receive stat: invalid remote connection handler array"))); return false; } for (int i = 0; i < conn_count; i++) { PGXCNodeHandle* handle = handles[i]; if (handle == NULL) { ereport(LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] receive stat: invalid remote connection handler"))); return false; } bool hasError = false; bool isFinished = false; while (true) { if (!pgxc_node_receive(1, &handle, NULL)) { handle_message_from_remote(unique_sql_array, arr_size, handle, &hasError, &isFinished); if (isFinished || hasError) { break; } } else { hasError = true; break; } } handle->state = DN_CONNECTION_STATE_IDLE; if (hasError) { ereport(LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] receive stat: fetch failed, node: %s", handle->remoteNodeName))); return false; } } return true; } /** * @Description: wrapper method to get CN/DN count from PGXCNodeAllHandles */ int UniqueSq::get_conn_count_from_all_handles(PGXCNodeAllHandles* pgxc_handles, bool is_cn) { if (pgxc_handles != NULL) { return is_cn ? pgxc_handles->co_conn_count : pgxc_handles->dn_conn_count; } return 0; } /** * @Description: wrapper method to get CN/DN handle from PGXCNodeAllHandles */ PGXCNodeHandle** UniqueSq::get_handles_from_all_handles(PGXCNodeAllHandles* pgxc_handles, bool is_cn) { if (pgxc_handles != NULL) { return is_cn ? pgxc_handles->coord_handles : pgxc_handles->datanode_handles; } return NULL; } /** * @Description: send unique sql keys to remote CN/DNs * @return - return true if send keys to remote node successfully */ static bool SendUniqueSQLIDsToRemote(PGXCNodeAllHandles* pgxc_handles, uint32* cn_ids, Oid* user_ids, uint64* query_ids, uint32* slot_indexes, uint count, bool is_cn) { int conn_count; PGXCNodeHandle** handles = NULL; conn_count = UniqueSq::get_conn_count_from_all_handles(pgxc_handles, is_cn); handles = UniqueSq::get_handles_from_all_handles(pgxc_handles, is_cn); if (conn_count > 0 && handles == NULL) { ereport(LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] send key: invalid remote connection handler array"))); return false; } for (int i = 0; i < conn_count; i++) { PGXCNodeHandle* handle = handles[i]; if (handle == NULL) { ereport(LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] send key: invalid remote connection handler"))); return false; } if (handle->state == DN_CONNECTION_STATE_QUERY) { BufferConnection(handle); } handle->state = DN_CONNECTION_STATE_IDLE; if (!SendUniqueSQLIds(handle, cn_ids, user_ids, query_ids, slot_indexes, count)) { /* connection issue */ ereport( LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] send key: failed, node: %s", handle->remoteNodeName))); return false; } } return true; } /* * @Description: get unique sql stat from CNs/DNs * including: * - Cache/IO * - Row activities * - etc * * @return - return false if failed */ static bool GetUniqueSQLStatFromRemote(UniqueSQL* unique_sql_array, int arr_size, PGXCNodeAllHandles* pgxc_handles, uint32* cn_ids, Oid* user_ids, uint64* query_ids, uint32* slot_indexes, uint count) { Assert(pgxc_handles); Assert(unique_sql_array); /* 1, send all unique sql ids to CN/DN */ if (!SendUniqueSQLIDsToRemote(pgxc_handles, cn_ids, user_ids, query_ids, slot_indexes, count, true) || !SendUniqueSQLIDsToRemote(pgxc_handles, cn_ids, user_ids, query_ids, slot_indexes, count, false)) { return false; } /* 2, receive cn/dn's result, append the result to unique sql entry */ if (RecvUniqueSQLStat(unique_sql_array, arr_size, pgxc_handles, true) && RecvUniqueSQLStat(unique_sql_array, arr_size, pgxc_handles, false)) { return true; } return false; } /** * @Description: copy unique sql entry from hash table to pre-allocated buffer * @return - void */ static void copy_unique_sql_entry(UniqueSQL* unique_sql_array, int num, int i, UniqueSQL* entry) { errno_t rc = EOK; unique_sql_array[i].key.cn_id = entry->key.cn_id; unique_sql_array[i].key.user_id = entry->key.user_id; unique_sql_array[i].key.unique_sql_id = entry->key.unique_sql_id; unique_sql_array[i].calls = entry->calls; if (need_normalize_unique_string() && entry->unique_sql != NULL) { unique_sql_array[i].unique_sql = (char*)(unique_sql_array + num) + i * UNIQUE_SQL_MAX_LEN; rc = memcpy_s(unique_sql_array[i].unique_sql, UNIQUE_SQL_MAX_LEN, entry->unique_sql, strlen(entry->unique_sql)); securec_check(rc, "\0", "\0"); } else { unique_sql_array[i].unique_sql = NULL; } // elapse time unique_sql_array[i].elapse_time.total_time = entry->elapse_time.total_time; unique_sql_array[i].elapse_time.min_time = entry->elapse_time.min_time; unique_sql_array[i].elapse_time.max_time = entry->elapse_time.max_time; // row activity unique_sql_array[i].row_activity.returned_rows = entry->row_activity.returned_rows; unique_sql_array[i].row_activity.tuples_fetched = entry->row_activity.tuples_fetched; unique_sql_array[i].row_activity.tuples_returned = entry->row_activity.tuples_returned; unique_sql_array[i].row_activity.tuples_inserted = entry->row_activity.tuples_inserted; unique_sql_array[i].row_activity.tuples_updated = entry->row_activity.tuples_updated; unique_sql_array[i].row_activity.tuples_deleted = entry->row_activity.tuples_deleted; // Cache/IO unique_sql_array[i].cache_io.blocks_fetched = entry->cache_io.blocks_fetched; unique_sql_array[i].cache_io.blocks_hit = entry->cache_io.blocks_hit; // parse info unique_sql_array[i].parse.soft_parse = entry->parse.soft_parse; unique_sql_array[i].parse.hard_parse = entry->parse.hard_parse; // sort&hash work mem info unique_sql_array[i].sort_state.counts = entry->sort_state.counts; unique_sql_array[i].sort_state.total_time = entry->sort_state.total_time; unique_sql_array[i].sort_state.used_work_mem = entry->sort_state.used_work_mem; unique_sql_array[i].sort_state.spill_counts = entry->sort_state.spill_counts; unique_sql_array[i].sort_state.spill_size = entry->sort_state.spill_size; unique_sql_array[i].hash_state.counts = entry->hash_state.counts; unique_sql_array[i].hash_state.total_time = entry->hash_state.total_time; unique_sql_array[i].hash_state.used_work_mem = entry->hash_state.used_work_mem; unique_sql_array[i].hash_state.spill_counts = entry->hash_state.spill_counts; unique_sql_array[i].hash_state.spill_size = entry->hash_state.spill_size; // time Info rc = memcpy_s(&unique_sql_array[i].timeInfo, sizeof(UniqueSQLTime), &entry->timeInfo, sizeof(UniqueSQLTime)); securec_check(rc, "\0", "\0"); // net info rc = memcpy_s(&unique_sql_array[i].netInfo, sizeof(UniqueSQLNetInfo), &entry->netInfo, sizeof(UniqueSQLNetInfo)); securec_check(rc, "\0", "\0"); /* if is local, will fetch more information from CN/DNs */ unique_sql_array[i].is_local = entry->is_local; unique_sql_array[i].updated_time = entry->updated_time; } /** * @Description: send unique sql key to remote node * @return - false if failed */ static bool package_and_send_unqiue_sql_key_to_remote( UniqueSQL* unique_sql_array, int num, PGXCNodeAllHandles* pgxc_handles) { /* key = CN id + user id + query id */ uint32 cn_ids[UNIQUE_SQL_IDS_ARRAY_SIZE] = {0}; Oid user_ids[UNIQUE_SQL_IDS_ARRAY_SIZE] = {0}; uint64 query_ids[UNIQUE_SQL_IDS_ARRAY_SIZE] = {0}; uint32 slot_indexes[UNIQUE_SQL_IDS_ARRAY_SIZE] = {0}; /* only send unique sql id to remote CN/DN if entry is local */ int array_idx = 0; int rc = 0; const int N32_BIT = 32; for (int i = 0; i < num; i++) { if (!unique_sql_array[i].is_local) { continue; } int index = array_idx++; index = index % UNIQUE_SQL_IDS_ARRAY_SIZE; uint32 n32 = 0; cn_ids[index] = htonl(unique_sql_array[i].key.cn_id); /* -- unique sql id -- */ /* high order half */ n32 = (uint32)(unique_sql_array[i].key.unique_sql_id >> N32_BIT); n32 = htonl(n32); rc = memcpy_s(query_ids + index, sizeof(uint32), &n32, sizeof(uint32)); securec_check(rc, "\0", "\0"); /* low order half */ n32 = (uint32)unique_sql_array[i].key.unique_sql_id; n32 = htonl(n32); rc = memcpy_s((unsigned char*)(query_ids + index) + sizeof(uint32), sizeof(uint32), &n32, sizeof(uint32)); securec_check(rc, "\0", "\0"); user_ids[index] = htonl(unique_sql_array[i].key.user_id); /* store original array index */ slot_indexes[index] = htonl(i); /* each time send 10 unique sql ids or * at the end of unique_sql_array */ if ((index == UNIQUE_SQL_IDS_ARRAY_SIZE - 1)) { if (!GetUniqueSQLStatFromRemote(unique_sql_array, num, pgxc_handles, cn_ids, user_ids, query_ids, slot_indexes, UNIQUE_SQL_IDS_ARRAY_SIZE)) { return false; } } else if (i == num - 1) { if (!GetUniqueSQLStatFromRemote( unique_sql_array, num, pgxc_handles, cn_ids, user_ids, query_ids, slot_indexes, index + 1)) { return false; } } } return true; } static int do_copy_entry(const List *unique_sql_batch_list, int num) { UniqueSQL *unique_sql_array = NULL; HASH_SEQ_STATUS hash_seq; ListCell *tmp_unique_sql_cell = NULL; UniqueSQL* entry = NULL; int i = 0; hash_seq_init(&hash_seq, g_instance.stat_cxt.UniqueSQLHashtbl); while ((entry = (UniqueSQL*)hash_seq_search(&hash_seq)) != NULL && i < num) { /* DN - copy all elements; CN - only copy local unique sql */ if (IS_PGXC_DATANODE || (IS_PGXC_COORDINATOR && entry->is_local)) { int curr_idx = i % (int)MAX_MEM_UNIQUE_SQL_ENTRY_COUNT; if (curr_idx == 0) { if (i == 0) { tmp_unique_sql_cell = list_head(unique_sql_batch_list); } else { tmp_unique_sql_cell = lnext(tmp_unique_sql_cell); } unique_sql_array = (UniqueSQL *)lfirst(tmp_unique_sql_cell); } copy_unique_sql_entry(unique_sql_array, (int)MAX_MEM_UNIQUE_SQL_ENTRY_COUNT, curr_idx, entry); i++; } } return i; } static UniqueSQLResults *build_unique_sql_batch_list(List *unique_sql_batch_list) { UniqueSQLResults *results = (UniqueSQLResults *)palloc0_noexcept(sizeof(UniqueSQLResults)); if (results == NULL) { if (unique_sql_batch_list != NIL) list_free_deep(unique_sql_batch_list); ereport(ERROR, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] palloc failed for results!"))); } results->batch_list = unique_sql_batch_list; results->curr_cell = NULL; return results; } /** * @Description: get unique sql stat from CNs and DNs * @in unique_sql_array - local pre-allocated unique sql array * @in num - unique sql entry counts on local CN * @return - true if success, else will be false */ static bool get_unique_sql_stat_from_remote(const List* unique_sql_batch_list, int num) { /* * get row activity and cache/io from DN nodes, * 1, each time send 10 unique sql ids to DN; * 2, DN replies the unique sql's cache/IO and * row activities * get all dn connection */ List* cn_list = GetAllCoordNodes(); PGXCNodeAllHandles* pgxc_handles = NULL; PG_TRY(); { pgxc_handles = get_handles(NULL, cn_list, false); } PG_CATCH(); { release_pgxc_handles(pgxc_handles); list_free_ext(cn_list); PG_RE_THROW(); } PG_END_TRY(); Assert(pgxc_handles != NULL); int unique_sql_count = 0; UniqueSQL* unique_sql_array = NULL; ListCell *batch_cell = NULL; int batch_count = (int)ceil(num / MAX_MEM_UNIQUE_SQL_ENTRY_COUNT); for (int i = 0; i < batch_count; i++) { if (i == 0) batch_cell = list_head(unique_sql_batch_list); else batch_cell = lnext(batch_cell); unique_sql_array = (UniqueSQL *)lfirst(batch_cell); if ((i == (batch_count - 1)) && (num % (int)MAX_MEM_UNIQUE_SQL_ENTRY_COUNT != 0)) { unique_sql_count = num % (int)MAX_MEM_UNIQUE_SQL_ENTRY_COUNT; } else { unique_sql_count = (int)MAX_MEM_UNIQUE_SQL_ENTRY_COUNT; } if (!package_and_send_unqiue_sql_key_to_remote(unique_sql_array, unique_sql_count, pgxc_handles)) { release_pgxc_handles(pgxc_handles); list_free_ext(cn_list); ereport(ERROR, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] during get stat from remote, failed to send/recv data!"))); return false; } } /* free connection */ release_pgxc_handles(pgxc_handles); list_free_ext(cn_list); return true; } static void log_unique_sql_result_mem(uint64 total_size, uint64 malloc_size, uint64 index) { if (total_size > 1 * 1024 * 1024 * 1024) { ereport(LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] idx[%lu] - new memory allocated: %lu", index, malloc_size))); ereport(LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] total memory allocated: %lu", total_size))); } } /* * @Description: get unique sql's stat, and save them to pre-allocated array * @out num - valid unique sql entry count * @return - the pre-allocated buffer */ void* GetUniqueSQLStat(long* num) { // must enable pgstat_track_counts to track row activity if (!is_unique_sql_enabled()) { ereport(LOG, (errmsg("[UniqueSQL] GUC parameter 'enable_resource_track' " "or 'instr_unique_sql_count' is zero, unique sql view will be empty!"))); *num = 0; return NULL; } if (g_instance.stat_cxt.UniqueSQLHashtbl == NULL) { *num = 0; return NULL; } int i = 0; uint64 total_size = 0; UniqueSQL* unique_sql_array = NULL; List* unique_sql_batch_list = NIL; for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) { LWLockAcquire(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i), LW_SHARED); } *num = hash_get_num_entries(g_instance.stat_cxt.UniqueSQLHashtbl); if (*num > 0) { int unique_sql_str_len = (IS_PGXC_COORDINATOR || IS_SINGLE_NODE) ? UNIQUE_SQL_MAX_LEN : 0; for (int j = 0; j < ceil(*num / MAX_MEM_UNIQUE_SQL_ENTRY_COUNT); j++) { /* memory format: [entry_1] [entry_2] [entry_3] ... | [sql_1] [sql_2] [sql_3] ... */ int malloc_size = (int)MAX_MEM_UNIQUE_SQL_ENTRY_COUNT * (sizeof(UniqueSQL) + unique_sql_str_len); unique_sql_array = (UniqueSQL*)palloc0_noexcept(malloc_size); if (unique_sql_array == NULL) { for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) { LWLockRelease(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i)); } if (unique_sql_batch_list != NIL) list_free_deep(unique_sql_batch_list); ereport(ERROR, (errmsg("[UniqueSQL] palloc0 error when querying unique sql stat!"))); } total_size += malloc_size; log_unique_sql_result_mem(total_size, malloc_size, j); unique_sql_batch_list = lappend(unique_sql_batch_list, unique_sql_array); } /* copy entries to preallocated memory on source CN/DN(which accepts DBE_PERF.statement) */ *num = do_copy_entry(unique_sql_batch_list, *num); } for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) { LWLockRelease(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i)); } if (*num == 0) { return NULL; } UniqueSQLResults *results = build_unique_sql_batch_list(unique_sql_batch_list); if (IS_PGXC_DATANODE) return results; if (!get_unique_sql_stat_from_remote(unique_sql_batch_list, *num)) { *num = 0; } return results; } static void create_tuple_entry(TupleDesc tupdesc) { int num = 0; int i = 0; TupleDescInitEntry(tupdesc, (AttrNumber)++i, "node_name", NAMEOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "node_id", INT4OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "user_name", NAMEOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "user_id", OIDOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "unique_sql_id", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "query", TEXTOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_calls", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "min_elapse_time", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "max_elapse_time", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "total_elapse_time", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_returned_rows", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_tuples_fetched", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_tuples_returned", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_tuples_inserted", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_tuples_updated", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_tuples_deleted", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_blocks_fetched", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_blocks_hit", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_soft_parse", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "n_hard_parse", INT8OID, -1, 0); for (num = 0; num < TOTAL_TIME_INFO_TYPES_P1; num++) { if (num == NET_SEND_TIME) { continue; } TupleDescInitEntry(tupdesc, (AttrNumber)++i, TimeInfoTypeName[num], INT8OID, -1, 0); } TupleDescInitEntry(tupdesc, (AttrNumber)++i, "NET_SEND_INFO", TEXTOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "NET_RECV_INFO", TEXTOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "NET_STREAM_SEND_INFO", TEXTOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "NET_STREAM_RECV_INFO", TEXTOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "last_updated", TIMESTAMPTZOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "sort_count", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "sort_time", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "sort_mem_used", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "sort_spill_count", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "sort_spill_size", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "hash_count", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "hash_time", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "hash_mem_used", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "hash_spill_count", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, "hash_spill_size", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)++i, TimeInfoTypeName[NET_SEND_TIME], INT8OID, -1, 0); for (num = TOTAL_TIME_INFO_TYPES_P1; num < TOTAL_TIME_INFO_TYPES; num++) { TupleDescInitEntry(tupdesc, (AttrNumber)++i, TimeInfoTypeName[num], INT8OID, -1, 0); } } static void set_tuple_cn_node_name(UniqueSQL* unique_sql, Datum* values, int* i) { // cn node name if (IS_PGXC_COORDINATOR || IS_SINGLE_NODE) { char* node_name = get_pgxc_node_name_by_node_id(unique_sql->key.cn_id, false); if (node_name != NULL) { values[(*i)++] = DirectFunctionCall1(namein, CStringGetDatum(node_name)); pfree(node_name); } else { #ifdef ENABLE_MULTIPLE_NODES values[(*i)++] = DirectFunctionCall1(namein, CStringGetDatum("*REMOVED_NODE*")); #else values[(*i)++] = DirectFunctionCall1(namein, CStringGetDatum(g_instance.attr.attr_common.PGXCNodeName)); #endif } } else { values[(*i)++] = DirectFunctionCall1(namein, CStringGetDatum("")); } } static void set_tuple_user_name(UniqueSQL* unique_sql, Datum* values, int* i) { char user_name[NAMEDATALEN] = {0}; // user name if (IS_PGXC_COORDINATOR || IS_SINGLE_NODE) { if (GetRoleName(unique_sql->key.user_id, user_name, sizeof(user_name)) != NULL) { values[(*i)++] = DirectFunctionCall1(namein, CStringGetDatum(user_name)); } else { values[(*i)++] = DirectFunctionCall1(namein, CStringGetDatum("*REMOVED_USER*")); } } else { values[(*i)++] = DirectFunctionCall1(namein, CStringGetDatum("")); } } static void set_tuple_unique_sql(UniqueSQL* unique_sql, Datum* values, int arr_size, int* i) { if (*i >= arr_size) { return; } if (unique_sql->unique_sql != NULL) { values[(*i)++] = CStringGetTextDatum(unique_sql->unique_sql); } else { values[(*i)++] = CStringGetTextDatum(""); } } static void set_tuple_value(UniqueSQL* unique_sql, Datum* values, bool* nulls, int arr_size) { int i = 0; int num = 0; set_tuple_cn_node_name(unique_sql, values, &i); values[i++] = UInt32GetDatum(unique_sql->key.cn_id); set_tuple_user_name(unique_sql, values, &i); // basic info values[i++] = ObjectIdGetDatum(unique_sql->key.user_id); values[i++] = Int64GetDatum(unique_sql->key.unique_sql_id); set_tuple_unique_sql(unique_sql, values, arr_size, &i); values[i++] = Int64GetDatum(unique_sql->calls); // response time values[i++] = Int64GetDatum(unique_sql->elapse_time.min_time); values[i++] = Int64GetDatum(unique_sql->elapse_time.max_time); values[i++] = Int64GetDatum(unique_sql->elapse_time.total_time); // row activity values[i++] = Int64GetDatum(unique_sql->row_activity.returned_rows); values[i++] = Int64GetDatum(unique_sql->row_activity.tuples_fetched); values[i++] = Int64GetDatum(unique_sql->row_activity.tuples_returned); values[i++] = Int64GetDatum(unique_sql->row_activity.tuples_inserted); values[i++] = Int64GetDatum(unique_sql->row_activity.tuples_updated); values[i++] = Int64GetDatum(unique_sql->row_activity.tuples_deleted); // cache/IO values[i++] = Int64GetDatum(unique_sql->cache_io.blocks_fetched); values[i++] = Int64GetDatum(unique_sql->cache_io.blocks_hit); // parse Info values[i++] = Int64GetDatum(unique_sql->parse.soft_parse); values[i++] = Int64GetDatum(unique_sql->parse.hard_parse); // time Info p1 for (num = 0; num < TOTAL_TIME_INFO_TYPES_P1; num++) { if (num == NET_SEND_TIME) { continue; } values[i++] = Int64GetDatum(unique_sql->timeInfo.TimeInfoArray[num]); } int idx = 0; while (idx < TOTAL_NET_INFO_TYPES) { char netInfo[STRING_MAX_LEN]; uint64 time = unique_sql->netInfo.netInfoArray[idx++]; uint64 n_calls = unique_sql->netInfo.netInfoArray[idx++]; uint64 size = unique_sql->netInfo.netInfoArray[idx++]; errno_t rc = sprintf_s(netInfo, sizeof(netInfo), "{\"time\":%lu, \"n_calls\":%lu, \"size\":%lu}", time, n_calls, size); securec_check_ss(rc, "\0", "\0"); values[i++] = CStringGetTextDatum(netInfo); } // updated_time values[i++] = TimestampTzGetDatum(unique_sql->updated_time); // sort&hash work mem Info values[i++] = Int64GetDatum(unique_sql->sort_state.counts); values[i++] = Int64GetDatum(unique_sql->sort_state.total_time); values[i++] = Int64GetDatum(unique_sql->sort_state.used_work_mem); values[i++] = Int64GetDatum(unique_sql->sort_state.spill_counts); values[i++] = Int64GetDatum(unique_sql->sort_state.spill_size); values[i++] = Int64GetDatum(unique_sql->hash_state.counts); values[i++] = Int64GetDatum(unique_sql->hash_state.total_time); values[i++] = Int64GetDatum(unique_sql->hash_state.used_work_mem); values[i++] = Int64GetDatum(unique_sql->hash_state.spill_counts); values[i++] = Int64GetDatum(unique_sql->hash_state.spill_size); // time Info values[i++] = Int64GetDatum(unique_sql->timeInfo.TimeInfoArray[NET_SEND_TIME]); for (num = TOTAL_TIME_INFO_TYPES_P1; num < TOTAL_TIME_INFO_TYPES; num++) { values[i++] = Int64GetDatum(unique_sql->timeInfo.TimeInfoArray[num]); } Assert(arr_size == i); } void set_current_batch_list_cell(FuncCallContext* funcctx) { if (funcctx->call_cntr % (int)MAX_MEM_UNIQUE_SQL_ENTRY_COUNT == 0) { if (funcctx->call_cntr == 0) { ((UniqueSQLResults *)(funcctx->user_fctx))->curr_cell = list_head((List *)(((UniqueSQLResults *)(funcctx->user_fctx))->batch_list)); } else { ((UniqueSQLResults *)(funcctx->user_fctx))->curr_cell = lnext(((UniqueSQLResults *)(funcctx->user_fctx))->curr_cell); } } } static void check_unique_sql_permission() { if (!superuser() && !isMonitoradmin(GetUserId())) { ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("only system/monitor admin can query unique sql view")))); } } static void CheckVersion() { if (t_thrd.proc->workingVersionNum < STATEMENT_TRACK_VERSION) { ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("This view cannot be select during upgrade")))); } } /* * get_instr_unique_sql - C function to get unique sql stat info */ Datum get_instr_unique_sql(PG_FUNCTION_ARGS) { FuncCallContext* funcctx = NULL; long num = 0; #define INSTRUMENTS_UNIQUE_SQL_ATTRNUM (35 + TOTAL_TIME_INFO_TYPES) CheckVersion(); check_unique_sql_permission(); if (SRF_IS_FIRSTCALL()) { MemoryContext oldcontext = NULL; TupleDesc tupdesc = NULL; funcctx = SRF_FIRSTCALL_INIT(); oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); tupdesc = CreateTemplateTupleDesc(INSTRUMENTS_UNIQUE_SQL_ATTRNUM, false); create_tuple_entry(tupdesc); funcctx->tuple_desc = BlessTupleDesc(tupdesc); funcctx->user_fctx = GetUniqueSQLStat(&num); funcctx->max_calls = num; MemoryContextSwitchTo(oldcontext); if (funcctx->max_calls == 0) { if (funcctx->user_fctx) { pfree_ext(funcctx->user_fctx); } SRF_RETURN_DONE(funcctx); } } funcctx = SRF_PERCALL_SETUP(); if (funcctx->call_cntr < funcctx->max_calls) { Datum values[INSTRUMENTS_UNIQUE_SQL_ATTRNUM]; bool nulls[INSTRUMENTS_UNIQUE_SQL_ATTRNUM] = {false}; HeapTuple tuple = NULL; int rc = 0; set_current_batch_list_cell(funcctx); UniqueSQL* batch_unique_sql = (UniqueSQL *)lfirst(((UniqueSQLResults *)(funcctx->user_fctx))->curr_cell); UniqueSQL* unique_sql = batch_unique_sql + funcctx->call_cntr % (int)MAX_MEM_UNIQUE_SQL_ENTRY_COUNT; rc = memset_s(values, sizeof(values), 0, sizeof(values)); securec_check(rc, "\0", "\0"); rc = memset_s(nulls, sizeof(nulls), 0, sizeof(nulls)); securec_check(rc, "\0", "\0"); set_tuple_value(unique_sql, values, nulls, INSTRUMENTS_UNIQUE_SQL_ATTRNUM); tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple)); } else { if (funcctx->user_fctx) { list_free_deep(((UniqueSQLResults *)(funcctx->user_fctx))->batch_list); pfree_ext(funcctx->user_fctx); } SRF_RETURN_DONE(funcctx); } } bool CheckSkipSQL(Query* query) { return query->utilityStmt != NULL && (IsA(query->utilityStmt, ExplainStmt) || (IsA(query->utilityStmt, RemoteQuery) && (((RemoteQuery*)query->utilityStmt)->exec_direct_type != EXEC_DIRECT_NONE))); } /* * GenerateUniqueSQLInfo - generate unique sql info * * such as unique query id/normalized unique sql string */ void GenerateUniqueSQLInfo(const char* sql, Query* query) { /* won't record unique sql info during abort transaction stat, * * below scenario also won't record unique sql stat: * - begin; * - select * from not_exist_relation;(cause abort stat) * - rollback; * * rollback is run successfully, but also will ignore it, as now * we have limitation, in abort stat, can't access system relation. * refer to the assert in method "relation_open" */ if (sql == NULL || query == NULL || g_instance.stat_cxt.UniqueSQLHashtbl == NULL || !is_local_unique_sql() || IsAbortedTransactionBlockState() || CheckSkipSQL(query) || u_sess->unique_sql_cxt.skipUniqueSQLCount != 0 || u_sess->unique_sql_cxt.skip_sql_in_open) { return; } if (u_sess->pbe_message == BIND_MESSAGE_QUERY || u_sess->pbe_message == EXECUTE_MESSAGE_QUERY || u_sess->pbe_message == EXECUTE_BATCH_MESSAGE_QUERY) { query->uniqueSQLId = u_sess->unique_sql_cxt.unique_sql_id; ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] unique id: %lu, will not be reproduced when binding", u_sess->unique_sql_cxt.unique_sql_id))); } bool checkHasTopSQL = IS_UNIQUE_SQL_TRACK_TOP && IsTopUniqueSQL() && (u_sess->unique_sql_cxt.force_generate_unique_sql == false); if (checkHasTopSQL) { ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] unique id: %lu, already has top SQL", u_sess->unique_sql_cxt.unique_sql_id))); return; } const char* current_sql = NULL; if (u_sess->unique_sql_cxt.is_multi_unique_sql) { current_sql = u_sess->unique_sql_cxt.curr_single_unique_sql; } else { current_sql = sql; } if (current_sql == NULL) { return; } if (!need_reuse_unique_sql_id(query)) { u_sess->unique_sql_cxt.unique_sql_id = generate_unique_queryid(query, current_sql); } query->uniqueSQLId = u_sess->unique_sql_cxt.unique_sql_id; u_sess->slow_query_cxt.slow_query.unique_sql_id = u_sess->unique_sql_cxt.unique_sql_id; /* dynamic enable statement tracking */ instr_stmt_dynamic_change_level(); if (!OidIsValid(u_sess->unique_sql_cxt.unique_sql_cn_id)) { Oid node_oid = get_pgxc_nodeoid(g_instance.attr.attr_common.PGXCNodeName); u_sess->unique_sql_cxt.unique_sql_cn_id = get_pgxc_node_id(node_oid); if (u_sess->globalSessionId.sessionId) { u_sess->globalSessionId.nodeId = u_sess->unique_sql_cxt.unique_sql_cn_id; pgstat_report_global_session_id(u_sess->globalSessionId); } } u_sess->unique_sql_cxt.unique_sql_user_id = GetUserId(); ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] generate unique id: %lu, cn id: %u, user id: %u", u_sess->unique_sql_cxt.unique_sql_id, u_sess->unique_sql_cxt.unique_sql_cn_id, u_sess->unique_sql_cxt.unique_sql_user_id))); UpdateUniqueSQLStat(query, current_sql, 0); instr_stmt_report_query(u_sess->unique_sql_cxt.unique_sql_id); pgstat_report_unique_sql_id(false); /* if track top enabled, only TOP SQL will generate unique sql id */ if (IS_UNIQUE_SQL_TRACK_TOP) { SetIsTopUniqueSQL(true); } } /* * unique_sql_post_parse_analyze - generate sql id */ void UniqueSq::unique_sql_post_parse_analyze(ParseState* pstate, Query* query) { Assert(IS_PGXC_COORDINATOR || IS_SINGLE_NODE); if (pstate != NULL) { /* generate unique sql id */ if (is_unique_sql_enabled()) { GenerateUniqueSQLInfo(pstate->p_sourcetext, query); } } if (t_thrd.statement_cxt.instr_prev_post_parse_analyze_hook != NULL) { ((post_parse_analyze_hook_type)(t_thrd.statement_cxt.instr_prev_post_parse_analyze_hook))(pstate, query); } } /* * get Query from query_list, and then set unique sql id to session */ static void SetLocalUniqueSQLId(List* query_list) { if (query_list == NULL || IsAbortedTransactionBlockState()) { return; } Query* query = NULL; if (list_length(query_list) > 0) { if (IsA(linitial(query_list), Query)) { query = (Query*)linitial(query_list); if (query != NULL) { u_sess->unique_sql_cxt.unique_sql_id = query->uniqueSQLId; u_sess->slow_query_cxt.slow_query.unique_sql_id = u_sess->unique_sql_cxt.unique_sql_id; ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("SetLocalUniqueSQLId %s to %lu", query->sql_statement, u_sess->unique_sql_cxt.unique_sql_id))); if (!OidIsValid(u_sess->unique_sql_cxt.unique_sql_cn_id)) { Oid node_oid = get_pgxc_nodeoid(g_instance.attr.attr_common.PGXCNodeName); u_sess->unique_sql_cxt.unique_sql_cn_id = get_pgxc_node_id(node_oid); u_sess->globalSessionId.nodeId = u_sess->unique_sql_cxt.unique_sql_cn_id; pgstat_report_global_session_id(u_sess->globalSessionId); } u_sess->unique_sql_cxt.unique_sql_user_id = GetUserId(); #ifndef ENABLE_MULTIPLE_NODES /* store the normalized uniquesq text into u_sess->Unique_sql_cxt in stage B * or E of PBE, only if auto-cleanup is enabled */ if (g_instance.attr.attr_common.enable_auto_clean_unique_sql) { u_sess->unique_sql_cxt.unique_sql_text = query->unique_sql_text; } #endif instr_stmt_report_query(u_sess->unique_sql_cxt.unique_sql_id); pgstat_report_unique_sql_id(false); /* dynamic enable statement tracking */ instr_stmt_dynamic_change_level(); /* for BE message, only can have one SQL each time, * so set top to true, then n_calls can be Updated * in PortalRun */ SetIsTopUniqueSQL(true); } } } } /* Set parameters from params */ void SetParamsFromParams(ParamListInfo params) { CHECK_STMT_HANDLE(); if (!u_sess->attr.attr_common.track_stmt_parameter || CURRENT_STMT_METRIC_HANDLE->params) { return; } /* We mustn't call user-defined I/O functions when in an aborted xact */ if (params && params->numParams > 0 && !IsAbortedTransactionBlockState()) { StringInfoData param_str; MemoryContext oldcontext; int paramno; initStringInfo(¶m_str); for (paramno = 0; paramno < params->numParams; paramno++) { ParamExternData* prm = ¶ms->params[paramno]; Oid typoutput; bool typisvarlena = false; char* pstring = NULL; char* p = NULL; appendStringInfo(¶m_str, "%s$%d = ", (paramno > 0) ? ", " : " parameters: ", paramno + 1); if (prm->isnull || !OidIsValid(prm->ptype)) { appendStringInfoString(¶m_str, "NULL"); continue; } getTypeOutputInfo(prm->ptype, &typoutput, &typisvarlena); pstring = OidOutputFunctionCall(typoutput, prm->value); appendStringInfoCharMacro(¶m_str, '\''); for (p = pstring; *p; p++) { if (*p == '\'') /* double single quotes */ appendStringInfoCharMacro(¶m_str, *p); appendStringInfoCharMacro(¶m_str, *p); } appendStringInfoCharMacro(¶m_str, '\''); pfree(pstring); } oldcontext = MemoryContextSwitchTo(u_sess->statement_cxt.stmt_stat_cxt); CURRENT_STMT_METRIC_HANDLE->params = pstrdup(param_str.data); pfree(param_str.data); if (CURRENT_STMT_METRIC_HANDLE->query) { Size len = strlen(CURRENT_STMT_METRIC_HANDLE->query) + strlen(CURRENT_STMT_METRIC_HANDLE->params) + 2; char *tmpstr = (char *)palloc(len * sizeof(char)); errno_t rc = sprintf_s(tmpstr, len, "%s;%s", CURRENT_STMT_METRIC_HANDLE->query, CURRENT_STMT_METRIC_HANDLE->params); securec_check_ss_c(rc, "\0", "\0"); pfree(CURRENT_STMT_METRIC_HANDLE->query); pfree_ext(CURRENT_STMT_METRIC_HANDLE->params); CURRENT_STMT_METRIC_HANDLE->query = tmpstr; } MemoryContextSwitchTo(oldcontext); } } /* * set current prepared statement's unique sql id * * 1, get portal by name * 2, fetch prepared statement from local prepared statement cache * 3, get prepared statement's query list * 4, for utility stmt: * only should has one Query in list(ToDo: need confirm) * 5, for other stmt: * all Query's unique sql id should be same */ void SetUniqueSQLIdFromPortal(Portal portal, CachedPlanSource* unnamed_psrc) { if (!is_unique_sql_enabled() || !is_local_unique_sql() || portal == NULL) { return; } SetParamsFromParams(portal->portalParams); List* query_list = NULL; if (portal->prepStmtName && portal->prepStmtName[0] != '\0') { /* for named prepared statement */ PreparedStatement *pstmt = FetchPreparedStatement(portal->prepStmtName, true, false); if (pstmt != NULL && pstmt->plansource != NULL) { query_list = pstmt->plansource->query_list; } } else if (unnamed_psrc != NULL) { /* for unnamed prepared statement */ query_list = unnamed_psrc->query_list; } SetLocalUniqueSQLId(query_list); ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] unique id: %lu, set unique sql id for PBE", u_sess->unique_sql_cxt.unique_sql_id))); } /* * set unique sql id from lightProxy::runMsg & bind */ void SetUniqueSQLIdFromCachedPlanSource(CachedPlanSource* cplan) { if (!is_unique_sql_enabled() || !is_local_unique_sql() || cplan == NULL) { return; } SetLocalUniqueSQLId(cplan->query_list); ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] unique id: %lu, set unique sql id for PBE", u_sess->unique_sql_cxt.unique_sql_id))); } /* set unique sql id from exec_batch_bind_execute */ void SetUniqueSQLIdInBatchBindExecute(CachedPlanSource* cplan, const ParamListInfo* params_set, int batch_count) { if (!is_unique_sql_enabled() || !is_local_unique_sql() || cplan == NULL) { return; } if (!u_sess->attr.attr_common.track_stmt_parameter) { SetLocalUniqueSQLId(cplan->query_list); } else { for (int i = 0; i < batch_count; i++) { if (CURRENT_STMT_METRIC_HANDLE != nullptr) { pfree_ext(CURRENT_STMT_METRIC_HANDLE->query); } SetParamsFromParams(params_set[i]); SetLocalUniqueSQLId(cplan->query_list); } } ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] unique id: %lu, set unique sql id for batch bind execute", u_sess->unique_sql_cxt.unique_sql_id))); } static void package_unique_sql_msg_on_remote(StringInfoData* buf, UniqueSQL* entry, uint32 recv_slot_index, uint32 recv_cn_id, Oid recv_user_id, uint64 recv_unique_sql_id) { /* now we send unique sql stat one by one, later maybe we can * send batch of unique sql stat * Send one unique sql stat each time, message format: * 1 | + 4 + 4 + 8 + 6*8 + * 'r' | + slot index + user id + unique sql id + row activity + * * 2*8 + 2*8 + TOTAL_TIME_INFO_TYPES*8 | * cache/IO + parse info + time info | */ pq_beginmessage(buf, 'r'); /* send back slot indexes(from original CN) */ pq_sendint(buf, recv_slot_index, sizeof(uint32)); /* key */ pq_sendint(buf, recv_cn_id, sizeof(uint32)); pq_sendint(buf, recv_user_id, sizeof(uint32)); pq_sendint64(buf, recv_unique_sql_id); /* row activity */ pq_sendint64(buf, entry->row_activity.returned_rows); pq_sendint64(buf, entry->row_activity.tuples_fetched); pq_sendint64(buf, entry->row_activity.tuples_returned); pq_sendint64(buf, entry->row_activity.tuples_inserted); pq_sendint64(buf, entry->row_activity.tuples_updated); pq_sendint64(buf, entry->row_activity.tuples_deleted); /* Cache/IO */ pq_sendint64(buf, entry->cache_io.blocks_fetched); pq_sendint64(buf, entry->cache_io.blocks_hit); /* parse info */ pq_sendint64(buf, entry->parse.soft_parse); pq_sendint64(buf, entry->parse.hard_parse); /* time Info */ for (uint32 idx = 0; idx < TOTAL_TIME_INFO_TYPES; idx++) { pq_sendint64(buf, entry->timeInfo.TimeInfoArray[idx]); } /* net info */ for (uint32 idx = 0; idx < TOTAL_NET_INFO_TYPES; idx++) { pq_sendint64(buf, entry->netInfo.netInfoArray[idx]); } /* send latest updated time */ if (t_thrd.proc->workingVersionNum >= STATEMENT_TRACK_VERSION) { pq_sendint64(buf, (uint64)entry->updated_time); /* sort hash work mem info */ pq_sendint64(buf, entry->sort_state.counts); pq_sendint64(buf, entry->sort_state.total_time); pq_sendint64(buf, entry->sort_state.used_work_mem); pq_sendint64(buf, entry->sort_state.spill_counts); pq_sendint64(buf, entry->sort_state.spill_size); pq_sendint64(buf, entry->hash_state.counts); pq_sendint64(buf, entry->hash_state.total_time); pq_sendint64(buf, entry->hash_state.used_work_mem); pq_sendint64(buf, entry->hash_state.spill_counts); pq_sendint64(buf, entry->hash_state.spill_size); } /* ... */ pq_endmessage(buf); } static void reset_reply_resource_owner_and_ctx(ResourceOwner old_cur_owner, MemoryContext old_ctx, bool is_commit) { ResourceOwnerRelease(t_thrd.utils_cxt.CurrentResourceOwner, RESOURCE_RELEASE_BEFORE_LOCKS, is_commit, true); ResourceOwnerRelease(t_thrd.utils_cxt.CurrentResourceOwner, RESOURCE_RELEASE_LOCKS, is_commit, true); ResourceOwnerRelease(t_thrd.utils_cxt.CurrentResourceOwner, RESOURCE_RELEASE_AFTER_LOCKS, is_commit, true); ResourceOwner tmpOwner = t_thrd.utils_cxt.CurrentResourceOwner; t_thrd.utils_cxt.CurrentResourceOwner = old_cur_owner; ResourceOwnerDelete(tmpOwner); (void)MemoryContextSwitchTo(old_ctx); } /* * ReplyUniqueSQLsStat - DN reply unique SQL IDs stat info */ void ReplyUniqueSQLsStat(StringInfo msg, uint32 count) { ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] get unique sql count: %u", count))); ResourceOwner old_cur_owner = t_thrd.utils_cxt.CurrentResourceOwner; MemoryContext old_ctx = MemoryContextSwitchTo(t_thrd.mem_cxt.msg_mem_cxt); t_thrd.utils_cxt.CurrentResourceOwner = ResourceOwnerCreate(NULL, "ReplyUniqueSQL", THREAD_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_DFX)); PG_TRY(); { check_unique_sql_permission(); } PG_CATCH(); { reset_reply_resource_owner_and_ctx(old_cur_owner, old_ctx, false); PG_RE_THROW(); } PG_END_TRY(); reset_reply_resource_owner_and_ctx(old_cur_owner, old_ctx, true); if (count > 0 && count < UINT_MAX) { StringInfoData buf; for (uint32 i = 0; i < count; i++) { uint32 recv_slot_index = (uint32)pq_getmsgint(msg, sizeof(uint32)); uint32 recv_cn_id = (uint32)pq_getmsgint(msg, sizeof(uint32)); Oid recv_user_id = (Oid)pq_getmsgint(msg, sizeof(uint32)); uint64 recv_unique_sql_id = (uint64)pq_getmsgint64(msg); ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] get cn_id: %u, user id: %u, unique sql id: %lu", recv_cn_id, recv_user_id, recv_unique_sql_id))); // get unique sql entry from HTAB UniqueSQLKey key = {0, 0}; UniqueSQL* entry = NULL; key.unique_sql_id = recv_unique_sql_id; key.user_id = recv_user_id; key.cn_id = recv_cn_id; uint32 hashCode = uniqueSQLHashCode(&key, sizeof(key)); (void)LockUniqueSQLHashPartition(hashCode, LW_SHARED); entry = (UniqueSQL*)hash_search(g_instance.stat_cxt.UniqueSQLHashtbl, &key, HASH_FIND, NULL); if (entry != NULL) { package_unique_sql_msg_on_remote( &buf, entry, recv_slot_index, recv_cn_id, recv_user_id, recv_unique_sql_id); } UnlockUniqueSQLHashPartition(hashCode); } pq_beginmessage(&buf, 'f'); pq_endmessage(&buf); pq_flush(); if (buf.data) { pfree(buf.data); } } else { ereport(ERROR, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] recv invalid sql ids count"))); } } /* * PrintPgStatTableCounter - print current PgStat_TableCounts * * L - last total stat counter after exit pgstat_report_stat * T - the time when entering pgstat_report_stat * C - current sql row stat counter * for debug purposes */ void PrintPgStatTableCounter(char type, PgStat_TableCounts* stat) { if (!is_unique_sql_enabled() || !IS_PGXC_DATANODE) { return; } ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] unique id: %lu, row stat counter[%c] - " "Returned: %ld, Fetched: %ld, " "Inserted: %ld, Updated: %ld, Deleted: %ld," "Blocks_Fetched: %ld, Blocks_Hit: %ld", u_sess->unique_sql_cxt.unique_sql_id, type, stat->t_tuples_returned, stat->t_tuples_fetched, stat->t_tuples_inserted, stat->t_tuples_updated, stat->t_tuples_deleted, stat->t_blocks_fetched, stat->t_blocks_hit))); } /* * UpdateUniqueSQLStatOnRmote - on dn/cn, update unique sql stat, * including row activity, Cache/IO, parse counter, etc * * main unique sql update entry: * CN -> CN/DN (DDL) * CN -> DN (DML) * * need consider below scenarios: * 1, simple query(pgstat_report_stat will clean pgStatTabList * by calling frequence) * 2, transaction block, such as: * begin; * insert xxxx; * delete XXX; * commit; * * main implementation logic: * - update last stat counter * - new sql enter * - current sql stat counter = current stat counter - last stat counter * - update unique sql HTAB stat * - update last stat counter * - ... * Notes: * we reuse pgStatTabList, as each query's row activity is stored in it. */ void UpdateUniqueSQLStatOnRemote() { Assert(need_update_unique_sql_row_stat()); Assert(u_sess->attr.attr_resource.enable_resource_track && (u_sess->attr.attr_common.instr_unique_sql_count > 0)); ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] unique id: %lu, update " "unique sql stat on remote", u_sess->unique_sql_cxt.unique_sql_id))); /* * Some SQLs doesn't have unique sql id on DN, * for example: * on CN: gsql - create table xxxx(generate unique sql ID) * on DN will be: * START TRANSACTION ISOLATION LEVEL read committed READ WRITE * create table xxx(with unique SQL ID) * PREPARE transaction 'xxx' * COMMIT PREPARED 'xxx' * we now only collect the stat counter for the create statement. * so need update last stat counter for 'START TRANSACTION...' */ if (u_sess->unique_sql_cxt.unique_sql_id != 0) { if (CalcSQLRowStatCounter( u_sess->unique_sql_cxt.last_stat_counter, u_sess->unique_sql_cxt.current_table_counter)) { UpdateUniqueSQLStat(NULL, NULL, 0, u_sess->unique_sql_cxt.current_table_counter); } else { /* update parse info */ UpdateUniqueSQLStat(NULL, NULL, 0, NULL); } } } /* is unique sql enabled * * status = enable_resource_track + unique sql on/off GUI parameter */ bool is_unique_sql_enabled() { return u_sess->attr.attr_resource.enable_resource_track && (u_sess->attr.attr_common.instr_unique_sql_count > 0); } /* by default, instr_unique_sql_count is 0, * when unique sql is "set instr_unique_sql_count = 100", * then won't generate unique sql id firstly(GUC/false), but * when the sql enter UpdateUniqueSQLStat, is_unique_sql_enabled() * will be true, then assert unique_sql_id != 0 will be failed in * UpdateUniqueSQLStat. * * so in this case, we won't call UpdateUniqueSQLStat */ bool IsNeedUpdateUniqueSQLStat(Portal portal) { if (u_sess->unique_sql_cxt.unique_sql_id == 0) { if (portal != NULL && portal->stmts != NULL && list_length(portal->stmts) == 1 && nodeTag(linitial(portal->stmts)) == T_VariableSetStmt) { VariableSetStmt* stmt = (VariableSetStmt*)(linitial(portal->stmts)); if (stmt->name && strcmp(stmt->name, "instr_unique_sql_count") == 0) { return false; } } } return true; } /* setter and getter for is_top_unique_sql */ void SetIsTopUniqueSQL(bool value) { u_sess->unique_sql_cxt.is_top_unique_sql = value; } bool IsTopUniqueSQL() { return u_sess->unique_sql_cxt.is_top_unique_sql; } /* return GUC unique sql tracking type */ int GetUniqueSQLTrackType() { return u_sess->attr.attr_common.unique_sql_track_type; } /* if CN and not connection from other CN, it is local unique SQL, * else is remote unique SQL */ bool is_local_unique_sql() { if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) { return true; } if (IS_SINGLE_NODE) { return true; } return false; } bool need_update_unique_sql_row_stat() { if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) { return false; } return true; } bool need_normalize_unique_string() { if (IS_PGXC_COORDINATOR || IS_SINGLE_NODE) { return true; } return false; } enum CleanType { INVALIDTYPE = 0, ALL, BY_GUC, BY_USERID, BY_CNID }; /* WDR will take snapshot of unique sql, reset opration * will remove unique sql entry from hash table. * during two snapshot, there will be 'reset opration', * then we cannot generate report between two snapshots. */ static void UpdateUniqueSQLValidStatTimestamp() { gs_lock_test_and_set_64(&g_instance.stat_cxt.NodeStatResetTime, GetCurrentTimestamp()); } static void CleanupAllUniqueSqlEntry() { int i; UniqueSQL* entry = NULL; HASH_SEQ_STATUS hash_seq; for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) { LWLockAcquire(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i), LW_EXCLUSIVE); } /* remove entry, need update valid stat timestamp */ UpdateUniqueSQLValidStatTimestamp(); HTAB *old = g_instance.stat_cxt.UniqueSQLHashtbl; MemoryContext oldcxt = CurrentMemoryContext; uint32 saveInterruptHoldoffCount = t_thrd.int_cxt.InterruptHoldoffCount; PG_TRY(); { InitUniqueSQL(); } PG_CATCH(); { /* errfinish will reset InterruptHoldoffCount, then LWLockRelease will be cored */ t_thrd.int_cxt.InterruptHoldoffCount = saveInterruptHoldoffCount; (void)MemoryContextSwitchTo(oldcxt); ErrorData* edata = NULL; edata = CopyErrorData(); FlushErrorState(); /* hash create failed, reuse the old hash table */ g_instance.stat_cxt.UniqueSQLHashtbl = old; old = NULL; ereport(WARNING, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] re-create hash failed, reason: '%s', so using old one", edata->message))); /* remove the allocated memory which throw by hash_create */ if (t_thrd.dyhash_cxt.CurrentDynaHashCxt != NULL && strcmp(t_thrd.dyhash_cxt.CurrentDynaHashCxt->name, UNIQUE_SQL_HASH_TBL) == 0) { MemoryContextDelete(t_thrd.dyhash_cxt.CurrentDynaHashCxt); } } PG_END_TRY(); if (old != NULL) { hash_destroy(old); } else { /* reuse old one, clean entry but still not return memory to system */ hash_seq_init(&hash_seq, g_instance.stat_cxt.UniqueSQLHashtbl); while ((entry = (UniqueSQL*)hash_seq_search(&hash_seq)) != NULL) { hash_search(g_instance.stat_cxt.UniqueSQLHashtbl, &entry->key, HASH_REMOVE, NULL); } } for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) { LWLockRelease(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i)); } } static List* GetRemoveEntryList(int cleanType, uint64 cleanValue) { List* entryList = NIL; UniqueSQL* entry = NULL; HASH_SEQ_STATUS hash_seq; int i; for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) { LWLockAcquire(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i), LW_SHARED); } hash_seq_init(&hash_seq, g_instance.stat_cxt.UniqueSQLHashtbl); while ((entry = (UniqueSQL*)hash_seq_search(&hash_seq)) != NULL) { if ((cleanType == BY_USERID && entry->key.user_id == (Oid)cleanValue) || (cleanType == BY_CNID && entry->key.cn_id == (uint32)cleanValue)) { UniqueSQLKey* key = (UniqueSQLKey*)palloc0_noexcept(sizeof(UniqueSQLKey)); if (key == NULL) { for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) { LWLockRelease(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i)); } if (entryList != NIL) { list_free(entryList); } ereport(ERROR, (errmodule(MOD_INSTR), errmsg("out of memory during allocating list element."))); } key->cn_id = entry->key.cn_id; key->user_id = entry->key.user_id; key->unique_sql_id = entry->key.unique_sql_id; entryList = lappend(entryList, key); } } for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) { LWLockRelease(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i)); } return entryList; } static void CleanupInstrUniqueSqlEntry(int cleanType, uint64 cleanValue) { ListCell* cell = NULL; List* removeList = NIL; if (g_instance.stat_cxt.UniqueSQLHashtbl == NULL) { ereport(LOG, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("unique sql hashtable is NULL")))); return; } if (cleanType == ALL) { CleanupAllUniqueSqlEntry(); } else { removeList = GetRemoveEntryList(cleanType, cleanValue); if (removeList != NIL) { foreach (cell, removeList) { UniqueSQLKey* key = (UniqueSQLKey*)lfirst(cell); uint32 hashCode = uniqueSQLHashCode(key, sizeof(UniqueSQLKey)); (void)LockUniqueSQLHashPartition(hashCode, LW_EXCLUSIVE); /* remove entry, need update valid stat timestamp */ UpdateUniqueSQLValidStatTimestamp(); hash_search(g_instance.stat_cxt.UniqueSQLHashtbl, key, HASH_REMOVE, NULL); UnlockUniqueSQLHashPartition(hashCode); } list_free(removeList); } } } static int CheckParameter(const char* global, const char* cleanType, int64 value, bool* isGlobal) { int cleantype = INVALIDTYPE; if (!is_unique_sql_enabled()) { ereport(LOG, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("unique sql is disabled")))); return INVALIDTYPE; } if (strcasecmp(global, "GLOBAL") == 0) { *isGlobal = true; if (!IS_PGXC_COORDINATOR && !IS_SINGLE_NODE) { ereport(ERROR, (errcode(ERRCODE_WARNING), (errmsg("Cleanup global unique sql info only support on CN nodes.")))); } } else if (strcasecmp(global, "LOCAL") == 0) { *isGlobal = false; } else { ereport(ERROR, (errcode(ERRCODE_WARNING), (errmsg("First parameter is wrong. USAGE: [GLOBAL/LOCAL],[ALL/BY_USERID/BY_CNID],[VALUE]")))); } if (strcasecmp(cleanType, "ALL") == 0) { cleantype = ALL; } else if (strcasecmp(cleanType, "BY_GUC") == 0) { if (AmWLMWorkerProcess()) { cleantype = BY_GUC; } else { ereport(WARNING, (errmsg("[UniqueSQL] BY_GUC is only used by GUC setting, use [ALL/BY_USERID/BY_CNID]"))); return INVALIDTYPE; } } else if (strcasecmp(cleanType, "BY_USERID") == 0) { if (value <= 0 || value > MAX_UINT32) { ereport(WARNING, (errmsg("[UniqueSQL] third parameter of reset unique sql is out of range with BY_USERID"))); return INVALIDTYPE; } cleantype = BY_USERID; } else if (strcasecmp(cleanType, "BY_CNID") == 0) { if (value <= INT_MIN || value > MAX_UINT32) { ereport(WARNING, (errmsg("[UniqueSQL] third parameter of reset unique sql is out of range with BY_CNID"))); return INVALIDTYPE; } cleantype = BY_CNID; } else { ereport(ERROR, (errcode(ERRCODE_WARNING), (errmsg("Second parameter is wrong. USAGE:[GLOBAL/LOCAL],[ALL/BY_USERID/BY_CNID],[VALUE]")))); } return cleantype; } #ifndef ENABLE_MULTIPLE_NODES /* * AutoRecycleUniqueSQLEntry * This function is called when the hash table is full, and then * randomly cleans a certain number of unique SQL in the hash table, * which is instr_unique_sql_count * ratio + the bloated number. * return: * true - successful or there is still free space * fasle - failed */ static bool AutoRecycleUniqueSQLEntry() { LWLockAcquire(UniqueSqlEvictLock, LW_EXCLUSIVE); /* Clean is not needed if there is still free space in unique SQL hash table. */ long totalCount = hash_get_num_entries(g_instance.stat_cxt.UniqueSQLHashtbl); if (totalCount < u_sess->attr.attr_common.instr_unique_sql_count - 1) { LWLockRelease(UniqueSqlEvictLock); ereport(LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] There is still free space in unique SQL hash table and no need to clean it up."))); return true; } int instr_unique_sql_count = u_sess->attr.attr_common.instr_unique_sql_count; /* If the number of entries is too large, it may cause the problem of * applying for large memory. Aotu-cleanup is not performed in this situation. * maxEntryNum - The maximum number of KeyUpdatedtime that 1G memory can store. */ long maxEntryNum = long(1024 * 1024 * 1024) / sizeof(KeyUpdatedtime); if (totalCount >= maxEntryNum) { LWLockRelease(UniqueSqlEvictLock); ereport(WARNING, (errmodule(MOD_INSTR), errcode(ERRCODE_LOG), errmsg("[UniqueSQL] instr_unique_sql_count is too large, uniquesql auto-clean will not happen."))); return false; } const double cleanRatio = 0.1; int cleanCount = Max(int(cleanRatio * instr_unique_sql_count + (totalCount - instr_unique_sql_count)), 1); /* get remove entry list */ KeyUpdatedtime* removeList = GetSortedEntryList(); if (removeList == NULL) { LWLockRelease(UniqueSqlEvictLock); return false; } /* clean */ for (int i = 0; i < cleanCount; ++i) { UniqueSQLKey* key = &(removeList[i].key); uint32 hashCode = uniqueSQLHashCode(key, sizeof(UniqueSQLKey)); (void)LockUniqueSQLHashPartition(hashCode, LW_EXCLUSIVE); /* remove entry, need update valid stat timestamp */ UpdateUniqueSQLValidStatTimestamp(); hash_search(g_instance.stat_cxt.UniqueSQLHashtbl, key, HASH_REMOVE, NULL); UnlockUniqueSQLHashPartition(hashCode); } pfree(removeList); LWLockRelease(UniqueSqlEvictLock); ereport(LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] Auto-cleanup over, %d uniquesqls are recycled.", cleanCount))); return true; } /* * GetSortedEntryList * get all of entry, and sort them by entry's updated_time */ static KeyUpdatedtime* GetSortedEntryList() { UniqueSQL* entry = NULL; HASH_SEQ_STATUS hash_seq; int j = 0; int i; for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) { LWLockAcquire(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i), LW_SHARED); } long totalCount = hash_get_num_entries(g_instance.stat_cxt.UniqueSQLHashtbl); KeyUpdatedtime* removeList = NULL; removeList = (KeyUpdatedtime*)palloc0_noexcept(totalCount * sizeof(KeyUpdatedtime)); if (removeList == NULL) { for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) { LWLockRelease(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i)); } return NULL; } hash_seq_init(&hash_seq, g_instance.stat_cxt.UniqueSQLHashtbl); while ((entry = (UniqueSQL*)hash_seq_search(&hash_seq)) != NULL) { KeyUpdatedtime keyUpdatedtime; keyUpdatedtime.key.cn_id = entry->key.cn_id; keyUpdatedtime.key.user_id = entry->key.user_id; keyUpdatedtime.key.unique_sql_id = entry->key.unique_sql_id; keyUpdatedtime.updated_time = entry->updated_time; removeList[j++] = keyUpdatedtime; } for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) { LWLockRelease(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i)); } qsort((void*)removeList, j, sizeof(KeyUpdatedtime), KeyUpdatedtimeCmp); return removeList; } static int KeyUpdatedtimeCmp(const void* a, const void* b) { const KeyUpdatedtime* i1 = (const KeyUpdatedtime*)a; const KeyUpdatedtime* i2 = (const KeyUpdatedtime*)b; if (i1->updated_time < i2->updated_time) return -1; else if (i1->updated_time == i2->updated_time) return 0; else return 1; } #endif /* Reset unique sql stat info for the current database */ Datum reset_unique_sql(PG_FUNCTION_ARGS) { int cleantype; ParallelFunctionState* state = NULL; bool isGlobal = false; char* global = text_to_cstring(PG_GETARG_TEXT_PP(0)); char* cleanType = text_to_cstring(PG_GETARG_TEXT_PP(1)); int64 cleanValue = PG_GETARG_INT64(2); if (!superuser() && !isMonitoradmin(GetUserId())) { ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("only system/monitor admin can reset unique sql")))); } cleantype = CheckParameter(global, cleanType, cleanValue, &isGlobal); if (cleantype == INVALIDTYPE) { PG_RETURN_INT64(0); } ereport(LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] clean unique sql, " "clean scope: %s, clean type: %s, clean value: %lu!", global, cleanType, (uint64)cleanValue))); if (cleantype == BY_GUC) { cleantype = BY_CNID; cleanType = "BY_CNID"; Oid node_oid = get_pgxc_nodeoid(g_instance.attr.attr_common.PGXCNodeName); cleanValue = get_pgxc_node_id(node_oid); } CleanupInstrUniqueSqlEntry(cleantype, cleanValue); if (isGlobal && !IS_SINGLE_NODE) { StringInfoData buf; ExecNodes* exec_nodes = (ExecNodes*)makeNode(ExecNodes); exec_nodes->baselocatortype = LOCATOR_TYPE_HASH; exec_nodes->accesstype = RELATION_ACCESS_READ; exec_nodes->primarynodelist = NIL; exec_nodes->en_expr = NULL; exec_nodes->en_relid = InvalidOid; exec_nodes->nodeList = NIL; initStringInfo(&buf); appendStringInfo(&buf, "SELECT pg_catalog.reset_unique_sql('LOCAL','%s',%ld);", cleanType, cleanValue); state = RemoteFunctionResultHandler(buf.data, exec_nodes, NULL, true, EXEC_ON_ALL_NODES, true); FreeParallelFunctionState(state); pfree_ext(buf.data); } ereport(LOG, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] clean unique sql finished."))); PG_RETURN_INT64(1); } /* * Reset session variables before/after each unique sql */ void ResetCurrentUniqueSQL(bool need_reset_cn_id) { u_sess->unique_sql_cxt.unique_sql_id = 0; u_sess->unique_sql_cxt.unique_sql_user_id = InvalidOid; if (need_reset_cn_id) { u_sess->unique_sql_cxt.unique_sql_cn_id = InvalidOid; } /* multi query only used in exec_simple_query */ u_sess->unique_sql_cxt.is_multi_unique_sql = false; u_sess->unique_sql_cxt.curr_single_unique_sql = NULL; u_sess->unique_sql_cxt.multi_sql_offset = 0; u_sess->unique_sql_cxt.need_update_calls = true; /* used when nested portal calling case */ u_sess->unique_sql_cxt.portal_nesting_level = 0; #ifndef ENABLE_MULTIPLE_NODES u_sess->unique_sql_cxt.unique_sql_text = NULL; #endif u_sess->unique_sql_cxt.skipUniqueSQLCount = 0; /* used in open cursor case, open cursor will generate sub sqls, * but fetch's select SQL may not be the first child, so we * generate all sql's unique sql id */ u_sess->unique_sql_cxt.force_generate_unique_sql = false; } void FindUniqueSQL(UniqueSQLKey key, char* unique_sql) { errno_t rc = 0; uint32 hashCode = uniqueSQLHashCode(&key, sizeof(key)); (void)LockUniqueSQLHashPartition(hashCode, LW_SHARED); /* step 2. find the unique query from UniqueSQLHashtbl, then insert into ASHUniqueSQLHashtbl */ UniqueSQL *entry = (UniqueSQL*)hash_search(g_instance.stat_cxt.UniqueSQLHashtbl, &key, HASH_FIND, NULL); if (entry == NULL) { rc = strcpy_s(unique_sql, UNIQUE_SQL_MAX_LEN, ""); securec_check(rc, "\0", "\0"); } else { rc = strcpy_s(unique_sql, UNIQUE_SQL_MAX_LEN, entry->unique_sql); securec_check(rc, "\0", "\0"); } UnlockUniqueSQLHashPartition(hashCode); } char* FindCurrentUniqueSQL() { const char* hint = "/* missing SQL statement, GUC instr_unique_sql_count is too small. */"; char *unique_query = NULL; UniqueSQLKey key; key.unique_sql_id = u_sess->unique_sql_cxt.unique_sql_id; key.cn_id = u_sess->unique_sql_cxt.unique_sql_cn_id; key.user_id = u_sess->unique_sql_cxt.unique_sql_user_id; uint32 hashCode = uniqueSQLHashCode(&key, sizeof(key)); (void)LockUniqueSQLHashPartition(hashCode, LW_SHARED); UniqueSQL *entry = (UniqueSQL*)hash_search(g_instance.stat_cxt.UniqueSQLHashtbl, &key, HASH_FIND, NULL); if (entry != NULL && entry->unique_sql != NULL) { unique_query = pstrdup(entry->unique_sql); } UnlockUniqueSQLHashPartition(hashCode); if (unique_query == NULL) { unique_query = pstrdup(hint); } return unique_query; } static bool need_reuse_unique_sql_id(Query *query) { /* for fetch statement, need to reuse unique sql id from source cursor SQL */ if (query->commandType == CMD_UTILITY && query->utilityStmt != NULL && IsA(query->utilityStmt, FetchStmt)) { FetchStmt *fetch_stmt = (FetchStmt*)query->utilityStmt; if (fetch_stmt->portalname != NULL) { Portal fetch_stmt_portal = GetPortalByName(fetch_stmt->portalname); if (PortalIsValid(fetch_stmt_portal) && fetch_stmt_portal->queryDesc != NULL && fetch_stmt_portal->queryDesc->plannedstmt != NULL && fetch_stmt_portal->queryDesc->plannedstmt->uniqueSQLId != 0) { u_sess->unique_sql_cxt.unique_sql_id = fetch_stmt_portal->queryDesc->plannedstmt->uniqueSQLId; ereport(DEBUG1, (errmodule(MOD_INSTR), errmsg("[UniqueSQL] use cursor SQL's unique id: %lu", u_sess->unique_sql_cxt.unique_sql_id))); return true; } } } return false; } bool is_instr_top_portal() { return u_sess->unique_sql_cxt.portal_nesting_level == 1; } void increase_instr_portal_nesting_level() { if (is_local_unique_sql()) { u_sess->unique_sql_cxt.portal_nesting_level++; } } void decrease_instr_portal_nesting_level() { if (is_local_unique_sql()) { u_sess->unique_sql_cxt.portal_nesting_level--; } } static void instr_unique_sql_handle_multi_sql_time_info() { timeInfoRecordEnd(); timeInfoRecordStart(); } /* handle multi sql case in exec_simple_query */ void instr_unique_sql_handle_multi_sql(bool is_first_parsetree) { if (!is_first_parsetree) { statement_commit_metirc_context(); statement_init_metric_context(); instr_stmt_report_start_time(); } if (is_local_unique_sql()) { if (IS_SINGLE_NODE || IS_PGXC_COORDINATOR) { u_sess->debug_query_id = generate_unique_id64(>_queryId); pgstat_report_queryid(u_sess->debug_query_id); } /* reset unique_sql */ if (is_unique_sql_enabled()) { if (!is_first_parsetree) { instr_unique_sql_handle_multi_sql_time_info(); } ResetCurrentUniqueSQL(); u_sess->unique_sql_cxt.unique_sql_start_time = (u_sess->unique_sql_cxt.unique_sql_start_time > 0) ? GetCurrentTimestamp() : GetCurrentStatementLocalStartTimestamp(); /* * INSTR: when track type is TOP, before query to start, * we reset is_top_unique_sql to false */ if (IS_UNIQUE_SQL_TRACK_TOP) SetIsTopUniqueSQL(false); /* reset unique sql returned rows(SELECT) */ UniqueSQLStatCountResetReturnedRows(); UniqueSQLStatCountResetParseCounter(); } } } /* report unique sql elapse time and n_calls */ void instr_unique_sql_report_elapse_time(int64 elapse_start_time) { UpdateUniqueSQLStat(NULL, NULL, elapse_start_time); } /* If condition is true, set u_sess->unique_sql_cxt.unique_sql_start_time to val1, else to val2. */ void instr_unique_sql_set_start_time(bool condition, int64 val1, int64 val2) { if (is_local_unique_sql() && is_unique_sql_enabled()) { u_sess->unique_sql_cxt.unique_sql_start_time = condition ? val1 : val2; } } void instr_unique_sql_reset_start_time() { u_sess->unique_sql_cxt.unique_sql_start_time = 0; }