ddl, session: fix re-upgrade issues (#44469)
* ddl, session: fix re-upgrade issues
This commit is contained in:
@ -171,14 +171,14 @@ func hasSysDB(job *model.Job) bool {
|
||||
|
||||
func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRunnable bool, err error) {
|
||||
if d.stateSyncer.IsUpgradingState() {
|
||||
if job.IsPaused() {
|
||||
return false, nil
|
||||
}
|
||||
// We need to turn the 'pausing' job to be 'paused' in ddl worker,
|
||||
// and stop the reorganization workers
|
||||
if job.IsPausing() || hasSysDB(job) {
|
||||
return true, nil
|
||||
}
|
||||
if job.IsPaused() {
|
||||
return false, nil
|
||||
}
|
||||
var errs []error
|
||||
// During binary upgrade, pause all running DDL jobs
|
||||
errs, err = PauseJobsBySystem(sess.Session(), []int64{job.ID})
|
||||
@ -200,7 +200,7 @@ func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRun
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if job.IsPausedBySystem() && !hasSysDB(job) {
|
||||
if job.IsPausedBySystem() {
|
||||
var errs []error
|
||||
errs, err = ResumeJobsBySystem(sess.Session(), []int64{job.ID})
|
||||
if len(errs) > 0 && errs[0] != nil {
|
||||
@ -551,8 +551,7 @@ func job2UniqueIDs(job *model.Job, schema bool) string {
|
||||
}
|
||||
|
||||
func job2SchemaNames(job *model.Job) []string {
|
||||
switch job.Type {
|
||||
case model.ActionRenameTable:
|
||||
if job.Type == model.ActionRenameTable {
|
||||
var oldSchemaID int64
|
||||
var oldSchemaName model.CIStr
|
||||
var tableName model.CIStr
|
||||
@ -562,11 +561,9 @@ func job2SchemaNames(job *model.Job) []string {
|
||||
names = append(names, strings.ToLower(job.SchemaName))
|
||||
names = append(names, oldSchemaName.O)
|
||||
return names
|
||||
case model.ActionRenameTables:
|
||||
// TODO: Get this action's schema names.
|
||||
case model.ActionExchangeTablePartition:
|
||||
// TODO: Get this action's schema names.
|
||||
}
|
||||
// TODO: consider about model.ActionRenameTables and model.ActionExchangeTablePartition, which need to get the schema names.
|
||||
|
||||
return []string{job.SchemaName}
|
||||
}
|
||||
|
||||
|
||||
@ -1168,7 +1168,8 @@ func upgrade(s Session) {
|
||||
}
|
||||
|
||||
func syncUpgradeState(s Session) {
|
||||
ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
totalInterval := time.Duration(internalSQLTimeout) * time.Second
|
||||
ctx, cancelFunc := context.WithTimeout(context.Background(), totalInterval)
|
||||
defer cancelFunc()
|
||||
dom := domain.GetDomain(s)
|
||||
err := dom.DDL().StateSyncer().UpdateGlobalState(ctx, syncer.NewStateInfo(syncer.StateUpgrading))
|
||||
@ -1176,8 +1177,8 @@ func syncUpgradeState(s Session) {
|
||||
logutil.BgLogger().Fatal("[upgrading] update global state failed", zap.String("state", syncer.StateUpgrading), zap.Error(err))
|
||||
}
|
||||
|
||||
retryTimes := 10
|
||||
interval := 200 * time.Millisecond
|
||||
retryTimes := int(totalInterval / interval)
|
||||
for i := 0; i < retryTimes; i++ {
|
||||
op, err := owner.GetOwnerOpValue(ctx, dom.EtcdClient(), ddl.DDLOwnerKey, "upgrade bootstrap")
|
||||
if err == nil && op.String() == owner.OpGetUpgradingState.String() {
|
||||
@ -1186,7 +1187,9 @@ func syncUpgradeState(s Session) {
|
||||
if i == retryTimes-1 {
|
||||
logutil.BgLogger().Fatal("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err))
|
||||
}
|
||||
logutil.BgLogger().Warn("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err))
|
||||
if i%10 == 0 {
|
||||
logutil.BgLogger().Warn("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err))
|
||||
}
|
||||
time.Sleep(interval)
|
||||
}
|
||||
|
||||
|
||||
@ -8,7 +8,7 @@ go_test(
|
||||
"main_test.go",
|
||||
],
|
||||
flaky = True,
|
||||
shard_count = 9,
|
||||
shard_count = 10,
|
||||
deps = [
|
||||
"//config",
|
||||
"//ddl",
|
||||
|
||||
@ -344,11 +344,15 @@ func TestUpgradeVersionForPausedJob(t *testing.T) {
|
||||
|
||||
// Resume the DDL job, then add index operation can be executed successfully.
|
||||
session.MustExec(t, seLatestV, fmt.Sprintf("admin resume ddl jobs %d", jobID))
|
||||
checkDDLJobExecSucc(t, seLatestV, jobID)
|
||||
}
|
||||
|
||||
// checkDDLJobExecSucc is used to make sure the DDL operation is successful.
|
||||
func checkDDLJobExecSucc(t *testing.T, se session.Session, jobID int64) {
|
||||
sql := fmt.Sprintf(" admin show ddl jobs where job_id=%d", jobID)
|
||||
// Make sure the add index operation is successful.
|
||||
suc := false
|
||||
for i := 0; i < 20; i++ {
|
||||
rows, err := execute(context.Background(), seLatestV, sql)
|
||||
rows, err := execute(context.Background(), se, sql)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, rows, 1)
|
||||
require.Equal(t, rows[0].GetString(2), "upgrade_tbl")
|
||||
@ -363,6 +367,66 @@ func TestUpgradeVersionForPausedJob(t *testing.T) {
|
||||
require.True(t, suc)
|
||||
}
|
||||
|
||||
// TestUpgradeVersionForSystemPausedJob tests mock the first upgrade failed, and it has a mock system DDL in queue.
|
||||
// Then we do re-upgrade(This operation will pause all DDL jobs by the system).
|
||||
func TestUpgradeVersionForSystemPausedJob(t *testing.T) {
|
||||
// Mock a general and a reorg job in boostrap.
|
||||
*session.WithMockUpgrade = true
|
||||
session.MockUpgradeToVerLatestKind = session.MockSimpleUpgradeToVerLatest
|
||||
|
||||
store, dom := session.CreateStoreAndBootstrap(t)
|
||||
defer func() { require.NoError(t, store.Close()) }()
|
||||
|
||||
seV := session.CreateSessionAndSetID(t, store)
|
||||
txn, err := store.Begin()
|
||||
require.NoError(t, err)
|
||||
m := meta.NewMeta(txn)
|
||||
err = m.FinishBootstrap(session.CurrentBootstrapVersion - 1)
|
||||
require.NoError(t, err)
|
||||
err = txn.Commit(context.Background())
|
||||
require.NoError(t, err)
|
||||
session.MustExec(t, seV, fmt.Sprintf("update mysql.tidb set variable_value='%d' where variable_name='tidb_server_version'", session.CurrentBootstrapVersion-1))
|
||||
session.UnsetStoreBootstrapped(store.UUID())
|
||||
ver, err := session.GetBootstrapVersion(seV)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, session.CurrentBootstrapVersion-1, ver)
|
||||
|
||||
// Add a paused DDL job before upgrade.
|
||||
session.MustExec(t, seV, "create table mysql.upgrade_tbl(a int)")
|
||||
ch := make(chan struct{})
|
||||
var jobID int64
|
||||
hook := &callback.TestDDLCallback{}
|
||||
hook.OnJobRunAfterExported = func(job *model.Job) {
|
||||
if job.SchemaState == model.StateDeleteOnly {
|
||||
se := session.CreateSessionAndSetID(t, store)
|
||||
session.MustExec(t, se, fmt.Sprintf("admin pause ddl jobs %d", job.ID))
|
||||
}
|
||||
if job.State == model.JobStatePaused && jobID == 0 {
|
||||
// Mock pause the ddl job by system.
|
||||
job.AdminOperator = model.AdminCommandBySystem
|
||||
ch <- struct{}{}
|
||||
jobID = job.ID
|
||||
}
|
||||
}
|
||||
dom.DDL().SetHook(hook)
|
||||
go func() {
|
||||
_, err = execute(context.Background(), seV, "alter table mysql.upgrade_tbl add column b int")
|
||||
}()
|
||||
|
||||
<-ch
|
||||
dom.Close()
|
||||
// Make sure upgrade is successful.
|
||||
domLatestV, err := session.BootstrapSession(store)
|
||||
require.NoError(t, err)
|
||||
defer domLatestV.Close()
|
||||
seLatestV := session.CreateSessionAndSetID(t, store)
|
||||
ver, err = session.GetBootstrapVersion(seLatestV)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, session.CurrentBootstrapVersion+1, ver)
|
||||
|
||||
checkDDLJobExecSucc(t, seLatestV, jobID)
|
||||
}
|
||||
|
||||
func TestUpgradeVersionForResumeJob(t *testing.T) {
|
||||
store, dom := session.CreateStoreAndBootstrap(t)
|
||||
defer func() { require.NoError(t, store.Close()) }()
|
||||
|
||||
@ -112,6 +112,21 @@ func mockUpgradeToVerLatest(s Session, ver int64) {
|
||||
TestHook.OnBootstrapAfter(s)
|
||||
}
|
||||
|
||||
// mockSimpleUpgradeToVerLatest mocks a simple bootstrapVersion(make the test faster).
|
||||
func mockSimpleUpgradeToVerLatest(s Session, ver int64) {
|
||||
logutil.BgLogger().Info("mock upgrade to ver latest", zap.Int64("old ver", ver), zap.Int64("mock latest ver", mockLatestVer))
|
||||
if ver >= mockLatestVer {
|
||||
return
|
||||
}
|
||||
mustExecute(s, "use mysql")
|
||||
mustExecute(s, `create table if not exists mock_sys_t(
|
||||
c1 int, c2 int, c3 int, c11 tinyint, index fk_c1(c1)
|
||||
);`)
|
||||
mustExecute(s, "alter table mock_sys_t add column mayNullCol bigint default 1")
|
||||
mustExecute(s, "alter table mock_sys_t add index idx_c2(c2)")
|
||||
TestHook.OnBootstrapAfter(s)
|
||||
}
|
||||
|
||||
// TestHook is exported for testing.
|
||||
var TestHook = TestCallback{}
|
||||
|
||||
@ -140,13 +155,26 @@ func modifyBootstrapVersionForTest(store kv.Storage, ver int64) int64 {
|
||||
return ver
|
||||
}
|
||||
|
||||
const (
|
||||
defaultMockUpgradeToVerLatest = 0
|
||||
// MockSimpleUpgradeToVerLatest is used to indicate the use of the simple mock bootstrapVersion, this is just a few simple DDL operations.
|
||||
MockSimpleUpgradeToVerLatest = 1
|
||||
)
|
||||
|
||||
// MockUpgradeToVerLatestKind is used to indicate the use of different mock bootstrapVersion.
|
||||
var MockUpgradeToVerLatestKind = defaultMockUpgradeToVerLatest
|
||||
|
||||
func addMockBootstrapVersionForTest(s Session) {
|
||||
if !*WithMockUpgrade {
|
||||
return
|
||||
}
|
||||
|
||||
TestHook.OnBootstrapBefore(s)
|
||||
bootstrapVersion = append(bootstrapVersion, mockUpgradeToVerLatest)
|
||||
if MockUpgradeToVerLatestKind == defaultMockUpgradeToVerLatest {
|
||||
bootstrapVersion = append(bootstrapVersion, mockUpgradeToVerLatest)
|
||||
} else {
|
||||
bootstrapVersion = append(bootstrapVersion, mockSimpleUpgradeToVerLatest)
|
||||
}
|
||||
currentBootstrapVersion++
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user