Support automatic eviction of unique SQL entries
Fix bugs and address review comments
@@ -88,6 +88,18 @@ typedef struct {
ListCell *curr_cell;
} UniqueSQLResults;
#ifndef ENABLE_MULTIPLE_NODES
/* The mapping of UniqueSQLKey to updated_time */
typedef struct {
UniqueSQLKey key;
TimestampTz updated_time; /* latest update time for the unique sql entry */
} KeyUpdatedtime;
static KeyUpdatedtime* GetSortedEntryList();
static int KeyUpdatedtimeCmp(const void* a, const void* b);
static bool AutoRecycleUniqueSQLEntry();
#endif
/* ---------Thread Local Variable---------- */
/* save prev-hooks */
static post_parse_analyze_hook_type g_prev_post_parse_analyze_hook = NULL;
@@ -648,10 +660,10 @@ void UpdateUniqueSQLStat(Query* query, const char* sql, int64 elapse_start_time,
 * 2, on local CN, if the entry does not exist, a new entry can only be
 * inserted when query (Query *) is not NULL
*/
#ifdef ENABLE_MULTIPLE_NODES
if (is_local_unique_sql() && query == NULL) {
return;
}
/* control unique sql number by instr_unique_sql_count. */
long totalCount = hash_get_num_entries(g_instance.stat_cxt.UniqueSQLHashtbl);
if (totalCount >= u_sess->attr.attr_common.instr_unique_sql_count) {
@@ -659,6 +671,25 @@ void UpdateUniqueSQLStat(Query* query, const char* sql, int64 elapse_start_time,
(errmodule(MOD_INSTR), errmsg("[UniqueSQL] Failed to insert unique sql for up to limit")));
return;
}
#else
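/*
 * Single-node build: even when query is NULL (e.g. in the execute phase of
 * PBE), the entry can still be inserted as long as the normalized sql text
 * was saved in u_sess->unique_sql_cxt earlier.
 */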
if (is_local_unique_sql() && query == NULL && u_sess->unique_sql_cxt.unique_sql_text == NULL) {
return;
}
/* control unique sql number by instr_unique_sql_count. */
long totalCount = hash_get_num_entries(g_instance.stat_cxt.UniqueSQLHashtbl);
if (totalCount >= u_sess->attr.attr_common.instr_unique_sql_count) {
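/* Table is full: if a clean ratio is configured, try to evict stale entries
 * to make room; otherwise drop the new statement as before. */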
if (g_instance.attr.attr_common.unique_sql_clean_ratio != 0) {
if (!AutoRecycleUniqueSQLEntry()) {
return;
}
} else {
ereport(DEBUG2,
(errmodule(MOD_INSTR), errmsg("[UniqueSQL] Failed to insert unique sql for up to limit")));
return;
}
}
#endif
(void)LockUniqueSQLHashPartition(hashCode, LW_EXCLUSIVE);
entry = (UniqueSQL*)hash_search(g_instance.stat_cxt.UniqueSQLHashtbl, &key, HASH_ENTER, &found);
@@ -670,7 +701,21 @@ void UpdateUniqueSQLStat(Query* query, const char* sql, int64 elapse_start_time,
if (!found) {
resetUniqueSQLEntry(entry);
#ifndef ENABLE_MULTIPLE_NODES
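/* Rebuild path: when there is no Query node but the normalized text was
 * saved in the session, fill the entry directly from that saved text. */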
if (query == NULL && u_sess->unique_sql_cxt.unique_sql_text != NULL) {
entry->unique_sql = (char*)(entry + 1);
errno_t rc = memset_s(entry->unique_sql, UNIQUE_SQL_MAX_LEN, 0, UNIQUE_SQL_MAX_LEN);
securec_check(rc, "\0", "\0");
entry->is_local = true;
int unique_sql_textLen = strlen(u_sess->unique_sql_cxt.unique_sql_text);
rc = memcpy_s(entry->unique_sql, UNIQUE_SQL_MAX_LEN, u_sess->unique_sql_cxt.unique_sql_text, unique_sql_textLen);
securec_check(rc, "\0", "\0");
} else {
set_unique_sql_string_in_entry(entry, query, sql, u_sess->unique_sql_cxt.multi_sql_offset);
}
#else
set_unique_sql_string_in_entry(entry, query, sql, u_sess->unique_sql_cxt.multi_sql_offset);
#endif
}
}
@@ -1758,6 +1803,14 @@ static void SetLocalUniqueSQLId(List* query_list)
}
u_sess->unique_sql_cxt.unique_sql_user_id = GetUserId();
#ifndef ENABLE_MULTIPLE_NODES
/* store the normalized unique sql text into u_sess->unique_sql_cxt in stage B
 * or E of PBE, only if auto-cleanup is enabled
 */
if (g_instance.attr.attr_common.unique_sql_clean_ratio != 0) {
u_sess->unique_sql_cxt.unique_sql_text = query->unique_sql_text;
}
#endif
instr_stmt_report_query(u_sess->unique_sql_cxt.unique_sql_id);
/* for BE message, only can have one SQL each time,
@@ -2337,6 +2390,116 @@ static int CheckParameter(const char* global, const char* cleanType, int64 value
return cleantype;
}
#ifndef ENABLE_MULTIPLE_NODES
/*
* AutoRecycleUniqueSQLEntry
 * Called when the unique SQL hash table is full; evicts the least recently
 * updated entries from the hash table. The number of entries cleaned is
 * instr_unique_sql_count * ratio + the bloated number.
 * return:
 * true - successful or there is still free space
 * false - failed
 */
static bool AutoRecycleUniqueSQLEntry()
{
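/* UniqueSqlEvictLock serializes recyclers so that only one backend scans and
 * trims the hash table at a time. */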
LWLockAcquire(UniqueSqlEvictLock, LW_EXCLUSIVE);
/* Clean is not needed if there is still free space in unique SQL hash table. */
long totalCount = hash_get_num_entries(g_instance.stat_cxt.UniqueSQLHashtbl);
if (totalCount < u_sess->attr.attr_common.instr_unique_sql_count - 1) {
LWLockRelease(UniqueSqlEvictLock);
ereport(DEBUG1,
(errmodule(MOD_INSTR), errmsg("[UniqueSQL] There is still free space in unique SQL hash table and no need to clean it up.")));
return true;
}
int instr_unique_sql_count = u_sess->attr.attr_common.instr_unique_sql_count;
/* If the number of entries is too large, building the removal list may
 * require an excessively large memory allocation, so auto-cleanup is not
 * performed in this situation.
 * maxEntryNum - the maximum number of KeyUpdatedtime elements that 1G of
 * memory can store.
 */
long maxEntryNum = long(1024 * 1024 * 1024) / sizeof(KeyUpdatedtime);
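/* Illustrative arithmetic (field sizes are an assumption): with a 16-byte
 * UniqueSQLKey plus an 8-byte TimestampTz, sizeof(KeyUpdatedtime) is 24
 * bytes, so maxEntryNum is roughly 1G / 24B, about 44 million entries. */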
if (totalCount >= maxEntryNum) {
LWLockRelease(UniqueSqlEvictLock);
ereport(WARNING,
(errmodule(MOD_INSTR), errcode(ERRCODE_LOG), errmsg("[UniqueSQL] instr_unique_sql_count is too large, uniquesql auto-clean will not happen.")));
return false;
}
double ratio = g_instance.attr.attr_common.unique_sql_clean_ratio;
int cleanCount = Max(int(ratio * instr_unique_sql_count + totalCount - instr_unique_sql_count), 1);
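/* Worked example (hypothetical values): with instr_unique_sql_count = 1000,
 * ratio = 0.1 and totalCount = 1003, cleanCount = Max(100 + 3, 1) = 103,
 * i.e. the configured share of the limit plus any bloat beyond it. */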
/* get remove entry list */
KeyUpdatedtime* removeList = GetSortedEntryList();
if (removeList == NULL) {
LWLockRelease(UniqueSqlEvictLock);
return false;
}
/* clean */
for (int i = 0; i < cleanCount; ++i) {
UniqueSQLKey* key = &(removeList[i].key);
uint32 hashCode = uniqueSQLHashCode(key, sizeof(UniqueSQLKey));
(void)LockUniqueSQLHashPartition(hashCode, LW_EXCLUSIVE);
/* remove entry, need update valid stat timestamp */
UpdateUniqueSQLValidStatTimestamp();
hash_search(g_instance.stat_cxt.UniqueSQLHashtbl, key, HASH_REMOVE, NULL);
UnlockUniqueSQLHashPartition(hashCode);
}
pfree(removeList);
LWLockRelease(UniqueSqlEvictLock);
ereport(DEBUG1,
(errmodule(MOD_INSTR), errmsg("[UniqueSQL] Auto-cleanup over, %d uniquesqls are recycled.", cleanCount)));
return true;
}
/*
* GetSortedEntryList
* collect all entries and sort them by updated_time
*/
static KeyUpdatedtime* GetSortedEntryList()
{
UniqueSQL* entry = NULL;
HASH_SEQ_STATUS hash_seq;
int j = 0;
int i;
for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) {
LWLockAcquire(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i), LW_SHARED);
}
long totalCount = hash_get_num_entries(g_instance.stat_cxt.UniqueSQLHashtbl);
KeyUpdatedtime* removeList = NULL;
removeList = (KeyUpdatedtime*)palloc0_noexcept(totalCount * sizeof(KeyUpdatedtime));
if (removeList == NULL) {
for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) {
LWLockRelease(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i));
}
return NULL;
}
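/* Snapshot only the key/updated_time pairs while holding all partition locks
 * in shared mode; the copies are sorted after the locks are released, so no
 * entry pointers escape the locked region. */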
hash_seq_init(&hash_seq, g_instance.stat_cxt.UniqueSQLHashtbl);
while ((entry = (UniqueSQL*)hash_seq_search(&hash_seq)) != NULL) {
KeyUpdatedtime keyUpdatedtime;
keyUpdatedtime.key.cn_id = entry->key.cn_id;
keyUpdatedtime.key.user_id = entry->key.user_id;
keyUpdatedtime.key.unique_sql_id = entry->key.unique_sql_id;
keyUpdatedtime.updated_time = entry->updated_time;
removeList[j++] = keyUpdatedtime;
}
for (i = 0; i < NUM_UNIQUE_SQL_PARTITIONS; i++) {
LWLockRelease(GetMainLWLockByIndex(FirstUniqueSQLMappingLock + i));
}
qsort((void*)removeList, j, sizeof(KeyUpdatedtime), KeyUpdatedtimeCmp);
return removeList;
}
static int KeyUpdatedtimeCmp(const void* a, const void* b)
{
const KeyUpdatedtime* i1 = (const KeyUpdatedtime*)a;
const KeyUpdatedtime* i2 = (const KeyUpdatedtime*)b;
if (i1->updated_time < i2->updated_time)
return -1;
else if (i1->updated_time == i2->updated_time)
return 0;
else
return 1;
}
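/*
 * Illustration (hypothetical timestamps): entries updated at t=30, t=10 and
 * t=20 sort to 10, 20, 30 under KeyUpdatedtimeCmp, so the first cleanCount
 * slots of the sorted array hold the least recently updated entries that
 * AutoRecycleUniqueSQLEntry evicts.
 */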
#endif
/* Reset unique sql stat info for the current database */
Datum reset_unique_sql(PG_FUNCTION_ARGS)
{
@@ -2410,6 +2573,9 @@ void ResetCurrentUniqueSQL(bool need_reset_cn_id)
/* used when nested portal calling case */
u_sess->unique_sql_cxt.portal_nesting_level = 0;
#ifndef ENABLE_MULTIPLE_NODES
u_sess->unique_sql_cxt.unique_sql_text = NULL;
#endif
}
void FindUniqueSQL(UniqueSQLKey key, char* unique_sql)