diff --git a/ddl/job_table.go b/ddl/job_table.go index 43a4b0dd15..9391437be1 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -171,14 +171,14 @@ func hasSysDB(job *model.Job) bool { func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRunnable bool, err error) { if d.stateSyncer.IsUpgradingState() { + if job.IsPaused() { + return false, nil + } // We need to turn the 'pausing' job to be 'paused' in ddl worker, // and stop the reorganization workers if job.IsPausing() || hasSysDB(job) { return true, nil } - if job.IsPaused() { - return false, nil - } var errs []error // During binary upgrade, pause all running DDL jobs errs, err = PauseJobsBySystem(sess.Session(), []int64{job.ID}) @@ -200,7 +200,7 @@ func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRun return false, nil } - if job.IsPausedBySystem() && !hasSysDB(job) { + if job.IsPausedBySystem() { var errs []error errs, err = ResumeJobsBySystem(sess.Session(), []int64{job.ID}) if len(errs) > 0 && errs[0] != nil { @@ -551,8 +551,7 @@ func job2UniqueIDs(job *model.Job, schema bool) string { } func job2SchemaNames(job *model.Job) []string { - switch job.Type { - case model.ActionRenameTable: + if job.Type == model.ActionRenameTable { var oldSchemaID int64 var oldSchemaName model.CIStr var tableName model.CIStr @@ -562,11 +561,9 @@ func job2SchemaNames(job *model.Job) []string { names = append(names, strings.ToLower(job.SchemaName)) names = append(names, oldSchemaName.O) return names - case model.ActionRenameTables: - // TODO: Get this action's schema names. - case model.ActionExchangeTablePartition: - // TODO: Get this action's schema names. } + // TODO: consider about model.ActionRenameTables and model.ActionExchangeTablePartition, which need to get the schema names. + return []string{job.SchemaName} } diff --git a/session/bootstrap.go b/session/bootstrap.go index 57a2dac9ad..02585d0321 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -1168,7 +1168,8 @@ func upgrade(s Session) { } func syncUpgradeState(s Session) { - ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second) + totalInterval := time.Duration(internalSQLTimeout) * time.Second + ctx, cancelFunc := context.WithTimeout(context.Background(), totalInterval) defer cancelFunc() dom := domain.GetDomain(s) err := dom.DDL().StateSyncer().UpdateGlobalState(ctx, syncer.NewStateInfo(syncer.StateUpgrading)) @@ -1176,8 +1177,8 @@ func syncUpgradeState(s Session) { logutil.BgLogger().Fatal("[upgrading] update global state failed", zap.String("state", syncer.StateUpgrading), zap.Error(err)) } - retryTimes := 10 interval := 200 * time.Millisecond + retryTimes := int(totalInterval / interval) for i := 0; i < retryTimes; i++ { op, err := owner.GetOwnerOpValue(ctx, dom.EtcdClient(), ddl.DDLOwnerKey, "upgrade bootstrap") if err == nil && op.String() == owner.OpGetUpgradingState.String() { @@ -1186,7 +1187,9 @@ func syncUpgradeState(s Session) { if i == retryTimes-1 { logutil.BgLogger().Fatal("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err)) } - logutil.BgLogger().Warn("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err)) + if i%10 == 0 { + logutil.BgLogger().Warn("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err)) + } time.Sleep(interval) } diff --git a/session/bootstraptest/BUILD.bazel b/session/bootstraptest/BUILD.bazel index 71e5966c7b..cfce2f5ec0 100644 --- a/session/bootstraptest/BUILD.bazel +++ b/session/bootstraptest/BUILD.bazel @@ -8,7 +8,7 @@ go_test( "main_test.go", ], flaky = True, - shard_count = 9, + shard_count = 10, deps = [ "//config", "//ddl", diff --git a/session/bootstraptest/bootstrap_upgrade_test.go b/session/bootstraptest/bootstrap_upgrade_test.go index 1acd560eb5..760342d01e 100644 --- a/session/bootstraptest/bootstrap_upgrade_test.go +++ b/session/bootstraptest/bootstrap_upgrade_test.go @@ -344,11 +344,15 @@ func TestUpgradeVersionForPausedJob(t *testing.T) { // Resume the DDL job, then add index operation can be executed successfully. session.MustExec(t, seLatestV, fmt.Sprintf("admin resume ddl jobs %d", jobID)) + checkDDLJobExecSucc(t, seLatestV, jobID) +} + +// checkDDLJobExecSucc is used to make sure the DDL operation is successful. +func checkDDLJobExecSucc(t *testing.T, se session.Session, jobID int64) { sql := fmt.Sprintf(" admin show ddl jobs where job_id=%d", jobID) - // Make sure the add index operation is successful. suc := false for i := 0; i < 20; i++ { - rows, err := execute(context.Background(), seLatestV, sql) + rows, err := execute(context.Background(), se, sql) require.NoError(t, err) require.Len(t, rows, 1) require.Equal(t, rows[0].GetString(2), "upgrade_tbl") @@ -363,6 +367,66 @@ func TestUpgradeVersionForPausedJob(t *testing.T) { require.True(t, suc) } +// TestUpgradeVersionForSystemPausedJob tests mock the first upgrade failed, and it has a mock system DDL in queue. +// Then we do re-upgrade(This operation will pause all DDL jobs by the system). +func TestUpgradeVersionForSystemPausedJob(t *testing.T) { + // Mock a general and a reorg job in boostrap. + *session.WithMockUpgrade = true + session.MockUpgradeToVerLatestKind = session.MockSimpleUpgradeToVerLatest + + store, dom := session.CreateStoreAndBootstrap(t) + defer func() { require.NoError(t, store.Close()) }() + + seV := session.CreateSessionAndSetID(t, store) + txn, err := store.Begin() + require.NoError(t, err) + m := meta.NewMeta(txn) + err = m.FinishBootstrap(session.CurrentBootstrapVersion - 1) + require.NoError(t, err) + err = txn.Commit(context.Background()) + require.NoError(t, err) + session.MustExec(t, seV, fmt.Sprintf("update mysql.tidb set variable_value='%d' where variable_name='tidb_server_version'", session.CurrentBootstrapVersion-1)) + session.UnsetStoreBootstrapped(store.UUID()) + ver, err := session.GetBootstrapVersion(seV) + require.NoError(t, err) + require.Equal(t, session.CurrentBootstrapVersion-1, ver) + + // Add a paused DDL job before upgrade. + session.MustExec(t, seV, "create table mysql.upgrade_tbl(a int)") + ch := make(chan struct{}) + var jobID int64 + hook := &callback.TestDDLCallback{} + hook.OnJobRunAfterExported = func(job *model.Job) { + if job.SchemaState == model.StateDeleteOnly { + se := session.CreateSessionAndSetID(t, store) + session.MustExec(t, se, fmt.Sprintf("admin pause ddl jobs %d", job.ID)) + } + if job.State == model.JobStatePaused && jobID == 0 { + // Mock pause the ddl job by system. + job.AdminOperator = model.AdminCommandBySystem + ch <- struct{}{} + jobID = job.ID + } + } + dom.DDL().SetHook(hook) + go func() { + _, err = execute(context.Background(), seV, "alter table mysql.upgrade_tbl add column b int") + }() + + <-ch + dom.Close() + // Make sure upgrade is successful. + domLatestV, err := session.BootstrapSession(store) + require.NoError(t, err) + defer domLatestV.Close() + seLatestV := session.CreateSessionAndSetID(t, store) + ver, err = session.GetBootstrapVersion(seLatestV) + require.NoError(t, err) + require.Equal(t, session.CurrentBootstrapVersion+1, ver) + + checkDDLJobExecSucc(t, seLatestV, jobID) +} + func TestUpgradeVersionForResumeJob(t *testing.T) { store, dom := session.CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() diff --git a/session/mock_bootstrap.go b/session/mock_bootstrap.go index df7f4f582d..cfad1f8fa2 100644 --- a/session/mock_bootstrap.go +++ b/session/mock_bootstrap.go @@ -112,6 +112,21 @@ func mockUpgradeToVerLatest(s Session, ver int64) { TestHook.OnBootstrapAfter(s) } +// mockSimpleUpgradeToVerLatest mocks a simple bootstrapVersion(make the test faster). +func mockSimpleUpgradeToVerLatest(s Session, ver int64) { + logutil.BgLogger().Info("mock upgrade to ver latest", zap.Int64("old ver", ver), zap.Int64("mock latest ver", mockLatestVer)) + if ver >= mockLatestVer { + return + } + mustExecute(s, "use mysql") + mustExecute(s, `create table if not exists mock_sys_t( + c1 int, c2 int, c3 int, c11 tinyint, index fk_c1(c1) + );`) + mustExecute(s, "alter table mock_sys_t add column mayNullCol bigint default 1") + mustExecute(s, "alter table mock_sys_t add index idx_c2(c2)") + TestHook.OnBootstrapAfter(s) +} + // TestHook is exported for testing. var TestHook = TestCallback{} @@ -140,13 +155,26 @@ func modifyBootstrapVersionForTest(store kv.Storage, ver int64) int64 { return ver } +const ( + defaultMockUpgradeToVerLatest = 0 + // MockSimpleUpgradeToVerLatest is used to indicate the use of the simple mock bootstrapVersion, this is just a few simple DDL operations. + MockSimpleUpgradeToVerLatest = 1 +) + +// MockUpgradeToVerLatestKind is used to indicate the use of different mock bootstrapVersion. +var MockUpgradeToVerLatestKind = defaultMockUpgradeToVerLatest + func addMockBootstrapVersionForTest(s Session) { if !*WithMockUpgrade { return } TestHook.OnBootstrapBefore(s) - bootstrapVersion = append(bootstrapVersion, mockUpgradeToVerLatest) + if MockUpgradeToVerLatestKind == defaultMockUpgradeToVerLatest { + bootstrapVersion = append(bootstrapVersion, mockUpgradeToVerLatest) + } else { + bootstrapVersion = append(bootstrapVersion, mockSimpleUpgradeToVerLatest) + } currentBootstrapVersion++ }