// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
|
|
|
|
package staleread_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strconv"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/pingcap/errors"
|
|
"github.com/pingcap/tidb/pkg/domain"
|
|
"github.com/pingcap/tidb/pkg/infoschema"
|
|
"github.com/pingcap/tidb/pkg/parser"
|
|
"github.com/pingcap/tidb/pkg/parser/ast"
|
|
"github.com/pingcap/tidb/pkg/sessionctx"
|
|
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
|
|
"github.com/pingcap/tidb/pkg/sessiontxn/staleread"
|
|
"github.com/pingcap/tidb/pkg/store/mockstore"
|
|
"github.com/pingcap/tidb/pkg/table/temptable"
|
|
"github.com/pingcap/tidb/pkg/testkit"
|
|
"github.com/stretchr/testify/require"
|
|
"github.com/tikv/client-go/v2/oracle"
|
|
)
|
|
|
|
type staleReadPoint struct {
|
|
tk *testkit.TestKit
|
|
ts uint64
|
|
dt string
|
|
tm time.Time
|
|
is infoschema.InfoSchema
|
|
tn *ast.TableName
|
|
}
|
|
|
|
func (p *staleReadPoint) checkMatchProcessor(t *testing.T, processor staleread.Processor, hasEvaluator bool) {
|
|
require.True(t, processor.IsStaleness())
|
|
require.Equal(t, p.ts, processor.GetStalenessReadTS())
|
|
require.Equal(t, p.is.SchemaMetaVersion(), processor.GetStalenessInfoSchema().SchemaMetaVersion())
|
|
require.IsTypef(t, processor.GetStalenessInfoSchema(), temptable.AttachLocalTemporaryTableInfoSchema(p.tk.Session(), p.is), "")
|
|
evaluator := processor.GetStalenessTSEvaluatorForPrepare()
|
|
if hasEvaluator {
|
|
require.NotNil(t, evaluator)
|
|
ts, err := evaluator(context.Background(), p.tk.Session())
|
|
require.NoError(t, err)
|
|
require.Equal(t, processor.GetStalenessReadTS(), ts)
|
|
} else {
|
|
require.Nil(t, evaluator)
|
|
}
|
|
}
|
|
|
|
func genStaleReadPoint(t *testing.T, tk *testkit.TestKit) *staleReadPoint {
|
|
tk.MustExec("create table if not exists test.t(a bigint)")
|
|
tk.MustExec(fmt.Sprintf("alter table test.t alter column a set default %d", time.Now().UnixNano()))
|
|
time.Sleep(time.Millisecond * 20)
|
|
is := domain.GetDomain(tk.Session()).InfoSchema()
|
|
dt := tk.MustQuery("select now(3)").Rows()[0][0].(string)
|
|
tm, err := time.ParseInLocation("2006-01-02 15:04:05.999999", dt, tk.Session().GetSessionVars().Location())
|
|
require.NoError(t, err)
|
|
ts := oracle.GoTimeToTS(tm)
|
|
tn := astTableWithAsOf(t, dt)
|
|
return &staleReadPoint{
|
|
tk: tk,
|
|
ts: ts,
|
|
dt: dt,
|
|
tm: tm,
|
|
is: is,
|
|
tn: tn,
|
|
}
|
|
}
|
|
|
|
func astTableWithAsOf(t *testing.T, dt string) *ast.TableName {
|
|
p := parser.New()
|
|
var sql string
|
|
if dt == "" {
|
|
sql = "select * from test.t"
|
|
} else {
|
|
sql = fmt.Sprintf("select * from test.t as of timestamp '%s'", dt)
|
|
}
|
|
|
|
stmt, err := p.ParseOneStmt(sql, "", "")
|
|
require.NoError(t, err)
|
|
sel := stmt.(*ast.SelectStmt)
|
|
return sel.From.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName)
|
|
}
|
|
|
|
func getCurrentExternalTimestamp(t *testing.T, tk *testkit.TestKit) uint64 {
|
|
externalTimestampStr := tk.MustQuery("select @@tidb_external_ts").Rows()[0][0].(string)
|
|
externalTimestamp, err := strconv.ParseUint(externalTimestampStr, 10, 64)
|
|
require.NoError(t, err)
|
|
|
|
return externalTimestamp
|
|
}
|
|
|
|
func TestStaleReadProcessorWithSelectTable(t *testing.T) {
|
|
store := testkit.CreateMockStore(t, mockstore.WithStoreType(mockstore.EmbedUnistore))
|
|
tk := testkit.NewTestKit(t, store)
|
|
tn := astTableWithAsOf(t, "")
|
|
p1 := genStaleReadPoint(t, tk)
|
|
p2 := genStaleReadPoint(t, tk)
|
|
ctx := context.Background()
|
|
|
|
// create local temporary table to check processor's infoschema will consider temporary table
|
|
tk.MustExec("create temporary table test.t2(a int)")
|
|
|
|
// no sys variable just select ... as of ...
|
|
processor := createProcessor(t, tk.Session())
|
|
err := processor.OnSelectTable(p1.tn)
|
|
require.NoError(t, err)
|
|
p1.checkMatchProcessor(t, processor, true)
|
|
err = processor.OnSelectTable(p1.tn)
|
|
require.NoError(t, err)
|
|
p1.checkMatchProcessor(t, processor, true)
|
|
err = processor.OnSelectTable(p2.tn)
|
|
require.Error(t, err)
|
|
require.Equal(t, "[planner:8135]can not set different time in the as of", err.Error())
|
|
p1.checkMatchProcessor(t, processor, true)
|
|
|
|
// the first select has not 'as of'
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnSelectTable(tn)
|
|
require.NoError(t, err)
|
|
require.False(t, processor.IsStaleness())
|
|
err = processor.OnSelectTable(p1.tn)
|
|
require.Equal(t, "[planner:8135]can not set different time in the as of", err.Error())
|
|
require.False(t, processor.IsStaleness())
|
|
|
|
// 'as of' is not allowed when @@txn_read_ts is set
|
|
tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt))
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnSelectTable(p1.tn)
|
|
require.Error(t, err)
|
|
require.Equal(t, "[planner:8135]invalid as of timestamp: can't use select as of while already set transaction as of", err.Error())
|
|
tk.MustExec("set @@tx_read_ts=''")
|
|
|
|
// no 'as of' will consume @txn_read_ts
|
|
tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt))
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnSelectTable(tn)
|
|
p1.checkMatchProcessor(t, processor, true)
|
|
tk.Session().GetSessionVars().CleanupTxnReadTSIfUsed()
|
|
require.Equal(t, uint64(0), tk.Session().GetSessionVars().TxnReadTS.PeakTxnReadTS())
|
|
tk.MustExec("set @@tx_read_ts=''")
|
|
|
|
// `@@tidb_read_staleness`
|
|
tk.MustExec("set @@tidb_read_staleness=-100")
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnSelectTable(tn)
|
|
require.True(t, processor.IsStaleness())
|
|
require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion())
|
|
expectedTS, err := staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -100*time.Second)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expectedTS, processor.GetStalenessReadTS())
|
|
evaluator := processor.GetStalenessTSEvaluatorForPrepare()
|
|
evaluatorTS, err := evaluator(ctx, tk.Session())
|
|
require.NoError(t, err)
|
|
require.Equal(t, expectedTS, evaluatorTS)
|
|
tk.MustExec("set @@tidb_read_staleness=''")
|
|
|
|
tk.MustExec("do sleep(0.01)")
|
|
evaluatorTS, err = evaluator(ctx, tk.Session())
|
|
require.NoError(t, err)
|
|
expectedTS2, err := staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -100*time.Second)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expectedTS2, evaluatorTS)
|
|
|
|
// `@@tidb_read_staleness` will be ignored when `as of` or `@@tx_read_ts`
|
|
tk.MustExec("set @@tidb_read_staleness=-100")
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnSelectTable(p1.tn)
|
|
require.NoError(t, err)
|
|
p1.checkMatchProcessor(t, processor, true)
|
|
|
|
tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt))
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnSelectTable(tn)
|
|
require.NoError(t, err)
|
|
p1.checkMatchProcessor(t, processor, true)
|
|
tk.MustExec("set @@tidb_read_staleness=''")
|
|
|
|
// `@@tidb_external_ts`
|
|
tk.MustExec("start transaction;set global tidb_external_ts=@@tidb_current_ts;commit")
|
|
tk.MustExec("set tidb_enable_external_ts_read=ON")
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnSelectTable(tn)
|
|
require.True(t, processor.IsStaleness())
|
|
expectedTS = getCurrentExternalTimestamp(t, tk)
|
|
require.Equal(t, expectedTS, processor.GetStalenessReadTS())
|
|
tk.MustExec("set tidb_enable_external_ts_read=OFF")
|
|
|
|
// `@@tidb_external_ts` will be ignored when `as of`, `@@tx_read_ts` or `@@tidb_read_staleness`
|
|
tk.MustExec("start transaction;set global tidb_external_ts=@@tidb_current_ts;commit")
|
|
tk.MustExec("set tidb_enable_external_ts_read=ON")
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnSelectTable(p1.tn)
|
|
require.NoError(t, err)
|
|
p1.checkMatchProcessor(t, processor, true)
|
|
|
|
tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt))
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnSelectTable(tn)
|
|
require.NoError(t, err)
|
|
p1.checkMatchProcessor(t, processor, true)
|
|
|
|
tk.MustExec("set @@tidb_read_staleness=-5")
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnSelectTable(tn)
|
|
require.True(t, processor.IsStaleness())
|
|
require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion())
|
|
expectedTS, err = staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -5*time.Second)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expectedTS, processor.GetStalenessReadTS())
|
|
evaluator = processor.GetStalenessTSEvaluatorForPrepare()
|
|
evaluatorTS, err = evaluator(ctx, tk.Session())
|
|
require.NoError(t, err)
|
|
require.Equal(t, expectedTS, evaluatorTS)
|
|
tk.MustExec("set @@tidb_read_staleness=''")
|
|
|
|
tk.MustExec("set tidb_enable_external_ts_read=OFF")
|
|
}
|
|
|
|
func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) {
|
|
store := testkit.CreateMockStore(t, mockstore.WithStoreType(mockstore.EmbedUnistore))
|
|
tk := testkit.NewTestKit(t, store)
|
|
p1 := genStaleReadPoint(t, tk)
|
|
//p2 := genStaleReadPoint(t, tk)
|
|
ctx := context.Background()
|
|
|
|
// create local temporary table to check processor's infoschema will consider temporary table
|
|
tk.MustExec("create temporary table test.t2(a int)")
|
|
|
|
// execute prepared stmt with ts evaluator
|
|
processor := createProcessor(t, tk.Session())
|
|
err := processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) {
|
|
return p1.ts, nil
|
|
})
|
|
require.NoError(t, err)
|
|
p1.checkMatchProcessor(t, processor, true)
|
|
|
|
// will get an error when ts evaluator fails
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) {
|
|
return 0, errors.New("mock error")
|
|
})
|
|
require.Error(t, err)
|
|
require.Equal(t, "mock error", err.Error())
|
|
require.False(t, processor.IsStaleness())
|
|
|
|
// execute prepared stmt without stale read
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnExecutePreparedStmt(nil)
|
|
require.NoError(t, err)
|
|
require.False(t, processor.IsStaleness())
|
|
|
|
// execute prepared stmt without ts evaluator will consume tx_read_ts
|
|
tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt))
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnExecutePreparedStmt(nil)
|
|
p1.checkMatchProcessor(t, processor, true)
|
|
tk.Session().GetSessionVars().CleanupTxnReadTSIfUsed()
|
|
require.Equal(t, uint64(0), tk.Session().GetSessionVars().TxnReadTS.PeakTxnReadTS())
|
|
tk.MustExec("set @@tx_read_ts=''")
|
|
|
|
// prepared ts is not allowed when @@txn_read_ts is set
|
|
tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt))
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) {
|
|
return p1.ts, nil
|
|
})
|
|
require.Error(t, err)
|
|
require.Equal(t, "[planner:8135]invalid as of timestamp: can't use select as of while already set transaction as of", err.Error())
|
|
tk.MustExec("set @@tx_read_ts=''")
|
|
|
|
// `@@tidb_read_staleness`
|
|
tk.MustExec("set @@tidb_read_staleness=-100")
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnExecutePreparedStmt(nil)
|
|
require.True(t, processor.IsStaleness())
|
|
require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion())
|
|
expectedTS, err := staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -100*time.Second)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expectedTS, processor.GetStalenessReadTS())
|
|
tk.MustExec("set @@tidb_read_staleness=''")
|
|
|
|
// `@@tidb_read_staleness` will be ignored when `as of` or `@@tx_read_ts`
|
|
tk.MustExec("set @@tidb_read_staleness=-100")
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) {
|
|
return p1.ts, nil
|
|
})
|
|
require.NoError(t, err)
|
|
p1.checkMatchProcessor(t, processor, true)
|
|
|
|
tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt))
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnExecutePreparedStmt(nil)
|
|
require.NoError(t, err)
|
|
p1.checkMatchProcessor(t, processor, true)
|
|
tk.MustExec("set @@tidb_read_staleness=''")
|
|
|
|
// `@@tidb_external_ts`
|
|
tk.MustExec("start transaction;set global tidb_external_ts=@@tidb_current_ts;commit")
|
|
tk.MustExec("set tidb_enable_external_ts_read=ON")
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnExecutePreparedStmt(nil)
|
|
require.True(t, processor.IsStaleness())
|
|
expectedTS = getCurrentExternalTimestamp(t, tk)
|
|
require.Equal(t, expectedTS, processor.GetStalenessReadTS())
|
|
tk.MustExec("set tidb_enable_external_ts_read=OFF")
|
|
|
|
// `@@tidb_external_ts` will be ignored when `as of`, `@@tx_read_ts` or `@@tidb_read_staleness`
|
|
tk.MustExec("start transaction;set global tidb_external_ts=@@tidb_current_ts;commit")
|
|
tk.MustExec("set tidb_enable_external_ts_read=ON")
|
|
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnSelectTable(p1.tn)
|
|
require.NoError(t, err)
|
|
p1.checkMatchProcessor(t, processor, true)
|
|
|
|
tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt))
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnExecutePreparedStmt(nil)
|
|
require.NoError(t, err)
|
|
p1.checkMatchProcessor(t, processor, true)
|
|
|
|
tk.MustExec("set @@tidb_read_staleness=-5")
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnExecutePreparedStmt(nil)
|
|
require.True(t, processor.IsStaleness())
|
|
require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion())
|
|
expectedTS, err = staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -5*time.Second)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expectedTS, processor.GetStalenessReadTS())
|
|
tk.MustExec("set @@tidb_read_staleness=''")
|
|
|
|
tk.MustExec("set tidb_enable_external_ts_read=OFF")
|
|
}
|
|
|
|
func TestStaleReadProcessorInTxn(t *testing.T) {
|
|
store := testkit.CreateMockStore(t)
|
|
tk := testkit.NewTestKit(t, store)
|
|
tn := astTableWithAsOf(t, "")
|
|
p1 := genStaleReadPoint(t, tk)
|
|
_ = genStaleReadPoint(t, tk)
|
|
|
|
tk.MustExec("begin")
|
|
|
|
// no error when there is no 'as of'
|
|
processor := createProcessor(t, tk.Session())
|
|
err := processor.OnSelectTable(tn)
|
|
require.NoError(t, err)
|
|
require.False(t, processor.IsStaleness())
|
|
err = processor.OnSelectTable(tn)
|
|
require.NoError(t, err)
|
|
require.False(t, processor.IsStaleness())
|
|
|
|
// no error when execute prepared stmt without ts evaluator
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnExecutePreparedStmt(nil)
|
|
require.NoError(t, err)
|
|
require.False(t, processor.IsStaleness())
|
|
|
|
// return an error when 'as of' is set
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnSelectTable(p1.tn)
|
|
require.Error(t, err)
|
|
require.Equal(t, "[planner:8135]invalid as of timestamp: as of timestamp can't be set in transaction.", err.Error())
|
|
|
|
// return an error when execute prepared stmt with as of
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) {
|
|
return p1.ts, nil
|
|
})
|
|
require.Error(t, err)
|
|
require.Equal(t, "[planner:8135]invalid as of timestamp: as of timestamp can't be set in transaction.", err.Error())
|
|
|
|
tk.MustExec("rollback")
|
|
|
|
tk.MustExec(fmt.Sprintf("start transaction read only as of timestamp '%s'", p1.dt))
|
|
|
|
// processor will use the transaction's stale read context
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnSelectTable(tn)
|
|
require.NoError(t, err)
|
|
p1.checkMatchProcessor(t, processor, false)
|
|
err = processor.OnSelectTable(tn)
|
|
require.NoError(t, err)
|
|
p1.checkMatchProcessor(t, processor, false)
|
|
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnExecutePreparedStmt(nil)
|
|
require.NoError(t, err)
|
|
p1.checkMatchProcessor(t, processor, false)
|
|
|
|
// sys variables will be ignored in txn
|
|
tk.MustExec("set @@tidb_read_staleness=-5")
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnSelectTable(tn)
|
|
require.NoError(t, err)
|
|
p1.checkMatchProcessor(t, processor, false)
|
|
err = processor.OnSelectTable(tn)
|
|
require.NoError(t, err)
|
|
p1.checkMatchProcessor(t, processor, false)
|
|
|
|
processor = createProcessor(t, tk.Session())
|
|
err = processor.OnExecutePreparedStmt(nil)
|
|
require.NoError(t, err)
|
|
p1.checkMatchProcessor(t, processor, false)
|
|
tk.MustExec("set @@tidb_read_staleness=''")
|
|
}
|
|
|
|
func createProcessor(t *testing.T, se sessionctx.Context) staleread.Processor {
|
|
processor := staleread.NewStaleReadProcessor(context.Background(), se)
|
|
require.False(t, processor.IsStaleness())
|
|
require.Equal(t, uint64(0), processor.GetStalenessReadTS())
|
|
require.Nil(t, processor.GetStalenessTSEvaluatorForPrepare())
|
|
require.Nil(t, processor.GetStalenessInfoSchema())
|
|
return processor
|
|
}
|
|
|
|
func TestConsistentCalculateAsOfTsExpr(t *testing.T) {
|
|
store := testkit.CreateMockStore(t, mockstore.WithStoreType(mockstore.EmbedUnistore))
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("set time_zone = '+00:00'")
|
|
tk.Session().GetSessionVars().TimeZone = time.UTC
|
|
|
|
p := parser.New()
|
|
stmt, err := p.ParseOneStmt(`select now(3) - interval 1 second`, "", "")
|
|
require.NoError(t, err)
|
|
secondTsExpr := stmt.(*ast.SelectStmt).Fields.Fields[0].Expr
|
|
stmt, err = p.ParseOneStmt(`select now(3) - interval 3 second`, "", "")
|
|
require.NoError(t, err)
|
|
threeSecondTsExpr := stmt.(*ast.SelectStmt).Fields.Fields[0].Expr
|
|
|
|
se := tk.Session()
|
|
se.GetSessionVars().StmtCtx = stmtctx.NewStmtCtxWithTimeZone(time.UTC)
|
|
|
|
ts1, err := staleread.CalculateAsOfTsExpr(context.Background(), se.GetPlanCtx(), secondTsExpr)
|
|
require.NoError(t, err)
|
|
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
ts2, err := staleread.CalculateAsOfTsExpr(context.Background(), se.GetPlanCtx(), secondTsExpr)
|
|
require.NoError(t, err)
|
|
require.Equal(t, ts1, ts2)
|
|
|
|
ts3, err := staleread.CalculateAsOfTsExpr(context.Background(), se.GetPlanCtx(), threeSecondTsExpr)
|
|
require.NoError(t, err)
|
|
require.True(t, ts3 < ts1)
|
|
require.Equal(t, ts1-ts3, uint64(2000<<18))
|
|
}
|