Files
tidb/pkg/table/tables/cache_test.go
2025-05-07 10:19:40 +00:00

556 lines
18 KiB
Go

// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tables_test
import (
"context"
"fmt"
"testing"
"time"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/stmtsummary"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)
func lastReadFromCache(tk *testkit.TestKit) bool {
return tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache
}
func TestCacheTableBasicScan(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists tmp1")
tk.MustExec("create table tmp1 (id int primary key auto_increment, u int unique, v int)")
tk.MustExec("insert into tmp1 values" +
"(1, 101, 1001), (3, 113, 1003), (5, 105, 1005), (7, 117, 1007), (9, 109, 1009)," +
"(10, 110, 1010), (12, 112, 1012), (14, 114, 1014), (16, 116, 1016), (18, 118, 1018)",
)
tk.MustExec("alter table tmp1 cache")
// For TableReader
// First read will read from original table
tk.MustQuery("select * from tmp1 where id>3 order by id").Check(testkit.Rows(
"5 105 1005", "7 117 1007", "9 109 1009",
"10 110 1010", "12 112 1012", "14 114 1014", "16 116 1016", "18 118 1018",
))
// Test for join two cache table
tk.MustExec("drop table if exists join_t1, join_t2, join_t3")
tk.MustExec("create table join_t1 (id int)")
tk.MustExec("insert into join_t1 values(1)")
tk.MustExec("alter table join_t1 cache")
tk.MustQuery("select *from join_t1").Check(testkit.Rows("1"))
tk.MustExec("create table join_t2 (id int)")
tk.MustExec("insert into join_t2 values(2)")
tk.MustExec("alter table join_t2 cache")
tk.MustQuery("select *from join_t2").Check(testkit.Rows("2"))
tk.MustExec("create table join_t3 (id int)")
tk.MustExec("insert into join_t3 values(3)")
planUsed := false
for range 10 {
tk.MustQuery("select *from join_t1 join join_t2").Check(testkit.Rows("1 2"))
if lastReadFromCache(tk) {
planUsed = true
break
}
}
require.True(t, planUsed)
// Test for join a cache table and a normal table
for range 10 {
tk.MustQuery("select * from join_t1 join join_t3").Check(testkit.Rows("1 3"))
if lastReadFromCache(tk) {
planUsed = true
break
}
}
require.True(t, planUsed)
// Second read will from cache table
for range 100 {
tk.MustQuery("select * from tmp1 where id>4 order by id").Check(testkit.Rows(
"5 105 1005", "7 117 1007", "9 109 1009",
"10 110 1010", "12 112 1012", "14 114 1014", "16 116 1016", "18 118 1018",
))
if lastReadFromCache(tk) {
planUsed = true
break
}
}
require.True(t, planUsed)
// For IndexLookUpReader
for range 10 {
tk.MustQuery("select /*+ use_index(tmp1, u) */ * from tmp1 where u>101 order by u").Check(testkit.Rows(
"5 105 1005", "9 109 1009", "10 110 1010",
"12 112 1012", "3 113 1003", "14 114 1014", "16 116 1016", "7 117 1007", "18 118 1018",
))
if lastReadFromCache(tk) {
planUsed = true
break
}
}
require.True(t, planUsed)
// For IndexReader
tk.MustQuery("select /*+ use_index(tmp1, u) */ id,u from tmp1 where u>101 order by id").Check(testkit.Rows(
"3 113", "5 105", "7 117", "9 109", "10 110",
"12 112", "14 114", "16 116", "18 118",
))
tk.MustQuery("show warnings").Check(testkit.Rows())
// For IndexMerge, cache table should not use index merge
tk.MustQuery("select /*+ use_index_merge(tmp1, primary, u) */ * from tmp1 where id>5 or u>110 order by u").Check(testkit.Rows(
"9 109 1009", "10 110 1010",
"12 112 1012", "3 113 1003", "14 114 1014", "16 116 1016", "7 117 1007", "18 118 1018",
))
tk.MustQuery("show warnings").Check(testkit.Rows())
}
func TestCacheCondition(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t2")
tk.MustExec("create table t2 (id int primary key, v int)")
tk.MustExec("alter table t2 cache")
// Explain should not trigger cache.
for range 10 {
tk.MustQuery("explain select * from t2")
time.Sleep(100 * time.Millisecond)
require.False(t, lastReadFromCache(tk))
}
// Insert should not trigger cache.
for i := range 10 {
tk.MustExec(fmt.Sprintf("insert into t2 values (%d,%d)", i, i))
time.Sleep(100 * time.Millisecond)
require.False(t, lastReadFromCache(tk))
}
// Update should not trigger cache.
for range 10 {
tk.MustExec("update t2 set v = v + 1 where id > 0")
time.Sleep(100 * time.Millisecond)
require.False(t, lastReadFromCache(tk))
}
// Contains PointGet Update should not trigger cache.
for range 10 {
tk.MustExec("update t2 set v = v + 1 where id = 2")
time.Sleep(100 * time.Millisecond)
require.False(t, lastReadFromCache(tk))
}
// Contains PointGet Delete should not trigger cache.
for i := range 10 {
tk.MustExec(fmt.Sprintf("delete from t2 where id = %d", i))
time.Sleep(100 * time.Millisecond)
require.False(t, lastReadFromCache(tk))
}
// Normal query should trigger cache.
tk.MustQuery("select * from t2")
cacheUsed := false
for range 100 {
tk.MustQuery("select * from t2")
if lastReadFromCache(tk) {
cacheUsed = true
break
}
time.Sleep(100 * time.Millisecond)
}
require.True(t, cacheUsed)
}
func TestCacheTableBasicReadAndWrite(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk.MustExec("drop table if exists write_tmp1")
tk.MustExec("create table write_tmp1 (id int primary key auto_increment, u int unique, v int)")
tk.MustExec("insert into write_tmp1 values" +
"(1, 101, 1001), (3, 113, 1003)",
)
tk.MustExec("alter table write_tmp1 cache")
// Read and add read lock
tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "3 113 1003"))
// read lock should valid
var i int
for i = range 10 {
if lastReadFromCache(tk) {
break
}
// Wait for the cache to be loaded.
time.Sleep(50 * time.Millisecond)
tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "3 113 1003"))
}
require.True(t, i < 10)
tk.MustExec("use test")
tk1.MustExec("insert into write_tmp1 values (2, 222, 222)")
// write lock exists
tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001",
"2 222 222",
"3 113 1003"))
require.False(t, lastReadFromCache(tk))
// wait write lock expire and check cache can be used again
for !lastReadFromCache(tk) {
tk.MustQuery("select * from write_tmp1").Check(testkit.Rows(
"1 101 1001",
"2 222 222",
"3 113 1003"))
}
tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 222", "3 113 1003"))
tk1.MustExec("update write_tmp1 set v = 3333 where id = 2")
for !lastReadFromCache(tk) {
tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 3333", "3 113 1003"))
}
tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 3333", "3 113 1003"))
}
func TestCacheTableComplexRead(t *testing.T) {
store := testkit.CreateMockStore(t)
tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk2.MustExec("use test")
tk1.MustExec("create table complex_cache (id int primary key auto_increment, u int unique, v int)")
tk1.MustExec("insert into complex_cache values" + "(5, 105, 1005), (7, 117, 1007), (9, 109, 1009)")
tk1.MustExec("alter table complex_cache cache")
tk1.MustQuery("select * from complex_cache where id > 7").Check(testkit.Rows("9 109 1009"))
var i int
for i = range 100 {
time.Sleep(100 * time.Millisecond)
tk1.MustQuery("select * from complex_cache where id > 7").Check(testkit.Rows("9 109 1009"))
if lastReadFromCache(tk1) {
break
}
}
require.True(t, i < 10)
tk1.MustExec("begin")
tk2.MustExec("begin")
tk2.MustQuery("select * from complex_cache where id > 7").Check(testkit.Rows("9 109 1009"))
for i = range 10 {
time.Sleep(100 * time.Millisecond)
tk2.MustQuery("select * from complex_cache where id > 7").Check(testkit.Rows("9 109 1009"))
if lastReadFromCache(tk2) {
break
}
}
require.True(t, i < 10)
tk2.MustExec("commit")
tk1.MustQuery("select * from complex_cache where id > 7").Check(testkit.Rows("9 109 1009"))
require.True(t, lastReadFromCache(tk1))
tk1.MustExec("commit")
}
func TestBeginSleepABA(t *testing.T) {
// During the change "cache1 -> no cache -> cache2",
// cache1 and cache2 may be not the same anymore
// A transaction should not only check the cache exists, but also check the cache unchanged.
store := testkit.CreateMockStore(t)
tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk2.MustExec("use test")
tk1.MustExec("drop table if exists aba")
tk1.MustExec("create table aba (id int, v int)")
tk1.MustExec("insert into aba values (1, 1)")
tk1.MustExec("alter table aba cache")
tk1.MustQuery("select * from aba").Check(testkit.Rows("1 1"))
cacheUsed := false
for range 100 {
tk1.MustQuery("select * from aba").Check(testkit.Rows("1 1"))
if lastReadFromCache(tk1) {
cacheUsed = true
break
}
}
require.True(t, cacheUsed)
// Begin, read from cache.
tk1.MustExec("begin")
tk1.MustQuery("select * from aba").Check(testkit.Rows("1 1"))
if !lastReadFromCache(tk1) {
// TODO: should read from cache, but it is not stable
// It is a bug, ref https://github.com/pingcap/tidb/issues/36838
t.Skip("unstable now, skip")
return
}
// Another session change the data and make the cache unavailable.
tk2.MustExec("update aba set v = 2")
// And then make the cache available again.
cacheUsed = false
for range 100 {
tk2.MustQuery("select * from aba").Check(testkit.Rows("1 2"))
if lastReadFromCache(tk2) {
cacheUsed = true
break
}
time.Sleep(100 * time.Millisecond)
}
require.True(t, cacheUsed)
// tk1 should not use the staled cache, because the data is changed.
tk1.MustQuery("select * from aba").Check(testkit.Rows("1 1"))
require.False(t, lastReadFromCache(tk1))
}
func TestRenewLease(t *testing.T) {
// Test RenewLeaseForRead
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk1 := testkit.NewTestKit(t, store)
se := tk.Session()
tk.MustExec("create table cache_renew_t (id int)")
tk.MustExec("alter table cache_renew_t cache")
tbl, err := se.GetInfoSchema().(infoschema.InfoSchema).TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("cache_renew_t"))
require.NoError(t, err)
var i int
tk.MustExec("select * from cache_renew_t")
remote := tables.NewStateRemote(tk1.Session())
var leaseBefore uint64
for i = range 20 {
time.Sleep(200 * time.Millisecond)
lockType, lease, err := remote.Load(context.Background(), tbl.Meta().ID)
require.NoError(t, err)
if lockType == tables.CachedTableLockRead {
leaseBefore = lease
break
}
}
require.True(t, i < 20)
for i = range 20 {
time.Sleep(200 * time.Millisecond)
tk.MustExec("select * from cache_renew_t")
lockType, lease, err := remote.Load(context.Background(), tbl.Meta().ID)
require.NoError(t, err)
require.Equal(t, lockType, tables.CachedTableLockRead)
if leaseBefore != lease {
break
}
}
require.True(t, i < 20)
}
func TestCacheTableWriteOperatorWaitLockLease(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_stmt_summary = 1")
se := tk.Session()
// This line is a hack, if auth user string is "", the statement summary is skipped,
// so it's added to make the later code been covered.
require.NoError(t, se.Auth(&auth.UserIdentity{Username: "root", Hostname: "localhost"}, nil, nil, nil))
tk.MustExec("drop table if exists wait_tb1")
tk.MustExec("create table wait_tb1(id int)")
tk.MustExec("alter table wait_tb1 cache")
var i int
for i = range 10 {
tk.MustQuery("select * from wait_tb1").Check(testkit.Rows())
if lastReadFromCache(tk) {
break
}
time.Sleep(100 * time.Millisecond)
}
require.True(t, i < 10)
stmtsummary.StmtSummaryByDigestMap.Clear()
tk.MustExec("insert into wait_tb1 values(1)")
require.True(t, se.GetSessionVars().StmtCtx.WaitLockLeaseTime > 0)
tk.MustQuery("select DIGEST_TEXT from INFORMATION_SCHEMA.STATEMENTS_SUMMARY where MAX_BACKOFF_TIME > 0 or MAX_WAIT_TIME > 0").Check(testkit.Rows("insert into `wait_tb1` values ( ? )"))
}
func TestTableCacheLeaseVariable(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
// Check default value.
tk.MustQuery("select @@global.tidb_table_cache_lease").Check(testkit.Rows("3"))
// Check a valid value.
tk.MustExec("set @@global.tidb_table_cache_lease = 1;")
tk.MustQuery("select @@global.tidb_table_cache_lease").Check(testkit.Rows("1"))
// Check a invalid value, the valid range is [2, 10]
tk.MustExec("set @@global.tidb_table_cache_lease = 111;")
tk.MustQuery("SHOW WARNINGS").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_table_cache_lease value: '111'"))
tk.MustQuery("select @@global.tidb_table_cache_lease").Check(testkit.Rows("10"))
// Change to a non-default value and verify the behaviour.
tk.MustExec("set @@global.tidb_table_cache_lease = 2;")
tk.MustExec("drop table if exists test_lease_variable;")
tk.MustExec(`create table test_lease_variable(c0 int, c1 varchar(20), c2 varchar(20), unique key uk(c0));`)
tk.MustExec(`insert into test_lease_variable(c0, c1, c2) values (1, null, 'green');`)
tk.MustExec(`alter table test_lease_variable cache;`)
cached := false
for range 20 {
tk.MustQuery("select * from test_lease_variable").Check(testkit.Rows("1 <nil> green"))
if lastReadFromCache(tk) {
cached = true
break
}
time.Sleep(50 * time.Millisecond)
}
require.True(t, cached)
start := time.Now()
tk.MustExec("update test_lease_variable set c0 = 2")
duration := time.Since(start)
// The lease is 2s, check how long the write operation takes.
require.True(t, duration > time.Second)
require.True(t, duration < 3*time.Second)
}
func TestMetrics(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists test_metrics;")
tk.MustExec(`create table test_metrics(c0 int, c1 varchar(20), c2 varchar(20), unique key uk(c0));`)
tk.MustExec(`create table nt (c0 int, c1 varchar(20), c2 varchar(20), unique key uk(c0));`)
tk.MustExec(`insert into test_metrics(c0, c1, c2) values (1, null, 'green');`)
tk.MustExec(`alter table test_metrics cache;`)
tk.MustQuery("select * from test_metrics").Check(testkit.Rows("1 <nil> green"))
cached := false
for range 20 {
if lastReadFromCache(tk) {
cached = true
break
}
time.Sleep(50 * time.Millisecond)
tk.MustQuery("select * from test_metrics").Check(testkit.Rows("1 <nil> green"))
}
require.True(t, cached)
counter := metrics.ReadFromTableCacheCounter
pb := &dto.Metric{}
queries := []string{
// Table scan
"select * from test_metrics",
// Index scan
"select c0 from test_metrics use index(uk) where c0 > 1",
// Index Lookup
"select c1 from test_metrics use index(uk) where c0 = 1",
// Point Get
"select c0 from test_metrics use index(uk) where c0 = 1",
// // Aggregation
"select count(*) from test_metrics",
// Join
"select * from test_metrics as a join test_metrics as b on a.c0 = b.c0 where a.c1 != 'xxx'",
}
counter.Write(pb)
i := pb.GetCounter().GetValue()
for _, query := range queries {
tk.MustQuery(query)
i++
counter.Write(pb)
hit := pb.GetCounter().GetValue()
require.Equal(t, i, hit)
}
// A counter-example that doesn't increase metrics.ReadFromTableCacheCounter.
tk.MustQuery("select * from nt")
counter.Write(pb)
hit := pb.GetCounter().GetValue()
require.Equal(t, i, hit)
}
func TestRenewLeaseABAFailPoint(t *testing.T) {
store := testkit.CreateMockStore(t)
tables.TestMockRenewLeaseABA2 = make(chan struct{})
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t_lease;")
tk.MustExec(`create table t_lease(a int, b int);`)
tk.MustExec(`insert into t_lease values (1, 1)`)
tk.MustExec(`alter table t_lease cache`)
tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk2.MustExec("use test")
// Load the cache data by this query.
require.Eventually(t, func() bool {
tk.MustQuery("select * from t_lease").Check(testkit.Rows("1 1"))
return lastReadFromCache(tk)
}, 3*time.Second, 100*time.Millisecond)
// Renew lease by this query, mock the operation is delayed.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/table/tables/mockRenewLeaseABA1", `return`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/table/tables/mockRenewLeaseABA2", `return`))
tk.MustQuery("select * from t_lease").Check(testkit.Rows("1 1"))
// Make the cache data stale after writing: read lock-> write lock
tk1.MustExec("update t_lease set b = 2 where a = 1")
// Mock reading from another TiDB instance: write lock -> read lock
is := tk2.Session().GetInfoSchema().(infoschema.InfoSchema)
tbl, err := is.TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t_lease"))
require.NoError(t, err)
lease := oracle.GoTimeToTS(time.Now().Add(20 * time.Second)) // A big enough future time
tk2.MustExec("update mysql.table_cache_meta set lock_type = 'READ', lease = ? where tid = ?", lease, tbl.Meta().ID)
// Then the stagnant renew lease operation finally arrive.
tables.TestMockRenewLeaseABA2 <- struct{}{}
<-tables.TestMockRenewLeaseABA2
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/table/tables/mockRenewLeaseABA1"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/table/tables/mockRenewLeaseABA2"))
// The renew lease operation should not success,
// And the session should not read from a staled cache data.
tk.MustQuery("select * from t_lease").Check(testkit.Rows("1 2"))
require.False(t, lastReadFromCache(tk))
}