* ddl: Revoke the session when the DDL will close (#3454)
This commit is contained in:
@ -334,11 +334,11 @@ func (d *ddl) close() {
|
||||
}
|
||||
|
||||
close(d.quitCh)
|
||||
d.ownerManager.Cancel()
|
||||
err := d.schemaSyncer.RemoveSelfVersionPath()
|
||||
if err != nil {
|
||||
log.Errorf("[ddl] remove self version path failed %v", err)
|
||||
}
|
||||
d.ownerManager.Cancel()
|
||||
|
||||
d.wait.Wait()
|
||||
log.Infof("close DDL:%s", d.uuid)
|
||||
|
||||
@ -15,6 +15,8 @@ package ddl
|
||||
|
||||
import (
|
||||
"math"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@ -112,6 +114,20 @@ func (m *ownerManager) SetBgOwner(isOwner bool) {
|
||||
// ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing.
|
||||
var ManagerSessionTTL = 60
|
||||
|
||||
// setManagerSessionTTL sets the ManagerSessionTTL value, it's used for testing.
|
||||
func setManagerSessionTTL() error {
|
||||
ttlStr := os.Getenv("tidb_manager_ttl")
|
||||
if len(ttlStr) == 0 {
|
||||
return nil
|
||||
}
|
||||
ttl, err := strconv.Atoi(ttlStr)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
ManagerSessionTTL = ttl
|
||||
return nil
|
||||
}
|
||||
|
||||
func newSession(ctx goctx.Context, etcdCli *clientv3.Client, retryCnt, ttl int) (*concurrency.Session, error) {
|
||||
var err error
|
||||
var etcdSession *concurrency.Session
|
||||
@ -162,8 +178,13 @@ func (m *ownerManager) campaignLoop(ctx goctx.Context, etcdSession *concurrency.
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
log.Infof("[ddl] break %s campaign loop", key)
|
||||
return
|
||||
// Revoke the session lease.
|
||||
// If revoke takes longer than the ttl, lease is expired anyway.
|
||||
ctx, cancel := goctx.WithTimeout(goctx.Background(),
|
||||
time.Duration(ManagerSessionTTL)*time.Second)
|
||||
_, err = m.etcdCli.Revoke(ctx, etcdSession.Lease())
|
||||
cancel()
|
||||
log.Infof("[ddl] break %s campaign loop err %v", key, err)
|
||||
default:
|
||||
}
|
||||
|
||||
@ -240,3 +261,10 @@ func (m *ownerManager) watchOwner(ctx goctx.Context, etcdSession *concurrency.Se
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
err := setManagerSessionTTL()
|
||||
if err != nil {
|
||||
log.Warnf("[ddl] set manager session TTL failed %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user