/* ------------------------------------------------------------------------- * * nodeAgg.cpp * Routines to handle aggregate nodes. * * ExecAgg evaluates each aggregate in the following steps: * * transvalue = initcond * foreach input_tuple do * transvalue = transfunc(transvalue, input_value(s)) * result = finalfunc(transvalue, direct_argument(s)) * * If a finalfunc is not supplied then the result is just the ending * value of transvalue. * * If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the * input tuples and eliminate duplicates (if required) before performing * the above-depicted process. While for ordered-set aggregate, their "Order * by" inputs are their aggregate arguments, so we can do the sort job at * the end. * * For normal aggregates: * If transfunc is marked "strict" in pg_proc and initcond is NULL, * then the first non-NULL input_value is assigned directly to transvalue, * and transfunc isn't applied until the second non-NULL input_value. * The agg's first input type and transtype must be the same in this case! * * If transfunc is marked "strict" then NULL input_values are skipped, * keeping the previous transvalue. If transfunc is not strict then it * is called for every input tuple and must deal with NULL initcond * or NULL input_values for itself. * * If finalfunc is marked "strict" then it is not called when the * ending transvalue is NULL, instead a NULL result is created * automatically (this is just the usual handling of strict functions, * of course). A non-strict finalfunc can make its own choice of * what to return for a NULL ending transvalue. * * For Ordered-set aggregates: * We pass both "direct" arguments and transition value to the finalfunc. * NULL placeholders are also provided as the remaining finalfunc arguments, * which correspond to the aggregated expressions. (These arguments are * useless at runtime, but may be needed to deal with a polymorphic * aggregate's result type.) * * We compute aggregate input expressions and run the transition functions * in a temporary econtext (aggstate->tmpcontext). This is reset at * least once per input tuple, so when the transvalue datatype is * pass-by-reference, we have to be careful to copy it into a longer-lived * memory context, and free the prior value to avoid memory leakage. We * store transvalues in another set of econtexts, aggstate->aggcontexts * (one per grouping set, see below), which are also used for the hashtable * structures in AGG_HASHED mode. These econtexts are rescanned, not just * reset, at group boundaries so that aggregate transition functions can * register shutdown callbacks via AggRegisterCallback. * * The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to * run finalize functions and compute the output tuple; this context can be * reset once per output tuple. * * The executor's AggState node is passed as the fmgr "context" value in * all transfunc and finalfunc calls. It is not recommended that the * transition functions look at the AggState node directly, but they can * use AggCheckCallContext() to verify that they are being called by * nodeAgg.c (and not as ordinary SQL functions). The main reason a * transition function might want to know this is so that it can avoid * palloc'ing a fixed-size pass-by-ref transition value on every call: * it can instead just scribble on and return its left input. Ordinarily * it is completely forbidden for functions to modify pass-by-ref inputs, * but in the aggregate case we know the left input is either the initial * transition value or a previous function result, and in either case its * value need not be preserved. See int8inc() for an example. Notice that * advance_transition_function() is coded to avoid a data copy step when * the previous transition value pointer is returned. Also, some * transition functions want to store working state in addition to the * nominal transition value; they can use the memory context returned by * AggCheckCallContext() to do that. * * Note: AggCheckCallContext() is available as of PostgreSQL 9.0. The * AggState is available as context in earlier releases (back to 8.1), * but direct examination of the node is needed to use it before 9.0. * * Grouping sets: * * A list of grouping sets which is structurally equivalent to a ROLLUP * clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over * ordered data. We do this by keeping a separate set of transition values * for each grouping set being concurrently processed; for each input tuple * we update them all, and on group boundaries we reset those states * (starting at the front of the list) whose grouping values have changed * (the list of grouping sets is ordered from most specific to least * specific). * * Where more complex grouping sets are used, we break them down into * "phases", where each phase has a different sort order. During each * phase but the last, the input tuples are additionally stored in a * tuplesort which is keyed to the next phase's sort order; during each * phase but the first, the input tuples are drawn from the previously * sorted data. (The sorting of the data for the first phase is handled by * the planner, as it might be satisfied by underlying nodes.) * * From the perspective of aggregate transition and final functions, the * only issue regarding grouping sets is this: a single call site (flinfo) * of an aggregate function may be used for updating several different * transition values in turn. So the function must not cache in the flinfo * anything which logically belongs as part of the transition value (most * importantly, the memory context in which the transition value exists). * The support API functions (AggCheckCallContext, AggRegisterCallback) are * sensitive to the grouping set for which the aggregate function is * currently being called. * * AGG_HASHED doesn't support multiple grouping sets yet. * * Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd. * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION * src/gausskernel/runtime/executor/nodeAgg.cpp * * ------------------------------------------------------------------------- */ #include "postgres.h" #include "knl/knl_variable.h" #include "access/tableam.h" #include "catalog/pg_aggregate.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" #include "executor/executor.h" #include "executor/node/nodeAgg.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "optimizer/clauses.h" #include "optimizer/tlist.h" #include "parser/parse_agg.h" #include "parser/parse_coerce.h" #include "pgxc/pgxc.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" #include "utils/tuplesort.h" #include "utils/datum.h" #include "utils/memprot.h" #include "workload/workload.h" static TupleTableSlot* ExecAgg(PlanState* state); static void initialize_aggregates( AggState* aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroup, int numReset = 0); static void advance_transition_function( AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate); static void advance_aggregates(AggState* aggstate, AggStatePerGroup pergroup); static void process_ordered_aggregate_single( AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate); static void process_ordered_aggregate_multi( AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate); static void finalize_aggregate(AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate, Datum* resultVal, bool* resultIsNull); static void prepare_projection_slot(AggState* aggstate, TupleTableSlot* slot, int currentSet); static void finalize_aggregates(AggState* aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroup, int currentSet); static TupleTableSlot* project_aggregates(AggState* aggstate); static Bitmapset* find_unaggregated_cols(AggState* aggstate); static bool find_unaggregated_cols_walker(Node* node, Bitmapset** colnos); static void build_hash_table(AggState* aggstate); static AggHashEntry lookup_hash_entry(AggState* aggstate, TupleTableSlot* inputslot); static TupleTableSlot* agg_retrieve_direct(AggState* aggstate); static void agg_fill_hash_table(AggState* aggstate); static TupleTableSlot* agg_retrieve_hash_table(AggState* aggstate); static TupleTableSlot* agg_retrieve(AggState* node); static TupleTableSlot* agg_sort_group_retrieve_direct(AggState* aggstate); static bool prepare_data_source(AggState* node); static TupleTableSlot* fetch_input_tuple(AggState* aggstate); static void exec_lookups_agg(AggState *aggstate, Agg *node, EState *estate); static void initialize_aggregate_flattened(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroup); static void initialize_aggregates_flattened(AggState *aggstate, AggStatePerGroup pergroup, int numReset = 0); static void advance_transition_function_flattened(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate); static void advance_aggregates_flattened(AggState *aggstate, AggStatePerGroup pergroup); static void process_ordered_aggregate_single_flattened(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate); static void process_ordered_aggregate_multi_flattened(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate); static void finalize_aggregate_flattened(AggState *aggstate, AggStatePerAggForFlattenedExpr peragg, AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull); static void finalize_aggregates_flattened(AggState *aggstate, AggStatePerAggForFlattenedExpr peraggs, AggStatePerGroup pergroup, int currentSet); static void build_pertrans_for_aggref(AggStatePerTrans pertrans, AggState *aggsate, EState *estate, Aggref *aggref, Oid aggtransfn, Oid aggtranstype, Datum initValue, bool initValueIsNull, Oid *inputTypes, int numArguments, bool isInitNumericSum); static int find_compatible_peragg(Aggref *newagg, AggState *aggstate, int lastaggno, List **same_input_transnos); static int find_compatible_pertrans(AggState *aggstate, Oid aggfnOid, Oid *aggtransfnOid, Oid *aggtranstype, Datum initValue, bool *initValueIsNull, List *possible_matches); static void exec_lookups_agg_flattened(AggState *aggstate, Agg *node, EState *estate); static void exec_agg_finalfn_init(AggState *aggstate, Agg *node, AggStatePerAggForFlattenedExpr peragg, AggStatePerTrans pertrans, Oid *input_types, int num_arguments); /* * Switch to phase "newphase", which must either be 0 (to reset) or * current_phase + 1. Juggle the tuplesorts accordingly. */ void initialize_phase(AggState* aggstate, int newphase) { Assert(newphase == 0 || newphase == aggstate->current_phase + 1); /* * Whatever the previous state, we're now done with whatever input * tuplesort was in use. */ if (aggstate->sort_in) { tuplesort_end(aggstate->sort_in); aggstate->sort_in = NULL; } if (newphase == 0) { /* * Discard any existing output tuplesort. */ if (aggstate->sort_out) { tuplesort_end(aggstate->sort_out); aggstate->sort_out = NULL; } } else { /* * The old output tuplesort becomes the new input one, and this is the * right time to actually sort it. */ aggstate->sort_in = aggstate->sort_out; aggstate->sort_out = NULL; Assert(aggstate->sort_in); tuplesort_performsort(aggstate->sort_in); } /* * If this isn't the last phase, we need to sort appropriately for the * next phase in sequence. */ if (newphase < aggstate->numphases - 1) { Sort* sortnode = aggstate->phases[newphase + 1].sortnode; PlanState* outerNode = outerPlanState(aggstate); TupleDesc tupDesc = ExecGetResultType(outerNode); Plan* plan = aggstate->ss.ps.plan; int64 workMem = SET_NODEMEM(plan->operatorMemKB[0], plan->dop); int64 maxMem = (plan->operatorMaxMem > 0) ? SET_NODEMEM(plan->operatorMaxMem, plan->dop) : 0; aggstate->sort_out = tuplesort_begin_heap(tupDesc, sortnode->numCols, sortnode->sortColIdx, sortnode->sortOperators, sortnode->collations, sortnode->nullsFirst, workMem, false, maxMem, sortnode->plan.plan_node_id, SET_DOP(sortnode->plan.dop)); } aggstate->current_phase = newphase; aggstate->phase = &aggstate->phases[newphase]; } Datum get_bit_and_initval(Oid aggtranstype, int typmod) { Oid typinput; Oid typioparam; char* strInitVal = NULL; Datum initVal; errno_t rc; getTypeInputInfo(aggtranstype, &typinput, &typioparam); int initValLen = typmod - (int)VARHDRSZ; int charsPerByte = 2; size_t strLen = (initValLen + 1) * charsPerByte + 1; // +2 for "\x" and +1 for '\0' strInitVal = (char*)palloc(strLen * sizeof(char)); strInitVal[0] = '\\'; strInitVal[1] = 'x'; strInitVal[strLen - 1] = '\0'; rc = memset_s(strInitVal + charsPerByte, initValLen * charsPerByte, 'F', initValLen * charsPerByte); securec_check(rc, "\0", "\0"); initVal = OidInputFunctionCall(typinput, strInitVal, typioparam, -1); pfree_ext(strInitVal); return initVal; } bool is_binary_type_in_dolphin(Oid typeOid) { if (!u_sess->attr.attr_sql.dolphin) { return false; } return (typeOid == get_typeoid(PG_CATALOG_NAMESPACE, "binary")) || (typeOid == get_typeoid(PG_CATALOG_NAMESPACE, "varbinary")); } /* * Fetch a tuple from either the outer plan (for phase 0) or from the sorter * populated by the previous phase. Copy it to the sorter for the next phase * if any. */ static TupleTableSlot* fetch_input_tuple(AggState* aggstate) { TupleTableSlot* slot = NULL; if (aggstate->sort_in) { /* make sure we check for interrupts in either path through here */ CHECK_FOR_INTERRUPTS(); if (!tuplesort_gettupleslot(aggstate->sort_in, true, aggstate->sort_slot, NULL)) return NULL; slot = aggstate->sort_slot; } else slot = ExecProcNode(outerPlanState(aggstate)); if (!TupIsNull(slot) && aggstate->sort_out) tuplesort_puttupleslot(aggstate->sort_out, slot); return slot; } /* * (Re)Initialize an individual aggregate. * * This function handles only one grouping set (already set in * aggstate->current_set). * * When called, CurrentMemoryContext should be the per-query context. */ static void initialize_aggregate(AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate) { Plan* plan = aggstate->ss.ps.plan; int64 local_work_mem = SET_NODEMEM(plan->operatorMemKB[0], plan->dop); int64 max_mem = (plan->operatorMaxMem > 0) ? SET_NODEMEM(plan->operatorMaxMem, plan->dop) : 0; /* * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate. */ if (peraggstate->numSortCols > 0) { /* * In case of rescan, maybe there could be an uncompleted sort * operation? Clean it up if so. */ if (peraggstate->sortstates[aggstate->current_set]) tuplesort_end(peraggstate->sortstates[aggstate->current_set]); /* * We use a plain Datum sorter when there's a single input column; * otherwise sort the full tuple. (See comments for * process_ordered_aggregate_single.) */ if (peraggstate->numInputs == 1) { peraggstate->sortstates[aggstate->current_set] = tuplesort_begin_datum(peraggstate->sortdesc->attrs[0].atttypid, peraggstate->sortOperators[0], peraggstate->sortCollations[0], peraggstate->sortNullsFirst[0], local_work_mem, false); } else { peraggstate->sortstates[aggstate->current_set] = tuplesort_begin_heap(peraggstate->sortdesc, peraggstate->numSortCols, peraggstate->sortColIdx, peraggstate->sortOperators, peraggstate->sortCollations, peraggstate->sortNullsFirst, local_work_mem, false, max_mem, plan->plan_node_id, SET_DOP(plan->dop)); } } /* * (Re)set transValue to the initial value. * * Note that when the initial value is pass-by-ref, we must copy it * (into the aggcontext) since we will pfree the transValue later. */ if (peraggstate->initValueIsNull) pergroupstate->transValue = peraggstate->initValue; else { MemoryContext oldContext; oldContext = MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]); pergroupstate->transValue = datumCopy(peraggstate->initValue, peraggstate->transtypeByVal, peraggstate->transtypeLen); MemoryContextSwitchTo(oldContext); } pergroupstate->transValueIsNull = peraggstate->initValueIsNull; /* * If the initial value for the transition state doesn't exist in the * pg_aggregate table then we will let the first non-NULL value * returned from the outer procNode become the initial value. (This is * useful for aggregates like max() and min().) The noTransValue flag * signals that we still need to do this. */ pergroupstate->noTransValue = peraggstate->initValueIsNull; #ifdef PGXC /* * (Re)set collectValue to the initial value. * * Note that when the initial value is pass-by-ref, we must copy it * (into the aggcontext) since we will pfree the collectValue later. * collection type is same as transition type. */ if (peraggstate->initCollectValueIsNull) pergroupstate->collectValue = peraggstate->initCollectValue; else { MemoryContext oldContext; oldContext = MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]); pergroupstate->collectValue = datumCopy(peraggstate->initCollectValue, peraggstate->transtypeByVal, peraggstate->transtypeLen); MemoryContextSwitchTo(oldContext); } pergroupstate->collectValueIsNull = peraggstate->initCollectValueIsNull; /* * If the initial value for the transition state doesn't exist in the * pg_aggregate table then we will let the first non-NULL value * returned from the outer procNode become the initial value. (This is * useful for aggregates like max() and min().) The noTransValue flag * signals that we still need to do this. */ pergroupstate->noCollectValue = peraggstate->initCollectValueIsNull; #endif /* PGXC */ } /* * Initialize all aggregates for a new group of input values. * * If there are multiple grouping sets, we initialize only the first numReset * of them (the grouping sets are ordered so that the most specific one, which * is reset most often, is first). As a convenience, if numReset is < 1, we * reinitialize all sets. * * When called, CurrentMemoryContext should be the per-query context. */ static void initialize_aggregates(AggState* aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroup, int numReset) { int aggno; int numGroupingSets = Max(aggstate->phase->numsets, 1); int setno = 0; if (numReset < 1) { numReset = numGroupingSets; } for (aggno = 0; aggno < aggstate->numaggs; aggno++) { AggStatePerAgg peraggstate = &peragg[aggno]; for (setno = 0; setno < numReset; setno++) { AggStatePerGroup pergroupstate; pergroupstate = &pergroup[aggno + (setno * (aggstate->numaggs))]; aggstate->current_set = setno; initialize_aggregate(aggstate, peraggstate, pergroupstate); } } } /* * Given new input value(s), advance the transition function of one aggregate * state within one grouping set only (already set in aggstate->current_set) * * The new values (and null flags) have been preloaded into argument positions * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to * pass to the transition function. We also expect that the static fields of * the fcinfo are already initialized; that was done by ExecInitAgg(). * * It doesn't matter which memory context this is called in. */ static void advance_transition_function( AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate) { FunctionCallInfo fcinfo = &peraggstate->transfn_fcinfo; MemoryContext oldContext; Datum newVal; if (peraggstate->transfn.fn_strict) { /* * For a strict transfn, nothing happens when there's a NULL input; we * just keep the prior transValue. */ int numTransInputs = peraggstate->numTransInputs; int i; for (i = 1; i <= numTransInputs; i++) { Oid aggtranstype = peraggstate->aggref->aggtrantype; ListCell* arg = list_head(peraggstate->aggref->args); TargetEntry *tle = (TargetEntry *)lfirst(arg); if (fcinfo->argnull[i] && strcmp(get_func_name(peraggstate->aggref->aggfnoid), "bit_and") == 0 && is_binary_type_in_dolphin(aggtranstype) && pergroupstate->transValueIsNull && IsA(tle->expr, Var)) { Var* var = (Var*)tle->expr; pergroupstate->transValue = get_bit_and_initval(aggtranstype, var->vartypmod); pergroupstate->transValueIsNull = false; return; } else if (fcinfo->argnull[i]) { return; } } if (pergroupstate->noTransValue) { /* * transValue has not been initialized. This is the first non-NULL * input value. We use it as the initial value for transValue. (We * already checked that the agg's input type is binary-compatible * with its transtype, so straight copy here is OK.) * * We must copy the datum into aggcontext if it is pass-by-ref. We * do not need to pfree the old transValue, since it's NULL. */ oldContext = MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]); pergroupstate->transValue = datumCopy(fcinfo->arg[1], peraggstate->transtypeByVal, peraggstate->transtypeLen); pergroupstate->transValueIsNull = false; pergroupstate->noTransValue = false; MemoryContextSwitchTo(oldContext); return; } if (pergroupstate->transValueIsNull) { /* * Don't call a strict function with NULL inputs. Note it is * possible to get here despite the above tests, if the transfn is * strict *and* returned a NULL on a prior cycle. If that happens * we will propagate the NULL all the way to the end. */ return; } } /* We run the transition functions in per-input-tuple memory context */ oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory); /* set up aggstate->curperagg to allow get aggref */ aggstate->curperagg = peraggstate; /* * OK to call the transition function */ fcinfo->arg[0] = pergroupstate->transValue; fcinfo->argnull[0] = pergroupstate->transValueIsNull; fcinfo->argTypes[0] = InvalidOid; fcinfo->isnull = false; /* just in case transfn doesn't set it */ fcinfo->can_ignore = aggstate->ss.ps.state->es_plannedstmt->hasIgnore; Node *origin_fcxt = fcinfo->context; if (IS_PGXC_DATANODE && peraggstate->is_avg) { Node *fcontext = (Node *)palloc0(sizeof(Node)); fcontext->type = (NodeTag)(peraggstate->is_avg); fcinfo->context = fcontext; } newVal = FunctionCallInvoke(fcinfo); aggstate->curperagg = NULL; fcinfo->context = origin_fcxt; /* * If pass-by-ref datatype, must copy the new value into aggcontext and * pfree the prior transValue. But if transfn returned a pointer to its * first input, we don't need to do anything. */ if (!peraggstate->transtypeByVal && DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue)) { if (!fcinfo->isnull) { MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]); newVal = datumCopy(newVal, peraggstate->transtypeByVal, peraggstate->transtypeLen); } if (!pergroupstate->transValueIsNull) pfree(DatumGetPointer(pergroupstate->transValue)); } pergroupstate->transValue = newVal; pergroupstate->transValueIsNull = fcinfo->isnull; MemoryContextSwitchTo(oldContext); } #ifdef PGXC /* * Given new input value(s), advance the collection function of an aggregate. * * The new values (and null flags) have been preloaded into argument positions * 1 and up in fcinfo, so that we needn't copy them again to pass to the * collection function. No other fields of fcinfo are assumed valid. * * It doesn't matter which memory context this is called in. */ static void advance_collection_function( AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate, FunctionCallInfoData* fcinfo) { int numArguments = peraggstate->numArguments; Datum newVal; MemoryContext oldContext; Assert(OidIsValid(peraggstate->collectfn.fn_oid)); /* * numArgument has to be one, since each Datanode is going to send a single * transition value */ Assert(numArguments == 1); if (peraggstate->collectfn.fn_strict) { int cntArgs; /* * For a strict collectfn, nothing happens when there's a NULL input; we * just keep the prior transition value, transValue. */ for (cntArgs = 1; cntArgs <= numArguments; cntArgs++) { if (fcinfo->argnull[cntArgs]) return; } if (pergroupstate->noCollectValue) { /* * collection result has not been initialized. This is the first non-NULL * transition value. We use it as the initial value for collectValue. * Aggregate's transition and collection type are same * We must copy the datum into result if it is pass-by-ref. We * do not need to pfree the old result, since it's NULL. */ oldContext = MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]); pergroupstate->collectValue = datumCopy(fcinfo->arg[1], peraggstate->transtypeByVal, peraggstate->transtypeLen); pergroupstate->collectValueIsNull = false; pergroupstate->noCollectValue = false; MemoryContextSwitchTo(oldContext); return; } if (pergroupstate->collectValueIsNull) { /* * Don't call a strict function with NULL inputs. Note it is * possible to get here despite the above tests, if the collectfn is * strict *and* returned a NULL on a prior cycle. If that happens * we will propagate the NULL all the way to the end. */ return; } } /* We run the collection functions in per-input-tuple memory context */ oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory); /* * OK to call the collection function */ InitFunctionCallInfoData(*fcinfo, &(peraggstate->collectfn), 2, peraggstate->aggCollation, (Node*)aggstate, NULL); fcinfo->arg[0] = pergroupstate->collectValue; fcinfo->argnull[0] = pergroupstate->collectValueIsNull; fcinfo->argTypes[0] = InvalidOid; newVal = FunctionCallInvoke(fcinfo); /* * If pass-by-ref datatype, must copy the new value into aggcontext and * pfree the prior transValue. But if collectfn returned a pointer to its * first input, we don't need to do anything. */ if (!peraggstate->transtypeByVal && DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->collectValue)) { if (!fcinfo->isnull) { MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]); newVal = datumCopy(newVal, peraggstate->transtypeByVal, peraggstate->transtypeLen); } if (!pergroupstate->collectValueIsNull) pfree(DatumGetPointer(pergroupstate->collectValue)); } pergroupstate->collectValue = newVal; pergroupstate->collectValueIsNull = fcinfo->isnull; MemoryContextSwitchTo(oldContext); } #endif /* PGXC */ /* * Advance all the aggregates for one input tuple. The input tuple * has been stored in tmpcontext->ecxt_outertuple, so that it is accessible * to ExecEvalExpr. pergroup is the array of per-group structs to use * (this might be in a hashtable entry). * * When called, CurrentMemoryContext should be the per-query context. */ static void advance_aggregates(AggState* aggstate, AggStatePerGroup pergroup) { int aggno; int setno = 0; int numGroupingSets = Max(aggstate->phase->numsets, 1); int numAggs = aggstate->numaggs; TupleTableSlot *slot = aggstate->evalslot; /* compute input for all aggregates */ if (aggstate->evalproj) aggstate->evalslot = ExecProject(aggstate->evalproj, NULL); for (aggno = 0; aggno < aggstate->numaggs; aggno++) { AggStatePerAgg peraggstate = &aggstate->peragg[aggno]; int numTransInputs = peraggstate->numTransInputs; int i; int inputoff = peraggstate->inputoff; if (peraggstate->aggrefstate->aggfilter) { bool eisnull = false; Datum exprValue = ExecEvalExpr(peraggstate->aggrefstate->aggfilter, aggstate->evalproj->pi_exprContext, &eisnull, NULL); if (eisnull || !DatumGetBool(exprValue)) continue; } if (peraggstate->numSortCols > 0) { /* DISTINCT and/or ORDER BY case */ Assert(slot->tts_nvalid >= (peraggstate->numInputs + inputoff)); /* * If the transfn is strict, we want to check for nullity before * storing the row in the sorter, to save space if there are a lot * of nulls. Note that we must only check numArguments columns, * not numInputs, since nullity in columns used only for sorting * is not relevant here. */ if (peraggstate->transfn.fn_strict) { for (i = 0; i < numTransInputs; i++) { if (slot->tts_isnull[i + inputoff]) break; } if (i < numTransInputs) continue; } for (setno = 0; setno < numGroupingSets; setno++) { /* OK, put the tuple into the tuplesort object */ if (peraggstate->numInputs == 1) tuplesort_putdatum(peraggstate->sortstates[setno], slot->tts_values[inputoff], slot->tts_isnull[inputoff]); else { errno_t errorno = EOK; ExecClearTuple(peraggstate->sortslot); errorno = memcpy_s(peraggstate->sortslot->tts_values, peraggstate->numInputs * sizeof(Datum), &slot->tts_values[inputoff], peraggstate->numInputs * sizeof(Datum)); securec_check(errorno, "\0", "\0"); errorno = memcpy_s(peraggstate->sortslot->tts_isnull, peraggstate->numInputs * sizeof(bool), &slot->tts_isnull[inputoff], peraggstate->numInputs * sizeof(bool)); securec_check(errorno, "\0", "\0"); peraggstate->sortslot->tts_nvalid = peraggstate->numInputs; ExecStoreVirtualTuple(peraggstate->sortslot); tuplesort_puttupleslot(peraggstate->sortstates[setno], peraggstate->sortslot); } } } else { /* We can apply the transition function immediately */ FunctionCallInfo fcinfo = &peraggstate->transfn_fcinfo; /* Load values into fcinfo */ /* Start from 1, since the 0th arg will be the transition value */ Assert(slot->tts_nvalid >= numTransInputs); for (i = 0; i < numTransInputs; i++) { fcinfo->arg[i + 1] = slot->tts_values[i + inputoff]; fcinfo->argnull[i + 1] = slot->tts_isnull[i + inputoff]; fcinfo->argTypes[i + 1] = InvalidOid; } for (setno = 0; setno < numGroupingSets; setno++) { AggStatePerGroup pergroupstate = &pergroup[aggno + (setno * numAggs)]; aggstate->current_set = setno; #ifdef PGXC /* * For the agg not in the first level, besides PGXC case, we should do * collection. */ if ((peraggstate->aggref->aggstage > 0 || aggstate->is_final) && need_adjust_agg_inner_func_type(peraggstate->aggref)) { /* * we are collecting results sent by the Datanodes, so advance * collections instead of transitions */ advance_collection_function(aggstate, peraggstate, pergroupstate, fcinfo); } else #endif advance_transition_function(aggstate, peraggstate, pergroupstate); } } } } /* * Run the transition function for a DISTINCT or ORDER BY aggregate * with only one input. This is called after we have completed * entering all the input values into the sort object. We complete the * sort, read out the values in sorted order, and run the transition * function on each value (applying DISTINCT if appropriate). * * Note that the strictness of the transition function was checked when * entering the values into the sort, so we don't check it again here; * we just apply standard SQL DISTINCT logic. * * The one-input case is handled separately from the multi-input case * for performance reasons: for single by-value inputs, such as the * common case of count(distinct id), the tuplesort_getdatum code path * is around 300% faster. (The speedup for by-reference types is less * but still noticeable.) * * This function handles only one grouping set (already set in * aggstate->current_set). * * When called, CurrentMemoryContext should be the per-query context. */ static void process_ordered_aggregate_single( AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate) { Datum oldVal = (Datum)0; bool oldIsNull = true; bool haveOldVal = false; MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory; MemoryContext oldContext; bool isDistinct = (peraggstate->numDistinctCols > 0); Datum* newVal = NULL; bool* isNull = NULL; FunctionCallInfo fcinfo = &peraggstate->transfn_fcinfo; Assert(peraggstate->numDistinctCols < 2); tuplesort_performsort(peraggstate->sortstates[aggstate->current_set]); /* Init FunctionCallInfoData for transition function before loading argument values. */ InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn), peraggstate->numArguments + 1, peraggstate->aggCollation, (Node*)aggstate, NULL); /* Load the column into argument 1 (arg 0 will be transition value) */ newVal = fcinfo->arg + 1; isNull = fcinfo->argnull + 1; /* * Note: if input type is pass-by-ref, the datums returned by the sort are * freshly palloc'd in the per-query context, so we must be careful to * pfree them when they are no longer needed. */ while (tuplesort_getdatum(peraggstate->sortstates[aggstate->current_set], true, newVal, isNull)) { /* * Clear and select the working context for evaluation of the equality * function and transition function. */ MemoryContextReset(workcontext); oldContext = MemoryContextSwitchTo(workcontext); /* * If DISTINCT mode, and not distinct from prior, skip it. * * Note: we assume equality functions don't care about collation. */ if (isDistinct && haveOldVal && ((oldIsNull && *isNull) || (!oldIsNull && !*isNull && DatumGetBool(FunctionCall2(&peraggstate->equalfns[0], oldVal, *newVal))))) { /* equal to prior, so forget this one */ if (!peraggstate->inputtypeByVal && !*isNull) pfree(DatumGetPointer(*newVal)); } else { advance_transition_function(aggstate, peraggstate, pergroupstate); /* forget the old value, if any */ if (!oldIsNull && !peraggstate->inputtypeByVal) pfree(DatumGetPointer(oldVal)); /* and remember the new one for subsequent equality checks */ oldVal = *newVal; oldIsNull = *isNull; haveOldVal = true; } MemoryContextSwitchTo(oldContext); } if (!oldIsNull && !peraggstate->inputtypeByVal) pfree(DatumGetPointer(oldVal)); tuplesort_end(peraggstate->sortstates[aggstate->current_set]); peraggstate->sortstates[aggstate->current_set] = NULL; } /* * Run the transition function for a DISTINCT or ORDER BY aggregate * with more than one input. This is called after we have completed * entering all the input values into the sort object. We complete the * sort, read out the values in sorted order, and run the transition * function on each value (applying DISTINCT if appropriate). * * When called, CurrentMemoryContext should be the per-query context. */ static void process_ordered_aggregate_multi( AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate) { MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory; FunctionCallInfo fcinfo = &peraggstate->transfn_fcinfo; TupleTableSlot* slot1 = peraggstate->sortslot; TupleTableSlot* slot2 = peraggstate->uniqslot; int numTransInputs = peraggstate->numTransInputs; int numDistinctCols = peraggstate->numDistinctCols; Oid* sortCollations = peraggstate->sortCollations; Datum newAbbrevVal = (Datum)0; Datum oldAbbrevVal = (Datum)0; bool haveOldValue = false; int i; tuplesort_performsort(peraggstate->sortstates[aggstate->current_set]); (void)ExecClearTuple(slot1); if (slot2 != NULL) (void)ExecClearTuple(slot2); while (tuplesort_gettupleslot(peraggstate->sortstates[aggstate->current_set], true, slot1, &newAbbrevVal)) { /* * Extract the first numTransInputs as datums to pass to the transfn. * (This will help execTuplesMatch too, so do it immediately.) */ tableam_tslot_getsomeattrs(slot1, numTransInputs); if (numDistinctCols == 0 || !haveOldValue || newAbbrevVal != oldAbbrevVal || !execTuplesMatch(slot1, slot2, numDistinctCols, peraggstate->sortColIdx, peraggstate->equalfns, workcontext, sortCollations)) { /* Init FunctionCallInfoData for transition function before loading argument values. */ InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn), numTransInputs + 1, peraggstate->aggCollation, (Node*)aggstate, NULL); /* Load values into fcinfo */ /* Start from 1, since the 0th arg will be the transition value */ for (i = 0; i < numTransInputs; i++) { fcinfo->arg[i + 1] = slot1->tts_values[i]; fcinfo->argnull[i + 1] = slot1->tts_isnull[i]; } advance_transition_function(aggstate, peraggstate, pergroupstate); if (numDistinctCols > 0) { /* swap the slot pointers to retain the current tuple */ TupleTableSlot* tmpslot = slot2; slot2 = slot1; slot1 = tmpslot; /* avoid execTuplesMatch() calls by reusing abbreviated keys */ oldAbbrevVal = newAbbrevVal; haveOldValue = true; } } /* Reset context each time, unless execTuplesMatch did it for us */ if (numDistinctCols == 0) MemoryContextReset(workcontext); (void)ExecClearTuple(slot1); } if (slot2 != NULL) (void)ExecClearTuple(slot2); tuplesort_end(peraggstate->sortstates[aggstate->current_set]); peraggstate->sortstates[aggstate->current_set] = NULL; } /* * Compute the final value of one aggregate. * * This function handles only one grouping set (already set in * aggstate->current_set). * * The finalfunction will be run, and the result delivered, in the * output-tuple context; caller's CurrentMemoryContext does not matter. */ static void finalize_aggregate(AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate, Datum* resultVal, bool* resultIsNull) { bool anynull = false; FunctionCallInfoData fcinfo; /* record the current passed argument position */ int args_pos = 1; /* For a normal agg only the transition state value being passed to the finalfn */ int numFinalArgs = 1; MemoryContext oldContext; ListCell* lc = NULL; oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory); /* * For ordered-set aggregates, direct argument(s) alone with nulls placeholder * (corresponding to the aggregate-input columns) are passed to finalfn. */ if (AGGKIND_IS_ORDERED_SET(peraggstate->aggref->aggkind)) numFinalArgs += peraggstate->numArguments; /* init the number of arguments to a function. */ InitFunctionCallInfoArgs(fcinfo, numFinalArgs, 1); /* * Evaluate any direct arguments for finalfn and load them into function * call info. */ foreach (lc, peraggstate->aggrefstate->aggdirectargs) { fcinfo.arg[args_pos] = ExecEvalExpr((ExprState*)lfirst(lc), aggstate->ss.ps.ps_ExprContext, &fcinfo.argnull[args_pos]); fcinfo.argTypes[args_pos] = ((ExprState*)lfirst(lc))->resultType; if (anynull == true || fcinfo.argnull[args_pos] == true) anynull = true; else anynull = false; args_pos++; } #ifdef PGXC /* * If we skipped the transition phase, we have the collection result in the * collectValue, move it to transValue for finalization to work on. */ if ((peraggstate->aggref->aggstage > 0 || aggstate->is_final) && need_adjust_agg_inner_func_type(peraggstate->aggref)) { pergroupstate->transValue = pergroupstate->collectValue; pergroupstate->transValueIsNull = pergroupstate->collectValueIsNull; } #endif /* PGXC */ Assert(args_pos <= numFinalArgs); /* * Apply the agg's finalfn if one is provided, else return transValue. */ if (OidIsValid(peraggstate->finalfn_oid)) { /* set up aggstate->curperagg to allow get aggref */ aggstate->curperagg = peraggstate; InitFunctionCallInfoData( fcinfo, &(peraggstate->finalfn), numFinalArgs, peraggstate->aggCollation, (Node*)aggstate, NULL); fcinfo.arg[0] = pergroupstate->transValue; fcinfo.argnull[0] = pergroupstate->transValueIsNull; fcinfo.argTypes[0] = InvalidOid; if (anynull == true || pergroupstate->transValueIsNull == true) anynull = true; else anynull = false; /* Fill remaining arguments positions with nulls */ while (args_pos < numFinalArgs) { fcinfo.arg[args_pos] = (Datum)0; fcinfo.argnull[args_pos] = true; fcinfo.argTypes[args_pos] = InvalidOid; args_pos++; anynull = true; } if (fcinfo.flinfo->fn_strict && anynull) { /* don't call a strict function with NULL inputs */ *resultVal = (Datum)0; *resultIsNull = true; } else { *resultVal = FunctionCallInvoke(&fcinfo); *resultIsNull = fcinfo.isnull; } aggstate->curperagg = NULL; } else { *resultVal = pergroupstate->transValue; *resultIsNull = pergroupstate->transValueIsNull; } /* * If result is pass-by-ref, make sure it is in the right context. */ if (!peraggstate->resulttypeByVal && !*resultIsNull && !MemoryContextContains(CurrentMemoryContext, DatumGetPointer(*resultVal))) *resultVal = datumCopy(*resultVal, peraggstate->resulttypeByVal, peraggstate->resulttypeLen); MemoryContextSwitchTo(oldContext); } /* * Prepare to finalize and project based on the specified representative tuple * slot and grouping set. * * In the specified tuple slot, force to null all attributes that should be * read as null in the context of the current grouping set. Also stash the * current group bitmap where GroupingExpr can get at it. * * This relies on three conditions: * * 1) Nothing is ever going to try and extract the whole tuple from this slot, * only reference it in evaluations, which will only access individual * attributes. * * 2) No system columns are going to need to be nulled. (If a system column is * referenced in a group clause, it is actually projected in the outer plan * tlist.) * * 3) Within a given phase, we never need to recover the value of an attribute * once it has been set to null. * * Poking into the slot this way is a bit ugly, but the consensus is that the * alternative was worse. */ static void prepare_projection_slot(AggState* aggstate, TupleTableSlot* slot, int currentSet) { if (aggstate->phase->grouped_cols) { Bitmapset* grouped_cols = aggstate->phase->grouped_cols[currentSet]; aggstate->grouped_cols = grouped_cols; if (TTS_EMPTY(slot)) { /* * Force all values to be NULL if working on an empty input tuple * (i.e. an empty grouping set for which no input rows were * supplied). */ ExecStoreAllNullTuple(slot); } else if (aggstate->all_grouped_cols) { ListCell* lc = NULL; Assert(slot->tts_tupleDescriptor != NULL); /* all_grouped_cols is arranged in desc order */ tableam_tslot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols)); foreach (lc, aggstate->all_grouped_cols) { int attnum = lfirst_int(lc); if (!bms_is_member(attnum, grouped_cols)) slot->tts_isnull[attnum - 1] = true; } } } } /* * Compute the final value of all aggregates for one group. * * This function handles only one grouping set at a time. * * Results are stored in the output econtext aggvalues/aggnulls. */ static void finalize_aggregates(AggState* aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroup, int currentSet) { ExprContext* econtext = aggstate->ss.ps.ps_ExprContext; Datum* aggvalues = econtext->ecxt_aggvalues; bool* aggnulls = econtext->ecxt_aggnulls; int aggno; Assert(currentSet == 0 || ((Agg*)aggstate->ss.ps.plan)->aggstrategy != AGG_HASHED); aggstate->current_set = currentSet; for (aggno = 0; aggno < aggstate->numaggs; aggno++) { AggStatePerAgg peraggstate = &peragg[aggno]; AggStatePerGroup pergroupstate; pergroupstate = &pergroup[aggno + (currentSet * (aggstate->numaggs))]; if (peraggstate->numSortCols > 0) { Assert(((Agg*)aggstate->ss.ps.plan)->aggstrategy != AGG_HASHED); if (peraggstate->numInputs == 1) process_ordered_aggregate_single(aggstate, peraggstate, pergroupstate); else process_ordered_aggregate_multi(aggstate, peraggstate, pergroupstate); } finalize_aggregate(aggstate, peraggstate, pergroupstate, &aggvalues[aggno], &aggnulls[aggno]); } } /* * Project the result of a group (whose aggs have already been calculated by * finalize_aggregates). Returns the result slot, or NULL if no row is * projected (suppressed by qual or by an empty SRF). */ static TupleTableSlot* project_aggregates(AggState* aggstate) { ExprContext* econtext = aggstate->ss.ps.ps_ExprContext; /* * Check the qual (HAVING clause); if the group does not match, ignore it. */ if (ExecQual(aggstate->ss.ps.qual, econtext)) { /* * Form and return or store a projection tuple using the aggregate * results and the representative input tuple. */ ExprDoneCond isDone; TupleTableSlot* result = NULL; result = ExecProject(aggstate->ss.ps.ps_ProjInfo, &isDone); if (isDone != ExprEndResult) { aggstate->ss.ps.ps_vec_TupFromTlist = (isDone == ExprMultipleResult); return result; } } else InstrCountFiltered1(aggstate, 1); return NULL; } /* * find_unaggregated_cols * Construct a bitmapset of the column numbers of un-aggregated Vars * appearing in our targetlist and qual (HAVING clause) */ static Bitmapset* find_unaggregated_cols(AggState* aggstate) { Agg* node = (Agg*)aggstate->ss.ps.plan; Bitmapset* colnos = NULL; colnos = NULL; (void)find_unaggregated_cols_walker((Node*)node->plan.targetlist, &colnos); (void)find_unaggregated_cols_walker((Node*)node->plan.qual, &colnos); return colnos; } static bool find_unaggregated_cols_walker(Node* node, Bitmapset** colnos) { if (node == NULL) return false; if (IsA(node, Var)) { Var* var = (Var*)node; /* setrefs.c should have set the varno to OUTER_VAR */ Assert(var->varno == OUTER_VAR); Assert(var->varlevelsup == 0); *colnos = bms_add_member(*colnos, var->varattno); return false; } if (IsA(node, Aggref) || IsA(node, GroupingFunc)) { /* do not descend into aggregate exprs */ return false; } return expression_tree_walker(node, (bool (*)())find_unaggregated_cols_walker, (void*)colnos); } /* * Initialize the hash table to empty. * * The hash table always lives in the aggcontext memory context. */ static void build_hash_table(AggState* aggstate) { Agg* node = (Agg*)aggstate->ss.ps.plan; MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory; Size entrysize; int64 workMem = SET_NODEMEM(node->plan.operatorMemKB[0], node->plan.dop); Assert(node->aggstrategy == AGG_HASHED); #ifdef USE_ASSERT_CHECKING Assert(node->numGroups > 0); #else if (node->numGroups <= 0) { elog(LOG, "[build_hash_table]: unexpected numGroups: %ld.", node->numGroups); node->numGroups = (long)LONG_MAX; } #endif entrysize = offsetof(AggHashEntryData, pergroup) + (aggstate->numaggs) * sizeof(AggStatePerGroupData); aggstate->hashtable = BuildTupleHashTable(node->numCols, node->grpColIdx, aggstate->phase->eqfunctions, aggstate->hashfunctions, node->numGroups, entrysize, aggstate->aggcontexts[0], tmpmem, workMem, node->grp_collations); } /* * Create a list of the tuple columns that actually need to be stored in * hashtable entries. The incoming tuples from the child plan node will * contain grouping columns, other columns referenced in our targetlist and * qual, columns used to compute the aggregate functions, and perhaps just * junk columns we don't use at all. Only columns of the first two types * need to be stored in the hashtable, and getting rid of the others can * make the table entries significantly smaller. To avoid messing up Var * numbering, we keep the same tuple descriptor for hashtable entries as the * incoming tuples have, but set unwanted columns to NULL in the tuples that * go into the table. * * To eliminate duplicates, we build a bitmapset of the needed columns, then * convert it to an integer list (cheaper to scan at runtime). The list is * in decreasing order so that the first entry is the largest; * lookup_hash_entry depends on this to use table's getsomeattrs correctly. * Note that the list is preserved over ExecReScanAgg, so we allocate it in * the per-query context (unlike the hash table itself). * * Note: at present, searching the tlist/qual is not really necessary since * the parser should disallow any unaggregated references to ungrouped * columns. However, the search will be needed when we add support for * SQL99 semantics that allow use of "functionally dependent" columns that * haven't been explicitly grouped by. */ List* find_hash_columns(AggState* aggstate) { Agg* node = (Agg*)aggstate->ss.ps.plan; Bitmapset* colnos = NULL; List* collist = NIL; int i; /* Find Vars that will be needed in tlist and qual */ colnos = find_unaggregated_cols(aggstate); /* Add in all the grouping columns */ for (i = 0; i < node->numCols; i++) colnos = bms_add_member(colnos, node->grpColIdx[i]); /* Convert to list, using lcons so largest element ends up first */ collist = NIL; while ((i = bms_first_member(colnos)) >= 0) collist = lcons_int(i, collist); bms_free_ext(colnos); return collist; } /* * Estimate per-hash-table-entry overhead for the planner. * * Note that the estimate does not include space for pass-by-reference * transition data values, nor for the representative tuple of each group. */ Size hash_agg_entry_size(int numAggs) { Size entrysize; /* This must match build_hash_table */ entrysize = offsetof(AggHashEntryData, pergroup) + numAggs * sizeof(AggStatePerGroupData); entrysize = MAXALIGN(entrysize); /* Account for hashtable overhead (assuming fill factor = 1) */ entrysize += 3 * sizeof(void*); return entrysize; } /* * Compute the hash value for a tuple */ uint32 ComputeHashValue(TupleHashTable hashtbl) { TupleTableSlot* slot = NULL; TupleHashTable hashtable = hashtbl; int numCols = hashtable->numCols; AttrNumber* keyColIdx = hashtable->keyColIdx; FmgrInfo* hashfunctions = NULL; uint32 hashkey = 0; int i; /* Process the current input tuple for the table */ slot = hashtable->inputslot; hashfunctions = hashtable->in_hash_funcs; /* Get the Table Accessor Method*/ Assert(slot != NULL && slot->tts_tupleDescriptor != NULL); for (i = 0; i < numCols; i++) { AttrNumber att = keyColIdx[i]; Datum attr; bool isNull = true; /* rotate hashkey left 1 bit at each step */ hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0); attr = tableam_tslot_getattr(slot, att, &isNull); /* treat nulls as having hash key 0 */ if (!isNull) { uint32 hkey; hkey = DatumGetUInt32(FunctionCall1(&hashfunctions[i], attr)); hashkey ^= hkey; } } hashkey = DatumGetUInt32(hash_uint32(hashkey)); return hashkey; } /* * Find or create a hashtable entry for the tuple group containing the * given tuple. * * When called, CurrentMemoryContext should be the per-query context. */ static AggHashEntry lookup_hash_entry(AggState* aggstate, TupleTableSlot* inputslot) { TupleTableSlot* hashslot = aggstate->hashslot; ListCell* l = NULL; AggHashEntry entry; bool isnew = false; AggWriteFileControl* TempFileControl = (AggWriteFileControl*)aggstate->aggTempFileControl; /* if first time through, initialize hashslot by cloning input slot */ if (hashslot->tts_tupleDescriptor == NULL) { ExecSetSlotDescriptor(hashslot, inputslot->tts_tupleDescriptor); /* Make sure all unused columns are NULLs */ ExecStoreAllNullTuple(hashslot); } /* transfer just the needed columns into hashslot */ tableam_tslot_getsomeattrs(inputslot, linitial_int(aggstate->hash_needed)); foreach (l, aggstate->hash_needed) { int varNumber = lfirst_int(l) - 1; hashslot->tts_values[varNumber] = inputslot->tts_values[varNumber]; hashslot->tts_isnull[varNumber] = inputslot->tts_isnull[varNumber]; } if (TempFileControl->spillToDisk == false || TempFileControl->finishwrite == true) { /* find or create the hashtable entry using the filtered tuple */ entry = (AggHashEntry)LookupTupleHashEntry(aggstate->hashtable, hashslot, &isnew, true); } else { /* this solt need be insert into temp file instead of hash table if it is not existed in hash table */ entry = (AggHashEntry)LookupTupleHashEntry(aggstate->hashtable, hashslot, &isnew, false); } if (isnew) { /* this slot is new and has be inserted to hash table */ if (entry) { /* initialize aggregates for new tuple group */ if (aggstate->ss.ps.state->es_is_flt_frame) { initialize_aggregates_flattened(aggstate, entry->pergroup); } else { initialize_aggregates(aggstate, aggstate->peragg, entry->pergroup); } agg_spill_to_disk(TempFileControl, aggstate->hashtable, aggstate->hashslot, ((Agg*)aggstate->ss.ps.plan)->numGroups, true, aggstate->ss.ps.plan->plan_node_id, SET_DOP(aggstate->ss.ps.plan->dop), aggstate->ss.ps.instrument); if (TempFileControl->filesource && aggstate->ss.ps.instrument) { TempFileControl->filesource->m_spill_size = &aggstate->ss.ps.instrument->sorthashinfo.spill_size; } } else { /* this slot is new, it need be inserted to temp file */ Assert(TempFileControl->spillToDisk == true && TempFileControl->finishwrite == false); uint32 hashvalue; MinimalTuple tuple = ExecFetchSlotMinimalTuple(inputslot); MemoryContext oldContext; /* * Here need switch memorycontext to ecxt_per_tuple_memory, so memory which be applyed in function * ComputeHashValue is freed. */ oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory); hashvalue = ComputeHashValue(aggstate->hashtable); MemoryContextSwitchTo(oldContext); TempFileControl->filesource->writeTup(tuple, hashvalue & (TempFileControl->filenum - 1)); } } else if (((Agg *)aggstate->ss.ps.plan)->unique_check) { ereport(ERROR, (errcode(ERRCODE_CARDINALITY_VIOLATION), errmsg("more than one row returned by a subquery used as an expression"))); } if (aggstate->ss.ps.state->es_is_flt_frame) { if (entry) { aggstate->all_pergroups = entry->pergroup; } else { aggstate->all_pergroups = NULL; } } return entry; } /* prepare_data_source * get next data source, if it has finished return false else return true */ static bool prepare_data_source(AggState* node) { AggWriteFileControl* TempFileControl = (AggWriteFileControl*)node->aggTempFileControl; /* get data from lefttree node */ if (TempFileControl->strategy == MEMORY_HASHAGG) { /* * To avoid unnesseray memory allocate during initialization, we move building * process here and hash table should only be initialized once. */ if (unlikely(node->hashtable == NULL)) { build_hash_table(node); } TempFileControl->m_hashAggSource = New(CurrentMemoryContext) hashOpSource(outerPlanState(node)); /* get data from temp file */ } else if (TempFileControl->strategy == DIST_HASHAGG) { TempFileControl->m_hashAggSource = TempFileControl->filesource; if (TempFileControl->curfile >= 0) { TempFileControl->filesource->close(TempFileControl->curfile); } TempFileControl->curfile++; while (TempFileControl->curfile < TempFileControl->filenum) { int currfileidx = TempFileControl->curfile; if (TempFileControl->filesource->m_rownum[currfileidx] != 0) { TempFileControl->filesource->setCurrentIdx(currfileidx); MemoryContextResetAndDeleteChildren(node->aggcontexts[0]); build_hash_table(node); TempFileControl->filesource->rewind(currfileidx); node->table_filled = false; node->agg_done = false; break; /* no data in this temp file */ } else { TempFileControl->filesource->close(currfileidx); TempFileControl->curfile++; } } if (TempFileControl->curfile == TempFileControl->filenum) { return false; } } else { Assert(false); } TempFileControl->runState = HASHAGG_FETCH; return true; } /* agg_retrieve * retrieving groups from hash table; */ static TupleTableSlot* agg_retrieve(AggState* node) { AggWriteFileControl* TempFileControl = (AggWriteFileControl*)node->aggTempFileControl; TupleTableSlot* tmptup = NULL; for (;;) { switch (TempFileControl->runState) { case HASHAGG_PREPARE: { if (!prepare_data_source(node)) { return NULL; } break; } case HASHAGG_FETCH: { if (!node->table_filled) agg_fill_hash_table(node); tmptup = agg_retrieve_hash_table(node); if (tmptup != NULL) { return tmptup; } else if (tmptup == NULL && TempFileControl->spillToDisk == true) { TempFileControl->runState = HASHAGG_PREPARE; TempFileControl->strategy = DIST_HASHAGG; } else { return NULL; } break; } default: break; } } } /* * ExecAgg - * * ExecAgg receives tuples from its outer subplan and aggregates over * the appropriate attribute for each aggregate function use (Aggref * node) appearing in the targetlist or qual of the node. The number * of tuples to aggregate over depends on whether grouped or plain * aggregation is selected. In grouped aggregation, we produce a result * row for each group; in plain aggregation there's a single result row * for the whole query. In either case, the value of each aggregate is * stored in the expression context to be used when ExecProject evaluates * the result tuple. */ static TupleTableSlot* ExecAgg(PlanState* state) { AggState* node = castNode(AggState, state); TupleTableSlot* slot = NULL; /* * just for cooperation analysis. do nothing if is_dummy is true. * is_dummy is true that means Agg node is deparsed to remote sql in ForeignScan node. */ if (((Agg*)node->ss.ps.plan)->is_dummy) { slot = ExecProcNode(outerPlanState(node)); return slot; } /* * Check to see if we're still projecting out tuples from a previous agg * tuple (because there is a function-returning-set in the projection * expressions). If so, try to project another one. */ if (node->ss.ps.ps_vec_TupFromTlist) { ExprDoneCond isDone; slot = ExecProject(node->ss.ps.ps_ProjInfo, &isDone); if (isDone == ExprMultipleResult) return slot; /* Done with that source tuple... */ node->ss.ps.ps_vec_TupFromTlist = false; } if (!node->agg_done) { /* Dispatch based on strategy */ switch (((Agg*)node->ss.ps.plan)->aggstrategy) { case AGG_HASHED: slot = agg_retrieve(node); break; case AGG_PLAIN: case AGG_SORTED: slot = agg_retrieve_direct(node); break; case AGG_SORT_GROUP: slot = agg_sort_group_retrieve_direct(node); break; } if (!TupIsNull(slot)) return slot; } return NULL; } /* * ExecAgg for non-hashed case */ static TupleTableSlot* agg_retrieve_direct(AggState* aggstate) { Agg* node = aggstate->phase->aggnode; ExprContext* econtext = NULL; ExprContext* tmpcontext = NULL; AggStatePerAgg peragg = NULL; AggStatePerAggForFlattenedExpr peragg_flattened = NULL; AggStatePerGroup pergroup; TupleTableSlot* outerslot = NULL; TupleTableSlot* firstSlot = NULL; TupleTableSlot* result = NULL; bool hasGroupingSets = aggstate->phase->numsets > 0; int numGroupingSets = Max(aggstate->phase->numsets, 1); int currentSet; int nextSetSize; int numReset; /* * get state info from node * * econtext is the per-output-tuple expression context * tmpcontext is the per-input-tuple expression context * */ econtext = aggstate->ss.ps.ps_ExprContext; tmpcontext = aggstate->tmpcontext; if (aggstate->ss.ps.state->es_is_flt_frame) { peragg_flattened = aggstate->peragg_flattened; } else { peragg = aggstate->peragg; } pergroup = aggstate->pergroup; firstSlot = aggstate->ss.ss_ScanTupleSlot; /* * We loop retrieving groups until we find one matching *aggstate->ss.ps.qual * * For grouping sets, we have the invariant that aggstate->projected_set * is either -1 (initial call) or the index (starting from 0) in * gset_lengths for the group we just completed (either by projecting a * row or by discarding it in the qual). * * aggstate->ss.ps.qual */ while (!aggstate->agg_done) { /* * Clear the per-output-tuple context for each group, as well as * aggcontext (which contains any pass-by-ref transvalues of the old * group). Some aggregate functions store working state in child * contexts; those now get reset automatically without us needing to * do anything special. * * We use ReScanExprContext not just ResetExprContext because we want * any registered shutdown callbacks to be called. That allows * aggregate functions to ensure they've cleaned up any non-memory * resources. * */ ReScanExprContext(econtext); /* * Determine how many grouping sets need to be reset at this boundary. */ if (aggstate->projected_set >= 0 && aggstate->projected_set < numGroupingSets) numReset = aggstate->projected_set + 1; else numReset = numGroupingSets; /* * numReset can change on a phase boundary, but that's OK; we want to * reset the contexts used in _this_ phase, and later, after possibly * changing phase, initialize the right number of aggregates for the * _new_ phase. */ for (int i = 0; i < numReset; i++) { MemoryContextReset(aggstate->aggcontexts[i]); } /* * Check if input is complete and there are no more groups to project * in this phase; move to next phase or mark as done. */ if (aggstate->input_done == true && aggstate->projected_set >= (numGroupingSets - 1)) { if (aggstate->current_phase < aggstate->numphases - 1) { initialize_phase(aggstate, aggstate->current_phase + 1); aggstate->input_done = false; aggstate->projected_set = -1; numGroupingSets = Max(aggstate->phase->numsets, 1); node = aggstate->phase->aggnode; numReset = numGroupingSets; } else { aggstate->agg_done = true; break; } } /* * Get the number of columns in the next grouping set after the last * projected one (if any). This is the number of columns to compare to * see if we reached the boundary of that set too. */ if (aggstate->projected_set >= 0 && aggstate->projected_set < (numGroupingSets - 1)) nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1]; else nextSetSize = 0; /* ---------- * If a subgroup for the current grouping set is present, project it. * * We have a new group if: * - we're out of input but haven't projected all grouping sets * (checked above) * OR * - we already projected a row that wasn't from the last grouping * set * AND * - the next grouping set has at least one grouping column (since * empty grouping sets project only once input is exhausted) * AND * - the previous and pending rows differ on the grouping columns * of the next grouping set * ---------- */ if (aggstate->input_done || (node->aggstrategy == AGG_SORTED && aggstate->projected_set != -1 && aggstate->projected_set < (numGroupingSets - 1) && nextSetSize > 0 && !execTuplesMatch(econtext->ecxt_outertuple, tmpcontext->ecxt_outertuple, nextSetSize, node->grpColIdx, aggstate->phase->eqfunctions, tmpcontext->ecxt_per_tuple_memory, node->grp_collations))) { aggstate->projected_set += 1; Assert(aggstate->projected_set < numGroupingSets); Assert(nextSetSize > 0 || aggstate->input_done); } else { /* * We no longer care what group we just projected, the next * projection will always be the first (or only) grouping set * (unless the input proves to be empty). */ aggstate->projected_set = 0; /* * If we don't already have the first tuple of the new group, * fetch it from the outer plan. */ if (aggstate->grp_firstTuple == NULL) { outerslot = fetch_input_tuple(aggstate); if (!TupIsNull(outerslot)) { /* * Make a copy of the first input tuple; we will use this * for comparisons (in group mode) and for projection. */ aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot); } else { /* outer plan produced no tuples at all */ if (hasGroupingSets) { /* * If there was no input at all, we need to project * rows only if there are grouping sets of size 0. * Note that this implies that there can't be any * references to ungrouped Vars, which would otherwise * cause issues with the empty output slot. * * XXX: This is no longer true, we currently deal with * this in finalize_aggregates(). */ aggstate->input_done = true; while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0) { aggstate->projected_set += 1; if (aggstate->projected_set >= numGroupingSets) { /* * We can't set agg_done here because we might * have more phases to do, even though the * input is empty. So we need to restart the * whole outer loop. */ break; } } if (aggstate->projected_set >= numGroupingSets) continue; } else { aggstate->agg_done = true; /* If we are grouping, we should produce no tuples too */ if (node->aggstrategy != AGG_PLAIN) return NULL; #ifdef USE_SPQ if (IS_SPQ_EXECUTOR) { if (t_thrd.spq_ctx.skip_direct_distribute_result) return NULL; } #endif } } } /* * Initialize working state for a new input tuple group. */ if (aggstate->ss.ps.state->es_is_flt_frame) { initialize_aggregates_flattened(aggstate, pergroup, numReset); } else { initialize_aggregates(aggstate, peragg, pergroup, numReset); } if (aggstate->grp_firstTuple != NULL) { /* * Store the copied first input tuple in the tuple table slot * reserved for it. The tuple will be deleted when it is * cleared from the slot. */ (void)ExecStoreTuple(aggstate->grp_firstTuple, firstSlot, InvalidBuffer, true); aggstate->grp_firstTuple = NULL; /* don't keep two * pointers */ /* set up for first advance_aggregates call */ tmpcontext->ecxt_outertuple = firstSlot; /* * Process each outer-plan tuple, and then fetch the next one, * until we exhaust the outer plan or cross a group boundary. */ for (;;) { if (aggstate->ss.ps.state->es_is_flt_frame) { advance_aggregates_flattened(aggstate, pergroup); } else { advance_aggregates(aggstate, pergroup); } /* Reset per-input-tuple context after each tuple */ ResetExprContext(tmpcontext); outerslot = fetch_input_tuple(aggstate); if (TupIsNull(outerslot)) { /* no more outer-plan tuples available */ if (hasGroupingSets) { aggstate->input_done = true; break; } else { aggstate->agg_done = true; break; } } /* set up for next advance_aggregates call */ tmpcontext->ecxt_outertuple = outerslot; /* * If we are grouping, check whether we've crossed a group * boundary. */ if (node->aggstrategy == AGG_SORTED) { if (!execTuplesMatch(firstSlot, outerslot, node->numCols, node->grpColIdx, aggstate->phase->eqfunctions, tmpcontext->ecxt_per_tuple_memory, node->grp_collations)) { aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot); break; } } } } /* * Use the representative input tuple for any references to * non-aggregated input columns in aggregate direct args, the node * qual, and the tlist. (If we are not grouping, and there are no * input rows at all, we will come here with an empty firstSlot * ... but if not grouping, there can't be any references to * non-aggregated input columns, so no problem.) */ econtext->ecxt_outertuple = firstSlot; } Assert(aggstate->projected_set >= 0); currentSet = aggstate->projected_set; prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet); if (aggstate->ss.ps.state->es_is_flt_frame) { finalize_aggregates_flattened(aggstate, peragg_flattened, pergroup, currentSet); } else { finalize_aggregates(aggstate, peragg, pergroup, currentSet); } /* * If there's no row to project right now, we must continue rather * than returning a null since there might be more groups. */ result = project_aggregates(aggstate); if (result != NULL) return result; } /* No more groups */ return NULL; } /* * ExecAgg for sort-group case */ static TupleTableSlot *agg_sort_group_retrieve_direct(AggState *aggstate) { ExprContext *econtext = NULL; ExprContext *tmpcontext = NULL; AggStatePerAgg peragg; AggStatePerGroup pergroup; TupleTableSlot *outerslot = NULL; TupleTableSlot *firstSlot = NULL; TupleTableSlot *result = NULL; Assert(aggstate->phase->numsets == 0); /* * get state info from node * * econtext is the per-output-tuple expression context * tmpcontext is the per-input-tuple expression context * */ econtext = aggstate->ss.ps.ps_ExprContext; tmpcontext = aggstate->tmpcontext; peragg = aggstate->peragg; pergroup = aggstate->pergroup; firstSlot = aggstate->ss.ss_ScanTupleSlot; /* * We loop retrieving groups until we find one matching *aggstate->ss.ps.qual * * For grouping sets, we have the invariant that aggstate->projected_set * is either -1 (initial call) or the index (starting from 0) in * gset_lengths for the group we just completed (either by projecting a * row or by discarding it in the qual). * * aggstate->ss.ps.qual */ while (!aggstate->agg_done) { /* * Clear the per-output-tuple context for each group, as well as * aggcontext (which contains any pass-by-ref transvalues of the old * group). Some aggregate functions store working state in child * contexts; those now get reset automatically without us needing to * do anything special. * * We use ReScanExprContext not just ResetExprContext because we want * any registered shutdown callbacks to be called. That allows * aggregate functions to ensure they've cleaned up any non-memory * resources. * */ ReScanExprContext(econtext); MemoryContextReset(aggstate->aggcontexts[0]); tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple; /* * If we don't already have the first tuple of the new group, * fetch it from the outer plan. */ if (aggstate->grp_firstTuple == NULL) { outerslot = fetch_input_tuple(aggstate); if (!TupIsNull(outerslot)) { /* * Make a copy of the first input tuple; we will use this * for comparisons (in group mode) and for projection. */ aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot); } else { aggstate->agg_done = true; return NULL; } } aggstate->new_group_trigger = false; /*reset new group trigger*/ /* * Initialize working state for a new input tuple group. */ initialize_aggregates(aggstate, peragg, pergroup, 1); if (aggstate->grp_firstTuple != NULL) { /* * Store the copied first input tuple in the tuple table slot * reserved for it. The tuple will be deleted when it is * cleared from the slot. */ (void)ExecStoreTuple(aggstate->grp_firstTuple, firstSlot, InvalidBuffer, true); aggstate->grp_firstTuple = NULL; /* don't keep two * pointers */ /* set up for first advance_aggregates call */ tmpcontext->ecxt_outertuple = firstSlot; /* * Process each outer-plan tuple, and then fetch the next one, * until we exhaust the outer plan or cross a group boundary. */ for (;;) { advance_aggregates(aggstate, pergroup); /* Reset per-input-tuple context after each tuple */ ResetExprContext(tmpcontext); outerslot = fetch_input_tuple(aggstate); if (TupIsNull(outerslot)) { aggstate->agg_done = true; break; } /* set up for next advance_aggregates call */ tmpcontext->ecxt_outertuple = outerslot; /* * check whether we've new group */ if (aggstate->new_group_trigger) { aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot); break; /*we've new group*/ } } } /* * Use the representative input tuple for any references to * non-aggregated input columns in aggregate direct args, the node * qual, and the tlist. */ econtext->ecxt_outertuple = firstSlot; prepare_projection_slot(aggstate, econtext->ecxt_outertuple, 0); finalize_aggregates(aggstate, peragg, pergroup, 0); /* * If there's no row to project right now, we must continue rather * than returning a null since there might be more groups. */ result = project_aggregates(aggstate); if (result != NULL) return result; } /* No more groups */ return NULL; } /* * ExecAgg for hashed case: phase 1, read input and build hash table */ static void agg_fill_hash_table(AggState* aggstate) { ExprContext* tmpcontext = NULL; AggHashEntry entry; TupleTableSlot* outerslot = NULL; AggWriteFileControl* TempFileControl = (AggWriteFileControl*)aggstate->aggTempFileControl; /* * get state info from node * * tmpcontext is the per-input-tuple expression context */ /* tmpcontext is the per-input-tuple expression context */ tmpcontext = aggstate->tmpcontext; /* * Process each outer-plan tuple, and then fetch the next one, until we * exhaust the outer plan. */ WaitState oldStatus = pgstat_report_waitstatus(STATE_EXEC_HASHAGG_BUILD_HASH); for (;;) { outerslot = TempFileControl->m_hashAggSource->getTup(); if (TupIsNull(outerslot)) { if (!TempFileControl->spillToDisk) { /* Early free left tree after hash table built */ ExecEarlyFree(outerPlanState(aggstate)); EARLY_FREE_LOG(elog(LOG, "Early Free: Hash Table for Agg" " is built at node %d, memory used %d MB.", (aggstate->ss.ps.plan)->plan_node_id, getSessionMemoryUsageMB())); } pgstat_report_waitstatus(oldStatus); break; } if (aggstate->ndp_slot && outerslot->tts_mintuple && (outerslot->tts_mintuple->t_infomask & NDP_HANDLED_TUPLE)) { ndp_tableam->handle_hashaggslot(aggstate, &outerslot->tts_minhdr); continue; } /* set up for advance_aggregates call */ tmpcontext->ecxt_outertuple = outerslot; /* Find or build hashtable entry for this tuple's group */ entry = lookup_hash_entry(aggstate, outerslot); if (entry != NULL) { /* Advance the aggregates */ if (aggstate->ss.ps.state->es_is_flt_frame) { advance_aggregates_flattened(aggstate, entry->pergroup); } else { advance_aggregates(aggstate, entry->pergroup); } } else { /* this outerslot is inserted to temp table, it will be compute when the temp file be readed */ } /* Reset per-input-tuple context after each tuple */ ResetExprContext(tmpcontext); } aggstate->table_filled = true; if (HAS_INSTR(&aggstate->ss, true)) { AggWriteFileControl *aggTempFileControl = (AggWriteFileControl*)aggstate->aggTempFileControl; if (aggTempFileControl->spillToDisk == false && aggTempFileControl->inmemoryRownum > 0) aggstate->hashtable->width /= aggTempFileControl->inmemoryRownum; if (aggTempFileControl->strategy == MEMORY_HASHAGG) aggstate->ss.ps.instrument->width = (int)aggstate->hashtable->width; aggstate->ss.ps.instrument->sysBusy = aggstate->hashtable->causedBySysRes; aggstate->ss.ps.instrument->spreadNum = aggTempFileControl->spreadNum; } if (TempFileControl->spillToDisk && TempFileControl->finishwrite == false) { TempFileControl->finishwrite = true; if (HAS_INSTR(&aggstate->ss, true)) { PlanState* planstate = &aggstate->ss.ps; planstate->instrument->sorthashinfo.hash_FileNum = (TempFileControl->filenum); planstate->instrument->sorthashinfo.hash_writefile = true; } } /* Initialize to walk the hash table */ ResetTupleHashIterator(aggstate->hashtable, &aggstate->hashiter); } /* * ExecAgg for hashed case: phase 2, retrieving groups from hash table */ static TupleTableSlot* agg_retrieve_hash_table(AggState* aggstate) { ExprContext* econtext = NULL; AggStatePerAgg peragg; AggStatePerAggForFlattenedExpr peragg_flattened = NULL; AggStatePerGroup pergroup; AggHashEntry entry; TupleTableSlot* firstSlot = NULL; TupleTableSlot* result = NULL; /* * get state info from node */ /* econtext is the per-output-tuple expression context */ econtext = aggstate->ss.ps.ps_ExprContext; if (aggstate->ss.ps.state->es_is_flt_frame) { peragg_flattened = aggstate->peragg_flattened; } else { peragg = aggstate->peragg; } firstSlot = aggstate->ss.ss_ScanTupleSlot; /* * We loop retrieving groups until we find one satisfying * aggstate->ss.ps.qual */ while (!aggstate->agg_done) { /* * Find the next entry in the hash table */ entry = (AggHashEntry)ScanTupleHashTable(&aggstate->hashiter); if (entry == NULL) { /* No more entries in hashtable, so done */ aggstate->agg_done = TRUE; return NULL; } /* * Clear the per-output-tuple context for each group */ ResetExprContext(econtext); /* * Store the copied first input tuple in the tuple table slot reserved * for it, so that it can be used in ExecProject. */ ExecStoreMinimalTuple(entry->shared.firstTuple, firstSlot, false); pergroup = entry->pergroup; /* * Finalize each aggregate calculation, and stash results in the * per-output-tuple context. */ if (aggstate->ss.ps.state->es_is_flt_frame) { finalize_aggregates_flattened(aggstate, peragg_flattened, pergroup, 0); } else { finalize_aggregates(aggstate, peragg, pergroup, 0); } /* * Use the representative input tuple for any references to * non-aggregated input columns in the qual and tlist. */ econtext->ecxt_outertuple = firstSlot; /* * Check the qual (HAVING clause); if the group does not match, ignore * it and loop back to try to process another group. */ result = project_aggregates(aggstate); if (result != NULL) { return result; } } /* No more groups */ return NULL; } /* ----------------- * ExecInitAgg * * Creates the run-time information for the agg node produced by the * planner and initializes its outer subtree * ----------------- */ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags) { AggState* aggstate = NULL; AggStatePerAgg peragg; AggStatePerAggForFlattenedExpr peragg_flattened; AggStatePerTrans pertransstates; Plan* outerPlan = NULL; ExprContext* econtext = NULL; int numaggs; int phase; ListCell* l = NULL; Bitmapset* all_grouped_cols = NULL; int numGroupingSets = 1; int numPhases; int i = 0; int j = 0; /* check for unsupported flags */ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); /* * create state structure */ aggstate = makeNode(AggState); aggstate->ss.ps.plan = (Plan*)node; aggstate->ss.ps.state = estate; #ifdef USE_SPQ aggstate->aggsplittype = node->aggsplittype; #endif aggstate->aggs = NIL; aggstate->numaggs = 0; aggstate->maxsets = 0; aggstate->hashfunctions = NULL; aggstate->projected_set = -1; aggstate->current_set = 0; aggstate->peragg = NULL; aggstate->curperagg = NULL; aggstate->agg_done = false; aggstate->input_done = false; aggstate->pergroup = NULL; aggstate->grp_firstTuple = NULL; aggstate->hashtable = NULL; aggstate->sort_in = NULL; aggstate->sort_out = NULL; aggstate->is_final = node->is_final; aggstate->ss.ps.ExecProcNode = ExecAgg; if (aggstate->ss.ps.state->es_is_flt_frame) { aggstate->numtrans = 0; aggstate->aggstrategy = node->aggstrategy; aggstate->peragg_flattened = NULL; aggstate->pertrans = NULL; aggstate->curpertrans = NULL; aggstate->num_hashes = (node->aggstrategy == AGG_HASHED) ? 1 : 0; } /* * Calculate the maximum number of grouping sets in any phase; this * determines the size of some allocations. */ if (node->groupingSets) { Assert(node->aggstrategy != AGG_HASHED); numGroupingSets = list_length(node->groupingSets); foreach (l, node->chain) { Agg* agg = (Agg*)lfirst(l); numGroupingSets = Max(numGroupingSets, list_length(agg->groupingSets)); } } aggstate->maxsets = numGroupingSets; aggstate->numphases = numPhases = 1 + list_length(node->chain); aggstate->aggcontexts = (MemoryContext*)palloc0(sizeof(MemoryContext) * numGroupingSets); /* * Create expression contexts. We need three or more, one for * per-input-tuple processing, one for per-output-tuple processing, and * one for each grouping set. We cheat a little * by using ExecAssignExprContext() to build both. * * NOTE: the details of what is stored in aggcontexts and what is stored * in the regular per-query memory context are driven by a simple * decision: we want to reset the aggcontext at group boundaries (if not * hashing) and in ExecReScanAgg to recover no-longer-wanted space. */ ExecAssignExprContext(estate, &aggstate->ss.ps); aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext; int64 workMem = SET_NODEMEM(node->plan.operatorMemKB[0], node->plan.dop); int64 maxMem = (node->plan.operatorMaxMem > node->plan.operatorMemKB[0]) ? SET_NODEMEM(node->plan.operatorMaxMem, node->plan.dop) : 0; /* Create memcontext. The per-tuple memory context of the * per-grouping-set aggcontexts replaces the standalone * memory context formerly used to hold transition values. */ for (i = 0; i < numGroupingSets; ++i) { aggstate->aggcontexts[i] = AllocSetContextCreate(CurrentMemoryContext, "AggContext", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, STANDARD_CONTEXT, workMem * 1024L); } ExecAssignExprContext(estate, &aggstate->ss.ps); /* * tuple table initialization */ ExecInitScanTupleSlot(estate, &aggstate->ss); ExecInitResultTupleSlot(estate, &aggstate->ss.ps); aggstate->hashslot = ExecInitExtraTupleSlot(estate); aggstate->sort_slot = ExecInitExtraTupleSlot(estate); /* * initialize child expressions * * Note: ExecInitExpr finds Aggrefs for us, and also checks that no aggs * contain other agg calls in their arguments. This would make no sense * under SQL semantics anyway (and it's forbidden by the spec). Because * that is true, we don't need to worry about evaluating the aggs in any * particular order. */ if (estate->es_is_flt_frame) { aggstate->ss.ps.qual = (List*)ExecInitQualByFlatten(node->plan.qual, (PlanState*)aggstate); } else { aggstate->ss.ps.targetlist = (List*)ExecInitExprByRecursion((Expr*)node->plan.targetlist, (PlanState*)aggstate); aggstate->ss.ps.qual = (List*)ExecInitExprByRecursion((Expr*)node->plan.qual, (PlanState*)aggstate); } /* * initialize child nodes * * If we are doing a hashed aggregation then the child plan does not need * to handle REWIND efficiently; see ExecReScanAgg. */ if (node->aggstrategy == AGG_HASHED) eflags &= ~EXEC_FLAG_REWIND; outerPlan = outerPlan(node); outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags); if (node->aggstrategy == AGG_SORT_GROUP) { SortGroupState* srotGroup = (SortGroupState*) outerPlanState(aggstate); Assert(IsA(srotGroup, SortGroupState)); srotGroup->new_group_trigger = &aggstate->new_group_trigger; } /* * initialize source tuple type. */ ExecAssignScanTypeFromOuterPlan(&aggstate->ss); if (node->chain) { ExecSetSlotDescriptor(aggstate->sort_slot, aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor); } /* * Initialize result tuple type and projection info. * Result tuple slot of Aggregation always contains a virtual tuple, * Default tableAMtype for this slot is Heap. */ ExecAssignResultTypeFromTL(&aggstate->ss.ps); ExecAssignProjectionInfo(&aggstate->ss.ps, NULL); aggstate->ss.ps.ps_vec_TupFromTlist = false; /* * get the count of aggregates in targetlist and quals */ numaggs = aggstate->numaggs; Assert(numaggs == list_length(aggstate->aggs)); if (numaggs <= 0) { /* * This is not an error condition: we might be using the Agg node just * to do hash-based grouping. Even in the regular case, * constant-expression simplification could optimize away all of the * Aggrefs in the targetlist and qual. So keep going, but force local * copy of numaggs positive so that palloc()s below don't choke. */ numaggs = 1; } /* * For each phase, prepare grouping set data and fmgr lookup data for * compare functions. Accumulate all_grouped_cols in passing. */ aggstate->phases = (AggStatePerPhaseData*)palloc0(numPhases * sizeof(AggStatePerPhaseData)); for (phase = 0; phase < numPhases; ++phase) { AggStatePerPhase phasedata = &aggstate->phases[phase]; Agg* aggnode = NULL; Sort* sortnode = NULL; SortGroup* sortGroupNode = NULL; int num_sets; if (phase > 0) { aggnode = (Agg*)list_nth(node->chain, phase - 1); if (aggnode->plan.lefttree) { if (IsA(aggnode->plan.lefttree, Sort)) { sortnode = castNode(Sort, aggnode->plan.lefttree); } else if (IsA(aggnode->plan.lefttree, SortGroup)) { sortGroupNode = castNode(SortGroup, aggnode->plan.lefttree); } } } else { aggnode = node; } phasedata->numsets = num_sets = list_length(aggnode->groupingSets); if (num_sets) { phasedata->gset_lengths = (int*)palloc(num_sets * sizeof(int)); phasedata->grouped_cols = (Bitmapset**)palloc(num_sets * sizeof(Bitmapset*)); i = 0; foreach (l, aggnode->groupingSets) { int current_length = list_length((List*)lfirst(l)); Bitmapset* cols = NULL; /* planner forces this to be correct */ for (j = 0; j < current_length; ++j) cols = bms_add_member(cols, aggnode->grpColIdx[j]); phasedata->grouped_cols[i] = cols; phasedata->gset_lengths[i] = current_length; ++i; } all_grouped_cols = bms_add_members(all_grouped_cols, phasedata->grouped_cols[0]); } else { Assert(phase == 0); phasedata->gset_lengths = NULL; phasedata->grouped_cols = NULL; } /* * If we are grouping, precompute fmgr lookup data for inner loop. */ if (aggnode->aggstrategy == AGG_SORTED || aggnode->aggstrategy == AGG_SORT_GROUP) { Assert(aggnode->numCols > 0); phasedata->eqfunctions = execTuplesMatchPrepare(aggnode->numCols, aggnode->grpOperators); } phasedata->aggnode = aggnode; if (aggstate->ss.ps.state->es_is_flt_frame) { phasedata->aggstrategy = aggstate->aggstrategy; } phasedata->sortnode = sortnode; phasedata->sortGroupNode = sortGroupNode; } /* * Convert all_grouped_cols to a descending-order list. */ i = -1; while ((i = bms_next_member(all_grouped_cols, i)) >= 0) aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols); /* * Hashing can only appear in the initial phase. */ if (node->aggstrategy == AGG_HASHED) execTuplesHashPrepare( node->numCols, node->grpOperators, &aggstate->phases[0].eqfunctions, &aggstate->hashfunctions); /* * Initialize current phase-dependent values to initial phase */ aggstate->current_phase = 0; initialize_phase(aggstate, 0); /* * Set up aggregate-result storage in the output expr context, and also * allocate my private per-agg working storage */ econtext = aggstate->ss.ps.ps_ExprContext; econtext->ecxt_aggvalues = (Datum*)palloc0(sizeof(Datum) * numaggs); econtext->ecxt_aggnulls = (bool*)palloc0(sizeof(bool) * numaggs); if (aggstate->ss.ps.state->es_is_flt_frame) { peragg_flattened = (AggStatePerAggForFlattenedExpr)palloc0(sizeof(AggStatePerAggForFlattenedExprData) * numaggs); pertransstates = (AggStatePerTrans)palloc0(sizeof(AggStatePerTransData) * numaggs); aggstate->peragg_flattened = peragg_flattened; aggstate->pertrans = pertransstates; } else { peragg = (AggStatePerAgg)palloc0(sizeof(AggStatePerAggData) * numaggs); aggstate->peragg = peragg; } if (node->aggstrategy == AGG_HASHED) { aggstate->table_filled = false; /* Compute the columns we actually need to hash on */ aggstate->hash_needed = find_hash_columns(aggstate); } else { AggStatePerGroup pergroup; pergroup = (AggStatePerGroup)palloc0(sizeof(AggStatePerGroupData) * numaggs * numGroupingSets); aggstate->pergroup = pergroup; aggstate->all_pergroups = pergroup; } if (aggstate->ss.ps.state->es_is_flt_frame) { exec_lookups_agg_flattened(aggstate, node, estate); } else { exec_lookups_agg(aggstate, node, estate); } AggWriteFileControl* TempFilePara = (AggWriteFileControl*)palloc(sizeof(AggWriteFileControl)); TempFilePara->strategy = MEMORY_HASHAGG; TempFilePara->spillToDisk = false; TempFilePara->totalMem = workMem * 1024L; TempFilePara->useMem = 0; TempFilePara->inmemoryRownum = 0; TempFilePara->finishwrite = false; TempFilePara->runState = HASHAGG_PREPARE; TempFilePara->curfile = -1; TempFilePara->filenum = 0; TempFilePara->filesource = NULL; TempFilePara->m_hashAggSource = NULL; TempFilePara->maxMem = maxMem * 1024L; TempFilePara->spreadNum = 0; aggstate->aggTempFileControl = TempFilePara; return aggstate; } Datum GetAggInitVal(Datum textInitVal, Oid transtype) { Oid typinput, typioparam; char* strInitVal = NULL; Datum initVal; getTypeInputInfo(transtype, &typinput, &typioparam); strInitVal = TextDatumGetCString(textInitVal); initVal = OidInputFunctionCall(typinput, strInitVal, typioparam, -1); pfree_ext(strInitVal); return initVal; } void ExecEndAgg(AggState* node) { int setno; AggWriteFileControl* TempFileControl = (AggWriteFileControl*)node->aggTempFileControl; int numGroupingSets = Max(node->maxsets, 1); int fileNum = TempFileControl->filenum; hashFileSource* file = TempFileControl->filesource; if (file != NULL) { for (int i = 0; i < fileNum; i++) { file->close(i); } file->freeFileSource(); } /* * Clean up sort_slot first before tuplesort_end(node->sort_in) * because the minimal tuple in sort_slot may point to some memory * in sort_in(tuplesort) when doing external sort. */ (void)ExecClearTuple(node->sort_slot); /* Make sure we have closed any open tuplesorts */ if (node->sort_in) { tuplesort_end(node->sort_in); node->sort_in = NULL; } if (node->sort_out) { tuplesort_end(node->sort_out); node->sort_out = NULL; } if (node->ss.ps.state->es_is_flt_frame) { for (int transno = 0; transno < node->numtrans; transno++) { AggStatePerTrans pertrans = &node->pertrans[transno]; for (setno = 0; setno < numGroupingSets; setno++) { if (pertrans->sortstates[setno]) { tuplesort_end(pertrans->sortstates[setno]); pertrans->sortstates[setno] = NULL; } } if (AGGKIND_IS_ORDERED_SET(pertrans->aggref->aggkind) && node->ss.ps.ps_ExprContext != NULL) { /* Ensure any agg shutdown callbacks have been called */ ReScanExprContext(node->ss.ps.ps_ExprContext); } } } else { for (int aggno = 0; aggno < node->numaggs; aggno++) { AggStatePerAgg peraggstate = &node->peragg[aggno]; for (setno = 0; setno < numGroupingSets; setno++) { if (peraggstate->sortstates[setno]) { tuplesort_end(peraggstate->sortstates[setno]); peraggstate->sortstates[setno] = NULL; } } if (AGGKIND_IS_ORDERED_SET(peraggstate->aggref->aggkind) && node->ss.ps.ps_ExprContext != NULL) { /* Ensure any agg shutdown callbacks have been called */ ReScanExprContext(node->ss.ps.ps_ExprContext); } } } /* * We don't actually free any ExprContexts here (see comment in * ExecFreeExprContext), just unlinking the output one from the plan node * suffices. */ ExecFreeExprContext(&node->ss.ps); /* clean up tuple table */ (void)ExecClearTuple(node->ss.ss_ScanTupleSlot); for (setno = 0; setno < numGroupingSets; setno++) { if (node->aggcontexts[setno]) { MemoryContextDelete(node->aggcontexts[setno]); node->aggcontexts[setno] = NULL; } } ExecEndNode(outerPlanState(node)); } void ExecReScanAgg(AggState* node) { ExprContext* econtext = node->ss.ps.ps_ExprContext; PlanState* outerPlan = outerPlanState(node); AggWriteFileControl* TempFilePara = (AggWriteFileControl*)node->aggTempFileControl; Agg* aggnode = (Agg*)node->ss.ps.plan; int numGroupingSets = Max(node->maxsets, 1); int setno; errno_t rc; /* Already reset, just rescan lefttree */ bool isRescan = node->ss.ps.recursive_reset && node->ss.ps.state->es_recursive_next_iteration; if (isRescan) { if (node->ss.ps.lefttree->chgParam == NULL) ExecReScan(node->ss.ps.lefttree); node->ss.ps.recursive_reset = false; return; } node->agg_done = false; node->ss.ps.ps_vec_TupFromTlist = false; if (aggnode->aggstrategy == AGG_HASHED) { /* * In the hashed case, if we haven't yet built the hash table then we * can just return; nothing done yet, so nothing to undo. If subnode's * chgParam is not NULL then it will be re-scanned by ExecProcNode, * else no reason to re-scan it at all. */ if (!node->table_filled) return; /* * If we do have the hash table, and the subplan does not have any * parameter changes, and none of our own parameter changes affect * input expressions of the aggregated functions, then we can just * rescan the existing hash table, and have not spill to disk; * no need to build it again. */ if (node->ss.ps.lefttree->chgParam == NULL && TempFilePara->spillToDisk == false && aggnode->aggParams == NULL && !EXEC_IN_RECURSIVE_MODE(node->ss.ps.plan)) { ResetTupleHashIterator(node->hashtable, &node->hashiter); return; } } /* Make sure we have closed any open tuplesorts */ if (node->ss.ps.state->es_is_flt_frame) { for (int transno = 0; transno < node->numtrans; transno++) { for (setno = 0; setno < numGroupingSets; setno++) { AggStatePerTrans pertrans = &node->pertrans[transno]; if (pertrans->sortstates[setno]) { tuplesort_end(pertrans->sortstates[setno]); pertrans->sortstates[setno] = NULL; } } } } else { for (int aggno = 0; aggno < node->numaggs; aggno++) { for (setno = 0; setno < numGroupingSets; setno++) { AggStatePerAgg peraggstate = &node->peragg[aggno]; if (peraggstate->sortstates[setno]) { tuplesort_end(peraggstate->sortstates[setno]); peraggstate->sortstates[setno] = NULL; } } } } /* * We don't need to ReScanExprContext the output tuple context here; * ExecReScan already did it. But we do need to reset our per-grouping-set * contexts, which may have transvalues stored in them. (We use rescan * rather than just reset because transfns may have registered callbacks * that need to be run now.) * * Note that with AGG_HASHED, the hash table is allocated in a sub-context * of the aggcontext. This used to be an issue, but now, resetting a * context automatically deletes sub-contexts too. */ /* Release first tuple of group, if we have made a copy */ if (node->grp_firstTuple != NULL) { tableam_tops_free_tuple(node->grp_firstTuple); node->grp_firstTuple = NULL; } (void)ExecClearTuple(node->ss.ss_ScanTupleSlot); /* Forget current agg values */ rc = memset_s(econtext->ecxt_aggvalues, sizeof(Datum) * node->numaggs, 0, sizeof(Datum) * node->numaggs); securec_check(rc, "\0", "\0"); rc = memset_s(econtext->ecxt_aggnulls, sizeof(bool) * node->numaggs, 0, sizeof(bool) * node->numaggs); securec_check(rc, "\0", "\0"); /* * Release all temp storage. Note that with AGG_HASHED, the hash table is * allocated in a sub-context of the aggcontext. We're going to rebuild * the hash table from scratch, so we need to use * MemoryContextResetAndDeleteChildren() to avoid leaking the old hash * table's memory context header. */ for (setno = 0; setno < numGroupingSets; setno++) { MemoryContextResetAndDeleteChildren(node->aggcontexts[setno]); } if (aggnode->aggstrategy == AGG_HASHED) { AggWriteFileControl* TempFileControl = (AggWriteFileControl*)node->aggTempFileControl; int fileNum = TempFileControl->filenum; int64 workMem = SET_NODEMEM(aggnode->plan.operatorMemKB[0], aggnode->plan.dop); int64 maxMem = (aggnode->plan.operatorMaxMem > 0) ? SET_NODEMEM(aggnode->plan.operatorMaxMem, aggnode->plan.dop) : 0; hashFileSource* file = TempFileControl->filesource; if (file != NULL) { for (int i = 0; i < fileNum; i++) { file->close(i); } file->freeFileSource(); } /* * After close the temp file and free the filesource, setting the filesource to NULL * preventing free or close wrong object the next time here. * Problem Scenario: when the first rescan need spill to disk and second rescan * doesn't need, without this set will lead core in freeFileSource as m_tuple was * set to null in the first rescan. */ TempFileControl->filesource = NULL; /* Rebuild an empty hash table */ build_hash_table(node); node->table_filled = false; /* reset hashagg temp file para */ TempFilePara->strategy = MEMORY_HASHAGG; TempFilePara->spillToDisk = false; TempFilePara->finishwrite = false; TempFilePara->totalMem = workMem * 1024L; TempFilePara->useMem = 0; TempFilePara->inmemoryRownum = 0; TempFilePara->runState = HASHAGG_PREPARE; TempFilePara->curfile = -1; TempFilePara->filenum = 0; TempFilePara->maxMem = maxMem * 1024L; TempFilePara->spreadNum = 0; } else { /* * Reset the per-group state (in particular, mark transvalues null) */ rc = memset_s(node->pergroup, sizeof(AggStatePerGroupData) * node->numaggs * numGroupingSets, 0, sizeof(AggStatePerGroupData) * node->numaggs * numGroupingSets); securec_check(rc, "", ""); /* reset to phase 0 */ initialize_phase(node, 0); node->input_done = false; node->projected_set = -1; } /* * if chgParam of subnode is not null then plan will be re-scanned by * first ExecProcNode. */ if (outerPlan->chgParam == NULL) ExecReScan(node->ss.ps.lefttree); } /* * AggCheckCallContext - test if a SQL function is being called as an aggregate * * The transition and/or final functions of an aggregate may want to verify * that they are being called as aggregates, rather than as plain SQL * functions. They should use this function to do so. The return value * is nonzero if being called as an aggregate, or zero if not. (Specific * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more * values could conceivably appear in future.) * * If aggcontext isn't NULL, the function also stores at *aggcontext the * identity of the memory context that aggregate transition values are being * stored in. Note that the same aggregate call site (flinfo) may be called * interleaved on different transition values in different contexts, so it's * not kosher to cache aggcontext under fn_extra. It is, however, kosher to * cache it in the transvalue itself (for internal-type transvalues). */ int AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext* aggcontext) { if (fcinfo->context && IsA(fcinfo->context, AggState)) { if (aggcontext != NULL) { AggState* aggstate = ((AggState*)fcinfo->context); *aggcontext = ((AggState*)fcinfo->context)->aggcontexts[aggstate->current_set]; } return AGG_CONTEXT_AGGREGATE; } if (fcinfo->context && IsA(fcinfo->context, WindowAggState)) { if (aggcontext != NULL) *aggcontext = ((WindowAggState*)fcinfo->context)->aggcontext; return AGG_CONTEXT_WINDOW; } #ifndef ENABLE_MULTIPLE_NODES if (fcinfo->context && IsA(fcinfo->context, VecWindowAggState)) { if (aggcontext != NULL) *aggcontext = ((VecWindowAggState*)fcinfo->context)->aggcontext; return AGG_CONTEXT_WINDOW; } #endif /* this is just to prevent "uninitialized variable" warnings */ if (aggcontext != NULL) *aggcontext = NULL; return 0; } /* * aggregate_dummy - dummy execution routine for aggregate functions * * This function is listed as the implementation (prosrc field) of pg_proc * entries for aggregate functions. Its only purpose is to throw an error * if someone mistakenly executes such a function in the normal way. * * Perhaps someday we could assign real meaning to the prosrc field of * an aggregate? */ Datum aggregate_dummy(PG_FUNCTION_ARGS) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("aggregate function %u called as normal function", fcinfo->flinfo->fn_oid))); return (Datum)0; /* keep compiler quiet */ } FORCE_INLINE void agg_spill_to_disk(AggWriteFileControl* TempFileControl, TupleHashTable hashtable, TupleTableSlot* hashslot, int64 numGroups, bool isAgg, int planId, int dop, Instrumentation* instrument) { if (TempFileControl->spillToDisk == false) { Assert(TempFileControl->finishwrite == false); AllocSetContext* set = (AllocSetContext*)(hashtable->tablecxt); int64 totalSize = set->totalSpace; TempFileControl->inmemoryRownum++; /* add 1 when insert one slot to hash table */ /* compute totalSize of AggContext and TupleHashTable */ int64 usedSize = totalSize + TempFileControl->inmemoryRownum * hashtable->entrysize; bool sysBusy = gs_sysmemory_busy(usedSize * dop, false); /* next slot will be inserted into temp file when that useful memory more than total memory */ if (usedSize >= TempFileControl->totalMem || sysBusy) { bool memSpread = false; if (sysBusy) { hashtable->causedBySysRes = sysBusy; TempFileControl->totalMem = usedSize; set->maxSpaceSize = usedSize; MEMCTL_LOG(LOG, "%s(%d) early spilled, workmem: %ldKB, usedmem: %ldKB", isAgg ? "HashAgg" : "HashSetop", planId, TempFileControl->totalMem / 1024L, usedSize / 1024L); /* check if there's enough memory for memory auto spread */ } else if (TempFileControl->maxMem > TempFileControl->totalMem) { TempFileControl->totalMem = usedSize; int64 spreadMem = Min(Min(dywlm_client_get_memory() * 1024L, TempFileControl->totalMem), TempFileControl->maxMem - TempFileControl->totalMem); if (spreadMem > TempFileControl->totalMem * MEM_AUTO_SPREAD_MIN_RATIO) { TempFileControl->totalMem += spreadMem; TempFileControl->spreadNum++; set->maxSpaceSize += spreadMem; memSpread = true; MEMCTL_LOG(DEBUG2, "%s(%d) auto mem spread %ldKB succeed, and work mem is %ldKB.", isAgg ? "HashAgg" : "HashSetop", planId, spreadMem / 1024L, TempFileControl->totalMem / 1024L); } else { MEMCTL_LOG(LOG, "%s(%d) auto mem spread %ldKB failed, and work mem is %ldKB.", isAgg ? "HashAgg" : "HashSetop", planId, spreadMem / 1024L, TempFileControl->totalMem / 1024L); } } /* if spilling to disk, need to record info into hashtable */ if (!memSpread) { if (TempFileControl->inmemoryRownum > 0) hashtable->width /= TempFileControl->inmemoryRownum; hashtable->add_width = false; TempFileControl->spillToDisk = true; /* cache the memory size into instrument for explain performance */ if (instrument != NULL) { instrument->memoryinfo.peakOpMemory = usedSize; } /* estimate num of temp file */ int estsize = getPower2NextNum(4 * numGroups / TempFileControl->inmemoryRownum); TempFileControl->filenum = Max(HASH_MIN_FILENUMBER, estsize); TempFileControl->filenum = Min(TempFileControl->filenum, HASH_MAX_FILENUMBER); TempFileControl->filesource = New(CurrentMemoryContext) hashFileSource(hashslot, TempFileControl->filenum); /* increase current session spill count */ pgstat_increase_session_spill(); } } } } /* * @Description: Early free the memory for Aggregation. * * @param[IN] node: executor state for Agg * @return: void */ void ExecEarlyFreeAggregation(AggState* node) { int setno; AggWriteFileControl* TempFileControl = (AggWriteFileControl*)node->aggTempFileControl; int numGroupingSets = Max(node->maxsets, 1); int fileNum = TempFileControl->filenum; hashFileSource* file = TempFileControl->filesource; PlanState* plan_state = &node->ss.ps; if (plan_state->earlyFreed) return; if (file != NULL) { for (int i = 0; i < fileNum; i++) { file->close(i); } file->freeFileSource(); } /* * Clean up sort_slot first before tuplesort_end(node->sort_in) * because the minimal tuple in sort_slot may point to some memory * in sort_in(tuplesort) when doing external sort. */ (void)ExecClearTuple(node->sort_slot); /* Make sure we have closed any open tuplesorts */ if (node->sort_in) { tuplesort_end(node->sort_in); node->sort_in = NULL; } if (node->sort_out) { tuplesort_end(node->sort_out); node->sort_out = NULL; } if (node->ss.ps.state->es_is_flt_frame) { for (int transno = 0; transno < node->numtrans; transno++) { AggStatePerTrans peraggtrans = &node->pertrans[transno]; for (setno = 0; setno < numGroupingSets; setno++) { if (peraggtrans->sortstates[setno]) { tuplesort_end(peraggtrans->sortstates[setno]); peraggtrans->sortstates[setno] = NULL; } } } } else { for (int aggno = 0; aggno < node->numaggs; aggno++) { AggStatePerAgg peraggstate = &node->peragg[aggno]; for (setno = 0; setno < numGroupingSets; setno++) { if (peraggstate->sortstates[setno]) { tuplesort_end(peraggstate->sortstates[setno]); peraggstate->sortstates[setno] = NULL; } } } } /* Ensure any agg shutdown callbacks have been called */ ReScanExprContext(node->ss.ps.ps_ExprContext); /* * We don't actually free any ExprContexts here (see comment in * ExecFreeExprContext), just unlinking the output one from the plan node * suffices. */ ExecFreeExprContext(&node->ss.ps); /* clean up tuple table */ (void)ExecClearTuple(node->ss.ss_ScanTupleSlot); for (setno = 0; setno < numGroupingSets; setno++) { if (node->aggcontexts[setno]) { MemoryContextDelete(node->aggcontexts[setno]); node->aggcontexts[setno] = NULL; } } EARLY_FREE_LOG(elog(LOG, "Early Free: After early freeing Agg " "at node %d, memory used %d MB.", plan_state->plan->plan_node_id, getSessionMemoryUsageMB())); plan_state->earlyFreed = true; ExecEarlyFree(outerPlanState(node)); } /* * @Function: ExecReSetAgg() * * @Brief: Reset the agg state structure in rescan case under * recursion-stream new iteration condition. * * @Input node: node agg planstate * * @Return: no return value */ void ExecReSetAgg(AggState* node) { Assert(IS_PGXC_DATANODE && node != NULL && (IsA(node, AggState))); Assert(EXEC_IN_RECURSIVE_MODE(node->ss.ps.plan)); ExprContext* econtext = node->ss.ps.ps_ExprContext; PlanState* outerPlan = outerPlanState(node); AggWriteFileControl* TempFilePara = (AggWriteFileControl*)node->aggTempFileControl; Agg* aggnode = (Agg*)node->ss.ps.plan; int numGroupingSets = Max(node->maxsets, 1); int setno; errno_t rc; node->agg_done = false; node->ss.ps.ps_vec_TupFromTlist = false; /* Make sure we have closed any open tuplesorts */ if (node->ss.ps.state->es_is_flt_frame) { for (int transno = 0; transno < node->numtrans; transno++) { for (setno = 0; setno < numGroupingSets; setno++) { AggStatePerTrans peraggtrans = &node->pertrans[transno]; if (peraggtrans->sortstates[setno]) { tuplesort_end(peraggtrans->sortstates[setno]); peraggtrans->sortstates[setno] = NULL; } } } } else { for (int aggno = 0; aggno < node->numaggs; aggno++) { for (int setno = 0; setno < numGroupingSets; setno++) { AggStatePerAgg peraggstate = &node->peragg[aggno]; if (peraggstate->sortstates[setno]) { tuplesort_end(peraggstate->sortstates[setno]); peraggstate->sortstates[setno] = NULL; } } } } /* * We don't need to ReScanExprContext the output tuple context here; * ExecReScan already did it. But we do need to reset our per-grouping-set * contexts, which may have transvalues stored in them. (We use rescan * rather than just reset because transfns may have registered callbacks * that need to be run now.) * * Note that with AGG_HASHED, the hash table is allocated in a sub-context * of the aggcontext. This used to be an issue, but now, resetting a * context automatically deletes sub-contexts too. */ /* Release first tuple of group, if we have made a copy */ if (node->grp_firstTuple != NULL) { tableam_tops_free_tuple(node->grp_firstTuple); node->grp_firstTuple = NULL; } (void)ExecClearTuple(node->ss.ss_ScanTupleSlot); /* Forget current agg values */ rc = memset_s(econtext->ecxt_aggvalues, sizeof(Datum) * node->numaggs, 0, sizeof(Datum) * node->numaggs); securec_check(rc, "\0", "\0"); rc = memset_s(econtext->ecxt_aggnulls, sizeof(bool) * node->numaggs, 0, sizeof(bool) * node->numaggs); securec_check(rc, "\0", "\0"); /* * Release all temp storage. Note that with AGG_HASHED, the hash table is * allocated in a sub-context of the aggcontext. We're going to rebuild * the hash table from scratch, so we need to use * MemoryContextResetAndDeleteChildren() to avoid leaking the old hash * table's memory context header. */ for (setno = 0; setno < numGroupingSets; setno++) { MemoryContextResetAndDeleteChildren(node->aggcontexts[setno]); } if (aggnode->aggstrategy == AGG_HASHED) { AggWriteFileControl* TempFileControl = (AggWriteFileControl*)node->aggTempFileControl; int fileNum = TempFileControl->filenum; int64 workMem = SET_NODEMEM(aggnode->plan.operatorMemKB[0], aggnode->plan.dop); int64 maxMem = (aggnode->plan.operatorMaxMem > 0) ? SET_NODEMEM(aggnode->plan.operatorMaxMem, aggnode->plan.dop) : 0; hashFileSource* file = TempFileControl->filesource; if (file != NULL) { for (int i = 0; i < fileNum; i++) { file->close(i); } file->freeFileSource(); } /* * After close the temp file and free the filesource, setting the filesource to NULL * preventing free or close wrong object the next time here. * Problem Scenario: when the first rescan need spill to disk and second rescan * doesn't need, without this set will lead core in freeFileSource as m_tuple was * set to null in the first rescan. */ TempFileControl->filesource = NULL; /* Rebuild an empty hash table */ build_hash_table(node); node->table_filled = false; /* reset hashagg temp file para */ TempFilePara->strategy = MEMORY_HASHAGG; TempFilePara->spillToDisk = false; TempFilePara->finishwrite = false; TempFilePara->totalMem = workMem * 1024L; TempFilePara->useMem = 0; TempFilePara->inmemoryRownum = 0; TempFilePara->runState = HASHAGG_PREPARE; TempFilePara->curfile = -1; TempFilePara->filenum = 0; TempFilePara->maxMem = maxMem * 1024L; TempFilePara->spreadNum = 0; } else { /* * Reset the per-group state (in particular, mark transvalues null) */ rc = memset_s(node->pergroup, sizeof(AggStatePerGroupData) * node->numaggs * numGroupingSets, 0, sizeof(AggStatePerGroupData) * node->numaggs * numGroupingSets); securec_check(rc, "\0", "\0"); /* reset to phase 0 */ initialize_phase(node, 0); node->input_done = false; node->projected_set = -1; } node->ss.ps.recursive_reset = true; /* * if chgParam of subnode is not null then plan will be re-scanned by * first ExecProcNode. */ if (outerPlan->chgParam == NULL) ExecReSetRecursivePlanTree(outerPlanState(node)); } static void initialize_aggregate_flattened(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate) { Plan *plan = aggstate->ss.ps.plan; int64 local_work_mem = SET_NODEMEM(plan->operatorMemKB[0], plan->dop); int64 max_mem = (plan->operatorMaxMem > 0) ? SET_NODEMEM(plan->operatorMaxMem, plan->dop) : 0; if (pertrans->numSortCols > 0) { if (pertrans->sortstates[aggstate->current_set]) { tuplesort_end(pertrans->sortstates[aggstate->current_set]); } if (pertrans->numInputs == 1) { pertrans->sortstates[aggstate->current_set] = tuplesort_begin_datum(pertrans->sortdesc->attrs[0].atttypid, pertrans->sortOperators[0], pertrans->sortCollations[0], pertrans->sortNullsFirst[0], local_work_mem, false); } else { pertrans->sortstates[aggstate->current_set] = tuplesort_begin_heap(pertrans->sortdesc, pertrans->numSortCols, pertrans->sortColIdx, pertrans->sortOperators, pertrans->sortCollations, pertrans->sortNullsFirst, local_work_mem, false, max_mem, plan->plan_node_id, SET_DOP(plan->dop)); } } if (pertrans->initValueIsNull) { pergroupstate->transValue = pertrans->initValue; } else { MemoryContext oldContext; oldContext = MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]); pergroupstate->transValue = datumCopy(pertrans->initValue, pertrans->transtypeByVal, pertrans->transtypeLen); MemoryContextSwitchTo(oldContext); } pergroupstate->transValueIsNull = pertrans->initValueIsNull; pergroupstate->noTransValue = pertrans->initValueIsNull; if (pertrans->initCollectValueIsNull) { pergroupstate->collectValue = pertrans->initCollectValue; } else { MemoryContext oldContext; oldContext = MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]); pergroupstate->collectValue = datumCopy(pertrans->initCollectValue, pertrans->transtypeByVal, pertrans->transtypeLen); MemoryContextSwitchTo(oldContext); } pergroupstate->collectValueIsNull = pertrans->initCollectValueIsNull; pergroupstate->noCollectValue = pertrans->initCollectValueIsNull; } static void initialize_aggregates_flattened(AggState *aggstate, AggStatePerGroup pergroup, int numReset) { int transno; int numGroupingSets = Max(aggstate->phase->numsets, 1); int setno = 0; AggStatePerTrans transstates = aggstate->pertrans; if (numReset < 1) { numReset = numGroupingSets; } for (transno = 0; transno < aggstate->numtrans; transno++) { AggStatePerTrans pertrans = &transstates[transno]; for (setno = 0; setno < numReset; setno++) { AggStatePerGroup pergroupstate; pergroupstate = &pergroup[transno + (setno * (aggstate->numtrans))]; aggstate->current_set = setno; initialize_aggregate_flattened(aggstate, pertrans, pergroupstate); } } } static void advance_transition_function_flattened(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate) { FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo; MemoryContext oldContext; Datum newVal; if (pertrans->transfn.fn_strict) { int numTransInputs = pertrans->numTransInputs; int i; for (i = 1; i <= numTransInputs; i++) { if (fcinfo->argnull[i]) { return; } } if (pergroupstate->noTransValue) { oldContext = MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]); pergroupstate->transValue = datumCopy(fcinfo->arg[1], pertrans->transtypeByVal, pertrans->transtypeLen); pergroupstate->transValueIsNull = false; pergroupstate->noTransValue = false; MemoryContextSwitchTo(oldContext); return; } if (pergroupstate->transValueIsNull) { return; } } oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory); aggstate->curpertrans = pertrans; fcinfo->arg[0] = pergroupstate->transValue; fcinfo->argnull[0] = pergroupstate->transValueIsNull; fcinfo->argTypes[0] = InvalidOid; fcinfo->isnull = false; Node *origin_fcxt = fcinfo->context; if (IS_PGXC_DATANODE && pertrans->is_avg) { Node *fcontext = (Node *)palloc0(sizeof(Node)); fcontext->type = (NodeTag)(pertrans->is_avg); fcinfo->context = fcontext; } newVal = FunctionCallInvoke(fcinfo); aggstate->curpertrans = NULL; fcinfo->context = origin_fcxt; if (!pertrans->transtypeByVal && DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue)) { if (!fcinfo->isnull) { MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]); newVal = datumCopy(newVal, pertrans->transtypeByVal, pertrans->transtypeLen); } if (!pergroupstate->transValueIsNull) pfree(DatumGetPointer(pergroupstate->transValue)); } pergroupstate->transValue = newVal; pergroupstate->transValueIsNull = fcinfo->isnull; MemoryContextSwitchTo(oldContext); } static void advance_aggregates_flattened(AggState *aggstate, AggStatePerGroup pergroup) { bool dummynull; ExecEvalExprSwitchContext(aggstate->phase->evaltrans, aggstate->tmpcontext, &dummynull, NULL); } static void process_ordered_aggregate_single_flattened(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate) { Datum oldVal = (Datum)0; bool oldIsNull = true; bool haveOldVal = false; MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory; MemoryContext oldContext; bool isDistinct = (pertrans->numDistinctCols > 0); FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo; Datum *newVal = NULL; bool *isNull = NULL; Assert(pertrans->numDistinctCols < 2); tuplesort_performsort(pertrans->sortstates[aggstate->current_set]); InitFunctionCallInfoArgs(*fcinfo, pertrans->numArguments + 1, 1); newVal = fcinfo->arg + 1; isNull = fcinfo->argnull + 1; while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set], true, newVal, isNull)) { MemoryContextReset(workcontext); oldContext = MemoryContextSwitchTo(workcontext); if (isDistinct && haveOldVal && ((oldIsNull && *isNull) || (!oldIsNull && !*isNull && DatumGetBool(FunctionCall2(&pertrans->equalfns[0], oldVal, *newVal))))) { if (!pertrans->inputtypeByVal && !*isNull) pfree(DatumGetPointer(*newVal)); } else { advance_transition_function_flattened(aggstate, pertrans, pergroupstate); if (!oldIsNull && !pertrans->inputtypeByVal) pfree(DatumGetPointer(oldVal)); oldVal = *newVal; oldIsNull = *isNull; haveOldVal = true; } MemoryContextSwitchTo(oldContext); } if (!oldIsNull && !pertrans->inputtypeByVal) pfree(DatumGetPointer(oldVal)); tuplesort_end(pertrans->sortstates[aggstate->current_set]); pertrans->sortstates[aggstate->current_set] = NULL; } static void process_ordered_aggregate_multi_flattened(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate) { MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory; FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo; TupleTableSlot *slot1 = pertrans->sortslot; TupleTableSlot *slot2 = pertrans->uniqslot; int numTransInputs = pertrans->numTransInputs; int numDistinctCols = pertrans->numDistinctCols; Oid* sortCollations = pertrans->sortCollations; Datum newAbbrevVal = (Datum)0; Datum oldAbbrevVal = (Datum)0; bool haveOldValue = false; int i; tuplesort_performsort(pertrans->sortstates[aggstate->current_set]); (void)ExecClearTuple(slot1); if (slot2 != NULL) { (void)ExecClearTuple(slot2); } while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set], true, slot1, &newAbbrevVal)) { tableam_tslot_getsomeattrs(slot1, numTransInputs); if (numDistinctCols == 0 || !haveOldValue || newAbbrevVal != oldAbbrevVal || !execTuplesMatch(slot1, slot2, numDistinctCols, pertrans->sortColIdx, pertrans->equalfns, workcontext, sortCollations)) { InitFunctionCallInfoArgs(*fcinfo, numTransInputs + 1, 1); for (i = 0; i < numTransInputs; i++) { fcinfo->arg[i + 1] = slot1->tts_values[i]; fcinfo->argnull[i + 1] = slot1->tts_isnull[i]; } advance_transition_function_flattened(aggstate, pertrans, pergroupstate); if (numDistinctCols > 0) { TupleTableSlot *tmpslot = slot2; slot2 = slot1; slot1 = tmpslot; oldAbbrevVal = newAbbrevVal; haveOldValue = true; } } if (numDistinctCols == 0) { MemoryContextReset(workcontext); } (void)ExecClearTuple(slot1); } if (slot2 != NULL) { (void)ExecClearTuple(slot2); } tuplesort_end(pertrans->sortstates[aggstate->current_set]); pertrans->sortstates[aggstate->current_set] = NULL; } static void finalize_aggregate_flattened(AggState *aggstate, AggStatePerAggForFlattenedExpr peragg, AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull) { bool anynull = false; FunctionCallInfoData fcinfo; int args_pos = 1; int numFinalArgs = 1; MemoryContext oldContext; ListCell *lc = NULL; AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno]; oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory); if (AGGKIND_IS_ORDERED_SET(pertrans->aggref->aggkind)) { numFinalArgs += peragg->numFinalArgs; } InitFunctionCallInfoArgs(fcinfo, numFinalArgs, 1); foreach (lc, peragg->aggdirectargs) { fcinfo.arg[args_pos] = ExecEvalExpr((ExprState *)lfirst(lc), aggstate->ss.ps.ps_ExprContext, &fcinfo.argnull[args_pos], NULL); fcinfo.argTypes[args_pos] = ((ExprState *)lfirst(lc))->resultType; if (anynull == true || fcinfo.argnull[args_pos] == true) { anynull = true; } else { anynull = false; } args_pos++; } if ((pertrans->aggref->aggstage > 0 || aggstate->is_final) && need_adjust_agg_inner_func_type(pertrans->aggref)) { pergroupstate->transValue = pergroupstate->collectValue; pergroupstate->transValueIsNull = pergroupstate->collectValueIsNull; } Assert(args_pos <= numFinalArgs); if (OidIsValid(peragg->finalfn_oid)) { aggstate->curpertrans = pertrans; InitFunctionCallInfoData(fcinfo, &(peragg->finalfn), numFinalArgs, pertrans->aggCollation, (Node *)aggstate, NULL); fcinfo.arg[0] = pergroupstate->transValue; fcinfo.argnull[0] = pergroupstate->transValueIsNull; fcinfo.argTypes[0] = InvalidOid; if (anynull == true || pergroupstate->transValueIsNull == true) { anynull = true; } else { anynull = false; } while (args_pos < numFinalArgs) { fcinfo.arg[args_pos] = (Datum)0; fcinfo.argnull[args_pos] = true; fcinfo.argTypes[args_pos] = InvalidOid; args_pos++; anynull = true; } if (fcinfo.flinfo->fn_strict && anynull) { *resultVal = (Datum)0; *resultIsNull = true; } else { *resultVal = FunctionCallInvoke(&fcinfo); *resultIsNull = fcinfo.isnull; } aggstate->curpertrans = NULL; } else { *resultVal = pergroupstate->transValue; *resultIsNull = pergroupstate->transValueIsNull; } if (!peragg->resulttypeByVal && !*resultIsNull && !MemoryContextContains(CurrentMemoryContext, DatumGetPointer(*resultVal))) { *resultVal = datumCopy(*resultVal, peragg->resulttypeByVal, peragg->resulttypeLen); } MemoryContextSwitchTo(oldContext); } static void finalize_aggregates_flattened(AggState *aggstate, AggStatePerAggForFlattenedExpr peraggs, AggStatePerGroup pergroup, int currentSet) { ExprContext *econtext = aggstate->ss.ps.ps_ExprContext; Datum *aggvalues = econtext->ecxt_aggvalues; bool *aggnulls = econtext->ecxt_aggnulls; int aggno; Assert(currentSet == 0 || ((Agg *)aggstate->ss.ps.plan)->aggstrategy != AGG_HASHED); aggstate->current_set = currentSet; for (aggno = 0; aggno < aggstate->numaggs; aggno++) { AggStatePerAggForFlattenedExpr peragg = &peraggs[aggno]; int transno = peragg->transno; AggStatePerTrans pertrans = &aggstate->pertrans[transno]; AggStatePerGroup pergroupstate; pergroupstate = &pergroup[transno + (currentSet * (aggstate->numtrans))]; if (pertrans->numSortCols > 0 && pertrans->sortstates[aggstate->current_set] != NULL) { Assert(((Agg *)aggstate->ss.ps.plan)->aggstrategy != AGG_HASHED); if (pertrans->numInputs == 1) { process_ordered_aggregate_single_flattened(aggstate, pertrans, pergroupstate); } else { process_ordered_aggregate_multi_flattened(aggstate, pertrans, pergroupstate); } } finalize_aggregate_flattened(aggstate, peragg, pergroupstate, &aggvalues[aggno], &aggnulls[aggno]); } } static void build_pertrans_for_aggref(AggStatePerTrans pertrans, AggState *aggstate, EState *estate, Aggref *aggref, Oid aggtransfn, Oid aggtranstype, Datum initValue, bool initValueIsNull, Oid *inputTypes, int numArguments, bool isInitNumericSum) { int numGroupingSets = Max(aggstate->maxsets, 1); Expr *transfnexpr; ListCell *lc; int numInputs; int numDirectArgs; List *sortlist; int numSortCols; int numDistinctCols; int i; pertrans->aggref = aggref; pertrans->aggCollation = aggref->inputcollid; pertrans->transfn_oid = aggtransfn; pertrans->initValue = initValue; pertrans->initValueIsNull = initValueIsNull; pertrans->numInputs = numInputs = list_length(aggref->args); pertrans->aggtranstype = aggtranstype; if (AGGKIND_IS_ORDERED_SET(aggref->aggkind)) { pertrans->numTransInputs = numInputs; } else { pertrans->numTransInputs = numArguments; } /* init tranfn (except INT8SUMFUNCOID、NUMERICSUMFUNCOID) */ if ((aggref->aggfnoid == INT8SUMFUNCOID || aggref->aggfnoid == NUMERICSUMFUNCOID) && !isInitNumericSum) { return; } numDirectArgs = list_length(aggref->aggdirectargs); build_aggregate_transfn_expr(inputTypes, numArguments, numDirectArgs, aggref->aggvariadic, aggtranstype, aggref->inputcollid, aggtransfn, &transfnexpr); fmgr_info(aggtransfn, &pertrans->transfn); fmgr_info_set_expr((Node *)transfnexpr, &pertrans->transfn); InitFunctionCallInfoData(pertrans->transfn_fcinfo, &pertrans->transfn, pertrans->numTransInputs + 1, pertrans->aggCollation, (Node *)aggstate, NULL); if (pertrans->transfn.fn_strict && pertrans->initValueIsNull) { if (numArguments <= numDirectArgs || !IsBinaryCoercible(inputTypes[numDirectArgs], aggtranstype)) { ereport(ERROR, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), errmsg("aggregate %u needs to have compatible input type and transition type", aggref->aggfnoid))); } } get_typlenbyval(aggtranstype, &pertrans->transtypeLen, &pertrans->transtypeByVal); if (AGGKIND_IS_ORDERED_SET(aggref->aggkind)) { sortlist = NIL; numSortCols = numDistinctCols = 0; } else if (aggref->aggdistinct) { sortlist = aggref->aggdistinct; numSortCols = numDistinctCols = list_length(sortlist); Assert(numSortCols >= list_length(aggref->aggorder)); } else { sortlist = aggref->aggorder; numSortCols = list_length(sortlist); numDistinctCols = 0; } pertrans->numSortCols = numSortCols; pertrans->numDistinctCols = numDistinctCols; if (numSortCols > 0) { pertrans->sortdesc = ExecTypeFromTL(aggref->args, false); pertrans->sortslot = ExecInitExtraTupleSlot(estate); ExecSetSlotDescriptor(pertrans->sortslot, pertrans->sortdesc); Assert(((Agg *)aggstate->ss.ps.plan)->aggstrategy != AGG_HASHED); if (numInputs == 1) { get_typlenbyval(inputTypes[numDirectArgs], &pertrans->inputtypeLen, &pertrans->inputtypeByVal); } else if (numDistinctCols > 0) { pertrans->uniqslot = ExecInitExtraTupleSlot(estate); ExecSetSlotDescriptor(pertrans->uniqslot, pertrans->sortdesc); } pertrans->sortColIdx = (AttrNumber *)palloc(numSortCols * sizeof(AttrNumber)); pertrans->sortOperators = (Oid *)palloc(numSortCols * sizeof(Oid)); pertrans->sortCollations = (Oid *)palloc(numSortCols * sizeof(Oid)); pertrans->sortNullsFirst = (bool *)palloc(numSortCols * sizeof(bool)); i = 0; foreach (lc, sortlist) { SortGroupClause *sortcl = (SortGroupClause *)lfirst(lc); TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args); Assert(OidIsValid(sortcl->sortop)); pertrans->sortColIdx[i] = tle->resno; pertrans->sortOperators[i] = sortcl->sortop; pertrans->sortCollations[i] = exprCollation((Node *)tle->expr); pertrans->sortNullsFirst[i] = sortcl->nulls_first; i++; } Assert(i == numSortCols); } if (aggref->aggdistinct) { Assert(numArguments > 0); pertrans->equalfns = (FmgrInfo *)palloc(numDistinctCols * sizeof(FmgrInfo)); i = 0; foreach (lc, aggref->aggdistinct) { SortGroupClause *sortcl = (SortGroupClause *)lfirst(lc); fmgr_info(get_opcode(sortcl->eqop), &pertrans->equalfns[i]); i++; } Assert(i == numDistinctCols); } pertrans->sortstates = (Tuplesortstate **)palloc0(sizeof(Tuplesortstate *) * numGroupingSets); } static int find_compatible_peragg(Aggref *newagg, AggState *aggstate, int lastaggno, List **same_input_transnos) { int aggno; AggStatePerAggForFlattenedExpr peraggs; *same_input_transnos = NIL; if (contain_volatile_functions((Node *)newagg)) { return -1; } peraggs = aggstate->peragg_flattened; for (aggno = 0; aggno <= lastaggno; aggno++) { AggStatePerAggForFlattenedExpr peragg; Aggref *existingRef; peragg = &peraggs[aggno]; existingRef = peragg->aggref; if (newagg->inputcollid != existingRef->inputcollid || newagg->aggstar != existingRef->aggstar || newagg->aggvariadic != existingRef->aggvariadic || newagg->aggkind != existingRef->aggkind || !equal(newagg->aggdirectargs, existingRef->aggdirectargs) || !equal(newagg->args, existingRef->args) || !equal(newagg->aggorder, existingRef->aggorder) || !equal(newagg->aggdistinct, existingRef->aggdistinct)) continue; if (newagg->aggfnoid == existingRef->aggfnoid && newagg->aggtype == existingRef->aggtype && newagg->aggcollid == existingRef->aggcollid) { list_free(*same_input_transnos); *same_input_transnos = NIL; return aggno; } *same_input_transnos = lappend_int(*same_input_transnos, peragg->transno); } return -1; } static int find_compatible_pertrans(AggState *aggstate, Oid aggfnOid, Oid *aggtransfnOid, Oid *aggtranstype, Datum initValue, bool *initValueIsNull, List *possible_matches) { ListCell *lc; int result = -1; Oid newtransfnOid = *aggtransfnOid; Oid newaggtranstype = *aggtranstype; bool newinitValueIsNull = *initValueIsNull; #ifndef ENABLE_MULTIPLE_NODES numeric_transfn_info_change(aggfnOid, &newtransfnOid, &newaggtranstype); newinitValueIsNull = numeric_agg_trans_initvalisnull(newtransfnOid, newinitValueIsNull); #endif foreach (lc, possible_matches) { int transno = lfirst_int(lc); AggStatePerTrans pertrans = &aggstate->pertrans[transno]; Oid newtransfnOidtemp = pertrans->transfn_oid; Oid newaggtranstypetemp = pertrans->aggtranstype; bool newinitValueIsNulltemp = pertrans->initValueIsNull; #ifndef ENABLE_MULTIPLE_NODES Oid aggfnOidtemp = pertrans->aggref->aggfnoid; if (aggfnOidtemp == INT8SUMFUNCOID || aggfnOidtemp == NUMERICSUMFUNCOID) { numeric_transfn_info_change(aggfnOidtemp, &newtransfnOidtemp, &newaggtranstypetemp); newinitValueIsNulltemp = numeric_agg_trans_initvalisnull(newtransfnOidtemp, newinitValueIsNulltemp); } #endif if (newtransfnOid != newtransfnOidtemp || newaggtranstype != newaggtranstypetemp) { continue; } if (newinitValueIsNull && newinitValueIsNulltemp) { result = transno; break; } if (!newinitValueIsNull && !newinitValueIsNulltemp && datumIsEqual(initValue, pertrans->initValue, pertrans->transtypeByVal, pertrans->transtypeLen)) { result = transno; break; } } #ifndef ENABLE_MULTIPLE_NODES if (aggfnOid != INT8SUMFUNCOID && aggfnOid != NUMERICSUMFUNCOID) { *aggtransfnOid = newtransfnOid; *aggtranstype = newaggtranstype; *initValueIsNull = newinitValueIsNull; } #endif return result; } static void exec_lookups_agg(AggState *aggstate, Agg *node, EState *estate) { int aggno; ListCell *l = NULL; int i = 0; List *combined_inputeval; int column_offset; AggStatePerAgg peragg = aggstate->peragg; int numGroupingSets = aggstate->maxsets; /* * Perform lookups of aggregate function info, and initialize the * unchanging fields of the per-agg data. We also detect duplicate * aggregates (for example, "SELECT sum(x) ... HAVING sum(x) > 0"). When * duplicates are detected, we only make an AggStatePerAgg struct for the * first one. The clones are simply pointed at the same result entry by * giving them duplicate aggno values. */ aggno = -1; foreach (l, aggstate->aggs) { AggrefExprState *aggrefstate = (AggrefExprState *)lfirst(l); Aggref *aggref = (Aggref *)aggrefstate->xprstate.expr; AggStatePerAgg peraggstate; Oid inputTypes[FUNC_MAX_ARGS]; int numArguments; int numDirectArgs; int numInputs; int numSortCols; int numDistinctCols; List *sortlist = NIL; HeapTuple aggTuple; Form_pg_aggregate aggform; Oid aggtranstype; AclResult aclresult; Oid transfn_oid, finalfn_oid; #ifdef PGXC Oid collectfn_oid; Expr *collectfnexpr = NULL; #endif /* PGXC */ Expr *transfnexpr = NULL; Expr *finalfnexpr = NULL; Datum textInitVal; ListCell *lc = NULL; /* Planner should have assigned aggregate to correct level */ Assert(aggref->agglevelsup == 0); /* Look for a previous duplicate aggregate */ for (i = 0; i <= aggno; i++) { if (equal(aggref, peragg[i].aggref) && !contain_volatile_functions((Node *)aggref)) break; } if (i <= aggno) { /* Found a match to an existing entry, so just mark it */ aggrefstate->aggno = i; continue; } /* Nope, so assign a new PerAgg record */ peraggstate = &peragg[++aggno]; /* Mark Aggref state node with assigned index in the result array */ aggrefstate->aggno = aggno; /* Fill in the peraggstate data */ peraggstate->aggrefstate = aggrefstate; peraggstate->aggref = aggref; peraggstate->sortstates = (Tuplesortstate **)palloc0(sizeof(Tuplesortstate *) * numGroupingSets); for (int currentsortno = 0; currentsortno < numGroupingSets; currentsortno++) peraggstate->sortstates[currentsortno] = NULL; /* Fetch the pg_aggregate row */ aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(aggref->aggfnoid)); if (!HeapTupleIsValid(aggTuple)) ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmodule(MOD_EXECUTOR), errmsg("cache lookup failed for aggregate %u", aggref->aggfnoid))); aggform = (Form_pg_aggregate)GETSTRUCT(aggTuple); /* Check permission to call aggregate function */ aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(), ACL_EXECUTE); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(aggref->aggfnoid)); peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; #ifdef PGXC peraggstate->collectfn_oid = collectfn_oid = aggform->aggcollectfn; peraggstate->is_avg = false; if (finalfn_oid == 1830) { peraggstate->is_avg = true; } #ifdef ENABLE_MULTIPLE_NODES /* * For PGXC final and collection functions are used to combine results at Coordinator, * disable those for Datanode */ if (IS_PGXC_DATANODE) { if (!u_sess->exec_cxt.under_stream_runtime) { peraggstate->finalfn_oid = finalfn_oid = InvalidOid; peraggstate->collectfn_oid = collectfn_oid = InvalidOid; } else { if (need_adjust_agg_inner_func_type(peraggstate->aggref)) { if (!node->is_final && !node->single_node) peraggstate->finalfn_oid = finalfn_oid = InvalidOid; if (aggref->aggstage == 0 && !node->is_final && !node->single_node) peraggstate->collectfn_oid = collectfn_oid = InvalidOid; } } } #else if (IS_STREAM_PLAN || StreamThreadAmI()) { if (need_adjust_agg_inner_func_type(peraggstate->aggref)) { if (!node->is_final && !node->single_node) { peraggstate->finalfn_oid = finalfn_oid = InvalidOid; } if (aggref->aggstage == 0 && !node->is_final && !node->single_node) { peraggstate->collectfn_oid = collectfn_oid = InvalidOid; } } } #endif /* ENABLE_MULTIPLE_NODES */ #ifdef USE_SPQ /* Final function only required if we're finalizing the aggregates */ if (t_thrd.spq_ctx.spq_role != ROLE_UTILITY) { if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplittype)) peraggstate->finalfn_oid = finalfn_oid = InvalidOid; else peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; } #endif /* SPQ */ #endif /* PGXC */ /* Check that aggregate owner has permission to call component fns */ { HeapTuple procTuple; Oid aggOwner; procTuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(aggref->aggfnoid)); if (!HeapTupleIsValid(procTuple)) ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmodule(MOD_EXECUTOR), errmsg("cache lookup failed for aggregate function %u", aggref->aggfnoid))); aggOwner = ((Form_pg_proc)GETSTRUCT(procTuple))->proowner; ReleaseSysCache(procTuple); aclresult = pg_proc_aclcheck(transfn_oid, aggOwner, ACL_EXECUTE); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(transfn_oid)); if (OidIsValid(finalfn_oid)) { aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner, ACL_EXECUTE); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(finalfn_oid)); } #ifdef PGXC if (OidIsValid(collectfn_oid)) { aclresult = pg_proc_aclcheck(collectfn_oid, aggOwner, ACL_EXECUTE); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(collectfn_oid)); } #endif /* PGXC */ } /* * Get the the number of actual arguments and identify the actual * datatypes of the aggregate inputs (saved in inputTypes). When * agg accepts ANY or a polymorphic type, the actual datatype * could be different from the agg's declared input types. */ numArguments = get_aggregate_argtypes(aggref, inputTypes, FUNC_MAX_ARGS); peraggstate->numArguments = numArguments; /* Get the direct arguments */ numDirectArgs = list_length(aggref->aggdirectargs); /* Get the number of aggregated input columns */ numInputs = list_length(aggref->args); peraggstate->numInputs = numInputs; /* Detect the number of columns passed to the transfn */ if (AGGKIND_IS_ORDERED_SET(aggref->aggkind)) peraggstate->numTransInputs = numInputs; else peraggstate->numTransInputs = numArguments; /* * When agg accepts ANY or a polymorphic type, resolve actual * type of transition state */ aggtranstype = resolve_aggregate_transtype(aggref->aggfnoid, aggform->aggtranstype, inputTypes, numArguments); /* build expression trees using actual argument & result types */ build_trans_aggregate_fnexprs(numArguments, numDirectArgs, AGGKIND_IS_ORDERED_SET(aggref->aggkind), aggref->aggvariadic, aggtranstype, inputTypes, aggref->aggtype, aggref->inputcollid, transfn_oid, finalfn_oid, &transfnexpr, &finalfnexpr); #ifdef PGXC if (OidIsValid(collectfn_oid)) { /* we expect final function expression to be NULL in call to * build_aggregate_fnexprs below, since InvalidOid is passed for * finalfn_oid argument. Use a dummy expression to accept that. */ Expr *dummyexpr = NULL; /* * for XC, we need to setup the collection function expression as well. * Use build_aggregate_fnexpr() with invalid final function oid, and collection * function information instead of transition function information. * We should really be adding this step inside * build_aggregate_fnexprs() but this way it becomes easy to merge. */ build_aggregate_fnexprs(&aggtranstype, 1, aggtranstype, aggref->aggtype, aggref->inputcollid, collectfn_oid, InvalidOid, &collectfnexpr, &dummyexpr); Assert(!dummyexpr); } #endif /* PGXC */ /* set up infrastructure for calling the transfn and finalfn */ fmgr_info(transfn_oid, &peraggstate->transfn); fmgr_info_set_expr((Node *)transfnexpr, &peraggstate->transfn); if (OidIsValid(finalfn_oid)) { fmgr_info(finalfn_oid, &peraggstate->finalfn); fmgr_info_set_expr((Node *)finalfnexpr, &peraggstate->finalfn); } #ifdef PGXC if (OidIsValid(collectfn_oid)) { fmgr_info(collectfn_oid, &peraggstate->collectfn); peraggstate->collectfn.fn_expr = (Node *)collectfnexpr; } #endif /* PGXC */ peraggstate->aggCollation = aggref->inputcollid; InitFunctionCallInfoData(peraggstate->transfn_fcinfo, &peraggstate->transfn, peraggstate->numTransInputs + 1, peraggstate->aggCollation, (Node *)aggstate, NULL); /* get info zbout relevant datatypes */ get_typlenbyval(aggref->aggtype, &peraggstate->resulttypeLen, &peraggstate->resulttypeByVal); get_typlenbyval(aggtranstype, &peraggstate->transtypeLen, &peraggstate->transtypeByVal); /* * initval is potentially null, so don't try to access it as a struct * field. Must do it the hard way with SysCacheGetAttr. */ textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, Anum_pg_aggregate_agginitval, &peraggstate->initValueIsNull); if (peraggstate->initValueIsNull) { peraggstate->initValue = (Datum)0; } else { peraggstate->initValue = GetAggInitVal(textInitVal, aggtranstype); } #ifdef PGXC /* * initval for collection function is potentially null, so don't try to * access it as a struct field. Must do it the hard way with * SysCacheGetAttr. */ textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, Anum_pg_aggregate_agginitcollect, &peraggstate->initCollectValueIsNull); if (peraggstate->initCollectValueIsNull) { peraggstate->initCollectValue = (Datum)0; } else { peraggstate->initCollectValue = GetAggInitVal(textInitVal, aggtranstype); } #endif /* PGXC */ /* * If the transfn is strict and the initval is NULL, make sure input * type and transtype are the same (or at least binary-compatible), so * that it's OK to use the first input value as the initial * transValue. This should have been checked at agg definition time, * but just in case... */ if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull) { if (numArguments < numDirectArgs || !IsBinaryCoercible(inputTypes[numDirectArgs], aggtranstype)) ereport(ERROR, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), errmsg("aggregate %u needs to have compatible input type and transition type", aggref->aggfnoid))); } /* * If we're doing either DISTINCT or ORDER BY, then we have a list of * SortGroupClause nodes; fish out the data in them and stick them * into arrays. * For oerdered set agg, we handle the sort operation in the transfn * function, so we can ignore ORDER BY. */ if (AGGKIND_IS_ORDERED_SET(aggref->aggkind)) { numSortCols = 0; numDistinctCols = 0; } else if (aggref->aggdistinct) { sortlist = aggref->aggdistinct; numSortCols = numDistinctCols = list_length(sortlist); Assert(numSortCols >= list_length(aggref->aggorder)); } else { sortlist = aggref->aggorder; numSortCols = list_length(sortlist); numDistinctCols = 0; } peraggstate->numSortCols = numSortCols; peraggstate->numDistinctCols = numDistinctCols; if (numSortCols > 0) { /* Get a tupledesc and slot corresponding to the aggregated inputs * (including sort expressions) of the agg. */ peraggstate->sortdesc = ExecTypeFromTL(aggref->args, false); peraggstate->sortslot = ExecInitExtraTupleSlot(estate); ExecSetSlotDescriptor(peraggstate->sortslot, peraggstate->sortdesc); /* * We don't implement DISTINCT or ORDER BY aggs in the HASHED case * (yet) */ Assert(node->aggstrategy != AGG_HASHED); /* If we have only one input, we need its len/byval info. */ if (numInputs == 1) { get_typlenbyval(inputTypes[numDirectArgs], &peraggstate->inputtypeLen, &peraggstate->inputtypeByVal); } else if (numDistinctCols > 0) { /* we will need an extra slot to store prior values */ peraggstate->uniqslot = ExecInitExtraTupleSlot(estate); ExecSetSlotDescriptor(peraggstate->uniqslot, peraggstate->sortdesc); } /* Extract the sort information for use later */ peraggstate->sortColIdx = (AttrNumber *)palloc(numSortCols * sizeof(AttrNumber)); peraggstate->sortOperators = (Oid *)palloc(numSortCols * sizeof(Oid)); peraggstate->sortCollations = (Oid *)palloc(numSortCols * sizeof(Oid)); peraggstate->sortNullsFirst = (bool *)palloc(numSortCols * sizeof(bool)); i = 0; foreach (lc, sortlist) { SortGroupClause *sortcl = (SortGroupClause *)lfirst(lc); TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args); /* the parser should have made sure of this */ Assert(OidIsValid(sortcl->sortop)); peraggstate->sortColIdx[i] = tle->resno; peraggstate->sortOperators[i] = sortcl->sortop; peraggstate->sortCollations[i] = exprCollation((Node *)tle->expr); peraggstate->sortNullsFirst[i] = sortcl->nulls_first; i++; } Assert(i == numSortCols); } if (aggref->aggdistinct) { Assert(numArguments > 0); /* * We need the equal function for each DISTINCT comparison we will * make. */ peraggstate->equalfns = (FmgrInfo *)palloc(numDistinctCols * sizeof(FmgrInfo)); i = 0; foreach (lc, aggref->aggdistinct) { SortGroupClause *sortcl = (SortGroupClause *)lfirst(lc); fmgr_info(get_opcode(sortcl->eqop), &peraggstate->equalfns[i]); i++; } Assert(i == numDistinctCols); } ReleaseSysCache(aggTuple); } /* Update numaggs to match number of unique aggregates found */ aggstate->numaggs = aggno + 1; combined_inputeval = NIL; column_offset = 0; for (int transno = 0; transno < aggstate->numaggs; transno++) { AggStatePerAggData *pertrans = &peragg[transno]; ListCell *arg; pertrans->inputoff = column_offset; foreach (arg, pertrans->aggref->args) { TargetEntry *source_tle = (TargetEntry *)lfirst(arg); TargetEntry *tle; Assert(IsA(source_tle, TargetEntry)); tle = flatCopyTargetEntry(source_tle); tle->resno += column_offset; combined_inputeval = lappend(combined_inputeval, tle); } column_offset += list_length(pertrans->aggref->args); } aggstate->evaldesc = ExecTypeFromTL(combined_inputeval, false); aggstate->evalslot = ExecInitExtraTupleSlot(estate); if (estate->es_is_flt_frame) { aggstate->evalproj = ExecBuildProjectionInfoByFlatten(combined_inputeval, aggstate->tmpcontext, aggstate->evalslot, &aggstate->ss.ps, NULL); } else { combined_inputeval = (List *)ExecInitExprByRecursion((Expr *)combined_inputeval, (PlanState *)aggstate); aggstate->evalproj = ExecBuildProjectionInfoByRecursion(combined_inputeval, aggstate->tmpcontext, aggstate->evalslot, NULL); } ExecSetSlotDescriptor(aggstate->evalslot, aggstate->evaldesc); return; } static void exec_agg_finalfn_init(AggState *aggstate, Agg *node, AggStatePerAggForFlattenedExpr peragg, AggStatePerTrans pertrans, Oid *input_types, int num_arguments) { AclResult aclresult; Oid finalfn_oid = peragg->finalfn_oid; Oid collectfn_oid = peragg->collectfn_oid; Oid transfn_oid = pertrans->transfn_oid; Aggref *aggref = pertrans->aggref; Oid aggtranstype = pertrans->aggtranstype; Expr *finalfnexpr = NULL; peragg->is_avg = false; if (finalfn_oid == 1830) { peragg->is_avg = true; } #ifdef ENABLE_MULTIPLE_NODES /* * For PGXC final and collection functions are used to combine results at Coordinator, * disable those for Datanode */ if (IS_PGXC_DATANODE) { if (!u_sess->exec_cxt.under_stream_runtime) { peragg->finalfn_oid = finalfn_oid = InvalidOid; collectfn_oid = InvalidOid; } else { if (need_adjust_agg_inner_func_type(peraggstate->aggref)) { if (!node->is_final && !node->single_node) peragg->finalfn_oid = finalfn_oid = InvalidOid; if (aggref->aggstage == 0 && !node->is_final && !node->single_node) collectfn_oid = InvalidOid; } } } #else if (IS_STREAM_PLAN || StreamThreadAmI()) { if (need_adjust_agg_inner_func_type(peragg->aggref)) { if (!node->is_final && !node->single_node) { peragg->finalfn_oid = finalfn_oid = InvalidOid; } if (aggref->aggstage == 0 && !node->is_final && !node->single_node) { collectfn_oid = InvalidOid; } } } #endif /* ENABLE_MULTIPLE_NODES */ /* Check that aggregate owner has permission to call component fns */ { HeapTuple procTuple; Oid aggOwner; procTuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(aggref->aggfnoid)); if (!HeapTupleIsValid(procTuple)) ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmodule(MOD_EXECUTOR), errmsg("cache lookup failed for aggregate function %u", aggref->aggfnoid))); aggOwner = ((Form_pg_proc)GETSTRUCT(procTuple))->proowner; ReleaseSysCache(procTuple); aclresult = pg_proc_aclcheck(transfn_oid, aggOwner, ACL_EXECUTE); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(transfn_oid)); if (OidIsValid(finalfn_oid)) { aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner, ACL_EXECUTE); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(finalfn_oid)); } if (OidIsValid(collectfn_oid)) { aclresult = pg_proc_aclcheck(collectfn_oid, aggOwner, ACL_EXECUTE); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(collectfn_oid)); } } if (AGGKIND_IS_ORDERED_SET(aggref->aggkind)) { peragg->numFinalArgs = num_arguments + 1; } else { peragg->numFinalArgs = 1; } /* Initialize any direct-argument expressions */ peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs, (PlanState *)aggstate); /* * build expression trees using actual argument & result types for the * finalfn, if it exists */ if (OidIsValid(finalfn_oid)) { build_aggregate_finalfn_expr(input_types, peragg->numFinalArgs, aggtranstype, aggref->aggtype, aggref->inputcollid, finalfn_oid, &finalfnexpr); fmgr_info(finalfn_oid, &peragg->finalfn); fmgr_info_set_expr((Node *)finalfnexpr, &peragg->finalfn); } } static void exec_lookups_agg_flattened(AggState *aggstate, Agg *node, EState *estate) { int aggno = -1; int transno = -1; int numaggrefs; ListCell *l = NULL; AggStatePerAggForFlattenedExpr peragg_flattened; AggStatePerTrans pertransstates; numaggrefs = list_length(aggstate->aggs); pertransstates = aggstate->pertrans; peragg_flattened = aggstate->peragg_flattened; /* ----------------- * Perform lookups of aggregate function info, and initialize the * unchanging fields of the per-agg and per-trans data. * * We try to optimize by detecting duplicate aggregate functions so that * their state and final values are re-used, rather than needlessly being * re-calculated independently. We also detect aggregates that are not * the same, but which can share the same transition state. * * Scenarios: * * 1. An aggregate function appears more than once in query: * * SELECT SUM(x) FROM ... HAVING SUM(x) > 0 * * Since the aggregates are the identical, we only need to calculate * the calculate it once. Both aggregates will share the same 'aggno' * value. * * 2. Two different aggregate functions appear in the query, but the * aggregates have the same transition function and initial value, but * different final function: * * SELECT SUM(x), AVG(x) FROM ... * * In this case we must create a new peragg for the varying aggregate, * and need to call the final functions separately, but can share the * same transition state. * * For either of these optimizations to be valid, the aggregate's * arguments must be the same, including any modifiers such as ORDER BY, * DISTINCT and FILTER, and they mustn't contain any volatile functions. * ----------------- */ aggno = -1; transno = -1; foreach (l, aggstate->aggs) { AggrefExprState *aggrefstate = (AggrefExprState *)lfirst(l); Aggref *aggref = aggrefstate->aggref; AggStatePerAggForFlattenedExpr peragg; AggStatePerTrans pertrans; int existing_aggno; int existing_transno; List *same_input_transnos; Oid inputTypes[FUNC_MAX_ARGS]; int numArguments; HeapTuple aggTuple; Form_pg_aggregate aggform; AclResult aclresult; Oid transfn_oid; Oid collectfn_oid; Expr *collectfnexpr = NULL; Oid aggtranstype; Datum textInitVal; Datum initValue; bool initValueIsNull; Datum initCollectValue; bool initCollectValueIsNull; /* Planner should have assigned aggregate to correct level */ Assert(aggref->agglevelsup == 0); /* 1. Check for already processed aggs which can be re-used */ existing_aggno = find_compatible_peragg(aggref, aggstate, aggno, &same_input_transnos); if (existing_aggno != -1) { /* * Existing compatible agg found. so just point the Aggref to the * same per-agg struct. */ aggrefstate->aggno = existing_aggno; continue; } /* Mark Aggref state node with assigned index in the result array */ peragg = &peragg_flattened[++aggno]; peragg->aggref = aggref; aggrefstate->aggno = aggno; /* Fetch the pg_aggregate row */ aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(aggref->aggfnoid)); if (!HeapTupleIsValid(aggTuple)) ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmodule(MOD_EXECUTOR), errmsg("cache lookup failed for aggregate %u", aggref->aggfnoid))); aggform = (Form_pg_aggregate)GETSTRUCT(aggTuple); /* Check permission to call aggregate function */ aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(), ACL_EXECUTE); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(aggref->aggfnoid)); transfn_oid = aggform->aggtransfn; aggtranstype = aggform->aggtranstype; peragg->finalfn_oid = aggform->aggfinalfn; peragg->collectfn_oid = collectfn_oid = aggform->aggcollectfn; /* get info about the result type's datatype */ get_typlenbyval(aggref->aggtype, &peragg->resulttypeLen, &peragg->resulttypeByVal); if (OidIsValid(collectfn_oid)) { /* we expect final function expression to be NULL in call to * build_aggregate_fnexprs below, since InvalidOid is passed for * finalfn_oid argument. Use a dummy expression to accept that. */ Expr *dummyexpr = NULL; /* * for XC, we need to setup the collection function expression as well. * Use build_aggregate_fnexpr() with invalid final function oid, and collection * function information instead of transition function information. * We should really be adding this step inside * build_aggregate_fnexprs() but this way it becomes easy to merge. */ build_aggregate_fnexprs(&aggtranstype, 1, aggtranstype, aggref->aggtype, aggref->inputcollid, collectfn_oid, InvalidOid, &collectfnexpr, &dummyexpr); Assert(!dummyexpr); } /* * initval is potentially null, so don't try to access it as a struct * field. Must do it the hard way with SysCacheGetAttr. */ textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, Anum_pg_aggregate_agginitval, &initValueIsNull); if (initValueIsNull) initValue = (Datum)0; else initValue = GetAggInitVal(textInitVal, aggtranstype); /* * initval for collection function is potentially null, so don't try to * access it as a struct field. Must do it the hard way with * SysCacheGetAttr. */ textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, Anum_pg_aggregate_agginitcollect, &initCollectValueIsNull); if (initCollectValueIsNull) { initCollectValue = (Datum)0; } else { initCollectValue = GetAggInitVal(textInitVal, aggtranstype); } /* * 2. Build working state for invoking the transition function, or * look up previously initialized working state, if we can share it. * * find_compatible_peragg() already collected a list of per-Trans's * with the same inputs. Check if any of them have the same transition * function and initial value. */ /* for collectfn, always build a new peragg*/ if (OidIsValid(collectfn_oid) && (aggref->aggstage > 0 || aggstate->is_final)) { existing_transno = -1; } else { existing_transno = find_compatible_pertrans(aggstate, aggref->aggfnoid, &transfn_oid, &aggtranstype, initValue, &initValueIsNull, same_input_transnos); } /* * Get the the number of actual arguments and identify the actual * datatypes of the aggregate inputs (saved in inputTypes). When * agg accepts ANY or a polymorphic type, the actual datatype * could be different from the agg's declared input types. */ numArguments = get_aggregate_argtypes(aggref, inputTypes, FUNC_MAX_ARGS); /* * When agg accepts ANY or a polymorphic type, resolve actual * type of transition state */ aggtranstype = resolve_aggregate_transtype(aggref->aggfnoid, aggtranstype, inputTypes, numArguments); if (existing_transno != -1) { /* * Existing compatible trans found, so just point the 'peragg' to * the same per-trans struct. */ pertrans = &pertransstates[existing_transno]; peragg->transno = existing_transno; } else { pertrans = &pertransstates[++transno]; build_pertrans_for_aggref(pertrans, aggstate, estate, aggref, transfn_oid, aggtranstype, initValue, initValueIsNull, inputTypes, numArguments, false); if (OidIsValid(collectfn_oid)) { fmgr_info(collectfn_oid, &pertrans->collectfn); pertrans->collectfn.fn_expr = (Node *)collectfnexpr; /* init collectfn_fcinfo*/ InitFunctionCallInfoData(pertrans->collectfn_fcinfo, &pertrans->collectfn, pertrans->numTransInputs + 1, pertrans->aggCollation, (Node *)aggstate, NULL); pertrans->initCollectValue = initCollectValue; pertrans->initCollectValueIsNull = initCollectValueIsNull; } peragg->transno = transno; } /* init final (except INT8SUMFUNCOID、NUMERICSUMFUNCOID) */ if (aggref->aggfnoid != INT8SUMFUNCOID && aggref->aggfnoid != NUMERICSUMFUNCOID) { #ifndef ENABLE_MULTIPLE_NODES numeric_finalfn_info_change(aggref->aggfnoid, &peragg->finalfn_oid); #endif exec_agg_finalfn_init(aggstate, node, peragg, pertrans, inputTypes, numArguments); } ReleaseSysCache(aggTuple); } /* Update numaggs to match number of unique aggregates found */ aggstate->numaggs = aggno + 1; aggstate->numtrans = transno + 1; /* * Last, check whether any more aggregates got added onto the node while * we processed the expressions for the aggregate arguments (including not * only the regular arguments and FILTER expressions handled immediately * above, but any direct arguments we might've handled earlier). If so, * we have nested aggregate functions, which is semantically nonsensical, * so complain. (This should have been caught by the parser, so we don't * need to work hard on a helpful error message; but we defend against it * here anyway, just to be sure.) */ if (numaggrefs != list_length(aggstate->aggs)) ereport(ERROR, (errcode(ERRCODE_GROUPING_ERROR), errmsg("aggregate function calls cannot be nested"))); /* INT8SUMFUNCOID and NUMERICSUMFUNCOID init */ for (int i = 0; i < aggstate->numaggs; i++) { Oid inputTypes[FUNC_MAX_ARGS]; int numArguments; AggStatePerAggForFlattenedExpr peragg = &aggstate->peragg_flattened[i]; AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno]; Oid aggfnoid = peragg->aggref->aggfnoid; if (aggfnoid == INT8SUMFUNCOID || aggfnoid == NUMERICSUMFUNCOID) { #ifndef ENABLE_MULTIPLE_NODES if (aggstate->numtrans < aggstate->numaggs) { numeric_transfn_info_change(aggfnoid, &pertrans->transfn_oid, &pertrans->aggtranstype); pertrans->initValueIsNull = numeric_agg_trans_initvalisnull(pertrans->transfn_oid, pertrans->initValueIsNull); numeric_finalfn_info_change(aggfnoid, &peragg->finalfn_oid); } #endif numArguments = get_aggregate_argtypes(peragg->aggref, inputTypes, FUNC_MAX_ARGS); pertrans->aggtranstype = resolve_aggregate_transtype(aggfnoid, pertrans->aggtranstype, inputTypes, numArguments); build_pertrans_for_aggref(pertrans, aggstate, estate, peragg->aggref, pertrans->transfn_oid, pertrans->aggtranstype, pertrans->initValue, pertrans->initValueIsNull, inputTypes, numArguments, true); exec_agg_finalfn_init(aggstate, node, peragg, pertrans, inputTypes, numArguments); } } /* * Build expressions doing all the transition work at once. We build a * different one for each phase, as the number of transition function * invocation can differ between phases. Note this'll work both for * transition and combination functions (although there'll only be one * phase in the latter case). */ for (int phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++) { AggStatePerPhase phase = &aggstate->phases[phaseidx]; bool dohash = false; bool dosort = false; /* phase 0 doesn't necessarily exist */ if (!phase->aggnode) continue; if (phase->aggstrategy == AGG_PLAIN || phase->aggstrategy == AGG_SORTED) { dohash = false; dosort = true; } else if (phase->aggstrategy == AGG_HASHED) { dohash = true; dosort = false; } else Assert(false); phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash); } }