*: refactor GetSnapshot (#20475)
Signed-off-by: Shuaipeng Yu <jackysp@gmail.com>
This commit is contained in:
@ -592,11 +592,8 @@ func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version
|
||||
}
|
||||
|
||||
ver := kv.Version{Ver: version}
|
||||
snap, err := store.GetSnapshot(ver)
|
||||
snap := store.GetSnapshot(ver)
|
||||
snap.SetOption(kv.Priority, priority)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
it, err := snap.Iter(firstKey, upperBound)
|
||||
if err != nil {
|
||||
|
||||
@ -86,10 +86,7 @@ type Domain struct {
|
||||
// It returns the latest schema version, the changed table IDs, whether it's a full load and an error.
|
||||
func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion int64,
|
||||
startTS uint64) (neededSchemaVersion int64, change *tikv.RelatedSchemaChange, fullLoad bool, err error) {
|
||||
snapshot, err := do.store.GetSnapshot(kv.NewVersion(startTS))
|
||||
if err != nil {
|
||||
return 0, nil, fullLoad, err
|
||||
}
|
||||
snapshot := do.store.GetSnapshot(kv.NewVersion(startTS))
|
||||
m := meta.NewSnapshotMeta(snapshot)
|
||||
neededSchemaVersion, err = m.GetSchemaVersion()
|
||||
if err != nil {
|
||||
@ -320,10 +317,7 @@ func (do *Domain) GetSnapshotInfoSchema(snapshotTS uint64) (infoschema.InfoSchem
|
||||
|
||||
// GetSnapshotMeta gets a new snapshot meta at startTS.
|
||||
func (do *Domain) GetSnapshotMeta(startTS uint64) (*meta.Meta, error) {
|
||||
snapshot, err := do.store.GetSnapshot(kv.NewVersion(startTS))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
snapshot := do.store.GetSnapshot(kv.NewVersion(startTS))
|
||||
return meta.NewSnapshotMeta(snapshot), nil
|
||||
}
|
||||
|
||||
|
||||
@ -935,10 +935,7 @@ func (e *AnalyzeFastExec) handleScanIter(iter kv.Iterator) (scanKeysSize int, er
|
||||
}
|
||||
|
||||
func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err error) {
|
||||
snapshot, err := e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
snapshot := e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion)
|
||||
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
|
||||
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
|
||||
}
|
||||
@ -958,11 +955,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err
|
||||
|
||||
func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) {
|
||||
defer e.wg.Done()
|
||||
var snapshot kv.Snapshot
|
||||
snapshot, *err = e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion)
|
||||
if *err != nil {
|
||||
return
|
||||
}
|
||||
snapshot := e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion)
|
||||
snapshot.SetOption(kv.NotFillCache, true)
|
||||
snapshot.SetOption(kv.IsolationLevel, kv.RC)
|
||||
snapshot.SetOption(kv.Priority, kv.PriorityLow)
|
||||
|
||||
@ -99,10 +99,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
|
||||
// The snapshot may contains cache that can reduce RPC call.
|
||||
snapshot = txn.GetSnapshot()
|
||||
} else {
|
||||
snapshot, err = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: e.snapshotTS})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
snapshot = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: e.snapshotTS})
|
||||
}
|
||||
if e.runtimeStats != nil {
|
||||
snapshotStats := &tikv.SnapshotRuntimeStats{}
|
||||
|
||||
@ -132,10 +132,7 @@ func (e *PointGetExecutor) Open(context.Context) error {
|
||||
if e.txn.Valid() && txnCtx.StartTS == txnCtx.GetForUpdateTS() {
|
||||
e.snapshot = e.txn.GetSnapshot()
|
||||
} else {
|
||||
e.snapshot, err = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: snapshotTS})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.snapshot = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: snapshotTS})
|
||||
}
|
||||
if e.runtimeStats != nil {
|
||||
snapshotStats := &tikv.SnapshotRuntimeStats{}
|
||||
|
||||
@ -73,12 +73,12 @@ func (s *InjectedStore) BeginWithStartTS(startTS uint64) (Transaction, error) {
|
||||
}
|
||||
|
||||
// GetSnapshot creates an injected Snapshot.
|
||||
func (s *InjectedStore) GetSnapshot(ver Version) (Snapshot, error) {
|
||||
snapshot, err := s.Storage.GetSnapshot(ver)
|
||||
func (s *InjectedStore) GetSnapshot(ver Version) Snapshot {
|
||||
snapshot := s.Storage.GetSnapshot(ver)
|
||||
return &InjectedSnapshot{
|
||||
Snapshot: snapshot,
|
||||
cfg: s.cfg,
|
||||
}, err
|
||||
}
|
||||
}
|
||||
|
||||
// InjectedTransaction wraps a Transaction with injections.
|
||||
|
||||
@ -37,8 +37,7 @@ func (s testFaultInjectionSuite) TestFaultInjectionBasic(c *C) {
|
||||
_, err = storage.BeginWithStartTS(0)
|
||||
c.Assert(err, IsNil)
|
||||
ver := Version{Ver: 1}
|
||||
snap, err := storage.GetSnapshot(ver)
|
||||
c.Assert(err, IsNil)
|
||||
snap := storage.GetSnapshot(ver)
|
||||
b, err := txn.Get(context.TODO(), []byte{'a'})
|
||||
c.Assert(err.Error(), Equals, err1.Error())
|
||||
c.Assert(b, IsNil)
|
||||
@ -63,8 +62,7 @@ func (s testFaultInjectionSuite) TestFaultInjectionBasic(c *C) {
|
||||
storage = NewInjectedStore(newMockStorage(), &cfg)
|
||||
txn, err = storage.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
snap, err = storage.GetSnapshot(ver)
|
||||
c.Assert(err, IsNil)
|
||||
snap = storage.GetSnapshot(ver)
|
||||
|
||||
b, err = txn.Get(context.TODO(), []byte{'a'})
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
@ -158,10 +158,10 @@ func (s *mockStorage) BeginWithStartTS(startTS uint64) (Transaction, error) {
|
||||
return s.Begin()
|
||||
}
|
||||
|
||||
func (s *mockStorage) GetSnapshot(ver Version) (Snapshot, error) {
|
||||
func (s *mockStorage) GetSnapshot(ver Version) Snapshot {
|
||||
return &mockSnapshot{
|
||||
store: newMemDB(),
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *mockStorage) Close() error {
|
||||
|
||||
2
kv/kv.go
2
kv/kv.go
@ -448,7 +448,7 @@ type Storage interface {
|
||||
BeginWithStartTS(startTS uint64) (Transaction, error)
|
||||
// GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
|
||||
// if ver is MaxVersion or > current max committed version, we will use current version for this snapshot.
|
||||
GetSnapshot(ver Version) (Snapshot, error)
|
||||
GetSnapshot(ver Version) Snapshot
|
||||
// GetClient gets a client instance.
|
||||
GetClient() Client
|
||||
// Close store
|
||||
|
||||
@ -30,8 +30,7 @@ func (s testMockSuite) TestInterface(c *C) {
|
||||
storage.UUID()
|
||||
version, err := storage.CurrentVersion()
|
||||
c.Check(err, IsNil)
|
||||
snapshot, err := storage.GetSnapshot(version)
|
||||
c.Check(err, IsNil)
|
||||
snapshot := storage.GetSnapshot(version)
|
||||
_, err = snapshot.BatchGet(context.Background(), []Key{Key("abc"), Key("def")})
|
||||
c.Check(err, IsNil)
|
||||
snapshot.SetOption(Priority, PriorityNormal)
|
||||
|
||||
@ -295,7 +295,7 @@ func (s *testSuite) TestSnapshot(c *C) {
|
||||
c.Assert(n, Equals, int64(2))
|
||||
txn.Commit(context.Background())
|
||||
|
||||
snapshot, _ := store.GetSnapshot(ver1)
|
||||
snapshot := store.GetSnapshot(ver1)
|
||||
snapMeta := meta.NewSnapshotMeta(snapshot)
|
||||
n, _ = snapMeta.GetGlobalID()
|
||||
c.Assert(n, Equals, int64(1))
|
||||
|
||||
@ -506,11 +506,7 @@ func (s *SchemaAmender) prepareKvMap(ctx context.Context, commitMutations tikv.C
|
||||
}
|
||||
// BatchGet the old key values, the Op_Del and Op_Put types keys in storage using forUpdateTS, the Op_put type is for
|
||||
// row update using the same row key, it may not exist.
|
||||
snapshot, err := s.sess.GetStore().GetSnapshot(kv.Version{Ver: s.sess.sessionVars.TxnCtx.GetForUpdateTS()})
|
||||
if err != nil {
|
||||
logutil.Logger(ctx).Warn("amend failed to get snapshot using forUpdateTS", zap.Error(err))
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
snapshot := s.sess.GetStore().GetSnapshot(kv.Version{Ver: s.sess.sessionVars.TxnCtx.GetForUpdateTS()})
|
||||
oldValKvMap, err := snapshot.BatchGet(ctx, removeKeys)
|
||||
if err != nil {
|
||||
logutil.Logger(ctx).Warn("amend failed to batch get kv old keys", zap.Error(err))
|
||||
|
||||
@ -390,8 +390,7 @@ func (s *testSchemaAmenderSuite) TestAmendCollectAndGenMutations(c *C) {
|
||||
curVer, err := se.store.CurrentVersion()
|
||||
c.Assert(err, IsNil)
|
||||
se.sessionVars.TxnCtx.SetForUpdateTS(curVer.Ver + 1)
|
||||
snap, err := se.store.GetSnapshot(kv.Version{Ver: se.sessionVars.TxnCtx.GetForUpdateTS()})
|
||||
c.Assert(err, IsNil)
|
||||
snap := se.store.GetSnapshot(kv.Version{Ver: se.sessionVars.TxnCtx.GetForUpdateTS()})
|
||||
oldVals, err := snap.BatchGet(ctx, oldKeys)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(len(oldVals), Equals, len(oldKeys))
|
||||
|
||||
@ -546,8 +546,7 @@ func (s *testKVSuite) TestDBClose(c *C) {
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(kv.MaxVersion.Cmp(ver), Equals, 1)
|
||||
|
||||
snap, err := store.GetSnapshot(kv.MaxVersion)
|
||||
c.Assert(err, IsNil)
|
||||
snap := store.GetSnapshot(kv.MaxVersion)
|
||||
|
||||
_, err = snap.Get(context.TODO(), []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
@ -561,8 +560,7 @@ func (s *testKVSuite) TestDBClose(c *C) {
|
||||
_, err = store.Begin()
|
||||
c.Assert(err, NotNil)
|
||||
|
||||
_, err = store.GetSnapshot(kv.MaxVersion)
|
||||
c.Assert(err, NotNil)
|
||||
_ = store.GetSnapshot(kv.MaxVersion)
|
||||
|
||||
err = txn.Set([]byte("a"), []byte("b"))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
@ -1502,18 +1502,8 @@ func (c *twoPhaseCommitter) getUndeterminedErr() error {
|
||||
// schema version change. So we just check the version from meta snapshot, it's much stricter.
|
||||
func checkSchemaVersionForAsyncCommit(ctx context.Context, startTS uint64, commitTS uint64, store Storage) (bool, error) {
|
||||
if commitTS > 0 {
|
||||
snapshotAtStart, err := store.GetSnapshot(kv.NewVersion(startTS))
|
||||
if err != nil {
|
||||
logutil.Logger(ctx).Error("get snapshot failed for resolve async startTS",
|
||||
zap.Uint64("startTS", startTS), zap.Uint64("commitTS", commitTS))
|
||||
return false, errors.Trace(err)
|
||||
}
|
||||
snapShotAtCommit, err := store.GetSnapshot(kv.NewVersion(commitTS))
|
||||
if err != nil {
|
||||
logutil.Logger(ctx).Error("get snapshot failed for resolve async commitTS",
|
||||
zap.Uint64("startTS", startTS), zap.Uint64("commitTS", commitTS))
|
||||
return false, errors.Trace(err)
|
||||
}
|
||||
snapshotAtStart := store.GetSnapshot(kv.NewVersion(startTS))
|
||||
snapShotAtCommit := store.GetSnapshot(kv.NewVersion(commitTS))
|
||||
schemaVerAtStart, err := meta.NewSnapshotMeta(snapshotAtStart).GetSchemaVersion()
|
||||
if err != nil {
|
||||
return false, errors.Trace(err)
|
||||
|
||||
@ -92,8 +92,7 @@ func (s *testAsyncCommitCommon) mustGetLock(c *C, key []byte) *Lock {
|
||||
}
|
||||
|
||||
func (s *testAsyncCommitCommon) mustPointGet(c *C, key, expectedValue []byte) {
|
||||
snap, err := s.store.GetSnapshot(kv.MaxVersion)
|
||||
c.Assert(err, IsNil)
|
||||
snap := s.store.GetSnapshot(kv.MaxVersion)
|
||||
value, err := snap.Get(context.Background(), key)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(value, BytesEquals, expectedValue)
|
||||
|
||||
@ -120,17 +120,15 @@ func (s *testGCWorkerSuite) mustPut(c *C, key, value string) {
|
||||
}
|
||||
|
||||
func (s *testGCWorkerSuite) mustGet(c *C, key string, ts uint64) string {
|
||||
snap, err := s.store.GetSnapshot(kv.Version{Ver: ts})
|
||||
c.Assert(err, IsNil)
|
||||
snap := s.store.GetSnapshot(kv.Version{Ver: ts})
|
||||
value, err := snap.Get(context.TODO(), []byte(key))
|
||||
c.Assert(err, IsNil)
|
||||
return string(value)
|
||||
}
|
||||
|
||||
func (s *testGCWorkerSuite) mustGetNone(c *C, key string, ts uint64) {
|
||||
snap, err := s.store.GetSnapshot(kv.Version{Ver: ts})
|
||||
c.Assert(err, IsNil)
|
||||
_, err = snap.Get(context.TODO(), []byte(key))
|
||||
snap := s.store.GetSnapshot(kv.Version{Ver: ts})
|
||||
_, err := snap.Get(context.TODO(), []byte(key))
|
||||
if err != nil {
|
||||
// Unistore's gc is based on compaction filter.
|
||||
// So skip the error check if err == nil.
|
||||
|
||||
@ -326,9 +326,9 @@ func (s *tikvStore) BeginWithStartTS(startTS uint64) (kv.Transaction, error) {
|
||||
return txn, nil
|
||||
}
|
||||
|
||||
func (s *tikvStore) GetSnapshot(ver kv.Version) (kv.Snapshot, error) {
|
||||
func (s *tikvStore) GetSnapshot(ver kv.Version) kv.Snapshot {
|
||||
snapshot := newTiKVSnapshot(s, ver, s.nextReplicaReadSeed())
|
||||
return snapshot, nil
|
||||
return snapshot
|
||||
}
|
||||
|
||||
func (s *tikvStore) Close() error {
|
||||
|
||||
@ -38,7 +38,7 @@ func (s *Store) Begin() (kv.Transaction, error) { return nil, nil }
|
||||
func (s *Store) BeginWithStartTS(startTS uint64) (kv.Transaction, error) { return s.Begin() }
|
||||
|
||||
// GetSnapshot implements kv.Storage interface.
|
||||
func (s *Store) GetSnapshot(ver kv.Version) (kv.Snapshot, error) { return nil, nil }
|
||||
func (s *Store) GetSnapshot(ver kv.Version) kv.Snapshot { return nil }
|
||||
|
||||
// Close implements kv.Storage interface.
|
||||
func (s *Store) Close() error { return nil }
|
||||
|
||||
Reference in New Issue
Block a user