Merge pull request #748 from pingcap/ngaut/cleanup-kv

kv: Clean up kv.go
This commit is contained in:
goroutine
2015-12-18 13:45:40 +08:00
10 changed files with 226 additions and 219 deletions

View File

@ -1,40 +0,0 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
import "time"
// CompactPolicy defines gc policy of MVCC storage.
type CompactPolicy struct {
// SafePoint specifies
SafePoint int
// TriggerInterval specifies how often should the compactor
// scans outdated data.
TriggerInterval time.Duration
// BatchDeleteCnt specifies the batch size for
// deleting outdated data transaction.
BatchDeleteCnt int
}
// Compactor compacts MVCC storage.
type Compactor interface {
// OnGet is the hook point on Txn.Get.
OnGet(k Key)
// OnSet is the hook point on Txn.Set.
OnSet(k Key)
// OnDelete is the hook point on Txn.Delete.
OnDelete(k Key)
// Compact is the function removes the given key.
Compact(k Key) error
}

75
kv/error.go Normal file
View File

@ -0,0 +1,75 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
import (
"errors"
"strings"
"github.com/pingcap/go-themis"
"github.com/pingcap/tidb/terror"
)
var (
// ErrClosed is used when close an already closed txn.
ErrClosed = errors.New("Error: Transaction already closed")
// ErrNotExist is used when try to get an entry with an unexist key from KV store.
ErrNotExist = errors.New("Error: key not exist")
// ErrKeyExists is used when try to put an entry to KV store.
ErrKeyExists = errors.New("Error: key already exist")
// ErrConditionNotMatch is used when condition is not met.
ErrConditionNotMatch = errors.New("Error: Condition not match")
// ErrLockConflict is used when try to lock an already locked key.
ErrLockConflict = errors.New("Error: Lock conflict")
// ErrLazyConditionPairsNotMatch is used when value in store differs from expect pairs.
ErrLazyConditionPairsNotMatch = errors.New("Error: Lazy condition pairs not match")
// ErrRetryable is used when KV store occurs RPC error or some other
// errors which SQL layer can safely retry.
ErrRetryable = errors.New("Error: KV error safe to retry")
// ErrCannotSetNilValue is the error when sets an empty value.
ErrCannotSetNilValue = errors.New("can not set nil value")
// ErrInvalidTxn is the error when commits or rollbacks in an invalid transaction.
ErrInvalidTxn = errors.New("invalid transaction")
// ErrNotCommitted is the error returned by CommitVersion when this
// transaction is not committed.
ErrNotCommitted = errors.New("this transaction has not committed")
)
// IsRetryableError checks if the err is a fatal error and the under going operation is worth to retry.
func IsRetryableError(err error) bool {
if err == nil {
return false
}
if terror.ErrorEqual(err, ErrRetryable) ||
terror.ErrorEqual(err, ErrLockConflict) ||
terror.ErrorEqual(err, ErrConditionNotMatch) ||
terror.ErrorEqual(err, themis.ErrRetryable) ||
// HBase exception message will tell you if you should retry or not
strings.Contains(err.Error(), "try again later") {
return true
}
return false
}
// IsErrNotFound checks if err is a kind of NotFound error.
func IsErrNotFound(err error) bool {
if terror.ErrorEqual(err, ErrNotExist) {
return true
}
return false
}

View File

@ -15,28 +15,6 @@ package kv
import "github.com/juju/errors"
var (
// ErrClosed is used when close an already closed txn.
ErrClosed = errors.New("Error: Transaction already closed")
// ErrNotExist is used when try to get an entry with an unexist key from KV store.
ErrNotExist = errors.New("Error: key not exist")
// ErrKeyExists is used when try to put an entry to KV store.
ErrKeyExists = errors.New("Error: key already exist")
// ErrConditionNotMatch is used when condition is not met.
ErrConditionNotMatch = errors.New("Error: Condition not match")
// ErrLockConflict is used when try to lock an already locked key.
ErrLockConflict = errors.New("Error: Lock conflict")
// ErrLazyConditionPairsNotMatch is used when value in store differs from expect pairs.
ErrLazyConditionPairsNotMatch = errors.New("Error: Lazy condition pairs not match")
// ErrRetryable is used when KV store occurs RPC error or some other
// errors which SQL layer can safely retry.
ErrRetryable = errors.New("Error: KV error safe to retry")
// ErrCannotSetNilValue is the error when sets an empty value.
ErrCannotSetNilValue = errors.New("can not set nil value")
// ErrInvalidTxn is the error when commits or rollbacks in an invalid transaction.
ErrInvalidTxn = errors.New("invalid transaction")
)
// NextUntil applies FnKeyCmp to each entry of the iterator until meets some condition.
// It will stop when fn returns true, or iterator is invalid or an error occurs.
func NextUntil(it Iterator, fn FnKeyCmp) error {

47
kv/key.go Normal file
View File

@ -0,0 +1,47 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
import "bytes"
// Key represents high-level Key type.
type Key []byte
// Next returns the next key in byte-order.
func (k Key) Next() Key {
// add 0x0 to the end of key
buf := make([]byte, len([]byte(k))+1)
copy(buf, []byte(k))
return buf
}
// Cmp returns the comparison result of two key.
// The result will be 0 if a==b, -1 if a < b, and +1 if a > b.
func (k Key) Cmp(another Key) int {
return bytes.Compare(k, another)
}
// EncodedKey represents encoded key in low-level storage engine.
type EncodedKey []byte
// Cmp returns the comparison result of two key.
// The result will be 0 if a==b, -1 if a < b, and +1 if a > b.
func (k EncodedKey) Cmp(another EncodedKey) int {
return bytes.Compare(k, another)
}
// Next returns the next key in byte-order.
func (k EncodedKey) Next() EncodedKey {
return EncodedKey(bytes.Join([][]byte{k, Key{0}}, nil))
}

119
kv/kv.go
View File

@ -13,98 +13,6 @@
package kv
import (
"bytes"
"math"
"github.com/juju/errors"
)
// Key represents high-level Key type.
type Key []byte
// Next returns the next key in byte-order.
func (k Key) Next() Key {
// add 0x0 to the end of key
buf := make([]byte, len([]byte(k))+1)
copy(buf, []byte(k))
return buf
}
// Cmp returns the comparison result of two key.
// The result will be 0 if a==b, -1 if a < b, and +1 if a > b.
func (k Key) Cmp(another Key) int {
return bytes.Compare(k, another)
}
// EncodedKey represents encoded key in low-level storage engine.
type EncodedKey []byte
// Cmp returns the comparison result of two key.
// The result will be 0 if a==b, -1 if a < b, and +1 if a > b.
func (k EncodedKey) Cmp(another EncodedKey) int {
return bytes.Compare(k, another)
}
// Next returns the next key in byte-order.
func (k EncodedKey) Next() EncodedKey {
return EncodedKey(bytes.Join([][]byte{k, Key{0}}, nil))
}
// VersionProvider provides increasing IDs.
type VersionProvider interface {
CurrentVersion() (Version, error)
}
// Version is the wrapper of KV's version.
type Version struct {
Ver uint64
}
var (
// MaxVersion is the maximum version, notice that it's not a valid version.
MaxVersion = Version{Ver: math.MaxUint64}
// MinVersion is the minimum version, it's not a valid version, too.
MinVersion = Version{Ver: 0}
)
// NewVersion creates a new Version struct.
func NewVersion(v uint64) Version {
return Version{
Ver: v,
}
}
// Cmp returns the comparison result of two versions.
// The result will be 0 if a==b, -1 if a < b, and +1 if a > b.
func (v Version) Cmp(another Version) int {
if v.Ver > another.Ver {
return 1
} else if v.Ver < another.Ver {
return -1
}
return 0
}
// DecodeFn is a function that decodes data after fetching from store.
type DecodeFn func(raw interface{}) (interface{}, error)
// EncodeFn is a function that encodes data before putting into store.
type EncodeFn func(raw interface{}) (interface{}, error)
// ErrNotCommitted is the error returned by CommitVersion when this
// transaction is not committed.
var ErrNotCommitted = errors.New("this transaction is not committed")
// Option is used for customizing kv store's behaviors during a transaction.
type Option int
// Options is an interface of a set of options. Each option is associated with a value.
type Options interface {
// Get gets an option value.
Get(opt Option) (v interface{}, ok bool)
}
const (
// RangePrefetchOnCacheMiss directives that when dealing with a Get operation but failing to read data from cache,
// it will launch a RangePrefetch to underlying storage instead of Get. The range starts from requested key and
@ -152,33 +60,6 @@ type MemBuffer interface {
Release()
}
// UnionStore is a store that wraps a snapshot for read and a BufferStore for buffered write.
// Also, it provides some transaction related utilities.
type UnionStore interface {
MemBuffer
// Inc increases the value for key k in KV storage by step.
Inc(k Key, step int64) (int64, error)
// GetInt64 get int64 which created by Inc method.
GetInt64(k Key) (int64, error)
// CheckLazyConditionPairs loads all lazy values from store then checks if all values are matched.
// Lazy condition pairs should be checked before transaction commit.
CheckLazyConditionPairs() error
// BatchPrefetch fetches values from KV storage to cache for later use.
BatchPrefetch(keys []Key) error
// RangePrefetch fetches values in the range [start, end] from KV storage
// to cache for later use. Maximum number of values is up to limit.
RangePrefetch(start, end Key, limit int) error
// WalkBuffer iterates all buffered kv pairs.
WalkBuffer(f func(k Key, v []byte) error) error
// SetOption sets an option with a value, when val is nil, uses the default
// value of this option.
SetOption(opt Option, val interface{})
// DelOption deletes an option.
DelOption(opt Option)
// ReleaseSnapshot releases underlying snapshot.
ReleaseSnapshot()
}
// Transaction defines the interface for operations inside a Transaction.
// This is not thread safe.
type Transaction interface {

View File

@ -16,33 +16,12 @@ package kv
import (
"math"
"math/rand"
"strings"
"time"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/go-themis"
"github.com/pingcap/tidb/terror"
)
// IsRetryableError checks if the err is a fatal error and the under going operation is worth to retry.
func IsRetryableError(err error) bool {
if err == nil {
return false
}
if terror.ErrorEqual(err, ErrRetryable) ||
terror.ErrorEqual(err, ErrLockConflict) ||
terror.ErrorEqual(err, ErrConditionNotMatch) ||
terror.ErrorEqual(err, themis.ErrRetryable) ||
// HBase exception message will tell you if you should retry or not
strings.Contains(err.Error(), "try again later") {
return true
}
return false
}
// RunInNewTxn will run the f in a new transaction environment.
func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) error {
for i := 0; i < maxRetryCnt; i++ {

View File

@ -19,24 +19,50 @@ import (
"github.com/juju/errors"
"github.com/ngaut/pool"
"github.com/pingcap/tidb/terror"
)
// UnionStore is a store that wraps a snapshot for read and a BufferStore for buffered write.
// Also, it provides some transaction related utilities.
type UnionStore interface {
MemBuffer
// Inc increases the value for key k in KV storage by step.
Inc(k Key, step int64) (int64, error)
// GetInt64 get int64 which created by Inc method.
GetInt64(k Key) (int64, error)
// CheckLazyConditionPairs loads all lazy values from store then checks if all values are matched.
// Lazy condition pairs should be checked before transaction commit.
CheckLazyConditionPairs() error
// BatchPrefetch fetches values from KV storage to cache for later use.
BatchPrefetch(keys []Key) error
// RangePrefetch fetches values in the range [start, end] from KV storage
// to cache for later use. Maximum number of values is up to limit.
RangePrefetch(start, end Key, limit int) error
// WalkBuffer iterates all buffered kv pairs.
WalkBuffer(f func(k Key, v []byte) error) error
// SetOption sets an option with a value, when val is nil, uses the default
// value of this option.
SetOption(opt Option, val interface{})
// DelOption deletes an option.
DelOption(opt Option)
// ReleaseSnapshot releases underlying snapshot.
ReleaseSnapshot()
}
// Option is used for customizing kv store's behaviors during a transaction.
type Option int
// Options is an interface of a set of options. Each option is associated with a value.
type Options interface {
// Get gets an option value.
Get(opt Option) (v interface{}, ok bool)
}
var (
p = pool.NewCache("memdb pool", 100, func() interface{} {
return NewMemDbBuffer()
})
)
// IsErrNotFound checks if err is a kind of NotFound error.
func IsErrNotFound(err error) bool {
if terror.ErrorEqual(err, ErrNotExist) {
return true
}
return false
}
// UnionStore is an in-memory Store which contains a buffer for write and a
// snapshot for read.
type unionStore struct {

51
kv/version.go Normal file
View File

@ -0,0 +1,51 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
import "math"
// VersionProvider provides increasing IDs.
type VersionProvider interface {
CurrentVersion() (Version, error)
}
// Version is the wrapper of KV's version.
type Version struct {
Ver uint64
}
var (
// MaxVersion is the maximum version, notice that it's not a valid version.
MaxVersion = Version{Ver: math.MaxUint64}
// MinVersion is the minimum version, it's not a valid version, too.
MinVersion = Version{Ver: 0}
)
// NewVersion creates a new Version struct.
func NewVersion(v uint64) Version {
return Version{
Ver: v,
}
}
// Cmp returns the comparison result of two versions.
// The result will be 0 if a==b, -1 if a < b, and +1 if a > b.
func (v Version) Cmp(another Version) int {
if v.Ver > another.Ver {
return 1
} else if v.Ver < another.Ver {
return -1
}
return 0
}

View File

@ -25,13 +25,23 @@ import (
"github.com/pingcap/tidb/util/bytes"
)
var _ kv.Compactor = (*localstoreCompactor)(nil)
const (
deleteWorkerCnt = 3
)
var localCompactDefaultPolicy = kv.CompactPolicy{
// compactPolicy defines gc policy of MVCC storage.
type compactPolicy struct {
// SafePoint specifies
SafePoint int
// TriggerInterval specifies how often should the compactor
// scans outdated data.
TriggerInterval time.Duration
// BatchDeleteCnt specifies the batch size for
// deleting outdated data transaction.
BatchDeleteCnt int
}
var localCompactDefaultPolicy = compactPolicy{
SafePoint: 20 * 1000, // in ms
TriggerInterval: 10 * time.Second,
BatchDeleteCnt: 100,
@ -45,7 +55,7 @@ type localstoreCompactor struct {
workerWaitGroup *sync.WaitGroup
ticker *time.Ticker
db engine.DB
policy kv.CompactPolicy
policy compactPolicy
}
func (gc *localstoreCompactor) OnSet(k kv.Key) {
@ -196,7 +206,7 @@ func (gc *localstoreCompactor) Stop() {
gc.workerWaitGroup.Wait()
}
func newLocalCompactor(policy kv.CompactPolicy, db engine.DB) *localstoreCompactor {
func newLocalCompactor(policy compactPolicy, db engine.DB) *localstoreCompactor {
return &localstoreCompactor{
recentKeys: make(map[string]struct{}),
stopCh: make(chan struct{}),

View File

@ -46,7 +46,7 @@ func (s *localstoreCompactorTestSuite) TestCompactor(c *C) {
db := store.(*dbStore).db
store.(*dbStore).compactor.Stop()
policy := kv.CompactPolicy{
policy := compactPolicy{
SafePoint: 500,
BatchDeleteCnt: 1,
TriggerInterval: 100 * time.Millisecond,
@ -119,7 +119,7 @@ func (s *localstoreCompactorTestSuite) TestStartStop(c *C) {
db := store.(*dbStore).db
for i := 0; i < 10000; i++ {
policy := kv.CompactPolicy{
policy := compactPolicy{
SafePoint: 500,
BatchDeleteCnt: 1,
TriggerInterval: 100 * time.Millisecond,