Merge pull request #462 from pingcap/siddontang/fix-store-close
localstore: check db closed
This commit is contained in:
@ -30,7 +30,9 @@ var (
|
||||
)
|
||||
|
||||
type dbStore struct {
|
||||
mu sync.Mutex
|
||||
mu sync.Mutex
|
||||
snapLock sync.RWMutex
|
||||
|
||||
db engine.DB
|
||||
|
||||
txns map[uint64]*dbTxn
|
||||
@ -38,6 +40,8 @@ type dbStore struct {
|
||||
uuid string
|
||||
path string
|
||||
compactor *localstoreCompactor
|
||||
|
||||
closed bool
|
||||
}
|
||||
|
||||
type storeCache struct {
|
||||
@ -49,6 +53,9 @@ var (
|
||||
globalID int64
|
||||
globalVersionProvider kv.VersionProvider
|
||||
mc storeCache
|
||||
|
||||
// ErrDBClosed is the error meaning db is closed and we can use it anymore.
|
||||
ErrDBClosed = errors.New("db is closed")
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -92,6 +99,7 @@ func (d Driver) Open(schema string) (kv.Storage, error) {
|
||||
path: schema,
|
||||
db: db,
|
||||
compactor: newLocalCompactor(localCompactDefaultPolicy, db),
|
||||
closed: false,
|
||||
}
|
||||
mc.cache[schema] = s
|
||||
s.compactor.Start()
|
||||
@ -103,11 +111,19 @@ func (s *dbStore) UUID() string {
|
||||
}
|
||||
|
||||
func (s *dbStore) GetSnapshot() (kv.MvccSnapshot, error) {
|
||||
s.snapLock.RLock()
|
||||
defer s.snapLock.RUnlock()
|
||||
|
||||
if s.closed {
|
||||
return nil, errors.Trace(ErrDBClosed)
|
||||
}
|
||||
|
||||
currentVer, err := globalVersionProvider.CurrentVersion()
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return &dbSnapshot{
|
||||
store: s,
|
||||
db: s.db,
|
||||
version: currentVer,
|
||||
}, nil
|
||||
@ -118,6 +134,10 @@ func (s *dbStore) Begin() (kv.Transaction, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.closed {
|
||||
return nil, errors.Trace(ErrDBClosed)
|
||||
}
|
||||
|
||||
beginVer, err := globalVersionProvider.CurrentVersion()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -131,6 +151,7 @@ func (s *dbStore) Begin() (kv.Transaction, error) {
|
||||
}
|
||||
log.Debugf("Begin txn:%d", txn.tid)
|
||||
txn.UnionStore, err = kv.NewUnionStore(&dbSnapshot{
|
||||
store: s,
|
||||
db: s.db,
|
||||
version: beginVer,
|
||||
})
|
||||
@ -141,6 +162,18 @@ func (s *dbStore) Begin() (kv.Transaction, error) {
|
||||
}
|
||||
|
||||
func (s *dbStore) Close() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.snapLock.Lock()
|
||||
defer s.snapLock.Unlock()
|
||||
|
||||
if s.closed {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.closed = true
|
||||
|
||||
mc.mu.Lock()
|
||||
defer mc.mu.Unlock()
|
||||
s.compactor.Stop()
|
||||
@ -152,6 +185,10 @@ func (s *dbStore) writeBatch(b engine.Batch) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.closed {
|
||||
return errors.Trace(ErrDBClosed)
|
||||
}
|
||||
|
||||
err := s.db.Commit(b)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
@ -170,6 +207,10 @@ func (s *dbStore) tryConditionLockKey(tid uint64, key string, snapshotVal []byte
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.closed {
|
||||
return errors.Trace(ErrDBClosed)
|
||||
}
|
||||
|
||||
if _, ok := s.keysLocked[key]; ok {
|
||||
return errors.Trace(kv.ErrLockConflict)
|
||||
}
|
||||
@ -203,6 +244,10 @@ func (s *dbStore) unLockKeys(keys ...string) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.closed {
|
||||
return errors.Trace(ErrDBClosed)
|
||||
}
|
||||
|
||||
for _, key := range keys {
|
||||
if _, ok := s.keysLocked[key]; !ok {
|
||||
return errors.Trace(kv.ErrNotExist)
|
||||
|
||||
@ -507,3 +507,47 @@ func (s *testKVSuite) TestConditionUpdate(c *C) {
|
||||
err = txn.Commit()
|
||||
c.Assert(err, IsNil)
|
||||
}
|
||||
|
||||
func (s *testKVSuite) TestDBClose(c *C) {
|
||||
path := "memory:test"
|
||||
d := Driver{
|
||||
goleveldb.MemoryDriver{},
|
||||
}
|
||||
store, err := d.Open(path)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
txn, err := store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
err = txn.Set([]byte("a"), []byte("b"))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
err = txn.Commit()
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
snap, err := store.GetSnapshot()
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
_, err = snap.MvccGet(kv.EncodeKey([]byte("a")), kv.MaxVersion)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
txn, err = store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
err = store.Close()
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
_, err = store.Begin()
|
||||
c.Assert(err, NotNil)
|
||||
|
||||
_, err = store.GetSnapshot()
|
||||
c.Assert(err, NotNil)
|
||||
|
||||
err = txn.Set([]byte("a"), []byte("b"))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
err = txn.Commit()
|
||||
c.Assert(err, NotNil)
|
||||
|
||||
snap.MvccRelease()
|
||||
}
|
||||
|
||||
@ -31,12 +31,20 @@ var (
|
||||
|
||||
// dbSnapshot implements MvccSnapshot interface.
|
||||
type dbSnapshot struct {
|
||||
store *dbStore
|
||||
db engine.DB
|
||||
rawIt engine.Iterator
|
||||
version kv.Version // transaction begin version
|
||||
}
|
||||
|
||||
func (s *dbSnapshot) internalSeek(startKey []byte) (engine.Iterator, error) {
|
||||
s.store.snapLock.RLock()
|
||||
defer s.store.snapLock.RUnlock()
|
||||
|
||||
if s.store.closed {
|
||||
return nil, errors.Trace(ErrDBClosed)
|
||||
}
|
||||
|
||||
if s.rawIt == nil {
|
||||
var err error
|
||||
s.rawIt, err = s.db.Seek([]byte{0})
|
||||
@ -114,10 +122,13 @@ func (s *dbSnapshot) MvccRelease() {
|
||||
}
|
||||
|
||||
func (s *dbSnapshot) Release() {
|
||||
if s.rawIt != nil {
|
||||
s.rawIt.Release()
|
||||
s.rawIt = nil
|
||||
if s.rawIt == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: check whether Release will panic if store is closed.
|
||||
s.rawIt.Release()
|
||||
s.rawIt = nil
|
||||
}
|
||||
|
||||
type dbIter struct {
|
||||
|
||||
Reference in New Issue
Block a user