// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package owner

import (
	"bytes"
	"context"
	"fmt"
	"os"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/pkg/metrics"
	"github.com/pingcap/tidb/pkg/parser/terror"
	util2 "github.com/pingcap/tidb/pkg/util"
	"github.com/pingcap/tidb/pkg/util/etcd"
	"github.com/pingcap/tidb/pkg/util/logutil"
	"go.etcd.io/etcd/api/v3/mvccpb"
	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
	atomicutil "go.uber.org/atomic"
	"go.uber.org/zap"
)

// Listener is used to listen to the ownerManager's owner state changes.
type Listener interface {
	OnBecomeOwner()
	OnRetireOwner()
}

// Manager is used to campaign the owner and manage the owner information.
type Manager interface {
	// ID returns the ID of the manager.
	ID() string
	// IsOwner returns whether the ownerManager is the owner.
	IsOwner() bool
	// RetireOwner makes the manager a non-owner. It's exported for testing.
	RetireOwner()
	// GetOwnerID gets the owner ID.
	GetOwnerID(ctx context.Context) (string, error)
	// SetOwnerOpValue updates the owner op value.
	SetOwnerOpValue(ctx context.Context, op OpType) error
	// CampaignOwner campaigns for ownership. It starts a background goroutine that
	// campaigns for the owner in a loop and calls the listener's methods when this
	// node becomes or retires as the owner.
	CampaignOwner(...int) error
	// CampaignCancel cancels one etcd campaign and also closes the underlying
	// etcd session. After this method is called, the manager can be used to
	// campaign for ownership again.
	CampaignCancel()
	// BreakCampaignLoop breaks the campaign loop; the related listener methods will
	// be called. The underlying etcd session and the related campaign key remain,
	// so if this instance was the owner before the break, it will still be the
	// owner after it campaigns again.
	BreakCampaignLoop()
	// ResignOwner will resign and start a new election if it's the owner.
	ResignOwner(ctx context.Context) error
	// Close closes the manager; after Close, no methods can be called.
	Close()
	// SetListener sets the listener. Set it before calling CampaignOwner.
	SetListener(listener Listener)
	// ForceToBeOwner restarts the owner election and tries to become the new owner
	// by ending the campaigns of all candidates and starting a new campaign in a
	// single transaction.
	//
	// This method is only used during upgrade, to make a node of the newer version
	// become the DDL owner and mitigate https://github.com/pingcap/tidb/issues/54689.
	// The current instance shouldn't call CampaignOwner before calling this method.
	// Don't use it in other cases.
	//
	// Note: only one instance can call this method at a time, so you have to use
	// a distributed lock when there are multiple instances of new-version TiDB trying
	// to be the owner. See runInBootstrapSession for where we lock it in DDL.
	ForceToBeOwner(ctx context.Context) error
}
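
// A minimal sketch of the intended call sequence for a Manager; the prompt, ID,
// and key below are illustrative values, not ones defined in this package:
//
//	mgr := NewOwnerManager(ctx, etcdCli, "example", "instance-1", "/example/owner")
//	mgr.SetListener(myListener) // optional, must happen before CampaignOwner
//	if err := mgr.CampaignOwner(); err != nil {
//		return err
//	}
//	defer mgr.Close()
//	if mgr.IsOwner() {
//		// this instance currently holds ownership
//	}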

const keyOpDefaultTimeout = 5 * time.Second

// WaitTimeOnForceOwner is the time to wait before or after forcing ownership.
// It is a variable so tests can override it.
var WaitTimeOnForceOwner = 5 * time.Second

// OpType is the owner key value operation type.
type OpType byte

// List of operation types.
const (
	OpNone               OpType = 0
	OpSyncUpgradingState OpType = 1
)

// String implements the fmt.Stringer interface.
func (ot OpType) String() string {
	switch ot {
	case OpSyncUpgradingState:
		return "sync upgrading state"
	default:
		return "none"
	}
}

// IsSyncedUpgradingState represents whether the upgrading state is synchronized.
func (ot OpType) IsSyncedUpgradingState() bool {
	return ot == OpSyncUpgradingState
}

// DDLOwnerChecker is used to check whether TiDB is the owner.
type DDLOwnerChecker interface {
	// IsOwner returns whether the ownerManager is the owner.
	IsOwner() bool
}

// ownerManager represents the structure used for owner election.
type ownerManager struct {
	id             string // id is the ID of the manager.
	key            string
	ctx            context.Context
	prompt         string
	logger         *zap.Logger
	etcdCli        *clientv3.Client
	elec           atomic.Pointer[concurrency.Election]
	sessionLease   *atomicutil.Int64
	wg             sync.WaitGroup
	campaignCancel context.CancelFunc

	listener Listener
	etcdSes  *concurrency.Session
}

// NewOwnerManager creates a new Manager.
func NewOwnerManager(ctx context.Context, etcdCli *clientv3.Client, prompt, id, key string) Manager {
	return &ownerManager{
		etcdCli:      etcdCli,
		id:           id,
		key:          key,
		ctx:          ctx,
		prompt:       prompt,
		logger:       logutil.BgLogger().With(zap.String("key", key), zap.String("id", id)),
		sessionLease: atomicutil.NewInt64(0),
	}
}

// ID implements Manager.ID interface.
func (m *ownerManager) ID() string {
	return m.id
}

// IsOwner implements Manager.IsOwner interface.
func (m *ownerManager) IsOwner() bool {
	return m.elec.Load() != nil
}

// Close implements Manager.Close interface.
func (m *ownerManager) Close() {
	// same as CampaignCancel
	m.CampaignCancel()
}

func (m *ownerManager) SetListener(listener Listener) {
	m.listener = listener
}

func (m *ownerManager) ForceToBeOwner(context.Context) error {
	m.logger.Info("force to be owner")
	if err := m.refreshSession(util2.NewSessionDefaultRetryCnt, ManagerSessionTTL); err != nil {
		return errors.Trace(err)
	}

	// Due to issue https://github.com/pingcap/tidb/issues/54689, if the cluster
	// version before the upgrade doesn't have the fix, and owners running the older
	// version retire while this node tries to become the new owner, multiple owners
	// may exist at the same time. This cannot be avoided completely, but the two
	// strategies below mitigate the issue:
	// 1. When trying to become the owner, delete all existing owner-related keys and
	//    put the new key in a single txn. If we deleted the keys one by one, another
	//    node might become the owner in between, which makes the issue more likely.
	// 2. Sleep for a while before trying to become the owner, to make sure the
	//    cluster has an owner and it has started watching. When upgrading with
	//    tiup, tiup might restart the current owner node to do the rolling upgrade.
	//    Before the restarted node forces ownership, another node might also try to
	//    become the new owner, so it's still possible to trigger the issue; sleeping
	//    gives the cluster time to elect a new owner and start watching.
	for range 3 {
		// We need to sleep in every retry, as other TiDB nodes will start campaigning
		// immediately after we delete their keys.
		time.Sleep(WaitTimeOnForceOwner)
		if err := m.tryToBeOwnerOnce(); err != nil {
			m.logger.Warn("failed to retire owner on older version", zap.Error(err))
			continue
		}
		break
	}
	return nil
}

func (m *ownerManager) tryToBeOwnerOnce() error {
	lease := m.etcdSes.Lease()
	keyPrefix := m.key + "/"

	getResp, err := m.etcdCli.Get(m.ctx, keyPrefix, clientv3.WithPrefix())
	if err != nil {
		return err
	}

	// Modifying the same key multiple times within a single transaction is
	// forbidden in etcd, so we cannot delete by prefix and put in a single txn;
	// it would report a "duplicate key given in txn request" error.
	// It's possible that other nodes put campaign keys between our get and the
	// txn that puts the new key. We rely on the sleep before calling this method
	// to make sure all TiDBs have already put their keys, and on the distributed
	// lock inside bootstrap to make sure no concurrent ForceToBeOwner is called.
	txnOps := make([]clientv3.Op, 0, len(getResp.Kvs)+1)
	// The key structure below is copied from Election.Campaign.
	campaignKey := fmt.Sprintf("%s%x", keyPrefix, lease)
	for _, kv := range getResp.Kvs {
		key := string(kv.Key)
		if key == campaignKey {
			// If the campaign below fails, it resigns automatically, but if the
			// resign also fails, the old key might still exist.
			continue
		}
		txnOps = append(txnOps, clientv3.OpDelete(key))
	}
	txnOps = append(txnOps, clientv3.OpPut(campaignKey, m.id, clientv3.WithLease(lease)))
	_, err = m.etcdCli.Txn(m.ctx).Then(txnOps...).Commit()
	if err != nil {
		return errors.Trace(err)
	}
	// Campaign waits until there is no key with a smaller create-revision, i.e.
	// either the current instance becomes the owner or all the keys are deleted.
	// Other nodes might put keys between the previous get and the txn, which could
	// keep the current node from ever becoming the owner, so we add a timeout to
	// avoid blocking.
	ctx, cancel := context.WithTimeout(m.ctx, keyOpDefaultTimeout)
	defer cancel()
	elec := concurrency.NewElection(m.etcdSes, m.key)
	if err = elec.Campaign(ctx, m.id); err != nil {
		return errors.Trace(err)
	}

	// Campaign assumes that it's the only client managing the lifecycle of the
	// campaign key, and it only checks whether there are any keys with a smaller
	// create-revision, so it also returns when all the campaign keys are deleted by
	// other TiDB instances, which can happen when the distributed lock fails to keep
	// alive and another TiDB gets the lock. It's quite a rare case, and that TiDB
	// must be of the newer version which has the fix of the issue, so it's ok to
	// return now.
	return nil
}

// ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing.
var ManagerSessionTTL = 60

// setManagerSessionTTL sets the ManagerSessionTTL value; it's used for testing.
func setManagerSessionTTL() error {
	ttlStr := os.Getenv("tidb_manager_ttl")
	if ttlStr == "" {
		return nil
	}
	ttl, err := strconv.Atoi(ttlStr)
	if err != nil {
		return errors.Trace(err)
	}
	ManagerSessionTTL = ttl
	return nil
}

// CampaignOwner implements Manager.CampaignOwner interface.
func (m *ownerManager) CampaignOwner(withTTL ...int) error {
	ttl := ManagerSessionTTL
	if len(withTTL) == 1 {
		ttl = withTTL[0]
	}
	if m.etcdSes == nil {
		m.logger.Info("start campaign owner")
		if err := m.refreshSession(util2.NewSessionDefaultRetryCnt, ttl); err != nil {
			return errors.Trace(err)
		}
	} else {
		m.logger.Info("start campaign owner with existing session",
			zap.String("lease", util2.FormatLeaseID(m.etcdSes.Lease())))
	}
	m.wg.Add(1)
	var campaignContext context.Context
	campaignContext, m.campaignCancel = context.WithCancel(m.ctx)
	go m.campaignLoop(campaignContext)
	return nil
}

// ResignOwner lets the owner start a new election.
func (m *ownerManager) ResignOwner(ctx context.Context) error {
	elec := m.elec.Load()
	if elec == nil {
		return errors.Errorf("This node is not a owner, can't be resigned")
	}

	childCtx, cancel := context.WithTimeout(ctx, keyOpDefaultTimeout)
	err := elec.Resign(childCtx)
	cancel()
	if err != nil {
		return errors.Trace(err)
	}

	m.logger.Warn("resign owner success")
	return nil
}

func (m *ownerManager) toBeOwner(elec *concurrency.Election) {
	m.elec.Store(elec)
	m.logger.Info("become owner")
	if m.listener != nil {
		m.listener.OnBecomeOwner()
	}
}

// RetireOwner makes the manager a non-owner.
func (m *ownerManager) RetireOwner() {
	m.elec.Store(nil)
	m.logger.Info("retire owner")
	if m.listener != nil {
		m.listener.OnRetireOwner()
	}
}

// CampaignCancel implements Manager.CampaignCancel interface.
func (m *ownerManager) CampaignCancel() {
	m.BreakCampaignLoop()
	m.closeSession()
}

func (m *ownerManager) BreakCampaignLoop() {
	if m.campaignCancel != nil {
		m.campaignCancel()
	}
	m.wg.Wait()
}

func (m *ownerManager) campaignLoop(campaignContext context.Context) {
	defer func() {
		m.campaignCancel()
		if r := recover(); r != nil {
			m.logger.Error("recover panic", zap.String("prompt", m.prompt), zap.Any("error", r), zap.Stack("buffer"))
			metrics.PanicCounter.WithLabelValues(metrics.LabelDDLOwner).Inc()
		}
		m.wg.Done()
	}()

	leaseNotFoundCh := make(chan struct{})
	for {
		select {
		case <-m.etcdSes.Done():
			m.logger.Info("etcd session done, refresh it")
			if err2 := m.refreshSession(util2.NewSessionRetryUnlimited, ManagerSessionTTL); err2 != nil {
				m.logger.Info("break campaign loop, refresh session failed", zap.Error(err2))
				return
			}
		case <-leaseNotFoundCh:
			m.logger.Info("meet lease not found error, refresh session")
			if err2 := m.refreshSession(util2.NewSessionRetryUnlimited, ManagerSessionTTL); err2 != nil {
				m.logger.Info("break campaign loop, refresh session failed", zap.Error(err2))
				return
			}
			leaseNotFoundCh = make(chan struct{})
		case <-campaignContext.Done():
			failpoint.Inject("MockDelOwnerKey", func(v failpoint.Value) {
				if v.(string) == "delOwnerKeyAndNotOwner" {
					m.logger.Info("mock break campaign and don't clear related info")
					return
				}
			})
			m.logger.Info("break campaign loop, context is done")
			return
		default:
		}

		if err := m.campaignAndWatch(campaignContext); err != nil {
			metrics.CampaignOwnerCounter.WithLabelValues(m.prompt, err.Error()).Inc()

			// If the etcd server's clock jumps forward, the following case may occur:
			// the etcd server deletes this session's lease ID, but the etcd session
			// doesn't notice it. If we then do the campaign operation, the etcd server
			// returns ErrLeaseNotFound.
			if terror.ErrorEqual(err, rpctypes.ErrLeaseNotFound) {
				close(leaseNotFoundCh)
			}
			m.logger.Info("campaign and watch failed", zap.Error(err))
		}
	}
}

func (m *ownerManager) campaignAndWatch(ctx context.Context) error {
	elec := concurrency.NewElection(m.etcdSes, m.key)
	failpoint.InjectCall("beforeElectionCampaign", m.etcdSes)
	err := elec.Campaign(ctx, m.id)
	if err != nil {
		return err
	}

	ownerKey, currRev, err := GetOwnerKeyInfo(ctx, m.etcdCli, m.key, m.id)
	if err != nil {
		return err
	}

	m.toBeOwner(elec)

	err = m.watchOwner(ctx, m.etcdSes, ownerKey, currRev)
	m.logger.Info("watch owner finished", zap.Error(err))
	m.RetireOwner()

	metrics.CampaignOwnerCounter.WithLabelValues(m.prompt, metrics.NoLongerOwner).Inc()
	m.logger.Info("is not the owner")
	return err
}

func (m *ownerManager) closeSession() {
	if m.etcdSes != nil {
		if err := m.etcdSes.Close(); err != nil {
			m.logger.Info("etcd session close failed", zap.Error(err))
		}
		m.etcdSes = nil
	}
}

func (m *ownerManager) refreshSession(retryCnt, ttl int) error {
	m.closeSession()
	// Note: we must use the manager's context to create the session. If we used the
	// campaign context and it got cancelled, the created session could not be closed,
	// because closing the session depends on the context.
	// One drawback is that when you want to break the campaign loop while it is
	// refreshing the session, it might take a long time to return. That should be
	// fine as long as the network is ok, and acceptable to wait for when it is not.
	logPrefix := fmt.Sprintf("[%s] %s ownerManager %s", m.prompt, m.key, m.id)
	sess, err2 := util2.NewSession(m.ctx, logPrefix, m.etcdCli, retryCnt, ttl)
	if err2 != nil {
		return errors.Trace(err2)
	}
	m.etcdSes = sess
	m.sessionLease.Store(int64(m.etcdSes.Lease()))
	return nil
}

// GetOwnerID implements Manager.GetOwnerID interface.
func (m *ownerManager) GetOwnerID(ctx context.Context) (string, error) {
	_, ownerID, _, _, _, err := getOwnerInfo(ctx, m.etcdCli, m.key)
	return string(ownerID), errors.Trace(err)
}

func getOwnerInfo(ctx context.Context, etcdCli *clientv3.Client, ownerPath string) (ownerKey string, ownerID []byte, op OpType, currRevision, _ int64, err error) {
	var resp *clientv3.GetResponse
	logger := logutil.BgLogger().With(zap.String("key", ownerPath))
	for i := range 3 {
		if err = ctx.Err(); err != nil {
			return "", nil, op, 0, 0, errors.Trace(err)
		}

		childCtx, cancel := context.WithTimeout(ctx, etcd.KeyOpDefaultTimeout)
		resp, err = etcdCli.Get(childCtx, ownerPath, clientv3.WithFirstCreate()...)
		cancel()
		if err == nil {
			break
		}
		logger.Info("etcd-cli get owner info failed", zap.Int("retryCnt", i), zap.Error(err))
		time.Sleep(etcd.KeyOpRetryInterval)
	}
	if err != nil {
		logger.Warn("etcd-cli get owner info failed", zap.Error(err))
		return "", nil, op, 0, 0, errors.Trace(err)
	}
	if len(resp.Kvs) == 0 {
		return "", nil, op, 0, 0, concurrency.ErrElectionNoLeader
	}
	ownerID, op = splitOwnerValues(resp.Kvs[0].Value)
	return string(resp.Kvs[0].Key), ownerID, op, resp.Header.Revision, resp.Kvs[0].ModRevision, nil
}

// GetOwnerKeyInfo gets the owner key and current revision.
func GetOwnerKeyInfo(
	ctx context.Context,
	etcdCli *clientv3.Client,
	etcdKey, id string,
) (string, int64, error) {
	ownerKey, ownerID, _, currRevision, _, err := getOwnerInfo(ctx, etcdCli, etcdKey)
	if err != nil {
		return "", 0, errors.Trace(err)
	}
	logutil.BgLogger().Info("get owner",
		zap.String("key", etcdKey),
		zap.String("owner key", ownerKey),
		zap.ByteString("ownerID", ownerID))
	if string(ownerID) != id {
		logutil.BgLogger().Warn("is not the owner", zap.String("key", etcdKey),
			zap.String("id", id), zap.String("ownerID", string(ownerID)))
		return "", 0, errors.New("ownerInfoNotMatch")
	}

	return ownerKey, currRevision, nil
}

func splitOwnerValues(val []byte) ([]byte, OpType) {
	vals := bytes.Split(val, []byte("_"))
	var op OpType
	if len(vals) == 2 {
		op = OpType(vals[1][0])
	}
	return vals[0], op
}

func joinOwnerValues(vals ...[]byte) []byte {
	return bytes.Join(vals, []byte("_"))
}
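
// The owner value stored in etcd is the owner ID, optionally followed by "_" and a
// one-byte OpType. A minimal sketch of the round trip, assuming an owner ID that
// contains no "_" (the ID below is illustrative):
//
//	val := joinOwnerValues([]byte("tidb-1"), []byte{byte(OpSyncUpgradingState)})
//	// val is "tidb-1" + "_" + "\x01"
//	id, op := splitOwnerValues(val)
//	// id == []byte("tidb-1"), op == OpSyncUpgradingState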

// SetOwnerOpValue implements Manager.SetOwnerOpValue interface.
func (m *ownerManager) SetOwnerOpValue(ctx context.Context, op OpType) error {
	// The owner must not change while we update the op value.
	ownerKey, ownerID, currOp, _, modRevision, err := getOwnerInfo(ctx, m.etcdCli, m.key)
	if err != nil {
		return errors.Trace(err)
	}
	if currOp == op {
		m.logger.Info("set owner op is the same as the original, so do nothing.", zap.Stringer("op", op))
		return nil
	}
	if string(ownerID) != m.id {
		return errors.New("ownerInfoNotMatch")
	}
	newOwnerVal := joinOwnerValues(ownerID, []byte{byte(op)})

	failpoint.Inject("MockDelOwnerKey", func(v failpoint.Value) {
		if valStr, ok := v.(string); ok {
			if err := mockDelOwnerKey(valStr, ownerKey, m); err != nil {
				failpoint.Return(err)
			}
		}
	})

	leaseOp := clientv3.WithLease(clientv3.LeaseID(m.sessionLease.Load()))
	resp, err := m.etcdCli.Txn(ctx).
		If(clientv3.Compare(clientv3.ModRevision(ownerKey), "=", modRevision)).
		Then(clientv3.OpPut(ownerKey, string(newOwnerVal), leaseOp)).
		Commit()
	if err == nil && !resp.Succeeded {
		err = errors.New("put owner key failed, cmp is false")
	}
	m.logger.Info("set owner op value", zap.String("owner key", ownerKey), zap.ByteString("ownerID", ownerID),
		zap.Stringer("old Op", currOp), zap.Stringer("op", op), zap.Error(err))
	metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.PutValue+"_"+metrics.RetLabel(err)).Inc()
	return errors.Trace(err)
}

// GetOwnerOpValue gets the owner op value.
func GetOwnerOpValue(ctx context.Context, etcdCli *clientv3.Client, ownerPath string) (OpType, error) {
	// A nil etcd client is only used for testing.
	if etcdCli == nil {
		return *mockOwnerOpValue.Load(), nil
	}

	_, _, op, _, _, err := getOwnerInfo(ctx, etcdCli, ownerPath)
	logutil.BgLogger().Info("get owner op value",
		zap.String("key", ownerPath),
		zap.Stringer("owner op", op))
	return op, errors.Trace(err)
}

// WatchOwnerForTest watches the ownerKey.
// This function is used to test watchOwner().
func WatchOwnerForTest(ctx context.Context, m Manager, etcdSession *concurrency.Session, key string, createRevision int64) error {
	if ownerManager, ok := m.(*ownerManager); ok {
		return ownerManager.watchOwner(ctx, etcdSession, key, createRevision)
	}
	return nil
}

func (m *ownerManager) watchOwner(ctx context.Context, etcdSession *concurrency.Session, key string, currRev int64) error {
	logger := m.logger.With(zap.String("ownerKey", key), zap.Int64("currRev", currRev))
	logger.Info("watching owner key")
	// We need to watch the ownerKey starting from currRev + 1.
	watchCh := m.etcdCli.Watch(ctx, key, clientv3.WithRev(currRev+1))
	for {
		select {
		case resp, ok := <-watchCh:
			if !ok {
				metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.WatcherClosed).Inc()
				logger.Info("watcher is closed, no owner")
				return errors.Errorf("watcher is closed, key: %v", key)
			}
			if resp.Canceled {
				metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.Cancelled).Inc()
				logger.Info("watch canceled, no owner", zap.Error(resp.Err()))
				return errors.Errorf("watch canceled, key: %v", key)
			}

			for _, ev := range resp.Events {
				if ev.Type == mvccpb.DELETE {
					metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.Deleted).Inc()
					logger.Info("watch failed, owner is deleted")
					return nil
				}
			}
		case <-etcdSession.Done():
			metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.SessionDone).Inc()
			return nil
		case <-ctx.Done():
			metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.CtxDone).Inc()
			return nil
		}
	}
}

func init() {
	err := setManagerSessionTTL()
	if err != nil {
		logutil.BgLogger().Warn("set manager session TTL failed", zap.Error(err))
	}
}

// AcquireDistributedLock creates a mutex with the etcd client and returns a
// function that releases the mutex and closes the underlying session.
func AcquireDistributedLock(
	ctx context.Context,
	cli *clientv3.Client,
	key string,
	ttlInSec int,
) (release func(), err error) {
	se, err := concurrency.NewSession(cli, concurrency.WithTTL(ttlInSec))
	if err != nil {
		return nil, err
	}
	mu := concurrency.NewMutex(se, key)
	maxRetryCnt := 10
	err = util2.RunWithRetry(maxRetryCnt, util2.RetryInterval, func() (bool, error) {
		err = mu.Lock(ctx)
		if err != nil {
			return true, err
		}
		return false, nil
	})
	failpoint.InjectCall("mockAcquireDistLockFailed", &err)
	if err != nil {
		err1 := se.Close()
		if err1 != nil {
			logutil.Logger(ctx).Warn("close session error", zap.Error(err1))
		}
		return nil, err
	}
	logutil.Logger(ctx).Info("acquire distributed lock success", zap.String("key", key))
	return func() {
		err = mu.Unlock(ctx)
		if err != nil {
			logutil.Logger(ctx).Warn("release distributed lock error", zap.Error(err), zap.String("key", key))
		} else {
			logutil.Logger(ctx).Info("release distributed lock success", zap.String("key", key))
		}
		err = se.Close()
		if err != nil {
			logutil.Logger(ctx).Warn("close session error", zap.Error(err))
		}
	}, nil
}
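
// A minimal usage sketch for AcquireDistributedLock; the key and TTL below are
// illustrative, not values used elsewhere in this package:
//
//	release, err := AcquireDistributedLock(ctx, cli, "/example/lock", 10)
//	if err != nil {
//		return err
//	}
//	defer release()
//	// ... do the work that must not run concurrently ...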

// ListenersWrapper wraps a list of listeners and broadcasts owner events to all
// of them.
type ListenersWrapper struct {
	listeners []Listener
}

// OnBecomeOwner broadcasts the OnBecomeOwner event to all listeners.
func (ol *ListenersWrapper) OnBecomeOwner() {
	for _, l := range ol.listeners {
		l.OnBecomeOwner()
	}
}

// OnRetireOwner broadcasts the OnRetireOwner event to all listeners.
func (ol *ListenersWrapper) OnRetireOwner() {
	for _, l := range ol.listeners {
		l.OnRetireOwner()
	}
}

// NewListenersWrapper creates a new ListenersWrapper.
func NewListenersWrapper(listeners ...Listener) *ListenersWrapper {
	return &ListenersWrapper{listeners: listeners}
}
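
// A minimal sketch of combining listeners; the names and arguments below are
// hypothetical and only illustrate how the wrapper fans out the callbacks:
//
//	// a and b are two hypothetical Listener implementations.
//	wrapped := NewListenersWrapper(a, b)
//	mgr := NewOwnerManager(ctx, etcdCli, "example", "instance-1", "/example/owner")
//	mgr.SetListener(wrapped)
//	// After CampaignOwner, both a.OnBecomeOwner and b.OnBecomeOwner are called
//	// when this instance becomes the owner.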