planner: refine cop task as a capital one for later pkg move (#52506)
ref pingcap/tidb#52181
@@ -1075,7 +1075,7 @@ func (p *LogicalJoin) constructInnerTableScanTask(
     if usedStats != nil && usedStats.GetUsedInfo(ts.physicalTableID) != nil {
         ts.usedStatsInfo = usedStats.GetUsedInfo(ts.physicalTableID)
     }
-    copTask := &copTask{
+    copTask := &CopTask{
         tablePlan: ts,
         indexPlanFinished: true,
         tblColHists: ds.TblColHists,
@@ -1237,7 +1237,7 @@ func (p *LogicalJoin) constructInnerIndexScanTask(
         tblColHists: ds.TblColHists,
         pkIsHandleCol: ds.getPKIsHandleCol(),
     }.Init(ds.SCtx(), ds.QueryBlockOffset())
-    cop := &copTask{
+    cop := &CopTask{
         indexPlan: is,
         tblColHists: ds.TblColHists,
         tblCols: ds.TblCols,
@@ -425,8 +425,8 @@ func getTaskPlanCost(t Task, pop *coreusage.PhysicalOptimizeOp) (float64, bool,
     switch t.(type) {
     case *RootTask:
         taskType = property.RootTaskType
-    case *copTask: // no need to know whether the task is single-read or double-read, so both CopSingleReadTaskType and CopDoubleReadTaskType are OK
-        cop := t.(*copTask)
+    case *CopTask: // no need to know whether the task is single-read or double-read, so both CopSingleReadTaskType and CopDoubleReadTaskType are OK
+        cop := t.(*CopTask)
         if cop.indexPlan != nil && cop.tablePlan != nil { // handle IndexLookup specially
             taskType = property.CopMultiReadTaskType
             // keep compatible with the old cost interface, for CopMultiReadTask, the cost is idxCost + tblCost.
@@ -481,7 +481,7 @@ func getTaskPlanCost(t Task, pop *coreusage.PhysicalOptimizeOp) (float64, bool,
         // It's a very special case for index merge case.
         // t.plan() == nil in index merge COP case, it means indexPlanFinished is false in other words.
         cost := 0.0
-        copTsk := t.(*copTask)
+        copTsk := t.(*CopTask)
         for _, partialScan := range copTsk.idxMergePartPlans {
             partialCost, err := getPlanCost(partialScan, taskType, coreusage.NewDefaultPlanCostOption().WithOptimizeTracer(pop))
             if err != nil {
@@ -1610,7 +1610,7 @@ func (ds *DataSource) convertToIndexMergeScan(prop *property.PhysicalProperty, c
     })
     path := candidate.path
     scans := make([]PhysicalPlan, 0, len(path.PartialIndexPaths))
-    cop := &copTask{
+    cop := &CopTask{
         indexPlanFinished: false,
         tblColHists: ds.TblColHists,
     }
@@ -2019,7 +2019,7 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty,
     }
     path := candidate.path
     is := ds.getOriginalPhysicalIndexScan(prop, path, candidate.isMatchProp, candidate.path.IsSingleScan)
-    cop := &copTask{
+    cop := &CopTask{
         indexPlan: is,
         tblColHists: ds.TblColHists,
         tblCols: ds.TblCols,
@@ -2204,7 +2204,7 @@ func (is *PhysicalIndexScan) initSchema(idxExprCols []*expression.Column, isDoub
     is.SetSchema(expression.NewSchema(indexCols...))
 }

-func (is *PhysicalIndexScan) addPushedDownSelection(copTask *copTask, p *DataSource, path *util.AccessPath, finalStats *property.StatsInfo) {
+func (is *PhysicalIndexScan) addPushedDownSelection(copTask *CopTask, p *DataSource, path *util.AccessPath, finalStats *property.StatsInfo) {
     // Add filter condition to table plan now.
     indexConds, tableConds := path.IndexFilters, path.TableFilters
     tableConds, copTask.rootTaskConds = SplitSelCondsWithVirtualColumn(tableConds)
@@ -2477,7 +2477,7 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
         // prop.TaskTp is cop related, just return invalidTask.
         return invalidTask, nil
     }
-    copTask := &copTask{
+    copTask := &CopTask{
         tablePlan: ts,
         indexPlanFinished: true,
         tblColHists: ds.TblColHists,
@@ -2711,7 +2711,7 @@ func (ts *PhysicalTableScan) addPushedDownSelectionToMppTask(mpp *MppTask, stats
     return mpp
 }

-func (ts *PhysicalTableScan) addPushedDownSelection(copTask *copTask, stats *property.StatsInfo) {
+func (ts *PhysicalTableScan) addPushedDownSelection(copTask *CopTask, stats *property.StatsInfo) {
     ts.filterCondition, copTask.rootTaskConds = SplitSelCondsWithVirtualColumn(ts.filterCondition)
     var newRootConds []expression.Expression
     ts.filterCondition, newRootConds = expression.PushDownExprs(GetPushDownCtx(ts.SCtx()), ts.filterCondition, ts.StoreType)
@@ -1555,7 +1555,7 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReader(_ context.Context, dbName m
         }
     }

-    cop := &copTask{
+    cop := &CopTask{
         indexPlan: is,
         tablePlan: ts,
         tblColHists: is.StatsInfo().HistColl,
@@ -29,91 +29,18 @@ import (
     "github.com/pingcap/tidb/pkg/planner/core/internal/base"
     "github.com/pingcap/tidb/pkg/planner/property"
     "github.com/pingcap/tidb/pkg/planner/util"
     "github.com/pingcap/tidb/pkg/statistics"
     "github.com/pingcap/tidb/pkg/types"
     "github.com/pingcap/tidb/pkg/util/chunk"
     "github.com/pingcap/tidb/pkg/util/collate"
     "github.com/pingcap/tidb/pkg/util/logutil"
     "github.com/pingcap/tidb/pkg/util/paging"
     "github.com/pingcap/tidb/pkg/util/plancodec"
     "github.com/pingcap/tidb/pkg/util/size"
     "go.uber.org/zap"
 )

-var _ Task = &copTask{}
-
-// copTask is a task that runs in a distributed kv store.
-// TODO: In future, we should split copTask to indexTask and tableTask.
-type copTask struct {
-    indexPlan PhysicalPlan
-    tablePlan PhysicalPlan
-    // indexPlanFinished means we have finished index plan.
-    indexPlanFinished bool
-    // keepOrder indicates if the plan scans data by order.
-    keepOrder bool
-    // needExtraProj means an extra prune is needed because
-    // in double read / index merge cases, they may output one more column for handle(row id).
-    needExtraProj bool
-    // originSchema is the target schema to be projected to when needExtraProj is true.
-    originSchema *expression.Schema
-
-    extraHandleCol *expression.Column
-    commonHandleCols []*expression.Column
-    // tblColHists stores the original stats of DataSource, it is used to get
-    // average row width when computing network cost.
-    tblColHists *statistics.HistColl
-    // tblCols stores the original columns of DataSource before being pruned, it
-    // is used to compute average row width when computing scan cost.
-    tblCols []*expression.Column
-
-    idxMergePartPlans []PhysicalPlan
-    idxMergeIsIntersection bool
-    idxMergeAccessMVIndex bool
-
-    // rootTaskConds stores select conditions containing virtual columns.
-    // These conditions can't push to TiKV, so we have to add a selection for rootTask
-    rootTaskConds []expression.Expression
-
-    // For table partition.
-    physPlanPartInfo PhysPlanPartInfo
-
-    // expectCnt is the expected row count of upper task, 0 for unlimited.
-    // It's used for deciding whether using paging distsql.
-    expectCnt uint64
-}
-
-// Invalid implements Task interface.
-func (t *copTask) Invalid() bool {
-    return t.tablePlan == nil && t.indexPlan == nil && len(t.idxMergePartPlans) == 0
-}
-
-// Count implements Task interface.
-func (t *copTask) Count() float64 {
-    if t.indexPlanFinished {
-        return t.tablePlan.StatsInfo().RowCount
-    }
-    return t.indexPlan.StatsInfo().RowCount
-}
-
-// Copy implements Task interface.
-func (t *copTask) Copy() Task {
-    nt := *t
-    return &nt
-}
-
-// Plan implements Task interface.
-// copTask plan should be careful with indexMergeReader, whose real plan is stored in
-// idxMergePartPlans, when its indexPlanFinished is marked with false.
-func (t *copTask) Plan() PhysicalPlan {
-    if t.indexPlanFinished {
-        return t.tablePlan
-    }
-    return t.indexPlan
-}
-
 func attachPlan2Task(p PhysicalPlan, t Task) Task {
     switch v := t.(type) {
-    case *copTask:
+    case *CopTask:
         if v.indexPlanFinished {
             p.SetChildren(v.tablePlan)
             v.tablePlan = p
@@ -132,7 +59,7 @@ func attachPlan2Task(p PhysicalPlan, t Task) Task {
 }

 // finishIndexPlan means we no longer add plan to index plan, and compute the network cost for it.
-func (t *copTask) finishIndexPlan() {
+func (t *CopTask) finishIndexPlan() {
     if t.indexPlanFinished {
         return
     }
@@ -150,7 +77,7 @@ func (t *copTask) finishIndexPlan() {
     }
 }

-func (t *copTask) getStoreType() kv.StoreType {
+func (t *CopTask) getStoreType() kv.StoreType {
     if t.tablePlan == nil {
         return kv.TiKV
     }
@@ -167,42 +94,6 @@ func (t *copTask) getStoreType() kv.StoreType {
     return kv.TiKV
 }

-// MemoryUsage return the memory usage of copTask
-func (t *copTask) MemoryUsage() (sum int64) {
-    if t == nil {
-        return
-    }
-
-    sum = size.SizeOfInterface*(2+int64(cap(t.idxMergePartPlans)+cap(t.rootTaskConds))) + size.SizeOfBool*3 + size.SizeOfUint64 +
-        size.SizeOfPointer*(3+int64(cap(t.commonHandleCols)+cap(t.tblCols))) + size.SizeOfSlice*4 + t.physPlanPartInfo.MemoryUsage()
-    if t.indexPlan != nil {
-        sum += t.indexPlan.MemoryUsage()
-    }
-    if t.tablePlan != nil {
-        sum += t.tablePlan.MemoryUsage()
-    }
-    if t.originSchema != nil {
-        sum += t.originSchema.MemoryUsage()
-    }
-    if t.extraHandleCol != nil {
-        sum += t.extraHandleCol.MemoryUsage()
-    }
-
-    for _, col := range t.commonHandleCols {
-        sum += col.MemoryUsage()
-    }
-    for _, col := range t.tblCols {
-        sum += col.MemoryUsage()
-    }
-    for _, p := range t.idxMergePartPlans {
-        sum += p.MemoryUsage()
-    }
-    for _, expr := range t.rootTaskConds {
-        sum += expr.MemoryUsage()
-    }
-    return
-}
-
 // Attach2Task implements PhysicalPlan interface.
 func (p *basePhysicalPlan) Attach2Task(tasks ...Task) Task {
     t := tasks[0].ConvertToRootTask(p.SCtx())
@@ -563,8 +454,8 @@ func (p *PhysicalHashJoin) attach2TaskForMpp(tasks ...Task) Task {
 }

 func (p *PhysicalHashJoin) attach2TaskForTiFlash(tasks ...Task) Task {
-    lTask, lok := tasks[0].(*copTask)
-    rTask, rok := tasks[1].(*copTask)
+    lTask, lok := tasks[0].(*CopTask)
+    rTask, rok := tasks[1].(*CopTask)
     if !lok || !rok {
         return p.attach2TaskForMpp(tasks...)
     }
@@ -577,7 +468,7 @@ func (p *PhysicalHashJoin) attach2TaskForTiFlash(tasks ...Task) Task {
         rTask.finishIndexPlan()
     }

-    task := &copTask{
+    task := &CopTask{
         tblColHists: rTask.tblColHists,
         indexPlanFinished: true,
         tablePlan: p,
@@ -595,7 +486,7 @@ func (p *PhysicalMergeJoin) Attach2Task(tasks ...Task) Task {
     return t
 }

-func buildIndexLookUpTask(ctx PlanContext, t *copTask) *RootTask {
+func buildIndexLookUpTask(ctx PlanContext, t *CopTask) *RootTask {
     newTask := &RootTask{}
     p := PhysicalIndexLookUpReader{
         tablePlan: t.tablePlan,
@@ -664,114 +555,7 @@ func calcPagingCost(ctx PlanContext, indexPlan PhysicalPlan, expectCnt uint64) f
     return math.Max(pagingCst-sessVars.GetSeekFactor(nil), 0)
 }

-// ConvertToRootTask implements Task interface.
-func (t *copTask) ConvertToRootTask(ctx PlanContext) *RootTask {
-    // copy one to avoid changing itself.
-    return t.Copy().(*copTask).convertToRootTaskImpl(ctx)
-}
-
-func (t *copTask) convertToRootTaskImpl(ctx PlanContext) *RootTask {
-    // copTasks are run in parallel, to make the estimated cost closer to execution time, we amortize
-    // the cost to cop iterator workers. According to `CopClient::Send`, the concurrency
-    // is Min(DistSQLScanConcurrency, numRegionsInvolvedInScan), since we cannot infer
-    // the number of regions involved, we simply use DistSQLScanConcurrency.
-    t.finishIndexPlan()
-    // Network cost of transferring rows of table scan to TiDB.
-    if t.tablePlan != nil {
-        tp := t.tablePlan
-        for len(tp.Children()) > 0 {
-            if len(tp.Children()) == 1 {
-                tp = tp.Children()[0]
-            } else {
-                join := tp.(*PhysicalHashJoin)
-                tp = join.children[1-join.InnerChildIdx]
-            }
-        }
-        ts := tp.(*PhysicalTableScan)
-        prevColumnLen := len(ts.Columns)
-        prevSchema := ts.schema.Clone()
-        ts.Columns = ExpandVirtualColumn(ts.Columns, ts.schema, ts.Table.Columns)
-        if !t.needExtraProj && len(ts.Columns) > prevColumnLen {
-            // Add an projection to make sure not to output extract columns.
-            t.needExtraProj = true
-            t.originSchema = prevSchema
-        }
-    }
-    newTask := &RootTask{}
-    if t.idxMergePartPlans != nil {
-        p := PhysicalIndexMergeReader{
-            partialPlans: t.idxMergePartPlans,
-            tablePlan: t.tablePlan,
-            IsIntersectionType: t.idxMergeIsIntersection,
-            AccessMVIndex: t.idxMergeAccessMVIndex,
-            KeepOrder: t.keepOrder,
-        }.Init(ctx, t.idxMergePartPlans[0].QueryBlockOffset())
-        p.PlanPartInfo = t.physPlanPartInfo
-        setTableScanToTableRowIDScan(p.tablePlan)
-        newTask.SetPlan(p)
-        t.handleRootTaskConds(ctx, newTask)
-        if t.needExtraProj {
-            schema := t.originSchema
-            proj := PhysicalProjection{Exprs: expression.Column2Exprs(schema.Columns)}.Init(ctx, p.StatsInfo(), t.idxMergePartPlans[0].QueryBlockOffset(), nil)
-            proj.SetSchema(schema)
-            proj.SetChildren(p)
-            newTask.SetPlan(proj)
-        }
-        return newTask
-    }
-    if t.indexPlan != nil && t.tablePlan != nil {
-        newTask = buildIndexLookUpTask(ctx, t)
-    } else if t.indexPlan != nil {
-        p := PhysicalIndexReader{indexPlan: t.indexPlan}.Init(ctx, t.indexPlan.QueryBlockOffset())
-        p.PlanPartInfo = t.physPlanPartInfo
-        p.SetStats(t.indexPlan.StatsInfo())
-        newTask.SetPlan(p)
-    } else {
-        tp := t.tablePlan
-        for len(tp.Children()) > 0 {
-            if len(tp.Children()) == 1 {
-                tp = tp.Children()[0]
-            } else {
-                join := tp.(*PhysicalHashJoin)
-                tp = join.children[1-join.InnerChildIdx]
-            }
-        }
-        ts := tp.(*PhysicalTableScan)
-        p := PhysicalTableReader{
-            tablePlan: t.tablePlan,
-            StoreType: ts.StoreType,
-            IsCommonHandle: ts.Table.IsCommonHandle,
-        }.Init(ctx, t.tablePlan.QueryBlockOffset())
-        p.PlanPartInfo = t.physPlanPartInfo
-        p.SetStats(t.tablePlan.StatsInfo())
-
-        // If agg was pushed down in Attach2Task(), the partial agg was placed on the top of tablePlan, the final agg was
-        // placed above the PhysicalTableReader, and the schema should have been set correctly for them, the schema of
-        // partial agg contains the columns needed by the final agg.
-        // If we add the projection here, the projection will be between the final agg and the partial agg, then the
-        // schema will be broken, the final agg will fail to find needed columns in ResolveIndices().
-        // Besides, the agg would only be pushed down if it doesn't contain virtual columns, so virtual column should not be affected.
-        aggPushedDown := false
-        switch p.tablePlan.(type) {
-        case *PhysicalHashAgg, *PhysicalStreamAgg:
-            aggPushedDown = true
-        }
-
-        if t.needExtraProj && !aggPushedDown {
-            proj := PhysicalProjection{Exprs: expression.Column2Exprs(t.originSchema.Columns)}.Init(ts.SCtx(), ts.StatsInfo(), ts.QueryBlockOffset(), nil)
-            proj.SetSchema(t.originSchema)
-            proj.SetChildren(p)
-            newTask.SetPlan(proj)
-        } else {
-            newTask.SetPlan(p)
-        }
-    }
-
-    t.handleRootTaskConds(ctx, newTask)
-    return newTask
-}
-
-func (t *copTask) handleRootTaskConds(ctx PlanContext, newTask *RootTask) {
+func (t *CopTask) handleRootTaskConds(ctx PlanContext, newTask *RootTask) {
     if len(t.rootTaskConds) > 0 {
         selectivity, _, err := cardinality.Selectivity(ctx, t.tblColHists, t.rootTaskConds, nil)
         if err != nil {
@@ -820,7 +604,7 @@ func (p *PhysicalLimit) Attach2Task(tasks ...Task) Task {
     }

     sunk := false
-    if cop, ok := t.(*copTask); ok {
+    if cop, ok := t.(*CopTask); ok {
         suspendLimitAboveTablePlan := func() {
             newCount := p.Offset + p.Count
             childProfile := cop.tablePlan.StatsInfo()
@@ -844,7 +628,7 @@ func (p *PhysicalLimit) Attach2Task(tasks ...Task) Task {
             // but "regionNum" is unknown since the copTask can be a double read, so we ignore it now.
             stats := deriveLimitStats(childProfile, float64(newCount))
             pushedDownLimit := PhysicalLimit{PartitionBy: newPartitionBy, Count: newCount}.Init(p.SCtx(), stats, p.QueryBlockOffset())
-            cop = attachPlan2Task(pushedDownLimit, cop).(*copTask)
+            cop = attachPlan2Task(pushedDownLimit, cop).(*CopTask)
             // Don't use clone() so that Limit and its children share the same schema. Otherwise the virtual generated column may not be resolved right.
             pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema())
         }
@@ -1116,7 +900,7 @@ func (p *PhysicalTopN) containVirtualColumn(tCols []*expression.Column) bool {
 }

 // canPushDownToTiKV checks whether this topN can be pushed down to TiKV.
-func (p *PhysicalTopN) canPushDownToTiKV(copTask *copTask) bool {
+func (p *PhysicalTopN) canPushDownToTiKV(copTask *CopTask) bool {
     if !p.canExpressionConvertedToPB(kv.TiKV) {
         return false
     }
@@ -1154,7 +938,7 @@ func (p *PhysicalTopN) Attach2Task(tasks ...Task) Task {
         cols = append(cols, expression.ExtractColumns(item.Expr)...)
     }
     needPushDown := len(cols) > 0
-    if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDownToTiKV(copTask) && len(copTask.rootTaskConds) == 0 {
+    if copTask, ok := t.(*CopTask); ok && needPushDown && p.canPushDownToTiKV(copTask) && len(copTask.rootTaskConds) == 0 {
         // If all columns in topN are from index plan, we push it to index plan, otherwise we finish the index plan and
         // push it to table plan.
         var pushedDownTopN *PhysicalTopN
@@ -1195,7 +979,7 @@ func (p *PhysicalExpand) Attach2Task(tasks ...Task) Task {
 // Attach2Task implements PhysicalPlan interface.
 func (p *PhysicalProjection) Attach2Task(tasks ...Task) Task {
     t := tasks[0].Copy()
-    if cop, ok := t.(*copTask); ok {
+    if cop, ok := t.(*CopTask); ok {
         if (len(cop.rootTaskConds) == 0 && len(cop.idxMergePartPlans) == 0) && expression.CanExprsPushDown(GetPushDownCtx(p.SCtx()), p.Exprs, cop.getStoreType()) {
             copTask := attachPlan2Task(p, cop)
             return copTask
@@ -1916,7 +1700,7 @@ func computePartialCursorOffset(name string) int {
 // Attach2Task implements PhysicalPlan interface.
 func (p *PhysicalStreamAgg) Attach2Task(tasks ...Task) Task {
     t := tasks[0].Copy()
-    if cop, ok := t.(*copTask); ok {
+    if cop, ok := t.(*CopTask); ok {
         // We should not push agg down across
         // 1. double read, since the data of second read is ordered by handle instead of index. The `extraHandleCol` is added
         // if the double read needs to keep order. So we just use it to decided
@@ -2420,7 +2204,7 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...Task) Task {
 // Attach2Task implements the PhysicalPlan interface.
 func (p *PhysicalHashAgg) Attach2Task(tasks ...Task) Task {
     t := tasks[0].Copy()
-    if cop, ok := t.(*copTask); ok {
+    if cop, ok := t.(*CopTask); ok {
         if len(cop.rootTaskConds) == 0 && len(cop.idxMergePartPlans) == 0 {
             copTaskType := cop.getStoreType()
             partialAgg, finalAgg := p.newPartialAggregate(copTaskType, false)
@@ -29,6 +29,7 @@ import (
 var (
     _ Task = &RootTask{}
     _ Task = &MppTask{}
+    _ Task = &CopTask{}
 )

 // Task is a new version of `PhysicalPlanInfo`. It stores cost information for a task.
@@ -48,6 +49,8 @@ type Task interface {
     MemoryUsage() int64
 }

+// ************************************* RootTask Start ******************************************
+
 // RootTask is the final sink node of a plan graph. It should be a single goroutine on tidb.
 type RootTask struct {
     p PhysicalPlan
@@ -114,6 +117,10 @@ func (t *RootTask) MemoryUsage() (sum int64) {
     return sum
 }

+// ************************************* RootTask End ******************************************
+
+// ************************************* MPPTask Start ******************************************
+
 // MppTask can not :
 // 1. keep order
 // 2. support double read
@@ -223,3 +230,221 @@ func (t *MppTask) ConvertToRootTaskImpl(ctx PlanContext) *RootTask {
     }
     return rt
 }
+
+// ************************************* MPPTask End ******************************************
+
+// ************************************* CopTask Start ******************************************
+
+// CopTask is a task that runs in a distributed kv store.
+// TODO: In future, we should split copTask to indexTask and tableTask.
+type CopTask struct {
+    indexPlan PhysicalPlan
+    tablePlan PhysicalPlan
+    // indexPlanFinished means we have finished index plan.
+    indexPlanFinished bool
+    // keepOrder indicates if the plan scans data by order.
+    keepOrder bool
+    // needExtraProj means an extra prune is needed because
+    // in double read / index merge cases, they may output one more column for handle(row id).
+    needExtraProj bool
+    // originSchema is the target schema to be projected to when needExtraProj is true.
+    originSchema *expression.Schema
+
+    extraHandleCol *expression.Column
+    commonHandleCols []*expression.Column
+    // tblColHists stores the original stats of DataSource, it is used to get
+    // average row width when computing network cost.
+    tblColHists *statistics.HistColl
+    // tblCols stores the original columns of DataSource before being pruned, it
+    // is used to compute average row width when computing scan cost.
+    tblCols []*expression.Column
+
+    idxMergePartPlans []PhysicalPlan
+    idxMergeIsIntersection bool
+    idxMergeAccessMVIndex bool
+
+    // rootTaskConds stores select conditions containing virtual columns.
+    // These conditions can't push to TiKV, so we have to add a selection for rootTask
+    rootTaskConds []expression.Expression
+
+    // For table partition.
+    physPlanPartInfo PhysPlanPartInfo
+
+    // expectCnt is the expected row count of upper task, 0 for unlimited.
+    // It's used for deciding whether using paging distsql.
+    expectCnt uint64
+}
+
+// Invalid implements Task interface.
+func (t *CopTask) Invalid() bool {
+    return t.tablePlan == nil && t.indexPlan == nil && len(t.idxMergePartPlans) == 0
+}
+
+// Count implements Task interface.
+func (t *CopTask) Count() float64 {
+    if t.indexPlanFinished {
+        return t.tablePlan.StatsInfo().RowCount
+    }
+    return t.indexPlan.StatsInfo().RowCount
+}
+
+// Copy implements Task interface.
+func (t *CopTask) Copy() Task {
+    nt := *t
+    return &nt
+}
+
+// Plan implements Task interface.
+// copTask plan should be careful with indexMergeReader, whose real plan is stored in
+// idxMergePartPlans, when its indexPlanFinished is marked with false.
+func (t *CopTask) Plan() PhysicalPlan {
+    if t.indexPlanFinished {
+        return t.tablePlan
+    }
+    return t.indexPlan
+}
+
+// MemoryUsage return the memory usage of copTask
+func (t *CopTask) MemoryUsage() (sum int64) {
+    if t == nil {
+        return
+    }
+
+    sum = size.SizeOfInterface*(2+int64(cap(t.idxMergePartPlans)+cap(t.rootTaskConds))) + size.SizeOfBool*3 + size.SizeOfUint64 +
+        size.SizeOfPointer*(3+int64(cap(t.commonHandleCols)+cap(t.tblCols))) + size.SizeOfSlice*4 + t.physPlanPartInfo.MemoryUsage()
+    if t.indexPlan != nil {
+        sum += t.indexPlan.MemoryUsage()
+    }
+    if t.tablePlan != nil {
+        sum += t.tablePlan.MemoryUsage()
+    }
+    if t.originSchema != nil {
+        sum += t.originSchema.MemoryUsage()
+    }
+    if t.extraHandleCol != nil {
+        sum += t.extraHandleCol.MemoryUsage()
+    }
+
+    for _, col := range t.commonHandleCols {
+        sum += col.MemoryUsage()
+    }
+    for _, col := range t.tblCols {
+        sum += col.MemoryUsage()
+    }
+    for _, p := range t.idxMergePartPlans {
+        sum += p.MemoryUsage()
+    }
+    for _, expr := range t.rootTaskConds {
+        sum += expr.MemoryUsage()
+    }
+    return
+}
+
+// ConvertToRootTask implements Task interface.
+func (t *CopTask) ConvertToRootTask(ctx PlanContext) *RootTask {
+    // copy one to avoid changing itself.
+    return t.Copy().(*CopTask).convertToRootTaskImpl(ctx)
+}
+
+func (t *CopTask) convertToRootTaskImpl(ctx PlanContext) *RootTask {
+    // copTasks are run in parallel, to make the estimated cost closer to execution time, we amortize
+    // the cost to cop iterator workers. According to `CopClient::Send`, the concurrency
+    // is Min(DistSQLScanConcurrency, numRegionsInvolvedInScan), since we cannot infer
+    // the number of regions involved, we simply use DistSQLScanConcurrency.
+    t.finishIndexPlan()
+    // Network cost of transferring rows of table scan to TiDB.
+    if t.tablePlan != nil {
+        tp := t.tablePlan
+        for len(tp.Children()) > 0 {
+            if len(tp.Children()) == 1 {
+                tp = tp.Children()[0]
+            } else {
+                join := tp.(*PhysicalHashJoin)
+                tp = join.children[1-join.InnerChildIdx]
+            }
+        }
+        ts := tp.(*PhysicalTableScan)
+        prevColumnLen := len(ts.Columns)
+        prevSchema := ts.schema.Clone()
+        ts.Columns = ExpandVirtualColumn(ts.Columns, ts.schema, ts.Table.Columns)
+        if !t.needExtraProj && len(ts.Columns) > prevColumnLen {
+            // Add a projection to make sure not to output extract columns.
+            t.needExtraProj = true
+            t.originSchema = prevSchema
+        }
+    }
+    newTask := &RootTask{}
+    if t.idxMergePartPlans != nil {
+        p := PhysicalIndexMergeReader{
+            partialPlans: t.idxMergePartPlans,
+            tablePlan: t.tablePlan,
+            IsIntersectionType: t.idxMergeIsIntersection,
+            AccessMVIndex: t.idxMergeAccessMVIndex,
+            KeepOrder: t.keepOrder,
+        }.Init(ctx, t.idxMergePartPlans[0].QueryBlockOffset())
+        p.PlanPartInfo = t.physPlanPartInfo
+        setTableScanToTableRowIDScan(p.tablePlan)
+        newTask.SetPlan(p)
+        t.handleRootTaskConds(ctx, newTask)
+        if t.needExtraProj {
+            schema := t.originSchema
+            proj := PhysicalProjection{Exprs: expression.Column2Exprs(schema.Columns)}.Init(ctx, p.StatsInfo(), t.idxMergePartPlans[0].QueryBlockOffset(), nil)
+            proj.SetSchema(schema)
+            proj.SetChildren(p)
+            newTask.SetPlan(proj)
+        }
+        return newTask
+    }
+    if t.indexPlan != nil && t.tablePlan != nil {
+        newTask = buildIndexLookUpTask(ctx, t)
+    } else if t.indexPlan != nil {
+        p := PhysicalIndexReader{indexPlan: t.indexPlan}.Init(ctx, t.indexPlan.QueryBlockOffset())
+        p.PlanPartInfo = t.physPlanPartInfo
+        p.SetStats(t.indexPlan.StatsInfo())
+        newTask.SetPlan(p)
+    } else {
+        tp := t.tablePlan
+        for len(tp.Children()) > 0 {
+            if len(tp.Children()) == 1 {
+                tp = tp.Children()[0]
+            } else {
+                join := tp.(*PhysicalHashJoin)
+                tp = join.children[1-join.InnerChildIdx]
+            }
+        }
+        ts := tp.(*PhysicalTableScan)
+        p := PhysicalTableReader{
+            tablePlan: t.tablePlan,
+            StoreType: ts.StoreType,
+            IsCommonHandle: ts.Table.IsCommonHandle,
+        }.Init(ctx, t.tablePlan.QueryBlockOffset())
+        p.PlanPartInfo = t.physPlanPartInfo
+        p.SetStats(t.tablePlan.StatsInfo())
+
+        // If agg was pushed down in Attach2Task(), the partial agg was placed on the top of tablePlan, the final agg was
+        // placed above the PhysicalTableReader, and the schema should have been set correctly for them, the schema of
+        // partial agg contains the columns needed by the final agg.
+        // If we add the projection here, the projection will be between the final agg and the partial agg, then the
+        // schema will be broken, the final agg will fail to find needed columns in ResolveIndices().
+        // Besides, the agg would only be pushed down if it doesn't contain virtual columns, so virtual column should not be affected.
+        aggPushedDown := false
+        switch p.tablePlan.(type) {
+        case *PhysicalHashAgg, *PhysicalStreamAgg:
+            aggPushedDown = true
+        }
+
+        if t.needExtraProj && !aggPushedDown {
+            proj := PhysicalProjection{Exprs: expression.Column2Exprs(t.originSchema.Columns)}.Init(ts.SCtx(), ts.StatsInfo(), ts.QueryBlockOffset(), nil)
+            proj.SetSchema(t.originSchema)
+            proj.SetChildren(p)
+            newTask.SetPlan(proj)
+        } else {
+            newTask.SetPlan(p)
+        }
+    }
+
+    t.handleRootTaskConds(ctx, newTask)
+    return newTask
+}
+
+// ************************************* CopTask End ******************************************
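The change is mechanical: every call site that built, type-switched on, or type-asserted to the unexported copTask now refers to the exported CopTask, so the struct can later move to its own task package. Below is a minimal sketch of the resulting call-site pattern, assuming it lives in the same package as the task types; the Task, RootTask, MppTask, and CopTask names and the indexPlan/tablePlan fields come from the diff above, while describeTask itself is hypothetical and not part of this change.

// describeTask is a hypothetical helper showing how planner code can
// distinguish task kinds after the rename; only the type names and the
// indexPlan/tablePlan fields are taken from the diff above.
func describeTask(t Task) string {
	switch v := t.(type) {
	case *RootTask:
		return "root task, runs on the TiDB side"
	case *MppTask:
		return "mpp task, runs on the MPP (TiFlash) engine"
	case *CopTask:
		// Exported name: code outside the defining file can now reference the type.
		if v.indexPlan != nil && v.tablePlan != nil {
			return "cop task, index lookup (double read)"
		}
		return "cop task, single read"
	default:
		return "unknown task"
	}
}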