diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 5ecd4bbf1f..4c22af0b35 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -295,7 +295,8 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) { startTime := time.Now() var err error // DDLForce2Queue is a flag to tell DDL worker to always push the job to the DDL queue. - if variable.EnableConcurrentDDL.Load() && !variable.DDLForce2Queue.Load() { + toTable := variable.EnableConcurrentDDL.Load() && !variable.DDLForce2Queue.Load() + if toTable { err = d.addBatchDDLJobs2Table(tasks) } else { err = d.addBatchDDLJobs2Queue(tasks) @@ -310,7 +311,7 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) { if err != nil { logutil.BgLogger().Warn("[ddl] add DDL jobs failed", zap.String("jobs", jobs), zap.Error(err)) } else { - logutil.BgLogger().Info("[ddl] add DDL jobs", zap.Int("batch count", len(tasks)), zap.String("jobs", jobs)) + logutil.BgLogger().Info("[ddl] add DDL jobs", zap.Int("batch count", len(tasks)), zap.String("jobs", jobs), zap.Bool("table", toTable)) } } diff --git a/session/bootstrap.go b/session/bootstrap.go index 918103c669..0f5f4680d1 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -51,6 +51,7 @@ import ( utilparser "github.com/pingcap/tidb/util/parser" "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/timeutil" + "go.etcd.io/etcd/client/v3/concurrency" "go.uber.org/zap" ) @@ -790,6 +791,9 @@ func upgrade(s Session) { // Only upgrade from under version92 and this TiDB is not owner set. // The owner in older tidb does not support concurrent DDL, we should add the internal DDL to job queue. if ver < version92 && !domain.GetDomain(s).DDL().OwnerManager().IsOwner() { + if err := waitOwner(context.Background(), domain.GetDomain(s)); err != nil { + logutil.BgLogger().Fatal("[Upgrade] upgrade failed", zap.Error(err)) + } // use another variable DDLForce2Queue but not EnableConcurrentDDL since in upgrade it may set global variable, the initial step will // overwrite variable EnableConcurrentDDL. variable.DDLForce2Queue.Store(true) @@ -830,6 +834,25 @@ func upgrade(s Session) { } } +// waitOwner is used to wait the DDL owner to be elected in the cluster. +func waitOwner(ctx context.Context, dom *domain.Domain) error { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + logutil.BgLogger().Info("Waiting for the DDL owner to be elected in the cluster") + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + _, err := dom.DDL().OwnerManager().GetOwnerID(ctx) + if err == concurrency.ErrElectionNoLeader { + continue + } + return err + } + } +} + // upgradeToVer2 updates to version 2. func upgradeToVer2(s Session, ver int64) { if ver >= version2 {