diff --git a/executor/set_test.go b/executor/set_test.go index 8846fb9b8f..4972fb63e8 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -699,6 +699,14 @@ func TestSetVar(t *testing.T) { tk.MustExec("set global tidb_opt_skew_distinct_agg=1") tk.MustQuery("select @@global.tidb_opt_skew_distinct_agg").Check(testkit.Rows("1")) + // test for tidb_opt_three_stage_distinct_agg + tk.MustQuery("select @@session.tidb_opt_three_stage_distinct_agg").Check(testkit.Rows("1")) // default value is 1 + tk.MustExec("set session tidb_opt_three_stage_distinct_agg=0") + tk.MustQuery("select @@session.tidb_opt_three_stage_distinct_agg").Check(testkit.Rows("0")) + tk.MustQuery("select @@global.tidb_opt_three_stage_distinct_agg").Check(testkit.Rows("1")) // default value is 1 + tk.MustExec("set global tidb_opt_three_stage_distinct_agg=0") + tk.MustQuery("select @@global.tidb_opt_three_stage_distinct_agg").Check(testkit.Rows("0")) + // the value of max_allowed_packet should be a multiple of 1024 tk.MustExec("set @@global.max_allowed_packet=16385") tk.MustQuery("show warnings").Check(testkit.RowsWithSep("|", "Warning|1292|Truncated incorrect max_allowed_packet value: '16385'")) diff --git a/expression/column.go b/expression/column.go index a76c3aae1a..24df1eaf6b 100644 --- a/expression/column.go +++ b/expression/column.go @@ -202,6 +202,18 @@ func (col *CorrelatedColumn) MemoryUsage() (sum int64) { return sum } +// RemapColumn remaps columns with provided mapping and returns new expression +func (col *CorrelatedColumn) RemapColumn(m map[int64]*Column) (Expression, error) { + mapped := m[(&col.Column).UniqueID] + if mapped == nil { + return nil, errors.Errorf("Can't remap column for %s", col) + } + return &CorrelatedColumn{ + Column: *mapped, + Data: col.Data, + }, nil +} + // Column represents a column. type Column struct { RetType *types.FieldType @@ -537,6 +549,15 @@ func (col *Column) resolveIndicesByVirtualExpr(schema *Schema) bool { return false } +// RemapColumn remaps columns with provided mapping and returns new expression +func (col *Column) RemapColumn(m map[int64]*Column) (Expression, error) { + mapped := m[col.UniqueID] + if mapped == nil { + return nil, errors.Errorf("Can't remap column for %s", col) + } + return mapped, nil +} + // Vectorized returns if this expression supports vectorized evaluation. func (col *Column) Vectorized() bool { return true diff --git a/expression/constant.go b/expression/constant.go index 2bf3d67d90..9e2a1cfa14 100644 --- a/expression/constant.go +++ b/expression/constant.go @@ -422,6 +422,11 @@ func (c *Constant) resolveIndicesByVirtualExpr(_ *Schema) bool { return true } +// RemapColumn remaps columns with provided mapping and returns new expression +func (c *Constant) RemapColumn(_ map[int64]*Column) (Expression, error) { + return c, nil +} + // Vectorized returns if this expression supports vectorized evaluation. func (c *Constant) Vectorized() bool { if c.DeferredExpr != nil { diff --git a/expression/expression.go b/expression/expression.go index 9ec543f174..fde99f94fa 100644 --- a/expression/expression.go +++ b/expression/expression.go @@ -164,6 +164,9 @@ type Expression interface { // resolveIndicesByVirtualExpr is called inside the `ResolveIndicesByVirtualExpr` It will perform on the expression itself. resolveIndicesByVirtualExpr(schema *Schema) bool + // RemapColumn remaps columns with provided mapping and returns new expression + RemapColumn(map[int64]*Column) (Expression, error) + // ExplainInfo returns operator information to be explained. ExplainInfo() string diff --git a/expression/scalar_function.go b/expression/scalar_function.go index 47fec5beac..2483d342b6 100644 --- a/expression/scalar_function.go +++ b/expression/scalar_function.go @@ -488,6 +488,24 @@ func (sf *ScalarFunction) resolveIndicesByVirtualExpr(schema *Schema) bool { return true } +// RemapColumn remaps columns with provided mapping and returns new expression +func (sf *ScalarFunction) RemapColumn(m map[int64]*Column) (Expression, error) { + newSf, ok := sf.Clone().(*ScalarFunction) + if !ok { + return nil, errors.New("failed to cast to scalar function") + } + for i, arg := range sf.GetArgs() { + newArg, err := arg.RemapColumn(m) + if err != nil { + return nil, err + } + newSf.GetArgs()[i] = newArg + } + // clear hash code + newSf.hashcode = nil + return newSf, nil +} + // GetSingleColumn returns (Col, Desc) when the ScalarFunction is equivalent to (Col, Desc) // when used as a sort key, otherwise returns (nil, false). // diff --git a/expression/util_test.go b/expression/util_test.go index 22a17bdb59..1f7f966543 100644 --- a/expression/util_test.go +++ b/expression/util_test.go @@ -579,6 +579,7 @@ func (m *MockExpr) ResolveIndices(schema *Schema) (Expression, error) func (m *MockExpr) resolveIndices(schema *Schema) error { return nil } func (m *MockExpr) ResolveIndicesByVirtualExpr(schema *Schema) (Expression, bool) { return m, true } func (m *MockExpr) resolveIndicesByVirtualExpr(schema *Schema) bool { return true } +func (m *MockExpr) RemapColumn(_ map[int64]*Column) (Expression, error) { return m, nil } func (m *MockExpr) ExplainInfo() string { return "" } func (m *MockExpr) ExplainNormalizedInfo() string { return "" } func (m *MockExpr) HashCode(sc *stmtctx.StatementContext) []byte { return nil } diff --git a/planner/core/enforce_mpp_test.go b/planner/core/enforce_mpp_test.go index f3cde0a08a..8b0582ee54 100644 --- a/planner/core/enforce_mpp_test.go +++ b/planner/core/enforce_mpp_test.go @@ -484,3 +484,54 @@ func TestMPPSkewedGroupDistinctRewrite(t *testing.T) { require.Equal(t, output[i].Warn, testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings())) } } + +// Test 3 stage aggregation for single count distinct +func TestMPPSingleDistinct3Stage(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + // test table + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b bigint not null, c bigint, d date, e varchar(20) collate utf8mb4_general_ci)") + + // Create virtual tiflash replica info. + dom := domain.GetDomain(tk.Session()) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + require.True(t, exists) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "t" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + var input []string + var output []struct { + SQL string + Plan []string + Warn []string + } + enforceMPPSuiteData := plannercore.GetEnforceMPPSuiteData() + enforceMPPSuiteData.LoadTestCases(t, &input, &output) + for i, tt := range input { + testdata.OnRecord(func() { + output[i].SQL = tt + }) + if strings.HasPrefix(tt, "set") || strings.HasPrefix(tt, "UPDATE") { + tk.MustExec(tt) + continue + } + testdata.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + output[i].Warn = testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + require.Equal(t, output[i].Warn, testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings())) + } +} diff --git a/planner/core/task.go b/planner/core/task.go index 7ff12e0099..a3e8a72f01 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1436,6 +1436,35 @@ func (p *basePhysicalAgg) newPartialAggregate(copTaskType kv.StoreType, isMPPTas return partialAgg, finalAgg } +// canUse3StageDistinctAgg returns true if this agg can use 3 stage for distinct aggregation +func (p *basePhysicalAgg) canUse3StageDistinctAgg() bool { + num := 0 + if !p.ctx.GetSessionVars().Enable3StageDistinctAgg || len(p.GroupByItems) > 0 { + return false + } + for _, fun := range p.AggFuncs { + if fun.HasDistinct { + num++ + if num > 1 || fun.Name != ast.AggFuncCount { + return false + } + for _, arg := range fun.Args { + // bail out when args are not simple column, see GitHub issue #35417 + if _, ok := arg.(*expression.Column); !ok { + return false + } + } + } else if len(fun.Args) > 1 { + return false + } + + if len(fun.OrderByItems) > 0 { + return false + } + } + return num == 1 +} + func genFirstRowAggForGroupBy(ctx sessionctx.Context, groupByItems []expression.Expression) ([]*aggregation.AggFuncDesc, error) { aggFuncs := make([]*aggregation.AggFuncDesc, 0, len(groupByItems)) for _, groupBy := range groupByItems { @@ -1642,15 +1671,98 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task { if !mpp.needEnforceExchanger(prop) { return p.attach2TaskForMpp1Phase(mpp) } + // we have to check it before the content of p has been modified + canUse3StageAgg := p.canUse3StageDistinctAgg() proj := p.convertAvgForMPP() partialAgg, finalAgg := p.newPartialAggregate(kv.TiFlash, true) if finalAgg == nil { return invalidTask } + + // generate 3 stage aggregation for single count distinct if applicable. + // select count(distinct a), count(b) from foo + // will generate plan: + // HashAgg sum(#1), sum(#2) -> final agg + // +- Exchange Passthrough + // +- HashAgg count(distinct a) #1, sum(#3) #2 -> middle agg + // +- Exchange HashPartition by a + // +- HashAgg count(b) #3, group by a -> partial agg + // +- TableScan foo + var middleAgg *PhysicalHashAgg = nil + if partialAgg != nil && canUse3StageAgg { + clonedAgg, err := finalAgg.Clone() + if err != nil { + return invalidTask + } + middleAgg = clonedAgg.(*PhysicalHashAgg) + distinctPos := 0 + middleSchema := expression.NewSchema() + schemaMap := make(map[int64]*expression.Column, len(middleAgg.AggFuncs)) + for i, fun := range middleAgg.AggFuncs { + col := &expression.Column{ + UniqueID: p.ctx.GetSessionVars().AllocPlanColumnID(), + RetType: fun.RetTp, + } + if fun.HasDistinct { + distinctPos = i + } else { + fun.Mode = aggregation.Partial2Mode + originalCol := fun.Args[0].(*expression.Column) + schemaMap[originalCol.UniqueID] = col + } + middleSchema.Append(col) + } + middleAgg.schema = middleSchema + + finalHashAgg := finalAgg.(*PhysicalHashAgg) + finalAggDescs := make([]*aggregation.AggFuncDesc, 0, len(finalHashAgg.AggFuncs)) + for i, fun := range finalHashAgg.AggFuncs { + newArgs := make([]expression.Expression, 0, 1) + if distinctPos == i { + // change count(distinct) to sum() + fun.Name = ast.AggFuncSum + fun.HasDistinct = false + newArgs = append(newArgs, middleSchema.Columns[i]) + } else { + for _, arg := range fun.Args { + newCol, err := arg.RemapColumn(schemaMap) + if err != nil { + return invalidTask + } + newArgs = append(newArgs, newCol) + } + } + fun.Args = newArgs + finalAggDescs = append(finalAggDescs, fun) + } + finalHashAgg.AggFuncs = finalAggDescs + } + // partial agg would be null if one scalar agg cannot run in two-phase mode if partialAgg != nil { attachPlan2Task(partialAgg, mpp) } + + if middleAgg != nil && canUse3StageAgg { + items := partialAgg.(*PhysicalHashAgg).GroupByItems + partitionCols := make([]*property.MPPPartitionColumn, 0, len(items)) + for _, expr := range items { + col, ok := expr.(*expression.Column) + if !ok { + continue + } + partitionCols = append(partitionCols, &property.MPPPartitionColumn{ + Col: col, + CollateID: property.GetCollateIDByNameForPartition(col.GetType().GetCollate()), + }) + } + + prop := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.HashType, MPPPartitionCols: partitionCols} + newMpp := mpp.enforceExchanger(prop) + attachPlan2Task(middleAgg, newMpp) + mpp = newMpp + } + newMpp := mpp.enforceExchanger(prop) attachPlan2Task(finalAgg, newMpp) if proj == nil { diff --git a/planner/core/testdata/enforce_mpp_suite_in.json b/planner/core/testdata/enforce_mpp_suite_in.json index 208799941d..fc6d31dec9 100644 --- a/planner/core/testdata/enforce_mpp_suite_in.json +++ b/planner/core/testdata/enforce_mpp_suite_in.json @@ -113,5 +113,24 @@ "EXPLAIN select a, count(b), avg(distinct c), count(distinct c) from t group by a; -- multi distinct funcs, bail out", "EXPLAIN select count(b), count(distinct c) from t; -- single distinct func but no group key, bail out" ] + }, + { + "name": "TestMPPSingleDistinct3Stage", + "cases": [ + "set @@tidb_allow_mpp=1;set @@tidb_enforce_mpp=1;", + "EXPLAIN select count(distinct b) from t;", + "EXPLAIN select count(distinct c) from t;", + "EXPLAIN select count(distinct e) from t;", + "EXPLAIN select count(distinct a,b,c,e) from t;", + "EXPLAIN select count(distinct c), count(a), count(*) from t;", + "EXPLAIN select sum(b), count(a), count(*), count(distinct c) from t;", + "EXPLAIN select sum(b+a), count(*), count(distinct c), count(a) from t having count(distinct c) > 2;", + "EXPLAIN select sum(b+a), count(*), count(a) from t having count(distinct c) > 2;", + "EXPLAIN select sum(b+a), max(b), count(distinct c), count(*) from t having count(a) > 2;", + "EXPLAIN select sum(b), count(distinct a, b, e), count(a+b) from t;", + "EXPLAIN select count(distinct b), json_objectagg(d,c) from t;", + "EXPLAIN select count(distinct c+a), count(a) from t;", + "EXPLAIN select sum(b), count(distinct c+a, b, e), count(a+b) from t;" + ] } ] diff --git a/planner/core/testdata/enforce_mpp_suite_out.json b/planner/core/testdata/enforce_mpp_suite_out.json index 061d07c363..146e33f350 100644 --- a/planner/core/testdata/enforce_mpp_suite_out.json +++ b/planner/core/testdata/enforce_mpp_suite_out.json @@ -961,15 +961,251 @@ }, { "SQL": "EXPLAIN select count(b), count(distinct c) from t; -- single distinct func but no group key, bail out", + "Plan": [ + "TableReader_34 1.00 root data:ExchangeSender_33", + "└─ExchangeSender_33 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection_27 1.00 mpp[tiflash] Column#7, Column#8", + " └─HashAgg_28 1.00 mpp[tiflash] funcs:sum(Column#13)->Column#7, funcs:sum(Column#14)->Column#8", + " └─ExchangeReceiver_32 1.00 mpp[tiflash] ", + " └─ExchangeSender_31 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg_28 1.00 mpp[tiflash] funcs:sum(Column#12)->Column#13, funcs:count(distinct test.t.c)->Column#14", + " └─ExchangeReceiver_30 1.00 mpp[tiflash] ", + " └─ExchangeSender_29 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t.c, collate: binary]", + " └─HashAgg_26 1.00 mpp[tiflash] group by:test.t.c, funcs:count(test.t.b)->Column#12", + " └─TableFullScan_13 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": null + } + ] + }, + { + "Name": "TestMPPSingleDistinct3Stage", + "Cases": [ + { + "SQL": "set @@tidb_allow_mpp=1;set @@tidb_enforce_mpp=1;", + "Plan": null, + "Warn": null + }, + { + "SQL": "EXPLAIN select count(distinct b) from t;", + "Plan": [ + "TableReader_34 1.00 root data:ExchangeSender_33", + "└─ExchangeSender_33 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection_27 1.00 mpp[tiflash] Column#7", + " └─HashAgg_28 1.00 mpp[tiflash] funcs:sum(Column#9)->Column#7", + " └─ExchangeReceiver_32 1.00 mpp[tiflash] ", + " └─ExchangeSender_31 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg_28 1.00 mpp[tiflash] funcs:count(distinct test.t.b)->Column#9", + " └─ExchangeReceiver_30 1.00 mpp[tiflash] ", + " └─ExchangeSender_29 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t.b, collate: binary]", + " └─HashAgg_26 1.00 mpp[tiflash] group by:test.t.b, ", + " └─TableFullScan_13 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "EXPLAIN select count(distinct c) from t;", + "Plan": [ + "TableReader_34 1.00 root data:ExchangeSender_33", + "└─ExchangeSender_33 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection_27 1.00 mpp[tiflash] Column#7", + " └─HashAgg_28 1.00 mpp[tiflash] funcs:sum(Column#9)->Column#7", + " └─ExchangeReceiver_32 1.00 mpp[tiflash] ", + " └─ExchangeSender_31 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg_28 1.00 mpp[tiflash] funcs:count(distinct test.t.c)->Column#9", + " └─ExchangeReceiver_30 1.00 mpp[tiflash] ", + " └─ExchangeSender_29 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t.c, collate: binary]", + " └─HashAgg_26 1.00 mpp[tiflash] group by:test.t.c, ", + " └─TableFullScan_13 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "EXPLAIN select count(distinct e) from t;", + "Plan": [ + "TableReader_34 1.00 root data:ExchangeSender_33", + "└─ExchangeSender_33 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection_27 1.00 mpp[tiflash] Column#7", + " └─HashAgg_28 1.00 mpp[tiflash] funcs:sum(Column#9)->Column#7", + " └─ExchangeReceiver_32 1.00 mpp[tiflash] ", + " └─ExchangeSender_31 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg_28 1.00 mpp[tiflash] funcs:count(distinct test.t.e)->Column#9", + " └─ExchangeReceiver_30 1.00 mpp[tiflash] ", + " └─ExchangeSender_29 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t.e, collate: utf8mb4_general_ci]", + " └─HashAgg_26 1.00 mpp[tiflash] group by:test.t.e, ", + " └─TableFullScan_13 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "EXPLAIN select count(distinct a,b,c,e) from t;", + "Plan": [ + "TableReader_34 1.00 root data:ExchangeSender_33", + "└─ExchangeSender_33 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection_27 1.00 mpp[tiflash] Column#7", + " └─HashAgg_28 1.00 mpp[tiflash] funcs:sum(Column#9)->Column#7", + " └─ExchangeReceiver_32 1.00 mpp[tiflash] ", + " └─ExchangeSender_31 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg_28 1.00 mpp[tiflash] funcs:count(distinct test.t.a, test.t.b, test.t.c, test.t.e)->Column#9", + " └─ExchangeReceiver_30 1.00 mpp[tiflash] ", + " └─ExchangeSender_29 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t.a, collate: binary], [name: test.t.b, collate: binary], [name: test.t.c, collate: binary], [name: test.t.e, collate: utf8mb4_general_ci]", + " └─HashAgg_26 1.00 mpp[tiflash] group by:test.t.a, test.t.b, test.t.c, test.t.e, ", + " └─TableFullScan_13 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "EXPLAIN select count(distinct c), count(a), count(*) from t;", + "Plan": [ + "TableReader_34 1.00 root data:ExchangeSender_33", + "└─ExchangeSender_33 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection_27 1.00 mpp[tiflash] Column#7, Column#8, Column#9", + " └─HashAgg_28 1.00 mpp[tiflash] funcs:sum(Column#17)->Column#7, funcs:sum(Column#18)->Column#8, funcs:sum(Column#19)->Column#9", + " └─ExchangeReceiver_32 1.00 mpp[tiflash] ", + " └─ExchangeSender_31 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg_28 1.00 mpp[tiflash] funcs:count(distinct test.t.c)->Column#17, funcs:sum(Column#15)->Column#18, funcs:sum(Column#16)->Column#19", + " └─ExchangeReceiver_30 1.00 mpp[tiflash] ", + " └─ExchangeSender_29 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t.c, collate: binary]", + " └─HashAgg_26 1.00 mpp[tiflash] group by:test.t.c, funcs:count(test.t.a)->Column#15, funcs:count(1)->Column#16", + " └─TableFullScan_13 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "EXPLAIN select sum(b), count(a), count(*), count(distinct c) from t;", + "Plan": [ + "TableReader_34 1.00 root data:ExchangeSender_33", + "└─ExchangeSender_33 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection_27 1.00 mpp[tiflash] Column#7, Column#8, Column#9, Column#10", + " └─HashAgg_28 1.00 mpp[tiflash] funcs:sum(Column#21)->Column#7, funcs:sum(Column#22)->Column#8, funcs:sum(Column#23)->Column#9, funcs:sum(Column#24)->Column#10", + " └─ExchangeReceiver_32 1.00 mpp[tiflash] ", + " └─ExchangeSender_31 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg_28 1.00 mpp[tiflash] funcs:sum(Column#18)->Column#21, funcs:sum(Column#19)->Column#22, funcs:sum(Column#20)->Column#23, funcs:count(distinct test.t.c)->Column#24", + " └─ExchangeReceiver_30 1.00 mpp[tiflash] ", + " └─ExchangeSender_29 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t.c, collate: binary]", + " └─HashAgg_26 1.00 mpp[tiflash] group by:Column#27, funcs:sum(Column#25)->Column#18, funcs:count(Column#26)->Column#19, funcs:count(1)->Column#20", + " └─Projection_35 10000.00 mpp[tiflash] cast(test.t.b, decimal(20,0) BINARY)->Column#25, test.t.a, test.t.c", + " └─TableFullScan_13 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "EXPLAIN select sum(b+a), count(*), count(distinct c), count(a) from t having count(distinct c) > 2;", + "Plan": [ + "TableReader_41 0.80 root data:ExchangeSender_40", + "└─ExchangeSender_40 0.80 mpp[tiflash] ExchangeType: PassThrough", + " └─Selection_39 0.80 mpp[tiflash] gt(Column#9, 2)", + " └─Projection_31 1.00 mpp[tiflash] Column#7, Column#8, Column#9, Column#10", + " └─HashAgg_32 1.00 mpp[tiflash] funcs:sum(Column#25)->Column#7, funcs:sum(Column#26)->Column#8, funcs:sum(Column#27)->Column#9, funcs:sum(Column#28)->Column#10", + " └─ExchangeReceiver_36 1.00 mpp[tiflash] ", + " └─ExchangeSender_35 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg_32 1.00 mpp[tiflash] funcs:sum(Column#22)->Column#25, funcs:sum(Column#23)->Column#26, funcs:count(distinct test.t.c)->Column#27, funcs:sum(Column#24)->Column#28", + " └─ExchangeReceiver_34 1.00 mpp[tiflash] ", + " └─ExchangeSender_33 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t.c, collate: binary]", + " └─HashAgg_30 1.00 mpp[tiflash] group by:Column#31, funcs:sum(Column#29)->Column#22, funcs:count(1)->Column#23, funcs:count(Column#30)->Column#24", + " └─Projection_42 10000.00 mpp[tiflash] cast(plus(test.t.b, test.t.a), decimal(20,0) BINARY)->Column#29, test.t.a, test.t.c", + " └─TableFullScan_17 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "EXPLAIN select sum(b+a), count(*), count(a) from t having count(distinct c) > 2;", + "Plan": [ + "TableReader_41 0.80 root data:ExchangeSender_40", + "└─ExchangeSender_40 0.80 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection_7 0.80 mpp[tiflash] Column#7, Column#8, Column#9", + " └─Selection_39 0.80 mpp[tiflash] gt(Column#10, 2)", + " └─Projection_31 1.00 mpp[tiflash] Column#7, Column#8, Column#9, Column#10", + " └─HashAgg_32 1.00 mpp[tiflash] funcs:sum(Column#24)->Column#7, funcs:sum(Column#25)->Column#8, funcs:sum(Column#26)->Column#9, funcs:sum(Column#27)->Column#10", + " └─ExchangeReceiver_36 1.00 mpp[tiflash] ", + " └─ExchangeSender_35 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg_32 1.00 mpp[tiflash] funcs:sum(Column#21)->Column#24, funcs:sum(Column#22)->Column#25, funcs:sum(Column#23)->Column#26, funcs:count(distinct test.t.c)->Column#27", + " └─ExchangeReceiver_34 1.00 mpp[tiflash] ", + " └─ExchangeSender_33 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t.c, collate: binary]", + " └─HashAgg_30 1.00 mpp[tiflash] group by:Column#30, funcs:sum(Column#28)->Column#21, funcs:count(1)->Column#22, funcs:count(Column#29)->Column#23", + " └─Projection_42 10000.00 mpp[tiflash] cast(plus(test.t.b, test.t.a), decimal(20,0) BINARY)->Column#28, test.t.a, test.t.c", + " └─TableFullScan_17 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "EXPLAIN select sum(b+a), max(b), count(distinct c), count(*) from t having count(a) > 2;", + "Plan": [ + "TableReader_41 0.80 root data:ExchangeSender_40", + "└─ExchangeSender_40 0.80 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection_7 0.80 mpp[tiflash] Column#7, Column#8, Column#9, Column#10", + " └─Selection_39 0.80 mpp[tiflash] gt(Column#11, 2)", + " └─Projection_31 1.00 mpp[tiflash] Column#7, Column#8, Column#9, Column#10, Column#11", + " └─HashAgg_32 1.00 mpp[tiflash] funcs:sum(Column#29)->Column#7, funcs:max(Column#30)->Column#8, funcs:sum(Column#31)->Column#9, funcs:sum(Column#32)->Column#10, funcs:sum(Column#33)->Column#11", + " └─ExchangeReceiver_36 1.00 mpp[tiflash] ", + " └─ExchangeSender_35 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg_32 1.00 mpp[tiflash] funcs:sum(Column#25)->Column#29, funcs:max(Column#26)->Column#30, funcs:count(distinct test.t.c)->Column#31, funcs:sum(Column#27)->Column#32, funcs:sum(Column#28)->Column#33", + " └─ExchangeReceiver_34 1.00 mpp[tiflash] ", + " └─ExchangeSender_33 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t.c, collate: binary]", + " └─HashAgg_30 1.00 mpp[tiflash] group by:Column#37, funcs:sum(Column#34)->Column#25, funcs:max(Column#35)->Column#26, funcs:count(1)->Column#27, funcs:count(Column#36)->Column#28", + " └─Projection_42 10000.00 mpp[tiflash] cast(plus(test.t.b, test.t.a), decimal(20,0) BINARY)->Column#34, test.t.b, test.t.a, test.t.c", + " └─TableFullScan_17 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "EXPLAIN select sum(b), count(distinct a, b, e), count(a+b) from t;", + "Plan": [ + "TableReader_34 1.00 root data:ExchangeSender_33", + "└─ExchangeSender_33 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection_27 1.00 mpp[tiflash] Column#7, Column#8, Column#9", + " └─HashAgg_28 1.00 mpp[tiflash] funcs:sum(Column#17)->Column#7, funcs:sum(Column#18)->Column#8, funcs:sum(Column#19)->Column#9", + " └─ExchangeReceiver_32 1.00 mpp[tiflash] ", + " └─ExchangeSender_31 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg_28 1.00 mpp[tiflash] funcs:sum(Column#15)->Column#17, funcs:count(distinct test.t.a, test.t.b, test.t.e)->Column#18, funcs:sum(Column#16)->Column#19", + " └─ExchangeReceiver_30 1.00 mpp[tiflash] ", + " └─ExchangeSender_29 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t.a, collate: binary], [name: test.t.b, collate: binary], [name: test.t.e, collate: utf8mb4_general_ci]", + " └─HashAgg_26 1.00 mpp[tiflash] group by:Column#22, Column#23, Column#24, funcs:sum(Column#20)->Column#15, funcs:count(Column#21)->Column#16", + " └─Projection_35 10000.00 mpp[tiflash] cast(test.t.b, decimal(20,0) BINARY)->Column#20, plus(test.t.a, test.t.b)->Column#21, test.t.a, test.t.b, test.t.e", + " └─TableFullScan_13 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "EXPLAIN select count(distinct b), json_objectagg(d,c) from t;", + "Plan": [ + "StreamAgg_7 1.00 root funcs:count(distinct Column#9)->Column#7, funcs:json_objectagg(Column#10, Column#11)->Column#8", + "└─Projection_16 10000.00 root test.t.b, cast(test.t.d, var_string(10))->Column#10, test.t.c", + " └─TableReader_15 10000.00 root data:TableFullScan_14", + " └─TableFullScan_14 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": [ + "Aggregation can not be pushed to tiflash because AggFunc `json_objectagg` is not supported now", + "Aggregation can not be pushed to tiflash because AggFunc `json_objectagg` is not supported now" + ] + }, + { + "SQL": "EXPLAIN select count(distinct c+a), count(a) from t;", "Plan": [ "TableReader_30 1.00 root data:ExchangeSender_29", "└─ExchangeSender_29 1.00 mpp[tiflash] ExchangeType: PassThrough", " └─Projection_25 1.00 mpp[tiflash] Column#7, Column#8", - " └─HashAgg_26 1.00 mpp[tiflash] funcs:sum(Column#10)->Column#7, funcs:count(distinct test.t.c)->Column#8", + " └─HashAgg_26 1.00 mpp[tiflash] funcs:count(distinct Column#11)->Column#7, funcs:sum(Column#12)->Column#8", " └─ExchangeReceiver_28 1.00 mpp[tiflash] ", " └─ExchangeSender_27 1.00 mpp[tiflash] ExchangeType: PassThrough", - " └─HashAgg_24 1.00 mpp[tiflash] group by:test.t.c, funcs:count(test.t.b)->Column#10", - " └─TableFullScan_13 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" + " └─HashAgg_24 1.00 mpp[tiflash] group by:Column#14, funcs:count(Column#13)->Column#12", + " └─Projection_31 10000.00 mpp[tiflash] test.t.a, plus(test.t.c, test.t.a)->Column#14", + " └─TableFullScan_13 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "EXPLAIN select sum(b), count(distinct c+a, b, e), count(a+b) from t;", + "Plan": [ + "TableReader_30 1.00 root data:ExchangeSender_29", + "└─ExchangeSender_29 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection_25 1.00 mpp[tiflash] Column#7, Column#8, Column#9", + " └─HashAgg_26 1.00 mpp[tiflash] funcs:sum(Column#13)->Column#7, funcs:count(distinct Column#14, test.t.b, test.t.e)->Column#8, funcs:sum(Column#15)->Column#9", + " └─ExchangeReceiver_28 1.00 mpp[tiflash] ", + " └─ExchangeSender_27 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg_24 1.00 mpp[tiflash] group by:Column#18, Column#19, Column#20, funcs:sum(Column#16)->Column#13, funcs:count(Column#17)->Column#15", + " └─Projection_31 10000.00 mpp[tiflash] cast(test.t.b, decimal(20,0) BINARY)->Column#16, plus(test.t.a, test.t.b)->Column#17, plus(test.t.c, test.t.a)->Column#18, test.t.b, test.t.e", + " └─TableFullScan_13 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" ], "Warn": null } diff --git a/planner/core/testdata/integration_suite_out.json b/planner/core/testdata/integration_suite_out.json index e43b448cfd..46cff38881 100644 --- a/planner/core/testdata/integration_suite_out.json +++ b/planner/core/testdata/integration_suite_out.json @@ -5869,11 +5869,14 @@ "TableReader 1.00 root data:ExchangeSender", "└─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", " └─Projection 1.00 mpp[tiflash] Column#4", - " └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.t.value)->Column#4", + " └─HashAgg 1.00 mpp[tiflash] funcs:sum(Column#6)->Column#4", " └─ExchangeReceiver 1.00 mpp[tiflash] ", " └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", - " └─HashAgg 1.00 mpp[tiflash] group by:test.t.value, ", - " └─TableFullScan 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" + " └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.t.value)->Column#6", + " └─ExchangeReceiver 1.00 mpp[tiflash] ", + " └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t.value, collate: binary]", + " └─HashAgg 1.00 mpp[tiflash] group by:test.t.value, ", + " └─TableFullScan 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" ] }, { @@ -5884,11 +5887,14 @@ " └─Projection 1.00 mpp[tiflash] Column#5", " └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct Column#4)->Column#5", " └─Projection 1.00 mpp[tiflash] Column#4", - " └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.t.value)->Column#4", + " └─HashAgg 1.00 mpp[tiflash] funcs:sum(Column#7)->Column#4", " └─ExchangeReceiver 1.00 mpp[tiflash] ", " └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", - " └─HashAgg 1.00 mpp[tiflash] group by:test.t.value, ", - " └─TableFullScan 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" + " └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.t.value)->Column#7", + " └─ExchangeReceiver 1.00 mpp[tiflash] ", + " └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t.value, collate: binary]", + " └─HashAgg 1.00 mpp[tiflash] group by:test.t.value, ", + " └─TableFullScan 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" ] }, { @@ -5896,12 +5902,15 @@ "Plan": [ "TableReader 1.00 root data:ExchangeSender", "└─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", - " └─Projection 1.00 mpp[tiflash] Column#4, Column#5, div(Column#6, cast(case(eq(Column#11, 0), 1, Column#11), decimal(20,0) BINARY))->Column#6", - " └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.t.value)->Column#4, funcs:sum(Column#12)->Column#5, funcs:sum(Column#13)->Column#11, funcs:sum(Column#14)->Column#6", + " └─Projection 1.00 mpp[tiflash] Column#4, Column#5, div(Column#6, cast(case(eq(Column#15, 0), 1, Column#15), decimal(20,0) BINARY))->Column#6", + " └─HashAgg 1.00 mpp[tiflash] funcs:sum(Column#19)->Column#4, funcs:sum(Column#20)->Column#5, funcs:sum(Column#21)->Column#15, funcs:sum(Column#22)->Column#6", " └─ExchangeReceiver 1.00 mpp[tiflash] ", " └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", - " └─HashAgg 1.00 mpp[tiflash] group by:test.t.value, funcs:count(test.t.value)->Column#12, funcs:count(test.t.value)->Column#13, funcs:sum(test.t.value)->Column#14", - " └─TableFullScan 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" + " └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.t.value)->Column#19, funcs:sum(Column#16)->Column#20, funcs:sum(Column#17)->Column#21, funcs:sum(Column#18)->Column#22", + " └─ExchangeReceiver 1.00 mpp[tiflash] ", + " └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t.value, collate: binary]", + " └─HashAgg 1.00 mpp[tiflash] group by:test.t.value, funcs:count(test.t.value)->Column#16, funcs:count(test.t.value)->Column#17, funcs:sum(test.t.value)->Column#18", + " └─TableFullScan 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" ] } ] diff --git a/planner/core/testdata/plan_suite_out.json b/planner/core/testdata/plan_suite_out.json index 2e505b1d08..6c5dae9df3 100644 --- a/planner/core/testdata/plan_suite_out.json +++ b/planner/core/testdata/plan_suite_out.json @@ -3275,11 +3275,14 @@ "TableReader 1.00 root data:ExchangeSender", "└─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", " └─Projection 1.00 mpp[tiflash] Column#5", - " └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.employee.deptid)->Column#5", + " └─HashAgg 1.00 mpp[tiflash] funcs:sum(Column#7)->Column#5", " └─ExchangeReceiver 1.00 mpp[tiflash] ", " └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", - " └─HashAgg 1.00 mpp[tiflash] group by:test.employee.deptid, ", - " └─TableFullScan 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" + " └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.employee.deptid)->Column#7", + " └─ExchangeReceiver 1.00 mpp[tiflash] ", + " └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary]", + " └─HashAgg 1.00 mpp[tiflash] group by:test.employee.deptid, ", + " └─TableFullScan 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" ] }, { @@ -3305,11 +3308,14 @@ " └─Projection 1.00 mpp[tiflash] Column#6", " └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct Column#5)->Column#6", " └─Projection 1.00 mpp[tiflash] Column#5", - " └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.employee.deptid)->Column#5", + " └─HashAgg 1.00 mpp[tiflash] funcs:sum(Column#8)->Column#5", " └─ExchangeReceiver 1.00 mpp[tiflash] ", " └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", - " └─HashAgg 1.00 mpp[tiflash] group by:test.employee.deptid, ", - " └─TableFullScan 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" + " └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.employee.deptid)->Column#8", + " └─ExchangeReceiver 1.00 mpp[tiflash] ", + " └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary]", + " └─HashAgg 1.00 mpp[tiflash] group by:test.employee.deptid, ", + " └─TableFullScan 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" ] }, { @@ -3372,11 +3378,14 @@ " ├─ExchangeReceiver(Build) 1.00 mpp[tiflash] ", " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: Broadcast", " │ └─Projection 1.00 mpp[tiflash] Column#9", - " │ └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.employee.deptid)->Column#9", + " │ └─HashAgg 1.00 mpp[tiflash] funcs:sum(Column#10)->Column#9", " │ └─ExchangeReceiver 1.00 mpp[tiflash] ", " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", - " │ └─HashAgg 1.00 mpp[tiflash] group by:test.employee.deptid, ", - " │ └─TableFullScan 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo", + " │ └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.employee.deptid)->Column#10", + " │ └─ExchangeReceiver 1.00 mpp[tiflash] ", + " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary]", + " │ └─HashAgg 1.00 mpp[tiflash] group by:test.employee.deptid, ", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo", " └─Selection(Probe) 9990.00 mpp[tiflash] not(isnull(test.employee.deptid))", " └─TableFullScan 10000.00 mpp[tiflash] table:e1 keep order:false, stats:pseudo" ] @@ -3391,11 +3400,14 @@ " ├─ExchangeReceiver(Build) 1.00 mpp[tiflash] ", " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: Broadcast", " │ └─Projection 1.00 mpp[tiflash] Column#5", - " │ └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.employee.deptid)->Column#5", + " │ └─HashAgg 1.00 mpp[tiflash] funcs:sum(Column#10)->Column#5", " │ └─ExchangeReceiver 1.00 mpp[tiflash] ", " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", - " │ └─HashAgg 1.00 mpp[tiflash] group by:test.employee.deptid, ", - " │ └─TableFullScan 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo", + " │ └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.employee.deptid)->Column#10", + " │ └─ExchangeReceiver 1.00 mpp[tiflash] ", + " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary]", + " │ └─HashAgg 1.00 mpp[tiflash] group by:test.employee.deptid, ", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo", " └─Selection(Probe) 9990.00 mpp[tiflash] not(isnull(test.employee.deptid))", " └─TableFullScan 10000.00 mpp[tiflash] table:e2 keep order:false, stats:pseudo" ] @@ -3409,17 +3421,23 @@ " ├─ExchangeReceiver(Build) 1.00 mpp[tiflash] ", " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: Broadcast", " │ └─Projection 1.00 mpp[tiflash] Column#5", - " │ └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.employee.deptid)->Column#5", + " │ └─HashAgg 1.00 mpp[tiflash] funcs:sum(Column#11)->Column#5", " │ └─ExchangeReceiver 1.00 mpp[tiflash] ", " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", - " │ └─HashAgg 1.00 mpp[tiflash] group by:test.employee.deptid, ", - " │ └─TableFullScan 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo", + " │ └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.employee.deptid)->Column#11", + " │ └─ExchangeReceiver 1.00 mpp[tiflash] ", + " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary]", + " │ └─HashAgg 1.00 mpp[tiflash] group by:test.employee.deptid, ", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo", " └─Projection(Probe) 1.00 mpp[tiflash] Column#10", - " └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.employee.deptid)->Column#10", + " └─HashAgg 1.00 mpp[tiflash] funcs:sum(Column#12)->Column#10", " └─ExchangeReceiver 1.00 mpp[tiflash] ", " └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", - " └─HashAgg 1.00 mpp[tiflash] group by:test.employee.deptid, ", - " └─TableFullScan 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" + " └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.employee.deptid)->Column#12", + " └─ExchangeReceiver 1.00 mpp[tiflash] ", + " └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary]", + " └─HashAgg 1.00 mpp[tiflash] group by:test.employee.deptid, ", + " └─TableFullScan 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" ] }, { @@ -3523,11 +3541,14 @@ " ├─TableReader(Build) 1.00 root data:ExchangeSender", " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", " │ └─Projection 1.00 mpp[tiflash] Column#5", - " │ └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.employee.deptid)->Column#5", + " │ └─HashAgg 1.00 mpp[tiflash] funcs:sum(Column#12)->Column#5", " │ └─ExchangeReceiver 1.00 mpp[tiflash] ", " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", - " │ └─HashAgg 1.00 mpp[tiflash] group by:test.employee.deptid, ", - " │ └─TableFullScan 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo", + " │ └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.employee.deptid)->Column#12", + " │ └─ExchangeReceiver 1.00 mpp[tiflash] ", + " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary]", + " │ └─HashAgg 1.00 mpp[tiflash] group by:test.employee.deptid, ", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo", " └─TableReader(Probe) 9990.00 root data:Selection", " └─Selection 9990.00 cop[tiflash] not(isnull(test.employee.deptid))", " └─TableFullScan 10000.00 cop[tiflash] table:e2 keep order:false, stats:pseudo" @@ -3540,19 +3561,25 @@ "├─TableReader(Build) 1.00 root data:ExchangeSender", "│ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", "│ └─Projection 1.00 mpp[tiflash] Column#10", - "│ └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.employee.deptid)->Column#10", + "│ └─HashAgg 1.00 mpp[tiflash] funcs:sum(Column#16)->Column#10", "│ └─ExchangeReceiver 1.00 mpp[tiflash] ", "│ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", - "│ └─HashAgg 1.00 mpp[tiflash] group by:test.employee.deptid, ", - "│ └─TableFullScan 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo", + "│ └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.employee.deptid)->Column#16", + "│ └─ExchangeReceiver 1.00 mpp[tiflash] ", + "│ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary]", + "│ └─HashAgg 1.00 mpp[tiflash] group by:test.employee.deptid, ", + "│ └─TableFullScan 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo", "└─TableReader(Probe) 1.00 root data:ExchangeSender", " └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", " └─Projection 1.00 mpp[tiflash] Column#5", - " └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.employee.deptid)->Column#5", + " └─HashAgg 1.00 mpp[tiflash] funcs:sum(Column#15)->Column#5", " └─ExchangeReceiver 1.00 mpp[tiflash] ", " └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", - " └─HashAgg 1.00 mpp[tiflash] group by:test.employee.deptid, ", - " └─TableFullScan 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" + " └─HashAgg 1.00 mpp[tiflash] funcs:count(distinct test.employee.deptid)->Column#15", + " └─ExchangeReceiver 1.00 mpp[tiflash] ", + " └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary]", + " └─HashAgg 1.00 mpp[tiflash] group by:test.employee.deptid, ", + " └─TableFullScan 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" ] }, { diff --git a/planner/core/testdata/window_push_down_suite_out.json b/planner/core/testdata/window_push_down_suite_out.json index 1d0801701c..e4b3aa37c0 100644 --- a/planner/core/testdata/window_push_down_suite_out.json +++ b/planner/core/testdata/window_push_down_suite_out.json @@ -322,15 +322,18 @@ { "SQL": "explain select *, row_number() over () from (select count(distinct empid) from employee) t", "Plan": [ - "TableReader_53 1.00 root data:ExchangeSender_52", - "└─ExchangeSender_52 1.00 mpp[tiflash] ExchangeType: PassThrough", - " └─Window_51 1.00 mpp[tiflash] row_number()->Column#7 over(rows between current row and current row)", + "TableReader_61 1.00 root data:ExchangeSender_60", + "└─ExchangeSender_60 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─Window_59 1.00 mpp[tiflash] row_number()->Column#7 over(rows between current row and current row)", " └─Projection_15 1.00 mpp[tiflash] Column#5", - " └─HashAgg_16 1.00 mpp[tiflash] funcs:count(distinct test.employee.empid)->Column#5", - " └─ExchangeReceiver_18 1.00 mpp[tiflash] ", - " └─ExchangeSender_17 1.00 mpp[tiflash] ExchangeType: PassThrough", - " └─HashAgg_12 1.00 mpp[tiflash] group by:test.employee.empid, ", - " └─TableFullScan_14 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" + " └─HashAgg_16 1.00 mpp[tiflash] funcs:sum(Column#8)->Column#5", + " └─ExchangeReceiver_20 1.00 mpp[tiflash] ", + " └─ExchangeSender_19 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg_16 1.00 mpp[tiflash] funcs:count(distinct test.employee.empid)->Column#8", + " └─ExchangeReceiver_18 1.00 mpp[tiflash] ", + " └─ExchangeSender_17 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.empid, collate: binary]", + " └─HashAgg_12 1.00 mpp[tiflash] group by:test.employee.empid, ", + " └─TableFullScan_14 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" ], "Warn": null }, @@ -351,18 +354,21 @@ { "SQL": "explain select *, row_number() over (partition by a) from (select count(distinct empid) a from employee) t", "Plan": [ - "TableReader_45 1.00 root data:ExchangeSender_44", - "└─ExchangeSender_44 1.00 mpp[tiflash] ExchangeType: PassThrough", - " └─Window_43 1.00 mpp[tiflash] row_number()->Column#7 over(partition by Column#5 rows between current row and current row), stream_count: 8", - " └─Sort_20 1.00 mpp[tiflash] Column#5, stream_count: 8", - " └─ExchangeReceiver_19 1.00 mpp[tiflash] stream_count: 8", - " └─ExchangeSender_18 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: Column#5, collate: binary], stream_count: 8", + "TableReader_49 1.00 root data:ExchangeSender_48", + "└─ExchangeSender_48 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─Window_47 1.00 mpp[tiflash] row_number()->Column#7 over(partition by Column#5 rows between current row and current row), stream_count: 8", + " └─Sort_22 1.00 mpp[tiflash] Column#5, stream_count: 8", + " └─ExchangeReceiver_21 1.00 mpp[tiflash] stream_count: 8", + " └─ExchangeSender_20 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: Column#5, collate: binary], stream_count: 8", " └─Projection_14 1.00 mpp[tiflash] Column#5", - " └─HashAgg_15 1.00 mpp[tiflash] funcs:count(distinct test.employee.empid)->Column#5", - " └─ExchangeReceiver_17 1.00 mpp[tiflash] ", - " └─ExchangeSender_16 1.00 mpp[tiflash] ExchangeType: PassThrough", - " └─HashAgg_12 1.00 mpp[tiflash] group by:test.employee.empid, ", - " └─TableFullScan_13 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" + " └─HashAgg_15 1.00 mpp[tiflash] funcs:sum(Column#8)->Column#5", + " └─ExchangeReceiver_19 1.00 mpp[tiflash] ", + " └─ExchangeSender_18 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg_15 1.00 mpp[tiflash] funcs:count(distinct test.employee.empid)->Column#8", + " └─ExchangeReceiver_17 1.00 mpp[tiflash] ", + " └─ExchangeSender_16 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.empid, collate: binary]", + " └─HashAgg_12 1.00 mpp[tiflash] group by:test.employee.empid, ", + " └─TableFullScan_13 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" ], "Warn": null }, @@ -398,18 +404,21 @@ { "SQL": "explain select count(distinct empid) from (select *, row_number() over (partition by deptid) from employee) t", "Plan": [ - "TableReader_45 1.00 root data:ExchangeSender_44", - "└─ExchangeSender_44 1.00 mpp[tiflash] ExchangeType: PassThrough", - " └─Projection_40 1.00 mpp[tiflash] Column#7", - " └─HashAgg_41 1.00 mpp[tiflash] funcs:count(distinct test.employee.empid)->Column#7", - " └─ExchangeReceiver_43 1.00 mpp[tiflash] ", - " └─ExchangeSender_42 1.00 mpp[tiflash] ExchangeType: PassThrough", - " └─HashAgg_39 1.00 mpp[tiflash] group by:test.employee.empid, ", - " └─Window_27 10000.00 mpp[tiflash] row_number()->Column#6 over(partition by test.employee.deptid rows between current row and current row), stream_count: 8", - " └─Sort_18 10000.00 mpp[tiflash] test.employee.deptid, stream_count: 8", - " └─ExchangeReceiver_17 10000.00 mpp[tiflash] stream_count: 8", - " └─ExchangeSender_16 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary], stream_count: 8", - " └─TableFullScan_15 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" + "TableReader_49 1.00 root data:ExchangeSender_48", + "└─ExchangeSender_48 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection_42 1.00 mpp[tiflash] Column#7", + " └─HashAgg_43 1.00 mpp[tiflash] funcs:sum(Column#9)->Column#7", + " └─ExchangeReceiver_47 1.00 mpp[tiflash] ", + " └─ExchangeSender_46 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg_43 1.00 mpp[tiflash] funcs:count(distinct test.employee.empid)->Column#9", + " └─ExchangeReceiver_45 1.00 mpp[tiflash] ", + " └─ExchangeSender_44 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.empid, collate: binary]", + " └─HashAgg_41 1.00 mpp[tiflash] group by:test.employee.empid, ", + " └─Window_27 10000.00 mpp[tiflash] row_number()->Column#6 over(partition by test.employee.deptid rows between current row and current row), stream_count: 8", + " └─Sort_18 10000.00 mpp[tiflash] test.employee.deptid, stream_count: 8", + " └─ExchangeReceiver_17 10000.00 mpp[tiflash] stream_count: 8", + " └─ExchangeSender_16 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary], stream_count: 8", + " └─TableFullScan_15 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" ], "Warn": null }, diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index a188eca1cc..e64e740795 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -738,6 +738,9 @@ type SessionVars struct { // EnableSkewDistinctAgg can be set true to allow skew distinct aggregate rewrite EnableSkewDistinctAgg bool + // Enable3StageDistinctAgg indicates whether to allow 3 stage distinct aggregate + Enable3StageDistinctAgg bool + // MultiStatementMode permits incorrect client library usage. Not recommended to be turned on. MultiStatementMode int @@ -1561,6 +1564,7 @@ func NewSessionVars() *SessionVars { EnableLegacyInstanceScope: DefEnableLegacyInstanceScope, RemoveOrderbyInSubquery: DefTiDBRemoveOrderbyInSubquery, EnableSkewDistinctAgg: DefTiDBSkewDistinctAgg, + Enable3StageDistinctAgg: DefTiDB3StageDistinctAgg, MaxAllowedPacket: DefMaxAllowedPacket, TiFlashFastScan: DefTiFlashFastScan, EnableTiFlashReadForWriteStmt: DefTiDBEnableTiFlashReadForWriteStmt, diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index caf8458685..edd9a4b820 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -175,6 +175,10 @@ var defaultSysVars = []*SysVar{ s.EnableSkewDistinctAgg = TiDBOptOn(val) return nil }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBOpt3StageDistinctAgg, Value: BoolToOnOff(DefTiDB3StageDistinctAgg), Type: TypeBool, SetSession: func(s *SessionVars, val string) error { + s.Enable3StageDistinctAgg = TiDBOptOn(val) + return nil + }}, {Scope: ScopeSession, Name: TiDBOptWriteRowID, Value: BoolToOnOff(DefOptWriteRowID), Type: TypeBool, SetSession: func(s *SessionVars, val string) error { s.AllowWriteRowID = TiDBOptOn(val) return nil diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index c8aa6b72c9..806bf9ccca 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -54,6 +54,9 @@ const ( // TiDBOptSkewDistinctAgg is used to indicate the distinct agg has data skew TiDBOptSkewDistinctAgg = "tidb_opt_skew_distinct_agg" + // TiDBOpt3StageDistinctAgg is used to indicate whether to plan and execute the distinct agg in 3 stages + TiDBOpt3StageDistinctAgg = "tidb_opt_three_stage_distinct_agg" + // TiDBBCJThresholdSize is used to limit the size of small table for mpp broadcast join. // Its unit is bytes, if the size of small table is larger than it, we will not use bcj. TiDBBCJThresholdSize = "tidb_broadcast_join_threshold_size" @@ -1014,6 +1017,7 @@ const ( DefRCReadCheckTS = false DefTiDBRemoveOrderbyInSubquery = false DefTiDBSkewDistinctAgg = false + DefTiDB3StageDistinctAgg = true DefTiDBReadStaleness = 0 DefTiDBGCMaxWaitTime = 24 * 60 * 60 DefMaxAllowedPacket uint64 = 67108864