diff --git a/pkg/ddl/attributes_sql_test.go b/pkg/ddl/attributes_sql_test.go index b5215b9b6e..8449a1487a 100644 --- a/pkg/ddl/attributes_sql_test.go +++ b/pkg/ddl/attributes_sql_test.go @@ -252,7 +252,7 @@ PARTITION BY RANGE (c) ( func TestFlashbackTable(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true, dom.InfoCache()) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -310,7 +310,7 @@ PARTITION BY RANGE (c) ( func TestDropTable(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true, dom.InfoCache()) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -363,7 +363,7 @@ PARTITION BY RANGE (c) ( func TestCreateWithSameName(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true, dom.InfoCache()) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -427,7 +427,7 @@ PARTITION BY RANGE (c) ( func TestPartition(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true, dom.InfoCache()) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") diff --git a/pkg/ddl/main_test.go b/pkg/ddl/main_test.go index 75ffa1cd3f..e741f53bf7 100644 --- a/pkg/ddl/main_test.go +++ b/pkg/ddl/main_test.go @@ -53,7 +53,7 @@ func TestMain(m *testing.M) { conf.Experimental.AllowsExpressionIndex = true }) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true, nil) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err) os.Exit(1) diff --git a/pkg/ddl/tests/serial/main_test.go b/pkg/ddl/tests/serial/main_test.go index 6609434329..ed48da9f0d 100644 --- a/pkg/ddl/tests/serial/main_test.go +++ b/pkg/ddl/tests/serial/main_test.go @@ -57,7 +57,7 @@ func TestMain(m *testing.M) { conf.Experimental.AllowsExpressionIndex = true }) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true, nil) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err) os.Exit(1) diff --git a/pkg/domain/db_test.go b/pkg/domain/db_test.go index 86bedb2513..9b083f062b 100644 --- a/pkg/domain/db_test.go +++ b/pkg/domain/db_test.go @@ -80,7 +80,7 @@ func TestNormalSessionPool(t *testing.T) { domain, err := session.BootstrapSession(store) require.NoError(t, err) defer domain.Close() - info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true) + info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true, domain.InfoCache()) require.NoError(t, err1) conf := config.GetGlobalConfig() conf.Socket = "" @@ -113,7 +113,7 @@ func TestAbnormalSessionPool(t *testing.T) { domain, err := session.BootstrapSession(store) require.NoError(t, err) defer domain.Close() - info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true) + info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true, domain.InfoCache()) require.NoError(t, err1) conf := config.GetGlobalConfig() conf.Socket = "" diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 4b14d1ae8b..3de1a7d839 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -1405,7 +1405,7 @@ func (do *Domain) Init( skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, do.unprefixedEtcdCli, pdCli, pdHTTPCli, - do.Store().GetCodec(), skipRegisterToDashboard) + do.Store().GetCodec(), skipRegisterToDashboard, do.infoCache) if err != nil { return err } diff --git a/pkg/domain/infosync/info.go b/pkg/domain/infosync/info.go index 26457b5dca..e0f25e8c3c 100644 --- a/pkg/domain/infosync/info.go +++ b/pkg/domain/infosync/info.go @@ -126,6 +126,7 @@ type InfoSyncer struct { scheduleManager ScheduleManager tiflashReplicaManager TiFlashReplicaManager resourceManagerClient pd.ResourceManagerClient + infoCache infoschemaMinTS } // ServerInfo is server static information. @@ -202,6 +203,10 @@ func SetPDHttpCliForTest(cli pdhttp.Client) func() { } } +type infoschemaMinTS interface { + GetAndResetRecentInfoSchemaTS(now uint64) uint64 +} + // GlobalInfoSyncerInit return a new InfoSyncer. It is exported for testing. func GlobalInfoSyncerInit( ctx context.Context, @@ -211,6 +216,7 @@ func GlobalInfoSyncerInit( pdCli pd.Client, pdHTTPCli pdhttp.Client, codec tikv.Codec, skipRegisterToDashBoard bool, + infoCache infoschemaMinTS, ) (*InfoSyncer, error) { if pdHTTPCli != nil { pdHTTPCli = pdHTTPCli. @@ -224,6 +230,7 @@ func GlobalInfoSyncerInit( info: getServerInfo(id, serverIDGetter), serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id), minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id), + infoCache: infoCache, } err := is.init(ctx, skipRegisterToDashBoard) if err != nil { @@ -801,6 +808,14 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) { } } + if is.infoCache != nil { + schemaTS := is.infoCache.GetAndResetRecentInfoSchemaTS(currentVer.Ver) + logutil.BgLogger().Debug("ReportMinStartTS", zap.Uint64("InfoSchema Recent StartTS", schemaTS)) + if schemaTS > startTSLowerLimit && schemaTS < minStartTS { + minStartTS = schemaTS + } + } + is.minStartTS = kv.GetMinInnerTxnStartTS(now, startTSLowerLimit, minStartTS) err = is.storeMinStartTS(context.Background()) diff --git a/pkg/domain/infosync/info_test.go b/pkg/domain/infosync/info_test.go index f8bf26dbb7..3f85c8289c 100644 --- a/pkg/domain/infosync/info_test.go +++ b/pkg/domain/infosync/info_test.go @@ -71,7 +71,7 @@ func TestTopology(t *testing.T) { require.NoError(t, err) }() - info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, nil, nil, keyspace.CodecV1, false) + info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, nil, nil, keyspace.CodecV1, false, nil) require.NoError(t, err) err = info.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt) @@ -156,7 +156,7 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) { } func TestPutBundlesRetry(t *testing.T) { - _, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, false) + _, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, false, nil) require.NoError(t, err) bundle, err := placement.NewBundleFromOptions(&model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"}) @@ -220,7 +220,7 @@ func TestPutBundlesRetry(t *testing.T) { func TestTiFlashManager(t *testing.T) { ctx := context.Background() - _, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, false) + _, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, false, nil) tiflash := NewMockTiFlash() SetMockTiFlash(tiflash) diff --git a/pkg/infoschema/cache.go b/pkg/infoschema/cache.go index a58dca8c2e..1285b4f19f 100644 --- a/pkg/infoschema/cache.go +++ b/pkg/infoschema/cache.go @@ -59,6 +59,25 @@ func NewCache(r autoid.Requirement, capacity int) *InfoCache { } } +// GetAndResetRecentInfoSchemaTS provides the min start ts for infosync.InfoSyncer. +// It works like this: +// +// There is a background infosync worker calling ReportMinStartTS() function periodically. +// At the beginning of each round, the Data.recentMinTS here is reset to current TS. +// If InfoSchemaV2 APIs are called, there is an internal keepAlive() function will also be called. +// The keepAlive() function will compare the InfoSchemaV2's ts with Data.recentMinTS, and +// update the Data.recentMinTS to smaller one. +// +// In a nutshell, every round of ReportMinStartTS(), the minimal known TS used be InfoSchemaV2 APIs will be reported. +// Some corner cases might happen: the caller take an InfoSchemaV2 instance and not use it immediately. +// Seveval rounds later, that InfoSchema is used and its TS is reported to block GC safepoint advancing. +// But that's too late, the GC has been done, "GC life time is shorter than transaction duration" error still happen. +func (h *InfoCache) GetAndResetRecentInfoSchemaTS(now uint64) uint64 { + ret := h.Data.recentMinTS.Load() + h.Data.recentMinTS.Store(now) + return ret +} + // ReSize re-size the cache. func (h *InfoCache) ReSize(capacity int) { h.mu.Lock() diff --git a/pkg/infoschema/infoschema_v2.go b/pkg/infoschema/infoschema_v2.go index 1b7f254d42..f4c6efb79f 100644 --- a/pkg/infoschema/infoschema_v2.go +++ b/pkg/infoschema/infoschema_v2.go @@ -20,6 +20,7 @@ import ( "math" "strings" "sync" + "sync/atomic" "time" "github.com/ngaut/pools" @@ -122,6 +123,9 @@ type Data struct { // TTLInfo, TiFlashReplica // PlacementPolicyRef, Partition might be added later, and also ForeignKeys, TableLock etc tableInfoResident *btree.BTreeG[tableInfoItem] + + // the minimum ts of the recent used infoschema + recentMinTS atomic.Uint64 } type tableInfoItem struct { @@ -608,6 +612,7 @@ func (is *infoschemaV2) TableByID(ctx context.Context, id int64) (val table.Tabl return } + is.keepAlive() itm, ok := is.searchTableItemByID(id) if !ok { return nil, false @@ -769,8 +774,8 @@ func (is *infoschemaV2) TableByName(ctx context.Context, schema, tbl pmodel.CISt return nil, ErrTableNotExists.FastGenByArgs(schema, tbl) } + is.keepAlive() start := time.Now() - var h tableByNameHelper h.end = tableItem{dbName: schema, tableName: tbl, schemaVersion: math.MaxInt64} h.schemaVersion = is.infoSchema.schemaMetaVersion @@ -819,6 +824,22 @@ func (is *infoschemaV2) TableInfoByID(id int64) (*model.TableInfo, bool) { return getTableInfo(tbl), ok } +// keepAlive prevents the "GC life time is shorter than transaction duration" error on infoschema v2. +// It works by collecting the min TS of the during infoschem v2 API calls, and +// reports the min TS to info.InfoSyncer. +func (is *infoschemaV2) keepAlive() { + for { + v := is.Data.recentMinTS.Load() + if v <= is.ts { + break + } + succ := is.Data.recentMinTS.CompareAndSwap(v, is.ts) + if succ { + break + } + } +} + // SchemaTableInfos implements MetaOnlyInfoSchema. func (is *infoschemaV2) SchemaTableInfos(ctx context.Context, schema pmodel.CIStr) ([]*model.TableInfo, error) { if IsSpecialDB(schema.L) { @@ -834,6 +855,7 @@ func (is *infoschemaV2) SchemaTableInfos(ctx context.Context, schema pmodel.CISt return nil, nil // something wrong? } + is.keepAlive() retry: dbInfo, ok := is.SchemaByName(schema) if !ok { diff --git a/pkg/infoschema/test/infoschemav2test/BUILD.bazel b/pkg/infoschema/test/infoschemav2test/BUILD.bazel index ebfa163c61..584c70f93a 100644 --- a/pkg/infoschema/test/infoschemav2test/BUILD.bazel +++ b/pkg/infoschema/test/infoschemav2test/BUILD.bazel @@ -8,7 +8,7 @@ go_test( "v2_test.go", ], flaky = True, - shard_count = 12, + shard_count = 13, deps = [ "//pkg/domain", "//pkg/domain/infosync", diff --git a/pkg/infoschema/test/infoschemav2test/v2_test.go b/pkg/infoschema/test/infoschemav2test/v2_test.go index 132e2d0642..f476a54ba3 100644 --- a/pkg/infoschema/test/infoschemav2test/v2_test.go +++ b/pkg/infoschema/test/infoschemav2test/v2_test.go @@ -17,6 +17,7 @@ package infoschemav2test import ( "context" "fmt" + "math" "slices" "sort" "strconv" @@ -558,3 +559,54 @@ func TestInfoSchemaCachedAutoIncrement(t *testing.T) { tk.MustExec("drop table t1;") // trigger infoschema cache reload tk.MustQuery(autoIncQuery).Check(testkit.Rows("0")) } + +func TestGetAndResetRecentInfoSchemaTS(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + + // For mocktikv, safe point is not initialized, we manually insert it for snapshot to use. + timeSafe := time.Now().Add(-48 * 60 * 60 * time.Second).Format("20060102-15:04:05 -0700 MST") + safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '') + ON DUPLICATE KEY + UPDATE variable_value = '%[1]s'` + tk.MustExec(fmt.Sprintf(safePointSQL, timeSafe)) + + tk.MustExec("use test") + infoCache := dom.InfoCache() + schemaTS1 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64) + + // After some DDL changes + tk.MustExec("create table dummytbl (id int)") + schemaTS2 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64) + require.LessOrEqual(t, schemaTS1, schemaTS2) + + ts, err := store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + tk.MustExec("alter table dummytbl add column (c int)") + schemaTS3 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64) + require.LessOrEqual(t, schemaTS2, schemaTS3) + + tk.MustExec("alter table dummytbl add index idx(c)") + schemaTS4 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64) + require.LessOrEqual(t, schemaTS3, schemaTS4) + + // Reload several times + require.NoError(t, dom.Reload()) + schemaTS5 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64) + require.Equal(t, uint64(math.MaxUint64), schemaTS5) + + require.NoError(t, dom.Reload()) + schemaTS6 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64) + require.Equal(t, uint64(math.MaxUint64), schemaTS6) + + tk.MustQuery("select * from dummytbl").Check(testkit.Rows()) + schemaTS7 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64) + require.Less(t, schemaTS4, schemaTS7) + + // Now snapshot read using old infoschema + tk.MustExec(fmt.Sprintf("set @@tidb_snapshot = %d", ts)) + tk.MustQuery("select * from dummytbl").Check(testkit.Rows()) + schemaTS8 := infoCache.GetAndResetRecentInfoSchemaTS(math.MaxUint64) + require.True(t, schemaTS8 < schemaTS7 && schemaTS8 > schemaTS2) +} diff --git a/pkg/server/stat_test.go b/pkg/server/stat_test.go index edc26eed42..cd450a3bcf 100644 --- a/pkg/server/stat_test.go +++ b/pkg/server/stat_test.go @@ -50,7 +50,7 @@ func TestUptime(t *testing.T) { }() require.NoError(t, err) - _, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true) + _, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true, dom.InfoCache()) require.NoError(t, err) tidbdrv := NewTiDBDriver(store)