agg expr step reduce

This commit is contained in:
cailei19
2023-03-16 23:22:48 -07:00
parent 3d3de895c6
commit 35b92aea89
9 changed files with 661 additions and 517 deletions

View File

@ -192,11 +192,13 @@ static void finalize_aggregates_flattened(AggState *aggstate, AggStatePerAggForF
AggStatePerGroup pergroup, int currentSet);
static void build_pertrans_for_aggref(AggStatePerTrans pertrans, AggState *aggsate, EState *estate, Aggref *aggref,
Oid aggtransfn, Oid aggtranstype, Datum initValue, bool initValueIsNull,
Oid *inputTypes, int numArguments);
Oid *inputTypes, int numArguments, bool isInitNumericSum);
static int find_compatible_peragg(Aggref *newagg, AggState *aggstate, int lastaggno, List **same_input_transnos);
static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg, Oid aggtransfn, Oid aggtranstype,
Datum initValue, bool initValueIsNull, List *possible_matches);
static int find_compatible_pertrans(AggState *aggstate, Oid aggfnOid, Oid *aggtransfnOid, Oid *aggtranstype,
Datum initValue, bool *initValueIsNull, List *possible_matches);
static void exec_lookups_agg_flattened(AggState *aggstate, Agg *node, EState *estate);
static void exec_agg_finalfn_init(AggState *aggstate, Agg *node, AggStatePerAggForFlattenedExpr peragg, AggStatePerTrans pertrans,
Oid *input_types, int num_arguments);
/*
* Switch to phase "newphase", which must either be 0 (to reset) or
@ -3075,10 +3077,8 @@ static void advance_transition_function_flattened(AggState *aggstate, AggStatePe
AggStatePerGroup pergroupstate)
{
FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
int numTransInputs = pertrans->numTransInputs;
MemoryContext oldContext;
Datum newVal;
int i;
if (pertrans->transfn.fn_strict) {
int numTransInputs = pertrans->numTransInputs;
@ -3368,7 +3368,7 @@ static void finalize_aggregates_flattened(AggState *aggstate, AggStatePerAggForF
static void build_pertrans_for_aggref(AggStatePerTrans pertrans, AggState *aggstate, EState *estate, Aggref *aggref,
Oid aggtransfn, Oid aggtranstype, Datum initValue, bool initValueIsNull,
Oid *inputTypes, int numArguments)
Oid *inputTypes, int numArguments, bool isInitNumericSum)
{
int numGroupingSets = Max(aggstate->maxsets, 1);
Expr *transfnexpr;
@ -3385,8 +3385,6 @@ static void build_pertrans_for_aggref(AggStatePerTrans pertrans, AggState *aggst
pertrans->transfn_oid = aggtransfn;
pertrans->initValue = initValue;
pertrans->initValueIsNull = initValueIsNull;
numDirectArgs = list_length(aggref->aggdirectargs);
pertrans->numInputs = numInputs = list_length(aggref->args);
pertrans->aggtranstype = aggtranstype;
@ -3396,6 +3394,13 @@ static void build_pertrans_for_aggref(AggStatePerTrans pertrans, AggState *aggst
pertrans->numTransInputs = numArguments;
}
/* init tranfn (except INT8SUMFUNCOID、NUMERICSUMFUNCOID) */
if ((aggref->aggfnoid == INT8SUMFUNCOID || aggref->aggfnoid == NUMERICSUMFUNCOID) &&
!isInitNumericSum) {
return;
}
numDirectArgs = list_length(aggref->aggdirectargs);
build_aggregate_transfn_expr(inputTypes, numArguments, numDirectArgs, aggref->aggvariadic, aggtranstype,
aggref->inputcollid, aggtransfn, &transfnexpr);
fmgr_info(aggtransfn, &pertrans->transfn);
@ -3521,29 +3526,60 @@ static int find_compatible_peragg(Aggref *newagg, AggState *aggstate, int lastag
return -1;
}
static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg, Oid aggtransfn, Oid aggtranstype,
Datum initValue, bool initValueIsNull, List *transnos)
static int find_compatible_pertrans(AggState *aggstate, Oid aggfnOid, Oid *aggtransfnOid, Oid *aggtranstype,
Datum initValue, bool *initValueIsNull, List *possible_matches)
{
ListCell *lc;
int result = -1;
Oid newtransfnOid = *aggtransfnOid;
Oid newaggtranstype = *aggtranstype;
bool newinitValueIsNull = *initValueIsNull;
foreach (lc, transnos) {
#ifndef ENABLE_MULTIPLE_NODES
numeric_transfn_info_change(aggfnOid, &newtransfnOid, &newaggtranstype);
newinitValueIsNull = numeric_agg_trans_initvalisnull(newtransfnOid, newinitValueIsNull);
#endif
foreach (lc, possible_matches) {
int transno = lfirst_int(lc);
AggStatePerTrans pertrans = &aggstate->pertrans[transno];
Oid newtransfnOidtemp = pertrans->transfn_oid;
Oid newaggtranstypetemp = pertrans->aggtranstype;
bool newinitValueIsNulltemp = pertrans->initValueIsNull;
if (aggtransfn != pertrans->transfn_oid || aggtranstype != pertrans->aggtranstype) {
#ifndef ENABLE_MULTIPLE_NODES
Oid aggfnOidtemp = pertrans->aggref->aggfnoid;
if (aggfnOidtemp == INT8SUMFUNCOID || aggfnOidtemp == NUMERICSUMFUNCOID) {
numeric_transfn_info_change(aggfnOidtemp, &newtransfnOidtemp, &newaggtranstypetemp);
newinitValueIsNulltemp = numeric_agg_trans_initvalisnull(newtransfnOidtemp, newinitValueIsNulltemp);
}
#endif
if (newtransfnOid != newtransfnOidtemp || newaggtranstype != newaggtranstypetemp) {
continue;
}
if (initValueIsNull && pertrans->initValueIsNull) {
return transno;
if (newinitValueIsNull && newinitValueIsNulltemp) {
result = transno;
break;
}
if (!initValueIsNull && !pertrans->initValueIsNull &&
if (!newinitValueIsNull && !newinitValueIsNulltemp &&
datumIsEqual(initValue, pertrans->initValue, pertrans->transtypeByVal, pertrans->transtypeLen)) {
return transno;
result = transno;
break;
}
}
return -1;
#ifndef ENABLE_MULTIPLE_NODES
if (aggfnOid != INT8SUMFUNCOID && aggfnOid != NUMERICSUMFUNCOID) {
*aggtransfnOid = newtransfnOid;
*aggtranstype = newaggtranstype;
*initValueIsNull = newinitValueIsNull;
}
#endif
return result;
}
static void exec_lookups_agg(AggState *aggstate, Agg *node, EState *estate)
@ -3944,6 +3980,99 @@ static void exec_lookups_agg(AggState *aggstate, Agg *node, EState *estate)
return;
}
static void exec_agg_finalfn_init(AggState *aggstate, Agg *node, AggStatePerAggForFlattenedExpr peragg, AggStatePerTrans pertrans,
Oid *input_types, int num_arguments)
{
AclResult aclresult;
Oid finalfn_oid = peragg->finalfn_oid;
Oid collectfn_oid = peragg->collectfn_oid;
Oid transfn_oid = pertrans->transfn_oid;
Aggref *aggref = pertrans->aggref;
Oid aggtranstype = pertrans->aggtranstype;
Expr *finalfnexpr = NULL;
peragg->is_avg = false;
if (finalfn_oid == 1830) {
peragg->is_avg = true;
}
#ifdef ENABLE_MULTIPLE_NODES
/*
* For PGXC final and collection functions are used to combine results at Coordinator,
* disable those for Datanode
*/
if (IS_PGXC_DATANODE) {
if (!u_sess->exec_cxt.under_stream_runtime) {
peragg->finalfn_oid = finalfn_oid = InvalidOid;
collectfn_oid = InvalidOid;
} else {
if (need_adjust_agg_inner_func_type(peraggstate->aggref)) {
if (!node->is_final && !node->single_node)
peragg->finalfn_oid = finalfn_oid = InvalidOid;
if (aggref->aggstage == 0 && !node->is_final && !node->single_node)
collectfn_oid = InvalidOid;
}
}
}
#else
if (IS_STREAM_PLAN || StreamThreadAmI()) {
if (need_adjust_agg_inner_func_type(peragg->aggref)) {
if (!node->is_final && !node->single_node) {
peragg->finalfn_oid = finalfn_oid = InvalidOid;
}
if (aggref->aggstage == 0 && !node->is_final && !node->single_node) {
collectfn_oid = InvalidOid;
}
}
}
#endif /* ENABLE_MULTIPLE_NODES */
/* Check that aggregate owner has permission to call component fns */
{
HeapTuple procTuple;
Oid aggOwner;
procTuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(aggref->aggfnoid));
if (!HeapTupleIsValid(procTuple))
ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmodule(MOD_EXECUTOR),
errmsg("cache lookup failed for aggregate function %u", aggref->aggfnoid)));
aggOwner = ((Form_pg_proc)GETSTRUCT(procTuple))->proowner;
ReleaseSysCache(procTuple);
aclresult = pg_proc_aclcheck(transfn_oid, aggOwner, ACL_EXECUTE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(transfn_oid));
if (OidIsValid(finalfn_oid)) {
aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner, ACL_EXECUTE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(finalfn_oid));
}
if (OidIsValid(collectfn_oid)) {
aclresult = pg_proc_aclcheck(collectfn_oid, aggOwner, ACL_EXECUTE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(collectfn_oid));
}
}
if (AGGKIND_IS_ORDERED_SET(aggref->aggkind)) {
peragg->numFinalArgs = num_arguments + 1;
} else {
peragg->numFinalArgs = 1;
}
/* Initialize any direct-argument expressions */
peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs, (PlanState *)aggstate);
/*
* build expression trees using actual argument & result types for the
* finalfn, if it exists
*/
if (OidIsValid(finalfn_oid)) {
build_aggregate_finalfn_expr(input_types, peragg->numFinalArgs, aggtranstype, aggref->aggtype,
aggref->inputcollid, finalfn_oid, &finalfnexpr);
fmgr_info(finalfn_oid, &peragg->finalfn);
fmgr_info_set_expr((Node *)finalfnexpr, &peragg->finalfn);
}
}
static void exec_lookups_agg_flattened(AggState *aggstate, Agg *node, EState *estate)
{
int aggno = -1;
@ -4006,10 +4135,9 @@ static void exec_lookups_agg_flattened(AggState *aggstate, Agg *node, EState *es
HeapTuple aggTuple;
Form_pg_aggregate aggform;
AclResult aclresult;
Oid transfn_oid, finalfn_oid;
Oid transfn_oid;
Oid collectfn_oid;
Expr *collectfnexpr = NULL;
Expr *finalfnexpr = NULL;
Oid aggtranstype;
Datum textInitVal;
@ -4051,103 +4179,9 @@ static void exec_lookups_agg_flattened(AggState *aggstate, Agg *node, EState *es
aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(aggref->aggfnoid));
transfn_oid = aggform->aggtransfn;
peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
collectfn_oid = aggform->aggcollectfn;
peragg->is_avg = false;
if (finalfn_oid == 1830) {
peragg->is_avg = true;
}
#ifdef ENABLE_MULTIPLE_NODES
/*
* For PGXC final and collection functions are used to combine results at Coordinator,
* disable those for Datanode
*/
if (IS_PGXC_DATANODE) {
if (!u_sess->exec_cxt.under_stream_runtime) {
peragg->finalfn_oid = finalfn_oid = InvalidOid;
collectfn_oid = InvalidOid;
} else {
if (need_adjust_agg_inner_func_type(peraggstate->aggref)) {
if (!node->is_final && !node->single_node)
peragg->finalfn_oid = finalfn_oid = InvalidOid;
if (aggref->aggstage == 0 && !node->is_final && !node->single_node)
collectfn_oid = InvalidOid;
}
}
}
#else
if (IS_STREAM_PLAN || StreamThreadAmI()) {
if (need_adjust_agg_inner_func_type(peragg->aggref)) {
if (!node->is_final && !node->single_node) {
peragg->finalfn_oid = finalfn_oid = InvalidOid;
}
if (aggref->aggstage == 0 && !node->is_final && !node->single_node) {
collectfn_oid = InvalidOid;
}
}
}
#endif /* ENABLE_MULTIPLE_NODES */
/* Check that aggregate owner has permission to call component fns */
{
HeapTuple procTuple;
Oid aggOwner;
procTuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(aggref->aggfnoid));
if (!HeapTupleIsValid(procTuple))
ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmodule(MOD_EXECUTOR),
errmsg("cache lookup failed for aggregate function %u", aggref->aggfnoid)));
aggOwner = ((Form_pg_proc)GETSTRUCT(procTuple))->proowner;
ReleaseSysCache(procTuple);
aclresult = pg_proc_aclcheck(transfn_oid, aggOwner, ACL_EXECUTE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(transfn_oid));
if (OidIsValid(finalfn_oid)) {
aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner, ACL_EXECUTE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(finalfn_oid));
}
if (OidIsValid(collectfn_oid)) {
aclresult = pg_proc_aclcheck(collectfn_oid, aggOwner, ACL_EXECUTE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(collectfn_oid));
}
}
/*
* Get the the number of actual arguments and identify the actual
* datatypes of the aggregate inputs (saved in inputTypes). When
* agg accepts ANY or a polymorphic type, the actual datatype
* could be different from the agg's declared input types.
*/
numArguments = get_aggregate_argtypes(aggref, inputTypes, FUNC_MAX_ARGS);
/*
* When agg accepts ANY or a polymorphic type, resolve actual
* type of transition state
*/
aggtranstype = resolve_aggregate_transtype(aggref->aggfnoid, aggform->aggtranstype, inputTypes, numArguments);
/* Detect how many arguments to pass to the finalfn */
if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
peragg->numFinalArgs = numArguments + 1;
else
peragg->numFinalArgs = 1;
/* Initialize any direct-argument expressions */
peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs, (PlanState *)aggstate);
/*
* build expression trees using actual argument & result types for the
* finalfn, if it exists
*/
if (OidIsValid(finalfn_oid)) {
build_aggregate_finalfn_expr(inputTypes, peragg->numFinalArgs, aggtranstype, aggref->aggtype,
aggref->inputcollid, finalfn_oid, &finalfnexpr);
fmgr_info(finalfn_oid, &peragg->finalfn);
fmgr_info_set_expr((Node *)finalfnexpr, &peragg->finalfn);
}
aggtranstype = aggform->aggtranstype;
peragg->finalfn_oid = aggform->aggfinalfn;
peragg->collectfn_oid = collectfn_oid = aggform->aggcollectfn;
/* get info about the result type's datatype */
get_typlenbyval(aggref->aggtype, &peragg->resulttypeLen, &peragg->resulttypeByVal);
@ -4206,9 +4240,23 @@ static void exec_lookups_agg_flattened(AggState *aggstate, Agg *node, EState *es
if (OidIsValid(collectfn_oid) && (aggref->aggstage > 0 || aggstate->is_final)) {
existing_transno = -1;
} else {
existing_transno = find_compatible_pertrans(aggstate, aggref, transfn_oid, aggtranstype, initValue,
initValueIsNull, same_input_transnos);
existing_transno = find_compatible_pertrans(aggstate, aggref->aggfnoid, &transfn_oid, &aggtranstype, initValue,
&initValueIsNull, same_input_transnos);
}
/*
* Get the the number of actual arguments and identify the actual
* datatypes of the aggregate inputs (saved in inputTypes). When
* agg accepts ANY or a polymorphic type, the actual datatype
* could be different from the agg's declared input types.
*/
numArguments = get_aggregate_argtypes(aggref, inputTypes, FUNC_MAX_ARGS);
/*
* When agg accepts ANY or a polymorphic type, resolve actual
* type of transition state
*/
aggtranstype = resolve_aggregate_transtype(aggref->aggfnoid, aggtranstype, inputTypes, numArguments);
if (existing_transno != -1) {
/*
* Existing compatible trans found, so just point the 'peragg' to
@ -4219,7 +4267,7 @@ static void exec_lookups_agg_flattened(AggState *aggstate, Agg *node, EState *es
} else {
pertrans = &pertransstates[++transno];
build_pertrans_for_aggref(pertrans, aggstate, estate, aggref, transfn_oid, aggtranstype, initValue,
initValueIsNull, inputTypes, numArguments);
initValueIsNull, inputTypes, numArguments, false);
if (OidIsValid(collectfn_oid)) {
fmgr_info(collectfn_oid, &pertrans->collectfn);
pertrans->collectfn.fn_expr = (Node *)collectfnexpr;
@ -4231,6 +4279,14 @@ static void exec_lookups_agg_flattened(AggState *aggstate, Agg *node, EState *es
}
peragg->transno = transno;
}
/* init final (except INT8SUMFUNCOID、NUMERICSUMFUNCOID) */
if (aggref->aggfnoid != INT8SUMFUNCOID && aggref->aggfnoid != NUMERICSUMFUNCOID) {
#ifndef ENABLE_MULTIPLE_NODES
numeric_finalfn_info_change(aggref->aggfnoid, &peragg->finalfn_oid);
#endif
exec_agg_finalfn_init(aggstate, node, peragg, pertrans, inputTypes, numArguments);
}
ReleaseSysCache(aggTuple);
}
@ -4251,6 +4307,32 @@ static void exec_lookups_agg_flattened(AggState *aggstate, Agg *node, EState *es
if (numaggrefs != list_length(aggstate->aggs))
ereport(ERROR, (errcode(ERRCODE_GROUPING_ERROR), errmsg("aggregate function calls cannot be nested")));
/* INT8SUMFUNCOID and NUMERICSUMFUNCOID init */
for (int i = 0; i < aggstate->numaggs; i++) {
Oid inputTypes[FUNC_MAX_ARGS];
int numArguments;
AggStatePerAggForFlattenedExpr peragg = &aggstate->peragg_flattened[i];
AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
Oid aggfnoid = peragg->aggref->aggfnoid;
if (aggfnoid == INT8SUMFUNCOID || aggfnoid == NUMERICSUMFUNCOID) {
#ifndef ENABLE_MULTIPLE_NODES
if (aggstate->numtrans < aggstate->numaggs) {
numeric_transfn_info_change(aggfnoid, &pertrans->transfn_oid, &pertrans->aggtranstype);
pertrans->initValueIsNull = numeric_agg_trans_initvalisnull(pertrans->transfn_oid,
pertrans->initValueIsNull);
numeric_finalfn_info_change(aggfnoid, &peragg->finalfn_oid);
}
#endif
numArguments = get_aggregate_argtypes(peragg->aggref, inputTypes, FUNC_MAX_ARGS);
pertrans->aggtranstype =
resolve_aggregate_transtype(aggfnoid, pertrans->aggtranstype, inputTypes, numArguments);
build_pertrans_for_aggref(pertrans, aggstate, estate, peragg->aggref, pertrans->transfn_oid,
pertrans->aggtranstype, pertrans->initValue, pertrans->initValueIsNull, inputTypes, numArguments, true);
exec_agg_finalfn_init(aggstate, node, peragg, pertrans, inputTypes, numArguments);
}
}
/*
* Build expressions doing all the transition work at once. We build a
* different one for each phase, as the number of transition function