Files
openGauss-server/src/gausskernel/runtime/executor/lightProxy.cpp
2021-03-06 12:39:28 +08:00

1401 lines
46 KiB
C++

/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* lightProxy.cpp
*
* IDENTIFICATION
* src/gausskernel/runtime/executor/lightProxy.cpp
*
* -------------------------------------------------------------------------
*/
#include "access/transam.h"
#include "access/xact.h"
#include "utils/dynahash.h"
#include "utils/hotkey.h"
#include "catalog/pgxc_node.h"
#include "commands/prepare.h"
#include "executor/executor.h"
#include "executor/lightProxy.h"
#include "mb/pg_wchar.h"
#include "optimizer/pgxcship.h"
#include "pgxc/poolmgr.h"
#include "pgxc/pgxc.h"
#include "utils/snapmgr.h"
#include "pgstat.h"
#include "pgaudit.h"
#include "pgxc/route.h"
#include "libpq/pqformat.h"
#include "gs_policy/policy_common.h"
#include "instruments/instr_unique_sql.h"
#include "instruments/instr_slow_query.h"
#include "tcop/tcopprot.h"
#include "optimizer/streamplan.h"
const int MAX_COMMAND = 51;
typedef struct commandType {
CmdType type;
const char* commandTag;
} CommandType;
static CommandType g_command_type_array[MAX_COMMAND] = {{CMD_DML, "SELECT"},
{CMD_DML, "UPDATE"},
{CMD_DML, "INSERT"},
{CMD_DML, "DELETE"},
{CMD_DML, "MERGE"},
{CMD_TCL, "BEGIN"},
{CMD_TCL, "COMMIT"},
{CMD_TCL, "ROLLBACK"},
{CMD_TCL, "START"},
{CMD_TCL, "SAVEPOINT"},
{CMD_TCL, "RELEASE"},
{CMD_TCL, "PREPARE TRANSACTION"},
{CMD_DDL, "PREPARE"},
{CMD_DML, "CHECKPOINT"},
{CMD_DML, "TRUNCATE"},
{CMD_DML, "EXPLAIN"},
{CMD_DML, "SHOW"},
{CMD_DML, "LOCK"},
{CMD_DML, "COPY"},
{CMD_DML, "CLUSTER"},
{CMD_DML, "ANONYMOUS"},
{CMD_DML, "VACUUM"},
{CMD_DML, "DELTA"},
{CMD_DML, "ANALYZE"},
{CMD_DML, "EXECUTE"},
{CMD_DDL, "MOVE"},
{CMD_DDL, "FETCH"},
{CMD_DCL, "CREATE ROLE"},
{CMD_DCL, "CREATE USER"},
{CMD_DDL, "CREATE"},
{CMD_DCL, "DROP ROLE"},
{CMD_DCL, "DROP USER"},
{CMD_DDL, "DROP"},
{CMD_DCL, "ALTER ROLE"},
{CMD_DCL, "ALTER USER"},
{CMD_DCL, "ALTER DEFAULT PRIVILEGES"},
{CMD_DDL, "ALTER"},
{CMD_DDL, "CLOSE"},
{CMD_DDL, "DEALLOCATE"},
{CMD_DDL, "DECLARE"},
{CMD_DDL, "REINDEX"},
{CMD_DDL, "COMMENT"},
{CMD_DDL, "BARRIER"},
{CMD_DCL, "GRANT"},
{CMD_DCL, "REVOKE"},
{CMD_DCL, "NOTIFY"},
{CMD_DCL, "LISTEN"},
{CMD_DCL, "LOAD"},
{CMD_DCL, "DISCARD"},
{CMD_DCL, "REASSIGN"},
{CMD_DCL, "CONSTRAINTS"}};
extern void pgxc_node_init(PGXCNodeHandle* handle, int sock);
extern void pgxc_handle_unsupported_stmts(Query* query);
extern Oid exprType(const Node* expr);
extern bool light_xactnodes_member(bool write, const void* datum);
extern int light_node_send_begin(PGXCNodeHandle* handle, bool check_gtm_mode);
extern int light_handle_response(PGXCNodeHandle* conn, lightProxyMsgCtl* msgctl, lightProxy* lp);
extern void light_node_report_error(lightProxyErrData* combiner);
extern bool light_node_receive(PGXCNodeHandle* handle);
extern bool light_node_receive_from_logic_conn(PGXCNodeHandle* handle);
extern void light_pgaudit_ExecutorEnd(Query* query);
void report_qps_type(CmdType commandType);
CmdType set_cmd_type(const char* commandTag);
static void report_iud_time_for_lightproxy(const Query* query)
{
if (u_sess->attr.attr_sql.enable_save_datachanged_timestamp == false)
return;
if (!IS_PGXC_COORDINATOR && !IS_SINGLE_NODE)
return;
if (query->commandType != CMD_INSERT && query->commandType != CMD_DELETE && query->commandType != CMD_UPDATE &&
query->commandType != CMD_MERGE) {
return;
}
if (query->rtable == NULL)
return;
if (query->resultRelation <= list_length(query->rtable)) {
RangeTblEntry* rte = (RangeTblEntry*)list_nth(query->rtable, query->resultRelation - 1);
if (RTE_RELATION != rte->rtekind)
return;
MemoryContext current_ctx = CurrentMemoryContext;
Relation rel = NULL;
PG_TRY();
{
rel = heap_open(rte->relid, AccessShareLock);
if (rel->rd_rel->relkind == RELKIND_RELATION) {
if (rel->rd_rel->relpersistence == RELPERSISTENCE_PERMANENT ||
rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED) {
pgstat_report_data_changed(rte->relid, STATFLG_RELATION, rel->rd_rel->relisshared);
}
}
heap_close(rel, AccessShareLock);
}
PG_CATCH();
{
(void)MemoryContextSwitchTo(current_ctx);
ErrorData* edata = CopyErrorData();
ereport(DEBUG1, (errmsg("Failed to send data changed time, cause: %s", edata->message)));
FlushErrorState();
FreeErrorData(edata);
if (rel != NULL)
heap_close(rel, AccessShareLock);
}
PG_END_TRY();
}
}
static void report_unsupport_light(LightUnSupportType type)
{
if (type == CTRL_DISABLE) {
return;
}
char* unsupport_msg[MAX_UNSUPPORT_TYPE] = {"guc ctrl disable",
"not support client encoding different from database encoding",
"not support cursor",
"not support execute direct on",
"not support others cmd type except I/D/U/S",
"not support table entry relkind is foreign",
"not support query has a statement trigger",
"not support user-defined type "};
ereport(DEBUG2, (errmsg("[LIGHT PROXY] check failed with type: %s.", unsupport_msg[type])));
return;
}
static bool isSupportLightQuery(Query* query)
{
ListCell* item = NULL;
if (!u_sess->attr.attr_sql.enable_fast_query_shipping ||
(!u_sess->attr.attr_sql.enable_light_proxy && !GTM_LITE_MODE)) {
report_unsupport_light(CTRL_DISABLE);
return false;
}
if (pg_get_client_encoding() != GetDatabaseEncoding()) {
report_unsupport_light(ENCODE_UNSUPPORT);
return false;
}
/* not support cursor */
if (query->utilityStmt && IsA(query->utilityStmt, DeclareCursorStmt)) {
report_unsupport_light(CURSOR_UNSUPPORT);
return false;
}
/* not support execute direct on */
if (query->utilityStmt && IsA(query->utilityStmt, RemoteQuery)) {
report_unsupport_light(REMOTE_UNSUPPORT);
return false;
}
/* not support others */
if (query->commandType != CMD_SELECT && query->commandType != CMD_UPDATE && query->commandType != CMD_INSERT &&
query->commandType != CMD_DELETE && !(HAS_ROUTER && query->commandType == CMD_MERGE)) {
report_unsupport_light(CMD_UNSUPPORT);
return false;
}
foreach (item, query->rtable) {
RangeTblEntry* rte = (RangeTblEntry*)lfirst(item);
#ifdef ENABLE_MOT
if (rte->relkind == RELKIND_STREAM || (rte->relkind == RELKIND_FOREIGN_TABLE && !isMOTFromTblOid(rte->relid))) {
#else
if (rte->relkind == RELKIND_STREAM || rte->relkind == RELKIND_FOREIGN_TABLE) {
#endif
report_unsupport_light(FOREIGN_UNSUPPORT);
return false;
}
/*
* Essentially, lightProxy is a fast path for FQSed when length
* of exec_nodes is 1, which is not supported when query has a
* statement trigger.
*/
if (pgxc_find_statement_trigger(rte->relid, query->commandType)) {
report_unsupport_light(STATEMENT_UNSUPPORT);
return false;
}
}
/* check the target list for T message */
foreach (item, query->targetList) {
TargetEntry* tle = (TargetEntry*)lfirst(item);
if (tle->resjunk)
continue;
/* not support user-defined type */
if (exprType((Node*)tle->expr) >= FirstBootstrapObjectId) {
report_unsupport_light(USERTYPE_UNSUPPORT);
return false;
}
}
return true;
}
lightProxy::lightProxy(Query *query)
: m_cplan(NULL),
m_nodeIdx(-1),
m_context(NULL),
m_stmtName(NULL),
m_formats(NULL),
m_entry(NULL),
m_query(query),
m_handle(NULL)
{
m_cmdType = query ? query->commandType : CMD_UNKNOWN;
queryType = CMD_DML;
m_portalName = NULL;
m_msgctl = (lightProxyMsgCtl*)palloc0(sizeof(lightProxyMsgCtl));
m_msgctl->errData = (lightProxyErrData*)palloc0(sizeof(lightProxyErrData));
m_isRowTriggerShippable = query->isRowTriggerShippable;
initStringInfo(&m_bindMessage);
initStringInfo(&m_describeMessage);
#ifdef LPROXY_DEBUG
m_msgctl->stmt_name = NULL;
m_msgctl->query_string = query->sql_statement;
#endif
}
lightProxy::lightProxy(MemoryContext context, CachedPlanSource *psrc, const char *portalname, const char *stmtname)
: m_cplan(psrc),
m_nodeIdx(-1),
m_context(context),
m_stmtName(NULL),
m_portalName(NULL),
m_formats(NULL),
m_entry(NULL),
m_cmdType(CMD_UNKNOWN),
m_query(NULL),
m_handle(NULL)
{
MemoryContext old_context = MemoryContextSwitchTo(context);
if (psrc) {
m_cmdType = set_cmd_type(psrc->commandTag);
queryType = set_command_type_by_commandTag(psrc->commandTag);
}
initStringInfo(&m_bindMessage);
initStringInfo(&m_describeMessage);
m_msgctl = (lightProxyMsgCtl *)palloc0(sizeof(lightProxyMsgCtl));
m_msgctl->errData = (lightProxyErrData *)palloc0(sizeof(lightProxyErrData));
if (psrc != NULL && list_length(psrc->query_list) == 1 && linitial(psrc->query_list) != NULL) {
m_isRowTriggerShippable = ((Query*)linitial(psrc->query_list))->isRowTriggerShippable;
} else {
m_isRowTriggerShippable = false;
}
if (portalname != NULL && portalname[0] != '\0') {
storeLightProxy(portalname);
} else {
m_portalName = NULL;
}
m_stmtName = (stmtname != NULL && stmtname[0] != '\0') ? pstrdup(stmtname) : NULL;
#ifdef LPROXY_DEBUG
m_msgctl->stmt_name = pstrdup(m_stmtName);
m_msgctl->query_string = pstrdup(m_cplan->query_string);
#endif
MemoryContextSwitchTo(old_context);
}
lightProxy::~lightProxy()
{
m_cplan = NULL;
m_context = NULL;
m_query = NULL;
m_handle = NULL;
m_msgctl = NULL;
m_entry = NULL;
pfree_ext(m_formats);
}
void lightProxy::getResultFormat(StringInfo message)
{
pfree_ext(m_formats);
if (m_cplan->resultDesc == NULL)
return;
/* Get the result format codes */
int numRFormats = pq_getmsgint(message, 2);
int i = 0;
int natts = m_cplan->resultDesc->natts;
int16* formats = NULL;
m_formats = (int16*)palloc(natts * sizeof(int16));
/* Get the result format codes */
if (numRFormats > 0) {
formats = (int16*)palloc(numRFormats * sizeof(int16));
for (i = 0; i < numRFormats; i++)
formats[i] = pq_getmsgint(message, 2);
}
pq_getmsgend(message);
if (numRFormats > 1) {
if (numRFormats != natts) {
pfree_ext(formats);
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("bind message has %d result formats but query has %d columns", numRFormats, natts)));
}
int rc = memcpy_s(m_formats, natts * sizeof(int16), formats, natts * sizeof(int16));
securec_check(rc, "\0", "\0");
} else if (numRFormats > 0) {
for (i = 0; i < natts; i++)
m_formats[i] = formats[0];
} else {
for (i = 0; i < natts; i++)
m_formats[i] = 0;
}
pfree_ext(formats);
}
void lightProxy::saveMsg(int msgType, StringInfo message)
{
// to do , save memory consumption
AutoContextSwitch contexts(m_context);
if (msgType == BIND_MESSAGE) {
/* clean previous message if exists */
if (m_bindMessage.len > 0)
resetStringInfo(&m_bindMessage);
if (m_describeMessage.len > 0)
resetStringInfo(&m_describeMessage);
getResultFormat(message);
appendBinaryStringInfo(&m_bindMessage, message->data, message->len);
} else if (msgType == DESC_MESSAGE) {
/* clean previous message if exists */
if (m_describeMessage.len > 0)
resetStringInfo(&m_describeMessage);
appendBinaryStringInfo(&m_describeMessage, message->data, message->len);
}
}
void lightProxy::connect()
{
List* dn_allocate = NULL;
errno_t ss_rc = 0;
m_handle = &u_sess->pgxc_cxt.dn_handles[m_nodeIdx];
if (!IS_VALID_CONNECTION(m_handle)) {
dn_allocate = lappend_int(dn_allocate, m_nodeIdx);
PoolConnDef* pfds = PoolManagerGetConnections(dn_allocate, NULL);
if (pfds == NULL)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("[LIGHT PROXY] Failed to get pooled connections from %s[%u]",
m_handle->remoteNodeName,
m_handle->nodeoid)));
int fdsock = pfds->fds[0];
PoolConnInfo* conn_info = &pfds->connInfos[0];
pgxc_node_init(m_handle, fdsock);
ss_rc = memcpy_s(&m_handle->connInfo, sizeof(PoolConnInfo), conn_info, sizeof(PoolConnInfo));
securec_check(ss_rc, "\0", "\0");
u_sess->pgxc_cxt.dn_handles[m_nodeIdx] = *m_handle;
u_sess->pgxc_cxt.datanode_count++;
if (pfds->gsock[0].type != GSOCK_INVALID) {
m_handle->gsock = pfds->gsock[0];
m_handle->is_logic_conn = true;
}
pgxc_node_free_def(pfds);
pfds = NULL;
} else if (m_handle->state == DN_CONNECTION_STATE_QUERY) {
BufferConnection(m_handle);
}
}
void lightProxy::sendParseIfNecessary()
{
/* if no stmt_name, we need to send parse every time */
if (m_stmtName == NULL || m_stmtName[0] == '\0') {
if (pgxc_node_send_parse(
m_handle, m_stmtName, m_cplan->query_string, m_cplan->num_params, m_cplan->param_types)) {
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("[LIGHT PROXY] Failed to send parse to %s[%u]",
m_handle->remoteNodeName,
m_handle->nodeoid)));
}
return;
}
if (unlikely(m_entry == NULL)) {
/*
* If we have reloaded pooler, we need to add it into datanode_queries again,
* as we do in parse phrase previously.
*/
m_entry = light_set_datanode_queries(m_stmtName);
Assert(m_entry != NULL);
}
Assert(m_nodeIdx != -1);
bool need_send_again = false;
/* see if statement already active on the node */
for (int i = 0; i < m_entry->current_nodes_number; i++) {
if (m_entry->dns_node_indices[i] == m_nodeIdx) {
if (ENABLE_CN_GPC) {
need_send_again = true;
} else {
return;
}
}
}
if (pgxc_node_send_parse(
m_handle, m_stmtName, m_cplan->query_string, m_cplan->num_params, m_cplan->param_types)) {
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("[LIGHT PROXY] Failed to send parse to %s[%u]",
m_handle->remoteNodeName,
m_handle->nodeoid)));
}
if (need_send_again) {
return;
}
/* After cluster expansion, must expand entry->dns_node_indices array too */
if (unlikely(m_entry->current_nodes_number == m_entry->max_nodes_number)) {
int* new_dns_node_indices = (int*)MemoryContextAllocZero(
u_sess->pcache_cxt.datanode_queries->hcxt, m_entry->max_nodes_number * 2 * sizeof(int));
errno_t error_no = EOK;
error_no = memcpy_s(new_dns_node_indices,
m_entry->max_nodes_number * 2 * sizeof(int),
m_entry->dns_node_indices,
m_entry->max_nodes_number * sizeof(int));
securec_check(error_no, "\0", "\0");
pfree_ext(m_entry->dns_node_indices);
m_entry->dns_node_indices = new_dns_node_indices;
m_entry->max_nodes_number = m_entry->max_nodes_number * 2;
elog(LOG,
"expand node ids array for active datanode statements "
"after cluster expansion, now array size is %d",
m_entry->max_nodes_number);
}
/* statement is not active on the specified node append item to the list */
m_entry->dns_node_indices[m_entry->current_nodes_number++] = m_nodeIdx;
}
/*
* @Description: Send BEGIN command to the DataNode. Also send the GXID for the transaction.
* See pgxc_node_begin for more details.
*/
void lightProxy::proxyNodeBegin(bool is_read_only)
{
bool need_tran_block = false;
GlobalTransactionId gxid = InvalidTransactionId;
if (HAS_ROUTER) {
// push down function to router dn, need transaction
is_read_only = false;
}
if (IsTransactionBlock()) {
need_tran_block = true;
} else if (is_read_only) {
need_tran_block = false;
} else {
need_tran_block = true;
}
if (is_read_only) {
gxid = GetCurrentTransactionIdIfAny();
} else if (GTM_MODE) {
gxid = GetCurrentTransactionId();
if (!GlobalTransactionIdIsValid(gxid)) {
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("[LIGHT PROXY] Failed to get new transaction id")));
}
}
/*
* Send GXID and check for errors
*/
if (GTM_MODE && GlobalTransactionIdIsValid(gxid) &&
pgxc_node_send_gxid(m_handle, gxid, false)) {
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("[LIGHT PROXY] Failed to send gxid %lu to %s[%u]",
gxid,
m_handle->remoteNodeName,
m_handle->nodeoid)));
}
/*
* If the node is already a participant in the transaction, skip it
*/
if (light_xactnodes_member(false, m_handle) || light_xactnodes_member(true, m_handle)) {
if (!is_read_only) {
RegisterTransactionNodes(1, (void**)&m_handle, true);
}
} else {
SetCurrentStmtTimestamp();
TimestampTz gtmstart_timestamp = GetCurrentGTMStartTimestamp();
TimestampTz stmtsys_timestamp = GetCurrentStmtsysTimestamp();
if (GlobalTimestampIsValid(gtmstart_timestamp) &&
GlobalTimestampIsValid(stmtsys_timestamp) &&
pgxc_node_send_timestamp(m_handle, gtmstart_timestamp, stmtsys_timestamp)) {
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("[LIGHT PROXY] Failed to send timestamp to %s[%u]",
m_handle->remoteNodeName,
m_handle->nodeoid)));
}
if (need_tran_block) {
if (light_node_send_begin(m_handle, g_instance.attr.attr_storage.enable_gtm_free)) {
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("[LIGHT PROXY] Failed to send internal begin to %s[%u]",
m_handle->remoteNodeName,
m_handle->nodeoid)));
}
LPROXY_DEBUG(elog(DEBUG2,
"[LIGHT PROXY] Send internal begin to DataNode %u: query %s",
m_handle->nodeoid,
m_msgctl->query_string));
/* recieve message */
m_msgctl->cnMsg = true;
handleResponse();
RegisterTransactionNodes(1, (void**)&m_handle, !is_read_only);
}
}
CommandId cid = GetCurrentCommandId(!is_read_only);
if (pgxc_node_send_cmd_id(m_handle, cid) < 0) {
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("[LIGHT PROXY] Failed to send cid to %s[%u]",
m_handle->remoteNodeName,
m_handle->nodeoid)));
}
/* print the XactWriteNodes and XactReadNodes list info */
PrintRegisteredTransactionNodes();
Snapshot snapshot = GetActiveSnapshot();
if (!GTM_FREE_MODE && snapshot != NULL &&
pgxc_node_send_snapshot(m_handle, snapshot)) {
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("[LIGHT PROXY] Failed to send snapshot to %s[%u]",
m_handle->remoteNodeName,
m_handle->nodeoid)));
}
if (u_sess->attr.attr_resource.use_workload_manager && *u_sess->wlm_cxt->control_group &&
pgxc_node_send_wlm_cgroup(m_handle)) {
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("[LIGHT PROXY] Failed to send cgroup to %s[%u]",
m_handle->remoteNodeName,
m_handle->nodeoid)));
}
/* Only generate one time when debug_query_id = 0 in CN */
if (unlikely(u_sess->debug_query_id == 0)) {
u_sess->debug_query_id = generate_unique_id64(&gt_queryId);
pgstat_report_queryid(u_sess->debug_query_id);
}
if (pgxc_node_send_queryid(m_handle, u_sess->debug_query_id)) {
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("[LIGHT PROXY] Failed to send query id to %s[%u]",
m_handle->remoteNodeName,
m_handle->nodeoid)));
}
/* Instrumentation/Unique SQL: send unique sql id to DN node */
if (is_unique_sql_enabled() && pgxc_node_send_unique_sql_id(m_handle)) {
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("[LIGHT PROXY] Failed to send unique sql id to %s[%u]",
m_handle->remoteNodeName,
m_handle->nodeoid)));
}
}
void lightProxy::setCurrentProxy(lightProxy* proxy)
{
/* set process_count = NULL for common PBE */
if (proxy != NULL && proxy->m_msgctl != NULL && proxy->m_msgctl->process_count != NULL)
proxy->m_msgctl->process_count = NULL;
u_sess->exec_cxt.cur_light_proxy_obj = proxy;
}
ExecNodes* lightProxy::checkRouterQuery(Query* query)
{
ExecNodes* exec_nodes = NULL;
if (!isSupportLightQuery(query) || !HAS_ROUTER) {
return NULL;
}
exec_nodes = makeNode(ExecNodes);
exec_nodes->nodeList = lappend_int(exec_nodes->nodeList, u_sess->exec_cxt.CurrentRouter->GetRouterNodeId());
return exec_nodes;
}
/*
* Constraints specially defined for Light CN are checked here
*/
ExecNodes* lightProxy::checkLightQuery(Query* query)
{
ExecNodes* exec_nodes = NULL;
/* for UPSERT, use the insert part to check Light CN */
if (query->upsertQuery != NULL) {
/* only allow UPSERT transformed MERGE statement have an upsertQuery */
if (unlikely(query->commandType != CMD_MERGE)) {
ereport(ERROR,
(errmodule(MOD_OPT),
errcode(ERRCODE_OPTIMIZER_INCONSISTENT_STATE),
(errmsg("INSERT ON DUPLICATE KEY UPDATE must have an transformed InsertStmt query."))));
}
query = query->upsertQuery;
}
if (!isSupportLightQuery(query)) {
return NULL;
}
/* handle the un-supported statements, obvious errors etc. */
pgxc_handle_unsupported_stmts(query);
/* Do permissions checks */
Assert(IS_PGXC_COORDINATOR && !IsConnFromCoord());
(void)ExecCheckRTPerms(query->rtable, true);
exec_nodes = pgxc_is_query_shippable(query, 0, true);
return exec_nodes;
}
void lightProxy::tearDown(lightProxy *proxy)
{
MemoryContext context = proxy->m_context;
proxy->removeLpByStmtName(proxy->m_stmtName);
removeLightProxy(proxy->m_portalName);
MemoryContextDelete(context);
lightProxy::setCurrentProxy(NULL);
}
void lightProxy::initStmtHtab()
{
HASHCTL hash_ctl;
errno_t rc = 0;
int htab_size = 64;
rc = memset_s(&hash_ctl, sizeof(hash_ctl), 0, sizeof(hash_ctl));
securec_check(rc, "\0", "\0");
hash_ctl.keysize = NAMEDATALEN;
hash_ctl.entrysize = sizeof(stmtLpObj);
hash_ctl.hcxt = u_sess->cache_mem_cxt;
u_sess->pcache_cxt.stmt_lightproxy_htab = hash_create("lightProxy Named Object for GPC",
htab_size,
&hash_ctl,
HASH_ELEM | HASH_CONTEXT);
}
void lightProxy::initlightProxyTable()
{
HASHCTL hash_ctl;
errno_t rc = 0;
rc = memset_s(&hash_ctl, sizeof(hash_ctl), 0, sizeof(hash_ctl));
securec_check(rc, "\0", "\0");
hash_ctl.keysize = NAMEDATALEN;
hash_ctl.entrysize = sizeof(lightProxyNamedObj);
hash_ctl.hcxt = u_sess->cache_mem_cxt;
u_sess->pcache_cxt.lightproxy_objs = hash_create("lightProxy Named Object", 64, &hash_ctl, HASH_ELEM | HASH_CONTEXT);
}
void lightProxy::removeLpByStmtName(const char *stmtname)
{
if (u_sess->pcache_cxt.stmt_lightproxy_htab && stmtname != NULL && stmtname[0] != '\0') {
(void)hash_search(u_sess->pcache_cxt.stmt_lightproxy_htab, stmtname, HASH_REMOVE, NULL);
}
}
void lightProxy::removeLightProxy(const char* portalname)
{
if(u_sess->pcache_cxt.lightproxy_objs && portalname != NULL) {
hash_search(u_sess->pcache_cxt.lightproxy_objs, portalname, HASH_REMOVE, NULL);
}
}
void lightProxy::storeLpByStmtName(const char *stmtname)
{
stmtLpObj *entry = NULL;
if (!u_sess->pcache_cxt.stmt_lightproxy_htab)
initStmtHtab();
entry = (stmtLpObj *)hash_search(u_sess->pcache_cxt.stmt_lightproxy_htab, stmtname, HASH_ENTER, NULL);
entry->proxy = this;
}
void lightProxy::storeLightProxy(const char* portalname)
{
lightProxyNamedObj* entry = NULL;
if(!u_sess->pcache_cxt.lightproxy_objs)
initlightProxyTable();
entry = (lightProxyNamedObj*)hash_search(u_sess->pcache_cxt.lightproxy_objs, portalname, HASH_ENTER, NULL);
entry->proxy = this;
pfree_ext(m_portalName);
MemoryContext old_context = MemoryContextSwitchTo(m_context);
m_portalName = pstrdup(portalname);
(void)MemoryContextSwitchTo(old_context);
}
lightProxy *lightProxy::locateLpByStmtName(const char *stmtname)
{
stmtLpObj *entry = NULL;
if (u_sess->pcache_cxt.stmt_lightproxy_htab && stmtname && stmtname[0] != '\0') {
entry = (stmtLpObj *)hash_search(u_sess->pcache_cxt.stmt_lightproxy_htab, stmtname, HASH_FIND, NULL);
}
if (entry) {
return entry->proxy;
} else {
return NULL;
}
}
lightProxy* lightProxy::locateLightProxy(const char* portalname)
{
lightProxyNamedObj* entry = NULL;
if(u_sess->pcache_cxt.lightproxy_objs) {
entry = (lightProxyNamedObj*)hash_search(u_sess->pcache_cxt.lightproxy_objs, portalname, HASH_FIND, NULL);
}
if(entry) {
return entry->proxy;
} else {
return NULL;
}
}
bool lightProxy::processMsg(int msgType, StringInfo msg)
{
lightProxy* lp = u_sess->exec_cxt.cur_light_proxy_obj;
if (msgType == EXEC_MESSAGE) {
lp = lightProxy::tryLocateLightProxy(msg);
}
bool res = false;
bool old_status = u_sess->exec_cxt.need_track_resource;
if (lp != NULL) {
switch (msgType) {
case BIND_MESSAGE:
case DESC_MESSAGE:
lp->saveMsg(msgType, msg);
break;
case EXEC_MESSAGE:
if (u_sess->attr.attr_resource.resource_track_cost == 0 &&
u_sess->attr.attr_resource.enable_resource_track &&
u_sess->attr.attr_resource.resource_track_level != RESOURCE_TRACK_NONE) {
u_sess->exec_cxt.need_track_resource = true;
WLMSetCollectInfoStatus(WLM_STATUS_RUNNING);
}
lp->runMsg(msg);
u_sess->exec_cxt.need_track_resource = old_status;
break;
default:
ereport(ERROR,
(errcode(ERRCODE_CASE_NOT_FOUND), errmsg("invalid msgType %d for process message \n", msgType)));
}
res = true;;
}
/*
* Emit duration logging if appropriate.
*/
char msec_str[32];
switch (check_log_duration(msec_str, false)) {
case 1:
ereport(DEBUG1, (errmsg("duration: %s ms, queryid %ld, unique id %lu", msec_str, u_sess->debug_query_id, u_sess->slow_query_cxt.slow_query.unique_sql_id), errhidestmt(true)));
break;
case 2: {
ereport(DEBUG1,
(errmsg("duration: %s ms queryid %ld unique id %ld", msec_str, u_sess->debug_query_id, u_sess->slow_query_cxt.slow_query.unique_sql_id),
errhidestmt(true)));
break;
}
default:
break;
}
return res;
}
void lightProxy::assemableMsg(char msgtype, StringInfo msgBuf, bool trigger_ship)
{
int msg_len = 4 + msgBuf->len;
errno_t ss_rc;
/* If trigger is being shipped to DN. */
if (trigger_ship) {
ensure_out_buffer_capacity(msg_len + 2, m_handle);
m_handle->outBuffer[m_handle->outEnd++] = 'a';
} else {
ensure_out_buffer_capacity(msg_len + 1, m_handle);
}
m_handle->outBuffer[m_handle->outEnd++] = msgtype;
msg_len = htonl(msg_len);
ss_rc = memcpy_s(m_handle->outBuffer + m_handle->outEnd, m_handle->outSize - m_handle->outEnd,
&msg_len, sizeof(uint32));
securec_check(ss_rc, "\0", "\0");
m_handle->outEnd += 4;
ss_rc = memcpy_s(m_handle->outBuffer + m_handle->outEnd, m_handle->outSize - m_handle->outEnd,
msgBuf->data, (size_t)msgBuf->len);
securec_check(ss_rc, "\0", "\0");
m_handle->outEnd += msgBuf->len;
}
void lightProxy::handleResponse()
{
int res = 0;
/*
* Reset lightProxyErrData. This only happens for PBE.
* Memory of lightProxyErrData itself is in m_context, no need to free here.
* Memory of char* inside is in t_thrd.mem_cxt.msg_mem_cxt, no need to free too.
*/
if (m_msgctl->errData->hasError) {
errno_t rc = 0;
rc = memset_s(m_msgctl->errData, sizeof(lightProxyErrData), 0, sizeof(lightProxyErrData));
securec_check_c(rc, "\0", "\0");
}
m_handle->state = DN_CONNECTION_STATE_QUERY;
// process all messages.
while (true) {
if (m_handle->is_logic_conn)
res = light_node_receive_from_logic_conn(m_handle);
else
res = light_node_receive(m_handle);
if (res) {
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("[LIGHT PROXY] Failed to fetch from Datanode %s[%u]",
m_handle->remoteNodeName,
m_handle->nodeoid)));
}
res = light_handle_response(m_handle, m_msgctl, this);
if (res == LPROXY_FINISH) {
break;
} else if (res == LPROXY_ERROR) {
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("[LIGHT PROXY] Unexpected response from %s[%u]",
m_handle->remoteNodeName,
m_handle->nodeoid)));
}
}
/* report error if any */
if (m_msgctl->errData->hasError) {
setCurrentProxy(NULL);
light_node_report_error(m_msgctl->errData);
}
}
lightProxy* lightProxy::tryLocateLightProxy(StringInfo msg)
{
lightProxy* lp = u_sess->exec_cxt.cur_light_proxy_obj;
int oldCursor = msg->cursor;
const char* portal_name = pq_getmsgstring(msg);
if(portal_name[0] != '\0') {
lp = lightProxy::locateLightProxy(portal_name);
lightProxy::setCurrentProxy(lp);
}
msg->cursor = oldCursor;
return lp;
}
void lightProxy::runSimpleQuery(StringInfo exec_message)
{
connect();
LPROXY_DEBUG(ereport(DEBUG2, (errmodule(MOD_LIGHTPROXY), errmsg(
"[LIGHT PROXY] Got exec_simple_query slim to Datanode %u: query %s",
m_handle->nodeoid,
m_query->sql_statement))));
bool is_read_only = (m_query->commandType == CMD_SELECT && !m_query->hasForUpdate);
proxyNodeBegin(is_read_only);
bool trigger_ship = false;
if (u_sess->attr.attr_sql.enable_trigger_shipping && m_isRowTriggerShippable)
trigger_ship = true;
assemableMsg('Q', exec_message, trigger_ship);
if (pgxc_node_flush(m_handle)) {
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("[LIGHT PROXY] Unexpected response from %s[%u]",
m_handle->remoteNodeName,
m_handle->nodeoid)));
}
m_msgctl->sendDMsg = true;
m_msgctl->cnMsg = false;
m_msgctl->hasResult = (m_query->commandType == CMD_SELECT || m_query->returningList != NIL);
handleResponse();
/* pgaudit */
if ((u_sess->attr.attr_security.Audit_DML_SELECT != 0 || u_sess->attr.attr_security.Audit_DML != 0) &&
u_sess->attr.attr_security.Audit_enabled && IsPostmasterEnvironment) {
light_pgaudit_ExecutorEnd(m_query);
}
/* unified auditing policy */
if (!(g_instance.status > NoShutdown) && light_unified_audit_executor_hook) {
light_unified_audit_executor_hook(m_query);
}
/* doing sql count accordiong to cmdType */
if (u_sess->attr.attr_common.pgstat_track_activities && u_sess->attr.attr_common.pgstat_track_sql_count &&
!u_sess->attr.attr_sql.enable_cluster_resize) {
report_qps_type(m_cmdType);
report_qps_type(queryType);
}
/* update unique sql stat */
if (is_unique_sql_enabled() && is_local_unique_sql()) {
UpdateUniqueSQLStat(NULL, NULL, GetCurrentStatementLocalStartTimestamp());
}
pgstate_update_percentile_responsetime();
// no more proxy
setCurrentProxy(NULL);
report_iud_time_for_lightproxy(m_query);
}
int lightProxy::runBatchMsg(StringInfo batch_message, bool sendDMsg, int batch_count)
{
int process_count = 0;
SetUniqueSQLIdFromCachedPlanSource(this->m_cplan);
connect();
LPROXY_DEBUG(elog(DEBUG2,
"[LIGHT PROXY] Got Batch slim to DataNode %u: name %s, query %s",
m_handle->nodeoid,
m_stmtName,
m_cplan->query_string));
/* Must set snapshot before starting executor. */
PushActiveSnapshot(GetTransactionSnapshot(GTM_LITE_MODE));
proxyNodeBegin(m_cplan->is_read_only);
/* check if we need to send parse or not */
sendParseIfNecessary();
assemableMsg('U', batch_message);
// send sync message and flush
if (pgxc_node_send_sync(m_handle)) {
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("[LIGHT PROXY] Failed to send sync to %s[%u]",
m_handle->remoteNodeName,
m_handle->nodeoid)));
}
m_msgctl->sendDMsg = sendDMsg;
m_msgctl->process_count = &process_count;
m_msgctl->cnMsg = false;
m_msgctl->hasResult = (m_cplan->resultDesc != NULL) ? true : false;
handleResponse();
PopActiveSnapshot();
/*
* We need a CommandCounterIncrement after every query, except
* those that start or end a transaction block.
*/
CommandCounterIncrement();
/* pgaudit */
if ((u_sess->attr.attr_security.Audit_DML_SELECT != 0 || u_sess->attr.attr_security.Audit_DML != 0) &&
u_sess->attr.attr_security.Audit_enabled && IsPostmasterEnvironment) {
for (int i = 0; i < batch_count; i++)
light_pgaudit_ExecutorEnd((Query*)linitial(m_cplan->query_list));
}
/* unified auditing policy */
if (!(g_instance.status > NoShutdown) && light_unified_audit_executor_hook) {
light_unified_audit_executor_hook((Query*)linitial(m_cplan->query_list));
}
/*
* track_sql_count is on, counting WaitEventSQL for per user
*/
if (u_sess->attr.attr_common.pgstat_track_activities && u_sess->attr.attr_common.pgstat_track_sql_count &&
!u_sess->attr.attr_sql.enable_cluster_resize) {
for (int i = 0; i < batch_count; i++) {
report_qps_type(m_cmdType);
report_qps_type(queryType);
}
}
// finish with this proxy
setCurrentProxy(NULL);
return process_count;
}
void lightProxy::runMsg(StringInfo exec_message)
{
/*
* If we are in aborted transaction state, the only portals we can
* actually run are those containing COMMIT or ROLLBACK commands.
*/
if (IsAbortedTransactionBlockState())
ereport(ERROR,
(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
errmsg("current transaction is aborted, "
"commands ignored until end of transaction block, firstChar[%c]",
u_sess->proc_cxt.firstChar), 0));
connect();
LPROXY_DEBUG(elog(DEBUG2,
"[LIGHT PROXY] Got Execute slim to DataNode %u: name %s, query %s",
m_handle->nodeoid,
m_stmtName,
m_cplan->query_string));
bool trigger_ship = false;
if (u_sess->attr.attr_sql.enable_trigger_shipping && m_isRowTriggerShippable)
trigger_ship = true;
/*
* Ensure we are in a transaction command (this should normally be the
* case already due to prior BIND).
*/
start_xact_command();
/* Set after start transaction in case there is no CurrentResourceOwner */
SetUniqueSQLIdFromCachedPlanSource(this->m_cplan);
/* Must set snapshot before starting executor, unless it is a MOT tables transaction. */
#ifdef ENABLE_MOT
if (!IsMOTEngineUsed()) {
#endif
PushActiveSnapshot(GetTransactionSnapshot(GTM_LITE_MODE));
#ifdef ENABLE_MOT
}
#endif
proxyNodeBegin(m_cplan->is_read_only);
/* check if we need to send parse or not */
sendParseIfNecessary();
if (m_bindMessage.len > 0) {
assemableMsg('B', &m_bindMessage);
resetStringInfo(&m_bindMessage);
}
if (m_describeMessage.len > 0) {
assemableMsg('D', &m_describeMessage);
resetStringInfo(&m_describeMessage);
m_msgctl->sendDMsg = true;
} else {
m_msgctl->sendDMsg = false;
}
assemableMsg('E', exec_message, trigger_ship);
/* send sync message and flush */
if (pgxc_node_send_sync(m_handle)) {
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("[LIGHT PROXY] Failed to send sync to %s[%u]",
m_handle->remoteNodeName, m_handle->nodeoid)));
}
m_msgctl->cnMsg = false;
m_msgctl->hasResult = (m_cplan->resultDesc != NULL) ? true : false;
handleResponse();
#ifdef ENABLE_MOT
if (!IsMOTEngineUsed()) {
#endif
PopActiveSnapshot();
#ifdef ENABLE_MOT
}
#endif
/*
* We need a CommandCounterIncrement after every query, except
* those that start or end a transaction block.
*/
CommandCounterIncrement();
t_thrd.wlm_cxt.parctl_state.except = 0;
if (ENABLE_WORKLOAD_CONTROL) {
if (g_instance.wlm_cxt->dynamic_workload_inited) {
dywlm_client_max_release(&t_thrd.wlm_cxt.parctl_state);
} else {
WLMParctlRelease(&t_thrd.wlm_cxt.parctl_state);
}
}
/* pgaudit */
if ((u_sess->attr.attr_security.Audit_DML_SELECT != 0 || u_sess->attr.attr_security.Audit_DML != 0) &&
u_sess->attr.attr_security.Audit_enabled && IsPostmasterEnvironment) {
light_pgaudit_ExecutorEnd((Query*)linitial(m_cplan->query_list));
}
/* unified auditing policy */
if (!(g_instance.status > NoShutdown) && light_unified_audit_executor_hook) {
light_unified_audit_executor_hook((Query*)linitial(m_cplan->query_list));
}
/*
* track_sql_count is on, counting WaitEventSQL for per user
*/
if (u_sess->attr.attr_common.pgstat_track_activities && u_sess->attr.attr_common.pgstat_track_sql_count &&
!u_sess->attr.attr_sql.enable_cluster_resize) {
report_qps_type(m_cmdType);
report_qps_type(queryType);
}
/* update unique sql stat */
if (is_unique_sql_enabled() && is_local_unique_sql()) {
UpdateUniqueSQLStat(NULL, NULL, GetCurrentStatementLocalStartTimestamp());
}
pgstate_update_percentile_responsetime();
setCurrentProxy(NULL);
}
bool lightProxy::isDeleteLimit(const Query* query)
{
if (query == NULL || query->commandType != CMD_DELETE)
return false;
if (query->limitCount != NULL)
return true;
return false;
}
/*
* @Description: according to commandType get corresponsile WaitEventSQL,
* and call function 'pgstat_report_wait_count' to increase sql count
*/
void report_qps_type(CmdType commandType)
{
switch (commandType) {
case CMD_SELECT:
pgstat_report_wait_count(WAIT_EVENT_SQL_SELECT);
break;
case CMD_UPDATE:
pgstat_report_wait_count(WAIT_EVENT_SQL_UPDATE);
break;
case CMD_INSERT:
pgstat_report_wait_count(WAIT_EVENT_SQL_INSERT);
break;
case CMD_DELETE:
pgstat_report_wait_count(WAIT_EVENT_SQL_DELETE);
break;
case CMD_MERGE:
pgstat_report_wait_count(WAIT_EVENT_SQL_MERGEINTO);
break;
case CMD_DML:
pgstat_report_wait_count(WAIT_EVENT_SQL_DML);
break;
case CMD_DDL:
pgstat_report_wait_count(WAIT_EVENT_SQL_DDL);
break;
case CMD_DCL:
pgstat_report_wait_count(WAIT_EVENT_SQL_DCL);
break;
case CMD_TCL:
pgstat_report_wait_count(WAIT_EVENT_SQL_TCL);
break;
default:
/* do not map any commandType */
break;
}
}
/*
* @Description: according to sql query get corresponsile cmdType
* @in - const char *commandTag
* @out - static CmdType
*/
CmdType set_cmd_type(const char* commandTag)
{
CmdType cmd_type = CMD_UNKNOWN;
if (strcmp(commandTag, "SELECT") == 0)
cmd_type = CMD_SELECT;
else if (strcmp(commandTag, "UPDATE") == 0)
cmd_type = CMD_UPDATE;
else if (strcmp(commandTag, "INSERT") == 0)
cmd_type = CMD_INSERT;
else if (strcmp(commandTag, "DELETE") == 0)
cmd_type = CMD_DELETE;
else if (strcmp(commandTag, "MERGE") == 0)
cmd_type = CMD_MERGE;
return cmd_type;
}
CmdType set_command_type_by_commandTag(const char* commandTag)
{
CmdType cmd_type = CMD_UNKNOWN;
int i;
for (i = 0; i < MAX_COMMAND; i++) {
if (strstr(commandTag, g_command_type_array[i].commandTag))
return g_command_type_array[i].type;
}
return cmd_type;
}
bool IsLightProxyOn(void)
{
return (u_sess->exec_cxt.cur_light_proxy_obj != NULL);
}
bool exec_query_through_light_proxy(List* querytree_list, Node* parsetree, bool snapshot_set, StringInfo msg,
MemoryContext OptimizerContext)
{
if ((list_length(querytree_list) == 1) && !IsA(parsetree, CreateTableAsStmt)) {
ExecNodes* single_exec_node = NULL;
lightProxy* proxy = NULL;
Query* query = (Query*)linitial(querytree_list);
if (ENABLE_ROUTER(query->commandType)) {
single_exec_node = lightProxy::checkRouterQuery(query);
} else {
single_exec_node = lightProxy::checkLightQuery(query);
}
/* only deal with single node */
if (single_exec_node && list_length(single_exec_node->nodeList) +
list_length(single_exec_node->primarynodelist) == 1) {
/* GTMLite: need to mark that this is single shard statement */
u_sess->exec_cxt.single_shard_stmt = true;
if (CmdtypeSupportsHotkey(query->commandType))
SendHotkeyToPgstat();
proxy = New(OptimizerContext) lightProxy(query);
proxy->m_nodeIdx = linitial_int(single_exec_node->nodeList);
bool old_status = u_sess->exec_cxt.need_track_resource;
if (u_sess->attr.attr_resource.resource_track_cost == 0 &&
u_sess->attr.attr_resource.enable_resource_track &&
u_sess->attr.attr_resource.resource_track_level != RESOURCE_TRACK_NONE) {
u_sess->exec_cxt.need_track_resource = true;
WLMSetCollectInfoStatus(WLM_STATUS_RUNNING);
}
proxy->runSimpleQuery(msg);
/* Done with the snapshot used for parsing/planning */
if (snapshot_set) {
PopActiveSnapshot();
}
FreeExecNodes(&single_exec_node);
u_sess->exec_cxt.need_track_resource = old_status;
t_thrd.wlm_cxt.parctl_state.except = 0;
return true;
}
FreeExecNodes(&single_exec_node);
CleanHotkeyCandidates(true);
return false;
}
return false;
}
void GPCDropLPIfNecessary(const char *stmt_name, bool need_drop_dnstmt, bool need_del, CachedPlanSource *reset_plan) {
if (stmt_name == NULL || stmt_name[0] == '\0' || !IS_PGXC_COORDINATOR)
return;
lightProxy *lp = lightProxy::locateLpByStmtName(stmt_name);
if (lp != NULL) {
if (reset_plan) {
lp->m_cplan = reset_plan;
}
if (stmt_name && need_drop_dnstmt) {
lp->m_entry = NULL;
DropDatanodeStatement(stmt_name);
}
if (need_del) {
lightProxy::tearDown(lp);
}
return;
}
return;
}
void GPCFillMsgForLp(CachedPlanSource* psrc)
{
Assert(psrc != NULL);
Assert(psrc->gpc.status.IsSharePlan());
if (psrc->gpc.status.InSavePlanList(GPC_SHARED)) {
pfree_ext(psrc->gpc.key);
MemoryContext oldcxt = MemoryContextSwitchTo(psrc->context);
psrc->gpc.key = (GPCKey*)palloc0(sizeof(GPCKey));
psrc->gpc.key->query_string = psrc->query_string;
psrc->gpc.key->query_length = (uint32)strlen(psrc->query_string);
psrc->gpc.key->spi_signature = psrc->spi_signature;
GlobalPlanCache::EnvFill(&psrc->gpc.key->env, psrc->dependsOnRole);
psrc->gpc.key->env.search_path = psrc->search_path;
psrc->gpc.key->env.num_params = psrc->num_params;
(void)MemoryContextSwitchTo(oldcxt);
}
}