executor,store/copr: fix index merge, distsql request's key ranges should be sorted (#36633)
close pingcap/tidb#36632
This commit is contained in:
@ -53,6 +53,7 @@ import (
|
||||
"github.com/pingcap/tidb/util/ranger"
|
||||
"github.com/pingcap/tipb/go-tipb"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -304,6 +305,9 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
|
||||
|
||||
e.memTracker = memory.NewTracker(e.id, -1)
|
||||
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
|
||||
slices.SortFunc(kvRanges, func(i, j kv.KeyRange) bool {
|
||||
return bytes.Compare(i.StartKey, j.StartKey) < 0
|
||||
})
|
||||
var builder distsql.RequestBuilder
|
||||
builder.SetKeyRanges(kvRanges).
|
||||
SetDAGRequest(e.dagPB).
|
||||
@ -612,6 +616,10 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
|
||||
}
|
||||
|
||||
// init kvReq, result and worker for this partition
|
||||
// The key ranges should be ordered.
|
||||
slices.SortFunc(kvRange, func(i, j kv.KeyRange) bool {
|
||||
return bytes.Compare(i.StartKey, j.StartKey) < 0
|
||||
})
|
||||
kvReq, err := builder.SetKeyRanges(kvRange).Build()
|
||||
if err != nil {
|
||||
worker.syncErr(err)
|
||||
|
||||
@ -26,6 +26,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/failpoint"
|
||||
"github.com/pingcap/tidb/executor"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/metrics"
|
||||
@ -581,3 +582,49 @@ func TestAdaptiveClosestRead(t *testing.T) {
|
||||
// 2 IndexScan with cost 19/56, 2 TableReader with cost 32.5/65.
|
||||
checkMetrics("select/* +USE_INDEX_MERGE(t) */ id from t use index(`idx_v_s1`) use index(idx_s2) where (s1 < 3 and v > 0) or s2 = 3;", 3, 1)
|
||||
}
|
||||
|
||||
func TestCoprocessorPagingReqKeyRangeSorted(t *testing.T) {
|
||||
store, clean := testkit.CreateMockStore(t)
|
||||
defer clean()
|
||||
tk := testkit.NewTestKit(t, store)
|
||||
|
||||
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/checkKeyRangeSortedForPaging", "return"))
|
||||
defer func() {
|
||||
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/checkKeyRangeSortedForPaging"))
|
||||
}()
|
||||
|
||||
tk.MustExec("use test")
|
||||
tk.MustExec("CREATE TABLE `UK_COLLATION19523` (" +
|
||||
"`COL1` binary(1) DEFAULT NULL," +
|
||||
"`COL2` varchar(20) COLLATE utf8_general_ci DEFAULT NULL," +
|
||||
"`COL4` datetime DEFAULT NULL," +
|
||||
"`COL3` bigint(20) DEFAULT NULL," +
|
||||
"`COL5` float DEFAULT NULL," +
|
||||
"UNIQUE KEY `U_COL1` (`COL1`)" +
|
||||
") ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci")
|
||||
|
||||
tk.MustExec("prepare stmt from 'SELECT/*+ HASH_JOIN(t1, t2) */ * FROM UK_COLLATION19523 t1 JOIN UK_COLLATION19523 t2 ON t1.col1 > t2.col1 WHERE t1.col1 IN (?, ?, ?) AND t2.col1 < ?;';")
|
||||
tk.MustExec("set @a=0x4F, @b=0xF8, @c=NULL, @d=0xBF;")
|
||||
tk.MustExec("execute stmt using @a,@b,@c,@d;")
|
||||
tk.MustExec("set @a=0x00, @b=0xD2, @c=9179987834981541375, @d=0xF8;")
|
||||
tk.MustExec("execute stmt using @a,@b,@c,@d;")
|
||||
|
||||
tk.MustExec("CREATE TABLE `IDT_COLLATION26873` (" +
|
||||
"`COL1` varbinary(20) DEFAULT NULL," +
|
||||
"`COL2` varchar(20) COLLATE utf8_general_ci DEFAULT NULL," +
|
||||
"`COL4` datetime DEFAULT NULL," +
|
||||
"`COL3` bigint(20) DEFAULT NULL," +
|
||||
"`COL5` float DEFAULT NULL," +
|
||||
"KEY `U_COL1` (`COL1`))")
|
||||
tk.MustExec("prepare stmt from 'SELECT/*+ INL_JOIN(t1, t2) */ t2.* FROM IDT_COLLATION26873 t1 LEFT JOIN IDT_COLLATION26873 t2 ON t1.col1 = t2.col1 WHERE t1.col1 < ? AND t1.col1 IN (?, ?, ?);';")
|
||||
tk.MustExec("set @a=NULL, @b=NULL, @c=NULL, @d=NULL;")
|
||||
tk.MustExec("execute stmt using @a,@b,@c,@d;")
|
||||
tk.MustExec("set @a=0xE3253A6AC72A3A168EAF0E34A4779A947872CCCD, @b=0xD67BB26504EE152C2C356D7F6CAD897F03462963, @c=NULL, @d=0xDE735FEB375A4CF33479A39CA925470BFB229DB4;")
|
||||
tk.MustExec("execute stmt using @a,@b,@c,@d;")
|
||||
tk.MustExec("set @a=2606738829406840179, @b=1468233589368287363, @c=5174008984061521089, @d=7727946571160309462;")
|
||||
tk.MustExec("execute stmt using @a,@b,@c,@d;")
|
||||
tk.MustExec("set @a=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE, @b=NULL, @c=6864108002939154648, @d=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE;")
|
||||
tk.MustExec("execute stmt using @a,@b,@c,@d;")
|
||||
tk.MustExec("set @a=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE, @b=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE, @c=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE, @d=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE;")
|
||||
tk.MustExec("execute stmt using @a,@b,@c,@d;")
|
||||
}
|
||||
|
||||
@ -43,6 +43,7 @@ import (
|
||||
"github.com/pingcap/tidb/util/ranger"
|
||||
"github.com/pingcap/tipb/go-tipb"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -324,6 +325,10 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
|
||||
}
|
||||
|
||||
// init kvReq and worker for this partition
|
||||
// The key ranges should be ordered.
|
||||
slices.SortFunc(keyRange, func(i, j kv.KeyRange) bool {
|
||||
return bytes.Compare(i.StartKey, j.StartKey) < 0
|
||||
})
|
||||
kvReq, err := builder.SetKeyRanges(keyRange).Build()
|
||||
if err != nil {
|
||||
worker.syncErr(e.resultCh, err)
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
package copr
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
@ -51,6 +52,7 @@ import (
|
||||
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
|
||||
"github.com/tikv/client-go/v2/util"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
var coprCacheCounterEvict = tidbmetrics.DistSQLCoprCacheCounter.WithLabelValues("evict")
|
||||
@ -92,6 +94,18 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
|
||||
// coprocessor request but type is not DAG
|
||||
req.Paging = false
|
||||
}
|
||||
|
||||
failpoint.Inject("checkKeyRangeSortedForPaging", func(_ failpoint.Value) {
|
||||
if req.Paging {
|
||||
isSorted := slices.IsSortedFunc(req.KeyRanges, func(i, j kv.KeyRange) bool {
|
||||
return bytes.Compare(i.StartKey, j.StartKey) < 0
|
||||
})
|
||||
if !isSorted {
|
||||
logutil.BgLogger().Fatal("distsql request key range not sorted!")
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
|
||||
ctx = context.WithValue(ctx, util.RequestSourceKey, req.RequestSource)
|
||||
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
|
||||
|
||||
Reference in New Issue
Block a user