diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go
index 22743f618f..e5c795c81a 100644
--- a/br/pkg/lightning/backend/local/engine.go
+++ b/br/pkg/lightning/backend/local/engine.go
@@ -66,6 +66,13 @@ const (
     importMutexStateReadLock
 )
 
+const (
+    // DupDetectDirSuffix is used by pre-deduplication to store the encoded index KV.
+    DupDetectDirSuffix = ".dupdetect"
+    // DupResultDirSuffix is used by pre-deduplication to store the duplicated row ID.
+    DupResultDirSuffix = ".dupresult"
+)
+
 // engineMeta contains some field that is necessary to continue the engine restore/import process.
 // These field should be written to disk when we update chunk checkpoint
 type engineMeta struct {
@@ -159,14 +166,21 @@ func (e *Engine) Close() error {
     return err
 }
 
-// Cleanup remove meta and db files
+// Cleanup removes meta, db and duplicate detection files
 func (e *Engine) Cleanup(dataDir string) error {
     if err := os.RemoveAll(e.sstDir); err != nil {
         return errors.Trace(err)
     }
+    uuid := e.UUID.String()
+    if err := os.RemoveAll(filepath.Join(dataDir, uuid+DupDetectDirSuffix)); err != nil {
+        return errors.Trace(err)
+    }
+    if err := os.RemoveAll(filepath.Join(dataDir, uuid+DupResultDirSuffix)); err != nil {
+        return errors.Trace(err)
+    }
 
-    dbPath := filepath.Join(dataDir, e.UUID.String())
-    return os.RemoveAll(dbPath)
+    dbPath := filepath.Join(dataDir, uuid)
+    return errors.Trace(os.RemoveAll(dbPath))
 }
 
 // Exist checks if db folder existing (meta sometimes won't flush before lightning exit)
diff --git a/br/pkg/lightning/duplicate/detector.go b/br/pkg/lightning/duplicate/detector.go
index 23d1f97e54..21309e0095 100644
--- a/br/pkg/lightning/duplicate/detector.go
+++ b/br/pkg/lightning/duplicate/detector.go
@@ -199,6 +199,7 @@ type Handler interface {
     Begin(key []byte) error
     // Append appends a keyID to the current duplicate key.
     // Multiple keyIDs are appended in lexicographical order.
+    // NOTE: keyID may be changed after the call, so copy it if it needs to be retained.
     Append(keyID []byte) error
     // End is called when all keyIDs of the current duplicate key have been appended.
     End() error
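The new note on `Handler.Append` is easy to miss but matters for implementers: the detector may reuse the `keyID` buffer once `Append` returns, which is why `errorOnDup` below clones it with `slices.Clone`. A minimal sketch of a conforming handler under that assumption, using only the four methods shown above (the `dupCollector` type is illustrative and not part of this patch):

```go
package example

import (
	"golang.org/x/exp/slices"

	"github.com/pingcap/tidb/br/pkg/lightning/duplicate"
)

// dupCollector records every keyID of every duplicate-key group. Because the
// detector may reuse the keyID buffer after Append returns, the slice is
// cloned before it is stored (the same reason errorOnDup uses slices.Clone).
type dupCollector struct {
	current [][]byte
	groups  [][][]byte
}

var _ duplicate.Handler = (*dupCollector)(nil)

func (c *dupCollector) Begin(key []byte) error {
	c.current = c.current[:0]
	return nil
}

func (c *dupCollector) Append(keyID []byte) error {
	// Copy keyID: it is only guaranteed to be valid until Append returns.
	c.current = append(c.current, slices.Clone(keyID))
	return nil
}

func (c *dupCollector) End() error {
	c.groups = append(c.groups, slices.Clone(c.current))
	return nil
}

func (c *dupCollector) Close() error { return nil }
```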
diff --git a/br/pkg/lightning/errormanager/errormanager.go b/br/pkg/lightning/errormanager/errormanager.go
index 59e06d04b0..347be8a65c 100644
--- a/br/pkg/lightning/errormanager/errormanager.go
+++ b/br/pkg/lightning/errormanager/errormanager.go
@@ -133,7 +133,7 @@ const (
     insertIntoConflictErrorV2 = `
         INSERT INTO %s.` + conflictErrorV2TableName + `
         (task_id, table_name, path, offset, error, row_id, row_data)
-        VALUES (?, ?, ?, ?, ?, ?, ?);
+        VALUES (?, ?, ?, ?, ?, ?, ?);
     `
 )
@@ -156,6 +156,11 @@ func (em *ErrorManager) TypeErrorsRemain() int64 {
     return em.remainingError.Type.Load()
 }
 
+// RemainRecord returns the number of error records that can still be recorded.
+func (em *ErrorManager) RemainRecord() int64 {
+    return em.maxErrRecords.Load()
+}
+
 // New creates a new error manager.
 func New(db *sql.DB, cfg *config.Config, logger log.Logger) *ErrorManager {
     maxErrRecords := &atomic.Int64{}
diff --git a/br/pkg/lightning/importer/BUILD.bazel b/br/pkg/lightning/importer/BUILD.bazel
index bb41fe1ef1..0ed8a538dc 100644
--- a/br/pkg/lightning/importer/BUILD.bazel
+++ b/br/pkg/lightning/importer/BUILD.bazel
@@ -58,10 +58,13 @@ go_library(
         "//parser/mysql",
         "//planner/core",
         "//store/driver",
+        "//store/driver/txn",
         "//store/pdtypes",
         "//table",
         "//table/tables",
+        "//tablecodec",
         "//types",
+        "//util/codec",
         "//util/collate",
         "//util/dbterror",
         "//util/engine",
@@ -147,7 +150,9 @@ go_test(
         "//store/mockstore",
         "//store/pdtypes",
         "//table/tables",
+        "//tablecodec",
         "//types",
+        "//util/codec",
         "//util/dbutil",
         "//util/extsort",
         "//util/mock",
diff --git a/br/pkg/lightning/importer/chunk_process.go b/br/pkg/lightning/importer/chunk_process.go
index 9f5908bcce..1fc236c2b9 100644
--- a/br/pkg/lightning/importer/chunk_process.go
+++ b/br/pkg/lightning/importer/chunk_process.go
@@ -24,6 +24,7 @@ import (
     "github.com/pingcap/failpoint"
     "github.com/pingcap/tidb/br/pkg/lightning/backend"
     "github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
+    kv2 "github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
     "github.com/pingcap/tidb/br/pkg/lightning/backend/tidb"
     "github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
     "github.com/pingcap/tidb/br/pkg/lightning/common"
@@ -36,7 +37,11 @@ import (
     "github.com/pingcap/tidb/br/pkg/storage"
     "github.com/pingcap/tidb/keyspace"
     "github.com/pingcap/tidb/parser/model"
+    "github.com/pingcap/tidb/store/driver/txn"
+    "github.com/pingcap/tidb/table/tables"
+    "github.com/pingcap/tidb/tablecodec"
     "github.com/pingcap/tidb/types"
+    "github.com/pingcap/tidb/util/codec"
     "github.com/pingcap/tidb/util/extsort"
     "go.uber.org/zap"
 )
@@ -199,7 +204,15 @@ func (cr *chunkProcessor) process(
 
     logTask := logger.Begin(zap.InfoLevel, "restore file")
 
-    readTotalDur, encodeTotalDur, encodeErr := cr.encodeLoop(ctx, kvsCh, t, logger, kvEncoder, deliverCompleteCh, rc)
+    readTotalDur, encodeTotalDur, encodeErr := cr.encodeLoop(
+        ctx,
+        kvsCh,
+        t,
+        logger,
+        kvEncoder,
+        deliverCompleteCh,
+        rc,
+    )
     var deliverErr error
     select {
     case deliverResult, ok := <-deliverCompleteCh:
@@ -233,6 +246,36 @@ func (cr *chunkProcessor) encodeLoop(
 ) (readTotalDur time.Duration, encodeTotalDur time.Duration, err error) {
     defer close(kvsCh)
 
+    // When AddIndexBySQL is enabled, we use all PKs and UKs to run pre-deduplication,
+    // and then strip almost all secondary indexes before running encodeLoop. When
+    // encodeLoop meets a duplicated row marked by pre-deduplication, we need the
+    // original table structure to generate the duplicate error message, so we create
+    // a new encoder with the original table structure here.
+    originalTableEncoder := kvEncoder
+    if rc.cfg.TikvImporter.AddIndexBySQL {
+        encTable, err := tables.TableFromMeta(t.alloc, t.tableInfo.Desired)
+        if err != nil {
+            return 0, 0, errors.Trace(err)
+        }
+
+        originalTableEncoder, err = rc.encBuilder.NewEncoder(ctx, &encode.EncodingConfig{
+            SessionOptions: encode.SessionOptions{
+                SQLMode:   rc.cfg.TiDB.SQLMode,
+                Timestamp: cr.chunk.Timestamp,
+                SysVars:   rc.sysVars,
+                // Use chunk.PrevRowIDMax as the auto-random seed so that it stays the
+                // same after recovering from a checkpoint.
+                AutoRandomSeed: cr.chunk.Chunk.PrevRowIDMax,
+            },
+            Path:   cr.chunk.Key.Path,
+            Table:  encTable,
+            Logger: logger,
+        })
+        if err != nil {
+            return 0, 0, errors.Trace(err)
+        }
+        defer originalTableEncoder.Close()
+    }
+
     send := func(kvs []deliveredKVs) error {
         select {
         case kvsCh <- kvs:
@@ -376,11 +419,32 @@
         }
         if isDupIgnored {
             cr.parser.RecycleRow(lastRow)
+            lastOffset := curOffset
             curOffset = newOffset
+
+            if rc.errorMgr.RemainRecord() <= 0 {
+                continue
+            }
+
+            dupMsg := cr.getDuplicateMessage(
+                originalTableEncoder,
+                lastRow,
+                lastOffset,
+                dupIgnoreRowsIter.UnsafeValue(),
+                t.tableInfo.Desired,
+                logger,
+            )
             rowText := tidb.EncodeRowForRecord(ctx, t.encTable, rc.cfg.TiDB.SQLMode, lastRow.Row, cr.chunk.ColumnPermutation)
-            // TODO: fill error message
-            err = rc.errorMgr.RecordConflictErrorV2(ctx, logger, t.tableName, cr.chunk.Key.Path, newOffset, "", lastRow.RowID, rowText)
+            err = rc.errorMgr.RecordConflictErrorV2(
+                ctx,
+                logger,
+                t.tableName,
+                cr.chunk.Key.Path,
+                newOffset,
+                dupMsg,
+                lastRow.RowID,
+                rowText,
+            )
             if err != nil {
                 return 0, 0, err
             }
@@ -452,6 +516,57 @@
     return
 }
 
+// getDuplicateMessage gets a duplicate-key message that looks like a SQL duplicate-entry
+// error. When it meets an internal error, the internal error message is returned instead
+// of the duplicate message. If the conflicting key is not found (which is not expected),
+// an empty string is returned.
+func (cr *chunkProcessor) getDuplicateMessage(
+    kvEncoder encode.Encoder,
+    lastRow mydump.Row,
+    lastOffset int64,
+    encodedIdxID []byte,
+    tableInfo *model.TableInfo,
+    logger log.Logger,
+) string {
+    _, idxID, err := codec.DecodeVarint(encodedIdxID)
+    if err != nil {
+        return err.Error()
+    }
+    kvs, err := kvEncoder.Encode(lastRow.Row, lastRow.RowID, cr.chunk.ColumnPermutation, lastOffset)
+    if err != nil {
+        return err.Error()
+    }
+
+    if idxID == conflictOnHandle {
+        for _, kv := range kvs.(*kv2.Pairs).Pairs {
+            if tablecodec.IsRecordKey(kv.Key) {
+                dupErr := txn.ExtractKeyExistsErrFromHandle(kv.Key, kv.Val, tableInfo)
+                return dupErr.Error()
+            }
+        }
+        // should not happen
+        logger.Warn("fail to find conflict record key",
+            zap.String("file", cr.chunk.FileMeta.Path),
+            zap.Any("row", lastRow.Row))
+    } else {
+        for _, kv := range kvs.(*kv2.Pairs).Pairs {
+            _, decodedIdxID, isRecordKey, err := tablecodec.DecodeKeyHead(kv.Key)
+            if err != nil {
+                return err.Error()
+            }
+            if !isRecordKey && decodedIdxID == idxID {
+                dupErr := txn.ExtractKeyExistsErrFromIndex(kv.Key, kv.Val, tableInfo, idxID)
+                return dupErr.Error()
+            }
+        }
+        // should not happen
+        logger.Warn("fail to find conflict index key",
+            zap.String("file", cr.chunk.FileMeta.Path),
+            zap.Int64("idxID", idxID),
+            zap.Any("row", lastRow.Row))
+    }
+    return ""
+}
+
 //nolint:nakedret // TODO: refactor
 func (cr *chunkProcessor) deliverLoop(
     ctx context.Context,
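`getDuplicateMessage` re-encodes the conflicting row and then picks the KV pair that belongs to the conflicting index, relying on `tablecodec` to tell record (handle) keys and index keys apart. A rough, self-contained sketch of that classification step, assuming keys built with `tablecodec` (the table and index IDs below are arbitrary):

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/tablecodec"
)

// classifyKey reports whether an encoded key is a record (handle) key or an
// index key, and returns the index ID for index keys. This mirrors how
// getDuplicateMessage selects the pair matching the conflicting index.
func classifyKey(key []byte) (isRecord bool, indexID int64, err error) {
	_, indexID, isRecord, err = tablecodec.DecodeKeyHead(key)
	return isRecord, indexID, err
}

func main() {
	recordKey := tablecodec.EncodeRowKeyWithHandle(121, kv.IntHandle(1))
	indexKey := tablecodec.EncodeIndexSeekKey(121, 5, nil)

	for _, k := range [][]byte{recordKey, indexKey} {
		isRecord, idxID, err := classifyKey(k)
		if err != nil {
			panic(err)
		}
		fmt.Printf("record=%v indexID=%d\n", isRecord, idxID)
	}
}
```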
diff --git a/br/pkg/lightning/importer/dup_detect.go b/br/pkg/lightning/importer/dup_detect.go
index 241ee62784..6a457366fb 100644
--- a/br/pkg/lightning/importer/dup_detect.go
+++ b/br/pkg/lightning/importer/dup_detect.go
@@ -29,9 +29,12 @@ import (
     "github.com/pingcap/tidb/br/pkg/lightning/log"
     "github.com/pingcap/tidb/parser/model"
     "github.com/pingcap/tidb/table/tables"
+    "github.com/pingcap/tidb/tablecodec"
     "github.com/pingcap/tidb/types"
+    "github.com/pingcap/tidb/util/codec"
     "github.com/pingcap/tidb/util/extsort"
     "go.uber.org/zap"
+    "golang.org/x/exp/slices"
     "golang.org/x/sync/errgroup"
 )
 
@@ -105,38 +108,63 @@ func makeDupHandlerConstructor(
     }
 }
 
+// ErrDuplicateKey is the error class for duplicate key errors found by pre-deduplication.
+var ErrDuplicateKey = errors.Normalize("duplicate key detected on indexID %d of KeyID: %v", errors.RFCCodeText("Lightning:PreDedup:ErrDuplicateKey"))
+
 var (
     _ duplicate.Handler = &errorOnDup{}
     _ duplicate.Handler = &replaceOnDup{}
     _ duplicate.Handler = &ignoreOnDup{}
 )
 
-type errorOnDup struct{}
-
-func (errorOnDup) Begin(key []byte) error {
-    // TODO: add more useful information to the error message.
-    return errors.Errorf("duplicate key detected: %X", key)
+type errorOnDup struct {
+    idxID  int64
+    keyIDs [][]byte
 }
-
-func (errorOnDup) Append(_ []byte) error { return nil }
-func (errorOnDup) End() error            { return nil }
-func (errorOnDup) Close() error          { return nil }
+
+func (h *errorOnDup) Begin(key []byte) error {
+    idxID, err := decodeIndexID(key)
+    if err != nil {
+        return err
+    }
+    h.idxID = idxID
+    return nil
+}
+
+func (h *errorOnDup) Append(keyID []byte) error {
+    if len(h.keyIDs) >= 2 {
+        // we only need 2 keyIDs to report the error.
+        return nil
+    }
+    h.keyIDs = append(h.keyIDs, slices.Clone(keyID))
+    return nil
+}
+func (h *errorOnDup) End() error {
+    return ErrDuplicateKey.GenWithStackByArgs(h.idxID, h.keyIDs)
+}
+func (*errorOnDup) Close() error { return nil }
 
 type replaceOnDup struct {
     // All keyIDs except the last one will be written to w.
     // keyID written to w will be ignored during importing.
     w     extsort.Writer
     keyID []byte
+    idxID []byte // Varint encoded indexID
 }
 
-func (h *replaceOnDup) Begin(_ []byte) error {
+func (h *replaceOnDup) Begin(key []byte) error {
     h.keyID = h.keyID[:0]
+    idxID, err := decodeIndexID(key)
+    if err != nil {
+        return err
+    }
+    h.idxID = codec.EncodeVarint(nil, idxID)
     return nil
 }
 
 func (h *replaceOnDup) Append(keyID []byte) error {
     if len(h.keyID) > 0 {
-        if err := h.w.Put(h.keyID, nil); err != nil {
+        if err := h.w.Put(h.keyID, h.idxID); err != nil {
             return err
         }
     }
@@ -157,10 +185,16 @@ type ignoreOnDup struct {
     // keyID written to w will be ignored during importing.
     w     extsort.Writer
     first bool
+    idxID []byte // Varint encoded indexID
 }
 
-func (h *ignoreOnDup) Begin(_ []byte) error {
+func (h *ignoreOnDup) Begin(key []byte) error {
     h.first = true
+    idxID, err := decodeIndexID(key)
+    if err != nil {
+        return err
+    }
+    h.idxID = codec.EncodeVarint(nil, idxID)
     return nil
 }
 
@@ -169,7 +203,7 @@ func (h *ignoreOnDup) Append(keyID []byte) error {
     if h.first {
         h.first = false
         return nil
     }
-    return h.w.Put(keyID, nil)
+    return h.w.Put(keyID, h.idxID)
 }
 
 func (*ignoreOnDup) End() error {
@@ -319,7 +353,7 @@ func simplifyTable(
     usedColOffsets := make(map[int]struct{})
     for _, idxInfo := range tblInfo.Indices {
         if idxInfo.Primary || idxInfo.Unique {
-            usedIndices = append(usedIndices, idxInfo)
+            usedIndices = append(usedIndices, idxInfo.Clone())
             for _, col := range idxInfo.Columns {
                 usedColOffsets[col.Offset] = struct{}{}
             }
@@ -368,3 +402,18 @@ func simplifyTable(
     }
     return newTblInfo, newColPerm
 }
+
+const conflictOnHandle = int64(-1)
+
+func decodeIndexID(key []byte) (int64, error) {
+    switch {
+    case tablecodec.IsRecordKey(key):
+        return conflictOnHandle, nil
+    case tablecodec.IsIndexKey(key):
+        _, idxID, _, err := tablecodec.DecodeIndexKey(key)
+        return idxID, errors.Trace(err)
+
+    default:
+        return 0, errors.Errorf("unexpected key: %X, expected a record key or index key", key)
+    }
+}
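The handlers communicate the conflicting index to later stages through the sorter value: `decodeIndexID` maps a record key to `conflictOnHandle` (-1) and an index key to its index ID, and `replaceOnDup`/`ignoreOnDup` store that ID varint-encoded next to each ignored row ID; `encodeLoop` and the tests decode it again with `codec.DecodeVarint`. A small round-trip sketch of that encoding (the local `conflictOnHandle` constant is redeclared here only for illustration):

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/util/codec"
)

// conflictOnHandle marks a conflict on the table handle itself, as in dup_detect.go.
const conflictOnHandle = int64(-1)

func main() {
	for _, idxID := range []int64{conflictOnHandle, 23} {
		value := codec.EncodeVarint(nil, idxID)      // stored as the extsort value by the handlers
		_, decoded, err := codec.DecodeVarint(value) // read back in encodeLoop / the tests
		if err != nil {
			panic(err)
		}
		fmt.Printf("idxID=%d roundtrip=%d\n", idxID, decoded)
	}
}
```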
diff --git a/br/pkg/lightning/importer/dup_detect_test.go b/br/pkg/lightning/importer/dup_detect_test.go
index 802399bb38..2f6aaad85e 100644
--- a/br/pkg/lightning/importer/dup_detect_test.go
+++ b/br/pkg/lightning/importer/dup_detect_test.go
@@ -18,18 +18,45 @@ import (
     "context"
     "testing"
 
+    "github.com/pingcap/errors"
     "github.com/pingcap/tidb/br/pkg/lightning/duplicate"
+    "github.com/pingcap/tidb/kv"
     "github.com/pingcap/tidb/parser"
+    "github.com/pingcap/tidb/tablecodec"
+    "github.com/pingcap/tidb/util/codec"
     "github.com/pingcap/tidb/util/dbutil"
     "github.com/pingcap/tidb/util/extsort"
     "github.com/stretchr/testify/require"
     "golang.org/x/exp/slices"
 )
 
+var (
+    exampleHandleKey = tablecodec.EncodeRowKeyWithHandle(121, kv.IntHandle(22))
+    exampleIndexID   = int64(23)
+    exampleIndexKey  = tablecodec.EncodeIndexSeekKey(122, exampleIndexID, nil)
+)
+
 func TestErrorOnDup(t *testing.T) {
     h := &errorOnDup{}
-    err := h.Begin([]byte("key"))
-    require.Error(t, err)
+    require.NoError(t, h.Begin(exampleHandleKey))
+    require.NoError(t, h.Append([]byte{1}))
+    require.NoError(t, h.Append([]byte{2}))
+    err := h.End()
+    require.ErrorIs(t, err, ErrDuplicateKey)
+    dupErr := errors.Cause(err).(*errors.Error)
+    require.Equal(t, conflictOnHandle, dupErr.Args()[0])
+    require.Equal(t, [][]byte{{1}, {2}}, dupErr.Args()[1])
+    require.NoError(t, h.Close())
+
+    h = &errorOnDup{}
+    require.NoError(t, h.Begin(exampleIndexKey))
+    require.NoError(t, h.Append([]byte{11}))
+    require.NoError(t, h.Append([]byte{12}))
+    err = h.End()
+    require.ErrorIs(t, err, ErrDuplicateKey)
+    dupErr = errors.Cause(err).(*errors.Error)
+    require.Equal(t, int64(23), dupErr.Args()[0])
+    require.Equal(t, [][]byte{{11}, {12}}, dupErr.Args()[1])
     require.NoError(t, h.Close())
 }
 
@@ -37,9 +64,12 @@ func TestReplaceOnDup(t *testing.T) {
     runDupHandlerTest(t, func(w extsort.Writer) duplicate.Handler {
         return &replaceOnDup{w: w}
     }, []dupRecord{{
-        []byte("key1"), [][]byte{[]byte("01"), []byte("02"), []byte("03")}},
-        {[]byte("key2"), [][]byte{[]byte("11"), []byte("12"), []byte("13")}}},
-        [][]byte{[]byte("01"), []byte("02"), []byte("11"), []byte("12")},
+        exampleHandleKey, [][]byte{[]byte("01"), []byte("02"), []byte("03")}},
+        {exampleIndexKey, [][]byte{[]byte("11"), []byte("12"), []byte("13")}}},
+        map[int64][][]byte{
+            conflictOnHandle: {[]byte("01"), []byte("02")},
+            exampleIndexID:   {[]byte("11"), []byte("12")},
+        },
     )
 }
 
@@ -47,9 +77,12 @@ func TestIgnoreOnDup(t *testing.T) {
     runDupHandlerTest(t, func(w extsort.Writer) duplicate.Handler {
         return &ignoreOnDup{w: w}
     }, []dupRecord{{
-        []byte("key1"), [][]byte{[]byte("01"), []byte("02"), []byte("03")}},
-        {[]byte("key2"), [][]byte{[]byte("11"), []byte("12"), []byte("13")}}},
-        [][]byte{[]byte("02"), []byte("03"), []byte("12"), []byte("13")},
+        exampleHandleKey, [][]byte{[]byte("01"), []byte("02"), []byte("03")}},
+        {exampleIndexKey, [][]byte{[]byte("11"), []byte("12"), []byte("13")}}},
+        map[int64][][]byte{
+            conflictOnHandle: {[]byte("02"), []byte("03")},
+            exampleIndexID:   {[]byte("12"), []byte("13")},
+        },
     )
 }
 
@@ -62,7 +95,7 @@ func runDupHandlerTest(
     t *testing.T,
     makeHandler func(w extsort.Writer) duplicate.Handler,
     input []dupRecord,
-    ignoredRowIDs [][]byte,
+    ignoredRowIDs map[int64][][]byte,
 ) {
     ignoreRows, err := extsort.OpenDiskSorter(t.TempDir(), nil)
     require.NoError(t, err)
@@ -86,9 +119,11 @@ func runDupHandlerTest(
     it, err := ignoreRows.NewIterator(ctx)
     require.NoError(t, err)
 
-    var rowIDs [][]byte
+    rowIDs := map[int64][][]byte{}
     for it.First(); it.Valid(); it.Next() {
-        rowIDs = append(rowIDs, slices.Clone(it.UnsafeKey()))
+        _, idxID, err := codec.DecodeVarint(it.UnsafeValue())
+        require.NoError(t, err)
+        rowIDs[idxID] = append(rowIDs[idxID], slices.Clone(it.UnsafeKey()))
     }
     require.NoError(t, it.Error())
     require.NoError(t, it.Close())
@@ -122,31 +157,40 @@ func TestSimplifyTable(t *testing.T) {
             expTable:   "CREATE TABLE t(a int UNIQUE KEY, b int, c int, UNIQUE INDEX idx_bc(b, c))",
             expColPerm: []int{0, 1, 2, 10},
         },
+        {
+            table:      "CREATE TABLE t(a int, b int, c int, d int, INDEX idx_b(b), INDEX idx_c(c), UNIQUE INDEX idx_cd(c, d))",
+            colPerm:    []int{0, 1, 2, 3, 10},
+            expTable:   "CREATE TABLE t(c int, d int, UNIQUE INDEX idx_cd(c, d))",
+            expColPerm: []int{2, 3, 10},
+        },
     }
 
     for _, tc := range testCases {
         p := parser.New()
-        tblInfo, err := dbutil.GetTableInfoBySQL(tc.table, p)
+        originalTblInfo, err := dbutil.GetTableInfoBySQL(tc.table, p)
         require.NoError(t, err)
-        actualTblInfo, actualColPerm := simplifyTable(tblInfo, tc.colPerm)
-        if tc.expTableHasNoCols {
-            require.Empty(t, actualTblInfo.Columns)
-        } else {
-            expTblInfo, err := dbutil.GetTableInfoBySQL(tc.expTable, p)
-            require.NoError(t, err)
+        // run twice to make sure originalTblInfo is not changed
+        for i := 0; i < 2; i++ {
+            actualTblInfo, actualColPerm := simplifyTable(originalTblInfo, tc.colPerm)
+            if tc.expTableHasNoCols {
+                require.Empty(t, actualTblInfo.Columns)
+            } else {
+                expTblInfo, err := dbutil.GetTableInfoBySQL(tc.expTable, p)
+                require.NoError(t, err)
 
-            require.Equal(t, len(expTblInfo.Columns), len(actualTblInfo.Columns))
-            for i, col := range actualTblInfo.Columns {
-                require.Equal(t, expTblInfo.Columns[i].Name, col.Name)
-                require.Equal(t, expTblInfo.Columns[i].Offset, col.Offset)
-            }
-
-            require.Equal(t, len(expTblInfo.Indices), len(actualTblInfo.Indices))
-            for i, idxInfo := range actualTblInfo.Indices {
-                require.Equal(t, expTblInfo.Indices[i].Name, idxInfo.Name)
-                require.Equal(t, expTblInfo.Indices[i].Columns, idxInfo.Columns)
+                require.Equal(t, len(expTblInfo.Columns), len(actualTblInfo.Columns))
+                for i, col := range actualTblInfo.Columns {
+                    require.Equal(t, expTblInfo.Columns[i].Name, col.Name)
+                    require.Equal(t, expTblInfo.Columns[i].Offset, col.Offset)
+                }
+
+                require.Equal(t, len(expTblInfo.Indices), len(actualTblInfo.Indices))
+                for i, idxInfo := range actualTblInfo.Indices {
+                    require.Equal(t, expTblInfo.Indices[i].Name, idxInfo.Name)
+                    require.Equal(t, expTblInfo.Indices[i].Columns, idxInfo.Columns)
+                }
             }
+            require.Equal(t, tc.expColPerm, actualColPerm)
         }
-        require.Equal(t, tc.expColPerm, actualColPerm)
     }
 }
diff --git a/br/pkg/lightning/importer/table_import.go b/br/pkg/lightning/importer/table_import.go
index 5c0e76a27d..62c4da53d0 100644
--- a/br/pkg/lightning/importer/table_import.go
+++ b/br/pkg/lightning/importer/table_import.go
@@ -17,6 +17,7 @@ package importer
 import (
     "context"
     "database/sql"
+    "encoding/hex"
     "fmt"
     "path/filepath"
     "strings"
@@ -47,6 +48,7 @@ import (
     "github.com/pingcap/tidb/parser/mysql"
     "github.com/pingcap/tidb/table"
     "github.com/pingcap/tidb/table/tables"
+    "github.com/pingcap/tidb/util/codec"
     "github.com/pingcap/tidb/util/extsort"
     "github.com/pingcap/tidb/util/mathutil"
     "go.uber.org/multierr"
@@ -198,8 +200,8 @@ func (tr *TableImporter) importTable(
     // 2. Do duplicate detection if needed
     if isLocalBackend(rc.cfg) && rc.cfg.TikvImporter.OnDuplicate != "" {
         _, uuid := backend.MakeUUID(tr.tableName, common.IndexEngineID)
-        workingDir := filepath.Join(rc.cfg.TikvImporter.SortedKVDir, uuid.String()+".dupdetect")
-        resultDir := filepath.Join(rc.cfg.TikvImporter.SortedKVDir, uuid.String()+".dupresult")
+        workingDir := filepath.Join(rc.cfg.TikvImporter.SortedKVDir, uuid.String()+local.DupDetectDirSuffix)
+        resultDir := filepath.Join(rc.cfg.TikvImporter.SortedKVDir, uuid.String()+local.DupResultDirSuffix)
 
         dupIgnoreRows, err := extsort.OpenDiskSorter(resultDir, &extsort.DiskSorterOptions{
             Concurrency: rc.cfg.App.RegionConcurrency,
@@ -210,13 +212,7 @@ func (tr *TableImporter) importTable(
         tr.dupIgnoreRows = dupIgnoreRows
 
         if cp.Status < checkpoints.CheckpointStatusDupDetected {
-            d := &dupDetector{
-                tr:     tr,
-                rc:     rc,
-                cp:     cp,
-                logger: tr.logger,
-            }
-            err := d.run(ctx, workingDir, dupIgnoreRows)
+            err := tr.preDeduplicate(ctx, rc, cp, workingDir)
             saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, err, checkpoints.CheckpointStatusDupDetected)
             if err := firstErr(err, saveCpErr); err != nil {
                 return false, errors.Trace(err)
@@ -1576,3 +1572,106 @@ func getDDLJobIDByQuery(ctx context.Context, db *sql.DB, wantQuery string) (int6
     }
     return 0, errors.Trace(rows.Err())
 }
+
+func (tr *TableImporter) preDeduplicate(
+    ctx context.Context,
+    rc *Controller,
+    cp *checkpoints.TableCheckpoint,
+    workingDir string,
+) error {
+    d := &dupDetector{
+        tr:     tr,
+        rc:     rc,
+        cp:     cp,
+        logger: tr.logger,
+    }
+    originalErr := d.run(ctx, workingDir, tr.dupIgnoreRows)
+    if originalErr == nil {
+        return nil
+    }
+
+    if !ErrDuplicateKey.Equal(originalErr) {
+        return errors.Trace(originalErr)
+    }
+
+    var (
+        idxName                          string
+        oneConflictMsg, otherConflictMsg string
+    )
+
+    // provide a more friendly error message
+
+    dupErr := errors.Cause(originalErr).(*errors.Error)
+    conflictIdxID := dupErr.Args()[0].(int64)
+    if conflictIdxID == conflictOnHandle {
+        idxName = "PRIMARY"
+    } else {
+        for _, idxInfo := range tr.tableInfo.Core.Indices {
+            if idxInfo.ID == conflictIdxID {
+                idxName = idxInfo.Name.O
+                break
+            }
+        }
+    }
+    if idxName == "" {
+        tr.logger.Error("cannot find index name", zap.Int64("conflictIdxID", conflictIdxID))
+        return errors.Trace(originalErr)
+    }
+    if !rc.cfg.Checkpoint.Enable {
+        return errors.Errorf("duplicate key in table %s caused by index `%s`, you can turn on checkpoint and re-run to see the conflicting rows",
+            tr.tableName, idxName)
+    }
+    conflictEncodedRowIDs := dupErr.Args()[1].([][]byte)
+    if len(conflictEncodedRowIDs) < 2 {
+        tr.logger.Error("invalid conflictEncodedRowIDs", zap.Int("len", len(conflictEncodedRowIDs)))
+        return errors.Trace(originalErr)
+    }
+    rowID := make([]int64, 2)
+    var err error
+    _, rowID[0], err = codec.DecodeComparableVarint(conflictEncodedRowIDs[0])
+    if err != nil {
+        rowIDHex := hex.EncodeToString(conflictEncodedRowIDs[0])
+        tr.logger.Error("failed to decode rowID",
+            zap.String("rowID", rowIDHex),
+            zap.Error(err))
+        return errors.Trace(originalErr)
+    }
+    _, rowID[1], err = codec.DecodeComparableVarint(conflictEncodedRowIDs[1])
+    if err != nil {
+        rowIDHex := hex.EncodeToString(conflictEncodedRowIDs[1])
+        tr.logger.Error("failed to decode rowID",
+            zap.String("rowID", rowIDHex),
+            zap.Error(err))
+        return errors.Trace(originalErr)
+    }
+
+    tableCp, err := rc.checkpointsDB.Get(ctx, tr.tableName)
+    if err != nil {
+        tr.logger.Error("failed to get table checkpoint", zap.Error(err))
+        return errors.Trace(err)
+    }
+    for _, engineCp := range tableCp.Engines {
+        for _, chunkCp := range engineCp.Chunks {
+            if chunkCp.Chunk.PrevRowIDMax <= rowID[0] && rowID[0] < chunkCp.Chunk.RowIDMax {
+                oneConflictMsg = fmt.Sprintf("row %d counting from offset %d in file %s",
+                    rowID[0]-chunkCp.Chunk.PrevRowIDMax,
+                    chunkCp.Chunk.Offset,
+                    chunkCp.FileMeta.Path)
+            }
+            if chunkCp.Chunk.PrevRowIDMax <= rowID[1] && rowID[1] < chunkCp.Chunk.RowIDMax {
+                otherConflictMsg = fmt.Sprintf("row %d counting from offset %d in file %s",
+                    rowID[1]-chunkCp.Chunk.PrevRowIDMax,
+                    chunkCp.Chunk.Offset,
+                    chunkCp.FileMeta.Path)
+            }
+        }
+    }
+    if oneConflictMsg == "" || otherConflictMsg == "" {
+        tr.logger.Error("cannot find conflict rows by rowID",
+            zap.Int64("rowID[0]", rowID[0]),
+            zap.Int64("rowID[1]", rowID[1]))
+        return errors.Trace(originalErr)
+    }
+    return errors.Errorf("duplicate entry for key '%s', a pair of conflicting rows are (%s, %s)",
+        idxName, oneConflictMsg, otherConflictMsg)
+}
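`preDeduplicate` maps a conflicting row ID back to a human-readable location by scanning the chunk checkpoints: every chunk owns the half-open row ID range `[PrevRowIDMax, RowIDMax)`. A simplified, self-contained sketch of that lookup (the `chunkRange` struct, file names and numbers are illustrative, not taken from the patch):

```go
package main

import "fmt"

// chunkRange is a simplified stand-in for the checkpoint fields that
// preDeduplicate consults: each chunk owns the row ID range
// [prevRowIDMax, rowIDMax), so a conflicting row ID can be mapped back to
// "row N counting from offset O in file F".
type chunkRange struct {
	path                   string
	offset                 int64
	prevRowIDMax, rowIDMax int64
}

func locateRow(chunks []chunkRange, rowID int64) (string, bool) {
	for _, c := range chunks {
		if c.prevRowIDMax <= rowID && rowID < c.rowIDMax {
			return fmt.Sprintf("row %d counting from offset %d in file %s",
				rowID-c.prevRowIDMax, c.offset, c.path), true
		}
	}
	return "", false
}

func main() {
	chunks := []chunkRange{
		{path: "test.dup_detect.1.sql", offset: 0, prevRowIDMax: 0, rowIDMax: 100},
		{path: "test.dup_detect.4.sql", offset: 0, prevRowIDMax: 100, rowIDMax: 220},
	}
	msg, _ := locateRow(chunks, 201)
	fmt.Println(msg) // row 101 counting from offset 0 in file test.dup_detect.4.sql
}
```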
diff --git a/br/tests/_utils/run_lightning_ctl b/br/tests/_utils/run_lightning_ctl
index a20c30cdb9..a00e88a33c 100755
--- a/br/tests/_utils/run_lightning_ctl
+++ b/br/tests/_utils/run_lightning_ctl
@@ -24,7 +24,7 @@ bin/tidb-lightning-ctl.test -test.coverprofile="$TEST_DIR/cov.ctl.$TEST_NAME.$$.
     --tidb-port 4000 \
     --pd-urls '127.0.0.1:2379' \
     -d "tests/$TEST_NAME/data" \
-    --sorted-kv-dir "$TEST_DIR/sorted" \
+    --sorted-kv-dir "$TEST_DIR/$TEST_NAME.sorted" \
     --enable-checkpoint=0 \
     --check-requirements=0 \
     "$@"
diff --git a/br/tests/lightning_duplicate_detection_new/data/test.dup_detect.4.sql b/br/tests/lightning_duplicate_detection_new/data/test.dup_detect.4.sql
index 2355193d11..e37c311e23 100644
--- a/br/tests/lightning_duplicate_detection_new/data/test.dup_detect.4.sql
+++ b/br/tests/lightning_duplicate_detection_new/data/test.dup_detect.4.sql
@@ -108,3 +108,4 @@ INSERT INTO `dup_detect` VALUES
 (141, '4Wtmr', 9061058647512236690, 'yfg', 51, 1, 411690808),
 (70, 'A21nJ', 2624269271790371549, 'v0281', 72, 0, 176061556),
 (66, 'WA4Lz', 5647568340668202073, 'hy9da3', 67, 3, 179726484);
+(987, 'nEoKu', 7836621565948506759, 'y6', 48, 0, 177543185),
diff --git a/br/tests/lightning_duplicate_detection_new/local-error.toml b/br/tests/lightning_duplicate_detection_new/local-error.toml
index 0ae027c89d..7bdfadf58b 100644
--- a/br/tests/lightning_duplicate_detection_new/local-error.toml
+++ b/br/tests/lightning_duplicate_detection_new/local-error.toml
@@ -1,2 +1,5 @@
+[checkpoint]
+driver = "mysql"
+
 [tikv-importer]
 on-duplicate = "error"
diff --git a/br/tests/lightning_duplicate_detection_new/local-limit-error-records.toml b/br/tests/lightning_duplicate_detection_new/local-limit-error-records.toml
index bde37c8cea..bec29584bc 100644
--- a/br/tests/lightning_duplicate_detection_new/local-limit-error-records.toml
+++ b/br/tests/lightning_duplicate_detection_new/local-limit-error-records.toml
@@ -3,3 +3,4 @@ max-error-records = 50
 
 [tikv-importer]
 on-duplicate = "replace"
+add-index-by-sql = true
diff --git a/br/tests/lightning_duplicate_detection_new/run.sh b/br/tests/lightning_duplicate_detection_new/run.sh
index 7c3fa127f4..a0cde14e53 100755
--- a/br/tests/lightning_duplicate_detection_new/run.sh
+++ b/br/tests/lightning_duplicate_detection_new/run.sh
@@ -40,6 +40,14 @@ if [ "$expected_rows" != "$actual_rows" ] || [ "$expected_pks" != "$actual_pks"
 fi
 run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2"
 check_contains "count(*): 227"
+run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2 WHERE error = ''"
+check_contains "count(*): 0"
+run_sql "SELECT * FROM lightning_task_info.conflict_error_v2 WHERE row_id = 12"
+check_contains "(171,'yRxZE',9201592769833450947,'xs3d',5,4,283270321)"
+check_contains "[kv:1062]Duplicate entry '171' for key 'dup_detect.PRIMARY'"
+run_sql "SELECT * FROM lightning_task_info.conflict_error_v2 WHERE row_id = 1"
+check_contains "(87,'nEoKu',7836621565948506759,'y6',48,0,177543185)"
+check_contains "[kv:1062]Duplicate entry '0-177543185' for key 'dup_detect.uniq_col6_col7'"
 
 # 2. Test ignore strategy.
 cleanup
@@ -56,20 +64,33 @@ if [ "$expected_rows" != "$actual_rows" ] || [ "$expected_pks" != "$actual_pks"
     exit 1
 fi
 run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2"
-check_contains "count(*): 227"
+check_contains "count(*): 228"
 
 # 3. Test error strategy.
 cleanup
-run_lightning --backend local --config "tests/$TEST_NAME/local-error.toml" --log-file "$LOG_FILE" || true
-check_contains "duplicate key detected" "$LOG_FILE"
+run_lightning --backend local --config "tests/$TEST_NAME/local-error.toml" --log-file "$LOG_FILE" 2>&1 | grep -q "duplicate key in table \`test\`.\`dup_detect\` caused by index .*, you can turn on checkpoint and re-run to see the conflicting rows"
+run_lightning --backend local --config "tests/$TEST_NAME/local-error.toml" --log-file "$LOG_FILE" --enable-checkpoint=1 2>&1 | grep -q "duplicate entry for key 'uniq_col6_col7', a pair of conflicting rows are (row 1 counting from offset 0 in file test.dup_detect.1.sql, row 101 counting from offset 0 in file test.dup_detect.4.sql)"
+check_contains "restore table \`test\`.\`dup_detect\` failed: duplicate entry for key 'uniq_col6_col7', a pair of conflicting rows are (row 1 counting from offset 0 in file test.dup_detect.1.sql, row 101 counting from offset 0 in file test.dup_detect.4.sql)" "$LOG_FILE"
+run_lightning_ctl --enable-checkpoint=1 --backend local --config "tests/$TEST_NAME/local-error.toml" --checkpoint-error-destroy="\`test\`.\`dup_detect\`"
+files_left=$(ls "$TEST_DIR/$TEST_NAME.sorted" | wc -l)
+if [ "$files_left" -ne "0" ];then
+  echo "checkpoint-error-destroy has left some files"
+  ls "$TEST_DIR/$TEST_NAME.sorted"
+  exit 1
+fi
+rm -rf "$TEST_DIR/$TEST_NAME.sorted"
 
 # 4. Test limit error records.
 cleanup
 run_lightning --backend local --config "tests/$TEST_NAME/local-limit-error-records.toml" --log-file "$LOG_FILE"
 run_sql "SELECT count(*) FROM test.dup_detect"
-check_contains "count(*): 173"
+check_contains "count(*): 174"
 run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2"
 check_contains "count(*): 50"
+run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2 WHERE error LIKE '%PRIMARY%'"
+check_contains "count(*): 49"
+run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2 WHERE error LIKE '%uniq_col6_col7%'"
+check_contains "count(*): 1"
 
 # 5. Test fail after duplicate detection.
 cleanup
@@ -82,7 +103,7 @@ rm -f "$LOG_FILE"
 run_lightning_ctl --enable-checkpoint=1 --backend local --config "tests/$TEST_NAME/local-replace.toml" --checkpoint-error-ignore="\`test\`.\`dup_detect\`"
 run_lightning --enable-checkpoint=1 --backend local --config "tests/$TEST_NAME/local-replace.toml" --log-file "$LOG_FILE"
 run_sql "SELECT count(*) FROM test.dup_detect"
-check_contains "count(*): 173"
+check_contains "count(*): 174"
 run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2"
 check_contains "count(*): 227"
 check_not_contains "duplicate detection start" "$LOG_FILE"
diff --git a/errors.toml b/errors.toml
index 855fec6fc3..b3e461eb46 100644
--- a/errors.toml
+++ b/errors.toml
@@ -506,6 +506,11 @@ error = '''
 system requirement not met
 '''
 
+["Lightning:PreDedup:ErrDuplicateKey"]
+error = '''
+duplicate key detected on indexID %d of KeyID: %v
+'''
+
 ["Lightning:Restore:ErrAddIndexFailed"]
 error = '''
 add index on table %s failed
diff --git a/store/driver/txn/error.go b/store/driver/txn/error.go
index 80543fe2f8..3ab33579a0 100644
--- a/store/driver/txn/error.go
+++ b/store/driver/txn/error.go
@@ -46,7 +46,8 @@ func genKeyExistsError(name string, value string, err error) error {
     return kv.ErrKeyExists.FastGenByArgs(value, name)
 }
 
-func extractKeyExistsErrFromHandle(key kv.Key, value []byte, tblInfo *model.TableInfo) error {
+// ExtractKeyExistsErrFromHandle returns an ErrKeyExists error from a handle key.
+func ExtractKeyExistsErrFromHandle(key kv.Key, value []byte, tblInfo *model.TableInfo) error {
     name := tblInfo.Name.String() + ".PRIMARY"
     _, handle, err := tablecodec.DecodeRecordKey(key)
     if err != nil {
@@ -109,7 +110,8 @@ func extractKeyExistsErrFromHandle(key kv.Key, value []byte, tblInfo *model.Tabl
     return genKeyExistsError(name, strings.Join(valueStr, "-"), nil)
 }
 
-func extractKeyExistsErrFromIndex(key kv.Key, value []byte, tblInfo *model.TableInfo, indexID int64) error {
+// ExtractKeyExistsErrFromIndex returns an ErrKeyExists error from an index key.
+func ExtractKeyExistsErrFromIndex(key kv.Key, value []byte, tblInfo *model.TableInfo, indexID int64) error {
     var idxInfo *model.IndexInfo
     for _, index := range tblInfo.Indices {
         if index.ID == indexID {
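Exporting `ExtractKeyExistsErrFromHandle`/`ExtractKeyExistsErrFromIndex` lets Lightning build the familiar "Duplicate entry ... for key ..." text without going through a real transaction. A sketch of the intended call pattern, mirroring `getDuplicateMessage` in chunk_process.go (the helper name `buildDupMessage` is hypothetical):

```go
package example

import (
	"github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tidb/store/driver/txn"
	"github.com/pingcap/tidb/tablecodec"
)

// buildDupMessage turns one encoded KV pair of a conflicting row into the
// familiar "Duplicate entry '...' for key '...'" text, using the newly
// exported helpers, as getDuplicateMessage in chunk_process.go does.
func buildDupMessage(key, value []byte, tblInfo *model.TableInfo, idxID int64) string {
	if tablecodec.IsRecordKey(key) {
		// Conflict on the handle (PRIMARY).
		return txn.ExtractKeyExistsErrFromHandle(key, value, tblInfo).Error()
	}
	// Conflict on a unique index.
	return txn.ExtractKeyExistsErrFromIndex(key, value, tblInfo, idxID).Error()
}
```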
diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go
index b0a4b92946..9ce3ec75f0 100644
--- a/store/driver/txn/txn_driver.go
+++ b/store/driver/txn/txn_driver.go
@@ -333,9 +333,9 @@ func (txn *tikvTxn) extractKeyExistsErr(key kv.Key) error {
     }
 
     if isRecord {
-        return extractKeyExistsErrFromHandle(key, value, tblInfo)
+        return ExtractKeyExistsErrFromHandle(key, value, tblInfo)
     }
-    return extractKeyExistsErrFromIndex(key, value, tblInfo, indexID)
+    return ExtractKeyExistsErrFromIndex(key, value, tblInfo, indexID)
 }
 
 // SetAssertion sets an assertion for the key operation.
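For completeness, this is how the pre-deduplication result directory (`<uuid>` + `DupResultDirSuffix`) can be read back: each entry's key is the row ID to ignore, encoded as a comparable varint (decoded with `codec.DecodeComparableVarint`, as `preDeduplicate` does), and its value is the varint-encoded conflicting index ID. A debugging-style sketch, assuming the `ExternalSorter` interface from `util/extsort`; the `dumpDupResult` helper is illustrative and not part of the patch:

```go
package example

import (
	"context"
	"fmt"

	"github.com/pingcap/tidb/util/codec"
	"github.com/pingcap/tidb/util/extsort"
)

// dumpDupResult walks a pre-deduplication result sorter and prints, for each
// row that will be skipped during import, the row ID and the index it
// conflicted on (-1 meaning a conflict on the table handle).
func dumpDupResult(ctx context.Context, sorter extsort.ExternalSorter) error {
	it, err := sorter.NewIterator(ctx)
	if err != nil {
		return err
	}
	defer it.Close()

	for it.First(); it.Valid(); it.Next() {
		_, rowID, err := codec.DecodeComparableVarint(it.UnsafeKey())
		if err != nil {
			return err
		}
		_, idxID, err := codec.DecodeVarint(it.UnsafeValue())
		if err != nil {
			return err
		}
		fmt.Printf("ignore rowID=%d (conflicting indexID=%d)\n", rowID, idxID)
	}
	return it.Error()
}
```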