plan, executor: clean code. (#4994)
This commit is contained in:
@ -484,7 +484,7 @@ func (b *executorBuilder) buildMergeJoin(v *plan.PhysicalMergeJoin) Executor {
|
||||
joinType: v.JoinType,
|
||||
defaultValues: v.DefaultValues,
|
||||
}
|
||||
exec, err := joinBuilder.BuildMergeJoin(v.Desc)
|
||||
exec, err := joinBuilder.BuildMergeJoin()
|
||||
if err != nil {
|
||||
b.err = err
|
||||
return nil
|
||||
|
||||
@ -35,7 +35,7 @@ type joinBuilder struct {
|
||||
defaultValues []types.Datum
|
||||
}
|
||||
|
||||
func (b *joinBuilder) BuildMergeJoin(assumeSortedDesc bool) (*MergeJoinExec, error) {
|
||||
func (b *joinBuilder) BuildMergeJoin() (*MergeJoinExec, error) {
|
||||
var leftJoinKeys, rightJoinKeys []*expression.Column
|
||||
for _, eqCond := range b.eqConditions {
|
||||
if len(eqCond.GetArgs()) != 2 {
|
||||
@ -74,7 +74,6 @@ func (b *joinBuilder) BuildMergeJoin(assumeSortedDesc bool) (*MergeJoinExec, err
|
||||
outerIter: leftRowBlock,
|
||||
innerIter: rightRowBlock,
|
||||
schema: b.schema,
|
||||
desc: assumeSortedDesc,
|
||||
resultGenerator: newJoinResultGenerator(b.context, b.joinType, b.defaultValues, b.otherFilter),
|
||||
}
|
||||
|
||||
|
||||
@ -33,7 +33,6 @@ type MergeJoinExec struct {
|
||||
stmtCtx *variable.StatementContext
|
||||
schema *expression.Schema
|
||||
prepared bool
|
||||
desc bool
|
||||
|
||||
outerKeys []*expression.Column
|
||||
innerKeys []*expression.Column
|
||||
@ -229,9 +228,6 @@ func (e *MergeJoinExec) computeJoin() (bool, error) {
|
||||
if err != nil {
|
||||
return false, errors.Trace(err)
|
||||
}
|
||||
if e.desc {
|
||||
compareResult = -compareResult
|
||||
}
|
||||
}
|
||||
|
||||
if compareResult > 0 {
|
||||
|
||||
@ -294,11 +294,6 @@ func (p *PhysicalMergeJoin) ExplainInfo() string {
|
||||
buffer.WriteString(fmt.Sprintf(", other cond:%s",
|
||||
expression.ExplainExpressionList(p.OtherConditions)))
|
||||
}
|
||||
if p.Desc {
|
||||
buffer.WriteString(", desc")
|
||||
} else {
|
||||
buffer.WriteString(", asc")
|
||||
}
|
||||
if len(p.leftKeys) > 0 {
|
||||
buffer.WriteString(fmt.Sprintf(", left key:%s",
|
||||
expression.ExplainColumnList(p.leftKeys)))
|
||||
|
||||
@ -143,7 +143,7 @@ func (p *LogicalJoin) attachOnConds(onConds []expression.Expression) {
|
||||
}
|
||||
|
||||
func (p *LogicalJoin) extractCorrelatedCols() []*expression.CorrelatedColumn {
|
||||
corCols := p.basePlan.extractCorrelatedCols()
|
||||
corCols := p.baseLogicalPlan.extractCorrelatedCols()
|
||||
for _, fun := range p.EqualConditions {
|
||||
corCols = append(corCols, extractCorColumns(fun)...)
|
||||
}
|
||||
@ -173,7 +173,7 @@ type Projection struct {
|
||||
}
|
||||
|
||||
func (p *Projection) extractCorrelatedCols() []*expression.CorrelatedColumn {
|
||||
corCols := p.basePlan.extractCorrelatedCols()
|
||||
corCols := p.baseLogicalPlan.extractCorrelatedCols()
|
||||
for _, expr := range p.Exprs {
|
||||
corCols = append(corCols, extractCorColumns(expr)...)
|
||||
}
|
||||
@ -195,7 +195,7 @@ type LogicalAggregation struct {
|
||||
}
|
||||
|
||||
func (p *LogicalAggregation) extractCorrelatedCols() []*expression.CorrelatedColumn {
|
||||
corCols := p.basePlan.extractCorrelatedCols()
|
||||
corCols := p.baseLogicalPlan.extractCorrelatedCols()
|
||||
for _, expr := range p.GroupByItems {
|
||||
corCols = append(corCols, extractCorColumns(expr)...)
|
||||
}
|
||||
@ -231,7 +231,7 @@ type Selection struct {
|
||||
}
|
||||
|
||||
func (p *Selection) extractCorrelatedCols() []*expression.CorrelatedColumn {
|
||||
corCols := p.basePlan.extractCorrelatedCols()
|
||||
corCols := p.baseLogicalPlan.extractCorrelatedCols()
|
||||
for _, cond := range p.Conditions {
|
||||
corCols = append(corCols, extractCorColumns(cond)...)
|
||||
}
|
||||
@ -347,7 +347,7 @@ type Sort struct {
|
||||
}
|
||||
|
||||
func (p *Sort) extractCorrelatedCols() []*expression.CorrelatedColumn {
|
||||
corCols := p.basePlan.extractCorrelatedCols()
|
||||
corCols := p.baseLogicalPlan.extractCorrelatedCols()
|
||||
for _, item := range p.ByItems {
|
||||
corCols = append(corCols, extractCorColumns(item.Expr)...)
|
||||
}
|
||||
|
||||
@ -17,11 +17,9 @@ import (
|
||||
"github.com/pingcap/tidb/ast"
|
||||
"github.com/pingcap/tidb/expression"
|
||||
"github.com/pingcap/tidb/expression/aggregation"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/model"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/util/types"
|
||||
"github.com/pingcap/tipb/go-tipb"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -175,32 +173,9 @@ type physicalTableSource struct {
|
||||
*basePlan
|
||||
basePhysicalPlan
|
||||
|
||||
client kv.Client
|
||||
|
||||
Aggregated bool
|
||||
readOnly bool
|
||||
AggFuncsPB []*tipb.Expr
|
||||
GbyItemsPB []*tipb.ByItem
|
||||
|
||||
// TableConditionPBExpr is the pb structure of conditions that used in the table scan.
|
||||
TableConditionPBExpr *tipb.Expr
|
||||
// IndexConditionPBExpr is the pb structure of conditions that used in the index scan.
|
||||
IndexConditionPBExpr *tipb.Expr
|
||||
|
||||
// AccessCondition is used to calculate range.
|
||||
AccessCondition []expression.Expression
|
||||
|
||||
LimitCount *int64
|
||||
SortItemsPB []*tipb.ByItem
|
||||
|
||||
// The following fields are used for explaining and testing. Because pb structures are not human-readable.
|
||||
|
||||
aggFuncs []aggregation.Aggregation
|
||||
gbyItems []expression.Expression
|
||||
sortItems []*ByItems
|
||||
indexFilterConditions []expression.Expression
|
||||
tableFilterConditions []expression.Expression
|
||||
|
||||
// filterCondition is only used by new planner.
|
||||
filterCondition []expression.Expression
|
||||
|
||||
@ -220,21 +195,6 @@ func needValue(af aggregation.Aggregation) bool {
|
||||
af.GetName() == ast.AggFuncMax || af.GetName() == ast.AggFuncMin || af.GetName() == ast.AggFuncGroupConcat
|
||||
}
|
||||
|
||||
func (p *physicalTableSource) tryToAddUnionScan(resultPlan PhysicalPlan, s *expression.Schema) PhysicalPlan {
|
||||
if p.readOnly {
|
||||
return resultPlan
|
||||
}
|
||||
conditions := append(p.indexFilterConditions, p.tableFilterConditions...)
|
||||
us := PhysicalUnionScan{
|
||||
Conditions: append(conditions, p.AccessCondition...),
|
||||
NeedColHandle: p.NeedColHandle,
|
||||
}.init(p.allocator, p.ctx)
|
||||
us.SetChildren(resultPlan)
|
||||
us.SetSchema(s)
|
||||
p.NeedColHandle = true
|
||||
return us
|
||||
}
|
||||
|
||||
// PhysicalTableScan represents a table scan plan.
|
||||
type PhysicalTableScan struct {
|
||||
physicalTableSource
|
||||
@ -312,7 +272,6 @@ type PhysicalMergeJoin struct {
|
||||
OtherConditions []expression.Expression
|
||||
|
||||
DefaultValues []types.Datum
|
||||
Desc bool
|
||||
|
||||
leftKeys []*expression.Column
|
||||
rightKeys []*expression.Column
|
||||
@ -382,64 +341,6 @@ type PhysicalUnionScan struct {
|
||||
Conditions []expression.Expression
|
||||
}
|
||||
|
||||
func (p *PhysicalHashJoin) extractCorrelatedCols() []*expression.CorrelatedColumn {
|
||||
corCols := p.basePlan.extractCorrelatedCols()
|
||||
for _, fun := range p.EqualConditions {
|
||||
corCols = append(corCols, extractCorColumns(fun)...)
|
||||
}
|
||||
for _, fun := range p.LeftConditions {
|
||||
corCols = append(corCols, extractCorColumns(fun)...)
|
||||
}
|
||||
for _, fun := range p.RightConditions {
|
||||
corCols = append(corCols, extractCorColumns(fun)...)
|
||||
}
|
||||
for _, fun := range p.OtherConditions {
|
||||
corCols = append(corCols, extractCorColumns(fun)...)
|
||||
}
|
||||
return corCols
|
||||
}
|
||||
|
||||
func (p *PhysicalHashSemiJoin) extractCorrelatedCols() []*expression.CorrelatedColumn {
|
||||
corCols := p.basePlan.extractCorrelatedCols()
|
||||
for _, fun := range p.EqualConditions {
|
||||
corCols = append(corCols, extractCorColumns(fun)...)
|
||||
}
|
||||
for _, fun := range p.LeftConditions {
|
||||
corCols = append(corCols, extractCorColumns(fun)...)
|
||||
}
|
||||
for _, fun := range p.RightConditions {
|
||||
corCols = append(corCols, extractCorColumns(fun)...)
|
||||
}
|
||||
for _, fun := range p.OtherConditions {
|
||||
corCols = append(corCols, extractCorColumns(fun)...)
|
||||
}
|
||||
return corCols
|
||||
}
|
||||
|
||||
func (p *PhysicalApply) extractCorrelatedCols() []*expression.CorrelatedColumn {
|
||||
corCols := p.basePlan.extractCorrelatedCols()
|
||||
corCols = append(corCols, p.PhysicalJoin.extractCorrelatedCols()...)
|
||||
for i := len(corCols) - 1; i >= 0; i-- {
|
||||
if p.PhysicalJoin.Children()[0].Schema().Contains(&corCols[i].Column) {
|
||||
corCols = append(corCols[:i], corCols[i+1:]...)
|
||||
}
|
||||
}
|
||||
return corCols
|
||||
}
|
||||
|
||||
func (p *PhysicalAggregation) extractCorrelatedCols() []*expression.CorrelatedColumn {
|
||||
corCols := p.basePlan.extractCorrelatedCols()
|
||||
for _, expr := range p.GroupByItems {
|
||||
corCols = append(corCols, extractCorColumns(expr)...)
|
||||
}
|
||||
for _, fun := range p.AggFuncs {
|
||||
for _, arg := range fun.GetArgs() {
|
||||
corCols = append(corCols, extractCorColumns(arg)...)
|
||||
}
|
||||
}
|
||||
return corCols
|
||||
}
|
||||
|
||||
// Copy implements the PhysicalPlan Copy interface.
|
||||
func (p *PhysicalIndexScan) Copy() PhysicalPlan {
|
||||
np := *p
|
||||
|
||||
10
plan/plan.go
10
plan/plan.go
@ -61,8 +61,6 @@ type Plan interface {
|
||||
|
||||
context() context.Context
|
||||
|
||||
extractCorrelatedCols() []*expression.CorrelatedColumn
|
||||
|
||||
// ResolveIndices resolves the indices for columns. After doing this, the columns can evaluate the rows by their indices.
|
||||
ResolveIndices()
|
||||
|
||||
@ -197,6 +195,8 @@ type LogicalPlan interface {
|
||||
|
||||
// generatePhysicalPlans generates all possible plans.
|
||||
generatePhysicalPlans() []PhysicalPlan
|
||||
|
||||
extractCorrelatedCols() []*expression.CorrelatedColumn
|
||||
}
|
||||
|
||||
// PhysicalPlan is a tree of the physical operators.
|
||||
@ -307,10 +307,10 @@ func (p *baseLogicalPlan) PredicatePushDown(predicates []expression.Expression)
|
||||
return nil, p.basePlan.self.(LogicalPlan), nil
|
||||
}
|
||||
|
||||
func (p *basePlan) extractCorrelatedCols() []*expression.CorrelatedColumn {
|
||||
func (p *baseLogicalPlan) extractCorrelatedCols() []*expression.CorrelatedColumn {
|
||||
var corCols []*expression.CorrelatedColumn
|
||||
for _, child := range p.children {
|
||||
corCols = append(corCols, child.extractCorrelatedCols()...)
|
||||
for _, child := range p.basePlan.children {
|
||||
corCols = append(corCols, child.(LogicalPlan).extractCorrelatedCols()...)
|
||||
}
|
||||
return corCols
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user