diff --git a/src/common/backend/parser/parse_agg.cpp b/src/common/backend/parser/parse_agg.cpp index d87a453e0..af09c7165 100644 --- a/src/common/backend/parser/parse_agg.cpp +++ b/src/common/backend/parser/parse_agg.cpp @@ -1816,6 +1816,73 @@ void build_trans_aggregate_fnexprs(int agg_num_inputs, int agg_num_direct_inputs /* finalfn is currently never treated as variadic */ } + +void build_aggregate_transfn_expr(Oid *agg_input_types, int agg_num_inputs, int agg_num_direct_inputs, + bool agg_variadic, Oid agg_state_type, Oid agg_input_collation, Oid transfn_oid, + Expr **transfnexpr) +{ + Param *argp; + List *args; + FuncExpr *fexpr; + int i; + + argp = makeNode(Param); + argp->paramkind = PARAM_EXEC; + argp->paramid = -1; + argp->paramtype = agg_state_type; + argp->paramtypmod = -1; + argp->paramcollid = agg_input_collation; + argp->location = -1; + + args = list_make1(argp); + + for (i = agg_num_direct_inputs; i < agg_num_inputs; i++) { + argp = makeNode(Param); + argp->paramkind = PARAM_EXEC; + argp->paramid = -1; + argp->paramtype = agg_input_types[i]; + argp->paramtypmod = -1; + argp->paramcollid = agg_input_collation; + argp->location = -1; + args = lappend(args, argp); + } + + fexpr = makeFuncExpr(transfn_oid, agg_state_type, args, InvalidOid, agg_input_collation, COERCE_EXPLICIT_CALL); + fexpr->funcvariadic = agg_variadic; + *transfnexpr = (Expr *)fexpr; +} + +void build_aggregate_finalfn_expr(Oid *agg_input_types, int num_finalfn_inputs, Oid agg_state_type, Oid agg_result_type, + Oid agg_input_collation, Oid finalfn_oid, Expr **finalfnexpr) +{ + Param *argp; + List *args; + int i; + + argp = makeNode(Param); + argp->paramkind = PARAM_EXEC; + argp->paramid = -1; + argp->paramtype = agg_state_type; + argp->paramtypmod = -1; + argp->paramcollid = agg_input_collation; + argp->location = -1; + args = list_make1(argp); + + for (i = 0; i < num_finalfn_inputs - 1; i++) { + argp = makeNode(Param); + argp->paramkind = PARAM_EXEC; + argp->paramid = -1; + argp->paramtype = agg_input_types[i]; + argp->paramtypmod = -1; + argp->paramcollid = agg_input_collation; + argp->location = -1; + args = lappend(args, argp); + } + + *finalfnexpr = + (Expr *)makeFuncExpr(finalfn_oid, agg_result_type, args, InvalidOid, agg_input_collation, COERCE_EXPLICIT_CALL); +} + /* * Expand a groupingSets clause to a flat list of grouping sets. * The returned list is sorted by length, shortest sets first. diff --git a/src/common/backend/utils/adt/orderedsetaggs.cpp b/src/common/backend/utils/adt/orderedsetaggs.cpp index 161c0054e..db14c094a 100644 --- a/src/common/backend/utils/adt/orderedsetaggs.cpp +++ b/src/common/backend/utils/adt/orderedsetaggs.cpp @@ -115,9 +115,17 @@ static OrderedSetAggState* ordered_set_startup(PG_FUNCTION_ARGS) /* Need the agg info to perform sort */ if (IsA(fcinfo->context, AggState)) { aggstate = (AggState*)fcinfo->context; - AggStatePerAgg curperagg = aggstate->curperagg; - if (curperagg != NULL) - aggref = curperagg->aggref; + if (aggstate->ss.ps.state->es_is_flt_frame) { + AggStatePerTrans curpertrans = aggstate->curpertrans; + if (curpertrans != NULL) { + aggref = curpertrans->aggref; + } + } else { + AggStatePerAgg curperagg = aggstate->curperagg; + if (curperagg != NULL) { + aggref = curperagg->aggref; + } + } } if (aggref == NULL || aggstate == NULL) diff --git a/src/common/pl/plpgsql/src/pl_exec.cpp b/src/common/pl/plpgsql/src/pl_exec.cpp index 3591a9bfc..d94330c88 100644 --- a/src/common/pl/plpgsql/src/pl_exec.cpp +++ b/src/common/pl/plpgsql/src/pl_exec.cpp @@ -1244,7 +1244,7 @@ Datum plpgsql_exec_function(PLpgSQL_function* func, FunctionCallInfo fcinfo, boo char *argmodes = NULL; HeapTuple procTup = NULL; Datum proArgModes = 0; - if (func->fn_nargs != fcinfo->nargs && !enable_out_param_override()) { + if (func->fn_nargs != fcinfo->nargs && !enable_out_param_override() && !estate.is_flt_frame) { ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmodule(MOD_PLSQL), @@ -5693,6 +5693,8 @@ void plpgsql_estate_setup(PLpgSQL_execstate* estate, PLpgSQL_function* func, Ret estate->curr_nested_table_type = InvalidOid; estate->is_exception = false; + estate->is_flt_frame = (u_sess->attr.attr_common.enable_expr_fusion && u_sess->attr.attr_sql.query_dop_tmp == 1); + /* * Create an EState and ExprContext for evaluation of simple expressions. */ diff --git a/src/gausskernel/runtime/codegen/vecexecutor/vechashaggcodegen.cpp b/src/gausskernel/runtime/codegen/vecexecutor/vechashaggcodegen.cpp index 2f0ca6535..30c698970 100644 --- a/src/gausskernel/runtime/codegen/vecexecutor/vechashaggcodegen.cpp +++ b/src/gausskernel/runtime/codegen/vecexecutor/vechashaggcodegen.cpp @@ -366,7 +366,7 @@ bool VecHashAggCodeGen::BatchAggJittable(VecAggState* node, bool isSonic) { int i = 0; VecAgg* vecagg = (VecAgg*)(node->ss.ps.plan); - AggStatePerAgg peragg = node->peragg; + VecAggStatePerAgg peragg = node->pervecagg; if (!u_sess->attr.attr_sql.enable_codegen || IS_PGXC_COORDINATOR) return false; @@ -1275,7 +1275,7 @@ llvm::Function* VecHashAggCodeGen::BatchAggregationCodeGen(VecAggState* node, bo { /* First get the basic information of VecAggState */ int numaggs = node->numaggs; - AggStatePerAgg peragg = node->peragg; + VecAggStatePerAgg peragg = node->pervecagg; Assert(NULL != (GsCodeGen*)t_thrd.codegen_cxt.thr_codegen_obj); GsCodeGen* llvmCodeGen = (GsCodeGen*)t_thrd.codegen_cxt.thr_codegen_obj; @@ -1391,7 +1391,7 @@ llvm::Function* VecHashAggCodeGen::BatchAggregationCodeGen(VecAggState* node, bo llvm::Value** econtext = (llvm::Value**)palloc(sizeof(llvm::Value*) * numaggs); for (i = 0; i < numaggs; i++) { econtext[i] = NULL; - AggStatePerAgg peraggstate = &node->peragg[numaggs - i - 1]; + VecAggStatePerAgg peraggstate = &node->pervecagg[numaggs - i - 1]; aggref = (Aggref*)(peraggstate->aggref); if (peraggstate->evalproj != NULL && aggref->aggfnoid != COUNTOID) { ExprContext* exprcontext = peragg[numaggs - 1 - i].evalproj->pi_exprContext; @@ -1448,7 +1448,7 @@ llvm::Function* VecHashAggCodeGen::BatchAggregationCodeGen(VecAggState* node, bo */ for (i = 0; i < numaggs; i++) { int numSimpleVars = 0; - AggStatePerAgg peraggstate = &node->peragg[numaggs - i - 1]; + VecAggStatePerAgg peraggstate = &node->pervecagg[numaggs - i - 1]; aggref = (Aggref*)(peraggstate->aggref); ProjectionInfo* projInfo = (ProjectionInfo*)(peraggstate->evalproj); @@ -1512,7 +1512,7 @@ llvm::Function* VecHashAggCodeGen::BatchAggregationCodeGen(VecAggState* node, bo llvm::BasicBlock* numsum_bb = NULL; /* the inverse order */ - AggStatePerAgg peraggstate = &node->peragg[numaggs - i - 1]; + VecAggStatePerAgg peraggstate = &node->pervecagg[numaggs - i - 1]; aggref = (Aggref*)(peraggstate->aggref); ProjectionInfo* projInfo = (ProjectionInfo*)(peraggstate->evalproj); @@ -3512,7 +3512,7 @@ llvm::Function* VecHashAggCodeGen::SonicBatchAggregationCodeGen(VecAggState* nod */ for (i = 0; i < numaggs; i++) { int numSimpleVars = 0; - AggStatePerAgg peraggstate = &node->peragg[numaggs - i - 1]; + VecAggStatePerAgg peraggstate = &node->pervecagg[numaggs - i - 1]; aggref = (Aggref*)(peraggstate->aggref); ProjectionInfo* projInfo = (ProjectionInfo*)(peraggstate->evalproj); @@ -3565,7 +3565,7 @@ llvm::Function* VecHashAggCodeGen::SonicBatchAggregationCodeGen(VecAggState* nod llvm::BasicBlock* numsum_bb = NULL; /* the inverse order */ - AggStatePerAgg peraggstate = &node->peragg[numaggs - i - 1]; + VecAggStatePerAgg peraggstate = &node->pervecagg[numaggs - i - 1]; aggref = (Aggref*)(peraggstate->aggref); ProjectionInfo* projInfo = (ProjectionInfo*)(peraggstate->evalproj); diff --git a/src/gausskernel/runtime/executor/execExpr.cpp b/src/gausskernel/runtime/executor/execExpr.cpp index a107c2c60..9f3ea126f 100644 --- a/src/gausskernel/runtime/executor/execExpr.cpp +++ b/src/gausskernel/runtime/executor/execExpr.cpp @@ -48,6 +48,9 @@ static bool isAssignmentIndirectionExpr(Expr *expr); static void ExecInitCoerceToDomain(ExprEvalStep *scratch, CoerceToDomain *ctest, ExprState *state, Datum *resv, bool *resnull, Expr *node); +static void ExecBuildAggTransCall(ExprState *state, AggState *aggstate, ExprEvalStep *scratch, FunctionCallInfo fcinfo, + AggStatePerTrans pertrans, int transno, int setno, int setoff, bool ishash, + bool iscollect); /* * ExecInitExpr: prepare an expression tree for execution @@ -2782,3 +2785,290 @@ ExecInitCoerceToDomain(ExprEvalStep *scratch, CoerceToDomain *ctest, } } } + +/* + * Build transition/combine function invocations for all aggregate transition + * / combination function invocations in a grouping sets phase. This has to + * invoke all sort based transitions in a phase (if doSort is true), all hash + * based transitions (if doHash is true), or both (both true). + * + * The resulting expression will, for each set of transition values, first + * check for filters, evaluate aggregate input, check that that input is not + * NULL for a strict transition function, and then finally invoke the + * transition for each of the concurrently computed grouping sets. + */ +ExprState *ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase, bool doSort, bool doHash) +{ + ExprState *state = makeNode(ExprState); + PlanState *parent = &aggstate->ss.ps; + ExprEvalStep scratch; + int transno = 0; + int setoff = 0; + LastAttnumInfo deform = {0, 0, 0}; + + state->expr = (Expr *)aggstate; + state->parent = parent; + + scratch.resvalue = &state->resvalue; + scratch.resnull = &state->resnull; + + /* + * First figure out which slots, and how many columns from each, we're + * going to need. + */ + for (transno = 0; transno < aggstate->numtrans; transno++) { + AggStatePerTrans pertrans = &aggstate->pertrans[transno]; + + get_last_attnums_walker((Node *)pertrans->aggref->aggdirectargs, &deform); + get_last_attnums_walker((Node *)pertrans->aggref->args, &deform); + get_last_attnums_walker((Node *)pertrans->aggref->aggorder, &deform); + get_last_attnums_walker((Node *)pertrans->aggref->aggdistinct, &deform); + // get_last_attnums_walker((Node *) pertrans->aggref->aggfilter, &deform); + } + ExecPushExprSlots(state, &deform); + + /* + * Emit instructions for each transition value / grouping set combination. + */ + for (transno = 0; transno < aggstate->numtrans; transno++) { + AggStatePerTrans pertrans = &aggstate->pertrans[transno]; + int numInputs = pertrans->numInputs; + int argno; + int setno; + bool isCollect = ((pertrans->aggref->aggstage > 0 || aggstate->is_final) && + need_adjust_agg_inner_func_type(pertrans->aggref) && pertrans->numSortCols == 0); + FunctionCallInfo trans_fcinfo = &pertrans->transfn_fcinfo; + FunctionCallInfo collect_fcinfo = &pertrans->collectfn_fcinfo; + ListCell *arg; + ListCell *bail; + List *adjust_bailout = NIL; + bool *strictnulls = NULL; + + /* + * Evaluate arguments to aggregate/combine function. + */ + argno = 0; + /*process the collect function*/ + if (isCollect) { + /* + * like Normal transition function below + */ + strictnulls = collect_fcinfo->argnull + 1; + + foreach (arg, pertrans->aggref->args) { + TargetEntry *source_tle = (TargetEntry *)lfirst(arg); + + /* + * Start from 1, since the 0th arg will be the transition + * value + */ + ExecInitExprRec(source_tle->expr, state, &collect_fcinfo->arg[argno + 1], + &collect_fcinfo->argnull[argno + 1], NULL); + argno++; + } + } else if (pertrans->numSortCols == 0) { + /* + * Normal transition function without ORDER BY / DISTINCT. + */ + strictnulls = trans_fcinfo->argnull + 1; + + foreach (arg, pertrans->aggref->args) { + TargetEntry *source_tle = (TargetEntry *)lfirst(arg); + + /* + * Start from 1, since the 0th arg will be the transition + * value + */ + ExecInitExprRec(source_tle->expr, state, &trans_fcinfo->arg[argno + 1], + &trans_fcinfo->argnull[argno + 1], NULL); + argno++; + } + } else if (pertrans->numInputs == 1) { + /* + * DISTINCT and/or ORDER BY case, with a single column sorted on. + */ + TargetEntry *source_tle = (TargetEntry *)linitial(pertrans->aggref->args); + + Assert(list_length(pertrans->aggref->args) == 1); + + ExecInitExprRec(source_tle->expr, state, &state->resvalue, &state->resnull, NULL); + strictnulls = &state->resnull; + argno++; + } else { + /* + * DISTINCT and/or ORDER BY case, with multiple columns sorted on. + */ + Datum *values = pertrans->sortslot->tts_values; + bool *nulls = pertrans->sortslot->tts_isnull; + + strictnulls = nulls; + + foreach (arg, pertrans->aggref->args) { + TargetEntry *source_tle = (TargetEntry *)lfirst(arg); + + ExecInitExprRec(source_tle->expr, state, &values[argno], &nulls[argno], NULL); + argno++; + } + } + Assert(numInputs == argno); + + /* + * For a strict transfn, nothing happens when there's a NULL input; we + * just keep the prior transValue. This is true for both plain and + * sorted/distinct aggregates. + */ + if (((!isCollect && trans_fcinfo->flinfo->fn_strict) || (isCollect && collect_fcinfo->flinfo->fn_strict)) && + pertrans->numTransInputs > 0) { + scratch.opcode = EEOP_AGG_STRICT_INPUT_CHECK; + scratch.d.agg_strict_input_check.nulls = strictnulls; + scratch.d.agg_strict_input_check.jumpnull = -1; /* adjust later */ + scratch.d.agg_strict_input_check.nargs = pertrans->numTransInputs; + ExprEvalPushStep(state, &scratch); + adjust_bailout = lappend_int(adjust_bailout, state->steps_len - 1); + } + + /* + * Call transition function (once for each concurrently evaluated + * grouping set). Do so for both sort and hash based computations, as + * applicable. + */ + setoff = 0; + if (doSort) { + int processGroupingSets = Max(phase->numsets, 1); + + for (setno = 0; setno < processGroupingSets; setno++) { + ExecBuildAggTransCall(state, aggstate, &scratch, isCollect ? collect_fcinfo : trans_fcinfo, pertrans, + transno, setno, setoff, false, isCollect); + setoff++; + } + } + + if (doHash) { + int numHashes = aggstate->num_hashes; + + /* in MIXED mode, there'll be preceding transition values */ + if (aggstate->aggstrategy != AGG_HASHED) + setoff = aggstate->maxsets; + else + setoff = 0; + + for (setno = 0; setno < numHashes; setno++) { + ExecBuildAggTransCall(state, aggstate, &scratch, isCollect ? collect_fcinfo : trans_fcinfo, pertrans, + transno, setno, setoff, true, isCollect); + setoff++; + } + } + + /* adjust early bail out jump target(s) */ + foreach (bail, adjust_bailout) { + ExprEvalStep *as = &state->steps[lfirst_int(bail)]; + + if (as->opcode == EEOP_JUMP_IF_NOT_TRUE) { + Assert(as->d.jump.jumpdone == -1); + as->d.jump.jumpdone = state->steps_len; + } else if (as->opcode == EEOP_AGG_STRICT_INPUT_CHECK) { + Assert(as->d.agg_strict_input_check.jumpnull == -1); + as->d.agg_strict_input_check.jumpnull = state->steps_len; + } else if (as->opcode == EEOP_AGG_STRICT_DESERIALIZE) { + Assert(as->d.agg_deserialize.jumpnull == -1); + as->d.agg_deserialize.jumpnull = state->steps_len; + } + } + } + + scratch.resvalue = NULL; + scratch.resnull = NULL; + scratch.opcode = EEOP_DONE; + ExprEvalPushStep(state, &scratch); + + ExecReadyExpr(state); + + return state; +} + +/* + * Build transition/combine function invocation for a single transition + * value. This is separated from ExecBuildAggTrans() because there are + * multiple callsites (hash and sort in some grouping set cases). + */ +static void ExecBuildAggTransCall(ExprState *state, AggState *aggstate, ExprEvalStep *scratch, FunctionCallInfo fcinfo, + AggStatePerTrans pertrans, int transno, int setno, int setoff, bool ishash, + bool iscollect) +{ + int adjust_init_jumpnull = -1; + int adjust_strict_jumpnull = -1; + MemoryContext aggcontext; + + aggcontext = aggstate->aggcontexts[setno]; + + /* + * If the initial value for the transition state doesn't exist in the + * pg_aggregate table then we will let the first non-NULL value returned + * from the outer procNode become the initial value. (This is useful for + * aggregates like max() and min().) The noTransValue flag signals that we + * still need to do this. + */ + if (pertrans->numSortCols == 0 && fcinfo->flinfo->fn_strict && pertrans->initValueIsNull) { + scratch->opcode = iscollect ? EEOP_AGG_COLLECT_INIT_TRANS : EEOP_AGG_INIT_TRANS; + scratch->d.agg_init_trans.aggstate = aggstate; + scratch->d.agg_init_trans.pertrans = pertrans; + scratch->d.agg_init_trans.setno = setno; + scratch->d.agg_init_trans.setoff = setoff; + scratch->d.agg_init_trans.transno = transno; + scratch->d.agg_init_trans.aggcontext = aggcontext; + scratch->d.agg_init_trans.jumpnull = -1; /* adjust later */ + ExprEvalPushStep(state, scratch); + + /* see comment about jumping out below */ + adjust_init_jumpnull = state->steps_len - 1; + } + + if (pertrans->numSortCols == 0 && fcinfo->flinfo->fn_strict) { + scratch->opcode = iscollect ? EEOP_AGG_COLLECT_STRICT_TRANS_CHECK : EEOP_AGG_STRICT_TRANS_CHECK; + scratch->d.agg_strict_trans_check.aggstate = aggstate; + scratch->d.agg_strict_trans_check.setno = setno; + scratch->d.agg_strict_trans_check.setoff = setoff; + scratch->d.agg_strict_trans_check.transno = transno; + scratch->d.agg_strict_trans_check.jumpnull = -1; /* adjust later */ + ExprEvalPushStep(state, scratch); + + /* + * Note, we don't push into adjust_bailout here - those jump to the + * end of all transition value computations. Here a single transition + * value is NULL, so just skip processing the individual value. + */ + adjust_strict_jumpnull = state->steps_len - 1; + } + + /* invoke appropriate transition implementation */ + if (pertrans->numSortCols == 0 && pertrans->transtypeByVal) + scratch->opcode = iscollect ? EEOP_AGG_COLLECT_PLAIN_TRANS_BYVAL : EEOP_AGG_PLAIN_TRANS_BYVAL; + else if (pertrans->numSortCols == 0) + scratch->opcode = iscollect ? EEOP_AGG_COLLECT_PLAIN_TRANS : EEOP_AGG_PLAIN_TRANS; + else if (pertrans->numInputs == 1) + scratch->opcode = EEOP_AGG_ORDERED_TRANS_DATUM; + else + scratch->opcode = EEOP_AGG_ORDERED_TRANS_TUPLE; + + scratch->d.agg_trans.aggstate = aggstate; + scratch->d.agg_trans.pertrans = pertrans; + scratch->d.agg_trans.setno = setno; + scratch->d.agg_trans.setoff = setoff; + scratch->d.agg_trans.transno = transno; + scratch->d.agg_trans.aggcontext = aggcontext; + ExprEvalPushStep(state, scratch); + + /* adjust jumps so they jump till after transition invocation */ + if (adjust_init_jumpnull != -1) { + ExprEvalStep *as = &state->steps[adjust_init_jumpnull]; + + Assert(as->d.agg_init_trans.jumpnull == -1); + as->d.agg_init_trans.jumpnull = state->steps_len; + } + if (adjust_strict_jumpnull != -1) { + ExprEvalStep *as = &state->steps[adjust_strict_jumpnull]; + + Assert(as->d.agg_strict_trans_check.jumpnull == -1); + as->d.agg_strict_trans_check.jumpnull = state->steps_len; + } +} \ No newline at end of file diff --git a/src/gausskernel/runtime/executor/execExprInterp.cpp b/src/gausskernel/runtime/executor/execExprInterp.cpp index 37f1b275b..602f9abba 100644 --- a/src/gausskernel/runtime/executor/execExprInterp.cpp +++ b/src/gausskernel/runtime/executor/execExprInterp.cpp @@ -592,6 +592,19 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull, ExprDoneCo &&CASE_EEOP_USERSET_ELEM, &&CASE_EEOP_PREFIX_BTYEA, &&CASE_EEOP_PREFIX_TEXT, + &&CASE_EEOP_AGG_STRICT_DESERIALIZE, + &&CASE_EEOP_AGG_DESERIALIZE, + &&CASE_EEOP_AGG_STRICT_INPUT_CHECK, + &&CASE_EEOP_AGG_INIT_TRANS, + &&CASE_EEOP_AGG_COLLECT_INIT_TRANS, + &&CASE_EEOP_AGG_STRICT_TRANS_CHECK, + &&CASE_EEOP_AGG_COLLECT_STRICT_TRANS_CHECK, + &&CASE_EEOP_AGG_PLAIN_TRANS_BYVAL, + &&CASE_EEOP_AGG_COLLECT_PLAIN_TRANS_BYVAL, + &&CASE_EEOP_AGG_PLAIN_TRANS, + &&CASE_EEOP_AGG_COLLECT_PLAIN_TRANS, + &&CASE_EEOP_AGG_ORDERED_TRANS_DATUM, + &&CASE_EEOP_AGG_ORDERED_TRANS_TUPLE, &&CASE_EEOP_LAST }; @@ -1564,6 +1577,375 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull, ExprDoneCo EEO_NEXT(); } + /* evaluate a strict aggregate deserialization function */ + EEO_CASE(EEOP_AGG_STRICT_DESERIALIZE) + { + bool *argnull = op->d.agg_deserialize.fcinfo_data->argnull; + + /* Don't call a strict deserialization function with NULL input */ + if (argnull[0]) + EEO_JUMP(op->d.agg_deserialize.jumpnull); + + /* fallthrough */ + } + + /* evaluate aggregate deserialization function (non-strict portion) */ + EEO_CASE(EEOP_AGG_DESERIALIZE) + { + FunctionCallInfo fcinfo = op->d.agg_deserialize.fcinfo_data; + AggState *aggstate = op->d.agg_deserialize.aggstate; + MemoryContext oldContext; + + /* + * We run the deserialization functions in per-input-tuple memory + * context. + */ + oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory); + fcinfo->isnull = false; + *op->resvalue = FunctionCallInvoke(fcinfo); + *op->resnull = fcinfo->isnull; + MemoryContextSwitchTo(oldContext); + + EEO_NEXT(); + } + + /* + * Check that a strict aggregate transition / combination function's + * input is not NULL. + */ + EEO_CASE(EEOP_AGG_STRICT_INPUT_CHECK) + { + int argno; + bool *nulls = op->d.agg_strict_input_check.nulls; + int nargs = op->d.agg_strict_input_check.nargs; + + for (argno = 0; argno < nargs; argno++) { + if (nulls[argno]) + EEO_JUMP(op->d.agg_strict_input_check.jumpnull); + } + EEO_NEXT(); + } + + /* + * Initialize an aggregate's first value if necessary. + */ + EEO_CASE(EEOP_AGG_INIT_TRANS) + { + AggState *aggstate; + AggStatePerGroup pergroup; + + aggstate = op->d.agg_init_trans.aggstate; + pergroup = + &aggstate + ->all_pergroups[op->d.agg_init_trans.setoff * aggstate->numtrans + op->d.agg_init_trans.transno]; + + /* If transValue has not yet been initialized, do so now. */ + if (pergroup->noTransValue) { + AggStatePerTrans pertrans = op->d.agg_init_trans.pertrans; + + aggstate->curaggcontext = op->d.agg_init_trans.aggcontext; + aggstate->current_set = op->d.agg_init_trans.setno; + + ExecAggInitGroup(aggstate, pertrans, pergroup, op->d.agg_init_trans.aggcontext); + + /* copied trans value from input, done this round */ + EEO_JUMP(op->d.agg_init_trans.jumpnull); + } + + EEO_NEXT(); + } + /* + * Initialize an aggregate's collect first value if necessary. + */ + EEO_CASE(EEOP_AGG_COLLECT_INIT_TRANS) + { + AggState *aggstate; + AggStatePerGroup pergroup; + + aggstate = op->d.agg_init_trans.aggstate; + pergroup = + &aggstate + ->all_pergroups[op->d.agg_init_trans.setoff * aggstate->numtrans + op->d.agg_init_trans.transno]; + + /* If transValue has not yet been initialized, do so now. */ + if (pergroup->noCollectValue) { + AggStatePerTrans pertrans = op->d.agg_init_trans.pertrans; + + aggstate->curaggcontext = op->d.agg_init_trans.aggcontext; + aggstate->current_set = op->d.agg_init_trans.setno; + + ExecAggInitCollectGroup(aggstate, pertrans, pergroup, op->d.agg_init_trans.aggcontext); + + /* copied trans value from input, done this round */ + EEO_JUMP(op->d.agg_init_trans.jumpnull); + } + + EEO_NEXT(); + } + + /* check that a strict aggregate's input isn't NULL */ + EEO_CASE(EEOP_AGG_STRICT_TRANS_CHECK) + { + AggState *aggstate; + AggStatePerGroup pergroup; + + aggstate = op->d.agg_strict_trans_check.aggstate; + pergroup = &aggstate->all_pergroups[op->d.agg_strict_trans_check.setoff * aggstate->numtrans + + op->d.agg_strict_trans_check.transno]; + + if (unlikely(pergroup->transValueIsNull)) + EEO_JUMP(op->d.agg_strict_trans_check.jumpnull); + + EEO_NEXT(); + } + /* check that a strict aggregate's collect input isn't NULL */ + EEO_CASE(EEOP_AGG_COLLECT_STRICT_TRANS_CHECK) + { + AggState *aggstate; + AggStatePerGroup pergroup; + + aggstate = op->d.agg_strict_trans_check.aggstate; + pergroup = &aggstate->all_pergroups[op->d.agg_strict_trans_check.setoff * aggstate->numtrans + + op->d.agg_strict_trans_check.transno]; + + if (unlikely(pergroup->collectValueIsNull)) + EEO_JUMP(op->d.agg_strict_trans_check.jumpnull); + + EEO_NEXT(); + } + + /* + * Evaluate aggregate transition / combine function that has a + * by-value transition type. That's a seperate case from the + * by-reference implementation because it's a bit simpler. + */ + EEO_CASE(EEOP_AGG_PLAIN_TRANS_BYVAL) + { + AggState *aggstate; + AggStatePerTrans pertrans; + AggStatePerGroup pergroup; + FunctionCallInfo fcinfo; + MemoryContext oldContext; + Datum newVal; + + aggstate = op->d.agg_trans.aggstate; + pertrans = op->d.agg_trans.pertrans; + + pergroup = &aggstate->all_pergroups[op->d.agg_trans.setoff * aggstate->numtrans + op->d.agg_trans.transno]; + + Assert(pertrans->transtypeByVal); + + fcinfo = &pertrans->transfn_fcinfo; + + /* cf. select_current_set() */ + aggstate->curaggcontext = op->d.agg_trans.aggcontext; + aggstate->current_set = op->d.agg_trans.setno; + + /* set up aggstate->curpertrans for AggGetAggref() */ + aggstate->curpertrans = pertrans; + + /* invoke transition function in per-tuple context */ + oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory); + + fcinfo->arg[0] = pergroup->transValue; + fcinfo->argnull[0] = pergroup->transValueIsNull; + fcinfo->isnull = false; /* just in case transfn doesn't set it */ + + newVal = FunctionCallInvoke(fcinfo); + + pergroup->transValue = newVal; + pergroup->transValueIsNull = fcinfo->isnull; + + MemoryContextSwitchTo(oldContext); + + EEO_NEXT(); + } + /* + * Evaluate aggregate collect function that has a + * by-value transition type. That's a seperate case from the + * by-reference implementation because it's a bit simpler. + */ + EEO_CASE(EEOP_AGG_COLLECT_PLAIN_TRANS_BYVAL) + { + AggState *aggstate; + AggStatePerTrans pertrans; + AggStatePerGroup pergroup; + FunctionCallInfo fcinfo; + MemoryContext oldContext; + Datum newVal; + + aggstate = op->d.agg_trans.aggstate; + pertrans = op->d.agg_trans.pertrans; + + pergroup = &aggstate->all_pergroups[op->d.agg_trans.setoff * aggstate->numtrans + op->d.agg_trans.transno]; + + Assert(pertrans->transtypeByVal); + + fcinfo = &pertrans->collectfn_fcinfo; + + /* cf. select_current_set() */ + aggstate->curaggcontext = op->d.agg_trans.aggcontext; + aggstate->current_set = op->d.agg_trans.setno; + + /* set up aggstate->curpertrans for AggGetAggref() */ + aggstate->curpertrans = pertrans; + + /* invoke transition function in per-tuple context */ + oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory); + + fcinfo->arg[0] = pergroup->collectValue; + fcinfo->argnull[0] = pergroup->collectValueIsNull; + fcinfo->isnull = false; /* just in case transfn doesn't set it */ + + newVal = FunctionCallInvoke(fcinfo); + + pergroup->collectValue = newVal; + pergroup->collectValueIsNull = fcinfo->isnull; + + MemoryContextSwitchTo(oldContext); + + EEO_NEXT(); + } + + /* + * Evaluate aggregate transition / combine function that has a + * by-reference transition type. + * + * Could optimize a bit further by splitting off by-reference + * fixed-length types, but currently that doesn't seem worth it. + */ + EEO_CASE(EEOP_AGG_PLAIN_TRANS) + { + AggState *aggstate; + AggStatePerTrans pertrans; + AggStatePerGroup pergroup; + FunctionCallInfo fcinfo; + MemoryContext oldContext; + Datum newVal; + + aggstate = op->d.agg_trans.aggstate; + pertrans = op->d.agg_trans.pertrans; + + pergroup = &aggstate->all_pergroups[op->d.agg_trans.setoff * aggstate->numtrans + op->d.agg_trans.transno]; + + Assert(!pertrans->transtypeByVal); + + fcinfo = &pertrans->transfn_fcinfo; + + /* cf. select_current_set() */ + aggstate->curaggcontext = op->d.agg_trans.aggcontext; + aggstate->current_set = op->d.agg_trans.setno; + + /* set up aggstate->curpertrans for AggGetAggref() */ + aggstate->curpertrans = pertrans; + + /* invoke transition function in per-tuple context */ + oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory); + + fcinfo->arg[0] = pergroup->transValue; + fcinfo->argnull[0] = pergroup->transValueIsNull; + fcinfo->isnull = false; /* just in case transfn doesn't set it */ + + newVal = FunctionCallInvoke(fcinfo); + + /* + * For pass-by-ref datatype, must copy the new value into + * aggcontext and free the prior transValue. But if transfn + * returned a pointer to its first input, we don't need to do + * anything. Also, if transfn returned a pointer to a R/W + * expanded object that is already a child of the aggcontext, + * assume we can adopt that value without copying it. + */ + if (DatumGetPointer(newVal) != DatumGetPointer(pergroup->transValue)) + newVal = ExecAggTransReparent(aggstate, pertrans, newVal, fcinfo->isnull, pergroup->transValue, + pergroup->transValueIsNull); + + pergroup->transValue = newVal; + pergroup->transValueIsNull = fcinfo->isnull; + + MemoryContextSwitchTo(oldContext); + + EEO_NEXT(); + } + /* + * Evaluate aggregate collect function that has a + * by-reference transition type. + * + * Could optimize a bit further by splitting off by-reference + * fixed-length types, but currently that doesn't seem worth it. + */ + EEO_CASE(EEOP_AGG_COLLECT_PLAIN_TRANS) + { + AggState *aggstate; + AggStatePerTrans pertrans; + AggStatePerGroup pergroup; + FunctionCallInfo fcinfo; + MemoryContext oldContext; + Datum newVal; + + aggstate = op->d.agg_trans.aggstate; + pertrans = op->d.agg_trans.pertrans; + + pergroup = &aggstate->all_pergroups[op->d.agg_trans.setoff * aggstate->numtrans + op->d.agg_trans.transno]; + + Assert(!pertrans->transtypeByVal); + + fcinfo = &pertrans->collectfn_fcinfo; + + /* cf. select_current_set() */ + aggstate->curaggcontext = op->d.agg_trans.aggcontext; + aggstate->current_set = op->d.agg_trans.setno; + + /* set up aggstate->curpertrans for AggGetAggref() */ + aggstate->curpertrans = pertrans; + + /* invoke transition function in per-tuple context */ + oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory); + + fcinfo->arg[0] = pergroup->collectValue; + fcinfo->argnull[0] = pergroup->collectValueIsNull; + fcinfo->isnull = false; /* just in case transfn doesn't set it */ + + newVal = FunctionCallInvoke(fcinfo); + + /* + * For pass-by-ref datatype, must copy the new value into + * aggcontext and free the prior transValue. But if transfn + * returned a pointer to its first input, we don't need to do + * anything. Also, if transfn returned a pointer to a R/W + * expanded object that is already a child of the aggcontext, + * assume we can adopt that value without copying it. + */ + if (DatumGetPointer(newVal) != DatumGetPointer(pergroup->transValue)) + newVal = ExecAggTransReparent(aggstate, pertrans, newVal, fcinfo->isnull, pergroup->collectValue, + pergroup->collectValueIsNull); + + pergroup->collectValue = newVal; + pergroup->collectValueIsNull = fcinfo->isnull; + + MemoryContextSwitchTo(oldContext); + + EEO_NEXT(); + } + + /* process single-column ordered aggregate datum */ + EEO_CASE(EEOP_AGG_ORDERED_TRANS_DATUM) + { + /* too complex for an inline implementation */ + ExecEvalAggOrderedTransDatum(state, op, econtext); + + EEO_NEXT(); + } + + /* process multi-column ordered aggregate tuple */ + EEO_CASE(EEOP_AGG_ORDERED_TRANS_TUPLE) + { + /* too complex for an inline implementation */ + ExecEvalAggOrderedTransTuple(state, op, econtext); + + EEO_NEXT(); + } + EEO_CASE(EEOP_XMLEXPR) { /* too complex for an inline implementation */ @@ -3870,3 +4252,96 @@ ExecEvalWholeRowVar(ExprState *state, ExprEvalStep *op, ExprContext *econtext) *op->resnull = false; *op->resvalue = PointerGetDatum(dtuple); } + +void ExecAggInitGroup(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroup, + MemoryContext aggcontext) +{ + FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo; + MemoryContext oldContext; + + /* + * We must copy the datum into aggcontext if it is pass-by-ref. We do not + * need to pfree the old transValue, since it's NULL. (We already checked + * that the agg's input type is binary-compatible with its transtype, so + * straight copy here is OK.) + */ + oldContext = MemoryContextSwitchTo(aggcontext); + pergroup->transValue = datumCopy(fcinfo->arg[1], pertrans->transtypeByVal, pertrans->transtypeLen); + pergroup->transValueIsNull = false; + pergroup->noTransValue = false; + MemoryContextSwitchTo(oldContext); +} + +void ExecAggInitCollectGroup(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroup, + MemoryContext aggcontext) +{ + FunctionCallInfo fcinfo = &pertrans->collectfn_fcinfo; + MemoryContext oldContext; + + /* + * We must copy the datum into aggcontext if it is pass-by-ref. We do not + * need to pfree the old transValue, since it's NULL. (We already checked + * that the agg's input type is binary-compatible with its transtype, so + * straight copy here is OK.) + */ + oldContext = MemoryContextSwitchTo(aggcontext); + pergroup->collectValue = datumCopy(fcinfo->arg[1], pertrans->transtypeByVal, pertrans->transtypeLen); + pergroup->collectValueIsNull = false; + pergroup->noCollectValue = false; + MemoryContextSwitchTo(oldContext); +} + +/* + * Ensure that the current transition value is a child of the aggcontext, + * rather than the per-tuple context. + * + * NB: This can change the current memory context. + */ +Datum ExecAggTransReparent(AggState *aggstate, AggStatePerTrans pertrans, Datum newValue, bool newValueIsNull, + Datum oldValue, bool oldValueIsNull) +{ + Assert(newValue != oldValue); + + if (!newValueIsNull) { + MemoryContextSwitchTo(aggstate->curaggcontext); + newValue = datumCopy(newValue, pertrans->transtypeByVal, pertrans->transtypeLen); + } else { + /* + * Ensure that AggStatePerGroup->transValue ends up being 0, so + * callers can safely compare newValue/oldValue without having to + * check their respective nullness. + */ + newValue = (Datum)0; + } + + if (!oldValueIsNull) { + pfree(DatumGetPointer(oldValue)); + } + + return newValue; +} + +/* + * Invoke ordered transition function, with a datum argument. + */ +void ExecEvalAggOrderedTransDatum(ExprState *state, ExprEvalStep *op, ExprContext *econtext) +{ + AggStatePerTrans pertrans = op->d.agg_trans.pertrans; + int setno = op->d.agg_trans.setno; + + tuplesort_putdatum(pertrans->sortstates[setno], *op->resvalue, *op->resnull); +} + +/* + * Invoke ordered transition function, with a tuple argument. + */ +void ExecEvalAggOrderedTransTuple(ExprState *state, ExprEvalStep *op, ExprContext *econtext) +{ + AggStatePerTrans pertrans = op->d.agg_trans.pertrans; + int setno = op->d.agg_trans.setno; + + ExecClearTuple(pertrans->sortslot); + pertrans->sortslot->tts_nvalid = pertrans->numInputs; + ExecStoreVirtualTuple(pertrans->sortslot); + tuplesort_puttupleslot(pertrans->sortstates[setno], pertrans->sortslot); +} \ No newline at end of file diff --git a/src/gausskernel/runtime/executor/nodeAgg.cpp b/src/gausskernel/runtime/executor/nodeAgg.cpp index d5f469302..242dc6f01 100644 --- a/src/gausskernel/runtime/executor/nodeAgg.cpp +++ b/src/gausskernel/runtime/executor/nodeAgg.cpp @@ -175,6 +175,29 @@ static TupleTableSlot* agg_retrieve(AggState* node); static bool prepare_data_source(AggState* node); static TupleTableSlot* fetch_input_tuple(AggState* aggstate); +static void exec_lookups_agg(AggState *aggstate, Agg *node, EState *estate); + +static void initialize_aggregate_flattened(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroup); +static void initialize_aggregates_flattened(AggState *aggstate, AggStatePerGroup pergroup, int numReset = 0); +static void advance_transition_function_flattened(AggState *aggstate, AggStatePerTrans pertrans, + AggStatePerGroup pergroupstate); +static void advance_aggregates_flattened(AggState *aggstate, AggStatePerGroup pergroup); +static void process_ordered_aggregate_single_flattened(AggState *aggstate, AggStatePerTrans pertrans, + AggStatePerGroup pergroupstate); +static void process_ordered_aggregate_multi_flattened(AggState *aggstate, AggStatePerTrans pertrans, + AggStatePerGroup pergroupstate); +static void finalize_aggregate_flattened(AggState *aggstate, AggStatePerAggForFlattenedExpr peragg, + AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull); +static void finalize_aggregates_flattened(AggState *aggstate, AggStatePerAggForFlattenedExpr peraggs, + AggStatePerGroup pergroup, int currentSet); +static void build_pertrans_for_aggref(AggStatePerTrans pertrans, AggState *aggsate, EState *estate, Aggref *aggref, + Oid aggtransfn, Oid aggtranstype, Datum initValue, bool initValueIsNull, + Oid *inputTypes, int numArguments); +static int find_compatible_peragg(Aggref *newagg, AggState *aggstate, int lastaggno, List **same_input_transnos); +static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg, Oid aggtransfn, Oid aggtranstype, + Datum initValue, bool initValueIsNull, List *possible_matches); +static void exec_lookups_agg_flattened(AggState *aggstate, Agg *node, EState *estate); + /* * Switch to phase "newphase", which must either be 0 (to reset) or * current_phase + 1. Juggle the tuplesorts accordingly. @@ -1344,7 +1367,11 @@ static AggHashEntry lookup_hash_entry(AggState* aggstate, TupleTableSlot* inputs /* this slot is new and has be inserted to hash table */ if (entry) { /* initialize aggregates for new tuple group */ - initialize_aggregates(aggstate, aggstate->peragg, entry->pergroup); + if (aggstate->ss.ps.state->es_is_flt_frame) { + initialize_aggregates_flattened(aggstate, entry->pergroup); + } else { + initialize_aggregates(aggstate, aggstate->peragg, entry->pergroup); + } agg_spill_to_disk(TempFileControl, aggstate->hashtable, aggstate->hashslot, @@ -1377,6 +1404,14 @@ static AggHashEntry lookup_hash_entry(AggState* aggstate, TupleTableSlot* inputs errmsg("more than one row returned by a subquery used as an expression"))); } + if (aggstate->ss.ps.state->es_is_flt_frame) { + if (entry) { + aggstate->all_pergroups = entry->pergroup; + } else { + aggstate->all_pergroups = NULL; + } + } + return entry; } @@ -1527,7 +1562,8 @@ static TupleTableSlot* agg_retrieve_direct(AggState* aggstate) Agg* node = aggstate->phase->aggnode; ExprContext* econtext = NULL; ExprContext* tmpcontext = NULL; - AggStatePerAgg peragg; + AggStatePerAgg peragg = NULL; + AggStatePerAggForFlattenedExpr peragg_flattened = NULL; AggStatePerGroup pergroup; TupleTableSlot* outerslot = NULL; TupleTableSlot* firstSlot = NULL; @@ -1548,7 +1584,12 @@ static TupleTableSlot* agg_retrieve_direct(AggState* aggstate) econtext = aggstate->ss.ps.ps_ExprContext; tmpcontext = aggstate->tmpcontext; - peragg = aggstate->peragg; + + if (aggstate->ss.ps.state->es_is_flt_frame) { + peragg_flattened = aggstate->peragg_flattened; + } else { + peragg = aggstate->peragg; + } pergroup = aggstate->pergroup; firstSlot = aggstate->ss.ss_ScanTupleSlot; @@ -1717,7 +1758,11 @@ static TupleTableSlot* agg_retrieve_direct(AggState* aggstate) /* * Initialize working state for a new input tuple group. */ - initialize_aggregates(aggstate, peragg, pergroup, numReset); + if (aggstate->ss.ps.state->es_is_flt_frame) { + initialize_aggregates_flattened(aggstate, pergroup, numReset); + } else { + initialize_aggregates(aggstate, peragg, pergroup, numReset); + } if (aggstate->grp_firstTuple != NULL) { /* @@ -1736,7 +1781,11 @@ static TupleTableSlot* agg_retrieve_direct(AggState* aggstate) * until we exhaust the outer plan or cross a group boundary. */ for (;;) { - advance_aggregates(aggstate, pergroup); + if (aggstate->ss.ps.state->es_is_flt_frame) { + advance_aggregates_flattened(aggstate, pergroup); + } else { + advance_aggregates(aggstate, pergroup); + } /* Reset per-input-tuple context after each tuple */ ResetExprContext(tmpcontext); @@ -1786,7 +1835,11 @@ static TupleTableSlot* agg_retrieve_direct(AggState* aggstate) prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet); - finalize_aggregates(aggstate, peragg, pergroup, currentSet); + if (aggstate->ss.ps.state->es_is_flt_frame) { + finalize_aggregates_flattened(aggstate, peragg_flattened, pergroup, currentSet); + } else { + finalize_aggregates(aggstate, peragg, pergroup, currentSet); + } /* * If there's no row to project right now, we must continue rather @@ -1850,7 +1903,11 @@ static void agg_fill_hash_table(AggState* aggstate) if (entry != NULL) { /* Advance the aggregates */ - advance_aggregates(aggstate, entry->pergroup); + if (aggstate->ss.ps.state->es_is_flt_frame) { + advance_aggregates_flattened(aggstate, entry->pergroup); + } else { + advance_aggregates(aggstate, entry->pergroup); + } } else { /* this outerslot is inserted to temp table, it will be compute when the temp file be readed */ } @@ -1888,6 +1945,7 @@ static TupleTableSlot* agg_retrieve_hash_table(AggState* aggstate) { ExprContext* econtext = NULL; AggStatePerAgg peragg; + AggStatePerAggForFlattenedExpr peragg_flattened = NULL; AggStatePerGroup pergroup; AggHashEntry entry; TupleTableSlot* firstSlot = NULL; @@ -1898,7 +1956,11 @@ static TupleTableSlot* agg_retrieve_hash_table(AggState* aggstate) */ /* econtext is the per-output-tuple expression context */ econtext = aggstate->ss.ps.ps_ExprContext; - peragg = aggstate->peragg; + if (aggstate->ss.ps.state->es_is_flt_frame) { + peragg_flattened = aggstate->peragg_flattened; + } else { + peragg = aggstate->peragg; + } firstSlot = aggstate->ss.ss_ScanTupleSlot; /* @@ -1933,7 +1995,11 @@ static TupleTableSlot* agg_retrieve_hash_table(AggState* aggstate) * Finalize each aggregate calculation, and stash results in the * per-output-tuple context. */ - finalize_aggregates(aggstate, peragg, pergroup, 0); + if (aggstate->ss.ps.state->es_is_flt_frame) { + finalize_aggregates_flattened(aggstate, peragg_flattened, pergroup, 0); + } else { + finalize_aggregates(aggstate, peragg, pergroup, 0); + } /* * Use the representative input tuple for any references to @@ -1966,17 +2032,16 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags) { AggState* aggstate = NULL; AggStatePerAgg peragg; + AggStatePerAggForFlattenedExpr peragg_flattened; + AggStatePerTrans pertransstates; Plan* outerPlan = NULL; ExprContext* econtext = NULL; - int numaggs, aggno; + int numaggs; int phase; - List *combined_inputeval; ListCell* l = NULL; Bitmapset* all_grouped_cols = NULL; int numGroupingSets = 1; int numPhases; - int column_offset; - int currentsortno = 0; int i = 0; int j = 0; @@ -2008,6 +2073,15 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags) aggstate->is_final = node->is_final; aggstate->ss.ps.ExecProcNode = ExecAgg; + if (aggstate->ss.ps.state->es_is_flt_frame) { + aggstate->numtrans = 0; + aggstate->aggstrategy = node->aggstrategy; + aggstate->peragg_flattened = NULL; + aggstate->pertrans = NULL; + aggstate->curpertrans = NULL; + aggstate->num_hashes = (node->aggstrategy == AGG_HASHED) ? 1 : 0; + } + /* * Calculate the maximum number of grouping sets in any phase; this * determines the size of some allocations. @@ -2191,6 +2265,9 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags) } phasedata->aggnode = aggnode; + if (aggstate->ss.ps.state->es_is_flt_frame) { + phasedata->aggstrategy = aggstate->aggstrategy; + } phasedata->sortnode = sortnode; } @@ -2222,8 +2299,15 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags) econtext->ecxt_aggvalues = (Datum*)palloc0(sizeof(Datum) * numaggs); econtext->ecxt_aggnulls = (bool*)palloc0(sizeof(bool) * numaggs); - peragg = (AggStatePerAgg)palloc0(sizeof(AggStatePerAggData) * numaggs); - aggstate->peragg = peragg; + if (aggstate->ss.ps.state->es_is_flt_frame) { + peragg_flattened = (AggStatePerAggForFlattenedExpr)palloc0(sizeof(AggStatePerAggForFlattenedExprData) * numaggs); + pertransstates = (AggStatePerTrans)palloc0(sizeof(AggStatePerTransData) * numaggs); + aggstate->peragg_flattened = peragg_flattened; + aggstate->pertrans = pertransstates; + } else { + peragg = (AggStatePerAgg)palloc0(sizeof(AggStatePerAggData) * numaggs); + aggstate->peragg = peragg; + } if (node->aggstrategy == AGG_HASHED) { aggstate->table_filled = false; @@ -2234,416 +2318,15 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags) pergroup = (AggStatePerGroup)palloc0(sizeof(AggStatePerGroupData) * numaggs * numGroupingSets); aggstate->pergroup = pergroup; + aggstate->all_pergroups = pergroup; } - /* - * Perform lookups of aggregate function info, and initialize the - * unchanging fields of the per-agg data. We also detect duplicate - * aggregates (for example, "SELECT sum(x) ... HAVING sum(x) > 0"). When - * duplicates are detected, we only make an AggStatePerAgg struct for the - * first one. The clones are simply pointed at the same result entry by - * giving them duplicate aggno values. - */ - aggno = -1; - foreach (l, aggstate->aggs) { - AggrefExprState* aggrefstate = (AggrefExprState*)lfirst(l); - Aggref* aggref = (Aggref*)aggrefstate->xprstate.expr; - AggStatePerAgg peraggstate; - Oid inputTypes[FUNC_MAX_ARGS]; - int numArguments; - int numDirectArgs; - int numInputs; - int numSortCols; - int numDistinctCols; - List* sortlist = NIL; - HeapTuple aggTuple; - Form_pg_aggregate aggform; - Oid aggtranstype; - AclResult aclresult; - Oid transfn_oid, finalfn_oid; -#ifdef PGXC - Oid collectfn_oid; - Expr* collectfnexpr = NULL; -#endif /* PGXC */ - Expr* transfnexpr = NULL; - Expr* finalfnexpr = NULL; - Datum textInitVal; - ListCell* lc = NULL; - - /* Planner should have assigned aggregate to correct level */ - Assert(aggref->agglevelsup == 0); - - /* Look for a previous duplicate aggregate */ - for (i = 0; i <= aggno; i++) { - if (equal(aggref, peragg[i].aggref) && !contain_volatile_functions((Node*)aggref)) - break; - } - if (i <= aggno) { - /* Found a match to an existing entry, so just mark it */ - aggrefstate->aggno = i; - continue; - } - - /* Nope, so assign a new PerAgg record */ - peraggstate = &peragg[++aggno]; - - /* Mark Aggref state node with assigned index in the result array */ - aggrefstate->aggno = aggno; - - /* Fill in the peraggstate data */ - peraggstate->aggrefstate = aggrefstate; - peraggstate->aggref = aggref; - - peraggstate->sortstates = (Tuplesortstate**)palloc0(sizeof(Tuplesortstate*) * numGroupingSets); - - for (currentsortno = 0; currentsortno < numGroupingSets; currentsortno++) - peraggstate->sortstates[currentsortno] = NULL; - - /* Fetch the pg_aggregate row */ - aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(aggref->aggfnoid)); - if (!HeapTupleIsValid(aggTuple)) - ereport(ERROR, - (errcode(ERRCODE_CACHE_LOOKUP_FAILED), - errmodule(MOD_EXECUTOR), - errmsg("cache lookup failed for aggregate %u", aggref->aggfnoid))); - aggform = (Form_pg_aggregate)GETSTRUCT(aggTuple); - - /* Check permission to call aggregate function */ - aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(), ACL_EXECUTE); - if (aclresult != ACLCHECK_OK) - aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(aggref->aggfnoid)); - - peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; - peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; -#ifdef PGXC - peraggstate->collectfn_oid = collectfn_oid = aggform->aggcollectfn; - - peraggstate->is_avg = false; - if (finalfn_oid == 1830) { - peraggstate->is_avg = true; - } -#ifdef ENABLE_MULTIPLE_NODES - /* - * For PGXC final and collection functions are used to combine results at Coordinator, - * disable those for Datanode - */ - if (IS_PGXC_DATANODE) { - if (!u_sess->exec_cxt.under_stream_runtime) { - peraggstate->finalfn_oid = finalfn_oid = InvalidOid; - peraggstate->collectfn_oid = collectfn_oid = InvalidOid; - } else { - if (need_adjust_agg_inner_func_type(peraggstate->aggref)) { - if (!node->is_final && !node->single_node) - peraggstate->finalfn_oid = finalfn_oid = InvalidOid; - if (aggref->aggstage == 0 && !node->is_final && !node->single_node) - peraggstate->collectfn_oid = collectfn_oid = InvalidOid; - } - } - } -#else - if (IS_STREAM_PLAN || StreamThreadAmI()) { - if (need_adjust_agg_inner_func_type(peraggstate->aggref)) { - if (!node->is_final && !node->single_node) { - peraggstate->finalfn_oid = finalfn_oid = InvalidOid; - } - if (aggref->aggstage == 0 && !node->is_final && !node->single_node) { - peraggstate->collectfn_oid = collectfn_oid = InvalidOid; - } - } - } -#endif /* ENABLE_MULTIPLE_NODES */ -#endif /* PGXC */ - /* Check that aggregate owner has permission to call component fns */ - { - HeapTuple procTuple; - Oid aggOwner; - - procTuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(aggref->aggfnoid)); - if (!HeapTupleIsValid(procTuple)) - ereport(ERROR, - (errcode(ERRCODE_CACHE_LOOKUP_FAILED), - errmodule(MOD_EXECUTOR), - errmsg("cache lookup failed for aggregate function %u", aggref->aggfnoid))); - aggOwner = ((Form_pg_proc)GETSTRUCT(procTuple))->proowner; - ReleaseSysCache(procTuple); - - aclresult = pg_proc_aclcheck(transfn_oid, aggOwner, ACL_EXECUTE); - if (aclresult != ACLCHECK_OK) - aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(transfn_oid)); - if (OidIsValid(finalfn_oid)) { - aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner, ACL_EXECUTE); - if (aclresult != ACLCHECK_OK) - aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(finalfn_oid)); - } - -#ifdef PGXC - if (OidIsValid(collectfn_oid)) { - aclresult = pg_proc_aclcheck(collectfn_oid, aggOwner, ACL_EXECUTE); - if (aclresult != ACLCHECK_OK) - aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(collectfn_oid)); - } -#endif /* PGXC */ - } - - /* - * Get the the number of actual arguments and identify the actual - * datatypes of the aggregate inputs (saved in inputTypes). When - * agg accepts ANY or a polymorphic type, the actual datatype - * could be different from the agg's declared input types. - */ - numArguments = get_aggregate_argtypes(aggref, inputTypes, FUNC_MAX_ARGS); - peraggstate->numArguments = numArguments; - /* Get the direct arguments */ - numDirectArgs = list_length(aggref->aggdirectargs); - /* Get the number of aggregated input columns */ - numInputs = list_length(aggref->args); - peraggstate->numInputs = numInputs; - - /* Detect the number of columns passed to the transfn */ - if (AGGKIND_IS_ORDERED_SET(aggref->aggkind)) - peraggstate->numTransInputs = numInputs; - else - peraggstate->numTransInputs = numArguments; - - /* - * When agg accepts ANY or a polymorphic type, resolve actual - * type of transition state - */ - aggtranstype = resolve_aggregate_transtype(aggref->aggfnoid, aggform->aggtranstype, inputTypes, numArguments); - - /* build expression trees using actual argument & result types */ - build_trans_aggregate_fnexprs(numArguments, - numDirectArgs, - AGGKIND_IS_ORDERED_SET(aggref->aggkind), - aggref->aggvariadic, - aggtranstype, - inputTypes, - aggref->aggtype, - aggref->inputcollid, - transfn_oid, - finalfn_oid, - &transfnexpr, - &finalfnexpr); - -#ifdef PGXC - if (OidIsValid(collectfn_oid)) { - /* we expect final function expression to be NULL in call to - * build_aggregate_fnexprs below, since InvalidOid is passed for - * finalfn_oid argument. Use a dummy expression to accept that. - */ - Expr* dummyexpr = NULL; - /* - * for XC, we need to setup the collection function expression as well. - * Use build_aggregate_fnexpr() with invalid final function oid, and collection - * function information instead of transition function information. - * We should really be adding this step inside - * build_aggregate_fnexprs() but this way it becomes easy to merge. - */ - build_aggregate_fnexprs(&aggtranstype, - 1, - aggtranstype, - aggref->aggtype, - aggref->inputcollid, - collectfn_oid, - InvalidOid, - &collectfnexpr, - &dummyexpr); - Assert(!dummyexpr); - } -#endif /* PGXC */ - - /* set up infrastructure for calling the transfn and finalfn */ - fmgr_info(transfn_oid, &peraggstate->transfn); - fmgr_info_set_expr((Node*)transfnexpr, &peraggstate->transfn); - - if (OidIsValid(finalfn_oid)) { - fmgr_info(finalfn_oid, &peraggstate->finalfn); - fmgr_info_set_expr((Node*)finalfnexpr, &peraggstate->finalfn); - } - -#ifdef PGXC - if (OidIsValid(collectfn_oid)) { - fmgr_info(collectfn_oid, &peraggstate->collectfn); - peraggstate->collectfn.fn_expr = (Node*)collectfnexpr; - } -#endif /* PGXC */ - peraggstate->aggCollation = aggref->inputcollid; - InitFunctionCallInfoData(peraggstate->transfn_fcinfo, - &peraggstate->transfn, - peraggstate->numTransInputs + 1, - peraggstate->aggCollation, - (Node*)aggstate, - NULL); - /* get info zbout relevant datatypes */ - get_typlenbyval(aggref->aggtype, &peraggstate->resulttypeLen, &peraggstate->resulttypeByVal); - get_typlenbyval(aggtranstype, &peraggstate->transtypeLen, &peraggstate->transtypeByVal); - - /* - * initval is potentially null, so don't try to access it as a struct - * field. Must do it the hard way with SysCacheGetAttr. - */ - textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, Anum_pg_aggregate_agginitval, &peraggstate->initValueIsNull); - - if (peraggstate->initValueIsNull) { - peraggstate->initValue = (Datum)0; - } else { - peraggstate->initValue = GetAggInitVal(textInitVal, aggtranstype); - } - -#ifdef PGXC - /* - * initval for collection function is potentially null, so don't try to - * access it as a struct field. Must do it the hard way with - * SysCacheGetAttr. - */ - textInitVal = - SysCacheGetAttr(AGGFNOID, aggTuple, Anum_pg_aggregate_agginitcollect, &peraggstate->initCollectValueIsNull); - - if (peraggstate->initCollectValueIsNull) { - peraggstate->initCollectValue = (Datum)0; - } else { - peraggstate->initCollectValue = GetAggInitVal(textInitVal, aggtranstype); - } -#endif /* PGXC */ - - /* - * If the transfn is strict and the initval is NULL, make sure input - * type and transtype are the same (or at least binary-compatible), so - * that it's OK to use the first input value as the initial - * transValue. This should have been checked at agg definition time, - * but just in case... - */ - if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull) { - if (numArguments < numDirectArgs || !IsBinaryCoercible(inputTypes[numDirectArgs], aggtranstype)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), - errmsg( - "aggregate %u needs to have compatible input type and transition type", aggref->aggfnoid))); - } - - /* - * If we're doing either DISTINCT or ORDER BY, then we have a list of - * SortGroupClause nodes; fish out the data in them and stick them - * into arrays. - * For oerdered set agg, we handle the sort operation in the transfn - * function, so we can ignore ORDER BY. - */ - if (AGGKIND_IS_ORDERED_SET(aggref->aggkind)) { - numSortCols = 0; - numDistinctCols = 0; - } else if (aggref->aggdistinct) { - sortlist = aggref->aggdistinct; - numSortCols = numDistinctCols = list_length(sortlist); - Assert(numSortCols >= list_length(aggref->aggorder)); - } else { - sortlist = aggref->aggorder; - numSortCols = list_length(sortlist); - numDistinctCols = 0; - } - - peraggstate->numSortCols = numSortCols; - peraggstate->numDistinctCols = numDistinctCols; - - if (numSortCols > 0) { - /* Get a tupledesc and slot corresponding to the aggregated inputs - * (including sort expressions) of the agg. - */ - peraggstate->sortdesc = ExecTypeFromTL(aggref->args, false); - peraggstate->sortslot = ExecInitExtraTupleSlot(estate); - ExecSetSlotDescriptor(peraggstate->sortslot, peraggstate->sortdesc); - /* - * We don't implement DISTINCT or ORDER BY aggs in the HASHED case - * (yet) - */ - Assert(node->aggstrategy != AGG_HASHED); - - /* If we have only one input, we need its len/byval info. */ - if (numInputs == 1) { - get_typlenbyval(inputTypes[numDirectArgs], &peraggstate->inputtypeLen, &peraggstate->inputtypeByVal); - } else if (numDistinctCols > 0) { - /* we will need an extra slot to store prior values */ - peraggstate->uniqslot = ExecInitExtraTupleSlot(estate); - ExecSetSlotDescriptor(peraggstate->uniqslot, peraggstate->sortdesc); - } - - /* Extract the sort information for use later */ - peraggstate->sortColIdx = (AttrNumber*)palloc(numSortCols * sizeof(AttrNumber)); - peraggstate->sortOperators = (Oid*)palloc(numSortCols * sizeof(Oid)); - peraggstate->sortCollations = (Oid*)palloc(numSortCols * sizeof(Oid)); - peraggstate->sortNullsFirst = (bool*)palloc(numSortCols * sizeof(bool)); - - i = 0; - foreach (lc, sortlist) { - SortGroupClause* sortcl = (SortGroupClause*)lfirst(lc); - TargetEntry* tle = get_sortgroupclause_tle(sortcl, aggref->args); - - /* the parser should have made sure of this */ - Assert(OidIsValid(sortcl->sortop)); - - peraggstate->sortColIdx[i] = tle->resno; - peraggstate->sortOperators[i] = sortcl->sortop; - peraggstate->sortCollations[i] = exprCollation((Node*)tle->expr); - peraggstate->sortNullsFirst[i] = sortcl->nulls_first; - i++; - } - Assert(i == numSortCols); - } - - if (aggref->aggdistinct) { - Assert(numArguments > 0); - - /* - * We need the equal function for each DISTINCT comparison we will - * make. - */ - peraggstate->equalfns = (FmgrInfo*)palloc(numDistinctCols * sizeof(FmgrInfo)); - - i = 0; - foreach (lc, aggref->aggdistinct) { - SortGroupClause* sortcl = (SortGroupClause*)lfirst(lc); - - fmgr_info(get_opcode(sortcl->eqop), &peraggstate->equalfns[i]); - i++; - } - Assert(i == numDistinctCols); - } - - ReleaseSysCache(aggTuple); + if (aggstate->ss.ps.state->es_is_flt_frame) { + exec_lookups_agg_flattened(aggstate, node, estate); + } else { + exec_lookups_agg(aggstate, node, estate); } - /* Update numaggs to match number of unique aggregates found */ - aggstate->numaggs = aggno + 1; - combined_inputeval = NIL; - column_offset = 0; - for (int transno =0;transno < aggstate->numaggs; transno++) { - AggStatePerAggData* pertrans = &peragg[transno]; - ListCell *arg; - - pertrans->inputoff = column_offset; - foreach(arg,pertrans->aggref->args) - { - TargetEntry *source_tle =(TargetEntry *)lfirst(arg); - TargetEntry *tle; - - Assert(IsA(source_tle, TargetEntry)); - tle = flatCopyTargetEntry(source_tle); - tle->resno += column_offset; - - combined_inputeval = lappend(combined_inputeval, tle); - } - column_offset += list_length(pertrans->aggref->args); - } - - aggstate->evaldesc = ExecTypeFromTL(combined_inputeval, false); - aggstate->evalslot = ExecInitExtraTupleSlot(estate); - if (estate->es_is_flt_frame) { - aggstate->evalproj = ExecBuildProjectionInfoByFlatten(combined_inputeval, aggstate->tmpcontext, aggstate->evalslot, &aggstate->ss.ps, NULL); - } else {combined_inputeval = (List *)ExecInitExprByRecursion((Expr *)combined_inputeval, (PlanState *)aggstate); - aggstate->evalproj = ExecBuildProjectionInfoByRecursion(combined_inputeval, aggstate->tmpcontext, aggstate->evalslot, NULL); - } - - ExecSetSlotDescriptor(aggstate->evalslot, aggstate->evaldesc); - AggWriteFileControl* TempFilePara = (AggWriteFileControl*)palloc(sizeof(AggWriteFileControl)); TempFilePara->strategy = MEMORY_HASHAGG; TempFilePara->spillToDisk = false; @@ -2677,7 +2360,7 @@ Datum GetAggInitVal(Datum textInitVal, Oid transtype) void ExecEndAgg(AggState* node) { - int aggno, setno; + int setno; AggWriteFileControl* TempFileControl = (AggWriteFileControl*)node->aggTempFileControl; int numGroupingSets = Max(node->maxsets, 1); int fileNum = TempFileControl->filenum; @@ -2707,18 +2390,35 @@ void ExecEndAgg(AggState* node) node->sort_out = NULL; } - for (aggno = 0; aggno < node->numaggs; aggno++) { - AggStatePerAgg peraggstate = &node->peragg[aggno]; + if (node->ss.ps.state->es_is_flt_frame) { + for (int transno = 0; transno < node->numtrans; transno++) { + AggStatePerTrans pertrans = &node->pertrans[transno]; - for (setno = 0; setno < numGroupingSets; setno++) { - if (peraggstate->sortstates[setno]) { - tuplesort_end(peraggstate->sortstates[setno]); - peraggstate->sortstates[setno] = NULL; + for (setno = 0; setno < numGroupingSets; setno++) { + if (pertrans->sortstates[setno]) { + tuplesort_end(pertrans->sortstates[setno]); + pertrans->sortstates[setno] = NULL; + } + } + if (AGGKIND_IS_ORDERED_SET(pertrans->aggref->aggkind) && node->ss.ps.ps_ExprContext != NULL) { + /* Ensure any agg shutdown callbacks have been called */ + ReScanExprContext(node->ss.ps.ps_ExprContext); } } - if (AGGKIND_IS_ORDERED_SET(peraggstate->aggref->aggkind) && node->ss.ps.ps_ExprContext != NULL) { - /* Ensure any agg shutdown callbacks have been called */ - ReScanExprContext(node->ss.ps.ps_ExprContext); + } else { + for (int aggno = 0; aggno < node->numaggs; aggno++) { + AggStatePerAgg peraggstate = &node->peragg[aggno]; + + for (setno = 0; setno < numGroupingSets; setno++) { + if (peraggstate->sortstates[setno]) { + tuplesort_end(peraggstate->sortstates[setno]); + peraggstate->sortstates[setno] = NULL; + } + } + if (AGGKIND_IS_ORDERED_SET(peraggstate->aggref->aggkind) && node->ss.ps.ps_ExprContext != NULL) { + /* Ensure any agg shutdown callbacks have been called */ + ReScanExprContext(node->ss.ps.ps_ExprContext); + } } } @@ -2746,7 +2446,6 @@ void ExecReScanAgg(AggState* node) { ExprContext* econtext = node->ss.ps.ps_ExprContext; PlanState* outerPlan = outerPlanState(node); - int aggno; AggWriteFileControl* TempFilePara = (AggWriteFileControl*)node->aggTempFileControl; Agg* aggnode = (Agg*)node->ss.ps.plan; int numGroupingSets = Max(node->maxsets, 1); @@ -2790,13 +2489,26 @@ void ExecReScanAgg(AggState* node) } /* Make sure we have closed any open tuplesorts */ - for (aggno = 0; aggno < node->numaggs; aggno++) { - for (setno = 0; setno < numGroupingSets; setno++) { - AggStatePerAgg peraggstate = &node->peragg[aggno]; + if (node->ss.ps.state->es_is_flt_frame) { + for (int transno = 0; transno < node->numtrans; transno++) { + for (setno = 0; setno < numGroupingSets; setno++) { + AggStatePerTrans pertrans = &node->pertrans[transno]; - if (peraggstate->sortstates[setno]) { - tuplesort_end(peraggstate->sortstates[setno]); - peraggstate->sortstates[setno] = NULL; + if (pertrans->sortstates[setno]) { + tuplesort_end(pertrans->sortstates[setno]); + pertrans->sortstates[setno] = NULL; + } + } + } + } else { + for (int aggno = 0; aggno < node->numaggs; aggno++) { + for (setno = 0; setno < numGroupingSets; setno++) { + AggStatePerAgg peraggstate = &node->peragg[aggno]; + + if (peraggstate->sortstates[setno]) { + tuplesort_end(peraggstate->sortstates[setno]); + peraggstate->sortstates[setno] = NULL; + } } } } @@ -3050,7 +2762,7 @@ FORCE_INLINE void agg_spill_to_disk(AggWriteFileControl* TempFileControl, TupleH */ void ExecEarlyFreeAggregation(AggState* node) { - int aggno, setno; + int setno; AggWriteFileControl* TempFileControl = (AggWriteFileControl*)node->aggTempFileControl; int numGroupingSets = Max(node->maxsets, 1); int fileNum = TempFileControl->filenum; @@ -3084,13 +2796,26 @@ void ExecEarlyFreeAggregation(AggState* node) node->sort_out = NULL; } - for (aggno = 0; aggno < node->numaggs; aggno++) { - AggStatePerAgg peraggstate = &node->peragg[aggno]; + if (node->ss.ps.state->es_is_flt_frame) { + for (int transno = 0; transno < node->numtrans; transno++) { + AggStatePerTrans peraggtrans = &node->pertrans[transno]; - for (setno = 0; setno < numGroupingSets; setno++) { - if (peraggstate->sortstates[setno]) { - tuplesort_end(peraggstate->sortstates[setno]); - peraggstate->sortstates[setno] = NULL; + for (setno = 0; setno < numGroupingSets; setno++) { + if (peraggtrans->sortstates[setno]) { + tuplesort_end(peraggtrans->sortstates[setno]); + peraggtrans->sortstates[setno] = NULL; + } + } + } + } else { + for (int aggno = 0; aggno < node->numaggs; aggno++) { + AggStatePerAgg peraggstate = &node->peragg[aggno]; + + for (setno = 0; setno < numGroupingSets; setno++) { + if (peraggstate->sortstates[setno]) { + tuplesort_end(peraggstate->sortstates[setno]); + peraggstate->sortstates[setno] = NULL; + } } } } @@ -3143,20 +2868,32 @@ void ExecReSetAgg(AggState* node) AggWriteFileControl* TempFilePara = (AggWriteFileControl*)node->aggTempFileControl; Agg* aggnode = (Agg*)node->ss.ps.plan; int numGroupingSets = Max(node->maxsets, 1); - int aggno; int setno; errno_t rc; node->agg_done = false; node->ss.ps.ps_vec_TupFromTlist = false; /* Make sure we have closed any open tuplesorts */ - for (aggno = 0; aggno < node->numaggs; aggno++) { - for (setno = 0; setno < numGroupingSets; setno++) { - AggStatePerAgg peraggstate = &node->peragg[aggno]; + if (node->ss.ps.state->es_is_flt_frame) { + for (int transno = 0; transno < node->numtrans; transno++) { + for (setno = 0; setno < numGroupingSets; setno++) { + AggStatePerTrans peraggtrans = &node->pertrans[transno]; - if (peraggstate->sortstates[setno]) { - tuplesort_end(peraggstate->sortstates[setno]); - peraggstate->sortstates[setno] = NULL; + if (peraggtrans->sortstates[setno]) { + tuplesort_end(peraggtrans->sortstates[setno]); + peraggtrans->sortstates[setno] = NULL; + } + } + } + } else { + for (int aggno = 0; aggno < node->numaggs; aggno++) { + for (int setno = 0; setno < numGroupingSets; setno++) { + AggStatePerAgg peraggstate = &node->peragg[aggno]; + + if (peraggstate->sortstates[setno]) { + tuplesort_end(peraggstate->sortstates[setno]); + peraggstate->sortstates[setno] = NULL; + } } } } @@ -3261,3 +2998,1284 @@ void ExecReSetAgg(AggState* node) if (outerPlan->chgParam == NULL) ExecReSetRecursivePlanTree(outerPlanState(node)); } + +static void initialize_aggregate_flattened(AggState *aggstate, AggStatePerTrans pertrans, + AggStatePerGroup pergroupstate) +{ + Plan *plan = aggstate->ss.ps.plan; + int64 local_work_mem = SET_NODEMEM(plan->operatorMemKB[0], plan->dop); + int64 max_mem = (plan->operatorMaxMem > 0) ? SET_NODEMEM(plan->operatorMaxMem, plan->dop) : 0; + + if (pertrans->numSortCols > 0) { + if (pertrans->sortstates[aggstate->current_set]) { + tuplesort_end(pertrans->sortstates[aggstate->current_set]); + } + + if (pertrans->numInputs == 1) { + pertrans->sortstates[aggstate->current_set] = + tuplesort_begin_datum(pertrans->sortdesc->attrs[0].atttypid, pertrans->sortOperators[0], + pertrans->sortCollations[0], pertrans->sortNullsFirst[0], local_work_mem, false); + } else { + pertrans->sortstates[aggstate->current_set] = + tuplesort_begin_heap(pertrans->sortdesc, pertrans->numSortCols, pertrans->sortColIdx, + pertrans->sortOperators, pertrans->sortCollations, pertrans->sortNullsFirst, + local_work_mem, false, max_mem, plan->plan_node_id, SET_DOP(plan->dop)); + } + } + + if (pertrans->initValueIsNull) { + pergroupstate->transValue = pertrans->initValue; + } else { + MemoryContext oldContext; + oldContext = MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]); + pergroupstate->transValue = datumCopy(pertrans->initValue, pertrans->transtypeByVal, pertrans->transtypeLen); + MemoryContextSwitchTo(oldContext); + } + + pergroupstate->transValueIsNull = pertrans->initValueIsNull; + pergroupstate->noTransValue = pertrans->initValueIsNull; + + if (pertrans->initCollectValueIsNull) { + pergroupstate->collectValue = pertrans->initCollectValue; + } else { + MemoryContext oldContext; + oldContext = MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]); + pergroupstate->collectValue = + datumCopy(pertrans->initCollectValue, pertrans->transtypeByVal, pertrans->transtypeLen); + MemoryContextSwitchTo(oldContext); + } + pergroupstate->collectValueIsNull = pertrans->initCollectValueIsNull; + pergroupstate->noCollectValue = pertrans->initCollectValueIsNull; +} + +static void initialize_aggregates_flattened(AggState *aggstate, AggStatePerGroup pergroup, int numReset) +{ + int transno; + int numGroupingSets = Max(aggstate->phase->numsets, 1); + int setno = 0; + AggStatePerTrans transstates = aggstate->pertrans; + + if (numReset < 1) { + numReset = numGroupingSets; + } + + for (transno = 0; transno < aggstate->numtrans; transno++) { + AggStatePerTrans pertrans = &transstates[transno]; + + for (setno = 0; setno < numReset; setno++) { + AggStatePerGroup pergroupstate; + pergroupstate = &pergroup[transno + (setno * (aggstate->numtrans))]; + aggstate->current_set = setno; + initialize_aggregate_flattened(aggstate, pertrans, pergroupstate); + } + } +} + +static void advance_transition_function_flattened(AggState *aggstate, AggStatePerTrans pertrans, + AggStatePerGroup pergroupstate) +{ + FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo; + int numTransInputs = pertrans->numTransInputs; + MemoryContext oldContext; + Datum newVal; + int i; + + if (pertrans->transfn.fn_strict) { + int numTransInputs = pertrans->numTransInputs; + int i; + + for (i = 1; i <= numTransInputs; i++) { + if (fcinfo->argnull[i]) { + return; + } + } + if (pergroupstate->noTransValue) { + oldContext = MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]); + pergroupstate->transValue = datumCopy(fcinfo->arg[1], pertrans->transtypeByVal, pertrans->transtypeLen); + pergroupstate->transValueIsNull = false; + pergroupstate->noTransValue = false; + MemoryContextSwitchTo(oldContext); + return; + } + if (pergroupstate->transValueIsNull) { + return; + } + } + + oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory); + aggstate->curpertrans = pertrans; + + fcinfo->arg[0] = pergroupstate->transValue; + fcinfo->argnull[0] = pergroupstate->transValueIsNull; + fcinfo->argTypes[0] = InvalidOid; + fcinfo->isnull = false; + + Node *origin_fcxt = fcinfo->context; + if (IS_PGXC_DATANODE && pertrans->is_avg) { + Node *fcontext = (Node *)palloc0(sizeof(Node)); + fcontext->type = (NodeTag)(pertrans->is_avg); + fcinfo->context = fcontext; + } + + newVal = FunctionCallInvoke(fcinfo); + aggstate->curpertrans = NULL; + fcinfo->context = origin_fcxt; + if (!pertrans->transtypeByVal && DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue)) { + if (!fcinfo->isnull) { + MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]); + newVal = datumCopy(newVal, pertrans->transtypeByVal, pertrans->transtypeLen); + } + if (!pergroupstate->transValueIsNull) + pfree(DatumGetPointer(pergroupstate->transValue)); + } + + pergroupstate->transValue = newVal; + pergroupstate->transValueIsNull = fcinfo->isnull; + MemoryContextSwitchTo(oldContext); +} + +static void advance_aggregates_flattened(AggState *aggstate, AggStatePerGroup pergroup) +{ + bool dummynull; + + ExecEvalExprSwitchContext(aggstate->phase->evaltrans, aggstate->tmpcontext, &dummynull, NULL); +} + +static void process_ordered_aggregate_single_flattened(AggState *aggstate, AggStatePerTrans pertrans, + AggStatePerGroup pergroupstate) +{ + Datum oldVal = (Datum)0; + bool oldIsNull = true; + bool haveOldVal = false; + MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory; + MemoryContext oldContext; + bool isDistinct = (pertrans->numDistinctCols > 0); + FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo; + Datum *newVal = NULL; + bool *isNull = NULL; + + Assert(pertrans->numDistinctCols < 2); + + tuplesort_performsort(pertrans->sortstates[aggstate->current_set]); + + InitFunctionCallInfoArgs(*fcinfo, pertrans->numArguments + 1, 1); + + newVal = fcinfo->arg + 1; + isNull = fcinfo->argnull + 1; + + while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set], true, newVal, isNull)) { + MemoryContextReset(workcontext); + oldContext = MemoryContextSwitchTo(workcontext); + + if (isDistinct && haveOldVal && + ((oldIsNull && *isNull) || + (!oldIsNull && !*isNull && DatumGetBool(FunctionCall2(&pertrans->equalfns[0], oldVal, *newVal))))) { + if (!pertrans->inputtypeByVal && !*isNull) + pfree(DatumGetPointer(*newVal)); + } else { + advance_transition_function_flattened(aggstate, pertrans, pergroupstate); + if (!oldIsNull && !pertrans->inputtypeByVal) + pfree(DatumGetPointer(oldVal)); + oldVal = *newVal; + oldIsNull = *isNull; + haveOldVal = true; + } + + MemoryContextSwitchTo(oldContext); + } + + if (!oldIsNull && !pertrans->inputtypeByVal) + pfree(DatumGetPointer(oldVal)); + + tuplesort_end(pertrans->sortstates[aggstate->current_set]); + pertrans->sortstates[aggstate->current_set] = NULL; +} + +static void process_ordered_aggregate_multi_flattened(AggState *aggstate, AggStatePerTrans pertrans, + AggStatePerGroup pergroupstate) +{ + MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory; + FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo; + TupleTableSlot *slot1 = pertrans->sortslot; + TupleTableSlot *slot2 = pertrans->uniqslot; + int numTransInputs = pertrans->numTransInputs; + int numDistinctCols = pertrans->numDistinctCols; + Oid* sortCollations = pertrans->sortCollations; + Datum newAbbrevVal = (Datum)0; + Datum oldAbbrevVal = (Datum)0; + bool haveOldValue = false; + int i; + + tuplesort_performsort(pertrans->sortstates[aggstate->current_set]); + + (void)ExecClearTuple(slot1); + if (slot2 != NULL) { + (void)ExecClearTuple(slot2); + } + + while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set], true, slot1, &newAbbrevVal)) { + tableam_tslot_getsomeattrs(slot1, numTransInputs); + + if (numDistinctCols == 0 || !haveOldValue || newAbbrevVal != oldAbbrevVal || + !execTuplesMatch(slot1, slot2, numDistinctCols, pertrans->sortColIdx, pertrans->equalfns, workcontext, + sortCollations)) { + InitFunctionCallInfoArgs(*fcinfo, numTransInputs + 1, 1); + + for (i = 0; i < numTransInputs; i++) { + fcinfo->arg[i + 1] = slot1->tts_values[i]; + fcinfo->argnull[i + 1] = slot1->tts_isnull[i]; + } + + advance_transition_function_flattened(aggstate, pertrans, pergroupstate); + + if (numDistinctCols > 0) { + TupleTableSlot *tmpslot = slot2; + + slot2 = slot1; + slot1 = tmpslot; + oldAbbrevVal = newAbbrevVal; + haveOldValue = true; + } + } + + if (numDistinctCols == 0) { + MemoryContextReset(workcontext); + } + + (void)ExecClearTuple(slot1); + } + + if (slot2 != NULL) { + (void)ExecClearTuple(slot2); + } + + tuplesort_end(pertrans->sortstates[aggstate->current_set]); + pertrans->sortstates[aggstate->current_set] = NULL; +} + +static void finalize_aggregate_flattened(AggState *aggstate, AggStatePerAggForFlattenedExpr peragg, + AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull) +{ + bool anynull = false; + FunctionCallInfoData fcinfo; + int args_pos = 1; + int numFinalArgs = 1; + MemoryContext oldContext; + ListCell *lc = NULL; + AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno]; + + oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory); + if (AGGKIND_IS_ORDERED_SET(pertrans->aggref->aggkind)) { + numFinalArgs += peragg->numFinalArgs; + } + + InitFunctionCallInfoArgs(fcinfo, numFinalArgs, 1); + + foreach (lc, peragg->aggdirectargs) { + fcinfo.arg[args_pos] = + ExecEvalExpr((ExprState *)lfirst(lc), aggstate->ss.ps.ps_ExprContext, &fcinfo.argnull[args_pos], NULL); + fcinfo.argTypes[args_pos] = ((ExprState *)lfirst(lc))->resultType; + if (anynull == true || fcinfo.argnull[args_pos] == true) { + anynull = true; + } else { + anynull = false; + } + args_pos++; + } + + if ((pertrans->aggref->aggstage > 0 || aggstate->is_final) && need_adjust_agg_inner_func_type(pertrans->aggref)) { + pergroupstate->transValue = pergroupstate->collectValue; + + pergroupstate->transValueIsNull = pergroupstate->collectValueIsNull; + } + + Assert(args_pos <= numFinalArgs); + if (OidIsValid(peragg->finalfn_oid)) { + aggstate->curpertrans = pertrans; + + InitFunctionCallInfoData(fcinfo, &(peragg->finalfn), numFinalArgs, pertrans->aggCollation, (Node *)aggstate, + NULL); + fcinfo.arg[0] = pergroupstate->transValue; + fcinfo.argnull[0] = pergroupstate->transValueIsNull; + fcinfo.argTypes[0] = InvalidOid; + if (anynull == true || pergroupstate->transValueIsNull == true) { + anynull = true; + } else { + anynull = false; + } + while (args_pos < numFinalArgs) { + fcinfo.arg[args_pos] = (Datum)0; + fcinfo.argnull[args_pos] = true; + fcinfo.argTypes[args_pos] = InvalidOid; + args_pos++; + anynull = true; + } + + if (fcinfo.flinfo->fn_strict && anynull) { + *resultVal = (Datum)0; + *resultIsNull = true; + } else { + *resultVal = FunctionCallInvoke(&fcinfo); + *resultIsNull = fcinfo.isnull; + } + aggstate->curpertrans = NULL; + } else { + *resultVal = pergroupstate->transValue; + *resultIsNull = pergroupstate->transValueIsNull; + } + + if (!peragg->resulttypeByVal && !*resultIsNull && + !MemoryContextContains(CurrentMemoryContext, DatumGetPointer(*resultVal))) { + *resultVal = datumCopy(*resultVal, peragg->resulttypeByVal, peragg->resulttypeLen); + } + + MemoryContextSwitchTo(oldContext); +} + +static void finalize_aggregates_flattened(AggState *aggstate, AggStatePerAggForFlattenedExpr peraggs, + AggStatePerGroup pergroup, int currentSet) +{ + ExprContext *econtext = aggstate->ss.ps.ps_ExprContext; + Datum *aggvalues = econtext->ecxt_aggvalues; + bool *aggnulls = econtext->ecxt_aggnulls; + int aggno; + + Assert(currentSet == 0 || ((Agg *)aggstate->ss.ps.plan)->aggstrategy != AGG_HASHED); + + aggstate->current_set = currentSet; + + for (aggno = 0; aggno < aggstate->numaggs; aggno++) { + AggStatePerAggForFlattenedExpr peragg = &peraggs[aggno]; + int transno = peragg->transno; + AggStatePerTrans pertrans = &aggstate->pertrans[transno]; + AggStatePerGroup pergroupstate; + + pergroupstate = &pergroup[transno + (currentSet * (aggstate->numtrans))]; + + if (pertrans->numSortCols > 0 && pertrans->sortstates[aggstate->current_set] != NULL) { + Assert(((Agg *)aggstate->ss.ps.plan)->aggstrategy != AGG_HASHED); + + if (pertrans->numInputs == 1) { + process_ordered_aggregate_single_flattened(aggstate, pertrans, pergroupstate); + } else { + process_ordered_aggregate_multi_flattened(aggstate, pertrans, pergroupstate); + } + } + + finalize_aggregate_flattened(aggstate, peragg, pergroupstate, &aggvalues[aggno], &aggnulls[aggno]); + } +} + +static void build_pertrans_for_aggref(AggStatePerTrans pertrans, AggState *aggstate, EState *estate, Aggref *aggref, + Oid aggtransfn, Oid aggtranstype, Datum initValue, bool initValueIsNull, + Oid *inputTypes, int numArguments) +{ + int numGroupingSets = Max(aggstate->maxsets, 1); + Expr *transfnexpr; + ListCell *lc; + int numInputs; + int numDirectArgs; + List *sortlist; + int numSortCols; + int numDistinctCols; + int i; + + pertrans->aggref = aggref; + pertrans->aggCollation = aggref->inputcollid; + pertrans->transfn_oid = aggtransfn; + pertrans->initValue = initValue; + pertrans->initValueIsNull = initValueIsNull; + + numDirectArgs = list_length(aggref->aggdirectargs); + pertrans->numInputs = numInputs = list_length(aggref->args); + pertrans->aggtranstype = aggtranstype; + + if (AGGKIND_IS_ORDERED_SET(aggref->aggkind)) { + pertrans->numTransInputs = numInputs; + } else { + pertrans->numTransInputs = numArguments; + } + + build_aggregate_transfn_expr(inputTypes, numArguments, numDirectArgs, aggref->aggvariadic, aggtranstype, + aggref->inputcollid, aggtransfn, &transfnexpr); + fmgr_info(aggtransfn, &pertrans->transfn); + fmgr_info_set_expr((Node *)transfnexpr, &pertrans->transfn); + + InitFunctionCallInfoData(pertrans->transfn_fcinfo, &pertrans->transfn, pertrans->numTransInputs + 1, + pertrans->aggCollation, (Node *)aggstate, NULL); + + if (pertrans->transfn.fn_strict && pertrans->initValueIsNull) { + if (numArguments <= numDirectArgs || !IsBinaryCoercible(inputTypes[numDirectArgs], aggtranstype)) { + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate %u needs to have compatible input type and transition type", aggref->aggfnoid))); + } + } + + get_typlenbyval(aggtranstype, &pertrans->transtypeLen, &pertrans->transtypeByVal); + + if (AGGKIND_IS_ORDERED_SET(aggref->aggkind)) { + sortlist = NIL; + numSortCols = numDistinctCols = 0; + } else if (aggref->aggdistinct) { + sortlist = aggref->aggdistinct; + numSortCols = numDistinctCols = list_length(sortlist); + Assert(numSortCols >= list_length(aggref->aggorder)); + } else { + sortlist = aggref->aggorder; + numSortCols = list_length(sortlist); + numDistinctCols = 0; + } + + pertrans->numSortCols = numSortCols; + pertrans->numDistinctCols = numDistinctCols; + + if (numSortCols > 0) { + pertrans->sortdesc = ExecTypeFromTL(aggref->args, false); + pertrans->sortslot = ExecInitExtraTupleSlot(estate); + ExecSetSlotDescriptor(pertrans->sortslot, pertrans->sortdesc); + + Assert(((Agg *)aggstate->ss.ps.plan)->aggstrategy != AGG_HASHED); + + if (numInputs == 1) { + get_typlenbyval(inputTypes[numDirectArgs], &pertrans->inputtypeLen, &pertrans->inputtypeByVal); + } else if (numDistinctCols > 0) { + pertrans->uniqslot = ExecInitExtraTupleSlot(estate); + ExecSetSlotDescriptor(pertrans->uniqslot, pertrans->sortdesc); + } + + pertrans->sortColIdx = (AttrNumber *)palloc(numSortCols * sizeof(AttrNumber)); + pertrans->sortOperators = (Oid *)palloc(numSortCols * sizeof(Oid)); + pertrans->sortCollations = (Oid *)palloc(numSortCols * sizeof(Oid)); + pertrans->sortNullsFirst = (bool *)palloc(numSortCols * sizeof(bool)); + + i = 0; + foreach (lc, sortlist) { + SortGroupClause *sortcl = (SortGroupClause *)lfirst(lc); + TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args); + + Assert(OidIsValid(sortcl->sortop)); + + pertrans->sortColIdx[i] = tle->resno; + pertrans->sortOperators[i] = sortcl->sortop; + pertrans->sortCollations[i] = exprCollation((Node *)tle->expr); + pertrans->sortNullsFirst[i] = sortcl->nulls_first; + i++; + } + Assert(i == numSortCols); + } + + if (aggref->aggdistinct) { + Assert(numArguments > 0); + + pertrans->equalfns = (FmgrInfo *)palloc(numDistinctCols * sizeof(FmgrInfo)); + + i = 0; + foreach (lc, aggref->aggdistinct) { + SortGroupClause *sortcl = (SortGroupClause *)lfirst(lc); + fmgr_info(get_opcode(sortcl->eqop), &pertrans->equalfns[i]); + i++; + } + Assert(i == numDistinctCols); + } + + pertrans->sortstates = (Tuplesortstate **)palloc0(sizeof(Tuplesortstate *) * numGroupingSets); +} + +static int find_compatible_peragg(Aggref *newagg, AggState *aggstate, int lastaggno, List **same_input_transnos) +{ + int aggno; + AggStatePerAggForFlattenedExpr peraggs; + + *same_input_transnos = NIL; + + if (contain_volatile_functions((Node *)newagg)) { + return -1; + } + + peraggs = aggstate->peragg_flattened; + + for (aggno = 0; aggno <= lastaggno; aggno++) { + AggStatePerAggForFlattenedExpr peragg; + Aggref *existingRef; + + peragg = &peraggs[aggno]; + existingRef = peragg->aggref; + + if (newagg->inputcollid != existingRef->inputcollid || newagg->aggstar != existingRef->aggstar || + newagg->aggvariadic != existingRef->aggvariadic || newagg->aggkind != existingRef->aggkind || + !equal(newagg->aggdirectargs, existingRef->aggdirectargs) || !equal(newagg->args, existingRef->args) || + !equal(newagg->aggorder, existingRef->aggorder) || !equal(newagg->aggdistinct, existingRef->aggdistinct)) + continue; + + if (newagg->aggfnoid == existingRef->aggfnoid && newagg->aggtype == existingRef->aggtype && + newagg->aggcollid == existingRef->aggcollid) { + list_free(*same_input_transnos); + *same_input_transnos = NIL; + return aggno; + } + + *same_input_transnos = lappend_int(*same_input_transnos, peragg->transno); + } + + return -1; +} + +static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg, Oid aggtransfn, Oid aggtranstype, + Datum initValue, bool initValueIsNull, List *transnos) +{ + ListCell *lc; + + foreach (lc, transnos) { + int transno = lfirst_int(lc); + AggStatePerTrans pertrans = &aggstate->pertrans[transno]; + + if (aggtransfn != pertrans->transfn_oid || aggtranstype != pertrans->aggtranstype) { + continue; + } + + if (initValueIsNull && pertrans->initValueIsNull) { + return transno; + } + + if (!initValueIsNull && !pertrans->initValueIsNull && + datumIsEqual(initValue, pertrans->initValue, pertrans->transtypeByVal, pertrans->transtypeLen)) { + return transno; + } + } + return -1; +} + +static void exec_lookups_agg(AggState *aggstate, Agg *node, EState *estate) +{ + int aggno; + ListCell *l = NULL; + int i = 0; + List *combined_inputeval; + int column_offset; + AggStatePerAgg peragg = aggstate->peragg; + int numGroupingSets = aggstate->maxsets; + + /* + * Perform lookups of aggregate function info, and initialize the + * unchanging fields of the per-agg data. We also detect duplicate + * aggregates (for example, "SELECT sum(x) ... HAVING sum(x) > 0"). When + * duplicates are detected, we only make an AggStatePerAgg struct for the + * first one. The clones are simply pointed at the same result entry by + * giving them duplicate aggno values. + */ + aggno = -1; + foreach (l, aggstate->aggs) { + AggrefExprState *aggrefstate = (AggrefExprState *)lfirst(l); + Aggref *aggref = (Aggref *)aggrefstate->xprstate.expr; + AggStatePerAgg peraggstate; + Oid inputTypes[FUNC_MAX_ARGS]; + int numArguments; + int numDirectArgs; + int numInputs; + int numSortCols; + int numDistinctCols; + List *sortlist = NIL; + HeapTuple aggTuple; + Form_pg_aggregate aggform; + Oid aggtranstype; + AclResult aclresult; + Oid transfn_oid, finalfn_oid; +#ifdef PGXC + Oid collectfn_oid; + Expr *collectfnexpr = NULL; +#endif /* PGXC */ + Expr *transfnexpr = NULL; + Expr *finalfnexpr = NULL; + Datum textInitVal; + ListCell *lc = NULL; + + /* Planner should have assigned aggregate to correct level */ + Assert(aggref->agglevelsup == 0); + + /* Look for a previous duplicate aggregate */ + for (i = 0; i <= aggno; i++) { + if (equal(aggref, peragg[i].aggref) && !contain_volatile_functions((Node *)aggref)) + break; + } + if (i <= aggno) { + /* Found a match to an existing entry, so just mark it */ + aggrefstate->aggno = i; + continue; + } + + /* Nope, so assign a new PerAgg record */ + peraggstate = &peragg[++aggno]; + + /* Mark Aggref state node with assigned index in the result array */ + aggrefstate->aggno = aggno; + + /* Fill in the peraggstate data */ + peraggstate->aggrefstate = aggrefstate; + peraggstate->aggref = aggref; + + peraggstate->sortstates = (Tuplesortstate **)palloc0(sizeof(Tuplesortstate *) * numGroupingSets); + + for (int currentsortno = 0; currentsortno < numGroupingSets; currentsortno++) + peraggstate->sortstates[currentsortno] = NULL; + + /* Fetch the pg_aggregate row */ + aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(aggref->aggfnoid)); + if (!HeapTupleIsValid(aggTuple)) + ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmodule(MOD_EXECUTOR), + errmsg("cache lookup failed for aggregate %u", aggref->aggfnoid))); + aggform = (Form_pg_aggregate)GETSTRUCT(aggTuple); + + /* Check permission to call aggregate function */ + aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(), ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(aggref->aggfnoid)); + + peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; + peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; +#ifdef PGXC + peraggstate->collectfn_oid = collectfn_oid = aggform->aggcollectfn; + + peraggstate->is_avg = false; + if (finalfn_oid == 1830) { + peraggstate->is_avg = true; + } +#ifdef ENABLE_MULTIPLE_NODES + /* + * For PGXC final and collection functions are used to combine results at Coordinator, + * disable those for Datanode + */ + if (IS_PGXC_DATANODE) { + if (!u_sess->exec_cxt.under_stream_runtime) { + peraggstate->finalfn_oid = finalfn_oid = InvalidOid; + peraggstate->collectfn_oid = collectfn_oid = InvalidOid; + } else { + if (need_adjust_agg_inner_func_type(peraggstate->aggref)) { + if (!node->is_final && !node->single_node) + peraggstate->finalfn_oid = finalfn_oid = InvalidOid; + if (aggref->aggstage == 0 && !node->is_final && !node->single_node) + peraggstate->collectfn_oid = collectfn_oid = InvalidOid; + } + } + } +#else + if (IS_STREAM_PLAN || StreamThreadAmI()) { + if (need_adjust_agg_inner_func_type(peraggstate->aggref)) { + if (!node->is_final && !node->single_node) { + peraggstate->finalfn_oid = finalfn_oid = InvalidOid; + } + if (aggref->aggstage == 0 && !node->is_final && !node->single_node) { + peraggstate->collectfn_oid = collectfn_oid = InvalidOid; + } + } + } +#endif /* ENABLE_MULTIPLE_NODES */ +#endif /* PGXC */ + /* Check that aggregate owner has permission to call component fns */ + { + HeapTuple procTuple; + Oid aggOwner; + + procTuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(aggref->aggfnoid)); + if (!HeapTupleIsValid(procTuple)) + ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmodule(MOD_EXECUTOR), + errmsg("cache lookup failed for aggregate function %u", aggref->aggfnoid))); + aggOwner = ((Form_pg_proc)GETSTRUCT(procTuple))->proowner; + ReleaseSysCache(procTuple); + + aclresult = pg_proc_aclcheck(transfn_oid, aggOwner, ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(transfn_oid)); + if (OidIsValid(finalfn_oid)) { + aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner, ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(finalfn_oid)); + } + +#ifdef PGXC + if (OidIsValid(collectfn_oid)) { + aclresult = pg_proc_aclcheck(collectfn_oid, aggOwner, ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(collectfn_oid)); + } +#endif /* PGXC */ + } + + /* + * Get the the number of actual arguments and identify the actual + * datatypes of the aggregate inputs (saved in inputTypes). When + * agg accepts ANY or a polymorphic type, the actual datatype + * could be different from the agg's declared input types. + */ + numArguments = get_aggregate_argtypes(aggref, inputTypes, FUNC_MAX_ARGS); + peraggstate->numArguments = numArguments; + /* Get the direct arguments */ + numDirectArgs = list_length(aggref->aggdirectargs); + /* Get the number of aggregated input columns */ + numInputs = list_length(aggref->args); + peraggstate->numInputs = numInputs; + + /* Detect the number of columns passed to the transfn */ + if (AGGKIND_IS_ORDERED_SET(aggref->aggkind)) + peraggstate->numTransInputs = numInputs; + else + peraggstate->numTransInputs = numArguments; + + /* + * When agg accepts ANY or a polymorphic type, resolve actual + * type of transition state + */ + aggtranstype = resolve_aggregate_transtype(aggref->aggfnoid, aggform->aggtranstype, inputTypes, numArguments); + + /* build expression trees using actual argument & result types */ + build_trans_aggregate_fnexprs(numArguments, numDirectArgs, AGGKIND_IS_ORDERED_SET(aggref->aggkind), + aggref->aggvariadic, aggtranstype, inputTypes, aggref->aggtype, + aggref->inputcollid, transfn_oid, finalfn_oid, &transfnexpr, &finalfnexpr); + +#ifdef PGXC + if (OidIsValid(collectfn_oid)) { + /* we expect final function expression to be NULL in call to + * build_aggregate_fnexprs below, since InvalidOid is passed for + * finalfn_oid argument. Use a dummy expression to accept that. + */ + Expr *dummyexpr = NULL; + /* + * for XC, we need to setup the collection function expression as well. + * Use build_aggregate_fnexpr() with invalid final function oid, and collection + * function information instead of transition function information. + * We should really be adding this step inside + * build_aggregate_fnexprs() but this way it becomes easy to merge. + */ + build_aggregate_fnexprs(&aggtranstype, 1, aggtranstype, aggref->aggtype, aggref->inputcollid, collectfn_oid, + InvalidOid, &collectfnexpr, &dummyexpr); + Assert(!dummyexpr); + } +#endif /* PGXC */ + + /* set up infrastructure for calling the transfn and finalfn */ + fmgr_info(transfn_oid, &peraggstate->transfn); + fmgr_info_set_expr((Node *)transfnexpr, &peraggstate->transfn); + + if (OidIsValid(finalfn_oid)) { + fmgr_info(finalfn_oid, &peraggstate->finalfn); + fmgr_info_set_expr((Node *)finalfnexpr, &peraggstate->finalfn); + } + +#ifdef PGXC + if (OidIsValid(collectfn_oid)) { + fmgr_info(collectfn_oid, &peraggstate->collectfn); + peraggstate->collectfn.fn_expr = (Node *)collectfnexpr; + } +#endif /* PGXC */ + peraggstate->aggCollation = aggref->inputcollid; + InitFunctionCallInfoData(peraggstate->transfn_fcinfo, &peraggstate->transfn, peraggstate->numTransInputs + 1, + peraggstate->aggCollation, (Node *)aggstate, NULL); + /* get info zbout relevant datatypes */ + get_typlenbyval(aggref->aggtype, &peraggstate->resulttypeLen, &peraggstate->resulttypeByVal); + get_typlenbyval(aggtranstype, &peraggstate->transtypeLen, &peraggstate->transtypeByVal); + + /* + * initval is potentially null, so don't try to access it as a struct + * field. Must do it the hard way with SysCacheGetAttr. + */ + textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, Anum_pg_aggregate_agginitval, &peraggstate->initValueIsNull); + + if (peraggstate->initValueIsNull) { + peraggstate->initValue = (Datum)0; + } else { + peraggstate->initValue = GetAggInitVal(textInitVal, aggtranstype); + } + +#ifdef PGXC + /* + * initval for collection function is potentially null, so don't try to + * access it as a struct field. Must do it the hard way with + * SysCacheGetAttr. + */ + textInitVal = + SysCacheGetAttr(AGGFNOID, aggTuple, Anum_pg_aggregate_agginitcollect, &peraggstate->initCollectValueIsNull); + + if (peraggstate->initCollectValueIsNull) { + peraggstate->initCollectValue = (Datum)0; + } else { + peraggstate->initCollectValue = GetAggInitVal(textInitVal, aggtranstype); + } +#endif /* PGXC */ + + /* + * If the transfn is strict and the initval is NULL, make sure input + * type and transtype are the same (or at least binary-compatible), so + * that it's OK to use the first input value as the initial + * transValue. This should have been checked at agg definition time, + * but just in case... + */ + if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull) { + if (numArguments < numDirectArgs || !IsBinaryCoercible(inputTypes[numDirectArgs], aggtranstype)) + ereport(ERROR, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate %u needs to have compatible input type and transition type", + aggref->aggfnoid))); + } + + /* + * If we're doing either DISTINCT or ORDER BY, then we have a list of + * SortGroupClause nodes; fish out the data in them and stick them + * into arrays. + * For oerdered set agg, we handle the sort operation in the transfn + * function, so we can ignore ORDER BY. + */ + if (AGGKIND_IS_ORDERED_SET(aggref->aggkind)) { + numSortCols = 0; + numDistinctCols = 0; + } else if (aggref->aggdistinct) { + sortlist = aggref->aggdistinct; + numSortCols = numDistinctCols = list_length(sortlist); + Assert(numSortCols >= list_length(aggref->aggorder)); + } else { + sortlist = aggref->aggorder; + numSortCols = list_length(sortlist); + numDistinctCols = 0; + } + + peraggstate->numSortCols = numSortCols; + peraggstate->numDistinctCols = numDistinctCols; + + if (numSortCols > 0) { + /* Get a tupledesc and slot corresponding to the aggregated inputs + * (including sort expressions) of the agg. + */ + peraggstate->sortdesc = ExecTypeFromTL(aggref->args, false); + peraggstate->sortslot = ExecInitExtraTupleSlot(estate); + ExecSetSlotDescriptor(peraggstate->sortslot, peraggstate->sortdesc); + /* + * We don't implement DISTINCT or ORDER BY aggs in the HASHED case + * (yet) + */ + Assert(node->aggstrategy != AGG_HASHED); + + /* If we have only one input, we need its len/byval info. */ + if (numInputs == 1) { + get_typlenbyval(inputTypes[numDirectArgs], &peraggstate->inputtypeLen, &peraggstate->inputtypeByVal); + } else if (numDistinctCols > 0) { + /* we will need an extra slot to store prior values */ + peraggstate->uniqslot = ExecInitExtraTupleSlot(estate); + ExecSetSlotDescriptor(peraggstate->uniqslot, peraggstate->sortdesc); + } + + /* Extract the sort information for use later */ + peraggstate->sortColIdx = (AttrNumber *)palloc(numSortCols * sizeof(AttrNumber)); + peraggstate->sortOperators = (Oid *)palloc(numSortCols * sizeof(Oid)); + peraggstate->sortCollations = (Oid *)palloc(numSortCols * sizeof(Oid)); + peraggstate->sortNullsFirst = (bool *)palloc(numSortCols * sizeof(bool)); + + i = 0; + foreach (lc, sortlist) { + SortGroupClause *sortcl = (SortGroupClause *)lfirst(lc); + TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args); + + /* the parser should have made sure of this */ + Assert(OidIsValid(sortcl->sortop)); + + peraggstate->sortColIdx[i] = tle->resno; + peraggstate->sortOperators[i] = sortcl->sortop; + peraggstate->sortCollations[i] = exprCollation((Node *)tle->expr); + peraggstate->sortNullsFirst[i] = sortcl->nulls_first; + i++; + } + Assert(i == numSortCols); + } + + if (aggref->aggdistinct) { + Assert(numArguments > 0); + + /* + * We need the equal function for each DISTINCT comparison we will + * make. + */ + peraggstate->equalfns = (FmgrInfo *)palloc(numDistinctCols * sizeof(FmgrInfo)); + + i = 0; + foreach (lc, aggref->aggdistinct) { + SortGroupClause *sortcl = (SortGroupClause *)lfirst(lc); + + fmgr_info(get_opcode(sortcl->eqop), &peraggstate->equalfns[i]); + i++; + } + Assert(i == numDistinctCols); + } + + ReleaseSysCache(aggTuple); + } + + /* Update numaggs to match number of unique aggregates found */ + aggstate->numaggs = aggno + 1; + combined_inputeval = NIL; + column_offset = 0; + for (int transno = 0; transno < aggstate->numaggs; transno++) { + AggStatePerAggData *pertrans = &peragg[transno]; + ListCell *arg; + + pertrans->inputoff = column_offset; + foreach (arg, pertrans->aggref->args) { + TargetEntry *source_tle = (TargetEntry *)lfirst(arg); + TargetEntry *tle; + + Assert(IsA(source_tle, TargetEntry)); + tle = flatCopyTargetEntry(source_tle); + tle->resno += column_offset; + + combined_inputeval = lappend(combined_inputeval, tle); + } + column_offset += list_length(pertrans->aggref->args); + } + + aggstate->evaldesc = ExecTypeFromTL(combined_inputeval, false); + aggstate->evalslot = ExecInitExtraTupleSlot(estate); + if (estate->es_is_flt_frame) { + aggstate->evalproj = ExecBuildProjectionInfoByFlatten(combined_inputeval, aggstate->tmpcontext, + aggstate->evalslot, &aggstate->ss.ps, NULL); + } else { + combined_inputeval = (List *)ExecInitExprByRecursion((Expr *)combined_inputeval, (PlanState *)aggstate); + aggstate->evalproj = + ExecBuildProjectionInfoByRecursion(combined_inputeval, aggstate->tmpcontext, aggstate->evalslot, NULL); + } + + ExecSetSlotDescriptor(aggstate->evalslot, aggstate->evaldesc); + + return; +} + +static void exec_lookups_agg_flattened(AggState *aggstate, Agg *node, EState *estate) +{ + int aggno = -1; + int transno = -1; + int numaggrefs; + ListCell *l = NULL; + AggStatePerAggForFlattenedExpr peragg_flattened; + AggStatePerTrans pertransstates; + + numaggrefs = list_length(aggstate->aggs); + pertransstates = aggstate->pertrans; + peragg_flattened = aggstate->peragg_flattened; + + /* ----------------- + * Perform lookups of aggregate function info, and initialize the + * unchanging fields of the per-agg and per-trans data. + * + * We try to optimize by detecting duplicate aggregate functions so that + * their state and final values are re-used, rather than needlessly being + * re-calculated independently. We also detect aggregates that are not + * the same, but which can share the same transition state. + * + * Scenarios: + * + * 1. An aggregate function appears more than once in query: + * + * SELECT SUM(x) FROM ... HAVING SUM(x) > 0 + * + * Since the aggregates are the identical, we only need to calculate + * the calculate it once. Both aggregates will share the same 'aggno' + * value. + * + * 2. Two different aggregate functions appear in the query, but the + * aggregates have the same transition function and initial value, but + * different final function: + * + * SELECT SUM(x), AVG(x) FROM ... + * + * In this case we must create a new peragg for the varying aggregate, + * and need to call the final functions separately, but can share the + * same transition state. + * + * For either of these optimizations to be valid, the aggregate's + * arguments must be the same, including any modifiers such as ORDER BY, + * DISTINCT and FILTER, and they mustn't contain any volatile functions. + * ----------------- + */ + aggno = -1; + transno = -1; + foreach (l, aggstate->aggs) { + AggrefExprState *aggrefstate = (AggrefExprState *)lfirst(l); + Aggref *aggref = aggrefstate->aggref; + AggStatePerAggForFlattenedExpr peragg; + AggStatePerTrans pertrans; + int existing_aggno; + int existing_transno; + List *same_input_transnos; + Oid inputTypes[FUNC_MAX_ARGS]; + int numArguments; + HeapTuple aggTuple; + Form_pg_aggregate aggform; + AclResult aclresult; + Oid transfn_oid, finalfn_oid; + Oid collectfn_oid; + Expr *collectfnexpr = NULL; + Expr *finalfnexpr = NULL; + Oid aggtranstype; + + Datum textInitVal; + Datum initValue; + bool initValueIsNull; + + Datum initCollectValue; + bool initCollectValueIsNull; + + /* Planner should have assigned aggregate to correct level */ + Assert(aggref->agglevelsup == 0); + + /* 1. Check for already processed aggs which can be re-used */ + existing_aggno = find_compatible_peragg(aggref, aggstate, aggno, &same_input_transnos); + if (existing_aggno != -1) { + /* + * Existing compatible agg found. so just point the Aggref to the + * same per-agg struct. + */ + aggrefstate->aggno = existing_aggno; + continue; + } + + /* Mark Aggref state node with assigned index in the result array */ + peragg = &peragg_flattened[++aggno]; + peragg->aggref = aggref; + aggrefstate->aggno = aggno; + + /* Fetch the pg_aggregate row */ + aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(aggref->aggfnoid)); + if (!HeapTupleIsValid(aggTuple)) + ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmodule(MOD_EXECUTOR), + errmsg("cache lookup failed for aggregate %u", aggref->aggfnoid))); + aggform = (Form_pg_aggregate)GETSTRUCT(aggTuple); + + /* Check permission to call aggregate function */ + aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(), ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(aggref->aggfnoid)); + + transfn_oid = aggform->aggtransfn; + peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn; + + collectfn_oid = aggform->aggcollectfn; + + peragg->is_avg = false; + if (finalfn_oid == 1830) { + peragg->is_avg = true; + } +#ifdef ENABLE_MULTIPLE_NODES + /* + * For PGXC final and collection functions are used to combine results at Coordinator, + * disable those for Datanode + */ + if (IS_PGXC_DATANODE) { + if (!u_sess->exec_cxt.under_stream_runtime) { + peragg->finalfn_oid = finalfn_oid = InvalidOid; + collectfn_oid = InvalidOid; + } else { + if (need_adjust_agg_inner_func_type(peraggstate->aggref)) { + if (!node->is_final && !node->single_node) + peragg->finalfn_oid = finalfn_oid = InvalidOid; + if (aggref->aggstage == 0 && !node->is_final && !node->single_node) + collectfn_oid = InvalidOid; + } + } + } +#else + if (IS_STREAM_PLAN || StreamThreadAmI()) { + if (need_adjust_agg_inner_func_type(peragg->aggref)) { + if (!node->is_final && !node->single_node) { + peragg->finalfn_oid = finalfn_oid = InvalidOid; + } + if (aggref->aggstage == 0 && !node->is_final && !node->single_node) { + collectfn_oid = InvalidOid; + } + } + } +#endif /* ENABLE_MULTIPLE_NODES */ + /* Check that aggregate owner has permission to call component fns */ + { + HeapTuple procTuple; + Oid aggOwner; + + procTuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(aggref->aggfnoid)); + if (!HeapTupleIsValid(procTuple)) + ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmodule(MOD_EXECUTOR), + errmsg("cache lookup failed for aggregate function %u", aggref->aggfnoid))); + aggOwner = ((Form_pg_proc)GETSTRUCT(procTuple))->proowner; + ReleaseSysCache(procTuple); + + aclresult = pg_proc_aclcheck(transfn_oid, aggOwner, ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(transfn_oid)); + if (OidIsValid(finalfn_oid)) { + aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner, ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(finalfn_oid)); + } + + if (OidIsValid(collectfn_oid)) { + aclresult = pg_proc_aclcheck(collectfn_oid, aggOwner, ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(collectfn_oid)); + } + } + + /* + * Get the the number of actual arguments and identify the actual + * datatypes of the aggregate inputs (saved in inputTypes). When + * agg accepts ANY or a polymorphic type, the actual datatype + * could be different from the agg's declared input types. + */ + numArguments = get_aggregate_argtypes(aggref, inputTypes, FUNC_MAX_ARGS); + + /* + * When agg accepts ANY or a polymorphic type, resolve actual + * type of transition state + */ + aggtranstype = resolve_aggregate_transtype(aggref->aggfnoid, aggform->aggtranstype, inputTypes, numArguments); + /* Detect how many arguments to pass to the finalfn */ + if (AGGKIND_IS_ORDERED_SET(aggref->aggkind)) + peragg->numFinalArgs = numArguments + 1; + else + peragg->numFinalArgs = 1; + + /* Initialize any direct-argument expressions */ + peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs, (PlanState *)aggstate); + /* + * build expression trees using actual argument & result types for the + * finalfn, if it exists + */ + if (OidIsValid(finalfn_oid)) { + build_aggregate_finalfn_expr(inputTypes, peragg->numFinalArgs, aggtranstype, aggref->aggtype, + aggref->inputcollid, finalfn_oid, &finalfnexpr); + fmgr_info(finalfn_oid, &peragg->finalfn); + fmgr_info_set_expr((Node *)finalfnexpr, &peragg->finalfn); + } + + /* get info about the result type's datatype */ + get_typlenbyval(aggref->aggtype, &peragg->resulttypeLen, &peragg->resulttypeByVal); + + if (OidIsValid(collectfn_oid)) { + /* we expect final function expression to be NULL in call to + * build_aggregate_fnexprs below, since InvalidOid is passed for + * finalfn_oid argument. Use a dummy expression to accept that. + */ + Expr *dummyexpr = NULL; + /* + * for XC, we need to setup the collection function expression as well. + * Use build_aggregate_fnexpr() with invalid final function oid, and collection + * function information instead of transition function information. + * We should really be adding this step inside + * build_aggregate_fnexprs() but this way it becomes easy to merge. + */ + build_aggregate_fnexprs(&aggtranstype, 1, aggtranstype, aggref->aggtype, aggref->inputcollid, collectfn_oid, + InvalidOid, &collectfnexpr, &dummyexpr); + Assert(!dummyexpr); + } + + /* + * initval is potentially null, so don't try to access it as a struct + * field. Must do it the hard way with SysCacheGetAttr. + */ + textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, Anum_pg_aggregate_agginitval, &initValueIsNull); + + if (initValueIsNull) + initValue = (Datum)0; + else + initValue = GetAggInitVal(textInitVal, aggtranstype); + + /* + * initval for collection function is potentially null, so don't try to + * access it as a struct field. Must do it the hard way with + * SysCacheGetAttr. + */ + textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, Anum_pg_aggregate_agginitcollect, &initCollectValueIsNull); + + if (initCollectValueIsNull) { + initCollectValue = (Datum)0; + } else { + initCollectValue = GetAggInitVal(textInitVal, aggtranstype); + } + + /* + * 2. Build working state for invoking the transition function, or + * look up previously initialized working state, if we can share it. + * + * find_compatible_peragg() already collected a list of per-Trans's + * with the same inputs. Check if any of them have the same transition + * function and initial value. + */ + /* for collectfn, always build a new peragg*/ + if (OidIsValid(collectfn_oid) && (aggref->aggstage > 0 || aggstate->is_final)) { + existing_transno = -1; + } else { + existing_transno = find_compatible_pertrans(aggstate, aggref, transfn_oid, aggtranstype, initValue, + initValueIsNull, same_input_transnos); + } + if (existing_transno != -1) { + /* + * Existing compatible trans found, so just point the 'peragg' to + * the same per-trans struct. + */ + pertrans = &pertransstates[existing_transno]; + peragg->transno = existing_transno; + } else { + pertrans = &pertransstates[++transno]; + build_pertrans_for_aggref(pertrans, aggstate, estate, aggref, transfn_oid, aggtranstype, initValue, + initValueIsNull, inputTypes, numArguments); + if (OidIsValid(collectfn_oid)) { + fmgr_info(collectfn_oid, &pertrans->collectfn); + pertrans->collectfn.fn_expr = (Node *)collectfnexpr; + /* init collectfn_fcinfo*/ + InitFunctionCallInfoData(pertrans->collectfn_fcinfo, &pertrans->collectfn, pertrans->numTransInputs + 1, + pertrans->aggCollation, (Node *)aggstate, NULL); + pertrans->initCollectValue = initCollectValue; + pertrans->initCollectValueIsNull = initCollectValueIsNull; + } + peragg->transno = transno; + } + ReleaseSysCache(aggTuple); + } + + /* Update numaggs to match number of unique aggregates found */ + aggstate->numaggs = aggno + 1; + aggstate->numtrans = transno + 1; + + /* + * Last, check whether any more aggregates got added onto the node while + * we processed the expressions for the aggregate arguments (including not + * only the regular arguments and FILTER expressions handled immediately + * above, but any direct arguments we might've handled earlier). If so, + * we have nested aggregate functions, which is semantically nonsensical, + * so complain. (This should have been caught by the parser, so we don't + * need to work hard on a helpful error message; but we defend against it + * here anyway, just to be sure.) + */ + if (numaggrefs != list_length(aggstate->aggs)) + ereport(ERROR, (errcode(ERRCODE_GROUPING_ERROR), errmsg("aggregate function calls cannot be nested"))); + + /* + * Build expressions doing all the transition work at once. We build a + * different one for each phase, as the number of transition function + * invocation can differ between phases. Note this'll work both for + * transition and combination functions (although there'll only be one + * phase in the latter case). + */ + for (int phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++) { + AggStatePerPhase phase = &aggstate->phases[phaseidx]; + bool dohash = false; + bool dosort = false; + + /* phase 0 doesn't necessarily exist */ + if (!phase->aggnode) + continue; + + if (phase->aggstrategy == AGG_PLAIN || phase->aggstrategy == AGG_SORTED) { + dohash = false; + dosort = true; + } else if (phase->aggstrategy == AGG_HASHED) { + dohash = true; + dosort = false; + } else + Assert(false); + + phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash); + } +} \ No newline at end of file diff --git a/src/gausskernel/runtime/executor/nodeWindowAgg.cpp b/src/gausskernel/runtime/executor/nodeWindowAgg.cpp index 8d46ac658..1a1983583 100644 --- a/src/gausskernel/runtime/executor/nodeWindowAgg.cpp +++ b/src/gausskernel/runtime/executor/nodeWindowAgg.cpp @@ -1507,23 +1507,25 @@ WindowStatePerAggData* initialize_peragg(WindowAggState* winstate, WindowFunc* w /* resolve actual type of transition state, if polymorphic */ agg_trans_type = resolve_aggregate_transtype(wfunc->winfnoid, aggform->aggtranstype, input_types, num_arguments); - /* build expression trees using actual argument & result types */ - build_trans_aggregate_fnexprs(num_arguments, - 0, /* no ordered-set window functions yet */ - false, - false, /* no variadic window functions yet */ - agg_trans_type, - input_types, - wfunc->wintype, - wfunc->inputcollid, - transfn_oid, - finalfn_oid, - &transfnexpr, - &finalfnexpr); + if (winstate->ss.ps.state->es_is_flt_frame) { + build_aggregate_transfn_expr(input_types, num_arguments, 0, false, wfunc->wintype, wfunc->inputcollid, + transfn_oid, &transfnexpr); + } else { + /* build expression trees using actual argument & result types */ + build_trans_aggregate_fnexprs(num_arguments, 0, false, false, agg_trans_type, input_types, wfunc->wintype, + wfunc->inputcollid, transfn_oid, finalfn_oid, &transfnexpr, &finalfnexpr); + } fmgr_info(transfn_oid, &peraggstate->transfn); fmgr_info_set_expr((Node*)transfnexpr, &peraggstate->transfn); + if (winstate->ss.ps.state->es_is_flt_frame) { + if (OidIsValid(finalfn_oid)) { + build_aggregate_finalfn_expr(input_types, 1, agg_trans_type, wfunc->wintype, wfunc->inputcollid, + finalfn_oid, &finalfnexpr); + } + } + if (OidIsValid(finalfn_oid)) { fmgr_info(finalfn_oid, &peraggstate->finalfn); fmgr_info_set_expr((Node*)finalfnexpr, &peraggstate->finalfn); diff --git a/src/gausskernel/runtime/vecexecutor/vecnode/vecagg.cpp b/src/gausskernel/runtime/vecexecutor/vecnode/vecagg.cpp index ec8524140..d2aa8ba1c 100644 --- a/src/gausskernel/runtime/vecexecutor/vecnode/vecagg.cpp +++ b/src/gausskernel/runtime/vecexecutor/vecnode/vecagg.cpp @@ -56,7 +56,7 @@ #include "vecexecutor/vecsortagg.h" #include "vecexecutor/vechashagg.h" -static void DispatchAggFunction(AggStatePerAgg agg_state, VecAggInfo* agg_info, bool use_sonichash = false); +static void DispatchAggFunction(VecAggStatePerAgg agg_state, VecAggInfo* agg_info, bool use_sonichash = false); extern bool CodeGenThreadObjectReady(); extern bool CodeGenPassThreshold(double rows, int dn_num, int dop); @@ -64,7 +64,7 @@ extern bool CodeGenPassThreshold(double rows, int dn_num, int dop); /* * @Description: set vector agg function. */ -static void DispatchAggFunction(AggStatePerAgg agg_state, VecAggInfo* agg_info, bool use_sonichash) +static void DispatchAggFunction(VecAggStatePerAgg agg_state, VecAggInfo* agg_info, bool use_sonichash) { VecFuncCacheEntry* entry = NULL; Oid aggfnoid = agg_state->aggref->aggfnoid; @@ -103,7 +103,7 @@ static void DispatchAggFunction(AggStatePerAgg agg_state, VecAggInfo* agg_info, VecAggState* ExecInitVecAggregation(VecAgg* node, EState* estate, int eflags) { VecAggState* aggstate = NULL; - AggStatePerAgg peragg; + VecAggStatePerAgg peragg; Plan* outer_plan = NULL; int numaggs, aggno; ListCell* l = NULL; @@ -129,7 +129,7 @@ VecAggState* ExecInitVecAggregation(VecAgg* node, EState* estate, int eflags) aggstate->numaggs = 0; aggstate->eqfunctions = NULL; aggstate->hashfunctions = NULL; - aggstate->peragg = NULL; + aggstate->pervecagg = NULL; aggstate->agg_done = false; aggstate->pergroup = NULL; aggstate->grp_firstTuple = NULL; @@ -343,8 +343,8 @@ VecAggState* ExecInitVecAggregation(VecAgg* node, EState* estate, int eflags) * Set up aggregate-result storage in the output expr context, and also * allocate my private per-agg working storage */ - peragg = (AggStatePerAgg)palloc0(sizeof(AggStatePerAggData) * numaggs * num_grp_sets); - aggstate->peragg = peragg; + peragg = (VecAggStatePerAgg)palloc0(sizeof(VecAggStatePerAggData) * numaggs * num_grp_sets); + aggstate->pervecagg = peragg; /* Compute the columns we actually need to hash on */ aggstate->hash_needed = find_hash_columns(aggstate); @@ -374,7 +374,7 @@ VecAggState* ExecInitVecAggregation(VecAgg* node, EState* estate, int eflags) foreach (l, aggstate->aggs) { AggrefExprState* aggrefstate = (AggrefExprState*)lfirst(l); Aggref* aggref = (Aggref*)aggrefstate->xprstate.expr; - AggStatePerAgg peraggstate; + VecAggStatePerAgg peraggstate; Oid input_types[FUNC_MAX_ARGS]; int num_arguments; int num_inputs; @@ -667,7 +667,7 @@ VecAggState* ExecInitVecAggregation(VecAgg* node, EState* estate, int eflags) DBG_ASSERT(idx >= 0); bool use_sonichash = (node->aggstrategy == AGG_HASHED && node->is_sonichash); - DispatchAggFunction(&aggstate->peragg[aggno], &aggstate->aggInfo[idx], use_sonichash); + DispatchAggFunction(&aggstate->pervecagg[aggno], &aggstate->aggInfo[idx], use_sonichash); /* Initialize the function call parameter struct as well */ InitFunctionCallInfoData(aggstate->aggInfo[idx].vec_agg_function, @@ -1194,7 +1194,7 @@ void BaseAggRunner::BatchAggregation(VectorBatch* batch) for (i = 0; i < m_aggNum; i++) { VectorBatch* p_batch = NULL; ScalarVector* p_vector = NULL; - AggStatePerAgg per_agg_state = &m_runtime->peragg[m_aggNum - 1 - i]; + VecAggStatePerAgg per_agg_state = &m_runtime->pervecagg[m_aggNum - 1 - i]; ExprContext* econtext = NULL; /* count(*) per_agg_state->evalproj is null. */ @@ -1360,8 +1360,8 @@ VectorBatch* BaseAggRunner::ProducerBatch() void BaseAggRunner::initialize_sortstate(int work_mem, int max_mem, int plan_id, int dop) { int agg_no; - AggStatePerAgg per_agg = m_runtime->peragg; - AggStatePerAgg per_agg_state = NULL; + VecAggStatePerAgg per_agg = m_runtime->pervecagg; + VecAggStatePerAgg per_agg_state = NULL; for (agg_no = 0; agg_no < m_aggNum; agg_no++) { per_agg_state = &per_agg[agg_no]; @@ -1482,11 +1482,11 @@ void BaseAggRunner::BatchSortAggregation(int curr_set, int work_mem, int max_mem tmp_loc[k] = m_sortDistinct[curr_set].m_distinctLoc; } - AggStatePerAgg peragg = m_runtime->peragg; + AggStatePerAgg peragg = m_runtime->pervecagg; for (i = 0; i < m_aggNum; i++) { aggno = m_aggNum - 1 - i; first = true; - AggStatePerAgg peragg_stat = &peragg[aggno]; + VecAggStatePerAgg peragg_stat = &peragg[aggno]; if (peragg_stat->numSortCols > 0) { batchsort_performsort(sort_stat[aggno]); batchsort_getbatch(sort_stat[aggno], true, agg_batch[aggno]); @@ -1554,7 +1554,7 @@ void BaseAggRunner::AppendBatchForSortAgg(VectorBatch* batch, int start, int end for (i = 0; i < m_aggNum; i++) { aggno = m_aggNum - 1 - i; - AggStatePerAgg peragg_stat = &m_runtime->peragg[aggno]; + AggStatePerAgg peragg_stat = &m_runtime->pervecagg[aggno]; if (peragg_stat->numSortCols > 0) { ExprContext* econtext = NULL; @@ -1634,7 +1634,7 @@ void BaseAggRunner::BatchNoSortAgg(VectorBatch* batch) for (i = 0; i < m_aggNum; i++) { aggno = m_aggNum - 1 - i; - AggStatePerAgg peragg_stat = &m_runtime->peragg[aggno]; + AggStatePerAgg peragg_stat = &m_runtime->pervecagg[aggno]; if (peragg_stat->numSortCols <= 0) { VectorBatch* p_batch = NULL; diff --git a/src/gausskernel/runtime/vecexecutor/vectorsonic/vsonichashagg.cpp b/src/gausskernel/runtime/vecexecutor/vectorsonic/vsonichashagg.cpp index 7b0bd1d15..b0972b2c4 100644 --- a/src/gausskernel/runtime/vecexecutor/vectorsonic/vsonichashagg.cpp +++ b/src/gausskernel/runtime/vecexecutor/vectorsonic/vsonichashagg.cpp @@ -2239,7 +2239,7 @@ void SonicHashAgg::BatchAggregation(VectorBatch* batch) for (i = 0; i < m_aggNum; i++) { VectorBatch* pBatch = NULL; ScalarVector* pVector = NULL; - AggStatePerAgg peraggstate = &m_runtime->peragg[m_aggNum - 1 - i]; + VecAggStatePerAgg peraggstate = &m_runtime->pervecagg[m_aggNum - 1 - i]; ExprContext* econtext = NULL; /* for count(*), peraggstate->evalproj is null. */ diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 6f9bf4fb1..3cbb8031c 100755 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -306,6 +306,7 @@ extern ExprState* ExecInitExpr(Expr* node, PlanState* parent); extern ExprState *ExecInitQual(List *qual, PlanState *parent); extern ExprState *ExecInitCheck(List *qual, PlanState *parent); extern List *ExecInitExprList(List *nodes, PlanState *parent); +extern ExprState* ExecBuildAggTrans(AggState* aggstate, struct AggStatePerPhaseData *phase, bool doSort, bool doHash); extern ProjectionInfo* ExecBuildProjectionInfo(List *targetList, ExprContext *econtext, TupleTableSlot *slot, PlanState *parent, TupleDesc inputDesc); extern ExprState* ExecPrepareExpr(Expr* node, EState* estate); diff --git a/src/include/executor/node/nodeAgg.h b/src/include/executor/node/nodeAgg.h index f1276a45c..eb1bad51d 100644 --- a/src/include/executor/node/nodeAgg.h +++ b/src/include/executor/node/nodeAgg.h @@ -196,6 +196,201 @@ typedef struct AggStatePerAggData { TupleTableSlot *evalslot; /* current input tuple */ } AggStatePerAggData; +/* + * AggStatePerTransData - per aggregate state value information + * + * Working state for updating the aggregate's state value, by calling the + * transition function with an input row. This struct does not store the + * information needed to produce the final aggregate result from the transition + * state, that's stored in AggStatePerAggForFlattenedExprData instead. This separation allows + * multiple aggregate results to be produced from a single state value. + */ +typedef struct AggStatePerTransData { + Aggref *aggref; + + /* number of input arguments for aggregate function proper */ + int numArguments; + + /* number of inputs including ORDER BY expressions */ + int numInputs; + + /* offset of input columns in AggState->evalslot */ + int inputoff; + + bool is_avg; + + /* + * Number of aggregated input columns to pass to the transfn. This + * includes the ORDER BY columns for ordered-set aggs, but not for plain + * aggs. (This doesn't count the transition state value!) + */ + int numTransInputs; + + /* Oid of the state transition function */ + Oid transfn_oid; + /* Oid of state value's datatype */ + Oid aggtranstype; + + /* ExprStates of the FILTER and argument expressions. */ + ExprState *aggfilter; /* state of FILTER expression, if any */ + List *args; /* states of aggregated-argument expressions */ + List *aggdirectargs; /* states of direct-argument expressions */ + + /* + * fmgr lookup data for transition function. Note in particular that the + * fn_strict flag is kept here. + */ + FmgrInfo transfn; +#ifdef PGXC + FmgrInfo collectfn; +#endif /* PGXC */ + + /* Input collation derived for aggregate */ + Oid aggCollation; + + /* number of sorting columns */ + int numSortCols; + + /* number of sorting columns to consider in DISTINCT comparisons */ + /* (this is either zero or the same as numSortCols) */ + int numDistinctCols; + + /* deconstructed sorting information (arrays of length numSortCols) */ + AttrNumber *sortColIdx; + Oid *sortOperators; + Oid *sortCollations; + bool *sortNullsFirst; + + /* + * fmgr lookup data for input columns' equality operators --- only + * set/used when aggregate has DISTINCT flag. Note that these are in + * order of sort column index, not parameter index. + */ + FmgrInfo *equalfns; /* array of length numDistinctCols */ + + /* + * initial value from pg_aggregate entry + */ + Datum initValue; + bool initValueIsNull; +#ifdef PGXC + Datum initCollectValue; + bool initCollectValueIsNull; +#endif /* PGXC */ + + /* + * We need the len and byval info for the agg's input, result, and + * transition data types in order to know how to copy/delete values. + * + * Note that the info for the input type is used only when handling + * DISTINCT aggs with just one argument, so there is only one input type. + */ + int16 inputtypeLen, resulttypeLen, transtypeLen; + bool inputtypeByVal, resulttypeByVal, transtypeByVal; + + /* + * Stuff for evaluation of aggregate inputs in cases where the aggregate + * requires sorted input. The arguments themselves will be evaluated via + * AggState->evalslot/evalproj for all aggregates at once, but we only + * want to sort the relevant columns for individual aggregates. + */ + TupleDesc sortdesc; /* descriptor of input tuples */ + + /* + * Slots for holding the evaluated input arguments. These are set up + * during ExecInitAgg() and then used for each input row requiring + * procesessing besides what's done in AggState->evalproj. + */ + TupleTableSlot *sortslot; /* current input tuple */ + TupleTableSlot *uniqslot; /* used for multi-column DISTINCT */ + + /* + * These values are working state that is initialized at the start of an + * input tuple group and updated for each input tuple. + * + * For a simple (non DISTINCT/ORDER BY) aggregate, we just feed the input + * values straight to the transition function. If it's DISTINCT or + * requires ORDER BY, we pass the input values into a Tuplesort object; + * then at completion of the input tuple group, we scan the sorted values, + * eliminate duplicates if needed, and run the transition function on the + * rest. + */ + + Tuplesortstate **sortstates; /* sort object, if DISTINCT or ORDER BY */ + Tuplesortstate *sortstate; /* sort object, if DISTINCT or ORDER BY */ + + /* + * This field is a pre-initialized FunctionCallInfo struct used for + * calling this aggregate's transfn. We save a few cycles per row by not + * re-initializing the unchanging fields; which isn't much, but it seems + * worth the extra space consumption. + */ + FunctionCallInfoData transfn_fcinfo; + + FunctionCallInfoData collectfn_fcinfo; + + /* XXX: use for vector engine now, better remove later*/ + TupleDesc evaldesc; /* descriptor of input tuples */ + ProjectionInfo *evalproj; /* projection machinery */ + TupleTableSlot *evalslot; /* current input tuple */ +} AggStatePerTransData; + +/* + * AggStatePerAggForFlattenedExprData - per-aggregate information + * + * This contains the information needed to call the final function, to produce + * a final aggregate result from the state value. If there are multiple + * identical Aggrefs in the query, they can all share the same per-agg data. + * + * These values are set up during ExecInitAgg() and do not change thereafter. + */ +typedef struct AggStatePerAggForFlattenedExprData { + /* + * Link to an Aggref expr this state value is for. + * + * There can be multiple identical Aggref's sharing the same per-agg. This + * points to the first one of them. + */ + Aggref *aggref; + + /* index to the state value which this agg should use */ + int transno; + + /* Optional Oid of final function (may be InvalidOid) */ + Oid finalfn_oid; + + /* + * fmgr lookup data for final function --- only valid when finalfn_oid oid + * is not InvalidOid. + */ + FmgrInfo finalfn; +#ifdef PGXC + FmgrInfo collectfn; +#endif /* PGXC */ + + /* ExprStates for any direct-argument expressions */ + List *aggdirectargs; + + /* + * Number of arguments to pass to the finalfn. This is always at least 1 + * (the transition state value) plus any ordered-set direct args. If the + * finalfn wants extra args then we pass nulls corresponding to the + * aggregated input columns. + */ + int numFinalArgs; + + /* + * We need the len and byval info for the agg's result data type in order + * to know how to copy/delete values. + */ + int16 resulttypeLen; + bool resulttypeByVal; +#ifdef PGXC + bool is_avg; +#endif /* PGXC */ + +} AggStatePerAggForFlattenedExprData; + /* * AggStatePerPhaseData - per-grouping-set-phase state * @@ -214,6 +409,8 @@ typedef struct AggStatePerPhaseData { FmgrInfo* eqfunctions; /* per-grouping-field equality fns */ Agg* aggnode; /* Agg node for phase data */ Sort* sortnode; /* Sort node for input ordering for phase */ + AggStrategy aggstrategy; /* strategy mode */ + ExprState *evaltrans; /* evaluation of transition functions */ } AggStatePerPhaseData; /* diff --git a/src/include/nodes/execExpr.h b/src/include/nodes/execExpr.h index 5d6560d43..0dfcf6c44 100644 --- a/src/include/nodes/execExpr.h +++ b/src/include/nodes/execExpr.h @@ -228,6 +228,21 @@ typedef enum ExprEvalOp EEOP_PREFIX_BTYEA, EEOP_PREFIX_TEXT, + /* aggregation related nodes */ + EEOP_AGG_STRICT_DESERIALIZE, + EEOP_AGG_DESERIALIZE, + EEOP_AGG_STRICT_INPUT_CHECK, + EEOP_AGG_INIT_TRANS, + EEOP_AGG_COLLECT_INIT_TRANS, + EEOP_AGG_STRICT_TRANS_CHECK, + EEOP_AGG_COLLECT_STRICT_TRANS_CHECK, + EEOP_AGG_PLAIN_TRANS_BYVAL, + EEOP_AGG_COLLECT_PLAIN_TRANS_BYVAL, + EEOP_AGG_PLAIN_TRANS, + EEOP_AGG_COLLECT_PLAIN_TRANS, + EEOP_AGG_ORDERED_TRANS_DATUM, + EEOP_AGG_ORDERED_TRANS_TUPLE, + /* non-existent operation, used e.g. to check array lengths */ EEOP_LAST } ExprEvalOp; @@ -615,7 +630,50 @@ typedef struct ExprEvalStep List* typeOids; /* type oid list of var for filter */ } hash_filter; - } d; + /* for EEOP_AGG_*DESERIALIZE */ + struct { + AggState *aggstate; + FunctionCallInfo fcinfo_data; + int jumpnull; + } agg_deserialize; + + /* for EEOP_AGG_STRICT_INPUT_CHECK */ + struct { + bool *nulls; + int nargs; + int jumpnull; + } agg_strict_input_check; + + /* for EEOP_AGG_INIT_TRANS */ + struct { + AggState *aggstate; + AggStatePerTrans pertrans; + MemoryContext aggcontext; + int setno; + int transno; + int setoff; + int jumpnull; + } agg_init_trans; + + /* for EEOP_AGG_STRICT_TRANS_CHECK */ + struct { + AggState *aggstate; + int setno; + int transno; + int setoff; + int jumpnull; + } agg_strict_trans_check; + + /* for EEOP_AGG_{PLAIN,ORDERED}_TRANS* */ + struct { + AggState *aggstate; + AggStatePerTrans pertrans; + MemoryContext aggcontext; + int setno; + int transno; + int setoff; + } agg_trans; + } d; } ExprEvalStep; @@ -706,5 +764,12 @@ extern void ExecEvalHashFilter(ExprState *state, ExprEvalStep *op, ExprContext *econtext); extern void ExecEvalWholeRowVar(ExprState *state, ExprEvalStep *op, ExprContext *econtext); - +extern Datum ExecAggTransReparent(AggState *aggstate, AggStatePerTrans pertrans, Datum newValue, bool newValueIsNull, + Datum oldValue, bool oldValueIsNull); +extern void ExecAggInitGroup(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroup, + MemoryContext aggcontext); +extern void ExecAggInitCollectGroup(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroup, + MemoryContext aggcontext); +extern void ExecEvalAggOrderedTransDatum(ExprState *state, ExprEvalStep *op, ExprContext *econtext); +extern void ExecEvalAggOrderedTransTuple(ExprState *state, ExprEvalStep *op, ExprContext *econtext); #endif /* EXEC_EXPR_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 5954bcdb9..6b1107fc3 100755 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -2440,7 +2440,8 @@ typedef struct GroupState { */ /* these structs are private in nodeAgg.c: */ typedef struct AggStatePerAggData* AggStatePerAgg; -typedef struct AggStatePerTransData *AggStatePerTrans; +typedef struct AggStatePerAggForFlattenedExprData* AggStatePerAggForFlattenedExpr; +typedef struct AggStatePerTransData* AggStatePerTrans; typedef struct AggStatePerGroupData* AggStatePerGroup; typedef struct AggStatePerPhaseData* AggStatePerPhase; @@ -2486,6 +2487,16 @@ typedef struct AggState { TupleTableSlot *evalslot; /* slot for agg inputs */ ProjectionInfo *evalproj; /* projection machinery */ TupleDesc evaldesc; /* descriptor of input tuples */ + + int numtrans; /* number of pertrans items */ + AggStrategy aggstrategy; /* strategy mode */ + AggStatePerAggForFlattenedExpr peragg_flattened; /* per-Aggref information for flattened expression*/ + AggStatePerTrans pertrans; /* per-Trans state information */ + MemoryContext curaggcontext; /* currently active aggcontext */ + AggStatePerTrans curpertrans; /* currently active trans state */ + int num_hashes; + AggStatePerGroup hash_pergroup; /* grouping set indexed array of* per-group pointers */ + AggStatePerGroup all_pergroups; /* array of first ->pergroups, than * ->hash_pergroup */ } AggState; /* ---------------- diff --git a/src/include/parser/parse_agg.h b/src/include/parser/parse_agg.h index e13bbbc8f..38fc618ed 100644 --- a/src/include/parser/parse_agg.h +++ b/src/include/parser/parse_agg.h @@ -43,4 +43,12 @@ extern int get_aggregate_argtypes(Aggref* aggref, Oid* inputTypes, int func_max_ extern Oid resolve_aggregate_transtype(Oid aggfuncid, Oid aggtranstype, Oid* inputTypes, int numArguments); +extern void build_aggregate_transfn_expr(Oid *agg_input_types, int agg_num_inputs, int agg_num_direct_inputs, + bool agg_variadic, Oid agg_state_type, Oid agg_input_collation, + Oid transfn_oid, Expr **transfnexpr); + +extern void build_aggregate_finalfn_expr(Oid *agg_input_types, int num_finalfn_inputs, Oid agg_state_type, + Oid agg_result_type, Oid agg_input_collation, Oid finalfn_oid, + Expr **finalfnexpr); + #endif /* PARSE_AGG_H */ diff --git a/src/include/utils/plpgsql.h b/src/include/utils/plpgsql.h index e329af9bf..0d0b7d1fd 100644 --- a/src/include/utils/plpgsql.h +++ b/src/include/utils/plpgsql.h @@ -1084,6 +1084,7 @@ typedef struct PLpgSQL_execstate { /* Runtime execution data */ Datum retval; bool retisnull; + bool is_flt_frame; /* Indicates whether it is a flattened expr frame */ Oid rettype; /* type of current retval */ Datum paramval; bool paramisnull; diff --git a/src/include/vecexecutor/vecnodes.h b/src/include/vecexecutor/vecnodes.h index 7918e7009..3950c6ca1 100644 --- a/src/include/vecexecutor/vecnodes.h +++ b/src/include/vecexecutor/vecnodes.h @@ -92,9 +92,14 @@ typedef struct VecAggInfo { PGFunction* vec_agg_final; } VecAggInfo; +typedef AggStatePerAggData VecAggStatePerAggData; +typedef VecAggStatePerAggData* VecAggStatePerAgg; + typedef struct VecAggState : public AggState { void* aggRun; + VecAggStatePerAgg pervecagg; + VecAggInfo* aggInfo; char* jitted_hashing; /* LLVM function pointer to point to * the codegened hashing function */