Add tidb_low_resolution_tso session scope variable on master (#10428)
This commit is contained in:
committed by
Zhang Jian
parent
52c9d03a5e
commit
abd013c82f
@ -372,6 +372,10 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
|
||||
if snapshotTS != 0 {
|
||||
return nil, errors.New("can not execute write statement when 'tidb_snapshot' is set")
|
||||
}
|
||||
lowResolutionTSO := sctx.GetSessionVars().LowResolutionTSO
|
||||
if lowResolutionTSO {
|
||||
return nil, errors.New("can not execute write statement when 'tidb_low_resolution_tso' is set")
|
||||
}
|
||||
}
|
||||
|
||||
var err error
|
||||
|
||||
@ -2117,6 +2117,28 @@ func (s *testSuite) TestHistoryRead(c *C) {
|
||||
tk.MustQuery("select * from history_read order by a").Check(testkit.Rows("2 <nil>", "4 <nil>", "8 8", "9 9"))
|
||||
}
|
||||
|
||||
func (s *testSuite) TestLowResolutionTSORead(c *C) {
|
||||
tk := testkit.NewTestKit(c, s.store)
|
||||
tk.MustExec("set @@autocommit=1")
|
||||
tk.MustExec("use test")
|
||||
tk.MustExec("drop table if exists low_resolution_tso")
|
||||
tk.MustExec("create table low_resolution_tso(a int)")
|
||||
tk.MustExec("insert low_resolution_tso values (1)")
|
||||
|
||||
// enable low resolution tso
|
||||
c.Assert(tk.Se.GetSessionVars().LowResolutionTSO, IsFalse)
|
||||
tk.Exec("set @@tidb_low_resolution_tso = 'on'")
|
||||
c.Assert(tk.Se.GetSessionVars().LowResolutionTSO, IsTrue)
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
tk.MustQuery("select * from low_resolution_tso").Check(testkit.Rows("1"))
|
||||
_, err := tk.Exec("update low_resolution_tso set a = 2")
|
||||
c.Assert(err, NotNil)
|
||||
tk.MustExec("set @@tidb_low_resolution_tso = 'off'")
|
||||
tk.MustExec("update low_resolution_tso set a = 2")
|
||||
tk.MustQuery("select * from low_resolution_tso").Check(testkit.Rows("2"))
|
||||
}
|
||||
|
||||
func (s *testSuite) TestScanControlSelection(c *C) {
|
||||
tk := testkit.NewTestKit(c, s.store)
|
||||
tk.MustExec("use test")
|
||||
|
||||
@ -408,7 +408,12 @@ func (s *session) getTxnFuture(ctx context.Context) *txnFuture {
|
||||
}
|
||||
|
||||
oracleStore := s.store.GetOracle()
|
||||
tsFuture := oracleStore.GetTimestampAsync(ctx)
|
||||
var tsFuture oracle.Future
|
||||
if s.sessionVars.LowResolutionTSO {
|
||||
tsFuture = oracleStore.GetLowResolutionTimestampAsync(ctx)
|
||||
} else {
|
||||
tsFuture = oracleStore.GetTimestampAsync(ctx)
|
||||
}
|
||||
ret := &txnFuture{future: tsFuture, store: s.store}
|
||||
if x := ctx.Value("mockGetTSFail"); x != nil {
|
||||
ret.mockFail = true
|
||||
|
||||
@ -374,8 +374,11 @@ type SessionVars struct {
|
||||
// EnableFastAnalyze indicates whether to take fast analyze.
|
||||
EnableFastAnalyze bool
|
||||
|
||||
// PessimisticLock indicates whether new transaction should be pessimistic .
|
||||
// PessimisticLock indicates whether new transaction should be pessimistic.
|
||||
PessimisticLock bool
|
||||
|
||||
// LowResolutionTSO is used for reading data with low resolution TSO which is updated once every two seconds.
|
||||
LowResolutionTSO bool
|
||||
}
|
||||
|
||||
// ConnectionInfo present connection used by audit.
|
||||
@ -793,6 +796,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
|
||||
s.WaitTableSplitFinish = TiDBOptOn(val)
|
||||
case TiDBPessimisticLock:
|
||||
s.PessimisticLock = TiDBOptOn(val)
|
||||
case TiDBLowResolutionTSO:
|
||||
s.LowResolutionTSO = TiDBOptOn(val)
|
||||
}
|
||||
s.systems[name] = val
|
||||
return nil
|
||||
|
||||
@ -696,6 +696,7 @@ var defaultSysVars = []*SysVar{
|
||||
{ScopeSession, TiDBCheckMb4ValueInUTF8, BoolToIntStr(config.GetGlobalConfig().CheckMb4ValueInUTF8)},
|
||||
{ScopeSession, TiDBSlowQueryFile, ""},
|
||||
{ScopeSession, TiDBWaitTableSplitFinish, BoolToIntStr(DefTiDBWaitTableSplitFinish)},
|
||||
{ScopeSession, TiDBLowResolutionTSO, "0"},
|
||||
}
|
||||
|
||||
// SynonymsSysVariables is synonyms of system variables.
|
||||
|
||||
@ -44,4 +44,6 @@ func (*testSysVarSuite) TestSysVar(c *C) {
|
||||
c.Assert(f, NotNil)
|
||||
c.Assert(f.Value, Equals, "4000")
|
||||
|
||||
f = GetSysVar("tidb_low_resolution_tso")
|
||||
c.Assert(f.Value, Equals, "0")
|
||||
}
|
||||
|
||||
@ -138,6 +138,9 @@ const (
|
||||
// tidb_skip_isolation_level_check is used to control whether to return error when set unsupported transaction
|
||||
// isolation level.
|
||||
TiDBSkipIsolationLevelCheck = "tidb_skip_isolation_level_check"
|
||||
|
||||
// TiDBLowResolutionTSO is used for reading data with low resolution TSO which is updated once every two seconds
|
||||
TiDBLowResolutionTSO = "tidb_low_resolution_tso"
|
||||
)
|
||||
|
||||
// TiDB system variable names that both in session and global scope.
|
||||
|
||||
@ -417,7 +417,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
|
||||
TiDBOptInSubqToJoinAndAgg, TiDBEnableFastAnalyze,
|
||||
TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming,
|
||||
TiDBBatchDelete, TiDBBatchCommit, TiDBEnableCascadesPlanner, TiDBEnableWindowFunction,
|
||||
TiDBCheckMb4ValueInUTF8:
|
||||
TiDBCheckMb4ValueInUTF8, TiDBLowResolutionTSO:
|
||||
if strings.EqualFold(value, "ON") || value == "1" || strings.EqualFold(value, "OFF") || value == "0" {
|
||||
return value, nil
|
||||
}
|
||||
|
||||
@ -276,6 +276,17 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) {
|
||||
c.Assert(val, Equals, "0")
|
||||
c.Assert(config.GetGlobalConfig().CheckMb4ValueInUTF8, Equals, false)
|
||||
|
||||
SetSessionSystemVar(v, TiDBLowResolutionTSO, types.NewStringDatum("1"))
|
||||
val, err = GetSessionSystemVar(v, TiDBLowResolutionTSO)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(val, Equals, "1")
|
||||
c.Assert(v.LowResolutionTSO, Equals, true)
|
||||
SetSessionSystemVar(v, TiDBLowResolutionTSO, types.NewStringDatum("0"))
|
||||
val, err = GetSessionSystemVar(v, TiDBLowResolutionTSO)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(val, Equals, "0")
|
||||
c.Assert(v.LowResolutionTSO, Equals, false)
|
||||
|
||||
c.Assert(v.CorrelationThreshold, Equals, 0.9)
|
||||
err = SetSessionSystemVar(v, TiDBOptCorrelationThreshold, types.NewStringDatum("0"))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
@ -92,6 +92,16 @@ func (o *MockOracle) GetTimestampAsync(ctx context.Context) oracle.Future {
|
||||
return &mockOracleFuture{o, ctx}
|
||||
}
|
||||
|
||||
// GetLowResolutionTimestamp implements oracle.Oracle interface.
|
||||
func (o *MockOracle) GetLowResolutionTimestamp(ctx context.Context) (uint64, error) {
|
||||
return o.GetTimestamp(ctx)
|
||||
}
|
||||
|
||||
// GetLowResolutionTimestampAsync implements oracle.Oracle interface.
|
||||
func (o *MockOracle) GetLowResolutionTimestampAsync(ctx context.Context) oracle.Future {
|
||||
return o.GetTimestampAsync(ctx)
|
||||
}
|
||||
|
||||
// IsExpired implements oracle.Oracle interface.
|
||||
func (o *MockOracle) IsExpired(lockTimestamp uint64, TTL uint64) bool {
|
||||
o.RLock()
|
||||
|
||||
@ -22,6 +22,8 @@ import (
|
||||
type Oracle interface {
|
||||
GetTimestamp(ctx context.Context) (uint64, error)
|
||||
GetTimestampAsync(ctx context.Context) Future
|
||||
GetLowResolutionTimestamp(ctx context.Context) (uint64, error)
|
||||
GetLowResolutionTimestampAsync(ctx context.Context) Future
|
||||
IsExpired(lockTimestamp uint64, TTL uint64) bool
|
||||
UntilExpired(lockTimeStamp uint64, TTL uint64) int64
|
||||
Close()
|
||||
|
||||
@ -70,6 +70,14 @@ func (l *localOracle) GetTimestampAsync(ctx context.Context) oracle.Future {
|
||||
}
|
||||
}
|
||||
|
||||
func (l *localOracle) GetLowResolutionTimestamp(ctx context.Context) (uint64, error) {
|
||||
return l.GetTimestamp(ctx)
|
||||
}
|
||||
|
||||
func (l *localOracle) GetLowResolutionTimestampAsync(ctx context.Context) oracle.Future {
|
||||
return l.GetTimestampAsync(ctx)
|
||||
}
|
||||
|
||||
type future struct {
|
||||
ctx context.Context
|
||||
l *localOracle
|
||||
|
||||
@ -146,3 +146,22 @@ func (o *pdOracle) UntilExpired(lockTS uint64, TTL uint64) int64 {
|
||||
func (o *pdOracle) Close() {
|
||||
close(o.quit)
|
||||
}
|
||||
|
||||
// A future that resolves immediately to a low resolution timestamp.
|
||||
type lowResolutionTsFuture uint64
|
||||
|
||||
// Wait implements the oracle.Future interface.
|
||||
func (f lowResolutionTsFuture) Wait() (uint64, error) {
|
||||
return uint64(f), nil
|
||||
}
|
||||
|
||||
// GetLowResolutionTimestamp gets a new increasing time.
|
||||
func (o *pdOracle) GetLowResolutionTimestamp(ctx context.Context) (uint64, error) {
|
||||
lastTS := atomic.LoadUint64(&o.lastTS)
|
||||
return lastTS, nil
|
||||
}
|
||||
|
||||
func (o *pdOracle) GetLowResolutionTimestampAsync(ctx context.Context) oracle.Future {
|
||||
lastTS := atomic.LoadUint64(&o.lastTS)
|
||||
return lowResolutionTsFuture(lastTS)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user