Files
tidb/pkg/planner/core/plan.go

215 lines
7.3 KiB
Go

// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package core
import (
"math"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/planner/cardinality"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/physicalop"
"github.com/pingcap/tidb/pkg/planner/property"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/execdetails"
)
// AsSctx converts PlanContext to sessionctx.Context.
func AsSctx(pctx base.PlanContext) (sessionctx.Context, error) {
sctx, ok := pctx.(sessionctx.Context)
if !ok {
return nil, errors.New("the current PlanContext cannot be converted to sessionctx.Context")
}
return sctx, nil
}
// optimizeByShuffle insert `PhysicalShuffle` to optimize performance by running in a parallel manner.
func optimizeByShuffle(tsk base.Task, ctx base.PlanContext) base.Task {
if tsk.Plan() == nil {
return tsk
}
switch p := tsk.Plan().(type) {
case *physicalop.PhysicalWindow:
if shuffle := optimizeByShuffle4Window(p, ctx); shuffle != nil {
return shuffle.Attach2Task(tsk)
}
case *physicalop.PhysicalMergeJoin:
if shuffle := optimizeByShuffle4MergeJoin(p, ctx); shuffle != nil {
return shuffle.Attach2Task(tsk)
}
case *physicalop.PhysicalStreamAgg:
if shuffle := optimizeByShuffle4StreamAgg(p, ctx); shuffle != nil {
return shuffle.Attach2Task(tsk)
}
}
return tsk
}
func optimizeByShuffle4Window(pp *physicalop.PhysicalWindow, ctx base.PlanContext) *physicalop.PhysicalShuffle {
concurrency := ctx.GetSessionVars().WindowConcurrency()
if concurrency <= 1 {
return nil
}
sort, ok := pp.Children()[0].(*physicalop.PhysicalSort)
if !ok {
// Multi-thread executing on SORTED data source is not effective enough by current implementation.
// TODO: Implement a better one.
return nil
}
tail, dataSource := sort, sort.Children()[0]
partitionBy := make([]*expression.Column, 0, len(pp.PartitionBy))
for _, item := range pp.PartitionBy {
partitionBy = append(partitionBy, item.Col)
}
ndv, _ := cardinality.EstimateColsNDVWithMatchedLen(ctx, partitionBy, dataSource.Schema(), dataSource.StatsInfo())
if ndv <= 1 {
return nil
}
concurrency = min(concurrency, int(ndv))
byItems := make([]expression.Expression, 0, len(pp.PartitionBy))
for _, item := range pp.PartitionBy {
byItems = append(byItems, item.Col)
}
reqProp := &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64}
shuffle := physicalop.PhysicalShuffle{
Concurrency: concurrency,
Tails: []base.PhysicalPlan{tail},
DataSources: []base.PhysicalPlan{dataSource},
SplitterType: physicalop.PartitionHashSplitterType,
ByItemArrays: [][]expression.Expression{byItems},
}.Init(ctx, pp.StatsInfo(), pp.QueryBlockOffset(), reqProp)
return shuffle
}
func optimizeByShuffle4StreamAgg(pp *physicalop.PhysicalStreamAgg, ctx base.PlanContext) *physicalop.PhysicalShuffle {
concurrency := ctx.GetSessionVars().StreamAggConcurrency()
if concurrency <= 1 {
return nil
}
sort, ok := pp.Children()[0].(*physicalop.PhysicalSort)
if !ok {
// Multi-thread executing on SORTED data source is not effective enough by current implementation.
// TODO: Implement a better one.
return nil
}
tail, dataSource := sort, sort.Children()[0]
partitionBy := make([]*expression.Column, 0, len(pp.GroupByItems))
for _, item := range pp.GroupByItems {
if col, ok := item.(*expression.Column); ok {
partitionBy = append(partitionBy, col)
}
}
ndv, _ := cardinality.EstimateColsNDVWithMatchedLen(ctx, partitionBy, dataSource.Schema(), dataSource.StatsInfo())
if ndv <= 1 {
return nil
}
concurrency = min(concurrency, int(ndv))
reqProp := &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64}
shuffle := physicalop.PhysicalShuffle{
Concurrency: concurrency,
Tails: []base.PhysicalPlan{tail},
DataSources: []base.PhysicalPlan{dataSource},
SplitterType: physicalop.PartitionHashSplitterType,
ByItemArrays: [][]expression.Expression{util.CloneExprs(pp.GroupByItems)},
}.Init(ctx, pp.StatsInfo(), pp.QueryBlockOffset(), reqProp)
return shuffle
}
func optimizeByShuffle4MergeJoin(pp *physicalop.PhysicalMergeJoin, ctx base.PlanContext) *physicalop.PhysicalShuffle {
concurrency := ctx.GetSessionVars().MergeJoinConcurrency()
if concurrency <= 1 {
return nil
}
children := pp.Children()
dataSources := make([]base.PhysicalPlan, len(children))
tails := make([]base.PhysicalPlan, len(children))
for i := range children {
sort, ok := children[i].(*physicalop.PhysicalSort)
if !ok {
// Multi-thread executing on SORTED data source is not effective enough by current implementation.
// TODO: Implement a better one.
return nil
}
tails[i], dataSources[i] = sort, sort.Children()[0]
}
leftByItemArray := make([]expression.Expression, 0, len(pp.LeftJoinKeys))
for _, col := range pp.LeftJoinKeys {
leftByItemArray = append(leftByItemArray, col.Clone())
}
rightByItemArray := make([]expression.Expression, 0, len(pp.RightJoinKeys))
for _, col := range pp.RightJoinKeys {
rightByItemArray = append(rightByItemArray, col.Clone())
}
reqProp := &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64}
shuffle := physicalop.PhysicalShuffle{
Concurrency: concurrency,
Tails: tails,
DataSources: dataSources,
SplitterType: physicalop.PartitionHashSplitterType,
ByItemArrays: [][]expression.Expression{leftByItemArray, rightByItemArray},
}.Init(ctx, pp.StatsInfo(), pp.QueryBlockOffset(), reqProp)
return shuffle
}
func getEstimatedProbeCntFromProbeParents(probeParents []base.PhysicalPlan) float64 {
res := float64(1)
for _, pp := range probeParents {
switch pp.(type) {
case *physicalop.PhysicalApply, *physicalop.PhysicalIndexHashJoin,
*physicalop.PhysicalIndexMergeJoin, *physicalop.PhysicalIndexJoin:
if join, ok := pp.(interface{ GetInnerChildIdx() int }); ok {
outer := pp.Children()[1-join.GetInnerChildIdx()]
res *= outer.StatsInfo().RowCount
}
}
}
return res
}
func getActualProbeCntFromProbeParents(pps []base.PhysicalPlan, statsColl *execdetails.RuntimeStatsColl) int64 {
res := int64(1)
for _, pp := range pps {
switch pp.(type) {
case *physicalop.PhysicalApply, *physicalop.PhysicalIndexHashJoin,
*physicalop.PhysicalIndexMergeJoin, *physicalop.PhysicalIndexJoin:
if join, ok := pp.(interface{ GetInnerChildIdx() int }); ok {
outerChildID := pp.Children()[1-join.GetInnerChildIdx()].ID()
actRows := int64(1)
if statsColl.ExistsRootStats(outerChildID) {
actRows = statsColl.GetRootStats(outerChildID).GetActRows()
}
if statsColl.ExistsCopStats(outerChildID) {
actRows = statsColl.GetCopStats(outerChildID).GetActRows()
}
// TODO: For PhysicalApply, we need to consider cache hit ratio in JoinRuntimeStats and use actRows/(1-ratio) here.
res *= actRows
}
}
}
return res
}