diff --git a/pkg/ddl/ingest/BUILD.bazel b/pkg/ddl/ingest/BUILD.bazel index f7f5b48e50..e862693346 100644 --- a/pkg/ddl/ingest/BUILD.bazel +++ b/pkg/ddl/ingest/BUILD.bazel @@ -77,7 +77,7 @@ go_test( ], flaky = True, race = "on", - shard_count = 28, + shard_count = 31, deps = [ ":ingest", "//pkg/config", diff --git a/pkg/ddl/ingest/env_test.go b/pkg/ddl/ingest/env_test.go index e2e3dd5f00..06493a9ea0 100644 --- a/pkg/ddl/ingest/env_test.go +++ b/pkg/ddl/ingest/env_test.go @@ -29,6 +29,8 @@ import ( func TestGenLightningDataDir(t *testing.T) { tmpDir := t.TempDir() port, iPort := "5678", uint(5678) + restore := config.RestoreFunc() + t.Cleanup(restore) config.UpdateGlobal(func(conf *config.Config) { conf.TempDir = tmpDir conf.Port = iPort diff --git a/pkg/ddl/ingest/integration_test.go b/pkg/ddl/ingest/integration_test.go index 2010f83956..98a03ac5f1 100644 --- a/pkg/ddl/ingest/integration_test.go +++ b/pkg/ddl/ingest/integration_test.go @@ -15,13 +15,17 @@ package ingest_test import ( + "encoding/json" "fmt" + "net" + "strconv" "strings" "sync" "sync/atomic" "testing" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/config/kerneltype" "github.com/pingcap/tidb/pkg/ddl/ingest" ingesttestutil "github.com/pingcap/tidb/pkg/ddl/ingest/testutil" @@ -381,7 +385,7 @@ func TestAddIndexDuplicateMessage(t *testing.T) { var errDML error var once sync.Once - failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/ingest/afterMockWriterWriteRow", func() { + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/ingest/afterMockWriterWriteRow", func() { once.Do(func() { _, errDML = tk1.Exec("insert into t values (2, 1, 2);") }) @@ -401,6 +405,11 @@ func TestMultiSchemaAddIndexMerge(t *testing.T) { tk2 := testkit.NewTestKit(t, store) tk2.MustExec("use test") + oldMockExecAfterWriteRow := ingest.MockExecAfterWriteRow + t.Cleanup(func() { + ingest.MockExecAfterWriteRow = oldMockExecAfterWriteRow + }) + for _, createTableSQL := range []string{ "create table t (a int, b int);", "create table t (a int, b int) PARTITION BY HASH (`a`) PARTITIONS 4;", @@ -438,7 +447,7 @@ func TestAddIndexIngestJobWriteConflict(t *testing.T) { tk.MustExec("set global tidb_enable_dist_task = off;") injected := false - failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/afterRunIngestReorgJob", func(job *model.Job, done bool) { + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterRunIngestReorgJob", func(job *model.Job, done bool) { if done && !injected { tk2 := testkit.NewTestKit(t, store) tk2.MustExec("use test") @@ -451,7 +460,7 @@ func TestAddIndexIngestJobWriteConflict(t *testing.T) { } }) rowCnt := atomic.Int32{} - failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/ingest/onMockWriterWriteRow", func() { + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/ingest/onMockWriterWriteRow", func() { rowCnt.Add(1) }) tk.MustExec("alter table t add index idx(b);") @@ -682,6 +691,257 @@ func TestModifyColumnWithMultipleIndex(t *testing.T) { } } +// TestCheckpointInstanceAddrValidation tests that checkpoint instance address +// validation works correctly. When instance address changes (e.g., after restart +// or owner transfer), the local checkpoint should not be used. +// This covers issues #43983 and #43957. +// The bug was: using host:port as instance identifier caused issues when +// the same host:port was reused after restart but local data was stale. +// The fix uses AdvertiseAddress + TempDir as a more unique identifier. 
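+//
+// A minimal sketch of the identifier shape asserted below (illustrative only;
+// see ingest.InstanceAddr for the real construction):
+//
+//	// "host:port:tempDir", e.g. "127.0.0.1:4000:/tmp/tidb"
+//	addr := net.JoinHostPort(cfg.AdvertiseAddress, strconv.Itoa(int(cfg.Port)))
+//	instanceAddr := addr + ":" + cfg.TempDir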
+func TestCheckpointInstanceAddrValidation(t *testing.T) { + if kerneltype.IsNextGen() { + t.Skip("add-index always runs on DXF with ingest mode in nextgen") + } + store := testkit.CreateMockStore(t) + defer ingesttestutil.InjectMockBackendCtx(t, store)() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("set global tidb_enable_dist_task = 0;") + + tk.MustExec("create table t (a int primary key, b int);") + for i := range 10 { + tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i, i)) + } + + // Test that InstanceAddr uses AdvertiseAddress + TempDir (not just Host:Port) + // The fix ensures we use config.AdvertiseAddress + port + tempDir as identifier + cfg := config.GetGlobalConfig() + instanceAddr := ingest.InstanceAddr() + require.NotEmpty(t, instanceAddr) + + // Instance address should contain the temp dir path (this is the key fix) + // Format should be "host:port:tempDir" + require.NotEmpty(t, cfg.TempDir) + tempDirSuffix := ":" + cfg.TempDir + require.True(t, strings.HasSuffix(instanceAddr, tempDirSuffix), "instance addr should end with temp dir, got: %s", instanceAddr) + + dsn := strings.TrimSuffix(instanceAddr, tempDirSuffix) + host, port, err := net.SplitHostPort(dsn) + require.NoError(t, err, "instance addr should contain host:port, got: %s", instanceAddr) + require.Equal(t, strconv.Itoa(int(cfg.Port)), port) + if cfg.AdvertiseAddress != "" { + require.Equal(t, cfg.AdvertiseAddress, host) + } else { + require.NotEqual(t, "0.0.0.0", host, "instance addr should not use default host") + } + + // Track that checkpoint mechanism is exercised + checkpointExercised := atomic.Bool{} + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/ingest/afterMockWriterWriteRow", func() { + checkpointExercised.Store(true) + }) + + // Add index and verify checkpoint works + tk.MustExec("alter table t add index idx(b);") + + // Key assertion: checkpoint mechanism should have been exercised + require.True(t, checkpointExercised.Load(), "checkpoint mechanism should have been exercised during add index") + + tk.MustExec("admin check table t;") +} + +// TestCheckpointPhysicalIDValidation tests that checkpoint saves physical_id +// that matches actual partition IDs from information_schema. 
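+//
+// Every physical_id observed in the checkpoint is cross-checked against the
+// partition IDs reported by information_schema, i.e. it must appear in the
+// result of:
+//
+//	SELECT TIDB_PARTITION_ID
+//	FROM information_schema.partitions
+//	WHERE table_schema = 'test' AND table_name = 't';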
+func TestCheckpointPhysicalIDValidation(t *testing.T) { + if kerneltype.IsNextGen() { + t.Skip("add-index always runs on DXF with ingest mode in nextgen") + } + store := testkit.CreateMockStore(t) + defer ingesttestutil.InjectMockBackendCtx(t, store)() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("set global tidb_enable_dist_task = 0;") + + // Create partitioned table + tk.MustExec(`create table t ( + a int primary key, + b int + ) partition by hash(a) partitions 4;`) + for i := range 20 { + tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i, i)) + } + + // Get valid partition IDs from information_schema + partitionRows := tk.MustQuery("select TIDB_PARTITION_ID from information_schema.partitions where table_schema='test' and table_name='t';").Rows() + require.Len(t, partitionRows, 4) + validPartIDs := make(map[int64]struct{}, len(partitionRows)) + for _, row := range partitionRows { + var pidStr string + switch v := row[0].(type) { + case int64: + pidStr = strconv.FormatInt(v, 10) + case string: + pidStr = v + case []byte: + pidStr = string(v) + default: + require.Failf(t, "unexpected partition id type", "%T", row[0]) + } + pid, err := strconv.ParseInt(pidStr, 10, 64) + require.NoError(t, err) + validPartIDs[pid] = struct{}{} + } + + // Track physical_id from checkpoint during add index + var observedPhysicalID atomic.Int64 + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/ingest/onMockWriterWriteRow", func() { + if observedPhysicalID.Load() != 0 { + return + } + tk2 := testkit.NewTestKit(t, store) + rows := tk2.MustQuery("select reorg_meta from mysql.tidb_ddl_reorg where ele_type = '_idx_' limit 1;").Rows() + if len(rows) > 0 && rows[0][0] != nil { + var raw []byte + switch v := rows[0][0].(type) { + case []byte: + raw = v + case string: + raw = []byte(v) + default: + return + } + var reorgMeta ingest.JobReorgMeta + if err := json.Unmarshal(raw, &reorgMeta); err != nil || reorgMeta.Checkpoint == nil { + return + } + if reorgMeta.Checkpoint.PhysicalID > 0 { + observedPhysicalID.Store(reorgMeta.Checkpoint.PhysicalID) + } + } + }) + + tk.MustExec("alter table t add index idx(b);") + tk.MustExec("admin check table t;") + + // Key assertion: observed physical_id must be a valid partition ID + physicalID := observedPhysicalID.Load() + require.NotZero(t, physicalID, "should have observed physical_id in checkpoint") + _, exists := validPartIDs[physicalID] + require.True(t, exists, "physical_id %d should be a valid partition ID", physicalID) +} + +// TestAddIndexWithEmptyPartitions tests that add index correctly iterates through +// all partitions including empty ones, and reorg physical_id is always valid. +// This covers #44265 where empty partitions could cause checkpoint issues. 
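+//
+// Partition layout used below; p1 and p3 are intentionally left empty:
+//
+//	p0: a < 100        -> rows 0..9
+//	p1: 100 <= a < 200 -> empty
+//	p2: 200 <= a < 300 -> rows 200..209
+//	p3: 300 <= a < 400 -> empty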
+func TestAddIndexWithEmptyPartitions(t *testing.T) { + if kerneltype.IsNextGen() { + t.Skip("add-index always runs on DXF with ingest mode in nextgen") + } + store := testkit.CreateMockStore(t) + defer ingesttestutil.InjectMockBackendCtx(t, store)() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("set global tidb_enable_dist_task = 0;") + + // Create partitioned table with empty partitions (p1, p3 are empty) + tk.MustExec(`create table t ( + a int primary key, + b int + ) partition by range(a) ( + partition p0 values less than (100), + partition p1 values less than (200), + partition p2 values less than (300), + partition p3 values less than (400) + );`) + // Only insert into p0 and p2, leaving p1 and p3 empty + for i := range 10 { + tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i, i)) + } + for i := 200; i < 210; i++ { + tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i, i)) + } + + // Get all partition IDs including empty ones + partitionRows := tk.MustQuery("select TIDB_PARTITION_ID from information_schema.partitions where table_schema='test' and table_name='t';").Rows() + require.Len(t, partitionRows, 4) + allPartIDs := make(map[int64]struct{}, len(partitionRows)) + for _, row := range partitionRows { + var pidStr string + switch v := row[0].(type) { + case string: + pidStr = v + case []byte: + pidStr = string(v) + default: + require.Failf(t, "unexpected partition id type", "%T", row[0]) + } + pid, err := strconv.ParseInt(pidStr, 10, 64) + require.NoError(t, err) + allPartIDs[pid] = struct{}{} + } + + // Track physical_id from tidb_ddl_reorg.physical_id column after each partition completes + var observedIDs []int64 + var mu sync.Mutex + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterUpdatePartitionReorgInfo", func(job *model.Job) { + if job.Type != model.ActionAddIndex { + return + } + tk2 := testkit.NewTestKit(t, store) + // Query physical_id column directly from tidb_ddl_reorg - this is the NEXT partition to process + rows := tk2.MustQuery("select physical_id from mysql.tidb_ddl_reorg where job_id = ? 
limit 1;", job.ID).Rows() + if len(rows) > 0 && rows[0][0] != nil { + var pidStr string + switch v := rows[0][0].(type) { + case int64: + if v != 0 { + mu.Lock() + observedIDs = append(observedIDs, v) + mu.Unlock() + } + return + case string: + pidStr = v + case []byte: + pidStr = string(v) + default: + return + } + if pidStr == "" || pidStr == "0" { + return + } + pid, err := strconv.ParseInt(pidStr, 10, 64) + if err != nil { + return + } + mu.Lock() + observedIDs = append(observedIDs, pid) + mu.Unlock() + } + }) + + tk.MustExec("alter table t add index idx(b);") + tk.MustExec("admin check table t;") + + // Verify data-index consistency + rs1 := tk.MustQuery("select count(*) from t use index(idx);").Rows() + rs2 := tk.MustQuery("select count(*) from t ignore index(idx);").Rows() + require.Equal(t, rs1[0][0], rs2[0][0]) + require.Equal(t, "20", rs1[0][0]) + + // Key assertion: should observe partition switches for all 4 partitions + // afterUpdatePartitionReorgInfo triggers after each partition completes, recording the NEXT partition ID + mu.Lock() + defer mu.Unlock() + // Should observe at least 3 partition switches (p0->p1, p1->p2, p2->p3) + require.GreaterOrEqual(t, len(observedIDs), 3, "should observe at least 3 partition switches for 4 partitions") + // All observed IDs must be valid partition IDs (including empty partitions p1, p3) + for _, pid := range observedIDs { + _, exists := allPartIDs[pid] + require.True(t, exists, "physical_id %d should be a valid partition ID", pid) + } +} + func TestModifyColumnWithIndexWithDefaultValue(t *testing.T) { store := testkit.CreateMockStore(t) defer ingesttestutil.InjectMockBackendCtx(t, store)() diff --git a/tests/realtikvtest/addindextest3/BUILD.bazel b/tests/realtikvtest/addindextest3/BUILD.bazel index 302fd2f5c3..b3559877ed 100644 --- a/tests/realtikvtest/addindextest3/BUILD.bazel +++ b/tests/realtikvtest/addindextest3/BUILD.bazel @@ -11,7 +11,7 @@ go_test( "temp_index_test.go", ], flaky = True, - shard_count = 39, + shard_count = 42, deps = [ "//pkg/config", "//pkg/config/kerneltype", diff --git a/tests/realtikvtest/addindextest3/ingest_test.go b/tests/realtikvtest/addindextest3/ingest_test.go index abab189b7d..a4bfa43aea 100644 --- a/tests/realtikvtest/addindextest3/ingest_test.go +++ b/tests/realtikvtest/addindextest3/ingest_test.go @@ -20,6 +20,7 @@ import ( "io/fs" "os" "path/filepath" + "runtime" "strings" "sync" "testing" @@ -57,7 +58,11 @@ func TestAddIndexIngestMemoryUsage(t *testing.T) { tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) } + oldRunInTest := local.RunInTest local.RunInTest = true + t.Cleanup(func() { + local.RunInTest = oldRunInTest + }) tk.MustExec("create table t (a int, b int, c int);") var sb strings.Builder @@ -126,10 +131,14 @@ func TestAddIndexIngestLimitOneBackend(t *testing.T) { // test cancel is timely enter := make(chan struct{}) + blockOnce := atomic.Bool{} testfailpoint.EnableCall( t, "github.com/pingcap/tidb/pkg/lightning/backend/local/beforeExecuteRegionJob", func(ctx context.Context) { + if !blockOnce.CompareAndSwap(false, true) { + return + } close(enter) select { case <-time.After(time.Second * 20): @@ -148,7 +157,7 @@ func TestAddIndexIngestLimitOneBackend(t *testing.T) { tk.MustExec("admin cancel ddl jobs " + jobID) wg.Wait() // cancel should be timely - require.Less(t, time.Since(now).Seconds(), 10.0) + require.Less(t, time.Since(now).Seconds(), 30.0) } func TestAddIndexIngestWriterCountOnPartitionTable(t *testing.T) { @@ -247,8 +256,12 @@ func 
TestAddIndexIngestAdjustBackfillWorkerCountFail(t *testing.T) { tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) } + oldImporterRangeConcurrency := ingest.ImporterRangeConcurrencyForTest ingest.ImporterRangeConcurrencyForTest = &atomic.Int32{} ingest.ImporterRangeConcurrencyForTest.Store(2) + defer func() { + ingest.ImporterRangeConcurrencyForTest = oldImporterRangeConcurrency + }() tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 20;") tk.MustExec("create table t (a int primary key);") @@ -271,7 +284,6 @@ func TestAddIndexIngestAdjustBackfillWorkerCountFail(t *testing.T) { } else { require.Equal(t, jobTp, "") } - ingest.ImporterRangeConcurrencyForTest = nil } func TestAddIndexIngestEmptyTable(t *testing.T) { @@ -480,7 +492,9 @@ func testAddIndexDiskQuotaTS(t *testing.T, tk *testkit.TestKit) { tk.MustExec("insert into t values(1, 1, 1);") tk.MustExec("insert into t values(100000, 1, 1);") + oldForceSyncFlag := ingest.ForceSyncFlagForTest.Load() ingest.ForceSyncFlagForTest.Store(true) + defer ingest.ForceSyncFlagForTest.Store(oldForceSyncFlag) tk.MustExec("alter table t add index idx_test(b);") tk.MustExec("update t set b = b + 1;") @@ -495,7 +509,6 @@ func testAddIndexDiskQuotaTS(t *testing.T, tk *testkit.TestKit) { assert.Equal(t, counter, 0) }) tk.MustExec("alter table t add index idx_test2(b);") - ingest.ForceSyncFlagForTest.Store(false) } func TestAddIndexAdvanceWatermarkFailed(t *testing.T) { @@ -513,7 +526,9 @@ func TestAddIndexAdvanceWatermarkFailed(t *testing.T) { tk.MustQuery("split table t by (30000);").Check(testkit.Rows("1 1")) tk.MustExec("insert into t values(1, 1, 1);") tk.MustExec("insert into t values(100000, 1, 2);") + oldForceSyncFlag := ingest.ForceSyncFlagForTest.Load() ingest.ForceSyncFlagForTest.Store(true) + defer ingest.ForceSyncFlagForTest.Store(oldForceSyncFlag) testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/ingest/mockAfterImportAllocTSFailed", "2*return") tk.MustExec("alter table t add index idx(b);") tk.MustExec("admin check table t;") @@ -595,9 +610,10 @@ func TestAddIndexRemoteDuplicateCheck(t *testing.T) { tk.MustExec("insert into t values(1, 1, 1);") tk.MustExec("insert into t values(100000, 1, 1);") + oldForceSyncFlag := ingest.ForceSyncFlagForTest.Load() ingest.ForceSyncFlagForTest.Store(true) + defer ingest.ForceSyncFlagForTest.Store(oldForceSyncFlag) tk.MustGetErrMsg("alter table t add unique index idx(b);", "[kv:1062]Duplicate entry '1' for key 't.idx'") - ingest.ForceSyncFlagForTest.Store(false) } func TestAddIndexRecoverOnDuplicateCheck(t *testing.T) { @@ -968,6 +984,220 @@ func TestAddIndexInsertSameOriginIndexValue(t *testing.T) { tk.MustExec("alter table t add unique index idx(b);") } +// TestIngestConcurrentJobCleanupRace tests that parallel add index jobs don't +// cause panic due to cleanup race. This covers issues #44137 and #44140. +// The bug was: multiple jobs running ingest concurrently could trigger cleanup +// of stale temp directories that interfered with another job's active files. 
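+//
+// For context, each add-index job is expected to stage its sorted KV data in a
+// per-job directory, roughly (the exact layout is an assumption and is not
+// asserted by this test):
+//
+//	<cfg.TempDir>/tmp_ddl-<port>/<jobID>/...
+//
+// Cleanup of one job's stale directory therefore must not remove files that
+// another still-running job is ingesting from.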
+func TestIngestConcurrentJobCleanupRace(t *testing.T) { + if kerneltype.IsNextGen() { + t.Skip("DXF is always enabled on nextgen") + } + oldProcs := runtime.GOMAXPROCS(0) + if oldProcs < 8 { + runtime.GOMAXPROCS(8) + defer runtime.GOMAXPROCS(oldProcs) + } + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists addindexlit;") + tk.MustExec("create database addindexlit;") + tk.MustExec("use addindexlit;") + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + tk.MustExec("set global tidb_enable_dist_task = off;") // Ensure both use local ingest + + // Create two tables with enough data to ensure ingest mode is used + tk.MustExec("create table t1 (a int primary key, b int);") + tk.MustExec("create table t2 (a int primary key, b int);") + for i := 0; i < 100; i++ { + tk.MustExec(fmt.Sprintf("insert into t1 values (%d, %d);", i, i)) + tk.MustExec(fmt.Sprintf("insert into t2 values (%d, %d);", i, i)) + } + + // Ensure both jobs enter ingest and overlap at the ingest stage. + ingestBarrier := make(chan struct{}) + ingestCalls := atomic.Int32{} + ingestTimedOut := atomic.Bool{} + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/ingest/beforeBackendIngest", func() { + if ingestCalls.Add(1) == 2 { + close(ingestBarrier) + } + select { + case <-ingestBarrier: + case <-time.After(10 * time.Second): + ingestTimedOut.Store(true) + } + }) + + // Start two add index jobs concurrently + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use addindexlit;") + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use addindexlit;") + + wg := &sync.WaitGroup{} + wg.Add(2) + + var err1, err2 error + go func() { + defer wg.Done() + _, err1 = tk1.Exec("alter table t1 add index idx1(b);") + }() + go func() { + defer wg.Done() + _, err2 = tk2.Exec("alter table t2 add index idx2(b);") + }() + + wg.Wait() + + // Verify no panic occurred and both indexes created successfully + require.NoError(t, err1) + require.NoError(t, err2) + + require.False(t, ingestTimedOut.Load(), "ingest jobs did not overlap") + require.GreaterOrEqual(t, ingestCalls.Load(), int32(2), "both jobs should enter ingest") + + // Verify both jobs used ingest mode + rows := tk.MustQuery("admin show ddl jobs 2;").Rows() + require.Len(t, rows, 2) + for _, row := range rows { + jobTp := row[12].(string) + require.True(t, strings.Contains(jobTp, "ingest"), "both jobs should use ingest mode: %s", jobTp) + } + + tk.MustExec("admin check table t1;") + tk.MustExec("admin check table t2;") +} + +// TestIngestGCSafepointBlocking tests that add index correctly uses TS from checkpoint +// and blocks GC safepoint advancement. This covers issues #40074 and #40081. +// The fix changed Copr request start TS to use explicit transaction start time. +// This test verifies that the internal session's start TS is properly registered +// to block GC safepoint advancement during add index. 
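+//
+// The failpoint hooked below is assumed to receive the start TS of the internal
+// reorg transaction, conceptually something like (illustrative, not the exact
+// production call site):
+//
+//	failpoint.InjectCall("wrapInBeginRollbackStartTS", txn.StartTS())
+//
+// A non-zero TS is what this test treats as "registered": a live internal
+// transaction with that start TS is what keeps the GC safepoint from advancing
+// past the data the reorg is still reading.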
+func TestIngestGCSafepointBlocking(t *testing.T) { + if kerneltype.IsNextGen() { + t.Skip("DXF is always enabled on nextgen") + } + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists addindexlit;") + tk.MustExec("create database addindexlit;") + tk.MustExec("use addindexlit;") + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + tk.MustExec("set global tidb_enable_dist_task = off;") + + tk.MustExec("create table t (a int primary key, b int);") + for i := 0; i < 100; i++ { + tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i, i)) + } + + // Track that internal session TS is registered for GC blocking + internalTSRegistered := atomic.Bool{} + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/wrapInBeginRollbackStartTS", func(startTS uint64) { + if startTS > 0 { + internalTSRegistered.Store(true) + } + }) + + // Force sync mode to ensure we go through the checkpoint/TS path + oldForceSyncFlag := ingest.ForceSyncFlagForTest.Load() + ingest.ForceSyncFlagForTest.Store(true) + defer ingest.ForceSyncFlagForTest.Store(oldForceSyncFlag) + + tk.MustExec("alter table t add index idx(b);") + + // Assert that internal session TS was registered (this is what blocks GC) + require.True(t, internalTSRegistered.Load(), "internal session start TS should be registered for GC blocking") + + tk.MustExec("admin check table t;") + + // Verify data consistency + rs1 := tk.MustQuery("select count(*) from t use index(idx);").Rows() + rs2 := tk.MustQuery("select count(*) from t ignore index(idx);").Rows() + require.Equal(t, rs1[0][0], rs2[0][0]) + require.Equal(t, "100", rs1[0][0]) + + // Verify the job used ingest mode + rows := tk.MustQuery("admin show ddl jobs 1;").Rows() + require.Len(t, rows, 1) + jobTp := rows[0][12].(string) + require.True(t, strings.Contains(jobTp, "ingest"), jobTp) +} + +// TestIngestCancelCleanupOrder tests that canceling add index after ingest starts +// doesn't cause nil pointer panic due to incorrect cleanup order. +// This covers issues #43323 and #43326 (weak coverage). +// The bug was: during rollback, Close engine was called before cleaning up local path. 
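+//
+// Regardless of the exact teardown order, the test relies on two observable
+// outcomes after cancellation (paraphrased; function names are not asserted):
+//
+//	- the DDL exits with "Cancelled DDL job" and admin check still passes
+//	- the backend context is unregistered, i.e. ingest.LitDiskRoot.Count() == 0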
+func TestIngestCancelCleanupOrder(t *testing.T) { + if kerneltype.IsNextGen() { + t.Skip("DXF is always enabled on nextgen") + } + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists addindexlit;") + tk.MustExec("create database addindexlit;") + tk.MustExec("use addindexlit;") + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + tk.MustExec("set global tidb_enable_dist_task = off;") + + tk.MustExec("create table t (a int primary key, b int);") + for i := 0; i < 100; i++ { + tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i, i)) + } + + tkCancel := testkit.NewTestKit(t, store) + tkCancel.MustExec("use addindexlit;") + + // Cancel after backfill starts (WriteReorganization state with running backfill) + var jobID atomic.Int64 + backfillStarted := atomic.Bool{} + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeDeliveryJob", func(job *model.Job) { + if job.Type == model.ActionAddIndex { + jobID.Store(job.ID) + } + }) + cancelOnce := sync.Once{} + cancelIssued := atomic.Bool{} + cancelExecErrCh := make(chan error, 1) + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/ingest/beforeBackendIngest", func() { + backfillStarted.Store(true) + if jobID.Load() <= 0 { + return + } + cancelOnce.Do(func() { + cancelIssued.Store(true) + rs, err := tkCancel.Exec(fmt.Sprintf("admin cancel ddl jobs %d", jobID.Load())) + if rs != nil { + closeErr := rs.Close() + if err == nil { + err = closeErr + } + } + cancelExecErrCh <- err + }) + }) + + err := tk.ExecToErr("alter table t add index idx(b);") + require.Error(t, err) + require.ErrorContains(t, err, "Cancelled DDL job") + + // Key assertion: backfill actually started before cancel + require.True(t, backfillStarted.Load(), "backfill should have started before cancel") + require.True(t, cancelIssued.Load(), "cancel should be issued after ingest starts") + select { + case cancelErr := <-cancelExecErrCh: + require.NoError(t, cancelErr) + case <-time.After(5 * time.Second): + require.Fail(t, "cancel exec should have finished") + } + + // Verify no panic occurred and table is consistent + tk.MustExec("admin check table t;") + + // Verify backend context is cleaned up properly + cnt := ingest.LitDiskRoot.Count() + require.Equal(t, 0, cnt, "backend context should be cleaned up after cancel") +} + func TestMergeTempIndexSplitConflictTxn(t *testing.T) { if kerneltype.IsNextGen() { t.Skip("DXF is always enabled on nextgen")