From 8d589d2c7c73d1407f68ba780fa2001d94f8a77a Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Fri, 24 Jun 2022 10:38:37 +0800 Subject: [PATCH] planner: new cost formula for IndexJoin (#35671) ref pingcap/tidb#35240 --- planner/core/plan_cost.go | 78 ++++++++++++++++++++++++++++++--------- planner/core/task.go | 6 +-- 2 files changed, 63 insertions(+), 21 deletions(-) diff --git a/planner/core/plan_cost.go b/planner/core/plan_cost.go index c0598e74f2..16d3478e49 100644 --- a/planner/core/plan_cost.go +++ b/planner/core/plan_cost.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/planner/property" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/util/paging" @@ -223,11 +224,8 @@ func (p *PhysicalIndexLookUpReader) GetPlanCost(taskType property.TaskType, cost // table-side seek cost p.planCost += estimateNetSeekCost(p.tablePlan) - if p.ctx.GetSessionVars().CostModelVersion == modelVer2 { - // accumulate the real double-read cost: numDoubleReadTasks * seekFactor - numDoubleReadTasks := p.estNumDoubleReadTasks(costFlag) - p.planCost += numDoubleReadTasks * p.ctx.GetSessionVars().GetSeekFactor(ts.Table) - } + // double read cost + p.planCost += p.estDoubleReadCost(ts.Table, costFlag) // consider concurrency p.planCost /= float64(p.ctx.GetSessionVars().DistSQLScanConcurrency()) @@ -238,14 +236,21 @@ func (p *PhysicalIndexLookUpReader) GetPlanCost(taskType property.TaskType, cost return p.planCost, nil } -func (p *PhysicalIndexLookUpReader) estNumDoubleReadTasks(costFlag uint64) float64 { - doubleReadRows := p.indexPlan.StatsCount() +func (p *PhysicalIndexLookUpReader) estDoubleReadCost(tbl *model.TableInfo, costFlag uint64) float64 { + if p.ctx.GetSessionVars().CostModelVersion == modelVer1 { + // only consider double-read cost on modelVer2 + return 0 + } + // estimate the double-read cost: (numDoubleReadTasks * seekFactor) / concurrency + doubleReadRows := getCardinality(p.indexPlan, costFlag) batchSize := float64(p.ctx.GetSessionVars().IndexLookupSize) + concurrency := math.Max(1.0, float64(p.ctx.GetSessionVars().IndexLookupConcurrency())) + seekFactor := p.ctx.GetSessionVars().GetSeekFactor(tbl) // distRatio indicates how many requests corresponding to a batch, current value is from experiments. // TODO: estimate it by using index correlation or make it configurable. distRatio := 40.0 - numDoubleReadTasks := (doubleReadRows / batchSize) * distRatio - return numDoubleReadTasks // use Float64 instead of Int like `Ceil(...)` to make the cost continuous + numDoubleReadTasks := (doubleReadRows / batchSize) * distRatio // use Float64 instead of Int like `Ceil(...)` to make the cost continuous. + return (numDoubleReadTasks * seekFactor) / concurrency } // GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost. @@ -453,7 +458,7 @@ func (p *PhysicalIndexScan) GetPlanCost(taskType property.TaskType, costFlag uin } // GetCost computes the cost of index join operator and its children. -func (p *PhysicalIndexJoin) GetCost(outerCnt, innerCnt float64, outerCost, innerCost float64) float64 { +func (p *PhysicalIndexJoin) GetCost(outerCnt, innerCnt, outerCost, innerCost float64, costFlag uint64) float64 { var cpuCost float64 sessVars := p.ctx.GetSessionVars() // Add the cost of evaluating outer filter, since inner filter of index join @@ -490,6 +495,9 @@ func (p *PhysicalIndexJoin) GetCost(outerCnt, innerCnt float64, outerCost, inner numPairs = 0 } } + if hasCostFlag(costFlag, CostFlagUseTrueCardinality) { + numPairs = getOperatorActRows(p) + } probeCost := numPairs * sessVars.GetCPUFactor() // Cost of additional concurrent goroutines. cpuCost += probeCost + (innerConcurrency+1.0)*sessVars.GetConcurrencyFactor() @@ -498,7 +506,23 @@ func (p *PhysicalIndexJoin) GetCost(outerCnt, innerCnt float64, outerCost, inner memoryCost := innerConcurrency * (batchSize * distinctFactor) * innerCnt * sessVars.GetMemoryFactor() // Cost of inner child plan, i.e, mainly I/O and network cost. innerPlanCost := outerCnt * innerCost - return outerCost + innerPlanCost + cpuCost + memoryCost + return outerCost + innerPlanCost + cpuCost + memoryCost + p.estDoubleReadCost(outerCnt) +} + +func (p *PhysicalIndexJoin) estDoubleReadCost(doubleReadRows float64) float64 { + if p.ctx.GetSessionVars().CostModelVersion == modelVer1 { + // only consider double-read cost on modelVer2 + return 0 + } + // estimate the double read cost for IndexJoin: (double-read-tasks * seek-factor) / concurrency + seekFactor := p.ctx.GetSessionVars().GetSeekFactor(nil) + batchSize := math.Max(1.0, float64(p.ctx.GetSessionVars().IndexJoinBatchSize)) + concurrency := math.Max(1.0, float64(p.ctx.GetSessionVars().IndexLookupJoinConcurrency())) + // distRatio indicates how many requests corresponding to a batch, current value is from experiments. + // TODO: estimate it by using index correlation or make it configurable. + distRatio := 40.0 + numDoubleReadTasks := (doubleReadRows / batchSize) * distRatio + return (numDoubleReadTasks * seekFactor) / concurrency } // GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost. @@ -517,13 +541,17 @@ func (p *PhysicalIndexJoin) GetPlanCost(taskType property.TaskType, costFlag uin } outerCnt := getCardinality(outerChild, costFlag) innerCnt := getCardinality(innerChild, costFlag) - p.planCost = p.GetCost(outerCnt, innerCnt, outerCost, innerCost) + if hasCostFlag(costFlag, CostFlagUseTrueCardinality) && outerCnt > 0 { + innerCnt /= outerCnt // corresponding to one outer row when calculating IndexJoin costs + innerCost /= outerCnt + } + p.planCost = p.GetCost(outerCnt, innerCnt, outerCost, innerCost, costFlag) p.planCostInit = true return p.planCost, nil } // GetCost computes the cost of index merge join operator and its children. -func (p *PhysicalIndexHashJoin) GetCost(outerCnt, innerCnt, outerCost, innerCost float64) float64 { +func (p *PhysicalIndexHashJoin) GetCost(outerCnt, innerCnt, outerCost, innerCost float64, costFlag uint64) float64 { var cpuCost float64 sessVars := p.ctx.GetSessionVars() // Add the cost of evaluating outer filter, since inner filter of index join @@ -561,6 +589,9 @@ func (p *PhysicalIndexHashJoin) GetCost(outerCnt, innerCnt, outerCost, innerCost numPairs = 0 } } + if hasCostFlag(costFlag, CostFlagUseTrueCardinality) { + numPairs = getOperatorActRows(p) + } // Inner workers do hash join in parallel, but they can only save ONE outer // batch results. So as the number of outer batch exceeds inner concurrency, // it would fall back to linear execution. In a word, the hash join only runs @@ -579,7 +610,7 @@ func (p *PhysicalIndexHashJoin) GetCost(outerCnt, innerCnt, outerCost, innerCost memoryCost := concurrency * (batchSize * distinctFactor) * innerCnt * sessVars.GetMemoryFactor() // Cost of inner child plan, i.e, mainly I/O and network cost. innerPlanCost := outerCnt * innerCost - return outerCost + innerPlanCost + cpuCost + memoryCost + return outerCost + innerPlanCost + cpuCost + memoryCost + p.estDoubleReadCost(outerCnt) } // GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost. @@ -598,13 +629,17 @@ func (p *PhysicalIndexHashJoin) GetPlanCost(taskType property.TaskType, costFlag } outerCnt := getCardinality(outerChild, costFlag) innerCnt := getCardinality(innerChild, costFlag) - p.planCost = p.GetCost(outerCnt, innerCnt, outerCost, innerCost) + if hasCostFlag(costFlag, CostFlagUseTrueCardinality) && outerCnt > 0 { + innerCnt /= outerCnt // corresponding to one outer row when calculating IndexJoin costs + innerCost /= outerCnt + } + p.planCost = p.GetCost(outerCnt, innerCnt, outerCost, innerCost, costFlag) p.planCostInit = true return p.planCost, nil } // GetCost computes the cost of index merge join operator and its children. -func (p *PhysicalIndexMergeJoin) GetCost(outerCnt, innerCnt, outerCost, innerCost float64) float64 { +func (p *PhysicalIndexMergeJoin) GetCost(outerCnt, innerCnt, outerCost, innerCost float64, costFlag uint64) float64 { var cpuCost float64 sessVars := p.ctx.GetSessionVars() // Add the cost of evaluating outer filter, since inner filter of index join @@ -644,6 +679,9 @@ func (p *PhysicalIndexMergeJoin) GetCost(outerCnt, innerCnt, outerCost, innerCos numPairs = 0 } } + if hasCostFlag(costFlag, CostFlagUseTrueCardinality) { + numPairs = getOperatorActRows(p) + } avgProbeCnt := numPairs / outerCnt var probeCost float64 // Inner workers do merge join in parallel, but they can only save ONE outer batch @@ -662,7 +700,7 @@ func (p *PhysicalIndexMergeJoin) GetCost(outerCnt, innerCnt, outerCost, innerCos memoryCost := innerConcurrency * (batchSize * avgProbeCnt) * sessVars.GetMemoryFactor() innerPlanCost := outerCnt * innerCost - return outerCost + innerPlanCost + cpuCost + memoryCost + return outerCost + innerPlanCost + cpuCost + memoryCost + p.estDoubleReadCost(outerCnt) } // GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost. @@ -681,7 +719,11 @@ func (p *PhysicalIndexMergeJoin) GetPlanCost(taskType property.TaskType, costFla } outerCnt := getCardinality(outerChild, costFlag) innerCnt := getCardinality(innerChild, costFlag) - p.planCost = p.GetCost(outerCnt, innerCnt, outerCost, innerCost) + if hasCostFlag(costFlag, CostFlagUseTrueCardinality) && outerCnt > 0 { + innerCnt /= outerCnt // corresponding to one outer row when calculating IndexJoin costs + innerCost /= outerCnt + } + p.planCost = p.GetCost(outerCnt, innerCnt, outerCost, innerCost, costFlag) p.planCostInit = true return p.planCost, nil } diff --git a/planner/core/task.go b/planner/core/task.go index 4ecc67fc67..2683981152 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -266,7 +266,7 @@ func (p *PhysicalIndexMergeJoin) attach2Task(tasks ...task) task { } t := &rootTask{ p: p, - cst: p.GetCost(outerTask.count(), innerTask.count(), outerTask.cost(), innerTask.cost()), + cst: p.GetCost(outerTask.count(), innerTask.count(), outerTask.cost(), innerTask.cost(), 0), } p.cost = t.cost() return t @@ -282,7 +282,7 @@ func (p *PhysicalIndexHashJoin) attach2Task(tasks ...task) task { } t := &rootTask{ p: p, - cst: p.GetCost(outerTask.count(), innerTask.count(), outerTask.cost(), innerTask.cost()), + cst: p.GetCost(outerTask.count(), innerTask.count(), outerTask.cost(), innerTask.cost(), 0), } p.cost = t.cost() return t @@ -298,7 +298,7 @@ func (p *PhysicalIndexJoin) attach2Task(tasks ...task) task { } t := &rootTask{ p: p, - cst: p.GetCost(outerTask.count(), innerTask.count(), outerTask.cost(), innerTask.cost()), + cst: p.GetCost(outerTask.count(), innerTask.count(), outerTask.cost(), innerTask.cost(), 0), } p.cost = t.cost() return t