diff --git a/domain/domain.go b/domain/domain.go index 109ef73ce9..3aa537b6a2 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -142,45 +142,59 @@ func (do *Domain) tryReload() { } } +const minReloadTimeout = 20 * time.Second + func (do *Domain) reload() error { // lock here for only once at same time. do.m.Lock() defer do.m.Unlock() - err := kv.RunInNewTxn(do.store, false, do.loadInfoSchema) - if err != nil { - return errors.Trace(err) + timeout := do.ddl.GetLease() / 2 + if timeout < minReloadTimeout { + timeout = minReloadTimeout } - atomic.StoreInt64(&do.lastLeaseTS, time.Now().UnixNano()) - return nil + done := make(chan error, 1) + go func() { + var err error + + for { + err = kv.RunInNewTxn(do.store, false, do.loadInfoSchema) + // if err is db closed, we will return it directly, otherwise, we will + // check reloading again. + if terror.ErrorEqual(err, localstore.ErrDBClosed) { + break + } + + if err != nil { + // TODO: use a backoff algorithm. + time.Sleep(500 * time.Millisecond) + continue + } + + atomic.StoreInt64(&do.lastLeaseTS, time.Now().UnixNano()) + break + } + + done <- err + }() + + select { + case err := <-done: + return errors.Trace(err) + case <-time.After(timeout): + return errors.Errorf("reload schema timeout") + } } func (do *Domain) mustReload() { // if reload error, we will terminate whole program to guarantee data safe. - // TODO: retry some times if reload error. err := do.reload() if err != nil { - log.Fatalf("reload schema err %v", err) + log.Fatalf("reload schema err %v", errors.ErrorStack(err)) } } -const maxReloadTimeout = 60 * time.Second -const minReloadTimeout = 20 * time.Second - -func getReloadTimeout(lease time.Duration) time.Duration { - timeout := lease / 4 - - if timeout >= maxReloadTimeout { - return maxReloadTimeout - } - if timeout <= minReloadTimeout { - return minReloadTimeout - } - - return timeout -} - // check schema every 300 seconds default. const defaultLoadTime = 300 * time.Second @@ -192,26 +206,16 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) { ticker := time.NewTicker(lease) defer ticker.Stop() - reloadTimeout := getReloadTimeout(lease) - reloadErrCh := make(chan error, 1) - for { select { case <-ticker.C: - go func() { - reloadErrCh <- do.reload() - }() - select { - case err := <-reloadErrCh: - // we may close store in test, but the domain load schema loop is still checking, - // so we can't panic for ErrDBClosed and just return here. - if terror.ErrorEqual(err, localstore.ErrDBClosed) { - return - } else if err != nil { - log.Fatalf("reload schema err %v", errors.ErrorStack(err)) - } - case <-time.After(reloadTimeout): - log.Fatalf("reload schema timeout:%d", reloadTimeout) + err := do.reload() + // we may close store in test, but the domain load schema loop is still checking, + // so we can't panic for ErrDBClosed and just return here. + if terror.ErrorEqual(err, localstore.ErrDBClosed) { + return + } else if err != nil { + log.Fatalf("reload schema err %v", errors.ErrorStack(err)) } case newLease := <-do.leaseCh: if newLease <= 0 { @@ -224,7 +228,6 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) { } lease = newLease - reloadTimeout = getReloadTimeout(lease) // reset ticker too. ticker.Stop() ticker = time.NewTicker(lease) diff --git a/domain/domain_test.go b/domain/domain_test.go index 06f7addc2b..ddfe9023a7 100644 --- a/domain/domain_test.go +++ b/domain/domain_test.go @@ -14,11 +14,13 @@ package domain import ( + "sync/atomic" "testing" "time" . "github.com/pingcap/check" "github.com/pingcap/tidb/model" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/localstore" "github.com/pingcap/tidb/store/localstore/goleveldb" "github.com/pingcap/tidb/util/mock" @@ -59,16 +61,13 @@ func (*testSuite) TestT(c *C) { c.Assert(err, IsNil) c.Assert(m[ddlLastReloadSchemaTS], GreaterEqual, int64(0)) - dom.SetLease(50 * time.Millisecond) + c.Assert(dom.GetScope("dummy_status"), Equals, variable.DefaultScopeFlag) + + dom.SetLease(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) + atomic.StoreInt64(&dom.lastLeaseTS, 0) + dom.tryReload() + store.Close() time.Sleep(1 * time.Second) } - -func (*testSuite) TestGetReloadTimeout(c *C) { - timeout := getReloadTimeout(241 * time.Second) - c.Assert(timeout, Equals, maxReloadTimeout) - timeout = getReloadTimeout(76 * time.Second) - c.Assert(timeout, Equals, minReloadTimeout) - timeout = getReloadTimeout(120 * time.Second) - c.Assert(timeout, Equals, 30*time.Second) -}