ddl: wait until there is an owner in the cluster (#36664)
close pingcap/tidb#36659
This commit is contained in:
@ -295,7 +295,8 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) {
|
||||
startTime := time.Now()
|
||||
var err error
|
||||
// DDLForce2Queue is a flag to tell DDL worker to always push the job to the DDL queue.
|
||||
if variable.EnableConcurrentDDL.Load() && !variable.DDLForce2Queue.Load() {
|
||||
toTable := variable.EnableConcurrentDDL.Load() && !variable.DDLForce2Queue.Load()
|
||||
if toTable {
|
||||
err = d.addBatchDDLJobs2Table(tasks)
|
||||
} else {
|
||||
err = d.addBatchDDLJobs2Queue(tasks)
|
||||
@ -310,7 +311,7 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) {
|
||||
if err != nil {
|
||||
logutil.BgLogger().Warn("[ddl] add DDL jobs failed", zap.String("jobs", jobs), zap.Error(err))
|
||||
} else {
|
||||
logutil.BgLogger().Info("[ddl] add DDL jobs", zap.Int("batch count", len(tasks)), zap.String("jobs", jobs))
|
||||
logutil.BgLogger().Info("[ddl] add DDL jobs", zap.Int("batch count", len(tasks)), zap.String("jobs", jobs), zap.Bool("table", toTable))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -51,6 +51,7 @@ import (
|
||||
utilparser "github.com/pingcap/tidb/util/parser"
|
||||
"github.com/pingcap/tidb/util/sqlexec"
|
||||
"github.com/pingcap/tidb/util/timeutil"
|
||||
"go.etcd.io/etcd/client/v3/concurrency"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@ -790,6 +791,9 @@ func upgrade(s Session) {
|
||||
// Only upgrade from under version92 and this TiDB is not owner set.
|
||||
// The owner in older tidb does not support concurrent DDL, we should add the internal DDL to job queue.
|
||||
if ver < version92 && !domain.GetDomain(s).DDL().OwnerManager().IsOwner() {
|
||||
if err := waitOwner(context.Background(), domain.GetDomain(s)); err != nil {
|
||||
logutil.BgLogger().Fatal("[Upgrade] upgrade failed", zap.Error(err))
|
||||
}
|
||||
// use another variable DDLForce2Queue but not EnableConcurrentDDL since in upgrade it may set global variable, the initial step will
|
||||
// overwrite variable EnableConcurrentDDL.
|
||||
variable.DDLForce2Queue.Store(true)
|
||||
@ -830,6 +834,25 @@ func upgrade(s Session) {
|
||||
}
|
||||
}
|
||||
|
||||
// waitOwner is used to wait the DDL owner to be elected in the cluster.
|
||||
func waitOwner(ctx context.Context, dom *domain.Domain) error {
|
||||
ticker := time.NewTicker(100 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
logutil.BgLogger().Info("Waiting for the DDL owner to be elected in the cluster")
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-ticker.C:
|
||||
_, err := dom.DDL().OwnerManager().GetOwnerID(ctx)
|
||||
if err == concurrency.ErrElectionNoLeader {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// upgradeToVer2 updates to version 2.
|
||||
func upgradeToVer2(s Session, ver int64) {
|
||||
if ver >= version2 {
|
||||
|
||||
Reference in New Issue
Block a user