infoschema: fix snapshot read bug for information_schema.tables (#55830)
close pingcap/tidb#55827
This commit is contained in:
@ -653,19 +653,23 @@ type TableItem struct {
|
||||
|
||||
// IterateAllTableItems is used for special performance optimization.
|
||||
// Used by executor/infoschema_reader.go to handle reading from INFORMATION_SCHEMA.TABLES.
|
||||
// If visit return false, stop the iterate process.
|
||||
func (is *infoschemaV2) IterateAllTableItems(visit func(TableItem) bool) {
|
||||
pivot, ok := is.byName.Max()
|
||||
max, ok := is.byName.Max()
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if !visit(TableItem{DBName: pivot.dbName, TableName: pivot.tableName}) {
|
||||
return
|
||||
}
|
||||
is.byName.Descend(pivot, func(item tableItem) bool {
|
||||
if pivot.dbName == item.dbName && pivot.tableName == item.tableName {
|
||||
return true // skip MVCC version
|
||||
var pivot *tableItem
|
||||
is.byName.Descend(max, func(item tableItem) bool {
|
||||
if item.schemaVersion > is.schemaMetaVersion {
|
||||
// skip MVCC version, those items are not visible to the queried schema version
|
||||
return true
|
||||
}
|
||||
pivot = item
|
||||
if pivot != nil && pivot.dbName == item.dbName && pivot.tableName == item.tableName {
|
||||
// skip MVCC version, this db.table has been visited already
|
||||
return true
|
||||
}
|
||||
pivot = &item
|
||||
if !item.tomb {
|
||||
return visit(TableItem{DBName: item.dbName, TableName: item.tableName})
|
||||
}
|
||||
|
||||
@ -8,7 +8,7 @@ go_test(
|
||||
"v2_test.go",
|
||||
],
|
||||
flaky = True,
|
||||
shard_count = 10,
|
||||
shard_count = 11,
|
||||
deps = [
|
||||
"//pkg/domain",
|
||||
"//pkg/domain/infosync",
|
||||
|
||||
@ -504,3 +504,32 @@ func TestSchemaSimpleTableInfos(t *testing.T) {
|
||||
require.Equal(t, tblInfos[0].Name.L, "t2")
|
||||
require.Equal(t, tblInfos[1].Name.L, "t1")
|
||||
}
|
||||
|
||||
func TestSnapshotInfoschemaReader(t *testing.T) {
|
||||
store := testkit.CreateMockStore(t)
|
||||
tk := testkit.NewTestKit(t, store)
|
||||
// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
|
||||
safePointName := "tikv_gc_safe_point"
|
||||
safePointValue := "20160102-15:04:05 -0700"
|
||||
safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)"
|
||||
updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s')
|
||||
ON DUPLICATE KEY
|
||||
UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
|
||||
tk.MustExec(updateSafePoint)
|
||||
|
||||
tk.MustExec("create database issue55827")
|
||||
tk.MustExec("use issue55827")
|
||||
|
||||
time1 := time.Now()
|
||||
timeStr := time1.Format("2006-1-2 15:04:05.000")
|
||||
|
||||
tk.MustExec("create table t (id int primary key);")
|
||||
tk.MustQuery("select count(*) from INFORMATION_SCHEMA.TABLES where table_schema = 'issue55827'").Check(testkit.Rows("1"))
|
||||
tk.MustQuery("select count(tidb_table_id) from INFORMATION_SCHEMA.TABLES where table_schema = 'issue55827'").Check(testkit.Rows("1"))
|
||||
|
||||
// For issue 55827
|
||||
sql := fmt.Sprintf("select count(*) from INFORMATION_SCHEMA.TABLES as of timestamp '%s' where table_schema = 'issue55827'", timeStr)
|
||||
tk.MustQuery(sql).Check(testkit.Rows("0"))
|
||||
sql = fmt.Sprintf("select * from INFORMATION_SCHEMA.TABLES as of timestamp '%s' where table_schema = 'issue55827'", timeStr)
|
||||
tk.MustQuery(sql).Check(testkit.Rows())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user