planner: sink limit down to index merge table side under union and intersection case (#46619)
close pingcap/tidb#46313
This commit is contained in:
@ -2247,3 +2247,35 @@ func TestIndexMergeOrderPushDown(t *testing.T) {
|
||||
tk.MustQuery("show warnings").Check(testkit.Rows(output[i].Warning...))
|
||||
}
|
||||
}
|
||||
|
||||
func TestIndexMergeSinkLimit(t *testing.T) {
|
||||
var (
|
||||
input []string
|
||||
output []struct {
|
||||
SQL string
|
||||
Plan []string
|
||||
Warning []string
|
||||
}
|
||||
)
|
||||
planSuiteData := GetPlanSuiteData()
|
||||
planSuiteData.LoadTestCases(t, &input, &output)
|
||||
store := testkit.CreateMockStore(t)
|
||||
tk := testkit.NewTestKit(t, store)
|
||||
|
||||
tk.MustExec("use test")
|
||||
tk.MustExec("set tidb_cost_model_version=1")
|
||||
tk.MustExec(" CREATE TABLE `t2` ( `a` int(11) DEFAULT NULL, `b` int(11) DEFAULT NULL, `c` int(11) DEFAULT NULL, KEY `a` (`a`), KEY `b` (`b`)) ")
|
||||
tk.MustExec("insert into t2 values(1,2,1),(2,1,1),(3,3,1)")
|
||||
tk.MustExec("create table t(a int, j json, index kj((cast(j as signed array))))")
|
||||
tk.MustExec("insert into t values(1, '[1,2,3]')")
|
||||
|
||||
for i, ts := range input {
|
||||
testdata.OnRecord(func() {
|
||||
output[i].SQL = ts
|
||||
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(ts).Rows())
|
||||
output[i].Warning = testdata.ConvertRowsToStrings(tk.MustQuery("show warnings").Rows())
|
||||
})
|
||||
tk.MustQuery(ts).Check(testkit.Rows(output[i].Plan...))
|
||||
tk.MustQuery("show warnings").Check(testkit.Rows(output[i].Warning...))
|
||||
}
|
||||
}
|
||||
|
||||
@ -1044,5 +1044,22 @@
|
||||
"select * from tcommon where (a = 1 and c = 2) or (b = 1) order by c limit 2",
|
||||
"select * from thash use index(idx_ac, idx_bc) where a = 1 or b = 1 order by c limit 2"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "TestIndexMergeSinkLimit",
|
||||
"cases": [
|
||||
"explain format = 'brief' select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 or b=1 and c=1 limit 2; -- test sink limit to table side of union index merge case, because of table side selection",
|
||||
"explain format = 'brief' select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 and b=1 and c=1 limit 2; -- test sink limit to table side of intersection index merge case, because of table side selection",
|
||||
"select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 and b=1 and c=1 limit 2",
|
||||
"explain format = 'brief' select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 or b=1 limit 2; -- test sink limit to index side of union index merge case, because of table side is pure table scan",
|
||||
"explain format = 'brief' select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 and b=1 limit 2; -- test sink limit to table side of intersection index merge case, because of intersection case special",
|
||||
"select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 and b=1 limit 2",
|
||||
"explain format = 'brief' select /*+ use_index(t, kj) */ * from t where (1 member of (j)) limit 1; -- index merge union case, sink limit into index side and embed another one inside index merge reader",
|
||||
"explain format = 'brief' select /*+ use_index(t, kj) */ * from t where json_contains(j, '[1, 2, 3]') limit 1; -- index merge intersection case, sink limit into table side",
|
||||
"explain format = 'brief' select /*+ use_index(t, kj) */ * from t where json_overlaps(j, '[1, 2, 3]') limit 1; -- index merge union case, sink limit above selection above index merge reader, because json_overlaps can't be pushed down",
|
||||
"explain format = 'brief' select /*+ use_index(t, kj) */ * from t where (1 member of (j) and a=1 ) limit 1; -- index merge union case, sink limit to table side, because selection exists on table side",
|
||||
"explain format = 'brief' select /*+ use_index(t, kj) */ * from t where json_contains(j, '[1, 2, 3]') and a=1 limit 1; -- index merge intersection case, sink limit to table side because selection exists on table side",
|
||||
"explain format = 'brief' select /*+ use_index(t, kj) */ * from t where json_overlaps(j, '[1, 2, 3]') and a=1 limit 1; -- index merge union case, sink limit above selection above index merge reader, because json_overlaps can't be pushed down"
|
||||
]
|
||||
}
|
||||
]
|
||||
|
||||
@ -7988,5 +7988,150 @@
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"Name": "TestIndexMergeSinkLimit",
|
||||
"Cases": [
|
||||
{
|
||||
"SQL": "explain format = 'brief' select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 or b=1 and c=1 limit 2; -- test sink limit to table side of union index merge case, because of table side selection",
|
||||
"Plan": [
|
||||
"Limit 2.00 root offset:0, count:2",
|
||||
"└─IndexMerge 0.00 root type: union",
|
||||
" ├─IndexRangeScan(Build) 2.00 cop[tikv] table:t2, index:a(a) range:[1,1], keep order:false, stats:pseudo",
|
||||
" ├─IndexRangeScan(Build) 2.00 cop[tikv] table:t2, index:b(b) range:[1,1], keep order:false, stats:pseudo",
|
||||
" └─Limit(Probe) 0.00 cop[tikv] offset:0, count:2",
|
||||
" └─Selection 0.00 cop[tikv] or(eq(test.t2.a, 1), and(eq(test.t2.b, 1), eq(test.t2.c, 1)))",
|
||||
" └─TableRowIDScan 3.99 cop[tikv] table:t2 keep order:false, stats:pseudo"
|
||||
],
|
||||
"Warning": null
|
||||
},
|
||||
{
|
||||
"SQL": "explain format = 'brief' select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 and b=1 and c=1 limit 2; -- test sink limit to table side of intersection index merge case, because of table side selection",
|
||||
"Plan": [
|
||||
"Limit 0.00 root offset:0, count:2",
|
||||
"└─IndexMerge 0.01 root type: intersection",
|
||||
" ├─IndexRangeScan(Build) 10.00 cop[tikv] table:t2, index:a(a) range:[1,1], keep order:false, stats:pseudo",
|
||||
" ├─IndexRangeScan(Build) 10.00 cop[tikv] table:t2, index:b(b) range:[1,1], keep order:false, stats:pseudo",
|
||||
" └─Limit(Probe) 0.01 cop[tikv] offset:0, count:2",
|
||||
" └─Selection 0.01 cop[tikv] eq(test.t2.c, 1)",
|
||||
" └─TableRowIDScan 0.01 cop[tikv] table:t2 keep order:false, stats:pseudo"
|
||||
],
|
||||
"Warning": null
|
||||
},
|
||||
{
|
||||
"SQL": "select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 and b=1 and c=1 limit 2",
|
||||
"Plan": null,
|
||||
"Warning": null
|
||||
},
|
||||
{
|
||||
"SQL": "explain format = 'brief' select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 or b=1 limit 2; -- test sink limit to index side of union index merge case, because of table side is pure table scan",
|
||||
"Plan": [
|
||||
"IndexMerge 2.00 root type: union, limit embedded(offset:0, count:2)",
|
||||
"├─Limit(Build) 1.00 cop[tikv] offset:0, count:2",
|
||||
"│ └─IndexRangeScan 1.00 cop[tikv] table:t2, index:a(a) range:[1,1], keep order:false, stats:pseudo",
|
||||
"├─Limit(Build) 1.00 cop[tikv] offset:0, count:2",
|
||||
"│ └─IndexRangeScan 1.00 cop[tikv] table:t2, index:b(b) range:[1,1], keep order:false, stats:pseudo",
|
||||
"└─TableRowIDScan(Probe) 2.00 cop[tikv] table:t2 keep order:false, stats:pseudo"
|
||||
],
|
||||
"Warning": null
|
||||
},
|
||||
{
|
||||
"SQL": "explain format = 'brief' select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 and b=1 limit 2; -- test sink limit to table side of intersection index merge case, because of intersection case special",
|
||||
"Plan": [
|
||||
"Limit 0.01 root offset:0, count:2",
|
||||
"└─IndexMerge 0.01 root type: intersection",
|
||||
" ├─IndexRangeScan(Build) 10.00 cop[tikv] table:t2, index:a(a) range:[1,1], keep order:false, stats:pseudo",
|
||||
" ├─IndexRangeScan(Build) 10.00 cop[tikv] table:t2, index:b(b) range:[1,1], keep order:false, stats:pseudo",
|
||||
" └─Limit(Probe) 0.01 cop[tikv] offset:0, count:2",
|
||||
" └─TableRowIDScan 0.01 cop[tikv] table:t2 keep order:false, stats:pseudo"
|
||||
],
|
||||
"Warning": null
|
||||
},
|
||||
{
|
||||
"SQL": "select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 and b=1 limit 2",
|
||||
"Plan": null,
|
||||
"Warning": null
|
||||
},
|
||||
{
|
||||
"SQL": "explain format = 'brief' select /*+ use_index(t, kj) */ * from t where (1 member of (j)) limit 1; -- index merge union case, sink limit into index side and embed another one inside index merge reader",
|
||||
"Plan": [
|
||||
"IndexMerge 0.00 root type: union, limit embedded(offset:0, count:1)",
|
||||
"├─Limit(Build) 0.00 cop[tikv] offset:0, count:1",
|
||||
"│ └─IndexRangeScan 0.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[1,1], keep order:false, stats:pseudo",
|
||||
"└─TableRowIDScan(Probe) 1.00 cop[tikv] table:t keep order:false, stats:pseudo"
|
||||
],
|
||||
"Warning": null
|
||||
},
|
||||
{
|
||||
"SQL": "explain format = 'brief' select /*+ use_index(t, kj) */ * from t where json_contains(j, '[1, 2, 3]') limit 1; -- index merge intersection case, sink limit into table side",
|
||||
"Plan": [
|
||||
"Limit 1.00 root offset:0, count:1",
|
||||
"└─IndexMerge 0.00 root type: intersection",
|
||||
" ├─IndexRangeScan(Build) 0.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[1,1], keep order:false, stats:pseudo",
|
||||
" ├─IndexRangeScan(Build) 0.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[2,2], keep order:false, stats:pseudo",
|
||||
" ├─IndexRangeScan(Build) 0.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[3,3], keep order:false, stats:pseudo",
|
||||
" └─Limit(Probe) 0.00 cop[tikv] offset:0, count:1",
|
||||
" └─TableRowIDScan 0.00 cop[tikv] table:t keep order:false, stats:pseudo"
|
||||
],
|
||||
"Warning": null
|
||||
},
|
||||
{
|
||||
"SQL": "explain format = 'brief' select /*+ use_index(t, kj) */ * from t where json_overlaps(j, '[1, 2, 3]') limit 1; -- index merge union case, sink limit above selection above index merge reader, because json_overlaps can't be pushed down",
|
||||
"Plan": [
|
||||
"Limit 1.00 root offset:0, count:1",
|
||||
"└─Selection 1.00 root json_overlaps(test.t.j, cast(\"[1, 2, 3]\", json BINARY))",
|
||||
" └─IndexMerge 0.00 root type: union",
|
||||
" ├─IndexRangeScan(Build) 0.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[1,1], keep order:false, stats:pseudo",
|
||||
" ├─IndexRangeScan(Build) 0.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[2,2], keep order:false, stats:pseudo",
|
||||
" ├─IndexRangeScan(Build) 0.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[3,3], keep order:false, stats:pseudo",
|
||||
" └─TableRowIDScan(Probe) 0.00 cop[tikv] table:t keep order:false, stats:pseudo"
|
||||
],
|
||||
"Warning": [
|
||||
"Warning 1105 Scalar function 'json_overlaps'(signature: Unspecified, return type: bigint(20)) is not supported to push down to storage layer now."
|
||||
]
|
||||
},
|
||||
{
|
||||
"SQL": "explain format = 'brief' select /*+ use_index(t, kj) */ * from t where (1 member of (j) and a=1 ) limit 1; -- index merge union case, sink limit to table side, because selection exists on table side",
|
||||
"Plan": [
|
||||
"Limit 1.00 root offset:0, count:1",
|
||||
"└─IndexMerge 0.00 root type: union",
|
||||
" ├─IndexRangeScan(Build) 1.25 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[1,1], keep order:false, stats:pseudo",
|
||||
" └─Limit(Probe) 0.00 cop[tikv] offset:0, count:1",
|
||||
" └─Selection 0.00 cop[tikv] eq(test.t.a, 1)",
|
||||
" └─TableRowIDScan 1.25 cop[tikv] table:t keep order:false, stats:pseudo"
|
||||
],
|
||||
"Warning": null
|
||||
},
|
||||
{
|
||||
"SQL": "explain format = 'brief' select /*+ use_index(t, kj) */ * from t where json_contains(j, '[1, 2, 3]') and a=1 limit 1; -- index merge intersection case, sink limit to table side because selection exists on table side",
|
||||
"Plan": [
|
||||
"Limit 1.00 root offset:0, count:1",
|
||||
"└─IndexMerge 0.00 root type: intersection",
|
||||
" ├─IndexRangeScan(Build) 1.25 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[1,1], keep order:false, stats:pseudo",
|
||||
" ├─IndexRangeScan(Build) 1.25 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[2,2], keep order:false, stats:pseudo",
|
||||
" ├─IndexRangeScan(Build) 1.25 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[3,3], keep order:false, stats:pseudo",
|
||||
" └─Limit(Probe) 0.00 cop[tikv] offset:0, count:1",
|
||||
" └─Selection 0.00 cop[tikv] eq(test.t.a, 1)",
|
||||
" └─TableRowIDScan 1.25 cop[tikv] table:t keep order:false, stats:pseudo"
|
||||
],
|
||||
"Warning": null
|
||||
},
|
||||
{
|
||||
"SQL": "explain format = 'brief' select /*+ use_index(t, kj) */ * from t where json_overlaps(j, '[1, 2, 3]') and a=1 limit 1; -- index merge union case, sink limit above selection above index merge reader, because json_overlaps can't be pushed down",
|
||||
"Plan": [
|
||||
"Limit 1.00 root offset:0, count:1",
|
||||
"└─Selection 1.00 root json_overlaps(test.t.j, cast(\"[1, 2, 3]\", json BINARY))",
|
||||
" └─IndexMerge 1.00 root type: union",
|
||||
" ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[1,1], keep order:false, stats:pseudo",
|
||||
" ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[2,2], keep order:false, stats:pseudo",
|
||||
" ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[3,3], keep order:false, stats:pseudo",
|
||||
" └─Selection(Probe) 1.00 cop[tikv] eq(test.t.a, 1)",
|
||||
" └─TableRowIDScan 1.00 cop[tikv] table:t keep order:false, stats:pseudo"
|
||||
],
|
||||
"Warning": [
|
||||
"Warning 1105 Scalar function 'json_overlaps'(signature: Unspecified, return type: bigint(20)) is not supported to push down to storage layer now."
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
|
||||
@ -1284,14 +1284,19 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter
|
||||
return
|
||||
}
|
||||
|
||||
// convertToIndexMergeScan builds the index merge scan for intersection or union cases.
|
||||
func (ds *DataSource) convertToIndexMergeScan(prop *property.PhysicalProperty, candidate *candidatePath, _ *physicalOptimizeOp) (task task, err error) {
|
||||
if prop.IsFlashProp() || prop.TaskTp == property.CopSingleReadTaskType {
|
||||
return invalidTask, nil
|
||||
}
|
||||
if prop.TaskTp == property.CopMultiReadTaskType && candidate.path.IndexMergeIsIntersection {
|
||||
// lift the limitation of that double read can not build index merge **COP** task with intersection.
|
||||
// that means we can output a cop task here without encapsulating it as root task, for the convenience of attaching limit to its table side.
|
||||
|
||||
if !prop.IsSortItemEmpty() && !candidate.isMatchProp {
|
||||
return invalidTask, nil
|
||||
}
|
||||
if !prop.IsSortItemEmpty() && !candidate.isMatchProp {
|
||||
// while for now, we still can not push the sort prop to the intersection index plan side, temporarily banned here.
|
||||
if !prop.IsSortItemEmpty() && candidate.path.IndexMergeIsIntersection {
|
||||
return invalidTask, nil
|
||||
}
|
||||
failpoint.Inject("forceIndexMergeKeepOrder", func(_ failpoint.Value) {
|
||||
|
||||
@ -818,6 +818,22 @@ func (t *rootTask) MemoryUsage() (sum int64) {
|
||||
return sum
|
||||
}
|
||||
|
||||
// attach2Task attach limit to different cases.
|
||||
// For Normal Index Lookup
|
||||
// 1: attach the limit to table side or index side of normal index lookup cop task. (normal case, old code, no more
|
||||
// explanation here)
|
||||
//
|
||||
// For Index Merge:
|
||||
// 2: attach the limit to **table** side for index merge intersection case, cause intersection will invalidate the
|
||||
// fetched limit+offset rows from each partial index plan, you can not decide how many you want in advance for partial
|
||||
// index path, actually. After we sink limit to table side, we still need an upper root limit to control the real limit
|
||||
// count admission.
|
||||
//
|
||||
// 3: attach the limit to **index** side for index merge union case, because each index plan will output the fetched
|
||||
// limit+offset (* N path) rows, you still need an embedded pushedLimit inside index merge reader to cut it down.
|
||||
//
|
||||
// 4: attach the limit to the TOP of root index merge operator if there is some root condition exists for index merge
|
||||
// intersection/union case.
|
||||
func (p *PhysicalLimit) attach2Task(tasks ...task) task {
|
||||
t := tasks[0].copy()
|
||||
newPartitionBy := make([]property.SortItem, 0, len(p.GetPartitionBy()))
|
||||
@ -827,6 +843,18 @@ func (p *PhysicalLimit) attach2Task(tasks ...task) task {
|
||||
|
||||
sunk := false
|
||||
if cop, ok := t.(*copTask); ok {
|
||||
suspendLimitAboveTablePlan := func() {
|
||||
newCount := p.Offset + p.Count
|
||||
childProfile := cop.tablePlan.StatsInfo()
|
||||
// but "regionNum" is unknown since the copTask can be a double read, so we ignore it now.
|
||||
stats := deriveLimitStats(childProfile, float64(newCount))
|
||||
pushedDownLimit := PhysicalLimit{PartitionBy: newPartitionBy, Count: newCount}.Init(p.SCtx(), stats, p.SelectBlockOffset())
|
||||
pushedDownLimit.SetChildren(cop.tablePlan)
|
||||
cop.tablePlan = pushedDownLimit
|
||||
// Don't use clone() so that Limit and its children share the same schema. Otherwise, the virtual generated column may not be resolved right.
|
||||
pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema())
|
||||
t = cop.convertToRootTask(p.SCtx())
|
||||
}
|
||||
if len(cop.idxMergePartPlans) == 0 {
|
||||
// For double read which requires order being kept, the limit cannot be pushed down to the table side,
|
||||
// because handles would be reordered before being sent to table scan.
|
||||
@ -845,22 +873,53 @@ func (p *PhysicalLimit) attach2Task(tasks ...task) task {
|
||||
t = cop.convertToRootTask(p.SCtx())
|
||||
sunk = p.sinkIntoIndexLookUp(t)
|
||||
} else if !cop.idxMergeIsIntersection {
|
||||
// We only support push part of the order prop down to index merge case.
|
||||
if !cop.indexPlanFinished && len(cop.rootTaskConds) == 0 {
|
||||
newCount := p.Offset + p.Count
|
||||
limitChildren := make([]PhysicalPlan, 0, len(cop.idxMergePartPlans))
|
||||
for _, partialScan := range cop.idxMergePartPlans {
|
||||
childProfile := partialScan.StatsInfo()
|
||||
stats := deriveLimitStats(childProfile, float64(newCount))
|
||||
pushedDownLimit := PhysicalLimit{PartitionBy: newPartitionBy, Count: newCount}.Init(p.SCtx(), stats, p.SelectBlockOffset())
|
||||
pushedDownLimit.SetChildren(partialScan)
|
||||
pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema())
|
||||
limitChildren = append(limitChildren, pushedDownLimit)
|
||||
// We only support push part of the order prop down to index merge build case.
|
||||
if len(cop.rootTaskConds) == 0 {
|
||||
if cop.indexPlanFinished {
|
||||
// when the index plan is finished, sink the limit to the index merge table side.
|
||||
suspendLimitAboveTablePlan()
|
||||
} else {
|
||||
// cop.indexPlanFinished = false indicates the table side is a pure table-scan, sink the limit to the index merge index side.
|
||||
newCount := p.Offset + p.Count
|
||||
limitChildren := make([]PhysicalPlan, 0, len(cop.idxMergePartPlans))
|
||||
for _, partialScan := range cop.idxMergePartPlans {
|
||||
childProfile := partialScan.StatsInfo()
|
||||
stats := deriveLimitStats(childProfile, float64(newCount))
|
||||
pushedDownLimit := PhysicalLimit{PartitionBy: newPartitionBy, Count: newCount}.Init(p.SCtx(), stats, p.SelectBlockOffset())
|
||||
pushedDownLimit.SetChildren(partialScan)
|
||||
pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema())
|
||||
limitChildren = append(limitChildren, pushedDownLimit)
|
||||
}
|
||||
cop.idxMergePartPlans = limitChildren
|
||||
t = cop.convertToRootTask(p.SCtx())
|
||||
sunk = p.sinkIntoIndexMerge(t)
|
||||
}
|
||||
cop.idxMergePartPlans = limitChildren
|
||||
} else {
|
||||
// when there are some root conditions, just sink the limit upon the index merge reader.
|
||||
t = cop.convertToRootTask(p.SCtx())
|
||||
sunk = p.sinkIntoIndexMerge(t)
|
||||
}
|
||||
} else if cop.idxMergeIsIntersection {
|
||||
// In the index merge with intersection case, only the limit can be pushed down to the index merge table side.
|
||||
// Note Difference:
|
||||
// IndexMerge.PushedLimit is applied before table scan fetching, limiting the indexPartialPlan rows returned (it maybe ordered if orderBy items not empty)
|
||||
// TableProbeSide sink limit is applied on the top of table plan, which will quickly shut down the both fetch-back and read-back process.
|
||||
if len(cop.rootTaskConds) == 0 {
|
||||
// if cop.indexPlanFinished = true
|
||||
// indicates the table side is not a pure table-scan, so we could only append the limit upon the table plan.
|
||||
// suspendLimitAboveTablePlan()
|
||||
// else
|
||||
// todo: cop.indexPlanFinished = false indicates the table side is a pure table-scan, so we sink the limit as index merge embedded push-down Limit theoretically.
|
||||
// todo: while currently in the execution layer, intersection concurrency framework is not quickly suitable for us to do the limit cut down or the order by operation.
|
||||
// so currently, we just put the limit at the top of table plan rather than a embedded pushedLimit inside indexMergeReader.
|
||||
// t = cop.convertToRootTask(p.SCtx())
|
||||
// sunk = p.sinkIntoIndexMerge(t)
|
||||
suspendLimitAboveTablePlan()
|
||||
} else {
|
||||
// otherwise, suspend the limit out of index merge reader.
|
||||
t = cop.convertToRootTask(p.SCtx())
|
||||
sunk = p.sinkIntoIndexMerge(t)
|
||||
}
|
||||
t = cop.convertToRootTask(p.SCtx())
|
||||
sunk = p.sinkIntoIndexMerge(t)
|
||||
} else {
|
||||
// Whatever the remained case is, we directly convert to it to root task.
|
||||
t = cop.convertToRootTask(p.SCtx())
|
||||
@ -1024,9 +1083,7 @@ func (p *PhysicalTopN) getPushedDownTopN(childPlan PhysicalPlan) *PhysicalTopN {
|
||||
}
|
||||
|
||||
// canPushToIndexPlan checks if this TopN can be pushed to the index side of copTask.
|
||||
// It can be pushed to the index side when all columns used by ByItems are available from the index side and
|
||||
//
|
||||
// there's no prefix index column.
|
||||
// It can be pushed to the index side when all columns used by ByItems are available from the index side and there's no prefix index column.
|
||||
func (*PhysicalTopN) canPushToIndexPlan(indexPlan PhysicalPlan, byItemCols []*expression.Column) bool {
|
||||
// If we call canPushToIndexPlan and there's no index plan, we should go into the index merge case.
|
||||
// Index merge case is specially handled for now. So we directly return false here.
|
||||
|
||||
Reference in New Issue
Block a user