2130 lines
84 KiB
Go
2130 lines
84 KiB
Go
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
|
|
|
|
package core
|
|
|
|
import (
|
|
"math"
|
|
"slices"
|
|
|
|
"github.com/pingcap/failpoint"
|
|
"github.com/pingcap/tidb/pkg/expression"
|
|
"github.com/pingcap/tidb/pkg/expression/aggregation"
|
|
"github.com/pingcap/tidb/pkg/infoschema"
|
|
"github.com/pingcap/tidb/pkg/kv"
|
|
"github.com/pingcap/tidb/pkg/meta/model"
|
|
"github.com/pingcap/tidb/pkg/parser/ast"
|
|
"github.com/pingcap/tidb/pkg/parser/mysql"
|
|
"github.com/pingcap/tidb/pkg/planner/cardinality"
|
|
"github.com/pingcap/tidb/pkg/planner/core/base"
|
|
"github.com/pingcap/tidb/pkg/planner/core/operator/baseimpl"
|
|
"github.com/pingcap/tidb/pkg/planner/core/operator/physicalop"
|
|
"github.com/pingcap/tidb/pkg/planner/property"
|
|
"github.com/pingcap/tidb/pkg/planner/util"
|
|
"github.com/pingcap/tidb/pkg/planner/util/fixcontrol"
|
|
"github.com/pingcap/tidb/pkg/types"
|
|
"github.com/pingcap/tidb/pkg/util/chunk"
|
|
"github.com/pingcap/tidb/pkg/util/intest"
|
|
"github.com/pingcap/tidb/pkg/util/paging"
|
|
"github.com/pingcap/tidb/pkg/util/plancodec"
|
|
"github.com/pingcap/tipb/go-tipb"
|
|
)
|
|
|
|
// HeavyFunctionNameMap is the set of function names that are worth applying
// HeavyFunctionOptimize to. Currently it only covers Vector data types and their
// functions. HeavyFunctionOptimize eliminates the usage of such a function in TopN
// operators, so that the TopN in the root task does not have to re-calculate the
// vector distance.
var HeavyFunctionNameMap = map[string]struct{}{
	"vec_cosine_distance":        {},
	"vec_dims":                   {},
	"vec_l1_distance":            {},
	"vec_l2_distance":            {},
	"vec_l2_norm":                {},
	"vec_negative_inner_product": {},
}
|
|
|
|
func attachPlan2Task(p base.PhysicalPlan, t base.Task) base.Task {
|
|
// since almost all current physical plan will be attached to bottom encapsulated task.
|
|
// we do the stats inheritance here for all the index join inner task.
|
|
inheritStatsFromBottomTaskForIndexJoinInner(p, t)
|
|
switch v := t.(type) {
|
|
case *physicalop.CopTask:
|
|
if v.IndexPlanFinished {
|
|
p.SetChildren(v.TablePlan)
|
|
v.TablePlan = p
|
|
} else {
|
|
p.SetChildren(v.IndexPlan)
|
|
v.IndexPlan = p
|
|
}
|
|
case *physicalop.RootTask:
|
|
p.SetChildren(v.GetPlan())
|
|
v.SetPlan(p)
|
|
case *physicalop.MppTask:
|
|
p.SetChildren(v.Plan())
|
|
v.SetPlan(p)
|
|
}
|
|
return t
|
|
}
|
|
|
|
// attach2Task4PhysicalUnionScan implements PhysicalPlan interface.
|
|
func attach2Task4PhysicalUnionScan(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
p := pp.(*physicalop.PhysicalUnionScan)
|
|
// when it arrives here, physical union scan will absolutely require a root task type,
|
|
// so convert child to root task type first.
|
|
task := tasks[0].ConvertToRootTask(p.SCtx())
|
|
// We need to pull the projection under unionScan upon unionScan.
|
|
// Since the projection only prunes columns, it's ok the put it upon unionScan.
|
|
if sel, ok := task.Plan().(*physicalop.PhysicalSelection); ok {
|
|
if pj, ok := sel.Children()[0].(*physicalop.PhysicalProjection); ok {
|
|
// Convert unionScan->selection->projection to projection->unionScan->selection.
|
|
// shallow clone sel
|
|
clonedSel := *sel
|
|
clonedSel.SetChildren(pj.Children()...)
|
|
// set child will substitute original child slices, not an in-place change.
|
|
p.SetChildren(&clonedSel)
|
|
p.SetStats(task.Plan().StatsInfo())
|
|
rt := task.(*physicalop.RootTask)
|
|
rt.SetPlan(p) // root task plan current is p headed.
|
|
// shallow clone proj.
|
|
clonedProj := *pj
|
|
// set child will substitute original child slices, not an in-place change.
|
|
clonedProj.SetChildren(p)
|
|
return clonedProj.Attach2Task(task)
|
|
}
|
|
}
|
|
if pj, ok := task.Plan().(*physicalop.PhysicalProjection); ok {
|
|
// Convert unionScan->projection to projection->unionScan, because unionScan can't handle projection as its children.
|
|
p.SetChildren(pj.Children()...)
|
|
p.SetStats(task.Plan().StatsInfo())
|
|
rt, _ := task.(*physicalop.RootTask)
|
|
rt.SetPlan(pj.Children()[0])
|
|
// shallow clone proj.
|
|
clonedProj := *pj
|
|
// set child will substitute original child slices, not an in-place change.
|
|
clonedProj.SetChildren(p)
|
|
return clonedProj.Attach2Task(p.BasePhysicalPlan.Attach2Task(task))
|
|
}
|
|
p.SetStats(task.Plan().StatsInfo())
|
|
// once task is copTask type here, it may be converted proj + tablePlan here.
|
|
// then when it's connected with union-scan here, we may get as: union-scan + proj + tablePlan
|
|
// while proj is not allowed to be built under union-scan in execution layer currently.
|
|
return p.BasePhysicalPlan.Attach2Task(task)
|
|
}
|
|
|
|
// attach2Task4PhysicalApply implements PhysicalPlan interface.
|
|
func attach2Task4PhysicalApply(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
p := pp.(*physicalop.PhysicalApply)
|
|
lTask := tasks[0].ConvertToRootTask(p.SCtx())
|
|
rTask := tasks[1].ConvertToRootTask(p.SCtx())
|
|
p.SetChildren(lTask.Plan(), rTask.Plan())
|
|
p.SetSchema(physicalop.BuildPhysicalJoinSchema(p.JoinType, p))
|
|
t := &physicalop.RootTask{}
|
|
t.SetPlan(p)
|
|
// inherit left and right child's warnings.
|
|
t.Warnings.CopyFrom(&lTask.(*physicalop.RootTask).Warnings, &rTask.(*physicalop.RootTask).Warnings)
|
|
return t
|
|
}
|
|
|
|
// attach2Task4PhysicalIndexMergeJoin implements PhysicalPlan interface.
|
|
func attach2Task4PhysicalIndexMergeJoin(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
p := pp.(*physicalop.PhysicalIndexMergeJoin)
|
|
outerTask := tasks[1-p.InnerChildIdx].ConvertToRootTask(p.SCtx())
|
|
if p.InnerChildIdx == 1 {
|
|
p.SetChildren(outerTask.Plan(), p.InnerPlan)
|
|
} else {
|
|
p.SetChildren(p.InnerPlan, outerTask.Plan())
|
|
}
|
|
t := &physicalop.RootTask{}
|
|
t.SetPlan(p)
|
|
return t
|
|
}
|
|
|
|
func indexHashJoinAttach2TaskV1(p *physicalop.PhysicalIndexHashJoin, tasks ...base.Task) base.Task {
|
|
outerTask := tasks[1-p.InnerChildIdx].ConvertToRootTask(p.SCtx())
|
|
if p.InnerChildIdx == 1 {
|
|
p.SetChildren(outerTask.Plan(), p.InnerPlan)
|
|
} else {
|
|
p.SetChildren(p.InnerPlan, outerTask.Plan())
|
|
}
|
|
t := &physicalop.RootTask{}
|
|
t.SetPlan(p)
|
|
return t
|
|
}
|
|
|
|
func indexHashJoinAttach2TaskV2(p *physicalop.PhysicalIndexHashJoin, tasks ...base.Task) base.Task {
|
|
outerTask := tasks[1-p.InnerChildIdx].ConvertToRootTask(p.SCtx())
|
|
innerTask := tasks[p.InnerChildIdx].ConvertToRootTask(p.SCtx())
|
|
// only fill the wrapped physical index join is ok.
|
|
completePhysicalIndexJoin(&p.PhysicalIndexJoin, innerTask.(*physicalop.RootTask), innerTask.Plan().Schema(), outerTask.Plan().Schema(), true)
|
|
if p.InnerChildIdx == 1 {
|
|
p.SetChildren(outerTask.Plan(), innerTask.Plan())
|
|
} else {
|
|
p.SetChildren(innerTask.Plan(), outerTask.Plan())
|
|
}
|
|
t := &physicalop.RootTask{}
|
|
t.SetPlan(p)
|
|
t.Warnings.CopyFrom(&outerTask.(*physicalop.RootTask).Warnings, &innerTask.(*physicalop.RootTask).Warnings)
|
|
return t
|
|
}
|
|
|
|
// attach2Task4PhysicalIndexHashJoin implements PhysicalPlan interface.
|
|
func attach2Task4PhysicalIndexHashJoin(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
p := pp.(*physicalop.PhysicalIndexHashJoin)
|
|
if p.SCtx().GetSessionVars().EnhanceIndexJoinBuildV2 {
|
|
return indexHashJoinAttach2TaskV2(p, tasks...)
|
|
}
|
|
return indexHashJoinAttach2TaskV1(p, tasks...)
|
|
}
|
|
|
|
func indexJoinAttach2TaskV1(p *physicalop.PhysicalIndexJoin, tasks ...base.Task) base.Task {
|
|
outerTask := tasks[1-p.InnerChildIdx].ConvertToRootTask(p.SCtx())
|
|
if p.InnerChildIdx == 1 {
|
|
p.SetChildren(outerTask.Plan(), p.InnerPlan)
|
|
} else {
|
|
p.SetChildren(p.InnerPlan, outerTask.Plan())
|
|
}
|
|
t := &physicalop.RootTask{}
|
|
t.SetPlan(p)
|
|
return t
|
|
}
|
|
|
|
func indexJoinAttach2TaskV2(p *physicalop.PhysicalIndexJoin, tasks ...base.Task) base.Task {
|
|
outerTask := tasks[1-p.InnerChildIdx].ConvertToRootTask(p.SCtx())
|
|
innerTask := tasks[p.InnerChildIdx].ConvertToRootTask(p.SCtx())
|
|
completePhysicalIndexJoin(p, innerTask.(*physicalop.RootTask), innerTask.Plan().Schema(), outerTask.Plan().Schema(), true)
|
|
if p.InnerChildIdx == 1 {
|
|
p.SetChildren(outerTask.Plan(), innerTask.Plan())
|
|
} else {
|
|
p.SetChildren(innerTask.Plan(), outerTask.Plan())
|
|
}
|
|
t := &physicalop.RootTask{}
|
|
t.SetPlan(p)
|
|
t.Warnings.CopyFrom(&outerTask.(*physicalop.RootTask).Warnings, &innerTask.(*physicalop.RootTask).Warnings)
|
|
return t
|
|
}
|
|
|
|
func attach2Task4PhysicalIndexJoin(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
p := pp.(*physicalop.PhysicalIndexJoin)
|
|
if p.SCtx().GetSessionVars().EnhanceIndexJoinBuildV2 {
|
|
return indexJoinAttach2TaskV2(p, tasks...)
|
|
}
|
|
return indexJoinAttach2TaskV1(p, tasks...)
|
|
}
|
|
|
|
// RowSize for cost model ver2 is simplified, always use this function to calculate row size.
|
|
func getAvgRowSize(stats *property.StatsInfo, cols []*expression.Column) (size float64) {
|
|
if stats.HistColl != nil {
|
|
size = max(cardinality.GetAvgRowSizeDataInDiskByRows(stats.HistColl, cols), 0)
|
|
} else {
|
|
// Estimate using just the type info.
|
|
for _, col := range cols {
|
|
size += max(float64(chunk.EstimateTypeWidth(col.GetStaticType())), 0)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// attach2Task4PhysicalHashJoin implements PhysicalPlan interface.
|
|
func attach2Task4PhysicalHashJoin(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
p := pp.(*physicalop.PhysicalHashJoin)
|
|
if p.StoreTp == kv.TiFlash {
|
|
return attach2TaskForTiFlash4PhysicalHashJoin(p, tasks...)
|
|
}
|
|
rTask := tasks[1].ConvertToRootTask(p.SCtx())
|
|
lTask := tasks[0].ConvertToRootTask(p.SCtx())
|
|
p.SetChildren(lTask.Plan(), rTask.Plan())
|
|
task := &physicalop.RootTask{}
|
|
task.SetPlan(p)
|
|
task.Warnings.CopyFrom(&rTask.(*physicalop.RootTask).Warnings, &lTask.(*physicalop.RootTask).Warnings)
|
|
return task
|
|
}
|
|
|
|
func attach2TaskForTiFlash4PhysicalHashJoin(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
p := pp.(*physicalop.PhysicalHashJoin)
|
|
rTask, rok := tasks[1].(*physicalop.CopTask)
|
|
lTask, lok := tasks[0].(*physicalop.CopTask)
|
|
if !lok || !rok {
|
|
return attach2TaskForMpp4PhysicalHashJoin(p, tasks...)
|
|
}
|
|
rRoot := rTask.ConvertToRootTask(p.SCtx())
|
|
lRoot := lTask.ConvertToRootTask(p.SCtx())
|
|
p.SetChildren(lRoot.Plan(), rRoot.Plan())
|
|
p.SetSchema(physicalop.BuildPhysicalJoinSchema(p.JoinType, p))
|
|
task := &physicalop.RootTask{}
|
|
task.SetPlan(p)
|
|
task.Warnings.CopyFrom(&rTask.Warnings, &lTask.Warnings)
|
|
return task
|
|
}
|
|
|
|
// TiDB only require that the types fall into the same catalog but TiFlash require the type to be exactly the same, so
|
|
// need to check if the conversion is a must
|
|
func needConvert(tp *types.FieldType, rtp *types.FieldType) bool {
|
|
// all the string type are mapped to the same type in TiFlash, so
|
|
// do not need convert for string types
|
|
if types.IsString(tp.GetType()) && types.IsString(rtp.GetType()) {
|
|
return false
|
|
}
|
|
if tp.GetType() != rtp.GetType() {
|
|
return true
|
|
}
|
|
if tp.GetType() != mysql.TypeNewDecimal {
|
|
return false
|
|
}
|
|
if tp.GetDecimal() != rtp.GetDecimal() {
|
|
return true
|
|
}
|
|
// for decimal type, TiFlash have 4 different impl based on the required precision
|
|
if tp.GetFlen() >= 0 && tp.GetFlen() <= 9 && rtp.GetFlen() >= 0 && rtp.GetFlen() <= 9 {
|
|
return false
|
|
}
|
|
if tp.GetFlen() > 9 && tp.GetFlen() <= 18 && rtp.GetFlen() > 9 && rtp.GetFlen() <= 18 {
|
|
return false
|
|
}
|
|
if tp.GetFlen() > 18 && tp.GetFlen() <= 38 && rtp.GetFlen() > 18 && rtp.GetFlen() <= 38 {
|
|
return false
|
|
}
|
|
if tp.GetFlen() > 38 && tp.GetFlen() <= 65 && rtp.GetFlen() > 38 && rtp.GetFlen() <= 65 {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func negotiateCommonType(lType, rType *types.FieldType) (_ *types.FieldType, _, _ bool) {
|
|
commonType := types.AggFieldType([]*types.FieldType{lType, rType})
|
|
if commonType.GetType() == mysql.TypeNewDecimal {
|
|
lExtend := 0
|
|
rExtend := 0
|
|
cDec := rType.GetDecimal()
|
|
if lType.GetDecimal() < rType.GetDecimal() {
|
|
lExtend = rType.GetDecimal() - lType.GetDecimal()
|
|
} else if lType.GetDecimal() > rType.GetDecimal() {
|
|
rExtend = lType.GetDecimal() - rType.GetDecimal()
|
|
cDec = lType.GetDecimal()
|
|
}
|
|
lLen, rLen := lType.GetFlen()+lExtend, rType.GetFlen()+rExtend
|
|
cLen := max(lLen, rLen)
|
|
commonType.SetDecimalUnderLimit(cDec)
|
|
commonType.SetFlenUnderLimit(cLen)
|
|
} else if needConvert(lType, commonType) || needConvert(rType, commonType) {
|
|
if mysql.IsIntegerType(commonType.GetType()) {
|
|
// If the target type is int, both TiFlash and Mysql only support cast to Int64
|
|
// so we need to promote the type to Int64
|
|
commonType.SetType(mysql.TypeLonglong)
|
|
commonType.SetFlen(mysql.MaxIntWidth)
|
|
}
|
|
}
|
|
return commonType, needConvert(lType, commonType), needConvert(rType, commonType)
|
|
}
|
|
|
|
func getProj(ctx base.PlanContext, p base.PhysicalPlan) *physicalop.PhysicalProjection {
|
|
proj := physicalop.PhysicalProjection{
|
|
Exprs: make([]expression.Expression, 0, len(p.Schema().Columns)),
|
|
}.Init(ctx, p.StatsInfo(), p.QueryBlockOffset())
|
|
for _, col := range p.Schema().Columns {
|
|
proj.Exprs = append(proj.Exprs, col)
|
|
}
|
|
proj.SetSchema(p.Schema().Clone())
|
|
proj.SetChildren(p)
|
|
return proj
|
|
}
|
|
|
|
func appendExpr(p *physicalop.PhysicalProjection, expr expression.Expression) *expression.Column {
|
|
p.Exprs = append(p.Exprs, expr)
|
|
|
|
col := &expression.Column{
|
|
UniqueID: p.SCtx().GetSessionVars().AllocPlanColumnID(),
|
|
RetType: expr.GetType(p.SCtx().GetExprCtx().GetEvalCtx()),
|
|
}
|
|
col.SetCoercibility(expr.Coercibility())
|
|
p.Schema().Append(col)
|
|
return col
|
|
}
|
|
|
|
// TiFlash join require that partition key has exactly the same type, while TiDB only guarantee the partition key is the same catalog,
|
|
// so if the partition key type is not exactly the same, we need add a projection below the join or exchanger if exists.
|
|
func convertPartitionKeysIfNeed4PhysicalHashJoin(pp base.PhysicalPlan, lTask, rTask *physicalop.MppTask) (_, _ *physicalop.MppTask) {
|
|
p := pp.(*physicalop.PhysicalHashJoin)
|
|
lp := lTask.Plan()
|
|
if _, ok := lp.(*physicalop.PhysicalExchangeReceiver); ok {
|
|
lp = lp.Children()[0].Children()[0]
|
|
}
|
|
rp := rTask.Plan()
|
|
if _, ok := rp.(*physicalop.PhysicalExchangeReceiver); ok {
|
|
rp = rp.Children()[0].Children()[0]
|
|
}
|
|
// to mark if any partition key needs to convert
|
|
lMask := make([]bool, len(lTask.HashCols))
|
|
rMask := make([]bool, len(rTask.HashCols))
|
|
cTypes := make([]*types.FieldType, len(lTask.HashCols))
|
|
lChanged := false
|
|
rChanged := false
|
|
for i := range lTask.HashCols {
|
|
lKey := lTask.HashCols[i]
|
|
rKey := rTask.HashCols[i]
|
|
cType, lConvert, rConvert := negotiateCommonType(lKey.Col.RetType, rKey.Col.RetType)
|
|
if lConvert {
|
|
lMask[i] = true
|
|
cTypes[i] = cType
|
|
lChanged = true
|
|
}
|
|
if rConvert {
|
|
rMask[i] = true
|
|
cTypes[i] = cType
|
|
rChanged = true
|
|
}
|
|
}
|
|
if !lChanged && !rChanged {
|
|
return lTask, rTask
|
|
}
|
|
var lProj, rProj *physicalop.PhysicalProjection
|
|
if lChanged {
|
|
lProj = getProj(p.SCtx(), lp)
|
|
lp = lProj
|
|
}
|
|
if rChanged {
|
|
rProj = getProj(p.SCtx(), rp)
|
|
rp = rProj
|
|
}
|
|
|
|
lPartKeys := make([]*property.MPPPartitionColumn, 0, len(rTask.HashCols))
|
|
rPartKeys := make([]*property.MPPPartitionColumn, 0, len(lTask.HashCols))
|
|
for i := range lTask.HashCols {
|
|
lKey := lTask.HashCols[i]
|
|
rKey := rTask.HashCols[i]
|
|
if lMask[i] {
|
|
cType := cTypes[i].Clone()
|
|
cType.SetFlag(lKey.Col.RetType.GetFlag())
|
|
lCast := expression.BuildCastFunction(p.SCtx().GetExprCtx(), lKey.Col, cType)
|
|
lKey = &property.MPPPartitionColumn{Col: appendExpr(lProj, lCast), CollateID: lKey.CollateID}
|
|
}
|
|
if rMask[i] {
|
|
cType := cTypes[i].Clone()
|
|
cType.SetFlag(rKey.Col.RetType.GetFlag())
|
|
rCast := expression.BuildCastFunction(p.SCtx().GetExprCtx(), rKey.Col, cType)
|
|
rKey = &property.MPPPartitionColumn{Col: appendExpr(rProj, rCast), CollateID: rKey.CollateID}
|
|
}
|
|
lPartKeys = append(lPartKeys, lKey)
|
|
rPartKeys = append(rPartKeys, rKey)
|
|
}
|
|
// if left or right child changes, we need to add enforcer.
|
|
if lChanged {
|
|
nlTask := lTask.Copy().(*physicalop.MppTask)
|
|
nlTask.SetPlan(lProj)
|
|
nlTask = nlTask.EnforceExchanger(&property.PhysicalProperty{
|
|
TaskTp: property.MppTaskType,
|
|
MPPPartitionTp: property.HashType,
|
|
MPPPartitionCols: lPartKeys,
|
|
}, nil)
|
|
lTask = nlTask
|
|
}
|
|
if rChanged {
|
|
nrTask := rTask.Copy().(*physicalop.MppTask)
|
|
nrTask.SetPlan(rProj)
|
|
nrTask = nrTask.EnforceExchanger(&property.PhysicalProperty{
|
|
TaskTp: property.MppTaskType,
|
|
MPPPartitionTp: property.HashType,
|
|
MPPPartitionCols: rPartKeys,
|
|
}, nil)
|
|
rTask = nrTask
|
|
}
|
|
return lTask, rTask
|
|
}
|
|
|
|
func enforceExchangerByBackup4PhysicalHashJoin(pp base.PhysicalPlan, task *physicalop.MppTask, idx int, expectedCols int) *physicalop.MppTask {
|
|
p := pp.(*physicalop.PhysicalHashJoin)
|
|
if backupHashProp := p.GetChildReqProps(idx); backupHashProp != nil {
|
|
if len(backupHashProp.MPPPartitionCols) == expectedCols {
|
|
return task.EnforceExchangerImpl(backupHashProp)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func attach2TaskForMpp4PhysicalHashJoin(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
const (
|
|
left = 0
|
|
right = 1
|
|
)
|
|
rTask, rok := tasks[right].(*physicalop.MppTask)
|
|
lTask, lok := tasks[left].(*physicalop.MppTask)
|
|
if !lok || !rok {
|
|
return base.InvalidTask
|
|
}
|
|
p := pp.(*physicalop.PhysicalHashJoin)
|
|
if p.MppShuffleJoin {
|
|
if len(lTask.HashCols) == 0 || len(rTask.HashCols) == 0 {
|
|
// if the hash columns are empty, this is very likely a bug.
|
|
return base.InvalidTask
|
|
}
|
|
if len(lTask.HashCols) != len(rTask.HashCols) {
|
|
// if the hash columns are not the same, The most likely scenario is that
|
|
// they have undergone exchange optimization, removing some hash columns.
|
|
// In this case, we need to restore them on the side that is missing.
|
|
if len(lTask.HashCols) < len(rTask.HashCols) {
|
|
lTask = enforceExchangerByBackup4PhysicalHashJoin(p, lTask, left, len(rTask.HashCols))
|
|
} else {
|
|
rTask = enforceExchangerByBackup4PhysicalHashJoin(p, rTask, right, len(lTask.HashCols))
|
|
}
|
|
if lTask == nil || rTask == nil {
|
|
return base.InvalidTask
|
|
}
|
|
}
|
|
lTask, rTask = convertPartitionKeysIfNeed4PhysicalHashJoin(p, lTask, rTask)
|
|
}
|
|
p.SetChildren(lTask.Plan(), rTask.Plan())
|
|
// outer task is the task that will pass its MPPPartitionType to the join result
|
|
// for broadcast inner join, it should be the non-broadcast side, since broadcast side is always the build side, so
|
|
// just use the probe side is ok.
|
|
// for hash inner join, both side is ok, by default, we use the probe side
|
|
// for outer join, it should always be the outer side of the join
|
|
// for semi join, it should be the left side(the same as left out join)
|
|
outerTaskIndex := 1 - p.InnerChildIdx
|
|
if p.JoinType != base.InnerJoin {
|
|
if p.JoinType == base.RightOuterJoin {
|
|
outerTaskIndex = 1
|
|
} else {
|
|
outerTaskIndex = 0
|
|
}
|
|
}
|
|
// can not use the task from tasks because it maybe updated.
|
|
outerTask := lTask
|
|
if outerTaskIndex == 1 {
|
|
outerTask = rTask
|
|
}
|
|
task := physicalop.NewMppTask(p,
|
|
outerTask.GetPartitionType(),
|
|
outerTask.GetHashCols(),
|
|
nil, rTask.GetWarnings(), lTask.GetWarnings())
|
|
// Current TiFlash doesn't support receive Join executors' schema info directly from TiDB.
|
|
// Instead, it calculates Join executors' output schema using algorithm like BuildPhysicalJoinSchema which
|
|
// produces full semantic schema.
|
|
// Thus, the column prune optimization achievements will be abandoned here.
|
|
// To avoid the performance issue, add a projection here above the Join operator to prune useless columns explicitly.
|
|
// TODO(hyb): transfer Join executors' schema to TiFlash through DagRequest, and use it directly in TiFlash.
|
|
defaultSchema := physicalop.BuildPhysicalJoinSchema(p.JoinType, p)
|
|
hashColArray := make([]*expression.Column, 0, len(task.HashCols))
|
|
// For task.hashCols, these columns may not be contained in pruned columns:
|
|
// select A.id from A join B on A.id = B.id; Suppose B is probe side, and it's hash inner join.
|
|
// After column prune, the output schema of A join B will be A.id only; while the task's hashCols will be B.id.
|
|
// To make matters worse, the hashCols may be used to check if extra cast projection needs to be added, then the newly
|
|
// added projection will expect B.id as input schema. So make sure hashCols are included in task.p's schema.
|
|
// TODO: planner should takes the hashCols attribute into consideration when perform column pruning; Or provide mechanism
|
|
// to constraint hashCols are always chosen inside Join's pruned schema
|
|
for _, hashCol := range task.HashCols {
|
|
hashColArray = append(hashColArray, hashCol.Col)
|
|
}
|
|
if p.Schema().Len() < defaultSchema.Len() {
|
|
if p.Schema().Len() > 0 {
|
|
proj := physicalop.PhysicalProjection{
|
|
Exprs: expression.Column2Exprs(p.Schema().Columns),
|
|
}.Init(p.SCtx(), p.StatsInfo(), p.QueryBlockOffset())
|
|
|
|
proj.SetSchema(p.Schema().Clone())
|
|
for _, hashCol := range hashColArray {
|
|
if !proj.Schema().Contains(hashCol) && defaultSchema.Contains(hashCol) {
|
|
joinCol := defaultSchema.Columns[defaultSchema.ColumnIndex(hashCol)]
|
|
proj.Exprs = append(proj.Exprs, joinCol)
|
|
proj.Schema().Append(joinCol.Clone().(*expression.Column))
|
|
}
|
|
}
|
|
attachPlan2Task(proj, task)
|
|
} else {
|
|
if len(hashColArray) == 0 {
|
|
constOne := expression.NewOne()
|
|
expr := make([]expression.Expression, 0, 1)
|
|
expr = append(expr, constOne)
|
|
proj := physicalop.PhysicalProjection{
|
|
Exprs: expr,
|
|
}.Init(p.SCtx(), p.StatsInfo(), p.QueryBlockOffset())
|
|
|
|
proj.SetSchema(expression.NewSchema(&expression.Column{
|
|
UniqueID: proj.SCtx().GetSessionVars().AllocPlanColumnID(),
|
|
RetType: constOne.GetType(p.SCtx().GetExprCtx().GetEvalCtx()),
|
|
}))
|
|
attachPlan2Task(proj, task)
|
|
} else {
|
|
proj := physicalop.PhysicalProjection{
|
|
Exprs: make([]expression.Expression, 0, len(hashColArray)),
|
|
}.Init(p.SCtx(), p.StatsInfo(), p.QueryBlockOffset())
|
|
|
|
clonedHashColArray := make([]*expression.Column, 0, len(task.HashCols))
|
|
for _, hashCol := range hashColArray {
|
|
if defaultSchema.Contains(hashCol) {
|
|
joinCol := defaultSchema.Columns[defaultSchema.ColumnIndex(hashCol)]
|
|
proj.Exprs = append(proj.Exprs, joinCol)
|
|
clonedHashColArray = append(clonedHashColArray, joinCol.Clone().(*expression.Column))
|
|
}
|
|
}
|
|
|
|
proj.SetSchema(expression.NewSchema(clonedHashColArray...))
|
|
attachPlan2Task(proj, task)
|
|
}
|
|
}
|
|
}
|
|
p.SetSchema(defaultSchema)
|
|
return task
|
|
}
|
|
|
|
// attach2Task4PhysicalMergeJoin implements PhysicalPlan interface.
|
|
func attach2Task4PhysicalMergeJoin(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
p := pp.(*physicalop.PhysicalMergeJoin)
|
|
lTask := tasks[0].ConvertToRootTask(p.SCtx())
|
|
rTask := tasks[1].ConvertToRootTask(p.SCtx())
|
|
p.SetChildren(lTask.Plan(), rTask.Plan())
|
|
t := &physicalop.RootTask{}
|
|
t.SetPlan(p)
|
|
t.Warnings.CopyFrom(&rTask.(*physicalop.RootTask).Warnings, &lTask.(*physicalop.RootTask).Warnings)
|
|
return t
|
|
}
|
|
|
|
func extractRows(p base.PhysicalPlan) float64 {
|
|
f := float64(0)
|
|
for _, c := range p.Children() {
|
|
if len(c.Children()) != 0 {
|
|
f += extractRows(c)
|
|
} else {
|
|
f += c.StatsInfo().RowCount
|
|
}
|
|
}
|
|
return f
|
|
}
|
|
|
|
// calcPagingCost calculates the cost for paging processing which may increase the seekCnt and reduce scanned rows.
|
|
func calcPagingCost(ctx base.PlanContext, indexPlan base.PhysicalPlan, expectCnt uint64) float64 {
|
|
sessVars := ctx.GetSessionVars()
|
|
indexRows := indexPlan.StatsCount()
|
|
sourceRows := extractRows(indexPlan)
|
|
// with paging, the scanned rows is always less than or equal to source rows.
|
|
if uint64(sourceRows) < expectCnt {
|
|
expectCnt = uint64(sourceRows)
|
|
}
|
|
seekCnt := paging.CalculateSeekCnt(expectCnt)
|
|
indexSelectivity := float64(1)
|
|
if sourceRows > indexRows {
|
|
indexSelectivity = indexRows / sourceRows
|
|
}
|
|
pagingCst := seekCnt*sessVars.GetSeekFactor(nil) + float64(expectCnt)*sessVars.GetCPUFactor()
|
|
pagingCst *= indexSelectivity
|
|
|
|
// we want the diff between idxCst and pagingCst here,
|
|
// however, the idxCst does not contain seekFactor, so a seekFactor needs to be removed
|
|
return math.Max(pagingCst-sessVars.GetSeekFactor(nil), 0)
|
|
}
|
|
|
|
// attach2Task4PhysicalLimit attach limit to different cases.
|
|
// For Normal Index Lookup
|
|
// 1: attach the limit to table side or index side of normal index lookup cop task. (normal case, old code, no more
|
|
// explanation here)
|
|
//
|
|
// For Index Merge:
|
|
// 2: attach the limit to **table** side for index merge intersection case, cause intersection will invalidate the
|
|
// fetched limit+offset rows from each partial index plan, you can not decide how many you want in advance for partial
|
|
// index path, actually. After we sink limit to table side, we still need an upper root limit to control the real limit
|
|
// count admission.
|
|
//
|
|
// 3: attach the limit to **index** side for index merge union case, because each index plan will output the fetched
|
|
// limit+offset (* N path) rows, you still need an embedded pushedLimit inside index merge reader to cut it down.
|
|
//
|
|
// 4: attach the limit to the TOP of root index merge operator if there is some root condition exists for index merge
|
|
// intersection/union case.
|
|
func attach2Task4PhysicalLimit(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
p := pp.(*physicalop.PhysicalLimit)
|
|
t := tasks[0].Copy()
|
|
newPartitionBy := make([]property.SortItem, 0, len(p.GetPartitionBy()))
|
|
for _, expr := range p.GetPartitionBy() {
|
|
newPartitionBy = append(newPartitionBy, expr.Clone())
|
|
}
|
|
|
|
sunk := false
|
|
if cop, ok := t.(*physicalop.CopTask); ok {
|
|
suspendLimitAboveTablePlan := func() {
|
|
newCount := p.Offset + p.Count
|
|
childProfile := cop.TablePlan.StatsInfo()
|
|
// but "regionNum" is unknown since the copTask can be a double read, so we ignore it now.
|
|
stats := property.DeriveLimitStats(childProfile, float64(newCount))
|
|
pushedDownLimit := physicalop.PhysicalLimit{PartitionBy: newPartitionBy, Count: newCount}.Init(p.SCtx(), stats, p.QueryBlockOffset())
|
|
pushedDownLimit.SetChildren(cop.TablePlan)
|
|
cop.TablePlan = pushedDownLimit
|
|
// Don't use clone() so that Limit and its children share the same schema. Otherwise, the virtual generated column may not be resolved right.
|
|
pushedDownLimit.SetSchema(pushedDownLimit.Children()[0].Schema())
|
|
t = cop.ConvertToRootTask(p.SCtx())
|
|
}
|
|
if len(cop.IdxMergePartPlans) == 0 {
|
|
// For double read which requires order being kept, the limit cannot be pushed down to the table side,
|
|
// because handles would be reordered before being sent to table scan.
|
|
if (!cop.KeepOrder || !cop.IndexPlanFinished || cop.IndexPlan == nil) && len(cop.RootTaskConds) == 0 {
|
|
// When limit is pushed down, we should remove its offset.
|
|
newCount := p.Offset + p.Count
|
|
childProfile := cop.Plan().StatsInfo()
|
|
// Strictly speaking, for the row count of stats, we should multiply newCount with "regionNum",
|
|
// but "regionNum" is unknown since the copTask can be a double read, so we ignore it now.
|
|
stats := property.DeriveLimitStats(childProfile, float64(newCount))
|
|
pushedDownLimit := physicalop.PhysicalLimit{PartitionBy: newPartitionBy, Count: newCount}.Init(p.SCtx(), stats, p.QueryBlockOffset())
|
|
cop = attachPlan2Task(pushedDownLimit, cop).(*physicalop.CopTask)
|
|
// Don't use clone() so that Limit and its children share the same schema. Otherwise the virtual generated column may not be resolved right.
|
|
pushedDownLimit.SetSchema(pushedDownLimit.Children()[0].Schema())
|
|
}
|
|
t = cop.ConvertToRootTask(p.SCtx())
|
|
sunk = sinkIntoIndexLookUp(p, t)
|
|
} else if !cop.IdxMergeIsIntersection {
|
|
// We only support push part of the order prop down to index merge build case.
|
|
if len(cop.RootTaskConds) == 0 {
|
|
// For double read which requires order being kept, the limit cannot be pushed down to the table side,
|
|
// because handles would be reordered before being sent to table scan.
|
|
if cop.IndexPlanFinished && !cop.KeepOrder {
|
|
// when the index plan is finished and index plan is not ordered, sink the limit to the index merge table side.
|
|
suspendLimitAboveTablePlan()
|
|
} else if !cop.IndexPlanFinished {
|
|
// cop.IndexPlanFinished = false indicates the table side is a pure table-scan, sink the limit to the index merge index side.
|
|
newCount := p.Offset + p.Count
|
|
limitChildren := make([]base.PhysicalPlan, 0, len(cop.IdxMergePartPlans))
|
|
for _, partialScan := range cop.IdxMergePartPlans {
|
|
childProfile := partialScan.StatsInfo()
|
|
stats := property.DeriveLimitStats(childProfile, float64(newCount))
|
|
pushedDownLimit := physicalop.PhysicalLimit{PartitionBy: newPartitionBy, Count: newCount}.Init(p.SCtx(), stats, p.QueryBlockOffset())
|
|
pushedDownLimit.SetChildren(partialScan)
|
|
pushedDownLimit.SetSchema(pushedDownLimit.Children()[0].Schema())
|
|
limitChildren = append(limitChildren, pushedDownLimit)
|
|
}
|
|
cop.IdxMergePartPlans = limitChildren
|
|
t = cop.ConvertToRootTask(p.SCtx())
|
|
sunk = sinkIntoIndexMerge(p, t)
|
|
} else {
|
|
// when there are some limitations, just sink the limit upon the index merge reader.
|
|
t = cop.ConvertToRootTask(p.SCtx())
|
|
sunk = sinkIntoIndexMerge(p, t)
|
|
}
|
|
} else {
|
|
// when there are some root conditions, just sink the limit upon the index merge reader.
|
|
t = cop.ConvertToRootTask(p.SCtx())
|
|
sunk = sinkIntoIndexMerge(p, t)
|
|
}
|
|
} else if cop.IdxMergeIsIntersection {
|
|
// In the index merge with intersection case, only the limit can be pushed down to the index merge table side.
|
|
// Note Difference:
|
|
// IndexMerge.PushedLimit is applied before table scan fetching, limiting the indexPartialPlan rows returned (it maybe ordered if orderBy items not empty)
|
|
// TableProbeSide sink limit is applied on the top of table plan, which will quickly shut down the both fetch-back and read-back process.
|
|
if len(cop.RootTaskConds) == 0 {
|
|
if cop.IndexPlanFinished {
|
|
// indicates the table side is not a pure table-scan, so we could only append the limit upon the table plan.
|
|
suspendLimitAboveTablePlan()
|
|
} else {
|
|
t = cop.ConvertToRootTask(p.SCtx())
|
|
sunk = sinkIntoIndexMerge(p, t)
|
|
}
|
|
} else {
|
|
// Otherwise, suspend the limit out of index merge reader.
|
|
t = cop.ConvertToRootTask(p.SCtx())
|
|
sunk = sinkIntoIndexMerge(p, t)
|
|
}
|
|
} else {
|
|
// Whatever the remained case is, we directly convert to it to root task.
|
|
t = cop.ConvertToRootTask(p.SCtx())
|
|
}
|
|
} else if mpp, ok := t.(*physicalop.MppTask); ok {
|
|
newCount := p.Offset + p.Count
|
|
childProfile := mpp.Plan().StatsInfo()
|
|
stats := property.DeriveLimitStats(childProfile, float64(newCount))
|
|
pushedDownLimit := physicalop.PhysicalLimit{Count: newCount, PartitionBy: newPartitionBy}.Init(p.SCtx(), stats, p.QueryBlockOffset())
|
|
mpp = attachPlan2Task(pushedDownLimit, mpp).(*physicalop.MppTask)
|
|
pushedDownLimit.SetSchema(pushedDownLimit.Children()[0].Schema())
|
|
t = mpp.ConvertToRootTask(p.SCtx())
|
|
}
|
|
if sunk {
|
|
return t
|
|
}
|
|
// Skip limit with partition on the root. This is a derived topN and window function
|
|
// will take care of the filter.
|
|
if len(p.GetPartitionBy()) > 0 {
|
|
return t
|
|
}
|
|
return attachPlan2Task(p, t)
|
|
}
|
|
|
|
func sinkIntoIndexLookUp(p *physicalop.PhysicalLimit, t base.Task) bool {
|
|
root := t.(*physicalop.RootTask)
|
|
reader, isDoubleRead := root.GetPlan().(*physicalop.PhysicalIndexLookUpReader)
|
|
proj, isProj := root.GetPlan().(*physicalop.PhysicalProjection)
|
|
if !isDoubleRead && !isProj {
|
|
return false
|
|
}
|
|
if isProj {
|
|
reader, isDoubleRead = proj.Children()[0].(*physicalop.PhysicalIndexLookUpReader)
|
|
if !isDoubleRead {
|
|
return false
|
|
}
|
|
}
|
|
|
|
// We can sink Limit into IndexLookUpReader only if tablePlan contains no Selection.
|
|
ts, isTableScan := reader.TablePlan.(*physicalop.PhysicalTableScan)
|
|
if !isTableScan {
|
|
return false
|
|
}
|
|
|
|
// If this happens, some Projection Operator must be inlined into this Limit. (issues/14428)
|
|
// For example, if the original plan is `IndexLookUp(col1, col2) -> Limit(col1, col2) -> Project(col1)`,
|
|
// then after inlining the Project, it will be `IndexLookUp(col1, col2) -> Limit(col1)` here.
|
|
// If the Limit is sunk into the IndexLookUp, the IndexLookUp's schema needs to be updated as well,
|
|
// So we add an extra projection to solve the problem.
|
|
if p.Schema().Len() != reader.Schema().Len() {
|
|
extraProj := physicalop.PhysicalProjection{
|
|
Exprs: expression.Column2Exprs(p.Schema().Columns),
|
|
}.Init(p.SCtx(), p.StatsInfo(), p.QueryBlockOffset(), nil)
|
|
extraProj.SetSchema(p.Schema())
|
|
// If the root.p is already a Projection. We left the optimization for the later Projection Elimination.
|
|
extraProj.SetChildren(root.GetPlan())
|
|
root.SetPlan(extraProj)
|
|
}
|
|
|
|
reader.PushedLimit = &physicalop.PushedDownLimit{
|
|
Offset: p.Offset,
|
|
Count: p.Count,
|
|
}
|
|
if originStats := ts.StatsInfo(); originStats.RowCount >= p.StatsInfo().RowCount {
|
|
// Only reset the table scan stats when its row estimation is larger than the limit count.
|
|
// When indexLookUp push down is enabled, some rows have been looked up in TiKV side,
|
|
// and the rows processed by the TiDB table scan may be less than the limit count.
|
|
ts.SetStats(p.StatsInfo())
|
|
if originStats != nil {
|
|
// keep the original stats version
|
|
ts.StatsInfo().StatsVersion = originStats.StatsVersion
|
|
}
|
|
}
|
|
reader.SetStats(p.StatsInfo())
|
|
if isProj {
|
|
proj.SetStats(p.StatsInfo())
|
|
}
|
|
return true
|
|
}
|
|
|
|
func sinkIntoIndexMerge(p *physicalop.PhysicalLimit, t base.Task) bool {
|
|
root := t.(*physicalop.RootTask)
|
|
imReader, isIm := root.GetPlan().(*physicalop.PhysicalIndexMergeReader)
|
|
proj, isProj := root.GetPlan().(*physicalop.PhysicalProjection)
|
|
if !isIm && !isProj {
|
|
return false
|
|
}
|
|
if isProj {
|
|
imReader, isIm = proj.Children()[0].(*physicalop.PhysicalIndexMergeReader)
|
|
if !isIm {
|
|
return false
|
|
}
|
|
}
|
|
ts, ok := imReader.TablePlan.(*physicalop.PhysicalTableScan)
|
|
if !ok {
|
|
return false
|
|
}
|
|
imReader.PushedLimit = &physicalop.PushedDownLimit{
|
|
Count: p.Count,
|
|
Offset: p.Offset,
|
|
}
|
|
// since ts.statsInfo.rowcount may dramatically smaller than limit.statsInfo.
|
|
// like limit: rowcount=1
|
|
// ts: rowcount=0.0025
|
|
originStats := ts.StatsInfo()
|
|
if originStats != nil {
|
|
// keep the original stats version
|
|
ts.StatsInfo().StatsVersion = originStats.StatsVersion
|
|
if originStats.RowCount < p.StatsInfo().RowCount {
|
|
ts.StatsInfo().RowCount = originStats.RowCount
|
|
}
|
|
}
|
|
needProj := p.Schema().Len() != root.GetPlan().Schema().Len()
|
|
if !needProj {
|
|
for i := range p.Schema().Len() {
|
|
if !p.Schema().Columns[i].EqualColumn(root.GetPlan().Schema().Columns[i]) {
|
|
needProj = true
|
|
break
|
|
}
|
|
}
|
|
}
|
|
if needProj {
|
|
extraProj := physicalop.PhysicalProjection{
|
|
Exprs: expression.Column2Exprs(p.Schema().Columns),
|
|
}.Init(p.SCtx(), p.StatsInfo(), p.QueryBlockOffset(), nil)
|
|
extraProj.SetSchema(p.Schema())
|
|
// If the root.p is already a Projection. We left the optimization for the later Projection Elimination.
|
|
extraProj.SetChildren(root.GetPlan())
|
|
root.SetPlan(extraProj)
|
|
}
|
|
return true
|
|
}
|
|
|
|
// attach2Task4PhysicalSort is basic logic of Attach2Task which implements PhysicalPlan interface.
|
|
func attach2Task4PhysicalSort(p base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
intest.Assert(p.(*physicalop.PhysicalSort) != nil)
|
|
t := tasks[0].Copy()
|
|
t = attachPlan2Task(p, t)
|
|
return t
|
|
}
|
|
|
|
// attach2Task4NominalSort implements PhysicalPlan interface.
|
|
func attach2Task4NominalSort(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
p := pp.(*physicalop.NominalSort)
|
|
if p.OnlyColumn {
|
|
return tasks[0]
|
|
}
|
|
t := tasks[0].Copy()
|
|
t = attachPlan2Task(p, t)
|
|
return t
|
|
}
|
|
|
|
func getPushedDownTopN(p *physicalop.PhysicalTopN, childPlan base.PhysicalPlan, storeTp kv.StoreType) (topN, newGlobalTopN *physicalop.PhysicalTopN) {
|
|
fixValue := fixcontrol.GetBoolWithDefault(p.SCtx().GetSessionVars().GetOptimizerFixControlMap(), fixcontrol.Fix56318, true)
|
|
// HeavyFunctionOptimize: if TopN's ByItems is a HeavyFunction (currently mainly for Vector Search), we will change
|
|
// the ByItems in order to reuse the function result.
|
|
byItemIndex := make([]int, 0)
|
|
for i, byItem := range p.ByItems {
|
|
if ContainHeavyFunction(byItem.Expr) {
|
|
byItemIndex = append(byItemIndex, i)
|
|
}
|
|
}
|
|
if fixValue && len(byItemIndex) > 0 {
|
|
x, err := p.Clone(p.SCtx())
|
|
if err != nil {
|
|
return nil, nil
|
|
}
|
|
newGlobalTopN = x.(*physicalop.PhysicalTopN)
|
|
// the projecton's construction cannot be create if the AllowProjectionPushDown is disable.
|
|
if storeTp == kv.TiKV && !p.SCtx().GetSessionVars().AllowProjectionPushDown {
|
|
newGlobalTopN = nil
|
|
}
|
|
}
|
|
newByItems := make([]*util.ByItems, 0, len(p.ByItems))
|
|
for _, expr := range p.ByItems {
|
|
newByItems = append(newByItems, expr.Clone())
|
|
}
|
|
newPartitionBy := make([]property.SortItem, 0, len(p.GetPartitionBy()))
|
|
for _, expr := range p.GetPartitionBy() {
|
|
newPartitionBy = append(newPartitionBy, expr.Clone())
|
|
}
|
|
newCount := p.Offset + p.Count
|
|
childProfile := childPlan.StatsInfo()
|
|
// Strictly speaking, for the row count of pushed down TopN, we should multiply newCount with "regionNum",
|
|
// but "regionNum" is unknown since the copTask can be a double read, so we ignore it now.
|
|
stats := property.DeriveLimitStats(childProfile, float64(newCount))
|
|
|
|
// Add a extra physicalProjection to save the distance column, a example like :
|
|
// select id from t order by vec_distance(vec, '[1,2,3]') limit x
|
|
// The Plan will be modified like:
|
|
//
|
|
// Original: DataSource(id, vec) -> TopN(by vec->dis) -> Projection(id)
|
|
// └─Byitem: vec_distance(vec, '[1,2,3]')
|
|
//
|
|
// New: DataSource(id, vec) -> Projection(id, vec->dis) -> TopN(by dis) -> Projection(id)
|
|
// └─Byitem: dis
|
|
//
|
|
// Note that for plan now, TopN has its own schema and does not use the schema of children.
|
|
if newGlobalTopN != nil {
|
|
// create a new PhysicalProjection to calculate the distance columns, and add it into plan route
|
|
bottomProjSchemaCols := make([]*expression.Column, 0, len(childPlan.Schema().Columns))
|
|
bottomProjExprs := make([]expression.Expression, 0, len(childPlan.Schema().Columns))
|
|
for _, col := range newGlobalTopN.Schema().Columns {
|
|
newCol := col.Clone().(*expression.Column)
|
|
bottomProjSchemaCols = append(bottomProjSchemaCols, newCol)
|
|
bottomProjExprs = append(bottomProjExprs, newCol)
|
|
}
|
|
type DistanceColItem struct {
|
|
Index int
|
|
DistanceCol *expression.Column
|
|
}
|
|
distanceCols := make([]DistanceColItem, 0)
|
|
for _, idx := range byItemIndex {
|
|
bottomProjExprs = append(bottomProjExprs, newGlobalTopN.ByItems[idx].Expr)
|
|
distanceCol := &expression.Column{
|
|
UniqueID: newGlobalTopN.SCtx().GetSessionVars().AllocPlanColumnID(),
|
|
RetType: newGlobalTopN.ByItems[idx].Expr.GetType(p.SCtx().GetExprCtx().GetEvalCtx()),
|
|
}
|
|
distanceCols = append(distanceCols, DistanceColItem{
|
|
Index: idx,
|
|
DistanceCol: distanceCol,
|
|
})
|
|
}
|
|
for _, dis := range distanceCols {
|
|
bottomProjSchemaCols = append(bottomProjSchemaCols, dis.DistanceCol)
|
|
}
|
|
|
|
bottomProj := physicalop.PhysicalProjection{
|
|
Exprs: bottomProjExprs,
|
|
}.Init(p.SCtx(), stats, p.QueryBlockOffset(), p.GetChildReqProps(0))
|
|
bottomProj.SetSchema(expression.NewSchema(bottomProjSchemaCols...))
|
|
bottomProj.SetChildren(childPlan)
|
|
|
|
topN := physicalop.PhysicalTopN{
|
|
ByItems: newByItems,
|
|
PartitionBy: newPartitionBy,
|
|
Count: newCount,
|
|
}.Init(p.SCtx(), stats, p.QueryBlockOffset(), p.GetChildReqProps(0))
|
|
// mppTask's topN
|
|
for _, item := range distanceCols {
|
|
topN.ByItems[item.Index].Expr = item.DistanceCol
|
|
}
|
|
|
|
// rootTask's topn, need reuse the distance col
|
|
for _, expr := range distanceCols {
|
|
newGlobalTopN.ByItems[expr.Index].Expr = expr.DistanceCol
|
|
}
|
|
topN.SetChildren(bottomProj)
|
|
|
|
// orderByCol is the column `distanceCol`, so this explain always success.
|
|
orderByCol, _ := topN.ByItems[0].Expr.(*expression.Column)
|
|
orderByCol.Index = len(bottomProj.Exprs) - 1
|
|
|
|
// try to Check and modify plan when it is possible to not scanning vector column at all.
|
|
tryReturnDistanceFromIndex(topN, newGlobalTopN, childPlan, bottomProj)
|
|
|
|
return topN, newGlobalTopN
|
|
}
|
|
|
|
topN = physicalop.PhysicalTopN{
|
|
ByItems: newByItems,
|
|
PartitionBy: newPartitionBy,
|
|
Count: newCount,
|
|
}.Init(p.SCtx(), stats, p.QueryBlockOffset(), p.GetChildReqProps(0))
|
|
topN.SetChildren(childPlan)
|
|
return topN, newGlobalTopN
|
|
}
|
|
|
|
// tryReturnDistanceFromIndex checks whether the vector in the plan can be removed and a distance column will be added.
|
|
// Consider this situation sql statement: select id from t order by vec_distance(vec, '[1,2,3]') limit x
|
|
// The plan like:
|
|
//
|
|
// DataSource(id, vec) -> Projection1(id, vec->dis) -> TopN(by dis) -> Projection2(id)
|
|
// └─Schema: id, vec
|
|
//
|
|
// In vector index, the distance result already exists, so there is no need to calculate it again in projection1.
|
|
// We can directly read the distance result. After this Optimization, the plan will be modified to:
|
|
//
|
|
// DataSource(id, dis) -> TopN(by dis) -> Projection2(id)
|
|
// └─Schema: id, dis
|
|
func tryReturnDistanceFromIndex(local, global *physicalop.PhysicalTopN, childPlan base.PhysicalPlan, proj *physicalop.PhysicalProjection) bool {
|
|
tableScan, ok := childPlan.(*physicalop.PhysicalTableScan)
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
orderByCol, _ := local.ByItems[0].Expr.(*expression.Column)
|
|
var annQueryInfo *physicalop.ColumnarIndexExtra
|
|
for _, idx := range tableScan.UsedColumnarIndexes {
|
|
if idx != nil && idx.QueryInfo.IndexType == tipb.ColumnarIndexType_TypeVector && idx.QueryInfo != nil {
|
|
annQueryInfo = idx
|
|
break
|
|
}
|
|
}
|
|
if annQueryInfo == nil {
|
|
return false
|
|
}
|
|
|
|
// If the vector column is only used in the VectorSearch and no where
|
|
// else, then it can be eliminated in TableScan.
|
|
if orderByCol.Index < 0 || orderByCol.Index >= len(proj.Exprs) {
|
|
return false
|
|
}
|
|
|
|
isVecColumnInUse := false
|
|
for idx, projExpr := range proj.Exprs {
|
|
if idx == orderByCol.Index {
|
|
// Skip the distance function projection itself.
|
|
continue
|
|
}
|
|
flag := expression.HasColumnWithCondition(projExpr, func(col *expression.Column) bool {
|
|
return col.ID == annQueryInfo.QueryInfo.GetAnnQueryInfo().GetColumn().ColumnId
|
|
})
|
|
if flag {
|
|
isVecColumnInUse = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if isVecColumnInUse {
|
|
return false
|
|
}
|
|
|
|
// append distance column to the table scan
|
|
virtualDistanceColInfo := &model.ColumnInfo{
|
|
ID: model.VirtualColVecSearchDistanceID,
|
|
FieldType: *types.NewFieldType(mysql.TypeFloat),
|
|
Offset: len(tableScan.Columns) - 1,
|
|
}
|
|
|
|
virtualDistanceCol := &expression.Column{
|
|
UniqueID: tableScan.SCtx().GetSessionVars().AllocPlanColumnID(),
|
|
RetType: types.NewFieldType(mysql.TypeFloat),
|
|
}
|
|
|
|
// remove the vector column in order to read distance directly by virtualDistanceCol
|
|
vectorIdx := -1
|
|
for i, col := range tableScan.Columns {
|
|
if col.ID == annQueryInfo.QueryInfo.GetAnnQueryInfo().GetColumn().ColumnId {
|
|
vectorIdx = i
|
|
break
|
|
}
|
|
}
|
|
if vectorIdx == -1 {
|
|
return false
|
|
}
|
|
|
|
// set the EnableDistanceProj to modify the read process of tiflash.
|
|
annQueryInfo.QueryInfo.GetAnnQueryInfo().EnableDistanceProj = true
|
|
|
|
// append the distance column to the last position in columns and schema.
|
|
tableScan.Columns = slices.Delete(tableScan.Columns, vectorIdx, vectorIdx+1)
|
|
tableScan.Columns = append(tableScan.Columns, virtualDistanceColInfo)
|
|
|
|
tableScan.Schema().Columns = slices.Delete(tableScan.Schema().Columns, vectorIdx, vectorIdx+1)
|
|
tableScan.Schema().Append(virtualDistanceCol)
|
|
|
|
// The children of topN are currently projections. After optimization, we no longer
|
|
// need the projection and directly set the children to tablescan.
|
|
local.SetChildren(tableScan)
|
|
|
|
// modify the topN's ByItem
|
|
local.ByItems[0].Expr = virtualDistanceCol
|
|
global.ByItems[0].Expr = virtualDistanceCol
|
|
local.ByItems[0].Expr.(*expression.Column).Index = tableScan.Schema().Len() - 1
|
|
|
|
return true
|
|
}
|
|
|
|
// ContainHeavyFunction check if the expr contains a function that need to do HeavyFunctionOptimize. Currently this only applies
|
|
// to Vector data types and their functions. The HeavyFunctionOptimize eliminate the usage of the function in TopN operators
|
|
// to avoid vector distance re-calculation of TopN in the root task.
|
|
func ContainHeavyFunction(expr expression.Expression) bool {
|
|
sf, ok := expr.(*expression.ScalarFunction)
|
|
if !ok {
|
|
return false
|
|
}
|
|
if _, ok := HeavyFunctionNameMap[sf.FuncName.L]; ok {
|
|
return true
|
|
}
|
|
return slices.ContainsFunc(sf.GetArgs(), ContainHeavyFunction)
|
|
}
|
|
|
|
// canPushToIndexPlan checks if this TopN can be pushed to the index side of copTask.
|
|
// It can be pushed to the index side when all columns used by ByItems are available from the index side and there's no prefix index column.
|
|
func canPushToIndexPlan(indexPlan base.PhysicalPlan, byItemCols []*expression.Column) bool {
|
|
// If we call canPushToIndexPlan and there's no index plan, we should go into the index merge case.
|
|
// Index merge case is specially handled for now. So we directly return false here.
|
|
// So we directly return false.
|
|
if indexPlan == nil {
|
|
return false
|
|
}
|
|
schema := indexPlan.Schema()
|
|
for _, col := range byItemCols {
|
|
pos := schema.ColumnIndex(col)
|
|
if pos == -1 {
|
|
return false
|
|
}
|
|
if schema.Columns[pos].IsPrefix {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// canExpressionConvertedToPB checks whether each of the the expression in TopN can be converted to pb.
|
|
func canExpressionConvertedToPB(p *physicalop.PhysicalTopN, storeTp kv.StoreType) bool {
|
|
exprs := make([]expression.Expression, 0, len(p.ByItems))
|
|
for _, item := range p.ByItems {
|
|
exprs = append(exprs, item.Expr)
|
|
}
|
|
return expression.CanExprsPushDown(util.GetPushDownCtx(p.SCtx()), exprs, storeTp)
|
|
}
|
|
|
|
// containVirtualColumn checks whether TopN.ByItems contains virtual generated columns.
|
|
func containVirtualColumn(p *physicalop.PhysicalTopN, tCols []*expression.Column) bool {
|
|
tColSet := make(map[int64]struct{}, len(tCols))
|
|
for _, tCol := range tCols {
|
|
if tCol.ID > 0 && tCol.VirtualExpr != nil {
|
|
tColSet[tCol.ID] = struct{}{}
|
|
}
|
|
}
|
|
for _, by := range p.ByItems {
|
|
cols := expression.ExtractColumns(by.Expr)
|
|
for _, col := range cols {
|
|
if _, ok := tColSet[col.ID]; ok {
|
|
// A column with ID > 0 indicates that the column can be resolved by data source.
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// canPushDownToTiKV checks whether this topN can be pushed down to TiKV.
|
|
func canPushDownToTiKV(p *physicalop.PhysicalTopN, copTask *physicalop.CopTask) bool {
|
|
if !canExpressionConvertedToPB(p, kv.TiKV) {
|
|
return false
|
|
}
|
|
if len(copTask.RootTaskConds) != 0 {
|
|
return false
|
|
}
|
|
if !copTask.IndexPlanFinished && len(copTask.IdxMergePartPlans) > 0 {
|
|
for _, partialPlan := range copTask.IdxMergePartPlans {
|
|
if containVirtualColumn(p, partialPlan.Schema().Columns) {
|
|
return false
|
|
}
|
|
}
|
|
} else if containVirtualColumn(p, copTask.Plan().Schema().Columns) {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// canPushDownToTiFlash checks whether this topN can be pushed down to TiFlash.
|
|
func canPushDownToTiFlash(p *physicalop.PhysicalTopN, mppTask *physicalop.MppTask) bool {
|
|
if !canExpressionConvertedToPB(p, kv.TiFlash) {
|
|
return false
|
|
}
|
|
if containVirtualColumn(p, mppTask.Plan().Schema().Columns) {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// For https://github.com/pingcap/tidb/issues/51723,
|
|
// This function only supports `CLUSTER_SLOW_QUERY`,
|
|
// it will change plan from
|
|
// TopN -> TableReader -> TableFullScan[cop] to
|
|
// TopN -> TableReader -> Limit[cop] -> TableFullScan[cop] + keepOrder
|
|
func pushLimitDownToTiDBCop(p *physicalop.PhysicalTopN, copTsk *physicalop.CopTask) (base.Task, bool) {
|
|
if copTsk.IndexPlan != nil || copTsk.TablePlan == nil {
|
|
return nil, false
|
|
}
|
|
|
|
var (
|
|
selOnTblScan *physicalop.PhysicalSelection
|
|
selSelectivity float64
|
|
tblScan *physicalop.PhysicalTableScan
|
|
err error
|
|
ok bool
|
|
)
|
|
|
|
copTsk.TablePlan, err = copTsk.TablePlan.Clone(p.SCtx())
|
|
if err != nil {
|
|
return nil, false
|
|
}
|
|
finalTblScanPlan := copTsk.TablePlan
|
|
for len(finalTblScanPlan.Children()) > 0 {
|
|
selOnTblScan, _ = finalTblScanPlan.(*physicalop.PhysicalSelection)
|
|
finalTblScanPlan = finalTblScanPlan.Children()[0]
|
|
}
|
|
|
|
if tblScan, ok = finalTblScanPlan.(*physicalop.PhysicalTableScan); !ok {
|
|
return nil, false
|
|
}
|
|
|
|
// Check the table is `CLUSTER_SLOW_QUERY` or not.
|
|
if tblScan.Table.Name.O != infoschema.ClusterTableSlowLog {
|
|
return nil, false
|
|
}
|
|
|
|
colsProp, ok := physicalop.GetPropByOrderByItems(p.ByItems)
|
|
if !ok {
|
|
return nil, false
|
|
}
|
|
if len(colsProp.SortItems) != 1 || !colsProp.SortItems[0].Col.Equal(p.SCtx().GetExprCtx().GetEvalCtx(), tblScan.HandleCols.GetCol(0)) {
|
|
return nil, false
|
|
}
|
|
if selOnTblScan != nil && tblScan.StatsInfo().RowCount > 0 {
|
|
selSelectivity = selOnTblScan.StatsInfo().RowCount / tblScan.StatsInfo().RowCount
|
|
}
|
|
tblScan.Desc = colsProp.SortItems[0].Desc
|
|
tblScan.KeepOrder = true
|
|
|
|
childProfile := copTsk.Plan().StatsInfo()
|
|
newCount := p.Offset + p.Count
|
|
stats := property.DeriveLimitStats(childProfile, float64(newCount))
|
|
pushedLimit := physicalop.PhysicalLimit{
|
|
Count: newCount,
|
|
}.Init(p.SCtx(), stats, p.QueryBlockOffset())
|
|
pushedLimit.SetSchema(copTsk.TablePlan.Schema())
|
|
copTsk = attachPlan2Task(pushedLimit, copTsk).(*physicalop.CopTask)
|
|
child := pushedLimit.Children()[0]
|
|
child.SetStats(child.StatsInfo().ScaleByExpectCnt(p.SCtx().GetSessionVars(), float64(newCount)))
|
|
if selSelectivity > 0 && selSelectivity < 1 {
|
|
scaledRowCount := child.StatsInfo().RowCount / selSelectivity
|
|
tblScan.SetStats(tblScan.StatsInfo().ScaleByExpectCnt(p.SCtx().GetSessionVars(), scaledRowCount))
|
|
}
|
|
rootTask := copTsk.ConvertToRootTask(p.SCtx())
|
|
return attachPlan2Task(p, rootTask), true
|
|
}
|
|
|
|
// Attach2Task implements the PhysicalPlan interface.
|
|
func attach2Task4PhysicalTopN(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
p := pp.(*physicalop.PhysicalTopN)
|
|
t := tasks[0].Copy()
|
|
cols := make([]*expression.Column, 0, len(p.ByItems))
|
|
for _, item := range p.ByItems {
|
|
cols = append(cols, expression.ExtractColumns(item.Expr)...)
|
|
}
|
|
needPushDown := len(cols) > 0
|
|
if copTask, ok := t.(*physicalop.CopTask); ok && needPushDown && copTask.GetStoreType() == kv.TiDB && len(copTask.RootTaskConds) == 0 {
|
|
newTask, changed := pushLimitDownToTiDBCop(p, copTask)
|
|
if changed {
|
|
return newTask
|
|
}
|
|
}
|
|
if copTask, ok := t.(*physicalop.CopTask); ok && needPushDown && canPushDownToTiKV(p, copTask) && len(copTask.RootTaskConds) == 0 {
|
|
// If all columns in topN are from index plan, we push it to index plan, otherwise we finish the index plan and
|
|
// push it to table plan.
|
|
var pushedDownTopN *physicalop.PhysicalTopN
|
|
var newGlobalTopN *physicalop.PhysicalTopN
|
|
if !copTask.IndexPlanFinished && canPushToIndexPlan(copTask.IndexPlan, cols) {
|
|
pushedDownTopN, newGlobalTopN = getPushedDownTopN(p, copTask.IndexPlan, copTask.GetStoreType())
|
|
copTask.IndexPlan = pushedDownTopN
|
|
if newGlobalTopN != nil {
|
|
rootTask := t.ConvertToRootTask(newGlobalTopN.SCtx())
|
|
// Skip TopN with partition on the root. This is a derived topN and window function
|
|
// will take care of the filter.
|
|
if len(p.GetPartitionBy()) > 0 {
|
|
return t
|
|
}
|
|
return attachPlan2Task(newGlobalTopN, rootTask)
|
|
}
|
|
} else {
|
|
// It works for both normal index scan and index merge scan.
|
|
copTask.FinishIndexPlan()
|
|
pushedDownTopN, newGlobalTopN = getPushedDownTopN(p, copTask.TablePlan, copTask.GetStoreType())
|
|
copTask.TablePlan = pushedDownTopN
|
|
if newGlobalTopN != nil {
|
|
rootTask := t.ConvertToRootTask(newGlobalTopN.SCtx())
|
|
// Skip TopN with partition on the root. This is a derived topN and window function
|
|
// will take care of the filter.
|
|
if len(p.GetPartitionBy()) > 0 {
|
|
return t
|
|
}
|
|
return attachPlan2Task(newGlobalTopN, rootTask)
|
|
}
|
|
}
|
|
} else if mppTask, ok := t.(*physicalop.MppTask); ok && needPushDown && canPushDownToTiFlash(p, mppTask) {
|
|
pushedDownTopN, newGlobalTopN := getPushedDownTopN(p, mppTask.Plan(), kv.TiFlash)
|
|
mppTask.SetPlan(pushedDownTopN)
|
|
if newGlobalTopN != nil {
|
|
rootTask := t.ConvertToRootTask(newGlobalTopN.SCtx())
|
|
// Skip TopN with partition on the root. This is a derived topN and window function
|
|
// will take care of the filter.
|
|
if len(p.GetPartitionBy()) > 0 {
|
|
return t
|
|
}
|
|
return attachPlan2Task(newGlobalTopN, rootTask)
|
|
}
|
|
}
|
|
rootTask := t.ConvertToRootTask(p.SCtx())
|
|
// Skip TopN with partition on the root. This is a derived topN and window function
|
|
// will take care of the filter.
|
|
if len(p.GetPartitionBy()) > 0 {
|
|
return t
|
|
}
|
|
return attachPlan2Task(p, rootTask)
|
|
}
|
|
|
|
// attach2Task4PhysicalProjection implements PhysicalPlan interface.
|
|
func attach2Task4PhysicalProjection(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
p := pp.(*physicalop.PhysicalProjection)
|
|
t := tasks[0].Copy()
|
|
if cop, ok := t.(*physicalop.CopTask); ok {
|
|
if (len(cop.RootTaskConds) == 0 && len(cop.IdxMergePartPlans) == 0) && expression.CanExprsPushDown(util.GetPushDownCtx(p.SCtx()), p.Exprs, cop.GetStoreType()) {
|
|
copTask := attachPlan2Task(p, cop)
|
|
return copTask
|
|
}
|
|
} else if mpp, ok := t.(*physicalop.MppTask); ok {
|
|
if expression.CanExprsPushDown(util.GetPushDownCtx(p.SCtx()), p.Exprs, kv.TiFlash) {
|
|
p.SetChildren(mpp.Plan())
|
|
mpp.SetPlan(p)
|
|
return mpp
|
|
}
|
|
}
|
|
t = t.ConvertToRootTask(p.SCtx())
|
|
t = attachPlan2Task(p, t)
|
|
return t
|
|
}
|
|
|
|
// attach2Task4PhysicalExpand implements PhysicalPlan interface.
|
|
func attach2Task4PhysicalExpand(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
p := pp.(*physicalop.PhysicalExpand)
|
|
t := tasks[0].Copy()
|
|
// current expand can only be run in MPP TiFlash mode or Root Tidb mode.
|
|
// if expr inside could not be pushed down to tiFlash, it will error in converting to pb side.
|
|
if mpp, ok := t.(*physicalop.MppTask); ok {
|
|
p.SetChildren(mpp.Plan())
|
|
mpp.SetPlan(p)
|
|
return mpp
|
|
}
|
|
// For root task
|
|
// since expand should be in root side accordingly, convert to root task now.
|
|
root := t.ConvertToRootTask(p.SCtx())
|
|
t = attachPlan2Task(p, root)
|
|
return t
|
|
}
|
|
|
|
func attach2MppTasks4PhysicalUnionAll(p *physicalop.PhysicalUnionAll, tasks ...base.Task) base.Task {
|
|
t := physicalop.NewMppTask(p, property.AnyType, nil, nil, nil)
|
|
childPlans := make([]base.PhysicalPlan, 0, len(tasks))
|
|
for _, tk := range tasks {
|
|
if mpp, ok := tk.(*physicalop.MppTask); ok && !tk.Invalid() {
|
|
childPlans = append(childPlans, mpp.Plan())
|
|
continue
|
|
}
|
|
return base.InvalidTask
|
|
}
|
|
if len(childPlans) == 0 {
|
|
return base.InvalidTask
|
|
}
|
|
p.SetChildren(childPlans...)
|
|
return t
|
|
}
|
|
|
|
// attach2Task4PhysicalUnionAll implements PhysicalPlan interface logic.
|
|
func attach2Task4PhysicalUnionAll(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
p := pp.(*physicalop.PhysicalUnionAll)
|
|
for _, t := range tasks {
|
|
if _, ok := t.(*physicalop.MppTask); ok {
|
|
if p.TP() == plancodec.TypePartitionUnion {
|
|
// In attach2MppTasks(), will attach PhysicalUnion to mppTask directly.
|
|
// But PartitionUnion cannot pushdown to tiflash, so here disable PartitionUnion pushdown to tiflash explicitly.
|
|
// For now, return base.InvalidTask immediately, we can refine this by letting childTask of PartitionUnion convert to rootTask.
|
|
return base.InvalidTask
|
|
}
|
|
return attach2MppTasks4PhysicalUnionAll(p, tasks...)
|
|
}
|
|
}
|
|
t := &physicalop.RootTask{}
|
|
t.SetPlan(p)
|
|
childPlans := make([]base.PhysicalPlan, 0, len(tasks))
|
|
for _, task := range tasks {
|
|
task = task.ConvertToRootTask(p.SCtx())
|
|
childPlans = append(childPlans, task.Plan())
|
|
}
|
|
p.SetChildren(childPlans...)
|
|
return t
|
|
}
|
|
|
|
// attach2Task4PhysicalSelection implements PhysicalPlan interface.
|
|
func attach2Task4PhysicalSelection(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
sel := pp.(*physicalop.PhysicalSelection)
|
|
if mppTask, _ := tasks[0].(*physicalop.MppTask); mppTask != nil { // always push to mpp task.
|
|
if expression.CanExprsPushDown(util.GetPushDownCtx(sel.SCtx()), sel.Conditions, kv.TiFlash) {
|
|
return attachPlan2Task(sel, mppTask.Copy())
|
|
}
|
|
}
|
|
t := tasks[0].ConvertToRootTask(sel.SCtx())
|
|
return attachPlan2Task(sel, t)
|
|
}
|
|
|
|
func inheritStatsFromBottomElemForIndexJoinInner(p base.PhysicalPlan, indexJoinInfo *physicalop.IndexJoinInfo, stats *property.StatsInfo) {
|
|
var isIndexJoin bool
|
|
switch p.(type) {
|
|
case *physicalop.PhysicalIndexJoin, *physicalop.PhysicalIndexHashJoin, *physicalop.PhysicalIndexMergeJoin:
|
|
isIndexJoin = true
|
|
default:
|
|
}
|
|
// indexJoinInfo != nil means the child Task comes from an index join inner side.
|
|
// !isIndexJoin means the childTask only be passed through to indexJoin as an END.
|
|
if !isIndexJoin && indexJoinInfo != nil {
|
|
switch p.(type) {
|
|
case *physicalop.PhysicalSelection:
|
|
// todo: for simplicity, we can just inherit it from child.
|
|
// scale(1) means a cloned stats information same as the input stats.
|
|
p.SetStats(stats.Scale(p.SCtx().GetSessionVars(), 1))
|
|
case *physicalop.PhysicalProjection:
|
|
// mainly about the rowEst, proj doesn't change that.
|
|
p.SetStats(stats.Scale(p.SCtx().GetSessionVars(), 1))
|
|
case *physicalop.PhysicalHashAgg, *physicalop.PhysicalStreamAgg:
|
|
// todo: for simplicity, we can just inherit it from child.
|
|
p.SetStats(stats.Scale(p.SCtx().GetSessionVars(), 1))
|
|
case *physicalop.PhysicalUnionScan:
|
|
// todo: for simplicity, we can just inherit it from child.
|
|
p.SetStats(stats.Scale(p.SCtx().GetSessionVars(), 1))
|
|
default:
|
|
p.SetStats(stats.Scale(p.SCtx().GetSessionVars(), 1))
|
|
}
|
|
}
|
|
}
|
|
|
|
func inheritStatsFromBottomTaskForIndexJoinInner(p base.PhysicalPlan, t base.Task) {
|
|
var indexJoinInfo *physicalop.IndexJoinInfo
|
|
switch v := t.(type) {
|
|
case *physicalop.CopTask:
|
|
indexJoinInfo = v.IndexJoinInfo
|
|
case *physicalop.RootTask:
|
|
indexJoinInfo = v.IndexJoinInfo
|
|
default:
|
|
// index join's inner side couldn't be a mppTask, leave it.
|
|
}
|
|
inheritStatsFromBottomElemForIndexJoinInner(p, indexJoinInfo, t.Plan().StatsInfo())
|
|
}
|
|
|
|
// attach2Task4PhysicalStreamAgg implements PhysicalPlan interface.
|
|
func attach2Task4PhysicalStreamAgg(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
p := pp.(*physicalop.PhysicalStreamAgg)
|
|
t := tasks[0].Copy()
|
|
if cop, ok := t.(*physicalop.CopTask); ok {
|
|
// We should not push agg down across
|
|
// 1. double read, since the data of second read is ordered by handle instead of index. The `extraHandleCol` is added
|
|
// if the double read needs to keep order. So we just use it to decided
|
|
// whether the following plan is double read with order reserved.
|
|
// 2. the case that there's filters should be calculated on TiDB side.
|
|
// 3. the case of index merge
|
|
if (cop.IndexPlan != nil && cop.TablePlan != nil && cop.KeepOrder) || len(cop.RootTaskConds) > 0 || len(cop.IdxMergePartPlans) > 0 {
|
|
t = cop.ConvertToRootTask(p.SCtx())
|
|
attachPlan2Task(p, t)
|
|
} else {
|
|
storeType := cop.GetStoreType()
|
|
// TiFlash doesn't support Stream Aggregation
|
|
if storeType == kv.TiFlash && len(p.GroupByItems) > 0 {
|
|
return base.InvalidTask
|
|
}
|
|
partialAgg, finalAgg := p.NewPartialAggregate(storeType, false)
|
|
if partialAgg != nil {
|
|
if cop.TablePlan != nil {
|
|
cop.FinishIndexPlan()
|
|
// the partialAgg attachment didn't follow the attachPlan2Task function, so here we actively call
|
|
// inheritStatsFromBottomForIndexJoinInner(p, t) to inherit stats from the bottom plan for index
|
|
// join inner side. note: partialAgg will share stats with finalAgg.
|
|
inheritStatsFromBottomElemForIndexJoinInner(partialAgg, cop.IndexJoinInfo, cop.TablePlan.StatsInfo())
|
|
partialAgg.SetChildren(cop.TablePlan)
|
|
cop.TablePlan = partialAgg
|
|
// If needExtraProj is true, a projection will be created above the PhysicalIndexLookUpReader to make sure
|
|
// the schema is the same as the original DataSource schema.
|
|
// However, we pushed down the agg here, the partial agg was placed on the top of tablePlan, and the final
|
|
// agg will be placed above the PhysicalIndexLookUpReader, and the schema will be set correctly for them.
|
|
// If we add the projection again, the projection will be between the PhysicalIndexLookUpReader and
|
|
// the partial agg, and the schema will be broken.
|
|
cop.NeedExtraProj = false
|
|
} else {
|
|
// the partialAgg attachment didn't follow the attachPlan2Task function, so here we actively call
|
|
// inheritStatsFromBottomForIndexJoinInner(p, t) to inherit stats from the bottom plan for index
|
|
// join inner side. note: partialAgg will share stats with finalAgg.
|
|
inheritStatsFromBottomElemForIndexJoinInner(partialAgg, cop.IndexJoinInfo, cop.IndexPlan.StatsInfo())
|
|
partialAgg.SetChildren(cop.IndexPlan)
|
|
cop.IndexPlan = partialAgg
|
|
}
|
|
}
|
|
// COP Task -> Root Task, warnings inherited inside.
|
|
t = cop.ConvertToRootTask(p.SCtx())
|
|
attachPlan2Task(finalAgg, t)
|
|
}
|
|
} else if mpp, ok := t.(*physicalop.MppTask); ok {
|
|
t = mpp.ConvertToRootTask(p.SCtx())
|
|
attachPlan2Task(p, t)
|
|
} else {
|
|
attachPlan2Task(p, t)
|
|
}
|
|
return t
|
|
}
|
|
|
|
func attach2TaskForMpp1Phase(p *physicalop.PhysicalHashAgg, mpp *physicalop.MppTask) base.Task {
|
|
// 1-phase agg: when the partition columns can be satisfied, where the plan does not need to enforce Exchange
|
|
// only push down the original agg
|
|
proj := p.ConvertAvgForMPP()
|
|
attachPlan2Task(p.Self, mpp)
|
|
if proj != nil {
|
|
attachPlan2Task(proj, mpp)
|
|
}
|
|
return mpp
|
|
}
|
|
|
|
// scaleStats4GroupingSets scale the derived stats because the lower source has been expanded.
|
|
//
|
|
// parent OP <- logicalAgg <- children OP (derived stats)
|
|
// |
|
|
// v
|
|
// parent OP <- physicalAgg <- children OP (stats used)
|
|
// |
|
|
// +----------+----------+----------+
|
|
// Final Mid Partial Expand
|
|
//
|
|
// physical agg stats is reasonable from the whole, because expand operator is designed to facilitate
|
|
// the Mid and Partial Agg, which means when leaving the Final, its output rowcount could be exactly
|
|
// the same as what it derived(estimated) before entering physical optimization phase.
|
|
//
|
|
// From the cost model correctness, for these inserted sub-agg and even expand operator, we should
|
|
// recompute the stats for them particularly.
|
|
//
|
|
// for example: grouping sets {<a>},{<b>}, group by items {a,b,c,groupingID}
|
|
// after expand:
|
|
//
|
|
// a, b, c, groupingID
|
|
// ... null c 1 ---+
|
|
// ... null c 1 +------- replica group 1
|
|
// ... null c 1 ---+
|
|
// null ... c 2 ---+
|
|
// null ... c 2 +------- replica group 2
|
|
// null ... c 2 ---+
|
|
//
|
|
// since null value is seen the same when grouping data (groupingID in one replica is always the same):
|
|
// - so the num of group in replica 1 is equal to NDV(a,c)
|
|
// - so the num of group in replica 2 is equal to NDV(b,c)
|
|
//
|
|
// in a summary, the total num of group of all replica is equal to = Σ:NDV(each-grouping-set-cols, normal-group-cols)
|
|
func scaleStats4GroupingSets(p *physicalop.PhysicalHashAgg, groupingSets expression.GroupingSets, groupingIDCol *expression.Column,
|
|
childSchema *expression.Schema, childStats *property.StatsInfo) {
|
|
idSets := groupingSets.AllSetsColIDs()
|
|
normalGbyCols := make([]*expression.Column, 0, len(p.GroupByItems))
|
|
for _, gbyExpr := range p.GroupByItems {
|
|
cols := expression.ExtractColumns(gbyExpr)
|
|
for _, col := range cols {
|
|
if !idSets.Has(int(col.UniqueID)) && col.UniqueID != groupingIDCol.UniqueID {
|
|
normalGbyCols = append(normalGbyCols, col)
|
|
}
|
|
}
|
|
}
|
|
sumNDV := float64(0)
|
|
groupingSetCols := make([]*expression.Column, 0, 4)
|
|
for _, groupingSet := range groupingSets {
|
|
// for every grouping set, pick its cols out, and combine with normal group cols to get the ndv.
|
|
groupingSetCols = groupingSet.ExtractCols(groupingSetCols)
|
|
groupingSetCols = append(groupingSetCols, normalGbyCols...)
|
|
ndv, _ := cardinality.EstimateColsNDVWithMatchedLen(p.SCtx(), groupingSetCols, childSchema, childStats)
|
|
groupingSetCols = groupingSetCols[:0]
|
|
sumNDV += ndv
|
|
}
|
|
// After group operator, all same rows are grouped into one row, that means all
|
|
// change the sub-agg's stats
|
|
if p.StatsInfo() != nil {
|
|
// equivalence to a new cloned one. (cause finalAgg and partialAgg may share a same copy of stats)
|
|
cpStats := p.StatsInfo().Scale(p.SCtx().GetSessionVars(), 1)
|
|
cpStats.RowCount = sumNDV
|
|
// We cannot estimate the ColNDVs for every output, so we use a conservative strategy.
|
|
for k := range cpStats.ColNDVs {
|
|
cpStats.ColNDVs[k] = sumNDV
|
|
}
|
|
// for old groupNDV, if it's containing one more grouping set cols, just plus the NDV where the col is excluded.
|
|
// for example: old grouping NDV(b,c), where b is in grouping sets {<a>},{<b>}. so when countering the new NDV:
|
|
// cases:
|
|
// new grouping NDV(b,c) := old NDV(b,c) + NDV(null, c) = old NDV(b,c) + DNV(c).
|
|
// new grouping NDV(a,b,c) := old NDV(a,b,c) + NDV(null,b,c) + NDV(a,null,c) = old NDV(a,b,c) + NDV(b,c) + NDV(a,c)
|
|
allGroupingSetsIDs := groupingSets.AllSetsColIDs()
|
|
for _, oneGNDV := range cpStats.GroupNDVs {
|
|
newGNDV := oneGNDV.NDV
|
|
intersectionIDs := make([]int64, 0, len(oneGNDV.Cols))
|
|
for i, id := range oneGNDV.Cols {
|
|
if allGroupingSetsIDs.Has(int(id)) {
|
|
// when meet an id in grouping sets, skip it (cause its null) and append the rest ids to count the incrementNDV.
|
|
beforeLen := len(intersectionIDs)
|
|
intersectionIDs = append(intersectionIDs, oneGNDV.Cols[i:]...)
|
|
incrementNDV, _ := cardinality.EstimateColsDNVWithMatchedLenFromUniqueIDs(
|
|
p.SCtx(), intersectionIDs, childSchema, childStats)
|
|
newGNDV += incrementNDV
|
|
// restore the before intersectionIDs slice.
|
|
intersectionIDs = intersectionIDs[:beforeLen]
|
|
}
|
|
// insert ids one by one.
|
|
intersectionIDs = append(intersectionIDs, id)
|
|
}
|
|
oneGNDV.NDV = newGNDV
|
|
}
|
|
p.SetStats(cpStats)
|
|
}
|
|
}
|
|
|
|
// adjust3StagePhaseAgg generate 3 stage aggregation for single/multi count distinct if applicable.
|
|
//
|
|
// select count(distinct a), count(b) from foo
|
|
//
|
|
// will generate plan:
|
|
//
|
|
// HashAgg sum(#1), sum(#2) -> final agg
|
|
// +- Exchange Passthrough
|
|
// +- HashAgg count(distinct a) #1, sum(#3) #2 -> middle agg
|
|
// +- Exchange HashPartition by a
|
|
// +- HashAgg count(b) #3, group by a -> partial agg
|
|
// +- TableScan foo
|
|
//
|
|
// select count(distinct a), count(distinct b), count(c) from foo
|
|
//
|
|
// will generate plan:
|
|
//
|
|
// HashAgg sum(#1), sum(#2), sum(#3) -> final agg
|
|
// +- Exchange Passthrough
|
|
// +- HashAgg count(distinct a) #1, count(distinct b) #2, sum(#4) #3 -> middle agg
|
|
// +- Exchange HashPartition by a,b,groupingID
|
|
// +- HashAgg count(c) #4, group by a,b,groupingID -> partial agg
|
|
// +- Expand {<a>}, {<b>} -> expand
|
|
// +- TableScan foo
|
|
func adjust3StagePhaseAgg(p *physicalop.PhysicalHashAgg, partialAgg, finalAgg base.PhysicalPlan, canUse3StageAgg bool,
|
|
groupingSets expression.GroupingSets, mpp *physicalop.MppTask) (final, mid, part, proj4Part base.PhysicalPlan, _ error) {
|
|
ectx := p.SCtx().GetExprCtx().GetEvalCtx()
|
|
|
|
if !(partialAgg != nil && canUse3StageAgg) {
|
|
// quick path: return the original finalAgg and partiAgg.
|
|
return finalAgg, nil, partialAgg, nil, nil
|
|
}
|
|
if len(groupingSets) == 0 {
|
|
// single distinct agg mode.
|
|
clonedAgg, err := finalAgg.Clone(p.SCtx())
|
|
if err != nil {
|
|
return nil, nil, nil, nil, err
|
|
}
|
|
|
|
// step1: adjust middle agg.
|
|
middleHashAgg := clonedAgg.(*physicalop.PhysicalHashAgg)
|
|
distinctPos := 0
|
|
middleSchema := expression.NewSchema()
|
|
schemaMap := make(map[int64]*expression.Column, len(middleHashAgg.AggFuncs))
|
|
for i, fun := range middleHashAgg.AggFuncs {
|
|
col := &expression.Column{
|
|
UniqueID: p.SCtx().GetSessionVars().AllocPlanColumnID(),
|
|
RetType: fun.RetTp,
|
|
}
|
|
if fun.HasDistinct {
|
|
distinctPos = i
|
|
fun.Mode = aggregation.Partial1Mode
|
|
} else {
|
|
fun.Mode = aggregation.Partial2Mode
|
|
originalCol := fun.Args[0].(*expression.Column)
|
|
// mapping the current partial output column with the agg origin arg column. (final agg arg should use this one)
|
|
schemaMap[originalCol.UniqueID] = col
|
|
}
|
|
middleSchema.Append(col)
|
|
}
|
|
middleHashAgg.SetSchema(middleSchema)
|
|
|
|
// step2: adjust final agg.
|
|
finalHashAgg := finalAgg.(*physicalop.PhysicalHashAgg)
|
|
finalAggDescs := make([]*aggregation.AggFuncDesc, 0, len(finalHashAgg.AggFuncs))
|
|
for i, fun := range finalHashAgg.AggFuncs {
|
|
newArgs := make([]expression.Expression, 0, 1)
|
|
if distinctPos == i {
|
|
// change count(distinct) to sum()
|
|
fun.Name = ast.AggFuncSum
|
|
fun.HasDistinct = false
|
|
newArgs = append(newArgs, middleSchema.Columns[i])
|
|
} else {
|
|
for _, arg := range fun.Args {
|
|
newCol, err := arg.RemapColumn(schemaMap)
|
|
if err != nil {
|
|
return nil, nil, nil, nil, err
|
|
}
|
|
newArgs = append(newArgs, newCol)
|
|
}
|
|
}
|
|
fun.Mode = aggregation.FinalMode
|
|
fun.Args = newArgs
|
|
finalAggDescs = append(finalAggDescs, fun)
|
|
}
|
|
finalHashAgg.AggFuncs = finalAggDescs
|
|
// partialAgg is im-mutated from args.
|
|
return finalHashAgg, middleHashAgg, partialAgg, nil, nil
|
|
}
|
|
// multi distinct agg mode, having grouping sets.
|
|
// set the default expression to constant 1 for the convenience to choose default group set data.
|
|
var groupingIDCol expression.Expression
|
|
// enforce Expand operator above the children.
|
|
// physical plan is enumerated without children from itself, use mpp subtree instead p.children.
|
|
// scale(len(groupingSets)) will change the NDV, while Expand doesn't change the NDV and groupNDV.
|
|
stats := mpp.Plan().StatsInfo().Scale(p.SCtx().GetSessionVars(), float64(1))
|
|
stats.RowCount = stats.RowCount * float64(len(groupingSets))
|
|
physicalExpand := physicalop.PhysicalExpand{
|
|
GroupingSets: groupingSets,
|
|
}.Init(p.SCtx(), stats, mpp.Plan().QueryBlockOffset())
|
|
// generate a new column as groupingID to identify which this row is targeting for.
|
|
tp := types.NewFieldType(mysql.TypeLonglong)
|
|
tp.SetFlag(mysql.UnsignedFlag | mysql.NotNullFlag)
|
|
groupingIDCol = &expression.Column{
|
|
UniqueID: p.SCtx().GetSessionVars().AllocPlanColumnID(),
|
|
RetType: tp,
|
|
}
|
|
// append the physical expand op with groupingID column.
|
|
physicalExpand.SetSchema(mpp.Plan().Schema().Clone())
|
|
physicalExpand.Schema().Append(groupingIDCol.(*expression.Column))
|
|
physicalExpand.GroupingIDCol = groupingIDCol.(*expression.Column)
|
|
// attach PhysicalExpand to mpp
|
|
attachPlan2Task(physicalExpand, mpp)
|
|
|
|
// having group sets
|
|
clonedAgg, err := finalAgg.Clone(p.SCtx())
|
|
if err != nil {
|
|
return nil, nil, nil, nil, err
|
|
}
|
|
cloneHashAgg := clonedAgg.(*physicalop.PhysicalHashAgg)
|
|
// Clone(), it will share same base-plan elements from the finalAgg, including id,tp,stats. Make a new one here.
|
|
cloneHashAgg.Plan = baseimpl.NewBasePlan(cloneHashAgg.SCtx(), cloneHashAgg.TP(), cloneHashAgg.QueryBlockOffset())
|
|
cloneHashAgg.SetStats(finalAgg.StatsInfo()) // reuse the final agg stats here.
|
|
|
|
// step1: adjust partial agg, for normal agg here, adjust it to target for specified group data.
|
|
// Since we may substitute the first arg of normal agg with case-when expression here, append a
|
|
// customized proj here rather than depending on postOptimize to insert a blunt one for us.
|
|
//
|
|
// proj4Partial output all the base col from lower op + caseWhen proj cols.
|
|
proj4Partial := new(physicalop.PhysicalProjection).Init(p.SCtx(), mpp.Plan().StatsInfo(), mpp.Plan().QueryBlockOffset())
|
|
for _, col := range mpp.Plan().Schema().Columns {
|
|
proj4Partial.Exprs = append(proj4Partial.Exprs, col)
|
|
}
|
|
proj4Partial.SetSchema(mpp.Plan().Schema().Clone())
|
|
|
|
partialHashAgg := partialAgg.(*physicalop.PhysicalHashAgg)
|
|
partialHashAgg.GroupByItems = append(partialHashAgg.GroupByItems, groupingIDCol)
|
|
partialHashAgg.Schema().Append(groupingIDCol.(*expression.Column))
|
|
// it will create a new stats for partial agg.
|
|
scaleStats4GroupingSets(partialHashAgg, groupingSets, groupingIDCol.(*expression.Column), proj4Partial.Schema(), proj4Partial.StatsInfo())
|
|
for _, fun := range partialHashAgg.AggFuncs {
|
|
if !fun.HasDistinct {
|
|
// for normal agg phase1, we should also modify them to target for specified group data.
|
|
// Expr = (case when groupingID = targeted_groupingID then arg else null end)
|
|
eqExpr := expression.NewFunctionInternal(p.SCtx().GetExprCtx(), ast.EQ, types.NewFieldType(mysql.TypeTiny), groupingIDCol, expression.NewUInt64Const(fun.GroupingID))
|
|
caseWhen := expression.NewFunctionInternal(p.SCtx().GetExprCtx(), ast.Case, fun.Args[0].GetType(ectx), eqExpr, fun.Args[0], expression.NewNull())
|
|
caseWhenProjCol := &expression.Column{
|
|
UniqueID: p.SCtx().GetSessionVars().AllocPlanColumnID(),
|
|
RetType: fun.Args[0].GetType(ectx),
|
|
}
|
|
proj4Partial.Exprs = append(proj4Partial.Exprs, caseWhen)
|
|
proj4Partial.Schema().Append(caseWhenProjCol)
|
|
fun.Args[0] = caseWhenProjCol
|
|
}
|
|
}
|
|
|
|
// step2: adjust middle agg
|
|
// middleHashAgg shared the same stats with the final agg does.
|
|
middleHashAgg := cloneHashAgg
|
|
middleSchema := expression.NewSchema()
|
|
schemaMap := make(map[int64]*expression.Column, len(middleHashAgg.AggFuncs))
|
|
for _, fun := range middleHashAgg.AggFuncs {
|
|
col := &expression.Column{
|
|
UniqueID: p.SCtx().GetSessionVars().AllocPlanColumnID(),
|
|
RetType: fun.RetTp,
|
|
}
|
|
if fun.HasDistinct {
|
|
// let count distinct agg aggregate on whole-scope data rather using case-when expr to target on specified group. (agg null strict attribute)
|
|
fun.Mode = aggregation.Partial1Mode
|
|
} else {
|
|
fun.Mode = aggregation.Partial2Mode
|
|
originalCol := fun.Args[0].(*expression.Column)
|
|
// record the origin column unique id down before change it to be case when expr.
|
|
// mapping the current partial output column with the agg origin arg column. (final agg arg should use this one)
|
|
schemaMap[originalCol.UniqueID] = col
|
|
}
|
|
middleSchema.Append(col)
|
|
}
|
|
middleHashAgg.SetSchema(middleSchema)
|
|
|
|
// step3: adjust final agg
|
|
finalHashAgg := finalAgg.(*physicalop.PhysicalHashAgg)
|
|
finalAggDescs := make([]*aggregation.AggFuncDesc, 0, len(finalHashAgg.AggFuncs))
|
|
for i, fun := range finalHashAgg.AggFuncs {
|
|
newArgs := make([]expression.Expression, 0, 1)
|
|
if fun.HasDistinct {
|
|
// change count(distinct) agg to sum()
|
|
fun.Name = ast.AggFuncSum
|
|
fun.HasDistinct = false
|
|
// count(distinct a,b) -> become a single partial result col.
|
|
newArgs = append(newArgs, middleSchema.Columns[i])
|
|
} else {
|
|
// remap final normal agg args to be output schema of middle normal agg.
|
|
for _, arg := range fun.Args {
|
|
newCol, err := arg.RemapColumn(schemaMap)
|
|
if err != nil {
|
|
return nil, nil, nil, nil, err
|
|
}
|
|
newArgs = append(newArgs, newCol)
|
|
}
|
|
}
|
|
fun.Mode = aggregation.FinalMode
|
|
fun.Args = newArgs
|
|
fun.GroupingID = 0
|
|
finalAggDescs = append(finalAggDescs, fun)
|
|
}
|
|
finalHashAgg.AggFuncs = finalAggDescs
|
|
return finalHashAgg, middleHashAgg, partialHashAgg, proj4Partial, nil
|
|
}
|
|
|
|
func attach2TaskForMpp(p *physicalop.PhysicalHashAgg, tasks ...base.Task) base.Task {
|
|
ectx := p.SCtx().GetExprCtx().GetEvalCtx()
|
|
|
|
t := tasks[0].Copy()
|
|
mpp, ok := t.(*physicalop.MppTask)
|
|
if !ok {
|
|
return base.InvalidTask
|
|
}
|
|
switch p.MppRunMode {
|
|
case physicalop.Mpp1Phase:
|
|
// 1-phase agg: when the partition columns can be satisfied, where the plan does not need to enforce Exchange
|
|
// only push down the original agg
|
|
proj := p.ConvertAvgForMPP()
|
|
attachPlan2Task(p, mpp)
|
|
if proj != nil {
|
|
attachPlan2Task(proj, mpp)
|
|
}
|
|
return mpp
|
|
case physicalop.Mpp2Phase:
|
|
// TODO: when partition property is matched by sub-plan, we actually needn't do extra an exchange and final agg.
|
|
proj := p.ConvertAvgForMPP()
|
|
partialAgg, finalAgg := p.NewPartialAggregate(kv.TiFlash, true)
|
|
if partialAgg == nil {
|
|
return base.InvalidTask
|
|
}
|
|
attachPlan2Task(partialAgg, mpp)
|
|
partitionCols := p.MppPartitionCols
|
|
if len(partitionCols) == 0 {
|
|
items := finalAgg.(*physicalop.PhysicalHashAgg).GroupByItems
|
|
partitionCols = make([]*property.MPPPartitionColumn, 0, len(items))
|
|
for _, expr := range items {
|
|
col, ok := expr.(*expression.Column)
|
|
if !ok {
|
|
return base.InvalidTask
|
|
}
|
|
partitionCols = append(partitionCols, &property.MPPPartitionColumn{
|
|
Col: col,
|
|
CollateID: property.GetCollateIDByNameForPartition(col.GetType(ectx).GetCollate()),
|
|
})
|
|
}
|
|
}
|
|
if partialHashAgg, ok := partialAgg.(*physicalop.PhysicalHashAgg); ok && len(partitionCols) != 0 {
|
|
partialHashAgg.TiflashPreAggMode = p.SCtx().GetSessionVars().TiFlashPreAggMode
|
|
}
|
|
prop := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.HashType, MPPPartitionCols: partitionCols}
|
|
newMpp := mpp.EnforceExchangerImpl(prop)
|
|
if newMpp.Invalid() {
|
|
return newMpp
|
|
}
|
|
attachPlan2Task(finalAgg, newMpp)
|
|
// TODO: how to set 2-phase cost?
|
|
if proj != nil {
|
|
attachPlan2Task(proj, newMpp)
|
|
}
|
|
return newMpp
|
|
case physicalop.MppTiDB:
|
|
partialAgg, finalAgg := p.NewPartialAggregate(kv.TiFlash, false)
|
|
if partialAgg != nil {
|
|
attachPlan2Task(partialAgg, mpp)
|
|
}
|
|
t = mpp.ConvertToRootTask(p.SCtx())
|
|
attachPlan2Task(finalAgg, t)
|
|
return t
|
|
case physicalop.MppScalar:
|
|
prop := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.SinglePartitionType}
|
|
if !property.NeedEnforceExchanger(mpp.GetPartitionType(), mpp.HashCols, prop, nil) {
|
|
// On the one hand: when the low layer already satisfied the single partition layout, just do the all agg computation in the single node.
|
|
return attach2TaskForMpp1Phase(p, mpp)
|
|
}
|
|
// On the other hand: try to split the mppScalar agg into multi phases agg **down** to multi nodes since data already distributed across nodes.
|
|
// we have to check it before the content of p has been modified
|
|
canUse3StageAgg, groupingSets := p.Scale3StageForDistinctAgg()
|
|
proj := p.ConvertAvgForMPP()
|
|
partialAgg, finalAgg := p.NewPartialAggregate(kv.TiFlash, true)
|
|
if finalAgg == nil {
|
|
return base.InvalidTask
|
|
}
|
|
|
|
final, middle, partial, proj4Partial, err := adjust3StagePhaseAgg(p, partialAgg, finalAgg, canUse3StageAgg, groupingSets, mpp)
|
|
if err != nil {
|
|
return base.InvalidTask
|
|
}
|
|
|
|
// partial agg proj would be null if one scalar agg cannot run in two-phase mode
|
|
if proj4Partial != nil {
|
|
attachPlan2Task(proj4Partial, mpp)
|
|
}
|
|
|
|
// partial agg would be null if one scalar agg cannot run in two-phase mode
|
|
if partial != nil {
|
|
attachPlan2Task(partial, mpp)
|
|
}
|
|
|
|
if middle != nil && canUse3StageAgg {
|
|
items := partial.(*physicalop.PhysicalHashAgg).GroupByItems
|
|
partitionCols := make([]*property.MPPPartitionColumn, 0, len(items))
|
|
for _, expr := range items {
|
|
col, ok := expr.(*expression.Column)
|
|
if !ok {
|
|
continue
|
|
}
|
|
partitionCols = append(partitionCols, &property.MPPPartitionColumn{
|
|
Col: col,
|
|
CollateID: property.GetCollateIDByNameForPartition(col.GetType(ectx).GetCollate()),
|
|
})
|
|
}
|
|
|
|
exProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.HashType, MPPPartitionCols: partitionCols}
|
|
newMpp := mpp.EnforceExchanger(exProp, nil)
|
|
attachPlan2Task(middle, newMpp)
|
|
mpp = newMpp
|
|
if partialHashAgg, ok := partial.(*physicalop.PhysicalHashAgg); ok && len(partitionCols) != 0 {
|
|
partialHashAgg.TiflashPreAggMode = p.SCtx().GetSessionVars().TiFlashPreAggMode
|
|
}
|
|
}
|
|
|
|
// prop here still be the first generated single-partition requirement.
|
|
newMpp := mpp.EnforceExchanger(prop, nil)
|
|
attachPlan2Task(final, newMpp)
|
|
if proj == nil {
|
|
proj = physicalop.PhysicalProjection{
|
|
Exprs: make([]expression.Expression, 0, len(p.Schema().Columns)),
|
|
}.Init(p.SCtx(), p.StatsInfo(), p.QueryBlockOffset())
|
|
for _, col := range p.Schema().Columns {
|
|
proj.Exprs = append(proj.Exprs, col)
|
|
}
|
|
proj.SetSchema(p.Schema())
|
|
}
|
|
attachPlan2Task(proj, newMpp)
|
|
return newMpp
|
|
default:
|
|
return base.InvalidTask
|
|
}
|
|
}
|
|
|
|
// attach2Task4PhysicalHashAgg implements the PhysicalPlan interface.
|
|
func attach2Task4PhysicalHashAgg(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
p := pp.(*physicalop.PhysicalHashAgg)
|
|
t := tasks[0].Copy()
|
|
if cop, ok := t.(*physicalop.CopTask); ok {
|
|
if len(cop.RootTaskConds) == 0 && len(cop.IdxMergePartPlans) == 0 {
|
|
copTaskType := cop.GetStoreType()
|
|
partialAgg, finalAgg := p.NewPartialAggregate(copTaskType, false)
|
|
if partialAgg != nil {
|
|
if cop.TablePlan != nil {
|
|
cop.FinishIndexPlan()
|
|
// the partialAgg attachment didn't follow the attachPlan2Task function, so here we actively call
|
|
// inheritStatsFromBottomForIndexJoinInner(p, t) to inherit stats from the bottom plan for index
|
|
// join inner side. note: partialAgg will share stats with finalAgg.
|
|
inheritStatsFromBottomElemForIndexJoinInner(partialAgg, cop.IndexJoinInfo, cop.TablePlan.StatsInfo())
|
|
partialAgg.SetChildren(cop.TablePlan)
|
|
cop.TablePlan = partialAgg
|
|
// If needExtraProj is true, a projection will be created above the PhysicalIndexLookUpReader to make sure
|
|
// the schema is the same as the original DataSource schema.
|
|
// However, we pushed down the agg here, the partial agg was placed on the top of tablePlan, and the final
|
|
// agg will be placed above the PhysicalIndexLookUpReader, and the schema will be set correctly for them.
|
|
// If we add the projection again, the projection will be between the PhysicalIndexLookUpReader and
|
|
// the partial agg, and the schema will be broken.
|
|
cop.NeedExtraProj = false
|
|
} else {
|
|
// the partialAgg attachment didn't follow the attachPlan2Task function, so here we actively call
|
|
// inheritStatsFromBottomForIndexJoinInner(p, t) to inherit stats from the bottom plan for index
|
|
// join inner side. note: partialAgg will share stats with finalAgg.
|
|
inheritStatsFromBottomElemForIndexJoinInner(partialAgg, cop.IndexJoinInfo, cop.IndexPlan.StatsInfo())
|
|
partialAgg.SetChildren(cop.IndexPlan)
|
|
cop.IndexPlan = partialAgg
|
|
}
|
|
}
|
|
// In `newPartialAggregate`, we are using stats of final aggregation as stats
|
|
// of `partialAgg`, so the network cost of transferring result rows of `partialAgg`
|
|
// to TiDB is normally under-estimated for hash aggregation, since the group-by
|
|
// column may be independent of the column used for region distribution, so a closer
|
|
// estimation of network cost for hash aggregation may multiply the number of
|
|
// regions involved in the `partialAgg`, which is unknown however.
|
|
t = cop.ConvertToRootTask(p.SCtx())
|
|
attachPlan2Task(finalAgg, t)
|
|
} else {
|
|
t = cop.ConvertToRootTask(p.SCtx())
|
|
attachPlan2Task(p, t)
|
|
}
|
|
} else if _, ok := t.(*physicalop.MppTask); ok {
|
|
return attach2TaskForMpp(p, tasks...)
|
|
} else {
|
|
attachPlan2Task(p, t)
|
|
}
|
|
return t
|
|
}
|
|
|
|
func attach2TaskForMPP4PhysicalWindow(p *physicalop.PhysicalWindow, mpp *physicalop.MppTask) base.Task {
|
|
// FIXME: currently, tiflash's join has different schema with TiDB,
|
|
// so we have to rebuild the schema of join and operators which may inherit schema from join.
|
|
// for window, we take the sub-plan's schema, and the schema generated by windowDescs.
|
|
columns := p.Schema().Clone().Columns[len(p.Schema().Columns)-len(p.WindowFuncDescs):]
|
|
p.SetSchema(expression.MergeSchema(mpp.Plan().Schema(), expression.NewSchema(columns...)))
|
|
|
|
failpoint.Inject("CheckMPPWindowSchemaLength", func() {
|
|
if len(p.Schema().Columns) != len(mpp.Plan().Schema().Columns)+len(p.WindowFuncDescs) {
|
|
panic("mpp physical window has incorrect schema length")
|
|
}
|
|
})
|
|
|
|
return attachPlan2Task(p, mpp)
|
|
}
|
|
|
|
func attach2Task4PhysicalWindow(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
p := pp.(*physicalop.PhysicalWindow)
|
|
if mpp, ok := tasks[0].Copy().(*physicalop.MppTask); ok && p.StoreTp == kv.TiFlash {
|
|
return attach2TaskForMPP4PhysicalWindow(p, mpp)
|
|
}
|
|
t := tasks[0].ConvertToRootTask(p.SCtx())
|
|
return attachPlan2Task(p.Self, t)
|
|
}
|
|
|
|
// attach2Task4PhysicalCTEStorage implements the PhysicalPlan interface.
|
|
func attach2Task4PhysicalCTEStorage(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
p := pp.(*physicalop.PhysicalCTEStorage)
|
|
t := tasks[0].Copy()
|
|
if mpp, ok := t.(*physicalop.MppTask); ok {
|
|
p.SetChildren(t.Plan())
|
|
nt := physicalop.NewMppTask(p,
|
|
mpp.GetPartitionType(), mpp.GetHashCols(),
|
|
mpp.GetTblColHists(), mpp.GetWarnings())
|
|
return nt
|
|
}
|
|
t.ConvertToRootTask(p.SCtx())
|
|
p.SetChildren(t.Plan())
|
|
ta := &physicalop.RootTask{}
|
|
ta.SetPlan(p)
|
|
ta.Warnings.CopyFrom(&t.(*physicalop.RootTask).Warnings)
|
|
return ta
|
|
}
|
|
|
|
// attach2Task4PhysicalSequence implements PhysicalSequence.Attach2Task.
|
|
func attach2Task4PhysicalSequence(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
|
|
p := pp.(*physicalop.PhysicalSequence)
|
|
|
|
for _, t := range tasks {
|
|
_, isMpp := t.(*physicalop.MppTask)
|
|
if !isMpp {
|
|
return tasks[len(tasks)-1]
|
|
}
|
|
}
|
|
|
|
lastTask := tasks[len(tasks)-1].(*physicalop.MppTask)
|
|
|
|
children := make([]base.PhysicalPlan, 0, len(tasks))
|
|
for _, t := range tasks {
|
|
children = append(children, t.Plan())
|
|
}
|
|
|
|
p.SetChildren(children...)
|
|
|
|
mppTask := physicalop.NewMppTask(p, lastTask.GetPartitionType(), lastTask.GetHashCols(), lastTask.GetTblColHists(), nil)
|
|
tmpWarnings := make([]*physicalop.SimpleWarnings, 0, len(tasks))
|
|
for _, t := range tasks {
|
|
if mpp, ok := t.(*physicalop.MppTask); ok {
|
|
tmpWarnings = append(tmpWarnings, &mpp.Warnings)
|
|
continue
|
|
}
|
|
if root, ok := t.(*physicalop.RootTask); ok {
|
|
tmpWarnings = append(tmpWarnings, &root.Warnings)
|
|
continue
|
|
}
|
|
if cop, ok := t.(*physicalop.CopTask); ok {
|
|
tmpWarnings = append(tmpWarnings, &cop.Warnings)
|
|
}
|
|
}
|
|
mppTask.Warnings.CopyFrom(tmpWarnings...)
|
|
return mppTask
|
|
}
|
|
|
|
func collectRowSizeFromMPPPlan(mppPlan base.PhysicalPlan) (rowSize float64) {
|
|
if mppPlan != nil && mppPlan.StatsInfo() != nil && mppPlan.StatsInfo().HistColl != nil {
|
|
return cardinality.GetAvgRowSize(mppPlan.SCtx(), mppPlan.StatsInfo().HistColl, mppPlan.Schema().Columns, false, false)
|
|
}
|
|
return 1 // use 1 as lower-bound for safety
|
|
}
|
|
|
|
func accumulateNetSeekCost4MPP(p base.PhysicalPlan) (cost float64) {
|
|
if ts, ok := p.(*physicalop.PhysicalTableScan); ok {
|
|
return float64(len(ts.Ranges)) * float64(len(ts.Columns)) * ts.SCtx().GetSessionVars().GetSeekFactor(ts.Table)
|
|
}
|
|
for _, c := range p.Children() {
|
|
cost += accumulateNetSeekCost4MPP(c)
|
|
}
|
|
return
|
|
}
|