support extension define session variables

This commit is contained in:
chenxiaobin19
2021-07-28 14:33:08 +08:00
parent 468d6bcbc2
commit 6ad5f787ed
11 changed files with 137 additions and 35 deletions

View File

@ -55,6 +55,7 @@
#include "utils/rel.h"
#include "utils/rel_gs.h"
#include "access/heapam.h"
#include "commands/extension.h"
#include "dblink.h"
@ -111,9 +112,14 @@ static void validate_pkattnums(
static int applyRemoteGucs(PGconn* conn);
static void restoreLocalGucs(int nestlevel);
/* Global */
static THR_LOCAL remoteConn* pconn = NULL;
static THR_LOCAL HTAB* remoteConnHash = NULL;
static uint32 dblink_index;
#define PCONN (get_session_context()->pconn)
#define REMOTE_CONN_HASH (get_session_context()->remoteConnHash)
typedef struct dblink_session_context {
remoteConn* pconn;
HTAB* remoteConnHash;
} dblink_session_context;
/*
* Following is list that holds multiple remote connections.
@ -204,15 +210,40 @@ typedef struct remoteConnHashEnt {
#define DBLINK_INIT \
do { \
if (!pconn) { \
pconn = (remoteConn*)MemoryContextAlloc( \
THREAD_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_COMMUNICATION), sizeof(remoteConn)); \
pconn->conn = NULL; \
pconn->openCursorCount = 0; \
pconn->newXactForCursor = FALSE; \
if (!PCONN) { \
PCONN = (remoteConn*)MemoryContextAlloc( \
SESS_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_COMMUNICATION), sizeof(remoteConn)); \
PCONN->conn = NULL; \
PCONN->openCursorCount = 0; \
PCONN->newXactForCursor = FALSE; \
} \
} while (0)
void set_extension_index(uint32 index)
{
dblink_index = index;
}
void init_session_vars(void)
{
RepallocSessionVarsArrayIfNecessary();
dblink_session_context* psc =
(dblink_session_context*)MemoryContextAllocZero(u_sess->self_mem_cxt, sizeof(dblink_session_context));
u_sess->attr.attr_common.extension_session_vars_array[dblink_index] = psc;
psc->pconn = NULL;
psc->remoteConnHash = NULL;
}
dblink_session_context* get_session_context()
{
if (u_sess->attr.attr_common.extension_session_vars_array[dblink_index] == NULL) {
init_session_vars();
}
return (dblink_session_context*)u_sess->attr.attr_common.extension_session_vars_array[dblink_index];
}
/*
* Create a persistent connection to another database
*/
@ -236,7 +267,7 @@ Datum dblink_connect(PG_FUNCTION_ARGS)
if (connname)
rconn = (remoteConn*)MemoryContextAlloc(
THREAD_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_COMMUNICATION), sizeof(remoteConn));
SESS_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_COMMUNICATION), sizeof(remoteConn));
/* first check for valid foreign data server */
connstr = get_connect_string(conname_or_str);
@ -269,7 +300,7 @@ Datum dblink_connect(PG_FUNCTION_ARGS)
rconn->conn = conn;
createNewConnection(connname, rconn);
} else
pconn->conn = conn;
PCONN->conn = conn;
PG_RETURN_TEXT_P(cstring_to_text("OK"));
}
@ -292,7 +323,7 @@ Datum dblink_disconnect(PG_FUNCTION_ARGS)
if (rconn)
conn = rconn->conn;
} else
conn = pconn->conn;
conn = PCONN->conn;
if (!conn)
DBLINK_CONN_NOT_AVAIL;
@ -302,7 +333,7 @@ Datum dblink_disconnect(PG_FUNCTION_ARGS)
deleteConnection(conname);
pfree(rconn);
} else
pconn->conn = NULL;
PCONN->conn = NULL;
PG_RETURN_TEXT_P(cstring_to_text("OK"));
}
@ -330,14 +361,14 @@ Datum dblink_open(PG_FUNCTION_ARGS)
/* text,text */
curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
rconn = pconn;
rconn = PCONN;
} else if (PG_NARGS() == 3) {
/* might be text,text,text or text,text,bool */
if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID) {
curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
fail = PG_GETARG_BOOL(2);
rconn = pconn;
rconn = PCONN;
} else {
conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
@ -410,13 +441,13 @@ Datum dblink_close(PG_FUNCTION_ARGS)
if (PG_NARGS() == 1) {
/* text */
curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
rconn = pconn;
rconn = PCONN;
} else if (PG_NARGS() == 2) {
/* might be text,text or text,bool */
if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) {
curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
fail = PG_GETARG_BOOL(1);
rconn = pconn;
rconn = PCONN;
} else {
conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
@ -500,7 +531,7 @@ Datum dblink_fetch(PG_FUNCTION_ARGS)
curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
howmany = PG_GETARG_INT32(1);
fail = PG_GETARG_BOOL(2);
conn = pconn->conn;
conn = PCONN->conn;
} else {
conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
@ -514,7 +545,7 @@ Datum dblink_fetch(PG_FUNCTION_ARGS)
/* text,int */
curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
howmany = PG_GETARG_INT32(1);
conn = pconn->conn;
conn = PCONN->conn;
}
if (!conn)
@ -610,7 +641,7 @@ static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
} else if (is_two) {
/* text,text or text,bool */
if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) {
conn = pconn->conn;
conn = PCONN->conn;
sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
fail = PG_GETARG_BOOL(1);
} else {
@ -619,7 +650,7 @@ static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
}
} else if (is_one) {
/* text */
conn = pconn->conn;
conn = PCONN->conn;
sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
} else if (!is_async) {
/* shouldn't happen */
@ -1119,8 +1150,8 @@ Datum dblink_get_connections(PG_FUNCTION_ARGS)
remoteConnHashEnt* hentry = NULL;
ArrayBuildState* astate = NULL;
if (remoteConnHash) {
hash_seq_init(&status, remoteConnHash);
if (REMOTE_CONN_HASH) {
hash_seq_init(&status, REMOTE_CONN_HASH);
while ((hentry = (remoteConnHashEnt*)hash_seq_search(&status)) != NULL) {
/* stash away current value */
astate = accumArrayResult(astate, CStringGetTextDatum(hentry->name), false, TEXTOID, CurrentMemoryContext);
@ -1247,7 +1278,7 @@ Datum dblink_exec(PG_FUNCTION_ARGS)
} else if (PG_NARGS() == 2) {
/* might be text,text or text,bool */
if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) {
conn = pconn->conn;
conn = PCONN->conn;
sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
fail = PG_GETARG_BOOL(1);
} else {
@ -1256,7 +1287,7 @@ Datum dblink_exec(PG_FUNCTION_ARGS)
}
} else if (PG_NARGS() == 1) {
/* must be single text argument */
conn = pconn->conn;
conn = PCONN->conn;
sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
} else
/* shouldn't happen */
@ -1706,7 +1737,7 @@ Datum dblink_get_notify(PG_FUNCTION_ARGS)
if (PG_NARGS() == 1)
DBLINK_GET_NAMED_CONN;
else
conn = pconn->conn;
conn = PCONN->conn;
/* create the tuplestore in per-query memory */
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
@ -2209,12 +2240,12 @@ static remoteConn* getConnectionByName(const char* name)
remoteConnHashEnt* hentry = NULL;
char* key = NULL;
if (!remoteConnHash)
remoteConnHash = createConnHash();
if (!REMOTE_CONN_HASH)
REMOTE_CONN_HASH = createConnHash();
key = pstrdup(name);
truncate_identifier(key, strlen(key), false);
hentry = (remoteConnHashEnt*)hash_search(remoteConnHash, key, HASH_FIND, NULL);
hentry = (remoteConnHashEnt*)hash_search(REMOTE_CONN_HASH, key, HASH_FIND, NULL);
if (hentry)
return (hentry->rconn);
@ -2238,12 +2269,12 @@ static void createNewConnection(const char* name, remoteConn* rconn)
bool found = false;
char* key = NULL;
if (!remoteConnHash)
remoteConnHash = createConnHash();
if (!REMOTE_CONN_HASH)
REMOTE_CONN_HASH = createConnHash();
key = pstrdup(name);
truncate_identifier(key, strlen(key), true);
hentry = (remoteConnHashEnt*)hash_search(remoteConnHash, key, HASH_ENTER, &found);
hentry = (remoteConnHashEnt*)hash_search(REMOTE_CONN_HASH, key, HASH_ENTER, &found);
if (found) {
PQfinish(rconn->conn);
@ -2262,12 +2293,12 @@ static void deleteConnection(const char* name)
bool found = false;
char* key = NULL;
if (!remoteConnHash)
remoteConnHash = createConnHash();
if (!REMOTE_CONN_HASH)
REMOTE_CONN_HASH = createConnHash();
key = pstrdup(name);
truncate_identifier(key, strlen(key), false);
hentry = (remoteConnHashEnt*)hash_search(remoteConnHash, key, HASH_REMOVE, &found);
hentry = (remoteConnHashEnt*)hash_search(REMOTE_CONN_HASH, key, HASH_REMOVE, &found);
if (!hentry)
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("undefined connection name")));

View File

@ -58,5 +58,7 @@ extern "C" Datum dblink_build_sql_delete(PG_FUNCTION_ARGS);
extern "C" Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
extern "C" Datum dblink_current_query(PG_FUNCTION_ARGS);
extern "C" Datum dblink_get_notify(PG_FUNCTION_ARGS);
extern "C" void set_extension_index(uint32 index);
extern "C" void init_session_vars(void);
#endif /* DBLINK_H */

View File

@ -179,6 +179,7 @@ void* internal_load_library(const char* libname)
DynamicFileList* file_scanner = NULL;
FileListInit* file_init_scanner = NULL;
PGModuleMagicFunction magic_func;
void (*set_extension_index)(uint32);
struct stat stat_buf;
PG_init_t PG_init = NULL;
char* file = last_dir_separator(libname);
@ -311,6 +312,13 @@ void* internal_load_library(const char* libname)
errhint("Extension libraries are required to use the PG_MODULE_MAGIC macro.")));
}
/* Look up the set_extension_index function within extension's so */
set_extension_index = (void(*)(uint32))pg_dlsym(file_scanner->handle, "set_extension_index");
if (set_extension_index) {
uint32 idx = pg_atomic_fetch_add_u32(&g_instance.extensionNum, 1);
(*set_extension_index)(idx);
}
/* OK to link it into list */
if (file_list == NULL)
file_list = file_scanner;

View File

@ -86,6 +86,11 @@
#include "instruments/percentile.h"
#include "instruments/instr_workload.h"
#include "gs_policy/policy_common.h"
#ifndef WIN32_ONLY_COMPILER
#include "dynloader.h"
#else
#include "port/dynloader/win32.h"
#endif
#ifdef PGXC
#include "catalog/pgxc_node.h"
@ -1737,6 +1742,8 @@ void PostgresInitializer::InitSession()
InitSettings();
InitExtensionVariable();
FinishInit();
AuditUserLogin();
@ -2260,6 +2267,28 @@ void PostgresInitializer::InitSettings()
InitializeClientEncoding();
}
void PostgresInitializer::InitExtensionVariable()
{
int initExtArraySize = 10;
void (*init_session_vars)(void);
/* initialize u_sess->attr.attr_common.extension_session_vars_array */
u_sess->attr.attr_common.extension_session_vars_array_size = initExtArraySize;
u_sess->attr.attr_common.extension_session_vars_array =
(void**)MemoryContextAllocZero(u_sess->self_mem_cxt, (Size)(initExtArraySize * sizeof(void*)));
DynamicFileList* file_scanner = NULL;
for (file_scanner = file_list; file_scanner != NULL; file_scanner = file_scanner->next) {
/*
* If the library has a init_session_vars() function, call it for
* initializing extension session variables.
*/
init_session_vars = (void(*)(void))pg_dlsym(file_scanner->handle, "init_session_vars");
if (init_session_vars != NULL)
(*init_session_vars)();
}
}
void PostgresInitializer::FinishInit()
{
/* report this backend in the PgBackendStatus array */

View File

@ -2889,3 +2889,25 @@ void AlterExtensionOwner_oid(Oid extensionOid, Oid newOwnerId)
heap_close(rel, NoLock);
}
/*
* Expand the size of extension_session_vars_array_size by twice the number
* of extensions each time if latter is bigger.
*/
void RepallocSessionVarsArrayIfNecessary()
{
uint32 currExtensionNum = pg_atomic_read_u32(&g_instance.extensionNum);
uint32 currArraySize = u_sess->attr.attr_common.extension_session_vars_array_size;
int rc;
if (currExtensionNum >= currArraySize) {
u_sess->attr.attr_common.extension_session_vars_array = (void**)repalloc(
u_sess->attr.attr_common.extension_session_vars_array, currExtensionNum * 2 * sizeof(void*));
rc = memset_s(&u_sess->attr.attr_common.extension_session_vars_array[currArraySize],
(currExtensionNum * 2 - currArraySize) * sizeof(void*), 0,
(currExtensionNum * 2 - currArraySize) * sizeof(void*));
securec_check(rc, "", "");
u_sess->attr.attr_common.extension_session_vars_array_size = currExtensionNum * 2;
}
}

View File

@ -550,6 +550,7 @@ void knl_instance_init()
g_instance.stat_cxt.track_memory_inited = false;
g_instance.proc_base = NULL;
g_instance.proc_array_idx = NULL;
pg_atomic_init_u32(&g_instance.extensionNum, 0);
/*
* Set up the process wise memory context. The memory allocated from this

View File

@ -91,6 +91,8 @@ static void knl_u_attr_init(knl_session_attr* attr)
attr->attr_sql.under_explain = false;
attr->attr_resource.enable_auto_explain = false;
attr->attr_sql.enable_upsert_to_merge = false;
attr->attr_common.extension_session_vars_array_size = 0;
attr->attr_common.extension_session_vars_array = NULL;
}
void knl_u_executor_init(knl_u_executor_context* exec_cxt)

View File

@ -42,4 +42,6 @@ extern void AlterExtensionNamespace(List* names, const char* newschema);
extern void AlterExtensionOwner_oid(Oid extensionOid, Oid newOwnerId);
extern void RepallocSessionVarsArrayIfNecessary();
#endif /* EXTENSION_H */

View File

@ -208,6 +208,8 @@ typedef struct knl_session_attr_common {
char* router_att;
bool enable_router;
int gpc_clean_timeout;
uint32 extension_session_vars_array_size;
void** extension_session_vars_array;
} knl_session_attr_common;
#endif /* SRC_INCLUDE_KNL_KNL_SESSION_ATTR_COMMON_H_ */

View File

@ -876,6 +876,7 @@ typedef struct knl_instance_context {
void *raw_parser_hook[DB_CMPT_MAX];
void *plsql_parser_hook[DB_CMPT_MAX];
#endif
pg_atomic_uint32 extensionNum;
} knl_instance_context;
extern long random();

View File

@ -160,6 +160,8 @@ private:
void InitSettings();
void InitExtensionVariable();
void FinishInit();
void AuditUserLogin();