From b6628ea83528ed90e7ea91c006c4a67d0c97f302 Mon Sep 17 00:00:00 2001 From: Jiaqiang Huang Date: Mon, 11 Aug 2025 17:02:45 +0800 Subject: [PATCH] ddl, disttask: import into integrate table mode (#61200) close pingcap/tidb#61199 --- pkg/ddl/index.go | 49 ++++-- pkg/ddl/table_mode.go | 11 ++ pkg/ddl/table_mode_test.go | 2 + pkg/ddl/testutil/testutil.go | 7 +- pkg/disttask/importinto/BUILD.bazel | 5 +- .../importinto/{clean_s3.go => clean_up.go} | 30 +++- pkg/disttask/importinto/job.go | 9 ++ pkg/disttask/importinto/subtask_executor.go | 17 ++- pkg/disttask/importinto/task_executor.go | 37 ++++- .../importinto/task_executor_testkit_test.go | 3 + pkg/executor/importer/table_import.go | 3 + pkg/planner/core/optimizer.go | 7 + tests/realtikvtest/importintotest/BUILD.bazel | 6 + .../importintotest/import_into_test.go | 140 +++++++++++++++++- .../importintotest4/manual_recovery_test.go | 15 ++ 15 files changed, 312 insertions(+), 29 deletions(-) rename pkg/disttask/importinto/{clean_s3.go => clean_up.go} (69%) diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 1c1c904742..c2eba4a6a2 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -1256,7 +1256,38 @@ func checkIfTableReorgWorkCanSkip( ctx := NewReorgContext() ctx.resourceGroupName = job.ReorgMeta.ResourceGroupName ctx.setDDLLabelForTopSQL(job.Query) - return checkIfTableIsEmpty(ctx, store, tbl, startTS) + if isEmpty, err := checkIfTableIsEmpty(ctx, store, tbl, startTS); err != nil || !isEmpty { + return false + } + return true +} + +// CheckImportIntoTableIsEmpty check import into table is empty or not. +func CheckImportIntoTableIsEmpty( + store kv.Storage, + sessCtx sessionctx.Context, + tbl table.Table, +) (bool, error) { + failpoint.Inject("checkImportIntoTableIsEmpty", func(_val failpoint.Value) { + if val, ok := _val.(string); ok { + switch val { + case "error": + failpoint.Return(false, errors.New("check is empty get error")) + case "notEmpty": + failpoint.Return(false, nil) + } + } + }) + txn, err := sessCtx.Txn(true) + if err != nil { + return false, err + } + validTxn := txn != nil && txn.Valid() + if !validTxn { + return false, errors.New("check if table is empty failed") + } + startTS := txn.StartTS() + return checkIfTableIsEmpty(NewReorgContext(), store, tbl, startTS) } func checkIfTableIsEmpty( @@ -1264,15 +1295,15 @@ func checkIfTableIsEmpty( store kv.Storage, tbl table.Table, startTS uint64, -) bool { +) (bool, error) { if pTbl, ok := tbl.(table.PartitionedTable); ok { for _, pid := range pTbl.GetAllPartitionIDs() { pTbl := pTbl.GetPartition(pid) - if !checkIfPhysicalTableIsEmpty(ctx, store, pTbl, startTS) { - return false + if isEmpty, err := checkIfPhysicalTableIsEmpty(ctx, store, pTbl, startTS); err != nil || !isEmpty { + return false, err } } - return true + return true, nil } //nolint:forcetypeassert plainTbl := tbl.(table.PhysicalTable) @@ -1284,14 +1315,14 @@ func checkIfPhysicalTableIsEmpty( store kv.Storage, tbl table.PhysicalTable, startTS uint64, -) bool { +) (bool, error) { hasRecord, err := existsTableRow(ctx, store, tbl, startTS) intest.Assert(err == nil) if err != nil { - logutil.DDLLogger().Info("check if table is empty failed", zap.Error(err)) - return false + logutil.DDLLogger().Warn("check if table is empty failed", zap.Error(err)) + return false, err } - return !hasRecord + return !hasRecord, nil } func checkIfTempIndexReorgWorkCanSkip( diff --git a/pkg/ddl/table_mode.go b/pkg/ddl/table_mode.go index 7659946f1f..b3d7e1ada8 100644 --- a/pkg/ddl/table_mode.go +++ b/pkg/ddl/table_mode.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/sessionctx" ) // onAlterTableMode should only be called by alterTableMode, will call updateVersionAndTableInfo @@ -87,3 +88,13 @@ func validateTableMode(origin, target model.TableMode) bool { } return true } + +// AlterTableMode creates a DDL job for alter table mode. +func AlterTableMode(de Executor, sctx sessionctx.Context, mode model.TableMode, schemaID, tableID int64) error { + args := &model.AlterTableModeArgs{ + TableMode: mode, + SchemaID: schemaID, + TableID: tableID, + } + return de.AlterTableMode(sctx, args) +} diff --git a/pkg/ddl/table_mode_test.go b/pkg/ddl/table_mode_test.go index 52153dfa99..7ec2d6af68 100644 --- a/pkg/ddl/table_mode_test.go +++ b/pkg/ddl/table_mode_test.go @@ -101,6 +101,8 @@ func TestTableModeBasic(t *testing.T) { tk.MustExec("create view t1_restore_import_view as select * from t1_restore_import") tk.MustExec("create table foreign_key_child(id int, pid INT, INDEX idx_pid (pid),FOREIGN KEY (pid) REFERENCES t1_restore_import(c1) ON DELETE CASCADE)") tk.MustExec("drop table foreign_key_child") + // special case allow admin checksum table for import into + tk.MustExec("admin checksum table t1_restore_import;") // For testing below stmt is not allowed when table is in ModeImport/ModeRestore // DMLs diff --git a/pkg/ddl/testutil/testutil.go b/pkg/ddl/testutil/testutil.go index b893780d28..af5f01ab21 100644 --- a/pkg/ddl/testutil/testutil.go +++ b/pkg/ddl/testutil/testutil.go @@ -174,12 +174,7 @@ func SetTableMode( tblInfo *model.TableInfo, mode model.TableMode, ) error { - args := &model.AlterTableModeArgs{ - TableMode: mode, - SchemaID: dbInfo.ID, - TableID: tblInfo.ID, - } - err := de.AlterTableMode(ctx, args) + err := ddl.AlterTableMode(de, ctx, mode, dbInfo.ID, tblInfo.ID) if err == nil { checkTableState(t, store, dbInfo, tblInfo, model.StatePublic) CheckTableMode(t, store, dbInfo, tblInfo, mode) diff --git a/pkg/disttask/importinto/BUILD.bazel b/pkg/disttask/importinto/BUILD.bazel index 6b07dee50d..661dad5452 100644 --- a/pkg/disttask/importinto/BUILD.bazel +++ b/pkg/disttask/importinto/BUILD.bazel @@ -3,7 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "importinto", srcs = [ - "clean_s3.go", + "clean_up.go", "encode_and_sort_operator.go", "job.go", "metrics.go", @@ -22,6 +22,7 @@ go_library( "//br/pkg/utils", "//pkg/config", "//pkg/config/kerneltype", + "//pkg/ddl", "//pkg/disttask/framework/handle", "//pkg/disttask/framework/planner", "//pkg/disttask/framework/proto", @@ -30,6 +31,7 @@ go_library( "//pkg/disttask/framework/taskexecutor", "//pkg/disttask/framework/taskexecutor/execute", "//pkg/disttask/operator", + "//pkg/domain", "//pkg/domain/infosync", "//pkg/domain/serverinfo", "//pkg/executor/importer", @@ -60,6 +62,7 @@ go_library( "//pkg/types", "//pkg/util", "//pkg/util/backoff", + "//pkg/util/dbterror/exeerrors", "//pkg/util/disttask", "//pkg/util/logutil", "//pkg/util/promutil", diff --git a/pkg/disttask/importinto/clean_s3.go b/pkg/disttask/importinto/clean_up.go similarity index 69% rename from pkg/disttask/importinto/clean_s3.go rename to pkg/disttask/importinto/clean_up.go index 352cc76353..e7325bd702 100644 --- a/pkg/disttask/importinto/clean_s3.go +++ b/pkg/disttask/importinto/clean_up.go @@ -19,27 +19,33 @@ import ( "encoding/json" "strconv" + "github.com/pingcap/tidb/pkg/config/kerneltype" + "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/framework/scheduler" + "github.com/pingcap/tidb/pkg/disttask/framework/storage" + "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/executor/importer" "github.com/pingcap/tidb/pkg/lightning/backend/external" "github.com/pingcap/tidb/pkg/lightning/log" + "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" ) -var _ scheduler.CleanUpRoutine = (*ImportCleanUpS3)(nil) +var _ scheduler.CleanUpRoutine = (*ImportCleanUp)(nil) -// ImportCleanUpS3 implements scheduler.CleanUpRoutine. -type ImportCleanUpS3 struct { +// ImportCleanUp implements scheduler.CleanUpRoutine. +type ImportCleanUp struct { } func newImportCleanUpS3() scheduler.CleanUpRoutine { - return &ImportCleanUpS3{} + return &ImportCleanUp{} } // CleanUp implements the CleanUpRoutine.CleanUp interface. -func (*ImportCleanUpS3) CleanUp(ctx context.Context, task *proto.Task) error { +func (*ImportCleanUp) CleanUp(ctx context.Context, task *proto.Task) error { // we can only clean up files after all write&ingest subtasks are finished, // since they might share the same file. taskMeta := &TaskMeta{} @@ -48,7 +54,19 @@ func (*ImportCleanUpS3) CleanUp(ctx context.Context, task *proto.Task) error { return err } defer redactSensitiveInfo(task, taskMeta) - // Not use cloud storage, no need to clean up. + + if kerneltype.IsClassic() { + taskManager, err := storage.GetTaskManager() + if err != nil { + return err + } + if err = taskManager.WithNewTxn(ctx, func(se sessionctx.Context) error { + return ddl.AlterTableMode(domain.GetDomain(se).DDLExecutor(), se, model.TableModeNormal, taskMeta.Plan.DBID, taskMeta.Plan.TableInfo.ID) + }); err != nil { + return err + } + } + // Not use cloud storage, no need to cleanUp. if taskMeta.Plan.CloudStorageURI == "" { return nil } diff --git a/pkg/disttask/importinto/job.go b/pkg/disttask/importinto/job.go index 92c0a3de69..3f64a021b7 100644 --- a/pkg/disttask/importinto/job.go +++ b/pkg/disttask/importinto/job.go @@ -26,16 +26,19 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/config/kerneltype" + "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/disttask/framework/handle" "github.com/pingcap/tidb/pkg/disttask/framework/planner" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/framework/storage" "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute" + "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/domain/serverinfo" "github.com/pingcap/tidb/pkg/executor/importer" "github.com/pingcap/tidb/pkg/keyspace" "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/types" @@ -95,6 +98,12 @@ func doSubmitTask(ctx context.Context, plan *importer.Plan, stmt string, instanc if err2 != nil { return err2 } + if kerneltype.IsClassic() { + err2 = ddl.AlterTableMode(domain.GetDomain(se).DDLExecutor(), se, model.TableModeImport, plan.DBID, plan.TableInfo.ID) + if err2 != nil { + return err2 + } + } // in classical kernel or if we are inside SYSTEM keyspace itself, we // submit the task to DXF in the same transaction as creating the job. if kerneltype.IsClassic() || config.GetGlobalKeyspaceName() == keyspace.System { diff --git a/pkg/disttask/importinto/subtask_executor.go b/pkg/disttask/importinto/subtask_executor.go index 2c33097393..16d48c09d6 100644 --- a/pkg/disttask/importinto/subtask_executor.go +++ b/pkg/disttask/importinto/subtask_executor.go @@ -20,9 +20,11 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/config/kerneltype" + "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/framework/storage" "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute" + "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/executor/importer" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/lightning/backend" @@ -30,6 +32,7 @@ import ( "github.com/pingcap/tidb/pkg/lightning/checkpoints" "github.com/pingcap/tidb/pkg/lightning/log" verify "github.com/pingcap/tidb/pkg/lightning/verification" + "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/resourcegroup" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/util/logutil" @@ -154,10 +157,22 @@ func (p *postProcessStepExecutor) postProcess(ctx context.Context, subtaskMeta * return err } return taskManager.WithNewSession(func(se sessionctx.Context) error { - return importer.VerifyChecksum(ctx, plan, localChecksum.MergedChecksum(), logger, + err = importer.VerifyChecksum(ctx, plan, localChecksum.MergedChecksum(), logger, func() (*local.RemoteChecksum, error) { return importer.RemoteChecksumTableBySQL(ctx, se, plan, logger) }, ) + if kerneltype.IsClassic() { + failpoint.Inject("skipPostProcessAlterTableMode", func() { + failpoint.Return(err) + }) + // log error instead of raise error to avoid user rerun task, + // clean up will alter table mode to normal finally. + err2 := ddl.AlterTableMode(domain.GetDomain(se).DDLExecutor(), se, model.TableModeNormal, p.taskMeta.Plan.DBID, p.taskMeta.Plan.TableInfo.ID) + if err2 != nil { + callLog.Warn("alter table mode to normal failure", zap.Error(err2)) + } + } + return err }) } diff --git a/pkg/disttask/importinto/task_executor.go b/pkg/disttask/importinto/task_executor.go index 77ff86b015..1114a5c332 100644 --- a/pkg/disttask/importinto/task_executor.go +++ b/pkg/disttask/importinto/task_executor.go @@ -28,6 +28,8 @@ import ( brlogutil "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/storage" tidbconfig "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/config/kerneltype" + "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/disttask/framework/proto" dxfstorage "github.com/pingcap/tidb/pkg/disttask/framework/storage" "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor" @@ -48,6 +50,7 @@ import ( "github.com/pingcap/tidb/pkg/meta/autoid" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/table/tables" + "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -103,14 +106,44 @@ func getTableImporter( return importer.NewTableImporter(ctx, controller, strconv.FormatInt(taskID, 10), store) } -func (s *importStepExecutor) Init(ctx context.Context) error { +func (s *importStepExecutor) Init(ctx context.Context) (err error) { s.logger.Info("init subtask env") - tableImporter, err := getTableImporter(ctx, s.taskID, s.taskMeta, s.store) + var tableImporter *importer.TableImporter + var taskManager *dxfstorage.TaskManager + tableImporter, err = getTableImporter(ctx, s.taskID, s.taskMeta, s.store) if err != nil { return err } s.tableImporter = tableImporter + defer func() { + if err == nil { + return + } + if err2 := s.tableImporter.Close(); err2 != nil { + s.logger.Warn("close importer failed", zap.Error(err2)) + } + }() + if kerneltype.IsClassic() { + taskManager, err = dxfstorage.GetTaskManager() + if err != nil { + return err + } + if err = taskManager.WithNewTxn(ctx, func(se sessionctx.Context) error { + // User can write table between precheck and alter table mode to Import, + // so check table emptyness again. + isEmpty, err2 := ddl.CheckImportIntoTableIsEmpty(s.store, se, s.tableImporter.Table) + if err2 != nil { + return err2 + } + if !isEmpty { + return exeerrors.ErrLoadDataPreCheckFailed.FastGenByArgs("target table is not empty") + } + return nil + }); err != nil { + return err + } + } // we need this sub context since Cleanup which wait on this routine is called // before parent context is canceled in normal flow. s.importCtx, s.importCancel = context.WithCancel(ctx) diff --git a/pkg/disttask/importinto/task_executor_testkit_test.go b/pkg/disttask/importinto/task_executor_testkit_test.go index b7998fd64a..ae3670e663 100644 --- a/pkg/disttask/importinto/task_executor_testkit_test.go +++ b/pkg/disttask/importinto/task_executor_testkit_test.go @@ -59,10 +59,13 @@ func TestPostProcessStepExecutor(t *testing.T) { dom, err := session.GetDomain(store) require.NoError(t, err) + db, ok := dom.InfoSchema().SchemaByName(ast.NewCIStr("test")) + require.True(t, ok) table, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t")) require.NoError(t, err) taskMeta := &importinto.TaskMeta{ Plan: importer.Plan{ + DBID: db.ID, Checksum: config.OpLevelRequired, TableInfo: table.Meta(), DesiredTableInfo: table.Meta(), diff --git a/pkg/executor/importer/table_import.go b/pkg/executor/importer/table_import.go index 3d328a6f94..70588b2897 100644 --- a/pkg/executor/importer/table_import.go +++ b/pkg/executor/importer/table_import.go @@ -861,6 +861,9 @@ func VerifyChecksum(ctx context.Context, plan *Plan, localChecksum verify.KVChec failpoint.Inject("waitCtxDone", func() { <-ctx.Done() }) + failpoint.Inject("retryableError", func() { + failpoint.Return(common.ErrWriteTooSlow) + }) remoteChecksum, err := getRemoteChecksumFn() if err != nil { diff --git a/pkg/planner/core/optimizer.go b/pkg/planner/core/optimizer.go index 1be5641836..49ff20ee4e 100644 --- a/pkg/planner/core/optimizer.go +++ b/pkg/planner/core/optimizer.go @@ -262,6 +262,13 @@ func CheckTableMode(node *resolve.NodeW) error { switch node.Node.(type) { case *ast.ShowStmt, *ast.ExplainStmt: default: + // Special handling to the `ADMIN CHECKSUM TABLE`, as `IMPORT INTO` will + // executes this statement during post checksum to verify data. + // TODO: only allow `ADMIN CHECKSUM TABLE` from import into task + adminStmt, ok := node.Node.(*ast.AdminStmt) + if ok && adminStmt.Tp == ast.AdminChecksumTable { + return nil + } for _, tblNameW := range node.GetResolveContext().GetTableNames() { if err := dbutil.CheckTableModeIsNormal(tblNameW.Name, tblNameW.TableInfo.Mode); err != nil { return err diff --git a/tests/realtikvtest/importintotest/BUILD.bazel b/tests/realtikvtest/importintotest/BUILD.bazel index cf811bcb59..a0f3a6e3d3 100644 --- a/tests/realtikvtest/importintotest/BUILD.bazel +++ b/tests/realtikvtest/importintotest/BUILD.bazel @@ -23,12 +23,17 @@ go_test( "//pkg/config/kerneltype", "//pkg/disttask/framework/proto", "//pkg/disttask/framework/storage", + "//pkg/disttask/framework/taskexecutor", "//pkg/disttask/framework/testutil", "//pkg/disttask/importinto", + "//pkg/domain", "//pkg/domain/infosync", "//pkg/executor/importer", + "//pkg/infoschema", "//pkg/kv", "//pkg/lightning/backend/local", + "//pkg/lightning/common", + "//pkg/parser/ast", "//pkg/parser/auth", "//pkg/parser/terror", "//pkg/planner/core", @@ -37,6 +42,7 @@ go_test( "//pkg/testkit/testfailpoint", "//pkg/util/dbterror/exeerrors", "//pkg/util/dbterror/plannererrors", + "//pkg/util/domainutil", "//pkg/util/sem", "//tests/realtikvtest", "//tests/realtikvtest/testutils", diff --git a/tests/realtikvtest/importintotest/import_into_test.go b/tests/realtikvtest/importintotest/import_into_test.go index c7553af7fc..f1e1465abe 100644 --- a/tests/realtikvtest/importintotest/import_into_test.go +++ b/tests/realtikvtest/importintotest/import_into_test.go @@ -37,17 +37,22 @@ import ( "github.com/pingcap/tidb/pkg/config/kerneltype" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/framework/storage" + "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor" "github.com/pingcap/tidb/pkg/disttask/importinto" + "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/executor/importer" + "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/lightning/backend/local" + "github.com/pingcap/tidb/pkg/lightning/common" + "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/auth" "github.com/pingcap/tidb/pkg/parser/terror" plannercore "github.com/pingcap/tidb/pkg/planner/core" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/testfailpoint" - "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" "github.com/pingcap/tidb/pkg/util/dbterror/plannererrors" + "github.com/pingcap/tidb/pkg/util/domainutil" "github.com/pingcap/tidb/pkg/util/sem" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/util" @@ -1033,6 +1038,7 @@ func (s *mockGCSSuite) TestRegisterTask() { err := s.tk.QueryToErr(sql) s.Error(err) s.Greater(unregisterTime, registerTime) + s.checkMode(s.tk, "SELECT * FROM load_data.register_task", "register_task", true) client, err := importer.GetEtcdClient() s.NoError(err) @@ -1047,9 +1053,9 @@ func (s *mockGCSSuite) TestRegisterTask() { func() { // cannot run 2 import job to the same target table. tk2 := testkit.NewTestKit(s.T(), s.store) - err = tk2.QueryToErr(sql) - s.ErrorIs(err, exeerrors.ErrLoadDataPreCheckFailed) - s.ErrorContains(err, "there is active job on the target table already") + err = tk2.ExecToErr(sql) + s.ErrorIs(err, infoschema.ErrProtectedTableMode) + s.ErrorContains(err, "Table register_task is in mode Import") etcdKey = fmt.Sprintf("/tidb/brie/import/import-into/%d", storage.TestLastTaskID.Load()) s.Eventually(func() bool { resp, err2 := client.Get(context.Background(), etcdKey) @@ -1317,3 +1323,129 @@ func (s *mockGCSSuite) TestImportIntoWithFK() { s.tk.MustQuery(sql) s.tk.MustQuery("SELECT * FROM import_into.child;").Check(testkit.Rows("1 1", "2 2")) } + +func (s *mockGCSSuite) TestTableMode() { + content := []byte(`1,1 + 2,2`) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{ + BucketName: "table-mode-test", + Name: "data.csv", + }, + Content: content, + }) + dbName := "import_into" + s.prepareAndUseDB(dbName) + tk2 := testkit.NewTestKit(s.T(), s.store) + tk2.MustExec("use " + dbName) + createTableSQL := "create table table_mode (id int primary key, fk int);" + s.tk.MustExec(createTableSQL) + loadDataSQL := fmt.Sprintf(`IMPORT INTO table_mode + FROM 'gs://table-mode-test/data.csv?endpoint=%s'`, gcsEndpoint) + query := "SELECT * FROM table_mode" + + // Test import into clean up can alter table mode to Normal finally. + testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/disttask/importinto/skipPostProcessAlterTableMode", `return`) + s.tk.MustQuery(loadDataSQL) + s.checkMode(s.tk, query, "table_mode", true) + s.tk.MustQuery(query).Check(testkit.Rows([]string{"1 1", "2 2"}...)) + testfailpoint.Disable(s.T(), "github.com/pingcap/tidb/pkg/disttask/importinto/skipPostProcessAlterTableMode") + + // Test import into post process will alter table mode to Normal. + s.tk.MustExec("truncate table table_mode") + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + defer func() { + testfailpoint.Disable(s.T(), "github.com/pingcap/tidb/pkg/disttask/importinto/waitBeforePostProcess") + wg.Done() + }() + testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/disttask/importinto/waitBeforePostProcess", "return") + s.tk.MustQuery(loadDataSQL) + }() + go func() { + defer wg.Done() + // Make sure table mode is Import. + s.checkMode(tk2, query, "table_mode", false) + s.checkMode(tk2, query, "table_mode", true) + tk2.MustQuery(query).Check(testkit.Rows([]string{"1 1", "2 2"}...)) + }() + wg.Wait() + + // Test occur retryable error when checksum + s.tk.MustExec("truncate table table_mode") + testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/executor/importer/retryableError", `return`) + getError := false + testfailpoint.EnableCall(s.T(), "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/afterRunSubtask", + func(e taskexecutor.TaskExecutor, errP *error, _ context.Context) { + if errP != nil && *errP == common.ErrWriteTooSlow { + getError = true + testfailpoint.Disable(s.T(), "github.com/pingcap/tidb/pkg/executor/importer/retryableError") + } + }, + ) + s.tk.MustQuery(loadDataSQL) + s.tk.MustQuery(query).Sort().Check(testkit.Rows([]string{"1 1", "2 2"}...)) + require.True(s.T(), getError) + + // Test import into check table is empty get error. + s.tk.MustExec("truncate table table_mode") + testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/ddl/checkImportIntoTableIsEmpty", `return("error")`) + err := s.tk.QueryToErr(loadDataSQL) + s.ErrorContains(err, "check is empty get error") + s.checkMode(s.tk, query, "table_mode", true) + s.tk.MustQuery("SELECT * FROM table_mode;").Sort().Check(testkit.Rows([]string{}...)) + testfailpoint.Disable(s.T(), "github.com/pingcap/tidb/pkg/ddl/checkImportIntoTableIsEmpty") + + // Test import into check table is not empty between precheck and + // alter table mode to Import. + testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/ddl/checkImportIntoTableIsEmpty", `return("notEmpty")`) + err = s.tk.QueryToErr(loadDataSQL) + s.ErrorContains(err, "PreCheck failed: target table is not empty") + s.checkMode(s.tk, query, "table_mode", true) + s.tk.MustQuery(query).Sort().Check(testkit.Rows([]string{}...)) + testfailpoint.Disable(s.T(), "github.com/pingcap/tidb/pkg/ddl/checkImportIntoTableIsEmpty") + + // Test admin repair table can reset table mode to normal. + wg = sync.WaitGroup{} + wg.Add(2) + go func() { + defer func() { + testfailpoint.Disable(s.T(), "github.com/pingcap/tidb/pkg/disttask/importinto/waitBeforePostProcess") + wg.Done() + }() + testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/disttask/importinto/waitBeforePostProcess", "return") + s.tk.MustQuery(loadDataSQL) + }() + go func() { + defer wg.Done() + // Make sure table mode is Import. + s.checkMode(tk2, query, "table_mode", false) + s.adminRepairTable(dbName, "table_mode", createTableSQL) + tk2.EventuallyMustQueryAndCheck(query, nil, testkit.Rows([]string{"1 1", "2 2"}...), 10*time.Second, 100*time.Millisecond) + }() + wg.Wait() +} + +func (s *mockGCSSuite) adminRepairTable(db, table, createTableSQL string) { + domainutil.RepairInfo.SetRepairMode(true) + domainutil.RepairInfo.SetRepairTableList([]string{db + "." + table}) + dom := domain.GetDomain(s.tk.Session()) + dbInfo, ok := dom.InfoCache().GetLatest().SchemaByName(ast.NewCIStr(db)) + require.True(s.T(), ok) + tableInfo, err := dom.InfoCache().GetLatest().TableByName(context.Background(), ast.NewCIStr(db), ast.NewCIStr(table)) + s.NoError(err) + domainutil.RepairInfo.CheckAndFetchRepairedTable(dbInfo, tableInfo.Meta()) + s.tk.MustExec("admin repair table " + table + " " + createTableSQL) +} + +func (s *mockGCSSuite) checkMode(tk *testkit.TestKit, sql, tableName string, expect bool) { + require.Eventually(s.T(), func() bool { + err := tk.ExecToErr(sql) + if err != nil { + s.ErrorContains(err, "Table "+tableName+" is in mode Import") + return !expect + } + return expect + }, 10*time.Second, 100*time.Millisecond) +} diff --git a/tests/realtikvtest/importintotest4/manual_recovery_test.go b/tests/realtikvtest/importintotest4/manual_recovery_test.go index ff32b52566..46c1bdea7f 100644 --- a/tests/realtikvtest/importintotest4/manual_recovery_test.go +++ b/tests/realtikvtest/importintotest4/manual_recovery_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "strconv" + "time" "github.com/fsouza/fake-gcs-server/fakestorage" "github.com/pingcap/tidb/pkg/disttask/framework/handle" @@ -26,6 +27,7 @@ import ( "github.com/pingcap/tidb/pkg/disttask/importinto" plannercore "github.com/pingcap/tidb/pkg/planner/core" "github.com/pingcap/tidb/pkg/testkit" + "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/util" ) @@ -70,6 +72,7 @@ func (s *mockGCSSuite) TestResolutionFailTheTask() { return t.State == proto.TaskStateReverted }) s.NoError(err) + s.checkMode(s.tk, "SELECT * FROM t", "t", true) s.tk.MustQuery("select * from t").Check(testkit.Rows()) } @@ -80,6 +83,7 @@ func (s *mockGCSSuite) TestResolutionCancelTheTask() { return t.State == proto.TaskStateReverted }) s.NoError(err) + s.checkMode(s.tk, "SELECT * FROM t", "t", true) s.tk.MustQuery("select * from t").Check(testkit.Rows()) } @@ -94,3 +98,14 @@ func (s *mockGCSSuite) TestResolutionSuccessAfterManualChangeData() { s.NoError(handle.WaitTaskDoneOrPaused(ctx, task.ID)) s.tk.MustQuery("select * from t").Check(testkit.Rows("1 2")) } + +func (s *mockGCSSuite) checkMode(tk *testkit.TestKit, sql, tableName string, expect bool) { + require.Eventually(s.T(), func() bool { + err := tk.QueryToErr(sql) + if err != nil { + s.ErrorContains(err, "Table "+tableName+" is in mode Import") + return !expect + } + return expect + }, 10*time.Second, 100*time.Millisecond) +}