ddl: support read records with copr for adding index (#39191)

ref pingcap/tidb#35983
Author: tangenta
Date: 2022-11-18 19:51:56 +08:00
Committed by: GitHub
Parent: 48e17e2ecb
Commit: e9253f73e0
6 changed files with 374 additions and 2 deletions
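For orientation before the diff: the pieces added below compose into a single read path. A minimal sketch using the commit's own identifiers (a fragment, not a standalone program; it assumes the ddl package context, a live transaction, and a backfill task):

	// Built once per worker; nil means the index covers a virtual
	// generated column and the worker keeps the transactional scan.
	copCtx := newCopContext(tblInfo, idxInfo, sessCtx)
	// Each task then pages through its handle range, batchCnt rows
	// per coprocessor request.
	records, nextKey, done, err := fetchRowsFromCop(
		ctx, copCtx, task.startKey, task.excludedEndKey(),
		txn.StartTS(), nil /* buf */, batchCnt)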


@ -27,6 +27,7 @@ go_library(
"foreign_key.go",
"generated_column.go",
"index.go",
"index_cop.go",
"index_merge_tmp.go",
"job_table.go",
"mock.go",
@ -168,6 +169,7 @@ go_test(
"fail_test.go",
"foreign_key_test.go",
"index_change_test.go",
"index_cop_test.go",
"index_merge_tmp_test.go",
"index_modify_test.go",
"integration_test.go",


@ -158,6 +158,13 @@ type reorgBackfillTask struct {
endInclude bool
}
func (r *reorgBackfillTask) excludedEndKey() kv.Key {
if r.endInclude {
return r.endKey.Next()
}
return r.endKey
}
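// Illustrative note, not part of the commit: endKey.Next() turns the task's
// inclusive right bound into the half-open [startKey, endKey) form expected
// by the coprocessor key range built in index_cop.go below.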
func (r *reorgBackfillTask) String() string {
physicalID := strconv.FormatInt(r.physicalTableID, 10)
startKey := tryDecodeToHandleString(r.startKey)
@ -342,6 +349,11 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
w.batchCnt = int(variable.GetDDLReorgBatchSize())
result := w.handleBackfillTask(d, task, bf)
w.resultCh <- result
if result.err != nil {
logutil.BgLogger().Info("[ddl] backfill worker exit on error",
zap.Stringer("type", w.tp), zap.Int("workerID", w.id), zap.Error(result.err))
return
}
}
}


@ -14,6 +14,28 @@
package ddl
import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/types"
)
func SetBatchInsertDeleteRangeSize(i int) {
batchInsertDeleteRangeSize = i
}
var (
FetchRowsFromCop4Test = fetchRowsFromCop
NewCopContext4Test = newCopContext
)
type (
IndexRecord4Test = *indexRecord
)
func (i IndexRecord4Test) GetHandle() kv.Handle {
return i.handle
}
func (i IndexRecord4Test) GetIndexValues() []types.Datum {
return i.vals
}
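// Illustrative note, not part of the commit: the *4Test aliases above follow
// Go's export_test pattern, re-exporting unexported identifiers so that the
// black-box ddl_test package (see ddl/index_cop_test.go below) can call them.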


@ -1180,6 +1180,7 @@ type baseIndexWorker struct {
type addIndexWorker struct {
baseIndexWorker
index table.Index
copCtx *copContext
writerCtx *ingest.WriterContext
// The following attributes are used to reduce memory allocation.
@ -1200,6 +1201,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
var lwCtx *ingest.WriterContext
var copCtx *copContext
if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
if !ok {
@ -1213,6 +1215,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable
if err != nil {
return nil, err
}
copCtx = newCopContext(t.Meta(), indexInfo, sessCtx)
}
return &addIndexWorker{
@ -1227,6 +1230,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable
jobContext: jc,
},
index: index,
copCtx: copCtx,
writerCtx: lwCtx,
}, nil
}
@ -1484,7 +1488,7 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
oprStartTime := time.Now()
ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType())
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) (err error) {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
txn.SetOption(kv.Priority, w.priority)
@ -1492,7 +1496,16 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
txn.SetOption(kv.ResourceGroupTagger, tagger)
}
idxRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
var (
idxRecords []*indexRecord
nextKey kv.Key
taskDone bool
)
if w.copCtx != nil {
idxRecords, nextKey, taskDone, err = w.fetchRowColValsFromCop(txn, handleRange)
} else {
idxRecords, nextKey, taskDone, err = w.fetchRowColVals(txn, handleRange)
}
if err != nil {
return errors.Trace(err)
}

ddl/index_cop.go (new file, 221 lines)

@ -0,0 +1,221 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ddl
import (
"context"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
)
// copReadBatchFactor is the multiplier applied to the backfill batch size for coprocessor reads.
// It scales tidb_ddl_reorg_batch_size to avoid sending too many cop requests for the same handle range.
const copReadBatchFactor = 10
func (w *addIndexWorker) fetchRowColValsFromCop(txn kv.Transaction, handleRange reorgBackfillTask) ([]*indexRecord, kv.Key, bool, error) {
w.idxRecords = w.idxRecords[:0]
start, end := handleRange.startKey, handleRange.excludedEndKey()
batchCnt := w.batchCnt * copReadBatchFactor
return fetchRowsFromCop(w.ctx, w.copCtx, start, end, txn.StartTS(), w.idxRecords, batchCnt)
}
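// Illustrative note, not part of the commit: with tidb_ddl_reorg_batch_size
// at its default of 256, each call requests up to
// 256 * copReadBatchFactor = 2560 rows from the coprocessor.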
// fetchRowsFromCop sends a coprocessor request and fetches the first batchCnt rows.
func fetchRowsFromCop(ctx context.Context, copCtx *copContext, startKey, endKey kv.Key, startTS uint64,
buf []*indexRecord, batchCnt int) ([]*indexRecord, kv.Key, bool, error) {
srcResult, err := copCtx.buildTableScan(ctx, startTS, startKey, endKey)
if err != nil {
return nil, nil, false, errors.Trace(err)
}
var done bool
buf, done, err = copCtx.fetchTableScanResult(ctx, srcResult, buf, batchCnt)
nextKey := endKey
if !done {
lastHandle := buf[len(buf)-1].handle
prefix := tablecodec.GenTableRecordPrefix(copCtx.tblInfo.ID)
nextKey = tablecodec.EncodeRecordKey(prefix, lastHandle).Next()
}
return buf, nextKey, done, err
}
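// Illustrative note, not part of the commit: when the batch fills before the
// scan is exhausted, the caller resumes from the successor of the last row's
// record key (t{tableID}_r{handle} for an int handle); once the result is
// drained, done is true and nextKey stays the range's endKey.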
type copContext struct {
tblInfo *model.TableInfo
idxInfo *model.IndexInfo
pkInfo *model.IndexInfo
colInfos []*model.ColumnInfo
fieldTps []*types.FieldType
sessCtx sessionctx.Context
srcChunk *chunk.Chunk
}
func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx sessionctx.Context) *copContext {
colInfos := make([]*model.ColumnInfo, 0, len(idxInfo.Columns))
fieldTps := make([]*types.FieldType, 0, len(idxInfo.Columns))
for _, idxCol := range idxInfo.Columns {
c := tblInfo.Columns[idxCol.Offset]
if c.IsGenerated() && !c.GeneratedStored {
// TODO(tangenta): support reading virtual generated columns.
return nil
}
colInfos = append(colInfos, c)
fieldTps = append(fieldTps, &c.FieldType)
}
pkColInfos, pkFieldTps, pkInfo := buildHandleColInfoAndFieldTypes(tblInfo)
colInfos = append(colInfos, pkColInfos...)
fieldTps = append(fieldTps, pkFieldTps...)
copCtx := &copContext{
tblInfo: tblInfo,
idxInfo: idxInfo,
pkInfo: pkInfo,
colInfos: colInfos,
fieldTps: fieldTps,
sessCtx: sessCtx,
srcChunk: chunk.NewChunkWithCapacity(fieldTps, variable.DefMaxChunkSize),
}
return copCtx
}
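// Illustrative note, not part of the commit: a nil copContext propagates to
// addIndexWorker.copCtx, and BackfillDataInTxn (hunk above) then falls back
// to the transactional fetchRowColVals path.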
func (c *copContext) buildTableScan(ctx context.Context, startTS uint64, start, end kv.Key) (distsql.SelectResult, error) {
dagPB, err := buildDAGPB(c.sessCtx, c.tblInfo, c.colInfos)
if err != nil {
return nil, err
}
var builder distsql.RequestBuilder
kvReq, err := builder.
SetDAGRequest(dagPB).
SetStartTS(startTS).
SetKeyRanges([]kv.KeyRange{{StartKey: start, EndKey: end}}).
SetKeepOrder(true).
SetFromSessionVars(c.sessCtx.GetSessionVars()).
SetFromInfoSchema(c.sessCtx.GetDomainInfoSchema()).
SetConcurrency(1).
Build()
if err != nil {
return nil, err
}
return distsql.Select(ctx, c.sessCtx, kvReq, c.fieldTps, statistics.NewQueryFeedback(0, nil, 0, false))
}
func (c *copContext) fetchTableScanResult(ctx context.Context, result distsql.SelectResult,
buf []*indexRecord, batchCnt int) ([]*indexRecord, bool, error) {
sctx := c.sessCtx.GetSessionVars().StmtCtx
for {
err := result.Next(ctx, c.srcChunk)
if err != nil {
return nil, false, errors.Trace(err)
}
if c.srcChunk.NumRows() == 0 {
return buf, true, nil
}
iter := chunk.NewIterator4Chunk(c.srcChunk)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
idxDt, hdDt := extractIdxValsAndHandle(row, c.idxInfo, c.fieldTps)
handle, err := buildHandle(hdDt, c.tblInfo, c.pkInfo, sctx)
if err != nil {
return nil, false, errors.Trace(err)
}
rsData := tables.TryGetHandleRestoredDataWrapper(c.tblInfo, hdDt, nil, c.idxInfo)
buf = append(buf, &indexRecord{handle: handle, key: nil, vals: idxDt, rsData: rsData, skip: false})
if len(buf) >= batchCnt {
return buf, false, nil
}
}
}
}
func buildDAGPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(sCtx.GetSessionVars().Location())
sc := sCtx.GetSessionVars().StmtCtx
dagReq.Flags = sc.PushDownFlags()
for i := range colInfos {
dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i))
}
execPB, err := constructTableScanPB(sCtx, tblInfo, colInfos)
if err != nil {
return nil, err
}
dagReq.Executors = append(dagReq.Executors, execPB)
distsql.SetEncodeType(sCtx, dagReq)
return dagReq, nil
}
func constructTableScanPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.Executor, error) {
tblScan := tables.BuildTableScanFromInfos(tblInfo, colInfos)
tblScan.TableId = tblInfo.ID
err := tables.SetPBColumnsDefaultValue(sCtx, tblScan.Columns, colInfos)
return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tblScan}, err
}
func buildHandleColInfoAndFieldTypes(tbInfo *model.TableInfo) ([]*model.ColumnInfo, []*types.FieldType, *model.IndexInfo) {
if tbInfo.PKIsHandle {
for i := range tbInfo.Columns {
if mysql.HasPriKeyFlag(tbInfo.Columns[i].GetFlag()) {
return []*model.ColumnInfo{tbInfo.Columns[i]}, []*types.FieldType{&tbInfo.Columns[i].FieldType}, nil
}
}
} else if tbInfo.IsCommonHandle {
primaryIdx := tables.FindPrimaryIndex(tbInfo)
pkCols := make([]*model.ColumnInfo, 0, len(primaryIdx.Columns))
pkFts := make([]*types.FieldType, 0, len(primaryIdx.Columns))
for _, pkCol := range primaryIdx.Columns {
pkCols = append(pkCols, tbInfo.Columns[pkCol.Offset])
pkFts = append(pkFts, &tbInfo.Columns[pkCol.Offset].FieldType)
}
return pkCols, pkFts, primaryIdx
}
extra := model.NewExtraHandleColInfo()
return []*model.ColumnInfo{extra}, []*types.FieldType{&extra.FieldType}, nil
}
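// Illustrative note, not part of the commit: the three branches cover the
// three handle layouts a TiDB table can have: an integer primary key used
// directly as the handle (PKIsHandle), a clustered multi-column primary key
// (common handle, where the primary index is also returned), and the hidden
// _tidb_rowid extra column for tables with neither.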
func extractIdxValsAndHandle(row chunk.Row, idxInfo *model.IndexInfo, fieldTps []*types.FieldType) ([]types.Datum, []types.Datum) {
datumBuf := make([]types.Datum, 0, len(fieldTps))
idxColLen := len(idxInfo.Columns)
for i, ft := range fieldTps {
datumBuf = append(datumBuf, row.GetDatum(i, ft))
}
return datumBuf[:idxColLen], datumBuf[idxColLen:]
}
func buildHandle(pkDts []types.Datum, tblInfo *model.TableInfo,
pkInfo *model.IndexInfo, stmtCtx *stmtctx.StatementContext) (kv.Handle, error) {
if tblInfo.IsCommonHandle {
tablecodec.TruncateIndexValues(tblInfo, pkInfo, pkDts)
handleBytes, err := codec.EncodeKey(stmtCtx, nil, pkDts...)
if err != nil {
return nil, err
}
return kv.NewCommonHandle(handleBytes)
}
return kv.IntHandle(pkDts[0].GetInt64()), nil
}

ddl/index_cop_test.go (new file, 102 lines)

@ -0,0 +1,102 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ddl_test
import (
"context"
"fmt"
"strconv"
"testing"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/stretchr/testify/require"
)
func TestAddIndexFetchRowsFromCoprocessor(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
testFetchRows := func(db, tb, idx string) ([]kv.Handle, [][]types.Datum) {
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr(db), model.NewCIStr(tb))
require.NoError(t, err)
tblInfo := tbl.Meta()
idxInfo := tblInfo.FindIndexByName(idx)
copCtx := ddl.NewCopContext4Test(tblInfo, idxInfo, tk.Session())
startKey := tbl.RecordPrefix()
endKey := startKey.PrefixNext()
txn, err := store.Begin()
require.NoError(t, err)
idxRec, _, done, err := ddl.FetchRowsFromCop4Test(context.Background(), copCtx, startKey, endKey,
txn.StartTS(), nil, 10)
require.NoError(t, err)
require.True(t, done)
require.NoError(t, txn.Rollback())
handles := make([]kv.Handle, 0, len(idxRec))
values := make([][]types.Datum, 0, len(idxRec))
for _, rec := range idxRec {
handles = append(handles, rec.GetHandle())
values = append(values, rec.GetIndexValues())
}
return handles, values
}
// Test nonclustered primary key table.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a bigint, b int, index idx (b));")
for i := 0; i < 8; i++ {
tk.MustExec("insert into t values (?, ?)", i, i)
}
hds, vals := testFetchRows("test", "t", "idx")
require.Len(t, hds, 8)
for i := 0; i < 8; i++ {
require.Equal(t, hds[i].IntValue(), int64(i+1))
require.Len(t, vals[i], 1)
require.Equal(t, vals[i][0].GetInt64(), int64(i))
}
// Test clustered primary key table (pk_is_handle).
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a bigint primary key, b int, index idx (b));")
for i := 0; i < 8; i++ {
tk.MustExec("insert into t values (?, ?)", i, i)
}
hds, vals = testFetchRows("test", "t", "idx")
require.Len(t, hds, 8)
for i := 0; i < 8; i++ {
require.Equal(t, hds[i].IntValue(), int64(i))
require.Len(t, vals[i], 1)
require.Equal(t, vals[i][0].GetInt64(), int64(i))
}
// Test clustered primary key table (common_handle).
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a varchar(10), b int, c char(10), primary key (a, c) clustered, index idx (b));")
for i := 0; i < 8; i++ {
tk.MustExec("insert into t values (?, ?, ?)", strconv.Itoa(i), i, strconv.Itoa(i))
}
hds, vals = testFetchRows("test", "t", "idx")
require.Len(t, hds, 8)
for i := 0; i < 8; i++ {
require.Equal(t, hds[i].String(), fmt.Sprintf("{%d, %d}", i, i))
require.Len(t, vals[i], 1)
require.Equal(t, vals[i][0].GetInt64(), int64(i))
}
}