ddl: add check owner test
This commit is contained in:
@ -89,6 +89,10 @@ func fakeOnDDLChange(err error) error {
|
||||
|
||||
// NewDDL creates a new DDL.
|
||||
func NewDDL(store kv.Storage, infoHandle *infoschema.Handle, hook OnDDLChange, lease time.Duration) DDL {
|
||||
return newDDL(store, infoHandle, hook, lease)
|
||||
}
|
||||
|
||||
func newDDL(store kv.Storage, infoHandle *infoschema.Handle, hook OnDDLChange, lease time.Duration) *ddl {
|
||||
if hook == nil {
|
||||
hook = fakeOnDDLChange
|
||||
}
|
||||
|
||||
@ -107,12 +107,14 @@ func (d *ddl) runReorgJob(f func() error) error {
|
||||
}()
|
||||
}
|
||||
|
||||
waitTimeout := chooseLeaseTime(d.lease, waitReorgTimeout)
|
||||
|
||||
// wait reorgnization job done or timeout
|
||||
select {
|
||||
case err := <-d.reOrgDoneCh:
|
||||
d.reOrgDoneCh = nil
|
||||
return errors.Trace(err)
|
||||
case <-time.After(waitReorgTimeout):
|
||||
case <-time.After(waitTimeout):
|
||||
// if timeout, we will return, check the owner and retry wait job done again.
|
||||
return errWaitReorgTimeout
|
||||
case <-d.quitCh:
|
||||
|
||||
@ -96,10 +96,10 @@ func asyncNotify(ch chan struct{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func (d *ddl) verifyOwner(t *meta.TMeta) error {
|
||||
func (d *ddl) checkOwner(t *meta.TMeta) (*model.Owner, error) {
|
||||
owner, err := t.GetDDLOwner()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
if owner == nil {
|
||||
@ -108,28 +108,27 @@ func (d *ddl) verifyOwner(t *meta.TMeta) error {
|
||||
owner.OwnerID = d.uuid
|
||||
}
|
||||
|
||||
now := time.Now().Unix()
|
||||
now := time.Now().UnixNano()
|
||||
// we must wait 2 * lease time to guarantee other servers update the schema,
|
||||
// the owner will update its owner status every 2 * lease time, so here we use
|
||||
// 4 * lease to check its timeout.
|
||||
maxTimeout := int64(4 * d.lease / time.Second)
|
||||
maxTimeout := int64(4 * d.lease)
|
||||
if owner.OwnerID == d.uuid || now-owner.LastUpdateTS > maxTimeout {
|
||||
owner.OwnerID = d.uuid
|
||||
owner.LastUpdateTS = now
|
||||
|
||||
// update or try to set itself as owner.
|
||||
// update status.
|
||||
if err = t.SetDDLOwner(owner); err != nil {
|
||||
return errors.Trace(err)
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
log.Debugf("become owner %s", owner.OwnerID)
|
||||
}
|
||||
|
||||
if owner.OwnerID != d.uuid {
|
||||
return errors.Trace(ErrNotOwner)
|
||||
log.Debugf("not owner, owner is %s", owner.OwnerID)
|
||||
return nil, errors.Trace(ErrNotOwner)
|
||||
}
|
||||
|
||||
return nil
|
||||
return owner, nil
|
||||
}
|
||||
|
||||
func (d *ddl) getFirstJob(t *meta.TMeta) (*model.Job, error) {
|
||||
@ -166,7 +165,7 @@ func (d *ddl) handleJobQueue() error {
|
||||
|
||||
var job *model.Job
|
||||
err := d.meta.RunInNewTxn(false, func(t *meta.TMeta) error {
|
||||
err := d.verifyOwner(t)
|
||||
owner, err := d.checkOwner(t)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -193,6 +192,16 @@ func (d *ddl) handleJobQueue() error {
|
||||
err = d.updateJob(t, job)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// running job may cost some time, so here we must update owner status to
|
||||
// prevent other become the owner.
|
||||
owner.LastUpdateTS = time.Now().UnixNano()
|
||||
if err = t.SetDDLOwner(owner); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return errors.Trace(err)
|
||||
})
|
||||
|
||||
@ -212,6 +221,14 @@ func (d *ddl) handleJobQueue() error {
|
||||
}
|
||||
}
|
||||
|
||||
func chooseLeaseTime(n1 time.Duration, n2 time.Duration) time.Duration {
|
||||
if n1 > 0 {
|
||||
return n1
|
||||
}
|
||||
|
||||
return n2
|
||||
}
|
||||
|
||||
// onWorker is for async online schema change, it will try to become the owner first,
|
||||
// then wait or pull the job queue to handle a schema change job.
|
||||
func (d *ddl) onWorker() {
|
||||
@ -219,10 +236,7 @@ func (d *ddl) onWorker() {
|
||||
|
||||
// we use 4 * lease time to check owner's timeout, so here, we will update owner's status
|
||||
// every 2 * lease time, if lease is 0, we will use default 10s.
|
||||
checkTime := 2 * d.lease
|
||||
if checkTime == 0 {
|
||||
checkTime = 10 * time.Second
|
||||
}
|
||||
checkTime := chooseLeaseTime(2*d.lease, 10*time.Second)
|
||||
|
||||
ticker := time.NewTicker(checkTime)
|
||||
defer ticker.Stop()
|
||||
|
||||
73
ddl/worker_test.go
Normal file
73
ddl/worker_test.go
Normal file
@ -0,0 +1,73 @@
|
||||
// Copyright 2015 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package ddl
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
. "github.com/pingcap/check"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/meta"
|
||||
"github.com/pingcap/tidb/store/localstore"
|
||||
"github.com/pingcap/tidb/store/localstore/goleveldb"
|
||||
"github.com/pingcap/tidb/util/errors2"
|
||||
)
|
||||
|
||||
var _ = Suite(&testDDLSuite{})
|
||||
|
||||
func createTestStore(c *C, name string) kv.Storage {
|
||||
driver := localstore.Driver{Driver: goleveldb.MemoryDriver{}}
|
||||
store, err := driver.Open(fmt.Sprintf("memory:%s", name))
|
||||
c.Assert(err, IsNil)
|
||||
return store
|
||||
}
|
||||
|
||||
type testDDLSuite struct {
|
||||
}
|
||||
|
||||
func (s *testDDLSuite) TestCheckOnwer(c *C) {
|
||||
store := createTestStore(c, "test_owner")
|
||||
defer store.Close()
|
||||
|
||||
lease := 100 * time.Millisecond
|
||||
d1 := newDDL(store, nil, nil, lease)
|
||||
|
||||
time.Sleep(lease)
|
||||
|
||||
err := d1.meta.RunInNewTxn(false, func(t *meta.TMeta) error {
|
||||
_, err1 := d1.checkOwner(t)
|
||||
return err1
|
||||
})
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
d2 := newDDL(store, nil, nil, lease)
|
||||
err = d2.meta.RunInNewTxn(false, func(t *meta.TMeta) error {
|
||||
_, err1 := d2.checkOwner(t)
|
||||
return err1
|
||||
})
|
||||
c.Assert(err, NotNil)
|
||||
c.Assert(errors2.ErrorEqual(err, ErrNotOwner), IsTrue)
|
||||
|
||||
d1.close()
|
||||
|
||||
time.Sleep(6 * lease)
|
||||
|
||||
err = d2.meta.RunInNewTxn(false, func(t *meta.TMeta) error {
|
||||
_, err1 := d2.checkOwner(t)
|
||||
return err1
|
||||
})
|
||||
c.Assert(err, IsNil)
|
||||
d2.close()
|
||||
}
|
||||
Reference in New Issue
Block a user