From b1a5536a646fa5cf435448f1661d13896c8c602a Mon Sep 17 00:00:00 2001 From: Arenatlx <314806019@qq.com> Date: Sun, 13 Apr 2025 18:47:22 +0800 Subject: [PATCH] planner: add datasource inner task build by feeling and building for index join prop. (#60495) ref pingcap/tidb#60106 --- pkg/planner/core/exhaust_physical_plans.go | 109 ++++++++++++++++++++- pkg/planner/core/find_best_task.go | 49 +++++++++ pkg/planner/core/index_join_path.go | 36 +++++++ pkg/planner/property/physical_property.go | 2 + 4 files changed, 194 insertions(+), 2 deletions(-) diff --git a/pkg/planner/core/exhaust_physical_plans.go b/pkg/planner/core/exhaust_physical_plans.go index 387df62ecd..dde892b33c 100644 --- a/pkg/planner/core/exhaust_physical_plans.go +++ b/pkg/planner/core/exhaust_physical_plans.go @@ -54,6 +54,11 @@ func exhaustPhysicalPlans4LogicalUnionScan(lp base.LogicalPlan, prop *property.P return nil, true, nil } childProp := prop.CloneEssentialFields() + childProp = admitIndexJoinProp(childProp, prop) + if childProp == nil { + // even hint can not work with this. index join prop is not satisfied in mpp task type. + return nil, false, nil + } // here we just pass down the keep order property to the child. // cuz, in union scan exec, it will feel the underlying tableReader or indexReader to get the keepOrder. us := PhysicalUnionScan{ @@ -1086,6 +1091,27 @@ type indexJoinInnerChildWrapper struct { zippedChildren []base.LogicalPlan } +// admitIndexJoinInnerChildPattern is used to check whether current physical choosing is under an index join's +// probe side. If it is, we are going to apply the original inner pattern check here to keep compatible with the old. +// the returned bool indicates whether current logical plan is valid as an index join inner side. 
+func admitIndexJoinInnerChildPattern(p base.LogicalPlan) bool { + switch x := p.GetBaseLogicalPlan().(*logicalop.BaseLogicalPlan).Self().(type) { + case *logicalop.DataSource: + // DS that prefer tiFlash reading couldn't walk into index join. + if x.PreferStoreType&h.PreferTiFlash != 0 { + return false + } + case *logicalop.LogicalProjection, *logicalop.LogicalSelection, *logicalop.LogicalAggregation: + if !p.SCtx().GetSessionVars().EnableINLJoinInnerMultiPattern { + return false + } + case *logicalop.LogicalUnionScan: + default: // index join inner side couldn't allow join, sort, limit, etc. todo: open it. + return false + } + return true +} + func extractIndexJoinInnerChildPattern(p *logicalop.LogicalJoin, innerChild base.LogicalPlan) *indexJoinInnerChildWrapper { wrapper := &indexJoinInnerChildWrapper{} nextChild := func(pp base.LogicalPlan) base.LogicalPlan { @@ -2756,6 +2782,7 @@ func exhaustPhysicalPlans4LogicalProjection(lp base.LogicalPlan, prop *property. } ret := make([]base.PhysicalPlan, 0, len(newProps)) + newProps = admitIndexJoinProps(newProps, prop) for _, newProp := range newProps { proj := PhysicalProjection{ Exprs: p.Exprs, @@ -3110,6 +3137,12 @@ func getEnforcedStreamAggs(la *logicalop.LogicalAggregation, prop *property.Phys SortItems: property.SortItemsFromCols(la.GetGroupByCols(), desc), } if !prop.IsPrefix(childProp) { + // empty + return enforcedAggs + } + childProp = admitIndexJoinProp(childProp, prop) + if childProp == nil { + // empty return enforcedAggs } taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopMultiReadTaskType} @@ -3122,6 +3155,8 @@ func getEnforcedStreamAggs(la *logicalop.LogicalAggregation, prop *property.Phys } else if !la.PreferAggToCop { taskTypes = append(taskTypes, property.RootTaskType) } + // only admit special types for index join prop + taskTypes = admitIndexJoinTypes(taskTypes, prop) for _, taskTp := range taskTypes { copiedChildProperty := new(property.PhysicalProperty) 
*copiedChildProperty = *childProp // It's ok to not deep copy the "cols" field. @@ -3169,6 +3204,10 @@ func getStreamAggs(lp base.LogicalPlan, prop *property.PhysicalProperty) []base. childProp := &property.PhysicalProperty{ ExpectedCnt: math.Max(prop.ExpectedCnt*la.InputCount/la.StatsInfo().RowCount, prop.ExpectedCnt), } + childProp = admitIndexJoinProp(childProp, prop) + if childProp == nil { + return nil + } for _, possibleChildProperty := range la.PossibleProperties { childProp.SortItems = property.SortItemsFromCols(possibleChildProperty[:len(groupByCols)], desc) @@ -3194,6 +3233,7 @@ func getStreamAggs(lp base.LogicalPlan, prop *property.PhysicalProperty) []base. if !la.CanPushToCop(kv.TiKV) && !la.CanPushToCop(kv.TiFlash) { taskTypes = []property.TaskType{property.RootTaskType} } + taskTypes = admitIndexJoinTypes(taskTypes, prop) for _, taskTp := range taskTypes { copiedChildProperty := new(property.PhysicalProperty) *copiedChildProperty = *childProp // It's ok to not deep copy the "cols" field. @@ -3462,7 +3502,7 @@ func getHashAggs(lp base.LogicalPlan, prop *property.PhysicalProperty) []base.Ph if prop.IsFlashProp() { taskTypes = []property.TaskType{prop.TaskTp} } - + taskTypes = admitIndexJoinTypes(taskTypes, prop) for _, taskTp := range taskTypes { if taskTp == property.MppTaskType { mppAggs := tryToGetMppHashAggs(la, prop) @@ -3470,7 +3510,13 @@ func getHashAggs(lp base.LogicalPlan, prop *property.PhysicalProperty) []base.Ph hashAggs = append(hashAggs, mppAggs...) } } else { - agg := NewPhysicalHashAgg(la, la.StatsInfo().ScaleByExpectCnt(prop.ExpectedCnt), &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, TaskTp: taskTp, CTEProducerStatus: prop.CTEProducerStatus}) + childProp := &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, TaskTp: taskTp, CTEProducerStatus: prop.CTEProducerStatus} + // mainly to fill indexJoinProp to childProp. 
+ childProp = admitIndexJoinProp(childProp, prop) + if childProp == nil { + continue + } + agg := NewPhysicalHashAgg(la, la.StatsInfo().ScaleByExpectCnt(prop.ExpectedCnt), childProp) agg.SetSchema(la.Schema().Clone()) hashAggs = append(hashAggs, agg) } @@ -3504,6 +3550,64 @@ func exhaustPhysicalPlans4LogicalAggregation(lp base.LogicalPlan, prop *property return aggs, !(preferStream || preferHash), nil } +func admitIndexJoinTypes(types []property.TaskType, prop *property.PhysicalProperty) []property.TaskType { + if prop.TaskTp == property.MppTaskType { + // if the parent prop is mppTask, we assume it couldn't contain indexJoinProp by default, + // which is guaranteed by the parent physical plans enumeration. + return types + } + // only admit root & cop task type to push down indexJoinProp. + if prop.IndexJoinProp != nil { + newTypes := types[:0] + for _, tp := range types { + if tp != property.MppTaskType { + newTypes = append(newTypes, tp) + } + } + types = newTypes + } + return types +} + +func admitIndexJoinProps(children []*property.PhysicalProperty, prop *property.PhysicalProperty) []*property.PhysicalProperty { + if prop.TaskTp == property.MppTaskType { + // if the parent prop is mppTask, we assume it couldn't contain indexJoinProp by default, + // which is guaranteed by the parent physical plans enumeration. + return children + } + // only admit root & cop task type to push down indexJoinProp. + if prop.IndexJoinProp != nil { + newChildren := children[:0] + for _, child := range children { + if child.TaskTp != property.MppTaskType { + child.IndexJoinProp = prop.IndexJoinProp + // only admit non-mpp task prop. 
+ newChildren = append(newChildren, child) + } + } + children = newChildren + } + return children +} + +func admitIndexJoinProp(child, prop *property.PhysicalProperty) *property.PhysicalProperty { + if prop.TaskTp == property.MppTaskType { + // if the parent prop is mppTask, we assume it couldn't contain indexJoinProp by default, + // which is guaranteed by the parent physical plans enumeration. + return child + } + // only admit root & cop task type to push down indexJoinProp. + if prop.IndexJoinProp != nil { + if child.TaskTp != property.MppTaskType { + child.IndexJoinProp = prop.IndexJoinProp + } else { + // only admit non-mpp task prop. + child = nil + } + } + return child +} + func exhaustPhysicalPlans4LogicalSelection(lp base.LogicalPlan, prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) { p := lp.(*logicalop.LogicalSelection) newProps := make([]*property.PhysicalProperty, 0, 2) @@ -3519,6 +3623,7 @@ func exhaustPhysicalPlans4LogicalSelection(lp base.LogicalPlan, prop *property.P } ret := make([]base.PhysicalPlan, 0, len(newProps)) + newProps = admitIndexJoinProps(newProps, prop) for _, newProp := range newProps { sel := PhysicalSelection{ Conditions: p.Conditions, diff --git a/pkg/planner/core/find_best_task.go b/pkg/planner/core/find_best_task.go index 227c9bfa38..28b52838d1 100644 --- a/pkg/planner/core/find_best_task.go +++ b/pkg/planner/core/find_best_task.go @@ -92,6 +92,10 @@ func GetPropByOrderByItemsContainScalarFunc(items []*util.ByItems) (_ *property. } func findBestTask4LogicalTableDual(lp base.LogicalPlan, prop *property.PhysicalProperty, planCounter *base.PlanCounterTp, opt *optimizetrace.PhysicalOptimizeOp) (base.Task, int64, error) { + if prop.IndexJoinProp != nil { + // even enforce hint can not work with this. + return base.InvalidTask, 0, nil + } p := lp.(*logicalop.LogicalTableDual) // If the required property is not empty and the row count > 1, // we cannot ensure this required property. 
@@ -112,6 +116,10 @@ func findBestTask4LogicalTableDual(lp base.LogicalPlan, prop *property.PhysicalP } func findBestTask4LogicalShow(lp base.LogicalPlan, prop *property.PhysicalProperty, planCounter *base.PlanCounterTp, _ *optimizetrace.PhysicalOptimizeOp) (base.Task, int64, error) { + if prop.IndexJoinProp != nil { + // even enforce hint can not work with this. + return base.InvalidTask, 0, nil + } p := lp.(*logicalop.LogicalShow) if !prop.IsSortItemEmpty() || planCounter.Empty() { return base.InvalidTask, 0, nil @@ -125,6 +133,10 @@ func findBestTask4LogicalShow(lp base.LogicalPlan, prop *property.PhysicalProper } func findBestTask4LogicalShowDDLJobs(lp base.LogicalPlan, prop *property.PhysicalProperty, planCounter *base.PlanCounterTp, _ *optimizetrace.PhysicalOptimizeOp) (base.Task, int64, error) { + if prop.IndexJoinProp != nil { + // even enforce hint can not work with this. + return base.InvalidTask, 0, nil + } p := lp.(*logicalop.LogicalShowDDLJobs) if !prop.IsSortItemEmpty() || planCounter.Empty() { return base.InvalidTask, 0, nil @@ -540,6 +552,14 @@ func findBestTask(lp base.LogicalPlan, prop *property.PhysicalProperty, planCoun planCounter.Dec(1) return bestTask, 1, nil } + // if prop requires an index join's probe side, check the inner pattern admission here. + if prop.IndexJoinProp != nil { + pass := admitIndexJoinInnerChildPattern(lp) + if !pass { + // even enforce hint can not work with this. + return base.InvalidTask, 0, nil + } + } canAddEnforcer := prop.CanAddEnforcer @@ -553,6 +573,10 @@ func findBestTask(lp base.LogicalPlan, prop *property.PhysicalProperty, planCoun // prop should be read only because its cached hashcode might be not consistent // when it is changed. So we clone a new one for the temporary changes. newProp := prop.CloneEssentialFields() + // here newProp is used as another complete copy for enforcer, fill indexJoinProp manually. 
+ // for childProp := prop.CloneEssentialFields(), we do not clone indexJoinProp for childProp by default. + // and only call admitIndexJoinProp to inherit the indexJoinProp for special pattern operators. + newProp.IndexJoinProp = prop.IndexJoinProp var plansFitsProp, plansNeedEnforce []base.PhysicalPlan var hintWorksWithProp bool // Maybe the plan can satisfy the required property, @@ -626,6 +650,10 @@ END: } func findBestTask4LogicalMemTable(lp base.LogicalPlan, prop *property.PhysicalProperty, planCounter *base.PlanCounterTp, opt *optimizetrace.PhysicalOptimizeOp) (t base.Task, cntPlan int64, err error) { + if prop.IndexJoinProp != nil { + // even enforce hint can not work with this. + return base.InvalidTask, 0, nil + } p := lp.(*logicalop.LogicalMemTable) if prop.MPPPartitionTp != property.AnyType { return base.InvalidTask, 0, nil @@ -1421,6 +1449,19 @@ func findBestTask4LogicalDataSource(lp base.LogicalPlan, prop *property.Physical planCounter.Dec(1) return } + // if prop requires an index join's probe side, check the inner pattern admission here. + if prop.IndexJoinProp != nil { + pass := admitIndexJoinInnerChildPattern(lp) + if !pass { + // even enforce hint can not work with this. + return base.InvalidTask, 0, nil + } + // when datasource leaf is in index join's inner side, build the task out with old + // index join build logic, we can't merge this with normal datasource's index range + // because normal index range is built on expression EQ/IN, while index join's inner + // has its special runtime constants detecting and filling logic. + return getBestIndexJoinInnerTaskByProp(ds, prop, opt, planCounter) + } var cnt int64 var unenforcedTask base.Task // If prop.CanAddEnforcer is true, the prop.SortItems need to be set nil for ds.findBestTask. 
@@ -3108,6 +3149,10 @@ func getOriginalPhysicalIndexScan(ds *logicalop.DataSource, prop *property.Physi } func findBestTask4LogicalCTE(lp base.LogicalPlan, prop *property.PhysicalProperty, counter *base.PlanCounterTp, pop *optimizetrace.PhysicalOptimizeOp) (t base.Task, cntPlan int64, err error) { + if prop.IndexJoinProp != nil { + // even enforce hint can not work with this. + return base.InvalidTask, 0, nil + } p := lp.(*logicalop.LogicalCTE) if p.ChildLen() > 0 { return p.BaseLogicalPlan.FindBestTask(prop, counter, pop) @@ -3142,6 +3187,10 @@ func findBestTask4LogicalCTE(lp base.LogicalPlan, prop *property.PhysicalPropert } func findBestTask4LogicalCTETable(lp base.LogicalPlan, prop *property.PhysicalProperty, _ *base.PlanCounterTp, _ *optimizetrace.PhysicalOptimizeOp) (t base.Task, cntPlan int64, err error) { + if prop.IndexJoinProp != nil { + // even enforce hint can not work with this. + return base.InvalidTask, 0, nil + } p := lp.(*logicalop.LogicalCTETable) if !prop.IsSortItemEmpty() { return base.InvalidTask, 0, nil diff --git a/pkg/planner/core/index_join_path.go b/pkg/planner/core/index_join_path.go index 0dd0fbc551..2eb296102d 100644 --- a/pkg/planner/core/index_join_path.go +++ b/pkg/planner/core/index_join_path.go @@ -25,10 +25,12 @@ import ( "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/planner/cardinality" + "github.com/pingcap/tidb/pkg/planner/core/base" "github.com/pingcap/tidb/pkg/planner/core/operator/logicalop" "github.com/pingcap/tidb/pkg/planner/planctx" "github.com/pingcap/tidb/pkg/planner/property" "github.com/pingcap/tidb/pkg/planner/util" + "github.com/pingcap/tidb/pkg/planner/util/optimizetrace" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/chunk" @@ -630,6 +632,40 @@ func getIndexJoinIntPKPathInfo(ds *logicalop.DataSource, innerJoinKeys, outerJoi return keyOff2IdxOff, newOuterJoinKeys, ranges, chosenPath, 
true } +// getBestIndexJoinInnerTaskByProp tries to build the best inner child task from ds for index join by the given property. +func getBestIndexJoinInnerTaskByProp(ds *logicalop.DataSource, prop *property.PhysicalProperty, + opt *optimizetrace.PhysicalOptimizeOp, planCounter *base.PlanCounterTp) (base.Task, int64, error) { + // the below code is quite similar to the original logic, for the following reasons: + // reason1: we need to leverage original indexPathInfo down related logic to build constant range for index plan. + // reason2: the ranges from TS and IS couldn't be directly used to derive the stats' estimation, it's not real. + // reason3: skyline pruning should not prune the possible index path which could feel the runtime EQ access conditions. + innerTSCopTask := buildDataSource2TableScanByIndexJoinProp(ds, prop) + if !innerTSCopTask.Invalid() { + planCounter.Dec(1) + if planCounter.Empty() { + // planCounter is counted to end, just return this one. + return innerTSCopTask, 1, nil + } + } + innerISCopTask := buildDataSource2IndexScanByIndexJoinProp(ds, prop) + if !innerISCopTask.Invalid() { + planCounter.Dec(1) + if planCounter.Empty() { + // planCounter is counted to end, just return this one. + return innerISCopTask, 1, nil + } + } + // if we can see both of them, compare the cost. + leftIsBetter, err := compareTaskCost(innerTSCopTask, innerISCopTask, opt) + if err != nil { + return base.InvalidTask, 0, err + } + if leftIsBetter { + return innerTSCopTask, 1, nil + } + return innerISCopTask, 1, nil +} + // getBestIndexJoinPathResultByProp tries to iterate all possible access paths of the inner child and builds // index join path for each access path based on push-down indexIndexProp. It returns the best index join path result and the mapping. 
func getBestIndexJoinPathResultByProp( diff --git a/pkg/planner/property/physical_property.go b/pkg/planner/property/physical_property.go index abc895af51..e0d1980f2f 100644 --- a/pkg/planner/property/physical_property.go +++ b/pkg/planner/property/physical_property.go @@ -495,6 +495,8 @@ func (p *PhysicalProperty) CloneEssentialFields() *PhysicalProperty { MPPPartitionCols: p.MPPPartitionCols, RejectSort: p.RejectSort, CTEProducerStatus: p.CTEProducerStatus, + // we do not clone the basic indexJoinProp by default. + // and only call admitIndexJoinProp to inherit the indexJoinProp for special pattern operators. } return prop }