diff --git a/plan/expression_rewriter.go b/plan/expression_rewriter.go index 893bc26bdc..93336fa1cf 100644 --- a/plan/expression_rewriter.go +++ b/plan/expression_rewriter.go @@ -445,7 +445,7 @@ func (er *expressionRewriter) handleExistSubquery(v *ast.ExistsSubqueryExpr) (as return v, true } np = er.b.buildExists(np) - if np.IsCorrelated() { + if len(np.extractCorrelatedCols()) > 0 { er.p = er.b.buildSemiApply(er.p, np.GetChildren()[0].(LogicalPlan), nil, er.asScalar, false) if !er.asScalar { return v, true @@ -524,7 +524,7 @@ func (er *expressionRewriter) handleScalarSubquery(v *ast.SubqueryExpr) (ast.Nod return v, true } np = er.b.buildMaxOneRow(np) - if np.IsCorrelated() { + if len(np.extractCorrelatedCols()) > 0 { er.p = er.b.buildInnerApply(er.p, np) if np.GetSchema().Len() > 1 { newCols := make([]expression.Expression, 0, np.GetSchema().Len()) diff --git a/plan/logical_plan_builder.go b/plan/logical_plan_builder.go index eb692b0861..fe08afe81b 100644 --- a/plan/logical_plan_builder.go +++ b/plan/logical_plan_builder.go @@ -94,7 +94,6 @@ func (b *planBuilder) buildAggregation(p LogicalPlan, aggFuncList []*ast.Aggrega agg.GroupByItems = gbyItems agg.SetSchema(schema) agg.collectGroupByColumns() - agg.SetCorrelated() return agg, aggIndexMap } @@ -227,7 +226,6 @@ func (b *planBuilder) buildJoin(join *ast.Join) LogicalPlan { } else { joinPlan.JoinType = InnerJoin } - joinPlan.SetCorrelated() return joinPlan } @@ -255,7 +253,6 @@ func (b *planBuilder) buildSelection(p LogicalPlan, where ast.ExprNode, AggMappe selection.Conditions = expressions selection.SetSchema(p.GetSchema().Clone()) addChild(selection, p) - selection.SetCorrelated() return selection } @@ -317,7 +314,6 @@ func (b *planBuilder) buildProjection(p LogicalPlan, fields []*ast.SelectField, } proj.SetSchema(schema) addChild(proj, p) - proj.SetCorrelated() return proj, oldLen } @@ -334,7 +330,6 @@ func (b *planBuilder) buildDistinct(child LogicalPlan, length int) LogicalPlan { agg.initIDAndContext(b.ctx) addChild(agg, child) agg.SetSchema(child.GetSchema().Clone()) - agg.SetCorrelated() return agg } @@ -380,7 +375,6 @@ func (b *planBuilder) buildUnion(union *ast.UnionStmt) LogicalPlan { } u.SetSchema(firstSchema) - u.SetCorrelated() var p LogicalPlan p = u if union.Distinct { @@ -423,7 +417,6 @@ func (b *planBuilder) buildSort(p LogicalPlan, byItems []*ast.ByItem, aggMapper sort.ByItems = exprs addChild(sort, p) sort.SetSchema(p.GetSchema().Clone()) - sort.SetCorrelated() return sort } @@ -470,7 +463,6 @@ func (b *planBuilder) buildLimit(src LogicalPlan, limit *ast.Limit) LogicalPlan li.initIDAndContext(b.ctx) addChild(li, src) li.SetSchema(src.GetSchema().Clone()) - li.SetCorrelated() return li } @@ -900,7 +892,6 @@ func (b *planBuilder) buildTrim(p LogicalPlan, len int) LogicalPlan { addChild(trim, p) schema := expression.NewSchema(p.GetSchema().Clone().Columns[:len]...) trim.SetSchema(schema) - trim.SetCorrelated() return trim } @@ -984,7 +975,6 @@ func (b *planBuilder) buildInnerApply(outerPlan, innerPlan LogicalPlan) LogicalP for i := outerPlan.GetSchema().Len(); i < ap.GetSchema().Len(); i++ { ap.schema.Columns[i].IsAggOrSubq = true } - ap.SetCorrelated() return ap } @@ -1021,7 +1011,6 @@ out: RetType: types.NewFieldType(mysql.TypeTiny), ColName: model.NewCIStr("exists_col")} exists.SetSchema(expression.NewSchema(newCol)) - exists.SetCorrelated() return exists } @@ -1031,7 +1020,6 @@ func (b *planBuilder) buildMaxOneRow(p LogicalPlan) LogicalPlan { maxOneRow.initIDAndContext(b.ctx) addChild(maxOneRow, p) maxOneRow.SetSchema(p.GetSchema().Clone()) - maxOneRow.SetCorrelated() return maxOneRow } @@ -1061,7 +1049,6 @@ func (b *planBuilder) buildSemiJoin(outerPlan, innerPlan LogicalPlan, onConditio joinPlan.JoinType = SemiJoin } joinPlan.anti = not - joinPlan.SetCorrelated() return joinPlan } diff --git a/plan/logical_plan_test.go b/plan/logical_plan_test.go index 56e24abda6..03812d656e 100644 --- a/plan/logical_plan_test.go +++ b/plan/logical_plan_test.go @@ -564,6 +564,11 @@ func (s *testPlanSuite) TestPlanBuilder(c *C) { sql: "select * from t where exists (select s.a from t s having sum(s.a) = t.a )", plan: "Join{DataScan(t)->DataScan(s)->Aggr(sum(s.a))->Projection}(test.t.a,sel_agg_1)->Projection", }, + { + // Test Nested sub query. + sql: "select * from t where exists (select s.a from t s where s.c in (select c from t as k where k.d = s.d) having sum(s.a) = t.a )", + plan: "Join{DataScan(t)->Apply{DataScan(s)->DataScan(k)->Selection->Projection}->Aggr(sum(s.a))->Projection}(test.t.a,sel_agg_1)->Projection", + }, { sql: "select * from t for update", plan: "DataScan(t)->Lock->Projection", diff --git a/plan/logical_plans.go b/plan/logical_plans.go index 850b17075c..1ffdc18961 100644 --- a/plan/logical_plans.go +++ b/plan/logical_plans.go @@ -82,23 +82,6 @@ func (p *Join) extractCorrelatedCols() []*expression.CorrelatedColumn { return corCols } -// SetCorrelated implements Plan interface. -func (p *Join) SetCorrelated() { - p.basePlan.SetCorrelated() - for _, cond := range p.EqualConditions { - p.correlated = p.correlated || cond.IsCorrelated() - } - for _, cond := range p.LeftConditions { - p.correlated = p.correlated || cond.IsCorrelated() - } - for _, cond := range p.RightConditions { - p.correlated = p.correlated || cond.IsCorrelated() - } - for _, cond := range p.OtherConditions { - p.correlated = p.correlated || cond.IsCorrelated() - } -} - // Projection represents a select fields plan. type Projection struct { baseLogicalPlan @@ -113,14 +96,6 @@ func (p *Projection) extractCorrelatedCols() []*expression.CorrelatedColumn { return corCols } -// SetCorrelated implements Plan interface. -func (p *Projection) SetCorrelated() { - p.basePlan.SetCorrelated() - for _, expr := range p.Exprs { - p.correlated = p.correlated || expr.IsCorrelated() - } -} - // Aggregation represents an aggregate plan. type Aggregation struct { baseLogicalPlan @@ -145,19 +120,6 @@ func (p *Aggregation) extractCorrelatedCols() []*expression.CorrelatedColumn { return corCols } -// SetCorrelated implements Plan interface. -func (p *Aggregation) SetCorrelated() { - p.basePlan.SetCorrelated() - for _, item := range p.GroupByItems { - p.correlated = p.correlated || item.IsCorrelated() - } - for _, fun := range p.AggFuncs { - for _, arg := range fun.GetArgs() { - p.correlated = p.correlated || arg.IsCorrelated() - } - } -} - // Selection means a filter. type Selection struct { baseLogicalPlan @@ -179,14 +141,6 @@ func (p *Selection) extractCorrelatedCols() []*expression.CorrelatedColumn { return corCols } -// SetCorrelated implements Plan interface. -func (p *Selection) SetCorrelated() { - p.basePlan.SetCorrelated() - for _, cond := range p.Conditions { - p.correlated = p.correlated || cond.IsCorrelated() - } -} - // Apply gets one row from outer executor and gets one row from inner executor according to outer row. type Apply struct { Join @@ -194,17 +148,14 @@ type Apply struct { corCols []*expression.CorrelatedColumn } -// SetCorrelated implements Plan interface. -func (p *Apply) SetCorrelated() { - corCols := p.GetChildren()[1].extractCorrelatedCols() - p.correlated = p.GetChildren()[0].IsCorrelated() - for _, corCol := range corCols { - // If the outer column can't be resolved from this outer schema, it should be resolved by outer schema. - if idx := p.GetChildren()[0].GetSchema().GetColumnIndex(&corCol.Column); idx == -1 { - p.correlated = true - break +func (p *Apply) extractCorrelatedCols() []*expression.CorrelatedColumn { + corCols := p.Join.extractCorrelatedCols() + for i := len(corCols) - 1; i >= 0; i-- { + if idx := p.GetChildren()[0].GetSchema().GetColumnIndex(&corCols[i].Column); idx != -1 { + corCols = append(corCols[:i], corCols[i+1:]...) } } + return corCols } // Exists checks if a query returns result. @@ -264,14 +215,6 @@ func (p *Sort) extractCorrelatedCols() []*expression.CorrelatedColumn { return corCols } -// SetCorrelated implements Plan interface. -func (p *Sort) SetCorrelated() { - p.basePlan.SetCorrelated() - for _, it := range p.ByItems { - p.correlated = p.correlated || it.Expr.IsCorrelated() - } -} - // Update represents Update plan. type Update struct { baseLogicalPlan diff --git a/plan/optimizer.go b/plan/optimizer.go index 87e6980f2e..42187992b5 100644 --- a/plan/optimizer.go +++ b/plan/optimizer.go @@ -82,7 +82,6 @@ func doOptimize(logic LogicalPlan, ctx context.Context, allocator *idAllocator) } pp := info.p pp = EliminateProjection(pp) - physicalInitialize(pp) addCachePlan(pp, allocator) log.Debugf("[PLAN] %s", ToString(pp)) return pp, nil diff --git a/plan/physical_plan_builder.go b/plan/physical_plan_builder.go index 026a5dde09..357713874e 100644 --- a/plan/physical_plan_builder.go +++ b/plan/physical_plan_builder.go @@ -1020,25 +1020,16 @@ func (p *Apply) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, return info, nil } -// physicalInitialize will set value of some attributes after convert2PhysicalPlan process. -// Currently, only attribute "correlated" is considered. -func physicalInitialize(p PhysicalPlan) { - for _, child := range p.GetChildren() { - physicalInitialize(child.(PhysicalPlan)) - } - // initialize attributes - p.SetCorrelated() -} - // addCachePlan will add a Cache plan above the plan whose father's IsCorrelated() is true but its own IsCorrelated() is false. -func addCachePlan(p PhysicalPlan, allocator *idAllocator) { +func addCachePlan(p PhysicalPlan, allocator *idAllocator) []*expression.CorrelatedColumn { if len(p.GetChildren()) == 0 { - return + return nil } + selfCorCols := p.extractCorrelatedCols() newChildren := make([]Plan, 0, len(p.GetChildren())) for _, child := range p.GetChildren() { - addCachePlan(child.(PhysicalPlan), allocator) - if p.IsCorrelated() && !child.IsCorrelated() { + childCorCols := addCachePlan(child.(PhysicalPlan), allocator) + if len(selfCorCols) > 0 && len(childCorCols) == 0 { newChild := &Cache{} newChild.tp = "Cache" newChild.allocator = allocator @@ -1054,4 +1045,5 @@ func addCachePlan(p PhysicalPlan, allocator *idAllocator) { } } p.SetChildren(newChildren...) + return selfCorCols } diff --git a/plan/physical_plan_test.go b/plan/physical_plan_test.go index 49fa01b3a8..5e54c6df23 100644 --- a/plan/physical_plan_test.go +++ b/plan/physical_plan_test.go @@ -768,7 +768,7 @@ func (s *testPlanSuite) TestFilterConditionPushDown(c *C) { } } -func (s *testPlanSuite) TestPhysicalInitialize(c *C) { +func (s *testPlanSuite) TestAddCache(c *C) { defer testleak.AfterTest(c)() cases := []struct { sql string @@ -804,7 +804,6 @@ func (s *testPlanSuite) TestPhysicalInitialize(c *C) { info, err := lp.convert2PhysicalPlan(&requiredProperty{}) pp := info.p pp = EliminateProjection(pp) - physicalInitialize(pp) addCachePlan(pp, builder.allocator) c.Assert(ToString(pp), Equals, ca.ans, Commentf("for %s", ca.sql)) } diff --git a/plan/physical_plans.go b/plan/physical_plans.go index 6cf39f2298..eb10fa0000 100644 --- a/plan/physical_plans.go +++ b/plan/physical_plans.go @@ -453,6 +453,12 @@ func (p *PhysicalHashSemiJoin) extractCorrelatedCols() []*expression.CorrelatedC func (p *PhysicalApply) extractCorrelatedCols() []*expression.CorrelatedColumn { corCols := p.basePlan.extractCorrelatedCols() corCols = append(corCols, p.PhysicalJoin.extractCorrelatedCols()...) + for i := len(corCols) - 1; i >= 0; i-- { + idx := p.PhysicalJoin.GetChildren()[0].GetSchema().GetColumnIndex(&corCols[i].Column) + if idx != -1 { + corCols = append(corCols[:i], corCols[i+1:]...) + } + } return corCols } @@ -538,18 +544,6 @@ func (p *PhysicalApply) MarshalJSON() ([]byte, error) { return buffer.Bytes(), nil } -// SetCorrelated implements Plan interface. -func (p *PhysicalApply) SetCorrelated() { - corColumns := p.GetChildren()[1].extractCorrelatedCols() - p.correlated = p.GetChildren()[0].IsCorrelated() - for _, corCol := range corColumns { - if idx := p.GetChildren()[0].GetSchema().GetColumnIndex(&corCol.Column); idx == -1 { - p.correlated = true - break - } - } -} - // Copy implements the PhysicalPlan Copy interface. func (p *PhysicalHashSemiJoin) Copy() PhysicalPlan { np := *p @@ -591,23 +585,6 @@ func (p *PhysicalHashSemiJoin) MarshalJSON() ([]byte, error) { return buffer.Bytes(), nil } -// SetCorrelated implements Plan interface. -func (p *PhysicalHashSemiJoin) SetCorrelated() { - p.basePlan.SetCorrelated() - for _, cond := range p.EqualConditions { - p.correlated = p.correlated || cond.IsCorrelated() - } - for _, cond := range p.LeftConditions { - p.correlated = p.correlated || cond.IsCorrelated() - } - for _, cond := range p.RightConditions { - p.correlated = p.correlated || cond.IsCorrelated() - } - for _, cond := range p.OtherConditions { - p.correlated = p.correlated || cond.IsCorrelated() - } -} - // Copy implements the PhysicalPlan Copy interface. func (p *PhysicalHashJoin) Copy() PhysicalPlan { np := *p @@ -647,23 +624,6 @@ func (p *PhysicalHashJoin) MarshalJSON() ([]byte, error) { return buffer.Bytes(), nil } -// SetCorrelated implements Plan interface. -func (p *PhysicalHashJoin) SetCorrelated() { - p.basePlan.SetCorrelated() - for _, cond := range p.EqualConditions { - p.correlated = p.correlated || cond.IsCorrelated() - } - for _, cond := range p.LeftConditions { - p.correlated = p.correlated || cond.IsCorrelated() - } - for _, cond := range p.RightConditions { - p.correlated = p.correlated || cond.IsCorrelated() - } - for _, cond := range p.OtherConditions { - p.correlated = p.correlated || cond.IsCorrelated() - } -} - // Copy implements the PhysicalPlan Copy interface. func (p *Selection) Copy() PhysicalPlan { np := *p @@ -812,19 +772,6 @@ func (p *PhysicalAggregation) MarshalJSON() ([]byte, error) { return buffer.Bytes(), nil } -// SetCorrelated implements Plan interface. -func (p *PhysicalAggregation) SetCorrelated() { - p.basePlan.SetCorrelated() - for _, item := range p.GroupByItems { - p.correlated = p.correlated || item.IsCorrelated() - } - for _, fun := range p.AggFuncs { - for _, arg := range fun.GetArgs() { - p.correlated = p.correlated || arg.IsCorrelated() - } - } -} - // Copy implements the PhysicalPlan Copy interface. func (p *Update) Copy() PhysicalPlan { np := *p diff --git a/plan/plan.go b/plan/plan.go index 06edb269c7..f0ea22a553 100644 --- a/plan/plan.go +++ b/plan/plan.go @@ -96,13 +96,6 @@ type Plan interface { GetSchema() *expression.Schema // Get the ID. GetID() string - // Check whether this plan is correlated or not. - IsCorrelated() bool - // Set the value of attribute "correlated". - // A plan will be correlated if one of its expressions or its child plans is correlated, except Apply. - // As for Apply, it will be correlated if the outer plan is correlated or the inner plan has column that the outer doesn't has. - // It will be called in the final step of logical plan building and the PhysicalInitialize process after convert2PhysicalPlan process. - SetCorrelated() // SetParents sets the parents for the plan. SetParents(...Plan) // SetParents sets the children for the plan. @@ -315,8 +308,6 @@ func (p *basePlan) initIDAndContext(ctx context.Context) { // basePlan implements base Plan interface. // Should be used as embedded struct in Plan implementations. type basePlan struct { - correlated bool - parents []Plan children []Plan @@ -343,17 +334,6 @@ func (p *basePlan) MarshalJSON() ([]byte, error) { return buffer.Bytes(), nil } -// IsCorrelated implements Plan IsCorrelated interface. -func (p *basePlan) IsCorrelated() bool { - return p.correlated -} - -func (p *basePlan) SetCorrelated() { - for _, child := range p.children { - p.correlated = p.correlated || child.IsCorrelated() - } -} - // GetID implements Plan GetID interface. func (p *basePlan) GetID() string { return p.id