86 lines
2.9 KiB
Go
86 lines
2.9 KiB
Go
// Copyright 2022 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package ddl_test
|
|
|
|
import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/ngaut/pools"
	"github.com/pingcap/tidb/pkg/ddl"
	"github.com/pingcap/tidb/pkg/ddl/copr"
	"github.com/pingcap/tidb/pkg/ddl/session"
	"github.com/pingcap/tidb/pkg/ddl/testutil"
	"github.com/pingcap/tidb/pkg/dxf/framework/taskexecutor/execute"
	"github.com/pingcap/tidb/pkg/dxf/operator"
	"github.com/pingcap/tidb/pkg/errctx"
	"github.com/pingcap/tidb/pkg/expression"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/table"
	"github.com/pingcap/tidb/pkg/types"
	"github.com/pingcap/tidb/pkg/util/chunk"
	"github.com/pingcap/tidb/pkg/util/mock"
)
|
|
|
|
func FetchChunk4Test(copCtx copr.CopContext, tbl table.PhysicalTable, startKey, endKey kv.Key, store kv.Storage,
|
|
batchSize int) (*chunk.Chunk, error) {
|
|
resPool := pools.NewResourcePool(func() (pools.Resource, error) {
|
|
ctx := mock.NewContext()
|
|
ctx.Store = store
|
|
return ctx, nil
|
|
}, 8, 8, 0)
|
|
sessPool := session.NewSessionPool(resPool)
|
|
srcChkPool := &sync.Pool{
|
|
New: func() any {
|
|
return chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes,
|
|
batchSize)
|
|
},
|
|
}
|
|
wctx := ddl.NewLocalWorkerCtx(context.Background(), 1)
|
|
defer wctx.Cancel()
|
|
src := testutil.NewOperatorTestSource(ddl.TableScanTask{ID: 1, Start: startKey, End: endKey})
|
|
scanOp := ddl.NewTableScanOperator(wctx, sessPool, copCtx, srcChkPool, 1, 0, &model.DDLReorgMeta{}, nil, &execute.TestCollector{})
|
|
sink := testutil.NewOperatorTestSink[ddl.IndexRecordChunk]()
|
|
|
|
operator.Compose(src, scanOp)
|
|
operator.Compose(scanOp, sink)
|
|
|
|
pipeline := operator.NewAsyncPipeline(src, scanOp, sink)
|
|
err := pipeline.Execute()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = pipeline.Close()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
results := sink.Collect()
|
|
return results[0].Chunk, nil
|
|
}
|
|
|
|
func ConvertRowToHandleAndIndexDatum(
|
|
ctx expression.EvalContext,
|
|
handleDataBuf, idxDataBuf []types.Datum,
|
|
row chunk.Row, copCtx copr.CopContext, idxID int64) (kv.Handle, []types.Datum, error) {
|
|
c := copCtx.GetBase()
|
|
idxData := ddl.ExtractDatumByOffsets(ctx, row, copCtx.IndexColumnOutputOffsets(idxID), c.ExprColumnInfos, idxDataBuf)
|
|
handleData := ddl.ExtractDatumByOffsets(ctx, row, c.HandleOutputOffsets, c.ExprColumnInfos, handleDataBuf)
|
|
handle, err := ddl.BuildHandle(handleData, c.TableInfo, c.PrimaryKeyInfo, time.Local, errctx.StrictNoWarningContext)
|
|
return handle, idxData, err
|
|
}
|