Files
tidb/pkg/sessiontxn/staleread/processor_test.go

464 lines
18 KiB
Go

// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package staleread_test
import (
"context"
"fmt"
"strconv"
"testing"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessiontxn/staleread"
"github.com/pingcap/tidb/pkg/store/mockstore"
"github.com/pingcap/tidb/pkg/table/temptable"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)
type staleReadPoint struct {
tk *testkit.TestKit
ts uint64
dt string
tm time.Time
is infoschema.InfoSchema
tn *ast.TableName
}
func (p *staleReadPoint) checkMatchProcessor(t *testing.T, processor staleread.Processor, hasEvaluator bool) {
require.True(t, processor.IsStaleness())
require.Equal(t, p.ts, processor.GetStalenessReadTS())
require.Equal(t, p.is.SchemaMetaVersion(), processor.GetStalenessInfoSchema().SchemaMetaVersion())
require.IsTypef(t, processor.GetStalenessInfoSchema(), temptable.AttachLocalTemporaryTableInfoSchema(p.tk.Session(), p.is), "")
evaluator := processor.GetStalenessTSEvaluatorForPrepare()
if hasEvaluator {
require.NotNil(t, evaluator)
ts, err := evaluator(context.Background(), p.tk.Session())
require.NoError(t, err)
require.Equal(t, processor.GetStalenessReadTS(), ts)
} else {
require.Nil(t, evaluator)
}
}
func genStaleReadPoint(t *testing.T, tk *testkit.TestKit) *staleReadPoint {
tk.MustExec("create table if not exists test.t(a bigint)")
tk.MustExec(fmt.Sprintf("alter table test.t alter column a set default %d", time.Now().UnixNano()))
time.Sleep(time.Millisecond * 20)
is := domain.GetDomain(tk.Session()).InfoSchema()
dt := tk.MustQuery("select now(3)").Rows()[0][0].(string)
tm, err := time.ParseInLocation("2006-01-02 15:04:05.999999", dt, tk.Session().GetSessionVars().Location())
require.NoError(t, err)
ts := oracle.GoTimeToTS(tm)
tn := astTableWithAsOf(t, dt)
return &staleReadPoint{
tk: tk,
ts: ts,
dt: dt,
tm: tm,
is: is,
tn: tn,
}
}
func astTableWithAsOf(t *testing.T, dt string) *ast.TableName {
p := parser.New()
var sql string
if dt == "" {
sql = "select * from test.t"
} else {
sql = fmt.Sprintf("select * from test.t as of timestamp '%s'", dt)
}
stmt, err := p.ParseOneStmt(sql, "", "")
require.NoError(t, err)
sel := stmt.(*ast.SelectStmt)
return sel.From.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName)
}
func getCurrentExternalTimestamp(t *testing.T, tk *testkit.TestKit) uint64 {
externalTimestampStr := tk.MustQuery("select @@tidb_external_ts").Rows()[0][0].(string)
externalTimestamp, err := strconv.ParseUint(externalTimestampStr, 10, 64)
require.NoError(t, err)
return externalTimestamp
}
func TestStaleReadProcessorWithSelectTable(t *testing.T) {
store := testkit.CreateMockStore(t, mockstore.WithStoreType(mockstore.EmbedUnistore))
tk := testkit.NewTestKit(t, store)
tn := astTableWithAsOf(t, "")
p1 := genStaleReadPoint(t, tk)
p2 := genStaleReadPoint(t, tk)
ctx := context.Background()
// create local temporary table to check processor's infoschema will consider temporary table
tk.MustExec("create temporary table test.t2(a int)")
// no sys variable just select ... as of ...
processor := createProcessor(t, tk.Session())
err := processor.OnSelectTable(p1.tn)
require.NoError(t, err)
p1.checkMatchProcessor(t, processor, true)
err = processor.OnSelectTable(p1.tn)
require.NoError(t, err)
p1.checkMatchProcessor(t, processor, true)
err = processor.OnSelectTable(p2.tn)
require.Error(t, err)
require.Equal(t, "[planner:8135]can not set different time in the as of", err.Error())
p1.checkMatchProcessor(t, processor, true)
// the first select has not 'as of'
processor = createProcessor(t, tk.Session())
err = processor.OnSelectTable(tn)
require.NoError(t, err)
require.False(t, processor.IsStaleness())
err = processor.OnSelectTable(p1.tn)
require.Equal(t, "[planner:8135]can not set different time in the as of", err.Error())
require.False(t, processor.IsStaleness())
// 'as of' is not allowed when @@txn_read_ts is set
tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt))
processor = createProcessor(t, tk.Session())
err = processor.OnSelectTable(p1.tn)
require.Error(t, err)
require.Equal(t, "[planner:8135]invalid as of timestamp: can't use select as of while already set transaction as of", err.Error())
tk.MustExec("set @@tx_read_ts=''")
// no 'as of' will consume @txn_read_ts
tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt))
processor = createProcessor(t, tk.Session())
err = processor.OnSelectTable(tn)
p1.checkMatchProcessor(t, processor, true)
tk.Session().GetSessionVars().CleanupTxnReadTSIfUsed()
require.Equal(t, uint64(0), tk.Session().GetSessionVars().TxnReadTS.PeakTxnReadTS())
tk.MustExec("set @@tx_read_ts=''")
// `@@tidb_read_staleness`
tk.MustExec("set @@tidb_read_staleness=-100")
processor = createProcessor(t, tk.Session())
err = processor.OnSelectTable(tn)
require.True(t, processor.IsStaleness())
require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion())
expectedTS, err := staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -100*time.Second)
require.NoError(t, err)
require.Equal(t, expectedTS, processor.GetStalenessReadTS())
evaluator := processor.GetStalenessTSEvaluatorForPrepare()
evaluatorTS, err := evaluator(ctx, tk.Session())
require.NoError(t, err)
require.Equal(t, expectedTS, evaluatorTS)
tk.MustExec("set @@tidb_read_staleness=''")
tk.MustExec("do sleep(0.01)")
evaluatorTS, err = evaluator(ctx, tk.Session())
require.NoError(t, err)
expectedTS2, err := staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -100*time.Second)
require.NoError(t, err)
require.Equal(t, expectedTS2, evaluatorTS)
// `@@tidb_read_staleness` will be ignored when `as of` or `@@tx_read_ts`
tk.MustExec("set @@tidb_read_staleness=-100")
processor = createProcessor(t, tk.Session())
err = processor.OnSelectTable(p1.tn)
require.NoError(t, err)
p1.checkMatchProcessor(t, processor, true)
tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt))
processor = createProcessor(t, tk.Session())
err = processor.OnSelectTable(tn)
require.NoError(t, err)
p1.checkMatchProcessor(t, processor, true)
tk.MustExec("set @@tidb_read_staleness=''")
// `@@tidb_external_ts`
tk.MustExec("start transaction;set global tidb_external_ts=@@tidb_current_ts;commit")
tk.MustExec("set tidb_enable_external_ts_read=ON")
processor = createProcessor(t, tk.Session())
err = processor.OnSelectTable(tn)
require.True(t, processor.IsStaleness())
expectedTS = getCurrentExternalTimestamp(t, tk)
require.Equal(t, expectedTS, processor.GetStalenessReadTS())
tk.MustExec("set tidb_enable_external_ts_read=OFF")
// `@@tidb_external_ts` will be ignored when `as of`, `@@tx_read_ts` or `@@tidb_read_staleness`
tk.MustExec("start transaction;set global tidb_external_ts=@@tidb_current_ts;commit")
tk.MustExec("set tidb_enable_external_ts_read=ON")
processor = createProcessor(t, tk.Session())
err = processor.OnSelectTable(p1.tn)
require.NoError(t, err)
p1.checkMatchProcessor(t, processor, true)
tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt))
processor = createProcessor(t, tk.Session())
err = processor.OnSelectTable(tn)
require.NoError(t, err)
p1.checkMatchProcessor(t, processor, true)
tk.MustExec("set @@tidb_read_staleness=-5")
processor = createProcessor(t, tk.Session())
err = processor.OnSelectTable(tn)
require.True(t, processor.IsStaleness())
require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion())
expectedTS, err = staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -5*time.Second)
require.NoError(t, err)
require.Equal(t, expectedTS, processor.GetStalenessReadTS())
evaluator = processor.GetStalenessTSEvaluatorForPrepare()
evaluatorTS, err = evaluator(ctx, tk.Session())
require.NoError(t, err)
require.Equal(t, expectedTS, evaluatorTS)
tk.MustExec("set @@tidb_read_staleness=''")
tk.MustExec("set tidb_enable_external_ts_read=OFF")
}
func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) {
store := testkit.CreateMockStore(t, mockstore.WithStoreType(mockstore.EmbedUnistore))
tk := testkit.NewTestKit(t, store)
p1 := genStaleReadPoint(t, tk)
//p2 := genStaleReadPoint(t, tk)
ctx := context.Background()
// create local temporary table to check processor's infoschema will consider temporary table
tk.MustExec("create temporary table test.t2(a int)")
// execute prepared stmt with ts evaluator
processor := createProcessor(t, tk.Session())
err := processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) {
return p1.ts, nil
})
require.NoError(t, err)
p1.checkMatchProcessor(t, processor, true)
// will get an error when ts evaluator fails
processor = createProcessor(t, tk.Session())
err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) {
return 0, errors.New("mock error")
})
require.Error(t, err)
require.Equal(t, "mock error", err.Error())
require.False(t, processor.IsStaleness())
// execute prepared stmt without stale read
processor = createProcessor(t, tk.Session())
err = processor.OnExecutePreparedStmt(nil)
require.NoError(t, err)
require.False(t, processor.IsStaleness())
// execute prepared stmt without ts evaluator will consume tx_read_ts
tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt))
processor = createProcessor(t, tk.Session())
err = processor.OnExecutePreparedStmt(nil)
p1.checkMatchProcessor(t, processor, true)
tk.Session().GetSessionVars().CleanupTxnReadTSIfUsed()
require.Equal(t, uint64(0), tk.Session().GetSessionVars().TxnReadTS.PeakTxnReadTS())
tk.MustExec("set @@tx_read_ts=''")
// prepared ts is not allowed when @@txn_read_ts is set
tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt))
processor = createProcessor(t, tk.Session())
err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) {
return p1.ts, nil
})
require.Error(t, err)
require.Equal(t, "[planner:8135]invalid as of timestamp: can't use select as of while already set transaction as of", err.Error())
tk.MustExec("set @@tx_read_ts=''")
// `@@tidb_read_staleness`
tk.MustExec("set @@tidb_read_staleness=-100")
processor = createProcessor(t, tk.Session())
err = processor.OnExecutePreparedStmt(nil)
require.True(t, processor.IsStaleness())
require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion())
expectedTS, err := staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -100*time.Second)
require.NoError(t, err)
require.Equal(t, expectedTS, processor.GetStalenessReadTS())
tk.MustExec("set @@tidb_read_staleness=''")
// `@@tidb_read_staleness` will be ignored when `as of` or `@@tx_read_ts`
tk.MustExec("set @@tidb_read_staleness=-100")
processor = createProcessor(t, tk.Session())
err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) {
return p1.ts, nil
})
require.NoError(t, err)
p1.checkMatchProcessor(t, processor, true)
tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt))
processor = createProcessor(t, tk.Session())
err = processor.OnExecutePreparedStmt(nil)
require.NoError(t, err)
p1.checkMatchProcessor(t, processor, true)
tk.MustExec("set @@tidb_read_staleness=''")
// `@@tidb_external_ts`
tk.MustExec("start transaction;set global tidb_external_ts=@@tidb_current_ts;commit")
tk.MustExec("set tidb_enable_external_ts_read=ON")
processor = createProcessor(t, tk.Session())
err = processor.OnExecutePreparedStmt(nil)
require.True(t, processor.IsStaleness())
expectedTS = getCurrentExternalTimestamp(t, tk)
require.Equal(t, expectedTS, processor.GetStalenessReadTS())
tk.MustExec("set tidb_enable_external_ts_read=OFF")
// `@@tidb_external_ts` will be ignored when `as of`, `@@tx_read_ts` or `@@tidb_read_staleness`
tk.MustExec("start transaction;set global tidb_external_ts=@@tidb_current_ts;commit")
tk.MustExec("set tidb_enable_external_ts_read=ON")
processor = createProcessor(t, tk.Session())
err = processor.OnSelectTable(p1.tn)
require.NoError(t, err)
p1.checkMatchProcessor(t, processor, true)
tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt))
processor = createProcessor(t, tk.Session())
err = processor.OnExecutePreparedStmt(nil)
require.NoError(t, err)
p1.checkMatchProcessor(t, processor, true)
tk.MustExec("set @@tidb_read_staleness=-5")
processor = createProcessor(t, tk.Session())
err = processor.OnExecutePreparedStmt(nil)
require.True(t, processor.IsStaleness())
require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion())
expectedTS, err = staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -5*time.Second)
require.NoError(t, err)
require.Equal(t, expectedTS, processor.GetStalenessReadTS())
tk.MustExec("set @@tidb_read_staleness=''")
tk.MustExec("set tidb_enable_external_ts_read=OFF")
}
func TestStaleReadProcessorInTxn(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tn := astTableWithAsOf(t, "")
p1 := genStaleReadPoint(t, tk)
_ = genStaleReadPoint(t, tk)
tk.MustExec("begin")
// no error when there is no 'as of'
processor := createProcessor(t, tk.Session())
err := processor.OnSelectTable(tn)
require.NoError(t, err)
require.False(t, processor.IsStaleness())
err = processor.OnSelectTable(tn)
require.NoError(t, err)
require.False(t, processor.IsStaleness())
// no error when execute prepared stmt without ts evaluator
processor = createProcessor(t, tk.Session())
err = processor.OnExecutePreparedStmt(nil)
require.NoError(t, err)
require.False(t, processor.IsStaleness())
// return an error when 'as of' is set
processor = createProcessor(t, tk.Session())
err = processor.OnSelectTable(p1.tn)
require.Error(t, err)
require.Equal(t, "[planner:8135]invalid as of timestamp: as of timestamp can't be set in transaction.", err.Error())
// return an error when execute prepared stmt with as of
processor = createProcessor(t, tk.Session())
err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) {
return p1.ts, nil
})
require.Error(t, err)
require.Equal(t, "[planner:8135]invalid as of timestamp: as of timestamp can't be set in transaction.", err.Error())
tk.MustExec("rollback")
tk.MustExec(fmt.Sprintf("start transaction read only as of timestamp '%s'", p1.dt))
// processor will use the transaction's stale read context
processor = createProcessor(t, tk.Session())
err = processor.OnSelectTable(tn)
require.NoError(t, err)
p1.checkMatchProcessor(t, processor, false)
err = processor.OnSelectTable(tn)
require.NoError(t, err)
p1.checkMatchProcessor(t, processor, false)
processor = createProcessor(t, tk.Session())
err = processor.OnExecutePreparedStmt(nil)
require.NoError(t, err)
p1.checkMatchProcessor(t, processor, false)
// sys variables will be ignored in txn
tk.MustExec("set @@tidb_read_staleness=-5")
processor = createProcessor(t, tk.Session())
err = processor.OnSelectTable(tn)
require.NoError(t, err)
p1.checkMatchProcessor(t, processor, false)
err = processor.OnSelectTable(tn)
require.NoError(t, err)
p1.checkMatchProcessor(t, processor, false)
processor = createProcessor(t, tk.Session())
err = processor.OnExecutePreparedStmt(nil)
require.NoError(t, err)
p1.checkMatchProcessor(t, processor, false)
tk.MustExec("set @@tidb_read_staleness=''")
}
func createProcessor(t *testing.T, se sessionctx.Context) staleread.Processor {
processor := staleread.NewStaleReadProcessor(context.Background(), se)
require.False(t, processor.IsStaleness())
require.Equal(t, uint64(0), processor.GetStalenessReadTS())
require.Nil(t, processor.GetStalenessTSEvaluatorForPrepare())
require.Nil(t, processor.GetStalenessInfoSchema())
return processor
}
func TestConsistentCalculateAsOfTsExpr(t *testing.T) {
store := testkit.CreateMockStore(t, mockstore.WithStoreType(mockstore.EmbedUnistore))
tk := testkit.NewTestKit(t, store)
tk.MustExec("set time_zone = '+00:00'")
tk.Session().GetSessionVars().TimeZone = time.UTC
p := parser.New()
stmt, err := p.ParseOneStmt(`select now(3) - interval 1 second`, "", "")
require.NoError(t, err)
secondTsExpr := stmt.(*ast.SelectStmt).Fields.Fields[0].Expr
stmt, err = p.ParseOneStmt(`select now(3) - interval 3 second`, "", "")
require.NoError(t, err)
threeSecondTsExpr := stmt.(*ast.SelectStmt).Fields.Fields[0].Expr
se := tk.Session()
se.GetSessionVars().StmtCtx = stmtctx.NewStmtCtxWithTimeZone(time.UTC)
ts1, err := staleread.CalculateAsOfTsExpr(context.Background(), se.GetPlanCtx(), secondTsExpr)
require.NoError(t, err)
time.Sleep(10 * time.Millisecond)
ts2, err := staleread.CalculateAsOfTsExpr(context.Background(), se.GetPlanCtx(), secondTsExpr)
require.NoError(t, err)
require.Equal(t, ts1, ts2)
ts3, err := staleread.CalculateAsOfTsExpr(context.Background(), se.GetPlanCtx(), threeSecondTsExpr)
require.NoError(t, err)
require.True(t, ts3 < ts1)
require.Equal(t, ts1-ts3, uint64(2000<<18))
}