diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 34ee420c46..c526188bf5 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1761,6 +1761,12 @@ func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) err if err = local.allocateTSIfNotExists(ctx, localEngine); err != nil { return errors.Trace(err) } + failpoint.Inject("mockAllocateTSErr", func() { + // mock generate timestamp error when reset engine. + localEngine.TS = 0 + mockGRPCErr, _ := status.FromError(errors.Errorf("mock generate timestamp error")) + failpoint.Return(errors.Trace(mockGRPCErr.Err())) + }) } localEngine.pendingFileSize.Store(0) diff --git a/br/pkg/lightning/backend/local/region_job.go b/br/pkg/lightning/backend/local/region_job.go index 34e9f9ab3f..1c25434974 100644 --- a/br/pkg/lightning/backend/local/region_job.go +++ b/br/pkg/lightning/backend/local/region_job.go @@ -318,9 +318,10 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error { clients = append(clients, wstream) allPeers = append(allPeers, peer) } + dataCommitTS := j.ingestData.GetTS() req.Chunk = &sst.WriteRequest_Batch{ Batch: &sst.WriteBatch{ - CommitTs: j.ingestData.GetTS(), + CommitTs: dataCommitTS, }, } @@ -407,7 +408,8 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error { logutil.Key("endKey", j.keyRange.End), logutil.Key("remainStart", remainingStartKey), logutil.Region(region), - logutil.Leader(j.region.Leader)) + logutil.Leader(j.region.Leader), + zap.Uint64("commitTS", dataCommitTS)) } break } diff --git a/pkg/ddl/ingest/backend.go b/pkg/ddl/ingest/backend.go index 3d4bf54a68..130faa4661 100644 --- a/pkg/ddl/ingest/backend.go +++ b/pkg/ddl/ingest/backend.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" "github.com/pingcap/tidb/br/pkg/lightning/backend/local" lightning "github.com/pingcap/tidb/br/pkg/lightning/config" @@ -214,18 +215,45 @@ func (bc *litBackendCtx) Flush(indexID int64, mode FlushMode) (flushed, imported } }() } - - logutil.Logger(bc.ctx).Info(LitInfoUnsafeImport, zap.Int64("index ID", indexID), - zap.String("usage info", bc.diskRoot.UsageInfo())) - err = bc.backend.UnsafeImportAndReset(bc.ctx, ei.uuid, int64(lightning.SplitRegionSize)*int64(lightning.MaxSplitRegionSizeRatio), int64(lightning.SplitRegionKeys)) + err = bc.unsafeImportAndReset(ei) if err != nil { - logutil.Logger(bc.ctx).Error(LitErrIngestDataErr, zap.Int64("index ID", indexID), - zap.String("usage info", bc.diskRoot.UsageInfo())) return true, false, err } return true, true, nil } +func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error { + logutil.Logger(bc.ctx).Info(LitInfoUnsafeImport, zap.Int64("index ID", ei.indexID), + zap.String("usage info", bc.diskRoot.UsageInfo())) + logger := log.FromContext(bc.ctx).With( + zap.Stringer("engineUUID", ei.uuid), + ) + + ei.closedEngine = backend.NewClosedEngine(bc.backend, logger, ei.uuid, 0) + + regionSplitSize := int64(lightning.SplitRegionSize) * int64(lightning.MaxSplitRegionSizeRatio) + regionSplitKeys := int64(lightning.SplitRegionKeys) + if err := ei.closedEngine.Import(bc.ctx, regionSplitSize, regionSplitKeys); err != nil { + logutil.Logger(bc.ctx).Error(LitErrIngestDataErr, zap.Int64("index ID", ei.indexID), + zap.String("usage info", bc.diskRoot.UsageInfo())) + return err + } + + err := bc.backend.ResetEngine(bc.ctx, ei.uuid) + if err != nil { + logutil.Logger(bc.ctx).Error(LitErrResetEngineFail, zap.Int64("index ID", ei.indexID)) + err1 := ei.closedEngine.Cleanup(bc.ctx) + if err1 != nil { + logutil.Logger(ei.ctx).Error(LitErrCleanEngineErr, zap.Error(err1), + zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + } + ei.openedEngine = nil + ei.closedEngine = nil + return err + } + return nil +} + // ForceSyncFlagForTest is a flag to force sync only for test. var ForceSyncFlagForTest = false diff --git a/pkg/ddl/ingest/message.go b/pkg/ddl/ingest/message.go index 0f74de19dd..1217244c6f 100644 --- a/pkg/ddl/ingest/message.go +++ b/pkg/ddl/ingest/message.go @@ -42,6 +42,7 @@ const ( LitErrCloseWriterErr string = "close writer error" LitErrReadSortPath string = "cannot read sort path" LitErrCleanSortPath string = "cannot cleanup sort path" + LitErrResetEngineFail string = "reset engine failed" LitWarnEnvInitFail string = "initialize environment failed" LitWarnConfigError string = "build config for backend failed" LitInfoEnvInitSucc string = "init global ingest backend environment finished" diff --git a/tests/realtikvtest/addindextest1/BUILD.bazel b/tests/realtikvtest/addindextest1/BUILD.bazel index 448de5e3da..9126f15f9b 100644 --- a/tests/realtikvtest/addindextest1/BUILD.bazel +++ b/tests/realtikvtest/addindextest1/BUILD.bazel @@ -15,8 +15,12 @@ go_test( "//pkg/disttask/framework/dispatcher", "//pkg/disttask/framework/proto", "//pkg/errno", + "//pkg/kv", "//pkg/parser/model", + "//pkg/store/helper", + "//pkg/tablecodec", "//pkg/testkit", + "//pkg/types", "//tests/realtikvtest", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", diff --git a/tests/realtikvtest/addindextest1/disttask_test.go b/tests/realtikvtest/addindextest1/disttask_test.go index eca65ea88f..27715a0613 100644 --- a/tests/realtikvtest/addindextest1/disttask_test.go +++ b/tests/realtikvtest/addindextest1/disttask_test.go @@ -24,8 +24,12 @@ import ( "github.com/pingcap/tidb/pkg/disttask/framework/dispatcher" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/errno" + "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/store/helper" + "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/tests/realtikvtest" "github.com/stretchr/testify/require" ) @@ -229,3 +233,53 @@ func TestAddIndexForCurrentTimestampColumn(t *testing.T) { tk.MustExec("alter table t add index idx(a);") tk.MustExec("admin check table t;") } + +func TestAddIndexTSErrorWhenResetImportEngine(t *testing.T) { + store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t) + var tblInfo *model.TableInfo + var idxInfo *model.IndexInfo + cb := &callback.TestDDLCallback{} + interceptFn := func(job *model.Job) { + if idxInfo == nil { + tbl, _ := dom.InfoSchema().TableByID(job.TableID) + tblInfo = tbl.Meta() + if len(tblInfo.Indices) == 0 { + return + } + idxInfo = tblInfo.Indices[0] + } + } + cb.OnJobUpdatedExported.Store(&interceptFn) + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists addindexlit;") + tk.MustExec("create database addindexlit;") + tk.MustExec("use addindexlit;") + t.Cleanup(func() { + tk.MustExec("set global tidb_enable_dist_task = off;") + }) + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + tk.MustExec("set global tidb_enable_dist_task = on;") + + err := failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockAllocateTSErr", `1*return`) + require.NoError(t, err) + tk.MustExec("create table t (a int);") + tk.MustExec("insert into t values (1), (2), (3);") + dom.DDL().SetHook(cb) + tk.MustExec("alter table t add index idx(a);") + err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockAllocateTSErr") + require.NoError(t, err) + + dts := []types.Datum{types.NewIntDatum(1)} + sctx := tk.Session().GetSessionVars().StmtCtx + idxKey, _, err := tablecodec.GenIndexKey(sctx, tblInfo, idxInfo, tblInfo.ID, dts, kv.IntHandle(1), nil) + require.NoError(t, err) + + tikvStore := dom.Store().(helper.Storage) + newHelper := helper.NewHelper(tikvStore) + mvccResp, err := newHelper.GetMvccByEncodedKeyWithTS(idxKey, 0) + require.NoError(t, err) + require.NotNil(t, mvccResp) + require.NotNil(t, mvccResp.Info) + require.Greater(t, len(mvccResp.Info.Writes), 0) + require.Greater(t, mvccResp.Info.Writes[0].CommitTs, uint64(0)) +} diff --git a/tests/realtikvtest/addindextest4/ingest_test.go b/tests/realtikvtest/addindextest4/ingest_test.go index 8b5b9683aa..8d2ab0c8b0 100644 --- a/tests/realtikvtest/addindextest4/ingest_test.go +++ b/tests/realtikvtest/addindextest4/ingest_test.go @@ -399,6 +399,7 @@ func TestAddIndexRemoteDuplicateCheck(t *testing.T) { tk.MustExec("use addindexlit;") tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) tk.MustExec("set global tidb_ddl_reorg_worker_cnt=1;") + tk.MustExec("set global tidb_enable_dist_task = 0;") tk.MustExec("create table t(id int primary key, b int, k int);") tk.MustQuery("split table t by (30000);").Check(testkit.Rows("1 1"))