/* * Copyright (c) 2020 Huawei Technologies Co.,Ltd. * * openGauss is licensed under Mulan PSL v2. * You can use this software according to the terms and conditions of the Mulan PSL v2. * You may obtain a copy of Mulan PSL v2 at: * * http://license.coscl.org.cn/MulanPSL2 * * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PSL v2 for more details. * --------------------------------------------------------------------------------------- * * IDENTIFICATION * contrib/gauss_connector/gc_fdw.cpp * * --------------------------------------------------------------------------------------- */ #include "postgres.h" #include "knl/knl_variable.h" #include "gc_fdw.h" #include "access/htup.h" #include "access/sysattr.h" #include "access/transam.h" #include "catalog/pg_class.h" #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" #include "foreign/fdwapi.h" #include "funcapi.h" #include "libpq/pqformat.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "optimizer/cost.h" #include "optimizer/clauses.h" #include "optimizer/pathnode.h" #include "optimizer/paths.h" #include "optimizer/planmain.h" #include "optimizer/restrictinfo.h" #include "optimizer/var.h" #include "optimizer/tlist.h" #include "parser/parsetree.h" #include "pgxc/pgxcnode.h" #include "pgxc/pgFdwRemote.h" #include "storage/buf/block.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/rel.h" #include "utils/selfuncs.h" /* Default CPU cost to start up a foreign query. */ #define DEFAULT_FDW_STARTUP_COST 100.0 /* Default CPU cost to process 1 row (above and beyond u_sess->attr.attr_sql.cpu_tuple_cost). */ #define DEFAULT_FDW_TUPLE_COST 0.01 /* If no remote estimates, assume a sort costs 20% extra */ #define DEFAULT_FDW_SORT_MULTIPLIER 1.2 /* * Execution state of a foreign scan using gc_fdw. */ typedef struct GcFdwScanState { Relation rel; /* relcache entry for the foreign table. NULL * for a foreign join scan. */ TupleDesc tupdesc; /* tuple descriptor of scan */ AttInMetadata* attinmeta; /* attribute datatype conversion metadata */ /* extracted fdw_private data */ char* query; /* text of SELECT command */ List* retrieved_attrs; /* list of retrieved attribute numbers */ /* for remote query execution */ PGconn* conn; /* connection for the scan */ PGXCNodeAllHandles* pgxc_handle; unsigned int cursor_number; /* quasi-unique ID for my cursor */ bool cursor_exists; /* have we created the cursor? */ int numParams; /* number of parameters passed to query */ FmgrInfo* param_flinfo; /* output conversion functions for them */ List* param_exprs; /* executable expressions for param values */ const char** param_values; /* textual values of query parameters */ /* for storing result tuples */ HeapTuple* tuples; /* array of currently-retrieved tuples */ int num_tuples; /* # of tuples in array */ int next_tuple; /* index of next one to return */ /* batch-level state, for optimizing rewinds and avoiding useless fetch */ int fetch_ct_2; /* Min(# of fetches done, 2) */ bool eof_reached; /* true if last fetch reached EOF */ /* working memory contexts */ MemoryContext batch_cxt; /* context holding current batch of tuples */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ int fetch_size; /* number of tuples per fetch */ int num_datanode; /* number of execute datanode */ int current_idx; /* current index of datanode */ int cycle_idx; /* cycle index of datanode */ int max_idx; /* max index of datanode */ bool have_remote_encoding; /* remote encoding */ int remote_encoding; PgFdwRemoteInfo* remoteinfo; /* remote table info */ IterateForeignScan_function IterateForeignScan; VecIterateForeignScan_function VecIterateForeignScan; ReScanForeignScan_function ReScanForeignScan; Oid serverid; bool hasagg; /* agg in sql? */ bool has_array; RemoteQueryState* remotestate; TupleTableSlot* resultSlot; TupleTableSlot* scanSlot; } GcFdwScanState; /* * Identify the attribute where data conversion fails. */ typedef struct ConversionLocation { Relation rel; /* foreign table's relcache entry. */ AttrNumber cur_attno; /* attribute number being processed, or 0 */ /* * In case of foreign join push down, fdw_scan_tlist is used to identify * the Var node corresponding to the error location and * fsstate->ss.ps.state gives access to the RTEs of corresponding relation * to get the relation name and attribute name. */ ForeignScanState* fsstate; } ConversionLocation; /* Callback argument for ec_member_matches_foreign */ typedef struct { Expr* current; /* current expr, or NULL if not yet found */ List* already_used; /* expressions already dealt with */ } ec_member_foreign_arg; /* * SQL functions */ PG_FUNCTION_INFO_V1(gc_fdw_handler); /* * FDW callback routines */ static void gcGetForeignRelSize(PlannerInfo* root, RelOptInfo* baserel, Oid foreigntableid); static void gcGetForeignPaths(PlannerInfo* root, RelOptInfo* baserel, Oid foreigntableid); static ForeignScan *gcGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses, Plan *outer_plan); static void gcBeginForeignScan(ForeignScanState* node, int eflags); static TupleTableSlot* gcIterateForeignScan(ForeignScanState* node); static VectorBatch* gcIterateVecForeignScan(VecForeignScanState* node); static void gcReScanForeignScan(ForeignScanState* node); static TupleTableSlot* gcIterateNormalForeignScan(ForeignScanState* node); static VectorBatch* gcIterateNormalVecForeignScan(VecForeignScanState* node); static TupleTableSlot* gcIteratePBEForeignScan(ForeignScanState* node); static VectorBatch* gcIteratePBEVecForeignScan(VecForeignScanState* node); static void gcReScanNormalForeignScan(ForeignScanState* node); static void gcReScanPBEForeignScan(ForeignScanState* node); static void gcEndForeignScan(ForeignScanState* node); static void gcExplainForeignScan(ForeignScanState* node, ExplainState* es); static bool gcAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc* func, BlockNumber* totalpages, void* additionalData, bool estimate_table_rownum); /* * Helper functions */ static void gc_estimate_path_cost_size(PlannerInfo* root, RelOptInfo* baserel, List* join_conds, List* pathkeys, double* p_rows, int* p_width, Cost* p_startup_cost, Cost* p_total_cost); static void create_cursor(ForeignScanState* node); static void fetch_more_data(ForeignScanState* node); static int gcfdw_send_and_fetch_version(PGXCNodeAllHandles* pgxc_handles); static void gcfdw_fetch_remote_table_info( PGXCNodeAllHandles* pgxc_handles, ForeignTable* table, void* remote_info, PgFdwMessageTag tag); static bool gcfdw_get_table_encode(ForeignTable* table, int* remote_encoding); static void gcfdw_send_remote_encode(PGXCNodeAllHandles* pgxc_handles, int remote_encoding); static void gcfdw_send_remote_query_param(GcFdwScanState* fsstate); static void gcfdw_get_datanode_idx(ForeignScanState* node); static void close_cursor(PGconn* conn, unsigned int cursor_number); static void prepare_query_params(PlanState* node, List* fdw_exprs, int numParams, FmgrInfo** param_flinfo, List** param_exprs, const char*** param_values); static void process_query_params( ExprContext* econtext, FmgrInfo* param_flinfo, List* param_exprs, const char** param_values); static HeapTuple make_tuple_from_result_row(PGresult* res, int row, Relation rel, AttInMetadata* attinmeta, List* retrieved_attrs, ForeignScanState* fsstate, MemoryContext temp_context); static HeapTuple make_tuple_from_agg_result(PGresult* res, int row, Relation rel, AttInMetadata* attinmeta, List* retrieved_attrs, ForeignScanState* fsstate, MemoryContext temp_context); static void conversion_error_callback(void* arg); static void gcValidateTableDef(Node* Obj); static bool CheckForeignExtOption(List* extOptList, const char* str); static bool GcFdwCanSkip(GcFdwScanState* fsstate); static void GcFdwCopyRemoteInfo(PgFdwRemoteInfo* new_remote_info, PgFdwRemoteInfo* ori_remote_info); extern bool hasSpecialArrayType(TupleDesc desc); extern Snapshot CopySnapshotByCurrentMcxt(Snapshot snapshot); /* * Foreign-data wrapper handler function: return a struct with pointers * to my callback routines. */ Datum gc_fdw_handler(PG_FUNCTION_ARGS) { FdwRoutine* routine = makeNode(FdwRoutine); /* Functions for scanning foreign tables */ routine->GetForeignRelSize = gcGetForeignRelSize; routine->GetForeignPaths = gcGetForeignPaths; routine->GetForeignPlan = gcGetForeignPlan; routine->BeginForeignScan = gcBeginForeignScan; routine->IterateForeignScan = gcIterateForeignScan; routine->VecIterateForeignScan = gcIterateVecForeignScan; routine->ReScanForeignScan = gcReScanForeignScan; routine->EndForeignScan = gcEndForeignScan; /* Support functions for EXPLAIN */ routine->ExplainForeignScan = gcExplainForeignScan; /* Support functions for ANALYZE */ routine->AnalyzeForeignTable = gcAnalyzeForeignTable; /* Check create/alter foreign table */ routine->ValidateTableDef = gcValidateTableDef; PG_RETURN_POINTER(routine); } /* * gcGetForeignRelSize * Estimate # of rows and width of the result of the scan * * We should consider the effect of all baserestrictinfo clauses here, but * not any join clauses. */ static void gcGetForeignRelSize(PlannerInfo* root, RelOptInfo* baserel, Oid foreigntableid) { if (isRestoreMode) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmodule(MOD_COOP_ANALYZE), errmsg("cooperation analysis: can't execute the query in restore mode."))); } if (u_sess->attr.attr_common.upgrade_mode != 0) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmodule(MOD_COOP_ANALYZE), errmsg("cooperation analysis: can't execute the query in upgrade mode."))); } if (IS_PGXC_DATANODE) { Relation relation = RelationIdGetRelation(foreigntableid); if (RelationIsValid(relation)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmodule(MOD_COOP_ANALYZE), errmsg("Query on datanode is not " "supported currently for the foreign table: %s.", RelationGetRelationName(relation)))); } else { ereport(ERROR, (errcode(ERRCODE_UNDEFINED_TABLE), errmodule(MOD_COOP_ANALYZE), errmsg("could not open relation with OID %u", foreigntableid))); } } GcFdwRelationInfo* fpinfo = NULL; ListCell* lc = NULL; RangeTblEntry* rte = planner_rt_fetch(baserel->relid, root); const char* relnamespace = NULL; const char* relname = NULL; const char* refname = NULL; /* * We use GcFdwRelationInfo to pass various information to subsequent * functions. */ fpinfo = (GcFdwRelationInfo*)palloc0(sizeof(GcFdwRelationInfo)); baserel->fdw_private = (void*)fpinfo; /* Base foreign tables need to be pushed down always. */ fpinfo->pushdown_safe = true; /* Look up foreign-table catalog info. */ fpinfo->table = GetForeignTable(foreigntableid); fpinfo->server = GetForeignServer(fpinfo->table->serverid); /* * Extract user-settable option values. */ fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST; fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST; fpinfo->shippable_extensions = NIL; fpinfo->fetch_size = 1000; PGXCNodeAllHandles* pgxc_handle = NULL; int remote_encoding; bool have_remote_encoding = gcfdw_get_table_encode(fpinfo->table, &remote_encoding); (void)GetConnection(fpinfo->table->serverid, &pgxc_handle, true, have_remote_encoding); PgFdwRemoteInfo* remote_info = &fpinfo->remote_info; u_sess->pgxc_cxt.gc_fdw_run_version = gcfdw_send_and_fetch_version(pgxc_handle); if (u_sess->pgxc_cxt.gc_fdw_run_version >= GCFDW_VERSION_V1R8C10_1 && have_remote_encoding == true) { gcfdw_send_remote_encode(pgxc_handle, remote_encoding); } gcfdw_fetch_remote_table_info(pgxc_handle, fpinfo->table, remote_info, PGFDW_GET_TABLE_INFO); Assert(remote_info->snapsize > 0); if (remote_info->snapsize == 0) { ereport(ERROR, (errcode(ERRCODE_NO_DATA_FOUND), errmsg("cooperation analysis: not receive the snapshot from remote cluster"))); } /* * Identify which baserestrictinfo clauses can be sent to the remote * server and which can't. */ classifyConditions(root, baserel, baserel->baserestrictinfo, &fpinfo->remote_conds, &fpinfo->local_conds); /* * Identify which attributes will need to be retrieved from the remote * server. These include all attrs needed for joins or final output, plus * all attrs used in the local_conds. (Note: if we end up using a * parameterized scan, it's possible that some of the join clauses will be * sent to the remote and thus we wouldn't really need to retrieve the * columns used in them. Doesn't seem worth detecting that case though.) */ fpinfo->attrs_used = NULL; pull_varattnos((Node*)baserel->reltarget->exprs, baserel->relid, &fpinfo->attrs_used); foreach (lc, fpinfo->local_conds) { RestrictInfo* rinfo = lfirst_node(RestrictInfo, lc); pull_varattnos((Node*)rinfo->clause, baserel->relid, &fpinfo->attrs_used); } /* * Compute the selectivity and cost of the local_conds, so we don't have * to do it over again for each path. The best we can do for these * conditions is to estimate selectivity on the basis of local statistics. */ fpinfo->local_conds_sel = clauselist_selectivity(root, fpinfo->local_conds, baserel->relid, JOIN_INNER, NULL); cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root); /* * Set cached relation costs to some negative value, so that we can detect * when they are set to some sensible costs during one (usually the first) * of the calls to gc_estimate_path_cost_size(). */ fpinfo->rel_startup_cost = -1; fpinfo->rel_total_cost = -1; /* * estimate using whatever statistics we * have locally, in a way similar to ordinary tables. */ { /* * If the foreign table has never been ANALYZEd, it will have relpages * and reltuples equal to zero, which most likely has nothing to do * with reality. We can't do a whole lot about that if we're not * allowed to consult the remote server, but we can use a hack similar * to plancat.c's treatment of empty relations: use a minimum size * estimate of 10 pages, and divide by the column-datatype-based width * estimate to get the corresponding number of tuples. */ if (baserel->pages == 0 && baserel->tuples == 0) { baserel->pages = 10; const int BLOCK_NUM = 10; baserel->tuples = (double)(BLOCK_NUM * BLCKSZ) / (baserel->reltarget->width + MAXALIGN(offsetof(HeapTupleHeaderData, t_bits))); } /* Estimate baserel size as best we can with local statistics. */ set_baserel_size_estimates(root, baserel); /* Fill in basically-bogus cost estimates for use later. */ gc_estimate_path_cost_size( root, baserel, NIL, NIL, &fpinfo->rows, &fpinfo->width, &fpinfo->startup_cost, &fpinfo->total_cost); } /* * Set the name of relation in fpinfo, while we are constructing it here. * It will be used to build the string describing the join relation in * EXPLAIN output. We can't know whether VERBOSE option is specified or * not, so always schema-qualify the foreign table name. */ fpinfo->relation_name = makeStringInfo(); relnamespace = get_namespace_name(get_rel_namespace(foreigntableid)); relname = get_rel_name(foreigntableid); refname = rte->eref->aliasname; const char* quote_relnamespace = (relnamespace != NULL) ? quote_identifier(relnamespace) : "\"Unknown\""; const char* quote_relname = (relname != NULL) ? quote_identifier(relname) : "\"Unknown\""; appendStringInfo(fpinfo->relation_name, "%s.%s", quote_relnamespace, quote_relname); if (*refname && strcmp(refname, relname) != 0) appendStringInfo(fpinfo->relation_name, " %s", quote_identifier(rte->eref->aliasname)); /* No outer and inner relations. */ fpinfo->make_outerrel_subquery = false; fpinfo->make_innerrel_subquery = false; fpinfo->lower_subquery_rels = NULL; /* Set the relation index. */ fpinfo->relation_index = baserel->relid; fpinfo->reloid = foreigntableid; } /* * gcGetForeignPaths * Create possible scan paths for a scan on the foreign table */ static void gcGetForeignPaths(PlannerInfo* root, RelOptInfo* baserel, Oid foreigntableid) { GcFdwRelationInfo* fpinfo = (GcFdwRelationInfo*)baserel->fdw_private; ForeignPath* path = NULL; /* * Create simplest ForeignScan path node and add it to baserel. This path * corresponds to SeqScan path of regular tables (though depending on what * baserestrict conditions we were able to send to remote, there might * actually be an indexscan happening there). We already did all the work * to estimate cost and size of this path. */ path = create_foreignscan_path(root, baserel, fpinfo->startup_cost, fpinfo->total_cost, NIL, /* no pathkeys */ NULL, /* no outer rel either */ NULL, /* no outer path either */ NIL, /* no fdw_private list */ u_sess->opt_cxt.query_dop); add_path(root, baserel, (Path*)path); } /* * gcGetForeignPlan * Create ForeignScan plan node which implements selected best path */ static ForeignScan* gcGetForeignPlan(PlannerInfo* root, RelOptInfo* foreignrel, Oid foreigntableid, ForeignPath* best_path, List* tlist, List* scan_clauses, Plan* outer_plan) { GcFdwRelationInfo* fpinfo = (GcFdwRelationInfo*)foreignrel->fdw_private; Index scan_relid; List* fdw_private = NIL; List* remote_exprs = NIL; List* local_exprs = NIL; List* params_list = NIL; List* fdw_scan_tlist = NIL; List* fdw_recheck_quals = NIL; List* retrieved_attrs = NIL; StringInfoData sql; ListCell* lc = NULL; if (IS_SIMPLE_REL(foreignrel)) { /* * For base relations, set scan_relid as the relid of the relation. */ scan_relid = foreignrel->relid; /* * In a base-relation scan, we must apply the given scan_clauses. * * Separate the scan_clauses into those that can be executed remotely * and those that can't. baserestrictinfo clauses that were * previously determined to be safe or unsafe by classifyConditions * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything * else in the scan_clauses list will be a join clause, which we have * to check for remote-safety. * * Note: the join clauses we see here should be the exact same ones * previously examined by gcGetForeignPaths. Possibly it'd be * worth passing forward the classification work done then, rather * than repeating it here. * * This code must match "extract_actual_clauses(scan_clauses, false)" * except for the additional decision about remote versus local * execution. */ foreach (lc, scan_clauses) { RestrictInfo* rinfo = lfirst_node(RestrictInfo, lc); /* Ignore any pseudoconstants, they're dealt with elsewhere */ if (rinfo->pseudoconstant) continue; if (list_member_ptr(fpinfo->remote_conds, rinfo)) { remote_exprs = lappend(remote_exprs, rinfo->clause); } else if (list_member_ptr(fpinfo->local_conds, rinfo)) { local_exprs = lappend(local_exprs, rinfo->clause); } else if (is_foreign_expr(root, foreignrel, rinfo->clause)) { remote_exprs = lappend(remote_exprs, rinfo->clause); } else { local_exprs = lappend(local_exprs, rinfo->clause); } } /* * For a base-relation scan, we have to support EPQ recheck, which * should recheck all the remote quals. */ fdw_recheck_quals = remote_exprs; } else { /* * Join relation or upper relation - set scan_relid to 0. */ scan_relid = 0; /* * For a join rel, baserestrictinfo is NIL and we are not considering * parameterization right now, so there should be no scan_clauses for * a joinrel or an upper rel either. */ Assert(!scan_clauses); /* * Instead we get the conditions to apply from the fdw_private * structure. */ remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false); local_exprs = extract_actual_clauses(fpinfo->local_conds, false); /* * We leave fdw_recheck_quals empty in this case, since we never need * to apply EPQ recheck clauses. In the case of a joinrel, EPQ * recheck is handled elsewhere --- see postgresGetForeignJoinPaths(). * If we're planning an upperrel (ie, remote grouping or aggregation) * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be * allowed, and indeed we *can't* put the remote clauses into * fdw_recheck_quals because the unaggregated Vars won't be available * locally. */ /* Build the list of columns to be fetched from the foreign server. */ fdw_scan_tlist = build_tlist_to_deparse(foreignrel); } /* * Build the query string to be sent for execution, and identify * expressions to be sent as parameters. */ initStringInfo(&sql); gcDeparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist, remote_exprs, best_path->path.pathkeys, false, &retrieved_attrs, ¶ms_list); /* Remember remote_exprs for possible use by postgresPlanDirectModify */ fpinfo->final_remote_exprs = remote_exprs; /* * Build the fdw_private list that will be available to the executor. * Items in the list must match order in enum FdwScanPrivateIndex. * index: * FdwScanPrivateSelectSql, FdwScanPrivateRetrievedAttrs, FdwScanPrivateFetchSize, */ fdw_private = list_make3(makeString(sql.data), retrieved_attrs, makeInteger(fpinfo->fetch_size)); /* FdwScanPrivateRemoteInfo */ PgFdwRemoteInfo* remote_info = makeNode(PgFdwRemoteInfo); remote_info->snapsize = fpinfo->remote_info.snapsize; remote_info->snapshot = (Snapshot)palloc0(remote_info->snapsize); GcFdwCopyRemoteInfo(remote_info, &fpinfo->remote_info); fdw_private = lappend(fdw_private, remote_info); /* FdwScanPrivateStrTargetlist */ fdw_private = lappend(fdw_private, NIL); /* FdwScanPrivateAggResultTargetlist */ fdw_private = lappend(fdw_private, NIL); /* FdwScanPrivateAggScanTargetlist */ fdw_private = lappend(fdw_private, NIL); /* FdwScanPrivateAggColmap */ fdw_private = lappend(fdw_private, NIL); /* FdwScanPrivateRemoteQuals */ fdw_private = lappend(fdw_private, remote_exprs); /* FdwScanPrivateParamList */ fdw_private = lappend(fdw_private, params_list); if (IS_JOIN_REL(foreignrel)) fdw_private = lappend(fdw_private, makeString(fpinfo->relation_name->data)); /* * Create the ForeignScan node for the given relation. * * Note that the remote parameter expressions are stored in the fdw_exprs * field of the finished plan node; we can't keep them in private state * because then they wouldn't be subject to later planner processing. */ return make_foreignscan(tlist, local_exprs, scan_relid, params_list, fdw_private, NIL, NIL, NULL, EXEC_ON_DATANODES); } /* * gcBeginForeignScan * Initiate an executor scan of a foreign openGauss table. */ static void gcBeginForeignScan(ForeignScanState* node, int eflags) { ForeignScan* fsplan = (ForeignScan*)node->ss.ps.plan; EState* estate = node->ss.ps.state; GcFdwScanState* fsstate = NULL; RangeTblEntry* rte = NULL; ForeignTable* table = NULL; int rtindex; int numParams; /* * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. */ if (eflags & EXEC_FLAG_EXPLAIN_ONLY) return; /* * We'll save private state in node->fdw_state. */ fsstate = (GcFdwScanState*)palloc0(sizeof(GcFdwScanState)); node->fdw_state = (void*)fsstate; /* * Identify which user to do the remote access as. This should match what * ExecCheckRTEPerms() does. In case of a join or aggregate, use the * lowest-numbered member RTE as a representative; we would get the same * result from any. */ Assert(fsplan->scan.scanrelid > 0); rtindex = fsplan->scan.scanrelid; rte = rt_fetch(rtindex, estate->es_range_table); /* Get info about foreign table. */ table = GetForeignTable(rte->relid); if (IS_PGXC_COORDINATOR) { return; } /* Get private info created by planner functions. */ fsstate->query = strVal(list_nth(fsplan->fdw_private, FdwScanPrivateSelectSql)); fsstate->retrieved_attrs = (List*)list_nth(fsplan->fdw_private, FdwScanPrivateRetrievedAttrs); fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private, FdwScanPrivateFetchSize)); fsstate->remoteinfo = (PgFdwRemoteInfo*)list_nth(fsplan->fdw_private, FdwScanPrivateRemoteInfo); /* Create contexts for batches of tuples and per-tuple temp workspace. */ fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, "gc_fdw tuple data", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); fsstate->temp_cxt = AllocSetContextCreate(t_thrd.mem_cxt.msg_mem_cxt, "gc_fdw temporary data", ALLOCSET_SMALL_MINSIZE, ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE); /* * Get info we'll need for converting data fetched from the foreign server * into local representation and error reporting during that process. */ List* agg_result_tlist = (List*)list_nth(fsplan->fdw_private, FdwScanPrivateAggResultTargetlist); if (NIL == agg_result_tlist) /* remote sql without agg func */ { fsstate->rel = node->ss.ss_currentRelation; fsstate->tupdesc = RelationGetDescr(fsstate->rel); TupleDesc scan_desc = CreateTemplateTupleDesc(list_length(fsstate->retrieved_attrs), false); TupleTableSlot* slot = node->ss.ss_ScanTupleSlot; for (int i = 0; i < list_length(fsstate->retrieved_attrs); i++) { int attidx = list_nth_int(fsstate->retrieved_attrs, i); Form_pg_attribute attr = &slot->tts_tupleDescriptor->attrs[attidx - 1]; const char* attname = (const char*)(attr->attname.data); TupleDescInitEntry(scan_desc, i + 1, attname, attr->atttypid, attr->atttypmod, 0); } fsstate->scanSlot = MakeSingleTupleTableSlot(scan_desc); fsstate->resultSlot = node->ss.ss_ScanTupleSlot; } else /* remote sql with agg func */ { fsstate->hasagg = true; TupleDesc resultDesc = ExecTypeFromTL(agg_result_tlist, false); if (hasSpecialArrayType(resultDesc)) fsstate->has_array = true; ExecAssignResultType(&node->ss.ps, resultDesc); node->ss.ps.qual = NIL; node->ss.ps.ps_ProjInfo = NULL; if (true == fsstate->has_array) { List* agg_scan_tlist = (List*)list_nth(fsplan->fdw_private, FdwScanPrivateAggScanTargetlist); TupleDesc scanDesc = ExecTypeFromTL(agg_scan_tlist, false); ExecSetSlotDescriptor(node->ss.ss_ScanTupleSlot, resultDesc); fsstate->tupdesc = scanDesc; fsstate->scanSlot = MakeSingleTupleTableSlot(scanDesc); fsstate->resultSlot = node->ss.ss_ScanTupleSlot; } else { ExecSetSlotDescriptor(node->ss.ss_ScanTupleSlot, resultDesc); fsstate->tupdesc = resultDesc; fsstate->scanSlot = node->ss.ss_ScanTupleSlot; fsstate->resultSlot = node->ss.ss_ScanTupleSlot; } fsstate->rel = NULL; } for (int i = 0; i < fsstate->resultSlot->tts_tupleDescriptor->natts; i++) { fsstate->resultSlot->tts_isnull[i] = true; } fsstate->resultSlot->tts_flags &= ~TTS_FLAG_EMPTY; fsstate->scanSlot->tts_flags &= ~TTS_FLAG_EMPTY; fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc); /* * Prepare for processing of parameters used in remote query, if any. */ numParams = list_length(fsplan->fdw_exprs); fsstate->numParams = numParams; if (numParams > 0) { prepare_query_params((PlanState *)node, fsplan->fdw_exprs, numParams, &fsstate->param_flinfo, &fsstate->param_exprs, &fsstate->param_values); } /* get encoding */ fsstate->have_remote_encoding = gcfdw_get_table_encode(table, &fsstate->remote_encoding); /* * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ fsstate->conn = GetConnection( table->serverid, &fsstate->pgxc_handle, numParams > 0 ? true : false, fsstate->have_remote_encoding); u_sess->pgxc_cxt.gc_fdw_run_version = gcfdw_send_and_fetch_version(fsstate->pgxc_handle); fsstate->serverid = table->serverid; /* Assign a unique ID for my cursor */ fsstate->cursor_number = GetCursorNumber(fsstate->conn); fsstate->cursor_exists = false; if (numParams > 0) { fsstate->IterateForeignScan = gcIteratePBEForeignScan; fsstate->VecIterateForeignScan = gcIteratePBEVecForeignScan; fsstate->ReScanForeignScan = gcReScanPBEForeignScan; } else { fsstate->IterateForeignScan = gcIterateNormalForeignScan; fsstate->VecIterateForeignScan = gcIterateNormalVecForeignScan; fsstate->ReScanForeignScan = gcReScanNormalForeignScan; } /* get datanode information */ fsstate->num_datanode = fsstate->remoteinfo->datanodenum; /* set datanode index */ gcfdw_get_datanode_idx(node); /* send index and snapshot to remote */ gcfdw_send_remote_query_param(fsstate); } /* * gcIterateForeignScan * Retrieve next row from the result set, or clear tuple slot to indicate * EOF. */ static TupleTableSlot* gcIterateForeignScan(ForeignScanState* node) { if (IS_PGXC_COORDINATOR) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("EXECUTE DIRECT cannot execute SELECT query with foreign table on coordinator"))); } GcFdwScanState* fsstate = (GcFdwScanState*)node->fdw_state; return fsstate->IterateForeignScan(node); } /* * postgresIteratelVecForeignScan * Retrieve next VectorBatch from the result set. */ static VectorBatch* gcIterateVecForeignScan(VecForeignScanState* node) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("Retrieve next vector batch is not supported for Foreign table."))); return NULL; } /* * gcReScanForeignScan * Restart the scan. */ static void gcReScanForeignScan(ForeignScanState* node) { if (IS_PGXC_COORDINATOR) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("EXECUTE DIRECT cannot execute SELECT query with foreign table on coordinator"))); } GcFdwScanState* fsstate = (GcFdwScanState*)node->fdw_state; return fsstate->ReScanForeignScan(node); } static void postgresConstructResultSlotWithArray(ForeignScanState* node) { GcFdwScanState* fsstate = (GcFdwScanState*)node->fdw_state; ForeignScan* fsplan = (ForeignScan*)node->ss.ps.plan; List* colmap = (List*)list_nth(fsplan->fdw_private, FdwScanPrivateAggColmap); TupleTableSlot* resultSlot = fsstate->resultSlot; TupleTableSlot* scanSlot = fsstate->scanSlot; TupleDesc resultDesc = resultSlot->tts_tupleDescriptor; long scanAttr, resultAttr, map; for (scanAttr = 0, resultAttr = 0; resultAttr < resultDesc->natts; resultAttr++, scanAttr += map) { Assert(list_length(colmap) == resultDesc->natts); Oid typoid = resultDesc->attrs[resultAttr].atttypid; Value* val = (Value*)list_nth(colmap, resultAttr); map = val->val.ival; if (1 == map) /* other type */ { resultSlot->tts_isnull[resultAttr] = scanSlot->tts_isnull[scanAttr]; resultSlot->tts_values[resultAttr] = scanSlot->tts_values[scanAttr]; } else /* array type*/ { const int MAX_TRANSDATUMS = 6; Datum transdatums[MAX_TRANSDATUMS]; ArrayType* result = NULL; long attrnum, i; if (map > MAX_TRANSDATUMS) { ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg("colmap value is not correct."))); } if (INT8ARRAYOID == typoid) { Assert(2 == map); attrnum = scanAttr; for (attrnum = scanAttr, i = 0; i < map; i++, attrnum++) transdatums[i] = scanSlot->tts_isnull[attrnum] ? Int64GetDatum(0) : scanSlot->tts_values[attrnum]; result = construct_array(transdatums, map, INT8OID, 8, true, 's'); } else if (FLOAT4ARRAYOID == typoid) { for (attrnum = scanAttr, i = 0; i < map; i++, attrnum++) transdatums[i] = scanSlot->tts_isnull[attrnum] ? Float4GetDatumFast(0) : scanSlot->tts_values[attrnum]; result = construct_array(transdatums, map, FLOAT8OID, sizeof(float8), FLOAT8PASSBYVAL, 'd'); } else if (FLOAT8ARRAYOID == typoid) { for (attrnum = scanAttr, i = 0; i < map; i++, attrnum++) transdatums[i] = scanSlot->tts_isnull[attrnum] ? Float8GetDatumFast(0) : scanSlot->tts_values[attrnum]; result = construct_array(transdatums, map, FLOAT8OID, sizeof(float8), FLOAT8PASSBYVAL, 'd'); } else if (NUMERICARRAY == typoid) { Datum d = Int64GetDatum(0); for (attrnum = scanAttr, i = 0; i < map; i++, attrnum++) transdatums[i] = scanSlot->tts_isnull[attrnum] ? DirectFunctionCall1(int8_numeric, d) : scanSlot->tts_values[attrnum]; result = construct_array(transdatums, map, NUMERICOID, -1, false, 'i'); } else ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("unsupport data type in agg pushdown."))); resultSlot->tts_isnull[resultAttr] = false; resultSlot->tts_values[resultAttr] = PointerGetDatum(result); } } resultSlot->tts_nvalid = resultDesc->natts; resultSlot->tts_flags &= ~TTS_FLAG_EMPTY; } static void postgresMapResultFromScanSlot(ForeignScanState* node) { GcFdwScanState* fsstate = (GcFdwScanState*)node->fdw_state; if (fsstate->hasagg == false) { int j = 0; ListCell* lc = NULL; foreach (lc, fsstate->retrieved_attrs) { int i = lfirst_int(lc); fsstate->resultSlot->tts_isnull[i - 1] = fsstate->scanSlot->tts_isnull[j]; fsstate->resultSlot->tts_values[i - 1] = fsstate->scanSlot->tts_values[j]; j++; } fsstate->resultSlot->tts_nvalid = fsstate->resultSlot->tts_tupleDescriptor->natts; } else { if (false == fsstate->has_array) { // do nothing } else { postgresConstructResultSlotWithArray(node); } } } /* * postgresNormalIterateForeignScan * Retrieve next row from the result set, or clear tuple slot to indicate * EOF. */ static TupleTableSlot* gcIterateNormalForeignScan(ForeignScanState* node) { GcFdwScanState* fsstate = (GcFdwScanState*)node->fdw_state; /* reset tupleslot on the begin */ (void)ExecClearTuple(fsstate->resultSlot); fsstate->resultSlot->tts_flags &= ~TTS_FLAG_EMPTY; TupleTableSlot* slot = node->ss.ss_ScanTupleSlot; /* * If this is the first call after Begin or ReScan, we need to create the * cursor on the remote side. */ if (!fsstate->cursor_exists) { AutoContextSwitch memGuard(fsstate->batch_cxt); fsstate->cursor_exists = true; if (GcFdwCanSkip(fsstate)) { return ExecClearTuple(slot); } fsstate->num_tuples = 0; pgfdw_send_query(fsstate->pgxc_handle, fsstate->query, &fsstate->remotestate); } MemoryContextReset(fsstate->temp_cxt); MemoryContext oldcontext = MemoryContextSwitchTo(fsstate->temp_cxt); if (!PgfdwGetTuples(fsstate->pgxc_handle->dn_conn_count, fsstate->pgxc_handle->datanode_handles, fsstate->remotestate, fsstate->scanSlot)) { fsstate->eof_reached = true; pgfdw_node_report_error(fsstate->remotestate); MemoryContextSwitchTo(oldcontext); return NULL; } fsstate->num_tuples++; postgresMapResultFromScanSlot(node); pgfdw_node_report_error(fsstate->remotestate); MemoryContextSwitchTo(oldcontext); return fsstate->resultSlot; } /* * gcIteratePBEForeignScan * Retrieve next row from the result set, or clear tuple slot to indicate * EOF. */ static TupleTableSlot* gcIteratePBEForeignScan(ForeignScanState* node) { GcFdwScanState* fsstate = (GcFdwScanState*)node->fdw_state; TupleTableSlot* slot = node->ss.ss_ScanTupleSlot; /* * If this is the first call after Begin or ReScan, we need to create the * cursor on the remote side. */ if (!fsstate->cursor_exists) { if (GcFdwCanSkip(fsstate)) { return ExecClearTuple(slot); } create_cursor(node); } /* * Get some more tuples, if we've run out. */ if (fsstate->next_tuple >= fsstate->num_tuples) { /* No point in another fetch if we already detected EOF, though. */ if (!fsstate->eof_reached) { fetch_more_data(node); } /* If we didn't get any tuples, must be end of data. */ if (fsstate->next_tuple >= fsstate->num_tuples) { return ExecClearTuple(slot); } } HeapTuple tuple = fsstate->tuples[fsstate->next_tuple++]; if (fsstate->hasagg && fsstate->has_array) { MemoryContext old = CurrentMemoryContext; MemoryContextSwitchTo(fsstate->batch_cxt); TupleDesc scanDesc = fsstate->scanSlot->tts_tupleDescriptor; TupleDesc resultDesc = fsstate->resultSlot->tts_tupleDescriptor; heap_deform_tuple(tuple, scanDesc, fsstate->scanSlot->tts_values, fsstate->scanSlot->tts_isnull); postgresMapResultFromScanSlot(node); tuple = heap_form_tuple(resultDesc, fsstate->resultSlot->tts_values, fsstate->resultSlot->tts_isnull); MemoryContextSwitchTo(old); } /* * Return the next tuple. */ (void)ExecStoreTuple(tuple, slot, InvalidBuffer, false); return slot; } /* * gcIterateNormalVecForeignScan * Retrieve next VectorBatch from the result set. */ static VectorBatch* gcIterateNormalVecForeignScan(VecForeignScanState* node) { GcFdwScanState* fsstate = (GcFdwScanState*)node->fdw_state; TupleTableSlot* slot = node->ss.ss_ScanTupleSlot; VectorBatch* batch = node->m_pScanBatch; Datum* values = node->m_values; bool* nulls = node->m_nulls; MemoryContext scanMcxt = node->scanMcxt; MemoryContextReset(fsstate->temp_cxt); MemoryContext oldcontext = MemoryContextSwitchTo(fsstate->temp_cxt); /* * If this is the first call after Begin or ReScan, we need to create the * cursor on the remote side. */ if (!fsstate->cursor_exists) { AutoContextSwitch memGuard(fsstate->batch_cxt); fsstate->cursor_exists = true; if (GcFdwCanSkip(fsstate)) { return NULL; } fsstate->num_tuples = 0; pgfdw_send_query(fsstate->pgxc_handle, fsstate->query, &fsstate->remotestate); } MemoryContextSwitchTo(oldcontext); MemoryContextReset(scanMcxt); oldcontext = MemoryContextSwitchTo(scanMcxt); /* init vectorbatch */ batch->Reset(true); /* get vectorbatch from tuple */ for (batch->m_rows = 0; batch->m_rows < BatchMaxSize; batch->m_rows++) { for (int i = 0; i < slot->tts_tupleDescriptor->natts; i++) { slot->tts_isnull[i] = true; } if (!PgfdwGetTuples(fsstate->pgxc_handle->dn_conn_count, fsstate->pgxc_handle->datanode_handles, fsstate->remotestate, fsstate->scanSlot)) { fsstate->eof_reached = true; pgfdw_node_report_error(fsstate->remotestate); break; } pgfdw_node_report_error(fsstate->remotestate); postgresMapResultFromScanSlot(node); values = fsstate->resultSlot->tts_values; nulls = fsstate->resultSlot->tts_isnull; int rows = batch->m_rows; for (int i = 0; i < batch->m_cols; i++) { ScalarVector* vec = &(batch->m_arr[i]); if (nulls[i]) { vec->m_rows++; vec->SetNull(rows); continue; } if (vec->m_desc.encoded) vec->AddVar(values[i], rows); else vec->m_vals[rows] = values[i]; vec->m_rows++; } fsstate->next_tuple++; if (fsstate->next_tuple >= fsstate->num_tuples) { batch->m_rows++; break; } } MemoryContextSwitchTo(oldcontext); return batch; } /* * gcIteratePBEVecForeignScan * Retrieve next VectorBatch from the result set. */ static VectorBatch* gcIteratePBEVecForeignScan(VecForeignScanState* node) { GcFdwScanState* fsstate = (GcFdwScanState*)node->fdw_state; VectorBatch* batch = node->m_pScanBatch; Datum* values = node->m_values; bool* nulls = node->m_nulls; MemoryContext scanMcxt = node->scanMcxt; TupleDesc tupdesc = fsstate->tupdesc; MemoryContext oldMemoryContext = NULL; /* * If this is the first call after Begin or ReScan, we need to create the * cursor on the remote side. */ if (!fsstate->cursor_exists) { if (GcFdwCanSkip(fsstate)) { return NULL; } create_cursor(node); } /* * Get some more tuples, if we've run out. */ if (fsstate->next_tuple >= fsstate->num_tuples) { /* No point in another fetch if we already detected EOF, though. */ if (!fsstate->eof_reached) fetch_more_data(node); /* If we didn't get any tuples, must be end of data. */ if (fsstate->next_tuple >= fsstate->num_tuples) { return NULL; } } MemoryContextReset(scanMcxt); oldMemoryContext = MemoryContextSwitchTo(scanMcxt); /* init vectorbatch */ batch->Reset(true); /* get vectorbatch from tuple */ for (batch->m_rows = 0; batch->m_rows < BatchMaxSize; batch->m_rows++) { HeapTuple tuple = fsstate->tuples[fsstate->next_tuple]; if (fsstate->hasagg && fsstate->has_array) { TupleDesc scanDesc = fsstate->scanSlot->tts_tupleDescriptor; TupleDesc resultDesc = fsstate->resultSlot->tts_tupleDescriptor; heap_deform_tuple(tuple, scanDesc, fsstate->scanSlot->tts_values, fsstate->scanSlot->tts_isnull); postgresMapResultFromScanSlot(node); tuple = heap_form_tuple(resultDesc, fsstate->resultSlot->tts_values, fsstate->resultSlot->tts_isnull); } heap_deform_tuple(tuple, tupdesc, values, nulls); int rows = batch->m_rows; for (int i = 0; i < batch->m_cols; i++) { ScalarVector* vec = &(batch->m_arr[i]); if (nulls[i]) { vec->m_rows++; vec->SetNull(rows); continue; } if (vec->m_desc.encoded) vec->AddVar(values[i], rows); else vec->m_vals[rows] = values[i]; vec->m_rows++; } fsstate->next_tuple++; if (fsstate->next_tuple >= fsstate->num_tuples) { batch->m_rows++; break; } } MemoryContextSwitchTo(oldMemoryContext); return batch; } /* * gcReScanNormalForeignScan * Restart the scan. */ static void gcReScanNormalForeignScan(ForeignScanState* node) { GcFdwScanState* fsstate = (GcFdwScanState*)node->fdw_state; /* If we haven't created the cursor yet, nothing to do. */ if (!fsstate->cursor_exists) return; fsstate->cursor_exists = false; // recontected DirectReleaseConnection(fsstate->conn, fsstate->pgxc_handle, false); fsstate->conn = GetConnection(fsstate->serverid, &fsstate->pgxc_handle, false, fsstate->have_remote_encoding); gcfdw_send_remote_query_param(fsstate); /* Now force a fresh FETCH. */ fsstate->tuples = NULL; fsstate->num_tuples = 0; fsstate->next_tuple = 0; fsstate->eof_reached = false; } /* * gcReScanPBEForeignScan * Restart the scan. */ static void gcReScanPBEForeignScan(ForeignScanState* node) { GcFdwScanState* fsstate = (GcFdwScanState*)node->fdw_state; char sql[64] = {0}; PGresult* res = NULL; /* If we haven't created the cursor yet, nothing to do. */ if (!fsstate->cursor_exists) return; fsstate->cursor_exists = false; errno_t rc = snprintf_s(sql, sizeof(sql), sizeof(sql) - 1, "CLOSE c%u", fsstate->cursor_number); securec_check_ss(rc, "", ""); // recontected DirectReleaseConnection(fsstate->conn, fsstate->pgxc_handle, true); fsstate->conn = GetConnection(fsstate->serverid, &fsstate->pgxc_handle, true, fsstate->have_remote_encoding); gcfdw_send_remote_query_param(fsstate); /* * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ res = pgfdw_exec_query(fsstate->conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) { pgfdw_report_error(ERROR, res, fsstate->conn, true, sql); } PQclear(res); /* Now force a fresh FETCH. */ fsstate->tuples = NULL; fsstate->num_tuples = 0; fsstate->next_tuple = 0; fsstate->eof_reached = false; } /* * gcEndForeignScan * Finish scanning foreign table and dispose objects used for this scan */ static void gcEndForeignScan(ForeignScanState* node) { if (IS_PGXC_COORDINATOR) { return; } GcFdwScanState* fsstate = (GcFdwScanState*)node->fdw_state; /* if fsstate is NULL, we are in EXPLAIN; nothing to do */ if (fsstate == NULL) return; /* Close the cursor if open, to prevent accumulation of cursors */ if (fsstate->cursor_exists && fsstate->numParams > 0) { close_cursor(fsstate->conn, fsstate->cursor_number); DirectReleaseConnection(fsstate->conn, fsstate->pgxc_handle, true); } else { DirectReleaseConnection(fsstate->conn, fsstate->pgxc_handle, false); } /* MemoryContexts will be deleted automatically. */ } /* * gcExplainForeignScan * Produce extra output for EXPLAIN of a ForeignScan on a foreign table */ static void gcExplainForeignScan(ForeignScanState* node, ExplainState* es) { List* fdw_private = NIL; char* sql = NULL; fdw_private = ((ForeignScan*)node->ss.ps.plan)->fdw_private; /* * Add remote query, when VERBOSE option is specified. */ if (es->verbose) { sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); if (t_thrd.explain_cxt.explain_perf_mode != EXPLAIN_NORMAL && es->planinfo->m_verboseInfo) { appendStringInfoSpaces(es->planinfo->m_verboseInfo->info_str, 8); appendStringInfo(es->planinfo->m_verboseInfo->info_str, "Remote SQL: %s\n", sql); } else { ExplainPropertyText("Remote SQL", sql, es); } } } static void GetJoinRelCostInfo( const GcFdwRelationInfo *fpinfo, double retrieved_rows, PlannerInfo *root, Cost *startup_cost, Cost *run_cost) { GcFdwRelationInfo *fpinfo_i = NULL; GcFdwRelationInfo *fpinfo_o = NULL; QualCost join_cost; QualCost remote_conds_cost; double nrows; /* For join we expect inner and outer relations set */ Assert(fpinfo->innerrel && fpinfo->outerrel); fpinfo_i = (GcFdwRelationInfo *)fpinfo->innerrel->fdw_private; fpinfo_o = (GcFdwRelationInfo *)fpinfo->outerrel->fdw_private; /* Estimate of number of rows in cross product */ nrows = fpinfo_i->rows * fpinfo_o->rows; /* Clamp retrieved rows estimate to at most size of cross product */ retrieved_rows = Min(retrieved_rows, nrows); /* * The cost of foreign join is estimated as cost of generating * rows for the joining relations + cost for applying quals on the * rows. */ /* Calculate the cost of clauses pushed down to the foreign server */ cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root); /* Calculate the cost of applying join clauses */ cost_qual_eval(&join_cost, fpinfo->joinclauses, root); /* * Startup cost includes startup cost of joining relations and the * startup cost for join and other clauses. We do not include the * startup cost specific to join strategy (e.g. setting up hash * tables) since we do not know what strategy the foreign server * is going to use. */ *startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost; *startup_cost += join_cost.startup; *startup_cost += remote_conds_cost.startup; *startup_cost += fpinfo->local_conds_cost.startup; /* * Run time cost includes: * * 1. Run time cost (total_cost - startup_cost) of relations being * joined * * 2. Run time cost of applying join clauses on the cross product * of the joining relations. * * 3. Run time cost of applying pushed down other clauses on the * result of join * * 4. Run time cost of applying nonpushable other clauses locally * on the result fetched from the foreign server. */ *run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost; *run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost; *run_cost += nrows * join_cost.per_tuple; nrows = clamp_row_est(nrows * fpinfo->joinclause_sel); *run_cost += nrows * remote_conds_cost.per_tuple; *run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; } /* * gc_estimate_path_cost_size * Get cost and size estimates for a foreign scan on given foreign relation * either a base relation or a join between foreign relations or an upper * relation containing foreign relations. * * param_join_conds are the parameterization clauses with outer relations. * pathkeys specify the expected sort order if any for given path being costed. * * The function returns the cost and size estimates in p_row, p_width, * p_startup_cost and p_total_cost variables. */ static void gc_estimate_path_cost_size(PlannerInfo* root, RelOptInfo* foreignrel, List* param_join_conds, List* pathkeys, double* p_rows, int* p_width, Cost* p_startup_cost, Cost* p_total_cost) { GcFdwRelationInfo* fpinfo = (GcFdwRelationInfo*)foreignrel->fdw_private; double rows; double retrieved_rows; int width; Cost startup_cost; Cost total_cost; Cost cpu_per_tuple; /* * estimate rows using whatever statistics we have locally, in a way * similar to ordinary tables. */ { Cost run_cost = 0; /* * We don't support join conditions in this mode (hence, no * parameterized paths can be made). */ Assert(param_join_conds == NIL); /* * Use rows/width estimates made by set_baserel_size_estimates() for * base foreign relations and set_joinrel_size_estimates() for join * between foreign relations. */ rows = foreignrel->rows; width = foreignrel->reltarget->width; /* Back into an estimate of the number of retrieved rows. */ retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel); /* * We will come here again and again with different set of pathkeys * that caller wants to cost. We don't need to calculate the cost of * bare scan each time. Instead, use the costs if we have cached them * already. */ if (fpinfo->rel_startup_cost > 0 && fpinfo->rel_total_cost > 0) { startup_cost = fpinfo->rel_startup_cost; run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost; } else if (IS_JOIN_REL(foreignrel)) { GetJoinRelCostInfo(fpinfo, retrieved_rows, root, &startup_cost, &run_cost); } else { /* Clamp retrieved rows estimates to at most foreignrel->tuples. */ retrieved_rows = Min(retrieved_rows, foreignrel->tuples); /* * Cost as though this were a seqscan, which is pessimistic. We * effectively imagine the local_conds are being evaluated * remotely, too. */ startup_cost = 0; run_cost = 0; run_cost += u_sess->attr.attr_sql.seq_page_cost * foreignrel->pages; startup_cost += foreignrel->baserestrictcost.startup; cpu_per_tuple = u_sess->attr.attr_sql.cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple; run_cost += cpu_per_tuple * foreignrel->tuples; } /* * Without remote estimates, we have no real way to estimate the cost * of generating sorted output. It could be free if the query plan * the remote side would have chosen generates properly-sorted output * anyway, but in most cases it will cost something. Estimate a value * high enough that we won't pick the sorted path when the ordering * isn't locally useful, but low enough that we'll err on the side of * pushing down the ORDER BY clause when it's useful to do so. */ if (pathkeys != NIL) { startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER; run_cost *= DEFAULT_FDW_SORT_MULTIPLIER; } total_cost = startup_cost + run_cost; } /* * Cache the costs for scans without any pathkeys or parameterization * before adding the costs for transferring data from the foreign server. * These costs are useful for costing the join between this relation and * another foreign relation or to calculate the costs of paths with * pathkeys for this relation, when the costs can not be obtained from the * foreign server. This function will be called at least once for every * foreign relation without pathkeys and parameterization. */ if (pathkeys == NIL && param_join_conds == NIL) { fpinfo->rel_startup_cost = startup_cost; fpinfo->rel_total_cost = total_cost; } /* * Add some additional cost factors to account for connection overhead * (fdw_startup_cost), transferring data across the network * (fdw_tuple_cost per retrieved row), and local manipulation of the data * (cpu_tuple_cost per retrieved row). */ startup_cost += fpinfo->fdw_startup_cost; total_cost += fpinfo->fdw_startup_cost; total_cost += fpinfo->fdw_tuple_cost * retrieved_rows; total_cost += u_sess->attr.attr_sql.cpu_tuple_cost * retrieved_rows; /* Return results. */ *p_rows = rows; *p_width = width; *p_startup_cost = startup_cost; *p_total_cost = total_cost; } /* * Create cursor for node's query with current parameter values. */ static void create_cursor(ForeignScanState* node) { GcFdwScanState* fsstate = (GcFdwScanState*)node->fdw_state; ExprContext* econtext = node->ss.ps.ps_ExprContext; int numParams = fsstate->numParams; const char** values = fsstate->param_values; PGconn* conn = fsstate->conn; StringInfoData buf; PGresult* res = NULL; /* * Construct array of query parameter values in text format. We do the * conversions in the short-lived per-tuple context, so as not to cause a * memory leak over repeated scans. */ if (numParams > 0) { MemoryContext oldcontext = NULL; oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); process_query_params(econtext, fsstate->param_flinfo, fsstate->param_exprs, values); MemoryContextSwitchTo(oldcontext); } /* Construct the DECLARE CURSOR command */ initStringInfo(&buf); appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s", fsstate->cursor_number, fsstate->query); /* * Notice that we pass NULL for paramTypes, thus forcing the remote server * to infer types for all parameters. Since we explicitly cast every * parameter (see deparse.c), the "inference" is trivial and will produce * the desired result. This allows us to avoid assuming that the remote * server has the same OIDs we do for the parameters' types. */ res = PQexecParams(conn, buf.data, numParams, NULL, values, NULL, NULL, 0); if (res == NULL) { pgfdw_report_error(ERROR, NULL, conn, false, buf.data); } /* * Get the result, and check for success. * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ if (PQresultStatus(res) != PGRES_COMMAND_OK) { pgfdw_report_error(ERROR, res, conn, true, fsstate->query); } PQclear(res); /* Mark the cursor as created, and show no tuples have been retrieved */ fsstate->cursor_exists = true; fsstate->tuples = NULL; fsstate->num_tuples = 0; fsstate->next_tuple = 0; fsstate->eof_reached = false; /* Clean up */ pfree(buf.data); } /* * Fetch some more rows from the node's cursor. */ static void fetch_more_data(ForeignScanState* node) { GcFdwScanState* fsstate = (GcFdwScanState*)node->fdw_state; PGresult* volatile res = NULL; MemoryContext oldcontext = NULL; /* * We'll store the tuples in the batch_cxt. First, flush the previous * batch. */ fsstate->tuples = NULL; MemoryContextReset(fsstate->batch_cxt); oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt); /* PGresult must be released before leaving this function. */ PG_TRY(); { PGconn* conn = fsstate->conn; char sql[64] = {0}; int numrows; int i; errno_t rc = snprintf_s( sql, sizeof(sql), sizeof(sql) - 1, "FETCH %d FROM c%u", fsstate->fetch_size, fsstate->cursor_number); securec_check_ss(rc, "", ""); res = pgfdw_exec_query(conn, sql); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) { pgfdw_report_error(ERROR, res, conn, false, fsstate->query); } /* Convert the data into HeapTuples */ numrows = PQntuples(res); if (numrows == 0) { fsstate->tuples = NULL; } else { fsstate->tuples = (HeapTuple*)palloc0(numrows * sizeof(HeapTuple)); } fsstate->num_tuples = numrows; fsstate->next_tuple = 0; for (i = 0; i < numrows; i++) { Assert(IsA(node->ss.ps.plan, ForeignScan) || IsA(node->ss.ps.plan, VecForeignScan)); if (!fsstate->hasagg) { fsstate->tuples[i] = make_tuple_from_result_row( res, i, fsstate->rel, fsstate->attinmeta, fsstate->retrieved_attrs, node, fsstate->temp_cxt); } else { fsstate->tuples[i] = make_tuple_from_agg_result( res, i, fsstate->rel, fsstate->attinmeta, fsstate->retrieved_attrs, node, fsstate->temp_cxt); } } /* Must be EOF if we didn't get as many tuples as we asked for. */ fsstate->eof_reached = (numrows < fsstate->fetch_size); PQclear(res); res = NULL; } PG_CATCH(); { if (res != NULL) { PQclear(res); } PG_RE_THROW(); } PG_END_TRY(); MemoryContextSwitchTo(oldcontext); } /* * Fetch remote version */ void pgfdw_fetch_remote_version(PGconn* conn) { PGresult* volatile res = NULL; /* PGresult must be released before leaving this function. */ PG_TRY(); { char sql[64] = {0}; int numrows; int result_count = 0; errno_t rc = snprintf_s(sql, sizeof(sql), sizeof(sql) - 1, "SELECT count(1) FROM pg_extension WHERE extname='gc_fdw';"); securec_check_ss(rc, "", ""); res = pgfdw_exec_query(conn, sql); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, NULL); /* Convert the data into HeapTuples */ numrows = PQntuples(res); result_count = atoi(PQgetvalue(res, 0, 0)); if (result_count <= 0) { ereport(ERROR, (errcode(ERRCODE_NO_DATA_FOUND), errmsg("gc_fdw: the remote version is not match."))); } PQclear(res); res = NULL; } PG_CATCH(); { if (res != NULL) PQclear(res); PG_RE_THROW(); } PG_END_TRY(); } /* * Fetch remote version. */ static int gcfdw_send_and_fetch_version(PGXCNodeAllHandles* pgxc_handles) { StringInfoData retbuf; initStringInfo(&retbuf); pq_sendint(&retbuf, GCFDW_VERSION, 4); PgFdwRemoteSender(pgxc_handles, retbuf.data, retbuf.len, PGFDW_GET_VERSION); int version = 0; PgFdwRemoteReceiver(pgxc_handles, &version, sizeof(PgFdwRemoteInfo)); Assert(version >= GCFDW_VERSION_V1R8C10); pfree(retbuf.data); retbuf.data = NULL; return version; } /* * Fetch remote table information. */ static void gcfdw_fetch_remote_table_info( PGXCNodeAllHandles* pgxc_handles, ForeignTable* table, void* remote_info, PgFdwMessageTag tag) { const char* nspname = NULL; const char* relname = NULL; ListCell* lc = NULL; /* * Use value of FDW options if any, instead of the name of object itself. */ foreach (lc, table->options) { DefElem* def = (DefElem*)lfirst(lc); if (strcmp(def->defname, "schema_name") == 0) nspname = defGetString(def); else if (strcmp(def->defname, "table_name") == 0) relname = defGetString(def); } Relation rel = heap_open(table->relid, AccessShareLock); if (nspname == NULL) nspname = get_namespace_name(RelationGetNamespace(rel)); if (relname == NULL) relname = RelationGetRelationName(rel); Assert(nspname != NULL && relname != NULL); int nsplen = strlen(nspname); int rellen = strlen(relname); /* * write scheme name and table name. */ StringInfoData retbuf; initStringInfo(&retbuf); pq_sendint(&retbuf, nsplen, 1); pq_sendbytes(&retbuf, nspname, nsplen); pq_sendint(&retbuf, rellen, 1); pq_sendbytes(&retbuf, relname, rellen); /* * write column name and column type name. */ TupleDesc tupdesc = RelationGetDescr(rel); int att_name_len; int type_name_len; char* type_name = NULL; pq_sendint(&retbuf, tupdesc->natts, 4); for (int i = 0; i < tupdesc->natts; i++) { att_name_len = strlen(tupdesc->attrs[i].attname.data); pq_sendint(&retbuf, att_name_len, 4); pq_sendbytes(&retbuf, tupdesc->attrs[i].attname.data, att_name_len); Assert(InvalidOid != tupdesc->attrs[i].atttypid); type_name = get_typename(tupdesc->attrs[i].atttypid); type_name_len = strlen(type_name); pq_sendint(&retbuf, type_name_len, 4); pq_sendbytes(&retbuf, type_name, type_name_len); pq_sendint(&retbuf, tupdesc->attrs[i].atttypmod, 4); pfree(type_name); } relation_close(rel, AccessShareLock); PgFdwRemoteSender(pgxc_handles, retbuf.data, retbuf.len, tag); if (tag == PGFDW_GET_TABLE_INFO) { PgFdwRemoteInfo* info = (PgFdwRemoteInfo*)remote_info; PgFdwRemoteReceiver(pgxc_handles, info, sizeof(PgFdwRemoteInfo)); } else /* For analyze */ { PGFDWTableAnalyze* info = (PGFDWTableAnalyze*)remote_info; PgFdwRemoteReceiver(pgxc_handles, info, sizeof(PGFDWTableAnalyze)); } pfree(retbuf.data); retbuf.data = NULL; } /* * get foreign table encode from option */ static bool gcfdw_get_table_encode(ForeignTable* table, int* remote_encoding) { bool have_remote_encoding = false; ListCell* lc = NULL; DefElem* def = NULL; foreach (lc, table->options) { def = (DefElem*)lfirst(lc); if (strcmp(def->defname, "encoding") == 0) { *remote_encoding = pg_char_to_encoding(defGetString(def)); if (*remote_encoding == -1) { ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("%s is not a valid encoding name", defGetString(def)))); } have_remote_encoding = true; } } return have_remote_encoding; } /* * Send remote encode information. */ static void gcfdw_send_remote_encode(PGXCNodeAllHandles* pgxc_handles, int remote_encoding) { StringInfoData retbuf; initStringInfo(&retbuf); pq_sendint(&retbuf, GetDatabaseEncoding(), 4); pq_sendint(&retbuf, remote_encoding, 4); PgFdwRemoteSender(pgxc_handles, retbuf.data, retbuf.len, PGFDW_GET_ENCODE); PgFdwRemoteReceiver(pgxc_handles, NULL, 0); pfree(retbuf.data); retbuf.data = NULL; } /* * Send remote query param. */ static void gcfdw_send_remote_query_param(GcFdwScanState* fsstate) { if (u_sess->pgxc_cxt.gc_fdw_run_version >= GCFDW_VERSION_V1R8C10_1 && fsstate->have_remote_encoding == true) { gcfdw_send_remote_encode(fsstate->pgxc_handle, fsstate->remote_encoding); } StringInfoData retbuf; initStringInfo(&retbuf); pq_sendint(&retbuf, fsstate->current_idx, 4); pq_sendint(&retbuf, fsstate->cycle_idx, 4); PgFdwSendSnapshot(&retbuf, fsstate->remoteinfo->snapshot, fsstate->remoteinfo->snapsize); PgFdwRemoteSender(fsstate->pgxc_handle, retbuf.data, retbuf.len, PGFDW_QUERY_PARAM); PgFdwRemoteReceiver(fsstate->pgxc_handle, NULL, 0); pfree(retbuf.data); retbuf.data = NULL; } /* * Get datanode idx. */ static void gcfdw_get_datanode_idx(ForeignScanState* node) { Plan* fsplan = node->ss.ps.plan; GcFdwScanState* fsstate = (GcFdwScanState*)node->fdw_state; if (IS_PGXC_DATANODE) { fsstate->max_idx = fsstate->num_datanode; fsstate->cycle_idx = list_length(fsplan->exec_nodes->nodeList) * SET_DOP(fsplan->dop); fsstate->current_idx = 0; bool found = false; ListCell* lc = NULL; foreach (lc, fsplan->exec_nodes->nodeList) { if (lfirst_int(lc) == u_sess->pgxc_cxt.PGXCNodeId) { found = true; break; } fsstate->current_idx++; } if (found) { fsstate->current_idx = u_sess->stream_cxt.smp_id * list_length(fsplan->exec_nodes->nodeList) + fsstate->current_idx; } else { fsstate->current_idx = -1; } } else { fsstate->max_idx = 1; fsstate->cycle_idx = 1; fsstate->current_idx = 0; } Assert(fsstate->current_idx < fsstate->cycle_idx); } /* * Force assorted GUC parameters to settings that ensure that we'll output * data values in a form that is unambiguous to the remote server. * * This is rather expensive and annoying to do once per row, but there's * little choice if we want to be sure values are transmitted accurately; * we can't leave the settings in place between rows for fear of affecting * user-visible computations. * * We use the equivalent of a function SET option to allow the settings to * persist only until the caller calls reset_transmission_modes(). If an * error is thrown in between, guc.c will take care of undoing the settings. * * The return value is the nestlevel that must be passed to * reset_transmission_modes() to undo things. */ int set_transmission_modes(void) { int nestlevel = NewGUCNestLevel(); /* * The values set here should match what pg_dump does. See also * configure_remote_session in connection.c. */ if (u_sess->time_cxt.DateStyle != USE_ISO_DATES) (void)set_config_option("datestyle", "ISO", PGC_USERSET, PGC_S_SESSION, GUC_ACTION_SAVE, true, 0, false); if (u_sess->attr.attr_common.IntervalStyle != INTSTYLE_POSTGRES) (void)set_config_option( "intervalstyle", "postgres", PGC_USERSET, PGC_S_SESSION, GUC_ACTION_SAVE, true, 0, false); if (u_sess->attr.attr_common.extra_float_digits < 3) (void)set_config_option("extra_float_digits", "3", PGC_USERSET, PGC_S_SESSION, GUC_ACTION_SAVE, true, 0, false); return nestlevel; } /* * Undo the effects of set_transmission_modes(). */ void reset_transmission_modes(int nestlevel) { AtEOXact_GUC(true, nestlevel); } /* * Utility routine to close a cursor. */ static void close_cursor(PGconn* conn, unsigned int cursor_number) { char sql[64] = {0}; PGresult* res = NULL; errno_t rc = snprintf_s(sql, sizeof(sql), sizeof(sql) - 1, "CLOSE c%u", cursor_number); securec_check_ss(rc, "", ""); /* * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ res = pgfdw_exec_query(conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, sql); PQclear(res); } /* * Prepare for processing of parameters used in remote query. */ static void prepare_query_params(PlanState* node, List* fdw_exprs, int numParams, FmgrInfo** param_flinfo, List** param_exprs, const char*** param_values) { int i = 0; ListCell* lc = NULL; Assert(numParams > 0); /* Prepare for output conversion of parameters used in remote query. */ *param_flinfo = (FmgrInfo *)palloc0(sizeof(FmgrInfo) * numParams); foreach (lc, fdw_exprs) { Node *param_expr = (Node *)lfirst(lc); Oid typefnoid = InvalidOid; bool isvarlena = false; getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena); fmgr_info(typefnoid, &(*param_flinfo)[i]); i++; } /* * Prepare remote-parameter expressions for evaluation. (Note: in * practice, we expect that all these expressions will be just Params, so * we could possibly do something more efficient than using the full * expression-eval machinery for this. But probably there would be little * benefit, and it'd require gc_fdw to know more than is desirable * about Param evaluation.) */ *param_exprs = ExecInitExprList(fdw_exprs, node); /* Allocate buffer for text form of query parameters. */ *param_values = (const char **)palloc0(numParams * sizeof(char *)); } /* * Construct array of query parameter values in text format. */ static void process_query_params( ExprContext* econtext, FmgrInfo* param_flinfo, List* param_exprs, const char** param_values) { int i = 0; ListCell* lc = NULL; int nestlevel = set_transmission_modes(); foreach (lc, param_exprs) { ExprState *expr_state = (ExprState *)lfirst(lc); Datum expr_value; bool isNull = false; /* Evaluate the parameter expression */ expr_value = ExecEvalExpr(expr_state, econtext, &isNull); /* * Get string representation of each parameter value by invoking * type-specific output function, unless the value is null. */ if (isNull) { param_values[i] = NULL; } else { param_values[i] = OutputFunctionCall(¶m_flinfo[i], expr_value); } i++; } reset_transmission_modes(nestlevel); } /* * gcAnalyzeForeignTable * Test whether analyzing this foreign table is supported */ static bool gcAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc* func, BlockNumber* totalpages, void* additionalData, bool estimate_table_rownum) { PGFDWTableAnalyze* info = (PGFDWTableAnalyze*)additionalData; Oid relid = info->relid; ForeignTable* table = GetForeignTable(relid); int remote_encoding; bool have_remote_encoding = gcfdw_get_table_encode(table, &remote_encoding); (void)GetConnection(table->serverid, &info->pgxc_handles, false, have_remote_encoding); u_sess->pgxc_cxt.gc_fdw_run_version = gcfdw_send_and_fetch_version(info->pgxc_handles); if (u_sess->pgxc_cxt.gc_fdw_run_version >= GCFDW_VERSION_V1R8C10_1 && have_remote_encoding == true) { gcfdw_send_remote_encode(info->pgxc_handles, remote_encoding); } gcfdw_fetch_remote_table_info(info->pgxc_handles, table, info, PGFDW_ANALYZE_TABLE); return true; } /* * Create a tuple from the specified row of the PGresult. * * rel is the local representation of the foreign table, attinmeta is * conversion data for the rel's tupdesc, and retrieved_attrs is an * integer list of the table column numbers present in the PGresult. * temp_context is a working context that can be reset after each tuple. */ static HeapTuple make_tuple_from_result_row(PGresult* res, int row, Relation rel, AttInMetadata* attinmeta, List* retrieved_attrs, ForeignScanState* fsstate, MemoryContext temp_context) { HeapTuple tuple = NULL; TupleDesc tupdesc = NULL; Datum* values = NULL; bool* nulls = NULL; ItemPointer ctid = NULL; Oid oid = InvalidOid; ConversionLocation errpos; ErrorContextCallback errcallback; MemoryContext oldcontext = NULL; ListCell* lc = NULL; int j = 0; Assert(row < PQntuples(res)); /* * Do the following work in a temp context that we reset after each tuple. * This cleans up not only the data we have direct access to, but any * cruft the I/O functions might leak. */ oldcontext = MemoryContextSwitchTo(temp_context); if (rel) { tupdesc = RelationGetDescr(rel); } else { GcFdwScanState *fdw_sstate = NULL; Assert(fsstate != NULL); fdw_sstate = (GcFdwScanState *)fsstate->fdw_state; tupdesc = fdw_sstate->tupdesc; } values = (Datum*)palloc0(tupdesc->natts * sizeof(Datum)); nulls = (bool*)palloc(tupdesc->natts * sizeof(bool)); /* Initialize to nulls for any columns not present in result */ errno_t rc = memset_s(nulls, tupdesc->natts * sizeof(bool), true, tupdesc->natts * sizeof(bool)); securec_check(rc, "\0", "\0"); /* * Set up and install callback to report where conversion error occurs. */ errpos.rel = rel; errpos.cur_attno = 0; errpos.fsstate = fsstate; errcallback.callback = conversion_error_callback; errcallback.arg = (void*)&errpos; errcallback.previous = t_thrd.log_cxt.error_context_stack; t_thrd.log_cxt.error_context_stack = &errcallback; /* * i indexes columns in the relation, j indexes columns in the PGresult. */ foreach (lc, retrieved_attrs) { int i = lfirst_int(lc); char* valstr = NULL; /* fetch next column's textual value */ if (PQgetisnull(res, row, j)) { valstr = NULL; } else { valstr = PQgetvalue(res, row, j); } /* * convert value to internal representation * * Note: we ignore system columns other than ctid and oid in result */ errpos.cur_attno = i; if (i > 0) { /* ordinary column */ Assert(i <= tupdesc->natts); if (valstr == NULL) { nulls[i - 1] = true; } else { nulls[i - 1] = false; } /* Apply the input function even to nulls, to support domains */ values[i - 1] = InputFunctionCall( &attinmeta->attinfuncs[i - 1], valstr, attinmeta->attioparams[i - 1], attinmeta->atttypmods[i - 1]); } else if (i == SelfItemPointerAttributeNumber) { /* ctid */ if (valstr != NULL) { Datum datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr)); ctid = (ItemPointer)DatumGetPointer(datum); } } else if (i == ObjectIdAttributeNumber) { /* oid */ if (valstr != NULL) { Datum datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr)); oid = DatumGetObjectId(datum); } } errpos.cur_attno = 0; j++; } /* Uninstall error context callback. */ t_thrd.log_cxt.error_context_stack = errcallback.previous; /* * Check we got the expected number of columns. Note: j == 0 and * PQnfields == 1 is expected, since deparse emits a NULL if no columns. */ if (j > 0 && j != PQnfields(res)) ereport(ERROR, (errcode(ERRCODE_OPERATE_RESULT_NOT_EXPECTED), errmsg("remote query result does not match the foreign table"))); /* * Build the result tuple in caller's memory context. */ MemoryContextSwitchTo(oldcontext); tuple = heap_form_tuple(tupdesc, values, nulls); /* * If we have a CTID to return, install it in both t_self and t_ctid. * t_self is the normal place, but if the tuple is converted to a * composite Datum, t_self will be lost; setting t_ctid allows CTID to be * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code). */ if (ctid) tuple->t_self = tuple->t_data->t_ctid = *ctid; /* * Stomp on the xmin, xmax, and cmin fields from the tuple created by * heap_form_tuple. heap_form_tuple actually creates the tuple with * DatumTupleFields, not HeapTupleFields, but the executor expects * HeapTupleFields and will happily extract system columns on that * assumption. If we don't do this then, for example, the tuple length * ends up in the xmin field, which isn't what we want. */ /* * If we have an OID to return, install it. */ if (OidIsValid(oid)) HeapTupleSetOid(tuple, oid); /* Clean up */ MemoryContextReset(temp_context); return tuple; } static HeapTuple make_tuple_from_agg_result(PGresult* res, int row, Relation rel, AttInMetadata* attinmeta, List* retrieved_attrs, ForeignScanState* fsstate, MemoryContext temp_context) { HeapTuple tuple = NULL; TupleDesc tupdesc = NULL; Datum* values = NULL; bool* nulls = NULL; MemoryContext oldcontext = NULL; int j = 0; Assert(row < PQntuples(res)); /* * Do the following work in a temp context that we reset after each tuple. * This cleans up not only the data we have direct access to, but any * cruft the I/O functions might leak. */ oldcontext = MemoryContextSwitchTo(temp_context); GcFdwScanState *fdw_sstate = NULL; Assert(fsstate != NULL); fdw_sstate = (GcFdwScanState *)fsstate->fdw_state; tupdesc = fdw_sstate->tupdesc; values = (Datum *)palloc0(tupdesc->natts * sizeof(Datum)); nulls = (bool *)palloc(tupdesc->natts * sizeof(bool)); /* Initialize to nulls for any columns not present in result */ errno_t rc = memset_s(nulls, tupdesc->natts * sizeof(bool), true, tupdesc->natts * sizeof(bool)); securec_check(rc, "\0", "\0"); /* * i indexes columns in the relation, j indexes columns in the PGresult. */ for (int i = 1; i <= tupdesc->natts; i++) { char *valstr = NULL; /* fetch next column's textual value */ if (PQgetisnull(res, row, j)) { valstr = NULL; } else { valstr = PQgetvalue(res, row, j); } nulls[i - 1] = (valstr == NULL); /* Apply the input function even to nulls, to support domains */ values[i - 1] = InputFunctionCall( &attinmeta->attinfuncs[i - 1], valstr, attinmeta->attioparams[i - 1], attinmeta->atttypmods[i - 1]); j++; } /* * Check we got the expected number of columns. Note: j == 0 and * PQnfields == 1 is expected, since deparse emits a NULL if no columns. */ if (j > 0 && j != PQnfields(res)) ereport(ERROR, (errcode(ERRCODE_OPERATE_RESULT_NOT_EXPECTED), errmsg("remote query result does not match the foreign table"))); /* * Build the result tuple in caller's memory context. */ MemoryContextSwitchTo(oldcontext); tuple = heap_form_tuple(tupdesc, values, nulls); /* * Stomp on the xmin, xmax, and cmin fields from the tuple created by * heap_form_tuple. heap_form_tuple actually creates the tuple with * DatumTupleFields, not HeapTupleFields, but the executor expects * HeapTupleFields and will happily extract system columns on that * assumption. If we don't do this then, for example, the tuple length * ends up in the xmin field, which isn't what we want. */ /* Clean up */ MemoryContextReset(temp_context); return tuple; } /* * Callback function which is called when error occurs during column value * conversion. Print names of column and relation. */ static void conversion_error_callback(void *arg) { const char *attname = NULL; const char *relname = NULL; bool is_wholerow = false; ConversionLocation *errpos = (ConversionLocation *)arg; if (errpos->rel) { /* error occurred in a scan against a foreign table */ TupleDesc tupdesc = RelationGetDescr(errpos->rel); if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts) { attname = NameStr(tupdesc->attrs[errpos->cur_attno - 1].attname); } else if (errpos->cur_attno == SelfItemPointerAttributeNumber) { attname = "ctid"; } else if (errpos->cur_attno == ObjectIdAttributeNumber) { attname = "oid"; } relname = RelationGetRelationName(errpos->rel); } else { /* error occurred in a scan against a foreign join */ errcontext("processing expression at position %d in select list", errpos->cur_attno); } if (relname != NULL) { if (is_wholerow) { errcontext("whole-row reference to foreign table \"%s\"", relname); } else if (attname != NULL) { errcontext("column \"%s\" of foreign table \"%s\"", attname, relname); } } } /* * Find an equivalence class member expression, all of whose Vars, come from * the indicated relation. */ extern Expr* find_em_expr_for_rel(EquivalenceClass* ec, RelOptInfo* rel) { ListCell* lc_em = NULL; foreach (lc_em, ec->ec_members) { EquivalenceMember* em = (EquivalenceMember*)lfirst(lc_em); if (bms_is_subset(em->em_relids, rel->relids)) { /* * If there is more than one equivalence member whose Vars are * taken entirely from this relation, we'll be content to choose * any one of those. */ return em->em_expr; } } /* We didn't find any suitable equivalence class expression */ return NULL; } /* * gcValidateTableDef * Check create/alter foreign table */ static void gcValidateTableDef(Node* Obj) { if (NULL == Obj) return; switch (nodeTag(Obj)) { case T_AlterTableStmt: { List* cmds = ((AlterTableStmt*)Obj)->cmds; ListCell* lcmd = NULL; foreach (lcmd, cmds) { AlterTableCmd* cmd = (AlterTableCmd*)lfirst(lcmd); if (cmd->subtype != AT_AlterColumnType && cmd->subtype != AT_GenericOptions && cmd->subtype != AT_AlterColumnGenericOptions && cmd->subtype != AT_ChangeOwner && cmd->subtype != AT_AddNodeList && cmd->subtype != AT_DeleteNodeList && cmd->subtype != AT_UpdateSliceLike) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmodule(MOD_COOP_ANALYZE), errmsg("Un-support feature"), errdetail("target table is a foreign table"))); } if (cmd->subtype == AT_GenericOptions && nodeTag(cmd->def) == T_List) { List* defs = (List*)cmd->def; ListCell* lc = NULL; foreach (lc, defs) { DefElem* def = (DefElem*)lfirst(lc); if (def->defaction == DEFELEM_ADD || def->defaction == DEFELEM_DROP) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmodule(MOD_COOP_ANALYZE), errmsg("Un-support feature"), errdetail("target table is a foreign table"))); } } } } break; } case T_CreateForeignTableStmt: { DistributeBy* DisByOp = ((CreateStmt*)Obj)->distributeby; /* Start a node by isRestoreMode when adding a node, do not regist distributeby * info for pgxc_class. So distributeby clause is not needed. */ if (IS_PGXC_COORDINATOR && !isRestoreMode) { if (NULL == DisByOp) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmodule(MOD_COOP_ANALYZE), errmsg("Need DISTRIBUTE BY clause for the foreign table."))); } if (DISTTYPE_ROUNDROBIN != DisByOp->disttype) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmodule(MOD_COOP_ANALYZE), errmsg("Unsupport distribute type."), errdetail("Supported option values are \"roundrobin\"."))); } if (((CreateForeignTableStmt*)Obj)->part_state) { /* as for gc_fdw foreign table, unsupport parition table. */ ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("The gc_fdw partition foreign table is not supported."))); } } /*if the error_relation exists for a gc_fdw foreign table ,the warning reminds that it is not support*/ if (((CreateForeignTableStmt*)Obj)->error_relation != NULL && IsA(((CreateForeignTableStmt*)Obj)->error_relation, RangeVar)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmodule(MOD_COOP_ANALYZE), errmsg("The error_relation of the foreign table is not support."))); } /* check write only */ if (((CreateForeignTableStmt*)Obj)->write_only) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmodule(MOD_COOP_ANALYZE), errmsg("Unsupport write only."))); } /* check optLogRemote and optRejectLimit */ if (CheckForeignExtOption(((CreateForeignTableStmt*)Obj)->extOptions, optLogRemote)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmodule(MOD_COOP_ANALYZE), errmsg("The REMOTE LOG of gc_fdw foreign table is not support."))); } if (CheckForeignExtOption(((CreateForeignTableStmt*)Obj)->extOptions, optRejectLimit)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmodule(MOD_COOP_ANALYZE), errmsg("The PER NODE REJECT LIMIT of gc_fdw foreign table is not support."))); } break; } default: ereport(ERROR, (errcode(ERRCODE_UNRECOGNIZED_NODE_TYPE), errmodule(MOD_COOP_ANALYZE), errmsg("unrecognized node type: %d", (int)nodeTag(Obj)))); } } /* * @Description: check log_remote and reject_limit of extOption * @IN OptList: The foreign table option list. * @Return: true if log_remote or reject_limit, false for other * @See also: */ static bool CheckForeignExtOption(List* extOptList, const char* str) { bool CheckExtOption = false; ListCell* cell = NULL; foreach (cell, extOptList) { DefElem* Opt = (DefElem*)lfirst(cell); if (0 == pg_strcasecmp(Opt->defname, str)) { CheckExtOption = true; break; } } return CheckExtOption; } static bool GcFdwCanSkip(GcFdwScanState* fsstate) { if (fsstate->current_idx == -1 || fsstate->current_idx >= fsstate->max_idx) { return true; } if (fsstate->remoteinfo->reltype == LOCATOR_TYPE_REPLICATED && fsstate->current_idx != 0) { return true; } return false; } static void GcFdwCopyRemoteInfo(PgFdwRemoteInfo* new_remote_info, PgFdwRemoteInfo* ori_remote_info) { new_remote_info->reltype = ori_remote_info->reltype; new_remote_info->datanodenum = ori_remote_info->datanodenum; errno_t rc = memcpy_s((char*)new_remote_info->snapshot, new_remote_info->snapsize, (char*)ori_remote_info->snapshot, ori_remote_info->snapsize); if (rc != 0) { Assert(0); } securec_check_c(rc, "\0", "\0"); } bool hasSpecialArrayType(TupleDesc desc) { for (int i = 0; i < desc->natts; i++) { Oid typoid = desc->attrs[i].atttypid; if (INT8ARRAYOID == typoid || FLOAT8ARRAYOID == typoid || FLOAT4ARRAYOID == typoid || NUMERICARRAY == typoid) { return true; } } return false; } // end of file