diff --git a/planner/core/casetest/physicalplantest/physical_plan_test.go b/planner/core/casetest/physicalplantest/physical_plan_test.go
index f6522f1a83..86f91244c5 100644
--- a/planner/core/casetest/physicalplantest/physical_plan_test.go
+++ b/planner/core/casetest/physicalplantest/physical_plan_test.go
@@ -2247,3 +2247,35 @@ func TestIndexMergeOrderPushDown(t *testing.T) {
 		tk.MustQuery("show warnings").Check(testkit.Rows(output[i].Warning...))
 	}
 }
+
+func TestIndexMergeSinkLimit(t *testing.T) {
+	var (
+		input  []string
+		output []struct {
+			SQL     string
+			Plan    []string
+			Warning []string
+		}
+	)
+	planSuiteData := GetPlanSuiteData()
+	planSuiteData.LoadTestCases(t, &input, &output)
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+
+	tk.MustExec("use test")
+	tk.MustExec("set tidb_cost_model_version=1")
+	tk.MustExec(" CREATE TABLE `t2` ( `a` int(11) DEFAULT NULL, `b` int(11) DEFAULT NULL, `c` int(11) DEFAULT NULL, KEY `a` (`a`), KEY `b` (`b`)) ")
+	tk.MustExec("insert into t2 values(1,2,1),(2,1,1),(3,3,1)")
+	tk.MustExec("create table t(a int, j json, index kj((cast(j as signed array))))")
+	tk.MustExec("insert into t values(1, '[1,2,3]')")
+
+	for i, ts := range input {
+		testdata.OnRecord(func() {
+			output[i].SQL = ts
+			output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(ts).Rows())
+			output[i].Warning = testdata.ConvertRowsToStrings(tk.MustQuery("show warnings").Rows())
+		})
+		tk.MustQuery(ts).Check(testkit.Rows(output[i].Plan...))
+		tk.MustQuery("show warnings").Check(testkit.Rows(output[i].Warning...))
+	}
+}
diff --git a/planner/core/casetest/physicalplantest/testdata/plan_suite_in.json b/planner/core/casetest/physicalplantest/testdata/plan_suite_in.json
index 512d0b7fb2..4e907c477a 100644
--- a/planner/core/casetest/physicalplantest/testdata/plan_suite_in.json
+++ b/planner/core/casetest/physicalplantest/testdata/plan_suite_in.json
@@ -1044,5 +1044,22 @@
       "select * from tcommon where (a = 1 and c = 2) or (b = 1) order by c limit 2",
       "select * from thash use index(idx_ac, idx_bc) where a = 1 or b = 1 order by c limit 2"
     ]
+  },
+  {
+    "name": "TestIndexMergeSinkLimit",
+    "cases": [
+      "explain format = 'brief' select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 or b=1 and c=1 limit 2; -- test sink limit to table side of union index merge case, because of table side selection",
+      "explain format = 'brief' select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 and b=1 and c=1 limit 2; -- test sink limit to table side of intersection index merge case, because of table side selection",
+      "select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 and b=1 and c=1 limit 2",
+      "explain format = 'brief' select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 or b=1 limit 2; -- test sink limit to index side of union index merge case, because of table side is pure table scan",
+      "explain format = 'brief' select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 and b=1 limit 2; -- test sink limit to table side of intersection index merge case, because of intersection case special",
+      "select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 and b=1 limit 2",
+      "explain format = 'brief' select /*+ use_index(t, kj) */ * from t where (1 member of (j)) limit 1; -- index merge union case, sink limit into index side and embed another one inside index merge reader",
+      "explain format = 'brief' select /*+ use_index(t, kj) */ * from t where json_contains(j, '[1, 2, 3]') limit 1; -- index merge intersection case, sink limit into table side",
+      "explain format = 'brief' select /*+ use_index(t, kj) */ * from t where json_overlaps(j, '[1, 2, 3]') limit 1; -- index merge union case, sink limit above selection above index merge reader, because json_overlaps can't be pushed down",
+      "explain format = 'brief' select /*+ use_index(t, kj) */ * from t where (1 member of (j) and a=1 ) limit 1; -- index merge union case, sink limit to table side, because selection exists on table side",
+      "explain format = 'brief' select /*+ use_index(t, kj) */ * from t where json_contains(j, '[1, 2, 3]') and a=1 limit 1; -- index merge intersection case, sink limit to table side because selection exists on table side",
+      "explain format = 'brief' select /*+ use_index(t, kj) */ * from t where json_overlaps(j, '[1, 2, 3]') and a=1 limit 1; -- index merge union case, sink limit above selection above index merge reader, because json_overlaps can't be pushed down"
+    ]
   }
 ]
diff --git a/planner/core/casetest/physicalplantest/testdata/plan_suite_out.json b/planner/core/casetest/physicalplantest/testdata/plan_suite_out.json
index 71eeb43d73..40bca85fc4 100644
--- a/planner/core/casetest/physicalplantest/testdata/plan_suite_out.json
+++ b/planner/core/casetest/physicalplantest/testdata/plan_suite_out.json
@@ -7988,5 +7988,150 @@
         ]
       }
     ]
+  },
+  {
+    "Name": "TestIndexMergeSinkLimit",
+    "Cases": [
+      {
+        "SQL": "explain format = 'brief' select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 or b=1 and c=1 limit 2; -- test sink limit to table side of union index merge case, because of table side selection",
+        "Plan": [
+          "Limit 2.00 root offset:0, count:2",
+          "└─IndexMerge 0.00 root type: union",
+          " ├─IndexRangeScan(Build) 2.00 cop[tikv] table:t2, index:a(a) range:[1,1], keep order:false, stats:pseudo",
+          " ├─IndexRangeScan(Build) 2.00 cop[tikv] table:t2, index:b(b) range:[1,1], keep order:false, stats:pseudo",
+          " └─Limit(Probe) 0.00 cop[tikv] offset:0, count:2",
+          " └─Selection 0.00 cop[tikv] or(eq(test.t2.a, 1), and(eq(test.t2.b, 1), eq(test.t2.c, 1)))",
+          " └─TableRowIDScan 3.99 cop[tikv] table:t2 keep order:false, stats:pseudo"
+        ],
+        "Warning": null
+      },
+      {
+        "SQL": "explain format = 'brief' select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 and b=1 and c=1 limit 2; -- test sink limit to table side of intersection index merge case, because of table side selection",
+        "Plan": [
+          "Limit 0.00 root offset:0, count:2",
+          "└─IndexMerge 0.01 root type: intersection",
+          " ├─IndexRangeScan(Build) 10.00 cop[tikv] table:t2, index:a(a) range:[1,1], keep order:false, stats:pseudo",
+          " ├─IndexRangeScan(Build) 10.00 cop[tikv] table:t2, index:b(b) range:[1,1], keep order:false, stats:pseudo",
+          " └─Limit(Probe) 0.01 cop[tikv] offset:0, count:2",
+          " └─Selection 0.01 cop[tikv] eq(test.t2.c, 1)",
+          " └─TableRowIDScan 0.01 cop[tikv] table:t2 keep order:false, stats:pseudo"
+        ],
+        "Warning": null
+      },
+      {
+        "SQL": "select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 and b=1 and c=1 limit 2",
+        "Plan": null,
+        "Warning": null
+      },
+      {
+        "SQL": "explain format = 'brief' select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 or b=1 limit 2; -- test sink limit to index side of union index merge case, because of table side is pure table scan",
+        "Plan": [
+          "IndexMerge 2.00 root type: union, limit embedded(offset:0, count:2)",
+          "├─Limit(Build) 1.00 cop[tikv] offset:0, count:2",
+          "│ └─IndexRangeScan 1.00 cop[tikv] table:t2, index:a(a) range:[1,1], keep order:false, stats:pseudo",
+          "├─Limit(Build) 1.00 cop[tikv] offset:0, count:2",
+          "│ └─IndexRangeScan 1.00 cop[tikv] table:t2, index:b(b) range:[1,1], keep order:false, stats:pseudo",
+          "└─TableRowIDScan(Probe) 2.00 cop[tikv] table:t2 keep order:false, stats:pseudo"
+        ],
+        "Warning": null
+      },
+      {
+        "SQL": "explain format = 'brief' select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 and b=1 limit 2; -- test sink limit to table side of intersection index merge case, because of intersection case special",
+        "Plan": [
+          "Limit 0.01 root offset:0, count:2",
+          "└─IndexMerge 0.01 root type: intersection",
+          " ├─IndexRangeScan(Build) 10.00 cop[tikv] table:t2, index:a(a) range:[1,1], keep order:false, stats:pseudo",
+          " ├─IndexRangeScan(Build) 10.00 cop[tikv] table:t2, index:b(b) range:[1,1], keep order:false, stats:pseudo",
+          " └─Limit(Probe) 0.01 cop[tikv] offset:0, count:2",
+          " └─TableRowIDScan 0.01 cop[tikv] table:t2 keep order:false, stats:pseudo"
+        ],
+        "Warning": null
+      },
+      {
+        "SQL": "select /*+ use_index_merge(t2, a, b) */ * from t2 where a=1 and b=1 limit 2",
+        "Plan": null,
+        "Warning": null
+      },
+      {
+        "SQL": "explain format = 'brief' select /*+ use_index(t, kj) */ * from t where (1 member of (j)) limit 1; -- index merge union case, sink limit into index side and embed another one inside index merge reader",
+        "Plan": [
+          "IndexMerge 0.00 root type: union, limit embedded(offset:0, count:1)",
+          "├─Limit(Build) 0.00 cop[tikv] offset:0, count:1",
+          "│ └─IndexRangeScan 0.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[1,1], keep order:false, stats:pseudo",
+          "└─TableRowIDScan(Probe) 1.00 cop[tikv] table:t keep order:false, stats:pseudo"
+        ],
+        "Warning": null
+      },
+      {
+        "SQL": "explain format = 'brief' select /*+ use_index(t, kj) */ * from t where json_contains(j, '[1, 2, 3]') limit 1; -- index merge intersection case, sink limit into table side",
+        "Plan": [
+          "Limit 1.00 root offset:0, count:1",
+          "└─IndexMerge 0.00 root type: intersection",
+          " ├─IndexRangeScan(Build) 0.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[1,1], keep order:false, stats:pseudo",
+          " ├─IndexRangeScan(Build) 0.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[2,2], keep order:false, stats:pseudo",
+          " ├─IndexRangeScan(Build) 0.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[3,3], keep order:false, stats:pseudo",
+          " └─Limit(Probe) 0.00 cop[tikv] offset:0, count:1",
+          " └─TableRowIDScan 0.00 cop[tikv] table:t keep order:false, stats:pseudo"
+        ],
+        "Warning": null
+      },
+      {
+        "SQL": "explain format = 'brief' select /*+ use_index(t, kj) */ * from t where json_overlaps(j, '[1, 2, 3]') limit 1; -- index merge union case, sink limit above selection above index merge reader, because json_overlaps can't be pushed down",
+        "Plan": [
+          "Limit 1.00 root offset:0, count:1",
+          "└─Selection 1.00 root json_overlaps(test.t.j, cast(\"[1, 2, 3]\", json BINARY))",
+          " └─IndexMerge 0.00 root type: union",
+          " ├─IndexRangeScan(Build) 0.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[1,1], keep order:false, stats:pseudo",
+          " ├─IndexRangeScan(Build) 0.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[2,2], keep order:false, stats:pseudo",
+          " ├─IndexRangeScan(Build) 0.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[3,3], keep order:false, stats:pseudo",
+          " └─TableRowIDScan(Probe) 0.00 cop[tikv] table:t keep order:false, stats:pseudo"
+        ],
+        "Warning": [
+          "Warning 1105 Scalar function 'json_overlaps'(signature: Unspecified, return type: bigint(20)) is not supported to push down to storage layer now."
+        ]
+      },
+      {
+        "SQL": "explain format = 'brief' select /*+ use_index(t, kj) */ * from t where (1 member of (j) and a=1 ) limit 1; -- index merge union case, sink limit to table side, because selection exists on table side",
+        "Plan": [
+          "Limit 1.00 root offset:0, count:1",
+          "└─IndexMerge 0.00 root type: union",
+          " ├─IndexRangeScan(Build) 1.25 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[1,1], keep order:false, stats:pseudo",
+          " └─Limit(Probe) 0.00 cop[tikv] offset:0, count:1",
+          " └─Selection 0.00 cop[tikv] eq(test.t.a, 1)",
+          " └─TableRowIDScan 1.25 cop[tikv] table:t keep order:false, stats:pseudo"
+        ],
+        "Warning": null
+      },
+      {
+        "SQL": "explain format = 'brief' select /*+ use_index(t, kj) */ * from t where json_contains(j, '[1, 2, 3]') and a=1 limit 1; -- index merge intersection case, sink limit to table side because selection exists on table side",
+        "Plan": [
+          "Limit 1.00 root offset:0, count:1",
+          "└─IndexMerge 0.00 root type: intersection",
+          " ├─IndexRangeScan(Build) 1.25 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[1,1], keep order:false, stats:pseudo",
+          " ├─IndexRangeScan(Build) 1.25 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[2,2], keep order:false, stats:pseudo",
+          " ├─IndexRangeScan(Build) 1.25 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[3,3], keep order:false, stats:pseudo",
+          " └─Limit(Probe) 0.00 cop[tikv] offset:0, count:1",
+          " └─Selection 0.00 cop[tikv] eq(test.t.a, 1)",
+          " └─TableRowIDScan 1.25 cop[tikv] table:t keep order:false, stats:pseudo"
+        ],
+        "Warning": null
+      },
+      {
+        "SQL": "explain format = 'brief' select /*+ use_index(t, kj) */ * from t where json_overlaps(j, '[1, 2, 3]') and a=1 limit 1; -- index merge union case, sink limit above selection above index merge reader, because json_overlaps can't be pushed down",
+        "Plan": [
+          "Limit 1.00 root offset:0, count:1",
+          "└─Selection 1.00 root json_overlaps(test.t.j, cast(\"[1, 2, 3]\", json BINARY))",
+          " └─IndexMerge 1.00 root type: union",
+          " ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[1,1], keep order:false, stats:pseudo",
+          " ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[2,2], keep order:false, stats:pseudo",
+          " ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[3,3], keep order:false, stats:pseudo",
+          " └─Selection(Probe) 1.00 cop[tikv] eq(test.t.a, 1)",
+          " └─TableRowIDScan 1.00 cop[tikv] table:t keep order:false, stats:pseudo"
+        ],
+        "Warning": [
+          "Warning 1105 Scalar function 'json_overlaps'(signature: Unspecified, return type: bigint(20)) is not supported to push down to storage layer now."
+        ]
+      }
+    ]
   }
 ]
diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go
index 3aa86f6071..61e737e7bf 100644
--- a/planner/core/find_best_task.go
+++ b/planner/core/find_best_task.go
@@ -1284,14 +1284,19 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter
 	return
 }
 
+// convertToIndexMergeScan builds the index merge scan for intersection or union cases.
 func (ds *DataSource) convertToIndexMergeScan(prop *property.PhysicalProperty, candidate *candidatePath, _ *physicalOptimizeOp) (task task, err error) {
	if prop.IsFlashProp() || prop.TaskTp == property.CopSingleReadTaskType {
		return invalidTask, nil
	}
-	if prop.TaskTp == property.CopMultiReadTaskType && candidate.path.IndexMergeIsIntersection {
+	// Lift the limitation that a double read cannot build an index merge **COP** task with intersection.
+	// This lets us output a cop task here without wrapping it as a root task, so a limit can later be attached to its table side.
+
+	if !prop.IsSortItemEmpty() && !candidate.isMatchProp {
		return invalidTask, nil
	}
-	if !prop.IsSortItemEmpty() && !candidate.isMatchProp {
+	// For now, the sort property still cannot be pushed down to the intersection index plan side, so it is temporarily banned here.
+	if !prop.IsSortItemEmpty() && candidate.path.IndexMergeIsIntersection {
		return invalidTask, nil
	}
	failpoint.Inject("forceIndexMergeKeepOrder", func(_ failpoint.Value) {
diff --git a/planner/core/task.go b/planner/core/task.go
index 625c4b255c..15b7b3a73e 100644
--- a/planner/core/task.go
+++ b/planner/core/task.go
@@ -818,6 +818,22 @@ func (t *rootTask) MemoryUsage() (sum int64) {
	return sum
 }
 
+// attach2Task attaches the limit to different plan structures, depending on the case.
+// For a normal index lookup:
+// 1: attach the limit to the table side or the index side of a normal index lookup cop task (the normal case, old
+// code, no more explanation here).
+//
+// For index merge:
+// 2: attach the limit to the **table** side for the index merge intersection case, because the intersection will
+// invalidate the fetched limit+offset rows from each partial index plan; you cannot decide in advance how many rows
+// each partial index path should return. After sinking the limit to the table side, an upper root limit is still
+// needed to enforce the real limit count.
+//
+// 3: attach the limit to the **index** side for the index merge union case, because each index plan outputs
+// limit+offset rows (* N paths), so an embedded pushedLimit inside the index merge reader is still needed to cut them down.
+//
+// 4: attach the limit on TOP of the root index merge operator if some root conditions exist for the index merge
+// intersection/union case.
 func (p *PhysicalLimit) attach2Task(tasks ...task) task {
	t := tasks[0].copy()
	newPartitionBy := make([]property.SortItem, 0, len(p.GetPartitionBy()))
@@ -827,6 +843,18 @@ func (p *PhysicalLimit) attach2Task(tasks ...task) task {
	sunk := false
	if cop, ok := t.(*copTask); ok {
+		suspendLimitAboveTablePlan := func() {
+			newCount := p.Offset + p.Count
+			childProfile := cop.tablePlan.StatsInfo()
+			// Strictly speaking, the row count should be multiplied by "regionNum", but "regionNum" is unknown since the copTask can be a double read, so we ignore it now.
+			stats := deriveLimitStats(childProfile, float64(newCount))
+			pushedDownLimit := PhysicalLimit{PartitionBy: newPartitionBy, Count: newCount}.Init(p.SCtx(), stats, p.SelectBlockOffset())
+			pushedDownLimit.SetChildren(cop.tablePlan)
+			cop.tablePlan = pushedDownLimit
+			// Don't use clone() so that Limit and its children share the same schema. Otherwise, the virtual generated column may not be resolved right.
+			pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema())
+			t = cop.convertToRootTask(p.SCtx())
+		}
		if len(cop.idxMergePartPlans) == 0 {
			// For double read which requires order being kept, the limit cannot be pushed down to the table side,
			// because handles would be reordered before being sent to table scan.
@@ -845,22 +873,53 @@ func (p *PhysicalLimit) attach2Task(tasks ...task) task {
			t = cop.convertToRootTask(p.SCtx())
			sunk = p.sinkIntoIndexLookUp(t)
		} else if !cop.idxMergeIsIntersection {
-			// We only support push part of the order prop down to index merge case.
-			if !cop.indexPlanFinished && len(cop.rootTaskConds) == 0 {
-				newCount := p.Offset + p.Count
-				limitChildren := make([]PhysicalPlan, 0, len(cop.idxMergePartPlans))
-				for _, partialScan := range cop.idxMergePartPlans {
-					childProfile := partialScan.StatsInfo()
-					stats := deriveLimitStats(childProfile, float64(newCount))
-					pushedDownLimit := PhysicalLimit{PartitionBy: newPartitionBy, Count: newCount}.Init(p.SCtx(), stats, p.SelectBlockOffset())
-					pushedDownLimit.SetChildren(partialScan)
-					pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema())
-					limitChildren = append(limitChildren, pushedDownLimit)
+			// We only support pushing part of the order property down to the index merge build case.
+			if len(cop.rootTaskConds) == 0 {
+				if cop.indexPlanFinished {
+					// When the index plan is finished, sink the limit to the table side of the index merge.
+					suspendLimitAboveTablePlan()
+				} else {
+					// cop.indexPlanFinished = false indicates the table side is a pure table scan, so sink the limit to the index side of the index merge.
+					newCount := p.Offset + p.Count
+					limitChildren := make([]PhysicalPlan, 0, len(cop.idxMergePartPlans))
+					for _, partialScan := range cop.idxMergePartPlans {
+						childProfile := partialScan.StatsInfo()
+						stats := deriveLimitStats(childProfile, float64(newCount))
+						pushedDownLimit := PhysicalLimit{PartitionBy: newPartitionBy, Count: newCount}.Init(p.SCtx(), stats, p.SelectBlockOffset())
+						pushedDownLimit.SetChildren(partialScan)
+						pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema())
+						limitChildren = append(limitChildren, pushedDownLimit)
+					}
+					cop.idxMergePartPlans = limitChildren
+					t = cop.convertToRootTask(p.SCtx())
+					sunk = p.sinkIntoIndexMerge(t)
				}
-				cop.idxMergePartPlans = limitChildren
+			} else {
+				// When there are some root conditions, just sink the limit on top of the index merge reader.
+				t = cop.convertToRootTask(p.SCtx())
+				sunk = p.sinkIntoIndexMerge(t)
+			}
+		} else if cop.idxMergeIsIntersection {
+			// In the index merge intersection case, the limit can only be pushed down to the table side of the index merge.
+			// Note the difference:
+			// IndexMerge.PushedLimit is applied before fetching rows from the table, limiting the rows returned by each indexPartialPlan (which may be ordered if the order-by items are not empty).
+			// A limit sunk to the table probe side is applied on top of the table plan, which quickly shuts down both the fetch-back and read-back processes.
+			if len(cop.rootTaskConds) == 0 {
+				// if cop.indexPlanFinished = true:
+				//   the table side is not a pure table scan, so we can only append the limit on top of the table plan:
+				//   suspendLimitAboveTablePlan()
+				// else:
+				//   todo: cop.indexPlanFinished = false indicates the table side is a pure table scan, so in theory we could sink the limit as an embedded push-down Limit of the index merge reader.
+				//   todo: however, the intersection concurrency framework in the execution layer is currently not suitable for cutting the limit down or doing the order-by there.
+				// So for now, we just put the limit at the top of the table plan rather than as an embedded pushedLimit inside the IndexMergeReader:
+				// t = cop.convertToRootTask(p.SCtx())
+				// sunk = p.sinkIntoIndexMerge(t)
+				suspendLimitAboveTablePlan()
+			} else {
+				// Otherwise, suspend the limit above (outside) the index merge reader.
+				t = cop.convertToRootTask(p.SCtx())
+				sunk = p.sinkIntoIndexMerge(t)
			}
-			t = cop.convertToRootTask(p.SCtx())
-			sunk = p.sinkIntoIndexMerge(t)
		} else {
			// Whatever the remained case is, we directly convert to it to root task.
			t = cop.convertToRootTask(p.SCtx())
@@ -1024,9 +1083,7 @@ func (p *PhysicalTopN) getPushedDownTopN(childPlan PhysicalPlan) *PhysicalTopN {
 }
 
 // canPushToIndexPlan checks if this TopN can be pushed to the index side of copTask.
-// It can be pushed to the index side when all columns used by ByItems are available from the index side and
-//
-// there's no prefix index column.
+// It can be pushed to the index side when all columns used by ByItems are available from the index side and there's no prefix index column.
 func (*PhysicalTopN) canPushToIndexPlan(indexPlan PhysicalPlan, byItemCols []*expression.Column) bool {
	// If we call canPushToIndexPlan and there's no index plan, we should go into the index merge case.
	// Index merge case is specially handled for now. So we directly return false here.
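
Note for reviewers: the following is a minimal, self-contained sketch (not part of the patch; the struct and function names are hypothetical) that summarizes the limit-sinking decision cases 2-4 described in the attach2Task doc comment above, under the assumption that only the three flags inspected by the new branches matter.

package sketch

// copTaskShape is a hypothetical stand-in for the planner's copTask, keeping only
// the fields that the new attach2Task branches inspect.
type copTaskShape struct {
	idxMergeIsIntersection bool // intersection vs. union index merge
	indexPlanFinished      bool // true when the table side carries extra work (e.g. a Selection)
	hasRootTaskConds       bool // conditions that must be evaluated at the root
}

// limitSinkTarget reports where the limit ends up for an index merge cop task,
// mirroring cases 2-4 of the attach2Task doc comment in this patch.
func limitSinkTarget(c copTaskShape) string {
	if c.hasRootTaskConds {
		// Case 4: root conditions exist, so the limit stays above the IndexMerge reader.
		return "root, above the index merge reader"
	}
	if c.idxMergeIsIntersection || c.indexPlanFinished {
		// Case 2 (intersection) and the union-with-table-side-selection case:
		// sink the limit onto the table side, keeping an upper root limit.
		return "table side, plus an upper root limit"
	}
	// Case 3: union with a pure table scan on the probe side: sink the limit into
	// each partial index path, with an embedded pushed-down limit in the reader.
	return "each partial index path, with an embedded limit in the reader"
}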