From 6ad5f787ed1dacfbcd47c43a54a47308b2505f8e Mon Sep 17 00:00:00 2001 From: chenxiaobin19 <1025221611@qq.com> Date: Wed, 28 Jul 2021 14:33:08 +0800 Subject: [PATCH] support extension define session variables --- contrib/dblink/dblink.cpp | 101 ++++++++++++------ contrib/dblink/dblink.h | 2 + src/common/backend/utils/fmgr/dfmgr.cpp | 8 ++ src/common/backend/utils/init/postinit.cpp | 29 +++++ .../optimizer/commands/extension.cpp | 22 ++++ .../process/threadpool/knl_instance.cpp | 1 + .../process/threadpool/knl_session.cpp | 2 + src/include/commands/extension.h | 2 + .../knl/knl_guc/knl_session_attr_common.h | 2 + src/include/knl/knl_instance.h | 1 + src/include/utils/postinit.h | 2 + 11 files changed, 137 insertions(+), 35 deletions(-) diff --git a/contrib/dblink/dblink.cpp b/contrib/dblink/dblink.cpp index e2a308179..e903f4ffc 100755 --- a/contrib/dblink/dblink.cpp +++ b/contrib/dblink/dblink.cpp @@ -55,6 +55,7 @@ #include "utils/rel.h" #include "utils/rel_gs.h" #include "access/heapam.h" +#include "commands/extension.h" #include "dblink.h" @@ -111,9 +112,14 @@ static void validate_pkattnums( static int applyRemoteGucs(PGconn* conn); static void restoreLocalGucs(int nestlevel); -/* Global */ -static THR_LOCAL remoteConn* pconn = NULL; -static THR_LOCAL HTAB* remoteConnHash = NULL; +static uint32 dblink_index; +#define PCONN (get_session_context()->pconn) +#define REMOTE_CONN_HASH (get_session_context()->remoteConnHash) + +typedef struct dblink_session_context { + remoteConn* pconn; + HTAB* remoteConnHash; +} dblink_session_context; /* * Following is list that holds multiple remote connections. @@ -204,15 +210,40 @@ typedef struct remoteConnHashEnt { #define DBLINK_INIT \ do { \ - if (!pconn) { \ - pconn = (remoteConn*)MemoryContextAlloc( \ - THREAD_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_COMMUNICATION), sizeof(remoteConn)); \ - pconn->conn = NULL; \ - pconn->openCursorCount = 0; \ - pconn->newXactForCursor = FALSE; \ + if (!PCONN) { \ + PCONN = (remoteConn*)MemoryContextAlloc( \ + SESS_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_COMMUNICATION), sizeof(remoteConn)); \ + PCONN->conn = NULL; \ + PCONN->openCursorCount = 0; \ + PCONN->newXactForCursor = FALSE; \ } \ } while (0) +void set_extension_index(uint32 index) +{ + dblink_index = index; +} + +void init_session_vars(void) +{ + RepallocSessionVarsArrayIfNecessary(); + + dblink_session_context* psc = + (dblink_session_context*)MemoryContextAllocZero(u_sess->self_mem_cxt, sizeof(dblink_session_context)); + u_sess->attr.attr_common.extension_session_vars_array[dblink_index] = psc; + + psc->pconn = NULL; + psc->remoteConnHash = NULL; +} + +dblink_session_context* get_session_context() +{ + if (u_sess->attr.attr_common.extension_session_vars_array[dblink_index] == NULL) { + init_session_vars(); + } + return (dblink_session_context*)u_sess->attr.attr_common.extension_session_vars_array[dblink_index]; +} + /* * Create a persistent connection to another database */ @@ -236,7 +267,7 @@ Datum dblink_connect(PG_FUNCTION_ARGS) if (connname) rconn = (remoteConn*)MemoryContextAlloc( - THREAD_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_COMMUNICATION), sizeof(remoteConn)); + SESS_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_COMMUNICATION), sizeof(remoteConn)); /* first check for valid foreign data server */ connstr = get_connect_string(conname_or_str); @@ -269,7 +300,7 @@ Datum dblink_connect(PG_FUNCTION_ARGS) rconn->conn = conn; createNewConnection(connname, rconn); } else - pconn->conn = conn; + PCONN->conn = conn; PG_RETURN_TEXT_P(cstring_to_text("OK")); } @@ -292,7 +323,7 @@ Datum dblink_disconnect(PG_FUNCTION_ARGS) if (rconn) conn = rconn->conn; } else - conn = pconn->conn; + conn = PCONN->conn; if (!conn) DBLINK_CONN_NOT_AVAIL; @@ -302,7 +333,7 @@ Datum dblink_disconnect(PG_FUNCTION_ARGS) deleteConnection(conname); pfree(rconn); } else - pconn->conn = NULL; + PCONN->conn = NULL; PG_RETURN_TEXT_P(cstring_to_text("OK")); } @@ -330,14 +361,14 @@ Datum dblink_open(PG_FUNCTION_ARGS) /* text,text */ curname = text_to_cstring(PG_GETARG_TEXT_PP(0)); sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); - rconn = pconn; + rconn = PCONN; } else if (PG_NARGS() == 3) { /* might be text,text,text or text,text,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID) { curname = text_to_cstring(PG_GETARG_TEXT_PP(0)); sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); fail = PG_GETARG_BOOL(2); - rconn = pconn; + rconn = PCONN; } else { conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); curname = text_to_cstring(PG_GETARG_TEXT_PP(1)); @@ -410,13 +441,13 @@ Datum dblink_close(PG_FUNCTION_ARGS) if (PG_NARGS() == 1) { /* text */ curname = text_to_cstring(PG_GETARG_TEXT_PP(0)); - rconn = pconn; + rconn = PCONN; } else if (PG_NARGS() == 2) { /* might be text,text or text,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) { curname = text_to_cstring(PG_GETARG_TEXT_PP(0)); fail = PG_GETARG_BOOL(1); - rconn = pconn; + rconn = PCONN; } else { conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); curname = text_to_cstring(PG_GETARG_TEXT_PP(1)); @@ -500,7 +531,7 @@ Datum dblink_fetch(PG_FUNCTION_ARGS) curname = text_to_cstring(PG_GETARG_TEXT_PP(0)); howmany = PG_GETARG_INT32(1); fail = PG_GETARG_BOOL(2); - conn = pconn->conn; + conn = PCONN->conn; } else { conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); curname = text_to_cstring(PG_GETARG_TEXT_PP(1)); @@ -514,7 +545,7 @@ Datum dblink_fetch(PG_FUNCTION_ARGS) /* text,int */ curname = text_to_cstring(PG_GETARG_TEXT_PP(0)); howmany = PG_GETARG_INT32(1); - conn = pconn->conn; + conn = PCONN->conn; } if (!conn) @@ -610,7 +641,7 @@ static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) } else if (is_two) { /* text,text or text,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) { - conn = pconn->conn; + conn = PCONN->conn; sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); fail = PG_GETARG_BOOL(1); } else { @@ -619,7 +650,7 @@ static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) } } else if (is_one) { /* text */ - conn = pconn->conn; + conn = PCONN->conn; sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); } else if (!is_async) { /* shouldn't happen */ @@ -1119,8 +1150,8 @@ Datum dblink_get_connections(PG_FUNCTION_ARGS) remoteConnHashEnt* hentry = NULL; ArrayBuildState* astate = NULL; - if (remoteConnHash) { - hash_seq_init(&status, remoteConnHash); + if (REMOTE_CONN_HASH) { + hash_seq_init(&status, REMOTE_CONN_HASH); while ((hentry = (remoteConnHashEnt*)hash_seq_search(&status)) != NULL) { /* stash away current value */ astate = accumArrayResult(astate, CStringGetTextDatum(hentry->name), false, TEXTOID, CurrentMemoryContext); @@ -1247,7 +1278,7 @@ Datum dblink_exec(PG_FUNCTION_ARGS) } else if (PG_NARGS() == 2) { /* might be text,text or text,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) { - conn = pconn->conn; + conn = PCONN->conn; sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); fail = PG_GETARG_BOOL(1); } else { @@ -1256,7 +1287,7 @@ Datum dblink_exec(PG_FUNCTION_ARGS) } } else if (PG_NARGS() == 1) { /* must be single text argument */ - conn = pconn->conn; + conn = PCONN->conn; sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); } else /* shouldn't happen */ @@ -1706,7 +1737,7 @@ Datum dblink_get_notify(PG_FUNCTION_ARGS) if (PG_NARGS() == 1) DBLINK_GET_NAMED_CONN; else - conn = pconn->conn; + conn = PCONN->conn; /* create the tuplestore in per-query memory */ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; @@ -2209,12 +2240,12 @@ static remoteConn* getConnectionByName(const char* name) remoteConnHashEnt* hentry = NULL; char* key = NULL; - if (!remoteConnHash) - remoteConnHash = createConnHash(); + if (!REMOTE_CONN_HASH) + REMOTE_CONN_HASH = createConnHash(); key = pstrdup(name); truncate_identifier(key, strlen(key), false); - hentry = (remoteConnHashEnt*)hash_search(remoteConnHash, key, HASH_FIND, NULL); + hentry = (remoteConnHashEnt*)hash_search(REMOTE_CONN_HASH, key, HASH_FIND, NULL); if (hentry) return (hentry->rconn); @@ -2238,12 +2269,12 @@ static void createNewConnection(const char* name, remoteConn* rconn) bool found = false; char* key = NULL; - if (!remoteConnHash) - remoteConnHash = createConnHash(); + if (!REMOTE_CONN_HASH) + REMOTE_CONN_HASH = createConnHash(); key = pstrdup(name); truncate_identifier(key, strlen(key), true); - hentry = (remoteConnHashEnt*)hash_search(remoteConnHash, key, HASH_ENTER, &found); + hentry = (remoteConnHashEnt*)hash_search(REMOTE_CONN_HASH, key, HASH_ENTER, &found); if (found) { PQfinish(rconn->conn); @@ -2262,12 +2293,12 @@ static void deleteConnection(const char* name) bool found = false; char* key = NULL; - if (!remoteConnHash) - remoteConnHash = createConnHash(); + if (!REMOTE_CONN_HASH) + REMOTE_CONN_HASH = createConnHash(); key = pstrdup(name); truncate_identifier(key, strlen(key), false); - hentry = (remoteConnHashEnt*)hash_search(remoteConnHash, key, HASH_REMOVE, &found); + hentry = (remoteConnHashEnt*)hash_search(REMOTE_CONN_HASH, key, HASH_REMOVE, &found); if (!hentry) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("undefined connection name"))); diff --git a/contrib/dblink/dblink.h b/contrib/dblink/dblink.h index 340edc095..5f4703c27 100644 --- a/contrib/dblink/dblink.h +++ b/contrib/dblink/dblink.h @@ -58,5 +58,7 @@ extern "C" Datum dblink_build_sql_delete(PG_FUNCTION_ARGS); extern "C" Datum dblink_build_sql_update(PG_FUNCTION_ARGS); extern "C" Datum dblink_current_query(PG_FUNCTION_ARGS); extern "C" Datum dblink_get_notify(PG_FUNCTION_ARGS); +extern "C" void set_extension_index(uint32 index); +extern "C" void init_session_vars(void); #endif /* DBLINK_H */ diff --git a/src/common/backend/utils/fmgr/dfmgr.cpp b/src/common/backend/utils/fmgr/dfmgr.cpp index 215c0cb52..fbdf00197 100644 --- a/src/common/backend/utils/fmgr/dfmgr.cpp +++ b/src/common/backend/utils/fmgr/dfmgr.cpp @@ -179,6 +179,7 @@ void* internal_load_library(const char* libname) DynamicFileList* file_scanner = NULL; FileListInit* file_init_scanner = NULL; PGModuleMagicFunction magic_func; + void (*set_extension_index)(uint32); struct stat stat_buf; PG_init_t PG_init = NULL; char* file = last_dir_separator(libname); @@ -311,6 +312,13 @@ void* internal_load_library(const char* libname) errhint("Extension libraries are required to use the PG_MODULE_MAGIC macro."))); } + /* Look up the set_extension_index function within extension's so */ + set_extension_index = (void(*)(uint32))pg_dlsym(file_scanner->handle, "set_extension_index"); + if (set_extension_index) { + uint32 idx = pg_atomic_fetch_add_u32(&g_instance.extensionNum, 1); + (*set_extension_index)(idx); + } + /* OK to link it into list */ if (file_list == NULL) file_list = file_scanner; diff --git a/src/common/backend/utils/init/postinit.cpp b/src/common/backend/utils/init/postinit.cpp index 78fe07f28..a439983c6 100644 --- a/src/common/backend/utils/init/postinit.cpp +++ b/src/common/backend/utils/init/postinit.cpp @@ -86,6 +86,11 @@ #include "instruments/percentile.h" #include "instruments/instr_workload.h" #include "gs_policy/policy_common.h" +#ifndef WIN32_ONLY_COMPILER +#include "dynloader.h" +#else +#include "port/dynloader/win32.h" +#endif #ifdef PGXC #include "catalog/pgxc_node.h" @@ -1737,6 +1742,8 @@ void PostgresInitializer::InitSession() InitSettings(); + InitExtensionVariable(); + FinishInit(); AuditUserLogin(); @@ -2260,6 +2267,28 @@ void PostgresInitializer::InitSettings() InitializeClientEncoding(); } +void PostgresInitializer::InitExtensionVariable() +{ + int initExtArraySize = 10; + void (*init_session_vars)(void); + + /* initialize u_sess->attr.attr_common.extension_session_vars_array */ + u_sess->attr.attr_common.extension_session_vars_array_size = initExtArraySize; + u_sess->attr.attr_common.extension_session_vars_array = + (void**)MemoryContextAllocZero(u_sess->self_mem_cxt, (Size)(initExtArraySize * sizeof(void*))); + + DynamicFileList* file_scanner = NULL; + for (file_scanner = file_list; file_scanner != NULL; file_scanner = file_scanner->next) { + /* + * If the library has a init_session_vars() function, call it for + * initializing extension session variables. + */ + init_session_vars = (void(*)(void))pg_dlsym(file_scanner->handle, "init_session_vars"); + if (init_session_vars != NULL) + (*init_session_vars)(); + } +} + void PostgresInitializer::FinishInit() { /* report this backend in the PgBackendStatus array */ diff --git a/src/gausskernel/optimizer/commands/extension.cpp b/src/gausskernel/optimizer/commands/extension.cpp index 2b3464573..30a0b8431 100644 --- a/src/gausskernel/optimizer/commands/extension.cpp +++ b/src/gausskernel/optimizer/commands/extension.cpp @@ -2889,3 +2889,25 @@ void AlterExtensionOwner_oid(Oid extensionOid, Oid newOwnerId) heap_close(rel, NoLock); } + +/* + * Expand the size of extension_session_vars_array_size by twice the number + * of extensions each time if latter is bigger. + */ +void RepallocSessionVarsArrayIfNecessary() +{ + uint32 currExtensionNum = pg_atomic_read_u32(&g_instance.extensionNum); + uint32 currArraySize = u_sess->attr.attr_common.extension_session_vars_array_size; + int rc; + + if (currExtensionNum >= currArraySize) { + u_sess->attr.attr_common.extension_session_vars_array = (void**)repalloc( + u_sess->attr.attr_common.extension_session_vars_array, currExtensionNum * 2 * sizeof(void*)); + + rc = memset_s(&u_sess->attr.attr_common.extension_session_vars_array[currArraySize], + (currExtensionNum * 2 - currArraySize) * sizeof(void*), 0, + (currExtensionNum * 2 - currArraySize) * sizeof(void*)); + securec_check(rc, "", ""); + u_sess->attr.attr_common.extension_session_vars_array_size = currExtensionNum * 2; + } +} diff --git a/src/gausskernel/process/threadpool/knl_instance.cpp b/src/gausskernel/process/threadpool/knl_instance.cpp index 6a0972f4b..c3b7b9c05 100644 --- a/src/gausskernel/process/threadpool/knl_instance.cpp +++ b/src/gausskernel/process/threadpool/knl_instance.cpp @@ -550,6 +550,7 @@ void knl_instance_init() g_instance.stat_cxt.track_memory_inited = false; g_instance.proc_base = NULL; g_instance.proc_array_idx = NULL; + pg_atomic_init_u32(&g_instance.extensionNum, 0); /* * Set up the process wise memory context. The memory allocated from this diff --git a/src/gausskernel/process/threadpool/knl_session.cpp b/src/gausskernel/process/threadpool/knl_session.cpp index fa6a298be..18057863c 100644 --- a/src/gausskernel/process/threadpool/knl_session.cpp +++ b/src/gausskernel/process/threadpool/knl_session.cpp @@ -91,6 +91,8 @@ static void knl_u_attr_init(knl_session_attr* attr) attr->attr_sql.under_explain = false; attr->attr_resource.enable_auto_explain = false; attr->attr_sql.enable_upsert_to_merge = false; + attr->attr_common.extension_session_vars_array_size = 0; + attr->attr_common.extension_session_vars_array = NULL; } void knl_u_executor_init(knl_u_executor_context* exec_cxt) diff --git a/src/include/commands/extension.h b/src/include/commands/extension.h index d630f0c9f..4443f21e5 100644 --- a/src/include/commands/extension.h +++ b/src/include/commands/extension.h @@ -42,4 +42,6 @@ extern void AlterExtensionNamespace(List* names, const char* newschema); extern void AlterExtensionOwner_oid(Oid extensionOid, Oid newOwnerId); +extern void RepallocSessionVarsArrayIfNecessary(); + #endif /* EXTENSION_H */ diff --git a/src/include/knl/knl_guc/knl_session_attr_common.h b/src/include/knl/knl_guc/knl_session_attr_common.h index 2a59f331d..ddbb4f827 100644 --- a/src/include/knl/knl_guc/knl_session_attr_common.h +++ b/src/include/knl/knl_guc/knl_session_attr_common.h @@ -208,6 +208,8 @@ typedef struct knl_session_attr_common { char* router_att; bool enable_router; int gpc_clean_timeout; + uint32 extension_session_vars_array_size; + void** extension_session_vars_array; } knl_session_attr_common; #endif /* SRC_INCLUDE_KNL_KNL_SESSION_ATTR_COMMON_H_ */ diff --git a/src/include/knl/knl_instance.h b/src/include/knl/knl_instance.h index 67eacca24..7c699a2bc 100644 --- a/src/include/knl/knl_instance.h +++ b/src/include/knl/knl_instance.h @@ -876,6 +876,7 @@ typedef struct knl_instance_context { void *raw_parser_hook[DB_CMPT_MAX]; void *plsql_parser_hook[DB_CMPT_MAX]; #endif + pg_atomic_uint32 extensionNum; } knl_instance_context; extern long random(); diff --git a/src/include/utils/postinit.h b/src/include/utils/postinit.h index 5928b8ec3..08b0569b6 100644 --- a/src/include/utils/postinit.h +++ b/src/include/utils/postinit.h @@ -160,6 +160,8 @@ private: void InitSettings(); + void InitExtensionVariable(); + void FinishInit(); void AuditUserLogin();