importinto: use one writer for each kv group for all concurrent encoders (#47185)
ref pingcap/tidb#46704
br/pkg/lightning/backend/external/writer.go
@@ -488,6 +488,9 @@ func (w *Writer) createStorageWriter(ctx context.Context) (

 // EngineWriter implements backend.EngineWriter interface.
 type EngineWriter struct {
+	// Only 1 writer is used for each kv group (data or some index), no matter
+	// how many routines are encoding data, so we need to synchronize writes to it.
+	sync.Mutex
 	w *Writer
 }
@@ -498,6 +501,8 @@ func NewEngineWriter(w *Writer) *EngineWriter {

 // AppendRows implements backend.EngineWriter interface.
 func (e *EngineWriter) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error {
+	e.Lock()
+	defer e.Unlock()
 	kvs := kv.Rows2KvPairs(rows)
 	if len(kvs) == 0 {
 		return nil
@@ -519,5 +524,7 @@ func (e *EngineWriter) IsSynced() bool {

 // Close implements backend.EngineWriter interface.
 func (e *EngineWriter) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
+	e.Lock()
+	defer e.Unlock()
 	return nil, e.w.Close(ctx)
 }
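Taken together, the three hunks above give each kv group exactly one Writer, shared by every encoding goroutine and serialized through the embedded sync.Mutex. A minimal runnable sketch of the same pattern (sharedWriter and its string slice are illustrative stand-ins, not the real external.Writer):

package main

import (
	"fmt"
	"sync"
)

// sharedWriter stands in for external.EngineWriter: one underlying sink
// per kv group, with an embedded mutex so any number of encoder
// goroutines can append to it safely.
type sharedWriter struct {
	sync.Mutex
	rows []string // stand-in for the writer's sorted-kv buffer
}

func (w *sharedWriter) AppendRows(rows ...string) {
	w.Lock()
	defer w.Unlock()
	w.rows = append(w.rows, rows...)
}

func main() {
	w := &sharedWriter{} // one writer for the whole kv group
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // 4 concurrent encoders share it
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			w.AppendRows(fmt.Sprintf("row-from-encoder-%d", id))
		}(i)
	}
	wg.Wait()
	fmt.Println(len(w.rows)) // 4: all rows landed in the single writer
}

Serializing at AppendRows granularity stays cheap here because, as the comment in initWriters below notes, the heavy work of sorting and uploading happens inside the writer itself, concurrently.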
@@ -20,6 +20,7 @@ import (
 	"strconv"
 	"time"

+	"github.com/docker/go-units"
 	"github.com/google/uuid"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
@@ -63,6 +64,9 @@ type encodeAndSortOperator struct {
 	sharedVars *SharedVars
 	logger     *zap.Logger
 	errCh      chan error
+
+	dataWriter  *external.EngineWriter
+	indexWriter *importer.IndexRouteWriter
 }

 var _ operator.Operator = (*encodeAndSortOperator)(nil)
@@ -82,18 +86,64 @@ func newEncodeAndSortOperator(ctx context.Context, executor *importStepExecutor,
 		logger:     executor.logger,
 		errCh:      make(chan error),
 	}
+
+	if op.tableImporter.IsGlobalSort() {
+		op.initWriters(executor, indexMemorySizeLimit)
+	}
+
 	pool := workerpool.NewWorkerPool(
 		"encodeAndSortOperator",
 		util.ImportInto,
 		int(executor.taskMeta.Plan.ThreadCnt),
 		func() workerpool.Worker[*importStepMinimalTask, workerpool.None] {
-			return newChunkWorker(ctx, op, indexMemorySizeLimit)
+			return newChunkWorker(subCtx, op)
 		},
 	)
 	op.AsyncOperator = operator.NewAsyncOperator(subCtx, pool)
 	return op
 }

+// With the current design of the global sort writer, we only create one writer
+// for each kv group, and all chunks share the same writers.
+// The writer itself sorts and uploads data concurrently.
+func (op *encodeAndSortOperator) initWriters(executor *importStepExecutor, indexMemorySizeLimit uint64) {
+	totalDataKVMemSizeLimit := external.DefaultMemSizeLimit * uint64(executor.taskMeta.Plan.ThreadCnt)
+	totalMemSizeLimitPerIndexWriter := indexMemorySizeLimit * uint64(executor.taskMeta.Plan.ThreadCnt)
+	op.logger.Info("init global sort writer with mem limit",
+		zap.String("data-limit", units.BytesSize(float64(totalDataKVMemSizeLimit))),
+		zap.String("per-index-limit", units.BytesSize(float64(totalMemSizeLimitPerIndexWriter))))
+
+	// In case of network partition, 2 nodes might run the same subtask,
+	// so use a uuid to make sure the path is unique.
+	workerUUID := uuid.New().String()
+	// sorted index kv storage path: /{taskID}/{subtaskID}/index/{indexID}/{workerID}
+	indexWriterFn := func(indexID int64) *external.Writer {
+		builder := external.NewWriterBuilder().
+			SetOnCloseFunc(func(summary *external.WriterSummary) {
+				op.sharedVars.mergeIndexSummary(indexID, summary)
+			}).SetMemorySizeLimit(totalMemSizeLimitPerIndexWriter).
+			SetMutex(&op.sharedVars.ShareMu)
+		prefix := subtaskPrefix(op.taskID, op.subtaskID)
+		// writer id for index: index/{indexID}/{workerID}
+		writerID := path.Join("index", strconv.Itoa(int(indexID)), workerUUID)
+		writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID)
+		return writer
+	}
+
+	// sorted data kv storage path: /{taskID}/{subtaskID}/data/{workerID}
+	builder := external.NewWriterBuilder().
+		SetOnCloseFunc(op.sharedVars.mergeDataSummary).
+		SetMemorySizeLimit(totalDataKVMemSizeLimit).
+		SetMutex(&op.sharedVars.ShareMu)
+	prefix := subtaskPrefix(op.taskID, op.subtaskID)
+	// writer id for data: data/{workerID}
+	writerID := path.Join("data", workerUUID)
+	writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID)
+
+	op.dataWriter = external.NewEngineWriter(writer)
+	op.indexWriter = importer.NewIndexRouteWriter(op.logger, indexWriterFn)
+}

 func (op *encodeAndSortOperator) Open() error {
 	op.wg.Run(func() {
 		for err := range op.errCh {
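Two things worth pulling out of initWriters: the memory budgets are per-subtask totals (per-writer limit × ThreadCnt — e.g. with thread=4 and a 256MiB DefaultMemSizeLimit, the figure the IndexRouteWriter comment further down mentions, the single data writer would get a 1GiB budget), and the object-storage layout is keyed by a per-worker uuid. A small sketch of that layout, with made-up IDs, assuming subtaskPrefix simply joins {taskID}/{subtaskID} as the path comments state:

package main

import (
	"fmt"
	"path"
)

func main() {
	// Hypothetical values, only to show the key layout described in the comments:
	//   data kv:  /{taskID}/{subtaskID}/data/{workerID}
	//   index kv: /{taskID}/{subtaskID}/index/{indexID}/{workerID}
	taskID, subtaskID, indexID := int64(1), int64(2), int64(3)
	workerUUID := "2f7a" // uuid.New().String() in the real code
	prefix := path.Join(fmt.Sprint(taskID), fmt.Sprint(subtaskID)) // assumed behavior of subtaskPrefix
	fmt.Println(path.Join(prefix, "data", workerUUID))                        // 1/2/data/2f7a
	fmt.Println(path.Join(prefix, "index", fmt.Sprint(indexID), workerUUID)) // 1/2/index/3/2f7a
}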
@@ -114,9 +164,31 @@ func (op *encodeAndSortOperator) Close() error {
 	// right now AsyncOperator.Close always returns nil, ok to ignore it.
 	// nolint:errcheck
 	op.AsyncOperator.Close()
+
+	closeCtx := op.ctx
+	if closeCtx.Err() != nil {
+		// in case of context canceled, we need to create a new context to close writers.
+		newCtx, cancel := context.WithTimeout(context.Background(), maxWaitDuration)
+		closeCtx = newCtx
+		defer cancel()
+	}
+	if op.dataWriter != nil {
+		// Note: we cannot ignore the close error, as we're writing to S3 or GCS;
+		// ignoring it might cause data loss. Same below.
+		if _, err := op.dataWriter.Close(closeCtx); err != nil {
+			op.onError(errors.Trace(err))
+		}
+	}
+	if op.indexWriter != nil {
+		if _, err := op.indexWriter.Close(closeCtx); err != nil {
+			op.onError(errors.Trace(err))
+		}
+	}
+
 	op.cancel()
 	close(op.errCh)
 	op.wg.Wait()

 	// see comments on interface definition, this Close is actually WaitAndClose.
 	return op.firstErr.Load()
 }
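The close path has to survive cancellation: once op.ctx is canceled, the writers still hold buffered kv data destined for S3/GCS, so Close swaps in a fresh bounded context before flushing. A small sketch of that fallback pattern (closeWithFallback is a made-up helper; the real code uses the package constant maxWaitDuration rather than the literal 30s):

package main

import (
	"context"
	"fmt"
	"time"
)

// closeWithFallback shows the pattern: if the operator's context was already
// canceled, switch to a fresh bounded context so writers can still flush
// buffered kv data to cloud storage instead of failing immediately.
func closeWithFallback(opCtx context.Context, closeFn func(context.Context) error) error {
	closeCtx := opCtx
	if closeCtx.Err() != nil {
		newCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) // stand-in for maxWaitDuration
		defer cancel()
		closeCtx = newCtx
	}
	return closeFn(closeCtx)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the operator being canceled before Close
	err := closeWithFallback(ctx, func(c context.Context) error { return c.Err() })
	fmt.Println(err) // <nil>: the writer got a usable context anyway
}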
@@ -140,43 +212,13 @@ func (op *encodeAndSortOperator) Done() <-chan struct{} {
 type chunkWorker struct {
 	ctx context.Context
 	op  *encodeAndSortOperator
-
-	dataWriter  *external.EngineWriter
-	indexWriter *importer.IndexRouteWriter
 }

-func newChunkWorker(ctx context.Context, op *encodeAndSortOperator, indexMemorySizeLimit uint64) *chunkWorker {
+func newChunkWorker(ctx context.Context, op *encodeAndSortOperator) *chunkWorker {
 	w := &chunkWorker{
 		ctx: ctx,
 		op:  op,
 	}
-	if op.tableImporter.IsGlobalSort() {
-		// in case on network partition, 2 nodes might run the same subtask.
-		workerUUID := uuid.New().String()
-		// sorted index kv storage path: /{taskID}/{subtaskID}/index/{indexID}/{workerID}
-		indexWriterFn := func(indexID int64) *external.Writer {
-			builder := external.NewWriterBuilder().
-				SetOnCloseFunc(func(summary *external.WriterSummary) {
-					op.sharedVars.mergeIndexSummary(indexID, summary)
-				}).SetMemorySizeLimit(indexMemorySizeLimit).SetMutex(&op.sharedVars.ShareMu)
-			prefix := subtaskPrefix(op.taskID, op.subtaskID)
-			// writer id for index: index/{indexID}/{workerID}
-			writerID := path.Join("index", strconv.Itoa(int(indexID)), workerUUID)
-			writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID)
-			return writer
-		}
-
-		// sorted data kv storage path: /{taskID}/{subtaskID}/data/{workerID}
-		builder := external.NewWriterBuilder().
-			SetOnCloseFunc(op.sharedVars.mergeDataSummary).SetMutex(&op.sharedVars.ShareMu)
-		prefix := subtaskPrefix(op.taskID, op.subtaskID)
-		// writer id for data: data/{workerID}
-		writerID := path.Join("data", workerUUID)
-		writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID)
-		w.dataWriter = external.NewEngineWriter(writer)
-
-		w.indexWriter = importer.NewIndexRouteWriter(op.logger, indexWriterFn)
-	}
 	return w
 }

@@ -187,31 +229,12 @@ func (w *chunkWorker) HandleTask(task *importStepMinimalTask, _ func(workerpool.
 	// we don't use the input send function, it makes workflow more complex
 	// we send result to errCh and handle it here.
 	executor := newImportMinimalTaskExecutor(task)
-	if err := executor.Run(w.ctx, w.dataWriter, w.indexWriter); err != nil {
+	if err := executor.Run(w.ctx, w.op.dataWriter, w.op.indexWriter); err != nil {
 		w.op.onError(err)
 	}
 }

-func (w *chunkWorker) Close() {
-	closeCtx := w.ctx
-	if closeCtx.Err() != nil {
-		// in case of context canceled, we need to create a new context to close writers.
-		newCtx, cancel := context.WithTimeout(context.Background(), maxWaitDuration)
-		closeCtx = newCtx
-		defer cancel()
-	}
-	if w.dataWriter != nil {
-		// Note: we cannot ignore close error as we're writing to S3 or GCS.
-		// ignore error might cause data loss. below too.
-		if _, err := w.dataWriter.Close(closeCtx); err != nil {
-			w.op.onError(errors.Trace(err))
-		}
-	}
-	if w.indexWriter != nil {
-		if _, err := w.indexWriter.Close(closeCtx); err != nil {
-			w.op.onError(errors.Trace(err))
-		}
-	}
+func (*chunkWorker) Close() {
 }

 func subtaskPrefix(taskID, subtaskID int64) string {
@@ -72,15 +72,20 @@ func TestEncodeAndSortOperator(t *testing.T) {
 		tableImporter: &importer.TableImporter{
 			LoadDataController: &importer.LoadDataController{
 				Plan: &importer.Plan{
-					CloudStorageURI: "",
+					CloudStorageURI: "s3://test-bucket/test-path",
 				},
 			},
 		},
 		logger: logger,
 	}

+	sharedVars := &SharedVars{
+		SortedDataMeta:   &external.SortedKVMeta{},
+		SortedIndexMetas: map[int64]*external.SortedKVMeta{},
+	}
+
 	source := operator.NewSimpleDataChannel(make(chan *importStepMinimalTask))
-	op := newEncodeAndSortOperator(context.Background(), executorForParam, nil, 3, 0)
+	op := newEncodeAndSortOperator(context.Background(), executorForParam, sharedVars, 3, 0)
 	op.SetSource(source)
 	require.NoError(t, op.Open())
 	require.Greater(t, len(op.String()), 0)
@@ -100,7 +105,7 @@ func TestEncodeAndSortOperator(t *testing.T) {
 	// cancel on error and log other errors
 	mockErr2 := errors.New("mock err 2")
 	source = operator.NewSimpleDataChannel(make(chan *importStepMinimalTask))
-	op = newEncodeAndSortOperator(context.Background(), executorForParam, nil, 2, 0)
+	op = newEncodeAndSortOperator(context.Background(), executorForParam, sharedVars, 2, 0)
 	op.SetSource(source)
 	executor1 := mock.NewMockMiniTaskExecutor(ctrl)
 	executor2 := mock.NewMockMiniTaskExecutor(ctrl)
@@ -107,7 +107,7 @@ func (s *importStepExecutor) Init(ctx context.Context) error {
 		}()
 	}
 	s.indexMemorySizeLimit = getWriterMemorySizeLimit(s.tableImporter.Plan)
-	s.logger.Info("index writer memory size limit",
+	s.logger.Info("memory size limit per index writer per concurrency",
 		zap.String("limit", units.BytesSize(float64(s.indexMemorySizeLimit))))
 	return nil
 }
@@ -78,6 +78,7 @@ go_test(
     name = "importer_test",
     timeout = "short",
     srcs = [
+        "chunk_process_test.go",
        "import_test.go",
        "job_test.go",
        "precheck_test.go",
@@ -86,9 +87,10 @@ go_test(
     embed = [":importer"],
     flaky = True,
     race = "on",
-    shard_count = 16,
+    shard_count = 17,
     deps = [
        "//br/pkg/errors",
+       "//br/pkg/lightning/backend/external",
        "//br/pkg/lightning/config",
        "//br/pkg/lightning/mydump",
        "//br/pkg/streamhelper",
@@ -104,6 +106,7 @@ go_test(
        "//sessionctx/variable",
        "//testkit",
        "//types",
+       "//util",
        "//util/dbterror/exeerrors",
        "//util/etcd",
        "//util/logutil",
@@ -17,6 +17,7 @@ package importer
 import (
 	"context"
 	"io"
+	"sync"
 	"time"

 	"github.com/docker/go-units"
@@ -32,6 +33,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
 	verify "github.com/pingcap/tidb/br/pkg/lightning/verification"
 	"github.com/pingcap/tidb/executor/asyncloaddata"
+	tidbkv "github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/util/syncutil"
 	"github.com/tikv/client-go/v2/tikv"
@@ -347,20 +349,60 @@ func (p *chunkProcessor) deliverLoop(ctx context.Context) error {
 // writer will take 256MiB buffer on default.
 // this will take a lot of memory, or even OOM.
 type IndexRouteWriter struct {
-	writers map[int64]*external.Writer
+	// this writer and all wrappedWriters are shared by all deliver routines,
+	// so we need to synchronize them.
+	sync.RWMutex
+	writers       map[int64]*wrappedWriter
 	logger        *zap.Logger
 	writerFactory func(int64) *external.Writer
 }

+type wrappedWriter struct {
+	sync.Mutex
+	*external.Writer
+}
+
+func (w *wrappedWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte, handle tidbkv.Handle) error {
+	w.Lock()
+	defer w.Unlock()
+	return w.Writer.WriteRow(ctx, idxKey, idxVal, handle)
+}
+
+func (w *wrappedWriter) Close(ctx context.Context) error {
+	w.Lock()
+	defer w.Unlock()
+	return w.Writer.Close(ctx)
+}
+
 // NewIndexRouteWriter creates a new IndexRouteWriter.
 func NewIndexRouteWriter(logger *zap.Logger, writerFactory func(int64) *external.Writer) *IndexRouteWriter {
 	return &IndexRouteWriter{
-		writers:       make(map[int64]*external.Writer),
+		writers:       make(map[int64]*wrappedWriter),
 		logger:        logger,
 		writerFactory: writerFactory,
 	}
 }

+func (w *IndexRouteWriter) getWriter(indexID int64) *wrappedWriter {
+	w.RLock()
+	writer, ok := w.writers[indexID]
+	w.RUnlock()
+	if ok {
+		return writer
+	}
+
+	w.Lock()
+	defer w.Unlock()
+	writer, ok = w.writers[indexID]
+	if !ok {
+		writer = &wrappedWriter{
+			Writer: w.writerFactory(indexID),
+		}
+		w.writers[indexID] = writer
+	}
+	return writer
+}
+
 // AppendRows implements backend.EngineWriter interface.
 func (w *IndexRouteWriter) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error {
 	kvs := kv.Rows2KvPairs(rows)
@@ -372,11 +414,7 @@ func (w *IndexRouteWriter) AppendRows(ctx context.Context, _ []string, rows enco
 		if err != nil {
 			return errors.Trace(err)
 		}
-		writer, ok := w.writers[indexID]
-		if !ok {
-			writer = w.writerFactory(indexID)
-			w.writers[indexID] = writer
-		}
+		writer := w.getWriter(indexID)
 		if err = writer.WriteRow(ctx, item.Key, item.Val, nil); err != nil {
 			return errors.Trace(err)
 		}
@@ -392,6 +430,8 @@ func (*IndexRouteWriter) IsSynced() bool {

 // Close implements backend.EngineWriter interface.
 func (w *IndexRouteWriter) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
 	var firstErr error
+	w.Lock()
+	defer w.Unlock()
 	for _, writer := range w.writers {
 		if err := writer.Close(ctx); err != nil {
 			if firstErr == nil {
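getWriter above is a double-checked lookup over an RWMutex: a cheap read-locked fast path, then a write-locked re-check so only one deliver routine creates the writer for a given indexID — exactly what TestIndexRouteWriter in the new file below exercises. A generic sketch of the idiom (lazyMap, newFn, and get are illustrative names, not from this patch):

package main

import (
	"fmt"
	"sync"
)

// lazyMap lazily creates one value per key; concurrent callers for the
// same key always observe the same instance.
type lazyMap[K comparable, V any] struct {
	mu    sync.RWMutex
	m     map[K]V
	newFn func(K) V
}

func (l *lazyMap[K, V]) get(k K) V {
	// Fast path: read lock only, since the value usually exists already.
	l.mu.RLock()
	v, ok := l.m[k]
	l.mu.RUnlock()
	if ok {
		return v
	}
	// Slow path: re-check under the write lock so only one caller creates.
	l.mu.Lock()
	defer l.mu.Unlock()
	if v, ok = l.m[k]; !ok {
		v = l.newFn(k)
		l.m[k] = v
	}
	return v
}

func main() {
	l := &lazyMap[int64, *int]{m: map[int64]*int{}, newFn: func(k int64) *int { n := int(k); return &n }}
	a, b := l.get(7), l.get(7)
	fmt.Println(a == b) // true: same instance for the same indexID-like key
}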
executor/importer/chunk_process_test.go (new file)
@@ -0,0 +1,54 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package importer
+
+import (
+	"math/rand"
+	"testing"
+	"time"
+
+	"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
+	"github.com/pingcap/tidb/util"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+)
+
+func TestIndexRouteWriter(t *testing.T) {
+	logger := zap.NewExample()
+	routeWriter := NewIndexRouteWriter(logger, func(i int64) *external.Writer {
+		return external.NewWriterBuilder().Build(nil, "", "")
+	})
+	wg := util.WaitGroupWrapper{}
+	for i := 0; i < 10; i++ {
+		idx := i
+		wg.Run(func() {
+			seed := time.Now().Unix()
+			logger.Info("seed", zap.Int("idx", idx), zap.Int64("seed", seed))
+			r := rand.New(rand.NewSource(seed))
+			gotWriters := make(map[int64]*wrappedWriter)
+			for i := 0; i < 3000; i++ {
+				indexID := int64(r.Int()) % 100
+				writer := routeWriter.getWriter(indexID)
+				require.NotNil(t, writer)
+				if got, ok := gotWriters[indexID]; ok {
+					require.Equal(t, got, writer)
+				} else {
+					gotWriters[indexID] = writer
+				}
+			}
+		})
+	}
+	wg.Wait()
+}
@@ -19,6 +19,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"net/url"
+	"slices"
 	"strconv"
 	"testing"
 	"time"
@@ -115,3 +116,30 @@ func (s *mockGCSSuite) TestGlobalSortBasic() {
 	s.NoError(err)
 	s.Len(files, 0)
 }
+
+func (s *mockGCSSuite) TestGlobalSortMultiFiles() {
+	var allData []string
+	for i := 0; i < 10; i++ {
+		var content []byte
+		keyCnt := 1000
+		for j := 0; j < keyCnt; j++ {
+			idx := i*keyCnt + j
+			content = append(content, []byte(fmt.Sprintf("%d,test-%d\n", idx, idx))...)
+			allData = append(allData, fmt.Sprintf("%d test-%d", idx, idx))
+		}
+		s.server.CreateObject(fakestorage.Object{
+			ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "gs-multi-files", Name: fmt.Sprintf("t.%d.csv", i)},
+			Content:     content,
+		})
+	}
+	slices.Sort(allData)
+	s.prepareAndUseDB("gs_multi_files")
+	s.server.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: "sorted"})
+	s.tk.MustExec("create table t (a bigint primary key, b varchar(100), key(b), key(a,b), key(b,a));")
+	// 1 subtask, encoding 10 files using 4 threads.
+	sortStorageURI := fmt.Sprintf("gs://sorted/gs_multi_files?endpoint=%s", gcsEndpoint)
+	importSQL := fmt.Sprintf(`import into t FROM 'gs://gs-multi-files/t.*.csv?endpoint=%s'
+		with thread=4, cloud_storage_uri='%s'`, gcsEndpoint, sortStorageURI)
+	s.tk.MustQuery(importSQL)
+	s.tk.MustQuery("select * from t").Sort().Check(testkit.Rows(allData...))
+}