pgfdw support join\agg\sort\limit\lockrows

This commit is contained in:
gentle_hu
2022-11-15 15:31:38 +08:00
parent 0d771b4357
commit f995bd2209
58 changed files with 34971 additions and 1787 deletions

View File

@ -71,7 +71,8 @@ static TupleTableSlot* ForeignNext(ForeignScanState* node)
if (plan->fsSystemCol && !TupIsNull(slot)) {
HeapTuple tup = ExecMaterializeSlot(slot);
tup->t_tableOid = RelationGetRelid(node->ss.ss_currentRelation);
if (!OidIsValid(tup->t_tableOid))
tup->t_tableOid = RelationGetRelid(node->ss.ss_currentRelation);
tup->t_bucketId = RelationGetBktid(node->ss.ss_currentRelation);
#ifdef PGXC
tup->t_xc_node_id = u_sess->pgxc_cxt.PGXCNodeIdentifier;
@ -86,7 +87,6 @@ static TupleTableSlot* ForeignNext(ForeignScanState* node)
*/
static bool ForeignRecheck(ForeignScanState* node, TupleTableSlot* slot)
{
/* There are no access-method-specific conditions to recheck. */
return true;
}
@ -111,8 +111,10 @@ TupleTableSlot* ExecForeignScan(ForeignScanState* node)
ForeignScanState* ExecInitForeignScan(ForeignScan* node, EState* estate, int eflags)
{
ForeignScanState* scanstate = NULL;
Relation currentRelation;
Relation currentRelation = NULL;
FdwRoutine* fdwroutine = NULL;
Index scanrelid = node->scan.scanrelid;
int tlistvarno;
errno_t rc;
/* check for unsupported flags */
@ -159,25 +161,65 @@ ForeignScanState* ExecInitForeignScan(ForeignScan* node, EState* estate, int efl
/*
* open the base relation and acquire appropriate lock on it.
* or it is a foreign join, we should recreate resultdesc and project info
*/
if (node->rel == NULL) {
currentRelation = ExecOpenScanRelation(estate, node->scan.scanrelid);
} else {
if (node->in_compute_pool == false) {
if (scanrelid > 0) {
if (node->rel == NULL) {
currentRelation = ExecOpenScanRelation(estate, node->scan.scanrelid);
} else {
currentRelation = get_rel_from_meta(node->rel);
scanstate->options = node->options;
if (node->in_compute_pool == false) {
currentRelation = ExecOpenScanRelation(estate, node->scan.scanrelid);
} else {
currentRelation = get_rel_from_meta(node->rel);
scanstate->options = node->options;
}
}
scanstate->ss.ss_currentRelation = currentRelation;
/*
* Acquire function pointers from the FDW's handler, and init fdw_state.
*/
if (!node->rel) {
fdwroutine = GetFdwRoutineForRelation(currentRelation, true);
} else {
ForeignDataWrapper* fdw = NULL;
if (node->options->stype == T_OBS_SERVER || node->options->stype == T_HDFS_SERVER) {
fdw = GetForeignDataWrapperByName(HDFS_FDW, false);
} else {
fdw = GetForeignDataWrapperByName(DIST_FDW, false);
}
fdwroutine = GetFdwRoutine(fdw->fdwhandler);
/* Save the data for later reuse in LocalMyDBCacheMemCxt */
FdwRoutine* cfdwroutine = (FdwRoutine*)MemoryContextAlloc(LocalMyDBCacheMemCxt(), sizeof(FdwRoutine));
rc = memcpy_s(cfdwroutine, sizeof(FdwRoutine), fdwroutine, sizeof(FdwRoutine));
securec_check(rc, "\0", "\0");
currentRelation->rd_fdwroutine = cfdwroutine;
}
} else {
/* We can't use the relcache, so get fdwroutine the hard way */
fdwroutine = GetFdwRoutineByServerId(node->fs_server);
}
scanstate->ss.ss_currentRelation = currentRelation;
/*
* get the scan type from the relation descriptor. (XXX at some point we
* might want to let the FDW editorialize on the scan tupdesc.)
* Determine the scan tuple type. If the FDW provided a targetlist
* describing the scan tuples, use that; else use base relation's rowtype.
*/
ExecAssignScanType(&scanstate->ss, RelationGetDescr(currentRelation));
if (node->fdw_scan_tlist != NIL || currentRelation == NULL) {
TupleDesc scan_tupdesc;
scan_tupdesc = ExecTypeFromTL(node->fdw_scan_tlist, false);
ExecAssignScanType(&scanstate->ss, scan_tupdesc);
tlistvarno = INDEX_VAR;
} else {
TupleDesc scan_tupdesc;
/* don't trust FDWs to return tuples fulfilling NOT NULL constraints */
scan_tupdesc = CreateTupleDescCopy(RelationGetDescr(currentRelation));
ExecAssignScanType(&scanstate->ss, scan_tupdesc);
/* Node's targetlist will contain Vars with varno = scanrelid */
tlistvarno = scanrelid;
}
/*
* Initialize result tuple type and projection info.
@ -185,34 +227,16 @@ ForeignScanState* ExecInitForeignScan(ForeignScan* node, EState* estate, int efl
ExecAssignResultTypeFromTL(
&scanstate->ss.ps,
scanstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor->tdTableAmType);
ExecAssignScanProjectionInfo(&scanstate->ss);
Assert(scanstate->ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor->tdTableAmType != TAM_INVALID);
/*
* Acquire function pointers from the FDW's handler, and init fdw_state.
*/
if (!node->rel) {
fdwroutine = GetFdwRoutineForRelation(currentRelation, true);
} else {
ForeignDataWrapper* fdw = NULL;
if (node->options->stype == T_OBS_SERVER || node->options->stype == T_HDFS_SERVER) {
fdw = GetForeignDataWrapperByName(HDFS_FDW, false);
} else {
fdw = GetForeignDataWrapperByName(DIST_FDW, false);
}
fdwroutine = GetFdwRoutine(fdw->fdwhandler);
/* Save the data for later reuse in LocalMyDBCacheMemCxt */
FdwRoutine* cfdwroutine = (FdwRoutine*)MemoryContextAlloc(LocalMyDBCacheMemCxt(), sizeof(FdwRoutine));
rc = memcpy_s(cfdwroutine, sizeof(FdwRoutine), fdwroutine, sizeof(FdwRoutine));
securec_check(rc, "\0", "\0");
currentRelation->rd_fdwroutine = cfdwroutine;
}
ExecAssignScanProjectionInfoWithVarno(&scanstate->ss, tlistvarno);
scanstate->fdwroutine = fdwroutine;
scanstate->fdw_state = NULL;
/* Initialize any outer plan. */
if (outerPlan(node)) {
outerPlanState(scanstate) = ExecInitNode(outerPlan(node), estate, eflags);
}
#ifdef ENABLE_MOT
if ((estate->mot_jit_context == NULL) || IS_PGXC_COORDINATOR || !JitExec::IsMotCodegenEnabled()) {
#endif
@ -250,6 +274,10 @@ void ExecEndForeignScan(ForeignScanState* node)
}
#endif
/* Shut down any outer plan. */
if (outerPlanState(node))
ExecEndNode(outerPlanState(node));
/* Free the exprcontext */
ExecFreeExprContext(&node->ss.ps);
@ -259,11 +287,13 @@ void ExecEndForeignScan(ForeignScanState* node)
/* close the relation. */
ForeignScan* scan = (ForeignScan*)node->ss.ps.plan;
if (NULL == scan->rel) {
ExecCloseScanRelation(node->ss.ss_currentRelation);
} else {
if (false == scan->in_compute_pool) {
if (node->ss.ss_currentRelation != NULL) {
if (NULL == scan->rel) {
ExecCloseScanRelation(node->ss.ss_currentRelation);
} else {
if (false == scan->in_compute_pool) {
ExecCloseScanRelation(node->ss.ss_currentRelation);
}
}
}