From af44934dd1b4d8426534747eae192bee3704bab1 Mon Sep 17 00:00:00 2001 From: siddontang Date: Wed, 28 Oct 2015 11:42:03 +0800 Subject: [PATCH 1/2] localstore: check db closed --- store/localstore/goleveldb/goleveldb_test.go | 4 ++ store/localstore/kv.go | 47 +++++++++++++++++++- store/localstore/kv_test.go | 44 ++++++++++++++++++ store/localstore/snapshot.go | 17 +++++-- 4 files changed, 108 insertions(+), 4 deletions(-) diff --git a/store/localstore/goleveldb/goleveldb_test.go b/store/localstore/goleveldb/goleveldb_test.go index cc04969816..323ca5933f 100644 --- a/store/localstore/goleveldb/goleveldb_test.go +++ b/store/localstore/goleveldb/goleveldb_test.go @@ -40,7 +40,11 @@ func (s *testSuite) SetUpSuite(c *C) { } func (s *testSuite) TearDownSuite(c *C) { + it, err := s.db.Seek(nil) + c.Assert(err, IsNil) s.db.Close() + + it.Release() } func (s *testSuite) TestDB(c *C) { diff --git a/store/localstore/kv.go b/store/localstore/kv.go index 7d15637a87..413e9eb819 100644 --- a/store/localstore/kv.go +++ b/store/localstore/kv.go @@ -30,7 +30,9 @@ var ( ) type dbStore struct { - mu sync.Mutex + mu sync.Mutex + snapLock sync.RWMutex + db engine.DB txns map[uint64]*dbTxn @@ -38,6 +40,8 @@ type dbStore struct { uuid string path string compactor *localstoreCompactor + + closed bool } type storeCache struct { @@ -49,6 +53,9 @@ var ( globalID int64 globalVersionProvider kv.VersionProvider mc storeCache + + // ErrDBClosed is the error meaning db is closed and we can use it anymore. + ErrDBClosed = errors.New("db is closed") ) func init() { @@ -92,6 +99,7 @@ func (d Driver) Open(schema string) (kv.Storage, error) { path: schema, db: db, compactor: newLocalCompactor(localCompactDefaultPolicy, db), + closed: false, } mc.cache[schema] = s s.compactor.Start() @@ -103,11 +111,19 @@ func (s *dbStore) UUID() string { } func (s *dbStore) GetSnapshot() (kv.MvccSnapshot, error) { + s.snapLock.RLock() + defer s.snapLock.RUnlock() + + if s.closed { + return nil, errors.Trace(ErrDBClosed) + } + currentVer, err := globalVersionProvider.CurrentVersion() if err != nil { return nil, errors.Trace(err) } return &dbSnapshot{ + store: s, db: s.db, version: currentVer, }, nil @@ -118,6 +134,10 @@ func (s *dbStore) Begin() (kv.Transaction, error) { s.mu.Lock() defer s.mu.Unlock() + if s.closed { + return nil, errors.Trace(ErrDBClosed) + } + beginVer, err := globalVersionProvider.CurrentVersion() if err != nil { return nil, err @@ -131,6 +151,7 @@ func (s *dbStore) Begin() (kv.Transaction, error) { } log.Debugf("Begin txn:%d", txn.tid) txn.UnionStore, err = kv.NewUnionStore(&dbSnapshot{ + store: s, db: s.db, version: beginVer, }) @@ -141,6 +162,18 @@ func (s *dbStore) Begin() (kv.Transaction, error) { } func (s *dbStore) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + s.snapLock.Lock() + defer s.snapLock.Unlock() + + if s.closed { + return nil + } + + s.closed = true + mc.mu.Lock() defer mc.mu.Unlock() s.compactor.Stop() @@ -152,6 +185,10 @@ func (s *dbStore) writeBatch(b engine.Batch) error { s.mu.Lock() defer s.mu.Unlock() + if s.closed { + return errors.Trace(ErrDBClosed) + } + err := s.db.Commit(b) if err != nil { log.Error(err) @@ -170,6 +207,10 @@ func (s *dbStore) tryConditionLockKey(tid uint64, key string, snapshotVal []byte s.mu.Lock() defer s.mu.Unlock() + if s.closed { + return errors.Trace(ErrDBClosed) + } + if _, ok := s.keysLocked[key]; ok { return errors.Trace(kv.ErrLockConflict) } @@ -203,6 +244,10 @@ func (s *dbStore) unLockKeys(keys ...string) error { s.mu.Lock() defer s.mu.Unlock() + if s.closed { + return errors.Trace(ErrDBClosed) + } + for _, key := range keys { if _, ok := s.keysLocked[key]; !ok { return errors.Trace(kv.ErrNotExist) diff --git a/store/localstore/kv_test.go b/store/localstore/kv_test.go index 777e5669fd..06b3922dce 100644 --- a/store/localstore/kv_test.go +++ b/store/localstore/kv_test.go @@ -507,3 +507,47 @@ func (s *testKVSuite) TestConditionUpdate(c *C) { err = txn.Commit() c.Assert(err, IsNil) } + +func (s *testKVSuite) TestDBClose(c *C) { + path := "memory:test" + d := Driver{ + goleveldb.MemoryDriver{}, + } + store, err := d.Open(path) + c.Assert(err, IsNil) + + txn, err := store.Begin() + c.Assert(err, IsNil) + + err = txn.Set([]byte("a"), []byte("b")) + c.Assert(err, IsNil) + + err = txn.Commit() + c.Assert(err, IsNil) + + snap, err := store.GetSnapshot() + c.Assert(err, IsNil) + + _, err = snap.MvccGet(kv.EncodeKey([]byte("a")), kv.MaxVersion) + c.Assert(err, IsNil) + + txn, err = store.Begin() + c.Assert(err, IsNil) + + err = store.Close() + c.Assert(err, IsNil) + + _, err = store.Begin() + c.Assert(err, NotNil) + + _, err = store.GetSnapshot() + c.Assert(err, NotNil) + + err = txn.Set([]byte("a"), []byte("b")) + c.Assert(err, IsNil) + + err = txn.Commit() + c.Assert(err, NotNil) + + snap.MvccRelease() +} diff --git a/store/localstore/snapshot.go b/store/localstore/snapshot.go index 2340001520..46ec3573b4 100644 --- a/store/localstore/snapshot.go +++ b/store/localstore/snapshot.go @@ -31,12 +31,20 @@ var ( // dbSnapshot implements MvccSnapshot interface. type dbSnapshot struct { + store *dbStore db engine.DB rawIt engine.Iterator version kv.Version // transaction begin version } func (s *dbSnapshot) internalSeek(startKey []byte) (engine.Iterator, error) { + s.store.snapLock.RLock() + defer s.store.snapLock.RUnlock() + + if s.store.closed { + return nil, errors.Trace(ErrDBClosed) + } + if s.rawIt == nil { var err error s.rawIt, err = s.db.Seek([]byte{0}) @@ -114,10 +122,13 @@ func (s *dbSnapshot) MvccRelease() { } func (s *dbSnapshot) Release() { - if s.rawIt != nil { - s.rawIt.Release() - s.rawIt = nil + if s.rawIt == nil { + return } + + // TODO: check whether Release will panic if store is closed. + s.rawIt.Release() + s.rawIt = nil } type dbIter struct { From c5cb6c01959f5c24d54cfe88dd466cadada92b4f Mon Sep 17 00:00:00 2001 From: siddontang Date: Wed, 28 Oct 2015 11:44:20 +0800 Subject: [PATCH 2/2] goleveldb: remove unnecessary test. --- store/localstore/goleveldb/goleveldb_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/store/localstore/goleveldb/goleveldb_test.go b/store/localstore/goleveldb/goleveldb_test.go index 323ca5933f..cc04969816 100644 --- a/store/localstore/goleveldb/goleveldb_test.go +++ b/store/localstore/goleveldb/goleveldb_test.go @@ -40,11 +40,7 @@ func (s *testSuite) SetUpSuite(c *C) { } func (s *testSuite) TearDownSuite(c *C) { - it, err := s.db.Seek(nil) - c.Assert(err, IsNil) s.db.Close() - - it.Release() } func (s *testSuite) TestDB(c *C) {