sessiontxn/staleread: more accurate stale ts (#44272)
close pingcap/tidb#44215
This commit is contained in:
@ -132,19 +132,19 @@ type calibrateResourceExec struct {
|
||||
done bool
|
||||
}
|
||||
|
||||
func (e *calibrateResourceExec) parseCalibrateDuration() (startTime time.Time, endTime time.Time, err error) {
|
||||
func (e *calibrateResourceExec) parseCalibrateDuration(ctx context.Context) (startTime time.Time, endTime time.Time, err error) {
|
||||
var dur time.Duration
|
||||
var ts uint64
|
||||
for _, op := range e.optionList {
|
||||
switch op.Tp {
|
||||
case ast.CalibrateStartTime:
|
||||
ts, err = staleread.CalculateAsOfTsExpr(e.ctx, op.Ts)
|
||||
ts, err = staleread.CalculateAsOfTsExpr(ctx, e.ctx, op.Ts)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
startTime = oracle.GetTimeFromTS(ts)
|
||||
case ast.CalibrateEndTime:
|
||||
ts, err = staleread.CalculateAsOfTsExpr(e.ctx, op.Ts)
|
||||
ts, err = staleread.CalculateAsOfTsExpr(ctx, e.ctx, op.Ts)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@ -197,7 +197,7 @@ func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) erro
|
||||
}
|
||||
|
||||
func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
|
||||
startTs, endTs, err := e.parseCalibrateDuration()
|
||||
startTs, endTs, err := e.parseCalibrateDuration(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -538,7 +538,7 @@ func (e *DDLExec) getRecoverTableByTableName(tableName *ast.TableName) (*model.J
|
||||
}
|
||||
|
||||
func (e *DDLExec) executeFlashBackCluster(s *ast.FlashBackToTimestampStmt) error {
|
||||
flashbackTS, err := staleread.CalculateAsOfTsExpr(e.ctx, s.FlashbackTS)
|
||||
flashbackTS, err := staleread.CalculateAsOfTsExpr(context.Background(), e.ctx, s.FlashbackTS)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -1396,3 +1396,28 @@ func TestStalePrepare(t *testing.T) {
|
||||
tk.MustQuery("execute stmt").Check(expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStaleTSO(t *testing.T) {
|
||||
store := testkit.CreateMockStore(t)
|
||||
tk := testkit.NewTestKit(t, store)
|
||||
tk.MustExec("use test")
|
||||
tk.MustExec("drop table if exists t")
|
||||
defer tk.MustExec("drop table if exists t")
|
||||
tk.MustExec("create table t (id int)")
|
||||
|
||||
tk.MustExec("insert into t values(1)")
|
||||
|
||||
asOfExprs := []string{
|
||||
"now(3) - interval 1 second",
|
||||
"current_time() - interval 1 second",
|
||||
"curtime() - interval 1 second",
|
||||
}
|
||||
|
||||
nextTSO := oracle.GoTimeToTS(time.Now().Add(2 * time.Second))
|
||||
require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/sessiontxn/staleread/mockStaleReadTSO", fmt.Sprintf("return(%d)", nextTSO)))
|
||||
defer failpoint.Disable("github.com/pingcap/tidb/sessiontxn/staleread/mockStaleReadTSO")
|
||||
for _, expr := range asOfExprs {
|
||||
// Make sure the now() expr is evaluated from the stale ts provider.
|
||||
tk.MustQuery("select * from t as of timestamp " + expr + " order by id asc").Check(testkit.Rows("1"))
|
||||
}
|
||||
}
|
||||
|
||||
@ -27,6 +27,9 @@ import (
|
||||
"github.com/pingcap/tidb/sessionctx"
|
||||
"github.com/pingcap/tidb/types"
|
||||
driver "github.com/pingcap/tidb/types/parser_driver"
|
||||
"github.com/pingcap/tidb/util/logutil"
|
||||
"github.com/tikv/client-go/v2/oracle"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func boolToInt64(v bool) int64 {
|
||||
@ -158,6 +161,15 @@ func getStmtTimestamp(ctx sessionctx.Context) (time.Time, error) {
|
||||
failpoint.Return(v, nil)
|
||||
})
|
||||
|
||||
if ctx != nil {
|
||||
staleTSO, err := ctx.GetSessionVars().StmtCtx.GetStaleTSO()
|
||||
if staleTSO != 0 && err == nil {
|
||||
return oracle.GetTimeFromTS(staleTSO), nil
|
||||
} else if err != nil {
|
||||
logutil.BgLogger().Error("get stale tso failed", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
|
||||
if ctx == nil {
|
||||
|
||||
@ -3549,7 +3549,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan,
|
||||
case *ast.BeginStmt:
|
||||
readTS := b.ctx.GetSessionVars().TxnReadTS.PeakTxnReadTS()
|
||||
if raw.AsOf != nil {
|
||||
startTS, err := staleread.CalculateAsOfTsExpr(b.ctx, raw.AsOf.TsExpr)
|
||||
startTS, err := staleread.CalculateAsOfTsExpr(ctx, b.ctx, raw.AsOf.TsExpr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -244,7 +244,10 @@ const allowedTimeFromNow = 100 * time.Millisecond
|
||||
|
||||
// ValidateStaleReadTS validates that readTS does not exceed the current time not strictly.
|
||||
func ValidateStaleReadTS(ctx context.Context, sctx Context, readTS uint64) error {
|
||||
currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
|
||||
currentTS, err := sctx.GetSessionVars().StmtCtx.GetStaleTSO()
|
||||
if currentTS == 0 || err != nil {
|
||||
currentTS, err = sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
|
||||
}
|
||||
// If we fail to calculate currentTS from local time, fallback to get a timestamp from PD
|
||||
if err != nil {
|
||||
metrics.ValidateReadTSFromPDCount.Inc()
|
||||
|
||||
@ -405,6 +405,12 @@ type StatementContext struct {
|
||||
TiFlashEngineRemovedDueToStrictSQLMode bool
|
||||
// CanonicalHashCode try to get the canonical hash code from expression.
|
||||
CanonicalHashCode bool
|
||||
// StaleTSOProvider is used to provide stale timestamp oracle for read-only transactions.
|
||||
StaleTSOProvider struct {
|
||||
sync.Mutex
|
||||
value *uint64
|
||||
eval func() (uint64, error)
|
||||
}
|
||||
}
|
||||
|
||||
// StmtHints are SessionVars related sql hints.
|
||||
@ -1229,6 +1235,32 @@ func (sc *StatementContext) DetachMemDiskTracker() {
|
||||
}
|
||||
}
|
||||
|
||||
// SetStaleTSOProvider sets the stale TSO provider.
|
||||
func (sc *StatementContext) SetStaleTSOProvider(eval func() (uint64, error)) {
|
||||
sc.StaleTSOProvider.Lock()
|
||||
defer sc.StaleTSOProvider.Unlock()
|
||||
sc.StaleTSOProvider.value = nil
|
||||
sc.StaleTSOProvider.eval = eval
|
||||
}
|
||||
|
||||
// GetStaleTSO returns the TSO for stale-read usage which calculate from PD's last response.
|
||||
func (sc *StatementContext) GetStaleTSO() (uint64, error) {
|
||||
sc.StaleTSOProvider.Lock()
|
||||
defer sc.StaleTSOProvider.Unlock()
|
||||
if sc.StaleTSOProvider.value != nil {
|
||||
return *sc.StaleTSOProvider.value, nil
|
||||
}
|
||||
if sc.StaleTSOProvider.eval == nil {
|
||||
return 0, nil
|
||||
}
|
||||
tso, err := sc.StaleTSOProvider.eval()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
sc.StaleTSOProvider.value = &tso
|
||||
return tso, nil
|
||||
}
|
||||
|
||||
// CopTasksDetails collects some useful information of cop-tasks during execution.
|
||||
type CopTasksDetails struct {
|
||||
NumCopTasks int
|
||||
|
||||
@ -29,6 +29,7 @@ go_library(
|
||||
"//types",
|
||||
"//util/dbterror",
|
||||
"@com_github_pingcap_errors//:errors",
|
||||
"@com_github_pingcap_failpoint//:failpoint",
|
||||
"@com_github_tikv_client_go_v2//oracle",
|
||||
],
|
||||
)
|
||||
|
||||
@ -280,7 +280,7 @@ func parseAndValidateAsOf(ctx context.Context, sctx sessionctx.Context, asOf *as
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
ts, err := CalculateAsOfTsExpr(sctx, asOf.TsExpr)
|
||||
ts, err := CalculateAsOfTsExpr(ctx, sctx, asOf.TsExpr)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
@ -18,6 +18,7 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/failpoint"
|
||||
"github.com/pingcap/tidb/expression"
|
||||
"github.com/pingcap/tidb/parser/ast"
|
||||
"github.com/pingcap/tidb/parser/mysql"
|
||||
@ -29,7 +30,16 @@ import (
|
||||
)
|
||||
|
||||
// CalculateAsOfTsExpr calculates the TsExpr of AsOfClause to get a StartTS.
|
||||
func CalculateAsOfTsExpr(sctx sessionctx.Context, tsExpr ast.ExprNode) (uint64, error) {
|
||||
func CalculateAsOfTsExpr(ctx context.Context, sctx sessionctx.Context, tsExpr ast.ExprNode) (uint64, error) {
|
||||
sctx.GetSessionVars().StmtCtx.SetStaleTSOProvider(func() (uint64, error) {
|
||||
failpoint.Inject("mockStaleReadTSO", func(val failpoint.Value) (uint64, error) {
|
||||
return uint64(val.(int)), nil
|
||||
})
|
||||
// this function accepts a context, but we don't need it when there is a valid cached ts.
|
||||
// in most cases, the stale read ts can be calculated from `cached ts + time since cache - staleness`,
|
||||
// this can be more accurate than `time.Now() - staleness`, because TiDB's local time can drift.
|
||||
return sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
|
||||
})
|
||||
tsVal, err := expression.EvalAstExpr(sctx, tsExpr)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
|
||||
Reference in New Issue
Block a user