/*------------------------------------------------------------------------- * * file_fdw.c * foreign-data wrapper for server-side flat files. * * Copyright (c) 2010-2012, PostgreSQL Global Development Group * * IDENTIFICATION * contrib/file_fdw/file_fdw.c * *------------------------------------------------------------------------- */ #include "postgres.h" #include "knl/knl_variable.h" #include "access/reloptions.h" #include "catalog/pg_foreign_table.h" #include "commands/copy.h" #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" #include "commands/tablecmds.h" #include "foreign/fdwapi.h" #include "foreign/foreign.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "nodes/nodes.h" #include "optimizer/cost.h" #include "optimizer/pathnode.h" #include "optimizer/planmain.h" #include "optimizer/restrictinfo.h" #include "utils/memutils.h" #include "utils/rel.h" #include "utils/rel_gs.h" #include "catalog/pg_user_mapping.h" /* * Describes the valid options for objects that use this wrapper. */ struct FileFdwOption { const char* optname; Oid optcontext; /* Oid of catalog in which option may appear */ }; /* * Valid options for file_fdw. * These options are based on the options for COPY FROM command. * But note that force_not_null is handled as a boolean option attached to * each column, not as a table option. * * Note: If you are adding new option for user mapping, you need to modify * fileGetOptions(), which currently doesn't bother to look at user mappings. */ static const struct FileFdwOption valid_options[] = { /* File options */ {"filename", ForeignTableRelationId}, /* Format options */ /* oids option is not supported */ {"format", ForeignTableRelationId}, {"header", ForeignTableRelationId}, {"delimiter", ForeignTableRelationId}, {"quote", ForeignTableRelationId}, {"escape", ForeignTableRelationId}, {"null", ForeignTableRelationId}, {"encoding", ForeignTableRelationId}, {"force_not_null", AttributeRelationId}, /* * force_quote is not supported by file_fdw because it's for COPY TO. */ /* Sentinel */ {NULL, InvalidOid}}; /* * FDW-specific information for RelOptInfo.fdw_private. */ typedef struct FileFdwPlanState { char* filename; /* file to read */ List* options; /* merged COPY options, excluding filename */ BlockNumber pages; /* estimate of file's physical size */ double ntuples; /* estimate of number of rows in file */ } FileFdwPlanState; /* * FDW-specific information for ForeignScanState.fdw_state. */ typedef struct FileFdwExecutionState { char* filename; /* file to read */ List* options; /* merged COPY options, excluding filename */ CopyState cstate; /* state of reading file */ } FileFdwExecutionState; /* * SQL functions */ extern "C" Datum file_fdw_handler(PG_FUNCTION_ARGS); extern "C" Datum file_fdw_validator(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(file_fdw_handler); PG_FUNCTION_INFO_V1(file_fdw_validator); /* * FDW callback routines */ static void fileGetForeignRelSize(PlannerInfo* root, RelOptInfo* baserel, Oid foreigntableid); static void fileGetForeignPaths(PlannerInfo* root, RelOptInfo* baserel, Oid foreigntableid); static ForeignScan *fileGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses, Plan *outer_plan); static void fileExplainForeignScan(ForeignScanState* node, ExplainState* es); static void fileBeginForeignScan(ForeignScanState* node, int eflags); static TupleTableSlot* fileIterateForeignScan(ForeignScanState* node); static void fileReScanForeignScan(ForeignScanState* node); static void fileEndForeignScan(ForeignScanState* node); static bool fileAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc* func, BlockNumber* totalpages, void* additionalData = 0, bool estimate_table_rownum = false); /* * Helper functions */ static bool is_valid_option(const char* option, Oid context); static void fileGetOptions(Oid foreigntableid, char** filename, List** other_options); static List* get_file_fdw_attribute_options(Oid relid); static void estimate_size(PlannerInfo* root, RelOptInfo* baserel, FileFdwPlanState* fdw_private); static void estimate_costs( PlannerInfo* root, RelOptInfo* baserel, FileFdwPlanState* fdw_private, Cost* startup_cost, Cost* total_cost); static int file_acquire_sample_rows(Relation onerel, int elevel, HeapTuple* rows, int targrows, double* totalrows, double* totaldeadrows, void* additionalData = NULL, bool estimate_table_rownum = false); static void fileValidateTableDef(Node* Obj); /* * Foreign-data wrapper handler function: return a struct with pointers * to my callback routines. */ Datum file_fdw_handler(PG_FUNCTION_ARGS) { FdwRoutine* fdwroutine = makeNode(FdwRoutine); fdwroutine->GetForeignRelSize = fileGetForeignRelSize; fdwroutine->GetForeignPaths = fileGetForeignPaths; fdwroutine->GetForeignPlan = fileGetForeignPlan; fdwroutine->ExplainForeignScan = fileExplainForeignScan; fdwroutine->BeginForeignScan = fileBeginForeignScan; fdwroutine->IterateForeignScan = fileIterateForeignScan; fdwroutine->ReScanForeignScan = fileReScanForeignScan; fdwroutine->EndForeignScan = fileEndForeignScan; fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable; fdwroutine->ValidateTableDef = fileValidateTableDef; /* @hdfs * PartitionTblProcess and BuildRuntimePredicate are only used for hdfs_fdw now, so set null here. */ fdwroutine->PartitionTblProcess = NULL; fdwroutine->BuildRuntimePredicate = NULL; PG_RETURN_POINTER(fdwroutine); } void check_file_fdw_permission() { if ((!initialuser()) && !(isOperatoradmin(GetUserId()) && u_sess->attr.attr_security.operation_mode)) { ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("Dist fdw are only available for the supper user and Operatoradmin"))); } } /* * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER, * USER MAPPING or FOREIGN TABLE that uses file_fdw. * * Raise an ERROR if the option or its value is considered invalid. */ Datum file_fdw_validator(PG_FUNCTION_ARGS) { List* options_list = untransformRelOptions(PG_GETARG_DATUM(0)); Oid catalog = PG_GETARG_OID(1); char* filename = NULL; DefElem* force_not_null = NULL; List* other_options = NIL; ListCell* cell = NULL; check_file_fdw_permission(); if (catalog == UserMappingRelationId) { ereport( ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("file_fdw doesn't support in USER MAPPING."))); } /* * Only superusers are allowed to set options of a file_fdw foreign table. * This is because the filename is one of those options, and we don't want * non-superusers to be able to determine which file gets read. * * Putting this sort of permissions check in a validator is a bit of a * crock, but there doesn't seem to be any other place that can enforce * the check more cleanly. * * Note that the valid_options[] array disallows setting filename at any * options level other than foreign table --- otherwise there'd still be a * security hole. */ if (catalog == ForeignTableRelationId && !superuser()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("only superuser can change options of a file_fdw foreign table"))); /* * Check that only options supported by file_fdw, and allowed for the * current object type, are given. */ foreach (cell, options_list) { DefElem* def = (DefElem*)lfirst(cell); if (!is_valid_option(def->defname, catalog)) { const struct FileFdwOption* opt = NULL; StringInfoData buf; /* * Unknown option specified, complain about it. Provide a hint * with list of valid options for the object. */ initStringInfo(&buf); for (opt = valid_options; opt->optname; opt++) { if (catalog == opt->optcontext) appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "", opt->optname); } ereport(ERROR, (errcode(ERRCODE_FDW_INVALID_OPTION_NAME), errmsg("invalid option \"%s\"", def->defname), buf.len > 0 ? errhint("Valid options in this context are: %s", buf.data) : errhint("There are no valid options in this context."))); } /* * Separate out filename and force_not_null, since ProcessCopyOptions * won't accept them. (force_not_null only comes in a boolean * per-column flavor here.) */ if (strcmp(def->defname, "filename") == 0) { if (filename) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); filename = defGetString(def); } else if (strcmp(def->defname, "force_not_null") == 0) { if (force_not_null) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); force_not_null = def; /* Don't care what the value is, as long as it's a legal boolean */ (void)defGetBoolean(def); } else if (strcmp(def->defname, "format") == 0) { char* fmt = defGetString(def); if (strcasecmp(fmt, "fixed") == 0) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("file_fdw doesn't support fixed option in format"))); } other_options = lappend(other_options, def); } else { other_options = lappend(other_options, def); } } /* * Now apply the core COPY code's validation logic for more checks. */ ProcessCopyOptions(NULL, true, other_options); /* * Filename option is required for file_fdw foreign tables. */ if (catalog == ForeignTableRelationId && filename == NULL) ereport(ERROR, (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED), errmsg("filename is required for file_fdw foreign tables"))); PG_RETURN_VOID(); } /* * Check if the provided option is one of the valid options. * context is the Oid of the catalog holding the object the option is for. */ static bool is_valid_option(const char* option, Oid context) { const struct FileFdwOption* opt; for (opt = valid_options; opt->optname; opt++) { if (context == opt->optcontext && strcmp(opt->optname, option) == 0) return true; } return false; } /* * Fetch the options for a file_fdw foreign table. * * We have to separate out "filename" from the other options because * it must not appear in the options list passed to the core COPY code. */ static void fileGetOptions(Oid foreigntableid, char** filename, List** other_options) { ForeignTable* table = NULL; ForeignServer* server = NULL; ForeignDataWrapper* wrapper = NULL; List* options = NIL; ListCell* lc = NULL; ListCell* prev = NULL; /* * Extract options from FDW objects. We ignore user mappings because * file_fdw doesn't have any options that can be specified there. * * (XXX Actually, given the current contents of valid_options[], there's * no point in examining anything except the foreign table's own options. * Simplify?) */ table = GetForeignTable(foreigntableid); Assert(NULL != table); server = GetForeignServer(table->serverid); Assert(NULL != server); wrapper = GetForeignDataWrapper(server->fdwid); Assert(NULL != wrapper); options = NIL; options = list_concat(options, wrapper->options); options = list_concat(options, server->options); options = list_concat(options, table->options); options = list_concat(options, get_file_fdw_attribute_options(foreigntableid)); /* * Separate out the filename. */ *filename = NULL; prev = NULL; foreach (lc, options) { DefElem* def = (DefElem*)lfirst(lc); if (strcmp(def->defname, "filename") == 0) { *filename = defGetString(def); options = list_delete_cell(options, lc, prev); break; } prev = lc; } /* * The validator should have checked that a filename was included in the * options, but check again, just in case. */ if (*filename == NULL) elog(ERROR, "filename is required for file_fdw foreign tables"); *other_options = options; } /* * Retrieve per-column generic options from pg_attribute and construct a list * of DefElems representing them. * * At the moment we only have "force_not_null", which should be combined into * a single DefElem listing all such columns, since that's what COPY expects. */ static List* get_file_fdw_attribute_options(Oid relid) { Relation rel; TupleDesc tupleDesc; AttrNumber natts; AttrNumber attnum; List* fnncolumns = NIL; rel = heap_open(relid, AccessShareLock); tupleDesc = RelationGetDescr(rel); natts = tupleDesc->natts; /* Retrieve FDW options for all user-defined attributes. */ for (attnum = 1; attnum <= natts; attnum++) { Form_pg_attribute attr = &tupleDesc->attrs[attnum - 1]; List* options = NIL; ListCell* lc = NULL; /* Skip dropped attributes. */ if (attr->attisdropped) continue; options = GetForeignColumnOptions(relid, attnum); foreach (lc, options) { DefElem* def = (DefElem*)lfirst(lc); if (strcmp(def->defname, "force_not_null") == 0) { if (defGetBoolean(def)) { char* attname = pstrdup(NameStr(attr->attname)); fnncolumns = lappend(fnncolumns, makeString(attname)); } } /* maybe in future handle other options here */ } } heap_close(rel, AccessShareLock); /* Return DefElem only when some column(s) have force_not_null */ if (fnncolumns != NIL) return list_make1(makeDefElem("force_not_null", (Node*)fnncolumns)); else return NIL; } /* * fileGetForeignRelSize * Obtain relation size estimates for a foreign table */ static void fileGetForeignRelSize(PlannerInfo* root, RelOptInfo* baserel, Oid foreigntableid) { FileFdwPlanState* fdw_private = NULL; /* * Fetch options. We only need filename at this point, but we might as * well get everything and not need to re-fetch it later in planning. */ fdw_private = (FileFdwPlanState*)palloc(sizeof(FileFdwPlanState)); fileGetOptions(foreigntableid, &fdw_private->filename, &fdw_private->options); baserel->fdw_private = (void*)fdw_private; /* Estimate relation size */ estimate_size(root, baserel, fdw_private); } /* * fileGetForeignPaths * Create possible access paths for a scan on the foreign table * * Currently we don't support any push-down feature, so there is only one * possible access path, which simply returns all records in the order in * the data file. */ static void fileGetForeignPaths(PlannerInfo* root, RelOptInfo* baserel, Oid foreigntableid) { FileFdwPlanState* fdw_private = (FileFdwPlanState*)baserel->fdw_private; Cost startup_cost; Cost total_cost; /* Estimate costs */ estimate_costs(root, baserel, fdw_private, &startup_cost, &total_cost); /* Create a ForeignPath node and add it as only possible path */ add_path(root, baserel, (Path*)create_foreignscan_path(root, baserel, startup_cost, total_cost, NIL, /* no pathkeys */ NULL, /* no outer rel either */ NULL, /* no outer path either */ NIL)); /* no fdw_private data */ /* * If data file was sorted, and we knew it somehow, we could insert * appropriate pathkeys into the ForeignPath node to tell the planner * that. */ } /* * fileGetForeignPlan * Create a ForeignScan plan node for scanning the foreign table */ static ForeignScan *fileGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses, Plan *outer_plan) { Index scan_relid = baserel->relid; /* * We have no native ability to evaluate restriction clauses, so we just * put all the scan_clauses into the plan node's qual list for the * executor to check. So all we have to do here is strip RestrictInfo * nodes from the clauses and ignore pseudoconstants (which will be * handled elsewhere). */ scan_clauses = extract_actual_clauses(scan_clauses, false); /* Create the ForeignScan node */ return make_foreignscan(tlist, scan_clauses, scan_relid, NIL, /* no expressions to evaluate */ NIL, /* no private state either */ NIL, NIL, NULL, EXEC_ON_DATANODES); } /* * fileExplainForeignScan * Produce extra output for EXPLAIN */ static void fileExplainForeignScan(ForeignScanState* node, ExplainState* es) { char* filename = NULL; List* options = NIL; /* Fetch options --- we only need filename at this point */ fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation), &filename, &options); ExplainPropertyText("Foreign File", filename, es); /* Suppress file size if we're not showing cost details */ if (es->costs) { struct stat stat_buf; if (stat(filename, &stat_buf) == 0) ExplainPropertyLong("Foreign File Size", (long)stat_buf.st_size, es); } } /* * fileBeginForeignScan * Initiate access to the file by creating CopyState */ static void fileBeginForeignScan(ForeignScanState* node, int eflags) { char* filename = NULL; List* options = NIL; CopyState cstate; FileFdwExecutionState* festate = NULL; /* * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. */ if (eflags & EXEC_FLAG_EXPLAIN_ONLY) return; /* Fetch options of foreign table */ fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation), &filename, &options); /* * Create CopyState from FDW options. We always acquire all columns, so * as to match the expected ScanTupleSlot signature. */ cstate = BeginCopyFrom(node->ss.ss_currentRelation, filename, NIL, options, NULL, NULL); /* * Save state in node->fdw_state. We must save enough information to call * BeginCopyFrom() again. */ festate = (FileFdwExecutionState*)palloc(sizeof(FileFdwExecutionState)); festate->filename = filename; festate->options = options; festate->cstate = cstate; node->fdw_state = (void*)festate; } /* * fileIterateForeignScan * Read next record from the data file and store it into the * ScanTupleSlot as a virtual tuple */ static TupleTableSlot* fileIterateForeignScan(ForeignScanState* node) { FileFdwExecutionState* festate = (FileFdwExecutionState*)node->fdw_state; TupleTableSlot* slot = node->ss.ss_ScanTupleSlot; bool found = false; ErrorContextCallback errcontext; /* Set up callback to identify error line number. */ errcontext.callback = CopyFromErrorCallback; errcontext.arg = (void*)festate->cstate; errcontext.previous = t_thrd.log_cxt.error_context_stack; t_thrd.log_cxt.error_context_stack = &errcontext; /* * The protocol for loading a virtual tuple into a slot is first * ExecClearTuple, then fill the values/isnull arrays, then * ExecStoreVirtualTuple. If we don't find another row in the file, we * just skip the last step, leaving the slot empty as required. * * We can pass ExprContext = NULL because we read all columns from the * file, so no need to evaluate default expressions. * * We can also pass tupleOid = NULL because we don't allow oids for * foreign tables. */ ExecClearTuple(slot); found = NextCopyFrom(festate->cstate, NULL, slot->tts_values, slot->tts_isnull, NULL); if (found) ExecStoreVirtualTuple(slot); /* Remove error callback. */ t_thrd.log_cxt.error_context_stack = errcontext.previous; return slot; } /* * fileReScanForeignScan * Rescan table, possibly with new parameters */ static void fileReScanForeignScan(ForeignScanState* node) { FileFdwExecutionState* festate = (FileFdwExecutionState*)node->fdw_state; EndCopyFrom(festate->cstate); festate->cstate = BeginCopyFrom(node->ss.ss_currentRelation, festate->filename, NIL, festate->options, NULL, NULL); } /* * fileEndForeignScan * Finish scanning foreign table and dispose objects used for this scan */ static void fileEndForeignScan(ForeignScanState* node) { FileFdwExecutionState* festate = (FileFdwExecutionState*)node->fdw_state; /* if festate is NULL, we are in EXPLAIN; nothing to do */ if (festate) EndCopyFrom(festate->cstate); } /* * fileAnalyzeForeignTable * Test whether analyzing this foreign table is supported * @hdfs * In order to match AnalyzeForeignTable function input parameter change * we add the parameter (void* additionalData). This parameter may not be * used by fileAnalyzeForeignTable function. */ static bool fileAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc* func, BlockNumber* totalpages, void* additionalData, bool estimate_table_rownum) { char* filename = NULL; List* options = NIL; struct stat stat_buf; /* Fetch options of foreign table */ fileGetOptions(RelationGetRelid(relation), &filename, &options); /* * Get size of the file. (XXX if we fail here, would it be better to just * return false to skip analyzing the table?) */ if (stat(filename, &stat_buf) < 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", filename))); /* * Convert size to pages. Must return at least 1 so that we can tell * later on that pg_class.relpages is not default. */ *totalpages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ; if (*totalpages < 1) *totalpages = 1; *func = file_acquire_sample_rows; return true; } /* * Estimate size of a foreign table. * * The main result is returned in baserel->rows. We also set * fdw_private->pages and fdw_private->ntuples for later use in the cost * calculation. */ static void estimate_size(PlannerInfo* root, RelOptInfo* baserel, FileFdwPlanState* fdw_private) { struct stat stat_buf; BlockNumber pages; double ntuples; double nrows; /* * Get size of the file. It might not be there at plan time, though, in * which case we have to use a default estimate. */ if (stat(fdw_private->filename, &stat_buf) < 0) stat_buf.st_size = 10 * BLCKSZ; /* * Convert size to pages for use in I/O cost estimate later. */ pages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ; if (pages < 1) pages = 1; fdw_private->pages = pages; /* * Estimate the number of tuples in the file. */ if (baserel->pages > 0) { /* * We have # of pages and # of tuples from pg_class (that is, from a * previous ANALYZE), so compute a tuples-per-page estimate and scale * that by the current file size. */ double density; density = baserel->tuples / (double)baserel->pages; ntuples = clamp_row_est(density * (double)pages); } else { /* * Otherwise we have to fake it. We back into this estimate using the * planner's idea of the relation width; which is bogus if not all * columns are being read, not to mention that the text representation * of a row probably isn't the same size as its internal * representation. Possibly we could do something better, but the * real answer to anyone who complains is "ANALYZE" ... */ int tuple_width; tuple_width = MAXALIGN(baserel->reltarget->width) + MAXALIGN(sizeof(HeapTupleHeaderData)); ntuples = clamp_row_est((double)stat_buf.st_size / (double)tuple_width); baserel->tuples = ntuples; } fdw_private->ntuples = ntuples; /* * Now estimate the number of rows returned by the scan after applying the * baserestrictinfo quals. */ nrows = ntuples * clauselist_selectivity(root, baserel->baserestrictinfo, 0, JOIN_INNER, NULL); nrows = clamp_row_est(nrows); /* Save the output-rows estimate for the planner */ baserel->rows = nrows; } /* * Estimate costs of scanning a foreign table. * * Results are returned in *startup_cost and *total_cost. */ static void estimate_costs( PlannerInfo* root, RelOptInfo* baserel, FileFdwPlanState* fdw_private, Cost* startup_cost, Cost* total_cost) { BlockNumber pages = fdw_private->pages; double ntuples = fdw_private->ntuples; Cost run_cost = 0; Cost cpu_per_tuple; /* * We estimate costs almost the same way as cost_seqscan(), thus assuming * that I/O costs are equivalent to a regular table file of the same size. * However, we take per-tuple CPU costs as 10x of a seqscan, to account * for the cost of parsing records. */ run_cost += u_sess->attr.attr_sql.seq_page_cost * pages; *startup_cost = baserel->baserestrictcost.startup; cpu_per_tuple = u_sess->attr.attr_sql.cpu_tuple_cost * 10 + baserel->baserestrictcost.per_tuple; run_cost += cpu_per_tuple * ntuples; *total_cost = *startup_cost + run_cost; } /* * file_acquire_sample_rows -- acquire a random sample of rows from the table * * Selected rows are returned in the caller-allocated array rows[], * which must have at least targrows entries. * The actual number of rows selected is returned as the function result. * We also count the total number of rows in the file and return it into * *totalrows. Note that *totaldeadrows is always set to 0. * * Note that the returned list of rows is not always in order by physical * position in the file. Therefore, correlation estimates derived later * may be meaningless, but it's OK because we don't use the estimates * currently (the planner only pays attention to correlation for indexscans). */ static int file_acquire_sample_rows(Relation onerel, int elevel, HeapTuple* rows, int targrows, double* totalrows, double* totaldeadrows, void* additionalData, bool estimate_table_rownum) { int numrows = 0; double rowstoskip = -1; /* -1 means not set yet */ double rstate; TupleDesc tupDesc; Datum* values = NULL; bool* nulls = NULL; bool found = false; char* filename = NULL; List* options = NIL; CopyState cstate; ErrorContextCallback errcontext; MemoryContext oldcontext = CurrentMemoryContext; MemoryContext tupcontext; Assert(onerel); Assert(targrows > 0); tupDesc = RelationGetDescr(onerel); values = (Datum*)palloc(tupDesc->natts * sizeof(Datum)); nulls = (bool*)palloc(tupDesc->natts * sizeof(bool)); /* Fetch options of foreign table */ fileGetOptions(RelationGetRelid(onerel), &filename, &options); /* * Create CopyState from FDW options. */ cstate = BeginCopyFrom(onerel, filename, NIL, options, NULL, NULL); /* * Use per-tuple memory context to prevent leak of memory used to read * rows from the file with Copy routines. */ tupcontext = AllocSetContextCreate(CurrentMemoryContext, "file_fdw temporary context", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); /* Prepare for sampling rows */ rstate = anl_init_selection_state(targrows); /* Set up callback to identify error line number. */ errcontext.callback = CopyFromErrorCallback; errcontext.arg = (void*)cstate; errcontext.previous = t_thrd.log_cxt.error_context_stack; t_thrd.log_cxt.error_context_stack = &errcontext; *totalrows = 0; *totaldeadrows = 0; for (;;) { /* Check for user-requested abort or sleep */ vacuum_delay_point(); /* Fetch next row */ MemoryContextReset(tupcontext); MemoryContextSwitchTo(tupcontext); found = NextCopyFrom(cstate, NULL, values, nulls, NULL); MemoryContextSwitchTo(oldcontext); if (!found) break; /* * The first targrows sample rows are simply copied into the * reservoir. Then we start replacing tuples in the sample until we * reach the end of the relation. This algorithm is from Jeff Vitter's * paper (see more info in commands/analyze.c). */ if (numrows < targrows) { rows[numrows++] = heap_form_tuple(tupDesc, values, nulls); } else { /* * t in Vitter's paper is the number of records already processed. * If we need to compute a new S value, we must use the * not-yet-incremented value of totalrows as t. */ if (rowstoskip < 0) rowstoskip = anl_get_next_S(*totalrows, targrows, &rstate); if (rowstoskip <= 0) { /* * Found a suitable tuple, so save it, replacing one old tuple * at random */ int k = (int)(targrows * anl_random_fract()); Assert(k >= 0 && k < targrows); heap_freetuple(rows[k]); rows[k] = heap_form_tuple(tupDesc, values, nulls); } rowstoskip -= 1; } *totalrows += 1; } /* Remove error callback. */ t_thrd.log_cxt.error_context_stack = errcontext.previous; /* Clean up. */ MemoryContextDelete(tupcontext); EndCopyFrom(cstate); pfree(values); pfree(nulls); /* * Emit some interesting relation info */ ereport(elevel, (errmsg("\"%s\": file contains %.0f rows; " "%d rows in sample", RelationGetRelationName(onerel), *totalrows, numrows))); return numrows; } /*@hdfs *brief: Validate table definition *input param @obj: A Obj including infomation to validate when alter tabel and create table. */ static void fileValidateTableDef(Node* Obj) { if (Obj == NULL) return; switch (nodeTag(Obj)) { case T_AlterTableStmt: { ListCell* lc = NULL; AlterTableStmt* stmt = (AlterTableStmt*)Obj; foreach (lc, stmt->cmds) { AlterTableCmd* cmd = (AlterTableCmd*)lfirst(lc); if (!FOREIGNTABLE_SUPPORT_AT_CMD(cmd->subtype)) { ereport(ERROR, (errmsg("Un-support feature"), errdetail("target table is a foreign table"))); } } break; } case T_CreateForeignTableStmt: break; default: elog(ERROR, "unrecognized node type: %d", (int)nodeTag(Obj)); break; } }