聚集Limit下推优化

This commit is contained in:
‘ljy’
2023-05-29 14:43:02 +08:00
parent 5d6882b3d8
commit 658aea19b8
40 changed files with 2588 additions and 63 deletions

View File

@ -172,6 +172,7 @@ static TupleTableSlot* agg_retrieve_direct(AggState* aggstate);
static void agg_fill_hash_table(AggState* aggstate);
static TupleTableSlot* agg_retrieve_hash_table(AggState* aggstate);
static TupleTableSlot* agg_retrieve(AggState* node);
static TupleTableSlot* agg_sort_group_retrieve_direct(AggState* aggstate);
static bool prepare_data_source(AggState* node);
static TupleTableSlot* fetch_input_tuple(AggState* aggstate);
@ -1518,12 +1519,13 @@ static TupleTableSlot* agg_retrieve(AggState* node)
static TupleTableSlot* ExecAgg(PlanState* state)
{
AggState* node = castNode(AggState, state);
TupleTableSlot* slot = NULL;
/*
* just for cooperation analysis. do nothing if is_dummy is true.
* is_dummy is true that means Agg node is deparsed to remote sql in ForeignScan node.
*/
if (((Agg*)node->ss.ps.plan)->is_dummy) {
TupleTableSlot* slot = ExecProcNode(outerPlanState(node));
slot = ExecProcNode(outerPlanState(node));
return slot;
}
@ -1533,27 +1535,34 @@ static TupleTableSlot* ExecAgg(PlanState* state)
* expressions). If so, try to project another one.
*/
if (node->ss.ps.ps_vec_TupFromTlist) {
TupleTableSlot* result = NULL;
ExprDoneCond isDone;
result = ExecProject(node->ss.ps.ps_ProjInfo, &isDone);
slot = ExecProject(node->ss.ps.ps_ProjInfo, &isDone);
if (isDone == ExprMultipleResult)
return result;
return slot;
/* Done with that source tuple... */
node->ss.ps.ps_vec_TupFromTlist = false;
}
/*
* Exit if nothing left to do.
*/
if (node->agg_done)
return NULL;
/* Dispatch based on strategy */
if (((Agg*)node->ss.ps.plan)->aggstrategy == AGG_HASHED)
return agg_retrieve(node);
else
return agg_retrieve_direct(node);
if (!node->agg_done) {
/* Dispatch based on strategy */
switch (((Agg*)node->ss.ps.plan)->aggstrategy)
{
case AGG_HASHED:
slot = agg_retrieve(node);
break;
case AGG_PLAIN:
case AGG_SORTED:
slot = agg_retrieve_direct(node);
break;
case AGG_SORT_GROUP:
slot = agg_sort_group_retrieve_direct(node);
break;
}
if (!TupIsNull(slot))
return slot;
}
return NULL;
}
/*
@ -1856,6 +1865,156 @@ static TupleTableSlot* agg_retrieve_direct(AggState* aggstate)
return NULL;
}
/*
* ExecAgg for sort-group case
*/
static TupleTableSlot *agg_sort_group_retrieve_direct(AggState *aggstate)
{
ExprContext *econtext = NULL;
ExprContext *tmpcontext = NULL;
AggStatePerAgg peragg;
AggStatePerGroup pergroup;
TupleTableSlot *outerslot = NULL;
TupleTableSlot *firstSlot = NULL;
TupleTableSlot *result = NULL;
Assert(aggstate->phase->numsets == 0);
/*
* get state info from node
*
* econtext is the per-output-tuple expression context
* tmpcontext is the per-input-tuple expression context
*
*/
econtext = aggstate->ss.ps.ps_ExprContext;
tmpcontext = aggstate->tmpcontext;
peragg = aggstate->peragg;
pergroup = aggstate->pergroup;
firstSlot = aggstate->ss.ss_ScanTupleSlot;
/*
* We loop retrieving groups until we find one matching
*aggstate->ss.ps.qual
*
* For grouping sets, we have the invariant that aggstate->projected_set
* is either -1 (initial call) or the index (starting from 0) in
* gset_lengths for the group we just completed (either by projecting a
* row or by discarding it in the qual).
*
* aggstate->ss.ps.qual
*/
while (!aggstate->agg_done) {
/*
* Clear the per-output-tuple context for each group, as well as
* aggcontext (which contains any pass-by-ref transvalues of the old
* group). Some aggregate functions store working state in child
* contexts; those now get reset automatically without us needing to
* do anything special.
*
* We use ReScanExprContext not just ResetExprContext because we want
* any registered shutdown callbacks to be called. That allows
* aggregate functions to ensure they've cleaned up any non-memory
* resources.
*
*/
ReScanExprContext(econtext);
MemoryContextReset(aggstate->aggcontexts[0]);
tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
/*
* If we don't already have the first tuple of the new group,
* fetch it from the outer plan.
*/
if (aggstate->grp_firstTuple == NULL) {
outerslot = fetch_input_tuple(aggstate);
if (!TupIsNull(outerslot)) {
/*
* Make a copy of the first input tuple; we will use this
* for comparisons (in group mode) and for projection.
*/
aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
} else {
aggstate->agg_done = true;
return NULL;
}
}
aggstate->new_group_trigger = false; /*reset new group trigger*/
/*
* Initialize working state for a new input tuple group.
*/
initialize_aggregates(aggstate, peragg, pergroup, 1);
if (aggstate->grp_firstTuple != NULL) {
/*
* Store the copied first input tuple in the tuple table slot
* reserved for it. The tuple will be deleted when it is
* cleared from the slot.
*/
(void)ExecStoreTuple(aggstate->grp_firstTuple, firstSlot, InvalidBuffer, true);
aggstate->grp_firstTuple = NULL; /* don't keep two
* pointers */
/* set up for first advance_aggregates call */
tmpcontext->ecxt_outertuple = firstSlot;
/*
* Process each outer-plan tuple, and then fetch the next one,
* until we exhaust the outer plan or cross a group boundary.
*/
for (;;) {
advance_aggregates(aggstate, pergroup);
/* Reset per-input-tuple context after each tuple */
ResetExprContext(tmpcontext);
outerslot = fetch_input_tuple(aggstate);
if (TupIsNull(outerslot)) {
aggstate->agg_done = true;
break;
}
/* set up for next advance_aggregates call */
tmpcontext->ecxt_outertuple = outerslot;
/*
* check whether we've new group
*/
if (aggstate->new_group_trigger) {
aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
break; /*we've new group*/
}
}
}
/*
* Use the representative input tuple for any references to
* non-aggregated input columns in aggregate direct args, the node
* qual, and the tlist.
*/
econtext->ecxt_outertuple = firstSlot;
prepare_projection_slot(aggstate, econtext->ecxt_outertuple, 0);
finalize_aggregates(aggstate, peragg, pergroup, 0);
/*
* If there's no row to project right now, we must continue rather
* than returning a null since there might be more groups.
*/
result = project_aggregates(aggstate);
if (result != NULL)
return result;
}
/* No more groups */
return NULL;
}
/*
* ExecAgg for hashed case: phase 1, read input and build hash table
*/
@ -2179,6 +2338,11 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
outerPlan = outerPlan(node);
outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
if (node->aggstrategy == AGG_SORT_GROUP) {
SortGroupState* srotGroup = (SortGroupState*) outerPlanState(aggstate);
Assert(IsA(srotGroup, SortGroupState));
srotGroup->new_group_trigger = &aggstate->new_group_trigger;
}
/*
* initialize source tuple type.
@ -2223,15 +2387,20 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
AggStatePerPhase phasedata = &aggstate->phases[phase];
Agg* aggnode = NULL;
Sort* sortnode = NULL;
SortGroup* sortGroupNode = NULL;
int num_sets;
if (phase > 0) {
aggnode = (Agg*)list_nth(node->chain, phase - 1);
sortnode = (Sort*)aggnode->plan.lefttree;
Assert(IsA(sortnode, Sort));
if (aggnode->plan.lefttree) {
if (IsA(aggnode->plan.lefttree, Sort)) {
sortnode = castNode(Sort, aggnode->plan.lefttree);
} else if (IsA(aggnode->plan.lefttree, SortGroup)) {
sortGroupNode = castNode(SortGroup, aggnode->plan.lefttree);
}
}
} else {
aggnode = node;
sortnode = NULL;
}
phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
@ -2265,7 +2434,7 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
/*
* If we are grouping, precompute fmgr lookup data for inner loop.
*/
if (aggnode->aggstrategy == AGG_SORTED) {
if (aggnode->aggstrategy == AGG_SORTED || aggnode->aggstrategy == AGG_SORT_GROUP) {
Assert(aggnode->numCols > 0);
phasedata->eqfunctions = execTuplesMatchPrepare(aggnode->numCols, aggnode->grpOperators);
@ -2276,6 +2445,7 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
phasedata->aggstrategy = aggstate->aggstrategy;
}
phasedata->sortnode = sortnode;
phasedata->sortGroupNode = sortGroupNode;
}
/*