plan: consider task tp in required prop. (#3257)

This commit is contained in:
Han Fei
2017-05-15 20:58:19 +08:00
committed by Lynn
parent e3ac940a20
commit 0f246744ce
5 changed files with 207 additions and 56 deletions

View File

@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
)
@ -32,13 +33,18 @@ type testAnalyzeSuite struct {
}
func (s *testAnalyzeSuite) TestAnalyze(c *C) {
plan.UseDAGPlanBuilder = true
defer func() {
plan.UseDAGPlanBuilder = false
testleak.AfterTest(c)()
}()
store, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
defer store.Close()
testKit := testkit.NewTestKit(c, store)
defer func() {
testKit.MustExec("drop table t, t1, t2")
store.Close()
}()
testKit.MustExec("use test")
testKit.MustExec("create table t (a int, b int)")
testKit.MustExec("create index a on t (a)")
@ -64,31 +70,36 @@ func (s *testAnalyzeSuite) TestAnalyze(c *C) {
// Test analyze full table.
{
sql: "select * from t where t.a <= 2",
best: "Table(t)",
best: "TableReader(Table(t)->Sel([le(test.t.a, 2)]))",
},
{
sql: "select * from t where t.a < 2",
best: "IndexLookUp(Index(t.a)[[-inf,2)], Table(t))",
},
{
sql: "select * from t where t.a = 1 and t.b <= 2",
best: "Index(t.b)[[-inf,2]]",
best: "IndexLookUp(Index(t.b)[[-inf,2]], Table(t)->Sel([eq(test.t.a, 1)]))",
},
// Test not analyzed table.
{
sql: "select * from t1 where t1.a <= 2",
best: "Index(t1.a)[[-inf,2]]",
best: "IndexLookUp(Index(t1.a)[[-inf,2]], Table(t1))",
},
{
sql: "select * from t1 where t1.a = 1 and t1.b <= 2",
best: "Index(t1.a)[[1,1]]",
best: "IndexLookUp(Index(t1.a)[[1,1]], Table(t1)->Sel([le(test.t1.b, 2)]))",
},
// Test analyze single index.
{
sql: "select * from t2 where t2.a <= 2",
// This is not the best plan because the histogram for index b is pseudo, so the row count calculated for such
// a small table is always tableRowCount/3, which makes the cost smaller.
best: "Index(t2.b)[[<nil>,+inf]]",
// FIXME: Fix it after implementing selectivity estimation for normal column.
best: "IndexLookUp(Index(t2.b)[[<nil>,+inf]], Table(t2)->Sel([le(test.t2.a, 2)]))",
},
{
sql: "select * from t2 where t2.a = 1 and t2.b <= 2",
best: "Index(t2.b)[[-inf,2]]",
best: "IndexLookUp(Index(t2.b)[[-inf,2]], Table(t2)->Sel([eq(test.t2.a, 1)]))",
},
}
for _, tt := range tests {
@ -108,10 +119,11 @@ func (s *testAnalyzeSuite) TestAnalyze(c *C) {
}
func newStoreWithBootstrap() (kv.Storage, error) {
store, err := tidb.NewStore(tidb.EngineGoLevelDBMemory)
store, err := tikv.NewMockTikvStore("")
if err != nil {
return nil, errors.Trace(err)
}
tidb.SetSchemaLease(0)
_, err = tidb.BootstrapSession(store)
return store, errors.Trace(err)
}

View File

@ -27,10 +27,20 @@ import (
"github.com/pingcap/tidb/util/types"
)
// wholeTaskTypes records all possible kinds of task that a plan can return. For Agg, TopN and Limit, we will try to get
// these tasks one by one.
var wholeTaskTypes = [...]taskType{rootTaskType, copSingleReadTaskType, copDoubleReadTaskType}
var invalidTask = &rootTaskProfile{cst: math.MaxFloat64}
func (p *requiredProp) enforceProperty(task taskProfile, ctx context.Context, allocator *idAllocator) taskProfile {
if p.isEmpty() {
return task
}
// If the task is invalid, keep it unchanged.
if task.plan() == nil {
return task
}
sort := Sort{ByItems: make([]*ByItems, 0, len(p.cols))}.init(allocator, ctx)
for _, col := range p.cols {
sort.ByItems = append(sort.ByItems, &ByItems{col, p.desc})
@ -43,7 +53,7 @@ func (p *requiredProp) enforceProperty(task taskProfile, ctx context.Context, al
// When a sort column will be replaced by scalar function, we refuse it.
// When a sort column will be replaced by a constant, we just remove it.
func (p *Projection) getPushedProp(prop *requiredProp) (*requiredProp, bool) {
newProp := &requiredProp{}
newProp := &requiredProp{taskTp: rootTaskType}
if prop.isEmpty() {
return newProp, false
}
@ -77,6 +87,10 @@ func (p *Projection) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, e
if task != nil {
return task, nil
}
if prop.taskTp != rootTaskType {
// Projection cannot be pushed down currently, it can only return rootTask.
return invalidTask, p.storeTaskProfile(prop, invalidTask)
}
// enforceProperty task.
task, err = p.children[0].(LogicalPlan).convert2NewPhysicalPlan(&requiredProp{})
if err != nil {
@ -109,6 +123,10 @@ func (p *LogicalJoin) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile,
if task != nil {
return task, nil
}
if prop.taskTp != rootTaskType {
// Join cannot be pushed down currently, it can only return rootTask.
return invalidTask, p.storeTaskProfile(prop, invalidTask)
}
switch p.JoinType {
case SemiJoin, LeftOuterSemiJoin:
task, err = p.convert2SemiJoin(prop)
@ -144,13 +162,13 @@ func (p *LogicalJoin) convert2MergeJoin(prop *requiredProp) (taskProfile, error)
}.init(p.allocator, p.ctx)
mergeJoin.SetSchema(p.schema)
lJoinKey := p.EqualConditions[0].GetArgs()[0].(*expression.Column)
lProp := &requiredProp{cols: []*expression.Column{lJoinKey}}
lProp := &requiredProp{cols: []*expression.Column{lJoinKey}, taskTp: rootTaskType}
lTask, err := lChild.convert2NewPhysicalPlan(lProp)
if err != nil {
return nil, errors.Trace(err)
}
rJoinKey := p.EqualConditions[0].GetArgs()[1].(*expression.Column)
rProp := &requiredProp{cols: []*expression.Column{rJoinKey}}
rProp := &requiredProp{cols: []*expression.Column{rJoinKey}, taskTp: rootTaskType}
rTask, err := rChild.convert2NewPhysicalPlan(rProp)
if err != nil {
return nil, errors.Trace(err)
@ -178,11 +196,11 @@ func (p *LogicalJoin) convert2SemiJoin(prop *requiredProp) (taskProfile, error)
Anti: p.anti,
}.init(p.allocator, p.ctx)
semiJoin.SetSchema(p.schema)
lTask, err := lChild.convert2NewPhysicalPlan(&requiredProp{})
lTask, err := lChild.convert2NewPhysicalPlan(&requiredProp{taskTp: rootTaskType})
if err != nil {
return nil, errors.Trace(err)
}
rTask, err := rChild.convert2NewPhysicalPlan(&requiredProp{})
rTask, err := rChild.convert2NewPhysicalPlan(&requiredProp{taskTp: rootTaskType})
if err != nil {
return nil, errors.Trace(err)
}
@ -206,11 +224,11 @@ func (p *LogicalJoin) convert2HashJoin(prop *requiredProp) (taskProfile, error)
DefaultValues: p.DefaultValues,
}.init(p.allocator, p.ctx)
hashJoin.SetSchema(p.schema)
lTask, err := lChild.convert2NewPhysicalPlan(&requiredProp{})
lTask, err := lChild.convert2NewPhysicalPlan(&requiredProp{taskTp: rootTaskType})
if err != nil {
return nil, errors.Trace(err)
}
rTask, err := rChild.convert2NewPhysicalPlan(&requiredProp{})
rTask, err := rChild.convert2NewPhysicalPlan(&requiredProp{taskTp: rootTaskType})
if err != nil {
return nil, errors.Trace(err)
}
@ -232,7 +250,7 @@ func (p *LogicalJoin) convert2HashJoin(prop *requiredProp) (taskProfile, error)
// getPropByOrderByItems will check if this sort property can be pushed or not. In order to simplify the problem, we only
// consider the case that all expressions are columns and all of them are asc or desc.
func getPropByOrderByItems(items []*ByItems) (*requiredProp, bool) {
func getPropByOrderByItems(items []*ByItems, taskTp taskType) (*requiredProp, bool) {
desc := false
cols := make([]*expression.Column, 0, len(items))
for i, item := range items {
@ -246,7 +264,7 @@ func getPropByOrderByItems(items []*ByItems) (*requiredProp, bool) {
return nil, false
}
}
return &requiredProp{cols, desc}, true
return &requiredProp{cols, desc, taskTp}, true
}
// convert2NewPhysicalPlan implements PhysicalPlan interface.
@ -260,21 +278,25 @@ func (p *Sort) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, error)
if task != nil {
return task, nil
}
if prop.taskTp != rootTaskType {
// TODO: This is a trick here, because an operator that can be pushed to Coprocessor can never be pushed across a sort.
// e.g. If an aggregation wants to be pushed down, the SQL is always like select count(*) from t order by ...
// The Sort will be on top of the Aggregation. If the SQL is like select count(*) from (select * from s order by k),
// the Aggregation will also be blocked by the Projection. In the future we will break this restriction.
return invalidTask, p.storeTaskProfile(prop, invalidTask)
}
// enforce branch
task, err = p.children[0].(LogicalPlan).convert2NewPhysicalPlan(&requiredProp{})
if err != nil {
return nil, errors.Trace(err)
}
task = p.attach2TaskProfile(task)
newProp, canPassProp := getPropByOrderByItems(p.ByItems)
newProp, canPassProp := getPropByOrderByItems(p.ByItems, rootTaskType)
if canPassProp {
orderedTask, err := p.children[0].(LogicalPlan).convert2NewPhysicalPlan(newProp)
if err != nil {
return nil, errors.Trace(err)
}
if cop, ok := orderedTask.(*copTaskProfile); ok {
orderedTask = finishCopTask(cop, p.ctx, p.allocator)
}
if orderedTask.cost() < task.cost() {
task = orderedTask
}
@ -283,6 +305,7 @@ func (p *Sort) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, error)
return task, p.storeTaskProfile(prop, task)
}
// convert2NewPhysicalPlan implements LogicalPlan interface.
func (p *TopN) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, error) {
task, err := p.getTaskProfile(prop)
if err != nil {
@ -291,26 +314,61 @@ func (p *TopN) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, error)
if task != nil {
return task, nil
}
// enforce branch
task, err = p.children[0].(LogicalPlan).convert2NewPhysicalPlan(&requiredProp{})
if err != nil {
return nil, errors.Trace(err)
if prop.taskTp != rootTaskType {
// TopN can only return rootTask.
return invalidTask, p.storeTaskProfile(prop, invalidTask)
}
task = p.attach2TaskProfile(task)
newProp, canPassProp := getPropByOrderByItems(p.ByItems)
if canPassProp {
orderedTask, err := p.children[0].(LogicalPlan).convert2NewPhysicalPlan(newProp)
for _, taskTp := range wholeTaskTypes {
// Try to enforce topN for child.
optTask, err := p.children[0].(LogicalPlan).convert2NewPhysicalPlan(&requiredProp{taskTp: taskTp})
if err != nil {
return nil, errors.Trace(err)
}
limit := Limit{Offset: p.Offset, Count: p.Count}.init(p.allocator, p.ctx)
limit.SetSchema(p.schema)
orderedTask = limit.attach2TaskProfile(orderedTask)
if orderedTask.cost() < task.cost() {
task = orderedTask
optTask = p.attach2TaskProfile(optTask)
// Try to enforce sort to child and add limit for it.
newProp, canPassProp := getPropByOrderByItems(p.ByItems, taskTp)
if canPassProp {
orderedTask, err := p.children[0].(LogicalPlan).convert2NewPhysicalPlan(newProp)
if err != nil {
return nil, errors.Trace(err)
}
limit := Limit{Offset: p.Offset, Count: p.Count}.init(p.allocator, p.ctx)
limit.SetSchema(p.schema)
orderedTask = limit.attach2TaskProfile(orderedTask)
if orderedTask.cost() < optTask.cost() {
optTask = orderedTask
}
}
optTask = prop.enforceProperty(optTask, p.ctx, p.allocator)
if task == nil || task.cost() > optTask.cost() {
task = optTask
}
}
return task, p.storeTaskProfile(prop, task)
}
func (p *Limit) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, error) {
task, err := p.getTaskProfile(prop)
if err != nil {
return nil, errors.Trace(err)
}
if task != nil {
return task, nil
}
if prop.taskTp != rootTaskType {
return invalidTask, p.storeTaskProfile(prop, invalidTask)
}
for _, taskTp := range wholeTaskTypes {
optTask, err := p.children[0].(LogicalPlan).convert2NewPhysicalPlan(&requiredProp{taskTp: taskTp})
if err != nil {
return nil, errors.Trace(err)
}
optTask = p.attach2TaskProfile(optTask)
optTask = prop.enforceProperty(optTask, p.ctx, p.allocator)
if task == nil || task.cost() > optTask.cost() {
task = optTask
}
}
task = prop.enforceProperty(task, p.ctx, p.allocator)
return task, p.storeTaskProfile(prop, task)
}
@ -323,6 +381,9 @@ func (p *baseLogicalPlan) convert2NewPhysicalPlan(prop *requiredProp) (taskProfi
if task != nil {
return task, nil
}
if prop.taskTp != rootTaskType {
return invalidTask, p.storeTaskProfile(prop, invalidTask)
}
if len(p.basePlan.children) == 0 {
task = &rootTaskProfile{p: p.basePlan.self.(PhysicalPlan)}
} else {
@ -485,6 +546,13 @@ func (p *DataSource) convertToIndexScan(prop *requiredProp, idx *model.IndexInfo
// On this way, it's double read case.
copTask.tablePlan = PhysicalTableScan{Columns: p.Columns, Table: is.Table}.init(p.allocator, p.ctx)
copTask.tablePlan.SetSchema(p.schema)
// If its parent requires a single-read task, return max cost.
if prop.taskTp == copSingleReadTaskType {
return &copTaskProfile{cst: math.MaxFloat64}, nil
}
} else if prop.taskTp == copDoubleReadTaskType {
// If its parent requires a double-read task, return max cost.
return &copTaskProfile{cst: math.MaxFloat64}, nil
}
var indexCols []*expression.Column
for _, col := range idx.Columns {
@ -526,6 +594,9 @@ func (p *DataSource) convertToIndexScan(prop *requiredProp, idx *model.IndexInfo
task = tryToAddUnionScan(copTask, p.pushedDownConds, p.ctx, p.allocator)
task = prop.enforceProperty(task, p.ctx, p.allocator)
}
if prop.taskTp == rootTaskType {
task = finishCopTask(task, p.ctx, p.allocator)
}
return task, nil
}
@ -572,6 +643,9 @@ func matchIndicesProp(idxCols []*model.IndexColumn, propCols []*expression.Colum
// convertToTableScan converts the DataSource to table scan.
func (p *DataSource) convertToTableScan(prop *requiredProp) (task taskProfile, err error) {
if prop.taskTp == copDoubleReadTaskType {
return &copTaskProfile{cst: math.MaxFloat64}, nil
}
ts := PhysicalTableScan{
Table: p.tableInfo,
Columns: p.Columns,
@ -631,6 +705,9 @@ func (p *DataSource) convertToTableScan(prop *requiredProp) (task taskProfile, e
task = tryToAddUnionScan(copTask, p.pushedDownConds, p.ctx, p.allocator)
task = prop.enforceProperty(task, p.ctx, p.allocator)
}
if prop.taskTp == rootTaskType {
task = finishCopTask(task, p.ctx, p.allocator)
}
return task, nil
}
@ -642,6 +719,10 @@ func (p *Union) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, error)
if task != nil {
return task, nil
}
if prop.taskTp != rootTaskType {
// Union can only return rootTask.
return invalidTask, p.storeTaskProfile(prop, invalidTask)
}
// Union is a sort blocker. We can only enforce it.
tasks := make([]taskProfile, 0, len(p.children))
for _, child := range p.children {
@ -692,6 +773,10 @@ func (p *LogicalAggregation) convert2NewPhysicalPlan(prop *requiredProp) (taskPr
if task != nil {
return task, nil
}
if prop.taskTp != rootTaskType {
// Aggregation can only return rootTask.
return invalidTask, p.storeTaskProfile(prop, invalidTask)
}
task, err = p.convert2HashAggregation(prop)
if err != nil {
return nil, errors.Trace(err)
@ -699,21 +784,26 @@ func (p *LogicalAggregation) convert2NewPhysicalPlan(prop *requiredProp) (taskPr
return task, p.storeTaskProfile(prop, task)
}
func (p *LogicalAggregation) convert2HashAggregation(prop *requiredProp) (taskProfile, error) {
task, err := p.children[0].(LogicalPlan).convert2NewPhysicalPlan(&requiredProp{})
if err != nil {
return nil, errors.Trace(err)
func (p *LogicalAggregation) convert2HashAggregation(prop *requiredProp) (bestTask taskProfile, _ error) {
for _, taskTp := range wholeTaskTypes {
task, err := p.children[0].(LogicalPlan).convert2NewPhysicalPlan(&requiredProp{taskTp: taskTp})
if err != nil {
return nil, errors.Trace(err)
}
ha := PhysicalAggregation{
GroupByItems: p.GroupByItems,
AggFuncs: p.AggFuncs,
HasGby: len(p.GroupByItems) > 0,
AggType: CompleteAgg,
}.init(p.allocator, p.ctx)
ha.SetSchema(p.schema)
task = ha.attach2TaskProfile(task)
task = prop.enforceProperty(task, p.ctx, p.allocator)
if bestTask == nil || task.cost() < bestTask.cost() {
bestTask = task
}
}
ha := PhysicalAggregation{
GroupByItems: p.GroupByItems,
AggFuncs: p.AggFuncs,
HasGby: len(p.GroupByItems) > 0,
AggType: CompleteAgg,
}.init(p.allocator, p.ctx)
ha.SetSchema(p.schema)
task = ha.attach2TaskProfile(task)
task = prop.enforceProperty(task, p.ctx, p.allocator)
return task, nil
return
}
func (p *LogicalApply) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, error) {
@ -724,11 +814,15 @@ func (p *LogicalApply) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile,
if task != nil {
return task, nil
}
// TODO: refine this code.
if prop.taskTp != rootTaskType {
// Apply can only return rootTask.
return invalidTask, p.storeTaskProfile(prop, invalidTask)
}
// TODO: Refine this code.
if p.JoinType == SemiJoin || p.JoinType == LeftOuterSemiJoin {
task, err = p.convert2SemiJoin(&requiredProp{})
task, err = p.convert2SemiJoin(&requiredProp{taskTp: rootTaskType})
} else {
task, err = p.convert2HashJoin(&requiredProp{})
task, err = p.convert2HashJoin(&requiredProp{taskTp: rootTaskType})
}
if err != nil {
return nil, errors.Trace(err)

View File

@ -144,7 +144,7 @@ func logicalOptimize(flag uint64, logic LogicalPlan, ctx context.Context, alloc
}
func dagPhysicalOptimize(logic LogicalPlan) (PhysicalPlan, error) {
task, err := logic.convert2NewPhysicalPlan(&requiredProp{})
task, err := logic.convert2NewPhysicalPlan(&requiredProp{taskTp: rootTaskType})
if err != nil {
return nil, errors.Trace(err)
}

View File

@ -75,16 +75,48 @@ func (c *columnProp) equal(nc *columnProp, ctx context.Context) bool {
return c.col.Equal(nc.col, ctx) && c.desc == nc.desc
}
// requiredProp stands for the required order property by parents. It will be all asc or desc.
// taskType is the type of execution task.
type taskType int
const (
rootTaskType taskType = iota
copSingleReadTaskType // TableScan and IndexScan
copDoubleReadTaskType // IndexLookUp
)
// String implements fmt.Stringer interface.
func (t taskType) String() string {
switch t {
case rootTaskType:
return "rootTask"
case copSingleReadTaskType:
return "copSingleReadTask"
case copDoubleReadTaskType:
return "copDoubleReadTask"
}
return "UnknownTaskType"
}
// requiredProp stands for the required physical property by parents.
// It contains the orders, whether the order is desc, and the task types.
type requiredProp struct {
cols []*expression.Column
desc bool
// taskTp means the type of task that an operator requires.
// It needs to be specified because two different tasks can't be compared with cost directly.
// e.g. If a copTask takes less cost than a rootTask, we can't be sure that we should choose the former one, because the
// copTask must eventually be finished, which increases its cost, and we can't predict when that will happen. So the best
// way to make the comparison fair is to add taskType to the required property.
taskTp taskType
}
func (p *requiredProp) equal(prop *requiredProp) bool {
if len(p.cols) != len(prop.cols) || p.desc != prop.desc {
return false
}
if p.taskTp != prop.taskTp {
return false
}
for i := range p.cols {
if !p.cols[i].Equal(prop.cols[i], nil) {
return false
@ -99,18 +131,19 @@ func (p *requiredProp) isEmpty() bool {
// getHashKey encodes prop to a unique key. The key will be stored in the memory table.
func (p *requiredProp) getHashKey() ([]byte, error) {
datums := make([]types.Datum, 0, len(p.cols)*2+1)
datums := make([]types.Datum, 0, len(p.cols)*2+2)
datums = append(datums, types.NewDatum(p.desc))
for _, c := range p.cols {
datums = append(datums, types.NewDatum(c.FromID), types.NewDatum(c.Position))
}
datums = append(datums, types.NewDatum(int(p.taskTp)))
bytes, err := codec.EncodeValue(nil, datums...)
return bytes, errors.Trace(err)
}
// String implements fmt.Stringer interface. Just for test.
func (p *requiredProp) String() string {
return fmt.Sprintf("Prop{cols: %s, desc: %v}", p.cols, p.desc)
return fmt.Sprintf("Prop{cols: %s, desc: %v, taskTp: %s}", p.cols, p.desc, p.taskTp)
}
type requiredProperty struct {

View File

@ -210,6 +210,10 @@ func (t *rootTaskProfile) plan() PhysicalPlan {
}
func (p *Limit) attach2TaskProfile(profiles ...taskProfile) taskProfile {
// If the task is invalid, keep it unchanged.
if profiles[0].plan() == nil {
return profiles[0]
}
profile := profiles[0].copy()
if cop, ok := profile.(*copTaskProfile); ok {
// If the task is copTask, the Limit can always be pushed down.
@ -264,6 +268,10 @@ func (p *Sort) attach2TaskProfile(profiles ...taskProfile) taskProfile {
}
func (p *TopN) attach2TaskProfile(profiles ...taskProfile) taskProfile {
// If the task is invalid, keep it unchanged.
if profiles[0].plan() == nil {
return profiles[0]
}
profile := profiles[0].copy()
// This is a topN plan.
if copTask, ok := profile.(*copTaskProfile); ok && p.canPushDown() {
@ -389,6 +397,10 @@ func (p *PhysicalAggregation) newPartialAggregate() (partialAgg, finalAgg *Physi
}
func (p *PhysicalAggregation) attach2TaskProfile(profiles ...taskProfile) taskProfile {
// If the task is invalid, keep it unchanged.
if profiles[0].plan() == nil {
return profiles[0]
}
// TODO: We only consider hash aggregation here.
profile := profiles[0].copy()
if cop, ok := profile.(*copTaskProfile); ok {