diff --git a/kv/compactor.go b/kv/compactor.go deleted file mode 100644 index 1687225e82..0000000000 --- a/kv/compactor.go +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2015 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package kv - -import "time" - -// CompactPolicy defines gc policy of MVCC storage. -type CompactPolicy struct { - // SafePoint specifies - SafePoint int - // TriggerInterval specifies how often should the compactor - // scans outdated data. - TriggerInterval time.Duration - // BatchDeleteCnt specifies the batch size for - // deleting outdated data transaction. - BatchDeleteCnt int -} - -// Compactor compacts MVCC storage. -type Compactor interface { - // OnGet is the hook point on Txn.Get. - OnGet(k Key) - // OnSet is the hook point on Txn.Set. - OnSet(k Key) - // OnDelete is the hook point on Txn.Delete. - OnDelete(k Key) - // Compact is the function removes the given key. - Compact(k Key) error -} diff --git a/kv/error.go b/kv/error.go new file mode 100644 index 0000000000..23523a1e16 --- /dev/null +++ b/kv/error.go @@ -0,0 +1,75 @@ +// Copyright 2015 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import ( + "errors" + "strings" + + "github.com/pingcap/go-themis" + "github.com/pingcap/tidb/terror" +) + +var ( + // ErrClosed is used when close an already closed txn. + ErrClosed = errors.New("Error: Transaction already closed") + // ErrNotExist is used when try to get an entry with an unexist key from KV store. + ErrNotExist = errors.New("Error: key not exist") + // ErrKeyExists is used when try to put an entry to KV store. + ErrKeyExists = errors.New("Error: key already exist") + // ErrConditionNotMatch is used when condition is not met. + ErrConditionNotMatch = errors.New("Error: Condition not match") + // ErrLockConflict is used when try to lock an already locked key. + ErrLockConflict = errors.New("Error: Lock conflict") + // ErrLazyConditionPairsNotMatch is used when value in store differs from expect pairs. + ErrLazyConditionPairsNotMatch = errors.New("Error: Lazy condition pairs not match") + // ErrRetryable is used when KV store occurs RPC error or some other + // errors which SQL layer can safely retry. + ErrRetryable = errors.New("Error: KV error safe to retry") + // ErrCannotSetNilValue is the error when sets an empty value. + ErrCannotSetNilValue = errors.New("can not set nil value") + // ErrInvalidTxn is the error when commits or rollbacks in an invalid transaction. + ErrInvalidTxn = errors.New("invalid transaction") + + // ErrNotCommitted is the error returned by CommitVersion when this + // transaction is not committed. + ErrNotCommitted = errors.New("this transaction has not committed") +) + +// IsRetryableError checks if the err is a fatal error and the under going operation is worth to retry. +func IsRetryableError(err error) bool { + if err == nil { + return false + } + + if terror.ErrorEqual(err, ErrRetryable) || + terror.ErrorEqual(err, ErrLockConflict) || + terror.ErrorEqual(err, ErrConditionNotMatch) || + terror.ErrorEqual(err, themis.ErrRetryable) || + // HBase exception message will tell you if you should retry or not + strings.Contains(err.Error(), "try again later") { + return true + } + + return false +} + +// IsErrNotFound checks if err is a kind of NotFound error. +func IsErrNotFound(err error) bool { + if terror.ErrorEqual(err, ErrNotExist) { + return true + } + + return false +} diff --git a/kv/iter.go b/kv/iter.go index b65d4e963e..d2d4dd7dcd 100644 --- a/kv/iter.go +++ b/kv/iter.go @@ -15,28 +15,6 @@ package kv import "github.com/juju/errors" -var ( - // ErrClosed is used when close an already closed txn. - ErrClosed = errors.New("Error: Transaction already closed") - // ErrNotExist is used when try to get an entry with an unexist key from KV store. - ErrNotExist = errors.New("Error: key not exist") - // ErrKeyExists is used when try to put an entry to KV store. - ErrKeyExists = errors.New("Error: key already exist") - // ErrConditionNotMatch is used when condition is not met. - ErrConditionNotMatch = errors.New("Error: Condition not match") - // ErrLockConflict is used when try to lock an already locked key. - ErrLockConflict = errors.New("Error: Lock conflict") - // ErrLazyConditionPairsNotMatch is used when value in store differs from expect pairs. - ErrLazyConditionPairsNotMatch = errors.New("Error: Lazy condition pairs not match") - // ErrRetryable is used when KV store occurs RPC error or some other - // errors which SQL layer can safely retry. - ErrRetryable = errors.New("Error: KV error safe to retry") - // ErrCannotSetNilValue is the error when sets an empty value. - ErrCannotSetNilValue = errors.New("can not set nil value") - // ErrInvalidTxn is the error when commits or rollbacks in an invalid transaction. - ErrInvalidTxn = errors.New("invalid transaction") -) - // NextUntil applies FnKeyCmp to each entry of the iterator until meets some condition. // It will stop when fn returns true, or iterator is invalid or an error occurs. func NextUntil(it Iterator, fn FnKeyCmp) error { diff --git a/kv/key.go b/kv/key.go new file mode 100644 index 0000000000..d76f505fd3 --- /dev/null +++ b/kv/key.go @@ -0,0 +1,47 @@ +// Copyright 2015 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import "bytes" + +// Key represents high-level Key type. +type Key []byte + +// Next returns the next key in byte-order. +func (k Key) Next() Key { + // add 0x0 to the end of key + buf := make([]byte, len([]byte(k))+1) + copy(buf, []byte(k)) + return buf +} + +// Cmp returns the comparison result of two key. +// The result will be 0 if a==b, -1 if a < b, and +1 if a > b. +func (k Key) Cmp(another Key) int { + return bytes.Compare(k, another) +} + +// EncodedKey represents encoded key in low-level storage engine. +type EncodedKey []byte + +// Cmp returns the comparison result of two key. +// The result will be 0 if a==b, -1 if a < b, and +1 if a > b. +func (k EncodedKey) Cmp(another EncodedKey) int { + return bytes.Compare(k, another) +} + +// Next returns the next key in byte-order. +func (k EncodedKey) Next() EncodedKey { + return EncodedKey(bytes.Join([][]byte{k, Key{0}}, nil)) +} diff --git a/kv/kv.go b/kv/kv.go index fc918338cb..fab9416c82 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -13,98 +13,6 @@ package kv -import ( - "bytes" - "math" - - "github.com/juju/errors" -) - -// Key represents high-level Key type. -type Key []byte - -// Next returns the next key in byte-order. -func (k Key) Next() Key { - // add 0x0 to the end of key - buf := make([]byte, len([]byte(k))+1) - copy(buf, []byte(k)) - return buf -} - -// Cmp returns the comparison result of two key. -// The result will be 0 if a==b, -1 if a < b, and +1 if a > b. -func (k Key) Cmp(another Key) int { - return bytes.Compare(k, another) -} - -// EncodedKey represents encoded key in low-level storage engine. -type EncodedKey []byte - -// Cmp returns the comparison result of two key. -// The result will be 0 if a==b, -1 if a < b, and +1 if a > b. -func (k EncodedKey) Cmp(another EncodedKey) int { - return bytes.Compare(k, another) -} - -// Next returns the next key in byte-order. -func (k EncodedKey) Next() EncodedKey { - return EncodedKey(bytes.Join([][]byte{k, Key{0}}, nil)) -} - -// VersionProvider provides increasing IDs. -type VersionProvider interface { - CurrentVersion() (Version, error) -} - -// Version is the wrapper of KV's version. -type Version struct { - Ver uint64 -} - -var ( - // MaxVersion is the maximum version, notice that it's not a valid version. - MaxVersion = Version{Ver: math.MaxUint64} - // MinVersion is the minimum version, it's not a valid version, too. - MinVersion = Version{Ver: 0} -) - -// NewVersion creates a new Version struct. -func NewVersion(v uint64) Version { - return Version{ - Ver: v, - } -} - -// Cmp returns the comparison result of two versions. -// The result will be 0 if a==b, -1 if a < b, and +1 if a > b. -func (v Version) Cmp(another Version) int { - if v.Ver > another.Ver { - return 1 - } else if v.Ver < another.Ver { - return -1 - } - return 0 -} - -// DecodeFn is a function that decodes data after fetching from store. -type DecodeFn func(raw interface{}) (interface{}, error) - -// EncodeFn is a function that encodes data before putting into store. -type EncodeFn func(raw interface{}) (interface{}, error) - -// ErrNotCommitted is the error returned by CommitVersion when this -// transaction is not committed. -var ErrNotCommitted = errors.New("this transaction is not committed") - -// Option is used for customizing kv store's behaviors during a transaction. -type Option int - -// Options is an interface of a set of options. Each option is associated with a value. -type Options interface { - // Get gets an option value. - Get(opt Option) (v interface{}, ok bool) -} - const ( // RangePrefetchOnCacheMiss directives that when dealing with a Get operation but failing to read data from cache, // it will launch a RangePrefetch to underlying storage instead of Get. The range starts from requested key and @@ -152,33 +60,6 @@ type MemBuffer interface { Release() } -// UnionStore is a store that wraps a snapshot for read and a BufferStore for buffered write. -// Also, it provides some transaction related utilities. -type UnionStore interface { - MemBuffer - // Inc increases the value for key k in KV storage by step. - Inc(k Key, step int64) (int64, error) - // GetInt64 get int64 which created by Inc method. - GetInt64(k Key) (int64, error) - // CheckLazyConditionPairs loads all lazy values from store then checks if all values are matched. - // Lazy condition pairs should be checked before transaction commit. - CheckLazyConditionPairs() error - // BatchPrefetch fetches values from KV storage to cache for later use. - BatchPrefetch(keys []Key) error - // RangePrefetch fetches values in the range [start, end] from KV storage - // to cache for later use. Maximum number of values is up to limit. - RangePrefetch(start, end Key, limit int) error - // WalkBuffer iterates all buffered kv pairs. - WalkBuffer(f func(k Key, v []byte) error) error - // SetOption sets an option with a value, when val is nil, uses the default - // value of this option. - SetOption(opt Option, val interface{}) - // DelOption deletes an option. - DelOption(opt Option) - // ReleaseSnapshot releases underlying snapshot. - ReleaseSnapshot() -} - // Transaction defines the interface for operations inside a Transaction. // This is not thread safe. type Transaction interface { diff --git a/kv/txn.go b/kv/txn.go index 2bf483970d..197a6fc5c5 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -16,33 +16,12 @@ package kv import ( "math" "math/rand" - "strings" "time" "github.com/juju/errors" "github.com/ngaut/log" - "github.com/pingcap/go-themis" - "github.com/pingcap/tidb/terror" ) -// IsRetryableError checks if the err is a fatal error and the under going operation is worth to retry. -func IsRetryableError(err error) bool { - if err == nil { - return false - } - - if terror.ErrorEqual(err, ErrRetryable) || - terror.ErrorEqual(err, ErrLockConflict) || - terror.ErrorEqual(err, ErrConditionNotMatch) || - terror.ErrorEqual(err, themis.ErrRetryable) || - // HBase exception message will tell you if you should retry or not - strings.Contains(err.Error(), "try again later") { - return true - } - - return false -} - // RunInNewTxn will run the f in a new transaction environment. func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) error { for i := 0; i < maxRetryCnt; i++ { diff --git a/kv/union_store.go b/kv/union_store.go index 7e11b13610..d286e5786c 100644 --- a/kv/union_store.go +++ b/kv/union_store.go @@ -19,24 +19,50 @@ import ( "github.com/juju/errors" "github.com/ngaut/pool" - "github.com/pingcap/tidb/terror" ) +// UnionStore is a store that wraps a snapshot for read and a BufferStore for buffered write. +// Also, it provides some transaction related utilities. +type UnionStore interface { + MemBuffer + // Inc increases the value for key k in KV storage by step. + Inc(k Key, step int64) (int64, error) + // GetInt64 get int64 which created by Inc method. + GetInt64(k Key) (int64, error) + // CheckLazyConditionPairs loads all lazy values from store then checks if all values are matched. + // Lazy condition pairs should be checked before transaction commit. + CheckLazyConditionPairs() error + // BatchPrefetch fetches values from KV storage to cache for later use. + BatchPrefetch(keys []Key) error + // RangePrefetch fetches values in the range [start, end] from KV storage + // to cache for later use. Maximum number of values is up to limit. + RangePrefetch(start, end Key, limit int) error + // WalkBuffer iterates all buffered kv pairs. + WalkBuffer(f func(k Key, v []byte) error) error + // SetOption sets an option with a value, when val is nil, uses the default + // value of this option. + SetOption(opt Option, val interface{}) + // DelOption deletes an option. + DelOption(opt Option) + // ReleaseSnapshot releases underlying snapshot. + ReleaseSnapshot() +} + +// Option is used for customizing kv store's behaviors during a transaction. +type Option int + +// Options is an interface of a set of options. Each option is associated with a value. +type Options interface { + // Get gets an option value. + Get(opt Option) (v interface{}, ok bool) +} + var ( p = pool.NewCache("memdb pool", 100, func() interface{} { return NewMemDbBuffer() }) ) -// IsErrNotFound checks if err is a kind of NotFound error. -func IsErrNotFound(err error) bool { - if terror.ErrorEqual(err, ErrNotExist) { - return true - } - - return false -} - // UnionStore is an in-memory Store which contains a buffer for write and a // snapshot for read. type unionStore struct { diff --git a/kv/version.go b/kv/version.go new file mode 100644 index 0000000000..f009215863 --- /dev/null +++ b/kv/version.go @@ -0,0 +1,51 @@ +// Copyright 2015 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import "math" + +// VersionProvider provides increasing IDs. +type VersionProvider interface { + CurrentVersion() (Version, error) +} + +// Version is the wrapper of KV's version. +type Version struct { + Ver uint64 +} + +var ( + // MaxVersion is the maximum version, notice that it's not a valid version. + MaxVersion = Version{Ver: math.MaxUint64} + // MinVersion is the minimum version, it's not a valid version, too. + MinVersion = Version{Ver: 0} +) + +// NewVersion creates a new Version struct. +func NewVersion(v uint64) Version { + return Version{ + Ver: v, + } +} + +// Cmp returns the comparison result of two versions. +// The result will be 0 if a==b, -1 if a < b, and +1 if a > b. +func (v Version) Cmp(another Version) int { + if v.Ver > another.Ver { + return 1 + } else if v.Ver < another.Ver { + return -1 + } + return 0 +} diff --git a/store/localstore/compactor.go b/store/localstore/compactor.go index 2a70276261..97698fd831 100644 --- a/store/localstore/compactor.go +++ b/store/localstore/compactor.go @@ -25,13 +25,23 @@ import ( "github.com/pingcap/tidb/util/bytes" ) -var _ kv.Compactor = (*localstoreCompactor)(nil) - const ( deleteWorkerCnt = 3 ) -var localCompactDefaultPolicy = kv.CompactPolicy{ +// compactPolicy defines gc policy of MVCC storage. +type compactPolicy struct { + // SafePoint specifies + SafePoint int + // TriggerInterval specifies how often should the compactor + // scans outdated data. + TriggerInterval time.Duration + // BatchDeleteCnt specifies the batch size for + // deleting outdated data transaction. + BatchDeleteCnt int +} + +var localCompactDefaultPolicy = compactPolicy{ SafePoint: 20 * 1000, // in ms TriggerInterval: 10 * time.Second, BatchDeleteCnt: 100, @@ -45,7 +55,7 @@ type localstoreCompactor struct { workerWaitGroup *sync.WaitGroup ticker *time.Ticker db engine.DB - policy kv.CompactPolicy + policy compactPolicy } func (gc *localstoreCompactor) OnSet(k kv.Key) { @@ -196,7 +206,7 @@ func (gc *localstoreCompactor) Stop() { gc.workerWaitGroup.Wait() } -func newLocalCompactor(policy kv.CompactPolicy, db engine.DB) *localstoreCompactor { +func newLocalCompactor(policy compactPolicy, db engine.DB) *localstoreCompactor { return &localstoreCompactor{ recentKeys: make(map[string]struct{}), stopCh: make(chan struct{}), diff --git a/store/localstore/compactor_test.go b/store/localstore/compactor_test.go index 0f4da22d13..b3bf436b6f 100644 --- a/store/localstore/compactor_test.go +++ b/store/localstore/compactor_test.go @@ -46,7 +46,7 @@ func (s *localstoreCompactorTestSuite) TestCompactor(c *C) { db := store.(*dbStore).db store.(*dbStore).compactor.Stop() - policy := kv.CompactPolicy{ + policy := compactPolicy{ SafePoint: 500, BatchDeleteCnt: 1, TriggerInterval: 100 * time.Millisecond, @@ -119,7 +119,7 @@ func (s *localstoreCompactorTestSuite) TestStartStop(c *C) { db := store.(*dbStore).db for i := 0; i < 10000; i++ { - policy := kv.CompactPolicy{ + policy := compactPolicy{ SafePoint: 500, BatchDeleteCnt: 1, TriggerInterval: 100 * time.Millisecond,