diff --git a/pkg/session/bootstrap.go b/pkg/session/bootstrap.go index 05005bb390..7e844cbe0e 100644 --- a/pkg/session/bootstrap.go +++ b/pkg/session/bootstrap.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/bindinfo" "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/expression" @@ -1309,6 +1310,47 @@ var ( SupportUpgradeHTTPOpVer int64 = version174 ) +func checkDistTask(s sessiontypes.Session) { + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap) + rs, err := s.ExecuteInternal(ctx, "SELECT HIGH_PRIORITY variable_value from mysql.global_variables where variable_name = %?;", variable.TiDBEnableDistTask) + if err != nil { + logutil.BgLogger().Fatal("check dist task failed, getting tidb_enable_dist_task failed", zap.Error(err)) + } + defer terror.Call(rs.Close) + req := rs.NewChunk(nil) + err = rs.Next(ctx, req) + if err != nil { + logutil.BgLogger().Fatal("check dist task failed, getting tidb_enable_dist_task failed", zap.Error(err)) + } + if req.NumRows() == 0 { + // Not set yet. + return + } else if req.GetRow(0).GetString(0) == variable.On { + logutil.BgLogger().Fatal("check dist task failed, tidb_enable_dist_task is enabled", zap.Error(err)) + } + + // Even if the variable is set to `off`, we still need to check the tidb_global_task. + rs2, err := s.ExecuteInternal(ctx, `SELECT id FROM %n.%n WHERE state not in (%?, %?, %?)`, + mysql.SystemDB, + "tidb_global_task", + proto.TaskStateSucceed, + proto.TaskStateFailed, + proto.TaskStateReverted, + ) + if err != nil { + logutil.BgLogger().Fatal("check dist task failed, reading tidb_global_task failed", zap.Error(err)) + } + defer terror.Call(rs2.Close) + req = rs2.NewChunk(nil) + err = rs2.Next(ctx, req) + if err != nil { + logutil.BgLogger().Fatal("check dist task failed, reading tidb_global_task failed", zap.Error(err)) + } + if req.NumRows() > 0 { + logutil.BgLogger().Fatal("check dist task failed, some distributed tasks is still running", zap.Error(err)) + } +} + // upgrade function will do some upgrade works, when the system is bootstrapped by low version TiDB server // For example, add new system variables into mysql.global_variables table. func upgrade(s sessiontypes.Session) { @@ -1318,6 +1360,8 @@ func upgrade(s sessiontypes.Session) { // It is already bootstrapped/upgraded by a higher version TiDB server. return } + + checkDistTask(s) printClusterState(s, ver) // Only upgrade from under version92 and this TiDB is not owner set. diff --git a/pkg/session/bootstrap_test.go b/pkg/session/bootstrap_test.go index 66dd4fafb6..eb3e5a32a7 100644 --- a/pkg/session/bootstrap_test.go +++ b/pkg/session/bootstrap_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/pingcap/failpoint" + "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/bindinfo" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/domain" @@ -36,6 +37,8 @@ import ( "github.com/pingcap/tidb/pkg/store/mockstore" tbctximpl "github.com/pingcap/tidb/pkg/table/contextimpl" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) // This test file have many problem. @@ -2185,3 +2188,79 @@ func TestTiDBUpgradeToVer179(t *testing.T) { dom.Close() } + +func testTiDBUpgradeWithDistTask(t *testing.T, injectQuery string, fatal bool) { + store, _ := CreateStoreAndBootstrap(t) + defer func() { + require.NoError(t, store.Close()) + }() + ver178 := version178 + seV178 := CreateSessionAndSetID(t, store) + txn, err := store.Begin() + require.NoError(t, err) + m := meta.NewMeta(txn) + err = m.FinishBootstrap(int64(ver178)) + require.NoError(t, err) + MustExec(t, seV178, fmt.Sprintf("update mysql.tidb set variable_value=%d where variable_name='tidb_server_version'", ver178)) + MustExec(t, seV178, injectQuery) + err = txn.Commit(context.Background()) + require.NoError(t, err) + + unsetStoreBootstrapped(store.UUID()) + ver, err := getBootstrapVersion(seV178) + require.NoError(t, err) + require.Equal(t, int64(ver178), ver) + + conf := new(log.Config) + lg, p, e := log.InitLogger(conf, zap.WithFatalHook(zapcore.WriteThenPanic)) + require.NoError(t, e) + rs := log.ReplaceGlobals(lg, p) + defer func() { + rs() + }() + + var dom *domain.Domain + + fatal2panic := false + fc := func() { + defer func() { + if err := recover(); err != nil { + fatal2panic = true + } + }() + dom, _ = BootstrapSession(store) + } + fc() + + if fatal { + dom = domain.GetDomain(seV178) + } + dom.Close() + require.Equal(t, fatal, fatal2panic) +} + +func TestTiDBUpgradeWithDistTaskEnable(t *testing.T) { + t.Run("test enable dist task", func(t *testing.T) { testTiDBUpgradeWithDistTask(t, "set global tidb_enable_dist_task = 1", true) }) + t.Run("test disable dist task", func(t *testing.T) { testTiDBUpgradeWithDistTask(t, "set global tidb_enable_dist_task = 0", false) }) +} + +func TestTiDBUpgradeWithDistTaskRunning(t *testing.T) { + t.Run("test dist task running", func(t *testing.T) { + testTiDBUpgradeWithDistTask(t, "insert into mysql.tidb_global_task set id = 1, task_key = 'aaa', type= 'aaa', state = 'running'", true) + }) + t.Run("test dist task succeed", func(t *testing.T) { + testTiDBUpgradeWithDistTask(t, "insert into mysql.tidb_global_task set id = 1, task_key = 'aaa', type= 'aaa', state = 'succeed'", false) + }) + t.Run("test dist task failed", func(t *testing.T) { + testTiDBUpgradeWithDistTask(t, "insert into mysql.tidb_global_task set id = 1, task_key = 'aaa', type= 'aaa', state = 'failed'", false) + }) + t.Run("test dist task reverted", func(t *testing.T) { + testTiDBUpgradeWithDistTask(t, "insert into mysql.tidb_global_task set id = 1, task_key = 'aaa', type= 'aaa', state = 'reverted'", false) + }) + t.Run("test dist task paused", func(t *testing.T) { + testTiDBUpgradeWithDistTask(t, "insert into mysql.tidb_global_task set id = 1, task_key = 'aaa', type= 'aaa', state = 'paused'", true) + }) + t.Run("test dist task other", func(t *testing.T) { + testTiDBUpgradeWithDistTask(t, "insert into mysql.tidb_global_task set id = 1, task_key = 'aaa', type= 'aaa', state = 'other'", true) + }) +}