diff --git a/src/bin/gs_guc/cluster_guc.conf b/src/bin/gs_guc/cluster_guc.conf index 753337891..e64ac7bd5 100755 --- a/src/bin/gs_guc/cluster_guc.conf +++ b/src/bin/gs_guc/cluster_guc.conf @@ -196,6 +196,7 @@ enable_global_syscache|bool|0,0|NULL|NULL| gpc_clean_timeout|int|300,86400|NULL|NULL| enable_hashagg|bool|0,0|NULL|NULL| enable_hashjoin|bool|0,0|NULL|NULL| +enable_sortgroup_agg|bool|0,0|NULL|NULL| enable_hdfs_predicate_pushdown|bool|0,0|NULL|NULL| enable_hypo_index|bool|0,0|NULL|NULL| enable_indexonlyscan|bool|0,0|NULL|NULL| diff --git a/src/common/backend/nodes/copyfuncs.cpp b/src/common/backend/nodes/copyfuncs.cpp index 07fbbb484..d756f58d2 100644 --- a/src/common/backend/nodes/copyfuncs.cpp +++ b/src/common/backend/nodes/copyfuncs.cpp @@ -1393,6 +1393,37 @@ static Sort* _copySort(const Sort* from) return newnode; } +/* + * CopySortGroupFields + * + * This function copies the fields of the SortGroup node. + */ +static void CopySortGroupFields(const SortGroup *from, SortGroup *newnode) +{ + CopyPlanFields((const Plan *)from, (Plan *)newnode); + + COPY_SCALAR_FIELD(numCols); + COPY_POINTER_FIELD(sortColIdx, from->numCols * sizeof(AttrNumber)); + COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid)); + COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid)); + COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool)); +} + +/* + * _copySortGroup + */ +static SortGroup *_copySortGroup(const SortGroup *from) +{ + SortGroup *newnode = makeNode(SortGroup); + + /* + * copy node superclass fields + */ + CopySortGroupFields(from, newnode); + + return newnode; +} + /* * _copyGroup */ @@ -7571,6 +7602,9 @@ void* copyObject(const void* from) case T_Sort: retval = _copySort((Sort*)from); break; + case T_SortGroup: + retval = _copySortGroup((SortGroup*)from); + break; case T_Group: retval = _copyGroup((Group*)from); break; diff --git a/src/common/backend/nodes/nodes.cpp b/src/common/backend/nodes/nodes.cpp index 55eb55d12..6d66c4453 100755 --- a/src/common/backend/nodes/nodes.cpp +++ b/src/common/backend/nodes/nodes.cpp @@ -65,6 +65,7 @@ static const TagStr g_tagStrArr[] = {{T_Invalid, "Invalid"}, {T_HashJoin, "HashJoin"}, {T_Material, "Material"}, {T_Sort, "Sort"}, + {T_SortGroup, "SortGroup"}, {T_Group, "Group"}, {T_Agg, "Agg"}, {T_WindowAgg, "WindowAgg"}, @@ -132,6 +133,7 @@ static const TagStr g_tagStrArr[] = {{T_Invalid, "Invalid"}, {T_HashJoinState, "HashJoinState"}, {T_MaterialState, "MaterialState"}, {T_SortState, "SortState"}, + {T_SortGroupState, "SortGroupState"}, {T_GroupState, "GroupState"}, {T_AggState, "AggState"}, {T_WindowAggState, "WindowAggState"}, diff --git a/src/common/backend/nodes/outfuncs.cpp b/src/common/backend/nodes/outfuncs.cpp index 5991c615b..6dd70d313 100755 --- a/src/common/backend/nodes/outfuncs.cpp +++ b/src/common/backend/nodes/outfuncs.cpp @@ -1943,6 +1943,41 @@ static void _outSort(StringInfo str, Sort* node) out_mem_info(str, &node->mem_info); } +static void _outSortGroup(StringInfo str, SortGroup* node) +{ + int i; + + WRITE_NODE_TYPE("SORTGROUP"); + + _outPlanInfo(str, (Plan*)node); + + WRITE_INT_FIELD(numCols); + + appendStringInfo(str, " :sortColIdx"); + for (i = 0; i < node->numCols; i++) { + appendStringInfo(str, " %d", node->sortColIdx[i]); + } + + WRITE_GRPOP_FIELD(sortOperators, numCols); + + appendStringInfo(str, " :collations"); + for (i = 0; i < node->numCols; i++) { + appendStringInfo(str, " %u", node->collations[i]); + } + + for (i = 0; i < node->numCols; i++) { + if (node->collations[i] >= FirstBootstrapObjectId && 
IsStatisfyUpdateCompatibility(node->collations[i])) { + appendStringInfo(str, " :collname "); + _outToken(str, get_collation_name(node->collations[i])); + } + } + + appendStringInfo(str, " :nullsFirst"); + for (i = 0; i < node->numCols; i++) { + appendStringInfo(str, " %s", booltostr(node->nullsFirst[i])); + } +} + static void _outUnique(StringInfo str, Unique* node) { int i; @@ -6179,6 +6214,9 @@ static void _outNode(StringInfo str, const void* obj) case T_Sort: _outSort(str, (Sort*)obj); break; + case T_SortGroup: + _outSortGroup(str, (SortGroup*)obj); + break; case T_Unique: _outUnique(str, (Unique*)obj); break; diff --git a/src/common/backend/nodes/readfuncs.cpp b/src/common/backend/nodes/readfuncs.cpp index f84bbae59..8ab7ea6e2 100755 --- a/src/common/backend/nodes/readfuncs.cpp +++ b/src/common/backend/nodes/readfuncs.cpp @@ -4024,6 +4024,25 @@ static Sort* _readSort(Sort* local_node) READ_DONE(); } +static SortGroup* _readSortGroup(SortGroup* local_node) +{ + READ_LOCALS_NULL(SortGroup); + READ_TEMP_LOCALS(); + + // Read Plan + _readPlan(&local_node->plan); + + READ_INT_FIELD(numCols); + READ_ATTR_ARRAY(sortColIdx, numCols); + READ_OPERATOROID_ARRAY(sortOperators, numCols); + READ_OID_ARRAY(collations, numCols); + + READ_OID_ARRAY_BYCONVERT(collations, numCols); + + READ_BOOL_ARRAY(nullsFirst, numCols); + READ_DONE(); +} + static Unique* _readUnique(Unique* local_node) { READ_LOCALS_NULL(Unique); @@ -6354,6 +6373,8 @@ Node* parseNodeString(void) return_value = _readSimpleSort(NULL); } else if (MATCH("SORT", 4)) { return_value = _readSort(NULL); + } else if (MATCH("SORTGROUP", 9)) { + return_value = _readSortGroup(NULL); } else if (MATCH("UNIQUE", 6)) { return_value = _readUnique(NULL); } else if (MATCH("PLANNEDSTMT", 11)) { diff --git a/src/common/backend/parser/parse_hint.cpp b/src/common/backend/parser/parse_hint.cpp index 248c4ba90..efacffd1f 100755 --- a/src/common/backend/parser/parse_hint.cpp +++ b/src/common/backend/parser/parse_hint.cpp @@ -3825,8 +3825,7 @@ bool permit_predpush(PlannerInfo *root) return !predpushHint->negative; } -const unsigned int G_NUM_SET_HINT_WHITE_LIST = 39; -const char* G_SET_HINT_WHITE_LIST[G_NUM_SET_HINT_WHITE_LIST] = { +const char* G_SET_HINT_WHITE_LIST[] = { /* keep in the ascending alphabetical order of frequency */ (char*)"best_agg_plan", (char*)"cost_weight_index", @@ -3854,6 +3853,7 @@ const char* G_SET_HINT_WHITE_LIST[G_NUM_SET_HINT_WHITE_LIST] = { (char*)"enable_remotesort", (char*)"enable_seqscan", (char*)"enable_sort", + (char*)"enable_sortgroup_agg", (char*)"enable_stream_operator", (char*)"enable_stream_recursive", (char*)"enable_tidscan", @@ -3868,6 +3868,8 @@ const char* G_SET_HINT_WHITE_LIST[G_NUM_SET_HINT_WHITE_LIST] = { (char*)"try_vector_engine_strategy", (char*)"var_eq_const_selectivity"}; +const unsigned int G_NUM_SET_HINT_WHITE_LIST = sizeof(G_SET_HINT_WHITE_LIST) / sizeof(G_SET_HINT_WHITE_LIST[0]); + static int param_str_cmp(const void *s1, const void *s2) { const char *key = (const char *)s1; diff --git a/src/common/backend/utils/misc/guc/guc_sql.cpp b/src/common/backend/utils/misc/guc/guc_sql.cpp index cbb1b8ccc..c4517892d 100755 --- a/src/common/backend/utils/misc/guc/guc_sql.cpp +++ b/src/common/backend/utils/misc/guc/guc_sql.cpp @@ -749,6 +749,17 @@ static void InitSqlConfigureNamesBool() NULL, NULL, NULL}, + {{"enable_sortgroup_agg", + PGC_USERSET, + NODE_ALL, + QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's use of sort group aggregation plans."), + NULL}, + &u_sess->attr.attr_sql.enable_sortgroup_agg, + 
false, + NULL, + NULL, + NULL}, {{"enable_material", PGC_USERSET, NODE_ALL, diff --git a/src/common/backend/utils/misc/postgresql_single.conf.sample b/src/common/backend/utils/misc/postgresql_single.conf.sample index 278ae73e2..46dc2fb04 100644 --- a/src/common/backend/utils/misc/postgresql_single.conf.sample +++ b/src/common/backend/utils/misc/postgresql_single.conf.sample @@ -347,6 +347,7 @@ hot_standby = on # "on" allows queries during recovery #enable_bitmapscan = on #enable_hashagg = on +#enable_sortgroup_agg = off #enable_hashjoin = on #enable_indexscan = on #enable_indexonlyscan = on diff --git a/src/common/backend/utils/sort/logtape.cpp b/src/common/backend/utils/sort/logtape.cpp index 4709d0b88..0f084ba52 100644 --- a/src/common/backend/utils/sort/logtape.cpp +++ b/src/common/backend/utils/sort/logtape.cpp @@ -191,13 +191,14 @@ struct LogicalTapeSet { /* The array of logical tapes. */ int nTapes; /* # of logical tapes in set */ - LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER]; /* has nTapes nentries */ + LogicalTape *tapes; /* has nTapes nentries */ }; static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer); static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer); static long ltsGetFreeBlock(LogicalTapeSet *lts); static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum); +static void ltsInitTape(LogicalTape *lt); static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, SharedFileSet *fileset); /* @@ -462,14 +463,13 @@ static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, SharedF LogicalTapeSet* LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset, int worker) { LogicalTapeSet *lts; - LogicalTape *lt; int i; /* * Create top-level struct including per-tape LogicalTape structs. */ Assert(ntapes > 0); - lts = (LogicalTapeSet *) palloc(offsetof(LogicalTapeSet, tapes) + ntapes * sizeof(LogicalTape)); + lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet)); lts->nBlocksAllocated = 0L; lts->nBlocksWritten = 0L; lts->nHoleBlocks = 0L; @@ -479,6 +479,7 @@ LogicalTapeSet* LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSe lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long)); lts->nFreeBlocks = 0; lts->nTapes = ntapes; + lts->tapes = (LogicalTape *) palloc(ntapes * sizeof(LogicalTape)); /* * Initialize per-tape structs. Note we allocate the I/O buffer and the @@ -487,20 +488,7 @@ LogicalTapeSet* LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSe * of tapes needed. */ for (i = 0; i < ntapes; i++) { - lt = <s->tapes[i]; - lt->writing = true; - lt->frozen = false; - lt->dirty = false; - lt->firstBlockNumber = -1L; - lt->curBlockNumber = -1L; - lt->nextBlockNumber = -1L; - lt->offsetBlockNumber = 0L; - lt->buffer = NULL; - lt->buffer_size = 0; - /* palloc() larger than MaxAllocSize would fail */ - lt->max_size = MaxAllocSize; - lt->pos = 0; - lt->nbytes = 0; + ltsInitTape(<s->tapes[i]); } /* @@ -851,6 +839,43 @@ void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share) } } +/* + * Initialize per-tape struct. Note we allocate the I/O buffer lazily. 
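+ * The same initialization is reused by LogicalTapeSetExtend() when additional
+ * tapes are added to an existing tape set.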
+ */ +static void ltsInitTape(LogicalTape *lt) +{ + lt->writing = true; + lt->frozen = false; + lt->dirty = false; + lt->firstBlockNumber = -1L; + lt->curBlockNumber = -1L; + lt->nextBlockNumber = -1L; + lt->offsetBlockNumber = 0L; + lt->buffer = NULL; + lt->buffer_size = 0; + /* palloc() larger than MaxAllocSize would fail */ + lt->max_size = MaxAllocSize; + lt->pos = 0; + lt->nbytes = 0; +} + +/* + * Add additional tapes to this tape set. Not intended to be used when any + * tapes are frozen. + */ +void LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional) +{ + int i; + int nTapesOrig = lts->nTapes; + + lts->nTapes += nAdditional; + + lts->tapes = (LogicalTape *)repalloc(lts->tapes, lts->nTapes * sizeof(LogicalTape)); + + for (i = nTapesOrig; i < lts->nTapes; i++) + ltsInitTape(<s->tapes[i]); +} + /* * Backspace the tape a given number of bytes. (We also support a more * general seek interface, see below.) diff --git a/src/gausskernel/optimizer/commands/explain.cpp b/src/gausskernel/optimizer/commands/explain.cpp index 6fdb8293a..8356ae563 100755 --- a/src/gausskernel/optimizer/commands/explain.cpp +++ b/src/gausskernel/optimizer/commands/explain.cpp @@ -166,6 +166,7 @@ static void show_merge_append_keys(MergeAppendState* mstate, List* ancestors, Ex static void show_merge_sort_keys(PlanState* state, List* ancestors, ExplainState* es); static void show_startwith_pseudo_entries(PlanState* state, List* ancestors, ExplainState* es); static void show_sort_info(SortState* sortstate, ExplainState* es); +static void show_sort_group_info(SortGroupState *state, ExplainState *es); static void show_hash_info(HashState* hashstate, ExplainState* es); static void show_vechash_info(VecHashJoinState* hashstate, ExplainState* es); static void show_tidbitmap_info(BitmapHeapScanState *planstate, ExplainState *es); @@ -2882,6 +2883,13 @@ static void ExplainNode( show_sort_info((SortState*)planstate, es); show_llvm_info(planstate, es); break; + case T_SortGroup: { + SortGroup *plan = (SortGroup *)planstate->plan; + show_sort_group_keys(planstate, "Sorted Group Key", plan->numCols, plan->sortColIdx, plan->sortOperators, + plan->collations, plan->nullsFirst, ancestors, es); + show_sort_group_info(castNode(SortGroupState, planstate), es); + break; + } case T_MergeAppend: show_merge_append_keys((MergeAppendState*)planstate, ancestors, es); break; @@ -4327,6 +4335,28 @@ static void show_sort_info(SortState* sortstate, ExplainState* es) } } +/* + * If it's EXPLAIN ANALYZE, show stats for a SortGroupState + */ +static void show_sort_group_info(SortGroupState *state, ExplainState *es) +{ + if (!es->analyze) + return; + if (state->sort_Done && state->state != NULL) { + int64 spaceUsed = Max(1, state->spaceUsed / 1024); + if (es->format == EXPLAIN_FORMAT_TEXT) { + if (es->str->len == 0 || es->str->data[es->str->len - 1] == '\n') + appendStringInfoSpaces(es->str, es->indent * 2); + appendStringInfo(es->str, "Space Used: %s : " INT64_FORMAT "kB\n", state->spaceType, + spaceUsed); + } + else { + ExplainPropertyInteger("Sort Space Used(kB)", spaceUsed, es); + ExplainPropertyText("Sort Space Type", state->spaceType, es); + } + } +} + template static void show_datanode_hash_info(ExplainState *es, int nbatch, int nbuckets_original, int nbatch_original, int nbuckets, long spacePeakKb) diff --git a/src/gausskernel/optimizer/path/costsize.cpp b/src/gausskernel/optimizer/path/costsize.cpp index e9d4ae167..5a7e7ba12 100755 --- a/src/gausskernel/optimizer/path/costsize.cpp +++ b/src/gausskernel/optimizer/path/costsize.cpp @@ 
-2281,6 +2281,120 @@ void cost_sort(Path* path, List* pathkeys, Cost input_cost, double tuples, int w
         (g_instance.cost_cxt.disable_cost_enlarge_factor * g_instance.cost_cxt.disable_cost_enlarge_factor);
 }
 
+/*
+ * cost_groupsort
+ *     Determines and returns the cost of sorting a relation using groupsort,
+ *     not including the cost of reading the input data.
+ */
+static void cost_groupsort(PlannerInfo *root, Cost *startup_cost, Cost *run_cost, double *tuples, int width, Cost comparison_cost,
+    int sort_mem, double dNumGroups)
+{
+    double totalTuples = *tuples;
+    double input_bytes = relation_byte_size(totalTuples, width, false);
+    double output_bytes;
+    long sort_mem_bytes = sort_mem * 1024L;
+    double remainTuples;
+    double remainGroups;
+    double maxGroups = (double)sort_mem_bytes / BLCKSZ;
+    Cost discard_costs = 0;
+    Cost cpu_costs = 0;
+    Cost disk_costs = 0;
+
+    /* Include the default cost-per-comparison */
+    comparison_cost += 2.0 * u_sess->attr.attr_sql.cpu_operator_cost;
+
+    if (0 < root->limit_tuples && root->limit_tuples < dNumGroups) {
+        /* estimate how many tuples are discarded directly */
+        remainGroups = root->limit_tuples;
+        double ratio = (remainGroups / dNumGroups);
+        remainTuples = ratio * totalTuples;
+        output_bytes = ratio * input_bytes;
+    }
+    else {
+        remainGroups = dNumGroups;
+        remainTuples = totalTuples;
+        output_bytes = input_bytes;
+    }
+
+    /* mustn't do log(0) */
+    if (remainGroups < 2.0)
+        remainGroups = 2.0;
+    if (remainTuples < 2.0)
+        remainTuples = 2.0;
+
+    if (remainGroups > maxGroups || remainGroups * width > sort_mem_bytes) {
+        /*
+         * too many groups, or required memory exceeds work_mem,
+         * don't consider this plan
+         */
+        *startup_cost += g_instance.cost_cxt.disable_cost * g_instance.cost_cxt.disable_cost_enlarge_factor;
+    }
+
+    if (remainTuples < totalTuples) {
+        double discard_tuples = totalTuples - remainTuples;
+        /*
+         * Assume 0.9 of tuples are discarded directly,
+         * 0.1 tuples are inserted into skiplist first, but discarded by LIMIT N later
+         */
+        discard_costs = 0.9 * discard_tuples * comparison_cost + /* discarded directly */
+            0.1 * discard_tuples * comparison_cost * LOG2(remainGroups); /* discarded by LIMIT N */
+    }
+
+    if (output_bytes > sort_mem_bytes) {
+        /*
+         * We'll have to use a disk-based sort of all the tuples
+         */
+        double pagesPerGroup = ceil(input_bytes / remainGroups / BLCKSZ);
+        double npages = pagesPerGroup * remainGroups;
+        double npageaccesses;
+
+        /*
+         * CPU costs
+         *
+         * Assume about NUMBER_TUPLES * log2(NUMBER_GROUPS) comparisons
+         */
+        cpu_costs += comparison_cost * remainTuples * LOG2(remainGroups);
+
+        /* Disk costs */
+        npageaccesses = 2.0 * npages;
+        /* Assume 3/4ths of accesses are sequential, 1/4th are not */
+        disk_costs += npageaccesses * (u_sess->attr.attr_sql.seq_page_cost * 0.75 +
+            u_sess->attr.attr_sql.random_page_cost * 0.25);
+    } else {
+        /* We'll use plain groupsort on all the input tuples */
+        cpu_costs += comparison_cost * remainTuples * LOG2(remainGroups);
+    }
+
+    *startup_cost = discard_costs + cpu_costs + disk_costs;
+    /*
+     * Also charge a small amount (arbitrarily set equal to operator cost) per
+     * extracted tuple.
+     */
+
+    *run_cost = u_sess->attr.attr_sql.cpu_operator_cost * remainTuples;
+    *tuples = remainTuples;
+}
+
+/*
+ * cost_sort_group
+ *     Determines and returns the cost of sorting a relation using groupsort,
+ *     including the cost of reading the input data.
+ */
+void cost_sort_group(Path *path, PlannerInfo *root, Cost input_cost, double tuples, int width,
+    Cost comparison_cost, int sort_mem, double dNumGroups)
+{
+    Cost startup_cost = 0;
+    Cost run_cost = 0;
+
+    cost_groupsort(root, &startup_cost, &run_cost, &tuples, width, comparison_cost, sort_mem, dNumGroups);
+
+    startup_cost += input_cost;
+
+    path->rows = tuples;
+    path->startup_cost = startup_cost;
+    path->total_cost = startup_cost + run_cost;
+}
+
 /*
  * compute_sort_disk_cost
  *     compute disk spill cost of sort operator
@@ -2601,14 +2715,17 @@ void cost_agg(Path* path, PlannerInfo* root, AggStrategy aggstrategy, const AggC
         /* we aren't grouping */
         total_cost = startup_cost + u_sess->attr.attr_sql.cpu_tuple_cost;
         output_tuples = 1;
-    } else if (aggstrategy == AGG_SORTED) {
+    } else if (aggstrategy == AGG_SORTED || aggstrategy == AGG_SORT_GROUP) {
         /* Here we are able to deliver output on-the-fly */
         startup_cost = input_startup_cost;
         total_cost = input_total_cost;
         /* calcs phrased this way to match HASHED case, see note above */
         total_cost += aggcosts->transCost.startup;
         total_cost += aggcosts->transCost.per_tuple * input_tuples;
-        total_cost += (u_sess->attr.attr_sql.cpu_operator_cost * numGroupCols) * input_tuples;
+        if (aggstrategy != AGG_SORT_GROUP) {
+            /* AGG_SORT_GROUP does not need to perform grouping comparisons */
+            total_cost += (u_sess->attr.attr_sql.cpu_operator_cost * numGroupCols) * input_tuples;
+        }
         total_cost += aggcosts->finalCost * numGroups;
         total_cost += u_sess->attr.attr_sql.cpu_tuple_cost * numGroups;
         output_tuples = numGroups;
diff --git a/src/gausskernel/optimizer/plan/createplan.cpp b/src/gausskernel/optimizer/plan/createplan.cpp
index 932a85b25..86f7ffff9 100755
--- a/src/gausskernel/optimizer/plan/createplan.cpp
+++ b/src/gausskernel/optimizer/plan/createplan.cpp
@@ -6980,6 +6980,48 @@ Sort* make_sort(PlannerInfo* root, Plan* lefttree, int numCols, AttrNumber* sort
     return node;
 }
 
+/*
+ * make_sortgroup --- basic routine to build a SortGroup plan node
+ */
+SortGroup* make_sortgroup(PlannerInfo* root, Plan* lefttree, int numCols, AttrNumber* sortColIdx, Oid* sortOperators,
+    Oid* collations, bool* nullsFirst, double dNumGroup)
+{
+    SortGroup* node = makeNode(SortGroup);
+    Plan* plan = &node->plan;
+    Path sort_path; /* dummy for result of cost_sort_group */
+
+    copy_plan_costsize(plan, lefttree); /* only care about copying size */
+
+#ifdef STREAMPLAN
+    inherit_plan_locator_info((Plan*)node, lefttree);
+#endif
+
+    cost_sort_group(&sort_path,
+        root,
+        lefttree->total_cost,
+        lefttree->plan_rows,
+        lefttree->plan_width,
+        0.0,
+        u_sess->opt_cxt.op_work_mem,
+        dNumGroup);
+
+    plan->startup_cost = sort_path.startup_cost;
+    plan->total_cost = sort_path.total_cost;
+    plan->plan_rows = sort_path.rows;
+    plan->targetlist = lefttree->targetlist;
+    plan->qual = NIL;
+    plan->lefttree = lefttree;
+    plan->righttree = NULL;
+    plan->hasUniqueResults = lefttree->hasUniqueResults;
+    plan->dop = lefttree->dop;
+    node->numCols = numCols;
+    node->sortColIdx = sortColIdx;
+    node->sortOperators = sortOperators;
+    node->collations = collations;
+    node->nullsFirst = nullsFirst;
+    return node;
+}
+
 /*
  * prepare_sort_from_pathkeys
  *     Prepare to sort according to given pathkeys
@@ -7418,6 +7460,54 @@ Sort* make_sort_from_groupcols(PlannerInfo* root, List* groupcls, AttrNumber* gr
     return make_sort(root, lefttree, numsortkeys, sortColIdx, sortOperators, collations, nullsFirst, -1.0);
 }
 
+
+/*
+ * make_sort_group_from_groupcols
+ *     Create SortGroup plan
+ *
+ * 'groupcls' is the list of SortGroupClauses
+ * 'grpColIdx' gives the column numbers to use
+ *
+ */
+SortGroup* make_sort_group_from_groupcols(PlannerInfo* root, List* groupcls, AttrNumber* grpColIdx, Plan* lefttree, double dNumGroup)
+{
+    List* sub_tlist = lefttree->targetlist;
+    ListCell* l = NULL;
+    int numsortkeys;
+    AttrNumber* sortColIdx = NULL;
+    Oid* sortOperators = NULL;
+    Oid* collations = NULL;
+    bool* nullsFirst = NULL;
+
+    /* Convert list-ish representation to arrays wanted by executor */
+    numsortkeys = list_length(groupcls);
+    sortColIdx = (AttrNumber*)palloc(numsortkeys * sizeof(AttrNumber));
+    sortOperators = (Oid*)palloc(numsortkeys * sizeof(Oid));
+    collations = (Oid*)palloc(numsortkeys * sizeof(Oid));
+    nullsFirst = (bool*)palloc(numsortkeys * sizeof(bool));
+
+    numsortkeys = 0;
+    foreach (l, groupcls) {
+        SortGroupClause* grpcl = (SortGroupClause*)lfirst(l);
+        TargetEntry* tle = get_tle_by_resno(sub_tlist, grpColIdx[numsortkeys]);
+
+        if (tle == NULL) {
+            /* report an error if we cannot find the TargetEntry for a SortGroupClause */
+            ereport(ERROR,
+                (errmodule(MOD_OPT),
+                    (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+                        errmsg("fail to find TargetEntry referenced by SortGroupClause"))));
+        }
+
+        sortColIdx[numsortkeys] = tle->resno;
+        sortOperators[numsortkeys] = grpcl->sortop;
+        collations[numsortkeys] = exprCollation((Node*)tle->expr);
+        nullsFirst[numsortkeys] = grpcl->nulls_first;
+        numsortkeys++;
+    }
+    return make_sortgroup(root, lefttree, numsortkeys, sortColIdx, sortOperators, collations, nullsFirst, dNumGroup);
+}
+
 /*
  * make_sort_from_targetlist
  *     Create sort plan to sort based on input plan's targetlist
@@ -7626,6 +7716,12 @@ Agg* make_agg(PlannerInfo* root, List* tlist, List* qual, AggStrategy aggstrateg
         node->single_node = true;
     }
 
+    if (aggstrategy == AGG_SORT_GROUP && lefttree->type != T_SortGroup) {
+        /* subnode is not a SortGroup, fall back to AGG_SORTED */
+        aggstrategy = AGG_SORTED;
+        root->consider_sortgroup_agg = false;
+    }
+
     node->aggstrategy = aggstrategy;
     node->numCols = numGroupCols;
     node->grpColIdx = grpColIdx;
@@ -9140,6 +9236,7 @@ bool is_projection_capable_plan(Plan* plan)
         case T_Hash:
         case T_Material:
         case T_Sort:
+        case T_SortGroup:
         case T_Unique:
         case T_SetOp:
         case T_LockRows:
@@ -10564,6 +10661,7 @@ bool is_projection_capable_path(Path *path)
         case T_Hash:
         case T_Material:
         case T_Sort:
+        case T_SortGroup:
         case T_Unique:
         case T_SetOp:
         case T_LockRows:
diff --git a/src/gausskernel/optimizer/plan/planmain.cpp b/src/gausskernel/optimizer/plan/planmain.cpp
index b4d9ae25a..1410b7a7c 100755
--- a/src/gausskernel/optimizer/plan/planmain.cpp
+++ b/src/gausskernel/optimizer/plan/planmain.cpp
@@ -503,8 +503,8 @@ void update_tuple_fraction(PlannerInfo* root,
             !pathkeys_contained_in(root->window_pathkeys, root->group_pathkeys))
             tuple_fraction = 0.0;
 
-        /* In any case, limit_tuples shouldn't be specified here */
-        AssertEreport(limit_tuples < 0,
+        /* if we do not consider sort group agg, limit_tuples shouldn't be specified here */
+        AssertEreport(root->consider_sortgroup_agg || limit_tuples < 0,
             MOD_OPT,
             "invalid limit tuples when estimating the number of result groups in grouping process.");
     } else if (parse->hasAggs || root->hasHavingQual || parse->groupingSets) {
diff --git a/src/gausskernel/optimizer/plan/planner.cpp b/src/gausskernel/optimizer/plan/planner.cpp
index 38a2276ef..b603dbb07 100755
--- a/src/gausskernel/optimizer/plan/planner.cpp
+++ b/src/gausskernel/optimizer/plan/planner.cpp
@@ -2738,6 +2738,7 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction)
planner_targets->scanjoin_contains_srfs = false; root->planner_targets = planner_targets; } + root->consider_sortgroup_agg = u_sess->attr.attr_sql.enable_sortgroup_agg; /* * Apply memory context for generate plan in optimizer. @@ -2764,6 +2765,9 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction) limit_tuples = (double)count_est + (double)offset_est; } + if (limit_tuples < 0) + root->consider_sortgroup_agg = false; + if (parse->setOperations) { List* set_sortclauses = NIL; @@ -2885,8 +2889,10 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction) /* Preprocess GROUP BY clause, if any */ /* Preprocess Grouping set, if any */ - if (parse->groupingSets) + if (parse->groupingSets) { parse->groupingSets = expand_grouping_sets(parse->groupingSets, -1); + root->consider_sortgroup_agg = false; + } if (parse->groupClause) { ListCell* lc = NULL; @@ -2967,6 +2973,7 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction) upsertClause->updateTlist = preprocess_upsert_targetlist(upsertClause->updateTlist, linitial_int(parse->resultRelations), parse->rtable); + root->consider_sortgroup_agg = false; } /* * Locate any window functions in the tlist. (We don't need to look @@ -2983,6 +2990,7 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction) } else { parse->hasWindowFuncs = false; } + root->consider_sortgroup_agg = false; } /* @@ -3035,15 +3043,19 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction) u_sess->opt_cxt.query_dop = dop_tmp; } + if (parse->distinctClause || parse->havingQual || parse->hasWindowFuncs || root->hasHavingQual || parse->hasTargetSRFs) + root->consider_sortgroup_agg = false; + /* * Figure out whether there's a hard limit on the number of rows that * query_planner's result subplan needs to return. Even if we know a * hard limit overall, it doesn't apply if the query has any * grouping/aggregation operations, or SRFs in the tlist. */ - if (parse->groupClause || parse->groupingSets || parse->distinctClause || parse->hasAggs || + if (!root->consider_sortgroup_agg && + (parse->groupClause || parse->groupingSets || parse->distinctClause || parse->hasAggs || parse->hasWindowFuncs || root->hasHavingQual || - (parse->is_flt_frame && parse->hasTargetSRFs)) + (parse->is_flt_frame && parse->hasTargetSRFs))) sub_limit_tuples = -1.0; else sub_limit_tuples = limit_tuples; @@ -3257,6 +3269,7 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction) * right tlist, and it has no sort order. 
*/ current_pathkeys = NIL; + root->consider_sortgroup_agg = false; } else { /* * Normal case --- create a plan according to query_planner's @@ -3872,11 +3885,18 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction) if (need_sort_for_grouping && partial_plan == NULL && (IS_STREAM_PLAN || parse->groupingSets == NULL)) { - result_plan = - (Plan*)make_sort_from_groupcols(root, parse->groupClause, groupColIdx, result_plan); + if (root->consider_sortgroup_agg) { + result_plan = (Plan*) make_sort_group_from_groupcols(root, parse->groupClause, groupColIdx, result_plan, dNumGroups[0]); + } else { + result_plan = + (Plan*)make_sort_from_groupcols(root, parse->groupClause, groupColIdx, result_plan); + } current_pathkeys = root->group_pathkeys; } - aggstrategy = AGG_SORTED; + if (root->consider_sortgroup_agg) + aggstrategy = AGG_SORT_GROUP; + else + aggstrategy = AGG_SORTED; } else { if (IS_STREAM_PLAN && count_distinct_optimization) { @@ -6141,6 +6161,12 @@ static void compute_hashed_path_cost(PlannerInfo* root, double limit_tuples, int bool needs_stream = false; bool need_second_hashagg = false; + if (!u_sess->attr.attr_sql.enable_hashagg) { + copy_path_costsize(hashed_p, cheapest_path); + hashed_p->total_cost = hashed_p->startup_cost = g_instance.cost_cxt.disable_cost; + return; + } + /* * See if the estimated cost is no more than doing it the other way. While * avoiding the need for sorted input is usually a win, the fact that the @@ -6643,6 +6669,14 @@ static void compute_sorted_path_cost(PlannerInfo* root, double limit_tuples, int if (!pathkeys_contained_in(root->group_pathkeys, current_pathkeys)) { current_pathkeys = root->group_pathkeys; need_sort_for_grouping = true; + + if (!u_sess->attr.attr_sql.enable_sort) { + sorted_p->total_cost = sorted_p->startup_cost = g_instance.cost_cxt.disable_cost; + return; + } + } else { + /* already sorted, never consider group sorting */ + root->consider_sortgroup_agg = false; } bool is_replicate = (!IS_STREAM_PLAN || @@ -6749,6 +6783,57 @@ static void compute_sorted_path_cost(PlannerInfo* root, double limit_tuples, int ereport(DEBUG1, (errmodule(MOD_OPT_AGG), (errmsg("[final sorted path total cost]: %lf", sorted_p->total_cost)))); } +/* + * compute_sort_group_path_cost: compute sort group path cost for choose. 
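+ * We cost a SortGroup node plus an AGG_SORT_GROUP aggregation on top of the
+ * cheapest path, so choose_hashed_grouping() can compare this alternative
+ * with the plain sorted and hashed paths.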
+ * + */ +static void compute_sort_group_path_cost(PlannerInfo *root, double limit_tuples, int path_width, Path *cheapest_path, + const double dNumGroup, AggClauseCosts *agg_costs, Size hashentrysize, + List *target_pathkeys, Path *sorted_p) +{ + Query *parse = root->parse; + int numGroupCols = list_length(parse->groupClause); + List *current_pathkeys; + + copy_path_costsize(sorted_p, cheapest_path); + current_pathkeys = cheapest_path->pathkeys; + + + if (!u_sess->attr.attr_sql.enable_sortgroup_agg || + !root->consider_sortgroup_agg || + pathkeys_contained_in(root->group_pathkeys, current_pathkeys)) + { + /* already sorted, or sort group agg is disabled, never consider group sorting */ + root->consider_sortgroup_agg = false; + sorted_p->total_cost = sorted_p->startup_cost = g_instance.cost_cxt.disable_cost; + return; + } + else { + current_pathkeys = root->group_pathkeys; + } + + cost_sort_group(sorted_p, + root, + cheapest_path->total_cost, + PATH_LOCAL_ROWS(cheapest_path), + path_width, + 0.0, + u_sess->opt_cxt.op_work_mem, + dNumGroup); + cost_agg(sorted_p, + root, + AGG_SORT_GROUP, + agg_costs, + numGroupCols, + dNumGroup, + sorted_p->startup_cost, + sorted_p->total_cost, + sorted_p->rows, + path_width, + path_width, + hashentrysize); +} + /* * Executor doesn't support hashed aggregation with DISTINCT or ORDER BY * aggregates. (Doing so would imply storing *all* the input values in @@ -6800,7 +6885,7 @@ static bool choose_hashed_grouping(PlannerInfo* root, double tuple_fraction, dou bool can_sort = false; Size hashentrysize; List* target_pathkeys = NIL; - Path hashed_p, sorted_p; + Path hashed_p, sorted_p, sort_group_p; errno_t rc = EOK; can_hash = grouping_is_can_hash(parse, agg_costs); @@ -6863,15 +6948,10 @@ static bool choose_hashed_grouping(PlannerInfo* root, double tuple_fraction, dou #endif } - /* Prefer hashagg or sort when guc is set */ - if (!u_sess->attr.attr_sql.enable_hashagg && u_sess->attr.attr_sql.enable_sort) - return false; - if (!u_sess->attr.attr_sql.enable_sort && u_sess->attr.attr_sql.enable_hashagg) - return true; - /* If guc plan_mode_seed is random plan, we should choose random path between AGG_HASHED and AGG_SORTED */ if (u_sess->attr.attr_sql.plan_mode_seed != OPTIMIZE_PLAN) { int random_option = choose_random_option(lengthof(g_agglist)); + root->consider_sortgroup_agg = false; return (AGG_HASHED == g_agglist[random_option]); } @@ -6893,6 +6973,8 @@ static bool choose_hashed_grouping(PlannerInfo* root, double tuple_fraction, dou securec_check(rc, "\0", "\0"); rc = memset_s(&sorted_p, sizeof(sorted_p), 0, sizeof(sorted_p)); securec_check(rc, "\0", "\0"); + rc = memset_s(&sort_group_p, sizeof(sort_group_p), 0, sizeof(sort_group_p)); + securec_check(rc, "\0", "\0"); /* compute the minimal total cost for hash path. */ Distribution* distribution = ng_get_dest_distribution(cheapest_path); @@ -6919,12 +7001,30 @@ static bool choose_hashed_grouping(PlannerInfo* root, double tuple_fraction, dou target_pathkeys, &sorted_p); + compute_sort_group_path_cost(root, + limit_tuples, + path_width, + cheapest_path, + dNumGroups[0], + agg_costs, + hashentrysize, + target_pathkeys, + &sort_group_p); + /* * Now make the decision using the top-level tuple fraction. First we * have to convert an absolute count (LIMIT) into fractional form. */ tuple_fraction = tuple_fraction >= 1.0 ? 
tuple_fraction / dNumGroups[0] : tuple_fraction; + if (root->consider_sortgroup_agg && + compare_fractional_path_costs(&sort_group_p, &sorted_p, tuple_fraction) < 0) { + /*sort group is cheaper, so use it*/ + copy_path_costsize(&sorted_p, &sort_group_p); + } else { + root->consider_sortgroup_agg = false; + } + if (compare_fractional_path_costs(&hashed_p, &sorted_p, tuple_fraction) < 0) { /* Hashed is cheaper, so use it */ return true; @@ -9682,6 +9782,8 @@ static bool vector_engine_walker_internal(Plan* result_plan, bool check_rescan, break; case T_Agg: { + if (((Agg*)result_plan)->aggstrategy == AGG_SORT_GROUP) + return true; /* Check if targetlist contains unsupported feature */ if (vector_engine_expression_walker((Node*)(result_plan->targetlist), NULL)) return true; @@ -9917,6 +10019,7 @@ static Plan* fallback_plan(Plan* result_plan) case T_BaseResult: case T_ProjectSet: case T_Sort: + case T_SortGroup: case T_Stream: case T_Material: case T_StartWithOp: @@ -10100,7 +10203,14 @@ Plan* vectorize_plan(Plan* result_plan, bool ignore_remotequery, bool forceVecto return make_rowtove_plan(result_plan); } break; - + case T_SortGroup: + { + result_plan->lefttree = vectorize_plan(result_plan->lefttree, ignore_remotequery, forceVectorEngine); + if (result_plan->lefttree && IsVecOutput(result_plan->lefttree)) { + result_plan->lefttree = (Plan*) make_vectorow(result_plan->lefttree); + } + return result_plan; + } case T_MergeJoin: case T_NestLoop: result_plan->lefttree = vectorize_plan(result_plan->lefttree, ignore_remotequery, forceVectorEngine); diff --git a/src/gausskernel/optimizer/plan/setrefs.cpp b/src/gausskernel/optimizer/plan/setrefs.cpp index aa62a91c6..cdda66282 100644 --- a/src/gausskernel/optimizer/plan/setrefs.cpp +++ b/src/gausskernel/optimizer/plan/setrefs.cpp @@ -544,6 +544,7 @@ static Plan* set_plan_refs(PlannerInfo* root, Plan* plan, int rtoffset) case T_Material: case T_VecMaterial: case T_Sort: + case T_SortGroup: case T_VecSort: case T_Unique: case T_VecUnique: diff --git a/src/gausskernel/optimizer/plan/subselect.cpp b/src/gausskernel/optimizer/plan/subselect.cpp index 07d5d9bee..daae5c0fd 100644 --- a/src/gausskernel/optimizer/plan/subselect.cpp +++ b/src/gausskernel/optimizer/plan/subselect.cpp @@ -3302,6 +3302,7 @@ static Bitmapset* finalize_plan(PlannerInfo* root, Plan* plan, Bitmapset* valid_ case T_Hash: case T_Material: case T_Sort: + case T_SortGroup: case T_Unique: case T_SetOp: case T_Group: @@ -7483,4 +7484,4 @@ static void winmagic_replace_varlist_varno(Query* dest_qry, } pfree_ext(varno_map); -} \ No newline at end of file +} diff --git a/src/gausskernel/optimizer/util/optcommon.cpp b/src/gausskernel/optimizer/util/optcommon.cpp index 4aaadbe11..6f93ad369 100755 --- a/src/gausskernel/optimizer/util/optcommon.cpp +++ b/src/gausskernel/optimizer/util/optcommon.cpp @@ -314,6 +314,9 @@ void GetPlanNodePlainText( case T_Sort: *pname = *sname = *pt_operation = "Sort"; break; + case T_SortGroup: + *pname = *sname = *pt_operation = "Group Sort"; + break; case T_VecSort: *pname = *sname = *pt_operation = "Vector Sort"; break; @@ -338,6 +341,10 @@ void GetPlanNodePlainText( *pname = "HashAggregate"; *strategy = *pt_options = "Hashed"; break; + case AGG_SORT_GROUP: + *pname = "GroupAggregate"; + *strategy = *pt_options = "Sorted Group"; + break; default: *pname = "Aggregate ?\?\?"; *strategy = *pt_options = "?\?\?"; diff --git a/src/gausskernel/optimizer/util/optimizerdebug.cpp b/src/gausskernel/optimizer/util/optimizerdebug.cpp index 46f22d8d4..35e3d404d 100644 --- 
a/src/gausskernel/optimizer/util/optimizerdebug.cpp +++ b/src/gausskernel/optimizer/util/optimizerdebug.cpp @@ -477,6 +477,7 @@ void debug_print_agg_detail(PlannerInfo* root, AggStrategy aggstrategy, SAggMeth aggmthname = "DN_AGG_CN"; break; case AGG_SORTED: + case AGG_SORT_GROUP: aggstgname = "GroupAggregate"; break; case AGG_HASHED: diff --git a/src/gausskernel/optimizer/util/planmem_walker.cpp b/src/gausskernel/optimizer/util/planmem_walker.cpp index eca8c9014..f4a04c88b 100644 --- a/src/gausskernel/optimizer/util/planmem_walker.cpp +++ b/src/gausskernel/optimizer/util/planmem_walker.cpp @@ -406,6 +406,7 @@ bool plan_tree_walker(Node* node, MethodWalker walker, void* context) case T_VecSort: case T_Sort: + case T_SortGroup: if (walk_plan_node_fields((Plan*)node, walker, context)) return true; /* Other fields are simple counts and lists of indexes and oids. */ diff --git a/src/gausskernel/runtime/executor/Makefile b/src/gausskernel/runtime/executor/Makefile index af1fee525..386a34d9d 100644 --- a/src/gausskernel/runtime/executor/Makefile +++ b/src/gausskernel/runtime/executor/Makefile @@ -49,7 +49,7 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execJunk.o execMain.o \ nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o spi.o \ nodePartIterator.o nodeStub.o execClusterResize.o lightProxy.o execMerge.o \ nodeExtensible.o route.o nodeTrainModel.o db4ai_common.o spiDbesql.o \ - nodeProjectSet.o + nodeProjectSet.o nodeSortGroup.o override CPPFLAGS += -D__STDC_FORMAT_MACROS diff --git a/src/gausskernel/runtime/executor/execAmi.cpp b/src/gausskernel/runtime/executor/execAmi.cpp index 47103e378..2d6d52b84 100755 --- a/src/gausskernel/runtime/executor/execAmi.cpp +++ b/src/gausskernel/runtime/executor/execAmi.cpp @@ -54,6 +54,7 @@ #include "executor/node/nodeWindowAgg.h" #include "executor/node/nodeWorktablescan.h" #include "executor/node/nodeProjectSet.h" +#include "executor/node/nodeSortGroup.h" #include "nodes/nodeFuncs.h" #include "vecexecutor/vecnodes.h" #include "vecexecutor/vecnodevectorow.h" @@ -220,6 +221,9 @@ void ExecReScanByType(PlanState* node) case T_SortState: ExecReScanSort((SortState*)node); break; + case T_SortGroupState: + ExecReScanSortGroup((SortGroupState*)node); + break; case T_GroupState: ExecReScanGroup((GroupState*)node); diff --git a/src/gausskernel/runtime/executor/execProcnode.cpp b/src/gausskernel/runtime/executor/execProcnode.cpp index 298ea5e42..ec44a8ace 100755 --- a/src/gausskernel/runtime/executor/execProcnode.cpp +++ b/src/gausskernel/runtime/executor/execProcnode.cpp @@ -112,6 +112,7 @@ #include "executor/node/nodeWindowAgg.h" #include "executor/node/nodeWorktablescan.h" #include "executor/node/nodeProjectSet.h" +#include "executor/node/nodeSortGroup.h" #include "executor/exec/execStream.h" #include "optimizer/clauses.h" #include "optimizer/encoding.h" @@ -306,6 +307,8 @@ PlanState* ExecInitNodeByType(Plan* node, EState* estate, int eflags) return (PlanState*)ExecInitMaterial((Material*)node, estate, eflags); case T_Sort: return (PlanState*)ExecInitSort((Sort*)node, estate, eflags); + case T_SortGroup: + return (PlanState*)ExecInitSortGroup((SortGroup*)node, estate, eflags); case T_Group: return (PlanState*)ExecInitGroup((Group*)node, estate, eflags); case T_Agg: @@ -1134,6 +1137,10 @@ static void ExecEndNodeByType(PlanState* node) ExecEndSort((SortState*)node); break; + case T_SortGroupState: + ExecEndSortGroup((SortGroupState*)node); + break; + case T_GroupState: ExecEndGroup((GroupState*)node); break; diff --git 
a/src/gausskernel/runtime/executor/nodeAgg.cpp b/src/gausskernel/runtime/executor/nodeAgg.cpp index 4895cac7b..e18091255 100644 --- a/src/gausskernel/runtime/executor/nodeAgg.cpp +++ b/src/gausskernel/runtime/executor/nodeAgg.cpp @@ -172,6 +172,7 @@ static TupleTableSlot* agg_retrieve_direct(AggState* aggstate); static void agg_fill_hash_table(AggState* aggstate); static TupleTableSlot* agg_retrieve_hash_table(AggState* aggstate); static TupleTableSlot* agg_retrieve(AggState* node); +static TupleTableSlot* agg_sort_group_retrieve_direct(AggState* aggstate); static bool prepare_data_source(AggState* node); static TupleTableSlot* fetch_input_tuple(AggState* aggstate); @@ -1518,12 +1519,13 @@ static TupleTableSlot* agg_retrieve(AggState* node) static TupleTableSlot* ExecAgg(PlanState* state) { AggState* node = castNode(AggState, state); + TupleTableSlot* slot = NULL; /* * just for cooperation analysis. do nothing if is_dummy is true. * is_dummy is true that means Agg node is deparsed to remote sql in ForeignScan node. */ if (((Agg*)node->ss.ps.plan)->is_dummy) { - TupleTableSlot* slot = ExecProcNode(outerPlanState(node)); + slot = ExecProcNode(outerPlanState(node)); return slot; } @@ -1533,27 +1535,34 @@ static TupleTableSlot* ExecAgg(PlanState* state) * expressions). If so, try to project another one. */ if (node->ss.ps.ps_vec_TupFromTlist) { - TupleTableSlot* result = NULL; ExprDoneCond isDone; - result = ExecProject(node->ss.ps.ps_ProjInfo, &isDone); + slot = ExecProject(node->ss.ps.ps_ProjInfo, &isDone); if (isDone == ExprMultipleResult) - return result; + return slot; /* Done with that source tuple... */ node->ss.ps.ps_vec_TupFromTlist = false; } - /* - * Exit if nothing left to do. - */ - if (node->agg_done) - return NULL; - - /* Dispatch based on strategy */ - if (((Agg*)node->ss.ps.plan)->aggstrategy == AGG_HASHED) - return agg_retrieve(node); - else - return agg_retrieve_direct(node); + if (!node->agg_done) { + /* Dispatch based on strategy */ + switch (((Agg*)node->ss.ps.plan)->aggstrategy) + { + case AGG_HASHED: + slot = agg_retrieve(node); + break; + case AGG_PLAIN: + case AGG_SORTED: + slot = agg_retrieve_direct(node); + break; + case AGG_SORT_GROUP: + slot = agg_sort_group_retrieve_direct(node); + break; + } + if (!TupIsNull(slot)) + return slot; + } + return NULL; } /* @@ -1856,6 +1865,156 @@ static TupleTableSlot* agg_retrieve_direct(AggState* aggstate) return NULL; } +/* + * ExecAgg for sort-group case + */ +static TupleTableSlot *agg_sort_group_retrieve_direct(AggState *aggstate) +{ + ExprContext *econtext = NULL; + ExprContext *tmpcontext = NULL; + AggStatePerAgg peragg; + AggStatePerGroup pergroup; + TupleTableSlot *outerslot = NULL; + TupleTableSlot *firstSlot = NULL; + TupleTableSlot *result = NULL; + + Assert(aggstate->phase->numsets == 0); + + /* + * get state info from node + * + * econtext is the per-output-tuple expression context + * tmpcontext is the per-input-tuple expression context + * + */ + econtext = aggstate->ss.ps.ps_ExprContext; + + tmpcontext = aggstate->tmpcontext; + peragg = aggstate->peragg; + pergroup = aggstate->pergroup; + firstSlot = aggstate->ss.ss_ScanTupleSlot; + + /* + * We loop retrieving groups until we find one matching + *aggstate->ss.ps.qual + * + * For grouping sets, we have the invariant that aggstate->projected_set + * is either -1 (initial call) or the index (starting from 0) in + * gset_lengths for the group we just completed (either by projecting a + * row or by discarding it in the qual). 
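+ * In the sort-group case the outer SortGroup node signals each group boundary
+ * through aggstate->new_group_trigger, so this routine relies on that flag
+ * instead of comparing the grouping columns itself.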
+ * + * aggstate->ss.ps.qual + */ + while (!aggstate->agg_done) { + /* + * Clear the per-output-tuple context for each group, as well as + * aggcontext (which contains any pass-by-ref transvalues of the old + * group). Some aggregate functions store working state in child + * contexts; those now get reset automatically without us needing to + * do anything special. + * + * We use ReScanExprContext not just ResetExprContext because we want + * any registered shutdown callbacks to be called. That allows + * aggregate functions to ensure they've cleaned up any non-memory + * resources. + * + */ + ReScanExprContext(econtext); + + MemoryContextReset(aggstate->aggcontexts[0]); + + tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple; + + /* + * If we don't already have the first tuple of the new group, + * fetch it from the outer plan. + */ + if (aggstate->grp_firstTuple == NULL) { + outerslot = fetch_input_tuple(aggstate); + + if (!TupIsNull(outerslot)) { + /* + * Make a copy of the first input tuple; we will use this + * for comparisons (in group mode) and for projection. + */ + aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot); + } else { + aggstate->agg_done = true; + return NULL; + } + } + + aggstate->new_group_trigger = false; /*reset new group trigger*/ + + /* + * Initialize working state for a new input tuple group. + */ + initialize_aggregates(aggstate, peragg, pergroup, 1); + + if (aggstate->grp_firstTuple != NULL) { + /* + * Store the copied first input tuple in the tuple table slot + * reserved for it. The tuple will be deleted when it is + * cleared from the slot. + */ + (void)ExecStoreTuple(aggstate->grp_firstTuple, firstSlot, InvalidBuffer, true); + aggstate->grp_firstTuple = NULL; /* don't keep two + * pointers */ + /* set up for first advance_aggregates call */ + tmpcontext->ecxt_outertuple = firstSlot; + + /* + * Process each outer-plan tuple, and then fetch the next one, + * until we exhaust the outer plan or cross a group boundary. + */ + for (;;) { + advance_aggregates(aggstate, pergroup); + + /* Reset per-input-tuple context after each tuple */ + ResetExprContext(tmpcontext); + + outerslot = fetch_input_tuple(aggstate); + if (TupIsNull(outerslot)) { + aggstate->agg_done = true; + break; + } + /* set up for next advance_aggregates call */ + tmpcontext->ecxt_outertuple = outerslot; + + /* + * check whether we've new group + */ + if (aggstate->new_group_trigger) { + aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot); + break; /*we've new group*/ + } + } + } + + /* + * Use the representative input tuple for any references to + * non-aggregated input columns in aggregate direct args, the node + * qual, and the tlist. + */ + econtext->ecxt_outertuple = firstSlot; + + prepare_projection_slot(aggstate, econtext->ecxt_outertuple, 0); + + finalize_aggregates(aggstate, peragg, pergroup, 0); + + /* + * If there's no row to project right now, we must continue rather + * than returning a null since there might be more groups. 
+         */
+        result = project_aggregates(aggstate);
+        if (result != NULL)
+            return result;
+    }
+
+    /* No more groups */
+    return NULL;
+}
+
 /*
  * ExecAgg for hashed case: phase 1, read input and build hash table
  */
@@ -2179,6 +2338,11 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
     outerPlan = outerPlan(node);
     outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
+    if (node->aggstrategy == AGG_SORT_GROUP) {
+        SortGroupState* sortGroup = (SortGroupState*) outerPlanState(aggstate);
+        Assert(IsA(sortGroup, SortGroupState));
+        sortGroup->new_group_trigger = &aggstate->new_group_trigger;
+    }
 
     /*
      * initialize source tuple type.
@@ -2223,15 +2387,20 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
         AggStatePerPhase phasedata = &aggstate->phases[phase];
         Agg* aggnode = NULL;
         Sort* sortnode = NULL;
+        SortGroup* sortGroupNode = NULL;
         int num_sets;
 
         if (phase > 0) {
             aggnode = (Agg*)list_nth(node->chain, phase - 1);
-            sortnode = (Sort*)aggnode->plan.lefttree;
-            Assert(IsA(sortnode, Sort));
+            if (aggnode->plan.lefttree) {
+                if (IsA(aggnode->plan.lefttree, Sort)) {
+                    sortnode = castNode(Sort, aggnode->plan.lefttree);
+                } else if (IsA(aggnode->plan.lefttree, SortGroup)) {
+                    sortGroupNode = castNode(SortGroup, aggnode->plan.lefttree);
+                }
+            }
         } else {
             aggnode = node;
-            sortnode = NULL;
         }
 
         phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
@@ -2265,7 +2434,7 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
         /*
          * If we are grouping, precompute fmgr lookup data for inner loop.
         */
-        if (aggnode->aggstrategy == AGG_SORTED) {
+        if (aggnode->aggstrategy == AGG_SORTED || aggnode->aggstrategy == AGG_SORT_GROUP) {
             Assert(aggnode->numCols > 0);
 
             phasedata->eqfunctions = execTuplesMatchPrepare(aggnode->numCols, aggnode->grpOperators);
@@ -2276,6 +2445,7 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
             phasedata->aggstrategy = aggstate->aggstrategy;
         }
         phasedata->sortnode = sortnode;
+        phasedata->sortGroupNode = sortGroupNode;
     }
 
     /*
diff --git a/src/gausskernel/runtime/executor/nodeLimit.cpp b/src/gausskernel/runtime/executor/nodeLimit.cpp
index 131d95814..41a55a9be 100644
--- a/src/gausskernel/runtime/executor/nodeLimit.cpp
+++ b/src/gausskernel/runtime/executor/nodeLimit.cpp
@@ -308,12 +308,12 @@ void recompute_limits(LimitState* node)
  */
 static void pass_down_bound(LimitState* node, PlanState* child_node)
 {
+    /* Note: if this overflows, we'll return a negative value, which is OK */
+    int64 tuples_needed = node->noCount ?
-1 : (node->count + node->offset); if (IsA(child_node, SortState) || IsA(child_node, VecSortState)) { SortState* sortState = (SortState*)child_node; - int64 tuples_needed = node->count + node->offset; - /* negative test checks for overflow in sum */ - if (node->noCount || tuples_needed < 0) { + if (tuples_needed < 0) { /* make sure flag gets reset if needed upon rescan */ sortState->bounded = false; } else { @@ -340,6 +340,14 @@ static void pass_down_bound(LimitState* node, PlanState* child_node) */ if (outerPlanState(child_node) && !expression_returns_set((Node*)child_node->plan->targetlist)) pass_down_bound(node, outerPlanState(child_node)); + } else if (IsA(child_node, AggState)) { + if (tuples_needed > 0) { + child_node = outerPlanState((AggState *)child_node); + if (IsA(child_node, SortGroupState)) { + SortGroupState *sortGroup = (SortGroupState *)child_node; + sortGroup->bound = tuples_needed; + } + } } } diff --git a/src/gausskernel/runtime/executor/nodeSortGroup.cpp b/src/gausskernel/runtime/executor/nodeSortGroup.cpp new file mode 100644 index 000000000..44c40a049 --- /dev/null +++ b/src/gausskernel/runtime/executor/nodeSortGroup.cpp @@ -0,0 +1,1040 @@ +/* ------------------------------------------------------------------------- + * + * nodeSortGroup.cpp + * Routines to handle sorting of relations + * + * Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd. + * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/gausskernel/runtime/executor/nodeSortGroup.cpp + * + * ------------------------------------------------------------------------- + * + * INTERFACE ROUTINES + * ExecInitSortGroup - Creates the run-time state information for the SortGroupState node + * ExecSortGroup - Groups and sorts tuples from the outer subtree of the node + * ExecEndSortGroup - shutdown node and subnodes + */ +#include "postgres.h" +#include "miscadmin.h" +#include "access/tableam.h" +#include "executor/executor.h" +#include "executor/node/nodeSortGroup.h" +#include "executor/tuptable.h" +#include "utils/sortsupport.h" +#include "utils/logtape.h" + +#define SKIPLIST_MAXLEVEL 32 /*max level of the skiplist, should be enough for 2^64 elements*/ +#define INITIAL_GROUP_SIZE 64 /*Initial size of memTuples array*/ + +/* ---------------- + * GroupNode + * + * save the tuples and grouping keys + * ---------------- + */ +typedef struct GroupNode { + MinimalTuple tuple; /* the tuple itself, note that the first tuple is hold in SortGroupStatePriv.maincontext */ + Datum datum1; /* value of first key column */ + bool isnull1; /* is first key column NULL? 
*/
+
+    int memTuplesSize;       /* current size of memTuples */
+    int memTuplesCap;        /* max capacity of memTuples */
+    MinimalTuple *memTuples; /* tuples belonging to this group, when held in memory */
+
+    /*
+     * when the used memory is greater than work_mem,
+     * tuples need to be stored on LogicalTapeSet
+     */
+    int tapeNum;    /* which tapenum holds the tuples of this group */
+    int tapeOffset; /* tapeBlocknum and tapeOffset save the beginning position of this group */
+    long tapeBlocknum;
+} GroupNode;
+
+/* ----------------
+ * SkiplistNode
+ * ----------------
+ */
+typedef struct SkiplistNode {
+    GroupNode *group; /* group holds all tuples in memory or LogicalTapeSet */
+    int height;       /* height of this node */
+    struct SkiplistNode *backward;
+    struct SkiplistNode *forward[FLEXIBLE_ARRAY_MEMBER];
+} SkiplistNode;
+
+/* ----------------
+ * Skiplist
+ *
+ * We use a skiplist to perform group sort; each skiplist node holds a group
+ * and each group holds all of its tuples in memory or in the LogicalTapeSet.
+ *
+ * The order of the grouping key in the skiplist:
+ * header->forward[0] > header->forward[0]->forward[0] > ... > tail
+ * top-N > top-(N-1) > ... > top-1
+ * When we insert a new group into the skiplist and the length of the skiplist then exceeds max_groups,
+ * we discard one group that is not in the top-N groups. Discarding the head node is much easier than
+ * discarding the tail node in a skiplist, and that is why we put the top-N groups into the skiplist in this order.
+ * ----------------
+ */
+typedef struct Skiplist {
+    SkiplistNode *header;
+    SkiplistNode *tail;
+    int64 length; /* length of the Skiplist */
+    int height;   /* max height of the Skiplist */
+} Skiplist;
+
+typedef struct TupleIter {
+    SkiplistNode *currentNode; /* current skiplist node, which holds the tuples and grouping keys */
+    int64 memPos;              /* current position in GroupNode.memTuples */
+
+    /* current position in LogicalTapeSet */
+    long tapeBlocknum;
+    int tapeOffset;
+} TupleIter;
+
+/*
+ * Private state of a SortGroupState operation.
+ */
+typedef struct SortGroupStatePriv {
+    MemoryContext maincontext;  /* memory context for group metadata */
+    MemoryContext tuplecontext; /* memory context for tuples in memory */
+    int nKeys;                  /* number of columns in sort key */
+
+    bool holdTupleInMem; /* whether tuples are kept in memory; otherwise they are saved in the LogicalTapeSet */
+    int64 allowedMem;    /* total memory allowed, in bytes */
+    int64 max_groups;    /* max number of groups in Skiplist */
+
+    bool *new_group_trigger; /* set to TRUE when we have a new group */
+
+    /*
+     * tupDesc is only used by the MinimalTuple and CLUSTER routines.
+     */
+    TupleDesc tupDesc;
+    SortSupport sortKeys; /* array of length nKeys */
+    Skiplist skiplist;    /* skiplist holds top-N groups */
+
+    LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
+    int *freeTape;           /* array of free tapenums in LogicalTapeSet */
+    int freeTapeSize;        /* current size of freeTape */
+    int freeTapeCap;         /* max capacity of freeTape */
+
+    TupleIter iter; /* iterator for reading tuples; it saves the current reading position */
+} SortGroupStatePriv;
+
+static void initSkiplist(Skiplist *skiplist);
+static SkiplistNode *skiplistCreateNode(int height);
+static inline int randomLevel(void);
+static SkiplistNode *skiplistInsertGroupNode(Skiplist *skiplist, const TupleTableSlot *slot, SortGroupStatePriv *state);
+
+static SortGroupStatePriv *groupSortBegin(TupleDesc tupDesc, int nkeys, AttrNumber *attNums, Oid *sortOperators,
+    Oid *sortCollations, bool *nullsFirstFlags, int64 max_groups, int workMem);
+static void groupSortPutTupleslot(SortGroupStatePriv *state, TupleTableSlot *slot);
+static void groupSortEnd(SortGroupStatePriv *state);
+static void groupSortFinished(SortGroupStatePriv *state);
+static void groupSortDiscardLastGroup(SortGroupStatePriv *state);
+static void groupSortInittapes(SortGroupStatePriv *state);
+static void groupSortSeekPos(SortGroupStatePriv *state);
+
+static void initGroupIter(SortGroupStatePriv *state);
+static int groupNodeSlotCmp(const GroupNode *a, const TupleTableSlot *slot, SortGroupStatePriv *state);
+static bool groupSortGetTupleSlot(SortGroupStatePriv *state, TupleTableSlot *slot);
+static GroupNode *createGroupNode(SortGroupStatePriv *state, TupleTableSlot *slot);
+static void writeMinimalTuple(LogicalTapeSet *lts, int tapenum, MinimalTuple tuple);
+static MinimalTuple readMinimalTuple(SortGroupStatePriv *state, TupleIter *iter);
+static void deleteGroupNode(SortGroupStatePriv *state, GroupNode *group);
+static void groupNodePutTupleSlot(SortGroupStatePriv *state, GroupNode *group, TupleTableSlot *slot);
+static void groupNodeInitTape(SortGroupStatePriv *state, GroupNode *group);
+static void groupNodeAssignTape(SortGroupStatePriv *state, GroupNode *group);
+static void groupNodeReleaseTape(SortGroupStatePriv *state, GroupNode *group);
+
+static TupleTableSlot *ExecSortGroup(PlanState *pstate);
+static void dumpSortGroupState(SortGroupState *state);
+
+static inline int64 groupSortUsedMemory(SortGroupStatePriv *state)
+{
+    AllocSetContext* maincontext = (AllocSetContext*)state->maincontext;
+    AllocSetContext* tuplecontext = (AllocSetContext*)state->tuplecontext;
+    int64 bytes = maincontext->totalSpace - maincontext->freeSpace;
+
+    Assert(maincontext->header.type == T_AllocSetContext);
+
+    if (tuplecontext) {
+        Assert(tuplecontext->header.type == T_AllocSetContext);
+        bytes += (tuplecontext->totalSpace - tuplecontext->freeSpace);
+    }
+
+    return bytes;
+}
+
+/* ----------------
+ * initSkiplist
+ *
+ * initialize a new skiplist
+ * ----------------
+ */
+void initSkiplist(Skiplist *skiplist)
+{
+    memset(skiplist, 0, sizeof(Skiplist));
+    skiplist->height = 1;
+    skiplist->length = 0;
+    skiplist->header = skiplistCreateNode(SKIPLIST_MAXLEVEL);
+    skiplist->tail = NULL;
+}
+
+/* ----------------
+ * skiplistCreateNode
+ *
+ * create a new SkiplistNode of the given height
+ * ----------------
+ */
+SkiplistNode *skiplistCreateNode(int height)
+{
+    size_t bytes = offsetof(SkiplistNode, forward) + height * sizeof(SkiplistNode *);
+    SkiplistNode *node = (SkiplistNode *)palloc0(bytes);
+    node->height = height;
+    return node;
+}
+
+/* Returns a random
level for the new skiplist node we are going to create. + * The return value of this function is between 1 and SKIPLIST_MAXLEVEL + * (both inclusive), with a powerlaw-alike distribution where higher + * levels are less likely to be returned. */ +static int randomLevel(void) +{ + /* Increase height with probability 1 in kBranching, P = 1/4 */ + static const unsigned int kBranching = 4; + long level = 1; + while (level < SKIPLIST_MAXLEVEL && ((random() % kBranching) == 0)) + level += 1; + Assert(level <= SKIPLIST_MAXLEVEL); + return level; +} + +/* ---------------- + * skiplistInsertGroupNode + * + * Insert a new node into the skiplist + * ---------------- + */ +static SkiplistNode *skiplistInsertGroupNode(Skiplist *skiplist, const TupleTableSlot *slot, SortGroupStatePriv *state) +{ + SkiplistNode *update[SKIPLIST_MAXLEVEL]; + SkiplistNode *x; + SkiplistNode *forward; + int i; + int height; + int result; + + x = skiplist->header; + for (i = skiplist->height - 1; i >= 0; i--) { + while (x->forward[i]) { + forward = x->forward[i]; + result = groupNodeSlotCmp(forward->group, slot, state); + if (result == 0) { + /* forward = tuple, not need to create new node */ + return forward; + } else if (result > 0) { + /* tuple < forward, keep looking forward */ + x = forward; + } else { + /* tuple > forward, end up looking forward*/ + break; + } + } + update[i] = x; /*record this level*/ + } + + height = randomLevel(); + x = skiplistCreateNode(height); + + if (height > skiplist->height) { + for (i = skiplist->height; i < height; i++) { + update[i] = skiplist->header; + } + skiplist->height = height; + } + + for (i = 0; i < height; i++) { + x->forward[i] = update[i]->forward[i]; + update[i]->forward[i] = x; + } + + x->backward = (update[0] == skiplist->header) ? NULL : update[0]; + if (x->forward[0]) { + x->forward[0]->backward = x; + } else { + skiplist->tail = x; + } + skiplist->length++; + return x; +} + +static SortGroupStatePriv *groupSortBegin(TupleDesc tupDesc, int nkeys, AttrNumber *attNums, Oid *sortOperators, + Oid *sortCollations, bool *nullsFirstFlags, int64 max_groups, int workMem) +{ + SortGroupStatePriv *state; + MemoryContext maincontext; + MemoryContext oldcontext; + + /* + * This memory context holds the group metadata and group keys + */ + maincontext = AllocSetContextCreate(CurrentMemoryContext, "GroupSort main", ALLOCSET_DEFAULT_SIZES); + + /* + * Make the SortGroupStatePriv within the maincontext. This way, we + * don't need a separate pfree() operation for it at shutdown. 
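randomLevel() above draws node heights from a geometric distribution with kBranching = 4, so roughly 3/4 of nodes stay at level 1 and each further level is four times rarer, which is what keeps the expected skiplist search path logarithmic. A standalone sketch (using std::mt19937 instead of the server's random()) that tallies the resulting distribution:

// Standalone sketch of the randomLevel() distribution: each additional level
// is kept with probability 1/kBranching, so about 3/4 of nodes land at
// level 1, 3/16 at level 2, and so on.
#include <array>
#include <cstdio>
#include <random>

static const int kMaxLevel = 32;            // stands in for SKIPLIST_MAXLEVEL
static const unsigned int kBranching = 4;

static int randomLevel(std::mt19937 &gen)
{
    std::uniform_int_distribution<unsigned int> dist(0, kBranching - 1);
    int level = 1;
    while (level < kMaxLevel && dist(gen) == 0) {
        level++;
    }
    return level;
}

int main()
{
    std::mt19937 gen(12345);
    std::array<long, kMaxLevel + 1> counts{};

    const long samples = 1000000;
    for (long i = 0; i < samples; i++) {
        counts[randomLevel(gen)]++;
    }
    for (int lv = 1; lv <= 6; lv++) {
        // Expected fraction for level lv: (1/4)^(lv-1) * 3/4
        std::printf("level %d: %.4f\n", lv, (double)counts[lv] / samples);
    }
    return 0;
}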
+ */ + oldcontext = MemoryContextSwitchTo(maincontext); + + state = (SortGroupStatePriv *)palloc0(sizeof(SortGroupStatePriv)); + state->maincontext = maincontext; + + /* + * This memory context holds tuples in memory + */ + state->tuplecontext = AllocSetContextCreate(maincontext, "GroupSort tuple", ALLOCSET_DEFAULT_SIZES); + state->nKeys = nkeys; + state->tupDesc = tupDesc; /* assume we need not copy tupDesc */ + state->max_groups = max_groups; + state->holdTupleInMem = true; + + /* Prepare SortSupport data for each column */ + state->sortKeys = (SortSupport)palloc0(nkeys * sizeof(SortSupportData)); + + for (int i = 0; i < nkeys; i++) { + SortSupport sortKey = &state->sortKeys[i]; + + AssertArg(attNums[i] != 0); + AssertArg(sortOperators[i] != 0); + + sortKey->ssup_cxt = CurrentMemoryContext; + sortKey->ssup_collation = sortCollations[i]; + sortKey->ssup_nulls_first = nullsFirstFlags[i]; + sortKey->ssup_attno = attNums[i]; + sortKey->abbreviate = false; /*TODO: Convey if abbreviation optimization is applicable in principle*/ + + PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey); + } + + state->allowedMem = Max(workMem, 64) * (int64)1024; /*work_mem is forced to be at least 64KB*/ + + initSkiplist(&state->skiplist); + + MemoryContextSwitchTo(oldcontext); + return state; +} + +static void groupSortFinished(SortGroupStatePriv *state) +{ + MemoryContext oldcontext = MemoryContextSwitchTo(state->maincontext); + + if (!state->holdTupleInMem) { + SkiplistNode *node = state->skiplist.header->forward[0]; + + /*prepare each group node to read*/ + while (node) { + GroupNode *group = node->group; + Assert(group->tapeNum >= 0); + LogicalTapeFreeze(state->tapeset, group->tapeNum, NULL); + + /*Rewind logical tape and switch from writing to reading*/ + LogicalTapeRewindForRead(state->tapeset, group->tapeNum, BLCKSZ); + + /*obtain beginning position of each group*/ + LogicalTapeTell(state->tapeset, group->tapeNum, &group->tapeBlocknum, &group->tapeOffset); + Assert(group->tapeOffset == 0); + node = node->forward[0]; + } + } + + initGroupIter(state); /*initialize the tuples iterator*/ + MemoryContextSwitchTo(oldcontext); +} + +static void initGroupIter(SortGroupStatePriv *state) +{ + TupleIter *iter = &state->iter; + + iter->memPos = 0; + + if (state->skiplist.tail) { + iter->currentNode = state->skiplist.tail; + iter->tapeBlocknum = iter->currentNode->group->tapeBlocknum; + iter->tapeOffset = iter->currentNode->group->tapeOffset; + groupSortSeekPos(state); + } +} + +/* + * Accept one tuple while collecting input data for group sort. 
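groupSortFinished() freezes each group's tape, rewinds it for reading, and records the group's start position with LogicalTapeTell, so that initGroupIter()/groupSortSeekPos() can later jump straight back to that group. The same record-then-seek pattern in a standalone sketch, with a std::stringstream standing in for the logical tape set (the logtape API itself is openGauss-internal):

// Standalone sketch of "remember where each group starts, then seek back":
// write groups sequentially, record each start offset, and later seek to a
// recorded offset before reading that group's rows.
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

int main()
{
    std::stringstream tape;
    std::vector<std::streampos> groupStart;   // analogue of tapeBlocknum/tapeOffset

    // Write phase: append each group's rows and note its start position.
    const std::vector<std::vector<std::string>> groups = {
        {"a1", "a2"}, {"b1"}, {"c1", "c2", "c3"}};
    for (const auto &g : groups) {
        groupStart.push_back(tape.tellp());
        for (const auto &row : g) {
            tape << row << '\n';
        }
    }

    // Read phase: visit groups in reverse (the patch stores the worst group at
    // the head and iterates from the tail backward), seeking to each start.
    for (std::size_t i = groups.size(); i-- > 0;) {
        tape.clear();
        tape.seekg(groupStart[i]);
        std::string row;
        for (std::size_t n = 0; n < groups[i].size() && std::getline(tape, row); n++) {
            std::cout << "group " << i << ": " << row << '\n';
        }
    }
    return 0;
}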
+ */ +static void groupSortPutTupleslot(SortGroupStatePriv *state, TupleTableSlot *slot) +{ + MemoryContext oldcontext = MemoryContextSwitchTo(state->maincontext); + Skiplist *skiplist = &state->skiplist; + SkiplistNode *groupNode; + + if (unlikely(slot->tts_nvalid < state->nKeys)) { + tableam_tslot_getallattrs(slot); + } + + if (skiplist->length >= state->max_groups) { + SkiplistNode *top = skiplist->header->forward[0]; + int result; + Assert(skiplist->length == state->max_groups); + Assert(top); + + result = groupNodeSlotCmp(top->group, slot, state); + if (result < 0) { + /* tuple > top-N , new tuple is not in top-N groups, discard it*/ + return; + } else if (result == 0) { + groupNode = top; + } else { + /* find the suitable node to insert*/ + groupNode = skiplistInsertGroupNode(skiplist, slot, state); + } + } + else { + groupNode = skiplistInsertGroupNode(skiplist, slot, state); + } + + if (!groupNode->group) { + /*new group, create and init the groupNode->grou*/ + groupNode->group = createGroupNode(state, slot); + } + else { + groupNodePutTupleSlot(state, groupNode->group, slot); + } + + if (state->max_groups < skiplist->length) { + /*when the length of the skiplist exceeds max_groups, we discard the last group*/ + groupSortDiscardLastGroup(state); + } + + if (state->holdTupleInMem && groupSortUsedMemory(state) > state->allowedMem) { + /* memory exceeds allowedMem, switch to tape-based operation */ + groupSortInittapes(state); + + state->holdTupleInMem = false; + MemoryContextDelete(state->tuplecontext); + state->tuplecontext = NULL; + } + MemoryContextSwitchTo(oldcontext); +} + +/* + * groupSortDiscardLastGroup - discatd one group that is not in the top-N groups. + */ +static void groupSortDiscardLastGroup(SortGroupStatePriv *state) +{ + Skiplist *skiplist = &state->skiplist; + SkiplistNode *header = skiplist->header; + int i; + + SkiplistNode *old_top; + SkiplistNode *new_top; + + Assert(skiplist->length >= 1); + Assert(state->max_groups + 1 == skiplist->length); + + old_top = header->forward[0]; + new_top = old_top->forward[0]; + + for (i = 0; i < old_top->height; i++) { + header->forward[i] = old_top->forward[i]; + } + for (i = old_top->height; i < new_top->height; i++) { + header->forward[i] = new_top; + } + + deleteGroupNode(state, old_top->group); + pfree(old_top); + + new_top->backward = NULL; + skiplist->length--; +} + +/* + * deleteGroupNode - delete GroupNode and release its resources + */ +static void deleteGroupNode(SortGroupStatePriv *state, GroupNode *group) +{ + pfree(group->tuple); + + if (group->tapeNum >= 0) { + groupNodeReleaseTape(state, group); + } + + for (int i = 1; i < group->memTuplesSize; i++) { + pfree(group->memTuples[i]); + } + if (group->memTuples) + pfree(group->memTuples); + + pfree(group); +} + +/* + * createGroupNode - create and initialize a GroupNode. 
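groupSortPutTupleslot() above keeps everything in memory until the two memory contexts together exceed allowedMem, then creates the tape set, clears holdTupleInMem, and deletes tuplecontext so every later tuple goes straight to tape. A standalone sketch of that one-way spill decision, with a plain byte counter in place of the memory-context accounting and a temp-file path chosen only for the demo:

// Standalone sketch of the spill decision: buffer rows in memory until the
// tracked usage crosses the budget, then switch the writer to a spill file
// for every later row (a one-way transition, like holdTupleInMem = false).
#include <cstdint>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>

class BoundedBuffer {
public:
    BoundedBuffer(std::int64_t allowedMem, const std::string &spillPath)
        : allowedMem_(allowedMem), spillPath_(spillPath) {}

    void put(const std::string &row)
    {
        if (inMemory_) {
            rows_.push_back(row);
            usedBytes_ += static_cast<std::int64_t>(row.size());
            if (usedBytes_ > allowedMem_) {
                spill();                       // memory budget exceeded: switch to disk
            }
        } else {
            spillFile_ << row << '\n';
        }
    }

    bool inMemory() const { return inMemory_; }

private:
    void spill()
    {
        spillFile_.open(spillPath_);
        for (const auto &r : rows_) {
            spillFile_ << r << '\n';           // dump everything buffered so far
        }
        rows_.clear();                         // analogue of deleting tuplecontext
        usedBytes_ = 0;
        inMemory_ = false;
    }

    std::int64_t allowedMem_;
    std::string spillPath_;
    std::int64_t usedBytes_ = 0;
    bool inMemory_ = true;
    std::vector<std::string> rows_;
    std::ofstream spillFile_;
};

int main()
{
    BoundedBuffer buf(32, "/tmp/sortgroup_sketch.spill");  // tiny 32-byte budget, demo path only
    for (int i = 0; i < 10; i++) {
        buf.put("row-" + std::to_string(i));
    }
    std::cout << (buf.inMemory() ? "still in memory" : "spilled to disk") << '\n';
    return 0;
}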
+ * + * Note that GroupNode->tuple is under maincontext + */ +static GroupNode *createGroupNode(SortGroupStatePriv *state, TupleTableSlot *slot) +{ + GroupNode *group; + MinimalTuple tuple; + HeapTupleData htup; + Datum original; + + Assert(CurrentMemoryContext == state->maincontext); + + tuple = ExecCopySlotMinimalTuple(slot); /* copy the tuple into sort storage */ + + group = (GroupNode *)palloc0(sizeof(GroupNode)); + group->tuple = tuple; + + /* set up first-column key value */ + htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET; + htup.t_data = (HeapTupleHeader)((char *)tuple - MINIMAL_TUPLE_OFFSET); + original = heap_getattr(&htup, state->sortKeys[0].ssup_attno, state->tupDesc, &group->isnull1); + + group->datum1 = original; + group->tapeNum = -1; + + if (state->holdTupleInMem) { + group->memTuples = (MinimalTuple *)palloc(INITIAL_GROUP_SIZE * sizeof(MinimalTuple)); + group->memTuplesCap = INITIAL_GROUP_SIZE; + group->memTuples[0] = tuple; + group->memTuplesSize = 1; + } else { + groupNodeAssignTape(state, group); + writeMinimalTuple(state->tapeset, group->tapeNum, tuple); + } + return group; +} + +/* + * groupSortInittapes - initialize for tape. + */ +static void groupSortInittapes(SortGroupStatePriv *state) +{ + SkiplistNode *node; + state->freeTapeCap = state->freeTapeSize = Max(state->skiplist.length, 6); + state->tapeset = LogicalTapeSetCreate(state->freeTapeSize, NULL, NULL, -1); + state->freeTape = (int *)palloc(state->freeTapeSize * sizeof(int)); + for (int i = 0; i < state->freeTapeSize; i++) { + state->freeTape[i] = i; + } + + node = state->skiplist.header->forward[0]; + while (node) { + groupNodeInitTape(state, node->group); + node = node->forward[0]; + } +} + + +/* ---------------------------------------------------------------- + * groupSortSeekPos + * + * saves current position. + * ---------------------------------------------------------------- + */ +static void groupSortSeekPos(SortGroupStatePriv *state) +{ + TupleIter *iter = &state->iter; + if (iter->currentNode && !state->holdTupleInMem) { + LogicalTapeSeek(state->tapeset, iter->currentNode->group->tapeNum, iter->tapeBlocknum, iter->tapeOffset); + } +} + +static void groupNodeInitTape(SortGroupStatePriv *state, GroupNode *group) +{ + Assert(CurrentMemoryContext == state->maincontext); + groupNodeAssignTape(state, group); + + for (int i = 0; i < group->memTuplesSize; i++) { + /* + * Dump all tuples. + */ + writeMinimalTuple(state->tapeset, group->tapeNum, group->memTuples[i]); + } + + /* discard all tuples in memory */ + group->memTuples = NULL; + group->memTuplesCap = 0; + group->memTuplesSize = 0; +} + + +/* + * Assign unused tapes to specified group, extending the tape set if + * necessary. + */ +static void groupNodeAssignTape(SortGroupStatePriv *state, GroupNode *group) +{ + int tapeNum; + Assert(group->tapeNum == -1); + + if (state->freeTapeSize == 0) { + int nAdditional = state->freeTapeCap; + LogicalTapeSetExtend(state->tapeset, nAdditional); + state->freeTapeSize = nAdditional; + state->freeTapeCap *= 2; + state->freeTape = (int *)repalloc(state->freeTape, state->freeTapeCap * sizeof(int)); + for (int i = 0; i < nAdditional; i++) { + state->freeTape[i] = nAdditional + i; + } + } + + tapeNum = state->freeTape[--state->freeTapeSize]; + + group->tapeNum = tapeNum; +} + +/* + * This function rewinds the logical tape to be free, adds its 'tapeNum' to the free list. 
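groupNodeAssignTape()/groupNodeReleaseTape() above manage tape numbers through a small free list: releasing a group's tape returns its number for reuse, and running dry extends the tape set and doubles the list's capacity. The same bookkeeping in standalone form (a std::vector and a counter stand in for repalloc and LogicalTapeSetExtend):

// Standalone sketch of the tape free-list: acquire pops a free id, release
// pushes it back, and exhausting the list grows the underlying pool
// geometrically before handing out a new id.
#include <cassert>
#include <iostream>
#include <vector>

class IdPool {
public:
    explicit IdPool(int initial)
    {
        for (int i = 0; i < initial; i++) {
            freeIds_.push_back(i);
        }
        totalIds_ = initial;
    }

    int acquire()
    {
        if (freeIds_.empty()) {
            int nAdditional = totalIds_;             // create as many new ids as already exist
            for (int i = 0; i < nAdditional; i++) {
                freeIds_.push_back(totalIds_ + i);   // the newly created ids are all free
            }
            totalIds_ += nAdditional;
        }
        int id = freeIds_.back();
        freeIds_.pop_back();
        return id;
    }

    void release(int id)
    {
        assert(id >= 0 && id < totalIds_);
        freeIds_.push_back(id);                      // the id can be reused by another group
    }

    int total() const { return totalIds_; }

private:
    std::vector<int> freeIds_;
    int totalIds_ = 0;
};

int main()
{
    IdPool tapes(2);
    int a = tapes.acquire();
    int b = tapes.acquire();
    int c = tapes.acquire();                         // pool is empty here, so it grows from 2 to 4
    std::cout << a << ' ' << b << ' ' << c << " total=" << tapes.total() << '\n';
    tapes.release(b);
    std::cout << "reused=" << tapes.acquire() << '\n';  // hands b's id back out
    return 0;
}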
+ */
+static void groupNodeReleaseTape(SortGroupStatePriv *state, GroupNode *group)
+{
+    /* rewinding frees the buffer while not in use */
+    LogicalTapeRewindForRead(state->tapeset, group->tapeNum, BLCKSZ);
+    LogicalTapeRewindForWrite(state->tapeset, group->tapeNum);
+
+    Assert(state->freeTapeSize < state->freeTapeCap);
+    state->freeTape[state->freeTapeSize++] = group->tapeNum;
+}
+
+/*
+ * Write a MinimalTuple to the specified logical tape
+ */
+static void writeMinimalTuple(LogicalTapeSet *lts, int tapenum, MinimalTuple tuple)
+{
+    char *tupbody;
+    unsigned int tupbodylen;
+    unsigned int tuplen;
+
+    /* the part of the MinimalTuple we'll write: */
+    tupbody = (char *)tuple + MINIMAL_TUPLE_DATA_OFFSET;
+    tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
+
+    /* total on-disk footprint: */
+    tuplen = tupbodylen + sizeof(int);
+
+    LogicalTapeWrite(lts, tapenum, (void *)&tuplen, sizeof(tuplen));
+    LogicalTapeWrite(lts, tapenum, (void *)tupbody, tupbodylen);
+}
+
+/*
+ * Read a MinimalTuple from the current position
+ *
+ * Returns NULL if no more tuples
+ */
+static MinimalTuple readMinimalTuple(SortGroupStatePriv *state, TupleIter *iter)
+{
+    GroupNode *group;
+    MinimalTuple tuple;
+    unsigned int len;
+    unsigned int tupbodylen;
+    unsigned int tuplen;
+    char *tupbody;
+    size_t bytes;
+    MemoryContext oldcontext = MemoryContextSwitchTo(state->maincontext);
+
+    group = iter->currentNode->group;
+    bytes = LogicalTapeRead(state->tapeset, group->tapeNum, &len, sizeof(len));
+    if (bytes == 0) {
+        /*next group*/
+        iter->currentNode = iter->currentNode->backward;
+        if (!iter->currentNode) {
+            tuple = NULL;
+            goto finished;
+        }
+        group = iter->currentNode->group;
+        iter->tapeBlocknum = group->tapeBlocknum;
+        iter->tapeOffset = group->tapeOffset;
+        iter->memPos = 0;
+        groupSortSeekPos(state);
+
+        if (state->new_group_trigger)
+            *state->new_group_trigger = true;
+        bytes = LogicalTapeRead(state->tapeset, group->tapeNum, &len, sizeof(len));
+    }
+
+    if (bytes != sizeof(len)) {
+        elog(ERROR, "unexpected end of tape");
+    }
+
+    tupbodylen = len - sizeof(int);
+    tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
+
+    tuple = (MinimalTuple)palloc0(tuplen);
+    tuple->t_len = tuplen;
+    tupbody = (char *)tuple + MINIMAL_TUPLE_DATA_OFFSET;
+
+    if (LogicalTapeRead(state->tapeset, group->tapeNum, tupbody, tupbodylen) != tupbodylen) {
+        elog(ERROR, "unexpected end of tape");
+    }
+
+finished:
+    MemoryContextSwitchTo(oldcontext);
+    return tuple;
+}
+
+/*
+ * Write a MinimalTuple into the specified group
+ */
+static void groupNodePutTupleSlot(SortGroupStatePriv *state, GroupNode *group, TupleTableSlot *slot)
+{
+    MinimalTuple tuple;
+
+    if (state->holdTupleInMem) {
+        MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
+        tuple = ExecCopySlotMinimalTuple(slot);
+
+        if (group->memTuplesSize >= group->memTuplesCap) {
+            int newCap = group->memTuplesCap * 2;
+            group->memTuples = (MinimalTuple *)repalloc(group->memTuples, newCap * sizeof(MinimalTuple));
+            group->memTuplesCap = newCap;
+        }
+        group->memTuples[group->memTuplesSize++] = tuple;
+        MemoryContextSwitchTo(oldcontext);
+    } else {
+        Assert(group->tapeNum >= 0);
+        tuple = ExecCopySlotMinimalTuple(slot);
+        writeMinimalTuple(state->tapeset, group->tapeNum, tuple);
+        pfree(tuple);
+    }
+}
+
+/*
+ * Read a MinimalTuple from SortGroupStatePriv
+ *
+ * Returns false if no more tuples
+ */
+static bool groupSortGetTupleSlot(SortGroupStatePriv *state, TupleTableSlot *slot)
+{
+    TupleIter *iter = &state->iter;
+    MinimalTuple tuple;
+
+    if (!iter->currentNode) {
+        ExecClearTuple(slot);
+        return false;
+    }
+
+    if (state->holdTupleInMem) {
+        if (iter->memPos >= iter->currentNode->group->memTuplesSize) {
+            /* next group*/
+            iter->currentNode = iter->currentNode->backward;
+            if (!iter->currentNode) {
+                ExecClearTuple(slot);
+                return false;
+            }
+            if (state->new_group_trigger)
+                *state->new_group_trigger = true;
+            iter->memPos = 0;
+        }
+
+        Assert(iter->currentNode);
+        Assert(iter->currentNode->group->memTuplesSize > iter->memPos);
+
+        tuple = iter->currentNode->group->memTuples[iter->memPos];
+        ExecStoreMinimalTuple(tuple, slot, false);
+        iter->memPos++;
+        return true;
+    }
+
+    /*read MinimalTuple from tape*/
+    Assert(iter->currentNode->group->tapeNum >= 0);
+    tuple = readMinimalTuple(state, iter);
+    if (!tuple) {
+        ExecClearTuple(slot);
+        return false;
+    }
+
+    ExecStoreMinimalTuple(tuple, slot, true);
+    return true;
+}
+
+/*
+ * Function to compare two tuples; result is per qsort() convention, ie:
+ * <0, 0, >0 according as a<b, a=b, a>b.
+ */
+static int groupNodeSlotCmp(const GroupNode *a, const TupleTableSlot *b, SortGroupStatePriv *state)
+{
+    int nkey;
+    SortSupport sortKey = state->sortKeys;
+    AttrNumber attno;
+    int32 compare;
+    HeapTupleData ltup;
+    TupleDesc tupDesc;
+    Datum datum1;
+    bool isnull1;
+    Datum datum2;
+    bool isnull2;
+
+    Assert(b->tts_nvalid >= state->nKeys);
+    tupDesc = state->tupDesc;
+    attno = sortKey->ssup_attno;
+
+    /* Compare the leading sort key */
+    compare = ApplySortComparator(a->datum1, a->isnull1, b->tts_values[attno - 1], b->tts_isnull[attno - 1], sortKey);
+    if (compare != 0)
+        return compare;
+
+    /* Compare additional sort keys */
+    ltup.t_len = ((MinimalTuple)a->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
+    ltup.t_data = (HeapTupleHeader)((char *)a->tuple - MINIMAL_TUPLE_OFFSET);
+
+    sortKey++;
+    for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++) {
+        attno = sortKey->ssup_attno;
+
+        datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
+        datum2 = b->tts_values[attno - 1];
+        isnull2 = b->tts_isnull[attno - 1];
+
+        compare = ApplySortComparator(datum1, isnull1, datum2, isnull2, sortKey);
+        if (compare != 0)
+            return compare;
+    }
+
+    return 0;
+}
+
+/* ----------------------------------------------------------------
+ * ExecInitSortGroup
+ *
+ * Creates the run-time state information for the SortGroup node
+ * produced by the planner and initializes its outer subtree.
+ * ----------------------------------------------------------------
+ */
+SortGroupState *ExecInitSortGroup(SortGroup *node, EState *estate, int eflags)
+{
+    SortGroupState *sortGroupState;
+
+    /*
+     * create state structure
+     */
+    sortGroupState = makeNode(SortGroupState);
+    sortGroupState->ss.ps.plan = (Plan *)node;
+    sortGroupState->ss.ps.state = estate;
+    sortGroupState->ss.ps.ExecProcNode = ExecSortGroup;
+
+    sortGroupState->bound = LONG_MAX; /*unlimited*/
+    sortGroupState->sort_Done = false;
+
+    /* check for unsupported flags */
+    Assert(!(eflags & (EXEC_FLAG_MARK | EXEC_FLAG_BACKWARD)));
+
+    outerPlanState(sortGroupState) = ExecInitNode(outerPlan(node), estate, eflags);
+
+    /*
+     * Miscellaneous initialization
+     *
+     * SortGroupState nodes don't initialize their ExprContexts because they never call
+     * ExecQual or ExecProject.
+     */
+
+    /*
+     * Initialize return slot and type. No need to initialize projection info
+     * because this node doesn't do projections.
+     */
+    ExecInitResultTupleSlot(estate, &sortGroupState->ss.ps);
+    ExecInitScanTupleSlot(estate, &sortGroupState->ss);
+    sortGroupState->ss.ps.ps_ProjInfo = NULL;
+
+    /*
+     * Initialize scan slot and type.
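writeMinimalTuple()/readMinimalTuple() above frame each tuple on tape as a 4-byte length followed by the tuple body, and a zero-byte read of the length word means the current group's tape is exhausted. Below is a standalone round trip of that length-prefixed framing over a std::stringstream; it is simplified in that the stored length covers only the payload, whereas the patch's on-tape length also counts the length word itself, and the MinimalTuple header trimming is omitted:

// Standalone sketch of length-prefixed records: write a 4-byte length and the
// payload, read them back until a short read signals end of data.
#include <cstdint>
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

static void writeRecord(std::ostream &out, const std::string &payload)
{
    std::uint32_t len = static_cast<std::uint32_t>(payload.size());
    out.write(reinterpret_cast<const char *>(&len), sizeof(len));
    out.write(payload.data(), payload.size());
}

static bool readRecord(std::istream &in, std::string &payload)
{
    std::uint32_t len = 0;
    if (!in.read(reinterpret_cast<char *>(&len), sizeof(len))) {
        return false;                      // like bytes == 0: no more tuples on this tape
    }
    payload.resize(len);
    if (!in.read(&payload[0], len)) {
        throw std::runtime_error("unexpected end of tape");
    }
    return true;
}

int main()
{
    std::stringstream tape;
    const std::vector<std::string> rows = {"alpha", "beta", "gamma"};
    for (const auto &row : rows) {
        writeRecord(tape, row);
    }

    std::string row;
    while (readRecord(tape, row)) {
        std::cout << row << '\n';          // prints alpha, beta, gamma in write order
    }
    return 0;
}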
+ */ + ExecAssignScanTypeFromOuterPlan(&sortGroupState->ss); + ExecAssignResultTypeFromTL(&sortGroupState->ss.ps, + sortGroupState->ss.ss_ScanTupleSlot->tts_tupleDescriptor->td_tam_ops); + + Assert(sortGroupState->ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor->td_tam_ops); + return sortGroupState; +} + + +/* ---------------------------------------------------------------- + * ExecSortGroup + * + * Groups and sorts tuples from the outer subtree of the node using SortGroupStatePriv, + * which saves the results in a temporary file or memory. After the + * initial call, returns a tuple from the file with each call. + * ---------------------------------------------------------------- + */ +static TupleTableSlot *ExecSortGroup(PlanState *pstate) +{ + SortGroupState *node = castNode(SortGroupState, pstate); + EState *estate; + TupleTableSlot *slot; + SortGroupStatePriv *state = node->state; + + estate = node->ss.ps.state; + + CHECK_FOR_INTERRUPTS(); + + /* + * If first time through, read all tuples from outer plan and pass them to + * skiplist + */ + if (unlikely(!node->sort_Done)) { + PlanState *outerNode; + TupleDesc tupDesc; + SortGroup *plannode = (SortGroup *)node->ss.ps.plan; + + /* + * Want to scan subplan in the forward direction while creating the + * sorted data. + */ + estate->es_direction = ForwardScanDirection; + + outerNode = outerPlanState(node); + tupDesc = ExecGetResultType(outerNode); + + state = groupSortBegin(tupDesc, + plannode->numCols, + plannode->sortColIdx, + plannode->sortOperators, + plannode->collations, + plannode->nullsFirst, + node->bound, + u_sess->attr.attr_memory.work_mem); + state->new_group_trigger = node->new_group_trigger; + /* + * Scan the subplan and feed all the tuples to SortGroupStatePriv. + */ + for (;;) { + slot = ExecProcNode(outerNode); + + if (TupIsNull(slot)) + break; + + groupSortPutTupleslot(state, slot); + } + + /* + * finally set the sorted flag to true + */ + groupSortFinished(state); + node->sort_Done = true; + node->state = state; + dumpSortGroupState(node); /* dump memory or tape states info */ + } + + /* + * Get the first or next tuple from SortGroupStatePriv. Returns NULL if no more + * tuples. Note that we only rely on slot tuple remaining valid until the + * next fetch from the SortGroupStatePriv. + */ + slot = node->ss.ps.ps_ResultTupleSlot; + groupSortGetTupleSlot(state, slot); + return slot; +} + +/* + * groupSortEnd + * + * Release resources and clean up. + */ +static void groupSortEnd(SortGroupStatePriv *state) +{ + if (state->tapeset) + LogicalTapeSetClose(state->tapeset); + /* + * Free the main memory context, including the SortGroupStatePriv struct + * itself. + */ + MemoryContextDelete(state->maincontext); +} + + +/* ---------------------------------------------------------------- + * ExecEndSortGroup(node) + * ---------------------------------------------------------------- + */ +void ExecEndSortGroup(SortGroupState *node) +{ + /* + * clean out the tuple table + */ + ExecClearTuple(node->ss.ss_ScanTupleSlot); + /* must drop pointer to sort result tuple */ + ExecClearTuple(node->ss.ps.ps_ResultTupleSlot); + + /* + * Release SortGroupStatePriv resources + */ + if (node->state) + groupSortEnd(node->state); + node->state = NULL; + + /* + * shut down the subplan + */ + ExecEndNode(outerPlanState(node)); +} + +/* ---------------------------------------------------------------- + * dumpSortGroupState + * + * save the space used information into SortGroupState, + * This information is used when EXPLAN ANALYZE. 
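ExecSortGroup() follows the usual materializing-executor shape: the first call drains the outer plan into the private sort state, sets sort_Done, and every call (including that first one) then just pulls the next tuple from the iterator. A standalone C++17 sketch of that control flow, with a lambda standing in for ExecProcNode on the outer plan and a std::map standing in for the skiplist of groups:

// Standalone sketch of "drain the child once, then stream results": the first
// next() call consumes the whole input and groups it; every call afterwards
// returns one buffered row until the output is exhausted.
#include <functional>
#include <iostream>
#include <map>
#include <optional>
#include <string>
#include <vector>

class GroupSortNode {
public:
    explicit GroupSortNode(std::function<std::optional<std::pair<std::string, int>>()> child)
        : child_(std::move(child)) {}

    std::optional<std::string> next()
    {
        if (!sortDone_) {                               // first call: read everything from the child
            while (auto row = child_()) {
                groups_[row->first].push_back(row->second);
            }
            for (const auto &g : groups_) {             // emit in key order, group by group
                for (int v : g.second) {
                    output_.push_back(g.first + "=" + std::to_string(v));
                }
            }
            sortDone_ = true;
        }
        if (pos_ >= output_.size()) {
            return std::nullopt;                        // like returning an empty result slot
        }
        return output_[pos_++];
    }

private:
    std::function<std::optional<std::pair<std::string, int>>()> child_;
    std::map<std::string, std::vector<int>> groups_;
    std::vector<std::string> output_;
    std::size_t pos_ = 0;
    bool sortDone_ = false;
};

int main()
{
    std::vector<std::pair<std::string, int>> input = {{"b", 1}, {"a", 2}, {"b", 3}, {"a", 4}};
    std::size_t i = 0;
    GroupSortNode node([&]() -> std::optional<std::pair<std::string, int>> {
        if (i < input.size()) return input[i++];
        return std::nullopt;
    });

    while (auto row = node.next()) {
        std::cout << *row << '\n';                      // a=2, a=4, b=1, b=3
    }
    return 0;
}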
+ * ---------------------------------------------------------------- + */ +static void dumpSortGroupState(SortGroupState *state) +{ + SortGroupStatePriv *priv = state->state; + if (priv->holdTupleInMem) { + state->spaceType = "Memory"; + state->spaceUsed = groupSortUsedMemory(priv); + } else { + state->spaceType = "Disk"; + state->spaceUsed = LogicalTapeSetBlocks(priv->tapeset) * BLCKSZ; + } +} + +void ExecReScanSortGroup(SortGroupState *node) +{ + PlanState *outerPlan = outerPlanState(node); + + /* + * If we haven't sorted yet, just return. If outerplan's chgParam is not + * NULL then it will be re-scanned by ExecProcNode, else no reason to + * re-scan it at all. + */ + if (!node->sort_Done) + return; + + /* must drop pointer to sort result tuple */ + ExecClearTuple(node->ss.ps.ps_ResultTupleSlot); + + /* + * If subnode is to be rescanned then we forget previous sort results; we + * have to re-read the subplan and re-sort. Also must re-sort if the + * bounded-sort parameters changed. + * + * Otherwise we can just rewind and rescan the sorted output. + */ + if (outerPlan->chgParam != NULL || node->bound != node->state->max_groups) { + node->sort_Done = false; + groupSortEnd(node->state); + node->state = NULL; + + /* + * if chgParam of subnode is not null then plan will be re-scanned by + * first ExecProcNode. + */ + if (outerPlan->chgParam == NULL) + ExecReScan(outerPlan); + } else { + initGroupIter(node->state); + dumpSortGroupState(node); + } +} \ No newline at end of file diff --git a/src/include/executor/node/nodeAgg.h b/src/include/executor/node/nodeAgg.h index 6052d7fdb..3ee84d772 100644 --- a/src/include/executor/node/nodeAgg.h +++ b/src/include/executor/node/nodeAgg.h @@ -410,6 +410,7 @@ typedef struct AggStatePerPhaseData { FmgrInfo* eqfunctions; /* per-grouping-field equality fns */ Agg* aggnode; /* Agg node for phase data */ Sort* sortnode; /* Sort node for input ordering for phase */ + SortGroup* sortGroupNode; /* SortGroup node for input ordering for phase */ AggStrategy aggstrategy; /* strategy mode */ ExprState *evaltrans; /* evaluation of transition functions */ } AggStatePerPhaseData; diff --git a/src/include/executor/node/nodeSortGroup.h b/src/include/executor/node/nodeSortGroup.h new file mode 100644 index 000000000..315c74adf --- /dev/null +++ b/src/include/executor/node/nodeSortGroup.h @@ -0,0 +1,26 @@ +/* ------------------------------------------------------------------------- + * + * nodeSortGroup.h + * support for the openGauss executor module + * + * + * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * Portions Copyright (c) 2022, openGauss Contributors + * + * src/include/executor/nodeSortGroup.h + * + * ------------------------------------------------------------------------- + */ + +#ifndef NODESORTGROUP_H +#define NODESORTGROUP_H + + +#include "nodes/execnodes.h" + +extern SortGroupState *ExecInitSortGroup(SortGroup *node, EState *estate, int eflags); +extern void ExecEndSortGroup(SortGroupState *node); +extern void ExecReScanSortGroup(SortGroupState *node); + +#endif /*NODESORTGROUP_H*/ \ No newline at end of file diff --git a/src/include/knl/knl_guc/knl_session_attr_sql.h b/src/include/knl/knl_guc/knl_session_attr_sql.h index f4b3af22d..4076d6344 100644 --- a/src/include/knl/knl_guc/knl_session_attr_sql.h +++ b/src/include/knl/knl_guc/knl_session_attr_sql.h @@ -67,6 +67,7 @@ typedef struct knl_session_attr_sql { bool enable_sort; bool enable_compress_spill; bool 
enable_hashagg; + bool enable_sortgroup_agg; bool enable_material; bool enable_nestloop; bool enable_mergejoin; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 84b5538c6..fd3645e54 100755 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -2412,6 +2412,21 @@ typedef struct SortState { int64* space_size; /* spill size for temp table */ } SortState; +struct SortGroupStatePriv; +/* ---------------- + * SortGroupState information + * ---------------- + */ +typedef struct SortGroupState { + ScanState ss; /* its first field is NodeTag */ + int64 bound; /* if bounded, how many group are needed */ + struct SortGroupStatePriv *state; /* private state of nodeSortGroup.c */ + bool sort_Done; /* sort completed yet? */ + bool *new_group_trigger; /* indicates new groups where returning tuples */ + const char *spaceType; /* type of space spaceUsed represents */ + int64 spaceUsed; /* space used for explain */ +} SortGroupState; + /* --------------------- * GroupState information * ------------------------- @@ -2455,6 +2470,7 @@ typedef struct AggState { AggStatePerAgg curperagg; /* identifies currently active aggregate */ bool input_done; /* indicates end of input */ bool agg_done; /* indicates completion of Agg scan */ + bool new_group_trigger; /* indicates new groups where returning tuples*/ int projected_set; /* The last projected grouping set */ int current_set; /* The current grouping set being evaluated */ Bitmapset* grouped_cols; /* grouped cols in current projection */ diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 3d1e2f16c..d2bffa50b 100755 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -75,6 +75,7 @@ typedef enum NodeTag { T_HashJoin, T_Material, T_Sort, + T_SortGroup, T_Group, T_Agg, T_WindowAgg, @@ -176,6 +177,7 @@ typedef enum NodeTag { T_HashJoinState, T_MaterialState, T_SortState, + T_SortGroupState, T_GroupState, T_AggState, T_WindowAggState, diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 705ad3dcb..abe766205 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -1160,6 +1160,19 @@ typedef struct Sort { OpMemInfo mem_info; /* Memory info for sort */ } Sort; +/* ---------------- + * SortGroup node + * ---------------- + */ +typedef struct SortGroup { + Plan plan; + int numCols; /* number of sort-key columns */ + AttrNumber *sortColIdx; /* their indexes in the target list */ + Oid *sortOperators; /* OIDs of operators to sort them by */ + Oid *collations; /* OIDs of collations */ + bool *nullsFirst; /* NULLS FIRST/LAST directions */ +} SortGroup; + typedef struct VecSort : public Sort { } VecSort; @@ -1196,7 +1209,8 @@ typedef struct VecGroup : public Group { typedef enum AggStrategy { AGG_PLAIN, /* simple agg across all input rows */ AGG_SORTED, /* grouped agg, input must be sorted */ - AGG_HASHED /* grouped agg, use internal hashtable */ + AGG_HASHED, /* grouped agg, use internal hashtable */ + AGG_SORT_GROUP /* grouped agg, use sort group */ } AggStrategy; #ifdef STREAMPLAN diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 983a7a30f..6df27ab0b 100755 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -385,6 +385,7 @@ typedef struct PlannerInfo { bool hasPseudoConstantQuals; /* true if any RestrictInfo has * pseudoconstant = true */ bool hasRecursion; /* true if planning a recursive WITH item */ + bool consider_sortgroup_agg; /*ture if consider to use SORT GROUP agg */ /* Note: 
qualSecurityLevel is zero if there are no securityQuals */ Index qualSecurityLevel; /* minimum security_level for quals */ diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index c420591c6..bc74c7f30 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -108,6 +108,8 @@ extern void cost_recursive_union(Plan* runion, Plan* nrterm, Plan* rterm); extern void cost_sort(Path* path, List* pathkeys, Cost input_cost, double tuples, int width, Cost comparison_cost, int sort_mem, double limit_tuples, bool col_store, int dop = 1, OpMemInfo* mem_info = NULL, bool index_sort = false); +extern void cost_sort_group(Path *path, PlannerInfo *root, Cost input_cost, double tuples, int width, + Cost comparison_cost, int sort_mem, double dNumGroups); extern void cost_merge_append(Path* path, PlannerInfo* root, List* pathkeys, int n_streams, Cost input_startup_cost, Cost input_total_cost, double tuples); extern void cost_material(Path* path, Cost input_startup_cost, Cost input_total_cost, double tuples, int width); diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h index 7673dd2b1..93ca09fde 100755 --- a/src/include/optimizer/planmain.h +++ b/src/include/optimizer/planmain.h @@ -79,9 +79,12 @@ extern Sort* make_sort_from_pathkeys( PlannerInfo* root, Plan* lefttree, List* pathkeys, double limit_tuples, bool can_parallel = false); extern Sort* make_sort_from_sortclauses(PlannerInfo* root, List* sortcls, Plan* lefttree); extern Sort* make_sort_from_groupcols(PlannerInfo* root, List* groupcls, AttrNumber* grpColIdx, Plan* lefttree); +extern SortGroup* make_sort_group_from_groupcols(PlannerInfo* root, List* groupcls, AttrNumber* grpColIdx, Plan* lefttree, double dNumGroup); extern Sort* make_sort_from_targetlist(PlannerInfo* root, Plan* lefttree, double limit_tuples); extern Sort* make_sort(PlannerInfo* root, Plan* lefttree, int numCols, AttrNumber* sortColIdx, Oid* sortOperators, Oid* collations, bool* nullsFirst, double limit_tuples); +extern SortGroup* make_sortgroup(PlannerInfo* root, Plan* lefttree, int numCols, AttrNumber* sortColIdx, Oid* sortOperators, + Oid* collations, bool* nullsFirst, double dNumGroup); extern Agg* make_agg(PlannerInfo* root, List* tlist, List* qual, AggStrategy aggstrategy, const AggClauseCosts* aggcosts, int numGroupCols, AttrNumber* grpColIdx, Oid* grpOperators, Oid* grp_collations, long numGroups, Plan* lefttree, WindowLists* wflists, bool need_stream, bool trans_agg, List* groupingSets = NIL, diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h index 5b5174450..7418e50e0 100644 --- a/src/include/utils/logtape.h +++ b/src/include/utils/logtape.h @@ -64,6 +64,7 @@ extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, void *ptr, size_t extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size); extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum); extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share = NULL); +extern void LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional); extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size); extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, long blocknum, int offset); extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum, long *blocknum, int *offset); diff --git a/src/test/regress/expected/sortgroup_agg.out b/src/test/regress/expected/sortgroup_agg.out new file mode 100644 index 000000000..443eb1a1f --- /dev/null 
+++ b/src/test/regress/expected/sortgroup_agg.out @@ -0,0 +1,433 @@ +create schema sortgroupagg; +set search_path=sortgroupagg; +create table tbl_10k(id bigint, v1 numeric, v2 char(150)); +insert into tbl_10k select generate_series(1, 10 * 1000), (RANDOM() * 67)::int::numeric + 10e-100, (RANDOM() * 77)::int::numeric+10e-100; +analyze tbl_10k; +set enable_sortgroup_agg=on; +explain (costs off) select sum(id), v1,v2 from tbl_10k group by v1,v2 order by v1,v2 limit 1; + QUERY PLAN +---------------------------------------- + Limit + -> GroupAggregate + Group By Key: v1, v2 + -> Group Sort + Sorted Group Key: v1, v2 + -> Seq Scan on tbl_10k +(6 rows) + +-- order keys are not contained in group keys, needs sorts after aggregation +explain (costs off) select sum(id), v1,v2 from tbl_10k group by v1,v2 order by v1,sum(id) limit 10; + QUERY PLAN +---------------------------------------------- + Limit + -> Sort + Sort Key: v1, (sum(id)) + -> GroupAggregate + Group By Key: v1, v2 + -> Group Sort + Sorted Group Key: v1, v2 + -> Seq Scan on tbl_10k +(8 rows) + +create table agg_1 as +select sum(id), v1,v2 from tbl_10k group by v1,v2 order by v1,sum(id) limit 10 offset 11; +set enable_sortgroup_agg=off; +create table agg_2 as +select sum(id), v1,v2 from tbl_10k group by v1,v2 order by v1,sum(id) limit 10 offset 11; +-- Compare results to hash aggregation results +(select * from agg_1 except select * from agg_2) + union all +(select * from agg_2 except select * from agg_1); + sum | v1 | v2 +-----+----+---- +(0 rows) + +drop table agg_1, agg_2; +set enable_sortgroup_agg=on; +-- In the following cases, we cannot perform sortgroup +-- 1. plain agg +explain (costs off) select count(*) from tbl_10k limit 1; + QUERY PLAN +--------------------------------- + Limit + -> Aggregate + -> Seq Scan on tbl_10k +(3 rows) + +-- 2. HAVING clauses +explain (costs off) select sum(id), v1,v2 from tbl_10k group by v1,v2 having v1+v2>0 order by v1,v2 limit 1; + QUERY PLAN +----------------------------------------------------------------- + Limit + -> Sort + Sort Key: v1, v2 + -> HashAggregate + Group By Key: v1, v2 + -> Seq Scan on tbl_10k + Filter: ((v1 + (v2)::numeric) > 0::numeric) +(7 rows) + +--3. distinct +explain (costs off) select distinct(v1,v2) from tbl_10k group by v1,v2 limit 1; + QUERY PLAN +--------------------------------------------- + Limit + -> Unique + -> Sort + Sort Key: (ROW(v1, v2)) + -> HashAggregate + Group By Key: v1, v2 + -> Seq Scan on tbl_10k +(7 rows) + +--4. grouping sets +explain (costs off) select sum(v1),v1 from tbl_10k group by grouping sets((v1),(v2)) order by v1 desc limit 1; + QUERY PLAN +-------------------------------------------------- + Limit + -> Sort + Sort Key: v1 DESC + -> GroupAggregate + Group By Key: v1 + Sort Key: v2 + Group By Key: v2 + -> Sort + Sort Key: v1 DESC NULLS LAST + -> Seq Scan on tbl_10k +(10 rows) + +-- 5. winows +explain (costs off) SELECT v1, avg(v1) OVER (PARTITION BY v2) FROM tbl_10k group by v1, v2 order by v1,v2 limit 1; + QUERY PLAN +--------------------------------------------------- + Limit + -> Sort + Sort Key: v1, v2 + -> WindowAgg + -> Sort + Sort Key: v2 + -> HashAggregate + Group By Key: v1, v2 + -> Seq Scan on tbl_10k +(9 rows) + +--6. 
no LIMIT cluases +explain (costs off) select sum(id), v1,v2 from tbl_10k group by v1,v2 order by v1,v2; + QUERY PLAN +--------------------------------- + Sort + Sort Key: v1, v2 + -> HashAggregate + Group By Key: v1, v2 + -> Seq Scan on tbl_10k +(5 rows) + +set enable_hashagg =off; +set enable_sort=off; +-- GROUP BY single key +explain (costs off) select avg(v2), v2 from tbl_10k group by v2 order by v2 limit 1000 offset 10; + QUERY PLAN +--------------------------------------- + Limit + -> GroupAggregate + Group By Key: v2 + -> Group Sort + Sorted Group Key: v2 + -> Seq Scan on tbl_10k +(6 rows) + +create table agg_sortgroup_1 as +select avg(v2), v2 from tbl_10k group by v2 order by v2 limit 1000 offset 10; +create table agg_sortgroup_2 as +select avg(v1), v1 from tbl_10k group by v1 order by v1 desc limit 1000 offset 10; +set work_mem =64; +create table agg_sortgroup_disk_1 as +select avg(v2), v2 from tbl_10k group by v2 order by v2 limit 1000 offset 10; +create table agg_sortgroup_disk_2 as +select avg(v1), v1 from tbl_10k group by v1 order by v1 desc limit 1000 offset 10; +set work_mem =default; +set enable_hashagg =on; +set enable_sortgroup_agg=off; +set enable_sort=on; +create table agg_hashagg_1 as +select avg(v2), v2 from tbl_10k group by v2 order by v2 limit 1000 offset 10; +create table agg_hashagg_2 as +select avg(v1), v1 from tbl_10k group by v1 order by v1 desc limit 1000 offset 10; +-- Compare results to hash aggregation results +(select * from agg_sortgroup_1 except select * from agg_hashagg_1) + union all +(select * from agg_hashagg_1 except select * from agg_sortgroup_1); + avg | v2 +-----+---- +(0 rows) + +(select * from agg_sortgroup_2 except select * from agg_hashagg_2) + union all +(select * from agg_hashagg_2 except select * from agg_sortgroup_2); + avg | v1 +-----+---- +(0 rows) + +(select * from agg_sortgroup_disk_1 except select * from agg_hashagg_1) + union all +(select * from agg_hashagg_1 except select * from agg_sortgroup_disk_1); + avg | v2 +-----+---- +(0 rows) + +(select * from agg_sortgroup_disk_2 except select * from agg_hashagg_2) + union all +(select * from agg_hashagg_2 except select * from agg_sortgroup_disk_2); + avg | v1 +-----+---- +(0 rows) + +drop table agg_sortgroup_1,agg_sortgroup_2,agg_hashagg_1,agg_hashagg_2, agg_sortgroup_disk_1, agg_sortgroup_disk_2; +-- GROUP BY multiple keys +set enable_sortgroup_agg=on; +set enable_hashagg =off; +set enable_sort=off; +explain (costs off) select sum(v2+v1), v2,v1 from tbl_10k group by v2,v1 order by v2 desc ,v1 asc limit 1000 offset 10; + QUERY PLAN +--------------------------------------------- + Limit + -> GroupAggregate + Group By Key: v2, v1 + -> Group Sort + Sorted Group Key: v2 DESC, v1 + -> Seq Scan on tbl_10k +(6 rows) + +create table agg_sortgroup_1 as +select sum(v2+v1), v2,v1 from tbl_10k group by v2,v1 order by v2 desc ,v1 asc limit 1000 offset 10; +create table agg_sortgroup_2 as +select sum(v2+v1), v2,v1 from tbl_10k group by v1,v2 order by v1 asc ,v2 desc limit 1000 offset 10; +set work_mem =64; +create table agg_sortgroup_disk_1 as +select sum(v2+v1), v2,v1 from tbl_10k group by v2,v1 order by v2 desc ,v1 asc limit 1000 offset 10; +create table agg_sortgroup_disk_2 as +select sum(v2+v1), v2,v1 from tbl_10k group by v1,v2 order by v1 asc ,v2 desc limit 1000 offset 10; +set work_mem =default; +set enable_sortgroup_agg=off; +set enable_hashagg =on; +set enable_sort=on; +create table agg_hashagg_1 as +select sum(v2+v1), v2,v1 from tbl_10k group by v2,v1 order by v2 desc ,v1 asc limit 1000 offset 
10; +create table agg_hashagg_2 as +select sum(v2+v1), v2,v1 from tbl_10k group by v1,v2 order by v1 asc ,v2 desc limit 1000 offset 10; +-- Compare results to hash aggregation results +(select * from agg_sortgroup_1 except select * from agg_hashagg_1) + union all +(select * from agg_hashagg_1 except select * from agg_sortgroup_1); + sum | v2 | v1 +-----+----+---- +(0 rows) + +(select * from agg_sortgroup_2 except select * from agg_hashagg_2) + union all +(select * from agg_hashagg_2 except select * from agg_sortgroup_2); + sum | v2 | v1 +-----+----+---- +(0 rows) + +(select * from agg_sortgroup_disk_1 except select * from agg_hashagg_1) + union all +(select * from agg_hashagg_1 except select * from agg_sortgroup_disk_1); + sum | v2 | v1 +-----+----+---- +(0 rows) + +(select * from agg_sortgroup_disk_2 except select * from agg_hashagg_2) + union all +(select * from agg_hashagg_2 except select * from agg_sortgroup_disk_2); + sum | v2 | v1 +-----+----+---- +(0 rows) + +drop table agg_sortgroup_1,agg_sortgroup_2,agg_hashagg_1,agg_hashagg_2,agg_sortgroup_disk_1, agg_sortgroup_disk_2; +-- already sorted, we don't consider sortgroup aggregation +set enable_sortgroup_agg=on; +set enable_hashagg =off; +set enable_sort=off; +explain (costs off) select avg(v1), v1 from tbl_10k group by v1 order by v1 limit 1; + QUERY PLAN +--------------------------------------- + Limit + -> GroupAggregate + Group By Key: v1 + -> Group Sort + Sorted Group Key: v1 + -> Seq Scan on tbl_10k +(6 rows) + +set enable_seqscan=off; +create index v1_index on tbl_10k (v1); +analyze tbl_10k; +explain (costs off) select avg(v1), v1 from tbl_10k group by v1 order by v1 limit 1; + QUERY PLAN +------------------------------------------------------- + Limit + -> GroupAggregate + Group By Key: v1 + -> Index Only Scan using v1_index on tbl_10k +(4 rows) + +drop index v1_index; +set enable_seqscan=on; +-- test ExecReScanSortGroup +set enable_sortgroup_agg=on; +set enable_hashagg =off; +set enable_sort=off; +set enable_material =off; +explain (costs off) +WITH t1 AS ( + SELECT v1::int % 10 as a1, SUM(id) as b1 + FROM tbl_10k + GROUP BY v1::int % 10 order by v1::int % 10 limit 100 +), t2 AS ( + SELECT v2::char(5)::numeric::int % 10 as a2, SUM(id) as b2 + FROM tbl_10k + GROUP BY v2::char(5)::numeric::int % 10 order by v2::char(5)::numeric::int % 10 limit 101 +) +select a1, a2, b1+b2 from t1 inner join t2 on (b1 + b2 > 10); + QUERY PLAN +-------------------------------------------------------------------------------------------------------------- + Nested Loop + Join Filter: (((sum(sortgroupagg.tbl_10k.id)) + (sum(sortgroupagg.tbl_10k.id))) > 10::numeric) + -> Limit + -> GroupAggregate + Group By Key: (((sortgroupagg.tbl_10k.v1)::integer % 10)) + -> Group Sort + Sorted Group Key: (((sortgroupagg.tbl_10k.v1)::integer % 10)) + -> Seq Scan on tbl_10k + -> Limit + -> GroupAggregate + Group By Key: (((((sortgroupagg.tbl_10k.v2)::character(5))::numeric)::integer % 10)) + -> Group Sort + Sorted Group Key: (((((sortgroupagg.tbl_10k.v2)::character(5))::numeric)::integer % 10)) + -> Seq Scan on tbl_10k +(14 rows) + +create table mem_rescan_1 as +WITH t1 AS ( + SELECT v1::int % 10 as a1, SUM(id) as b1 + FROM tbl_10k + GROUP BY v1::int % 10 order by v1::int % 10 limit 100 +), t2 AS ( + SELECT v2::char(5)::numeric::int % 10 as a2, SUM(id) as b2 + FROM tbl_10k + GROUP BY v2::char(5)::numeric::int % 10 order by v2::char(5)::numeric::int % 10 limit 101 +) +select a1, a2, b1+b2 from t1 inner join t2 on (b1 + b2 > 10); +set work_mem =64; +create table 
disk_rescan_1 as +WITH t1 AS ( + SELECT v1::int % 10 as a1, SUM(id) as b1 + FROM tbl_10k + GROUP BY v1::int % 10 order by v1::int % 10 limit 100 +), t2 AS ( + SELECT v2::char(5)::numeric::int % 10 as a2, SUM(id) as b2 + FROM tbl_10k + GROUP BY v2::char(5)::numeric::int % 10 order by v2::char(5)::numeric::int % 10 limit 101 +) +select a1, a2, b1+b2 from t1 inner join t2 on (b1 + b2 > 10); +-- Compare results between MEMORY SORT and DISK SORT +(select * from mem_rescan_1 except select * from disk_rescan_1) + union all +(select * from disk_rescan_1 except select * from mem_rescan_1); + a1 | a2 | ?column? +----+----+---------- +(0 rows) + +set work_mem =default; +set enable_sortgroup_agg=off; +set enable_hashagg =on; +set enable_sort=on; +create table hashagg_rescan_1 as +WITH t1 AS ( + SELECT v1::int % 10 as a1, SUM(id) as b1 + FROM tbl_10k + GROUP BY v1::int % 10 order by v1::int % 10 limit 100 +), t2 AS ( + SELECT v2::char(5)::numeric::int % 10 as a2, SUM(id) as b2 + FROM tbl_10k + GROUP BY v2::char(5)::numeric::int % 10 order by v2::char(5)::numeric::int % 10 limit 101 +) +select a1, a2, b1+b2 from t1 inner join t2 on (b1 + b2 > 10); +-- Compare results to hash aggregation results +-- hashagg_rescan_1 = mem_rescan_1 = disk_rescan_1 +(select * from mem_rescan_1 except select * from hashagg_rescan_1) + union all +(select * from hashagg_rescan_1 except select * from mem_rescan_1); + a1 | a2 | ?column? +----+----+---------- +(0 rows) + +drop table mem_rescan_1,hashagg_rescan_1,disk_rescan_1; +drop table tbl_10k; +create table tbl_cstore_10k(id bigint, v1 numeric, v2 numeric) with (orientation = column); +insert into tbl_cstore_10k select generate_series(1, 10 * 1000), (RANDOM() * 67)::int::numeric, (RANDOM() * 77)::int::numeric; +analyze tbl_cstore_10k; +set enable_sortgroup_agg=on; +set enable_hashagg =off; +set enable_sort=off; +explain (costs off) select sum(id), v1,v2 from tbl_cstore_10k group by v1,v2 order by v1,v2 limit 11 offset 10; + QUERY PLAN +------------------------------------------------------- + Limit + -> GroupAggregate + Group By Key: v1, v2 + -> Group Sort + Sorted Group Key: v1, v2 + -> Row Adapter + -> CStore Scan on tbl_cstore_10k +(7 rows) + +create table agg_sortgroup_1 as +select sum(id), v1,v2 from tbl_cstore_10k group by v1,v2 order by v1,v2 limit 11 offset 10; +set enable_sortgroup_agg=off; +set enable_hashagg =on; +set enable_sort=on; +create table agg_vecagg_1 as +select sum(id), v1,v2 from tbl_cstore_10k group by v1,v2 order by v1,v2 limit 11 offset 10; +(select * from agg_sortgroup_1 except select * from agg_vecagg_1) + union all +(select * from agg_vecagg_1 except select * from agg_sortgroup_1); + sum | v1 | v2 +-----+----+---- +(0 rows) + +drop table tbl_cstore_10k, agg_sortgroup_1,agg_vecagg_1; +create table tbl_ustore_10k(id bigint, v1 numeric, v2 numeric) with (storage_type=ustore); +insert into tbl_ustore_10k select generate_series(1, 10 * 1000), (RANDOM() * 67)::int::numeric, (RANDOM() * 77)::int::numeric; +analyze tbl_ustore_10k; +set enable_sortgroup_agg=on; +set enable_hashagg =off; +set enable_sort=off; +explain (costs off) select sum(id), v1,v2 from tbl_ustore_10k group by v1,v2 order by v1,v2 limit 11 offset 10; + QUERY PLAN +---------------------------------------------- + Limit + -> GroupAggregate + Group By Key: v1, v2 + -> Group Sort + Sorted Group Key: v1, v2 + -> Seq Scan on tbl_ustore_10k +(6 rows) + +create table agg_sortgroup_1 as +select sum(id), v1,v2 from tbl_ustore_10k group by v1,v2 order by v1,v2 limit 11 offset 10; +set 
enable_sortgroup_agg=off; +set enable_hashagg =on; +set enable_sort=on; +create table agg_hashagg_1 as +select sum(id), v1,v2 from tbl_ustore_10k group by v1,v2 order by v1,v2 limit 11 offset 10; +(select * from agg_sortgroup_1 except select * from agg_hashagg_1) + union all +(select * from agg_hashagg_1 except select * from agg_sortgroup_1); + sum | v1 | v2 +-----+----+---- +(0 rows) + +drop table tbl_ustore_10k, agg_sortgroup_1,agg_hashagg_1; +drop schema sortgroupagg cascade; diff --git a/src/test/regress/output/recovery_2pc_tools.source b/src/test/regress/output/recovery_2pc_tools.source index 6bf0da51e..c6a175cdd 100644 --- a/src/test/regress/output/recovery_2pc_tools.source +++ b/src/test/regress/output/recovery_2pc_tools.source @@ -318,6 +318,7 @@ select name,vartype,unit,min_val,max_val from pg_settings where name <> 'qunit_c enable_sonic_hashjoin | bool | | | enable_sonic_optspill | bool | | | enable_sort | bool | | | + enable_sortgroup_agg | bool | | | enable_startwith_debug | bool | | | enable_stmt_track | bool | | | enable_stream_replication | bool | | | diff --git a/src/test/regress/parallel_schedule0 b/src/test/regress/parallel_schedule0 index 99c62efa3..d867a8f4c 100644 --- a/src/test/regress/parallel_schedule0 +++ b/src/test/regress/parallel_schedule0 @@ -822,6 +822,7 @@ test: vec_window_001 #test: vec_window_002 test: vec_numeric_sop_1 vec_numeric_sop_2 vec_numeric_sop_3 vec_numeric_sop_4 vec_numeric_sop_5 #test: vec_window_end +test: sortgroup_agg test: vec_unique_pre vec_bitmap_prepare test: vec_unique vec_setop_001 vec_setop_002 vec_setop_003 vec_setop_004 hw_vec_int4 hw_vec_int8 hw_vec_float4 hw_vec_float8 diff --git a/src/test/regress/pg_regress.cpp b/src/test/regress/pg_regress.cpp index db1f0b4ac..365275fe1 100644 --- a/src/test/regress/pg_regress.cpp +++ b/src/test/regress/pg_regress.cpp @@ -5461,7 +5461,7 @@ static void check_global_variables() } } -#define BASE_PGXC_LIKE_MACRO_NUM 1392 +#define BASE_PGXC_LIKE_MACRO_NUM 1420 static void check_pgxc_like_macros() { #ifdef BUILD_BY_CMAKE diff --git a/src/test/regress/sql/sortgroup_agg.sql b/src/test/regress/sql/sortgroup_agg.sql new file mode 100644 index 000000000..b8232a98c --- /dev/null +++ b/src/test/regress/sql/sortgroup_agg.sql @@ -0,0 +1,293 @@ +create schema sortgroupagg; +set search_path=sortgroupagg; + +create table tbl_10k(id bigint, v1 numeric, v2 char(150)); +insert into tbl_10k select generate_series(1, 10 * 1000), (RANDOM() * 67)::int::numeric + 10e-100, (RANDOM() * 77)::int::numeric+10e-100; +analyze tbl_10k; + +set enable_sortgroup_agg=on; + +explain (costs off) select sum(id), v1,v2 from tbl_10k group by v1,v2 order by v1,v2 limit 1; + +-- order keys are not contained in group keys, needs sorts after aggregation +explain (costs off) select sum(id), v1,v2 from tbl_10k group by v1,v2 order by v1,sum(id) limit 10; +create table agg_1 as +select sum(id), v1,v2 from tbl_10k group by v1,v2 order by v1,sum(id) limit 10 offset 11; + +set enable_sortgroup_agg=off; +create table agg_2 as +select sum(id), v1,v2 from tbl_10k group by v1,v2 order by v1,sum(id) limit 10 offset 11; + +-- Compare results to hash aggregation results +(select * from agg_1 except select * from agg_2) + union all +(select * from agg_2 except select * from agg_1); + +drop table agg_1, agg_2; + + +set enable_sortgroup_agg=on; +-- In the following cases, we cannot perform sortgroup + +-- 1. plain agg +explain (costs off) select count(*) from tbl_10k limit 1; + +-- 2. 
HAVING clauses +explain (costs off) select sum(id), v1,v2 from tbl_10k group by v1,v2 having v1+v2>0 order by v1,v2 limit 1; + +--3. distinct +explain (costs off) select distinct(v1,v2) from tbl_10k group by v1,v2 limit 1; + +--4. grouping sets +explain (costs off) select sum(v1),v1 from tbl_10k group by grouping sets((v1),(v2)) order by v1 desc limit 1; + +-- 5. winows +explain (costs off) SELECT v1, avg(v1) OVER (PARTITION BY v2) FROM tbl_10k group by v1, v2 order by v1,v2 limit 1; + +--6. no LIMIT cluases +explain (costs off) select sum(id), v1,v2 from tbl_10k group by v1,v2 order by v1,v2; + +set enable_hashagg =off; +set enable_sort=off; + +-- GROUP BY single key +explain (costs off) select avg(v2), v2 from tbl_10k group by v2 order by v2 limit 1000 offset 10; + +create table agg_sortgroup_1 as +select avg(v2), v2 from tbl_10k group by v2 order by v2 limit 1000 offset 10; +create table agg_sortgroup_2 as +select avg(v1), v1 from tbl_10k group by v1 order by v1 desc limit 1000 offset 10; + +set work_mem =64; +create table agg_sortgroup_disk_1 as +select avg(v2), v2 from tbl_10k group by v2 order by v2 limit 1000 offset 10; +create table agg_sortgroup_disk_2 as +select avg(v1), v1 from tbl_10k group by v1 order by v1 desc limit 1000 offset 10; +set work_mem =default; + +set enable_hashagg =on; +set enable_sortgroup_agg=off; +set enable_sort=on; + +create table agg_hashagg_1 as +select avg(v2), v2 from tbl_10k group by v2 order by v2 limit 1000 offset 10; +create table agg_hashagg_2 as +select avg(v1), v1 from tbl_10k group by v1 order by v1 desc limit 1000 offset 10; + +-- Compare results to hash aggregation results +(select * from agg_sortgroup_1 except select * from agg_hashagg_1) + union all +(select * from agg_hashagg_1 except select * from agg_sortgroup_1); + +(select * from agg_sortgroup_2 except select * from agg_hashagg_2) + union all +(select * from agg_hashagg_2 except select * from agg_sortgroup_2); + +(select * from agg_sortgroup_disk_1 except select * from agg_hashagg_1) + union all +(select * from agg_hashagg_1 except select * from agg_sortgroup_disk_1); + +(select * from agg_sortgroup_disk_2 except select * from agg_hashagg_2) + union all +(select * from agg_hashagg_2 except select * from agg_sortgroup_disk_2); + +drop table agg_sortgroup_1,agg_sortgroup_2,agg_hashagg_1,agg_hashagg_2, agg_sortgroup_disk_1, agg_sortgroup_disk_2; + +-- GROUP BY multiple keys +set enable_sortgroup_agg=on; +set enable_hashagg =off; +set enable_sort=off; + +explain (costs off) select sum(v2+v1), v2,v1 from tbl_10k group by v2,v1 order by v2 desc ,v1 asc limit 1000 offset 10; + +create table agg_sortgroup_1 as +select sum(v2+v1), v2,v1 from tbl_10k group by v2,v1 order by v2 desc ,v1 asc limit 1000 offset 10; +create table agg_sortgroup_2 as +select sum(v2+v1), v2,v1 from tbl_10k group by v1,v2 order by v1 asc ,v2 desc limit 1000 offset 10; + +set work_mem =64; +create table agg_sortgroup_disk_1 as +select sum(v2+v1), v2,v1 from tbl_10k group by v2,v1 order by v2 desc ,v1 asc limit 1000 offset 10; +create table agg_sortgroup_disk_2 as +select sum(v2+v1), v2,v1 from tbl_10k group by v1,v2 order by v1 asc ,v2 desc limit 1000 offset 10; +set work_mem =default; + +set enable_sortgroup_agg=off; +set enable_hashagg =on; +set enable_sort=on; + +create table agg_hashagg_1 as +select sum(v2+v1), v2,v1 from tbl_10k group by v2,v1 order by v2 desc ,v1 asc limit 1000 offset 10; +create table agg_hashagg_2 as +select sum(v2+v1), v2,v1 from tbl_10k group by v1,v2 order by v1 asc ,v2 desc limit 1000 offset 
10; + +-- Compare results to hash aggregation results +(select * from agg_sortgroup_1 except select * from agg_hashagg_1) + union all +(select * from agg_hashagg_1 except select * from agg_sortgroup_1); + +(select * from agg_sortgroup_2 except select * from agg_hashagg_2) + union all +(select * from agg_hashagg_2 except select * from agg_sortgroup_2); + + +(select * from agg_sortgroup_disk_1 except select * from agg_hashagg_1) + union all +(select * from agg_hashagg_1 except select * from agg_sortgroup_disk_1); + +(select * from agg_sortgroup_disk_2 except select * from agg_hashagg_2) + union all +(select * from agg_hashagg_2 except select * from agg_sortgroup_disk_2); + +drop table agg_sortgroup_1,agg_sortgroup_2,agg_hashagg_1,agg_hashagg_2,agg_sortgroup_disk_1, agg_sortgroup_disk_2; + +-- already sorted, we don't consider sortgroup aggregation +set enable_sortgroup_agg=on; +set enable_hashagg =off; +set enable_sort=off; + +explain (costs off) select avg(v1), v1 from tbl_10k group by v1 order by v1 limit 1; + +set enable_seqscan=off; +create index v1_index on tbl_10k (v1); +analyze tbl_10k; + +explain (costs off) select avg(v1), v1 from tbl_10k group by v1 order by v1 limit 1; +drop index v1_index; +set enable_seqscan=on; + + +-- test ExecReScanSortGroup + +set enable_sortgroup_agg=on; +set enable_hashagg =off; +set enable_sort=off; +set enable_material =off; + +explain (costs off) +WITH t1 AS ( + SELECT v1::int % 10 as a1, SUM(id) as b1 + FROM tbl_10k + GROUP BY v1::int % 10 order by v1::int % 10 limit 100 +), t2 AS ( + SELECT v2::char(5)::numeric::int % 10 as a2, SUM(id) as b2 + FROM tbl_10k + GROUP BY v2::char(5)::numeric::int % 10 order by v2::char(5)::numeric::int % 10 limit 101 +) +select a1, a2, b1+b2 from t1 inner join t2 on (b1 + b2 > 10); + + +create table mem_rescan_1 as +WITH t1 AS ( + SELECT v1::int % 10 as a1, SUM(id) as b1 + FROM tbl_10k + GROUP BY v1::int % 10 order by v1::int % 10 limit 100 +), t2 AS ( + SELECT v2::char(5)::numeric::int % 10 as a2, SUM(id) as b2 + FROM tbl_10k + GROUP BY v2::char(5)::numeric::int % 10 order by v2::char(5)::numeric::int % 10 limit 101 +) +select a1, a2, b1+b2 from t1 inner join t2 on (b1 + b2 > 10); + +set work_mem =64; + +create table disk_rescan_1 as +WITH t1 AS ( + SELECT v1::int % 10 as a1, SUM(id) as b1 + FROM tbl_10k + GROUP BY v1::int % 10 order by v1::int % 10 limit 100 +), t2 AS ( + SELECT v2::char(5)::numeric::int % 10 as a2, SUM(id) as b2 + FROM tbl_10k + GROUP BY v2::char(5)::numeric::int % 10 order by v2::char(5)::numeric::int % 10 limit 101 +) +select a1, a2, b1+b2 from t1 inner join t2 on (b1 + b2 > 10); + +-- Compare results between MEMORY SORT and DISK SORT +(select * from mem_rescan_1 except select * from disk_rescan_1) + union all +(select * from disk_rescan_1 except select * from mem_rescan_1); + +set work_mem =default; +set enable_sortgroup_agg=off; +set enable_hashagg =on; +set enable_sort=on; + +create table hashagg_rescan_1 as +WITH t1 AS ( + SELECT v1::int % 10 as a1, SUM(id) as b1 + FROM tbl_10k + GROUP BY v1::int % 10 order by v1::int % 10 limit 100 +), t2 AS ( + SELECT v2::char(5)::numeric::int % 10 as a2, SUM(id) as b2 + FROM tbl_10k + GROUP BY v2::char(5)::numeric::int % 10 order by v2::char(5)::numeric::int % 10 limit 101 +) +select a1, a2, b1+b2 from t1 inner join t2 on (b1 + b2 > 10); + +-- Compare results to hash aggregation results + +-- hashagg_rescan_1 = mem_rescan_1 = disk_rescan_1 + +(select * from mem_rescan_1 except select * from hashagg_rescan_1) + union all +(select * from hashagg_rescan_1 except 
select * from mem_rescan_1); + +drop table mem_rescan_1,hashagg_rescan_1,disk_rescan_1; + +drop table tbl_10k; + +create table tbl_cstore_10k(id bigint, v1 numeric, v2 numeric) with (orientation = column); +insert into tbl_cstore_10k select generate_series(1, 10 * 1000), (RANDOM() * 67)::int::numeric, (RANDOM() * 77)::int::numeric; +analyze tbl_cstore_10k; + +set enable_sortgroup_agg=on; +set enable_hashagg =off; +set enable_sort=off; + +explain (costs off) select sum(id), v1,v2 from tbl_cstore_10k group by v1,v2 order by v1,v2 limit 11 offset 10; +create table agg_sortgroup_1 as +select sum(id), v1,v2 from tbl_cstore_10k group by v1,v2 order by v1,v2 limit 11 offset 10; + +set enable_sortgroup_agg=off; +set enable_hashagg =on; +set enable_sort=on; + +create table agg_vecagg_1 as +select sum(id), v1,v2 from tbl_cstore_10k group by v1,v2 order by v1,v2 limit 11 offset 10; + + +(select * from agg_sortgroup_1 except select * from agg_vecagg_1) + union all +(select * from agg_vecagg_1 except select * from agg_sortgroup_1); + +drop table tbl_cstore_10k, agg_sortgroup_1,agg_vecagg_1; + + +create table tbl_ustore_10k(id bigint, v1 numeric, v2 numeric) with (storage_type=ustore); +insert into tbl_ustore_10k select generate_series(1, 10 * 1000), (RANDOM() * 67)::int::numeric, (RANDOM() * 77)::int::numeric; +analyze tbl_ustore_10k; + +set enable_sortgroup_agg=on; +set enable_hashagg =off; +set enable_sort=off; + +explain (costs off) select sum(id), v1,v2 from tbl_ustore_10k group by v1,v2 order by v1,v2 limit 11 offset 10; +create table agg_sortgroup_1 as +select sum(id), v1,v2 from tbl_ustore_10k group by v1,v2 order by v1,v2 limit 11 offset 10; + +set enable_sortgroup_agg=off; +set enable_hashagg =on; +set enable_sort=on; + +create table agg_hashagg_1 as +select sum(id), v1,v2 from tbl_ustore_10k group by v1,v2 order by v1,v2 limit 11 offset 10; + + +(select * from agg_sortgroup_1 except select * from agg_hashagg_1) + union all +(select * from agg_hashagg_1 except select * from agg_sortgroup_1); + +drop table tbl_ustore_10k, agg_sortgroup_1,agg_hashagg_1; + +drop schema sortgroupagg cascade; \ No newline at end of file