修改syscache,catcache 的访问方式,可以接受hook

This commit is contained in:
chenbd
2023-02-22 11:22:39 -05:00
parent 3fee79a080
commit 728bb6933b
5 changed files with 69 additions and 28 deletions

View File

@ -1063,7 +1063,11 @@ static void CatalogCacheInitializeCache(CatCache* cache)
keytype = OIDOID;
}
GetCCHashEqFuncs(keytype, &cache->cc_hashfunc[i], &eqfunc, &cache->cc_fastequal[i]);
if (u_sess->hook_cxt.pluginCCHashEqFuncs == NULL ||
!((pluginCCHashEqFuncs)(u_sess->hook_cxt.pluginCCHashEqFuncs))(keytype, &cache->cc_hashfunc[i], &eqfunc,
&cache->cc_fastequal[i], cache->id)) {
GetCCHashEqFuncs(keytype, &cache->cc_hashfunc[i], &eqfunc, &cache->cc_fastequal[i]);
}
/*
* Do equality-function lookup (we assume this won't need a catalog
@ -1580,7 +1584,7 @@ static HeapTuple SearchCatCacheMiss(
{
ScanKeyData cur_skey[CATCACHE_MAXKEYS];
Relation relation;
SysScanDesc scandesc;
SysScanDesc scandesc = NULL;
HeapTuple ntp;
CatCTup* ct = NULL;
Datum arguments[CATCACHE_MAXKEYS];
@ -1652,16 +1656,27 @@ static HeapTuple SearchCatCacheMiss(
ereport(DEBUG1, (errmsg("cache->cc_reloid - %d", cache->cc_reloid)));
scandesc = systable_beginscan(
relation, cache->cc_indexoid, IndexScanOK(cache->id), NULL, nkeys, cur_skey);
if (u_sess->hook_cxt.pluginSearchCatHook != NULL) {
if (HeapTupleIsValid(ntp = ((searchCatFunc)(u_sess->hook_cxt.pluginSearchCatHook))(relation,
cache->cc_indexoid, cache->id, nkeys, cur_skey, &scandesc))) {
ct = CatalogCacheCreateEntry(cache, ntp, arguments, hashValue, hashIndex, false);
/* immediately set the refcount to 1 */
ResourceOwnerEnlargeCatCacheRefs(t_thrd.utils_cxt.CurrentResourceOwner);
ct->refcount++;
ResourceOwnerRememberCatCacheRef(t_thrd.utils_cxt.CurrentResourceOwner, &ct->tuple);
}
} else {
scandesc = systable_beginscan(
relation, cache->cc_indexoid, IndexScanOK(cache->id), NULL, nkeys, cur_skey);
while (HeapTupleIsValid(ntp = systable_getnext(scandesc))) {
ct = CatalogCacheCreateEntry(cache, ntp, arguments, hashValue, hashIndex, false);
/* immediately set the refcount to 1 */
ResourceOwnerEnlargeCatCacheRefs(t_thrd.utils_cxt.CurrentResourceOwner);
ct->refcount++;
ResourceOwnerRememberCatCacheRef(t_thrd.utils_cxt.CurrentResourceOwner, &ct->tuple);
break; /* assume only one match */
while (HeapTupleIsValid(ntp = systable_getnext(scandesc))) {
ct = CatalogCacheCreateEntry(cache, ntp, arguments, hashValue, hashIndex, false);
/* immediately set the refcount to 1 */
ResourceOwnerEnlargeCatCacheRefs(t_thrd.utils_cxt.CurrentResourceOwner);
ct->refcount++;
ResourceOwnerRememberCatCacheRef(t_thrd.utils_cxt.CurrentResourceOwner, &ct->tuple);
break; /* assume only one match */
}
}
systable_endscan(scandesc);

View File

@ -781,27 +781,45 @@ GlobalCatCTup *GlobalSysTupCache::SearchTupleMiss(InsertCatTupInfo *tup_info)
cur_skey[1].sk_argument = arguments[1];
cur_skey[2].sk_argument = arguments[2];
cur_skey[3].sk_argument = arguments[3];
SysScanDesc scandesc =
systable_beginscan(relation, m_relinfo.cc_indexoid, IndexScanOK(cc_id), NULL,
m_relinfo.cc_nkeys, cur_skey);
if (tup_info->has_concurrent_lock) {
AcquireGSCTableReadLock(&tup_info->has_concurrent_lock, m_concurrent_lock);
}
HeapTuple ntp;
while (HeapTupleIsValid(ntp = systable_getnext(scandesc))) {
tup_info->ntp = ntp;
if (!tup_info->has_concurrent_lock) {
tup_info->canInsertGSC = false;
} else {
tup_info->canInsertGSC = CanTupleInertGSC(ntp);
if (!tup_info->canInsertGSC) {
/* unlock concurrent immediately, any one can invalid cache now */
ReleaseGSCTableReadLock(&tup_info->has_concurrent_lock, m_concurrent_lock);
SysScanDesc scandesc = NULL;
if (u_sess->hook_cxt.pluginSearchCatHook != NULL) {
if (HeapTupleIsValid(ntp = ((searchCatFunc)(u_sess->hook_cxt.pluginSearchCatHook))(relation,
m_relinfo.cc_indexoid, cc_id, m_relinfo.cc_nkeys, cur_skey, &scandesc))) {
tup_info->ntp = ntp;
if (!tup_info->has_concurrent_lock) {
tup_info->canInsertGSC = false;
} else {
tup_info->canInsertGSC = CanTupleInertGSC(ntp);
if (!tup_info->canInsertGSC) {
/* unlock concurrent immediately, any one can invalid cache now */
ReleaseGSCTableReadLock(&tup_info->has_concurrent_lock, m_concurrent_lock);
}
}
ct = InsertHeapTupleIntoCatCacheInSingle(tup_info);
}
} else {
scandesc =
systable_beginscan(relation, m_relinfo.cc_indexoid, IndexScanOK(cc_id), NULL,
m_relinfo.cc_nkeys, cur_skey);
while (HeapTupleIsValid(ntp = systable_getnext(scandesc))) {
tup_info->ntp = ntp;
if (!tup_info->has_concurrent_lock) {
tup_info->canInsertGSC = false;
} else {
tup_info->canInsertGSC = CanTupleInertGSC(ntp);
if (!tup_info->canInsertGSC) {
/* unlock concurrent immediately, any one can invalid cache now */
ReleaseGSCTableReadLock(&tup_info->has_concurrent_lock, m_concurrent_lock);
}
}
ct = InsertHeapTupleIntoCatCacheInSingle(tup_info);
break; /* assume only one match */
}
ct = InsertHeapTupleIntoCatCacheInSingle(tup_info);
break; /* assume only one match */
}
/* unlock finally */
if (tup_info->has_concurrent_lock) {
@ -1489,7 +1507,11 @@ void GlobalSysTupCache::InitRelationInfo()
ereport(FATAL, (errmsg("only sys attr supported in caches is OID")));
keytype = OIDOID;
}
GetCCHashEqFuncs(keytype, &m_relinfo.cc_hashfunc[i], &eqfunc, &m_relinfo.cc_fastequal[i]);
if (u_sess->hook_cxt.pluginCCHashEqFuncs == NULL ||
!((pluginCCHashEqFuncs)(u_sess->hook_cxt.pluginCCHashEqFuncs))(keytype, &m_relinfo.cc_hashfunc[i], &eqfunc,
&m_relinfo.cc_fastequal[i], cc_id)) {
GetCCHashEqFuncs(keytype, &m_relinfo.cc_hashfunc[i], &eqfunc, &m_relinfo.cc_fastequal[i]);
}
/*
* Do equality-function lookup (we assume this won't need a catalog
* lookup for any supported type)

View File

@ -435,7 +435,7 @@ bool CacheLoader::fill_global_settings_map(PGconn *conn)
{
m_global_settings_list.clear();
const char *global_settings_query =
"SELECT gs_client_global_keys.Oid, gs_client_global_keys.global_key_name, pg_namespace.nspname, "
"SELECT gs_client_global_keys.oid, gs_client_global_keys.global_key_name, pg_namespace.nspname, "
"gs_client_global_keys_args.function_name, gs_client_global_keys_args.key, gs_client_global_keys_args.value, "
"EXTRACT(EPOCH from gs_client_global_keys.create_date) as change_epoch "
"FROM gs_client_global_keys JOIN pg_namespace ON (pg_namespace.Oid = gs_client_global_keys.key_namespace) "
@ -729,7 +729,7 @@ bool CacheLoader::fill_cached_columns(PGconn *conn)
const bool CacheLoader::fill_cached_types(PGconn *conn)
{
const char *query = "select pg_type.Oid, current_database(), nspname ,typname from pg_type join pg_namespace on "
const char *query = "select pg_type.oid, current_database(), nspname ,typname from pg_type join pg_namespace on "
"(typnamespace=pg_namespace.Oid) where typrelid in (Select rel_id from gs_encrypted_columns);";
DataFetcher data_fetcher = conn->client_logic->m_data_fetcher_manager->get_data_fetcher();

View File

@ -2722,6 +2722,8 @@ typedef struct knl_u_hook_context {
void *aggIsSupportedHook;
void *searchFuncHook;
void *plannerHook;
void *pluginSearchCatHook;
void *pluginCCHashEqFuncs;
} knl_u_hook_context;
/* PBE message flag */
typedef enum {

View File

@ -43,4 +43,6 @@ extern uint32 CatalogCacheComputeHashValue(CCHashFN *cc_hashfunc, int nkeys, Dat
HeapTuple GetPgAttributeAttrTuple(TupleDesc tupleDesc, const Form_pg_attribute attr);
void GetCCHashEqFuncs(Oid keytype, CCHashFN *hashfunc, RegProcedure *eqfunc, CCFastEqualFN *fasteqfunc);
void SearchCatCacheCheck();
typedef HeapTuple (*searchCatFunc)(Relation relation, Oid indexoid, int cacheId, int nkeys, ScanKeyData* cur_skey, SysScanDesc *scandesc);
typedef bool (*pluginCCHashEqFuncs)(Oid keytype, CCHashFN *hashfunc, RegProcedure *eqfunc, CCFastEqualFN *fasteqfunc, int cacheId);
#endif