From 39b1fda5565dc2bc18f77f36d788da076125e59f Mon Sep 17 00:00:00 2001 From: Lynn Date: Mon, 12 Jun 2017 18:03:15 +0800 Subject: [PATCH] ddl: Revoke the session when the DDL will close (#3454) (#3461) * ddl: Revoke the session when the DDL will close (#3454) --- ddl/ddl.go | 2 +- ddl/owner_manager.go | 32 ++++++++++++++++++++++++++++++-- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/ddl/ddl.go b/ddl/ddl.go index e48fd45322..49af8da260 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -334,11 +334,11 @@ func (d *ddl) close() { } close(d.quitCh) + d.ownerManager.Cancel() err := d.schemaSyncer.RemoveSelfVersionPath() if err != nil { log.Errorf("[ddl] remove self version path failed %v", err) } - d.ownerManager.Cancel() d.wait.Wait() log.Infof("close DDL:%s", d.uuid) diff --git a/ddl/owner_manager.go b/ddl/owner_manager.go index 4f34a16445..e07d8921bf 100644 --- a/ddl/owner_manager.go +++ b/ddl/owner_manager.go @@ -15,6 +15,8 @@ package ddl import ( "math" + "os" + "strconv" "sync/atomic" "time" @@ -112,6 +114,20 @@ func (m *ownerManager) SetBgOwner(isOwner bool) { // ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing. var ManagerSessionTTL = 60 +// setManagerSessionTTL sets the ManagerSessionTTL value, it's used for testing. +func setManagerSessionTTL() error { + ttlStr := os.Getenv("tidb_manager_ttl") + if len(ttlStr) == 0 { + return nil + } + ttl, err := strconv.Atoi(ttlStr) + if err != nil { + return errors.Trace(err) + } + ManagerSessionTTL = ttl + return nil +} + func newSession(ctx goctx.Context, etcdCli *clientv3.Client, retryCnt, ttl int) (*concurrency.Session, error) { var err error var etcdSession *concurrency.Session @@ -162,8 +178,13 @@ func (m *ownerManager) campaignLoop(ctx goctx.Context, etcdSession *concurrency. return } case <-ctx.Done(): - log.Infof("[ddl] break %s campaign loop", key) - return + // Revoke the session lease. + // If revoke takes longer than the ttl, lease is expired anyway. + ctx, cancel := goctx.WithTimeout(goctx.Background(), + time.Duration(ManagerSessionTTL)*time.Second) + _, err = m.etcdCli.Revoke(ctx, etcdSession.Lease()) + cancel() + log.Infof("[ddl] break %s campaign loop err %v", key, err) default: } @@ -240,3 +261,10 @@ func (m *ownerManager) watchOwner(ctx goctx.Context, etcdSession *concurrency.Se } } } + +func init() { + err := setManagerSessionTTL() + if err != nil { + log.Warnf("[ddl] set manager session TTL failed %v", err) + } +}