From 81fa1b37e8a487382a9feaf8697cc84c8a2120fc Mon Sep 17 00:00:00 2001 From: Jack Yu Date: Fri, 16 Oct 2020 11:30:43 +0800 Subject: [PATCH] *: refactor GetSnapshot (#20475) Signed-off-by: Shuaipeng Yu --- ddl/backfilling.go | 5 +---- domain/domain.go | 10 ++-------- executor/analyze.go | 11 ++--------- executor/batch_point_get.go | 5 +---- executor/point_get.go | 5 +---- kv/fault_injection.go | 6 +++--- kv/fault_injection_test.go | 6 ++---- kv/interface_mock_test.go | 4 ++-- kv/kv.go | 2 +- kv/mock_test.go | 3 +-- meta/meta_test.go | 2 +- session/schema_amender.go | 6 +----- session/schema_amender_test.go | 3 +-- store/store_test.go | 6 ++---- store/tikv/2pc.go | 14 ++------------ store/tikv/async_commit_test.go | 3 +-- store/tikv/gcworker/gc_worker_test.go | 8 +++----- store/tikv/kv.go | 4 ++-- util/mock/store.go | 2 +- 19 files changed, 30 insertions(+), 75 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 5a8e4ece3e..5876ff3bf5 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -592,11 +592,8 @@ func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version } ver := kv.Version{Ver: version} - snap, err := store.GetSnapshot(ver) + snap := store.GetSnapshot(ver) snap.SetOption(kv.Priority, priority) - if err != nil { - return errors.Trace(err) - } it, err := snap.Iter(firstKey, upperBound) if err != nil { diff --git a/domain/domain.go b/domain/domain.go index 1c674ac5fe..858c1d9f5c 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -86,10 +86,7 @@ type Domain struct { // It returns the latest schema version, the changed table IDs, whether it's a full load and an error. func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion int64, startTS uint64) (neededSchemaVersion int64, change *tikv.RelatedSchemaChange, fullLoad bool, err error) { - snapshot, err := do.store.GetSnapshot(kv.NewVersion(startTS)) - if err != nil { - return 0, nil, fullLoad, err - } + snapshot := do.store.GetSnapshot(kv.NewVersion(startTS)) m := meta.NewSnapshotMeta(snapshot) neededSchemaVersion, err = m.GetSchemaVersion() if err != nil { @@ -320,10 +317,7 @@ func (do *Domain) GetSnapshotInfoSchema(snapshotTS uint64) (infoschema.InfoSchem // GetSnapshotMeta gets a new snapshot meta at startTS. func (do *Domain) GetSnapshotMeta(startTS uint64) (*meta.Meta, error) { - snapshot, err := do.store.GetSnapshot(kv.NewVersion(startTS)) - if err != nil { - return nil, err - } + snapshot := do.store.GetSnapshot(kv.NewVersion(startTS)) return meta.NewSnapshotMeta(snapshot), nil } diff --git a/executor/analyze.go b/executor/analyze.go index a3aab93b09..e505ba45cd 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -935,10 +935,7 @@ func (e *AnalyzeFastExec) handleScanIter(iter kv.Iterator) (scanKeysSize int, er } func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err error) { - snapshot, err := e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion) - if err != nil { - return 0, err - } + snapshot := e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion) if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } @@ -958,11 +955,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) { defer e.wg.Done() - var snapshot kv.Snapshot - snapshot, *err = e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion) - if *err != nil { - return - } + snapshot := e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion) snapshot.SetOption(kv.NotFillCache, true) snapshot.SetOption(kv.IsolationLevel, kv.RC) snapshot.SetOption(kv.Priority, kv.PriorityLow) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index c4fdca29b4..3b7e169410 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -99,10 +99,7 @@ func (e *BatchPointGetExec) Open(context.Context) error { // The snapshot may contains cache that can reduce RPC call. snapshot = txn.GetSnapshot() } else { - snapshot, err = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: e.snapshotTS}) - if err != nil { - return err - } + snapshot = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: e.snapshotTS}) } if e.runtimeStats != nil { snapshotStats := &tikv.SnapshotRuntimeStats{} diff --git a/executor/point_get.go b/executor/point_get.go index ab0f13c9be..79672c1fe7 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -132,10 +132,7 @@ func (e *PointGetExecutor) Open(context.Context) error { if e.txn.Valid() && txnCtx.StartTS == txnCtx.GetForUpdateTS() { e.snapshot = e.txn.GetSnapshot() } else { - e.snapshot, err = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: snapshotTS}) - if err != nil { - return err - } + e.snapshot = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: snapshotTS}) } if e.runtimeStats != nil { snapshotStats := &tikv.SnapshotRuntimeStats{} diff --git a/kv/fault_injection.go b/kv/fault_injection.go index ca187d36bb..e5c3d0aebe 100644 --- a/kv/fault_injection.go +++ b/kv/fault_injection.go @@ -73,12 +73,12 @@ func (s *InjectedStore) BeginWithStartTS(startTS uint64) (Transaction, error) { } // GetSnapshot creates an injected Snapshot. -func (s *InjectedStore) GetSnapshot(ver Version) (Snapshot, error) { - snapshot, err := s.Storage.GetSnapshot(ver) +func (s *InjectedStore) GetSnapshot(ver Version) Snapshot { + snapshot := s.Storage.GetSnapshot(ver) return &InjectedSnapshot{ Snapshot: snapshot, cfg: s.cfg, - }, err + } } // InjectedTransaction wraps a Transaction with injections. diff --git a/kv/fault_injection_test.go b/kv/fault_injection_test.go index 004664d5bf..500dd3a536 100644 --- a/kv/fault_injection_test.go +++ b/kv/fault_injection_test.go @@ -37,8 +37,7 @@ func (s testFaultInjectionSuite) TestFaultInjectionBasic(c *C) { _, err = storage.BeginWithStartTS(0) c.Assert(err, IsNil) ver := Version{Ver: 1} - snap, err := storage.GetSnapshot(ver) - c.Assert(err, IsNil) + snap := storage.GetSnapshot(ver) b, err := txn.Get(context.TODO(), []byte{'a'}) c.Assert(err.Error(), Equals, err1.Error()) c.Assert(b, IsNil) @@ -63,8 +62,7 @@ func (s testFaultInjectionSuite) TestFaultInjectionBasic(c *C) { storage = NewInjectedStore(newMockStorage(), &cfg) txn, err = storage.Begin() c.Assert(err, IsNil) - snap, err = storage.GetSnapshot(ver) - c.Assert(err, IsNil) + snap = storage.GetSnapshot(ver) b, err = txn.Get(context.TODO(), []byte{'a'}) c.Assert(err, IsNil) diff --git a/kv/interface_mock_test.go b/kv/interface_mock_test.go index 33fc7e7287..ade7eb0e8c 100644 --- a/kv/interface_mock_test.go +++ b/kv/interface_mock_test.go @@ -158,10 +158,10 @@ func (s *mockStorage) BeginWithStartTS(startTS uint64) (Transaction, error) { return s.Begin() } -func (s *mockStorage) GetSnapshot(ver Version) (Snapshot, error) { +func (s *mockStorage) GetSnapshot(ver Version) Snapshot { return &mockSnapshot{ store: newMemDB(), - }, nil + } } func (s *mockStorage) Close() error { diff --git a/kv/kv.go b/kv/kv.go index eb8d9dfdf8..6a20467dd4 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -448,7 +448,7 @@ type Storage interface { BeginWithStartTS(startTS uint64) (Transaction, error) // GetSnapshot gets a snapshot that is able to read any data which data is <= ver. // if ver is MaxVersion or > current max committed version, we will use current version for this snapshot. - GetSnapshot(ver Version) (Snapshot, error) + GetSnapshot(ver Version) Snapshot // GetClient gets a client instance. GetClient() Client // Close store diff --git a/kv/mock_test.go b/kv/mock_test.go index 52d972263f..bbc0b74151 100644 --- a/kv/mock_test.go +++ b/kv/mock_test.go @@ -30,8 +30,7 @@ func (s testMockSuite) TestInterface(c *C) { storage.UUID() version, err := storage.CurrentVersion() c.Check(err, IsNil) - snapshot, err := storage.GetSnapshot(version) - c.Check(err, IsNil) + snapshot := storage.GetSnapshot(version) _, err = snapshot.BatchGet(context.Background(), []Key{Key("abc"), Key("def")}) c.Check(err, IsNil) snapshot.SetOption(Priority, PriorityNormal) diff --git a/meta/meta_test.go b/meta/meta_test.go index 17a6c8f735..4ea9db0e88 100644 --- a/meta/meta_test.go +++ b/meta/meta_test.go @@ -295,7 +295,7 @@ func (s *testSuite) TestSnapshot(c *C) { c.Assert(n, Equals, int64(2)) txn.Commit(context.Background()) - snapshot, _ := store.GetSnapshot(ver1) + snapshot := store.GetSnapshot(ver1) snapMeta := meta.NewSnapshotMeta(snapshot) n, _ = snapMeta.GetGlobalID() c.Assert(n, Equals, int64(1)) diff --git a/session/schema_amender.go b/session/schema_amender.go index a52a64a9bd..d4befdec04 100644 --- a/session/schema_amender.go +++ b/session/schema_amender.go @@ -506,11 +506,7 @@ func (s *SchemaAmender) prepareKvMap(ctx context.Context, commitMutations tikv.C } // BatchGet the old key values, the Op_Del and Op_Put types keys in storage using forUpdateTS, the Op_put type is for // row update using the same row key, it may not exist. - snapshot, err := s.sess.GetStore().GetSnapshot(kv.Version{Ver: s.sess.sessionVars.TxnCtx.GetForUpdateTS()}) - if err != nil { - logutil.Logger(ctx).Warn("amend failed to get snapshot using forUpdateTS", zap.Error(err)) - return nil, errors.Trace(err) - } + snapshot := s.sess.GetStore().GetSnapshot(kv.Version{Ver: s.sess.sessionVars.TxnCtx.GetForUpdateTS()}) oldValKvMap, err := snapshot.BatchGet(ctx, removeKeys) if err != nil { logutil.Logger(ctx).Warn("amend failed to batch get kv old keys", zap.Error(err)) diff --git a/session/schema_amender_test.go b/session/schema_amender_test.go index 73f48bcfcd..67d91dad03 100644 --- a/session/schema_amender_test.go +++ b/session/schema_amender_test.go @@ -390,8 +390,7 @@ func (s *testSchemaAmenderSuite) TestAmendCollectAndGenMutations(c *C) { curVer, err := se.store.CurrentVersion() c.Assert(err, IsNil) se.sessionVars.TxnCtx.SetForUpdateTS(curVer.Ver + 1) - snap, err := se.store.GetSnapshot(kv.Version{Ver: se.sessionVars.TxnCtx.GetForUpdateTS()}) - c.Assert(err, IsNil) + snap := se.store.GetSnapshot(kv.Version{Ver: se.sessionVars.TxnCtx.GetForUpdateTS()}) oldVals, err := snap.BatchGet(ctx, oldKeys) c.Assert(err, IsNil) c.Assert(len(oldVals), Equals, len(oldKeys)) diff --git a/store/store_test.go b/store/store_test.go index 066d7334e4..54d431f33a 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -546,8 +546,7 @@ func (s *testKVSuite) TestDBClose(c *C) { c.Assert(err, IsNil) c.Assert(kv.MaxVersion.Cmp(ver), Equals, 1) - snap, err := store.GetSnapshot(kv.MaxVersion) - c.Assert(err, IsNil) + snap := store.GetSnapshot(kv.MaxVersion) _, err = snap.Get(context.TODO(), []byte("a")) c.Assert(err, IsNil) @@ -561,8 +560,7 @@ func (s *testKVSuite) TestDBClose(c *C) { _, err = store.Begin() c.Assert(err, NotNil) - _, err = store.GetSnapshot(kv.MaxVersion) - c.Assert(err, NotNil) + _ = store.GetSnapshot(kv.MaxVersion) err = txn.Set([]byte("a"), []byte("b")) c.Assert(err, IsNil) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 761deb2592..90738da18d 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -1502,18 +1502,8 @@ func (c *twoPhaseCommitter) getUndeterminedErr() error { // schema version change. So we just check the version from meta snapshot, it's much stricter. func checkSchemaVersionForAsyncCommit(ctx context.Context, startTS uint64, commitTS uint64, store Storage) (bool, error) { if commitTS > 0 { - snapshotAtStart, err := store.GetSnapshot(kv.NewVersion(startTS)) - if err != nil { - logutil.Logger(ctx).Error("get snapshot failed for resolve async startTS", - zap.Uint64("startTS", startTS), zap.Uint64("commitTS", commitTS)) - return false, errors.Trace(err) - } - snapShotAtCommit, err := store.GetSnapshot(kv.NewVersion(commitTS)) - if err != nil { - logutil.Logger(ctx).Error("get snapshot failed for resolve async commitTS", - zap.Uint64("startTS", startTS), zap.Uint64("commitTS", commitTS)) - return false, errors.Trace(err) - } + snapshotAtStart := store.GetSnapshot(kv.NewVersion(startTS)) + snapShotAtCommit := store.GetSnapshot(kv.NewVersion(commitTS)) schemaVerAtStart, err := meta.NewSnapshotMeta(snapshotAtStart).GetSchemaVersion() if err != nil { return false, errors.Trace(err) diff --git a/store/tikv/async_commit_test.go b/store/tikv/async_commit_test.go index b3cfff13c5..5d63cfd80a 100644 --- a/store/tikv/async_commit_test.go +++ b/store/tikv/async_commit_test.go @@ -92,8 +92,7 @@ func (s *testAsyncCommitCommon) mustGetLock(c *C, key []byte) *Lock { } func (s *testAsyncCommitCommon) mustPointGet(c *C, key, expectedValue []byte) { - snap, err := s.store.GetSnapshot(kv.MaxVersion) - c.Assert(err, IsNil) + snap := s.store.GetSnapshot(kv.MaxVersion) value, err := snap.Get(context.Background(), key) c.Assert(err, IsNil) c.Assert(value, BytesEquals, expectedValue) diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go index 222954145e..b8fb124316 100644 --- a/store/tikv/gcworker/gc_worker_test.go +++ b/store/tikv/gcworker/gc_worker_test.go @@ -120,17 +120,15 @@ func (s *testGCWorkerSuite) mustPut(c *C, key, value string) { } func (s *testGCWorkerSuite) mustGet(c *C, key string, ts uint64) string { - snap, err := s.store.GetSnapshot(kv.Version{Ver: ts}) - c.Assert(err, IsNil) + snap := s.store.GetSnapshot(kv.Version{Ver: ts}) value, err := snap.Get(context.TODO(), []byte(key)) c.Assert(err, IsNil) return string(value) } func (s *testGCWorkerSuite) mustGetNone(c *C, key string, ts uint64) { - snap, err := s.store.GetSnapshot(kv.Version{Ver: ts}) - c.Assert(err, IsNil) - _, err = snap.Get(context.TODO(), []byte(key)) + snap := s.store.GetSnapshot(kv.Version{Ver: ts}) + _, err := snap.Get(context.TODO(), []byte(key)) if err != nil { // Unistore's gc is based on compaction filter. // So skip the error check if err == nil. diff --git a/store/tikv/kv.go b/store/tikv/kv.go index 6b9b7f1197..4950b812d3 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -326,9 +326,9 @@ func (s *tikvStore) BeginWithStartTS(startTS uint64) (kv.Transaction, error) { return txn, nil } -func (s *tikvStore) GetSnapshot(ver kv.Version) (kv.Snapshot, error) { +func (s *tikvStore) GetSnapshot(ver kv.Version) kv.Snapshot { snapshot := newTiKVSnapshot(s, ver, s.nextReplicaReadSeed()) - return snapshot, nil + return snapshot } func (s *tikvStore) Close() error { diff --git a/util/mock/store.go b/util/mock/store.go index 0322017133..76d3974b38 100644 --- a/util/mock/store.go +++ b/util/mock/store.go @@ -38,7 +38,7 @@ func (s *Store) Begin() (kv.Transaction, error) { return nil, nil } func (s *Store) BeginWithStartTS(startTS uint64) (kv.Transaction, error) { return s.Begin() } // GetSnapshot implements kv.Storage interface. -func (s *Store) GetSnapshot(ver kv.Version) (kv.Snapshot, error) { return nil, nil } +func (s *Store) GetSnapshot(ver kv.Version) kv.Snapshot { return nil } // Close implements kv.Storage interface. func (s *Store) Close() error { return nil }