plan: add base phyiscal plan. (#2975)

add basePhysicalPlan to implement some common physical interface.
This commit is contained in:
Han Fei
2017-04-01 13:53:31 +08:00
committed by GitHub
parent 172381f892
commit 3625c1dd4e
15 changed files with 607 additions and 469 deletions

View File

@ -252,12 +252,10 @@ func (a *aggregationOptimizer) checkAnyCountAndSum(aggFuncs []expression.Aggrega
}
func (a *aggregationOptimizer) makeNewAgg(aggFuncs []expression.AggregationFunction, gbyCols []*expression.Column) *Aggregation {
agg := &Aggregation{
GroupByItems: expression.Column2Exprs(gbyCols),
baseLogicalPlan: newBaseLogicalPlan(TypeAgg, a.allocator),
groupByCols: gbyCols,
}
agg.initIDAndContext(a.ctx)
agg := Aggregation{
GroupByItems: expression.Column2Exprs(gbyCols),
groupByCols: gbyCols,
}.init(a.allocator, a.ctx)
var newAggFuncs []expression.AggregationFunction
schema := expression.NewSchema(make([]*expression.Column, 0, len(aggFuncs)+len(gbyCols))...)
for _, aggFunc := range aggFuncs {
@ -278,13 +276,11 @@ func (a *aggregationOptimizer) makeNewAgg(aggFuncs []expression.AggregationFunct
// pushAggCrossUnion will try to push the agg down to the union. If the new aggregation's group-by columns doesn't contain unique key.
// We will return the new aggregation. Otherwise we will transform the aggregation to projection.
func (a *aggregationOptimizer) pushAggCrossUnion(agg *Aggregation, unionSchema *expression.Schema, unionChild LogicalPlan) LogicalPlan {
newAgg := &Aggregation{
AggFuncs: make([]expression.AggregationFunction, 0, len(agg.AggFuncs)),
GroupByItems: make([]expression.Expression, 0, len(agg.GroupByItems)),
baseLogicalPlan: newBaseLogicalPlan(TypeAgg, a.allocator),
}
newAgg := Aggregation{
AggFuncs: make([]expression.AggregationFunction, 0, len(agg.AggFuncs)),
GroupByItems: make([]expression.Expression, 0, len(agg.GroupByItems)),
}.init(a.allocator, a.ctx)
newAgg.SetSchema(agg.schema.Clone())
newAgg.initIDAndContext(a.ctx)
for _, aggFunc := range agg.AggFuncs {
newAggFunc := aggFunc.Clone()
newArgs := make([]expression.Expression, 0, len(newAggFunc.GetArgs()))
@ -428,12 +424,9 @@ func (a *aggregationOptimizer) tryToEliminateAggregation(agg *Aggregation) *Proj
}
func (a *aggregationOptimizer) convertAggToProj(agg *Aggregation, ctx context.Context, allocator *idAllocator) *Projection {
proj := &Projection{
Exprs: make([]expression.Expression, 0, len(agg.AggFuncs)),
baseLogicalPlan: newBaseLogicalPlan(TypeProj, allocator),
}
proj.self = proj
proj.initIDAndContext(ctx)
proj := Projection{
Exprs: make([]expression.Expression, 0, len(agg.AggFuncs)),
}.init(a.allocator, a.ctx)
for _, fun := range agg.AggFuncs {
expr := a.rewriteExpr(fun)
proj.Exprs = append(proj.Exprs, expr)

View File

@ -288,12 +288,9 @@ func (er *expressionRewriter) handleOtherComparableSubq(lexpr, rexpr expression.
funcName = ast.AggFuncMin
}
aggFunc := expression.NewAggFunction(funcName, []expression.Expression{rexpr}, false)
agg := &Aggregation{
baseLogicalPlan: newBaseLogicalPlan(TypeAgg, er.b.allocator),
AggFuncs: []expression.AggregationFunction{aggFunc},
}
agg.initIDAndContext(er.b.ctx)
agg.self = agg
agg := Aggregation{
AggFuncs: []expression.AggregationFunction{aggFunc},
}.init(er.b.allocator, er.ctx)
addChild(agg, np)
aggCol0 := &expression.Column{
ColName: model.NewCIStr("agg_Col_0"),
@ -352,12 +349,9 @@ func (er *expressionRewriter) buildQuantifierPlan(agg *Aggregation, cond, rexpr
outerSchemaLen := er.p.Schema().Len()
er.p = er.b.buildApplyWithJoinType(er.p, agg, InnerJoin)
joinSchema := er.p.Schema()
proj := &Projection{
baseLogicalPlan: newBaseLogicalPlan(TypeProj, er.b.allocator),
Exprs: expression.Column2Exprs(joinSchema.Clone().Columns[:outerSchemaLen]),
}
proj.self = proj
proj.initIDAndContext(er.ctx)
proj := Projection{
Exprs: expression.Column2Exprs(joinSchema.Clone().Columns[:outerSchemaLen]),
}.init(er.b.allocator, er.ctx)
proj.SetSchema(expression.NewSchema(joinSchema.Clone().Columns[:outerSchemaLen]...))
proj.Exprs = append(proj.Exprs, cond)
proj.schema.Append(&expression.Column{
@ -377,12 +371,9 @@ func (er *expressionRewriter) buildQuantifierPlan(agg *Aggregation, cond, rexpr
func (er *expressionRewriter) handleNEAny(lexpr, rexpr expression.Expression, np LogicalPlan) {
firstRowFunc := expression.NewAggFunction(ast.AggFuncFirstRow, []expression.Expression{rexpr}, false)
countFunc := expression.NewAggFunction(ast.AggFuncCount, []expression.Expression{rexpr.Clone()}, true)
agg := &Aggregation{
baseLogicalPlan: newBaseLogicalPlan(TypeAgg, er.b.allocator),
AggFuncs: []expression.AggregationFunction{firstRowFunc, countFunc},
}
agg.initIDAndContext(er.b.ctx)
agg.self = agg
agg := Aggregation{
AggFuncs: []expression.AggregationFunction{firstRowFunc, countFunc},
}.init(er.b.allocator, er.ctx)
addChild(agg, np)
firstRowResultCol := &expression.Column{
ColName: model.NewCIStr("col_firstRow"),
@ -408,12 +399,9 @@ func (er *expressionRewriter) handleNEAny(lexpr, rexpr expression.Expression, np
func (er *expressionRewriter) handleEQAll(lexpr, rexpr expression.Expression, np LogicalPlan) {
firstRowFunc := expression.NewAggFunction(ast.AggFuncFirstRow, []expression.Expression{rexpr}, false)
countFunc := expression.NewAggFunction(ast.AggFuncCount, []expression.Expression{rexpr.Clone()}, true)
agg := &Aggregation{
baseLogicalPlan: newBaseLogicalPlan(TypeAgg, er.b.allocator),
AggFuncs: []expression.AggregationFunction{firstRowFunc, countFunc},
}
agg.initIDAndContext(er.b.ctx)
agg.self = agg
agg := Aggregation{
AggFuncs: []expression.AggregationFunction{firstRowFunc, countFunc},
}.init(er.b.allocator, er.ctx)
addChild(agg, np)
firstRowResultCol := &expression.Column{
ColName: model.NewCIStr("col_firstRow"),

212
plan/initialize.go Normal file
View File

@ -0,0 +1,212 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package plan
import "github.com/pingcap/tidb/context"
func (p Aggregation) init(allocator *idAllocator, ctx context.Context) *Aggregation {
p.basePlan = newBasePlan(TypeAgg, allocator, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
return &p
}
func (p Join) init(allocator *idAllocator, ctx context.Context) *Join {
p.basePlan = newBasePlan(TypeJoin, allocator, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
return &p
}
func (p DataSource) init(allocator *idAllocator, ctx context.Context) *DataSource {
p.basePlan = newBasePlan(TypeTableScan, allocator, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
return &p
}
func (p Apply) init(allocator *idAllocator, ctx context.Context) *Apply {
p.basePlan = newBasePlan(TypeApply, allocator, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
return &p
}
func (p Selection) init(allocator *idAllocator, ctx context.Context) *Selection {
p.basePlan = newBasePlan(TypeSel, allocator, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p Projection) init(allocator *idAllocator, ctx context.Context) *Projection {
p.basePlan = newBasePlan(TypeProj, allocator, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p Union) init(allocator *idAllocator, ctx context.Context) *Union {
p.basePlan = newBasePlan(TypeUnion, allocator, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p Sort) init(allocator *idAllocator, ctx context.Context) *Sort {
p.basePlan = newBasePlan(TypeSort, allocator, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p Limit) init(allocator *idAllocator, ctx context.Context) *Limit {
p.basePlan = newBasePlan(TypeLimit, allocator, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p TableDual) init(allocator *idAllocator, ctx context.Context) *TableDual {
p.basePlan = newBasePlan(TypeDual, allocator, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p Exists) init(allocator *idAllocator, ctx context.Context) *Exists {
p.basePlan = newBasePlan(TypeExists, allocator, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p MaxOneRow) init(allocator *idAllocator, ctx context.Context) *MaxOneRow {
p.basePlan = newBasePlan(TypeMaxOneRow, allocator, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p Update) init(allocator *idAllocator, ctx context.Context) *Update {
p.basePlan = newBasePlan(TypeUpate, allocator, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p Delete) init(allocator *idAllocator, ctx context.Context) *Delete {
p.basePlan = newBasePlan(TypeDelete, allocator, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p Insert) init(allocator *idAllocator, ctx context.Context) *Insert {
p.basePlan = newBasePlan(TypeInsert, allocator, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p Show) init(allocator *idAllocator, ctx context.Context) *Show {
p.basePlan = newBasePlan(TypeShow, allocator, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p Analyze) init(allocator *idAllocator, ctx context.Context) *Analyze {
p.basePlan = newBasePlan(TypeAnalyze, allocator, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p SelectLock) init(allocator *idAllocator, ctx context.Context) *SelectLock {
p.basePlan = newBasePlan(TypeLock, allocator, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p PhysicalTableScan) init(allocator *idAllocator, ctx context.Context) *PhysicalTableScan {
p.basePlan = newBasePlan(TypeTableScan, allocator, ctx, &p)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p PhysicalIndexScan) init(allocator *idAllocator, ctx context.Context) *PhysicalIndexScan {
p.basePlan = newBasePlan(TypeIdxScan, allocator, ctx, &p)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p PhysicalMemTable) init(allocator *idAllocator, ctx context.Context) *PhysicalMemTable {
p.basePlan = newBasePlan(TypeMemTableScan, allocator, ctx, &p)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p PhysicalDummyScan) init(allocator *idAllocator, ctx context.Context) *PhysicalDummyScan {
p.basePlan = newBasePlan(TypeDummy, allocator, ctx, &p)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p PhysicalHashJoin) init(allocator *idAllocator, ctx context.Context) *PhysicalHashJoin {
tp := TypeHashRightJoin
if p.SmallTable == 1 {
tp = TypeHashLeftJoin
}
p.basePlan = newBasePlan(tp, allocator, ctx, &p)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p PhysicalHashSemiJoin) init(allocator *idAllocator, ctx context.Context) *PhysicalHashSemiJoin {
p.basePlan = newBasePlan(TypeHashSemiJoin, allocator, ctx, &p)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p PhysicalMergeJoin) init(allocator *idAllocator, ctx context.Context) *PhysicalMergeJoin {
p.basePlan = newBasePlan(TypeMergeJoin, allocator, ctx, &p)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p PhysicalAggregation) init(allocator *idAllocator, ctx context.Context) *PhysicalAggregation {
tp := TypeHashAgg
if p.AggType == StreamedAgg {
tp = TypeStreamAgg
}
p.basePlan = newBasePlan(tp, allocator, ctx, &p)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p PhysicalApply) init(allocator *idAllocator, ctx context.Context) *PhysicalApply {
p.basePlan = newBasePlan(TypeApply, allocator, ctx, &p)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p Cache) init(allocator *idAllocator, ctx context.Context) *Cache {
p.basePlan = newBasePlan(TypeCache, allocator, ctx, &p)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
func (p PhysicalUnionScan) init(allocator *idAllocator, ctx context.Context) *PhysicalUnionScan {
p.basePlan = newBasePlan(TypeCache, allocator, ctx, &p)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}

View File

@ -18,6 +18,7 @@ import (
"github.com/ngaut/log"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/expression"
)
@ -56,6 +57,7 @@ type joinReOrderSolver struct {
resultJoin LogicalPlan
groupRank []*rankInfo
allocator *idAllocator
ctx context.Context
}
type edgeList []*rankInfo
@ -183,13 +185,10 @@ func (e *joinReOrderSolver) makeBushyJoin(cartesianJoinGroup []LogicalPlan) {
}
func (e *joinReOrderSolver) newJoin(lChild, rChild LogicalPlan) *Join {
join := &Join{
JoinType: InnerJoin,
reordered: true,
baseLogicalPlan: newBaseLogicalPlan(TypeJoin, e.allocator),
}
join.self = join
join.initIDAndContext(lChild.context())
join := Join{
JoinType: InnerJoin,
reordered: true,
}.init(e.allocator, e.ctx)
join.SetChildren(lChild, rChild)
join.SetSchema(expression.MergeSchema(lChild.Schema(), rChild.Schema()))
lChild.SetParents(join)

View File

@ -52,11 +52,8 @@ func (p *Aggregation) collectGroupByColumns() {
func (b *planBuilder) buildAggregation(p LogicalPlan, aggFuncList []*ast.AggregateFuncExpr, gbyItems []expression.Expression) (LogicalPlan, map[int]int) {
b.optFlag = b.optFlag | flagBuildKeyInfo
b.optFlag = b.optFlag | flagAggregationOptimize
agg := &Aggregation{
AggFuncs: make([]expression.AggregationFunction, 0, len(aggFuncList)),
baseLogicalPlan: newBaseLogicalPlan(TypeAgg, b.allocator)}
agg.self = agg
agg.initIDAndContext(b.ctx)
agg := Aggregation{AggFuncs: make([]expression.AggregationFunction, 0, len(aggFuncList))}.init(b.allocator, b.ctx)
schema := expression.NewSchema(make([]*expression.Column, 0, len(aggFuncList)+p.Schema().Len())...)
// aggIdxMap maps the old index to new index after applying common aggregation functions elimination.
aggIndexMap := make(map[int]int)
@ -222,11 +219,9 @@ func (b *planBuilder) buildJoin(join *ast.Join) LogicalPlan {
rightAlias := extractTableAlias(rightPlan)
newSchema := expression.MergeSchema(leftPlan.Schema(), rightPlan.Schema())
joinPlan := &Join{baseLogicalPlan: newBaseLogicalPlan(TypeJoin, b.allocator)}
joinPlan := Join{}.init(b.allocator, b.ctx)
addChild(joinPlan, leftPlan)
addChild(joinPlan, rightPlan)
joinPlan.self = joinPlan
joinPlan.initIDAndContext(b.ctx)
joinPlan.SetSchema(newSchema)
if b.TableHints() != nil {
@ -263,9 +258,7 @@ func (b *planBuilder) buildSelection(p LogicalPlan, where ast.ExprNode, AggMappe
b.optFlag = b.optFlag | flagPredicatePushDown
conditions := splitWhere(where)
expressions := make([]expression.Expression, 0, len(conditions))
selection := &Selection{baseLogicalPlan: newBaseLogicalPlan(TypeSel, b.allocator)}
selection.self = selection
selection.initIDAndContext(b.ctx)
selection := Selection{}.init(b.allocator, b.ctx)
for _, cond := range conditions {
expr, np, err := b.rewrite(cond, p, AggMapper, false)
if err != nil {
@ -289,12 +282,7 @@ func (b *planBuilder) buildSelection(p LogicalPlan, where ast.ExprNode, AggMappe
// buildProjection returns a Projection plan and non-aux columns length.
func (b *planBuilder) buildProjection(p LogicalPlan, fields []*ast.SelectField, mapper map[*ast.AggregateFuncExpr]int) (LogicalPlan, int) {
proj := &Projection{
Exprs: make([]expression.Expression, 0, len(fields)),
baseLogicalPlan: newBaseLogicalPlan(TypeProj, b.allocator),
}
proj.self = proj
proj.initIDAndContext(b.ctx)
proj := Projection{Exprs: make([]expression.Expression, 0, len(fields))}.init(b.allocator, b.ctx)
schema := expression.NewSchema(make([]*expression.Column, 0, len(fields))...)
oldLen := 0
for _, field := range fields {
@ -351,25 +339,21 @@ func (b *planBuilder) buildProjection(p LogicalPlan, fields []*ast.SelectField,
func (b *planBuilder) buildDistinct(child LogicalPlan, length int) LogicalPlan {
b.optFlag = b.optFlag | flagBuildKeyInfo
b.optFlag = b.optFlag | flagAggregationOptimize
agg := &Aggregation{
baseLogicalPlan: newBaseLogicalPlan(TypeAgg, b.allocator),
AggFuncs: make([]expression.AggregationFunction, 0, child.Schema().Len()),
GroupByItems: expression.Column2Exprs(child.Schema().Clone().Columns[:length])}
agg := Aggregation{
AggFuncs: make([]expression.AggregationFunction, 0, child.Schema().Len()),
GroupByItems: expression.Column2Exprs(child.Schema().Clone().Columns[:length]),
}.init(b.allocator, b.ctx)
agg.collectGroupByColumns()
for _, col := range child.Schema().Columns {
agg.AggFuncs = append(agg.AggFuncs, expression.NewAggFunction(ast.AggFuncFirstRow, []expression.Expression{col}, false))
}
agg.self = agg
agg.initIDAndContext(b.ctx)
addChild(agg, child)
agg.SetSchema(child.Schema().Clone())
return agg
}
func (b *planBuilder) buildUnion(union *ast.UnionStmt) LogicalPlan {
u := &Union{baseLogicalPlan: newBaseLogicalPlan(TypeUnion, b.allocator)}
u.self = u
u.initIDAndContext(b.ctx)
u := Union{}.init(b.allocator, b.ctx)
u.children = make([]Plan, len(union.SelectList.Selects))
for i, sel := range union.SelectList.Selects {
u.children[i] = b.buildSelect(sel)
@ -381,12 +365,7 @@ func (b *planBuilder) buildUnion(union *ast.UnionStmt) LogicalPlan {
return nil
}
if _, ok := sel.(*Projection); !ok {
proj := &Projection{
baseLogicalPlan: newBaseLogicalPlan(TypeProj, b.allocator),
Exprs: expression.Column2Exprs(sel.Schema().Columns),
}
proj.initIDAndContext(b.ctx)
proj.self = proj
proj := Projection{Exprs: expression.Column2Exprs(sel.Schema().Columns)}.init(b.allocator, b.ctx)
proj.SetSchema(sel.Schema().Clone())
sel.SetParents(proj)
proj.SetChildren(sel)
@ -447,10 +426,8 @@ func (by *ByItems) String() string {
}
func (b *planBuilder) buildSort(p LogicalPlan, byItems []*ast.ByItem, aggMapper map[*ast.AggregateFuncExpr]int) LogicalPlan {
var exprs []*ByItems
sort := &Sort{baseLogicalPlan: newBaseLogicalPlan(TypeSort, b.allocator)}
sort.self = sort
sort.initIDAndContext(b.ctx)
sort := Sort{}.init(b.allocator, b.ctx)
exprs := make([]*ByItems, 0, len(byItems))
for _, item := range byItems {
it, np, err := b.rewrite(item.Expr, p, aggMapper, true)
if err != nil {
@ -504,13 +481,11 @@ func (b *planBuilder) buildLimit(src LogicalPlan, limit *ast.Limit) LogicalPlan
return nil
}
}
li := &Limit{
Offset: offset,
Count: count,
baseLogicalPlan: newBaseLogicalPlan(TypeLimit, b.allocator),
}
li.self = li
li.initIDAndContext(b.ctx)
li := Limit{
Offset: offset,
Count: count,
}.init(b.allocator, b.ctx)
addChild(li, src)
li.SetSchema(src.Schema().Clone())
return li
@ -967,12 +942,7 @@ func (b *planBuilder) buildSelect(sel *ast.SelectStmt) LogicalPlan {
}
sel.Fields.Fields = originalFields
if oldLen != p.Schema().Len() {
proj := &Projection{
baseLogicalPlan: newBaseLogicalPlan(TypeProj, b.allocator),
Exprs: expression.Column2Exprs(p.Schema().Columns[:oldLen]),
}
proj.initIDAndContext(b.ctx)
proj.self = proj
proj := Projection{Exprs: expression.Column2Exprs(p.Schema().Columns[:oldLen])}.init(b.allocator, b.ctx)
addChild(proj, p)
proj.SetSchema(expression.NewSchema(p.Schema().Columns[:oldLen]...))
return proj
@ -982,9 +952,7 @@ func (b *planBuilder) buildSelect(sel *ast.SelectStmt) LogicalPlan {
}
func (b *planBuilder) buildTableDual() LogicalPlan {
dual := &TableDual{baseLogicalPlan: newBaseLogicalPlan(TypeDual, b.allocator)}
dual.self = dual
dual.initIDAndContext(b.ctx)
dual := TableDual{}.init(b.allocator, b.ctx)
dual.SetSchema(expression.NewSchema())
return dual
}
@ -1005,15 +973,12 @@ func (b *planBuilder) buildDataSource(tn *ast.TableName) LogicalPlan {
}
tableInfo := tbl.Meta()
p := &DataSource{
indexHints: tn.IndexHints,
tableInfo: tableInfo,
baseLogicalPlan: newBaseLogicalPlan(TypeTableScan, b.allocator),
statisticTable: statisticTable,
DBName: schemaName,
}
p.self = p
p.initIDAndContext(b.ctx)
p := DataSource{
indexHints: tn.IndexHints,
tableInfo: tableInfo,
statisticTable: statisticTable,
DBName: schemaName,
}.init(b.allocator, b.ctx)
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.SelectPriv, schemaName.L, tableInfo.Name.L, "")
@ -1055,13 +1020,8 @@ func (b *planBuilder) buildApplyWithJoinType(outerPlan, innerPlan LogicalPlan, t
b.optFlag = b.optFlag | flagPredicatePushDown
b.optFlag = b.optFlag | flagBuildKeyInfo
b.optFlag = b.optFlag | flagDecorrelate
join := &Join{
JoinType: tp,
baseLogicalPlan: newBaseLogicalPlan(TypeJoin, b.allocator),
}
ap := &Apply{Join: *join}
ap.initIDAndContext(b.ctx)
ap.self = ap
ap := Apply{Join: Join{JoinType: tp}}.init(b.allocator, b.ctx)
addChild(ap, outerPlan)
addChild(ap, innerPlan)
ap.SetSchema(expression.MergeSchema(outerPlan.Schema(), innerPlan.Schema()))
@ -1078,7 +1038,8 @@ func (b *planBuilder) buildSemiApply(outerPlan, innerPlan LogicalPlan, condition
b.optFlag = b.optFlag | flagDecorrelate
join := b.buildSemiJoin(outerPlan, innerPlan, condition, asScalar, not)
ap := &Apply{Join: *join}
ap.initIDAndContext(b.ctx)
ap.tp = TypeApply
ap.id = ap.tp + ap.allocator.allocID()
ap.self = ap
ap.children[0].SetParents(ap)
ap.children[1].SetParents(ap)
@ -1105,9 +1066,7 @@ out:
break out
}
}
exists := &Exists{baseLogicalPlan: newBaseLogicalPlan(TypeExists, b.allocator)}
exists.self = exists
exists.initIDAndContext(b.ctx)
exists := Exists{}.init(b.allocator, b.ctx)
addChild(exists, p)
newCol := &expression.Column{
FromID: exists.id,
@ -1118,18 +1077,14 @@ out:
}
func (b *planBuilder) buildMaxOneRow(p LogicalPlan) LogicalPlan {
maxOneRow := &MaxOneRow{baseLogicalPlan: newBaseLogicalPlan(TypeMaxOneRow, b.allocator)}
maxOneRow.self = maxOneRow
maxOneRow.initIDAndContext(b.ctx)
maxOneRow := MaxOneRow{}.init(b.allocator, b.ctx)
addChild(maxOneRow, p)
maxOneRow.SetSchema(p.Schema().Clone())
return maxOneRow
}
func (b *planBuilder) buildSemiJoin(outerPlan, innerPlan LogicalPlan, onCondition []expression.Expression, asScalar bool, not bool) *Join {
joinPlan := &Join{baseLogicalPlan: newBaseLogicalPlan(TypeJoin, b.allocator)}
joinPlan.self = joinPlan
joinPlan.initIDAndContext(b.ctx)
joinPlan := Join{}.init(b.allocator, b.ctx)
for i, expr := range onCondition {
onCondition[i] = expr.Decorrelate(outerPlan.Schema())
}
@ -1197,10 +1152,7 @@ func (b *planBuilder) buildUpdate(update *ast.UpdateStmt) LogicalPlan {
return nil
}
p = np
updt := &Update{OrderedList: orderedList, baseLogicalPlan: newBaseLogicalPlan(TypeUpate, b.allocator)}
updt.ctx = b.ctx
updt.self = updt
updt.initIDAndContext(b.ctx)
updt := Update{OrderedList: orderedList}.init(b.allocator, b.ctx)
addChild(updt, p)
updt.SetSchema(p.Schema())
return updt
@ -1267,13 +1219,10 @@ func (b *planBuilder) buildDelete(delete *ast.DeleteStmt) LogicalPlan {
tables = delete.Tables.Tables
}
del := &Delete{
Tables: tables,
IsMultiTable: delete.IsMultiTable,
baseLogicalPlan: newBaseLogicalPlan(TypeDelete, b.allocator),
}
del.self = del
del.initIDAndContext(b.ctx)
del := Delete{
Tables: tables,
IsMultiTable: delete.IsMultiTable,
}.init(b.allocator, b.ctx)
addChild(del, p)
del.SetSchema(expression.NewSchema())

View File

@ -1137,11 +1137,9 @@ func (s *testPlanSuite) TestColumnPruning(c *C) {
}
func (s *testPlanSuite) TestAllocID(c *C) {
pA := &DataSource{baseLogicalPlan: newBaseLogicalPlan(TypeTableScan, new(idAllocator))}
pB := &DataSource{baseLogicalPlan: newBaseLogicalPlan(TypeTableScan, new(idAllocator))}
ctx := mockContext()
pA.initIDAndContext(ctx)
pB.initIDAndContext(ctx)
pA := DataSource{}.init(new(idAllocator), ctx)
pB := DataSource{}.init(new(idAllocator), ctx)
c.Assert(pA.id, Equals, pB.id)
}

View File

@ -22,6 +22,27 @@ import (
"github.com/pingcap/tidb/util/types"
)
var (
_ LogicalPlan = &Join{}
_ LogicalPlan = &Aggregation{}
_ LogicalPlan = &Projection{}
_ LogicalPlan = &Selection{}
_ LogicalPlan = &Apply{}
_ LogicalPlan = &Exists{}
_ LogicalPlan = &MaxOneRow{}
_ LogicalPlan = &TableDual{}
_ LogicalPlan = &DataSource{}
_ LogicalPlan = &Union{}
_ LogicalPlan = &Sort{}
_ LogicalPlan = &Update{}
_ LogicalPlan = &Delete{}
_ LogicalPlan = &SelectLock{}
_ LogicalPlan = &Limit{}
_ LogicalPlan = &Show{}
_ LogicalPlan = &Insert{}
_ LogicalPlan = &Analyze{}
)
// JoinType contains CrossJoin, InnerJoin, LeftOuterJoin, RightOuterJoin, FullOuterJoin, SemiJoin.
type JoinType int
@ -40,6 +61,7 @@ const (
// Join is the logical join plan.
type Join struct {
*basePlan
baseLogicalPlan
JoinType JoinType
@ -100,7 +122,10 @@ func (p *Join) extractCorrelatedCols() []*expression.CorrelatedColumn {
// Projection represents a select fields plan.
type Projection struct {
*basePlan
baseLogicalPlan
basePhysicalPlan
Exprs []expression.Expression
}
@ -114,6 +139,7 @@ func (p *Projection) extractCorrelatedCols() []*expression.CorrelatedColumn {
// Aggregation represents an aggregate plan.
type Aggregation struct {
*basePlan
baseLogicalPlan
AggFuncs []expression.AggregationFunction
@ -138,7 +164,9 @@ func (p *Aggregation) extractCorrelatedCols() []*expression.CorrelatedColumn {
// Selection means a filter.
type Selection struct {
*basePlan
baseLogicalPlan
basePhysicalPlan
// Originally the WHERE or ON condition is parsed into a single expression,
// but after we converted to CNF(Conjunctive normal form), it can be
@ -184,21 +212,28 @@ func (p *Apply) extractCorrelatedCols() []*expression.CorrelatedColumn {
// Exists checks if a query returns result.
type Exists struct {
*basePlan
baseLogicalPlan
basePhysicalPlan
}
// MaxOneRow checks if a query returns no more than one row.
type MaxOneRow struct {
*basePlan
baseLogicalPlan
basePhysicalPlan
}
// TableDual represents a dual table plan.
type TableDual struct {
*basePlan
baseLogicalPlan
basePhysicalPlan
}
// DataSource represents a tablescan without condition push down.
type DataSource struct {
*basePlan
baseLogicalPlan
indexHints []*ast.IndexHint
@ -215,12 +250,16 @@ type DataSource struct {
// Union represents Union plan.
type Union struct {
*basePlan
baseLogicalPlan
basePhysicalPlan
}
// Sort stands for the order by plan.
type Sort struct {
*basePlan
baseLogicalPlan
basePhysicalPlan
ByItems []*ByItems
ExecLimit *Limit
@ -236,14 +275,18 @@ func (p *Sort) extractCorrelatedCols() []*expression.CorrelatedColumn {
// Update represents Update plan.
type Update struct {
*basePlan
baseLogicalPlan
basePhysicalPlan
OrderedList []*expression.Assignment
}
// Delete represents a delete plan.
type Delete struct {
*basePlan
baseLogicalPlan
basePhysicalPlan
Tables []*ast.TableName
IsMultiTable bool

View File

@ -28,13 +28,13 @@ func (ts *PhysicalTableScan) matchProperty(prop *requiredProperty, infos ...*phy
cost = float64(prop.limit.Count+prop.limit.Offset) * netWorkFactor
}
if len(prop.props) == 0 {
newTS := *ts
newTS := ts.Copy().(*PhysicalTableScan)
newTS.addLimit(prop.limit)
p := newTS.tryToAddUnionScan(&newTS)
p := newTS.tryToAddUnionScan(newTS)
return enforceProperty(prop, &physicalPlanInfo{p: p, cost: cost, count: infos[0].count})
}
if len(prop.props) == 1 && ts.pkCol != nil && ts.pkCol.Equal(prop.props[0].col, ts.ctx) {
sortedTS := *ts
sortedTS := ts.Copy().(*PhysicalTableScan)
sortedTS.Desc = prop.props[0].desc
sortedTS.KeepOrder = true
sortedTS.addLimit(prop.limit)
@ -42,14 +42,14 @@ func (ts *PhysicalTableScan) matchProperty(prop *requiredProperty, infos ...*phy
if len(sortedTS.tableFilterConditions) > 0 {
cost += rowCount * cpuFactor
}
p := sortedTS.tryToAddUnionScan(&sortedTS)
p := sortedTS.tryToAddUnionScan(sortedTS)
return enforceProperty(&requiredProperty{limit: prop.limit}, &physicalPlanInfo{
p: p,
cost: cost,
count: infos[0].count})
}
if prop.limit != nil {
sortedTS := *ts
sortedTS := ts.Copy().(*PhysicalTableScan)
success := sortedTS.addTopN(ts.ctx, prop)
if success {
cost += rowCount * cpuFactor
@ -57,7 +57,7 @@ func (ts *PhysicalTableScan) matchProperty(prop *requiredProperty, infos ...*phy
cost = rowCount * netWorkFactor
}
sortedTS.KeepOrder = true
p := sortedTS.tryToAddUnionScan(&sortedTS)
p := sortedTS.tryToAddUnionScan(sortedTS)
return enforceProperty(prop, &physicalPlanInfo{
p: p,
cost: cost,
@ -135,11 +135,11 @@ func (is *PhysicalIndexScan) matchProperty(prop *requiredProperty, infos ...*phy
}
sortedCost := cost + rowCount*cpuFactor
if allAsc || allDesc {
sortedIS := *is
sortedIS := is.Copy().(*PhysicalIndexScan)
sortedIS.OutOfOrder = false
sortedIS.Desc = allDesc && !allAsc
sortedIS.addLimit(prop.limit)
p := sortedIS.tryToAddUnionScan(&sortedIS)
p := sortedIS.tryToAddUnionScan(sortedIS)
return enforceProperty(&requiredProperty{limit: prop.limit}, &physicalPlanInfo{
p: p,
cost: sortedCost,
@ -147,7 +147,7 @@ func (is *PhysicalIndexScan) matchProperty(prop *requiredProperty, infos ...*phy
}
}
if prop.limit != nil {
sortedIS := *is
sortedIS := is.Copy().(*PhysicalIndexScan)
success := sortedIS.addTopN(is.ctx, prop)
if success {
cost += float64(infos[0].count) * cpuFactor
@ -155,7 +155,7 @@ func (is *PhysicalIndexScan) matchProperty(prop *requiredProperty, infos ...*phy
cost = float64(infos[0].count) * netWorkFactor
}
sortedIS.OutOfOrder = true
p := sortedIS.tryToAddUnionScan(&sortedIS)
p := sortedIS.tryToAddUnionScan(sortedIS)
return enforceProperty(prop, &physicalPlanInfo{
p: p,
cost: cost,
@ -167,17 +167,17 @@ func (is *PhysicalIndexScan) matchProperty(prop *requiredProperty, infos ...*phy
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *PhysicalHashSemiJoin) matchProperty(_ *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo {
lRes, rRes := childPlanInfo[0], childPlanInfo[1]
np := *p
np := p.Copy()
np.SetChildren(lRes.p, rRes.p)
cost := lRes.cost + rRes.cost
return &physicalPlanInfo{p: &np, cost: cost}
return &physicalPlanInfo{p: np, cost: cost}
}
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *PhysicalApply) matchProperty(_ *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo {
np := *p
np := p.Copy()
np.SetChildren(childPlanInfo[0].p)
return &physicalPlanInfo{p: &np, cost: childPlanInfo[0].cost}
return &physicalPlanInfo{p: np, cost: childPlanInfo[0].cost}
}
func estimateJoinCount(lc uint64, rc uint64) uint64 {
@ -191,19 +191,19 @@ func estimateJoinCount(lc uint64, rc uint64) uint64 {
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *PhysicalMergeJoin) matchProperty(prop *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo {
lRes, rRes := childPlanInfo[0], childPlanInfo[1]
np := *p
np := p.Copy()
np.SetChildren(lRes.p, rRes.p)
cost := lRes.cost + rRes.cost
return &physicalPlanInfo{p: &np, cost: cost, count: estimateJoinCount(lRes.count, rRes.count)}
return &physicalPlanInfo{p: np, cost: cost, count: estimateJoinCount(lRes.count, rRes.count)}
}
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *PhysicalHashJoin) matchProperty(prop *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo {
lRes, rRes := childPlanInfo[0], childPlanInfo[1]
lCount, rCount := float64(lRes.count), float64(rRes.count)
np := *p
np := p.Copy().(*PhysicalHashJoin)
np.SetChildren(lRes.p, rRes.p)
if len(prop.props) != 0 {
np.Concurrency = 1
@ -214,12 +214,12 @@ func (p *PhysicalHashJoin) matchProperty(prop *requiredProperty, childPlanInfo .
} else {
cost += rCount + memoryFactor*lCount
}
return &physicalPlanInfo{p: &np, cost: cost, count: estimateJoinCount(lRes.count, rRes.count)}
return &physicalPlanInfo{p: np, cost: cost, count: estimateJoinCount(lRes.count, rRes.count)}
}
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *Union) matchProperty(_ *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo {
np := *p
np := p.Copy()
children := make([]Plan, 0, len(childPlanInfo))
cost := float64(0)
count := uint64(0)
@ -229,16 +229,16 @@ func (p *Union) matchProperty(_ *requiredProperty, childPlanInfo ...*physicalPla
count += res.count
}
np.SetChildren(children...)
return &physicalPlanInfo{p: &np, cost: cost, count: count}
return &physicalPlanInfo{p: np, cost: cost, count: count}
}
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *Selection) matchProperty(prop *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo {
if p.onTable {
res := p.children[0].(PhysicalPlan).matchProperty(prop, childPlanInfo...)
sel := *p
sel := p.Copy()
sel.SetChildren(res.p)
res.p = &sel
res.p = sel
res.count = uint64(float64(res.count) * selectionFactor)
return res
}
@ -249,9 +249,9 @@ func (p *Selection) matchProperty(prop *requiredProperty, childPlanInfo ...*phys
func (p *PhysicalUnionScan) matchProperty(prop *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo {
limit := prop.limit
res := p.children[0].(PhysicalPlan).matchProperty(convertLimitOffsetToCount(prop), childPlanInfo...)
np := *p
np := p.Copy()
np.SetChildren(res.p)
res.p = &np
res.p = np
if limit != nil {
res = addPlanToResponse(limit, res)
}
@ -260,9 +260,9 @@ func (p *PhysicalUnionScan) matchProperty(prop *requiredProperty, childPlanInfo
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *Projection) matchProperty(_ *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo {
np := *p
np := p.Copy()
np.SetChildren(childPlanInfo[0].p)
return &physicalPlanInfo{p: &np, cost: childPlanInfo[0].cost}
return &physicalPlanInfo{p: np, cost: childPlanInfo[0].cost}
}
// matchProperty implements PhysicalPlan matchProperty interface.
@ -271,77 +271,7 @@ func (p *Analyze) matchProperty(_ *requiredProperty, childPlanInfo ...*physicalP
for _, res := range childPlanInfo {
children = append(children, res.p)
}
np := *p
np := p.Copy()
np.SetChildren(children...)
return &physicalPlanInfo{p: &np}
}
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *Cache) matchProperty(prop *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo {
panic("You can't call this function!")
}
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *MaxOneRow) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo {
panic("You can't call this function!")
}
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *Exists) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo {
panic("You can't call this function!")
}
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *PhysicalAggregation) matchProperty(prop *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo {
panic("You can't call this function!")
}
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *Limit) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo {
panic("You can't call this function!")
}
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *TableDual) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo {
panic("You can't call this function!")
}
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *Sort) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo {
panic("You can't call this function!")
}
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *Insert) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo {
panic("You can't call this function!")
}
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *SelectLock) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo {
panic("You can't call this function!")
}
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *Update) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo {
panic("You can't call this function!")
}
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *PhysicalDummyScan) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo {
panic("You can't call this function!")
}
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *Delete) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo {
panic("You can't call this function!")
}
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *Show) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo {
panic("You can't call this function!")
}
// matchProperty implements PhysicalPlan matchProperty interface.
func (p *PhysicalMemTable) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo {
panic("You can't call this function!")
return &physicalPlanInfo{p: np}
}

View File

@ -42,17 +42,14 @@ var JoinConcurrency = 5
func (p *DataSource) convert2TableScan(prop *requiredProperty) (*physicalPlanInfo, error) {
client := p.ctx.GetClient()
ts := &PhysicalTableScan{
ts := PhysicalTableScan{
Table: p.tableInfo,
Columns: p.Columns,
TableAsName: p.TableAsName,
DBName: p.DBName,
physicalTableSource: physicalTableSource{client: client},
}
ts.tp = TypeTableScan
ts.allocator = p.allocator
}.init(p.allocator, p.ctx)
ts.SetSchema(p.Schema())
ts.initIDAndContext(p.ctx)
if p.ctx.Txn() != nil {
ts.readOnly = p.ctx.Txn().IsReadOnly()
} else {
@ -64,7 +61,7 @@ func (p *DataSource) convert2TableScan(prop *requiredProperty) (*physicalPlanInf
table := p.tableInfo
sc := p.ctx.GetSessionVars().StmtCtx
if sel, ok := p.parents[0].(*Selection); ok {
newSel := *sel
newSel := sel.Copy().(*Selection)
conds := make([]expression.Expression, 0, len(sel.Conditions))
for _, cond := range sel.Conditions {
conds = append(conds, cond.Clone())
@ -80,7 +77,7 @@ func (p *DataSource) convert2TableScan(prop *requiredProperty) (*physicalPlanInf
if len(newSel.Conditions) > 0 {
newSel.SetChildren(ts)
newSel.onTable = true
resultPlan = &newSel
resultPlan = newSel
}
} else {
ts.Ranges = []TableRange{{math.MinInt64, math.MaxInt64}}
@ -115,7 +112,7 @@ func (p *DataSource) convert2TableScan(prop *requiredProperty) (*physicalPlanInf
func (p *DataSource) convert2IndexScan(prop *requiredProperty, index *model.IndexInfo) (*physicalPlanInfo, error) {
client := p.ctx.GetClient()
is := &PhysicalIndexScan{
is := PhysicalIndexScan{
Index: index,
Table: p.tableInfo,
Columns: p.Columns,
@ -123,10 +120,7 @@ func (p *DataSource) convert2IndexScan(prop *requiredProperty, index *model.Inde
OutOfOrder: true,
DBName: p.DBName,
physicalTableSource: physicalTableSource{client: client},
}
is.tp = TypeIdxScan
is.allocator = p.allocator
is.initIDAndContext(p.ctx)
}.init(p.allocator, p.ctx)
is.SetSchema(p.schema)
if p.ctx.Txn() != nil {
is.readOnly = p.ctx.Txn().IsReadOnly()
@ -140,7 +134,7 @@ func (p *DataSource) convert2IndexScan(prop *requiredProperty, index *model.Inde
rowCount := uint64(statsTbl.Count)
sc := p.ctx.GetSessionVars().StmtCtx
if sel, ok := p.parents[0].(*Selection); ok {
newSel := *sel
newSel := sel.Copy().(*Selection)
conds := make([]expression.Expression, 0, len(sel.Conditions))
for _, cond := range sel.Conditions {
conds = append(conds, cond.Clone())
@ -168,7 +162,7 @@ func (p *DataSource) convert2IndexScan(prop *requiredProperty, index *model.Inde
if len(newSel.Conditions) > 0 {
newSel.SetChildren(is)
newSel.onTable = true
resultPlan = &newSel
resultPlan = newSel
}
} else {
rb := rangeBuilder{sc: p.ctx.GetSessionVars().StmtCtx}
@ -225,15 +219,12 @@ func (p *DataSource) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlan
memDB := infoschema.IsMemoryDB(p.DBName.L)
isDistReq := !memDB && client != nil && client.SupportRequestType(kv.ReqTypeSelect, 0)
if !isDistReq {
memTable := &PhysicalMemTable{
memTable := PhysicalMemTable{
DBName: p.DBName,
Table: p.tableInfo,
Columns: p.Columns,
TableAsName: p.TableAsName,
}
memTable.tp = TypeMemTableScan
memTable.allocator = p.allocator
memTable.initIDAndContext(p.ctx)
}.init(p.allocator, p.ctx)
memTable.SetSchema(p.schema)
rb := &rangeBuilder{sc: p.ctx.GetSessionVars().StmtCtx}
memTable.Ranges = rb.buildTableRanges(fullRange)
@ -278,10 +269,7 @@ func (p *DataSource) tryToConvert2DummyScan(prop *requiredProperty) (*physicalPl
return nil, errors.Trace(err)
}
if !result {
dummy := &PhysicalDummyScan{}
dummy.tp = TypeDummy
dummy.allocator = p.allocator
dummy.initIDAndContext(p.ctx)
dummy := PhysicalDummyScan{}.init(p.allocator, p.ctx)
dummy.SetSchema(p.schema)
info := &physicalPlanInfo{p: dummy}
p.storePlanInfo(prop, info)
@ -310,13 +298,10 @@ func enforceProperty(prop *requiredProperty, info *physicalPlanInfo) *physicalPl
for _, col := range prop.props {
items = append(items, &ByItems{Expr: col.col, Desc: col.desc})
}
sort := &Sort{
sort := Sort{
ByItems: items,
ExecLimit: prop.limit,
}
sort.tp = TypeSort
sort.allocator = info.p.Allocator()
sort.initIDAndContext(info.p.context())
}.init(info.p.Allocator(), info.p.context())
sort.SetSchema(info.p.Schema())
info = addPlanToResponse(sort, info)
@ -326,10 +311,7 @@ func enforceProperty(prop *requiredProperty, info *physicalPlanInfo) *physicalPl
}
info.cost += sortCost(count)
} else if prop.limit != nil {
limit := prop.limit.Copy().(*Limit)
limit.tp = TypeLimit
limit.allocator = info.p.Allocator()
limit.initIDAndContext(info.p.context())
limit := Limit{Offset: prop.limit.Offset, Count: prop.limit.Count}.init(info.p.Allocator(), info.p.context())
limit.SetSchema(info.p.Schema())
info = addPlanToResponse(limit, info)
}
@ -409,18 +391,14 @@ func (p *Join) convert2PhysicalPlanSemi(prop *requiredProperty) (*physicalPlanIn
allLeft = false
}
}
join := &PhysicalHashSemiJoin{
join := PhysicalHashSemiJoin{
WithAux: LeftOuterSemiJoin == p.JoinType,
EqualConditions: p.EqualConditions,
LeftConditions: p.LeftConditions,
RightConditions: p.RightConditions,
OtherConditions: p.OtherConditions,
Anti: p.anti,
}
join.ctx = p.ctx
join.tp = TypeHashSemiJoin
join.allocator = p.allocator
join.initIDAndContext(p.ctx)
}.init(p.allocator, p.ctx)
join.SetSchema(p.schema)
lProp := prop
if !allLeft {
@ -461,7 +439,7 @@ func (p *Join) convert2PhysicalPlanLeft(prop *requiredProperty, innerJoin bool)
allLeft = false
}
}
join := &PhysicalHashJoin{
join := PhysicalHashJoin{
EqualConditions: p.EqualConditions,
LeftConditions: p.LeftConditions,
RightConditions: p.RightConditions,
@ -470,10 +448,7 @@ func (p *Join) convert2PhysicalPlanLeft(prop *requiredProperty, innerJoin bool)
// TODO: decide concurrency by data size.
Concurrency: JoinConcurrency,
DefaultValues: p.DefaultValues,
}
join.tp = TypeHashLeftJoin
join.allocator = p.allocator
join.initIDAndContext(lChild.context())
}.init(p.allocator, p.ctx)
join.SetSchema(p.schema)
if innerJoin {
join.JoinType = InnerJoin
@ -534,7 +509,7 @@ func (p *Join) convert2PhysicalPlanRight(prop *requiredProperty, innerJoin bool)
allRight = false
}
}
join := &PhysicalHashJoin{
join := PhysicalHashJoin{
EqualConditions: p.EqualConditions,
LeftConditions: p.LeftConditions,
RightConditions: p.RightConditions,
@ -542,10 +517,7 @@ func (p *Join) convert2PhysicalPlanRight(prop *requiredProperty, innerJoin bool)
// TODO: decide concurrency by data size.
Concurrency: JoinConcurrency,
DefaultValues: p.DefaultValues,
}
join.tp = TypeHashRightJoin
join.allocator = p.allocator
join.initIDAndContext(p.ctx)
}.init(p.allocator, p.ctx)
join.SetSchema(p.schema)
if innerJoin {
join.JoinType = InnerJoin
@ -649,7 +621,7 @@ func (p *Join) convert2PhysicalMergeJoin(parentProp *requiredProperty, lProp *re
otherFilter := append(expression.ScalarFuncs2Exprs(newEQConds), p.OtherConditions...)
join := &PhysicalMergeJoin{
join := PhysicalMergeJoin{
EqualConditions: []*expression.ScalarFunction{eqCond},
LeftConditions: p.LeftConditions,
RightConditions: p.RightConditions,
@ -657,10 +629,7 @@ func (p *Join) convert2PhysicalMergeJoin(parentProp *requiredProperty, lProp *re
DefaultValues: p.DefaultValues,
// Assume order for both side are the same
Desc: lProp.props[0].desc,
}
join.tp = TypeMergeJoin
join.allocator = p.allocator
join.initIDAndContext(p.ctx)
}.init(p.allocator, p.ctx)
join.SetSchema(p.schema)
join.JoinType = joinType
@ -831,14 +800,11 @@ func (p *Aggregation) convert2PhysicalPlanStream(prop *requiredProperty) (*physi
return &physicalPlanInfo{cost: math.MaxFloat64}, nil
}
}
agg := &PhysicalAggregation{
agg := PhysicalAggregation{
AggType: StreamedAgg,
AggFuncs: p.AggFuncs,
GroupByItems: p.GroupByItems,
}
agg.tp = TypeStreamAgg
agg.allocator = p.allocator
agg.initIDAndContext(p.ctx)
}.init(p.allocator, p.ctx)
agg.HasGby = len(p.GroupByItems) > 0
agg.SetSchema(p.schema)
// TODO: Consider distinct key.
@ -879,14 +845,11 @@ func (p *Aggregation) convert2PhysicalPlanStream(prop *requiredProperty) (*physi
// convert2PhysicalPlanFinalHash converts the logical aggregation to the final hash aggregation *physicalPlanInfo.
func (p *Aggregation) convert2PhysicalPlanFinalHash(x physicalDistSQLPlan, childInfo *physicalPlanInfo) *physicalPlanInfo {
agg := &PhysicalAggregation{
agg := PhysicalAggregation{
AggType: FinalAgg,
AggFuncs: p.AggFuncs,
GroupByItems: p.GroupByItems,
}
agg.tp = TypeHashAgg
agg.allocator = p.allocator
agg.initIDAndContext(p.ctx)
}.init(p.allocator, p.ctx)
agg.SetSchema(p.schema)
agg.HasGby = len(p.GroupByItems) > 0
schema := x.addAggregation(p.ctx, agg)
@ -903,14 +866,11 @@ func (p *Aggregation) convert2PhysicalPlanFinalHash(x physicalDistSQLPlan, child
// convert2PhysicalPlanCompleteHash converts the logical aggregation to the complete hash aggregation *physicalPlanInfo.
func (p *Aggregation) convert2PhysicalPlanCompleteHash(childInfo *physicalPlanInfo) *physicalPlanInfo {
agg := &PhysicalAggregation{
agg := PhysicalAggregation{
AggType: CompleteAgg,
AggFuncs: p.AggFuncs,
GroupByItems: p.GroupByItems,
}
agg.tp = TypeHashAgg
agg.allocator = p.allocator
agg.initIDAndContext(p.ctx)
}.init(p.allocator, p.ctx)
agg.HasGby = len(p.GroupByItems) > 0
agg.SetSchema(p.schema)
info := addPlanToResponse(agg, childInfo)
@ -1046,17 +1006,14 @@ func (p *Selection) makeScanController(onlyJudge bool) (*physicalPlanInfo, bool)
}
return nil, false
}
ts := &PhysicalTableScan{
ts := PhysicalTableScan{
Table: ds.tableInfo,
Columns: ds.Columns,
TableAsName: ds.TableAsName,
DBName: ds.DBName,
physicalTableSource: physicalTableSource{client: ds.ctx.GetClient()},
}
ts.tp = TypeTableScan
ts.allocator = ds.allocator
}.init(p.allocator, p.ctx)
ts.SetSchema(ds.schema)
ts.initIDAndContext(ds.ctx)
if ds.ctx.Txn() != nil {
ts.readOnly = p.ctx.Txn().IsReadOnly()
} else {
@ -1077,7 +1034,7 @@ func (p *Selection) makeScanController(onlyJudge bool) (*physicalPlanInfo, bool)
for _, cond := range corColConds {
condsBackUp = append(condsBackUp, cond.Clone())
}
is := &PhysicalIndexScan{
is := PhysicalIndexScan{
Table: ds.tableInfo,
Index: idx,
Columns: ds.Columns,
@ -1085,11 +1042,8 @@ func (p *Selection) makeScanController(onlyJudge bool) (*physicalPlanInfo, bool)
OutOfOrder: true,
DBName: ds.DBName,
physicalTableSource: physicalTableSource{client: ds.ctx.GetClient()},
}
is.tp = TypeIdxScan
is.allocator = ds.allocator
}.init(p.allocator, p.ctx)
is.SetSchema(ds.schema)
is.initIDAndContext(ds.ctx)
if is.ctx.Txn() != nil {
is.readOnly = p.ctx.Txn().IsReadOnly()
} else {
@ -1114,11 +1068,11 @@ func (p *Selection) makeScanController(onlyJudge bool) (*physicalPlanInfo, bool)
}
child = chosenPlan
}
newSel := *p
newSel := p.Copy().(*Selection)
newSel.ScanController = true
newSel.SetChildren(child)
info := &physicalPlanInfo{
p: &newSel,
p: newSel,
count: uint64(ds.statisticTable.Count),
}
info.cost = float64(info.count) * selectionFactor
@ -1161,10 +1115,10 @@ func (p *Selection) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanI
}
func (p *Selection) appendSelToInfo(info *physicalPlanInfo) *physicalPlanInfo {
np := *p
np := p.Copy().(*Selection)
np.SetChildren(info.p)
return &physicalPlanInfo{
p: &np,
p: np,
cost: info.cost,
count: uint64(float64(info.count) * selectionFactor),
}
@ -1336,9 +1290,9 @@ func (p *Sort) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo,
sortedPlanInfo = addPlanToResponse(np, sortedPlanInfo)
} else if sortCost+unSortedPlanInfo.cost < sortedPlanInfo.cost {
sortedPlanInfo.cost = sortCost + unSortedPlanInfo.cost
np := *p
np := p.Copy().(*Sort)
np.ExecLimit = selfProp.limit
sortedPlanInfo = addPlanToResponse(&np, unSortedPlanInfo)
sortedPlanInfo = addPlanToResponse(np, unSortedPlanInfo)
}
if !matchProp(p.ctx, prop, selfProp) {
sortedPlanInfo.cost = math.MaxFloat64
@ -1366,13 +1320,10 @@ func (p *Apply) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo,
}
switch info.p.(type) {
case *PhysicalHashJoin, *PhysicalHashSemiJoin:
ap := &PhysicalApply{
ap := PhysicalApply{
PhysicalJoin: info.p,
OuterSchema: p.corCols,
}
ap.tp = TypeApply
ap.allocator = p.allocator
ap.initIDAndContext(p.ctx)
}.init(p.allocator, p.ctx)
ap.SetChildren(info.p.Children()...)
ap.SetSchema(info.p.Schema())
info.p = ap
@ -1386,17 +1337,14 @@ func (p *Apply) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo,
}
func (p *Analyze) prepareSimpleTableScan(cols []*model.ColumnInfo) *PhysicalTableScan {
ts := &PhysicalTableScan{
ts := PhysicalTableScan{
Table: p.Table.TableInfo,
Columns: cols,
TableAsName: &p.Table.Name,
DBName: p.Table.DBInfo.Name,
physicalTableSource: physicalTableSource{client: p.ctx.GetClient()},
}
ts.tp = TypeTableScan
ts.allocator = p.allocator
}.init(p.allocator, p.ctx)
ts.SetSchema(expression.NewSchema(expression.ColumnInfos2Columns(ts.Table.Name, cols)...))
ts.initIDAndContext(p.ctx)
ts.readOnly = true
ts.Ranges = []TableRange{{math.MinInt64, math.MaxInt64}}
return ts
@ -1404,7 +1352,7 @@ func (p *Analyze) prepareSimpleTableScan(cols []*model.ColumnInfo) *PhysicalTabl
func (p *Analyze) prepareSimpleIndexScan(idxOffset int, cols []*model.ColumnInfo) *PhysicalIndexScan {
tblInfo := p.Table.TableInfo
is := &PhysicalIndexScan{
is := PhysicalIndexScan{
Index: tblInfo.Indices[idxOffset],
Table: tblInfo,
Columns: cols,
@ -1413,10 +1361,7 @@ func (p *Analyze) prepareSimpleIndexScan(idxOffset int, cols []*model.ColumnInfo
DBName: p.Table.DBInfo.Name,
physicalTableSource: physicalTableSource{client: p.ctx.GetClient()},
DoubleRead: false,
}
is.tp = TypeAnalyze
is.allocator = p.allocator
is.initIDAndContext(p.ctx)
}.init(p.allocator, p.ctx)
is.SetSchema(expression.NewSchema(expression.ColumnInfos2Columns(tblInfo.Name, cols)...))
is.readOnly = true
rb := rangeBuilder{sc: p.ctx.GetSessionVars().StmtCtx}
@ -1485,10 +1430,7 @@ func addCachePlan(p PhysicalPlan, allocator *idAllocator) []*expression.Correlat
childCorCols := addCachePlan(child.(PhysicalPlan), allocator)
// If p is a Selection and controls the access condition of below scan plan, there shouldn't have a cache plan.
if sel, ok := p.(*Selection); len(selfCorCols) > 0 && len(childCorCols) == 0 && (!ok || !sel.ScanController) {
newChild := &Cache{}
newChild.tp = TypeCache
newChild.allocator = allocator
newChild.initIDAndContext(p.context())
newChild := Cache{}.init(p.Allocator(), p.context())
newChild.SetSchema(child.Schema())
addChild(newChild, child)

View File

@ -36,6 +36,33 @@ var (
_ physicalDistSQLPlan = &PhysicalIndexScan{}
)
var (
_ PhysicalPlan = &Selection{}
_ PhysicalPlan = &Projection{}
_ PhysicalPlan = &Exists{}
_ PhysicalPlan = &MaxOneRow{}
_ PhysicalPlan = &TableDual{}
_ PhysicalPlan = &Union{}
_ PhysicalPlan = &Sort{}
_ PhysicalPlan = &Update{}
_ PhysicalPlan = &Delete{}
_ PhysicalPlan = &SelectLock{}
_ PhysicalPlan = &Limit{}
_ PhysicalPlan = &Show{}
_ PhysicalPlan = &Insert{}
_ PhysicalPlan = &Analyze{}
_ PhysicalPlan = &PhysicalIndexScan{}
_ PhysicalPlan = &PhysicalTableScan{}
_ PhysicalPlan = &PhysicalAggregation{}
_ PhysicalPlan = &PhysicalApply{}
_ PhysicalPlan = &PhysicalDummyScan{}
_ PhysicalPlan = &PhysicalHashJoin{}
_ PhysicalPlan = &PhysicalHashSemiJoin{}
_ PhysicalPlan = &PhysicalMergeJoin{}
_ PhysicalPlan = &PhysicalUnionScan{}
_ PhysicalPlan = &Cache{}
)
// PhysicalIndexScan represents an index scan plan.
type PhysicalIndexScan struct {
physicalTableSource
@ -61,7 +88,8 @@ type PhysicalIndexScan struct {
// PhysicalMemTable reads memory table.
type PhysicalMemTable struct {
basePlan
*basePlan
basePhysicalPlan
DBName model.CIStr
Table *model.TableInfo
@ -72,7 +100,10 @@ type PhysicalMemTable struct {
// Copy implements the PhysicalPlan Copy interface.
func (p *PhysicalMemTable) Copy() PhysicalPlan {
return &(*p)
np := *p
np.basePlan = p.basePlan.copy()
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
// physicalDistSQLPlan means the plan that can be executed distributively.
@ -114,7 +145,8 @@ func (p *PhysicalTableScan) calculateCost(resultCount uint64, scanCount uint64)
}
type physicalTableSource struct {
basePlan
*basePlan
basePhysicalPlan
client kv.Client
@ -217,12 +249,9 @@ func (p *physicalTableSource) tryToAddUnionScan(resultPlan PhysicalPlan) Physica
return resultPlan
}
conditions := append(p.indexFilterConditions, p.tableFilterConditions...)
us := &PhysicalUnionScan{
us := PhysicalUnionScan{
Condition: expression.ComposeCNFCondition(p.ctx, append(conditions, p.AccessCondition...)...),
}
us.tp = TypeUnionScan
us.allocator = p.allocator
us.initIDAndContext(p.ctx)
}.init(p.allocator, p.ctx)
us.SetChildren(resultPlan)
us.SetSchema(resultPlan.Schema())
return us
@ -342,12 +371,14 @@ type PhysicalTableScan struct {
// PhysicalDummyScan is a dummy table that returns nothing.
type PhysicalDummyScan struct {
basePlan
*basePlan
basePhysicalPlan
}
// PhysicalApply represents apply plan, only used for subquery.
type PhysicalApply struct {
basePlan
*basePlan
basePhysicalPlan
PhysicalJoin PhysicalPlan
OuterSchema []*expression.CorrelatedColumn
@ -355,7 +386,8 @@ type PhysicalApply struct {
// PhysicalHashJoin represents hash join for inner/ outer join.
type PhysicalHashJoin struct {
basePlan
*basePlan
basePhysicalPlan
JoinType JoinType
@ -371,7 +403,8 @@ type PhysicalHashJoin struct {
// PhysicalMergeJoin represents merge join for inner/ outer join.
type PhysicalMergeJoin struct {
basePlan
*basePlan
basePhysicalPlan
JoinType JoinType
@ -386,7 +419,8 @@ type PhysicalMergeJoin struct {
// PhysicalHashSemiJoin represents hash join for semi join.
type PhysicalHashSemiJoin struct {
basePlan
*basePlan
basePhysicalPlan
WithAux bool
Anti bool
@ -411,7 +445,8 @@ const (
// PhysicalAggregation is Aggregation's physical plan.
type PhysicalAggregation struct {
basePlan
*basePlan
basePhysicalPlan
HasGby bool
AggType AggregationType
@ -421,14 +456,16 @@ type PhysicalAggregation struct {
// PhysicalUnionScan represents a union scan operator.
type PhysicalUnionScan struct {
basePlan
*basePlan
basePhysicalPlan
Condition expression.Expression
}
// Cache plan is a physical plan which stores the result of its child node.
type Cache struct {
basePlan
*basePlan
basePhysicalPlan
}
func (p *PhysicalMergeJoin) tryConsumeOrder(prop *requiredProperty, eqCond *expression.ScalarFunction) *requiredProperty {
@ -530,6 +567,8 @@ func (p *PhysicalAggregation) extractCorrelatedCols() []*expression.CorrelatedCo
// Copy implements the PhysicalPlan Copy interface.
func (p *PhysicalIndexScan) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
@ -564,6 +603,8 @@ func (p *PhysicalIndexScan) IsPointGetByUniqueKey(sc *variable.StatementContext)
// Copy implements the PhysicalPlan Copy interface.
func (p *PhysicalTableScan) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
@ -596,6 +637,8 @@ func (p *PhysicalMemTable) MarshalJSON() ([]byte, error) {
// Copy implements the PhysicalPlan Copy interface.
func (p *PhysicalApply) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
@ -616,6 +659,8 @@ func (p *PhysicalApply) MarshalJSON() ([]byte, error) {
// Copy implements the PhysicalPlan Copy interface.
func (p *PhysicalHashSemiJoin) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
@ -657,12 +702,16 @@ func (p *PhysicalHashSemiJoin) MarshalJSON() ([]byte, error) {
// Copy implements the PhysicalPlan Copy interface.
func (p *PhysicalHashJoin) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
// Copy implements the PhysicalPlan Copy interface.
func (p *PhysicalMergeJoin) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
@ -736,6 +785,9 @@ func (p *PhysicalMergeJoin) MarshalJSON() ([]byte, error) {
// Copy implements the PhysicalPlan Copy interface.
func (p *Selection) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan)
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
@ -756,6 +808,9 @@ func (p *Selection) MarshalJSON() ([]byte, error) {
// Copy implements the PhysicalPlan Copy interface.
func (p *Projection) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan)
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
@ -775,24 +830,36 @@ func (p *Projection) MarshalJSON() ([]byte, error) {
// Copy implements the PhysicalPlan Copy interface.
func (p *Exists) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan)
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
// Copy implements the PhysicalPlan Copy interface.
func (p *MaxOneRow) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan)
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
// Copy implements the PhysicalPlan Copy interface.
func (p *Insert) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan)
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
// Copy implements the PhysicalPlan Copy interface.
func (p *Limit) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan)
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
@ -810,6 +877,9 @@ func (p *Limit) MarshalJSON() ([]byte, error) {
// Copy implements the PhysicalPlan Copy interface.
func (p *Union) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan)
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
@ -843,18 +913,26 @@ func (p *Sort) MarshalJSON() ([]byte, error) {
// Copy implements the PhysicalPlan Copy interface.
func (p *TableDual) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan)
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
// Copy implements the PhysicalPlan Copy interface.
func (p *SelectLock) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan)
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
// Copy implements the PhysicalPlan Copy interface.
func (p *PhysicalAggregation) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
@ -879,41 +957,59 @@ func (p *PhysicalAggregation) MarshalJSON() ([]byte, error) {
// Copy implements the PhysicalPlan Copy interface.
func (p *Update) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan)
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
// Copy implements the PhysicalPlan Copy interface.
func (p *PhysicalDummyScan) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
// Copy implements the PhysicalPlan Copy interface.
func (p *Delete) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan)
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
// Copy implements the PhysicalPlan Copy interface.
func (p *Show) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan)
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
// Copy implements the PhysicalPlan Copy interface.
func (p *PhysicalUnionScan) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
// Copy implements the PhysicalPlan Copy interface.
func (p *Cache) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
// Copy implements the Analyze Copy interface.
func (p *Analyze) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan)
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}

View File

@ -214,9 +214,12 @@ type PhysicalPlan interface {
}
type baseLogicalPlan struct {
basePlan
planMap map[string]*physicalPlanInfo
self LogicalPlan
basePlan *basePlan
planMap map[string]*physicalPlanInfo
}
type basePhysicalPlan struct {
basePlan *basePlan
}
func (p *baseLogicalPlan) getPlanInfo(prop *requiredProperty) (*physicalPlanInfo, error) {
@ -235,15 +238,15 @@ func (p *baseLogicalPlan) convert2PhysicalPlan(prop *requiredProperty) (*physica
if info != nil {
return info, nil
}
if len(p.children) == 0 {
return &physicalPlanInfo{p: p.self.(PhysicalPlan)}, nil
if len(p.basePlan.children) == 0 {
return &physicalPlanInfo{p: p.basePlan.self.(PhysicalPlan)}, nil
}
child := p.children[0].(LogicalPlan)
child := p.basePlan.children[0].(LogicalPlan)
info, err = child.convert2PhysicalPlan(prop)
if err != nil {
return nil, errors.Trace(err)
}
info = addPlanToResponse(p.self.(PhysicalPlan), info)
info = addPlanToResponse(p.basePlan.self.(PhysicalPlan), info)
return info, p.storePlanInfo(prop, info)
}
@ -258,50 +261,67 @@ func (p *baseLogicalPlan) storePlanInfo(prop *requiredProperty, info *physicalPl
}
func (p *baseLogicalPlan) buildKeyInfo() {
for _, child := range p.Children() {
for _, child := range p.basePlan.children {
child.(LogicalPlan).buildKeyInfo()
}
if len(p.children) == 1 {
switch p.self.(type) {
if len(p.basePlan.children) == 1 {
switch p.basePlan.self.(type) {
case *Exists, *Aggregation, *Projection:
p.schema.Keys = nil
p.basePlan.schema.Keys = nil
case *SelectLock:
p.schema.Keys = p.children[0].Schema().Keys
p.basePlan.schema.Keys = p.basePlan.children[0].Schema().Keys
default:
p.schema.Keys = p.children[0].Schema().Clone().Keys
p.basePlan.schema.Keys = p.basePlan.children[0].Schema().Clone().Keys
}
} else {
p.schema.Keys = nil
p.basePlan.schema.Keys = nil
}
}
func newBaseLogicalPlan(tp string, a *idAllocator) baseLogicalPlan {
return baseLogicalPlan{
planMap: make(map[string]*physicalPlanInfo),
basePlan: basePlan{
tp: tp,
allocator: a,
},
func newBasePlan(tp string, allocator *idAllocator, ctx context.Context, p Plan) *basePlan {
return &basePlan{
tp: tp,
allocator: allocator,
id: tp + allocator.allocID(),
ctx: ctx,
self: p,
}
}
func newBaseLogicalPlan(basePlan *basePlan) baseLogicalPlan {
return baseLogicalPlan{
planMap: make(map[string]*physicalPlanInfo),
basePlan: basePlan,
}
}
func newBasePhysicalPlan(basePlan *basePlan) basePhysicalPlan {
return basePhysicalPlan{
basePlan: basePlan,
}
}
func (p *basePhysicalPlan) matchProperty(prop *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo {
panic("You can't call this function!")
}
// PredicatePushDown implements LogicalPlan interface.
func (p *baseLogicalPlan) PredicatePushDown(predicates []expression.Expression) ([]expression.Expression, LogicalPlan, error) {
if len(p.Children()) == 0 {
return predicates, p.self, nil
if len(p.basePlan.children) == 0 {
return predicates, p.basePlan.self.(LogicalPlan), nil
}
child := p.children[0].(LogicalPlan)
child := p.basePlan.children[0].(LogicalPlan)
rest, _, err := child.PredicatePushDown(predicates)
if err != nil {
return nil, nil, errors.Trace(err)
}
if len(rest) > 0 {
err = addSelection(p, child, rest, p.allocator)
err = addSelection(p.basePlan.self, child, rest, p.basePlan.allocator)
if err != nil {
return nil, nil, errors.Trace(err)
}
}
return nil, p.self, nil
return nil, p.basePlan.self.(LogicalPlan), nil
}
func (p *basePlan) extractCorrelatedCols() []*expression.CorrelatedColumn {
@ -318,24 +338,19 @@ func (p *basePlan) Allocator() *idAllocator {
// ResolveIndicesAndCorCols implements LogicalPlan interface.
func (p *baseLogicalPlan) ResolveIndicesAndCorCols() {
for _, child := range p.children {
for _, child := range p.basePlan.children {
child.(LogicalPlan).ResolveIndicesAndCorCols()
}
}
// PruneColumns implements LogicalPlan interface.
func (p *baseLogicalPlan) PruneColumns(parentUsedCols []*expression.Column) {
if len(p.children) == 0 {
if len(p.basePlan.children) == 0 {
return
}
child := p.children[0].(LogicalPlan)
child := p.basePlan.children[0].(LogicalPlan)
child.PruneColumns(parentUsedCols)
p.SetSchema(child.Schema())
}
func (p *basePlan) initIDAndContext(ctx context.Context) {
p.id = p.tp + p.allocator.allocID()
p.ctx = ctx
p.basePlan.SetSchema(child.Schema())
}
// basePlan implements base Plan interface.
@ -349,6 +364,12 @@ type basePlan struct {
id string
allocator *idAllocator
ctx context.Context
self Plan
}
func (p *basePlan) copy() *basePlan {
np := *p
return &np
}
// MarshalJSON implements json.Marshaler interface.

View File

@ -167,10 +167,7 @@ func (b *planBuilder) buildExecute(v *ast.ExecuteStmt) Plan {
func (b *planBuilder) buildDo(v *ast.DoStmt) Plan {
exprs := make([]expression.Expression, 0, len(v.Exprs))
dual := &TableDual{
baseLogicalPlan: newBaseLogicalPlan(TypeDual, b.allocator),
}
dual.self = dual
dual := TableDual{}.init(b.allocator, b.ctx)
for _, astExpr := range v.Exprs {
expr, _, err := b.rewrite(astExpr, dual, nil, true)
if err != nil {
@ -180,11 +177,7 @@ func (b *planBuilder) buildDo(v *ast.DoStmt) Plan {
exprs = append(exprs, expr)
}
dual.SetSchema(expression.NewSchema())
p := &Projection{
Exprs: exprs,
baseLogicalPlan: newBaseLogicalPlan(TypeProj, b.allocator),
}
p.initIDAndContext(b.ctx)
p := Projection{Exprs: exprs}.init(b.allocator, b.ctx)
addChild(p, dual)
p.self = p
p.SetSchema(expression.NewSchema())
@ -193,8 +186,6 @@ func (b *planBuilder) buildDo(v *ast.DoStmt) Plan {
func (b *planBuilder) buildSet(v *ast.SetStmt) Plan {
p := &Set{}
p.tp = TypeSet
p.allocator = b.allocator
for _, vars := range v.Variables {
assign := &expression.VarAssignment{
Name: vars.Name,
@ -217,7 +208,6 @@ func (b *planBuilder) buildSet(v *ast.SetStmt) Plan {
}
p.VarAssigns = append(p.VarAssigns, assign)
}
p.initIDAndContext(b.ctx)
p.SetSchema(expression.NewSchema())
return p
}
@ -321,12 +311,7 @@ func findIndexByName(indices []*model.IndexInfo, name model.CIStr) *model.IndexI
}
func (b *planBuilder) buildSelectLock(src Plan, lock ast.SelectLockType) *SelectLock {
selectLock := &SelectLock{
Lock: lock,
baseLogicalPlan: newBaseLogicalPlan(TypeLock, b.allocator),
}
selectLock.self = selectLock
selectLock.initIDAndContext(b.ctx)
selectLock := SelectLock{Lock: lock}.init(b.allocator, b.ctx)
addChild(selectLock, src)
selectLock.SetSchema(src.Schema())
return selectLock
@ -404,26 +389,18 @@ func getColumnOffsets(tn *ast.TableName) (indexOffsets []int, columnOffsets []in
}
func (b *planBuilder) buildAnalyze(as *ast.AnalyzeTableStmt) LogicalPlan {
p := &Analyze{
baseLogicalPlan: newBaseLogicalPlan(TypeAnalyze, b.allocator),
PkOffset: -1,
}
p := Analyze{PkOffset: -1}.init(b.allocator, b.ctx)
for _, tbl := range as.TableNames {
idxOffsets, colOffsets, pkOffset := getColumnOffsets(tbl)
result := &Analyze{
baseLogicalPlan: newBaseLogicalPlan(TypeAnalyze, b.allocator),
Table: tbl,
IdxOffsets: idxOffsets,
ColOffsets: colOffsets,
PkOffset: pkOffset,
}
result.self = result
result.initIDAndContext(b.ctx)
result := Analyze{
Table: tbl,
IdxOffsets: idxOffsets,
ColOffsets: colOffsets,
PkOffset: pkOffset,
}.init(b.allocator, b.ctx)
result.SetSchema(expression.TableInfo2Schema(tbl.TableInfo))
addChild(p, result)
}
p.self = p
p.initIDAndContext(b.ctx)
p.SetSchema(&expression.Schema{})
return p
}
@ -486,19 +463,16 @@ func splitWhere(where ast.ExprNode) []ast.ExprNode {
func (b *planBuilder) buildShow(show *ast.ShowStmt) Plan {
var resultPlan Plan
p := &Show{
Tp: show.Tp,
DBName: show.DBName,
Table: show.Table,
Column: show.Column,
Flag: show.Flag,
Full: show.Full,
User: show.User,
baseLogicalPlan: newBaseLogicalPlan(TypeShow, b.allocator),
}
p := Show{
Tp: show.Tp,
DBName: show.DBName,
Table: show.Table,
Column: show.Column,
Flag: show.Flag,
Full: show.Full,
User: show.User,
}.init(b.allocator, b.ctx)
resultPlan = p
p.initIDAndContext(b.ctx)
p.self = p
switch show.Tp {
case ast.ShowProcedureStatus:
p.SetSchema(buildShowProcedureSchema())
@ -535,12 +509,7 @@ func (b *planBuilder) buildShow(show *ast.ShowStmt) Plan {
}
}
if len(conditions) != 0 {
sel := &Selection{
baseLogicalPlan: newBaseLogicalPlan(TypeSel, b.allocator),
Conditions: conditions,
}
sel.initIDAndContext(b.ctx)
sel.self = sel
sel := Selection{Conditions: conditions}.init(b.allocator, b.ctx)
addChild(sel, p)
sel.SetSchema(p.Schema())
resultPlan = sel
@ -631,15 +600,14 @@ func (b *planBuilder) buildInsert(insert *ast.InsertStmt) Plan {
b.err = errors.Errorf("Can't get table %s.", tableInfo.Name.O)
return nil
}
insertPlan := &Insert{
Table: table,
Columns: insert.Columns,
tableSchema: schema,
IsReplace: insert.IsReplace,
Priority: insert.Priority,
Ignore: insert.Ignore,
baseLogicalPlan: newBaseLogicalPlan(TypeInsert, b.allocator),
}
insertPlan := Insert{
Table: table,
Columns: insert.Columns,
tableSchema: schema,
IsReplace: insert.IsReplace,
Priority: insert.Priority,
Ignore: insert.Ignore,
}.init(b.allocator, b.ctx)
b.visitInfo = append(b.visitInfo, visitInfo{
privilege: mysql.InsertPriv,
@ -696,7 +664,7 @@ func (b *planBuilder) buildInsert(insert *ast.InsertStmt) Plan {
Expr: expr,
})
}
mockTablePlan := &TableDual{}
mockTablePlan := TableDual{}.init(b.allocator, b.ctx)
mockTablePlan.SetSchema(schema)
for _, assign := range insert.OnDuplicate {
col, err := schema.FindColumn(assign.Column)
@ -718,8 +686,6 @@ func (b *planBuilder) buildInsert(insert *ast.InsertStmt) Plan {
Expr: expr,
})
}
insertPlan.initIDAndContext(b.ctx)
insertPlan.self = insertPlan
if insert.Select != nil {
selectPlan := b.build(insert.Select)
if b.err != nil {

View File

@ -108,14 +108,18 @@ func (ir *IndexRange) String() string {
// SelectLock represents a select lock plan.
type SelectLock struct {
*basePlan
baseLogicalPlan
basePhysicalPlan
Lock ast.SelectLockType
}
// Limit represents offset and limit plan.
type Limit struct {
*basePlan
baseLogicalPlan
basePhysicalPlan
Offset uint64
Count uint64
@ -147,7 +151,9 @@ type Deallocate struct {
// Show represents a show plan.
type Show struct {
*basePlan
baseLogicalPlan
basePhysicalPlan
Tp ast.ShowStmtType // Databases/Tables/Columns/....
DBName string
@ -177,7 +183,9 @@ type Simple struct {
// Insert represents an insert plan.
type Insert struct {
*basePlan
baseLogicalPlan
basePhysicalPlan
Table table.Table
tableSchema *expression.Schema
@ -193,7 +201,9 @@ type Insert struct {
// Analyze represents an analyze plan
type Analyze struct {
*basePlan
baseLogicalPlan
basePhysicalPlan
Table *ast.TableName
IdxOffsets []int

View File

@ -27,11 +27,7 @@ func (s *ppdSolver) optimize(lp LogicalPlan, _ context.Context, _ *idAllocator)
func addSelection(p Plan, child LogicalPlan, conditions []expression.Expression, allocator *idAllocator) error {
conditions = expression.PropagateConstant(p.context(), conditions)
selection := &Selection{
Conditions: conditions,
baseLogicalPlan: newBaseLogicalPlan(TypeSel, allocator)}
selection.self = selection
selection.initIDAndContext(p.context())
selection := Selection{Conditions: conditions}.init(allocator, p.context())
selection.SetSchema(child.Schema().Clone())
return InsertPlan(p, child, selection)
}
@ -71,7 +67,7 @@ func (p *Join) PredicatePushDown(predicates []expression.Expression) (ret []expr
}
groups, valid := tryToGetJoinGroup(p)
if valid {
e := joinReOrderSolver{allocator: p.allocator}
e := joinReOrderSolver{allocator: p.allocator, ctx: p.ctx}
e.reorderJoin(groups, predicates)
newJoin := e.resultJoin
parent := p.parents[0]

View File

@ -184,12 +184,7 @@ func (sel *Selection) splitSelectionByIndexColumns(schema *expression.Schema) (i
indexSel = sel
}
if len(tableConds) != 0 {
tableSel = &Selection{
baseLogicalPlan: newBaseLogicalPlan(TypeSel, sel.allocator),
Conditions: tableConds,
}
tableSel.self = tableSel
tableSel.initIDAndContext(sel.ctx)
tableSel = Selection{Conditions: tableConds}.init(sel.allocator, sel.ctx)
}
return
}