From abd013c82f42045afa5729ebdcdbff577a5f69f9 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Wed, 22 May 2019 21:42:58 +0800 Subject: [PATCH] Add tidb_low_resolution_tso session scope variable on master (#10428) --- executor/adapter.go | 4 ++++ executor/executor_test.go | 22 ++++++++++++++++++++++ session/txn.go | 7 ++++++- sessionctx/variable/session.go | 7 ++++++- sessionctx/variable/sysvar.go | 1 + sessionctx/variable/sysvar_test.go | 2 ++ sessionctx/variable/tidb_vars.go | 3 +++ sessionctx/variable/varsutil.go | 2 +- sessionctx/variable/varsutil_test.go | 11 +++++++++++ store/mockoracle/oracle.go | 10 ++++++++++ store/tikv/oracle/oracle.go | 2 ++ store/tikv/oracle/oracles/local.go | 8 ++++++++ store/tikv/oracle/oracles/pd.go | 19 +++++++++++++++++++ 13 files changed, 95 insertions(+), 3 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index 6c0e431a61..e172812bdd 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -372,6 +372,10 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex if snapshotTS != 0 { return nil, errors.New("can not execute write statement when 'tidb_snapshot' is set") } + lowResolutionTSO := sctx.GetSessionVars().LowResolutionTSO + if lowResolutionTSO { + return nil, errors.New("can not execute write statement when 'tidb_low_resolution_tso' is set") + } } var err error diff --git a/executor/executor_test.go b/executor/executor_test.go index 8ac9b8f8bd..d28a364aef 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -2117,6 +2117,28 @@ func (s *testSuite) TestHistoryRead(c *C) { tk.MustQuery("select * from history_read order by a").Check(testkit.Rows("2 ", "4 ", "8 8", "9 9")) } +func (s *testSuite) TestLowResolutionTSORead(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("set @@autocommit=1") + tk.MustExec("use test") + tk.MustExec("drop table if exists low_resolution_tso") + tk.MustExec("create table low_resolution_tso(a int)") + tk.MustExec("insert low_resolution_tso values (1)") + + // enable low resolution tso + c.Assert(tk.Se.GetSessionVars().LowResolutionTSO, IsFalse) + tk.Exec("set @@tidb_low_resolution_tso = 'on'") + c.Assert(tk.Se.GetSessionVars().LowResolutionTSO, IsTrue) + + time.Sleep(3 * time.Second) + tk.MustQuery("select * from low_resolution_tso").Check(testkit.Rows("1")) + _, err := tk.Exec("update low_resolution_tso set a = 2") + c.Assert(err, NotNil) + tk.MustExec("set @@tidb_low_resolution_tso = 'off'") + tk.MustExec("update low_resolution_tso set a = 2") + tk.MustQuery("select * from low_resolution_tso").Check(testkit.Rows("2")) +} + func (s *testSuite) TestScanControlSelection(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/session/txn.go b/session/txn.go index f010533662..d226614da3 100644 --- a/session/txn.go +++ b/session/txn.go @@ -408,7 +408,12 @@ func (s *session) getTxnFuture(ctx context.Context) *txnFuture { } oracleStore := s.store.GetOracle() - tsFuture := oracleStore.GetTimestampAsync(ctx) + var tsFuture oracle.Future + if s.sessionVars.LowResolutionTSO { + tsFuture = oracleStore.GetLowResolutionTimestampAsync(ctx) + } else { + tsFuture = oracleStore.GetTimestampAsync(ctx) + } ret := &txnFuture{future: tsFuture, store: s.store} if x := ctx.Value("mockGetTSFail"); x != nil { ret.mockFail = true diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 1deb125aaf..a96d616878 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -374,8 +374,11 @@ type SessionVars struct { // EnableFastAnalyze indicates whether to take fast analyze. EnableFastAnalyze bool - // PessimisticLock indicates whether new transaction should be pessimistic . + // PessimisticLock indicates whether new transaction should be pessimistic. PessimisticLock bool + + // LowResolutionTSO is used for reading data with low resolution TSO which is updated once every two seconds. + LowResolutionTSO bool } // ConnectionInfo present connection used by audit. @@ -793,6 +796,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.WaitTableSplitFinish = TiDBOptOn(val) case TiDBPessimisticLock: s.PessimisticLock = TiDBOptOn(val) + case TiDBLowResolutionTSO: + s.LowResolutionTSO = TiDBOptOn(val) } s.systems[name] = val return nil diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index bd8cf2d326..3c349a07f6 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -696,6 +696,7 @@ var defaultSysVars = []*SysVar{ {ScopeSession, TiDBCheckMb4ValueInUTF8, BoolToIntStr(config.GetGlobalConfig().CheckMb4ValueInUTF8)}, {ScopeSession, TiDBSlowQueryFile, ""}, {ScopeSession, TiDBWaitTableSplitFinish, BoolToIntStr(DefTiDBWaitTableSplitFinish)}, + {ScopeSession, TiDBLowResolutionTSO, "0"}, } // SynonymsSysVariables is synonyms of system variables. diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go index dfdc410b88..e861e120a9 100644 --- a/sessionctx/variable/sysvar_test.go +++ b/sessionctx/variable/sysvar_test.go @@ -44,4 +44,6 @@ func (*testSysVarSuite) TestSysVar(c *C) { c.Assert(f, NotNil) c.Assert(f.Value, Equals, "4000") + f = GetSysVar("tidb_low_resolution_tso") + c.Assert(f.Value, Equals, "0") } diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 2f39b2bec3..e66e799bbc 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -138,6 +138,9 @@ const ( // tidb_skip_isolation_level_check is used to control whether to return error when set unsupported transaction // isolation level. TiDBSkipIsolationLevelCheck = "tidb_skip_isolation_level_check" + + // TiDBLowResolutionTSO is used for reading data with low resolution TSO which is updated once every two seconds + TiDBLowResolutionTSO = "tidb_low_resolution_tso" ) // TiDB system variable names that both in session and global scope. diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 075751ebda..2180b61391 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -417,7 +417,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, TiDBOptInSubqToJoinAndAgg, TiDBEnableFastAnalyze, TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming, TiDBBatchDelete, TiDBBatchCommit, TiDBEnableCascadesPlanner, TiDBEnableWindowFunction, - TiDBCheckMb4ValueInUTF8: + TiDBCheckMb4ValueInUTF8, TiDBLowResolutionTSO: if strings.EqualFold(value, "ON") || value == "1" || strings.EqualFold(value, "OFF") || value == "0" { return value, nil } diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index 4875582fd3..f4af0357a6 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -276,6 +276,17 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) { c.Assert(val, Equals, "0") c.Assert(config.GetGlobalConfig().CheckMb4ValueInUTF8, Equals, false) + SetSessionSystemVar(v, TiDBLowResolutionTSO, types.NewStringDatum("1")) + val, err = GetSessionSystemVar(v, TiDBLowResolutionTSO) + c.Assert(err, IsNil) + c.Assert(val, Equals, "1") + c.Assert(v.LowResolutionTSO, Equals, true) + SetSessionSystemVar(v, TiDBLowResolutionTSO, types.NewStringDatum("0")) + val, err = GetSessionSystemVar(v, TiDBLowResolutionTSO) + c.Assert(err, IsNil) + c.Assert(val, Equals, "0") + c.Assert(v.LowResolutionTSO, Equals, false) + c.Assert(v.CorrelationThreshold, Equals, 0.9) err = SetSessionSystemVar(v, TiDBOptCorrelationThreshold, types.NewStringDatum("0")) c.Assert(err, IsNil) diff --git a/store/mockoracle/oracle.go b/store/mockoracle/oracle.go index aef34e2cfb..0a5d53dc52 100644 --- a/store/mockoracle/oracle.go +++ b/store/mockoracle/oracle.go @@ -92,6 +92,16 @@ func (o *MockOracle) GetTimestampAsync(ctx context.Context) oracle.Future { return &mockOracleFuture{o, ctx} } +// GetLowResolutionTimestamp implements oracle.Oracle interface. +func (o *MockOracle) GetLowResolutionTimestamp(ctx context.Context) (uint64, error) { + return o.GetTimestamp(ctx) +} + +// GetLowResolutionTimestampAsync implements oracle.Oracle interface. +func (o *MockOracle) GetLowResolutionTimestampAsync(ctx context.Context) oracle.Future { + return o.GetTimestampAsync(ctx) +} + // IsExpired implements oracle.Oracle interface. func (o *MockOracle) IsExpired(lockTimestamp uint64, TTL uint64) bool { o.RLock() diff --git a/store/tikv/oracle/oracle.go b/store/tikv/oracle/oracle.go index c04909b3d1..021dc71493 100644 --- a/store/tikv/oracle/oracle.go +++ b/store/tikv/oracle/oracle.go @@ -22,6 +22,8 @@ import ( type Oracle interface { GetTimestamp(ctx context.Context) (uint64, error) GetTimestampAsync(ctx context.Context) Future + GetLowResolutionTimestamp(ctx context.Context) (uint64, error) + GetLowResolutionTimestampAsync(ctx context.Context) Future IsExpired(lockTimestamp uint64, TTL uint64) bool UntilExpired(lockTimeStamp uint64, TTL uint64) int64 Close() diff --git a/store/tikv/oracle/oracles/local.go b/store/tikv/oracle/oracles/local.go index 25543ec332..8c4f069f1c 100644 --- a/store/tikv/oracle/oracles/local.go +++ b/store/tikv/oracle/oracles/local.go @@ -70,6 +70,14 @@ func (l *localOracle) GetTimestampAsync(ctx context.Context) oracle.Future { } } +func (l *localOracle) GetLowResolutionTimestamp(ctx context.Context) (uint64, error) { + return l.GetTimestamp(ctx) +} + +func (l *localOracle) GetLowResolutionTimestampAsync(ctx context.Context) oracle.Future { + return l.GetTimestampAsync(ctx) +} + type future struct { ctx context.Context l *localOracle diff --git a/store/tikv/oracle/oracles/pd.go b/store/tikv/oracle/oracles/pd.go index 2128c6d2d8..3d07426ed8 100644 --- a/store/tikv/oracle/oracles/pd.go +++ b/store/tikv/oracle/oracles/pd.go @@ -146,3 +146,22 @@ func (o *pdOracle) UntilExpired(lockTS uint64, TTL uint64) int64 { func (o *pdOracle) Close() { close(o.quit) } + +// A future that resolves immediately to a low resolution timestamp. +type lowResolutionTsFuture uint64 + +// Wait implements the oracle.Future interface. +func (f lowResolutionTsFuture) Wait() (uint64, error) { + return uint64(f), nil +} + +// GetLowResolutionTimestamp gets a new increasing time. +func (o *pdOracle) GetLowResolutionTimestamp(ctx context.Context) (uint64, error) { + lastTS := atomic.LoadUint64(&o.lastTS) + return lastTS, nil +} + +func (o *pdOracle) GetLowResolutionTimestampAsync(ctx context.Context) oracle.Future { + lastTS := atomic.LoadUint64(&o.lastTS) + return lowResolutionTsFuture(lastTS) +}