store/tikv: resolve variable dependency (#22609)
Signed-off-by: disksing <i@disksing.com>
This commit is contained in:
@ -437,16 +437,16 @@ func (*testSuite) TestT(c *C) {
|
||||
PS: make([]*util.ProcessInfo, 0),
|
||||
}
|
||||
infoSyncer.SetSessionManager(sm)
|
||||
beforeTS := variable.GoTimeToTS(time.Now())
|
||||
beforeTS := oracle.GoTimeToTS(time.Now())
|
||||
infoSyncer.ReportMinStartTS(dom.Store())
|
||||
afterTS := variable.GoTimeToTS(time.Now())
|
||||
afterTS := oracle.GoTimeToTS(time.Now())
|
||||
c.Assert(infoSyncer.GetMinStartTS() > beforeTS && infoSyncer.GetMinStartTS() < afterTS, IsFalse)
|
||||
lowerLimit := time.Now().Add(-time.Duration(kv.MaxTxnTimeUse) * time.Millisecond)
|
||||
validTS := variable.GoTimeToTS(lowerLimit.Add(time.Minute))
|
||||
validTS := oracle.GoTimeToTS(lowerLimit.Add(time.Minute))
|
||||
sm.PS = []*util.ProcessInfo{
|
||||
{CurTxnStartTS: 0},
|
||||
{CurTxnStartTS: math.MaxUint64},
|
||||
{CurTxnStartTS: variable.GoTimeToTS(lowerLimit)},
|
||||
{CurTxnStartTS: oracle.GoTimeToTS(lowerLimit)},
|
||||
{CurTxnStartTS: validTS},
|
||||
}
|
||||
infoSyncer.SetSessionManager(sm)
|
||||
|
||||
@ -40,7 +40,6 @@ import (
|
||||
"github.com/pingcap/tidb/metrics"
|
||||
"github.com/pingcap/tidb/owner"
|
||||
"github.com/pingcap/tidb/sessionctx/binloginfo"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/store/tikv/oracle"
|
||||
util2 "github.com/pingcap/tidb/util"
|
||||
"github.com/pingcap/tidb/util/dbterror"
|
||||
@ -564,9 +563,9 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) {
|
||||
return
|
||||
}
|
||||
now := time.Unix(0, oracle.ExtractPhysical(currentVer.Ver)*1e6)
|
||||
startTSLowerLimit := variable.GoTimeToTS(now.Add(-time.Duration(kv.MaxTxnTimeUse) * time.Millisecond))
|
||||
startTSLowerLimit := oracle.GoTimeToTS(now.Add(-time.Duration(kv.MaxTxnTimeUse) * time.Millisecond))
|
||||
|
||||
minStartTS := variable.GoTimeToTS(now)
|
||||
minStartTS := oracle.GoTimeToTS(now)
|
||||
for _, info := range pl {
|
||||
if info.CurTxnStartTS > startTSLowerLimit && info.CurTxnStartTS < minStartTS {
|
||||
minStartTS = info.CurTxnStartTS
|
||||
|
||||
@ -39,7 +39,7 @@ import (
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/sessionctx"
|
||||
"github.com/pingcap/tidb/sessionctx/stmtctx"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/store/tikv/oracle"
|
||||
"github.com/pingcap/tidb/types"
|
||||
"github.com/pingcap/tidb/util/chunk"
|
||||
"github.com/pingcap/tidb/util/sqlexec"
|
||||
@ -164,7 +164,7 @@ func (b *executorBuilder) parseTSString(ts string) (uint64, error) {
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return variable.GoTimeToTS(t1), nil
|
||||
return oracle.GoTimeToTS(t1), nil
|
||||
}
|
||||
|
||||
func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) Executor {
|
||||
|
||||
@ -22,7 +22,7 @@ import (
|
||||
"github.com/pingcap/parser/mysql"
|
||||
"github.com/pingcap/tidb/domain"
|
||||
"github.com/pingcap/tidb/session"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/store/tikv/oracle"
|
||||
"github.com/pingcap/tidb/types"
|
||||
"github.com/pingcap/tidb/util/gcutil"
|
||||
"github.com/pingcap/tidb/util/sqlexec"
|
||||
@ -104,7 +104,7 @@ func (sh StatsHistoryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
|
||||
writeError(w, err)
|
||||
return
|
||||
}
|
||||
snapshot := variable.GoTimeToTS(t1)
|
||||
snapshot := oracle.GoTimeToTS(t1)
|
||||
err = gcutil.ValidateSnapshot(se, snapshot)
|
||||
if err != nil {
|
||||
writeError(w, err)
|
||||
|
||||
@ -31,7 +31,6 @@ import (
|
||||
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pingcap/tidb/config"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/store/mockstore/cluster"
|
||||
"github.com/pingcap/tidb/store/mockstore/mocktikv"
|
||||
"github.com/pingcap/tidb/store/tikv/oracle"
|
||||
@ -957,14 +956,14 @@ func (s *testCommitterSuite) TestPkNotFound(c *C) {
|
||||
LockType: kvrpcpb.Op_PessimisticLock,
|
||||
LockForUpdateTS: txn1.startTS,
|
||||
}
|
||||
status, err = s.store.lockResolver.getTxnStatusFromLock(bo, lockKey2, variable.GoTimeToTS(time.Now().Add(200*time.Millisecond)), false)
|
||||
status, err = s.store.lockResolver.getTxnStatusFromLock(bo, lockKey2, oracle.GoTimeToTS(time.Now().Add(200*time.Millisecond)), false)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.Action(), Equals, kvrpcpb.Action_TTLExpirePessimisticRollback)
|
||||
|
||||
// Txn2 tries to lock the secondary key k2, there should be no dead loop.
|
||||
// Since the resolving key k2 is a pessimistic lock, no rollback record should be written, and later lock
|
||||
// and the other secondary key k3 should succeed if there is no fail point enabled.
|
||||
status, err = s.store.lockResolver.getTxnStatusFromLock(bo, lockKey2, variable.GoTimeToTS(time.Now().Add(200*time.Millisecond)), false)
|
||||
status, err = s.store.lockResolver.getTxnStatusFromLock(bo, lockKey2, oracle.GoTimeToTS(time.Now().Add(200*time.Millisecond)), false)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.Action(), Equals, kvrpcpb.Action_LockNotExistDoNothing)
|
||||
txn2 := s.begin(c)
|
||||
@ -997,7 +996,7 @@ func (s *testCommitterSuite) TestPkNotFound(c *C) {
|
||||
lockCtx = &kv.LockCtx{ForUpdateTS: txn3.startTS, WaitStartTime: time.Now(), LockWaitTime: kv.LockNoWait}
|
||||
err = txn3.LockKeys(ctx, lockCtx, k3)
|
||||
c.Assert(err, IsNil)
|
||||
status, err = s.store.lockResolver.getTxnStatusFromLock(bo, lockKey3, variable.GoTimeToTS(time.Now().Add(200*time.Millisecond)), false)
|
||||
status, err = s.store.lockResolver.getTxnStatusFromLock(bo, lockKey3, oracle.GoTimeToTS(time.Now().Add(200*time.Millisecond)), false)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.Action(), Equals, kvrpcpb.Action_LockNotExistDoNothing)
|
||||
}
|
||||
|
||||
@ -42,7 +42,6 @@ import (
|
||||
"github.com/pingcap/tidb/metrics"
|
||||
"github.com/pingcap/tidb/privilege"
|
||||
"github.com/pingcap/tidb/session"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/store/tikv"
|
||||
"github.com/pingcap/tidb/store/tikv/logutil"
|
||||
"github.com/pingcap/tidb/store/tikv/oracle"
|
||||
@ -502,7 +501,7 @@ func (w *GCWorker) calcNewSafePoint(ctx context.Context, now time.Time) (*time.T
|
||||
return nil, 0, errors.Trace(err)
|
||||
}
|
||||
|
||||
safePointValue := w.calcSafePointByMinStartTS(ctx, variable.GoTimeToTS(now.Add(-*lifeTime)))
|
||||
safePointValue := w.calcSafePointByMinStartTS(ctx, oracle.GoTimeToTS(now.Add(-*lifeTime)))
|
||||
safePointValue, err = w.setGCWorkerServiceSafePoint(ctx, safePointValue)
|
||||
if err != nil {
|
||||
return nil, 0, errors.Trace(err)
|
||||
|
||||
@ -37,7 +37,6 @@ import (
|
||||
"github.com/pingcap/tidb/domain/infosync"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/session"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/store/mockoracle"
|
||||
"github.com/pingcap/tidb/store/mockstore"
|
||||
"github.com/pingcap/tidb/store/mockstore/cluster"
|
||||
@ -241,7 +240,7 @@ func (s *testGCWorkerSuite) TestMinStartTS(c *C) {
|
||||
spkv := s.store.GetSafePointKV()
|
||||
err := spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), strconv.FormatUint(math.MaxUint64, 10))
|
||||
c.Assert(err, IsNil)
|
||||
now := variable.GoTimeToTS(time.Now())
|
||||
now := oracle.GoTimeToTS(time.Now())
|
||||
sp := s.gcWorker.calcSafePointByMinStartTS(ctx, now)
|
||||
c.Assert(sp, Equals, now)
|
||||
err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), "0")
|
||||
@ -380,7 +379,7 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) {
|
||||
|
||||
// Check skipping GC if safe point is not changed.
|
||||
safePointTime, err := s.gcWorker.loadTime(gcSafePointKey)
|
||||
minStartTS := variable.GoTimeToTS(*safePointTime) + 1
|
||||
minStartTS := oracle.GoTimeToTS(*safePointTime) + 1
|
||||
c.Assert(err, IsNil)
|
||||
spkv := s.store.GetSafePointKV()
|
||||
err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), strconv.FormatUint(minStartTS, 10))
|
||||
|
||||
@ -88,3 +88,9 @@ func GetTimeFromTS(ts uint64) time.Time {
|
||||
ms := ExtractPhysical(ts)
|
||||
return time.Unix(ms/1e3, (ms%1e3)*1e6)
|
||||
}
|
||||
|
||||
// GoTimeToTS converts a Go time to uint64 timestamp.
|
||||
func GoTimeToTS(t time.Time) uint64 {
|
||||
ts := (t.UnixNano() / int64(time.Millisecond)) << physicalShiftBits
|
||||
return uint64(ts)
|
||||
}
|
||||
|
||||
@ -20,6 +20,7 @@ import (
|
||||
"github.com/pingcap/parser/model"
|
||||
"github.com/pingcap/tidb/sessionctx"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/store/tikv/oracle"
|
||||
"github.com/pingcap/tidb/store/tikv/util"
|
||||
"github.com/pingcap/tidb/util/sqlexec"
|
||||
)
|
||||
@ -86,6 +87,6 @@ func GetGCSafePoint(ctx sessionctx.Context) (uint64, error) {
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
ts := variable.GoTimeToTS(safePointTime)
|
||||
ts := oracle.GoTimeToTS(safePointTime)
|
||||
return ts, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user