减少nodeagg算子初始化转移函数的次数,减少算子nodeagg的投影次数

Signed-off-by: 夏自豪 <xiazihao3@huawei.com>
This commit is contained in:
夏自豪
2023-02-25 08:19:45 +00:00
committed by x30033890
parent 2114241085
commit 1b8ab5a2e2
3 changed files with 117 additions and 69 deletions

View File

@ -131,6 +131,7 @@
#include "executor/executor.h"
#include "executor/node/nodeAgg.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/clauses.h"
#include "optimizer/tlist.h"
@ -151,7 +152,7 @@ static TupleTableSlot* ExecAgg(PlanState* state);
static void initialize_aggregates(
AggState* aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroup, int numReset = 0);
static void advance_transition_function(
AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate, FunctionCallInfoData* fcinfo);
AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate);
static void advance_aggregates(AggState* aggstate, AggStatePerGroup pergroup);
static void process_ordered_aggregate_single(
AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate);
@ -295,14 +296,14 @@ static void initialize_aggregate(AggState* aggstate, AggStatePerAgg peraggstate,
*/
if (peraggstate->numInputs == 1) {
peraggstate->sortstates[aggstate->current_set] =
tuplesort_begin_datum(peraggstate->evaldesc->attrs[0].atttypid,
tuplesort_begin_datum(peraggstate->sortdesc->attrs[0].atttypid,
peraggstate->sortOperators[0],
peraggstate->sortCollations[0],
peraggstate->sortNullsFirst[0],
local_work_mem,
false);
} else {
peraggstate->sortstates[aggstate->current_set] = tuplesort_begin_heap(peraggstate->evaldesc,
peraggstate->sortstates[aggstate->current_set] = tuplesort_begin_heap(peraggstate->sortdesc,
peraggstate->numSortCols,
peraggstate->sortColIdx,
peraggstate->sortOperators,
@ -410,27 +411,31 @@ static void initialize_aggregates(AggState* aggstate, AggStatePerAgg peragg, Agg
}
/*
* Given new input value(s), advance the transition function of an aggregate.
* Given new input value(s), advance the transition function of one aggregate
* state within one grouping set only (already set in aggstate->current_set)
*
* The new values (and null flags) have been preloaded into argument positions
* 1 and up in fcinfo, so that we needn't copy them again to pass to the
* transition function. No other fields of fcinfo are assumed valid.
* 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
* pass to the transition function. We also expect that the static fields of
* the fcinfo are already initialized; that was done by ExecInitAgg().
*
* It doesn't matter which memory context this is called in.
*/
static void advance_transition_function(
AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate, FunctionCallInfoData* fcinfo)
AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate)
{
int numTransInputs = peraggstate->numTransInputs;
FunctionCallInfo fcinfo = &peraggstate->transfn_fcinfo;
MemoryContext oldContext;
Datum newVal;
int i;
if (peraggstate->transfn.fn_strict) {
/*
* For a strict transfn, nothing happens when there's a NULL input; we
* just keep the prior transValue.
*/
int numTransInputs = peraggstate->numTransInputs;
int i;
for (i = 1; i <= numTransInputs; i++) {
if (fcinfo->argnull[i])
return;
@ -617,19 +622,21 @@ static void advance_aggregates(AggState* aggstate, AggStatePerGroup pergroup)
int setno = 0;
int numGroupingSets = Max(aggstate->phase->numsets, 1);
int numAggs = aggstate->numaggs;
TupleTableSlot *slot = aggstate->evalslot;
/* compute input for all aggregates */
if (aggstate->evalproj)
aggstate->evalslot = ExecProject(aggstate->evalproj, NULL);
for (aggno = 0; aggno < aggstate->numaggs; aggno++) {
AggStatePerAgg peraggstate = &aggstate->peragg[aggno];
int numTransInputs = peraggstate->numTransInputs;
int i;
TupleTableSlot* slot = NULL;
/* Evaluate the current input expressions for this aggregate */
slot = ExecProject(peraggstate->evalproj, NULL);
int inputoff = peraggstate->inputoff;
if (peraggstate->numSortCols > 0) {
/* DISTINCT and/or ORDER BY case */
Assert(slot->tts_nvalid == peraggstate->numInputs);
Assert(slot->tts_nvalid >= (peraggstate->numInputs + inputoff));
/*
* If the transfn is strict, we want to check for nullity before
@ -640,7 +647,7 @@ static void advance_aggregates(AggState* aggstate, AggStatePerGroup pergroup)
*/
if (peraggstate->transfn.fn_strict) {
for (i = 0; i < numTransInputs; i++) {
if (slot->tts_isnull[i])
if (slot->tts_isnull[i + inputoff])
break;
}
if (i < numTransInputs)
@ -651,29 +658,33 @@ static void advance_aggregates(AggState* aggstate, AggStatePerGroup pergroup)
/* OK, put the tuple into the tuplesort object */
if (peraggstate->numInputs == 1)
tuplesort_putdatum(peraggstate->sortstates[setno], slot->tts_values[0], slot->tts_isnull[0]);
else
tuplesort_puttupleslot(peraggstate->sortstates[setno], slot);
tuplesort_putdatum(peraggstate->sortstates[setno], slot->tts_values[inputoff],
slot->tts_isnull[inputoff]);
else {
errno_t errorno = EOK;
ExecClearTuple(peraggstate->sortslot);
errorno = memcpy_s(peraggstate->sortslot->tts_values, peraggstate->numInputs * sizeof(Datum),
&slot->tts_values[inputoff], peraggstate->numInputs * sizeof(Datum));
securec_check(errorno, "\0", "\0");
errorno = memcpy_s(peraggstate->sortslot->tts_isnull, peraggstate->numInputs * sizeof(bool),
&slot->tts_isnull[inputoff], peraggstate->numInputs * sizeof(bool));
securec_check(errorno, "\0", "\0");
peraggstate->sortslot->tts_nvalid = peraggstate->numInputs;
ExecStoreVirtualTuple(peraggstate->sortslot);
tuplesort_puttupleslot(peraggstate->sortstates[setno], peraggstate->sortslot);
}
}
} else {
/* We can apply the transition function immediately */
FunctionCallInfoData fcinfo;
/* Init FunctionCallInfoData for transition function before loading argument values. */
InitFunctionCallInfoData(fcinfo,
&(peraggstate->transfn),
numTransInputs + 1,
peraggstate->aggCollation,
(Node*)aggstate,
NULL);
FunctionCallInfo fcinfo = &peraggstate->transfn_fcinfo;
/* Load values into fcinfo */
/* Start from 1, since the 0th arg will be the transition value */
Assert(slot->tts_nvalid >= numTransInputs);
for (i = 0; i < numTransInputs; i++) {
fcinfo.arg[i + 1] = slot->tts_values[i];
fcinfo.argnull[i + 1] = slot->tts_isnull[i];
fcinfo.argTypes[i + 1] = InvalidOid;
fcinfo->arg[i + 1] = slot->tts_values[i + inputoff];
fcinfo->argnull[i + 1] = slot->tts_isnull[i + inputoff];
fcinfo->argTypes[i + 1] = InvalidOid;
}
for (setno = 0; setno < numGroupingSets; setno++) {
AggStatePerGroup pergroupstate = &pergroup[aggno + (setno * numAggs)];
@ -689,10 +700,10 @@ static void advance_aggregates(AggState* aggstate, AggStatePerGroup pergroup)
* we are collecting results sent by the Datanodes, so advance
* collections instead of transitions
*/
advance_collection_function(aggstate, peraggstate, pergroupstate, &fcinfo);
advance_collection_function(aggstate, peraggstate, pergroupstate, fcinfo);
} else
#endif
advance_transition_function(aggstate, peraggstate, pergroupstate, &fcinfo);
advance_transition_function(aggstate, peraggstate, pergroupstate);
}
}
}
@ -731,22 +742,22 @@ static void process_ordered_aggregate_single(
bool isDistinct = (peraggstate->numDistinctCols > 0);
Datum* newVal = NULL;
bool* isNull = NULL;
FunctionCallInfoData fcinfo;
FunctionCallInfo fcinfo = &peraggstate->transfn_fcinfo;
Assert(peraggstate->numDistinctCols < 2);
tuplesort_performsort(peraggstate->sortstates[aggstate->current_set]);
/* Init FunctionCallInfoData for transition function before loading argument values. */
InitFunctionCallInfoData(fcinfo,
InitFunctionCallInfoData(*fcinfo,
&(peraggstate->transfn),
peraggstate->numArguments + 1,
peraggstate->aggCollation,
(Node*)aggstate,
NULL);
/* Load the column into argument 1 (arg 0 will be transition value) */
newVal = fcinfo.arg + 1;
isNull = fcinfo.argnull + 1;
newVal = fcinfo->arg + 1;
isNull = fcinfo->argnull + 1;
/*
* Note: if input type is pass-by-ref, the datums returned by the sort are
@ -773,7 +784,7 @@ static void process_ordered_aggregate_single(
if (!peraggstate->inputtypeByVal && !*isNull)
pfree(DatumGetPointer(*newVal));
} else {
advance_transition_function(aggstate, peraggstate, pergroupstate, &fcinfo);
advance_transition_function(aggstate, peraggstate, pergroupstate);
/* forget the old value, if any */
if (!oldIsNull && !peraggstate->inputtypeByVal)
pfree(DatumGetPointer(oldVal));
@ -806,8 +817,8 @@ static void process_ordered_aggregate_multi(
AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate)
{
MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
FunctionCallInfoData fcinfo;
TupleTableSlot* slot1 = peraggstate->evalslot;
FunctionCallInfo fcinfo = &peraggstate->transfn_fcinfo;
TupleTableSlot* slot1 = peraggstate->sortslot;
TupleTableSlot* slot2 = peraggstate->uniqslot;
int numTransInputs = peraggstate->numTransInputs;
int numDistinctCols = peraggstate->numDistinctCols;
@ -833,7 +844,7 @@ static void process_ordered_aggregate_multi(
!execTuplesMatch(
slot1, slot2, numDistinctCols, peraggstate->sortColIdx, peraggstate->equalfns, workcontext)) {
/* Init FunctionCallInfoData for transition function before loading argument values. */
InitFunctionCallInfoData(fcinfo,
InitFunctionCallInfoData(*fcinfo,
&(peraggstate->transfn),
numTransInputs + 1,
peraggstate->aggCollation,
@ -842,11 +853,11 @@ static void process_ordered_aggregate_multi(
/* Load values into fcinfo */
/* Start from 1, since the 0th arg will be the transition value */
for (i = 0; i < numTransInputs; i++) {
fcinfo.arg[i + 1] = slot1->tts_values[i];
fcinfo.argnull[i + 1] = slot1->tts_isnull[i];
fcinfo->arg[i + 1] = slot1->tts_values[i];
fcinfo->argnull[i + 1] = slot1->tts_isnull[i];
}
advance_transition_function(aggstate, peraggstate, pergroupstate, &fcinfo);
advance_transition_function(aggstate, peraggstate, pergroupstate);
if (numDistinctCols > 0) {
/* swap the slot pointers to retain the current tuple */
@ -1957,10 +1968,12 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
ExprContext* econtext = NULL;
int numaggs, aggno;
int phase;
List *combined_inputeval;
ListCell* l = NULL;
Bitmapset* all_grouped_cols = NULL;
int numGroupingSets = 1;
int numPhases;
int column_offset;
int currentsortno = 0;
int i = 0;
int j = 0;
@ -2434,6 +2447,7 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
}
#endif /* PGXC */
/* set up infrastructure for calling the transfn and finalfn */
fmgr_info(transfn_oid, &peraggstate->transfn);
fmgr_info_set_expr((Node*)transfnexpr, &peraggstate->transfn);
@ -2449,7 +2463,13 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
}
#endif /* PGXC */
peraggstate->aggCollation = aggref->inputcollid;
InitFunctionCallInfoData(peraggstate->transfn_fcinfo,
&peraggstate->transfn,
peraggstate->numTransInputs + 1,
peraggstate->aggCollation,
(Node*)aggstate,
NULL);
/* get info zbout relevant datatypes */
get_typlenbyval(aggref->aggtype, &peraggstate->resulttypeLen, &peraggstate->resulttypeByVal);
get_typlenbyval(aggtranstype, &peraggstate->transtypeLen, &peraggstate->transtypeByVal);
@ -2496,20 +2516,6 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
"aggregate %u needs to have compatible input type and transition type", aggref->aggfnoid)));
}
/*
* Get a tupledesc corresponding to the inputs (including sort
* expressions) of the agg.
*/
peraggstate->evaldesc = ExecTypeFromTL(aggref->args, false);
/* Create slot we're going to do argument evaluation in */
peraggstate->evalslot = ExecInitExtraTupleSlot(estate);
ExecSetSlotDescriptor(peraggstate->evalslot, peraggstate->evaldesc);
/* Set up projection info for evaluation */
peraggstate->evalproj =
ExecBuildProjectionInfo(aggrefstate->args, aggstate->tmpcontext, peraggstate->evalslot, NULL);
/*
* If we're doing either DISTINCT or ORDER BY, then we have a list of
* SortGroupClause nodes; fish out the data in them and stick them
@ -2534,6 +2540,12 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
peraggstate->numDistinctCols = numDistinctCols;
if (numSortCols > 0) {
/* Get a tupledesc and slot corresponding to the aggregated inputs
* (including sort expressions) of the agg.
*/
peraggstate->sortdesc = ExecTypeFromTL(aggref->args, false);
peraggstate->sortslot = ExecInitExtraTupleSlot(estate);
ExecSetSlotDescriptor(peraggstate->sortslot, peraggstate->sortdesc);
/*
* We don't implement DISTINCT or ORDER BY aggs in the HASHED case
* (yet)
@ -2546,7 +2558,7 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
} else if (numDistinctCols > 0) {
/* we will need an extra slot to store prior values */
peraggstate->uniqslot = ExecInitExtraTupleSlot(estate);
ExecSetSlotDescriptor(peraggstate->uniqslot, peraggstate->evaldesc);
ExecSetSlotDescriptor(peraggstate->uniqslot, peraggstate->sortdesc);
}
/* Extract the sort information for use later */
@ -2596,6 +2608,32 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
/* Update numaggs to match number of unique aggregates found */
aggstate->numaggs = aggno + 1;
combined_inputeval = NIL;
column_offset = 0;
for (int transno =0;transno < aggstate->numaggs; transno++) {
AggStatePerAggData* pertrans = &peragg[transno];
ListCell *arg;
pertrans->inputoff = column_offset;
foreach(arg,pertrans->aggref->args)
{
TargetEntry *source_tle =(TargetEntry *)lfirst(arg);
TargetEntry *tle;
Assert(IsA(source_tle, TargetEntry));
tle = flatCopyTargetEntry(source_tle);
tle->resno += column_offset;
combined_inputeval = lappend(combined_inputeval, tle);
}
column_offset += list_length(pertrans->aggref->args);
}
aggstate->evaldesc = ExecTypeFromTL(combined_inputeval, false);
aggstate->evalslot = ExecInitExtraTupleSlot(estate);
combined_inputeval = (List *)ExecInitExpr((Expr *)combined_inputeval, (PlanState *)aggstate);
aggstate->evalproj = ExecBuildProjectionInfo(combined_inputeval, aggstate->tmpcontext, aggstate->evalslot, NULL);
ExecSetSlotDescriptor(aggstate->evalslot, aggstate->evaldesc);
AggWriteFileControl* TempFilePara = (AggWriteFileControl*)palloc(sizeof(AggWriteFileControl));
TempFilePara->strategy = MEMORY_HASHAGG;