From 65de11dd7201ce108e696f5e4b3f0309aa63fb09 Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 29 Oct 2015 13:18:45 +0800 Subject: [PATCH] ddl: add check owner test --- ddl/ddl.go | 4 +++ ddl/reorg.go | 4 ++- ddl/worker.go | 44 ++++++++++++++++++---------- ddl/worker_test.go | 73 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 109 insertions(+), 16 deletions(-) create mode 100644 ddl/worker_test.go diff --git a/ddl/ddl.go b/ddl/ddl.go index 9d00da5764..13f25786e6 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -89,6 +89,10 @@ func fakeOnDDLChange(err error) error { // NewDDL creates a new DDL. func NewDDL(store kv.Storage, infoHandle *infoschema.Handle, hook OnDDLChange, lease time.Duration) DDL { + return newDDL(store, infoHandle, hook, lease) +} + +func newDDL(store kv.Storage, infoHandle *infoschema.Handle, hook OnDDLChange, lease time.Duration) *ddl { if hook == nil { hook = fakeOnDDLChange } diff --git a/ddl/reorg.go b/ddl/reorg.go index 8a195e025c..763c1b469c 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -107,12 +107,14 @@ func (d *ddl) runReorgJob(f func() error) error { }() } + waitTimeout := chooseLeaseTime(d.lease, waitReorgTimeout) + // wait reorgnization job done or timeout select { case err := <-d.reOrgDoneCh: d.reOrgDoneCh = nil return errors.Trace(err) - case <-time.After(waitReorgTimeout): + case <-time.After(waitTimeout): // if timeout, we will return, check the owner and retry wait job done again. 
return errWaitReorgTimeout case <-d.quitCh: diff --git a/ddl/worker.go b/ddl/worker.go index 145551ec53..fcef780632 100644 --- a/ddl/worker.go +++ b/ddl/worker.go @@ -96,10 +96,10 @@ func asyncNotify(ch chan struct{}) { } } -func (d *ddl) verifyOwner(t *meta.TMeta) error { +func (d *ddl) checkOwner(t *meta.TMeta) (*model.Owner, error) { owner, err := t.GetDDLOwner() if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } if owner == nil { @@ -108,28 +108,27 @@ func (d *ddl) verifyOwner(t *meta.TMeta) error { owner.OwnerID = d.uuid } - now := time.Now().Unix() + now := time.Now().UnixNano() // we must wait 2 * lease time to guarantee other servers update the schema, // the owner will update its owner status every 2 * lease time, so here we use // 4 * lease to check its timeout. - maxTimeout := int64(4 * d.lease / time.Second) + maxTimeout := int64(4 * d.lease) if owner.OwnerID == d.uuid || now-owner.LastUpdateTS > maxTimeout { owner.OwnerID = d.uuid owner.LastUpdateTS = now - - // update or try to set itself as owner. + // update status. if err = t.SetDDLOwner(owner); err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } - log.Debugf("become owner %s", owner.OwnerID) } if owner.OwnerID != d.uuid { - return errors.Trace(ErrNotOwner) + log.Debugf("not owner, owner is %s", owner.OwnerID) + return nil, errors.Trace(ErrNotOwner) } - return nil + return owner, nil } func (d *ddl) getFirstJob(t *meta.TMeta) (*model.Job, error) { @@ -166,7 +165,7 @@ func (d *ddl) handleJobQueue() error { var job *model.Job err := d.meta.RunInNewTxn(false, func(t *meta.TMeta) error { - err := d.verifyOwner(t) + owner, err := d.checkOwner(t) if err != nil { return errors.Trace(err) } @@ -193,6 +192,16 @@ func (d *ddl) handleJobQueue() error { err = d.updateJob(t, job) } + if err != nil { + return errors.Trace(err) + } + + // running job may cost some time, so here we must update owner status to + // prevent others from becoming the owner. 
+ owner.LastUpdateTS = time.Now().UnixNano() + if err = t.SetDDLOwner(owner); err != nil { + return errors.Trace(err) + } return errors.Trace(err) }) @@ -212,6 +221,14 @@ func (d *ddl) handleJobQueue() error { } } +func chooseLeaseTime(n1 time.Duration, n2 time.Duration) time.Duration { + if n1 > 0 { + return n1 + } + + return n2 +} + // onWorker is for async online schema change, it will try to become the owner first, // then wait or pull the job queue to handle a schema change job. func (d *ddl) onWorker() { @@ -219,10 +236,7 @@ func (d *ddl) onWorker() { // we use 4 * lease time to check owner's timeout, so here, we will update owner's status // every 2 * lease time, if lease is 0, we will use default 10s. - checkTime := 2 * d.lease - if checkTime == 0 { - checkTime = 10 * time.Second - } + checkTime := chooseLeaseTime(2*d.lease, 10*time.Second) ticker := time.NewTicker(checkTime) defer ticker.Stop() diff --git a/ddl/worker_test.go b/ddl/worker_test.go new file mode 100644 index 0000000000..92b268903d --- /dev/null +++ b/ddl/worker_test.go @@ -0,0 +1,73 @@ +// Copyright 2015 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl + +import ( + "fmt" + "time" + + . 
"github.com/pingcap/check" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta" + "github.com/pingcap/tidb/store/localstore" + "github.com/pingcap/tidb/store/localstore/goleveldb" + "github.com/pingcap/tidb/util/errors2" +) + +var _ = Suite(&testDDLSuite{}) + +func createTestStore(c *C, name string) kv.Storage { + driver := localstore.Driver{Driver: goleveldb.MemoryDriver{}} + store, err := driver.Open(fmt.Sprintf("memory:%s", name)) + c.Assert(err, IsNil) + return store +} + +type testDDLSuite struct { +} + +func (s *testDDLSuite) TestCheckOnwer(c *C) { + store := createTestStore(c, "test_owner") + defer store.Close() + + lease := 100 * time.Millisecond + d1 := newDDL(store, nil, nil, lease) + + time.Sleep(lease) + + err := d1.meta.RunInNewTxn(false, func(t *meta.TMeta) error { + _, err1 := d1.checkOwner(t) + return err1 + }) + c.Assert(err, IsNil) + + d2 := newDDL(store, nil, nil, lease) + err = d2.meta.RunInNewTxn(false, func(t *meta.TMeta) error { + _, err1 := d2.checkOwner(t) + return err1 + }) + c.Assert(err, NotNil) + c.Assert(errors2.ErrorEqual(err, ErrNotOwner), IsTrue) + + d1.close() + + time.Sleep(6 * lease) + + err = d2.meta.RunInNewTxn(false, func(t *meta.TMeta) error { + _, err1 := d2.checkOwner(t) + return err1 + }) + c.Assert(err, IsNil) + d2.close() +}