plan: remove IsCorrelated and SetCorrelated. (#2543)
This commit is contained in:
@ -445,7 +445,7 @@ func (er *expressionRewriter) handleExistSubquery(v *ast.ExistsSubqueryExpr) (as
|
||||
return v, true
|
||||
}
|
||||
np = er.b.buildExists(np)
|
||||
if np.IsCorrelated() {
|
||||
if len(np.extractCorrelatedCols()) > 0 {
|
||||
er.p = er.b.buildSemiApply(er.p, np.GetChildren()[0].(LogicalPlan), nil, er.asScalar, false)
|
||||
if !er.asScalar {
|
||||
return v, true
|
||||
@ -524,7 +524,7 @@ func (er *expressionRewriter) handleScalarSubquery(v *ast.SubqueryExpr) (ast.Nod
|
||||
return v, true
|
||||
}
|
||||
np = er.b.buildMaxOneRow(np)
|
||||
if np.IsCorrelated() {
|
||||
if len(np.extractCorrelatedCols()) > 0 {
|
||||
er.p = er.b.buildInnerApply(er.p, np)
|
||||
if np.GetSchema().Len() > 1 {
|
||||
newCols := make([]expression.Expression, 0, np.GetSchema().Len())
|
||||
|
||||
@ -94,7 +94,6 @@ func (b *planBuilder) buildAggregation(p LogicalPlan, aggFuncList []*ast.Aggrega
|
||||
agg.GroupByItems = gbyItems
|
||||
agg.SetSchema(schema)
|
||||
agg.collectGroupByColumns()
|
||||
agg.SetCorrelated()
|
||||
return agg, aggIndexMap
|
||||
}
|
||||
|
||||
@ -227,7 +226,6 @@ func (b *planBuilder) buildJoin(join *ast.Join) LogicalPlan {
|
||||
} else {
|
||||
joinPlan.JoinType = InnerJoin
|
||||
}
|
||||
joinPlan.SetCorrelated()
|
||||
return joinPlan
|
||||
}
|
||||
|
||||
@ -255,7 +253,6 @@ func (b *planBuilder) buildSelection(p LogicalPlan, where ast.ExprNode, AggMappe
|
||||
selection.Conditions = expressions
|
||||
selection.SetSchema(p.GetSchema().Clone())
|
||||
addChild(selection, p)
|
||||
selection.SetCorrelated()
|
||||
return selection
|
||||
}
|
||||
|
||||
@ -317,7 +314,6 @@ func (b *planBuilder) buildProjection(p LogicalPlan, fields []*ast.SelectField,
|
||||
}
|
||||
proj.SetSchema(schema)
|
||||
addChild(proj, p)
|
||||
proj.SetCorrelated()
|
||||
return proj, oldLen
|
||||
}
|
||||
|
||||
@ -334,7 +330,6 @@ func (b *planBuilder) buildDistinct(child LogicalPlan, length int) LogicalPlan {
|
||||
agg.initIDAndContext(b.ctx)
|
||||
addChild(agg, child)
|
||||
agg.SetSchema(child.GetSchema().Clone())
|
||||
agg.SetCorrelated()
|
||||
return agg
|
||||
}
|
||||
|
||||
@ -380,7 +375,6 @@ func (b *planBuilder) buildUnion(union *ast.UnionStmt) LogicalPlan {
|
||||
}
|
||||
|
||||
u.SetSchema(firstSchema)
|
||||
u.SetCorrelated()
|
||||
var p LogicalPlan
|
||||
p = u
|
||||
if union.Distinct {
|
||||
@ -423,7 +417,6 @@ func (b *planBuilder) buildSort(p LogicalPlan, byItems []*ast.ByItem, aggMapper
|
||||
sort.ByItems = exprs
|
||||
addChild(sort, p)
|
||||
sort.SetSchema(p.GetSchema().Clone())
|
||||
sort.SetCorrelated()
|
||||
return sort
|
||||
}
|
||||
|
||||
@ -470,7 +463,6 @@ func (b *planBuilder) buildLimit(src LogicalPlan, limit *ast.Limit) LogicalPlan
|
||||
li.initIDAndContext(b.ctx)
|
||||
addChild(li, src)
|
||||
li.SetSchema(src.GetSchema().Clone())
|
||||
li.SetCorrelated()
|
||||
return li
|
||||
}
|
||||
|
||||
@ -900,7 +892,6 @@ func (b *planBuilder) buildTrim(p LogicalPlan, len int) LogicalPlan {
|
||||
addChild(trim, p)
|
||||
schema := expression.NewSchema(p.GetSchema().Clone().Columns[:len]...)
|
||||
trim.SetSchema(schema)
|
||||
trim.SetCorrelated()
|
||||
return trim
|
||||
}
|
||||
|
||||
@ -984,7 +975,6 @@ func (b *planBuilder) buildInnerApply(outerPlan, innerPlan LogicalPlan) LogicalP
|
||||
for i := outerPlan.GetSchema().Len(); i < ap.GetSchema().Len(); i++ {
|
||||
ap.schema.Columns[i].IsAggOrSubq = true
|
||||
}
|
||||
ap.SetCorrelated()
|
||||
return ap
|
||||
}
|
||||
|
||||
@ -1021,7 +1011,6 @@ out:
|
||||
RetType: types.NewFieldType(mysql.TypeTiny),
|
||||
ColName: model.NewCIStr("exists_col")}
|
||||
exists.SetSchema(expression.NewSchema(newCol))
|
||||
exists.SetCorrelated()
|
||||
return exists
|
||||
}
|
||||
|
||||
@ -1031,7 +1020,6 @@ func (b *planBuilder) buildMaxOneRow(p LogicalPlan) LogicalPlan {
|
||||
maxOneRow.initIDAndContext(b.ctx)
|
||||
addChild(maxOneRow, p)
|
||||
maxOneRow.SetSchema(p.GetSchema().Clone())
|
||||
maxOneRow.SetCorrelated()
|
||||
return maxOneRow
|
||||
}
|
||||
|
||||
@ -1061,7 +1049,6 @@ func (b *planBuilder) buildSemiJoin(outerPlan, innerPlan LogicalPlan, onConditio
|
||||
joinPlan.JoinType = SemiJoin
|
||||
}
|
||||
joinPlan.anti = not
|
||||
joinPlan.SetCorrelated()
|
||||
return joinPlan
|
||||
}
|
||||
|
||||
|
||||
@ -564,6 +564,11 @@ func (s *testPlanSuite) TestPlanBuilder(c *C) {
|
||||
sql: "select * from t where exists (select s.a from t s having sum(s.a) = t.a )",
|
||||
plan: "Join{DataScan(t)->DataScan(s)->Aggr(sum(s.a))->Projection}(test.t.a,sel_agg_1)->Projection",
|
||||
},
|
||||
{
|
||||
// Test Nested sub query.
|
||||
sql: "select * from t where exists (select s.a from t s where s.c in (select c from t as k where k.d = s.d) having sum(s.a) = t.a )",
|
||||
plan: "Join{DataScan(t)->Apply{DataScan(s)->DataScan(k)->Selection->Projection}->Aggr(sum(s.a))->Projection}(test.t.a,sel_agg_1)->Projection",
|
||||
},
|
||||
{
|
||||
sql: "select * from t for update",
|
||||
plan: "DataScan(t)->Lock->Projection",
|
||||
|
||||
@ -82,23 +82,6 @@ func (p *Join) extractCorrelatedCols() []*expression.CorrelatedColumn {
|
||||
return corCols
|
||||
}
|
||||
|
||||
// SetCorrelated implements Plan interface.
|
||||
func (p *Join) SetCorrelated() {
|
||||
p.basePlan.SetCorrelated()
|
||||
for _, cond := range p.EqualConditions {
|
||||
p.correlated = p.correlated || cond.IsCorrelated()
|
||||
}
|
||||
for _, cond := range p.LeftConditions {
|
||||
p.correlated = p.correlated || cond.IsCorrelated()
|
||||
}
|
||||
for _, cond := range p.RightConditions {
|
||||
p.correlated = p.correlated || cond.IsCorrelated()
|
||||
}
|
||||
for _, cond := range p.OtherConditions {
|
||||
p.correlated = p.correlated || cond.IsCorrelated()
|
||||
}
|
||||
}
|
||||
|
||||
// Projection represents a select fields plan.
|
||||
type Projection struct {
|
||||
baseLogicalPlan
|
||||
@ -113,14 +96,6 @@ func (p *Projection) extractCorrelatedCols() []*expression.CorrelatedColumn {
|
||||
return corCols
|
||||
}
|
||||
|
||||
// SetCorrelated implements Plan interface.
|
||||
func (p *Projection) SetCorrelated() {
|
||||
p.basePlan.SetCorrelated()
|
||||
for _, expr := range p.Exprs {
|
||||
p.correlated = p.correlated || expr.IsCorrelated()
|
||||
}
|
||||
}
|
||||
|
||||
// Aggregation represents an aggregate plan.
|
||||
type Aggregation struct {
|
||||
baseLogicalPlan
|
||||
@ -145,19 +120,6 @@ func (p *Aggregation) extractCorrelatedCols() []*expression.CorrelatedColumn {
|
||||
return corCols
|
||||
}
|
||||
|
||||
// SetCorrelated implements Plan interface.
|
||||
func (p *Aggregation) SetCorrelated() {
|
||||
p.basePlan.SetCorrelated()
|
||||
for _, item := range p.GroupByItems {
|
||||
p.correlated = p.correlated || item.IsCorrelated()
|
||||
}
|
||||
for _, fun := range p.AggFuncs {
|
||||
for _, arg := range fun.GetArgs() {
|
||||
p.correlated = p.correlated || arg.IsCorrelated()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Selection means a filter.
|
||||
type Selection struct {
|
||||
baseLogicalPlan
|
||||
@ -179,14 +141,6 @@ func (p *Selection) extractCorrelatedCols() []*expression.CorrelatedColumn {
|
||||
return corCols
|
||||
}
|
||||
|
||||
// SetCorrelated implements Plan interface.
|
||||
func (p *Selection) SetCorrelated() {
|
||||
p.basePlan.SetCorrelated()
|
||||
for _, cond := range p.Conditions {
|
||||
p.correlated = p.correlated || cond.IsCorrelated()
|
||||
}
|
||||
}
|
||||
|
||||
// Apply gets one row from outer executor and gets one row from inner executor according to outer row.
|
||||
type Apply struct {
|
||||
Join
|
||||
@ -194,17 +148,14 @@ type Apply struct {
|
||||
corCols []*expression.CorrelatedColumn
|
||||
}
|
||||
|
||||
// SetCorrelated implements Plan interface.
|
||||
func (p *Apply) SetCorrelated() {
|
||||
corCols := p.GetChildren()[1].extractCorrelatedCols()
|
||||
p.correlated = p.GetChildren()[0].IsCorrelated()
|
||||
for _, corCol := range corCols {
|
||||
// If the outer column can't be resolved from this outer schema, it should be resolved by outer schema.
|
||||
if idx := p.GetChildren()[0].GetSchema().GetColumnIndex(&corCol.Column); idx == -1 {
|
||||
p.correlated = true
|
||||
break
|
||||
func (p *Apply) extractCorrelatedCols() []*expression.CorrelatedColumn {
|
||||
corCols := p.Join.extractCorrelatedCols()
|
||||
for i := len(corCols) - 1; i >= 0; i-- {
|
||||
if idx := p.GetChildren()[0].GetSchema().GetColumnIndex(&corCols[i].Column); idx != -1 {
|
||||
corCols = append(corCols[:i], corCols[i+1:]...)
|
||||
}
|
||||
}
|
||||
return corCols
|
||||
}
|
||||
|
||||
// Exists checks if a query returns result.
|
||||
@ -264,14 +215,6 @@ func (p *Sort) extractCorrelatedCols() []*expression.CorrelatedColumn {
|
||||
return corCols
|
||||
}
|
||||
|
||||
// SetCorrelated implements Plan interface.
|
||||
func (p *Sort) SetCorrelated() {
|
||||
p.basePlan.SetCorrelated()
|
||||
for _, it := range p.ByItems {
|
||||
p.correlated = p.correlated || it.Expr.IsCorrelated()
|
||||
}
|
||||
}
|
||||
|
||||
// Update represents Update plan.
|
||||
type Update struct {
|
||||
baseLogicalPlan
|
||||
|
||||
@ -82,7 +82,6 @@ func doOptimize(logic LogicalPlan, ctx context.Context, allocator *idAllocator)
|
||||
}
|
||||
pp := info.p
|
||||
pp = EliminateProjection(pp)
|
||||
physicalInitialize(pp)
|
||||
addCachePlan(pp, allocator)
|
||||
log.Debugf("[PLAN] %s", ToString(pp))
|
||||
return pp, nil
|
||||
|
||||
@ -1020,25 +1020,16 @@ func (p *Apply) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo,
|
||||
return info, nil
|
||||
}
|
||||
|
||||
// physicalInitialize will set value of some attributes after convert2PhysicalPlan process.
|
||||
// Currently, only attribute "correlated" is considered.
|
||||
func physicalInitialize(p PhysicalPlan) {
|
||||
for _, child := range p.GetChildren() {
|
||||
physicalInitialize(child.(PhysicalPlan))
|
||||
}
|
||||
// initialize attributes
|
||||
p.SetCorrelated()
|
||||
}
|
||||
|
||||
// addCachePlan will add a Cache plan above the plan whose father's IsCorrelated() is true but its own IsCorrelated() is false.
|
||||
func addCachePlan(p PhysicalPlan, allocator *idAllocator) {
|
||||
func addCachePlan(p PhysicalPlan, allocator *idAllocator) []*expression.CorrelatedColumn {
|
||||
if len(p.GetChildren()) == 0 {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
selfCorCols := p.extractCorrelatedCols()
|
||||
newChildren := make([]Plan, 0, len(p.GetChildren()))
|
||||
for _, child := range p.GetChildren() {
|
||||
addCachePlan(child.(PhysicalPlan), allocator)
|
||||
if p.IsCorrelated() && !child.IsCorrelated() {
|
||||
childCorCols := addCachePlan(child.(PhysicalPlan), allocator)
|
||||
if len(selfCorCols) > 0 && len(childCorCols) == 0 {
|
||||
newChild := &Cache{}
|
||||
newChild.tp = "Cache"
|
||||
newChild.allocator = allocator
|
||||
@ -1054,4 +1045,5 @@ func addCachePlan(p PhysicalPlan, allocator *idAllocator) {
|
||||
}
|
||||
}
|
||||
p.SetChildren(newChildren...)
|
||||
return selfCorCols
|
||||
}
|
||||
|
||||
@ -768,7 +768,7 @@ func (s *testPlanSuite) TestFilterConditionPushDown(c *C) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *testPlanSuite) TestPhysicalInitialize(c *C) {
|
||||
func (s *testPlanSuite) TestAddCache(c *C) {
|
||||
defer testleak.AfterTest(c)()
|
||||
cases := []struct {
|
||||
sql string
|
||||
@ -804,7 +804,6 @@ func (s *testPlanSuite) TestPhysicalInitialize(c *C) {
|
||||
info, err := lp.convert2PhysicalPlan(&requiredProperty{})
|
||||
pp := info.p
|
||||
pp = EliminateProjection(pp)
|
||||
physicalInitialize(pp)
|
||||
addCachePlan(pp, builder.allocator)
|
||||
c.Assert(ToString(pp), Equals, ca.ans, Commentf("for %s", ca.sql))
|
||||
}
|
||||
|
||||
@ -453,6 +453,12 @@ func (p *PhysicalHashSemiJoin) extractCorrelatedCols() []*expression.CorrelatedC
|
||||
func (p *PhysicalApply) extractCorrelatedCols() []*expression.CorrelatedColumn {
|
||||
corCols := p.basePlan.extractCorrelatedCols()
|
||||
corCols = append(corCols, p.PhysicalJoin.extractCorrelatedCols()...)
|
||||
for i := len(corCols) - 1; i >= 0; i-- {
|
||||
idx := p.PhysicalJoin.GetChildren()[0].GetSchema().GetColumnIndex(&corCols[i].Column)
|
||||
if idx != -1 {
|
||||
corCols = append(corCols[:i], corCols[i+1:]...)
|
||||
}
|
||||
}
|
||||
return corCols
|
||||
}
|
||||
|
||||
@ -538,18 +544,6 @@ func (p *PhysicalApply) MarshalJSON() ([]byte, error) {
|
||||
return buffer.Bytes(), nil
|
||||
}
|
||||
|
||||
// SetCorrelated implements Plan interface.
|
||||
func (p *PhysicalApply) SetCorrelated() {
|
||||
corColumns := p.GetChildren()[1].extractCorrelatedCols()
|
||||
p.correlated = p.GetChildren()[0].IsCorrelated()
|
||||
for _, corCol := range corColumns {
|
||||
if idx := p.GetChildren()[0].GetSchema().GetColumnIndex(&corCol.Column); idx == -1 {
|
||||
p.correlated = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Copy implements the PhysicalPlan Copy interface.
|
||||
func (p *PhysicalHashSemiJoin) Copy() PhysicalPlan {
|
||||
np := *p
|
||||
@ -591,23 +585,6 @@ func (p *PhysicalHashSemiJoin) MarshalJSON() ([]byte, error) {
|
||||
return buffer.Bytes(), nil
|
||||
}
|
||||
|
||||
// SetCorrelated implements Plan interface.
|
||||
func (p *PhysicalHashSemiJoin) SetCorrelated() {
|
||||
p.basePlan.SetCorrelated()
|
||||
for _, cond := range p.EqualConditions {
|
||||
p.correlated = p.correlated || cond.IsCorrelated()
|
||||
}
|
||||
for _, cond := range p.LeftConditions {
|
||||
p.correlated = p.correlated || cond.IsCorrelated()
|
||||
}
|
||||
for _, cond := range p.RightConditions {
|
||||
p.correlated = p.correlated || cond.IsCorrelated()
|
||||
}
|
||||
for _, cond := range p.OtherConditions {
|
||||
p.correlated = p.correlated || cond.IsCorrelated()
|
||||
}
|
||||
}
|
||||
|
||||
// Copy implements the PhysicalPlan Copy interface.
|
||||
func (p *PhysicalHashJoin) Copy() PhysicalPlan {
|
||||
np := *p
|
||||
@ -647,23 +624,6 @@ func (p *PhysicalHashJoin) MarshalJSON() ([]byte, error) {
|
||||
return buffer.Bytes(), nil
|
||||
}
|
||||
|
||||
// SetCorrelated implements Plan interface.
|
||||
func (p *PhysicalHashJoin) SetCorrelated() {
|
||||
p.basePlan.SetCorrelated()
|
||||
for _, cond := range p.EqualConditions {
|
||||
p.correlated = p.correlated || cond.IsCorrelated()
|
||||
}
|
||||
for _, cond := range p.LeftConditions {
|
||||
p.correlated = p.correlated || cond.IsCorrelated()
|
||||
}
|
||||
for _, cond := range p.RightConditions {
|
||||
p.correlated = p.correlated || cond.IsCorrelated()
|
||||
}
|
||||
for _, cond := range p.OtherConditions {
|
||||
p.correlated = p.correlated || cond.IsCorrelated()
|
||||
}
|
||||
}
|
||||
|
||||
// Copy implements the PhysicalPlan Copy interface.
|
||||
func (p *Selection) Copy() PhysicalPlan {
|
||||
np := *p
|
||||
@ -812,19 +772,6 @@ func (p *PhysicalAggregation) MarshalJSON() ([]byte, error) {
|
||||
return buffer.Bytes(), nil
|
||||
}
|
||||
|
||||
// SetCorrelated implements Plan interface.
|
||||
func (p *PhysicalAggregation) SetCorrelated() {
|
||||
p.basePlan.SetCorrelated()
|
||||
for _, item := range p.GroupByItems {
|
||||
p.correlated = p.correlated || item.IsCorrelated()
|
||||
}
|
||||
for _, fun := range p.AggFuncs {
|
||||
for _, arg := range fun.GetArgs() {
|
||||
p.correlated = p.correlated || arg.IsCorrelated()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Copy implements the PhysicalPlan Copy interface.
|
||||
func (p *Update) Copy() PhysicalPlan {
|
||||
np := *p
|
||||
|
||||
20
plan/plan.go
20
plan/plan.go
@ -96,13 +96,6 @@ type Plan interface {
|
||||
GetSchema() *expression.Schema
|
||||
// Get the ID.
|
||||
GetID() string
|
||||
// Check whether this plan is correlated or not.
|
||||
IsCorrelated() bool
|
||||
// Set the value of attribute "correlated".
|
||||
// A plan will be correlated if one of its expressions or its child plans is correlated, except Apply.
|
||||
// As for Apply, it will be correlated if the outer plan is correlated or the inner plan has column that the outer doesn't has.
|
||||
// It will be called in the final step of logical plan building and the PhysicalInitialize process after convert2PhysicalPlan process.
|
||||
SetCorrelated()
|
||||
// SetParents sets the parents for the plan.
|
||||
SetParents(...Plan)
|
||||
// SetParents sets the children for the plan.
|
||||
@ -315,8 +308,6 @@ func (p *basePlan) initIDAndContext(ctx context.Context) {
|
||||
// basePlan implements base Plan interface.
|
||||
// Should be used as embedded struct in Plan implementations.
|
||||
type basePlan struct {
|
||||
correlated bool
|
||||
|
||||
parents []Plan
|
||||
children []Plan
|
||||
|
||||
@ -343,17 +334,6 @@ func (p *basePlan) MarshalJSON() ([]byte, error) {
|
||||
return buffer.Bytes(), nil
|
||||
}
|
||||
|
||||
// IsCorrelated implements Plan IsCorrelated interface.
|
||||
func (p *basePlan) IsCorrelated() bool {
|
||||
return p.correlated
|
||||
}
|
||||
|
||||
func (p *basePlan) SetCorrelated() {
|
||||
for _, child := range p.children {
|
||||
p.correlated = p.correlated || child.IsCorrelated()
|
||||
}
|
||||
}
|
||||
|
||||
// GetID implements Plan GetID interface.
|
||||
func (p *basePlan) GetID() string {
|
||||
return p.id
|
||||
|
||||
Reference in New Issue
Block a user