store: add open store retry
This commit is contained in:
@ -14,6 +14,8 @@
|
||||
package kv
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/juju/errors"
|
||||
"github.com/ngaut/log"
|
||||
"github.com/pingcap/tidb/terror"
|
||||
@ -27,7 +29,9 @@ func IsRetryableError(err error) bool {
|
||||
|
||||
if terror.ErrorEqual(err, ErrRetryable) ||
|
||||
terror.ErrorEqual(err, ErrLockConflict) ||
|
||||
terror.ErrorEqual(err, ErrConditionNotMatch) {
|
||||
terror.ErrorEqual(err, ErrConditionNotMatch) ||
|
||||
// HBase exception message will tell you if you should retry or not
|
||||
strings.Contains(err.Error(), "try again later") {
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
18
tidb.go
18
tidb.go
@ -52,6 +52,8 @@ const (
|
||||
EngineGoLevelDBPersistent = "goleveldb://"
|
||||
EngineBoltDB = "boltdb://"
|
||||
EngineHBase = "hbase://"
|
||||
defaultMaxRetries = 30
|
||||
retrySleepInterval = 500 * time.Millisecond
|
||||
)
|
||||
|
||||
type domainMap struct {
|
||||
@ -285,6 +287,10 @@ func RegisterLocalStore(name string, driver engine.Driver) error {
|
||||
// Engine is the storage name registered with RegisterStore.
|
||||
// Schema is the storage specific format.
|
||||
func NewStore(uri string) (kv.Storage, error) {
|
||||
return newStoreWithRetry(uri, defaultMaxRetries)
|
||||
}
|
||||
|
||||
func newStoreWithRetry(uri string, maxRetries int) (kv.Storage, error) {
|
||||
pos := strings.Index(uri, "://")
|
||||
if pos == -1 {
|
||||
return nil, errors.Errorf("invalid uri format, must engine://schema")
|
||||
@ -298,7 +304,17 @@ func NewStore(uri string) (kv.Storage, error) {
|
||||
return nil, errors.Errorf("invalid uri foramt, storage %s is not registered", name)
|
||||
}
|
||||
|
||||
s, err := d.Open(schema)
|
||||
var err error
|
||||
var s kv.Storage
|
||||
for i := 1; i <= maxRetries; i++ {
|
||||
s, err = d.Open(schema)
|
||||
if err == nil || !kv.IsRetryableError(err) {
|
||||
break
|
||||
}
|
||||
sleepTime := time.Duration(uint64(retrySleepInterval) * uint64(i))
|
||||
log.Errorf("Waiting store to get ready, sleep %v and try again...", sleepTime)
|
||||
time.Sleep(sleepTime)
|
||||
}
|
||||
return s, errors.Trace(err)
|
||||
}
|
||||
|
||||
|
||||
16
tidb_test.go
16
tidb_test.go
@ -15,6 +15,7 @@ package tidb
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
@ -48,6 +49,12 @@ type testMainSuite struct {
|
||||
selectSQL string
|
||||
}
|
||||
|
||||
type brokenStore struct{}
|
||||
|
||||
func (s *brokenStore) Open(schema string) (kv.Storage, error) {
|
||||
return nil, errors.New("try again later")
|
||||
}
|
||||
|
||||
func (s *testMainSuite) SetUpSuite(c *C) {
|
||||
s.dbName = "test_main_db"
|
||||
s.createDBSQL = fmt.Sprintf("create database if not exists %s;", s.dbName)
|
||||
@ -306,6 +313,15 @@ func (s *testMainSuite) TestitrimSQL(c *C) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *testMainSuite) TestRetryOpenStore(c *C) {
|
||||
begin := time.Now()
|
||||
RegisterStore("dummy", &brokenStore{})
|
||||
_, err := newStoreWithRetry("dummy://dummy-store", 3)
|
||||
c.Assert(err, NotNil)
|
||||
elapse := time.Since(begin)
|
||||
c.Assert(uint64(elapse), Greater, uint64(2*time.Second))
|
||||
}
|
||||
|
||||
func sessionExec(c *C, se Session, sql string) ([]rset.Recordset, error) {
|
||||
se.Execute("BEGIN;")
|
||||
r, err := se.Execute(sql)
|
||||
|
||||
Reference in New Issue
Block a user