diff --git a/src/gausskernel/runtime/executor/nodeAgg.cpp b/src/gausskernel/runtime/executor/nodeAgg.cpp index 8a3b5c135..c64a09c25 100644 --- a/src/gausskernel/runtime/executor/nodeAgg.cpp +++ b/src/gausskernel/runtime/executor/nodeAgg.cpp @@ -131,6 +131,7 @@ #include "executor/executor.h" #include "executor/node/nodeAgg.h" #include "miscadmin.h" +#include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "optimizer/clauses.h" #include "optimizer/tlist.h" @@ -151,7 +152,7 @@ static TupleTableSlot* ExecAgg(PlanState* state); static void initialize_aggregates( AggState* aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroup, int numReset = 0); static void advance_transition_function( - AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate, FunctionCallInfoData* fcinfo); + AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate); static void advance_aggregates(AggState* aggstate, AggStatePerGroup pergroup); static void process_ordered_aggregate_single( AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate); @@ -295,14 +296,14 @@ static void initialize_aggregate(AggState* aggstate, AggStatePerAgg peraggstate, */ if (peraggstate->numInputs == 1) { peraggstate->sortstates[aggstate->current_set] = - tuplesort_begin_datum(peraggstate->evaldesc->attrs[0].atttypid, + tuplesort_begin_datum(peraggstate->sortdesc->attrs[0].atttypid, peraggstate->sortOperators[0], peraggstate->sortCollations[0], peraggstate->sortNullsFirst[0], local_work_mem, false); } else { - peraggstate->sortstates[aggstate->current_set] = tuplesort_begin_heap(peraggstate->evaldesc, + peraggstate->sortstates[aggstate->current_set] = tuplesort_begin_heap(peraggstate->sortdesc, peraggstate->numSortCols, peraggstate->sortColIdx, peraggstate->sortOperators, @@ -410,27 +411,31 @@ static void initialize_aggregates(AggState* aggstate, AggStatePerAgg peragg, Agg } /* - * Given new input value(s), advance the transition function of an aggregate. + * Given new input value(s), advance the transition function of one aggregate + * state within one grouping set only (already set in aggstate->current_set) * * The new values (and null flags) have been preloaded into argument positions - * 1 and up in fcinfo, so that we needn't copy them again to pass to the - * transition function. No other fields of fcinfo are assumed valid. + * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to + * pass to the transition function. We also expect that the static fields of + * the fcinfo are already initialized; that was done by ExecInitAgg(). * * It doesn't matter which memory context this is called in. */ static void advance_transition_function( - AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate, FunctionCallInfoData* fcinfo) + AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate) { - int numTransInputs = peraggstate->numTransInputs; + FunctionCallInfo fcinfo = &peraggstate->transfn_fcinfo; MemoryContext oldContext; Datum newVal; - int i; if (peraggstate->transfn.fn_strict) { /* * For a strict transfn, nothing happens when there's a NULL input; we * just keep the prior transValue. */ + int numTransInputs = peraggstate->numTransInputs; + int i; + for (i = 1; i <= numTransInputs; i++) { if (fcinfo->argnull[i]) return; @@ -617,19 +622,21 @@ static void advance_aggregates(AggState* aggstate, AggStatePerGroup pergroup) int setno = 0; int numGroupingSets = Max(aggstate->phase->numsets, 1); int numAggs = aggstate->numaggs; + TupleTableSlot *slot = aggstate->evalslot; + + /* compute input for all aggregates */ + if (aggstate->evalproj) + aggstate->evalslot = ExecProject(aggstate->evalproj, NULL); for (aggno = 0; aggno < aggstate->numaggs; aggno++) { AggStatePerAgg peraggstate = &aggstate->peragg[aggno]; int numTransInputs = peraggstate->numTransInputs; int i; - TupleTableSlot* slot = NULL; - - /* Evaluate the current input expressions for this aggregate */ - slot = ExecProject(peraggstate->evalproj, NULL); + int inputoff = peraggstate->inputoff; if (peraggstate->numSortCols > 0) { /* DISTINCT and/or ORDER BY case */ - Assert(slot->tts_nvalid == peraggstate->numInputs); + Assert(slot->tts_nvalid >= (peraggstate->numInputs + inputoff)); /* * If the transfn is strict, we want to check for nullity before @@ -640,7 +647,7 @@ static void advance_aggregates(AggState* aggstate, AggStatePerGroup pergroup) */ if (peraggstate->transfn.fn_strict) { for (i = 0; i < numTransInputs; i++) { - if (slot->tts_isnull[i]) + if (slot->tts_isnull[i + inputoff]) break; } if (i < numTransInputs) @@ -651,29 +658,33 @@ static void advance_aggregates(AggState* aggstate, AggStatePerGroup pergroup) /* OK, put the tuple into the tuplesort object */ if (peraggstate->numInputs == 1) - tuplesort_putdatum(peraggstate->sortstates[setno], slot->tts_values[0], slot->tts_isnull[0]); - else - tuplesort_puttupleslot(peraggstate->sortstates[setno], slot); + tuplesort_putdatum(peraggstate->sortstates[setno], slot->tts_values[inputoff], + slot->tts_isnull[inputoff]); + else { + errno_t errorno = EOK; + ExecClearTuple(peraggstate->sortslot); + errorno = memcpy_s(peraggstate->sortslot->tts_values, peraggstate->numInputs * sizeof(Datum), + &slot->tts_values[inputoff], peraggstate->numInputs * sizeof(Datum)); + securec_check(errorno, "\0", "\0"); + errorno = memcpy_s(peraggstate->sortslot->tts_isnull, peraggstate->numInputs * sizeof(bool), + &slot->tts_isnull[inputoff], peraggstate->numInputs * sizeof(bool)); + securec_check(errorno, "\0", "\0"); + peraggstate->sortslot->tts_nvalid = peraggstate->numInputs; + ExecStoreVirtualTuple(peraggstate->sortslot); + tuplesort_puttupleslot(peraggstate->sortstates[setno], peraggstate->sortslot); + } } } else { /* We can apply the transition function immediately */ - FunctionCallInfoData fcinfo; - - /* Init FunctionCallInfoData for transition function before loading argument values. */ - InitFunctionCallInfoData(fcinfo, - &(peraggstate->transfn), - numTransInputs + 1, - peraggstate->aggCollation, - (Node*)aggstate, - NULL); + FunctionCallInfo fcinfo = &peraggstate->transfn_fcinfo; /* Load values into fcinfo */ /* Start from 1, since the 0th arg will be the transition value */ Assert(slot->tts_nvalid >= numTransInputs); for (i = 0; i < numTransInputs; i++) { - fcinfo.arg[i + 1] = slot->tts_values[i]; - fcinfo.argnull[i + 1] = slot->tts_isnull[i]; - fcinfo.argTypes[i + 1] = InvalidOid; + fcinfo->arg[i + 1] = slot->tts_values[i + inputoff]; + fcinfo->argnull[i + 1] = slot->tts_isnull[i + inputoff]; + fcinfo->argTypes[i + 1] = InvalidOid; } for (setno = 0; setno < numGroupingSets; setno++) { AggStatePerGroup pergroupstate = &pergroup[aggno + (setno * numAggs)]; @@ -689,10 +700,10 @@ static void advance_aggregates(AggState* aggstate, AggStatePerGroup pergroup) * we are collecting results sent by the Datanodes, so advance * collections instead of transitions */ - advance_collection_function(aggstate, peraggstate, pergroupstate, &fcinfo); + advance_collection_function(aggstate, peraggstate, pergroupstate, fcinfo); } else #endif - advance_transition_function(aggstate, peraggstate, pergroupstate, &fcinfo); + advance_transition_function(aggstate, peraggstate, pergroupstate); } } } @@ -731,22 +742,22 @@ static void process_ordered_aggregate_single( bool isDistinct = (peraggstate->numDistinctCols > 0); Datum* newVal = NULL; bool* isNull = NULL; - FunctionCallInfoData fcinfo; + FunctionCallInfo fcinfo = &peraggstate->transfn_fcinfo; Assert(peraggstate->numDistinctCols < 2); tuplesort_performsort(peraggstate->sortstates[aggstate->current_set]); /* Init FunctionCallInfoData for transition function before loading argument values. */ - InitFunctionCallInfoData(fcinfo, + InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn), peraggstate->numArguments + 1, peraggstate->aggCollation, (Node*)aggstate, NULL); /* Load the column into argument 1 (arg 0 will be transition value) */ - newVal = fcinfo.arg + 1; - isNull = fcinfo.argnull + 1; + newVal = fcinfo->arg + 1; + isNull = fcinfo->argnull + 1; /* * Note: if input type is pass-by-ref, the datums returned by the sort are @@ -773,7 +784,7 @@ static void process_ordered_aggregate_single( if (!peraggstate->inputtypeByVal && !*isNull) pfree(DatumGetPointer(*newVal)); } else { - advance_transition_function(aggstate, peraggstate, pergroupstate, &fcinfo); + advance_transition_function(aggstate, peraggstate, pergroupstate); /* forget the old value, if any */ if (!oldIsNull && !peraggstate->inputtypeByVal) pfree(DatumGetPointer(oldVal)); @@ -806,8 +817,8 @@ static void process_ordered_aggregate_multi( AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate) { MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory; - FunctionCallInfoData fcinfo; - TupleTableSlot* slot1 = peraggstate->evalslot; + FunctionCallInfo fcinfo = &peraggstate->transfn_fcinfo; + TupleTableSlot* slot1 = peraggstate->sortslot; TupleTableSlot* slot2 = peraggstate->uniqslot; int numTransInputs = peraggstate->numTransInputs; int numDistinctCols = peraggstate->numDistinctCols; @@ -833,7 +844,7 @@ static void process_ordered_aggregate_multi( !execTuplesMatch( slot1, slot2, numDistinctCols, peraggstate->sortColIdx, peraggstate->equalfns, workcontext)) { /* Init FunctionCallInfoData for transition function before loading argument values. */ - InitFunctionCallInfoData(fcinfo, + InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn), numTransInputs + 1, peraggstate->aggCollation, @@ -842,11 +853,11 @@ static void process_ordered_aggregate_multi( /* Load values into fcinfo */ /* Start from 1, since the 0th arg will be the transition value */ for (i = 0; i < numTransInputs; i++) { - fcinfo.arg[i + 1] = slot1->tts_values[i]; - fcinfo.argnull[i + 1] = slot1->tts_isnull[i]; + fcinfo->arg[i + 1] = slot1->tts_values[i]; + fcinfo->argnull[i + 1] = slot1->tts_isnull[i]; } - advance_transition_function(aggstate, peraggstate, pergroupstate, &fcinfo); + advance_transition_function(aggstate, peraggstate, pergroupstate); if (numDistinctCols > 0) { /* swap the slot pointers to retain the current tuple */ @@ -1957,10 +1968,12 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags) ExprContext* econtext = NULL; int numaggs, aggno; int phase; + List *combined_inputeval; ListCell* l = NULL; Bitmapset* all_grouped_cols = NULL; int numGroupingSets = 1; int numPhases; + int column_offset; int currentsortno = 0; int i = 0; int j = 0; @@ -2434,6 +2447,7 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags) } #endif /* PGXC */ + /* set up infrastructure for calling the transfn and finalfn */ fmgr_info(transfn_oid, &peraggstate->transfn); fmgr_info_set_expr((Node*)transfnexpr, &peraggstate->transfn); @@ -2449,7 +2463,13 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags) } #endif /* PGXC */ peraggstate->aggCollation = aggref->inputcollid; - + InitFunctionCallInfoData(peraggstate->transfn_fcinfo, + &peraggstate->transfn, + peraggstate->numTransInputs + 1, + peraggstate->aggCollation, + (Node*)aggstate, + NULL); + /* get info zbout relevant datatypes */ get_typlenbyval(aggref->aggtype, &peraggstate->resulttypeLen, &peraggstate->resulttypeByVal); get_typlenbyval(aggtranstype, &peraggstate->transtypeLen, &peraggstate->transtypeByVal); @@ -2496,20 +2516,6 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags) "aggregate %u needs to have compatible input type and transition type", aggref->aggfnoid))); } - /* - * Get a tupledesc corresponding to the inputs (including sort - * expressions) of the agg. - */ - peraggstate->evaldesc = ExecTypeFromTL(aggref->args, false); - - /* Create slot we're going to do argument evaluation in */ - peraggstate->evalslot = ExecInitExtraTupleSlot(estate); - ExecSetSlotDescriptor(peraggstate->evalslot, peraggstate->evaldesc); - - /* Set up projection info for evaluation */ - peraggstate->evalproj = - ExecBuildProjectionInfo(aggrefstate->args, aggstate->tmpcontext, peraggstate->evalslot, NULL); - /* * If we're doing either DISTINCT or ORDER BY, then we have a list of * SortGroupClause nodes; fish out the data in them and stick them @@ -2534,6 +2540,12 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags) peraggstate->numDistinctCols = numDistinctCols; if (numSortCols > 0) { + /* Get a tupledesc and slot corresponding to the aggregated inputs + * (including sort expressions) of the agg. + */ + peraggstate->sortdesc = ExecTypeFromTL(aggref->args, false); + peraggstate->sortslot = ExecInitExtraTupleSlot(estate); + ExecSetSlotDescriptor(peraggstate->sortslot, peraggstate->sortdesc); /* * We don't implement DISTINCT or ORDER BY aggs in the HASHED case * (yet) @@ -2546,7 +2558,7 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags) } else if (numDistinctCols > 0) { /* we will need an extra slot to store prior values */ peraggstate->uniqslot = ExecInitExtraTupleSlot(estate); - ExecSetSlotDescriptor(peraggstate->uniqslot, peraggstate->evaldesc); + ExecSetSlotDescriptor(peraggstate->uniqslot, peraggstate->sortdesc); } /* Extract the sort information for use later */ @@ -2596,6 +2608,32 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags) /* Update numaggs to match number of unique aggregates found */ aggstate->numaggs = aggno + 1; + combined_inputeval = NIL; + column_offset = 0; + for (int transno =0;transno < aggstate->numaggs; transno++) { + AggStatePerAggData* pertrans = &peragg[transno]; + ListCell *arg; + + pertrans->inputoff = column_offset; + foreach(arg,pertrans->aggref->args) + { + TargetEntry *source_tle =(TargetEntry *)lfirst(arg); + TargetEntry *tle; + + Assert(IsA(source_tle, TargetEntry)); + tle = flatCopyTargetEntry(source_tle); + tle->resno += column_offset; + + combined_inputeval = lappend(combined_inputeval, tle); + } + column_offset += list_length(pertrans->aggref->args); + } + + aggstate->evaldesc = ExecTypeFromTL(combined_inputeval, false); + aggstate->evalslot = ExecInitExtraTupleSlot(estate); + combined_inputeval = (List *)ExecInitExpr((Expr *)combined_inputeval, (PlanState *)aggstate); + aggstate->evalproj = ExecBuildProjectionInfo(combined_inputeval, aggstate->tmpcontext, aggstate->evalslot, NULL); + ExecSetSlotDescriptor(aggstate->evalslot, aggstate->evaldesc); AggWriteFileControl* TempFilePara = (AggWriteFileControl*)palloc(sizeof(AggWriteFileControl)); TempFilePara->strategy = MEMORY_HASHAGG; diff --git a/src/include/executor/node/nodeAgg.h b/src/include/executor/node/nodeAgg.h index b00884f74..f1276a45c 100644 --- a/src/include/executor/node/nodeAgg.h +++ b/src/include/executor/node/nodeAgg.h @@ -79,7 +79,8 @@ typedef struct AggStatePerAggData { /* number of inputs including ORDER BY expressions */ int numInputs; - + /* offset of input columns in AggState->evalslot */ + int inputoff; bool is_avg; /* @@ -151,19 +152,19 @@ typedef struct AggStatePerAggData { bool inputtypeByVal, resulttypeByVal, transtypeByVal; /* - * Stuff for evaluation of inputs. We used to just use ExecEvalExpr, but - * with the addition of ORDER BY we now need at least a slot for passing - * data to the sort object, which requires a tupledesc, so we might as - * well go whole hog and use ExecProject too. + * Stuff for evaluation of aggregate inputs in cases where the aggregate + * requires sorted input. The arguments themselves will be evaluated via + * AggState->evalslot/evalproj for all aggregates at once, but we only + * want to sort the relevant columns for individual aggregates. */ - TupleDesc evaldesc; /* descriptor of input tuples */ - ProjectionInfo* evalproj; /* projection machinery */ + TupleDesc sortdesc; /* descriptor of input tuples */ /* * Slots for holding the evaluated input arguments. These are set up - * during ExecInitAgg() and then used for each input row. + * during ExecInitAgg() and then used for each input row requiring + * procesessing besides what's done in AggState->evalproj. */ - TupleTableSlot* evalslot; /* current input tuple */ + TupleTableSlot *sortslot; /* current input tuple */ TupleTableSlot* uniqslot; /* used for multi-column DISTINCT */ /* @@ -185,9 +186,14 @@ typedef struct AggStatePerAggData { * This field is a pre-initialized FunctionCallInfo struct used for * calling this aggregate's transfn. We save a few cycles per row by not * re-initializing the unchanging fields; which isn't much, but it seems - * worth the extra space consumption. + * worth the extra space consumption. cached for transhfn and collectfn now. */ FunctionCallInfoData transfn_fcinfo; + + /* XXX: use for vector engine now, better remove later*/ + TupleDesc evaldesc; /* descriptor of input tuples */ + ProjectionInfo *evalproj; /* projection machinery */ + TupleTableSlot *evalslot; /* current input tuple */ } AggStatePerAggData; /* diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 7a0c55919..78fe759e0 100755 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -2388,6 +2388,10 @@ typedef struct AggState { #endif /* PGXC */ void* aggTempFileControl; FmgrInfo* eqfunctions; /* per-grouping-field equality fns */ + /* support for evaluation of agg inputs */ + TupleTableSlot *evalslot; /* slot for agg inputs */ + ProjectionInfo *evalproj; /* projection machinery */ + TupleDesc evaldesc; /* descriptor of input tuples */ } AggState; /* ----------------