/* ------------------------------------------------------------------------- * * postgres_fdw.c * Foreign-data wrapper for remote openGauss servers * * Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd. * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group * * IDENTIFICATION * contrib/postgres_fdw/postgres_fdw.c * * ------------------------------------------------------------------------- */ #include "postgres.h" #include "postgres_fdw.h" #include "access/htup.h" #include "access/sysattr.h" #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" #include "foreign/fdwapi.h" #include "funcapi.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "optimizer/cost.h" #include "optimizer/pathnode.h" #include "optimizer/paths.h" #include "optimizer/planmain.h" #include "optimizer/prep.h" #include "optimizer/restrictinfo.h" #include "optimizer/var.h" #include "optimizer/tlist.h" #include "parser/parsetree.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "knl/knl_guc.h" #include "access/tableam.h" #include "internal_interface.h" PG_MODULE_MAGIC; /* Default CPU cost to start up a foreign query. */ #define DEFAULT_FDW_STARTUP_COST 100.0 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */ #define DEFAULT_FDW_TUPLE_COST 0.01 /* If no remote estimates, assume a sort costs 20% extra */ #define DEFAULT_FDW_SORT_MULTIPLIER 1.2 /* * Indexes of FDW-private information stored in fdw_private lists. * * We store various information in ForeignScan.fdw_private to pass it from * planner to executor. Currently we store: * * 1) SELECT statement text to be sent to the remote server * 2) Integer list of attribute numbers retrieved by the SELECT * * These items are indexed with the enum FdwScanPrivateIndex, so an item * can be fetched with list_nth(). For example, to get the SELECT * statement: sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)) */ enum FdwScanPrivateIndex { /* SQL statement to execute remotely (as a String node) */ FdwScanPrivateSelectSql, /* Integer list of attribute numbers retrieved by the SELECT */ FdwScanPrivateRetrievedAttrs, /* Integer representing the desired fetch_size */ FdwScanPrivateFetchSize, /* * String describing join i.e. names of relations being joined and types * of join, added when the scan is join */ FdwScanPrivateRelations }; /* * Similarly, this enum describes what's kept in the fdw_private list for * a ModifyTable node referencing a postgres_fdw foreign table. We store: * * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server * 2) Integer list of target attribute numbers for INSERT/UPDATE * (NIL for a DELETE) * 3) Boolean flag showing if the remote query has a RETURNING clause * 4) Integer list of attribute numbers retrieved by RETURNING, if any */ enum FdwModifyPrivateIndex { /* SQL statement to execute remotely (as a String node) */ FdwModifyPrivateUpdateSql, /* Integer list of target attribute numbers for INSERT/UPDATE */ FdwModifyPrivateTargetAttnums, /* has-returning flag (as an integer Value node) */ FdwModifyPrivateHasReturning, /* Integer list of attribute numbers retrieved by RETURNING */ FdwModifyPrivateRetrievedAttrs }; /* * Execution state of a foreign scan using postgres_fdw. */ typedef struct PgFdwScanState { Relation rel; /* relcache entry for the foreign table */ TupleDesc tupdesc; /* tuple descriptor of scan */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ /* extracted fdw_private data */ char *query; /* text of SELECT command */ List *retrieved_attrs; /* list of retrieved attribute numbers */ /* for remote query execution */ PGconn *conn; /* connection for the scan */ unsigned int cursor_number; /* quasi-unique ID for my cursor */ bool cursor_exists; /* have we created the cursor? */ int numParams; /* number of parameters passed to query */ FmgrInfo *param_flinfo; /* output conversion functions for them */ List *param_exprs; /* executable expressions for param values */ const char **param_values; /* textual values of query parameters */ /* for storing result tuples */ HeapTuple *tuples; /* array of currently-retrieved tuples */ int num_tuples; /* # of tuples in array */ int next_tuple; /* index of next one to return */ /* batch-level state, for optimizing rewinds and avoiding useless fetch */ int fetch_ct_2; /* Min(# of fetches done, 2) */ bool eof_reached; /* true if last fetch reached EOF */ /* working memory contexts */ MemoryContext batch_cxt; /* context holding current batch of tuples */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ int fetch_size; /* number of tuples per fetch */ } PgFdwScanState; /* * Execution state of a foreign insert/update/delete operation. */ typedef struct PgFdwModifyState { Relation rel; /* relcache entry for the foreign table */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ /* for remote query execution */ PGconn *conn; /* connection for the scan */ char *p_name; /* name of prepared statement, if created */ /* extracted fdw_private data */ char *query; /* text of INSERT/UPDATE/DELETE command */ List *target_attrs; /* list of target attribute numbers */ bool has_returning; /* is there a RETURNING clause? */ List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ /* info about parameters for prepared statement */ AttrNumber ctidAttno; /* attnum of input resjunk ctid column */ AttrNumber tboidAttno; /* attnum of input resjunk tableoid column */ int p_nums; /* number of parameters to transmit */ FmgrInfo *p_flinfo; /* output conversion functions for them */ /* working memory context */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ } PgFdwModifyState; /* * Workspace for analyzing a foreign table. */ typedef struct PgFdwAnalyzeState { Relation rel; /* relcache entry for the foreign table */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ List *retrieved_attrs; /* attr numbers retrieved by query */ /* collected sample rows */ HeapTuple *rows; /* array of size targrows */ int targrows; /* target # of sample rows */ int numrows; /* # of sample rows collected */ /* for random sampling */ double samplerows; /* # of rows fetched */ double rowstoskip; /* # of rows to skip before next sample */ double rstate; /* random state */ /* working memory contexts */ MemoryContext anl_cxt; /* context for per-analyze lifespan data */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ } PgFdwAnalyzeState; /* * This enum describes what's kept in the fdw_private list for a ForeignPath. * We store: * * 1) Boolean flag showing if the remote query has the final sort * 2) Boolean flag showing if the remote query has the LIMIT clause */ enum FdwPathPrivateIndex { /* has-final-sort flag (as a Boolean node) */ FdwPathPrivateHasFinalSort, /* has-limit flag (as a Boolean node) */ FdwPathPrivateHasLimit }; /* Struct for extra information passed to estimate_path_cost_size() */ typedef struct { List *target; bool has_final_sort; bool has_limit; double limit_tuples; int64 count_est; int64 offset_est; } PgFdwPathExtraData; /* * Identify the attribute where data conversion fails. */ typedef struct ConversionLocation { Relation rel; /* foreign table's relcache entry */ AttrNumber cur_attno; /* attribute number being processed, or 0 */ ForeignScanState *fsstate; /* plan node being processed, or NULL */ } ConversionLocation; /* * SQL functions */ PG_FUNCTION_INFO_V1(postgres_fdw_handler); extern "C" Datum postgres_fdw_handler(PG_FUNCTION_ARGS); /* * FDW callback routines */ static void postgresGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid); static void postgresGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid); static ForeignScan *postgresGetForeignPlan(PlannerInfo *root, RelOptInfo *foreignrel, Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses, Plan *outer_plan); static void postgresBeginForeignScan(ForeignScanState *node, int eflags); static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node); static void postgresReScanForeignScan(ForeignScanState *node); static void postgresEndForeignScan(ForeignScanState *node); static void postgresAddForeignUpdateTargets(Query *parsetree, RangeTblEntry *target_rte, Relation target_relation); static List *postgresPlanForeignModify(PlannerInfo *root, ModifyTable *plan, Index resultRelation, int subplan_index); static void postgresBeginForeignModify(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, List *fdw_private, int subplan_index, int eflags); static TupleTableSlot *postgresExecForeignInsert(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot); static TupleTableSlot *postgresExecForeignUpdate(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot); static TupleTableSlot *postgresExecForeignDelete(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot); static void postgresEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo); static int postgresIsForeignRelUpdatable(Relation rel); static void postgresExplainForeignScan(ForeignScanState *node, ExplainState *es); static void postgresExplainForeignModify(ModifyTableState *mtstate, ResultRelInfo *rinfo, List *fdw_private, int subplan_index, ExplainState *es); static void postgresExplainForeignScanRemote(ForeignScanState *node, ExplainState *es); static bool postgresAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages, void *additionalData, bool estimate_table_rownum); static void postgresGetForeignJoinPaths(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel, RelOptInfo *innerrel, JoinType jointype, SpecialJoinInfo *sjinfo, List *restrictlist); static void _postgresGetForeignUpperPaths(FDWUpperRelCxt *ufdw_cxt, UpperRelationKind stage); static void postgresGetForeignUpperPaths(FDWUpperRelCxt* ufdwCxt, UpperRelationKind stage, Plan* mainPlan); /* * Helper functions */ static void estimate_path_cost_size(PlannerInfo *root, RelOptInfo *foreignrel, List *param_join_conds, List *pathkeys, PgFdwPathExtraData *fpextra, double *p_rows, int *p_width, Cost *p_startup_cost, Cost *p_total_cost); static void get_remote_estimate(const char *sql, PGconn *conn, double *rows, int *width, Cost *startup_cost, Cost *total_cost); static void create_cursor(ForeignScanState *node); static void fetch_more_data(ForeignScanState *node); static void close_cursor(PGconn *conn, unsigned int cursor_number); static PgFdwModifyState *createForeignModify(EState *estate, RangeTblEntry *rte, ResultRelInfo *resultRelInfo, CmdType operation, Plan *subplan, char *query, List *target_attrs, bool has_returning, List *retrieved_attrs); static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, ItemPointer tableid, TupleTableSlot *slot); static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res); static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, double *totaldeadrows, void *additionalData, bool estimate_table_rownum); static void analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate); static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, AttInMetadata *attinmeta, List *retrieved_attrs, ForeignScanState *fsstate, MemoryContext temp_context); static void conversion_error_callback(void *arg); static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel, Path *epq_path); static void apply_server_options(PgFdwRelationInfo *fpinfo); static void apply_table_options(PgFdwRelationInfo *fpinfo); static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i); static void prepare_query_params(PlanState *node, List *fdw_exprs, int numParams, FmgrInfo **param_flinfo, List **param_exprs, const char ***param_values); static void add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *grouped_rel, GroupPathExtraData *extra); static void add_foreign_ordered_paths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *ordered_rel, OrderPathExtraData *extra); static void add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *final_rel, FinalPathExtraData *extra); static void adjust_foreign_grouping_path_cost(PlannerInfo *root, List *pathkeys, double retrieved_rows, double width, double limit_tuples, Cost *p_startup_cost, Cost *p_run_cost); static void process_query_params(ExprContext *econtext, FmgrInfo *param_flinfo, List *param_exprs, const char **param_values); /* * Foreign-data wrapper handler function: return a struct with pointers * to my callback routines. */ Datum postgres_fdw_handler(PG_FUNCTION_ARGS) { FdwRoutine *routine = makeNode(FdwRoutine); /* Functions for scanning foreign tables */ routine->GetForeignRelSize = postgresGetForeignRelSize; routine->GetForeignPaths = postgresGetForeignPaths; routine->GetForeignPlan = postgresGetForeignPlan; routine->BeginForeignScan = postgresBeginForeignScan; routine->IterateForeignScan = postgresIterateForeignScan; routine->ReScanForeignScan = postgresReScanForeignScan; routine->EndForeignScan = postgresEndForeignScan; /* Functions for updating foreign tables */ routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets; routine->PlanForeignModify = postgresPlanForeignModify; routine->BeginForeignModify = postgresBeginForeignModify; routine->ExecForeignInsert = postgresExecForeignInsert; routine->ExecForeignUpdate = postgresExecForeignUpdate; routine->ExecForeignDelete = postgresExecForeignDelete; routine->EndForeignModify = postgresEndForeignModify; routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable; /* Support functions for EXPLAIN */ routine->ExplainForeignScan = postgresExplainForeignScan; routine->ExplainForeignModify = postgresExplainForeignModify; routine->ExplainForeignScanRemote = postgresExplainForeignScanRemote; /* Support functions for ANALYZE */ routine->AnalyzeForeignTable = postgresAnalyzeForeignTable; routine->AcquireSampleRows = postgresAcquireSampleRowsFunc; /* Support functions for join push-down */ routine->GetForeignJoinPaths = postgresGetForeignJoinPaths; /* Support functions for upper relation push-down */ routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; PG_RETURN_POINTER(routine); } /* * postgresGetForeignRelSize * Estimate # of rows and width of the result of the scan * * We should consider the effect of all baserestrictinfo clauses here, but * not any join clauses. */ static void postgresGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid) { ListCell *lc = NULL; /* * We use PgFdwRelationInfo to pass various information to subsequent * functions. */ PgFdwRelationInfo* fpinfo = (PgFdwRelationInfo *)palloc0(sizeof(PgFdwRelationInfo)); baserel->fdw_private = (void *)fpinfo; /* Base foreign tables need to be pushed down always. */ fpinfo->pushdown_safe = true; /* Look up foreign-table catalog info. */ fpinfo->table = GetForeignTable(foreigntableid); fpinfo->server = GetForeignServer(fpinfo->table->serverid); /* * Extract user-settable option values. Note that per-table setting of * use_remote_estimate overrides per-server setting. */ fpinfo->use_remote_estimate = false; fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST; fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST; fpinfo->shippable_extensions = NIL; fpinfo->fetch_size = 100; fpinfo->async_capable = false; apply_server_options(fpinfo); apply_table_options(fpinfo); /* * If the table or the server is configured to use remote estimates, * identify which user to do remote access as during planning. This * should match what ExecCheckRTEPerms() does. If we fail due to lack of * permissions, the query would have failed at runtime anyway. */ if (fpinfo->use_remote_estimate) { RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root); Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid); } else { fpinfo->user = NULL; } /* * Identify which baserestrictinfo clauses can be sent to the remote * server and which can't. */ classifyConditions(root, baserel, baserel->baserestrictinfo, &fpinfo->remote_conds, &fpinfo->local_conds); /* * Identify which attributes will need to be retrieved from the remote * server. These include all attrs needed for joins or final output, plus * all attrs used in the local_conds. (Note: if we end up using a * parameterized scan, it's possible that some of the join clauses will be * sent to the remote and thus we wouldn't really need to retrieve the * columns used in them. Doesn't seem worth detecting that case though.) */ fpinfo->attrs_used = NULL; pull_varattnos((Node *)baserel->reltarget->exprs, baserel->relid, &fpinfo->attrs_used); foreach (lc, fpinfo->local_conds) { RestrictInfo *rinfo = (RestrictInfo *)lfirst(lc); pull_varattnos((Node *)rinfo->clause, baserel->relid, &fpinfo->attrs_used); } /* * Compute the selectivity and cost of the local_conds, so we don't have * to do it over again for each path. The best we can do for these * conditions is to estimate selectivity on the basis of local statistics. */ fpinfo->local_conds_sel = clauselist_selectivity(root, fpinfo->local_conds, baserel->relid, JOIN_INNER, NULL); cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root); /* * Set # of retrieved rows and cached relation costs to some negative * value, so that we can detect when they are set to some sensible values, * during one (usually the first) of the calls to estimate_path_cost_size. */ fpinfo->retrieved_rows = -1; fpinfo->rel_startup_cost = -1; fpinfo->rel_total_cost = -1; /* * If the table or the server is configured to use remote estimates, * connect to the foreign server and execute EXPLAIN to estimate the * number of rows selected by the restriction clauses, as well as the * average row width. Otherwise, estimate using whatever statistics we * have locally, in a way similar to ordinary tables. */ if (fpinfo->use_remote_estimate) { /* * Get cost/size estimates with help of remote server. Save the * values in fpinfo so we don't need to do it again to generate the * basic foreign path. */ estimate_path_cost_size(root, baserel, NIL, NIL, NULL, &fpinfo->rows, &fpinfo->width, &fpinfo->startup_cost, &fpinfo->total_cost); /* Report estimated baserel size to planner. */ baserel->rows = fpinfo->rows; baserel->reltarget->width = fpinfo->width; } else { /* * If the foreign table has never been ANALYZEd, it will have relpages * and reltuples equal to zero, which most likely has nothing to do * with reality. We can't do a whole lot about that if we're not * allowed to consult the remote server, but we can use a hack similar * to plancat.c's treatment of empty relations: use a minimum size * estimate of 10 pages, and divide by the column-datatype-based width * estimate to get the corresponding number of tuples. */ if (baserel->pages == 0 && baserel->tuples == 0) { baserel->pages = 10; baserel->tuples = (double)(10 * BLCKSZ) / (baserel->reltarget->width + sizeof(HeapTupleHeaderData)); } /* Estimate baserel size as best we can with local statistics. */ set_baserel_size_estimates(root, baserel); /* Fill in basically-bogus cost estimates for use later. */ estimate_path_cost_size(root, baserel, NIL, NIL, NULL, &fpinfo->rows, &fpinfo->width, &fpinfo->startup_cost, &fpinfo->total_cost); } /* * fpinfo->relation_name gets the numeric rangetable index of the foreign * table RTE. (If this query gets EXPLAIN'd, we'll convert that to a * human-readable string at that time.) */ fpinfo->relation_name = psprintf("%u", baserel->relid); /* No outer and inner relations. */ fpinfo->make_outerrel_subquery = false; fpinfo->make_innerrel_subquery = false; fpinfo->lower_subquery_rels = NULL; /* Set the relation index. */ fpinfo->relation_index = baserel->relid; } /* * get_useful_ecs_for_relation * Determine which EquivalenceClasses might be involved in useful * orderings of this relation. * * This function is in some respects a mirror image of the core function * pathkeys_useful_for_merging: for a regular table, we know what indexes * we have and want to test whether any of them are useful. For a foreign * table, we don't know what indexes are present on the remote side but * want to speculate about which ones we'd like to use if they existed. * * This function returns a list of potentially-useful equivalence classes, * but it does not guarantee that an EquivalenceMember exists which contains * Vars only from the given relation. For example, given ft1 JOIN t1 ON * ft1.x + t1.x = 0, this function will say that the equivalence class * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and * t1 is local (or on a different server), it will turn out that no useful * ORDER BY clause can be generated. It's not our job to figure that out * here; we're only interested in identifying relevant ECs. */ static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel) { List *useful_eclass_list = NIL; ListCell *lc = NULL; Relids relids; /* * First, consider whether any active EC is potentially useful for a merge * join against this relation. */ if (rel->has_eclass_joins) { foreach (lc, root->eq_classes) { EquivalenceClass *cur_ec = (EquivalenceClass *)lfirst(lc); if (eclass_useful_for_merging(cur_ec, rel)) useful_eclass_list = lappend(useful_eclass_list, cur_ec); } } /* * Next, consider whether there are any non-EC derivable join clauses that * are merge-joinable. If the joininfo list is empty, we can exit * quickly. */ if (rel->joininfo == NIL) { return useful_eclass_list; } relids = rel->relids; /* Check each join clause in turn. */ foreach (lc, rel->joininfo) { RestrictInfo *restrictinfo = (RestrictInfo *)lfirst(lc); /* Consider only mergejoinable clauses */ if (restrictinfo->mergeopfamilies == NIL) { continue; } /* Make sure we've got canonical ECs. */ update_mergeclause_eclasses(root, restrictinfo); /* * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee * that left_ec and right_ec will be initialized, per comments in * distribute_qual_to_rels. * * We want to identify which side of this merge-joinable clause * contains columns from the relation produced by this RelOptInfo. We * test for overlap, not containment, because there could be extra * relations on either side. For example, suppose we've got something * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON * A.y = D.y. The input rel might be the joinrel between A and B, and * we'll consider the join clause A.y = D.y. relids contains a * relation not involved in the join class (B) and the equivalence * class for the left-hand side of the clause contains a relation not * involved in the input rel (C). Despite the fact that we have only * overlap and not containment in either direction, A.y is potentially * useful as a sort column. * * Note that it's even possible that relids overlaps neither side of * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list, * but overlaps neither side of B. In that case, we just skip this * join clause, since it doesn't suggest a useful sort order for this * relation. */ if (bms_overlap(relids, restrictinfo->right_ec->ec_relids)) { useful_eclass_list = list_append_unique_ptr(useful_eclass_list, restrictinfo->right_ec); } else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids)) { useful_eclass_list = list_append_unique_ptr(useful_eclass_list, restrictinfo->left_ec); } } return useful_eclass_list; } /* * get_useful_pathkeys_for_relation * Determine which orderings of a relation might be useful. * * Getting data in sorted order can be useful either because the requested * order matches the final output ordering for the overall query we're * planning, or because it enables an efficient merge join. Here, we try * to figure out which pathkeys to consider. */ static List *get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel) { List *useful_pathkeys_list = NIL; List *useful_eclass_list = NIL; PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *)rel->fdw_private; EquivalenceClass *query_ec = NULL; ListCell *lc = NULL; /* * Pushing the query_pathkeys to the remote server is always worth * considering, because it might let us avoid a local sort. */ fpinfo->qp_is_pushdown_safe = false; if (root->query_pathkeys) { bool query_pathkeys_ok = true; foreach (lc, root->query_pathkeys) { PathKey *pathkey = (PathKey *)lfirst(lc); /* * The planner and executor don't have any clever strategy for * taking data sorted by a prefix of the query's pathkeys and * getting it to be sorted by all of those pathkeys. We'll just * end up resorting the entire data set. So, unless we can push * down all of the query pathkeys, forget it. */ if (!is_foreign_pathkey(root, rel, pathkey)) { query_pathkeys_ok = false; break; } } if (query_pathkeys_ok) { useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys)); fpinfo->qp_is_pushdown_safe = true; } } /* * Even if we're not using remote estimates, having the remote side do the * sort generally won't be any worse than doing it locally, and it might * be much better if the remote side can generate data in the right order * without needing a sort at all. However, what we're going to do next is * try to generate pathkeys that seem promising for possible merge joins, * and that's more speculative. A wrong choice might hurt quite a bit, so * bail out if we can't use remote estimates. */ if (!fpinfo->use_remote_estimate) { return useful_pathkeys_list; } /* Get the list of interesting EquivalenceClasses. */ useful_eclass_list = get_useful_ecs_for_relation(root, rel); /* Extract unique EC for query, if any, so we don't consider it again. */ if (list_length(root->query_pathkeys) == 1) { PathKey *query_pathkey = (PathKey *)linitial(root->query_pathkeys); query_ec = query_pathkey->pk_eclass; } /* * As a heuristic, the only pathkeys we consider here are those of length * one. It's surely possible to consider more, but since each one we * choose to consider will generate a round-trip to the remote side, we * need to be a bit cautious here. It would sure be nice to have a local * cache of information about remote index definitions... */ foreach (lc, useful_eclass_list) { EquivalenceClass *cur_ec = (EquivalenceClass *)lfirst(lc); PathKey *pathkey = NULL; /* If redundant with what we did above, skip it. */ if (cur_ec == query_ec) { continue; } /* Can't push down the sort if the EC's opfamily is not shippable. */ if (!is_builtin(linitial_oid(cur_ec->ec_opfamilies))) { continue; } /* If no pushable expression for this rel, skip it. */ if (find_em_for_rel(root, cur_ec, rel) == NULL) { continue; } /* Looks like we can generate a pathkey, so let's do it. */ pathkey = make_canonical_pathkey(root, cur_ec, linitial_oid(cur_ec->ec_opfamilies), BTLessStrategyNumber, false); useful_pathkeys_list = lappend(useful_pathkeys_list, list_make1(pathkey)); } return useful_pathkeys_list; } /* * postgresGetForeignPaths * Create possible scan paths for a scan on the foreign table */ static void postgresGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid) { PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *)baserel->fdw_private; ListCell *lc = NULL; /* * Create simplest ForeignScan path node and add it to baserel. This path * corresponds to SeqScan path of regular tables (though depending on what * baserestrict conditions we were able to send to remote, there might * actually be an indexscan happening there). We already did all the work * to estimate cost and size of this path. * * Although this path uses no join clauses, it could still have required * parameterization due to LATERAL refs in its tlist. */ ForeignPath* path = create_foreignscan_path(root, baserel, fpinfo->startup_cost, fpinfo->total_cost, NIL, /* no pathkeys */ NULL, NULL, NIL); /* no fdw_private list */ add_path(root, baserel, (Path *)path); /* Add paths with pathkeys */ add_paths_with_pathkeys_for_rel(root, baserel, NULL); /* * If we're not using remote estimates, stop here. We have no way to * estimate whether any join clauses would be worth sending across, so * don't bother building parameterized paths. */ if (!fpinfo->use_remote_estimate) { return; } /* * Thumb through all join clauses for the rel to identify which outer * relations could supply one or more safe-to-send-to-remote join clauses. * We'll build a parameterized path for each such outer relation. * * It's convenient to manage this by representing each candidate outer * relation by the ParamPathInfo node for it. We can then use the * ppi_clauses list in the ParamPathInfo node directly as a list of the * interesting join clauses for that rel. This takes care of the * possibility that there are multiple safe join clauses for such a rel, * and also ensures that we account for unsafe join clauses that we'll * still have to enforce locally (since the parameterized-path machinery * insists that we handle all movable clauses). */ List *ppi_list = NIL; foreach (lc, baserel->joininfo) { RestrictInfo *rinfo = (RestrictInfo *)lfirst(lc); /* Check if clause can be moved to this rel */ if (!join_clause_is_movable_to(rinfo, baserel->relid)) { continue; } /* See if it is safe to send to remote */ if (!is_foreign_expr(root, baserel, rinfo->clause)) { continue; } /* Calculate required outer rels for the resulting path */ Relids required_outer = bms_union(rinfo->clause_relids, NULL); /* We do not want the foreign rel itself listed in required_outer */ required_outer = bms_del_member(required_outer, baserel->relid); /* * required_outer probably can't be empty here, but if it were, we * couldn't make a parameterized path. */ if (bms_is_empty(required_outer)) { continue; } /* Get the ParamPathInfo */ ParamPathInfo* param_info = get_baserel_parampathinfo(root, baserel, required_outer); Assert(param_info != NULL); /* * Add it to list unless we already have it. Testing pointer equality * is OK since get_baserel_parampathinfo won't make duplicates. */ ppi_list = list_append_unique_ptr(ppi_list, param_info); } /* * Now build a path for each useful outer relation. */ foreach (lc, ppi_list) { ParamPathInfo *param_info = (ParamPathInfo *)lfirst(lc); double rows; int width; Cost startup_cost; Cost total_cost; /* Get a cost estimate from the remote */ estimate_path_cost_size(root, baserel, param_info->ppi_clauses, NIL, NULL, &rows, &width, &startup_cost, &total_cost); /* * ppi_rows currently won't get looked at by anything, but still we * may as well ensure that it matches our idea of the rowcount. */ param_info->ppi_rows = rows; /* Make the path */ path = create_foreignscan_path(root, baserel, startup_cost, total_cost, NIL, /* no pathkeys */ param_info->ppi_req_outer, NULL, NIL); /* no fdw_private list */ add_path(root, baserel, (Path *)path); } } /* * postgresGetForeignPlan * Create ForeignScan plan node which implements selected best path */ static ForeignScan *postgresGetForeignPlan(PlannerInfo *root, RelOptInfo *foreignrel, Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses, Plan *outer_plan) { PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *)foreignrel->fdw_private; Index scan_relid; List *fdw_private = NIL; List *remote_exprs = NIL; List *local_exprs = NIL; List *params_list = NIL; List *fdw_scan_tlist = NIL; List *fdw_recheck_quals = NIL; List *retrieved_attrs = NIL; StringInfoData sql; bool has_final_sort = false; bool has_limit = false; ListCell *lc = NULL; /* * Get FDW private data created by postgresGetForeignUpperPaths(), if any. */ if (best_path->fdw_private) { has_final_sort = boolVal(list_nth(best_path->fdw_private, FdwPathPrivateHasFinalSort)); has_limit = boolVal(list_nth(best_path->fdw_private, FdwPathPrivateHasLimit)); } if (IS_SIMPLE_REL(foreignrel)) { /* * For base relations, set scan_relid as the relid of the relation. */ scan_relid = foreignrel->relid; /* * In a base-relation scan, we must apply the given scan_clauses. * * Separate the scan_clauses into those that can be executed remotely * and those that can't. baserestrictinfo clauses that were * previously determined to be safe or unsafe by classifyConditions * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything * else in the scan_clauses list will be a join clause, which we have * to check for remote-safety. * * Note: the join clauses we see here should be the exact same ones * previously examined by postgresGetForeignPaths. Possibly it'd be * worth passing forward the classification work done then, rather * than repeating it here. * * This code must match "extract_actual_clauses(scan_clauses, false)" * except for the additional decision about remote versus local * execution. */ foreach (lc, scan_clauses) { RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); /* Ignore any pseudoconstants, they're dealt with elsewhere */ if (rinfo->pseudoconstant) { continue; } if (list_member_ptr(fpinfo->remote_conds, rinfo)) { remote_exprs = lappend(remote_exprs, rinfo->clause); } else if (list_member_ptr(fpinfo->local_conds, rinfo)) { local_exprs = lappend(local_exprs, rinfo->clause); } else if (is_foreign_expr(root, foreignrel, rinfo->clause)) { remote_exprs = lappend(remote_exprs, rinfo->clause); } else { local_exprs = lappend(local_exprs, rinfo->clause); } } /* * For a base-relation scan, we have to support EPQ recheck, which * should recheck all the remote quals. */ fdw_recheck_quals = remote_exprs; } else { /* * Join relation or upper relation - set scan_relid to 0. */ scan_relid = 0; /* * For a join rel, baserestrictinfo is NIL and we are not considering * parameterization right now, so there should be no scan_clauses for * a joinrel or an upper rel either. */ Assert(!scan_clauses); /* * Instead we get the conditions to apply from the fdw_private * structure. */ remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false); local_exprs = extract_actual_clauses(fpinfo->local_conds, false); /* * We leave fdw_recheck_quals empty in this case, since we never need * to apply EPQ recheck clauses. In the case of a joinrel, EPQ * recheck is handled elsewhere --- see postgresGetForeignJoinPaths(). * If we're planning an upperrel (ie, remote grouping or aggregation) * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be * allowed, and indeed we *can't* put the remote clauses into * fdw_recheck_quals because the unaggregated Vars won't be available * locally. */ /* Build the list of columns to be fetched from the foreign server. */ fdw_scan_tlist = build_tlist_to_deparse(foreignrel); /* * Ensure that the outer plan produces a tuple whose descriptor * matches our scan tuple slot. Also, remove the local conditions * from outer plan's quals, lest they be evaluated twice, once by the * local plan and once by the scan. */ if (outer_plan) { ListCell *lc = NULL; /* * Right now, we only consider grouping and aggregation beyond * joins. Queries involving aggregates or grouping do not require * EPQ mechanism, hence should not have an outer plan here. */ Assert(!IS_UPPER_REL(foreignrel)); /* * First, update the plan's qual list if possible. In some cases * the quals might be enforced below the topmost plan level, in * which case we'll fail to remove them; it's not worth working * harder than this. */ foreach (lc, local_exprs) { Node *qual = (Node *)lfirst(lc); outer_plan->qual = list_delete(outer_plan->qual, qual); /* * For an inner join the local conditions of foreign scan plan * can be part of the joinquals as well. (They might also be * in the mergequals or hashquals, but we can't touch those * without breaking the plan.) */ if (IsA(outer_plan, NestLoop) || IsA(outer_plan, MergeJoin) || IsA(outer_plan, HashJoin)) { Join *join_plan = (Join *)outer_plan; if (join_plan->jointype == JOIN_INNER) { join_plan->joinqual = list_delete(join_plan->joinqual, qual); } } } /* * Now fix the subplan's tlist --- this might result in inserting * a Result node atop the plan tree. */ outer_plan = change_plan_targetlist(root, outer_plan, fdw_scan_tlist); } } /* * Build the query string to be sent for execution, and identify * expressions to be sent as parameters. */ initStringInfo(&sql); deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist, remote_exprs, best_path->path.pathkeys, has_final_sort, has_limit, false, &retrieved_attrs, ¶ms_list); /* Remember remote_exprs for possible use by postgresPlanDirectModify */ fpinfo->final_remote_exprs = remote_exprs; /* * Build the fdw_private list that will be available to the executor. * Items in the list must match order in enum FdwScanPrivateIndex. */ fdw_private = list_make3(makeString(sql.data), retrieved_attrs, makeInteger(fpinfo->fetch_size)); if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel)) { fdw_private = lappend(fdw_private, makeString(fpinfo->relation_name)); } /* * Create the ForeignScan node for the given relation. * * Note that the remote parameter expressions are stored in the fdw_exprs * field of the finished plan node; we can't keep them in private state * because then they wouldn't be subject to later planner processing. */ return make_foreignscan(tlist, local_exprs, scan_relid, params_list, fdw_private, fdw_scan_tlist, fdw_recheck_quals, outer_plan); } /* * Construct a tuple descriptor for the scan tuples handled by a foreign join. */ static TupleDesc get_tupdesc_for_join_scan_tuples(ForeignScanState *node) { ForeignScan *fsplan = (ForeignScan *)node->ss.ps.plan; EState *estate = node->ss.ps.state; TupleDesc tupdesc; /* * The core code has already set up a scan tuple slot based on * fsplan->fdw_scan_tlist, and this slot's tupdesc is mostly good enough, * but there's one case where it isn't. If we have any whole-row row * identifier Vars, they may have vartype RECORD, and we need to replace * that with the associated table's actual composite type. This ensures * that when we read those ROW() expression values from the remote server, * we can convert them to a composite type the local server knows. */ tupdesc = CreateTupleDescCopy(node->ss.ss_ScanTupleSlot->tts_tupleDescriptor); for (int i = 0; i < tupdesc->natts; i++) { Form_pg_attribute att = TupleDescAttr(tupdesc, i); Var *var = NULL; RangeTblEntry *rte = NULL; Oid reltype; /* Nothing to do if it's not a generic RECORD attribute */ if (att->atttypid != RECORDOID || att->atttypmod >= 0) { continue; } /* * If we can't identify the referenced table, do nothing. This'll * likely lead to failure later, but perhaps we can muddle through. */ var = (Var *)list_nth_node(TargetEntry, fsplan->fdw_scan_tlist, i)->expr; if (!IsA(var, Var) || var->varattno != 0) { continue; } rte = (RangeTblEntry*)list_nth(estate->es_range_table, var->varno - 1); if (rte->rtekind != RTE_RELATION) { continue; } reltype = get_rel_type_id(rte->relid); if (!OidIsValid(reltype)) { continue; } att->atttypid = reltype; /* shouldn't need to change anything else */ } return tupdesc; } /* * postgresBeginForeignScan * Initiate an executor scan of a foreign PostgreSQL table. */ static void postgresBeginForeignScan(ForeignScanState *node, int eflags) { ForeignScan *fsplan = (ForeignScan *)node->ss.ps.plan; EState *estate = node->ss.ps.state; PgFdwScanState *fsstate = NULL; int numParams; /* * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. */ if (eflags & EXEC_FLAG_EXPLAIN_ONLY) { return; } /* * We'll save private state in node->fdw_state. */ fsstate = (PgFdwScanState *)palloc0(sizeof(PgFdwScanState)); node->fdw_state = (void *)fsstate; /* * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ fsstate->conn = GetConnectionByFScanState(node, false); /* Assign a unique ID for my cursor */ fsstate->cursor_number = GetCursorNumber(fsstate->conn); fsstate->cursor_exists = false; /* Get private info created by planner functions. */ fsstate->query = strVal(list_nth(fsplan->fdw_private, FdwScanPrivateSelectSql)); fsstate->retrieved_attrs = (List *)list_nth(fsplan->fdw_private, FdwScanPrivateRetrievedAttrs); fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private, FdwScanPrivateFetchSize)); /* Create contexts for batches of tuples and per-tuple temp workspace. */ fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, "postgres_fdw tuple data", ALLOCSET_DEFAULT_SIZES); fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, "postgres_fdw temporary data", ALLOCSET_SMALL_MINSIZE, ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE); /* * Get info we'll need for converting data fetched from the foreign server * into local representation and error reporting during that process. */ if (fsplan->scan.scanrelid > 0) { fsstate->rel = node->ss.ss_currentRelation; fsstate->tupdesc = RelationGetDescr(fsstate->rel); } else { fsstate->rel = NULL; fsstate->tupdesc = get_tupdesc_for_join_scan_tuples(node); } fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc); /* * Prepare for processing of parameters used in remote query, if any. */ numParams = list_length(fsplan->fdw_exprs); fsstate->numParams = numParams; if (numParams > 0) { prepare_query_params((PlanState *)node, fsplan->fdw_exprs, numParams, &fsstate->param_flinfo, &fsstate->param_exprs, &fsstate->param_values); } } /* * postgresIterateForeignScan * Retrieve next row from the result set, or clear tuple slot to indicate * EOF. */ static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node) { PgFdwScanState *fsstate = (PgFdwScanState *)node->fdw_state; TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; /* * If this is the first call after Begin or ReScan, we need to create the * cursor on the remote side. */ if (!fsstate->cursor_exists) { create_cursor(node); } /* * Get some more tuples, if we've run out. */ if (fsstate->next_tuple >= fsstate->num_tuples) { /* No point in another fetch if we already detected EOF, though. */ if (!fsstate->eof_reached) { fetch_more_data(node); } /* If we didn't get any tuples, must be end of data. */ if (fsstate->next_tuple >= fsstate->num_tuples) { return ExecClearTuple(slot); } } /* * Return the next tuple. */ (void)ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++], slot, InvalidBuffer, false); return slot; } /* * postgresReScanForeignScan * Restart the scan. */ static void postgresReScanForeignScan(ForeignScanState *node) { PgFdwScanState *fsstate = (PgFdwScanState *)node->fdw_state; char sql[64] = {'\0'}; int rc; /* If we haven't created the cursor yet, nothing to do. */ if (!fsstate->cursor_exists) { return; } /* * If any internal parameters affecting this node have changed, we'd * better destroy and recreate the cursor. Otherwise, rewinding it should * be good enough. If we've only fetched zero or one batch, we needn't * even rewind the cursor, just rescan what we have. */ if (node->ss.ps.chgParam != NULL) { fsstate->cursor_exists = false; rc = snprintf_s(sql, sizeof(sql), sizeof(sql) - 1, "CLOSE c%u", fsstate->cursor_number); securec_check_ss(rc, "", ""); } else if (fsstate->fetch_ct_2 > 1) { rc = snprintf_s(sql, sizeof(sql), sizeof(sql) - 1, "MOVE BACKWARD ALL IN c%u", fsstate->cursor_number); securec_check_ss(rc, "", ""); } else { /* Easy: just rescan what we already have in memory, if anything */ fsstate->next_tuple = 0; return; } /* * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ PGresult* res = pgfdw_exec_query(fsstate->conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) { pgfdw_report_error(ERROR, res, fsstate->conn, true, sql); } PQclear(res); /* Now force a fresh FETCH. */ fsstate->tuples = NULL; fsstate->num_tuples = 0; fsstate->next_tuple = 0; fsstate->fetch_ct_2 = 0; fsstate->eof_reached = false; } /* * postgresEndForeignScan * Finish scanning foreign table and dispose objects used for this scan */ static void postgresEndForeignScan(ForeignScanState *node) { PgFdwScanState *fsstate = (PgFdwScanState *)node->fdw_state; /* if fsstate is NULL, we are in EXPLAIN; nothing to do */ if (fsstate == NULL) { return; } /* Close the cursor if open, to prevent accumulation of cursors */ if (fsstate->cursor_exists) { close_cursor(fsstate->conn, fsstate->cursor_number); } /* Release remote connection */ ReleaseConnection(fsstate->conn); fsstate->conn = NULL; /* MemoryContexts will be deleted automatically. */ } /* * postgresAddForeignUpdateTargets * Add resjunk column(s) needed for update/delete on a foreign table */ static void postgresAddForeignUpdateTargets(Query *parsetree, RangeTblEntry *target_rte, Relation target_relation) { /* * In postgres_fdw, what we need is the ctid, same as for a regular table. * Make a Var representing the desired value */ Var* var = makeVar((Index)linitial_int(parsetree->resultRelations), SelfItemPointerAttributeNumber, TIDOID, -1, InvalidOid, 0); /* Wrap it in a resjunk TLE with the right name ... */ const char *attrname = "ctid"; TargetEntry* tle = makeTargetEntry((Expr *)var, list_length(parsetree->targetList) + 1, pstrdup(attrname), true); /* ... and add it to the query's targetlist */ parsetree->targetList = lappend(parsetree->targetList, tle); /* * Because of partition table, ctid alone cannot determine a tuple, and tableoid is required. * Otherwise, the tuples that should not be operated will be operated. * Foreign table can not distinguish the type of remote table at this time, so we always add it. */ Var* var2 = makeVar((Index)linitial_int(parsetree->resultRelations), TableOidAttributeNumber, OIDOID, -1, InvalidOid, 0); const char *attrname2 = "tableoid"; TargetEntry* tle2 = makeTargetEntry((Expr *)var2, list_length(parsetree->targetList) + 1, pstrdup(attrname2), true); parsetree->targetList = lappend(parsetree->targetList, tle2); } /* * postgresPlanForeignModify * Plan an insert/update/delete operation on a foreign table * * Note: currently, the plan tree generated for UPDATE/DELETE will always * include a ForeignScan that retrieves ctids (using SELECT FOR UPDATE) * and then the ModifyTable node will have to execute individual remote * UPDATE/DELETE commands. If there are no local conditions or joins * needed, it'd be better to let the scan node do UPDATE/DELETE RETURNING * and then do nothing at ModifyTable. Room for future optimization ... */ static List *postgresPlanForeignModify(PlannerInfo *root, ModifyTable *plan, Index resultRelation, int subplan_index) { CmdType operation = plan->operation; RangeTblEntry *rte = planner_rt_fetch(resultRelation, root); StringInfoData sql; List *targetAttrs = NIL; List *withCheckOptionList = NIL; List *returningList = NIL; List *retrieved_attrs = NIL; /* * Core code already has some lock on each rel being planned, so we can * use NoLock here. */ Relation rel = heap_open(rte->relid, NoLock); initStringInfo(&sql); /* * In an INSERT, we transmit all columns that are defined in the foreign * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the * foreign table, we transmit all columns like INSERT; else we transmit * only columns that were explicitly targets of the UPDATE, so as to avoid * unnecessary data transmission. (We can't do that for INSERT since we * would miss sending default values for columns not listed in the source * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since * those triggers might change values for non-target columns, in which * case we would miss sending changed values for those columns.) */ if (operation == CMD_INSERT || (operation == CMD_UPDATE && rel->trigdesc && rel->trigdesc->trig_update_before_row)) { TupleDesc tupdesc = RelationGetDescr(rel); int attnum; for (attnum = 1; attnum <= tupdesc->natts; attnum++) { Form_pg_attribute attr = &tupdesc->attrs[attnum - 1]; if (!attr->attisdropped) { targetAttrs = lappend_int(targetAttrs, attnum); } } } else if (operation == CMD_UPDATE) { Bitmapset *tmpset = bms_copy(rte->updatedCols); Bitmapset *allUpdatedCols = bms_union(tmpset, rte->extraUpdatedCols); AttrNumber col; while ((col = bms_first_member(allUpdatedCols)) >= 0) { col += FirstLowInvalidHeapAttributeNumber; if (col <= InvalidAttrNumber) { /* shouldn't happen */ elog(ERROR, "system-column update is not supported"); } targetAttrs = lappend_int(targetAttrs, col); } } /* * Extract the relevant WITH CHECK OPTION list if any. */ if (plan->withCheckOptionLists) { withCheckOptionList = (List*)list_nth(plan->withCheckOptionLists, subplan_index); } /* * Extract the relevant RETURNING list if any. */ if (plan->returningLists) { returningList = (List *)list_nth(plan->returningLists, subplan_index); } /* * Construct the SQL command string. */ switch (operation) { case CMD_INSERT: deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, withCheckOptionList, returningList, &retrieved_attrs); break; case CMD_UPDATE: deparseUpdateSql(&sql, rte, resultRelation, rel, targetAttrs, withCheckOptionList, returningList, &retrieved_attrs); break; case CMD_DELETE: deparseDeleteSql(&sql, rte, resultRelation, rel, returningList, &retrieved_attrs); break; default: elog(ERROR, "unexpected operation: %d", (int)operation); break; } heap_close(rel, NoLock); /* * Build the fdw_private list that will be available to the executor. * Items in the list must match enum FdwModifyPrivateIndex, above. */ return list_make4(makeString(sql.data), targetAttrs, makeInteger((retrieved_attrs != NIL)), retrieved_attrs); } /* * postgresBeginForeignModify * Begin an insert/update/delete operation on a foreign table */ static void postgresBeginForeignModify(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, List *fdw_private, int subplan_index, int eflags) { /* * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState * stays NULL. */ if (eflags & EXEC_FLAG_EXPLAIN_ONLY) { return; } /* Deconstruct fdw_private data. */ char *query = strVal(list_nth(fdw_private, FdwModifyPrivateUpdateSql)); List *target_attrs = (List *)list_nth(fdw_private, FdwModifyPrivateTargetAttnums); bool has_returning = (bool)intVal(list_nth(fdw_private, FdwModifyPrivateHasReturning)); List *retrieved_attrs = (List *)list_nth(fdw_private, FdwModifyPrivateRetrievedAttrs); /* Find RTE. */ RangeTblEntry *rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, mtstate->ps.state->es_range_table); /* Construct an execution state. */ PgFdwModifyState *fmstate = createForeignModify(mtstate->ps.state, rte, resultRelInfo, mtstate->operation, mtstate->mt_plans[subplan_index]->plan, query, target_attrs, has_returning, retrieved_attrs); resultRelInfo->ri_FdwState = fmstate; } /* * Construct an execution state of a foreign insert/update/delete operation */ static PgFdwModifyState *createForeignModify(EState *estate, RangeTblEntry *rte, ResultRelInfo *resultRelInfo, CmdType operation, Plan *subplan, char *query, List *target_attrs, bool has_returning, List *retrieved_attrs) { PgFdwModifyState *fmstate = NULL; Relation rel = resultRelInfo->ri_RelationDesc; TupleDesc tupdesc = RelationGetDescr(rel); Oid userid; AttrNumber n_params; Oid typefnoid; bool isvarlena = false; ListCell *lc = NULL; /* Begin constructing PgFdwModifyState. */ fmstate = (PgFdwModifyState *)palloc0(sizeof(PgFdwModifyState)); fmstate->rel = rel; /* * Identify which user to do the remote access as. This should match what * ExecCheckRTEPerms() does. */ userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); /* Get info about foreign table. */ ForeignTable* table = GetForeignTable(RelationGetRelid(rel)); UserMapping* user = GetUserMapping(userid, table->serverid); ForeignServer *server = GetForeignServer(table->serverid); /* Open connection; report that we'll create a prepared statement. */ fmstate->conn = GetConnection(server, user, true); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Set up remote query information. */ fmstate->query = query; fmstate->target_attrs = target_attrs; fmstate->has_returning = has_returning; fmstate->retrieved_attrs = retrieved_attrs; /* Create context for per-tuple temp workspace. */ fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, "postgres_fdw temporary data", ALLOCSET_SMALL_MINSIZE, ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE); /* Prepare for input conversion of RETURNING results. */ if (fmstate->has_returning) { fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); } /* Prepare for output conversion of parameters used in prepared stmt. */ n_params = list_length(fmstate->target_attrs) + 2; // + ctid and tableoid fmstate->p_flinfo = (FmgrInfo *)palloc0(sizeof(FmgrInfo) * n_params); fmstate->p_nums = 0; if (operation == CMD_UPDATE || operation == CMD_DELETE) { Assert(subplan != NULL); /* Find the ctid and tableoid resjunk column in the subplan's result */ fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist, "ctid"); if (!AttributeNumberIsValid(fmstate->ctidAttno)) { elog(ERROR, "could not find junk ctid column"); } fmstate->tboidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist, "tableoid"); if (!AttributeNumberIsValid(fmstate->ctidAttno)) { elog(ERROR, "could not find junk tableoid column"); } /* First and second transmittable parameter will be ctid and tableoid */ getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena); fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); fmstate->p_nums++; getTypeOutputInfo(OIDOID, &typefnoid, &isvarlena); fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); fmstate->p_nums++; } if (operation == CMD_INSERT || operation == CMD_UPDATE) { /* Set up for remaining transmittable parameters */ foreach (lc, fmstate->target_attrs) { int attnum = lfirst_int(lc); Form_pg_attribute attr = &tupdesc->attrs[attnum - 1]; Assert(!attr->attisdropped); getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena); fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); fmstate->p_nums++; } } Assert(fmstate->p_nums <= n_params); return fmstate; } /* * postgresExecForeignInsert * Insert one row into a foreign table */ static TupleTableSlot *postgresExecForeignInsert(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot) { PgFdwModifyState *fmstate = (PgFdwModifyState *)resultRelInfo->ri_FdwState; int n_rows; if (fmstate == NULL) { StringInfoData sql; Index resultRelation = resultRelInfo->ri_RangeTableIndex; Relation rel = resultRelInfo->ri_RelationDesc; TupleDesc tupdesc = RelationGetDescr(rel); List *targetAttrs = NIL; List *retrieved_attrs = NIL; RangeTblEntry *rte = rt_fetch(resultRelation, estate->es_range_table); initStringInfo(&sql); /* We transmit all columns that are defined in the foreign table. */ for (int attnum = 1; attnum <= tupdesc->natts; attnum++) { Form_pg_attribute attr = &tupdesc->attrs[attnum - 1]; if (!attr->attisdropped) { targetAttrs = lappend_int(targetAttrs, attnum); } } deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, NULL, NULL, &retrieved_attrs); fmstate = createForeignModify(estate, rte, resultRelInfo, CMD_INSERT, NULL, sql.data, targetAttrs, retrieved_attrs != NIL, retrieved_attrs); resultRelInfo->ri_FdwState = fmstate; } /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) { prepare_foreign_modify(fmstate); } /* Convert parameters needed by prepared statement to text form */ const char **p_values = convert_prep_stmt_params(fmstate, NULL, NULL, slot); /* * Execute the prepared statement. */ if (!PQsendQueryPrepared(fmstate->conn, fmstate->p_name, fmstate->p_nums, p_values, NULL, NULL, 0)) { pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); } /* * Get the result, and check for success. * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ PGresult* res = pgfdw_get_result(fmstate->conn, fmstate->query); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) { pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); } /* Check number of rows affected, and fetch RETURNING tuple if any */ if (fmstate->has_returning) { n_rows = PQntuples(res); if (n_rows > 0) { store_returning_result(fmstate, slot, res); } } else { n_rows = atoi(PQcmdTuples(res)); } /* And clean up */ PQclear(res); MemoryContextReset(fmstate->temp_cxt); /* Return NULL if nothing was inserted on the remote end */ return (n_rows > 0) ? slot : NULL; } /* * postgresExecForeignUpdate * Update one row in a foreign table */ static TupleTableSlot *postgresExecForeignUpdate(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot) { PgFdwModifyState *fmstate = (PgFdwModifyState *)resultRelInfo->ri_FdwState; Datum citd_datum, tboidd_atum; bool isNull = false; int n_rows; /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) { prepare_foreign_modify(fmstate); } /* Get the ctid and tableoid that was passed up as a resjunk column */ citd_datum = ExecGetJunkAttribute(planSlot, fmstate->ctidAttno, &isNull); /* shouldn't ever get a null result... */ if (isNull) { elog(ERROR, "ctid is NULL"); } tboidd_atum = ExecGetJunkAttribute(planSlot, fmstate->tboidAttno, &isNull); /* shouldn't ever get a null result... */ if (isNull) { elog(ERROR, "tableoid is NULL"); } /* Convert parameters needed by prepared statement to text form */ const char **p_values = convert_prep_stmt_params(fmstate, (ItemPointer)DatumGetPointer(citd_datum), (ItemPointer)DatumGetPointer(tboidd_atum), slot); /* * Execute the prepared statement. */ if (!PQsendQueryPrepared(fmstate->conn, fmstate->p_name, fmstate->p_nums, p_values, NULL, NULL, 0)) { pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); } /* * Get the result, and check for success. * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ PGresult* res = pgfdw_get_result(fmstate->conn, fmstate->query); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) { pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); } /* Check number of rows affected, and fetch RETURNING tuple if any */ if (fmstate->has_returning) { n_rows = PQntuples(res); if (n_rows > 0) { store_returning_result(fmstate, slot, res); } } else { n_rows = atoi(PQcmdTuples(res)); } /* And clean up */ PQclear(res); MemoryContextReset(fmstate->temp_cxt); /* Return NULL if nothing was updated on the remote end */ return (n_rows > 0) ? slot : NULL; } /* * postgresExecForeignDelete * Delete one row from a foreign table */ static TupleTableSlot *postgresExecForeignDelete(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot) { PgFdwModifyState *fmstate = (PgFdwModifyState *)resultRelInfo->ri_FdwState; Datum citd_datum, tboidd_atum; bool isNull = false; int n_rows; /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) { prepare_foreign_modify(fmstate); } /* Get the ctid that was passed up as a resjunk column */ citd_datum = ExecGetJunkAttribute(planSlot, fmstate->ctidAttno, &isNull); /* shouldn't ever get a null result... */ if (isNull) { elog(ERROR, "ctid is NULL"); } tboidd_atum = ExecGetJunkAttribute(planSlot, fmstate->tboidAttno, &isNull); /* shouldn't ever get a null result... */ if (isNull) { elog(ERROR, "tableoid is NULL"); } /* Convert parameters needed by prepared statement to text form */ const char **p_values = convert_prep_stmt_params(fmstate, (ItemPointer)DatumGetPointer(citd_datum), (ItemPointer)DatumGetPointer(tboidd_atum), NULL); /* * Execute the prepared statement. */ if (!PQsendQueryPrepared(fmstate->conn, fmstate->p_name, fmstate->p_nums, p_values, NULL, NULL, 0)) { pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); } /* * Get the result, and check for success. * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ PGresult *res = pgfdw_get_result(fmstate->conn, fmstate->query); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) { pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); } /* Check number of rows affected, and fetch RETURNING tuple if any */ if (fmstate->has_returning) { n_rows = PQntuples(res); if (n_rows > 0) { store_returning_result(fmstate, slot, res); } } else { n_rows = atoi(PQcmdTuples(res)); } /* And clean up */ PQclear(res); MemoryContextReset(fmstate->temp_cxt); /* Return NULL if nothing was deleted on the remote end */ return (n_rows > 0) ? slot : NULL; } /* * postgresEndForeignModify * Finish an insert/update/delete operation on a foreign table */ static void postgresEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo) { PgFdwModifyState *fmstate = (PgFdwModifyState *)resultRelInfo->ri_FdwState; /* If fmstate is NULL, we are in EXPLAIN; nothing to do */ if (fmstate == NULL) { return; } /* If we created a prepared statement, destroy it */ if (fmstate->p_name) { char sql[64] = {'\0'}; int rc = snprintf_s(sql, sizeof(sql), sizeof(sql) - 1, "DEALLOCATE %s", fmstate->p_name); securec_check_ss(rc, "", ""); /* * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ PGresult* res = pgfdw_exec_query(fmstate->conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) { pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); } PQclear(res); fmstate->p_name = NULL; } /* Release remote connection */ ReleaseConnection(fmstate->conn); fmstate->conn = NULL; } /* * postgresIsForeignRelUpdatable * Determine whether a foreign table supports INSERT, UPDATE and/or * DELETE. */ static int postgresIsForeignRelUpdatable(Relation rel) { ListCell *lc = NULL; /* * By default, all postgres_fdw foreign tables are assumed updatable. This * can be overridden by a per-server setting, which in turn can be * overridden by a per-table setting. */ bool updatable = true; ForeignTable* table = GetForeignTable(RelationGetRelid(rel)); ForeignServer* server = GetForeignServer(table->serverid); foreach (lc, server->options) { DefElem *def = (DefElem *)lfirst(lc); if (strcmp(def->defname, "updatable") == 0) { updatable = defGetBoolean(def); } } foreach (lc, table->options) { DefElem *def = (DefElem *)lfirst(lc); if (strcmp(def->defname, "updatable") == 0) { updatable = defGetBoolean(def); } } /* * Currently "updatable" means support for INSERT, UPDATE and DELETE. */ return updatable ? (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0; } /* * postgresExplainForeignScan * Produce extra output for EXPLAIN of a ForeignScan on a foreign table */ static void postgresExplainForeignScan(ForeignScanState *node, ExplainState *es) { ForeignScan *plan = castNode(ForeignScan, node->ss.ps.plan); List *fdw_private = plan->fdw_private; /* * Identify foreign scans that are really joins or upper relations. The * input looks something like "(1) LEFT JOIN (2)", and we must replace the * digit string(s), which are RT indexes, with the correct relation names. * We do that here, not when the plan is created, because we can't know * what aliases ruleutils.c will assign at plan creation time. */ if (list_length(fdw_private) > FdwScanPrivateRelations) { StringInfo relations; char *rawrelations = NULL; char *ptr = NULL; int minrti, rtoffset; rawrelations = strVal(list_nth(fdw_private, FdwScanPrivateRelations)); /* * A difficulty with using a string representation of RT indexes is * that setrefs.c won't update the string when flattening the * rangetable. To find out what rtoffset was applied, identify the * minimum RT index appearing in the string and compare it to the * minimum member of plan->fs_relids. (We expect all the relids in * the join will have been offset by the same amount; the Asserts * below should catch it if that ever changes.) */ minrti = INT_MAX; ptr = rawrelations; while (*ptr) { if (isdigit((unsigned char)*ptr)) { int rti = strtol(ptr, &ptr, 10); if (rti < minrti) { minrti = rti; } } else { ptr++; } } rtoffset = bms_next_member(plan->fs_relids, -1) - minrti; /* Now we can translate the string */ relations = makeStringInfo(); ptr = rawrelations; while (*ptr) { if (isdigit((unsigned char)*ptr)) { int rti = strtol(ptr, &ptr, 10); RangeTblEntry *rte = NULL; char *relname = NULL; char *refname = NULL; rti += rtoffset; Assert(bms_is_member(rti, plan->fs_relids)); rte = rt_fetch(rti, es->rtable); Assert(rte->rtekind == RTE_RELATION); /* This logic should agree with explain.c's ExplainTargetRel */ relname = get_rel_name(rte->relid); if (es->verbose) { char *name_space = get_namespace_name_or_temp(get_rel_namespace(rte->relid)); appendStringInfo(relations, "%s.%s", quote_identifier(name_space), quote_identifier(relname)); } else { appendStringInfoString(relations, quote_identifier(relname)); } refname = rte->eref->aliasname; if (strcmp(refname, relname) != 0) { appendStringInfo(relations, " %s", quote_identifier(refname)); } } else { appendStringInfoChar(relations, *ptr++); } } ExplainPropertyText("Relations", relations->data, es); } /* * Add remote query, when VERBOSE option is specified. */ if (es->verbose) { char *sql = NULL; sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); ExplainPropertyText("Remote SQL", sql, es); } } /* * postgresExplainForeignModify * Produce extra output for EXPLAIN of a ModifyTable on a foreign table */ static void postgresExplainForeignModify(ModifyTableState *mtstate, ResultRelInfo *rinfo, List *fdw_private, int subplan_index, ExplainState *es) { if (es->verbose) { char *sql = strVal(list_nth(fdw_private, FdwModifyPrivateUpdateSql)); ExplainPropertyText("Remote SQL", sql, es); } } static void postgresExplainForeignScanRemote(ForeignScanState *node, ExplainState *es) { ForeignScan *plan = castNode(ForeignScan, node->ss.ps.plan); List *fdw_private = plan->fdw_private; char *sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); /* 1. prepare the explain sql, only support inherit 'verbose' and 'costs' currently. */ int len = strlen(sql) + 64; // length of explain (..) sql; char *esql = (char*)palloc(len); errno_t rc = sprintf_s(esql, len, "EXPLAIN (VERBOSE %s, COSTS %s) %s", es->verbose ? "ON" : "OFF", es->costs ? "ON" : "OFF", sql); securec_check_ss(rc, "\0", "\0"); /* 2. print the explain sql*/ appendStringInfo(es->es_frs.str, "%s \n", esql); /* 3. get and print the plan */ PGconn *conn = GetConnectionByFScanState(node, false); PGresult *volatile res = pgfdw_exec_query(conn, esql); if (PQresultStatus(res) != PGRES_TUPLES_OK) { appendStringInfo(es->es_frs.str, "failed to get remote plan, error massage is:\n%s\n", PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY)); } else { int numrows = PQntuples(res); for (int i = 0; i < numrows; i++) { appendStringInfo(es->es_frs.str, " %s\n", PQgetvalue(res, i, 0)); } } pfree(esql); PQclear(res); ReleaseConnection(conn); } /* * estimate_path_cost_size * Get cost and size estimates for a foreign scan on given foreign relation * either a base relation or a join between foreign relations or an upper * relation containing foreign relations. * * param_join_conds are the parameterization clauses with outer relations. * pathkeys specify the expected sort order if any for given path being costed. * fpextra specifies additional post-scan/join-processing steps such as the * final sort and the LIMIT restriction. * * The function returns the cost and size estimates in p_rows, p_width, * p_startup_cost and p_total_cost variables. */ static void estimate_path_cost_size(PlannerInfo *root, RelOptInfo *foreignrel, List *param_join_conds, List *pathkeys, PgFdwPathExtraData *fpextra, double *p_rows, int *p_width, Cost *p_startup_cost, Cost *p_total_cost) { PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *)foreignrel->fdw_private; double rows; double retrieved_rows; int width; Cost startup_cost; Cost total_cost; /* * If the table or the server is configured to use remote estimates, * connect to the foreign server and execute EXPLAIN to estimate the * number of rows selected by the restriction+join clauses. Otherwise, * estimate rows using whatever statistics we have locally, in a way * similar to ordinary tables. */ if (fpinfo->use_remote_estimate) { List *remote_param_join_conds = NIL; List *local_param_join_conds = NIL; StringInfoData sql; PGconn *conn = NULL; Selectivity local_sel; QualCost local_cost; List *fdw_scan_tlist = NIL; List *remote_conds = NIL; /* Required only to be passed to deparseSelectStmtForRel */ List *retrieved_attrs = NIL; /* * param_join_conds might contain both clauses that are safe to send * across, and clauses that aren't. */ classifyConditions(root, foreignrel, param_join_conds, &remote_param_join_conds, &local_param_join_conds); /* Build the list of columns to be fetched from the foreign server. */ if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel)) { fdw_scan_tlist = build_tlist_to_deparse(foreignrel); } else { fdw_scan_tlist = NIL; } /* * The complete list of remote conditions includes everything from * baserestrictinfo plus any extra join_conds relevant to this * particular path. */ remote_conds = list_concat(remote_param_join_conds, fpinfo->remote_conds); /* * Construct EXPLAIN query including the desired SELECT, FROM, and * WHERE clauses. Params and other-relation Vars are replaced by dummy * values, so don't request params_list. */ initStringInfo(&sql); appendStringInfoString(&sql, "EXPLAIN "); deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist, remote_conds, pathkeys, fpextra ? fpextra->has_final_sort : false, fpextra ? fpextra->has_limit : false, false, &retrieved_attrs, NULL); /* Get the remote estimate */ conn = GetConnection(fpinfo->server, fpinfo->user, false); get_remote_estimate(sql.data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); retrieved_rows = rows; /* Factor in the selectivity of the locally-checked quals */ local_sel = clauselist_selectivity(root, local_param_join_conds, foreignrel->relid, JOIN_INNER, NULL); local_sel *= fpinfo->local_conds_sel; rows = clamp_row_est(rows * local_sel); /* Add in the eval cost of the locally-checked quals */ startup_cost += fpinfo->local_conds_cost.startup; total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; cost_qual_eval(&local_cost, local_param_join_conds, root); startup_cost += local_cost.startup; total_cost += local_cost.per_tuple * retrieved_rows; /* * Add in tlist eval cost for each output row. In case of an * aggregate, some of the tlist expressions such as grouping * expressions will be evaluated remotely, so adjust the costs. */ if (IS_UPPER_REL(foreignrel)) { QualCost tlist_cost; cost_qual_eval(&tlist_cost, fdw_scan_tlist, root); startup_cost -= tlist_cost.startup; total_cost -= tlist_cost.startup; total_cost -= tlist_cost.per_tuple * rows; } } else { Cost run_cost = 0; /* * We don't support join conditions in this mode (hence, no * parameterized paths can be made). */ Assert(param_join_conds == NIL); /* * We will come here again and again with different set of pathkeys or * additional post-scan/join-processing steps that caller wants to * cost. We don't need to calculate the cost/size estimates for the * underlying scan, join, or grouping each time. Instead, use those * estimates if we have cached them already. */ if (fpinfo->rel_startup_cost >= 0 && fpinfo->rel_total_cost >= 0) { Assert(fpinfo->retrieved_rows >= 0); rows = fpinfo->rows; retrieved_rows = fpinfo->retrieved_rows; width = fpinfo->width; startup_cost = fpinfo->rel_startup_cost; run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost; /* * If we estimate the costs of a foreign scan or a foreign join * with additional post-scan/join-processing steps, the scan or * join costs obtained from the cache wouldn't yet contain the * eval costs for the final scan/join target, which would've been * updated by apply_scanjoin_target_to_paths(); add the eval costs * now. */ } else if (IS_JOIN_REL(foreignrel)) { PgFdwRelationInfo *fpinfo_i; PgFdwRelationInfo *fpinfo_o; QualCost join_cost; QualCost remote_conds_cost; double nrows; /* Use rows/width estimates made by the core code. */ rows = foreignrel->rows; width = foreignrel->reltarget->width; /* For join we expect inner and outer relations set */ Assert(fpinfo->innerrel && fpinfo->outerrel); fpinfo_i = (PgFdwRelationInfo *)fpinfo->innerrel->fdw_private; fpinfo_o = (PgFdwRelationInfo *)fpinfo->outerrel->fdw_private; /* Estimate of number of rows in cross product */ nrows = fpinfo_i->rows * fpinfo_o->rows; /* * Back into an estimate of the number of retrieved rows. Just in * case this is nuts, clamp to at most nrows. */ retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel); retrieved_rows = Min(retrieved_rows, nrows); /* * The cost of foreign join is estimated as cost of generating * rows for the joining relations + cost for applying quals on the * rows. */ /* * Calculate the cost of clauses pushed down to the foreign server */ cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root); /* Calculate the cost of applying join clauses */ cost_qual_eval(&join_cost, fpinfo->joinclauses, root); /* * Startup cost includes startup cost of joining relations and the * startup cost for join and other clauses. We do not include the * startup cost specific to join strategy (e.g. setting up hash * tables) since we do not know what strategy the foreign server * is going to use. */ startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost; startup_cost += join_cost.startup; startup_cost += remote_conds_cost.startup; startup_cost += fpinfo->local_conds_cost.startup; /* * Run time cost includes: * * 1. Run time cost (total_cost - startup_cost) of relations being * joined * * 2. Run time cost of applying join clauses on the cross product * of the joining relations. * * 3. Run time cost of applying pushed down other clauses on the * result of join * * 4. Run time cost of applying nonpushable other clauses locally * on the result fetched from the foreign server. */ run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost; run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost; run_cost += nrows * join_cost.per_tuple; nrows = clamp_row_est(nrows * fpinfo->joinclause_sel); run_cost += nrows * remote_conds_cost.per_tuple; run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; } else if (IS_UPPER_REL(foreignrel)) { RelOptInfo *outerrel = fpinfo->outerrel; PgFdwRelationInfo *ofpinfo; AggClauseCosts aggcosts; double input_rows; int numGroupCols; double numGroups = 1; unsigned int numDatanodes; /* The upper relation should have its outer relation set */ Assert(outerrel); /* * This cost model is mixture of costing done for sorted and * hashed aggregates in cost_agg(). We are not sure which * strategy will be considered at remote side, thus for * simplicity, we put all startup related costs in startup_cost * and all finalization and run cost are added in total_cost. */ ofpinfo = (PgFdwRelationInfo *)outerrel->fdw_private; /* Get rows from input rel */ input_rows = ofpinfo->rows; /* Collect statistics about aggregates for estimating costs. */ MemSet(&aggcosts, 0, sizeof(AggClauseCosts)); if (root->parse->hasAggs) { count_agg_clauses(root, (Node*)foreignrel->reltarget->exprs, &aggcosts); count_agg_clauses(root, root->parse->havingQual, &aggcosts); } /* Get number of grouping columns and possible number of groups */ numGroupCols = list_length(root->parse->groupClause); numDatanodes = ng_get_dest_num_data_nodes(root, foreignrel); numGroups = estimate_num_groups(root, get_sortgrouplist_exprs(root->parse->groupClause, fpinfo->grouped_tlist), input_rows, numDatanodes); /* * Get the retrieved_rows and rows estimates. If there are HAVING * quals, account for their selectivity. */ if (root->parse->havingQual) { /* * Factor in the selectivity of the remotely-checked quals, * openGauss not support calc selectivity of agg in having, so just using numGroups */ retrieved_rows = numGroups; /* Factor in the selectivity of the locally-checked quals */ rows = clamp_row_est(retrieved_rows * fpinfo->local_conds_sel); } else { rows = retrieved_rows = numGroups; } /* Use width estimate made by the core code. */ width = foreignrel->reltarget->width; /* ----- * Startup cost includes: * 1. Startup cost for underneath input relation, adjusted for * tlist replacement by apply_scanjoin_target_to_paths() * 2. Cost of performing aggregation, per cost_agg() * ----- */ startup_cost = ofpinfo->rel_startup_cost; startup_cost += aggcosts.transCost.startup; startup_cost += aggcosts.transCost.per_tuple * input_rows; startup_cost += aggcosts.finalCost; startup_cost += (u_sess->attr.attr_sql.cpu_operator_cost * numGroupCols) * input_rows; /* ----- * Run time cost includes: * 1. Run time cost of underneath input relation, adjusted for * tlist replacement by apply_scanjoin_target_to_paths() * 2. Run time cost of performing aggregation, per cost_agg() * ----- */ run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost; run_cost += aggcosts.finalCost * numGroups; run_cost += u_sess->attr.attr_sql.cpu_tuple_cost * numGroups; /* Account for the eval cost of HAVING quals, if any */ if (root->parse->havingQual) { QualCost remote_cost; /* Add in the eval cost of the remotely-checked quals */ cost_qual_eval(&remote_cost, fpinfo->remote_conds, root); startup_cost += remote_cost.startup; run_cost += remote_cost.per_tuple * numGroups; /* Add in the eval cost of the locally-checked quals */ startup_cost += fpinfo->local_conds_cost.startup; run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; } } else { Cost cpu_per_tuple; /* Use rows/width estimates made by set_baserel_size_estimates. */ rows = foreignrel->rows; width = foreignrel->reltarget->width; /* * Back into an estimate of the number of retrieved rows. Just in * case this is nuts, clamp to at most foreignrel->tuples. */ retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel); retrieved_rows = Min(retrieved_rows, foreignrel->tuples); /* * Cost as though this were a seqscan, which is pessimistic. We * effectively imagine the local_conds are being evaluated * remotely, too. */ startup_cost = 0; run_cost = 0; run_cost += u_sess->attr.attr_sql.seq_page_cost * foreignrel->pages; startup_cost += foreignrel->baserestrictcost.startup; cpu_per_tuple = u_sess->attr.attr_sql.cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple; run_cost += cpu_per_tuple * foreignrel->tuples; } /* * Without remote estimates, we have no real way to estimate the cost * of generating sorted output. It could be free if the query plan * the remote side would have chosen generates properly-sorted output * anyway, but in most cases it will cost something. Estimate a value * high enough that we won't pick the sorted path when the ordering * isn't locally useful, but low enough that we'll err on the side of * pushing down the ORDER BY clause when it's useful to do so. */ if (pathkeys != NIL) { if (IS_UPPER_REL(foreignrel)) { Assert(foreignrel->reloptkind == RELOPT_UPPER_REL && fpinfo->stage == UPPERREL_GROUP_AGG); adjust_foreign_grouping_path_cost(root, pathkeys, retrieved_rows, width, fpextra->limit_tuples, &startup_cost, &run_cost); } else { startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER; run_cost *= DEFAULT_FDW_SORT_MULTIPLIER; } } total_cost = startup_cost + run_cost; /* Adjust the cost estimates if we have LIMIT */ if (fpextra && fpextra->has_limit) { adjust_limit_rows_costs(&rows, &startup_cost, &total_cost, fpextra->offset_est, fpextra->count_est); retrieved_rows = rows; } } /* * Cache the retrieved rows and cost estimates for scans, joins, or * groupings without any parameterization, pathkeys, or additional * post-scan/join-processing steps, before adding the costs for * transferring data from the foreign server. These estimates are useful * for costing remote joins involving this relation or costing other * remote operations on this relation such as remote sorts and remote * LIMIT restrictions, when the costs can not be obtained from the foreign * server. This function will be called at least once for every foreign * relation without any parameterization, pathkeys, or additional * post-scan/join-processing steps. */ if (pathkeys == NIL && param_join_conds == NIL && fpextra == NULL) { fpinfo->retrieved_rows = retrieved_rows; fpinfo->rel_startup_cost = startup_cost; fpinfo->rel_total_cost = total_cost; } /* * Add some additional cost factors to account for connection overhead * (fdw_startup_cost), transferring data across the network * (fdw_tuple_cost per retrieved row), and local manipulation of the data * (cpu_tuple_cost per retrieved row). */ startup_cost += fpinfo->fdw_startup_cost; total_cost += fpinfo->fdw_startup_cost; total_cost += fpinfo->fdw_tuple_cost * retrieved_rows; total_cost += u_sess->attr.attr_sql.cpu_tuple_cost * retrieved_rows; /* * If we have LIMIT, we should prefer performing the restriction remotely * rather than locally, as the former avoids extra row fetches from the * remote that the latter might cause. But since the core code doesn't * account for such fetches when estimating the costs of the local * restriction (see create_limit_path()), there would be no difference * between the costs of the local restriction and the costs of the remote * restriction estimated above if we don't use remote estimates (except * for the case where the foreignrel is a grouping relation, the given * pathkeys is not NIL, and the effects of a bounded sort for that rel is * accounted for in costing the remote restriction). Tweak the costs of * the remote restriction to ensure we'll prefer it if LIMIT is a useful * one. */ if (!fpinfo->use_remote_estimate && fpextra && fpextra->has_limit && fpextra->limit_tuples > 0 && fpextra->limit_tuples < fpinfo->rows) { Assert(fpinfo->rows > 0); total_cost -= (total_cost - startup_cost) * 0.05 * (fpinfo->rows - fpextra->limit_tuples) / fpinfo->rows; } /* Return results. */ *p_rows = rows; *p_width = width; *p_startup_cost = startup_cost; *p_total_cost = total_cost; } /* * Estimate costs of executing a SQL statement remotely. * The given "sql" must be an EXPLAIN command. */ static void get_remote_estimate(const char *sql, PGconn *conn, double *rows, int *width, Cost *startup_cost, Cost *total_cost) { PGresult *volatile res = NULL; /* PGresult must be released before leaving this function. */ PG_TRY(); { int n; /* * Execute EXPLAIN remotely. */ res = pgfdw_exec_query(conn, sql); if (PQresultStatus(res) != PGRES_TUPLES_OK) { pgfdw_report_error(ERROR, res, conn, false, sql); } /* * Extract cost numbers for topmost plan node. Note we search for a * left paren from the end of the line to avoid being confused by * other uses of parentheses. */ char *line = PQgetvalue(res, 0, 0); if (strstr(line, "Bypass") != NULL) { line = PQgetvalue(res, 1, 0); } char *p = strrchr(line, '('); if (p == NULL) { elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line); } n = sscanf_s(p, "(cost=%lf..%lf rows=%lf width=%d)", startup_cost, total_cost, rows, width); if (n != 4) { elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line); } PQclear(res); res = NULL; } PG_CATCH(); { if (res) { PQclear(res); } PG_RE_THROW(); } PG_END_TRY(); } /* * Adjust the cost estimates of a foreign grouping path to include the cost of * generating properly-sorted output. */ static void adjust_foreign_grouping_path_cost(PlannerInfo *root, List *pathkeys, double retrieved_rows, double width, double limit_tuples, Cost *p_startup_cost, Cost *p_run_cost) { /* * If the GROUP BY clause isn't sort-able, the plan chosen by the remote * side is unlikely to generate properly-sorted output, so it would need * an explicit sort; adjust the given costs with cost_sort(). Likewise, * if the GROUP BY clause is sort-able but isn't a superset of the given * pathkeys, adjust the costs with that function. Otherwise, adjust the * costs by applying the same heuristic as for the scan or join case. */ if (!grouping_is_sortable(root->parse->groupClause) || !pathkeys_contained_in(pathkeys, root->group_pathkeys)) { Path sort_path; /* dummy for result of cost_sort */ cost_sort(&sort_path, pathkeys, *p_startup_cost + *p_run_cost, retrieved_rows, width, 0.0, u_sess->attr.attr_memory.work_mem, limit_tuples, root->glob->vectorized); *p_startup_cost = sort_path.startup_cost; *p_run_cost = sort_path.total_cost - sort_path.startup_cost; } else { /* * The default extra cost seems too large for foreign-grouping cases; * add 1/4th of that default. */ double sort_multiplier = 1.0 + (DEFAULT_FDW_SORT_MULTIPLIER - 1.0) * 0.25; *p_startup_cost *= sort_multiplier; *p_run_cost *= sort_multiplier; } } /* * Create cursor for node's query with current parameter values. */ static void create_cursor(ForeignScanState *node) { PgFdwScanState *fsstate = (PgFdwScanState *)node->fdw_state; ExprContext *econtext = node->ss.ps.ps_ExprContext; int numParams = fsstate->numParams; const char **values = fsstate->param_values; PGconn *conn = fsstate->conn; StringInfoData buf; PGresult *res = NULL; /* * Construct array of query parameter values in text format. We do the * conversions in the short-lived per-tuple context, so as not to cause a * memory leak over repeated scans. */ if (numParams > 0) { MemoryContext oldcontext; oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); process_query_params(econtext, fsstate->param_flinfo, fsstate->param_exprs, values); MemoryContextSwitchTo(oldcontext); } /* Construct the DECLARE CURSOR command */ initStringInfo(&buf); appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s", fsstate->cursor_number, fsstate->query); /* * Notice that we pass NULL for paramTypes, thus forcing the remote server * to infer types for all parameters. Since we explicitly cast every * parameter (see deparse.c), the "inference" is trivial and will produce * the desired result. This allows us to avoid assuming that the remote * server has the same OIDs we do for the parameters' types. */ if (!PQsendQueryParams(conn, buf.data, numParams, NULL, values, NULL, NULL, 0)) { pgfdw_report_error(ERROR, NULL, conn, false, buf.data); } /* * Get the result, and check for success. * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ res = pgfdw_get_result(conn, buf.data); if (PQresultStatus(res) != PGRES_COMMAND_OK) { pgfdw_report_error(ERROR, res, conn, true, fsstate->query); } PQclear(res); /* Mark the cursor as created, and show no tuples have been retrieved */ fsstate->cursor_exists = true; fsstate->tuples = NULL; fsstate->num_tuples = 0; fsstate->next_tuple = 0; fsstate->fetch_ct_2 = 0; fsstate->eof_reached = false; /* Clean up */ pfree(buf.data); } /* * Fetch some more rows from the node's cursor. */ static void fetch_more_data(ForeignScanState *node) { PgFdwScanState *fsstate = (PgFdwScanState *)node->fdw_state; PGresult *volatile res = NULL; MemoryContext oldcontext; /* * We'll store the tuples in the batch_cxt. First, flush the previous * batch. */ fsstate->tuples = NULL; MemoryContextReset(fsstate->batch_cxt); oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt); /* PGresult must be released before leaving this function. */ PG_TRY(); { PGconn *conn = fsstate->conn; int numrows; int i; char sql[64] = {'\0'}; /* This is a regular synchronous fetch. */ errno_t rc = snprintf_s(sql, sizeof(sql), sizeof(sql), "FETCH %d FROM c%u", fsstate->fetch_size, fsstate->cursor_number); securec_check_ss(rc, "\0", "\0"); res = pgfdw_exec_query(conn, sql); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) { pgfdw_report_error(ERROR, res, conn, false, fsstate->query); } /* Convert the data into HeapTuples */ numrows = PQntuples(res); fsstate->tuples = (HeapTuple *)palloc0(numrows * sizeof(HeapTuple)); fsstate->num_tuples = numrows; fsstate->next_tuple = 0; for (i = 0; i < numrows; i++) { Assert(IsA(node->ss.ps.plan, ForeignScan)); fsstate->tuples[i] = make_tuple_from_result_row(res, i, fsstate->rel, fsstate->attinmeta, fsstate->retrieved_attrs, node, fsstate->temp_cxt); } /* Update fetch_ct_2 */ if (fsstate->fetch_ct_2 < 2) { fsstate->fetch_ct_2++; } /* Must be EOF if we didn't get as many tuples as we asked for. */ fsstate->eof_reached = (numrows < fsstate->fetch_size); PQclear(res); res = NULL; } PG_CATCH(); { if (res) { PQclear(res); } PG_RE_THROW(); } PG_END_TRY(); MemoryContextSwitchTo(oldcontext); } /* * Force assorted GUC parameters to settings that ensure that we'll output * data values in a form that is unambiguous to the remote server. * * This is rather expensive and annoying to do once per row, but there's * little choice if we want to be sure values are transmitted accurately; * we can't leave the settings in place between rows for fear of affecting * user-visible computations. * * We use the equivalent of a function SET option to allow the settings to * persist only until the caller calls reset_transmission_modes(). If an * error is thrown in between, guc.c will take care of undoing the settings. * * The return value is the nestlevel that must be passed to * reset_transmission_modes() to undo things. */ int set_transmission_modes(void) { int nestlevel = NewGUCNestLevel(); /* * The values set here should match what pg_dump does. See also * configure_remote_session in connection.c. */ if (u_sess->time_cxt.DateStyle != USE_ISO_DATES) { (void)set_config_option("datestyle", "ISO", PGC_USERSET, PGC_S_SESSION, GUC_ACTION_SAVE, true, 0); } if (u_sess->attr.attr_common.IntervalStyle != INTSTYLE_POSTGRES) { (void)set_config_option("intervalstyle", "postgres", PGC_USERSET, PGC_S_SESSION, GUC_ACTION_SAVE, true, 0); } if (u_sess->attr.attr_common.extra_float_digits < 3) { (void)set_config_option("extra_float_digits", "3", PGC_USERSET, PGC_S_SESSION, GUC_ACTION_SAVE, true, 0); } return nestlevel; } /* * Undo the effects of set_transmission_modes(). */ void reset_transmission_modes(int nestlevel) { AtEOXact_GUC(true, nestlevel); } /* * Utility routine to close a cursor. */ static void close_cursor(PGconn *conn, unsigned int cursor_number) { char sql[64]; int rc = snprintf_s(sql, sizeof(sql), sizeof(sql) - 1, "CLOSE c%u", cursor_number); securec_check_ss(rc, "", ""); /* * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ PGresult* res = pgfdw_exec_query(conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) { pgfdw_report_error(ERROR, res, conn, true, sql); } PQclear(res); } /* * prepare_foreign_modify * Establish a prepared statement for execution of INSERT/UPDATE/DELETE */ static void prepare_foreign_modify(PgFdwModifyState *fmstate) { char prep_name[NAMEDATALEN]; /* Construct name we'll use for the prepared statement. */ int rc = snprintf_s(prep_name, sizeof(prep_name), sizeof(prep_name) - 1, "pgsql_fdw_prep_%u", GetPrepStmtNumber(fmstate->conn)); securec_check_ss(rc, "", ""); char* p_name = pstrdup(prep_name); /* * We intentionally do not specify parameter types here, but leave the * remote server to derive them by default. This avoids possible problems * with the remote server using different type OIDs than we do. All of * the prepared statements we use in this module are simple enough that * the remote server will make the right choices. */ if (!PQsendPrepare(fmstate->conn, p_name, fmstate->query, 0, NULL)) { pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); } /* * Get the result, and check for success. * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ PGresult* res = pgfdw_get_result(fmstate->conn, fmstate->query); if (PQresultStatus(res) != PGRES_COMMAND_OK) { pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); } PQclear(res); /* This action shows that the prepare has been done. */ fmstate->p_name = p_name; } /* * convert_prep_stmt_params * Create array of text strings representing parameter values * * tupleid is ctid to send, or NULL if none * tableid is tableoid to send, or NULL if none * slot is slot to get remaining parameters from, or NULL if none * * Data is constructed in temp_cxt; caller should reset that after use. */ static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, ItemPointer tableid, TupleTableSlot *slot) { int pindex = 0; MemoryContext oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt); const char **p_values = (const char **)palloc(sizeof(char *) * fmstate->p_nums); /* 1st parameter should be ctid, if it's in use */ if (tupleid != NULL) { /* don't need set_transmission_modes for TID output */ p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], PointerGetDatum(tupleid)); pindex++; } /* 2sd parameter should be tableoid, if it's in use */ if (tupleid != NULL) { /* don't need set_transmission_modes for TABLEOID output */ p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], PointerGetDatum(tableid)); pindex++; } /* get following parameters from slot */ if (slot != NULL && fmstate->target_attrs != NIL) { int nestlevel; ListCell *lc = NULL; nestlevel = set_transmission_modes(); foreach (lc, fmstate->target_attrs) { int attnum = lfirst_int(lc); Datum value; bool isnull = false; value = tableam_tslot_getattr(slot, attnum, &isnull); if (isnull) { p_values[pindex] = NULL; } else { p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], value); } pindex++; } reset_transmission_modes(nestlevel); } Assert(pindex == fmstate->p_nums); (void)MemoryContextSwitchTo(oldcontext); return p_values; } /* * store_returning_result * Store the result of a RETURNING clause * * On error, be sure to release the PGresult on the way out. Callers do not * have PG_TRY blocks to ensure this happens. */ static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res) { /* PGresult must be released before leaving this function. */ PG_TRY(); { HeapTuple newtup; newtup = make_tuple_from_result_row(res, 0, fmstate->rel, fmstate->attinmeta, fmstate->retrieved_attrs, NULL, fmstate->temp_cxt); /* tuple will be deleted when it is cleared from the slot */ (void)ExecStoreTuple(newtup, slot, InvalidBuffer, true); } PG_CATCH(); { if (res) { PQclear(res); } PG_RE_THROW(); } PG_END_TRY(); } /* * Prepare for processing of parameters used in remote query. */ static void prepare_query_params(PlanState *node, List *fdw_exprs, int numParams, FmgrInfo **param_flinfo, List **param_exprs, const char ***param_values) { int i; ListCell *lc = NULL; Assert(numParams > 0); /* Prepare for output conversion of parameters used in remote query. */ *param_flinfo = (FmgrInfo *)palloc0(sizeof(FmgrInfo) * numParams); i = 0; foreach (lc, fdw_exprs) { Node *param_expr = (Node *)lfirst(lc); Oid typefnoid; bool isvarlena; getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena); fmgr_info(typefnoid, &(*param_flinfo)[i]); i++; } /* * Prepare remote-parameter expressions for evaluation. (Note: in * practice, we expect that all these expressions will be just Params, so * we could possibly do something more efficient than using the full * expression-eval machinery for this. But probably there would be little * benefit, and it'd require postgres_fdw to know more than is desirable * about Param evaluation.) */ *param_exprs = ExecInitExprList(fdw_exprs, node); /* Allocate buffer for text form of query parameters. */ *param_values = (const char **)palloc0(numParams * sizeof(char *)); } /* * Construct array of query parameter values in text format. */ static void process_query_params(ExprContext *econtext, FmgrInfo *param_flinfo, List *param_exprs, const char **param_values) { int i; ListCell *lc = NULL; int nestlevel = set_transmission_modes(); i = 0; foreach (lc, param_exprs) { ExprState *expr_state = (ExprState *)lfirst(lc); Datum expr_value; bool isNull = false; /* Evaluate the parameter expression */ expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL); /* * Get string representation of each parameter value by invoking * type-specific output function, unless the value is null. */ if (isNull) { param_values[i] = NULL; } else { param_values[i] = OutputFunctionCall(¶m_flinfo[i], expr_value); } i++; } reset_transmission_modes(nestlevel); } /* * postgresAnalyzeForeignTable * Test whether analyzing this foreign table is supported */ static bool postgresAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages, void *additionalData, bool estimate_table_rownum) { StringInfoData sql; PGresult *volatile res = NULL; /* Return the row-analysis function pointer */ *func = postgresAcquireSampleRowsFunc; /* * Now we have to get the number of pages. It's annoying that the ANALYZE * API requires us to return that now, because it forces some duplication * of effort between this routine and postgresAcquireSampleRowsFunc. But * it's probably not worth redefining that API at this point. * Get the connection to use. We do the remote access as the table's * owner, even if the ANALYZE was started by some other user. */ ForeignTable* table = GetForeignTable(RelationGetRelid(relation)); ForeignServer* server = GetForeignServer(table->serverid); UserMapping* user = GetUserMapping(relation->rd_rel->relowner, server->serverid); PGconn* conn = GetConnection(server, user, false); /* * Construct command to get page count for relation. */ initStringInfo(&sql); deparseAnalyzeSizeSql(&sql, relation); /* In what follows, do not risk leaking any PGresults. */ PG_TRY(); { res = pgfdw_exec_query(conn, sql.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) { pgfdw_report_error(ERROR, res, conn, false, sql.data); } if (PQntuples(res) != 1 || PQnfields(res) != 1) { elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query"); } *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10); PQclear(res); res = NULL; } PG_CATCH(); { if (res) { PQclear(res); } PG_RE_THROW(); } PG_END_TRY(); ReleaseConnection(conn); return true; } /* * Acquire a random sample of rows from foreign table managed by postgres_fdw. * * We fetch the whole table from the remote side and pick out some sample rows. * * Selected rows are returned in the caller-allocated array rows[], * which must have at least targrows entries. * The actual number of rows selected is returned as the function result. * We also count the total number of rows in the table and return it into * *totalrows. Note that *totaldeadrows is always set to 0. * * Note that the returned list of rows is not always in order by physical * position in the table. Therefore, correlation estimates derived later * may be meaningless, but it's OK because we don't use the estimates * currently (the planner only pays attention to correlation for indexscans). */ static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, double *totaldeadrows, void *additionalData, bool estimate_table_rownum) { PgFdwAnalyzeState astate; unsigned int cursor_number; StringInfoData sql; PGresult *volatile res = NULL; /* Initialize workspace state */ astate.rel = relation; astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation)); astate.rows = rows; astate.targrows = targrows; astate.numrows = 0; astate.samplerows = 0; astate.rowstoskip = -1; /* -1 means not set yet */ astate.rstate = anl_init_selection_state(targrows); /* Remember ANALYZE context, and create a per-tuple temp context */ astate.anl_cxt = CurrentMemoryContext; astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext, "postgres_fdw temporary data", ALLOCSET_SMALL_MINSIZE, ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE); /* * Get the connection to use. We do the remote access as the table's * owner, even if the ANALYZE was started by some other user. */ ForeignTable* table = GetForeignTable(RelationGetRelid(relation)); ForeignServer* server = GetForeignServer(table->serverid); UserMapping* user = GetUserMapping(relation->rd_rel->relowner, server->serverid); PGconn* conn = GetConnection(server, user, false); /* * Construct cursor that retrieves whole rows from remote. */ cursor_number = GetCursorNumber(conn); initStringInfo(&sql); appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number); deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs); /* In what follows, do not risk leaking any PGresults. */ PG_TRY(); { res = pgfdw_exec_query(conn, sql.data); if (PQresultStatus(res) != PGRES_COMMAND_OK) { pgfdw_report_error(ERROR, res, conn, false, sql.data); } PQclear(res); res = NULL; /* Retrieve and process rows a batch at a time. */ for (;;) { char fetch_sql[64] = {'\0'}; int fetch_size; int numrows; int i; /* Allow users to cancel long query */ CHECK_FOR_INTERRUPTS(); /* * XXX possible future improvement: if rowstoskip is large, we * could issue a MOVE rather than physically fetching the rows, * then just adjust rowstoskip and samplerows appropriately. * The fetch size is arbitrary, but shouldn't be enormous. */ fetch_size = 100; /* Fetch some rows */ int rc = snprintf_s(fetch_sql, sizeof(fetch_sql), sizeof(fetch_sql) - 1, "FETCH %d FROM c%u", fetch_size, cursor_number); securec_check_ss(rc, "", ""); res = pgfdw_exec_query(conn, fetch_sql); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) { pgfdw_report_error(ERROR, res, conn, false, sql.data); } /* Process whatever we got. */ numrows = PQntuples(res); for (i = 0; i < numrows; i++) { analyze_row_processor(res, i, &astate); } PQclear(res); res = NULL; /* Must be EOF if we didn't get all the rows requested. */ if (numrows < fetch_size) { break; } } /* Close the cursor, just to be tidy. */ close_cursor(conn, cursor_number); } PG_CATCH(); { if (res) { PQclear(res); } PG_RE_THROW(); } PG_END_TRY(); ReleaseConnection(conn); /* We assume that we have no dead tuple. */ *totaldeadrows = 0.0; /* We've retrieved all living tuples from foreign server. */ *totalrows = astate.samplerows; /* * Emit some interesting relation info */ ereport(elevel, (errmsg("\"%s\": table contains %.0f rows, %d rows in sample", RelationGetRelationName(relation), astate.samplerows, astate.numrows))); return astate.numrows; } /* * Collect sample rows from the result of query. * - Use all tuples in sample until target # of samples are collected. * - Subsequently, replace already-sampled tuples randomly. */ static void analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate) { int targrows = astate->targrows; int pos; /* array index to store tuple in */ /* Always increment sample row counter. */ astate->samplerows += 1; /* * Determine the slot where this sample row should be stored. Set pos to * negative value to indicate the row should be skipped. */ if (astate->numrows < targrows) { /* First targrows rows are always included into the sample */ pos = astate->numrows++; } else { /* * Now we start replacing tuples in the sample until we reach the end * of the relation. Same algorithm as in acquire_sample_rows in * analyze.c; see Jeff Vitter's paper. */ if (astate->rowstoskip < 0) { astate->rowstoskip = anl_get_next_S(astate->samplerows, targrows, &astate->rstate); } if (astate->rowstoskip <= 0) { /* Choose a random reservoir element to replace. */ pos = (int)(targrows * anl_random_fract()); Assert(pos >= 0 && pos < targrows); heap_freetuple(astate->rows[pos]); } else { /* Skip this tuple. */ pos = -1; } astate->rowstoskip -= 1; } if (pos >= 0) { /* * Create sample tuple from current result row, and store it in the * position determined above. The tuple has to be created in anl_cxt. */ MemoryContext oldcontext = MemoryContextSwitchTo(astate->anl_cxt); astate->rows[pos] = make_tuple_from_result_row(res, row, astate->rel, astate->attinmeta, astate->retrieved_attrs, NULL, astate->temp_cxt); (void)MemoryContextSwitchTo(oldcontext); } } /* * Create a tuple from the specified row of the PGresult. * * rel is the local representation of the foreign table, attinmeta is * conversion data for the rel's tupdesc, and retrieved_attrs is an * integer list of the table column numbers present in the PGresult. * fsstate is the ForeignScan plan node's execution state. * temp_context is a working context that can be reset after each tuple. * * Note: either rel or fsstate, but not both, can be NULL. rel is NULL * if we're processing a remote join, while fsstate is NULL in a non-query * context such as ANALYZE, or if we're processing a non-scan query node. */ static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, AttInMetadata *attinmeta, List *retrieved_attrs, ForeignScanState *fsstate, MemoryContext temp_context) { HeapTuple tuple; TupleDesc tupdesc; Datum *values = NULL; bool *nulls = NULL; ItemPointer ctid = NULL; Oid oid = InvalidOid, tableoid = InvalidOid; ConversionLocation errpos; ErrorContextCallback errcallback; MemoryContext oldcontext; ListCell *lc = NULL; int j; Assert(row < PQntuples(res)); /* * Do the following work in a temp context that we reset after each tuple. * This cleans up not only the data we have direct access to, but any * cruft the I/O functions might leak. */ oldcontext = MemoryContextSwitchTo(temp_context); /* * Get the tuple descriptor for the row. Use the rel's tupdesc if rel is * provided, otherwise look to the scan node's ScanTupleSlot. */ if (rel) { tupdesc = RelationGetDescr(rel); } else { Assert(fsstate); tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor; } values = (Datum *)palloc0(tupdesc->natts * sizeof(Datum)); nulls = (bool *)palloc(tupdesc->natts * sizeof(bool)); /* Initialize to nulls for any columns not present in result */ errno_t rc = memset_s(nulls, tupdesc->natts * sizeof(bool), true, tupdesc->natts * sizeof(bool)); securec_check(rc, "\0", "\0"); /* * Set up and install callback to report where conversion error occurs. */ errpos.cur_attno = 0; errpos.rel = rel; errpos.fsstate = fsstate; errcallback.callback = conversion_error_callback; errcallback.arg = (void *)&errpos; errcallback.previous = t_thrd.log_cxt.error_context_stack; t_thrd.log_cxt.error_context_stack = &errcallback; /* * i indexes columns in the relation, j indexes columns in the PGresult. */ j = 0; foreach (lc, retrieved_attrs) { int i = lfirst_int(lc); char *valstr = NULL; /* fetch next column's textual value */ if (PQgetisnull(res, row, j)) { valstr = NULL; } else { valstr = PQgetvalue(res, row, j); } /* * convert value to internal representation * * Note: we ignore system columns other than ctid and oid in result */ errpos.cur_attno = i; if (i > 0) { /* ordinary column */ Assert(i <= tupdesc->natts); nulls[i - 1] = (valstr == NULL); /* Apply the input function even to nulls, to support domains */ values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1], valstr, attinmeta->attioparams[i - 1], attinmeta->atttypmods[i - 1]); } else if (i == SelfItemPointerAttributeNumber) { /* ctid */ if (valstr != NULL) { Datum datum; datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr)); ctid = (ItemPointer)DatumGetPointer(datum); } } else if (i == ObjectIdAttributeNumber) { /* oid */ if (valstr != NULL) { Datum datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr)); oid = DatumGetObjectId(datum); } } else if (i == TableOidAttributeNumber) { /* tableoid */ if (valstr != NULL) { Datum datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr)); tableoid = DatumGetObjectId(datum); } } errpos.cur_attno = 0; j++; } /* Uninstall error context callback. */ t_thrd.log_cxt.error_context_stack = errcallback.previous; /* * Check we got the expected number of columns. Note: j == 0 and * PQnfields == 1 is expected, since deparse emits a NULL if no columns. */ if (j > 0 && j != PQnfields(res)) { elog(ERROR, "remote query result does not match the foreign table"); } /* * Build the result tuple in caller's memory context. */ MemoryContextSwitchTo(oldcontext); tuple = heap_form_tuple(tupdesc, values, nulls); /* * If we have a CTID to return, install it in both t_self and t_ctid. * t_self is the normal place, but if the tuple is converted to a * composite Datum, t_self will be lost; setting t_ctid allows CTID to be * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code). */ if (ctid) { tuple->t_self = tuple->t_data->t_ctid = *ctid; } /* * If we have an OID to return, install it. */ if (OidIsValid(oid)) { HeapTupleSetOid(tuple, oid); } /* * If we have an TABLEOID to return, install it. */ if (OidIsValid(tableoid)) { tuple->t_tableOid = tableoid; } /* Clean up */ MemoryContextReset(temp_context); return tuple; } /* * Callback function which is called when error occurs during column value * conversion. Print names of column and relation. * * Note that this function mustn't do any catalog lookups, since we are in * an already-failed transaction. Fortunately, we can get the needed info * from the relation or the query's rangetable instead. */ static void conversion_error_callback(void *arg) { ConversionLocation *errpos = (ConversionLocation *)arg; Relation rel = errpos->rel; ForeignScanState *fsstate = errpos->fsstate; const char *attname = NULL; const char *relname = NULL; bool is_wholerow = false; /* * If we're in a scan node, always use aliases from the rangetable, for * consistency between the simple-relation and remote-join cases. Look at * the relation's tupdesc only if we're not in a scan node. */ if (fsstate) { /* ForeignScan case */ ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan); int varno = 0; AttrNumber colno = 0; if (fsplan->scan.scanrelid > 0) { /* error occurred in a scan against a foreign table */ varno = fsplan->scan.scanrelid; colno = errpos->cur_attno; } else { /* error occurred in a scan against a foreign join */ TargetEntry *tle = NULL; tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist, errpos->cur_attno - 1); /* * Target list can have Vars and expressions. For Vars, we can * get some information, however for expressions we can't. Thus * for expressions, just show generic context message. */ if (IsA(tle->expr, Var)) { Var *var = (Var *)tle->expr; varno = var->varno; colno = var->varattno; } } if (varno > 0) { EState *estate = fsstate->ss.ps.state; RangeTblEntry *rte = exec_rt_fetch(varno, estate); relname = rte->eref->aliasname; if (colno == 0) { is_wholerow = true; } else if (colno > 0 && colno <= list_length(rte->eref->colnames)) { attname = strVal(list_nth(rte->eref->colnames, colno - 1)); } else if (colno == SelfItemPointerAttributeNumber) { attname = "ctid"; } } } else if (rel) { /* Non-ForeignScan case (we should always have a rel here) */ TupleDesc tupdesc = RelationGetDescr(rel); relname = RelationGetRelationName(rel); if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts) { Form_pg_attribute attr = TupleDescAttr(tupdesc, errpos->cur_attno - 1); attname = NameStr(attr->attname); } else if (errpos->cur_attno == SelfItemPointerAttributeNumber) { attname = "ctid"; } } if (relname && is_wholerow){ errcontext("whole-row reference to foreign table \"%s\"", relname); } else if (relname && attname) { errcontext("column \"%s\" of foreign table \"%s\"", attname, relname); } else { errcontext("processing expression at position %d in select list", errpos->cur_attno); } } /* * Given an EquivalenceClass and a foreign relation, find an EC member * that can be used to sort the relation remotely according to a pathkey * using this EC. * * If there is more than one suitable candidate, return an arbitrary * one of them. If there is none, return NULL. * * This checks that the EC member expression uses only Vars from the given * rel and is shippable. Caller must separately verify that the pathkey's * ordering operator is shippable. */ EquivalenceMember *find_em_for_rel(PlannerInfo *root, EquivalenceClass *ec, RelOptInfo *rel) { ListCell *lc = NULL; foreach (lc, ec->ec_members) { EquivalenceMember *em = (EquivalenceMember *)lfirst(lc); /* * Note we require !bms_is_empty, else we'd accept constant * expressions which are not suitable for the purpose. */ if (bms_is_subset(em->em_relids, rel->relids) && !bms_is_empty(em->em_relids) && is_foreign_expr(root, rel, em->em_expr)) { return em; } } return NULL; } /* * Find an EquivalenceClass member that is to be computed as a sort column * in the given rel's reltarget, and is shippable. * * If there is more than one suitable candidate, return an arbitrary * one of them. If there is none, return NULL. * * This checks that the EC member expression uses only Vars from the given * rel and is shippable. Caller must separately verify that the pathkey's * ordering operator is shippable. */ EquivalenceMember *find_em_for_rel_target(PlannerInfo *root, EquivalenceClass *ec, RelOptInfo *rel) { PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *)rel->fdw_private; List* target = fpinfo->complete_tlist; ListCell *lc1 = NULL; int i; i = 0; foreach (lc1, target) { TargetEntry* te = (TargetEntry*)lfirst(lc1); Expr *expr = te->expr; Index sgref = te->ressortgroupref; ListCell *lc2; /* Ignore non-sort expressions */ if (sgref == 0 || get_sortgroupref_clause_noerr(sgref, root->parse->sortClause) == NULL) { i++; continue; } /* We ignore binary-compatible relabeling on both ends */ while (expr && IsA(expr, RelabelType)) { expr = ((RelabelType *)expr)->arg; } /* Locate an EquivalenceClass member matching this expr, if any */ foreach (lc2, ec->ec_members) { EquivalenceMember *em = (EquivalenceMember *)lfirst(lc2); Expr *em_expr = NULL; /* Don't match constants */ if (em->em_is_const) { continue; } /* Ignore child members */ if (em->em_is_child) { continue; } /* Match if same expression (after stripping relabel) */ em_expr = em->em_expr; while (em_expr && IsA(em_expr, RelabelType)) { em_expr = ((RelabelType *)em_expr)->arg; } if (!equal(em_expr, expr)) { continue; } /* Check that expression (including relabels!) is shippable */ if (is_foreign_expr(root, rel, em->em_expr)) { return em; } } i++; } return NULL; } /* * Assess whether the join between inner and outer relations can be pushed down * to the foreign server. As a side effect, save information we obtain in this * function to PgFdwRelationInfo passed in. */ static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel, List* restrictlist) { PgFdwRelationInfo *fpinfo = NULL; PgFdwRelationInfo *fpinfo_o = NULL; PgFdwRelationInfo *fpinfo_i = NULL; ListCell *lc = NULL; List *joinclauses = NIL; /* * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins. * Constructing queries representing SEMI and ANTI joins is hard, hence * not considered right now. */ if (jointype != JOIN_INNER && jointype != JOIN_LEFT && jointype != JOIN_RIGHT && jointype != JOIN_FULL) { return false; } /* * If either of the joining relations is marked as unsafe to pushdown, the * join can not be pushed down. */ fpinfo = (PgFdwRelationInfo *)joinrel->fdw_private; fpinfo_o = (PgFdwRelationInfo *)outerrel->fdw_private; fpinfo_i = (PgFdwRelationInfo *)innerrel->fdw_private; if (!fpinfo_o || !fpinfo_o->pushdown_safe || !fpinfo_i || !fpinfo_i->pushdown_safe) { return false; } /* * If joining relations have local conditions, those conditions are * required to be applied before joining the relations. Hence the join can * not be pushed down. */ if (fpinfo_o->local_conds || fpinfo_i->local_conds) { return false; } /* * Merge FDW options. We might be tempted to do this after we have deemed * the foreign join to be OK. But we must do this beforehand so that we * know which quals can be evaluated on the foreign server, which might * depend on shippable_extensions. */ fpinfo->server = fpinfo_o->server; merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i); /* * Separate restrict list into join quals and pushed-down (other) quals. * * Join quals belonging to an outer join must all be shippable, else we * cannot execute the join remotely. Add such quals to 'joinclauses'. * * Add other quals to fpinfo->remote_conds if they are shippable, else to * fpinfo->local_conds. In an inner join it's okay to execute conditions * either locally or remotely; the same is true for pushed-down conditions * at an outer join. * * Note we might return failure after having already scribbled on * fpinfo->remote_conds and fpinfo->local_conds. That's okay because we * won't consult those lists again if we deem the join unshippable. */ joinclauses = NIL; foreach (lc, restrictlist) { RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); bool is_remote_clause = is_foreign_expr(root, joinrel, rinfo->clause); if (IS_OUTER_JOIN(jointype) && !RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids)) { if (!is_remote_clause) { return false; } joinclauses = lappend(joinclauses, rinfo); } else { if (is_remote_clause) { fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo); } else { fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo); } } } /* * deparseExplicitTargetList() isn't smart enough to handle anything other * than a Var. In particular, if there's some PlaceHolderVar that would * need to be evaluated within this join tree (because there's an upper * reference to a quantity that may go to NULL as a result of an outer * join), then we can't try to push the join down because we'll fail when * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that * needs to be evaluated *at the top* of this join tree is OK, because we * can do that locally after fetching the results from the remote side. */ foreach (lc, root->placeholder_list) { PlaceHolderInfo *phinfo = (PlaceHolderInfo*)lfirst(lc); Relids relids; /* PlaceHolderInfo refers to parent relids, not child relids. */ relids = joinrel->relids; if (bms_is_subset(phinfo->ph_eval_at, relids) && bms_nonempty_difference(relids, phinfo->ph_eval_at)) { return false; } } /* Save the join clauses, for later use. */ fpinfo->joinclauses = joinclauses; fpinfo->outerrel = outerrel; fpinfo->innerrel = innerrel; fpinfo->jointype = jointype; /* * By default, both the input relations are not required to be deparsed as * subqueries, but there might be some relations covered by the input * relations that are required to be deparsed as subqueries, so save the * relids of those relations for later use by the deparser. */ fpinfo->make_outerrel_subquery = false; fpinfo->make_innerrel_subquery = false; Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids)); Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids)); fpinfo->lower_subquery_rels = bms_union(fpinfo_o->lower_subquery_rels, fpinfo_i->lower_subquery_rels); /* * Pull the other remote conditions from the joining relations into join * clauses or other remote clauses (remote_conds) of this relation * wherever possible. This avoids building subqueries at every join step. * * For an inner join, clauses from both the relations are added to the * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from * the outer side are added to remote_conds since those can be evaluated * after the join is evaluated. The clauses from inner side are added to * the joinclauses, since they need to be evaluated while constructing the * join. * * For a FULL OUTER JOIN, the other clauses from either relation can not * be added to the joinclauses or remote_conds, since each relation acts * as an outer relation for the other. * * The joining sides can not have local conditions, thus no need to test * shippability of the clauses being pulled up. * * The foreign join path is not necessarily better than join foreign scan * path, so we should use list_concat2. */ switch (jointype) { case JOIN_INNER: fpinfo->remote_conds = list_concat2(fpinfo->remote_conds, fpinfo_i->remote_conds); fpinfo->remote_conds = list_concat2(fpinfo->remote_conds, fpinfo_o->remote_conds); break; case JOIN_LEFT: fpinfo->joinclauses = list_concat2(fpinfo->joinclauses, fpinfo_i->remote_conds); fpinfo->remote_conds = list_concat2(fpinfo->remote_conds, fpinfo_o->remote_conds); break; case JOIN_RIGHT: fpinfo->joinclauses = list_concat2(fpinfo->joinclauses, fpinfo_o->remote_conds); fpinfo->remote_conds = list_concat2(fpinfo->remote_conds, fpinfo_i->remote_conds); break; case JOIN_FULL: /* * In this case, if any of the input relations has conditions, we * need to deparse that relation as a subquery so that the * conditions can be evaluated before the join. Remember it in * the fpinfo of this relation so that the deparser can take * appropriate action. Also, save the relids of base relations * covered by that relation for later use by the deparser. */ if (fpinfo_o->remote_conds) { fpinfo->make_outerrel_subquery = true; fpinfo->lower_subquery_rels = bms_add_members(fpinfo->lower_subquery_rels, outerrel->relids); } if (fpinfo_i->remote_conds) { fpinfo->make_innerrel_subquery = true; fpinfo->lower_subquery_rels = bms_add_members(fpinfo->lower_subquery_rels, innerrel->relids); } break; default: /* Should not happen, we have just checked this above */ elog(ERROR, "unsupported join type %d", jointype); } /* * For an inner join, all restrictions can be treated alike. Treating the * pushed down conditions as join conditions allows a top level full outer * join to be deparsed without requiring subqueries. */ if (jointype == JOIN_INNER) { Assert(!fpinfo->joinclauses); fpinfo->joinclauses = fpinfo->remote_conds; fpinfo->remote_conds = NIL; } /* Mark that this join can be pushed down safely */ fpinfo->pushdown_safe = true; /* Get user mapping */ if (fpinfo->use_remote_estimate) { if (fpinfo_o->use_remote_estimate) { fpinfo->user = fpinfo_o->user; } else { fpinfo->user = fpinfo_i->user; } } else { fpinfo->user = NULL; } /* * Set # of retrieved rows and cached relation costs to some negative * value, so that we can detect when they are set to some sensible values, * during one (usually the first) of the calls to estimate_path_cost_size. */ fpinfo->retrieved_rows = -1; fpinfo->rel_startup_cost = -1; fpinfo->rel_total_cost = -1; /* * Set the string describing this join relation to be used in EXPLAIN * output of corresponding ForeignScan. Note that the decoration we add * to the base relation names mustn't include any digits, or it'll confuse * postgresExplainForeignScan. */ fpinfo->relation_name = psprintf("(%s) %s JOIN (%s)", fpinfo_o->relation_name, get_jointype_name(fpinfo->jointype), fpinfo_i->relation_name); /* * Set the relation index. This is defined as the position of this * joinrel in the join_rel_list list plus the length of the rtable list. * Note that since this joinrel is at the end of the join_rel_list list * when we are called, we can get the position by list_length. */ Assert(fpinfo->relation_index == 0); /* shouldn't be set yet */ fpinfo->relation_index = list_length(root->parse->rtable) + list_length(root->join_rel_list); return true; } static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel, Path *epq_path) { List *useful_pathkeys_list = NIL; /* List of all pathkeys */ ListCell *lc = NULL; useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel); /* Create one path for each set of pathkeys we found above. */ foreach (lc, useful_pathkeys_list) { double rows; int width; Cost startup_cost; Cost total_cost; List *useful_pathkeys = (List *)lfirst(lc); estimate_path_cost_size(root, rel, NIL, useful_pathkeys, NULL, &rows, &width, &startup_cost, &total_cost); /* * The EPQ path must be at least as well sorted as the path itself, in * case it gets used as input to a mergejoin. */ /* but openGauss not support create_sort_path(). */ Assert(epq_path == NULL); if (IS_SIMPLE_REL(rel)) { add_path(NULL, rel, (Path *)create_foreignscan_path(root, rel, startup_cost, total_cost, useful_pathkeys, rel->lateral_relids, epq_path, NIL)); } else { add_path(NULL, rel, (Path *)create_foreign_join_path(root, rel, NULL, rows, startup_cost, total_cost, useful_pathkeys, rel->lateral_relids, epq_path, NIL)); } } } /* * Parse options from foreign server and apply them to fpinfo. * * New options might also require tweaking merge_fdw_options(). */ static void apply_server_options(PgFdwRelationInfo *fpinfo) { ListCell *lc = NULL; foreach (lc, fpinfo->server->options) { DefElem *def = (DefElem *)lfirst(lc); if (strcmp(def->defname, "use_remote_estimate") == 0) { fpinfo->use_remote_estimate = defGetBoolean(def); } else if (strcmp(def->defname, "fdw_startup_cost") == 0) { (void)parse_real(defGetString(def), &fpinfo->fdw_startup_cost, 0, NULL); } else if (strcmp(def->defname, "fdw_tuple_cost") == 0) { (void)parse_real(defGetString(def), &fpinfo->fdw_tuple_cost, 0, NULL); } else if (strcmp(def->defname, "fetch_size") == 0) { (void)parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL); } } } /* * Parse options from foreign table and apply them to fpinfo. * * New options might also require tweaking merge_fdw_options(). */ static void apply_table_options(PgFdwRelationInfo *fpinfo) { ListCell *lc = NULL; foreach (lc, fpinfo->table->options) { DefElem *def = (DefElem *)lfirst(lc); if (strcmp(def->defname, "use_remote_estimate") == 0) { fpinfo->use_remote_estimate = defGetBoolean(def); } else if (strcmp(def->defname, "fetch_size") == 0) { (void)parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL); } } } /* * Merge FDW options from input relations into a new set of options for a join * or an upper rel. * * For a join relation, FDW-specific information about the inner and outer * relations is provided using fpinfo_i and fpinfo_o. For an upper relation, * fpinfo_o provides the information for the input relation; fpinfo_i is * expected to NULL. */ static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i) { /* We must always have fpinfo_o. */ Assert(fpinfo_o); /* fpinfo_i may be NULL, but if present the servers must both match. */ Assert(!fpinfo_i || fpinfo_i->server->serverid == fpinfo_o->server->serverid); /* * Copy the server specific FDW options. (For a join, both relations come * from the same server, so the server options should have the same value * for both relations.) */ fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost; fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost; fpinfo->shippable_extensions = fpinfo_o->shippable_extensions; fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate; fpinfo->fetch_size = fpinfo_o->fetch_size; fpinfo->async_capable = fpinfo_o->async_capable; /* Merge the table level options from either side of the join. */ if (fpinfo_i) { /* * We'll prefer to use remote estimates for this join if any table * from either side of the join is using remote estimates. This is * most likely going to be preferred since they're already willing to * pay the price of a round trip to get the remote EXPLAIN. In any * case it's not entirely clear how we might otherwise handle this * best. */ fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate || fpinfo_i->use_remote_estimate; /* * Set fetch size to maximum of the joining sides, since we are * expecting the rows returned by the join to be proportional to the * relation sizes. */ fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size); /* * We'll prefer to consider this join async-capable if any table from * either side of the join is considered async-capable. This would be * reasonable because in that case the foreign server would have its * own resources to scan that table asynchronously, and the join could * also be computed asynchronously using the resources. */ fpinfo->async_capable = fpinfo_o->async_capable || fpinfo_i->async_capable; } } /* * postgresGetForeignJoinPaths * Add possible ForeignPath to joinrel, if join is safe to push down. */ static void postgresGetForeignJoinPaths(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel, RelOptInfo *innerrel, JoinType jointype, SpecialJoinInfo* sjinfo, List* restrictlist) { PgFdwRelationInfo *fpinfo = NULL; ForeignPath *joinpath = NULL; double rows; int width; Cost startup_cost; Cost total_cost; /* * Skip if this join combination has been considered already. */ if (joinrel->fdw_private) { return; } /* * This code does not work for joins with lateral references, since those * must have parameterized paths, which we don't generate yet. */ if (!bms_is_empty(joinrel->lateral_relids)) { return; } /* * Create unfinished PgFdwRelationInfo entry which is used to indicate * that the join relation is already considered, so that we won't waste * time in judging safety of join pushdown and adding the same paths again * if found safe. Once we know that this join can be pushed down, we fill * the entry. */ fpinfo = (PgFdwRelationInfo *)palloc0(sizeof(PgFdwRelationInfo)); fpinfo->pushdown_safe = false; joinrel->fdw_private = fpinfo; /* attrs_used is only for base relations. */ fpinfo->attrs_used = NULL; /* * If there is a possibility that EvalPlanQual will be executed, we need * to be able to reconstruct the row using scans of the base relations. * GetExistingLocalJoinPath will find a suitable path for this purpose in * the path list of the joinrel, if one exists. We must be careful to * call it before adding any ForeignPath, since the ForeignPath might * dominate the only suitable local path available. We also do it before * calling foreign_join_ok(), since that function updates fpinfo and marks * it as pushable if the join is found to be pushable. * * currently not support. */ if (root->parse->commandType == CMD_DELETE || root->parse->commandType == CMD_UPDATE || root->rowMarks) { ereport(DEBUG3, (errmsg("could not push down foreign join because the EPQ maybe executed."))); return; } if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, restrictlist)) { return; } /* * Compute the selectivity and cost of the local_conds, so we don't have * to do it over again for each path. The best we can do for these * conditions is to estimate selectivity on the basis of local statistics. * The local conditions are applied after the join has been computed on * the remote side like quals in WHERE clause, so pass jointype as * JOIN_INNER. */ fpinfo->local_conds_sel = clauselist_selectivity(root, fpinfo->local_conds, 0, JOIN_INNER, NULL); cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root); /* * If we are going to estimate costs locally, estimate the join clause * selectivity here while we have special join info. */ if (!fpinfo->use_remote_estimate) { fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses, 0, fpinfo->jointype, sjinfo); } /* Estimate costs for bare join relation */ estimate_path_cost_size(root, joinrel, NIL, NIL, NULL, &rows, &width, &startup_cost, &total_cost); /* Now update this information in the joinrel */ joinrel->rows = rows; joinrel->reltarget->width = width; fpinfo->rows = rows; fpinfo->width = width; fpinfo->startup_cost = startup_cost; fpinfo->total_cost = total_cost; /* * Create a new join path and add it to the joinrel which represents a * join between foreign tables. */ joinpath = create_foreign_join_path(root, joinrel, NULL, /* default pathtarget */ rows, startup_cost, total_cost, NIL, /* no pathkeys */ joinrel->lateral_relids, NULL, NIL); /* no fdw_private */ /* Add generated path into joinrel by add_path(). */ add_path(NULL, joinrel, (Path *)joinpath); /* * Consider pathkeys for the join relation. */ add_paths_with_pathkeys_for_rel(root, joinrel, NULL); /* XXX Consider parameterized paths for the join relation */ } /* * Assess whether the aggregation, grouping and having operations can be pushed * down to the foreign server. As a side effect, save information we obtain in * this function to PgFdwRelationInfo of the input relation. */ static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, Node *havingQual, GroupPathExtraData *extra) { Query *query = root->parse; PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *)grouped_rel->fdw_private; List *grouping_target = extra->targetList; PgFdwRelationInfo *ofpinfo = NULL; ListCell *lc = NULL; int i; List *tlist = NIL; /* We currently don't support pushing Grouping Sets. */ if (query->groupingSets) { return false; } /* Get the fpinfo of the underlying scan relation. */ ofpinfo = (PgFdwRelationInfo *)fpinfo->outerrel->fdw_private; /* * If underlying scan relation has any local conditions, those conditions * are required to be applied before performing aggregation. Hence the * aggregate cannot be pushed down. */ if (ofpinfo->local_conds) { return false; } /* * Examine grouping expressions, as well as other expressions we'd need to * compute, and check whether they are safe to push down to the foreign * server. All GROUP BY expressions will be part of the grouping target * and thus there is no need to search for them separately. Add grouping * expressions into target list which will be passed to foreign server. * * A tricky fine point is that we must not put any expression into the * target list that is just a foreign param (that is, something that * deparse.c would conclude has to be sent to the foreign server). If we * do, the expression will also appear in the fdw_exprs list of the plan * node, and setrefs.c will get confused and decide that the fdw_exprs * entry is actually a reference to the fdw_scan_tlist entry, resulting in * a broken plan. Somewhat oddly, it's OK if the expression contains such * a node, as long as it's not at top level; then no match is possible. */ i = 0; foreach (lc, grouping_target) { TargetEntry* tle = (TargetEntry*)lfirst(lc); Expr *expr = tle->expr; Index sgref = tle->ressortgroupref; ListCell *l = NULL; /* Check whether this expression is part of GROUP BY clause */ if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause)) { /* * If any GROUP BY expression is not shippable, then we cannot * push down aggregation to the foreign server. */ if (!is_foreign_expr(root, grouped_rel, expr)) { return false; } /* * If it would be a foreign param, we can't put it into the tlist, * so we have to fail. */ if (is_foreign_param(root, grouped_rel, expr)) { return false; } /* * Pushable, so add to tlist. We need to create a TLE for this * expression and apply the sortgroupref to it. We cannot use * add_to_flat_tlist() here because that avoids making duplicate * entries in the tlist. If there are duplicate entries with * distinct sortgrouprefs, we have to duplicate that situation in * the output tlist. */ TargetEntry* new_tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false); new_tle->ressortgroupref = sgref; tlist = lappend(tlist, new_tle); } else { /* * Non-grouping expression we need to compute. Can we ship it * as-is to the foreign server? */ if (is_foreign_expr(root, grouped_rel, expr) && !is_foreign_param(root, grouped_rel, expr)) { /* Yes, so add to tlist as-is; OK to suppress duplicates */ tlist = add_to_flat_tlist(tlist, list_make1(expr)); } else { /* Not pushable as a whole; extract its Vars and aggregates */ List *aggvars = NIL; aggvars = pull_var_clause((Node *)expr, PVC_INCLUDE_AGGREGATES, PVC_REJECT_PLACEHOLDERS); /* * If any aggregate expression is not shippable, then we * cannot push down aggregation to the foreign server. (We * don't have to check is_foreign_param, since that certainly * won't return true for any such expression.) */ if (!is_foreign_expr(root, grouped_rel, (Expr *)aggvars)) { return false; } /* * Add aggregates, if any, into the targetlist. Plain Vars * outside an aggregate can be ignored, because they should be * either same as some GROUP BY column or part of some GROUP * BY expression. In either case, they are already part of * the targetlist and thus no need to add them again. In fact * including plain Vars in the tlist when they do not match a * GROUP BY column would cause the foreign server to complain * that the shipped query is invalid. */ foreach (l, aggvars) { Expr *expr = (Expr *)lfirst(l); if (IsA(expr, Aggref)) { tlist = add_to_flat_tlist(tlist, list_make1(expr)); } } } } i++; } /* * Classify the pushable and non-pushable HAVING clauses and save them in * remote_conds and local_conds of the grouped rel's fpinfo. */ if (havingQual) { ListCell *lc = NULL; foreach (lc, (List *)havingQual) { Expr *expr = (Expr *)lfirst(lc); RestrictInfo *rinfo = NULL; /* * Currently, the core code doesn't wrap havingQuals in * RestrictInfos, so we must make our own. */ Assert(!IsA(expr, RestrictInfo)); rinfo = make_restrictinfo(expr, true, false, false, root->qualSecurityLevel, grouped_rel->relids, NULL, NULL); if (is_foreign_expr(root, grouped_rel, expr)) { fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo); } else { fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo); } } } /* * If there are any local conditions, pull Vars and aggregates from it and * check whether they are safe to pushdown or not. */ if (fpinfo->local_conds) { List *aggvars = NIL; ListCell *lc = NULL; foreach (lc, fpinfo->local_conds) { RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); aggvars = list_concat(aggvars, pull_var_clause((Node *)rinfo->clause, PVC_INCLUDE_AGGREGATES, PVC_REJECT_PLACEHOLDERS)); } foreach (lc, aggvars) { Expr *expr = (Expr *)lfirst(lc); /* * If aggregates within local conditions are not safe to push * down, then we cannot push down the query. Vars are already * part of GROUP BY clause which are checked above, so no need to * access them again here. Again, we need not check * is_foreign_param for a foreign aggregate. */ if (IsA(expr, Aggref)) { if (!is_foreign_expr(root, grouped_rel, expr)) { return false; } tlist = add_to_flat_tlist(tlist, list_make1(expr)); } } } /* Store generated targetlist */ fpinfo->grouped_tlist = tlist; /* Safe to pushdown */ fpinfo->pushdown_safe = true; /* * Set # of retrieved rows and cached relation costs to some negative * value, so that we can detect when they are set to some sensible values, * during one (usually the first) of the calls to estimate_path_cost_size. */ fpinfo->retrieved_rows = -1; fpinfo->rel_startup_cost = -1; fpinfo->rel_total_cost = -1; /* * Set the string describing this grouped relation to be used in EXPLAIN * output of corresponding ForeignScan. Note that the decoration we add * to the base relation name mustn't include any digits, or it'll confuse * postgresExplainForeignScan. */ fpinfo->relation_name = psprintf("Aggregate on (%s)", ofpinfo->relation_name); return true; } static void init_upperrel_paths_context(FDWUpperRelCxt* ufdwCxt, Plan* mainPlan) { Assert(ufdwCxt->root != NULL && ufdwCxt->currentRel != NULL && ufdwCxt->spjExtra != NULL); PlannerInfo* root = ufdwCxt->root; if (root->parse->groupingSets) { ufdwCxt->state = FDW_UPPER_REL_END; return; } /* if the spj rel is unsafe to push down, upper rel is unsafe alse. */ PgFdwRelationInfo* fpinfo = (PgFdwRelationInfo *)ufdwCxt->currentRel->fdw_private; if (!fpinfo->pushdown_safe) { ufdwCxt->state = FDW_UPPER_REL_END; return; } fpinfo->stage = UPPERREL_INIT; /* the reltargetlist may bave been evaluted, we shoule adept it. */ ufdwCxt->currentRel->reltarget->exprs = extract_target_from_tel(ufdwCxt, fpinfo); ufdwCxt->state = FDW_UPPER_REL_INIT; } static void create_upperrel_plan_if_needed(FDWUpperRelCxt* ufdwCxt, Plan* mainPlan) { ListCell* lc = NULL; Path* bestPath = NULL; set_cheapest(ufdwCxt->currentRel, ufdwCxt->root); foreach(lc, ufdwCxt->currentRel->cheapest_total_path) { Path* p = (Path*)lfirst(lc); if (bestPath == NULL || bestPath->total_cost > p->total_cost) { bestPath = p; } } if (bestPath->total_cost < mainPlan->total_cost) { ufdwCxt->resultPlan = create_plan(ufdwCxt->root, bestPath); ufdwCxt->resultPathKeys = bestPath->pathkeys; apply_tlist_labeling(ufdwCxt->resultPlan->targetlist, mainPlan->targetlist); } } /* * postgresGetForeignUpperPaths * Add paths for post-join operations like aggregation, grouping etc. if * corresponding operations are safe to push down. */ static void postgresGetForeignUpperPaths(FDWUpperRelCxt* ufdwCxt, UpperRelationKind stage, Plan* mainPlan) { Assert(ufdwCxt != NULL && ufdwCxt->state != FDW_UPPER_REL_END); Assert(ufdwCxt->currentRel && ufdwCxt->currentRel->fdwroutine && ufdwCxt->currentRel->fdwroutine->GetForeignUpperPaths); switch (stage) { case UPPERREL_INIT: init_upperrel_paths_context(ufdwCxt, mainPlan); break; case UPPERREL_GROUP_AGG: case UPPERREL_ORDERED: ufdwCxt->state = FDW_UPPER_REL_TRY; _postgresGetForeignUpperPaths(ufdwCxt, stage); break; case UPPERREL_ROWMARKS: case UPPERREL_LIMIT: ufdwCxt->state = FDW_UPPER_REL_TRY; break; case UPPERREL_FINAL: if (ufdwCxt->state == FDW_UPPER_REL_TRY) { _postgresGetForeignUpperPaths(ufdwCxt, stage); if (ufdwCxt->currentRel->pathlist != NIL) { create_upperrel_plan_if_needed(ufdwCxt, mainPlan); } } ufdwCxt->state = FDW_UPPER_REL_END; break; default: ufdwCxt->state = FDW_UPPER_REL_END; break; } } static void _postgresGetForeignUpperPaths(FDWUpperRelCxt *ufdw_cxt, UpperRelationKind stage) { PgFdwRelationInfo *fpinfo = NULL; RelOptInfo *input_rel = ufdw_cxt->currentRel; RelOptInfo* output_rel = NULL; /* * If input rel is not safe to pushdown, then simply return as we cannot * perform any post-join operations on the foreign server. */ if (!input_rel->fdw_private || !((PgFdwRelationInfo *)input_rel->fdw_private)->pushdown_safe) { return; } fpinfo = (PgFdwRelationInfo *)palloc0(sizeof(PgFdwRelationInfo)); fpinfo->pushdown_safe = false; fpinfo->stage = stage; output_rel = make_upper_rel(ufdw_cxt, fpinfo); switch (stage) { case UPPERREL_GROUP_AGG: add_foreign_grouping_paths(ufdw_cxt->root, input_rel, output_rel, ufdw_cxt->groupExtra); break; case UPPERREL_ORDERED: add_foreign_ordered_paths(ufdw_cxt->root, input_rel, output_rel, ufdw_cxt->orderExtra); break; case UPPERREL_FINAL: add_foreign_final_paths(ufdw_cxt->root, input_rel, output_rel, ufdw_cxt->finalExtra); break; default: elog(ERROR, "unexpected upper relation: %d", (int)stage); break; } ufdw_cxt->currentRel = output_rel; } /* * add_foreign_grouping_paths * Add foreign path for grouping and/or aggregation. * * Given input_rel represents the underlying scan. The paths are added to the * given grouped_rel. */ static void add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *grouped_rel, GroupPathExtraData *extra) { Query *parse = root->parse; PgFdwRelationInfo *ifpinfo = (PgFdwRelationInfo *)input_rel->fdw_private; PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *)grouped_rel->fdw_private; ForeignPath *grouppath = NULL; double rows; int width; Cost startup_cost; Cost total_cost; /* Nothing to be done, if there is no grouping or aggregation required. */ if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs && !root->hasHavingQual) { return; } /* save the input_rel as outerrel in fpinfo */ fpinfo->outerrel = input_rel; /* * Copy foreign table, foreign server, user mapping, FDW options etc. * details from the input relation's fpinfo. */ fpinfo->table = ifpinfo->table; fpinfo->server = ifpinfo->server; fpinfo->user = ifpinfo->user; merge_fdw_options(fpinfo, ifpinfo, NULL); /* * Assess if it is safe to push down aggregation and grouping. * * Use HAVING qual from extra. In case of child partition, it will have * translated Vars. */ if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual, extra)) { return; } /* * Compute the selectivity and cost of the local_conds, so we don't have * to do it over again for each path. (Currently we create just a single * path here, but in future it would be possible that we build more paths * such as pre-sorted paths as in postgresGetForeignPaths and * postgresGetForeignJoinPaths.) The best we can do for these conditions * is to estimate selectivity on the basis of local statistics. */ fpinfo->local_conds_sel = clauselist_selectivity(root, fpinfo->local_conds, 0, JOIN_INNER, NULL, false); cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root); /* Estimate the cost of push down */ estimate_path_cost_size(root, grouped_rel, NIL, NIL, NULL, &rows, &width, &startup_cost, &total_cost); /* Now update this information in the fpinfo */ fpinfo->rows = rows; fpinfo->width = width; fpinfo->startup_cost = startup_cost; fpinfo->total_cost = total_cost; /* Create and add foreign path to the grouping relation. */ grouppath = create_foreign_upper_path(root, grouped_rel, grouped_rel->reltarget->exprs, rows, startup_cost, total_cost, NIL, /* no pathkeys */ NULL, NIL); /* no fdw_private */ /* Add generated path into grouped_rel by add_path(). */ add_path(root, grouped_rel, (Path *)grouppath); } /* * add_foreign_ordered_paths * Add foreign paths for performing the final sort remotely. * * Given input_rel contains the source-data Paths. The paths are added to the * given ordered_rel. */ static void add_foreign_ordered_paths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *ordered_rel, OrderPathExtraData *extra) { PgFdwRelationInfo *ifpinfo = (PgFdwRelationInfo *)input_rel->fdw_private; PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *)ordered_rel->fdw_private; PgFdwPathExtraData *fpextra = NULL; double rows; int width; Cost startup_cost; Cost total_cost; List *fdw_private; ForeignPath *ordered_path = NULL; ListCell *lc = NULL; /* Shouldn't get here unless the query has ORDER BY */ Assert(root->parse->sortClause); /* Save the input_rel as outerrel in fpinfo */ fpinfo->outerrel = input_rel; /* * Copy foreign table, foreign server, user mapping, FDW options etc. * details from the input relation's fpinfo. */ fpinfo->table = ifpinfo->table; fpinfo->server = ifpinfo->server; fpinfo->user = ifpinfo->user; merge_fdw_options(fpinfo, ifpinfo, NULL); /* * If the input_rel is a base or join relation, we would already have * considered pushing down the final sort to the remote server when * creating pre-sorted foreign paths for that relation, because the * query_pathkeys is set to the root->sort_pathkeys in that case (see * standard_qp_callback()). */ if (input_rel->reloptkind == RELOPT_BASEREL || input_rel->reloptkind == RELOPT_JOINREL) { Assert(equal(root->query_pathkeys, root->sort_pathkeys)); /* Safe to push down if the query_pathkeys is safe to push down */ fpinfo->pushdown_safe = ifpinfo->qp_is_pushdown_safe; return; } /* The input_rel should be a grouping relation */ Assert(input_rel->reloptkind == RELOPT_UPPER_REL && ifpinfo->stage == UPPERREL_GROUP_AGG); /* * We try to create a path below by extending a simple foreign path for * the underlying grouping relation to perform the final sort remotely, * which is stored into the fdw_private list of the resulting path. */ /* Assess if it is safe to push down the final sort */ foreach (lc, root->sort_pathkeys) { PathKey *pathkey = (PathKey *)lfirst(lc); EquivalenceClass *pathkey_ec = pathkey->pk_eclass; /* * is_foreign_expr would detect volatile expressions as well, but * checking ec_has_volatile here saves some cycles. */ if (pathkey_ec->ec_has_volatile) { return; } /* * Can't push down the sort if pathkey's opfamily is not shippable. */ if (!is_builtin(pathkey->pk_opfamily)) { return; } /* * The EC must contain a shippable EM that is computed in input_rel's * reltarget, else we can't push down the sort. */ if (find_em_for_rel_target(root, pathkey_ec, input_rel) == NULL) { return; } } /* Safe to push down */ fpinfo->pushdown_safe = true; /* Construct PgFdwPathExtraData */ fpextra = (PgFdwPathExtraData *)palloc0(sizeof(PgFdwPathExtraData)); fpextra->target = ordered_rel->reltarget->exprs; fpextra->has_final_sort = true; /* Estimate the costs of performing the final sort remotely */ estimate_path_cost_size(root, input_rel, NIL, root->sort_pathkeys, fpextra, &rows, &width, &startup_cost, &total_cost); /* * Build the fdw_private list that will be used by postgresGetForeignPlan. * Items in the list must match order in enum FdwPathPrivateIndex. */ fdw_private = list_make2(makeInteger(1), makeInteger(0)); /* Create foreign ordering path */ ordered_path = create_foreign_upper_path(root, input_rel, ordered_rel->reltarget->exprs, rows, startup_cost, total_cost, root->sort_pathkeys, NULL, /* no extra plan */ fdw_private); /* and add it to the ordered_rel */ add_path(root, ordered_rel, (Path *)ordered_path); } /* * add_foreign_final_paths * Add foreign paths for performing the final processing remotely. * * Given input_rel contains the source-data Paths. The paths are added to the * given final_rel. */ static void add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *final_rel, FinalPathExtraData *extra) { Query *parse = root->parse; PgFdwRelationInfo *ifpinfo = (PgFdwRelationInfo *)input_rel->fdw_private; PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *)final_rel->fdw_private; bool has_final_sort = false; List *pathkeys = NIL; PgFdwPathExtraData *fpextra = NULL; bool save_use_remote_estimate = false; double rows; int width; Cost startup_cost; Cost total_cost; List *fdw_private = NIL; ForeignPath *final_path = NULL; /* * Currently, we only support this for SELECT commands */ if (parse->commandType != CMD_SELECT) { return; } /* * No work if there is no FOR UPDATE/SHARE clause and if there is no need * to add a LIMIT node, input rel pathlist is the result, just copy it */ if (!parse->rowMarks && !extra->limit_needed) { ListCell* lc = NULL; Path* path = NULL; ForeignPath *final_path = NULL; foreach(lc, input_rel->pathlist) { path = (Path*)lfirst(lc); if (IsA(path, ForeignPath)) { final_path = create_foreign_upper_path(root, path->parent, extra->targetList, path->rows, path->startup_cost, path->total_cost, path->pathkeys, NULL, /* no extra plan */ ((ForeignPath *)path)->fdw_private); /* and add it to the final_rel */ add_path(root, final_rel, (Path *)final_path); /* Safe to push down */ fpinfo->pushdown_safe = true; } } return; } /* Save the input_rel as outerrel in fpinfo */ fpinfo->outerrel = input_rel; /* * Copy foreign table, foreign server, user mapping, FDW options etc. * details from the input relation's fpinfo. */ fpinfo->table = ifpinfo->table; fpinfo->server = ifpinfo->server; fpinfo->user = ifpinfo->user; merge_fdw_options(fpinfo, ifpinfo, NULL); /* * If there is no need to add a LIMIT node, there might be a ForeignPath * in the input_rel's pathlist that implements all behavior of the query. * Note: we would already have accounted for the query's FOR UPDATE/SHARE * (if any) before we get here. */ if (!extra->limit_needed) { ListCell *lc = NULL; Assert(parse->rowMarks); /* * Grouping and aggregation are not supported with FOR UPDATE/SHARE, * so the input_rel should be a base, join, or ordered relation; and * if it's an ordered relation, its input relation should be a base or * join relation. */ Assert(input_rel->reloptkind == RELOPT_BASEREL || input_rel->reloptkind == RELOPT_JOINREL || (input_rel->reloptkind == RELOPT_UPPER_REL && ifpinfo->stage == UPPERREL_ORDERED && (ifpinfo->outerrel->reloptkind == RELOPT_BASEREL || ifpinfo->outerrel->reloptkind == RELOPT_JOINREL))); foreach (lc, input_rel->pathlist) { Path *path = (Path *)lfirst(lc); /* * apply_scanjoin_target_to_paths() uses create_projection_path() * to adjust each of its input paths if needed, whereas * create_ordered_paths() uses apply_projection_to_path() to do * that. So the former might have put a ProjectionPath on top of * the ForeignPath; look through ProjectionPath and see if the * path underneath it is ForeignPath. */ if (IsA(path, ForeignPath)) { /* * Create foreign final path; this gets rid of a * no-longer-needed outer plan (if any), which makes the * EXPLAIN output look cleaner */ final_path = create_foreign_upper_path(root, path->parent, extra->targetList, path->rows, path->startup_cost, path->total_cost, path->pathkeys, NULL, /* no extra plan */ NULL); /* no fdw_private */ /* and add it to the final_rel */ add_path(root, final_rel, (Path *)final_path); /* Safe to push down */ fpinfo->pushdown_safe = true; return; } } /* * If we get here it means no ForeignPaths; since we would already * have considered pushing down all operations for the query to the * remote server, give up on it. */ return; } Assert(extra->limit_needed); /* * If the input_rel is an ordered relation, replace the input_rel with its * input relation */ if (input_rel->reloptkind == RELOPT_UPPER_REL && ifpinfo->stage == UPPERREL_ORDERED) { input_rel = ifpinfo->outerrel; ifpinfo = (PgFdwRelationInfo *)input_rel->fdw_private; has_final_sort = true; pathkeys = root->sort_pathkeys; } /* The input_rel should be a base, join, or grouping relation */ Assert(input_rel->reloptkind == RELOPT_BASEREL || input_rel->reloptkind == RELOPT_JOINREL || (input_rel->reloptkind == RELOPT_UPPER_REL && ifpinfo->stage == UPPERREL_GROUP_AGG)); /* * We try to create a path below by extending a simple foreign path for * the underlying base, join, or grouping relation to perform the final * sort (if has_final_sort) and the LIMIT restriction remotely, which is * stored into the fdw_private list of the resulting path. (We * re-estimate the costs of sorting the underlying relation, if * has_final_sort.) * * Assess if it is safe to push down the LIMIT and OFFSET to the remote * server * * If the underlying relation has any local conditions, the LIMIT/OFFSET * cannot be pushed down. */ if (ifpinfo->local_conds) { return; } /* * Also, the LIMIT/OFFSET cannot be pushed down, if their expressions are * not safe to remote. */ if (!is_foreign_expr(root, input_rel, (Expr *)parse->limitOffset) || !is_foreign_expr(root, input_rel, (Expr *)parse->limitCount)) { return; } /* Safe to push down */ fpinfo->pushdown_safe = true; /* Construct PgFdwPathExtraData */ fpextra = (PgFdwPathExtraData *)palloc0(sizeof(PgFdwPathExtraData)); fpextra->target = extra->targetList; fpextra->has_final_sort = has_final_sort; fpextra->has_limit = extra->limit_needed; fpextra->limit_tuples = extra->limit_tuples; fpextra->count_est = extra->count_est; fpextra->offset_est = extra->offset_est; /* * Estimate the costs of performing the final sort and the LIMIT * restriction remotely. If has_final_sort is false, we wouldn't need to * execute EXPLAIN anymore if use_remote_estimate, since the costs can be * roughly estimated using the costs we already have for the underlying * relation, in the same way as when use_remote_estimate is false. Since * it's pretty expensive to execute EXPLAIN, force use_remote_estimate to * false in that case. */ if (!fpextra->has_final_sort) { save_use_remote_estimate = ifpinfo->use_remote_estimate; ifpinfo->use_remote_estimate = false; } estimate_path_cost_size(root, input_rel, NIL, pathkeys, fpextra, &rows, &width, &startup_cost, &total_cost); if (!fpextra->has_final_sort) { ifpinfo->use_remote_estimate = save_use_remote_estimate; } /* * Build the fdw_private list that will be used by postgresGetForeignPlan. * Items in the list must match order in enum FdwPathPrivateIndex. */ fdw_private = list_make2(makeInteger(has_final_sort), makeInteger(extra->limit_needed)); /* * Create foreign final path; this gets rid of a no-longer-needed outer * plan (if any), which makes the EXPLAIN output look cleaner */ final_path = create_foreign_upper_path(root, input_rel, extra->targetList, rows, startup_cost, total_cost, pathkeys, NULL, /* no extra plan */ fdw_private); /* and add it to the final_rel */ add_path(root, final_rel, (Path *)final_path); }