planner: push aggregation functions with distinct to cop (#15500)
This commit is contained in:
@ -143,11 +143,11 @@ Projection_10 0.00 root test.st.id, test.dd.id, test.st.aid, test.st.cm, test.d
|
||||
explain SELECT cm, p1, p2, p3, p4, p5, p6_md5, p7_md5, count(1) as click_pv, count(DISTINCT ip) as click_ip FROM st WHERE (t between 1478188800 and 1478275200) and aid='cn.sbkcq' and pt='android' GROUP BY cm, p1, p2, p3, p4, p5, p6_md5, p7_md5;
|
||||
id estRows task access object operator info
|
||||
Projection_5 1.00 root test.st.cm, test.st.p1, test.st.p2, test.st.p3, test.st.p4, test.st.p5, test.st.p6_md5, test.st.p7_md5, Column#20, Column#21
|
||||
└─HashAgg_7 1.00 root group by:test.st.cm, test.st.p1, test.st.p2, test.st.p3, test.st.p4, test.st.p5, test.st.p6_md5, test.st.p7_md5, funcs:count(1)->Column#20, funcs:count(distinct test.st.ip)->Column#21, funcs:firstrow(test.st.cm)->test.st.cm, funcs:firstrow(test.st.p1)->test.st.p1, funcs:firstrow(test.st.p2)->test.st.p2, funcs:firstrow(test.st.p3)->test.st.p3, funcs:firstrow(test.st.p4)->test.st.p4, funcs:firstrow(test.st.p5)->test.st.p5, funcs:firstrow(test.st.p6_md5)->test.st.p6_md5, funcs:firstrow(test.st.p7_md5)->test.st.p7_md5
|
||||
└─IndexLookUp_15 0.00 root
|
||||
├─IndexRangeScan_12(Build) 250.00 cop[tikv] table:st, index:t(t) range:[1478188800,1478275200], keep order:false, stats:pseudo
|
||||
└─Selection_14(Probe) 0.00 cop[tikv] eq(test.st.aid, "cn.sbkcq"), eq(test.st.pt, "android")
|
||||
└─TableRowIDScan_13 250.00 cop[tikv] table:st keep order:false, stats:pseudo
|
||||
└─HashAgg_6 1.00 root group by:test.st.cm, test.st.p1, test.st.p2, test.st.p3, test.st.p4, test.st.p5, test.st.p6_md5, test.st.p7_md5, funcs:count(1)->Column#20, funcs:count(distinct test.st.ip)->Column#21, funcs:firstrow(test.st.cm)->test.st.cm, funcs:firstrow(test.st.p1)->test.st.p1, funcs:firstrow(test.st.p2)->test.st.p2, funcs:firstrow(test.st.p3)->test.st.p3, funcs:firstrow(test.st.p4)->test.st.p4, funcs:firstrow(test.st.p5)->test.st.p5, funcs:firstrow(test.st.p6_md5)->test.st.p6_md5, funcs:firstrow(test.st.p7_md5)->test.st.p7_md5
|
||||
└─IndexLookUp_13 0.00 root
|
||||
├─IndexRangeScan_10(Build) 250.00 cop[tikv] table:st, index:t(t) range:[1478188800,1478275200], keep order:false, stats:pseudo
|
||||
└─Selection_12(Probe) 0.00 cop[tikv] eq(test.st.aid, "cn.sbkcq"), eq(test.st.pt, "android")
|
||||
└─TableRowIDScan_11 250.00 cop[tikv] table:st keep order:false, stats:pseudo
|
||||
explain select dt.id as id, dt.aid as aid, dt.pt as pt, dt.dic as dic, dt.cm as cm, rr.gid as gid, rr.acd as acd, rr.t as t,dt.p1 as p1, dt.p2 as p2, dt.p3 as p3, dt.p4 as p4, dt.p5 as p5, dt.p6_md5 as p6, dt.p7_md5 as p7 from dt dt join rr rr on (rr.pt = 'ios' and rr.t > 1478185592 and dt.aid = rr.aid and dt.dic = rr.dic) where dt.pt = 'ios' and dt.t > 1478185592 and dt.bm = 0 limit 2000;
|
||||
id estRows task access object operator info
|
||||
Projection_10 0.00 root test.dt.id, test.dt.aid, test.dt.pt, test.dt.dic, test.dt.cm, test.rr.gid, test.rr.acd, test.rr.t, test.dt.p1, test.dt.p2, test.dt.p3, test.dt.p4, test.dt.p5, test.dt.p6_md5, test.dt.p7_md5
|
||||
@ -164,7 +164,7 @@ Projection_10 0.00 root test.dt.id, test.dt.aid, test.dt.pt, test.dt.dic, test.
|
||||
explain select pc,cr,count(DISTINCT uid) as pay_users,count(oid) as pay_times,sum(am) as am from pp where ps=2 and ppt>=1478188800 and ppt<1478275200 and pi in ('510017','520017') and uid in ('18089709','18090780') group by pc,cr;
|
||||
id estRows task access object operator info
|
||||
Projection_5 1.00 root test.pp.pc, test.pp.cr, Column#22, Column#23, Column#24
|
||||
└─HashAgg_7 1.00 root group by:test.pp.cr, test.pp.pc, funcs:count(distinct test.pp.uid)->Column#22, funcs:count(test.pp.oid)->Column#23, funcs:sum(test.pp.am)->Column#24, funcs:firstrow(test.pp.pc)->test.pp.pc, funcs:firstrow(test.pp.cr)->test.pp.cr
|
||||
└─HashAgg_6 1.00 root group by:test.pp.cr, test.pp.pc, funcs:count(distinct test.pp.uid)->Column#22, funcs:count(test.pp.oid)->Column#23, funcs:sum(test.pp.am)->Column#24, funcs:firstrow(test.pp.pc)->test.pp.pc, funcs:firstrow(test.pp.cr)->test.pp.cr
|
||||
└─IndexLookUp_21 0.00 root
|
||||
├─IndexRangeScan_18(Build) 0.40 cop[tikv] table:pp, index:sp(uid, pi) range:[18089709 510017,18089709 510017], [18089709 520017,18089709 520017], [18090780 510017,18090780 510017], [18090780 520017,18090780 520017], keep order:false, stats:pseudo
|
||||
└─Selection_20(Probe) 0.00 cop[tikv] eq(test.pp.ps, 2), ge(test.pp.ppt, 1478188800), lt(test.pp.ppt, 1478275200)
|
||||
|
||||
@ -150,11 +150,11 @@ Projection_10 170.34 root test.st.id, test.dd.id, test.st.aid, test.st.cm, test
|
||||
explain SELECT cm, p1, p2, p3, p4, p5, p6_md5, p7_md5, count(1) as click_pv, count(DISTINCT ip) as click_ip FROM st WHERE (t between 1478188800 and 1478275200) and aid='cn.sbkcq' and pt='android' GROUP BY cm, p1, p2, p3, p4, p5, p6_md5, p7_md5;
|
||||
id estRows task access object operator info
|
||||
Projection_5 39.28 root test.st.cm, test.st.p1, test.st.p2, test.st.p3, test.st.p4, test.st.p5, test.st.p6_md5, test.st.p7_md5, Column#20, Column#21
|
||||
└─HashAgg_7 39.28 root group by:test.st.cm, test.st.p1, test.st.p2, test.st.p3, test.st.p4, test.st.p5, test.st.p6_md5, test.st.p7_md5, funcs:count(1)->Column#20, funcs:count(distinct test.st.ip)->Column#21, funcs:firstrow(test.st.cm)->test.st.cm, funcs:firstrow(test.st.p1)->test.st.p1, funcs:firstrow(test.st.p2)->test.st.p2, funcs:firstrow(test.st.p3)->test.st.p3, funcs:firstrow(test.st.p4)->test.st.p4, funcs:firstrow(test.st.p5)->test.st.p5, funcs:firstrow(test.st.p6_md5)->test.st.p6_md5, funcs:firstrow(test.st.p7_md5)->test.st.p7_md5
|
||||
└─IndexLookUp_15 39.38 root
|
||||
├─IndexRangeScan_12(Build) 160.23 cop[tikv] table:st, index:t(t) range:[1478188800,1478275200], keep order:false
|
||||
└─Selection_14(Probe) 39.38 cop[tikv] eq(test.st.aid, "cn.sbkcq"), eq(test.st.pt, "android")
|
||||
└─TableRowIDScan_13 160.23 cop[tikv] table:st keep order:false
|
||||
└─HashAgg_6 39.28 root group by:test.st.cm, test.st.p1, test.st.p2, test.st.p3, test.st.p4, test.st.p5, test.st.p6_md5, test.st.p7_md5, funcs:count(1)->Column#20, funcs:count(distinct test.st.ip)->Column#21, funcs:firstrow(test.st.cm)->test.st.cm, funcs:firstrow(test.st.p1)->test.st.p1, funcs:firstrow(test.st.p2)->test.st.p2, funcs:firstrow(test.st.p3)->test.st.p3, funcs:firstrow(test.st.p4)->test.st.p4, funcs:firstrow(test.st.p5)->test.st.p5, funcs:firstrow(test.st.p6_md5)->test.st.p6_md5, funcs:firstrow(test.st.p7_md5)->test.st.p7_md5
|
||||
└─IndexLookUp_13 39.38 root
|
||||
├─IndexRangeScan_10(Build) 160.23 cop[tikv] table:st, index:t(t) range:[1478188800,1478275200], keep order:false
|
||||
└─Selection_12(Probe) 39.38 cop[tikv] eq(test.st.aid, "cn.sbkcq"), eq(test.st.pt, "android")
|
||||
└─TableRowIDScan_11 160.23 cop[tikv] table:st keep order:false
|
||||
explain select dt.id as id, dt.aid as aid, dt.pt as pt, dt.dic as dic, dt.cm as cm, rr.gid as gid, rr.acd as acd, rr.t as t,dt.p1 as p1, dt.p2 as p2, dt.p3 as p3, dt.p4 as p4, dt.p5 as p5, dt.p6_md5 as p6, dt.p7_md5 as p7 from dt dt join rr rr on (rr.pt = 'ios' and rr.t > 1478185592 and dt.aid = rr.aid and dt.dic = rr.dic) where dt.pt = 'ios' and dt.t > 1478185592 and dt.bm = 0 limit 2000;
|
||||
id estRows task access object operator info
|
||||
Projection_10 428.32 root test.dt.id, test.dt.aid, test.dt.pt, test.dt.dic, test.dt.cm, test.rr.gid, test.rr.acd, test.rr.t, test.dt.p1, test.dt.p2, test.dt.p3, test.dt.p4, test.dt.p5, test.dt.p6_md5, test.dt.p7_md5
|
||||
@ -171,11 +171,11 @@ Projection_10 428.32 root test.dt.id, test.dt.aid, test.dt.pt, test.dt.dic, tes
|
||||
explain select pc,cr,count(DISTINCT uid) as pay_users,count(oid) as pay_times,sum(am) as am from pp where ps=2 and ppt>=1478188800 and ppt<1478275200 and pi in ('510017','520017') and uid in ('18089709','18090780') group by pc,cr;
|
||||
id estRows task access object operator info
|
||||
Projection_5 207.86 root test.pp.pc, test.pp.cr, Column#22, Column#23, Column#24
|
||||
└─HashAgg_7 207.86 root group by:test.pp.cr, test.pp.pc, funcs:count(distinct test.pp.uid)->Column#22, funcs:count(test.pp.oid)->Column#23, funcs:sum(test.pp.am)->Column#24, funcs:firstrow(test.pp.pc)->test.pp.pc, funcs:firstrow(test.pp.cr)->test.pp.cr
|
||||
└─IndexLookUp_21 207.86 root
|
||||
├─IndexRangeScan_15(Build) 627.00 cop[tikv] table:pp, index:ps(ps) range:[2,2], keep order:false
|
||||
└─Selection_17(Probe) 207.86 cop[tikv] ge(test.pp.ppt, 1478188800), in(test.pp.pi, 510017, 520017), in(test.pp.uid, 18089709, 18090780), lt(test.pp.ppt, 1478275200)
|
||||
└─TableRowIDScan_16 627.00 cop[tikv] table:pp keep order:false
|
||||
└─HashAgg_6 207.86 root group by:test.pp.cr, test.pp.pc, funcs:count(distinct test.pp.uid)->Column#22, funcs:count(test.pp.oid)->Column#23, funcs:sum(test.pp.am)->Column#24, funcs:firstrow(test.pp.pc)->test.pp.pc, funcs:firstrow(test.pp.cr)->test.pp.cr
|
||||
└─IndexLookUp_17 207.86 root
|
||||
├─IndexRangeScan_14(Build) 627.00 cop[tikv] table:pp, index:ps(ps) range:[2,2], keep order:false
|
||||
└─Selection_16(Probe) 207.86 cop[tikv] ge(test.pp.ppt, 1478188800), in(test.pp.pi, 510017, 520017), in(test.pp.uid, 18089709, 18090780), lt(test.pp.ppt, 1478275200)
|
||||
└─TableRowIDScan_15 627.00 cop[tikv] table:pp keep order:false
|
||||
drop table if exists tbl_001;
|
||||
CREATE TABLE tbl_001 (a int, b int);
|
||||
load stats 's/explain_complex_stats_tbl_001.json';
|
||||
|
||||
@ -384,20 +384,20 @@ explain select a != any (select a from t t2) from t t1;
|
||||
id estRows task access object operator info
|
||||
Projection_8 10000.00 root and(or(or(gt(Column#8, 1), ne(test.t.a, Column#7)), if(ne(Column#9, 0), <nil>, 0)), and(ne(Column#10, 0), if(isnull(test.t.a), <nil>, 1)))->Column#11
|
||||
└─HashJoin_9 10000.00 root CARTESIAN inner join
|
||||
├─StreamAgg_16(Build) 1.00 root funcs:firstrow(Column#13)->Column#7, funcs:count(distinct Column#14)->Column#8, funcs:sum(Column#15)->Column#9, funcs:count(1)->Column#10
|
||||
│ └─Projection_26 10000.00 root test.t.a, test.t.a, cast(isnull(test.t.a), decimal(65,0) BINARY)->Column#15
|
||||
│ └─TableReader_23 10000.00 root data:TableFullScan_22
|
||||
│ └─TableFullScan_22 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo
|
||||
├─StreamAgg_14(Build) 1.00 root funcs:firstrow(Column#13)->Column#7, funcs:count(distinct Column#14)->Column#8, funcs:sum(Column#15)->Column#9, funcs:count(1)->Column#10
|
||||
│ └─Projection_19 10000.00 root test.t.a, test.t.a, cast(isnull(test.t.a), decimal(65,0) BINARY)->Column#15
|
||||
│ └─TableReader_18 10000.00 root data:TableFullScan_17
|
||||
│ └─TableFullScan_17 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo
|
||||
└─TableReader_12(Probe) 10000.00 root data:TableFullScan_11
|
||||
└─TableFullScan_11 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
|
||||
explain select a = all (select a from t t2) from t t1;
|
||||
id estRows task access object operator info
|
||||
Projection_8 10000.00 root or(and(and(le(Column#8, 1), eq(test.t.a, Column#7)), if(ne(Column#9, 0), <nil>, 1)), or(eq(Column#10, 0), if(isnull(test.t.a), <nil>, 0)))->Column#11
|
||||
└─HashJoin_9 10000.00 root CARTESIAN inner join
|
||||
├─StreamAgg_16(Build) 1.00 root funcs:firstrow(Column#13)->Column#7, funcs:count(distinct Column#14)->Column#8, funcs:sum(Column#15)->Column#9, funcs:count(1)->Column#10
|
||||
│ └─Projection_26 10000.00 root test.t.a, test.t.a, cast(isnull(test.t.a), decimal(65,0) BINARY)->Column#15
|
||||
│ └─TableReader_23 10000.00 root data:TableFullScan_22
|
||||
│ └─TableFullScan_22 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo
|
||||
├─StreamAgg_14(Build) 1.00 root funcs:firstrow(Column#13)->Column#7, funcs:count(distinct Column#14)->Column#8, funcs:sum(Column#15)->Column#9, funcs:count(1)->Column#10
|
||||
│ └─Projection_19 10000.00 root test.t.a, test.t.a, cast(isnull(test.t.a), decimal(65,0) BINARY)->Column#15
|
||||
│ └─TableReader_18 10000.00 root data:TableFullScan_17
|
||||
│ └─TableFullScan_17 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo
|
||||
└─TableReader_12(Probe) 10000.00 root data:TableFullScan_11
|
||||
└─TableFullScan_11 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
|
||||
drop table if exists t;
|
||||
|
||||
@ -928,17 +928,17 @@ p_size;
|
||||
id estRows task access object operator info
|
||||
Sort_13 14.41 root Column#23:desc, tpch.part.p_brand:asc, tpch.part.p_type:asc, tpch.part.p_size:asc
|
||||
└─Projection_15 14.41 root tpch.part.p_brand, tpch.part.p_type, tpch.part.p_size, Column#23
|
||||
└─HashAgg_18 14.41 root group by:tpch.part.p_brand, tpch.part.p_size, tpch.part.p_type, funcs:count(distinct tpch.partsupp.ps_suppkey)->Column#23, funcs:firstrow(tpch.part.p_brand)->tpch.part.p_brand, funcs:firstrow(tpch.part.p_type)->tpch.part.p_type, funcs:firstrow(tpch.part.p_size)->tpch.part.p_size
|
||||
└─HashJoin_30 3863988.24 root anti semi join, equal:[eq(tpch.partsupp.ps_suppkey, tpch.supplier.s_suppkey)]
|
||||
├─TableReader_68(Build) 400000.00 root data:Selection_67
|
||||
│ └─Selection_67 400000.00 cop[tikv] like(tpch.supplier.s_comment, "%Customer%Complaints%", 92)
|
||||
│ └─TableFullScan_66 500000.00 cop[tikv] table:supplier keep order:false
|
||||
└─IndexMergeJoin_38(Probe) 4829985.30 root inner join, inner:IndexReader_36, outer key:tpch.part.p_partkey, inner key:tpch.partsupp.ps_partkey
|
||||
├─TableReader_61(Build) 1200618.43 root data:Selection_60
|
||||
│ └─Selection_60 1200618.43 cop[tikv] in(tpch.part.p_size, 48, 19, 12, 4, 41, 7, 21, 39), ne(tpch.part.p_brand, "Brand#34"), not(like(tpch.part.p_type, "LARGE BRUSHED%", 92))
|
||||
│ └─TableFullScan_59 10000000.00 cop[tikv] table:part keep order:false
|
||||
└─IndexReader_36(Probe) 4.02 root index:IndexRangeScan_35
|
||||
└─IndexRangeScan_35 4.02 cop[tikv] table:partsupp, index:PRIMARY(PS_PARTKEY, PS_SUPPKEY) range: decided by [eq(tpch.partsupp.ps_partkey, tpch.part.p_partkey)], keep order:true
|
||||
└─HashAgg_16 14.41 root group by:tpch.part.p_brand, tpch.part.p_size, tpch.part.p_type, funcs:count(distinct tpch.partsupp.ps_suppkey)->Column#23, funcs:firstrow(tpch.part.p_brand)->tpch.part.p_brand, funcs:firstrow(tpch.part.p_type)->tpch.part.p_type, funcs:firstrow(tpch.part.p_size)->tpch.part.p_size
|
||||
└─HashJoin_28 3863988.24 root anti semi join, equal:[eq(tpch.partsupp.ps_suppkey, tpch.supplier.s_suppkey)]
|
||||
├─TableReader_66(Build) 400000.00 root data:Selection_65
|
||||
│ └─Selection_65 400000.00 cop[tikv] like(tpch.supplier.s_comment, "%Customer%Complaints%", 92)
|
||||
│ └─TableFullScan_64 500000.00 cop[tikv] table:supplier keep order:false
|
||||
└─IndexMergeJoin_36(Probe) 4829985.30 root inner join, inner:IndexReader_34, outer key:tpch.part.p_partkey, inner key:tpch.partsupp.ps_partkey
|
||||
├─TableReader_59(Build) 1200618.43 root data:Selection_58
|
||||
│ └─Selection_58 1200618.43 cop[tikv] in(tpch.part.p_size, 48, 19, 12, 4, 41, 7, 21, 39), ne(tpch.part.p_brand, "Brand#34"), not(like(tpch.part.p_type, "LARGE BRUSHED%", 92))
|
||||
│ └─TableFullScan_57 10000000.00 cop[tikv] table:part keep order:false
|
||||
└─IndexReader_34(Probe) 4.02 root index:IndexRangeScan_33
|
||||
└─IndexRangeScan_33 4.02 cop[tikv] table:partsupp, index:PRIMARY(PS_PARTKEY, PS_SUPPKEY) range: decided by [eq(tpch.partsupp.ps_partkey, tpch.part.p_partkey)], keep order:true
|
||||
/*
|
||||
Q17 Small-Quantity-Order Revenue Query
|
||||
This query determines how much average yearly revenue would be lost if orders were no longer filled for small
|
||||
|
||||
@ -346,6 +346,11 @@ func (s *testSuiteAgg) TestAggregation(c *C) {
|
||||
tk.MustQuery("select group_concat(a), group_concat(distinct a) from t").Check(testkit.Rows("<nil> <nil>"))
|
||||
tk.MustExec("insert into t value(1, null), (null, 1), (1, 2), (3, 4)")
|
||||
tk.MustQuery("select group_concat(a, b), group_concat(distinct a,b) from t").Check(testkit.Rows("12,34 12,34"))
|
||||
tk.MustExec("set @@session.tidb_opt_distinct_agg_push_down = 0")
|
||||
tk.MustQuery("select count(distinct a) from t;").Check(testkit.Rows("2"))
|
||||
tk.MustExec("set @@session.tidb_opt_distinct_agg_push_down = 1")
|
||||
tk.MustQuery("select count(distinct a) from t;").Check(testkit.Rows("2"))
|
||||
tk.MustExec("set @@session.tidb_opt_distinct_agg_push_down = 0")
|
||||
|
||||
tk.MustExec("drop table t")
|
||||
tk.MustExec("create table t(a decimal(10, 4))")
|
||||
|
||||
@ -27,7 +27,7 @@ import (
|
||||
// AggFuncToPBExpr converts aggregate function to pb.
|
||||
func AggFuncToPBExpr(sc *stmtctx.StatementContext, client kv.Client, aggFunc *AggFuncDesc) *tipb.Expr {
|
||||
if aggFunc.HasDistinct {
|
||||
return nil
|
||||
// do nothing and ignore aggFunc.HasDistinct
|
||||
}
|
||||
pc := expression.NewPBConverter(client, sc)
|
||||
var tp tipb.ExprType
|
||||
|
||||
@ -74,15 +74,6 @@ func (s *testEvaluatorSuite) TestAggFunc2Pb(c *C) {
|
||||
types.NewFieldType(mysql.TypeDouble),
|
||||
types.NewFieldType(mysql.TypeDouble),
|
||||
}
|
||||
for _, funcName := range funcNames {
|
||||
args := []expression.Expression{dg.genColumn(mysql.TypeDouble, 1)}
|
||||
aggFunc, err := NewAggFuncDesc(s.ctx, funcName, args, true)
|
||||
c.Assert(err, IsNil)
|
||||
pbExpr := AggFuncToPBExpr(sc, client, aggFunc)
|
||||
js, err := json.Marshal(pbExpr)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(string(js), Equals, "null")
|
||||
}
|
||||
|
||||
jsons := []string{
|
||||
`{"tp":3002,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""}}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""}}`,
|
||||
@ -94,13 +85,15 @@ func (s *testEvaluatorSuite) TestAggFunc2Pb(c *C) {
|
||||
`{"tp":3006,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""}}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""}}`,
|
||||
}
|
||||
for i, funcName := range funcNames {
|
||||
args := []expression.Expression{dg.genColumn(mysql.TypeDouble, 1)}
|
||||
aggFunc, err := NewAggFuncDesc(s.ctx, funcName, args, false)
|
||||
c.Assert(err, IsNil)
|
||||
aggFunc.RetTp = funcTypes[i]
|
||||
pbExpr := AggFuncToPBExpr(sc, client, aggFunc)
|
||||
js, err := json.Marshal(pbExpr)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(string(js), Equals, jsons[i])
|
||||
for _, hasDistinct := range []bool{true, false} {
|
||||
args := []expression.Expression{dg.genColumn(mysql.TypeDouble, 1)}
|
||||
aggFunc, err := NewAggFuncDesc(s.ctx, funcName, args, hasDistinct)
|
||||
c.Assert(err, IsNil)
|
||||
aggFunc.RetTp = funcTypes[i]
|
||||
pbExpr := AggFuncToPBExpr(sc, client, aggFunc)
|
||||
js, err := json.Marshal(pbExpr)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(string(js), Equals, jsons[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -14,9 +14,12 @@
|
||||
package cascades_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
. "github.com/pingcap/check"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/session"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/store/mockstore"
|
||||
"github.com/pingcap/tidb/util/testkit"
|
||||
"github.com/pingcap/tidb/util/testutil"
|
||||
@ -171,6 +174,67 @@ func (s *testIntegrationSuite) TestAggregation(c *C) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *testIntegrationSuite) TestPushdownDistinctEnable(c *C) {
|
||||
var (
|
||||
input []string
|
||||
output []struct {
|
||||
SQL string
|
||||
Plan []string
|
||||
Result []string
|
||||
}
|
||||
)
|
||||
s.testData.GetTestCases(c, &input, &output)
|
||||
vars := []string{
|
||||
fmt.Sprintf("set @@session.%s = 1", variable.TiDBOptDistinctAggPushDown),
|
||||
}
|
||||
s.doTestPushdownDistinct(c, vars, input, output)
|
||||
}
|
||||
|
||||
func (s *testIntegrationSuite) TestPushdownDistinctDisable(c *C) {
|
||||
var (
|
||||
input []string
|
||||
output []struct {
|
||||
SQL string
|
||||
Plan []string
|
||||
Result []string
|
||||
}
|
||||
)
|
||||
s.testData.GetTestCases(c, &input, &output)
|
||||
vars := []string{
|
||||
fmt.Sprintf("set @@session.%s = 0", variable.TiDBOptDistinctAggPushDown),
|
||||
}
|
||||
s.doTestPushdownDistinct(c, vars, input, output)
|
||||
}
|
||||
|
||||
func (s *testIntegrationSuite) doTestPushdownDistinct(c *C, vars, input []string, output []struct {
|
||||
SQL string
|
||||
Plan []string
|
||||
Result []string
|
||||
}) {
|
||||
tk := testkit.NewTestKitWithInit(c, s.store)
|
||||
tk.MustExec("drop table if exists t")
|
||||
tk.MustExec("create table t(a int, b int, c int, index(c))")
|
||||
tk.MustExec("insert into t values (1, 1, 1), (1, 1, 3), (1, 2, 3), (2, 1, 3), (1, 2, NULL);")
|
||||
tk.MustExec("set session sql_mode=''")
|
||||
tk.MustExec(fmt.Sprintf("set session %s=1", variable.TiDBHashAggPartialConcurrency))
|
||||
tk.MustExec(fmt.Sprintf("set session %s=1", variable.TiDBHashAggFinalConcurrency))
|
||||
tk.MustExec("set session tidb_enable_cascades_planner = 1")
|
||||
|
||||
for _, v := range vars {
|
||||
tk.MustExec(v)
|
||||
}
|
||||
|
||||
for i, ts := range input {
|
||||
s.testData.OnRecord(func() {
|
||||
output[i].SQL = ts
|
||||
output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + ts).Rows())
|
||||
output[i].Result = s.testData.ConvertRowsToStrings(tk.MustQuery(ts).Sort().Rows())
|
||||
})
|
||||
tk.MustQuery("explain " + ts).Check(testkit.Rows(output[i].Plan...))
|
||||
tk.MustQuery(ts).Sort().Check(testkit.Rows(output[i].Result...))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *testIntegrationSuite) TestSimplePlans(c *C) {
|
||||
tk := testkit.NewTestKitWithInit(c, s.store)
|
||||
tk.MustExec("drop table if exists t")
|
||||
|
||||
@ -47,6 +47,30 @@
|
||||
"select count(b), sum(b), avg(b), b, max(b), min(b), bit_and(b), bit_or(b), bit_xor(b) from t group by a having sum(b) >= 0 and count(b) >= 0 order by b"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "TestPushdownDistinctEnable",
|
||||
"cases": [
|
||||
"select /*+ HASH_AGG() */ a, count(distinct a) from t;", // firstrow(a) cannot be removed.
|
||||
"select /*+ HASH_AGG() */ avg(b), c, avg(b), count(distinct A, B), count(distinct A), count(distinct c), sum(b) from t group by c;",
|
||||
"select /*+ STREAM_AGG() */ count(distinct c) from t group by c;", // should push down after stream agg implemented
|
||||
"select /*+ STREAM_AGG() */ count(distinct c) from t;", // should push down after stream agg implemented
|
||||
"select /*+ HASH_AGG() */ count(distinct c) from t;",
|
||||
"select count(distinct c) from t group by c;",
|
||||
"select count(distinct c) from t;"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "TestPushdownDistinctDisable",
|
||||
"cases": [
|
||||
// do not pushdown even AGG_TO_COP is specified.
|
||||
"select /*+ HASH_AGG(), AGG_TO_COP() */ a, count(distinct a) from t;",
|
||||
"select /*+ HASH_AGG(), AGG_TO_COP() */ avg(b), c, avg(b), count(distinct A, B), count(distinct A), count(distinct c), sum(b) from t group by c;",
|
||||
"select /*+ STREAM_AGG(), AGG_TO_COP() */ count(distinct c) from t group by c;",
|
||||
"select /*+ STREAM_AGG(), AGG_TO_COP() */ count(distinct c) from t;",
|
||||
"select /*+ HASH_AGG(), AGG_TO_COP() */ count(distinct c) from t;",
|
||||
"select /*+ AGG_TO_COP() */ count(distinct c) from t group by c;"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "TestSimplePlans",
|
||||
"cases": [
|
||||
|
||||
181
planner/cascades/testdata/integration_suite_out.json
vendored
181
planner/cascades/testdata/integration_suite_out.json
vendored
@ -257,9 +257,9 @@
|
||||
"Plan": [
|
||||
"Projection_18 6400.00 root Column#3, Column#4",
|
||||
"└─Sort_30 6400.00 root Column#3:asc",
|
||||
" └─HashAgg_26 6400.00 root group by:Column#9, funcs:sum(Column#7)->Column#4, funcs:firstrow(Column#9)->Column#3",
|
||||
" └─HashAgg_26 6400.00 root group by:Column#7, funcs:sum(Column#8)->Column#4, funcs:firstrow(Column#7)->Column#3",
|
||||
" └─TableReader_27 6400.00 root data:HashAgg_28",
|
||||
" └─HashAgg_28 6400.00 cop[tikv] group by:plus(test.t.a, test.t.b), funcs:sum(test.t.a)->Column#7",
|
||||
" └─HashAgg_28 6400.00 cop[tikv] group by:plus(test.t.a, test.t.b), funcs:sum(test.t.a)->Column#8",
|
||||
" └─Selection_23 8000.00 cop[tikv] gt(plus(test.t.a, test.t.b), 1)",
|
||||
" └─TableFullScan_24 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
|
||||
],
|
||||
@ -454,6 +454,183 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"Name": "TestPushdownDistinctEnable",
|
||||
"Cases": [
|
||||
{
|
||||
"SQL": "select /*+ HASH_AGG() */ a, count(distinct a) from t;",
|
||||
"Plan": [
|
||||
"Projection_8 1.00 root test.t.a, Column#5",
|
||||
"└─HashAgg_12 1.00 root funcs:count(distinct test.t.a)->Column#5, funcs:firstrow(Column#7)->test.t.a",
|
||||
" └─TableReader_13 8000.00 root data:HashAgg_14",
|
||||
" └─HashAgg_14 8000.00 cop[tikv] group by:test.t.a, funcs:firstrow(test.t.a)->Column#7",
|
||||
" └─TableFullScan_11 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"1 2"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select /*+ HASH_AGG() */ avg(b), c, avg(b), count(distinct A, B), count(distinct A), count(distinct c), sum(b) from t group by c;",
|
||||
"Plan": [
|
||||
"Projection_8 8000.00 root Column#5, test.t.c, Column#5, Column#6, Column#7, Column#8, Column#9",
|
||||
"└─HashAgg_13 8000.00 root group by:test.t.c, funcs:avg(Column#11, Column#12)->Column#5, funcs:count(distinct test.t.a, test.t.b)->Column#6, funcs:count(distinct test.t.a)->Column#7, funcs:count(distinct test.t.c)->Column#8, funcs:sum(Column#13)->Column#9, funcs:firstrow(test.t.c)->test.t.c",
|
||||
" └─TableReader_14 8000.00 root data:HashAgg_15",
|
||||
" └─HashAgg_15 8000.00 cop[tikv] group by:test.t.a, test.t.b, test.t.c, funcs:avg(test.t.b)->Column#11, funcs:sum(test.t.b)->Column#12",
|
||||
" └─TableFullScan_11 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"1.0000 1 1.0000 1 1 1 1",
|
||||
"1.3333 3 1.3333 3 2 1 4",
|
||||
"2.0000 <nil> 2.0000 1 1 0 2"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select /*+ STREAM_AGG() */ count(distinct c) from t group by c;",
|
||||
"Plan": [
|
||||
"HashAgg_17 8000.00 root group by:test.t.c, funcs:count(distinct test.t.c)->Column#5",
|
||||
"└─IndexReader_18 8000.00 root index:HashAgg_19",
|
||||
" └─HashAgg_19 8000.00 cop[tikv] group by:test.t.c, ",
|
||||
" └─IndexFullScan_16 10000.00 cop[tikv] table:t, index:c(c) keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"0",
|
||||
"1",
|
||||
"1"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select /*+ STREAM_AGG() */ count(distinct c) from t;",
|
||||
"Plan": [
|
||||
"HashAgg_17 1.00 root funcs:count(distinct test.t.c)->Column#5",
|
||||
"└─IndexReader_18 8000.00 root index:HashAgg_19",
|
||||
" └─HashAgg_19 8000.00 cop[tikv] group by:test.t.c, ",
|
||||
" └─IndexFullScan_16 10000.00 cop[tikv] table:t, index:c(c) keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"2"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select /*+ HASH_AGG() */ count(distinct c) from t;",
|
||||
"Plan": [
|
||||
"HashAgg_17 1.00 root funcs:count(distinct test.t.c)->Column#5",
|
||||
"└─IndexReader_18 8000.00 root index:HashAgg_19",
|
||||
" └─HashAgg_19 8000.00 cop[tikv] group by:test.t.c, ",
|
||||
" └─IndexFullScan_16 10000.00 cop[tikv] table:t, index:c(c) keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"2"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select count(distinct c) from t group by c;",
|
||||
"Plan": [
|
||||
"HashAgg_17 8000.00 root group by:test.t.c, funcs:count(distinct test.t.c)->Column#5",
|
||||
"└─IndexReader_18 8000.00 root index:HashAgg_19",
|
||||
" └─HashAgg_19 8000.00 cop[tikv] group by:test.t.c, ",
|
||||
" └─IndexFullScan_16 10000.00 cop[tikv] table:t, index:c(c) keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"0",
|
||||
"1",
|
||||
"1"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select count(distinct c) from t;",
|
||||
"Plan": [
|
||||
"HashAgg_17 1.00 root funcs:count(distinct test.t.c)->Column#5",
|
||||
"└─IndexReader_18 8000.00 root index:HashAgg_19",
|
||||
" └─HashAgg_19 8000.00 cop[tikv] group by:test.t.c, ",
|
||||
" └─IndexFullScan_16 10000.00 cop[tikv] table:t, index:c(c) keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"2"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"Name": "TestPushdownDistinctDisable",
|
||||
"Cases": [
|
||||
{
|
||||
"SQL": "select /*+ HASH_AGG(), AGG_TO_COP() */ a, count(distinct a) from t;",
|
||||
"Plan": [
|
||||
"Projection_6 1.00 root test.t.a, Column#5",
|
||||
"└─HashAgg_7 1.00 root funcs:count(distinct test.t.a)->Column#5, funcs:firstrow(test.t.a)->test.t.a",
|
||||
" └─TableReader_8 10000.00 root data:TableFullScan_9",
|
||||
" └─TableFullScan_9 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"1 2"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select /*+ HASH_AGG(), AGG_TO_COP() */ avg(b), c, avg(b), count(distinct A, B), count(distinct A), count(distinct c), sum(b) from t group by c;",
|
||||
"Plan": [
|
||||
"Projection_6 8000.00 root Column#5, test.t.c, Column#5, Column#6, Column#7, Column#8, Column#9",
|
||||
"└─HashAgg_7 8000.00 root group by:Column#18, funcs:avg(Column#11)->Column#5, funcs:count(distinct Column#12, Column#13)->Column#6, funcs:count(distinct Column#14)->Column#7, funcs:count(distinct Column#15)->Column#8, funcs:sum(Column#16)->Column#9, funcs:firstrow(Column#17)->test.t.c",
|
||||
" └─Projection_10 10000.00 root cast(test.t.b, decimal(65,4) BINARY)->Column#11, test.t.a, test.t.b, test.t.a, test.t.c, cast(test.t.b, decimal(65,0) BINARY)->Column#16, test.t.c, test.t.c",
|
||||
" └─TableReader_8 10000.00 root data:TableFullScan_9",
|
||||
" └─TableFullScan_9 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"1.0000 1 1.0000 1 1 1 1",
|
||||
"1.3333 3 1.3333 3 2 1 4",
|
||||
"2.0000 <nil> 2.0000 1 1 0 2"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select /*+ STREAM_AGG(), AGG_TO_COP() */ count(distinct c) from t group by c;",
|
||||
"Plan": [
|
||||
"HashAgg_8 8000.00 root group by:test.t.c, funcs:count(distinct test.t.c)->Column#5",
|
||||
"└─IndexReader_11 10000.00 root index:IndexFullScan_12",
|
||||
" └─IndexFullScan_12 10000.00 cop[tikv] table:t, index:c(c) keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"0",
|
||||
"1",
|
||||
"1"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select /*+ STREAM_AGG(), AGG_TO_COP() */ count(distinct c) from t;",
|
||||
"Plan": [
|
||||
"HashAgg_8 1.00 root funcs:count(distinct test.t.c)->Column#5",
|
||||
"└─IndexReader_11 10000.00 root index:IndexFullScan_12",
|
||||
" └─IndexFullScan_12 10000.00 cop[tikv] table:t, index:c(c) keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"2"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select /*+ HASH_AGG(), AGG_TO_COP() */ count(distinct c) from t;",
|
||||
"Plan": [
|
||||
"HashAgg_8 1.00 root funcs:count(distinct test.t.c)->Column#5",
|
||||
"└─IndexReader_11 10000.00 root index:IndexFullScan_12",
|
||||
" └─IndexFullScan_12 10000.00 cop[tikv] table:t, index:c(c) keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"2"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select /*+ AGG_TO_COP() */ count(distinct c) from t group by c;",
|
||||
"Plan": [
|
||||
"HashAgg_8 8000.00 root group by:test.t.c, funcs:count(distinct test.t.c)->Column#5",
|
||||
"└─IndexReader_11 10000.00 root index:IndexFullScan_12",
|
||||
" └─IndexFullScan_12 10000.00 cop[tikv] table:t, index:c(c) keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"0",
|
||||
"1",
|
||||
"1"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"Name": "TestSimplePlans",
|
||||
"Cases": [
|
||||
|
||||
@ -391,12 +391,22 @@ func NewRulePushAggDownGather() Transformation {
|
||||
|
||||
// Match implements Transformation interface.
|
||||
func (r *PushAggDownGather) Match(expr *memo.ExprIter) bool {
|
||||
if expr.GetExpr().HasAppliedRule(r) {
|
||||
return false
|
||||
}
|
||||
agg := expr.GetExpr().ExprNode.(*plannercore.LogicalAggregation)
|
||||
for _, aggFunc := range agg.AggFuncs {
|
||||
if aggFunc.Mode != aggregation.CompleteMode {
|
||||
return false
|
||||
}
|
||||
}
|
||||
if agg.HasDistinct() {
|
||||
// TODO: remove this logic after the cost estimation of distinct pushdown is implemented.
|
||||
// If AllowDistinctAggPushDown is set to true, we should not consider RootTask.
|
||||
if !agg.SCtx().GetSessionVars().AllowDistinctAggPushDown {
|
||||
return false
|
||||
}
|
||||
}
|
||||
childEngine := expr.Children[0].GetExpr().Children[0].EngineType
|
||||
if childEngine != memo.EngineTiKV {
|
||||
// TODO: Remove this check when we have implemented TiFlashAggregation.
|
||||
@ -414,47 +424,44 @@ func (r *PushAggDownGather) OnTransform(old *memo.ExprIter) (newExprs []*memo.Gr
|
||||
childGroup := old.Children[0].GetExpr().Children[0]
|
||||
// The old Aggregation should stay unchanged for other transformation.
|
||||
// So we build a new LogicalAggregation for the partialAgg.
|
||||
partialAggFuncs := make([]*aggregation.AggFuncDesc, len(agg.AggFuncs))
|
||||
for i, aggFunc := range agg.AggFuncs {
|
||||
newAggFunc := &aggregation.AggFuncDesc{
|
||||
HasDistinct: false,
|
||||
Mode: aggregation.Partial1Mode,
|
||||
}
|
||||
newAggFunc.Name = aggFunc.Name
|
||||
newAggFunc.RetTp = aggFunc.RetTp
|
||||
// The args will be changed below, so that we have to build a new slice for it.
|
||||
newArgs := make([]expression.Expression, len(aggFunc.Args))
|
||||
copy(newArgs, aggFunc.Args)
|
||||
newAggFunc.Args = newArgs
|
||||
partialAggFuncs[i] = newAggFunc
|
||||
aggFuncs := make([]*aggregation.AggFuncDesc, len(agg.AggFuncs))
|
||||
for i := range agg.AggFuncs {
|
||||
aggFuncs[i] = agg.AggFuncs[i].Clone()
|
||||
}
|
||||
partialGbyItems := make([]expression.Expression, len(agg.GroupByItems))
|
||||
copy(partialGbyItems, agg.GroupByItems)
|
||||
gbyItems := make([]expression.Expression, len(agg.GroupByItems))
|
||||
copy(gbyItems, agg.GroupByItems)
|
||||
|
||||
partialPref, finalPref, funcMap := plannercore.BuildFinalModeAggregation(agg.SCtx(),
|
||||
&plannercore.AggInfo{
|
||||
AggFuncs: aggFuncs,
|
||||
GroupByItems: gbyItems,
|
||||
Schema: aggSchema,
|
||||
})
|
||||
// Remove unnecessary FirstRow.
|
||||
partialPref.AggFuncs =
|
||||
plannercore.RemoveUnnecessaryFirstRow(agg.SCtx(), finalPref.AggFuncs, finalPref.GroupByItems, partialPref.AggFuncs, partialPref.GroupByItems, partialPref.Schema, funcMap)
|
||||
|
||||
partialAgg := plannercore.LogicalAggregation{
|
||||
AggFuncs: partialAggFuncs,
|
||||
GroupByItems: partialGbyItems,
|
||||
AggFuncs: partialPref.AggFuncs,
|
||||
GroupByItems: partialPref.GroupByItems,
|
||||
}.Init(agg.SCtx(), agg.SelectBlockOffset())
|
||||
partialAgg.CopyAggHints(agg)
|
||||
|
||||
finalAggFuncs, finalGbyItems, partialSchema :=
|
||||
plannercore.BuildFinalModeAggregation(partialAgg.SCtx(), partialAgg.AggFuncs, partialAgg.GroupByItems, aggSchema)
|
||||
// Remove unnecessary FirstRow.
|
||||
partialAgg.AggFuncs =
|
||||
plannercore.RemoveUnnecessaryFirstRow(partialAgg.SCtx(), finalAggFuncs, finalGbyItems, partialAgg.AggFuncs, partialAgg.GroupByItems, partialSchema)
|
||||
finalAgg := plannercore.LogicalAggregation{
|
||||
AggFuncs: finalAggFuncs,
|
||||
GroupByItems: finalGbyItems,
|
||||
AggFuncs: finalPref.AggFuncs,
|
||||
GroupByItems: finalPref.GroupByItems,
|
||||
}.Init(agg.SCtx(), agg.SelectBlockOffset())
|
||||
finalAgg.CopyAggHints(agg)
|
||||
|
||||
partialAggExpr := memo.NewGroupExpr(partialAgg)
|
||||
partialAggExpr.SetChildren(childGroup)
|
||||
partialAggGroup := memo.NewGroupWithSchema(partialAggExpr, partialSchema).SetEngineType(childGroup.EngineType)
|
||||
partialAggGroup := memo.NewGroupWithSchema(partialAggExpr, partialPref.Schema).SetEngineType(childGroup.EngineType)
|
||||
gatherExpr := memo.NewGroupExpr(gather)
|
||||
gatherExpr.SetChildren(partialAggGroup)
|
||||
gatherGroup := memo.NewGroupWithSchema(gatherExpr, partialSchema)
|
||||
gatherGroup := memo.NewGroupWithSchema(gatherExpr, partialPref.Schema)
|
||||
finalAggExpr := memo.NewGroupExpr(finalAgg)
|
||||
finalAggExpr.SetChildren(gatherGroup)
|
||||
finalAggExpr.AddAppliedRule(r)
|
||||
// We don't erase the old complete mode Aggregation because
|
||||
// this transformation would not always be better.
|
||||
return []*memo.GroupExpr{finalAggExpr}, false, false, nil
|
||||
|
||||
@ -1610,7 +1610,13 @@ func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalPrope
|
||||
}
|
||||
|
||||
taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType}
|
||||
if !la.aggHints.preferAggToCop {
|
||||
if la.HasDistinct() {
|
||||
// TODO: remove this logic after the cost estimation of distinct pushdown is implemented.
|
||||
// If AllowDistinctAggPushDown is set to true, we should not consider RootTask.
|
||||
if !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
|
||||
taskTypes = []property.TaskType{property.RootTaskType}
|
||||
}
|
||||
} else if !la.aggHints.preferAggToCop {
|
||||
taskTypes = append(taskTypes, property.RootTaskType)
|
||||
}
|
||||
for _, taskTp := range taskTypes {
|
||||
@ -1628,6 +1634,19 @@ func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalPrope
|
||||
return enforcedAggs
|
||||
}
|
||||
|
||||
func (la *LogicalAggregation) distinctArgsMeetsProperty() bool {
|
||||
for _, aggFunc := range la.AggFuncs {
|
||||
if aggFunc.HasDistinct {
|
||||
for _, distinctArg := range aggFunc.Args {
|
||||
if !expression.Contains(la.GroupByItems, distinctArg) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []PhysicalPlan {
|
||||
all, desc := prop.AllSameOrder()
|
||||
if !all {
|
||||
@ -1657,7 +1676,17 @@ func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []P
|
||||
// The table read of "CopDoubleReadTaskType" can't promises the sort
|
||||
// property that the stream aggregation required, no need to consider.
|
||||
taskTypes := []property.TaskType{property.CopSingleReadTaskType}
|
||||
if !la.aggHints.preferAggToCop {
|
||||
if la.HasDistinct() {
|
||||
// TODO: remove this logic after the cost estimation of distinct pushdown is implemented.
|
||||
// If AllowDistinctAggPushDown is set to true, we should not consider RootTask.
|
||||
if !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
|
||||
taskTypes = []property.TaskType{property.RootTaskType}
|
||||
} else {
|
||||
if !la.distinctArgsMeetsProperty() {
|
||||
continue
|
||||
}
|
||||
}
|
||||
} else if !la.aggHints.preferAggToCop {
|
||||
taskTypes = append(taskTypes, property.RootTaskType)
|
||||
}
|
||||
for _, taskTp := range taskTypes {
|
||||
@ -1687,7 +1716,13 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
|
||||
}
|
||||
hashAggs := make([]PhysicalPlan, 0, len(wholeTaskTypes))
|
||||
taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType}
|
||||
if !la.aggHints.preferAggToCop {
|
||||
if la.HasDistinct() {
|
||||
// TODO: remove this logic after the cost estimation of distinct pushdown is implemented.
|
||||
// If AllowDistinctAggPushDown is set to true, we should not consider RootTask.
|
||||
if !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
|
||||
taskTypes = []property.TaskType{property.RootTaskType}
|
||||
}
|
||||
} else if !la.aggHints.preferAggToCop {
|
||||
taskTypes = append(taskTypes, property.RootTaskType)
|
||||
}
|
||||
for _, taskTp := range taskTypes {
|
||||
|
||||
@ -312,6 +312,16 @@ type LogicalAggregation struct {
|
||||
inputCount float64 // inputCount is the input count of this plan.
|
||||
}
|
||||
|
||||
// HasDistinct shows whether LogicalAggregation has functions with distinct.
|
||||
func (la *LogicalAggregation) HasDistinct() bool {
|
||||
for _, aggFunc := range la.AggFuncs {
|
||||
if aggFunc.HasDistinct {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// CopyAggHints copies the aggHints from another LogicalAggregation.
|
||||
func (la *LogicalAggregation) CopyAggHints(agg *LogicalAggregation) {
|
||||
// TODO: Copy the hint may make the un-applicable hint throw the
|
||||
|
||||
@ -15,6 +15,7 @@ package core_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
. "github.com/pingcap/check"
|
||||
"github.com/pingcap/parser"
|
||||
@ -29,6 +30,7 @@ import (
|
||||
"github.com/pingcap/tidb/session"
|
||||
"github.com/pingcap/tidb/sessionctx"
|
||||
"github.com/pingcap/tidb/sessionctx/stmtctx"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/util/hint"
|
||||
"github.com/pingcap/tidb/util/testkit"
|
||||
"github.com/pingcap/tidb/util/testleak"
|
||||
@ -843,6 +845,75 @@ func (s *testPlanSuite) TestAggToCopHint(c *C) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *testPlanSuite) TestPushdownDistinctEnable(c *C) {
|
||||
defer testleak.AfterTest(c)()
|
||||
var (
|
||||
input []string
|
||||
output []struct {
|
||||
SQL string
|
||||
Plan []string
|
||||
Result []string
|
||||
}
|
||||
)
|
||||
s.testData.GetTestCases(c, &input, &output)
|
||||
vars := []string{
|
||||
fmt.Sprintf("set @@session.%s = 1", variable.TiDBOptDistinctAggPushDown),
|
||||
}
|
||||
s.doTestPushdownDistinct(c, vars, input, output)
|
||||
}
|
||||
|
||||
func (s *testPlanSuite) TestPushdownDistinctDisable(c *C) {
|
||||
defer testleak.AfterTest(c)()
|
||||
var (
|
||||
input []string
|
||||
output []struct {
|
||||
SQL string
|
||||
Plan []string
|
||||
Result []string
|
||||
}
|
||||
)
|
||||
s.testData.GetTestCases(c, &input, &output)
|
||||
vars := []string{
|
||||
fmt.Sprintf("set @@session.%s = 0", variable.TiDBOptDistinctAggPushDown),
|
||||
}
|
||||
s.doTestPushdownDistinct(c, vars, input, output)
|
||||
}
|
||||
|
||||
func (s *testPlanSuite) doTestPushdownDistinct(c *C, vars, input []string, output []struct {
|
||||
SQL string
|
||||
Plan []string
|
||||
Result []string
|
||||
}) {
|
||||
store, dom, err := newStoreWithBootstrap()
|
||||
c.Assert(err, IsNil)
|
||||
defer func() {
|
||||
dom.Close()
|
||||
store.Close()
|
||||
}()
|
||||
tk := testkit.NewTestKit(c, store)
|
||||
tk.MustExec("use test")
|
||||
tk.MustExec("drop table if exists t")
|
||||
tk.MustExec("create table t(a int, b int, c int, index(c))")
|
||||
tk.MustExec("insert into t values (1, 1, 1), (1, 1, 3), (1, 2, 3), (2, 1, 3), (1, 2, NULL);")
|
||||
tk.MustExec("set session sql_mode=''")
|
||||
tk.MustExec(fmt.Sprintf("set session %s=1", variable.TiDBHashAggPartialConcurrency))
|
||||
tk.MustExec(fmt.Sprintf("set session %s=1", variable.TiDBHashAggFinalConcurrency))
|
||||
|
||||
for _, v := range vars {
|
||||
tk.MustExec(v)
|
||||
}
|
||||
|
||||
for i, ts := range input {
|
||||
s.testData.OnRecord(func() {
|
||||
output[i].SQL = ts
|
||||
output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + ts).Rows())
|
||||
output[i].Result = s.testData.ConvertRowsToStrings(tk.MustQuery(ts).Sort().Rows())
|
||||
})
|
||||
tk.MustQuery("explain " + ts).Check(testkit.Rows(output[i].Plan...))
|
||||
tk.MustQuery(ts).Sort().Check(testkit.Rows(output[i].Result...))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *testPlanSuite) TestHintAlias(c *C) {
|
||||
defer testleak.AfterTest(c)()
|
||||
store, dom, err := newStoreWithBootstrap()
|
||||
|
||||
@ -1016,47 +1016,34 @@ func CheckAggCanPushCop(sctx sessionctx.Context, aggFuncs []*aggregation.AggFunc
|
||||
return expression.CanExprsPushDown(sc, groupByItems, client, storeType)
|
||||
}
|
||||
|
||||
// AggInfo stores the information of an Aggregation.
|
||||
type AggInfo struct {
|
||||
AggFuncs []*aggregation.AggFuncDesc
|
||||
GroupByItems []expression.Expression
|
||||
Schema *expression.Schema
|
||||
}
|
||||
|
||||
// BuildFinalModeAggregation splits either LogicalAggregation or PhysicalAggregation to finalAgg and partial1Agg,
|
||||
// returns the body of finalAgg and the schema of partialAgg.
|
||||
func BuildFinalModeAggregation(
|
||||
sctx sessionctx.Context,
|
||||
aggFuncs []*aggregation.AggFuncDesc,
|
||||
groupByItems []expression.Expression,
|
||||
finalSchema *expression.Schema) (finalAggFuncs []*aggregation.AggFuncDesc, finalGbyItems []expression.Expression, partialSchema *expression.Schema) {
|
||||
// TODO: Refactor the way of constructing aggregation functions.
|
||||
partialSchema = expression.NewSchema()
|
||||
partialCursor := 0
|
||||
finalAggFuncs = make([]*aggregation.AggFuncDesc, len(aggFuncs))
|
||||
for i, aggFunc := range aggFuncs {
|
||||
finalAggFunc := &aggregation.AggFuncDesc{HasDistinct: false}
|
||||
finalAggFunc.Name = aggFunc.Name
|
||||
args := make([]expression.Expression, 0, len(aggFunc.Args))
|
||||
if aggregation.NeedCount(finalAggFunc.Name) {
|
||||
ft := types.NewFieldType(mysql.TypeLonglong)
|
||||
ft.Flen, ft.Charset, ft.Collate = 21, charset.CharsetBin, charset.CollationBin
|
||||
partialSchema.Append(&expression.Column{
|
||||
UniqueID: sctx.GetSessionVars().AllocPlanColumnID(),
|
||||
RetType: ft,
|
||||
})
|
||||
args = append(args, partialSchema.Columns[partialCursor])
|
||||
partialCursor++
|
||||
}
|
||||
if aggregation.NeedValue(finalAggFunc.Name) {
|
||||
partialSchema.Append(&expression.Column{
|
||||
UniqueID: sctx.GetSessionVars().AllocPlanColumnID(),
|
||||
RetType: finalSchema.Columns[i].GetType(),
|
||||
})
|
||||
args = append(args, partialSchema.Columns[partialCursor])
|
||||
partialCursor++
|
||||
}
|
||||
finalAggFunc.Args = args
|
||||
finalAggFunc.Mode = aggregation.FinalMode
|
||||
finalAggFunc.RetTp = aggFunc.RetTp
|
||||
finalAggFuncs[i] = finalAggFunc
|
||||
sctx sessionctx.Context, original *AggInfo) (partial, final *AggInfo, funcMap map[*aggregation.AggFuncDesc]*aggregation.AggFuncDesc) {
|
||||
|
||||
funcMap = make(map[*aggregation.AggFuncDesc]*aggregation.AggFuncDesc, len(original.AggFuncs))
|
||||
partial = &AggInfo{
|
||||
AggFuncs: make([]*aggregation.AggFuncDesc, 0, len(original.AggFuncs)),
|
||||
GroupByItems: original.GroupByItems,
|
||||
Schema: expression.NewSchema(),
|
||||
}
|
||||
partialCursor := 0
|
||||
final = &AggInfo{
|
||||
AggFuncs: make([]*aggregation.AggFuncDesc, len(original.AggFuncs)),
|
||||
GroupByItems: make([]expression.Expression, 0, len(original.GroupByItems)),
|
||||
Schema: original.Schema,
|
||||
}
|
||||
|
||||
partialGbySchema := expression.NewSchema()
|
||||
// add group by columns
|
||||
finalGbyItems = make([]expression.Expression, 0, len(groupByItems))
|
||||
for _, gbyExpr := range groupByItems {
|
||||
for _, gbyExpr := range partial.GroupByItems {
|
||||
var gbyCol *expression.Column
|
||||
if col, ok := gbyExpr.(*expression.Column); ok {
|
||||
gbyCol = col
|
||||
@ -1066,9 +1053,87 @@ func BuildFinalModeAggregation(
|
||||
RetType: gbyExpr.GetType(),
|
||||
}
|
||||
}
|
||||
partialSchema.Append(gbyCol)
|
||||
finalGbyItems = append(finalGbyItems, gbyCol)
|
||||
partialGbySchema.Append(gbyCol)
|
||||
final.GroupByItems = append(final.GroupByItems, gbyCol)
|
||||
}
|
||||
|
||||
// TODO: Refactor the way of constructing aggregation functions.
|
||||
// This fop loop is ugly, but I do not find a proper way to reconstruct
|
||||
// it right away.
|
||||
for i, aggFunc := range original.AggFuncs {
|
||||
finalAggFunc := &aggregation.AggFuncDesc{HasDistinct: false}
|
||||
finalAggFunc.Name = aggFunc.Name
|
||||
args := make([]expression.Expression, 0, len(aggFunc.Args))
|
||||
if aggFunc.HasDistinct {
|
||||
/*
|
||||
eg: SELECT COUNT(DISTINCT a), SUM(b) FROM t GROUP BY c
|
||||
|
||||
change from
|
||||
[root] group by: c, funcs:count(distinct a), funcs:sum(b)
|
||||
to
|
||||
[root] group by: c, funcs:count(distinct a), funcs:sum(b)
|
||||
[cop]: group by: c, a
|
||||
*/
|
||||
for _, distinctArg := range aggFunc.Args {
|
||||
// 1. add all args to partial.GroupByItems
|
||||
foundInGroupBy := false
|
||||
for j, gbyExpr := range partial.GroupByItems {
|
||||
if gbyExpr.Equal(sctx, distinctArg) {
|
||||
foundInGroupBy = true
|
||||
args = append(args, partialGbySchema.Columns[j])
|
||||
break
|
||||
}
|
||||
}
|
||||
if !foundInGroupBy {
|
||||
partial.GroupByItems = append(partial.GroupByItems, distinctArg)
|
||||
var gbyCol *expression.Column
|
||||
if col, ok := distinctArg.(*expression.Column); ok {
|
||||
gbyCol = col
|
||||
} else {
|
||||
gbyCol = &expression.Column{
|
||||
UniqueID: sctx.GetSessionVars().AllocPlanColumnID(),
|
||||
RetType: distinctArg.GetType(),
|
||||
}
|
||||
}
|
||||
partialGbySchema.Append(gbyCol)
|
||||
args = append(args, gbyCol)
|
||||
}
|
||||
}
|
||||
// Just use groupBy items in Schema of partialAgg as arguments,
|
||||
// no need to spawn FirstRow function.
|
||||
|
||||
finalAggFunc.HasDistinct = true
|
||||
finalAggFunc.Mode = aggregation.CompleteMode
|
||||
} else {
|
||||
if aggregation.NeedCount(finalAggFunc.Name) {
|
||||
ft := types.NewFieldType(mysql.TypeLonglong)
|
||||
ft.Flen, ft.Charset, ft.Collate = 21, charset.CharsetBin, charset.CollationBin
|
||||
partial.Schema.Append(&expression.Column{
|
||||
UniqueID: sctx.GetSessionVars().AllocPlanColumnID(),
|
||||
RetType: ft,
|
||||
})
|
||||
args = append(args, partial.Schema.Columns[partialCursor])
|
||||
partialCursor++
|
||||
}
|
||||
if aggregation.NeedValue(finalAggFunc.Name) {
|
||||
partial.Schema.Append(&expression.Column{
|
||||
UniqueID: sctx.GetSessionVars().AllocPlanColumnID(),
|
||||
RetType: original.Schema.Columns[i].GetType(),
|
||||
})
|
||||
args = append(args, partial.Schema.Columns[partialCursor])
|
||||
partialCursor++
|
||||
}
|
||||
partial.AggFuncs = append(partial.AggFuncs, aggFunc)
|
||||
|
||||
finalAggFunc.Mode = aggregation.FinalMode
|
||||
funcMap[aggFunc] = finalAggFunc
|
||||
}
|
||||
|
||||
finalAggFunc.Args = args
|
||||
finalAggFunc.RetTp = aggFunc.RetTp
|
||||
final.AggFuncs[i] = finalAggFunc
|
||||
}
|
||||
partial.Schema.Append(partialGbySchema.Columns...)
|
||||
return
|
||||
}
|
||||
|
||||
@ -1077,37 +1142,47 @@ func (p *basePhysicalAgg) newPartialAggregate(copTaskType kv.StoreType) (partial
|
||||
if !CheckAggCanPushCop(p.ctx, p.AggFuncs, p.GroupByItems, copTaskType) {
|
||||
return nil, p.self
|
||||
}
|
||||
finalAggFuncs, finalGbyItems, partialSchema := BuildFinalModeAggregation(p.ctx, p.AggFuncs, p.GroupByItems, p.schema)
|
||||
partialPref, finalPref, funcMap := BuildFinalModeAggregation(p.ctx, &AggInfo{
|
||||
AggFuncs: p.AggFuncs,
|
||||
GroupByItems: p.GroupByItems,
|
||||
Schema: p.Schema().Clone(),
|
||||
})
|
||||
if p.tp == plancodec.TypeStreamAgg && len(partialPref.GroupByItems) != len(finalPref.GroupByItems) {
|
||||
return nil, p.self
|
||||
}
|
||||
// Remove unnecessary FirstRow.
|
||||
p.AggFuncs = RemoveUnnecessaryFirstRow(p.ctx, finalAggFuncs, finalGbyItems, p.AggFuncs, p.GroupByItems, partialSchema)
|
||||
partialPref.AggFuncs = RemoveUnnecessaryFirstRow(p.ctx,
|
||||
finalPref.AggFuncs, finalPref.GroupByItems,
|
||||
partialPref.AggFuncs, partialPref.GroupByItems, partialPref.Schema, funcMap)
|
||||
if copTaskType == kv.TiDB {
|
||||
// For partial agg of TiDB cop task, since TiDB coprocessor reuse the TiDB executor,
|
||||
// and TiDB aggregation executor won't output the group by value,
|
||||
// so we need add `firstrow` aggregation function to output the group by value.
|
||||
aggFuncs, err := genFirstRowAggForGroupBy(p.ctx, p.GroupByItems)
|
||||
aggFuncs, err := genFirstRowAggForGroupBy(p.ctx, partialPref.GroupByItems)
|
||||
if err != nil {
|
||||
return nil, p.self
|
||||
}
|
||||
p.AggFuncs = append(p.AggFuncs, aggFuncs...)
|
||||
partialPref.AggFuncs = append(partialPref.AggFuncs, aggFuncs...)
|
||||
}
|
||||
finalSchema := p.schema
|
||||
p.schema = partialSchema
|
||||
p.AggFuncs = partialPref.AggFuncs
|
||||
p.GroupByItems = partialPref.GroupByItems
|
||||
p.schema = partialPref.Schema
|
||||
partialAgg := p.self
|
||||
// Create physical "final" aggregation.
|
||||
if p.tp == plancodec.TypeStreamAgg {
|
||||
finalAgg := basePhysicalAgg{
|
||||
AggFuncs: finalAggFuncs,
|
||||
GroupByItems: finalGbyItems,
|
||||
AggFuncs: finalPref.AggFuncs,
|
||||
GroupByItems: finalPref.GroupByItems,
|
||||
}.initForStream(p.ctx, p.stats, p.blockOffset)
|
||||
finalAgg.schema = finalSchema
|
||||
finalAgg.schema = finalPref.Schema
|
||||
return partialAgg, finalAgg
|
||||
}
|
||||
|
||||
finalAgg := basePhysicalAgg{
|
||||
AggFuncs: finalAggFuncs,
|
||||
GroupByItems: finalGbyItems,
|
||||
AggFuncs: finalPref.AggFuncs,
|
||||
GroupByItems: finalPref.GroupByItems,
|
||||
}.initForHash(p.ctx, p.stats, p.blockOffset)
|
||||
finalAgg.schema = finalSchema
|
||||
finalAgg.schema = finalPref.Schema
|
||||
return partialAgg, finalAgg
|
||||
}
|
||||
|
||||
@ -1136,16 +1211,28 @@ func RemoveUnnecessaryFirstRow(
|
||||
finalGbyItems []expression.Expression,
|
||||
partialAggFuncs []*aggregation.AggFuncDesc,
|
||||
partialGbyItems []expression.Expression,
|
||||
partialSchema *expression.Schema) []*aggregation.AggFuncDesc {
|
||||
partialSchema *expression.Schema,
|
||||
funcMap map[*aggregation.AggFuncDesc]*aggregation.AggFuncDesc) []*aggregation.AggFuncDesc {
|
||||
|
||||
partialCursor := 0
|
||||
newAggFuncs := make([]*aggregation.AggFuncDesc, 0, len(partialAggFuncs))
|
||||
for i, aggFunc := range partialAggFuncs {
|
||||
for _, aggFunc := range partialAggFuncs {
|
||||
if aggFunc.Name == ast.AggFuncFirstRow {
|
||||
canOptimize := false
|
||||
for j, gbyExpr := range partialGbyItems {
|
||||
if j >= len(finalGbyItems) {
|
||||
// after distinct push, len(partialGbyItems) may larger than len(finalGbyItems)
|
||||
// for example,
|
||||
// select /*+ HASH_AGG() */ a, count(distinct a) from t;
|
||||
// will generate to,
|
||||
// HashAgg root funcs:count(distinct a), funcs:firstrow(a)"
|
||||
// HashAgg cop group by:a, funcs:firstrow(a)->Column#6"
|
||||
// the firstrow in root task can not be removed.
|
||||
break
|
||||
}
|
||||
if gbyExpr.Equal(sctx, aggFunc.Args[0]) {
|
||||
canOptimize = true
|
||||
finalAggFuncs[i].Args[0] = finalGbyItems[j]
|
||||
funcMap[aggFunc].Args[0] = finalGbyItems[j]
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
24
planner/core/testdata/plan_suite_in.json
vendored
24
planner/core/testdata/plan_suite_in.json
vendored
@ -534,6 +534,30 @@
|
||||
"select /*+ AGG_TO_COP(), HASH_AGG(), HASH_JOIN(t1), USE_INDEX(t1), USE_INDEX(t2) */ sum(t1.a) from ta t1, ta t2 where t1.a = t2.b group by t1.a"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "TestPushdownDistinctEnable",
|
||||
"cases": [
|
||||
"select /*+ HASH_AGG() */ a, count(distinct a) from t;", // firstrow(a) cannot be removed.
|
||||
"select /*+ HASH_AGG() */ avg(b), c, avg(b), count(distinct A, B), count(distinct A), count(distinct c), sum(b) from t group by c;",
|
||||
"select /*+ STREAM_AGG() */ count(distinct c) from t group by c;", // can push down
|
||||
"select /*+ STREAM_AGG() */ count(distinct c) from t;", // can not push down because c is not in group by
|
||||
"select /*+ HASH_AGG() */ count(distinct c) from t;", // can push down
|
||||
"select count(distinct c) from t group by c;",
|
||||
"select count(distinct c) from t;" // should not use streamAgg because c is not in group by
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "TestPushdownDistinctDisable",
|
||||
"cases": [
|
||||
// do not pushdown even AGG_TO_COP is specified.
|
||||
"select /*+ HASH_AGG(), AGG_TO_COP() */ a, count(distinct a) from t;",
|
||||
"select /*+ HASH_AGG(), AGG_TO_COP() */ avg(b), c, avg(b), count(distinct A, B), count(distinct A), count(distinct c), sum(b) from t group by c;",
|
||||
"select /*+ STREAM_AGG(), AGG_TO_COP() */ count(distinct c) from t group by c;",
|
||||
"select /*+ STREAM_AGG(), AGG_TO_COP() */ count(distinct c) from t;",
|
||||
"select /*+ HASH_AGG(), AGG_TO_COP() */ count(distinct c) from t;",
|
||||
"select /*+ AGG_TO_COP() */ count(distinct c) from t group by c;"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "TestDAGPlanBuilderWindow",
|
||||
"cases":[
|
||||
|
||||
176
planner/core/testdata/plan_suite_out.json
vendored
176
planner/core/testdata/plan_suite_out.json
vendored
@ -1432,6 +1432,182 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"Name": "TestPushdownDistinctEnable",
|
||||
"Cases": [
|
||||
{
|
||||
"SQL": "select /*+ HASH_AGG() */ a, count(distinct a) from t;",
|
||||
"Plan": [
|
||||
"Projection_4 1.00 root test.t.a, Column#5",
|
||||
"└─HashAgg_8 1.00 root funcs:count(distinct test.t.a)->Column#5, funcs:firstrow(Column#6)->test.t.a",
|
||||
" └─TableReader_9 1.00 root data:HashAgg_5",
|
||||
" └─HashAgg_5 1.00 cop[tikv] group by:test.t.a, funcs:firstrow(test.t.a)->Column#6",
|
||||
" └─TableFullScan_7 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"1 2"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select /*+ HASH_AGG() */ avg(b), c, avg(b), count(distinct A, B), count(distinct A), count(distinct c), sum(b) from t group by c;",
|
||||
"Plan": [
|
||||
"Projection_4 8000.00 root Column#5, test.t.c, Column#5, Column#6, Column#7, Column#8, Column#9",
|
||||
"└─HashAgg_8 8000.00 root group by:test.t.c, funcs:avg(Column#10, Column#11)->Column#5, funcs:count(distinct test.t.a, test.t.b)->Column#6, funcs:count(distinct test.t.a)->Column#7, funcs:count(distinct test.t.c)->Column#8, funcs:sum(Column#12)->Column#9, funcs:firstrow(test.t.c)->test.t.c",
|
||||
" └─TableReader_9 8000.00 root data:HashAgg_5",
|
||||
" └─HashAgg_5 8000.00 cop[tikv] group by:test.t.a, test.t.b, test.t.c, funcs:count(test.t.b)->Column#10, funcs:sum(test.t.b)->Column#11, funcs:sum(test.t.b)->Column#12",
|
||||
" └─TableFullScan_7 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"1.0000 1 1.0000 1 1 1 1",
|
||||
"1.3333 3 1.3333 3 2 1 4",
|
||||
"2.0000 <nil> 2.0000 1 1 0 2"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select /*+ STREAM_AGG() */ count(distinct c) from t group by c;",
|
||||
"Plan": [
|
||||
"StreamAgg_11 8000.00 root group by:test.t.c, funcs:count(distinct test.t.c)->Column#5",
|
||||
"└─IndexReader_12 8000.00 root index:StreamAgg_7",
|
||||
" └─StreamAgg_7 8000.00 cop[tikv] group by:test.t.c, ",
|
||||
" └─IndexFullScan_10 10000.00 cop[tikv] table:t, index:c(c) keep order:true, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"0",
|
||||
"1",
|
||||
"1"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select /*+ STREAM_AGG() */ count(distinct c) from t;",
|
||||
"Plan": [
|
||||
"StreamAgg_7 1.00 root funcs:count(distinct test.t.c)->Column#5",
|
||||
"└─IndexReader_11 10000.00 root index:IndexFullScan_10",
|
||||
" └─IndexFullScan_10 10000.00 cop[tikv] table:t, index:c(c) keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"2"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select /*+ HASH_AGG() */ count(distinct c) from t;",
|
||||
"Plan": [
|
||||
"HashAgg_9 1.00 root funcs:count(distinct test.t.c)->Column#5",
|
||||
"└─IndexReader_10 1.00 root index:HashAgg_5",
|
||||
" └─HashAgg_5 1.00 cop[tikv] group by:test.t.c, ",
|
||||
" └─IndexFullScan_8 10000.00 cop[tikv] table:t, index:c(c) keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"2"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select count(distinct c) from t group by c;",
|
||||
"Plan": [
|
||||
"HashAgg_10 8000.00 root group by:test.t.c, funcs:count(distinct test.t.c)->Column#5",
|
||||
"└─IndexReader_11 8000.00 root index:HashAgg_5",
|
||||
" └─HashAgg_5 8000.00 cop[tikv] group by:test.t.c, ",
|
||||
" └─IndexFullScan_9 10000.00 cop[tikv] table:t, index:c(c) keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"0",
|
||||
"1",
|
||||
"1"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select count(distinct c) from t;",
|
||||
"Plan": [
|
||||
"HashAgg_9 1.00 root funcs:count(distinct test.t.c)->Column#5",
|
||||
"└─IndexReader_10 1.00 root index:HashAgg_5",
|
||||
" └─HashAgg_5 1.00 cop[tikv] group by:test.t.c, ",
|
||||
" └─IndexFullScan_8 10000.00 cop[tikv] table:t, index:c(c) keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"2"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"Name": "TestPushdownDistinctDisable",
|
||||
"Cases": [
|
||||
{
|
||||
"SQL": "select /*+ HASH_AGG(), AGG_TO_COP() */ a, count(distinct a) from t;",
|
||||
"Plan": [
|
||||
"Projection_4 1.00 root test.t.a, Column#5",
|
||||
"└─HashAgg_5 1.00 root funcs:count(distinct test.t.a)->Column#5, funcs:firstrow(test.t.a)->test.t.a",
|
||||
" └─TableReader_7 10000.00 root data:TableFullScan_6",
|
||||
" └─TableFullScan_6 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"1 2"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select /*+ HASH_AGG(), AGG_TO_COP() */ avg(b), c, avg(b), count(distinct A, B), count(distinct A), count(distinct c), sum(b) from t group by c;",
|
||||
"Plan": [
|
||||
"Projection_4 8000.00 root Column#5, test.t.c, Column#5, Column#6, Column#7, Column#8, Column#9",
|
||||
"└─HashAgg_5 8000.00 root group by:Column#17, funcs:avg(Column#10)->Column#5, funcs:count(distinct Column#11, Column#12)->Column#6, funcs:count(distinct Column#13)->Column#7, funcs:count(distinct Column#14)->Column#8, funcs:sum(Column#15)->Column#9, funcs:firstrow(Column#16)->test.t.c",
|
||||
" └─Projection_8 10000.00 root cast(test.t.b, decimal(65,4) BINARY)->Column#10, test.t.a, test.t.b, test.t.a, test.t.c, cast(test.t.b, decimal(65,0) BINARY)->Column#15, test.t.c, test.t.c",
|
||||
" └─TableReader_7 10000.00 root data:TableFullScan_6",
|
||||
" └─TableFullScan_6 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"1.0000 1 1.0000 1 1 1 1",
|
||||
"1.3333 3 1.3333 3 2 1 4",
|
||||
"2.0000 <nil> 2.0000 1 1 0 2"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select /*+ STREAM_AGG(), AGG_TO_COP() */ count(distinct c) from t group by c;",
|
||||
"Plan": [
|
||||
"StreamAgg_6 8000.00 root group by:test.t.c, funcs:count(distinct test.t.c)->Column#5",
|
||||
"└─IndexReader_9 10000.00 root index:IndexFullScan_8",
|
||||
" └─IndexFullScan_8 10000.00 cop[tikv] table:t, index:c(c) keep order:true, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"0",
|
||||
"1",
|
||||
"1"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select /*+ STREAM_AGG(), AGG_TO_COP() */ count(distinct c) from t;",
|
||||
"Plan": [
|
||||
"StreamAgg_6 1.00 root funcs:count(distinct test.t.c)->Column#5",
|
||||
"└─IndexReader_11 10000.00 root index:IndexFullScan_10",
|
||||
" └─IndexFullScan_10 10000.00 cop[tikv] table:t, index:c(c) keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"2"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select /*+ HASH_AGG(), AGG_TO_COP() */ count(distinct c) from t;",
|
||||
"Plan": [
|
||||
"HashAgg_5 1.00 root funcs:count(distinct test.t.c)->Column#5",
|
||||
"└─IndexReader_9 10000.00 root index:IndexFullScan_8",
|
||||
" └─IndexFullScan_8 10000.00 cop[tikv] table:t, index:c(c) keep order:false, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"2"
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "select /*+ AGG_TO_COP() */ count(distinct c) from t group by c;",
|
||||
"Plan": [
|
||||
"StreamAgg_6 8000.00 root group by:test.t.c, funcs:count(distinct test.t.c)->Column#5",
|
||||
"└─IndexReader_12 10000.00 root index:IndexFullScan_11",
|
||||
" └─IndexFullScan_11 10000.00 cop[tikv] table:t, index:c(c) keep order:true, stats:pseudo"
|
||||
],
|
||||
"Result": [
|
||||
"0",
|
||||
"1",
|
||||
"1"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"Name": "TestDAGPlanBuilderWindow",
|
||||
"Cases": [
|
||||
|
||||
@ -388,6 +388,9 @@ type SessionVars struct {
|
||||
// AllowAggPushDown can be set to false to forbid aggregation push down.
|
||||
AllowAggPushDown bool
|
||||
|
||||
// AllowDistinctAggPushDown can be set true to allow agg with distinct push down to tikv/tiflash.
|
||||
AllowDistinctAggPushDown bool
|
||||
|
||||
// AllowWriteRowID can be set to false to forbid write data to _tidb_rowid.
|
||||
// This variable is currently not recommended to be turned on.
|
||||
AllowWriteRowID bool
|
||||
@ -1043,6 +1046,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
|
||||
s.SkipUTF8Check = TiDBOptOn(val)
|
||||
case TiDBOptAggPushDown:
|
||||
s.AllowAggPushDown = TiDBOptOn(val)
|
||||
case TiDBOptDistinctAggPushDown:
|
||||
s.AllowDistinctAggPushDown = TiDBOptOn(val)
|
||||
case TiDBOptWriteRowID:
|
||||
s.AllowWriteRowID = TiDBOptOn(val)
|
||||
case TiDBOptInSubqToJoinAndAgg:
|
||||
|
||||
@ -43,6 +43,7 @@ func (*testSessionSuite) TestSetSystemVariable(c *C) {
|
||||
{variable.TxnIsolation, "SERIALIZABLE", true},
|
||||
{variable.TimeZone, "xyz", true},
|
||||
{variable.TiDBOptAggPushDown, "1", false},
|
||||
{variable.TiDBOptDistinctAggPushDown, "1", false},
|
||||
{variable.TIDBMemQuotaQuery, "1024", false},
|
||||
{variable.TIDBMemQuotaHashJoin, "1024", false},
|
||||
{variable.TIDBMemQuotaMergeJoin, "1024", false},
|
||||
|
||||
@ -610,6 +610,7 @@ var defaultSysVars = []*SysVar{
|
||||
/* TiDB specific variables */
|
||||
{ScopeSession, TiDBSnapshot, ""},
|
||||
{ScopeSession, TiDBOptAggPushDown, BoolToIntStr(DefOptAggPushDown)},
|
||||
{ScopeSession, TiDBOptDistinctAggPushDown, BoolToIntStr(DefOptDistinctAggPushDown)},
|
||||
{ScopeSession, TiDBOptWriteRowID, BoolToIntStr(DefOptWriteRowID)},
|
||||
{ScopeGlobal | ScopeSession, TiDBBuildStatsConcurrency, strconv.Itoa(DefBuildStatsConcurrency)},
|
||||
{ScopeGlobal, TiDBAutoAnalyzeRatio, strconv.FormatFloat(DefAutoAnalyzeRatio, 'f', -1, 64)},
|
||||
|
||||
@ -45,6 +45,9 @@ const (
|
||||
// tidb_opt_agg_push_down is used to enable/disable the optimizer rule of aggregation push down.
|
||||
TiDBOptAggPushDown = "tidb_opt_agg_push_down"
|
||||
|
||||
// tidb_opt_distinct_agg_push_down is used to decide whether agg with distinct should be pushed to tikv/tiflash.
|
||||
TiDBOptDistinctAggPushDown = "tidb_opt_distinct_agg_push_down"
|
||||
|
||||
// tidb_opt_write_row_id is used to enable/disable the operations of insert、replace and update to _tidb_rowid.
|
||||
TiDBOptWriteRowID = "tidb_opt_write_row_id"
|
||||
|
||||
@ -400,6 +403,7 @@ const (
|
||||
DefChecksumTableConcurrency = 4
|
||||
DefSkipUTF8Check = false
|
||||
DefOptAggPushDown = false
|
||||
DefOptDistinctAggPushDown = false
|
||||
DefOptWriteRowID = false
|
||||
DefOptCorrelationThreshold = 0.9
|
||||
DefOptCorrelationExpFactor = 1
|
||||
|
||||
@ -409,7 +409,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
|
||||
return "ON", nil
|
||||
}
|
||||
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
|
||||
case TiDBSkipUTF8Check, TiDBOptAggPushDown,
|
||||
case TiDBSkipUTF8Check, TiDBOptAggPushDown, TiDBOptDistinctAggPushDown,
|
||||
TiDBOptInSubqToJoinAndAgg, TiDBEnableFastAnalyze,
|
||||
TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming, TiDBEnableChunkRPC,
|
||||
TiDBBatchDelete, TiDBBatchCommit, TiDBEnableCascadesPlanner, TiDBEnableWindowFunction, TiDBPProfSQLCPU,
|
||||
|
||||
Reference in New Issue
Block a user