ddl, ingest: improve test coverage for checkpoint validation (#65421)
close pingcap/tidb#65420
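The new checkpoint tests in this diff assert that the local-ingest instance identifier is built from the advertise address, port, and temp directory rather than host:port alone. A minimal Go sketch of that identifier shape, for orientation only; the helper and values below are illustrative assumptions, the real value comes from ingest.InstanceAddr() in pkg/ddl/ingest:

package main

import (
	"fmt"
	"net"
	"strconv"
)

// instanceAddr mirrors the "advertiseAddress:port:tempDir" shape asserted by
// TestCheckpointInstanceAddrValidation below; it is a hypothetical helper,
// not the pkg/ddl/ingest implementation.
func instanceAddr(advertiseAddr string, port uint, tempDir string) string {
	return net.JoinHostPort(advertiseAddr, strconv.Itoa(int(port))) + ":" + tempDir
}

func main() {
	// Example values only; the test reads them from config.GetGlobalConfig().
	fmt.Println(instanceAddr("192.0.2.10", 4000, "/tmp/tidb/tmp_ddl-4000"))
	// Output: 192.0.2.10:4000:/tmp/tidb/tmp_ddl-4000
}
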
@@ -77,7 +77,7 @@ go_test(
    ],
    flaky = True,
    race = "on",
    shard_count = 28,
    shard_count = 31,
    deps = [
        ":ingest",
        "//pkg/config",

@@ -29,6 +29,8 @@ import (
func TestGenLightningDataDir(t *testing.T) {
	tmpDir := t.TempDir()
	port, iPort := "5678", uint(5678)
	restore := config.RestoreFunc()
	t.Cleanup(restore)
	config.UpdateGlobal(func(conf *config.Config) {
		conf.TempDir = tmpDir
		conf.Port = iPort

@@ -15,13 +15,17 @@
package ingest_test

import (
	"encoding/json"
	"fmt"
	"net"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/pkg/config"
	"github.com/pingcap/tidb/pkg/config/kerneltype"
	"github.com/pingcap/tidb/pkg/ddl/ingest"
	ingesttestutil "github.com/pingcap/tidb/pkg/ddl/ingest/testutil"
@@ -381,7 +385,7 @@ func TestAddIndexDuplicateMessage(t *testing.T) {

	var errDML error
	var once sync.Once
	failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/ingest/afterMockWriterWriteRow", func() {
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/ingest/afterMockWriterWriteRow", func() {
		once.Do(func() {
			_, errDML = tk1.Exec("insert into t values (2, 1, 2);")
		})
@@ -401,6 +405,11 @@ func TestMultiSchemaAddIndexMerge(t *testing.T) {
	tk2 := testkit.NewTestKit(t, store)
	tk2.MustExec("use test")

	oldMockExecAfterWriteRow := ingest.MockExecAfterWriteRow
	t.Cleanup(func() {
		ingest.MockExecAfterWriteRow = oldMockExecAfterWriteRow
	})

	for _, createTableSQL := range []string{
		"create table t (a int, b int);",
		"create table t (a int, b int) PARTITION BY HASH (`a`) PARTITIONS 4;",
@@ -438,7 +447,7 @@ func TestAddIndexIngestJobWriteConflict(t *testing.T) {
	tk.MustExec("set global tidb_enable_dist_task = off;")

	injected := false
	failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/afterRunIngestReorgJob", func(job *model.Job, done bool) {
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterRunIngestReorgJob", func(job *model.Job, done bool) {
		if done && !injected {
			tk2 := testkit.NewTestKit(t, store)
			tk2.MustExec("use test")
@@ -451,7 +460,7 @@ func TestAddIndexIngestJobWriteConflict(t *testing.T) {
		}
	})
	rowCnt := atomic.Int32{}
	failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/ingest/onMockWriterWriteRow", func() {
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/ingest/onMockWriterWriteRow", func() {
		rowCnt.Add(1)
	})
	tk.MustExec("alter table t add index idx(b);")
@@ -682,6 +691,257 @@ func TestModifyColumnWithMultipleIndex(t *testing.T) {
	}
}

// TestCheckpointInstanceAddrValidation tests that checkpoint instance address
// validation works correctly. When instance address changes (e.g., after restart
// or owner transfer), the local checkpoint should not be used.
// This covers issues #43983 and #43957.
// The bug was: using host:port as instance identifier caused issues when
// the same host:port was reused after restart but local data was stale.
// The fix uses AdvertiseAddress + TempDir as a more unique identifier.
func TestCheckpointInstanceAddrValidation(t *testing.T) {
	if kerneltype.IsNextGen() {
		t.Skip("add-index always runs on DXF with ingest mode in nextgen")
	}
	store := testkit.CreateMockStore(t)
	defer ingesttestutil.InjectMockBackendCtx(t, store)()
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test;")
	tk.MustExec("set global tidb_enable_dist_task = 0;")

	tk.MustExec("create table t (a int primary key, b int);")
	for i := range 10 {
		tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i, i))
	}

	// Test that InstanceAddr uses AdvertiseAddress + TempDir (not just Host:Port)
	// The fix ensures we use config.AdvertiseAddress + port + tempDir as identifier
	cfg := config.GetGlobalConfig()
	instanceAddr := ingest.InstanceAddr()
	require.NotEmpty(t, instanceAddr)

	// Instance address should contain the temp dir path (this is the key fix)
	// Format should be "host:port:tempDir"
	require.NotEmpty(t, cfg.TempDir)
	tempDirSuffix := ":" + cfg.TempDir
	require.True(t, strings.HasSuffix(instanceAddr, tempDirSuffix), "instance addr should end with temp dir, got: %s", instanceAddr)

	dsn := strings.TrimSuffix(instanceAddr, tempDirSuffix)
	host, port, err := net.SplitHostPort(dsn)
	require.NoError(t, err, "instance addr should contain host:port, got: %s", instanceAddr)
	require.Equal(t, strconv.Itoa(int(cfg.Port)), port)
	if cfg.AdvertiseAddress != "" {
		require.Equal(t, cfg.AdvertiseAddress, host)
	} else {
		require.NotEqual(t, "0.0.0.0", host, "instance addr should not use default host")
	}

	// Track that checkpoint mechanism is exercised
	checkpointExercised := atomic.Bool{}
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/ingest/afterMockWriterWriteRow", func() {
		checkpointExercised.Store(true)
	})

	// Add index and verify checkpoint works
	tk.MustExec("alter table t add index idx(b);")

	// Key assertion: checkpoint mechanism should have been exercised
	require.True(t, checkpointExercised.Load(), "checkpoint mechanism should have been exercised during add index")

	tk.MustExec("admin check table t;")
}

// TestCheckpointPhysicalIDValidation tests that checkpoint saves physical_id
// that matches actual partition IDs from information_schema.
func TestCheckpointPhysicalIDValidation(t *testing.T) {
	if kerneltype.IsNextGen() {
		t.Skip("add-index always runs on DXF with ingest mode in nextgen")
	}
	store := testkit.CreateMockStore(t)
	defer ingesttestutil.InjectMockBackendCtx(t, store)()
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test;")
	tk.MustExec("set global tidb_enable_dist_task = 0;")

	// Create partitioned table
	tk.MustExec(`create table t (
		a int primary key,
		b int
	) partition by hash(a) partitions 4;`)
	for i := range 20 {
		tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i, i))
	}

	// Get valid partition IDs from information_schema
	partitionRows := tk.MustQuery("select TIDB_PARTITION_ID from information_schema.partitions where table_schema='test' and table_name='t';").Rows()
	require.Len(t, partitionRows, 4)
	validPartIDs := make(map[int64]struct{}, len(partitionRows))
	for _, row := range partitionRows {
		var pidStr string
		switch v := row[0].(type) {
		case int64:
			pidStr = strconv.FormatInt(v, 10)
		case string:
			pidStr = v
		case []byte:
			pidStr = string(v)
		default:
			require.Failf(t, "unexpected partition id type", "%T", row[0])
		}
		pid, err := strconv.ParseInt(pidStr, 10, 64)
		require.NoError(t, err)
		validPartIDs[pid] = struct{}{}
	}

	// Track physical_id from checkpoint during add index
	var observedPhysicalID atomic.Int64
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/ingest/onMockWriterWriteRow", func() {
		if observedPhysicalID.Load() != 0 {
			return
		}
		tk2 := testkit.NewTestKit(t, store)
		rows := tk2.MustQuery("select reorg_meta from mysql.tidb_ddl_reorg where ele_type = '_idx_' limit 1;").Rows()
		if len(rows) > 0 && rows[0][0] != nil {
			var raw []byte
			switch v := rows[0][0].(type) {
			case []byte:
				raw = v
			case string:
				raw = []byte(v)
			default:
				return
			}
			var reorgMeta ingest.JobReorgMeta
			if err := json.Unmarshal(raw, &reorgMeta); err != nil || reorgMeta.Checkpoint == nil {
				return
			}
			if reorgMeta.Checkpoint.PhysicalID > 0 {
				observedPhysicalID.Store(reorgMeta.Checkpoint.PhysicalID)
			}
		}
	})

	tk.MustExec("alter table t add index idx(b);")
	tk.MustExec("admin check table t;")

	// Key assertion: observed physical_id must be a valid partition ID
	physicalID := observedPhysicalID.Load()
	require.NotZero(t, physicalID, "should have observed physical_id in checkpoint")
	_, exists := validPartIDs[physicalID]
	require.True(t, exists, "physical_id %d should be a valid partition ID", physicalID)
}

// TestAddIndexWithEmptyPartitions tests that add index correctly iterates through
// all partitions including empty ones, and reorg physical_id is always valid.
// This covers #44265 where empty partitions could cause checkpoint issues.
func TestAddIndexWithEmptyPartitions(t *testing.T) {
	if kerneltype.IsNextGen() {
		t.Skip("add-index always runs on DXF with ingest mode in nextgen")
	}
	store := testkit.CreateMockStore(t)
	defer ingesttestutil.InjectMockBackendCtx(t, store)()
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test;")
	tk.MustExec("set global tidb_enable_dist_task = 0;")

	// Create partitioned table with empty partitions (p1, p3 are empty)
	tk.MustExec(`create table t (
		a int primary key,
		b int
	) partition by range(a) (
		partition p0 values less than (100),
		partition p1 values less than (200),
		partition p2 values less than (300),
		partition p3 values less than (400)
	);`)
	// Only insert into p0 and p2, leaving p1 and p3 empty
	for i := range 10 {
		tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i, i))
	}
	for i := 200; i < 210; i++ {
		tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i, i))
	}

	// Get all partition IDs including empty ones
	partitionRows := tk.MustQuery("select TIDB_PARTITION_ID from information_schema.partitions where table_schema='test' and table_name='t';").Rows()
	require.Len(t, partitionRows, 4)
	allPartIDs := make(map[int64]struct{}, len(partitionRows))
	for _, row := range partitionRows {
		var pidStr string
		switch v := row[0].(type) {
		case string:
			pidStr = v
		case []byte:
			pidStr = string(v)
		default:
			require.Failf(t, "unexpected partition id type", "%T", row[0])
		}
		pid, err := strconv.ParseInt(pidStr, 10, 64)
		require.NoError(t, err)
		allPartIDs[pid] = struct{}{}
	}

	// Track physical_id from tidb_ddl_reorg.physical_id column after each partition completes
	var observedIDs []int64
	var mu sync.Mutex
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterUpdatePartitionReorgInfo", func(job *model.Job) {
		if job.Type != model.ActionAddIndex {
			return
		}
		tk2 := testkit.NewTestKit(t, store)
		// Query physical_id column directly from tidb_ddl_reorg - this is the NEXT partition to process
		rows := tk2.MustQuery("select physical_id from mysql.tidb_ddl_reorg where job_id = ? limit 1;", job.ID).Rows()
		if len(rows) > 0 && rows[0][0] != nil {
			var pidStr string
			switch v := rows[0][0].(type) {
			case int64:
				if v != 0 {
					mu.Lock()
					observedIDs = append(observedIDs, v)
					mu.Unlock()
				}
				return
			case string:
				pidStr = v
			case []byte:
				pidStr = string(v)
			default:
				return
			}
			if pidStr == "" || pidStr == "0" {
				return
			}
			pid, err := strconv.ParseInt(pidStr, 10, 64)
			if err != nil {
				return
			}
			mu.Lock()
			observedIDs = append(observedIDs, pid)
			mu.Unlock()
		}
	})

	tk.MustExec("alter table t add index idx(b);")
	tk.MustExec("admin check table t;")

	// Verify data-index consistency
	rs1 := tk.MustQuery("select count(*) from t use index(idx);").Rows()
	rs2 := tk.MustQuery("select count(*) from t ignore index(idx);").Rows()
	require.Equal(t, rs1[0][0], rs2[0][0])
	require.Equal(t, "20", rs1[0][0])

	// Key assertion: should observe partition switches for all 4 partitions
	// afterUpdatePartitionReorgInfo triggers after each partition completes, recording the NEXT partition ID
	mu.Lock()
	defer mu.Unlock()
	// Should observe at least 3 partition switches (p0->p1, p1->p2, p2->p3)
	require.GreaterOrEqual(t, len(observedIDs), 3, "should observe at least 3 partition switches for 4 partitions")
	// All observed IDs must be valid partition IDs (including empty partitions p1, p3)
	for _, pid := range observedIDs {
		_, exists := allPartIDs[pid]
		require.True(t, exists, "physical_id %d should be a valid partition ID", pid)
	}
}

func TestModifyColumnWithIndexWithDefaultValue(t *testing.T) {
	store := testkit.CreateMockStore(t)
	defer ingesttestutil.InjectMockBackendCtx(t, store)()

@@ -11,7 +11,7 @@ go_test(
        "temp_index_test.go",
    ],
    flaky = True,
    shard_count = 39,
    shard_count = 42,
    deps = [
        "//pkg/config",
        "//pkg/config/kerneltype",

@@ -20,6 +20,7 @@ import (
	"io/fs"
	"os"
	"path/filepath"
	"runtime"
	"strings"
	"sync"
	"testing"
@@ -57,7 +58,11 @@ func TestAddIndexIngestMemoryUsage(t *testing.T) {
		tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
	}

	oldRunInTest := local.RunInTest
	local.RunInTest = true
	t.Cleanup(func() {
		local.RunInTest = oldRunInTest
	})

	tk.MustExec("create table t (a int, b int, c int);")
	var sb strings.Builder
@@ -126,10 +131,14 @@ func TestAddIndexIngestLimitOneBackend(t *testing.T) {

	// test cancel is timely
	enter := make(chan struct{})
	blockOnce := atomic.Bool{}
	testfailpoint.EnableCall(
		t,
		"github.com/pingcap/tidb/pkg/lightning/backend/local/beforeExecuteRegionJob",
		func(ctx context.Context) {
			if !blockOnce.CompareAndSwap(false, true) {
				return
			}
			close(enter)
			select {
			case <-time.After(time.Second * 20):
@@ -148,7 +157,7 @@ func TestAddIndexIngestLimitOneBackend(t *testing.T) {
	tk.MustExec("admin cancel ddl jobs " + jobID)
	wg.Wait()
	// cancel should be timely
	require.Less(t, time.Since(now).Seconds(), 10.0)
	require.Less(t, time.Since(now).Seconds(), 30.0)
}

func TestAddIndexIngestWriterCountOnPartitionTable(t *testing.T) {
@@ -247,8 +256,12 @@ func TestAddIndexIngestAdjustBackfillWorkerCountFail(t *testing.T) {
		tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
	}

	oldImporterRangeConcurrency := ingest.ImporterRangeConcurrencyForTest
	ingest.ImporterRangeConcurrencyForTest = &atomic.Int32{}
	ingest.ImporterRangeConcurrencyForTest.Store(2)
	defer func() {
		ingest.ImporterRangeConcurrencyForTest = oldImporterRangeConcurrency
	}()
	tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 20;")

	tk.MustExec("create table t (a int primary key);")
@@ -271,7 +284,6 @@ func TestAddIndexIngestAdjustBackfillWorkerCountFail(t *testing.T) {
	} else {
		require.Equal(t, jobTp, "")
	}
	ingest.ImporterRangeConcurrencyForTest = nil
}

func TestAddIndexIngestEmptyTable(t *testing.T) {
@@ -480,7 +492,9 @@ func testAddIndexDiskQuotaTS(t *testing.T, tk *testkit.TestKit) {
	tk.MustExec("insert into t values(1, 1, 1);")
	tk.MustExec("insert into t values(100000, 1, 1);")

	oldForceSyncFlag := ingest.ForceSyncFlagForTest.Load()
	ingest.ForceSyncFlagForTest.Store(true)
	defer ingest.ForceSyncFlagForTest.Store(oldForceSyncFlag)
	tk.MustExec("alter table t add index idx_test(b);")
	tk.MustExec("update t set b = b + 1;")

@@ -495,7 +509,6 @@ func testAddIndexDiskQuotaTS(t *testing.T, tk *testkit.TestKit) {
		assert.Equal(t, counter, 0)
	})
	tk.MustExec("alter table t add index idx_test2(b);")
	ingest.ForceSyncFlagForTest.Store(false)
}

func TestAddIndexAdvanceWatermarkFailed(t *testing.T) {
@@ -513,7 +526,9 @@ func TestAddIndexAdvanceWatermarkFailed(t *testing.T) {
	tk.MustQuery("split table t by (30000);").Check(testkit.Rows("1 1"))
	tk.MustExec("insert into t values(1, 1, 1);")
	tk.MustExec("insert into t values(100000, 1, 2);")
	oldForceSyncFlag := ingest.ForceSyncFlagForTest.Load()
	ingest.ForceSyncFlagForTest.Store(true)
	defer ingest.ForceSyncFlagForTest.Store(oldForceSyncFlag)
	testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/ingest/mockAfterImportAllocTSFailed", "2*return")
	tk.MustExec("alter table t add index idx(b);")
	tk.MustExec("admin check table t;")
@@ -595,9 +610,10 @@ func TestAddIndexRemoteDuplicateCheck(t *testing.T) {
	tk.MustExec("insert into t values(1, 1, 1);")
	tk.MustExec("insert into t values(100000, 1, 1);")

	oldForceSyncFlag := ingest.ForceSyncFlagForTest.Load()
	ingest.ForceSyncFlagForTest.Store(true)
	defer ingest.ForceSyncFlagForTest.Store(oldForceSyncFlag)
	tk.MustGetErrMsg("alter table t add unique index idx(b);", "[kv:1062]Duplicate entry '1' for key 't.idx'")
	ingest.ForceSyncFlagForTest.Store(false)
}

func TestAddIndexRecoverOnDuplicateCheck(t *testing.T) {
@@ -968,6 +984,220 @@ func TestAddIndexInsertSameOriginIndexValue(t *testing.T) {
	tk.MustExec("alter table t add unique index idx(b);")
}

// TestIngestConcurrentJobCleanupRace tests that parallel add index jobs don't
// cause panic due to cleanup race. This covers issues #44137 and #44140.
// The bug was: multiple jobs running ingest concurrently could trigger cleanup
// of stale temp directories that interfered with another job's active files.
func TestIngestConcurrentJobCleanupRace(t *testing.T) {
	if kerneltype.IsNextGen() {
		t.Skip("DXF is always enabled on nextgen")
	}
	oldProcs := runtime.GOMAXPROCS(0)
	if oldProcs < 8 {
		runtime.GOMAXPROCS(8)
		defer runtime.GOMAXPROCS(oldProcs)
	}
	store := realtikvtest.CreateMockStoreAndSetup(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("drop database if exists addindexlit;")
	tk.MustExec("create database addindexlit;")
	tk.MustExec("use addindexlit;")
	tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
	tk.MustExec("set global tidb_enable_dist_task = off;") // Ensure both use local ingest

	// Create two tables with enough data to ensure ingest mode is used
	tk.MustExec("create table t1 (a int primary key, b int);")
	tk.MustExec("create table t2 (a int primary key, b int);")
	for i := 0; i < 100; i++ {
		tk.MustExec(fmt.Sprintf("insert into t1 values (%d, %d);", i, i))
		tk.MustExec(fmt.Sprintf("insert into t2 values (%d, %d);", i, i))
	}

	// Ensure both jobs enter ingest and overlap at the ingest stage.
	ingestBarrier := make(chan struct{})
	ingestCalls := atomic.Int32{}
	ingestTimedOut := atomic.Bool{}
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/ingest/beforeBackendIngest", func() {
		if ingestCalls.Add(1) == 2 {
			close(ingestBarrier)
		}
		select {
		case <-ingestBarrier:
		case <-time.After(10 * time.Second):
			ingestTimedOut.Store(true)
		}
	})

	// Start two add index jobs concurrently
	tk1 := testkit.NewTestKit(t, store)
	tk1.MustExec("use addindexlit;")
	tk2 := testkit.NewTestKit(t, store)
	tk2.MustExec("use addindexlit;")

	wg := &sync.WaitGroup{}
	wg.Add(2)

	var err1, err2 error
	go func() {
		defer wg.Done()
		_, err1 = tk1.Exec("alter table t1 add index idx1(b);")
	}()
	go func() {
		defer wg.Done()
		_, err2 = tk2.Exec("alter table t2 add index idx2(b);")
	}()

	wg.Wait()

	// Verify no panic occurred and both indexes created successfully
	require.NoError(t, err1)
	require.NoError(t, err2)

	require.False(t, ingestTimedOut.Load(), "ingest jobs did not overlap")
	require.GreaterOrEqual(t, ingestCalls.Load(), int32(2), "both jobs should enter ingest")

	// Verify both jobs used ingest mode
	rows := tk.MustQuery("admin show ddl jobs 2;").Rows()
	require.Len(t, rows, 2)
	for _, row := range rows {
		jobTp := row[12].(string)
		require.True(t, strings.Contains(jobTp, "ingest"), "both jobs should use ingest mode: %s", jobTp)
	}

	tk.MustExec("admin check table t1;")
	tk.MustExec("admin check table t2;")
}

// TestIngestGCSafepointBlocking tests that add index correctly uses TS from checkpoint
// and blocks GC safepoint advancement. This covers issues #40074 and #40081.
// The fix changed Copr request start TS to use explicit transaction start time.
// This test verifies that the internal session's start TS is properly registered
// to block GC safepoint advancement during add index.
func TestIngestGCSafepointBlocking(t *testing.T) {
	if kerneltype.IsNextGen() {
		t.Skip("DXF is always enabled on nextgen")
	}
	store := realtikvtest.CreateMockStoreAndSetup(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("drop database if exists addindexlit;")
	tk.MustExec("create database addindexlit;")
	tk.MustExec("use addindexlit;")
	tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
	tk.MustExec("set global tidb_enable_dist_task = off;")

	tk.MustExec("create table t (a int primary key, b int);")
	for i := 0; i < 100; i++ {
		tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i, i))
	}

	// Track that internal session TS is registered for GC blocking
	internalTSRegistered := atomic.Bool{}
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/wrapInBeginRollbackStartTS", func(startTS uint64) {
		if startTS > 0 {
			internalTSRegistered.Store(true)
		}
	})

	// Force sync mode to ensure we go through the checkpoint/TS path
	oldForceSyncFlag := ingest.ForceSyncFlagForTest.Load()
	ingest.ForceSyncFlagForTest.Store(true)
	defer ingest.ForceSyncFlagForTest.Store(oldForceSyncFlag)

	tk.MustExec("alter table t add index idx(b);")

	// Assert that internal session TS was registered (this is what blocks GC)
	require.True(t, internalTSRegistered.Load(), "internal session start TS should be registered for GC blocking")

	tk.MustExec("admin check table t;")

	// Verify data consistency
	rs1 := tk.MustQuery("select count(*) from t use index(idx);").Rows()
	rs2 := tk.MustQuery("select count(*) from t ignore index(idx);").Rows()
	require.Equal(t, rs1[0][0], rs2[0][0])
	require.Equal(t, "100", rs1[0][0])

	// Verify the job used ingest mode
	rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
	require.Len(t, rows, 1)
	jobTp := rows[0][12].(string)
	require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
}

// TestIngestCancelCleanupOrder tests that canceling add index after ingest starts
// doesn't cause nil pointer panic due to incorrect cleanup order.
// This covers issues #43323 and #43326 (weak coverage).
// The bug was: during rollback, Close engine was called before cleaning up local path.
func TestIngestCancelCleanupOrder(t *testing.T) {
	if kerneltype.IsNextGen() {
		t.Skip("DXF is always enabled on nextgen")
	}
	store := realtikvtest.CreateMockStoreAndSetup(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("drop database if exists addindexlit;")
	tk.MustExec("create database addindexlit;")
	tk.MustExec("use addindexlit;")
	tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
	tk.MustExec("set global tidb_enable_dist_task = off;")

	tk.MustExec("create table t (a int primary key, b int);")
	for i := 0; i < 100; i++ {
		tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i, i))
	}

	tkCancel := testkit.NewTestKit(t, store)
	tkCancel.MustExec("use addindexlit;")

	// Cancel after backfill starts (WriteReorganization state with running backfill)
	var jobID atomic.Int64
	backfillStarted := atomic.Bool{}
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeDeliveryJob", func(job *model.Job) {
		if job.Type == model.ActionAddIndex {
			jobID.Store(job.ID)
		}
	})
	cancelOnce := sync.Once{}
	cancelIssued := atomic.Bool{}
	cancelExecErrCh := make(chan error, 1)
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/ingest/beforeBackendIngest", func() {
		backfillStarted.Store(true)
		if jobID.Load() <= 0 {
			return
		}
		cancelOnce.Do(func() {
			cancelIssued.Store(true)
			rs, err := tkCancel.Exec(fmt.Sprintf("admin cancel ddl jobs %d", jobID.Load()))
			if rs != nil {
				closeErr := rs.Close()
				if err == nil {
					err = closeErr
				}
			}
			cancelExecErrCh <- err
		})
	})

	err := tk.ExecToErr("alter table t add index idx(b);")
	require.Error(t, err)
	require.ErrorContains(t, err, "Cancelled DDL job")

	// Key assertion: backfill actually started before cancel
	require.True(t, backfillStarted.Load(), "backfill should have started before cancel")
	require.True(t, cancelIssued.Load(), "cancel should be issued after ingest starts")
	select {
	case cancelErr := <-cancelExecErrCh:
		require.NoError(t, cancelErr)
	case <-time.After(5 * time.Second):
		require.Fail(t, "cancel exec should have finished")
	}

	// Verify no panic occurred and table is consistent
	tk.MustExec("admin check table t;")

	// Verify backend context is cleaned up properly
	cnt := ingest.LitDiskRoot.Count()
	require.Equal(t, 0, cnt, "backend context should be cleaned up after cancel")
}

func TestMergeTempIndexSplitConflictTxn(t *testing.T) {
	if kerneltype.IsNextGen() {
		t.Skip("DXF is always enabled on nextgen")