planner: add datasource inner task build by feeling and building for index join prop. (#60495)
ref pingcap/tidb#60106
This commit is contained in:
@ -54,6 +54,11 @@ func exhaustPhysicalPlans4LogicalUnionScan(lp base.LogicalPlan, prop *property.P
|
||||
return nil, true, nil
|
||||
}
|
||||
childProp := prop.CloneEssentialFields()
|
||||
childProp = admitIndexJoinProp(childProp, prop)
|
||||
if childProp == nil {
|
||||
// even hint can not work with this. index join prop is not satisfied in mpp task type.
|
||||
return nil, false, nil
|
||||
}
|
||||
// here we just pass down the keep order property to the child.
|
||||
// cuz, in union scan exec, it will feel the underlying tableReader or indexReader to get the keepOrder.
|
||||
us := PhysicalUnionScan{
|
||||
@ -1086,6 +1091,27 @@ type indexJoinInnerChildWrapper struct {
|
||||
zippedChildren []base.LogicalPlan
|
||||
}
|
||||
|
||||
// admitIndexJoinInnerChildPattern is used to check whether current physical choosing is under an index join's
|
||||
// probe side. If it is, and we ganna check the original inner pattern check here to keep compatible with the old.
|
||||
// the @first bool indicate whether current logical plan is valid of index join inner side.
|
||||
func admitIndexJoinInnerChildPattern(p base.LogicalPlan) bool {
|
||||
switch x := p.GetBaseLogicalPlan().(*logicalop.BaseLogicalPlan).Self().(type) {
|
||||
case *logicalop.DataSource:
|
||||
// DS that prefer tiFlash reading couldn't walk into index join.
|
||||
if x.PreferStoreType&h.PreferTiFlash != 0 {
|
||||
return false
|
||||
}
|
||||
case *logicalop.LogicalProjection, *logicalop.LogicalSelection, *logicalop.LogicalAggregation:
|
||||
if !p.SCtx().GetSessionVars().EnableINLJoinInnerMultiPattern {
|
||||
return false
|
||||
}
|
||||
case *logicalop.LogicalUnionScan:
|
||||
default: // index join inner side couldn't allow join, sort, limit, etc. todo: open it.
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func extractIndexJoinInnerChildPattern(p *logicalop.LogicalJoin, innerChild base.LogicalPlan) *indexJoinInnerChildWrapper {
|
||||
wrapper := &indexJoinInnerChildWrapper{}
|
||||
nextChild := func(pp base.LogicalPlan) base.LogicalPlan {
|
||||
@ -2756,6 +2782,7 @@ func exhaustPhysicalPlans4LogicalProjection(lp base.LogicalPlan, prop *property.
|
||||
}
|
||||
|
||||
ret := make([]base.PhysicalPlan, 0, len(newProps))
|
||||
newProps = admitIndexJoinProps(newProps, prop)
|
||||
for _, newProp := range newProps {
|
||||
proj := PhysicalProjection{
|
||||
Exprs: p.Exprs,
|
||||
@ -3110,6 +3137,12 @@ func getEnforcedStreamAggs(la *logicalop.LogicalAggregation, prop *property.Phys
|
||||
SortItems: property.SortItemsFromCols(la.GetGroupByCols(), desc),
|
||||
}
|
||||
if !prop.IsPrefix(childProp) {
|
||||
// empty
|
||||
return enforcedAggs
|
||||
}
|
||||
childProp = admitIndexJoinProp(childProp, prop)
|
||||
if childProp == nil {
|
||||
// empty
|
||||
return enforcedAggs
|
||||
}
|
||||
taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopMultiReadTaskType}
|
||||
@ -3122,6 +3155,8 @@ func getEnforcedStreamAggs(la *logicalop.LogicalAggregation, prop *property.Phys
|
||||
} else if !la.PreferAggToCop {
|
||||
taskTypes = append(taskTypes, property.RootTaskType)
|
||||
}
|
||||
// only admit special types for index join prop
|
||||
taskTypes = admitIndexJoinTypes(taskTypes, prop)
|
||||
for _, taskTp := range taskTypes {
|
||||
copiedChildProperty := new(property.PhysicalProperty)
|
||||
*copiedChildProperty = *childProp // It's ok to not deep copy the "cols" field.
|
||||
@ -3169,6 +3204,10 @@ func getStreamAggs(lp base.LogicalPlan, prop *property.PhysicalProperty) []base.
|
||||
childProp := &property.PhysicalProperty{
|
||||
ExpectedCnt: math.Max(prop.ExpectedCnt*la.InputCount/la.StatsInfo().RowCount, prop.ExpectedCnt),
|
||||
}
|
||||
childProp = admitIndexJoinProp(childProp, prop)
|
||||
if childProp == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, possibleChildProperty := range la.PossibleProperties {
|
||||
childProp.SortItems = property.SortItemsFromCols(possibleChildProperty[:len(groupByCols)], desc)
|
||||
@ -3194,6 +3233,7 @@ func getStreamAggs(lp base.LogicalPlan, prop *property.PhysicalProperty) []base.
|
||||
if !la.CanPushToCop(kv.TiKV) && !la.CanPushToCop(kv.TiFlash) {
|
||||
taskTypes = []property.TaskType{property.RootTaskType}
|
||||
}
|
||||
taskTypes = admitIndexJoinTypes(taskTypes, prop)
|
||||
for _, taskTp := range taskTypes {
|
||||
copiedChildProperty := new(property.PhysicalProperty)
|
||||
*copiedChildProperty = *childProp // It's ok to not deep copy the "cols" field.
|
||||
@ -3462,7 +3502,7 @@ func getHashAggs(lp base.LogicalPlan, prop *property.PhysicalProperty) []base.Ph
|
||||
if prop.IsFlashProp() {
|
||||
taskTypes = []property.TaskType{prop.TaskTp}
|
||||
}
|
||||
|
||||
taskTypes = admitIndexJoinTypes(taskTypes, prop)
|
||||
for _, taskTp := range taskTypes {
|
||||
if taskTp == property.MppTaskType {
|
||||
mppAggs := tryToGetMppHashAggs(la, prop)
|
||||
@ -3470,7 +3510,13 @@ func getHashAggs(lp base.LogicalPlan, prop *property.PhysicalProperty) []base.Ph
|
||||
hashAggs = append(hashAggs, mppAggs...)
|
||||
}
|
||||
} else {
|
||||
agg := NewPhysicalHashAgg(la, la.StatsInfo().ScaleByExpectCnt(prop.ExpectedCnt), &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, TaskTp: taskTp, CTEProducerStatus: prop.CTEProducerStatus})
|
||||
childProp := &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, TaskTp: taskTp, CTEProducerStatus: prop.CTEProducerStatus}
|
||||
// mainly to fill indexJoinProp to childProp.
|
||||
childProp = admitIndexJoinProp(childProp, prop)
|
||||
if childProp == nil {
|
||||
continue
|
||||
}
|
||||
agg := NewPhysicalHashAgg(la, la.StatsInfo().ScaleByExpectCnt(prop.ExpectedCnt), childProp)
|
||||
agg.SetSchema(la.Schema().Clone())
|
||||
hashAggs = append(hashAggs, agg)
|
||||
}
|
||||
@ -3504,6 +3550,64 @@ func exhaustPhysicalPlans4LogicalAggregation(lp base.LogicalPlan, prop *property
|
||||
return aggs, !(preferStream || preferHash), nil
|
||||
}
|
||||
|
||||
func admitIndexJoinTypes(types []property.TaskType, prop *property.PhysicalProperty) []property.TaskType {
|
||||
if prop.TaskTp == property.MppTaskType {
|
||||
// if the parent prop is mppTask, we assume it couldn't contain indexJoinProp by default,
|
||||
// which is guaranteed by the parent physical plans enumeration.
|
||||
return types
|
||||
}
|
||||
// only admit root & cop task type to push down indexJoinProp.
|
||||
if prop.IndexJoinProp != nil {
|
||||
newTypes := types[:0]
|
||||
for _, tp := range types {
|
||||
if tp != property.MppTaskType {
|
||||
newTypes = append(newTypes, tp)
|
||||
}
|
||||
}
|
||||
types = newTypes
|
||||
}
|
||||
return types
|
||||
}
|
||||
|
||||
func admitIndexJoinProps(children []*property.PhysicalProperty, prop *property.PhysicalProperty) []*property.PhysicalProperty {
|
||||
if prop.TaskTp == property.MppTaskType {
|
||||
// if the parent prop is mppTask, we assume it couldn't contain indexJoinProp by default,
|
||||
// which is guaranteed by the parent physical plans enumeration.
|
||||
return children
|
||||
}
|
||||
// only admit root & cop task type to push down indexJoinProp.
|
||||
if prop.IndexJoinProp != nil {
|
||||
newChildren := children[:0]
|
||||
for _, child := range children {
|
||||
if child.TaskTp != property.MppTaskType {
|
||||
child.IndexJoinProp = prop.IndexJoinProp
|
||||
// only admit non-mpp task prop.
|
||||
newChildren = append(newChildren, child)
|
||||
}
|
||||
}
|
||||
children = newChildren
|
||||
}
|
||||
return children
|
||||
}
|
||||
|
||||
func admitIndexJoinProp(child, prop *property.PhysicalProperty) *property.PhysicalProperty {
|
||||
if prop.TaskTp == property.MppTaskType {
|
||||
// if the parent prop is mppTask, we assume it couldn't contain indexJoinProp by default,
|
||||
// which is guaranteed by the parent physical plans enumeration.
|
||||
return child
|
||||
}
|
||||
// only admit root & cop task type to push down indexJoinProp.
|
||||
if prop.IndexJoinProp != nil {
|
||||
if child.TaskTp != property.MppTaskType {
|
||||
child.IndexJoinProp = prop.IndexJoinProp
|
||||
} else {
|
||||
// only admit non-mpp task prop.
|
||||
child = nil
|
||||
}
|
||||
}
|
||||
return child
|
||||
}
|
||||
|
||||
func exhaustPhysicalPlans4LogicalSelection(lp base.LogicalPlan, prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) {
|
||||
p := lp.(*logicalop.LogicalSelection)
|
||||
newProps := make([]*property.PhysicalProperty, 0, 2)
|
||||
@ -3519,6 +3623,7 @@ func exhaustPhysicalPlans4LogicalSelection(lp base.LogicalPlan, prop *property.P
|
||||
}
|
||||
|
||||
ret := make([]base.PhysicalPlan, 0, len(newProps))
|
||||
newProps = admitIndexJoinProps(newProps, prop)
|
||||
for _, newProp := range newProps {
|
||||
sel := PhysicalSelection{
|
||||
Conditions: p.Conditions,
|
||||
|
||||
@ -92,6 +92,10 @@ func GetPropByOrderByItemsContainScalarFunc(items []*util.ByItems) (_ *property.
|
||||
}
|
||||
|
||||
func findBestTask4LogicalTableDual(lp base.LogicalPlan, prop *property.PhysicalProperty, planCounter *base.PlanCounterTp, opt *optimizetrace.PhysicalOptimizeOp) (base.Task, int64, error) {
|
||||
if prop.IndexJoinProp != nil {
|
||||
// even enforce hint can not work with this.
|
||||
return base.InvalidTask, 0, nil
|
||||
}
|
||||
p := lp.(*logicalop.LogicalTableDual)
|
||||
// If the required property is not empty and the row count > 1,
|
||||
// we cannot ensure this required property.
|
||||
@ -112,6 +116,10 @@ func findBestTask4LogicalTableDual(lp base.LogicalPlan, prop *property.PhysicalP
|
||||
}
|
||||
|
||||
func findBestTask4LogicalShow(lp base.LogicalPlan, prop *property.PhysicalProperty, planCounter *base.PlanCounterTp, _ *optimizetrace.PhysicalOptimizeOp) (base.Task, int64, error) {
|
||||
if prop.IndexJoinProp != nil {
|
||||
// even enforce hint can not work with this.
|
||||
return base.InvalidTask, 0, nil
|
||||
}
|
||||
p := lp.(*logicalop.LogicalShow)
|
||||
if !prop.IsSortItemEmpty() || planCounter.Empty() {
|
||||
return base.InvalidTask, 0, nil
|
||||
@ -125,6 +133,10 @@ func findBestTask4LogicalShow(lp base.LogicalPlan, prop *property.PhysicalProper
|
||||
}
|
||||
|
||||
func findBestTask4LogicalShowDDLJobs(lp base.LogicalPlan, prop *property.PhysicalProperty, planCounter *base.PlanCounterTp, _ *optimizetrace.PhysicalOptimizeOp) (base.Task, int64, error) {
|
||||
if prop.IndexJoinProp != nil {
|
||||
// even enforce hint can not work with this.
|
||||
return base.InvalidTask, 0, nil
|
||||
}
|
||||
p := lp.(*logicalop.LogicalShowDDLJobs)
|
||||
if !prop.IsSortItemEmpty() || planCounter.Empty() {
|
||||
return base.InvalidTask, 0, nil
|
||||
@ -540,6 +552,14 @@ func findBestTask(lp base.LogicalPlan, prop *property.PhysicalProperty, planCoun
|
||||
planCounter.Dec(1)
|
||||
return bestTask, 1, nil
|
||||
}
|
||||
// if prop is require an index join's probe side, check the inner pattern admission here.
|
||||
if prop.IndexJoinProp != nil {
|
||||
pass := admitIndexJoinInnerChildPattern(lp)
|
||||
if !pass {
|
||||
// even enforce hint can not work with this.
|
||||
return base.InvalidTask, 0, nil
|
||||
}
|
||||
}
|
||||
|
||||
canAddEnforcer := prop.CanAddEnforcer
|
||||
|
||||
@ -553,6 +573,10 @@ func findBestTask(lp base.LogicalPlan, prop *property.PhysicalProperty, planCoun
|
||||
// prop should be read only because its cached hashcode might be not consistent
|
||||
// when it is changed. So we clone a new one for the temporary changes.
|
||||
newProp := prop.CloneEssentialFields()
|
||||
// here newProp is used as another complete copy for enforcer, fill indexJoinProp manually.
|
||||
// for childProp := prop.CloneEssentialFields(), we do not clone indexJoinProp childProp for by default.
|
||||
// and only call admitIndexJoinProp to inherit the indexJoinProp for special pattern operators.
|
||||
newProp.IndexJoinProp = prop.IndexJoinProp
|
||||
var plansFitsProp, plansNeedEnforce []base.PhysicalPlan
|
||||
var hintWorksWithProp bool
|
||||
// Maybe the plan can satisfy the required property,
|
||||
@ -626,6 +650,10 @@ END:
|
||||
}
|
||||
|
||||
func findBestTask4LogicalMemTable(lp base.LogicalPlan, prop *property.PhysicalProperty, planCounter *base.PlanCounterTp, opt *optimizetrace.PhysicalOptimizeOp) (t base.Task, cntPlan int64, err error) {
|
||||
if prop.IndexJoinProp != nil {
|
||||
// even enforce hint can not work with this.
|
||||
return base.InvalidTask, 0, nil
|
||||
}
|
||||
p := lp.(*logicalop.LogicalMemTable)
|
||||
if prop.MPPPartitionTp != property.AnyType {
|
||||
return base.InvalidTask, 0, nil
|
||||
@ -1421,6 +1449,19 @@ func findBestTask4LogicalDataSource(lp base.LogicalPlan, prop *property.Physical
|
||||
planCounter.Dec(1)
|
||||
return
|
||||
}
|
||||
// if prop is require an index join's probe side, check the inner pattern admission here.
|
||||
if prop.IndexJoinProp != nil {
|
||||
pass := admitIndexJoinInnerChildPattern(lp)
|
||||
if !pass {
|
||||
// even enforce hint can not work with this.
|
||||
return base.InvalidTask, 0, nil
|
||||
}
|
||||
// when datasource leaf is in index join's inner side, build the task out with old
|
||||
// index join build logic, we can't merge this with normal datasource's index range
|
||||
// because normal index range is built on expression EQ/IN. while index join's inner
|
||||
// has its special runtime constants detecting and filling logic.
|
||||
return getBestIndexJoinInnerTaskByProp(ds, prop, opt, planCounter)
|
||||
}
|
||||
var cnt int64
|
||||
var unenforcedTask base.Task
|
||||
// If prop.CanAddEnforcer is true, the prop.SortItems need to be set nil for ds.findBestTask.
|
||||
@ -3108,6 +3149,10 @@ func getOriginalPhysicalIndexScan(ds *logicalop.DataSource, prop *property.Physi
|
||||
}
|
||||
|
||||
func findBestTask4LogicalCTE(lp base.LogicalPlan, prop *property.PhysicalProperty, counter *base.PlanCounterTp, pop *optimizetrace.PhysicalOptimizeOp) (t base.Task, cntPlan int64, err error) {
|
||||
if prop.IndexJoinProp != nil {
|
||||
// even enforce hint can not work with this.
|
||||
return base.InvalidTask, 0, nil
|
||||
}
|
||||
p := lp.(*logicalop.LogicalCTE)
|
||||
if p.ChildLen() > 0 {
|
||||
return p.BaseLogicalPlan.FindBestTask(prop, counter, pop)
|
||||
@ -3142,6 +3187,10 @@ func findBestTask4LogicalCTE(lp base.LogicalPlan, prop *property.PhysicalPropert
|
||||
}
|
||||
|
||||
func findBestTask4LogicalCTETable(lp base.LogicalPlan, prop *property.PhysicalProperty, _ *base.PlanCounterTp, _ *optimizetrace.PhysicalOptimizeOp) (t base.Task, cntPlan int64, err error) {
|
||||
if prop.IndexJoinProp != nil {
|
||||
// even enforce hint can not work with this.
|
||||
return base.InvalidTask, 0, nil
|
||||
}
|
||||
p := lp.(*logicalop.LogicalCTETable)
|
||||
if !prop.IsSortItemEmpty() {
|
||||
return base.InvalidTask, 0, nil
|
||||
|
||||
@ -25,10 +25,12 @@ import (
|
||||
"github.com/pingcap/tidb/pkg/parser/ast"
|
||||
"github.com/pingcap/tidb/pkg/parser/mysql"
|
||||
"github.com/pingcap/tidb/pkg/planner/cardinality"
|
||||
"github.com/pingcap/tidb/pkg/planner/core/base"
|
||||
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
|
||||
"github.com/pingcap/tidb/pkg/planner/planctx"
|
||||
"github.com/pingcap/tidb/pkg/planner/property"
|
||||
"github.com/pingcap/tidb/pkg/planner/util"
|
||||
"github.com/pingcap/tidb/pkg/planner/util/optimizetrace"
|
||||
"github.com/pingcap/tidb/pkg/statistics"
|
||||
"github.com/pingcap/tidb/pkg/types"
|
||||
"github.com/pingcap/tidb/pkg/util/chunk"
|
||||
@ -630,6 +632,40 @@ func getIndexJoinIntPKPathInfo(ds *logicalop.DataSource, innerJoinKeys, outerJoi
|
||||
return keyOff2IdxOff, newOuterJoinKeys, ranges, chosenPath, true
|
||||
}
|
||||
|
||||
// getBestIndexJoinInnerTaskByProp tries to build the best inner child task from ds for index join by the given property.
|
||||
func getBestIndexJoinInnerTaskByProp(ds *logicalop.DataSource, prop *property.PhysicalProperty,
|
||||
opt *optimizetrace.PhysicalOptimizeOp, planCounter *base.PlanCounterTp) (base.Task, int64, error) {
|
||||
// the below code is quite similar from the original logic
|
||||
// reason1: we need to leverage original indexPathInfo down related logic to build constant range for index plan.
|
||||
// reason2: the ranges from TS and IS couldn't be directly used to derive the stats' estimation, it's not real.
|
||||
// reason3: skyline pruning should not prune the possible index path which could feel the runtime EQ access conditions.
|
||||
innerTSCopTask := buildDataSource2TableScanByIndexJoinProp(ds, prop)
|
||||
if !innerTSCopTask.Invalid() {
|
||||
planCounter.Dec(1)
|
||||
if planCounter.Empty() {
|
||||
// planCounter is counted to end, just return this one.
|
||||
return innerTSCopTask, 1, nil
|
||||
}
|
||||
}
|
||||
innerISCopTask := buildDataSource2IndexScanByIndexJoinProp(ds, prop)
|
||||
if !innerISCopTask.Invalid() {
|
||||
planCounter.Dec(1)
|
||||
if planCounter.Empty() {
|
||||
// planCounter is counted to end, just return this one.
|
||||
return innerISCopTask, 1, nil
|
||||
}
|
||||
}
|
||||
// if we can see the both, compare the cost.
|
||||
leftIsBetter, err := compareTaskCost(innerTSCopTask, innerISCopTask, opt)
|
||||
if err != nil {
|
||||
return base.InvalidTask, 0, err
|
||||
}
|
||||
if leftIsBetter {
|
||||
return innerTSCopTask, 1, nil
|
||||
}
|
||||
return innerISCopTask, 1, nil
|
||||
}
|
||||
|
||||
// getBestIndexJoinPathResultByProp tries to iterate all possible access paths of the inner child and builds
|
||||
// index join path for each access path based on push-down indexIndexProp. It returns the best index join path result and the mapping.
|
||||
func getBestIndexJoinPathResultByProp(
|
||||
|
||||
@ -495,6 +495,8 @@ func (p *PhysicalProperty) CloneEssentialFields() *PhysicalProperty {
|
||||
MPPPartitionCols: p.MPPPartitionCols,
|
||||
RejectSort: p.RejectSort,
|
||||
CTEProducerStatus: p.CTEProducerStatus,
|
||||
// we default not to clone basic indexJoinProp by default.
|
||||
// and only call admitIndexJoinProp to inherit the indexJoinProp for special pattern operators.
|
||||
}
|
||||
return prop
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user