diff --git a/src/gausskernel/runtime/executor/execParallel.cpp b/src/gausskernel/runtime/executor/execParallel.cpp
index 1e3a59c76..9b161aadc 100644
--- a/src/gausskernel/runtime/executor/execParallel.cpp
+++ b/src/gausskernel/runtime/executor/execParallel.cpp
@@ -269,7 +269,7 @@ void ExecParallelReinitialize(ParallelExecutorInfo *pei)
  * Sets up the required infrastructure for backend workers to perform
  * execution and return results to the main backend.
  */
-ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
+ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, int64 tuples_needed)
 {
     ExecParallelEstimateContext e;
     ExecParallelInitializeDSMContext d;
@@ -320,6 +320,9 @@ ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, EState *estate,
     int rc = memset_s(&queryInfo, sizeof(ParallelQueryInfo), 0, sizeof(ParallelQueryInfo));
     securec_check(rc, "", "");
 
+    queryInfo.tuples_needed = tuples_needed;
+    queryInfo.eflags = estate->es_top_eflags;
+
     /* Store serialized PlannedStmt. */
     queryInfo.pstmt_space = ExecSerializePlan(planstate->plan, estate);
 
@@ -627,9 +630,14 @@ void ParallelQueryMain(void *seg)
     InstrStartParallelQuery();
 
     /* Start up the executor, have it run the plan, and then shut it down. */
-    (void)ExecutorStart(queryDesc, 0);
+    (void)ExecutorStart(queryDesc, cxt->pwCtx->queryInfo.eflags);
     ExecParallelInitializeWorker(queryDesc->planstate, seg);
-    ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+
+    /* Pass down any tuple bound */
+    int64 tuples_needed = cxt->pwCtx->queryInfo.tuples_needed;
+    ExecSetTupleBound(tuples_needed, queryDesc->planstate);
+
+    ExecutorRun(queryDesc, ForwardScanDirection, tuples_needed < 0 ? 0 : tuples_needed);
     ExecutorFinish(queryDesc);
 
     /* Report buffer usage during parallel execution. */
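Note on the ParallelQueryMain hunk above: ExecSetTupleBound treats a negative bound as "no limit", while ExecutorRun's count argument treats 0 as "run to completion", so the worker has to bridge the two conventions. A minimal standalone sketch of that bridge (RunCountForBound is a hypothetical name, not part of the patch):

    #include <cstdint>

    /* A negative tuple bound ("no limit") must become a run count of 0
     * ("fetch everything") before it reaches ExecutorRun. */
    static uint64_t RunCountForBound(int64_t tuples_needed)
    {
        return tuples_needed < 0 ? 0 : static_cast<uint64_t>(tuples_needed);
    }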
diff --git a/src/gausskernel/runtime/executor/execProcnode.cpp b/src/gausskernel/runtime/executor/execProcnode.cpp
index 77f86b765..80f52eae4 100755
--- a/src/gausskernel/runtime/executor/execProcnode.cpp
+++ b/src/gausskernel/runtime/executor/execProcnode.cpp
@@ -1375,3 +1375,96 @@ bool ExecShutdownNode(PlanState *node)
 
     return planstate_tree_walker(node, (bool (*)())ExecShutdownNode, NULL);
 }
+
+/*
+ * ExecSetTupleBound
+ *
+ * Set a tuple bound for a planstate node.  This lets child plan nodes
+ * optimize based on the knowledge that the maximum number of tuples that
+ * their parent will demand is limited.  The tuple bound for a node may
+ * only be changed between scans (i.e., after node initialization or just
+ * before an ExecReScan call).
+ *
+ * Any negative tuples_needed value means "no limit", which should be the
+ * default assumption when this is not called at all for a particular node.
+ *
+ * Note: if this is called repeatedly on a plan tree, the exact same set
+ * of nodes must be updated with the new limit each time; be careful that
+ * only unchanging conditions are tested here.
+ */
+void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node)
+{
+    /*
+     * Since this function recurses, in principle we should check stack depth
+     * here.  In practice, it's probably pointless since the earlier node
+     * initialization tree traversal would surely have consumed more stack.
+     */
+    if (IsA(child_node, SortState) || IsA(child_node, VecSortState)) {
+        /*
+         * If it is a Sort node, notify it that it can use bounded sort.
+         *
+         * Note: it is the responsibility of nodeSort.c to react properly to
+         * changes of these parameters.  If we ever redesign this, it'd be a
+         * good idea to integrate this signaling with the parameter-change
+         * mechanism.
+         */
+        SortState *sortState = (SortState *)child_node;
+
+        if (tuples_needed < 0) {
+            /* make sure flag gets reset if needed upon rescan */
+            sortState->bounded = false;
+        } else {
+            sortState->bounded = true;
+            sortState->bound = tuples_needed;
+        }
+    } else if (IsA(child_node, MergeAppendState)) {
+        /*
+         * If it is a MergeAppend, we can apply the bound to any nodes that
+         * are children of the MergeAppend, since the MergeAppend surely need
+         * read no more than that many tuples from any one input.
+         */
+        MergeAppendState *maState = (MergeAppendState *)child_node;
+
+        for (int i = 0; i < maState->ms_nplans; i++) {
+            ExecSetTupleBound(tuples_needed, maState->mergeplans[i]);
+        }
+    } else if (IsA(child_node, ResultState) || IsA(child_node, VecResultState)) {
+        /*
+         * An extra consideration here is that if the Result is projecting a
+         * targetlist that contains any SRFs, we can't assume that every input
+         * tuple generates an output tuple, so a Sort underneath might need to
+         * return more than N tuples to satisfy LIMIT N.  So we cannot use
+         * bounded sort.
+         *
+         * If Result supported qual checking, we'd have to punt on seeing a
+         * qual, too.  Note that having a resconstantqual is not a
+         * showstopper: if that fails we're not getting any rows at all.
+         */
+        if (outerPlanState(child_node) && !expression_returns_set((Node *)child_node->plan->targetlist)) {
+            ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+        }
+    } else if (IsA(child_node, GatherState)) {
+        /*
+         * A Gather node can propagate the bound to its workers.  As with
+         * MergeAppend, no one worker could possibly need to return more
+         * tuples than the Gather itself needs to.
+         *
+         * Note: as with Sort, the Gather node is responsible for reacting
+         * properly to changes to this parameter.
+         */
+        GatherState *gstate = (GatherState *)child_node;
+
+        gstate->tuples_needed = tuples_needed;
+
+        /* Also pass down the bound to our own copy of the child plan */
+        ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+    }
+
+    /*
+     * In principle we could descend through any plan node type that is
+     * certain not to discard or combine input rows; but on seeing a node
+     * that can do that, we can't propagate the bound any further.  For the
+     * moment it's unclear that any other cases are worth checking here.
+     */
+}
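To trace the traversal above: for a parallel "SELECT ... ORDER BY x LIMIT 10", the plan tree is typically Limit -> Gather -> Sort. recompute_limits calls ExecSetTupleBound(10, ...) on the Gather, which records tuples_needed = 10 and recurses into the leader's copy of the Sort; each worker's ParallelQueryMain then repeats the call on its own plan copy, so every Sort instance switches to a bounded top-10 sort. A toy model of the dispatch rule, assuming a hypothetical ToyNode type (the real code walks PlanState nodes):

    #include <cstdint>
    #include <vector>

    struct ToyNode {
        enum Kind { SORT, MERGE_APPEND, RESULT, GATHER, OTHER } kind;
        bool bounded = false;
        int64_t bound = -1;
        std::vector<ToyNode *> children;
    };

    static void SetTupleBound(int64_t tuples_needed, ToyNode *node)
    {
        switch (node->kind) {
            case ToyNode::SORT:
                node->bounded = (tuples_needed >= 0); /* also resets on rescan */
                node->bound = tuples_needed;
                break;
            case ToyNode::MERGE_APPEND:
                /* each input need supply at most N tuples */
                for (ToyNode *child : node->children)
                    SetTupleBound(tuples_needed, child);
                break;
            case ToyNode::RESULT: /* only safe without SRFs in the real code */
            case ToyNode::GATHER: /* also forwarded to workers in the real code */
                if (!node->children.empty())
                    SetTupleBound(tuples_needed, node->children[0]);
                break;
            default:
                break; /* node might discard or combine rows: stop here */
        }
    }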
diff --git a/src/gausskernel/runtime/executor/nodeGather.cpp b/src/gausskernel/runtime/executor/nodeGather.cpp
index 7d5595ec2..728c5d9fe 100644
--- a/src/gausskernel/runtime/executor/nodeGather.cpp
+++ b/src/gausskernel/runtime/executor/nodeGather.cpp
@@ -66,6 +66,7 @@ GatherState *ExecInitGather(Gather *node, EState *estate, int eflags)
     gatherstate->ps.state = estate;
     gatherstate->need_to_scan_locally =
         !node->single_copy && u_sess->attr.attr_sql.parallel_leader_participation;
+    gatherstate->tuples_needed = -1;
 
     /*
      * Miscellaneous initialization
@@ -81,9 +82,10 @@ GatherState *ExecInitGather(Gather *node, EState *estate, int eflags)
     gatherstate->ps.qual = (List *)ExecInitExpr((Expr *)node->plan.qual, (PlanState *)gatherstate);
 
     /*
-     * tuple table initialization
+     * tuple table initialization, doesn't need tuple_mcxt
      */
-    gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate);
+    gatherstate->funnel_slot = MakeTupleTableSlot(false);
+    estate->es_tupleTable = lappend(estate->es_tupleTable, gatherstate->funnel_slot);
     ExecInitResultTupleSlot(estate, &gatherstate->ps);
 
     /*
@@ -98,7 +100,12 @@ GatherState *ExecInitGather(Gather *node, EState *estate, int eflags)
      * Initialize result tuple type and projection info.
      */
     ExecAssignResultTypeFromTL(&gatherstate->ps);
-    ExecAssignProjectionInfo(&gatherstate->ps, NULL);
+    if (tlist_matches_tupdesc(&gatherstate->ps, gatherstate->ps.plan->targetlist,
+        OUTER_VAR, ExecGetResultType(outerPlanState(gatherstate)))) {
+        gatherstate->ps.ps_ProjInfo = NULL;
+    } else {
+        ExecAssignProjectionInfo(&gatherstate->ps, NULL);
+    }
 
     /*
      * Initialize funnel slot to same tuple descriptor as outer plan.
@@ -143,9 +150,9 @@ TupleTableSlot *ExecGather(GatherState *node)
      */
     if (gather->num_workers > 0 && IsInParallelMode()) {
         /* Initialize the workers required to execute Gather node. */
-        if (!node->pei)
-            node->pei = ExecInitParallelPlan(node->ps.lefttree, estate, gather->num_workers);
-
+        if (!node->pei) {
+            node->pei = ExecInitParallelPlan(node->ps.lefttree, estate, gather->num_workers, node->tuples_needed);
+        }
         /*
          * Register backend workers. We might not get as many as we
          * requested, or indeed any at all.
@@ -200,7 +207,6 @@ TupleTableSlot *ExecGather(GatherState *node)
      * returned by a TupleQueueReader; to make sure we don't leave a dangling
      * pointer around, clear the working slot first.
      */
-    (void)ExecClearTuple(node->funnel_slot);
     ExprContext *econtext = node->ps.ps_ExprContext;
     ResetExprContext(econtext);
 
@@ -211,8 +217,14 @@ TupleTableSlot *ExecGather(GatherState *node)
      * plan ourselves.
      */
     slot = gather_getnext(node);
-    if (TupIsNull(slot))
+    if (TupIsNull(slot)) {
         return NULL;
+    }
+
+    /* If no projection is required, we're done. */
+    if (node->ps.ps_ProjInfo == NULL) {
+        return slot;
+    }
 
     /*
      * form the result tuple using ExecProject(), and return it --- unless
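The projection change above is a fast path: when the Gather targetlist is just a one-to-one Var reference of the child's output columns, no ProjectionInfo is built and ExecGather hands the funnel slot back untouched, skipping a per-tuple ExecProject. A toy version of the test, with hypothetical ToyTle/outer_types shapes (the real check, tlist_matches_tupdesc, compares Vars and type OIDs against the outer tuple descriptor):

    #include <cstddef>
    #include <vector>

    struct ToyTle { int varattno; int vartype; }; /* one targetlist entry */

    static bool TlistMatchesDesc(const std::vector<ToyTle> &tlist,
                                 const std::vector<int> &outer_types)
    {
        if (tlist.size() != outer_types.size())
            return false;
        for (size_t i = 0; i < tlist.size(); i++) {
            /* entry i must be exactly Var i+1 of the outer tuple, same type */
            if (tlist[i].varattno != (int)(i + 1) || tlist[i].vartype != outer_types[i])
                return false;
        }
        return true;
    }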
diff --git a/src/gausskernel/runtime/executor/nodeLimit.cpp b/src/gausskernel/runtime/executor/nodeLimit.cpp
index 4f2f34b30..80f97a790 100755
--- a/src/gausskernel/runtime/executor/nodeLimit.cpp
+++ b/src/gausskernel/runtime/executor/nodeLimit.cpp
@@ -25,7 +25,7 @@
 #include "executor/nodeLimit.h"
 #include "nodes/nodeFuncs.h"
 
-static void pass_down_bound(LimitState* node, PlanState* child_node);
+static int64 compute_tuples_needed(LimitState *node);
 
 /* ----------------------------------------------------------------
  *      ExecLimit
@@ -273,61 +273,26 @@ void recompute_limits(LimitState* node)
     /* Set state-machine state */
     node->lstate = LIMIT_RESCAN;
 
-    /* Notify child node about limit, if useful */
-    pass_down_bound(node, outerPlanState(node));
+    /*
+     * Notify child node about limit.  Note: think not to "optimize" by
+     * skipping ExecSetTupleBound if compute_tuples_needed returns < 0.  We
+     * must update the child node anyway, in case this is a rescan and the
+     * previous time we got a different result.
+     */
+    ExecSetTupleBound(compute_tuples_needed(node), outerPlanState(node));
 }
 
 /*
- * If we have a COUNT, and our input is a Sort node, notify it that it can
- * use bounded sort.  Also, if our input is a MergeAppend, we can apply the
- * same bound to any Sorts that are direct children of the MergeAppend,
- * since the MergeAppend surely need read no more than that many tuples from
- * any one input.  We also have to be prepared to look through a Result,
- * since the planner might stick one atop MergeAppend for projection purposes.
- *
- * This is a bit of a kluge, but we don't have any more-abstract way of
- * communicating between the two nodes; and it doesn't seem worth trying
- * to invent one without some more examples of special communication needs.
- *
- * Note: it is the responsibility of nodeSort.c to react properly to
- * changes of these parameters.  If we ever do redesign this, it'd be a
- * good idea to integrate this signaling with the parameter-change mechanism.
+ * Compute the maximum number of tuples needed to satisfy this Limit node.
+ * Return a negative value if there is not a determinable limit.
  */
-static void pass_down_bound(LimitState* node, PlanState* child_node)
+static int64 compute_tuples_needed(LimitState *node)
 {
-    if (IsA(child_node, SortState) || IsA(child_node, VecSortState)) {
-        SortState* sortState = (SortState*)child_node;
-        int64 tuples_needed = node->count + node->offset;
-
-        /* negative test checks for overflow in sum */
-        if (node->noCount || tuples_needed < 0) {
-            /* make sure flag gets reset if needed upon rescan */
-            sortState->bounded = false;
-        } else {
-            sortState->bounded = true;
-            sortState->bound = tuples_needed;
-        }
-    } else if (IsA(child_node, MergeAppendState)) {
-        MergeAppendState* maState = (MergeAppendState*)child_node;
-        int i;
-
-        for (i = 0; i < maState->ms_nplans; i++)
-            pass_down_bound(node, maState->mergeplans[i]);
-    } else if (IsA(child_node, ResultState) || IsA(child_node, VecResultState)) {
-        /*
-         * An extra consideration here is that if the Result is projecting a
-         * targetlist that contains any SRFs, we can't assume that every input
-         * tuple generates an output tuple, so a Sort underneath might need to
-         * return more than N tuples to satisfy LIMIT N.  So we cannot use
-         * bounded sort.
-         *
-         * If Result supported qual checking, we'd have to punt on seeing a
-         * qual, too.  Note that having a resconstantqual is not a
-         * showstopper: if that fails we're not getting any rows at all.
-         */
-        if (outerPlanState(child_node) && !expression_returns_set((Node*)child_node->plan->targetlist))
-            pass_down_bound(node, outerPlanState(child_node));
+    if (node->noCount) {
+        return -1;
     }
+    /* Note: if this overflows, we'll return a negative value, which is OK */
+    return node->count + node->offset;
 }
 
 /* ----------------------------------------------------------------
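A quick worked example of the new helper: for LIMIT 10 OFFSET 20, count = 10 and offset = 20, so compute_tuples_needed returns 30 and a Sort below can keep just the top 30 tuples; LIMIT ALL sets noCount and yields -1; and an overflowing count + offset wraps negative, which callers read as "no limit". A standalone sketch of the arithmetic (TuplesNeeded is a hypothetical stand-in):

    #include <cassert>
    #include <cstdint>

    static int64_t TuplesNeeded(bool noCount, int64_t count, int64_t offset)
    {
        if (noCount)
            return -1;         /* LIMIT ALL: no determinable bound */
        return count + offset; /* may wrap negative => "no limit" */
    }

    int main()
    {
        assert(TuplesNeeded(false, 10, 20) == 30); /* LIMIT 10 OFFSET 20 */
        assert(TuplesNeeded(true, 0, 0) == -1);    /* LIMIT ALL */
        return 0;
    }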
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 3a55efe58..13725311b 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -31,7 +31,8 @@ typedef struct ParallelExecutorInfo {
     struct TupleQueueReader **reader; /* tuple reader/writer support */
 } ParallelExecutorInfo;
 
-extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers);
+extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, EState *estate,
+    int nworkers, int64 tuples_needed);
 extern void ExecParallelCreateReaders(ParallelExecutorInfo *pei, TupleDesc tupDesc);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 2a0aa45d6..8a6b2fe0d 100755
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -248,6 +248,7 @@ extern TupleTableSlot* ExecProcNode(PlanState* node);
 extern Node* MultiExecProcNode(PlanState* node);
 extern void ExecEndNode(PlanState* node);
 extern bool ExecShutdownNode(PlanState *node);
+extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node);
 
 extern long ExecGetPlanMemCost(Plan* node);
diff --git a/src/include/knl/knl_session.h b/src/include/knl/knl_session.h
index 6db7dbbbe..c02a1e68e 100644
--- a/src/include/knl/knl_session.h
+++ b/src/include/knl/knl_session.h
@@ -2043,6 +2043,8 @@ typedef struct ParallelQueryInfo {
     char *pstmt_space;
     char *param_space;
     Size param_len;
+    int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
+    int eflags;
     int pscan_num;
     ParallelHeapScanDescData **pscan;
 } ParallelQueryInfo;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index c90f0a670..11f1e32c5 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2302,6 +2302,7 @@ typedef struct GatherState {
     struct TupleQueueReader **reader;
     TupleTableSlot *funnel_slot;
     bool need_to_scan_locally;
+    int64 tuples_needed;
 } GatherState;
 
 /* ----------------