Files
tidb/pkg/owner/manager_test.go
2025-04-27 10:39:16 +00:00

606 lines
19 KiB
Go

// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package owner_test
import (
"context"
"fmt"
"net/url"
"runtime"
"sync/atomic"
"testing"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/owner"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/pkg/util"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/tests/v3/integration"
"golang.org/x/exp/rand"
)
// testInfo bundles the etcd integration cluster started for a test with a
// client connected to it.
type testInfo struct {
	cluster *integration.ClusterV3
	client  *clientv3.Client
}

// newTestInfo starts a single-member etcd integration cluster and returns it
// together with the client of member 0.
func newTestInfo(t *testing.T) *testInfo {
	info := &testInfo{
		cluster: integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}),
	}
	info.client = info.cluster.Client(0)
	return info
}

// Close terminates the underlying etcd cluster.
func (info *testInfo) Close(t *testing.T) {
	info.cluster.Terminate(t)
}
// listener records the most recent ownership notification it received:
// val is true after OnBecomeOwner and false after OnRetireOwner.
type listener struct {
	val atomic.Bool
}

// OnBecomeOwner marks the listener as having been told it is the owner.
func (lis *listener) OnBecomeOwner() {
	lis.val.Store(true)
}

// OnRetireOwner marks the listener as having been told it is no longer the owner.
func (lis *listener) OnRetireOwner() {
	lis.val.Store(false)
}
// TestForceToBeOwner plants a key under the election prefix to mock another
// node, calls ForceToBeOwner, and asserts that the planted key is gone and that
// a subsequent campaign makes this manager the owner and notifies its listener.
func TestForceToBeOwner(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
	}
	integration.BeforeTestExternal(t)
	tInfo := newTestInfo(t)
	defer tInfo.Close(t)
	etcdCli := tInfo.client

	// Put a key with the same prefix to mock another node.
	const otherNodeKey = "/owner/key/a"
	ctx := context.Background()
	_, err := etcdCli.Put(ctx, otherNodeKey, "a")
	require.NoError(t, err)
	getResp, err := etcdCli.Get(ctx, otherNodeKey)
	require.NoError(t, err)
	require.Len(t, getResp.Kvs, 1)

	// Shrink the force-owner wait for the duration of this test only.
	savedWait := owner.WaitTimeOnForceOwner
	t.Cleanup(func() {
		owner.WaitTimeOnForceOwner = savedWait
	})
	owner.WaitTimeOnForceOwner = time.Millisecond

	mgr := owner.NewOwnerManager(ctx, etcdCli, "ddl", "1", "/owner/key")
	defer mgr.Close()
	ownerListener := &listener{}
	mgr.SetListener(ownerListener)
	require.NoError(t, mgr.ForceToBeOwner(ctx))

	// The key of the other node must have been deleted.
	getResp, err = etcdCli.Get(ctx, otherNodeKey)
	require.NoError(t, err)
	require.Empty(t, getResp.Kvs)

	require.NoError(t, mgr.CampaignOwner())
	require.True(t, checkOwner(mgr, true))
	require.True(t, ownerListener.val.Load())
}
// TestSingle exercises owner managers against a one-member etcd cluster:
//   - campaigning is retried when the session is closed before the election;
//   - campaigning is retried when the lease is revoked before the election;
//   - a manager whose context is already canceled fails to campaign;
//   - closing the owning manager retires it and notifies its listener.
func TestSingle(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
	}
	integration.BeforeTestExternal(t)
	tInfo := newTestInfo(t)
	client := tInfo.client
	defer tInfo.Close(t)

	t.Run("retry on session closed before election", func(t *testing.T) {
		ownerMgr := owner.NewOwnerManager(context.Background(), client, "ddl", "1", "/owner/key")
		defer ownerMgr.Close()
		var counter atomic.Int32
		testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/owner/beforeElectionCampaign",
			func(se *concurrency.Session) {
				// Break only the first attempt. The hook may run on a goroutine
				// owned by the manager rather than the test goroutine (not
				// visible from this file), so report failures with t.Errorf:
				// FailNow-based assertions are only valid on the test goroutine.
				if counter.Add(1) <= 1 {
					if err := se.Close(); err != nil {
						t.Errorf("close session before campaign: %v", err)
					}
				}
			},
		)
		require.NoError(t, ownerMgr.CampaignOwner())
		require.True(t, checkOwner(ownerMgr, true))
		// One broken attempt plus the successful retry.
		require.EqualValues(t, 2, counter.Load())
	})

	t.Run("retry on lease revoked before election", func(t *testing.T) {
		ownerMgr := owner.NewOwnerManager(context.Background(), client, "ddl", "1", "/owner/key")
		defer ownerMgr.Close()
		var counter atomic.Int32
		testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/owner/beforeElectionCampaign",
			func(se *concurrency.Session) {
				// Break the first two attempts; see the note in the previous
				// subtest on why t.Errorf is used instead of require here.
				if counter.Add(1) <= 2 {
					if _, err := client.Revoke(context.Background(), se.Lease()); err != nil {
						t.Errorf("revoke lease before campaign: %v", err)
					}
				}
			},
		)
		require.NoError(t, ownerMgr.CampaignOwner())
		require.True(t, checkOwner(ownerMgr, true))
		// Two broken attempts plus the successful retry.
		require.EqualValues(t, 3, counter.Load())
	})

	ownerMgr := owner.NewOwnerManager(context.Background(), client, "ddl", "1", "/owner/key")
	lis := &listener{}
	ownerMgr.SetListener(lis)
	require.NoError(t, ownerMgr.CampaignOwner())
	require.True(t, checkOwner(ownerMgr, true))
	require.True(t, lis.val.Load())

	// test for newSession failed: the second manager's context is canceled
	// before it campaigns.
	ctx, cancel := context.WithCancel(context.Background())
	ownerMgr2 := owner.NewOwnerManager(ctx, client, "ddl", "2", "/owner/key")
	defer ownerMgr2.Close()
	cancel()
	err := ownerMgr2.CampaignOwner()
	require.Truef(t,
		terror.ErrorEqual(err, context.Canceled) || terror.ErrorEqual(err, context.DeadlineExceeded),
		"campaigned result don't match, err %v", err)
	// The first manager is unaffected and still the owner.
	require.True(t, checkOwner(ownerMgr, true))

	// Closing the owning manager retires it and notifies the listener.
	ownerMgr.Close()
	require.False(t, checkOwner(ownerMgr, false))
	require.False(t, lis.val.Load())
	// NOTE(review): presumably gives the closed manager time to finish cleaning
	// up before the owner key is inspected — confirm.
	time.Sleep(200 * time.Millisecond)

	// err is ok to be not nil since we canceled the manager.
	ownerID, _ := ownerMgr2.GetOwnerID(ctx)
	require.Equal(t, "", ownerID)
	op, _ := owner.GetOwnerOpValue(ctx, client, "/owner/key")
	require.Equal(t, owner.OpNone, op)
}
// TestSetAndGetOwnerOpValue checks reading and writing the owner op value on a
// real etcd cluster, including the two MockDelOwnerKey failpoint modes in which
// the owner key is deleted while SetOwnerOpValue runs.
func TestSetAndGetOwnerOpValue(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
	}
	integration.BeforeTestExternal(t)
	tInfo := newTestInfo(t)
	defer tInfo.Close(t)

	const delOwnerKeyFP = "github.com/pingcap/tidb/pkg/owner/MockDelOwnerKey"
	t.Cleanup(func() {
		// Best effort: make sure the failpoint does not stay enabled for later
		// tests when an assertion fails before the inline Disable calls below.
		// On the success path it is already disabled, so the error is ignored.
		_ = failpoint.Disable(delOwnerKeyFP)
	})

	ownerMgr := owner.NewOwnerManager(context.Background(), tInfo.client, "ddl", "1", "/owner/key")
	defer ownerMgr.Close()
	require.NoError(t, ownerMgr.CampaignOwner())
	require.True(t, checkOwner(ownerMgr, true))

	// test set/get owner info
	ownerID, err := ownerMgr.GetOwnerID(context.Background())
	require.NoError(t, err)
	require.Equal(t, ownerMgr.ID(), ownerID)
	op, err := owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key")
	require.NoError(t, err)
	require.Equal(t, owner.OpNone, op)
	require.False(t, op.IsSyncedUpgradingState())
	err = ownerMgr.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState)
	require.NoError(t, err)
	op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key")
	require.NoError(t, err)
	require.Equal(t, owner.OpSyncUpgradingState, op)
	require.True(t, op.IsSyncedUpgradingState())

	// update the same as the original value
	err = ownerMgr.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState)
	require.NoError(t, err)
	op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key")
	require.NoError(t, err)
	require.Equal(t, owner.OpSyncUpgradingState, op)
	require.True(t, op.IsSyncedUpgradingState())

	// test del owner key when SetOwnerOpValue
	require.NoError(t, failpoint.Enable(delOwnerKeyFP, `return("delOwnerKeyAndNotOwner")`))
	err = ownerMgr.SetOwnerOpValue(context.Background(), owner.OpNone)
	require.ErrorContains(t, err, "put owner key failed, cmp is false")
	op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key")
	// With the owner key gone there is no leader to read the op value from.
	require.EqualError(t, err, concurrency.ErrElectionNoLeader.Error())
	require.Equal(t, owner.OpNone, op)
	require.False(t, op.IsSyncedUpgradingState())
	require.NoError(t, failpoint.Disable(delOwnerKeyFP))

	// Let ddl run for the owner again.
	require.NoError(t, ownerMgr.CampaignOwner())
	require.True(t, checkOwner(ownerMgr, true))

	// Mock the manager become not owner because the owner is deleted(like TTL is timeout).
	// And then the manager campaigns the owner again, and become the owner.
	require.NoError(t, failpoint.Enable(delOwnerKeyFP, `return("onlyDelOwnerKey")`))
	err = ownerMgr.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState)
	require.ErrorContains(t, err, "put owner key failed, cmp is false")
	require.True(t, checkOwner(ownerMgr, true))
	op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key")
	require.NoError(t, err)
	require.Equal(t, owner.OpNone, op)
	require.NoError(t, failpoint.Disable(delOwnerKeyFP))
}
// TestGetOwnerOpValueBeforeSet tests get owner opValue before set this value when the etcdClient is nil.
func TestGetOwnerOpValueBeforeSet(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
}
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/owner/MockNotSetOwnerOp", `return(true)`))
ownerMgr := owner.NewMockManager(context.Background(), "1", nil, "/owner/key")
defer ownerMgr.Close()
require.NoError(t, ownerMgr.CampaignOwner())
isOwner := checkOwner(ownerMgr, true)
require.True(t, isOwner)
// test set/get owner info
ownerID, err := ownerMgr.GetOwnerID(context.Background())
require.NoError(t, err)
require.Equal(t, ownerMgr.ID(), ownerID)
op, err := owner.GetOwnerOpValue(context.Background(), nil, "/owner/key")
require.NoError(t, err)
require.Equal(t, op, owner.OpNone)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/owner/MockNotSetOwnerOp"))
err = ownerMgr.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState)
require.NoError(t, err)
op, err = owner.GetOwnerOpValue(context.Background(), nil, "/owner/key")
require.NoError(t, err)
require.Equal(t, op, owner.OpSyncUpgradingState)
}
// TestCluster runs several owner managers against the same election key and
// checks ownership as the leader key is deleted and managers are closed. Once
// every manager is closed, reading the owner key or op value must fail with
// ErrElectionNoLeader.
func TestCluster(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
	}
	integration.BeforeTestExternal(t)

	// Use a short session TTL for this test and restore it afterwards.
	originalTTL := owner.ManagerSessionTTL
	owner.ManagerSessionTTL = 3
	defer func() {
		owner.ManagerSessionTTL = originalTTL
	}()

	tInfo := newTestInfo(t)
	defer tInfo.Close(t)

	// Manager 1 campaigns first and becomes the owner.
	ownerMgr := owner.NewOwnerManager(context.Background(), tInfo.client, "ddl", "1", "/owner/key")
	require.NoError(t, ownerMgr.CampaignOwner())
	require.True(t, checkOwner(ownerMgr, true))

	// Manager 2 campaigns while manager 1 holds ownership, so it is not the owner.
	ownerMgr2 := owner.NewOwnerManager(context.Background(), tInfo.client, "ddl", "2", "/owner/key")
	require.NoError(t, ownerMgr2.CampaignOwner())
	require.False(t, checkOwner(ownerMgr2, false))

	// Delete the leader key: manager 1 must stop being the owner.
	// NOTE(review): manager 2 presumably takes over here (see the comment
	// before ownerMgr2.Close below), but that is not asserted.
	require.NoError(t, deleteLeader(tInfo.client, "/owner/key"))
	require.False(t, checkOwner(ownerMgr, false))
	ownerMgr.Close()

	// Manager 3 (not owner) starts and stops.
	ownerMgr3 := owner.NewOwnerManager(context.Background(), tInfo.client, "ddl", "3", "/owner/key")
	require.NoError(t, ownerMgr3.CampaignOwner())
	require.False(t, checkOwner(ownerMgr3, false))
	ownerMgr3.Close()

	// Cancel the owner context, there is no owner.
	ownerMgr2.Close()
	_, _, err := owner.GetOwnerKeyInfo(context.Background(), tInfo.client, "/owner/key", "useless id")
	require.Truef(t, terror.ErrorEqual(err, concurrency.ErrElectionNoLeader), "get owner info result don't match, err %v", err)
	op, err := owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key")
	require.Truef(t, terror.ErrorEqual(err, concurrency.ErrElectionNoLeader), "get owner info result don't match, err %v", err)
	require.Equal(t, owner.OpNone, op)
}
// TestWatchOwner checks WatchOwnerForTest in three steps: the watch must still
// be pending 300ms after it starts while the owner key exists, it must return
// once the owner key is deleted, and a second watch started on the already
// deleted key must return as well.
func TestWatchOwner(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
	}
	integration.BeforeTestExternal(t)
	tInfo := newTestInfo(t)
	client := tInfo.client
	defer tInfo.Close(t)
	ownerMgr := owner.NewOwnerManager(context.Background(), client, "ddl", "1", "/owner/key")
	defer ownerMgr.Close()
	lis := &listener{}
	ownerMgr.SetListener(lis)
	require.NoError(t, ownerMgr.CampaignOwner())
	require.True(t, checkOwner(ownerMgr, true))

	// get the owner id.
	ctx := context.Background()
	id, err := ownerMgr.GetOwnerID(ctx)
	require.NoError(t, err)
	// create etcd session.
	session, err := concurrency.NewSession(client)
	require.NoError(t, err)
	defer func() {
		_ = session.Close()
	}()
	// test the GetOwnerKeyInfo()
	ownerKey, currRevision, err := owner.GetOwnerKeyInfo(ctx, client, "/owner/key", id)
	require.NoError(t, err)

	// The watch result is handed back to the test goroutine and asserted there.
	// Asserting with require inside the spawned goroutine would, on failure,
	// exit that goroutine before it signals completion and leave the test
	// blocked forever on the receive. The buffer lets the goroutine finish even
	// if the test has already failed.
	watchDone := make(chan error, 1)
	startWatch := func() {
		go func() {
			watchDone <- owner.WatchOwnerForTest(ctx, ownerMgr, session, ownerKey, currRevision)
		}()
	}

	// watch the ownerKey: nothing may be reported while the key still exists.
	ctx2, cancel2 := context.WithTimeout(ctx, time.Millisecond*300)
	defer cancel2()
	startWatch()
	select {
	case watchErr := <-watchDone:
		require.NoError(t, watchErr)
		require.FailNow(t, "watch returned while the owner key still exists")
	case <-ctx2.Done():
	}

	// delete the owner, and can watch the DELETE event.
	require.NoError(t, deleteLeader(client, "/owner/key"))
	require.NoError(t, <-watchDone)

	// the ownerKey has been deleted, watch ownerKey again, it can be watched.
	startWatch()
	require.NoError(t, <-watchDone)
}
// TestWatchOwnerAfterDeleteOwnerKey records the owner key and its revision,
// deletes the key, and then checks that a watch started from the recorded
// revision returns without error.
func TestWatchOwnerAfterDeleteOwnerKey(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
	}
	integration.BeforeTestExternal(t)
	tInfo := newTestInfo(t)
	client := tInfo.client
	defer tInfo.Close(t)
	ownerMgr := owner.NewOwnerManager(context.Background(), client, "ddl", "1", "/owner/key")
	defer ownerMgr.Close()
	lis := &listener{}
	ownerMgr.SetListener(lis)
	require.NoError(t, ownerMgr.CampaignOwner())
	require.True(t, checkOwner(ownerMgr, true))

	// get the owner id.
	ctx := context.Background()
	id, err := ownerMgr.GetOwnerID(ctx)
	require.NoError(t, err)
	session, err := concurrency.NewSession(client)
	require.NoError(t, err)
	defer func() {
		_ = session.Close()
	}()
	// get the owner key information.
	ownerKey, currRevision, err := owner.GetOwnerKeyInfo(ctx, client, "/owner/key", id)
	require.NoError(t, err)
	// delete the owner key.
	require.NoError(t, deleteLeader(client, "/owner/key"))

	// watch the ownerKey with the revision recorded before the delete. The
	// result is asserted on the test goroutine: a failing require inside the
	// spawned goroutine would exit it before it signals completion and leave
	// the test blocked on the receive.
	watchDone := make(chan error, 1)
	go func() {
		watchDone <- owner.WatchOwnerForTest(ctx, ownerMgr, session, ownerKey, currRevision)
	}()
	require.NoError(t, <-watchDone)
}
// checkOwner polls ownerMgr.IsOwner every 5ms, for at most 6000 polls (about
// 30 seconds), until it equals fbVal. It returns the last value observed, so a
// result different from fbVal means the expected state was never reached.
func checkOwner(ownerMgr owner.Manager, fbVal bool) (isOwner bool) {
	const (
		pollInterval = 5 * time.Millisecond
		maxPolls     = 6000
	)
	for attempt := 0; attempt < maxPolls; attempt++ {
		// Sleep first so a campaign that was just started gets a chance to run.
		time.Sleep(pollInterval)
		if isOwner = ownerMgr.IsOwner(); isOwner == fbVal {
			return isOwner
		}
	}
	return isOwner
}
// deleteLeader looks up the current leader of the election under prefixKey and
// deletes its key from etcd. It uses a temporary session that is closed before
// returning; any error from the session, the leader lookup or the delete is
// returned wrapped with errors.Trace.
func deleteLeader(cli *clientv3.Client, prefixKey string) error {
	sess, err := concurrency.NewSession(cli)
	if err != nil {
		return errors.Trace(err)
	}
	// The session is only needed for the lookup; a close failure is ignored.
	defer func() { _ = sess.Close() }()

	ctx := context.Background()
	leaderResp, err := concurrency.NewElection(sess, prefixKey).Leader(ctx)
	if err != nil {
		return errors.Trace(err)
	}

	leaderKey := string(leaderResp.Kvs[0].Key)
	_, err = cli.Delete(ctx, leaderKey)
	return errors.Trace(err)
}
// TestImmediatelyCancel starts a campaign and cancels it right away, ten times
// in a row on the same manager, requiring every CampaignOwner call to succeed.
func TestImmediatelyCancel(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
	}
	integration.BeforeTestExternal(t)
	tInfo := newTestInfo(t)
	defer tInfo.Close(t)

	mgr := owner.NewOwnerManager(context.Background(), tInfo.client, "ddl", "1", "/owner/key")
	defer mgr.Close()

	const rounds = 10
	for round := 0; round < rounds; round++ {
		require.NoError(t, mgr.CampaignOwner())
		mgr.CampaignCancel()
	}
}
// TestAcquireDistributedLock starts an embedded etcd server on a random local
// port and checks owner.AcquireDistributedLock:
//   - a second acquire of a held lock blocks until the first is released,
//     whether it comes from the same client or from a different one;
//   - locks with different keys can be held at the same time;
//   - a lock whose holder's client was closed without releasing can still be
//     acquired by another client.
func TestAcquireDistributedLock(t *testing.T) {
	const addrFmt = "http://127.0.0.1:%d"
	cfg := embed.NewConfig()
	cfg.Dir = t.TempDir()
	// rand port in [20000, 60000)
	randPort := int(rand.Int31n(40000)) + 20000
	lcurl, err := url.Parse(fmt.Sprintf(addrFmt, randPort))
	require.NoError(t, err)
	cfg.ListenClientUrls, cfg.AdvertiseClientUrls = []url.URL{*lcurl}, []url.URL{*lcurl}
	lpurl, err := url.Parse(fmt.Sprintf(addrFmt, randPort+1))
	require.NoError(t, err)
	cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = []url.URL{*lpurl}, []url.URL{*lpurl}
	cfg.InitialCluster = "default=" + lpurl.String()
	cfg.Logger = "zap"
	embedEtcd, err := embed.StartEtcd(cfg)
	require.NoError(t, err)
	// Register the shutdown before waiting so the server is closed even if the
	// test is aborted while waiting for readiness.
	t.Cleanup(func() {
		embedEtcd.Close()
	})
	<-embedEtcd.Server.ReadyNotify()

	// makeEtcdCli returns a client for the embedded server that is closed when
	// the (sub)test finishes.
	makeEtcdCli := func(t *testing.T) *clientv3.Client {
		cli, err := clientv3.New(clientv3.Config{
			Endpoints: []string{lcurl.String()},
		})
		require.NoError(t, err)
		t.Cleanup(func() {
			_ = cli.Close()
		})
		return cli
	}

	// requireBlockedUntilRelease takes "test-lock" through holder, checks that
	// an acquire through waiter is still pending after 300ms, then releases the
	// lock and checks that the pending acquire succeeds.
	requireBlockedUntilRelease := func(t *testing.T, holder, waiter *clientv3.Client) {
		ctx := context.Background()
		release1, err := owner.AcquireDistributedLock(ctx, holder, "test-lock", 10)
		require.NoError(t, err)

		// The outcome is sent to the test goroutine and asserted there: a
		// failing require inside the worker would exit it before it signals,
		// leaving the test blocked on the receive. The buffer lets the worker
		// finish even when the test has already failed.
		acquired := make(chan error, 1)
		var wg util.WaitGroupWrapper
		wg.Run(func() {
			// Acquire another distributed lock will be blocked.
			release2, err := owner.AcquireDistributedLock(ctx, waiter, "test-lock", 10)
			if err == nil {
				release2()
			}
			acquired <- err
		})

		timer := time.NewTimer(300 * time.Millisecond)
		defer timer.Stop()
		select {
		case err := <-acquired:
			require.NoError(t, err)
			require.FailNow(t, "acquired same lock unexpectedly")
		case <-timer.C:
			release1()
			require.NoError(t, <-acquired)
		}
		wg.Wait()
	}

	t.Run("acquire distributed lock with same client", func(t *testing.T) {
		cli := makeEtcdCli(t)
		requireBlockedUntilRelease(t, cli, cli)

		// Locks with different keys do not block each other.
		ctx := context.Background()
		release1, err := owner.AcquireDistributedLock(ctx, cli, "test-lock/1", 10)
		require.NoError(t, err)
		release2, err := owner.AcquireDistributedLock(ctx, cli, "test-lock/2", 10)
		require.NoError(t, err)
		release1()
		release2()
	})

	t.Run("acquire distributed lock with different clients", func(t *testing.T) {
		cli1 := makeEtcdCli(t)
		cli2 := makeEtcdCli(t)
		requireBlockedUntilRelease(t, cli1, cli2)
	})

	t.Run("acquire distributed lock until timeout", func(t *testing.T) {
		cli1 := makeEtcdCli(t)
		cli2 := makeEtcdCli(t)
		ctx := context.Background()
		_, err := owner.AcquireDistributedLock(ctx, cli1, "test-lock", 1)
		require.NoError(t, err)
		// Note that release() is not invoked: the holder's client just goes
		// away. The close result is irrelevant to what is being tested.
		_ = cli1.Close()
		release2, err := owner.AcquireDistributedLock(ctx, cli2, "test-lock", 10)
		require.NoError(t, err)
		release2()
	})
}
// TestListenersWrapper checks that the wrapper returned by
// owner.NewListenersWrapper forwards OnBecomeOwner and OnRetireOwner to every
// listener it was built from.
func TestListenersWrapper(t *testing.T) {
	first, second := &listener{}, &listener{}
	wrapped := []*listener{first, second}
	wrapper := owner.NewListenersWrapper(first, second)

	// Becoming owner must reach both listeners.
	wrapper.OnBecomeOwner()
	for _, lis := range wrapped {
		require.True(t, lis.val.Load())
	}

	// Retiring must reach both listeners as well.
	wrapper.OnRetireOwner()
	for _, lis := range wrapped {
		require.False(t, lis.val.Load())
	}
}