From 75fa8150da89aa40beedd72c4190fc9b9e9b071f Mon Sep 17 00:00:00 2001
From: EasonBall <592838129@qq.com>
Date: Fri, 8 Sep 2023 12:29:13 +0800
Subject: [PATCH] disttask: add role to select instance (#46453)

ref pingcap/tidb#46258
---
 config/config.go                              |  3 ++
 disttask/framework/BUILD.bazel                |  2 +-
 disttask/framework/dispatcher/dispatcher.go   | 32 ++++++++++++++++
 .../framework/dispatcher/dispatcher_test.go   |  3 ++
 disttask/framework/framework_test.go          | 28 ++++++++++++++
 disttask/framework/mock/scheduler_mock.go     | 14 +++++++
 disttask/framework/scheduler/BUILD.bazel      |  1 +
 disttask/framework/scheduler/interface.go     |  2 +
 disttask/framework/scheduler/manager.go       | 24 +++++++++++-
 disttask/framework/scheduler/manager_test.go  |  4 +-
 disttask/framework/storage/BUILD.bazel        |  2 +-
 disttask/framework/storage/table_test.go      | 30 +++++++++++++++
 disttask/framework/storage/task_table.go      | 29 +++++++++++++--
 domain/domain.go                              |  8 +++-
 executor/BUILD.bazel                          |  1 +
 executor/infoschema_cluster_table_test.go     |  2 +-
 executor/set.go                               | 10 +++++
 session/bootstrap.go                          |  8 +++-
 sessionctx/variable/BUILD.bazel               |  1 +
 sessionctx/variable/sysvar.go                 | 18 +++++++++
 sessionctx/variable/tidb_vars.go              |  3 ++
 tidb-server/BUILD.bazel                       |  1 +
 tidb-server/main.go                           | 18 +++++++++
 util/distrole/BUILD.bazel                     |  8 ++++
 util/distrole/role.go                         | 37 +++++++++++++++++++
 25 files changed, 275 insertions(+), 14 deletions(-)
 create mode 100644 util/distrole/BUILD.bazel
 create mode 100644 util/distrole/role.go

diff --git a/config/config.go b/config/config.go
index c130630b3f..200aa77746 100644
--- a/config/config.go
+++ b/config/config.go
@@ -547,6 +547,8 @@ type Instance struct {
 	MaxConnections    uint32     `toml:"max_connections" json:"max_connections"`
 	TiDBEnableDDL     AtomicBool `toml:"tidb_enable_ddl" json:"tidb_enable_ddl"`
 	TiDBRCReadCheckTS bool       `toml:"tidb_rc_read_check_ts" json:"tidb_rc_read_check_ts"`
+	// TiDBServiceScope indicates the role of the TiDB node in the distributed task framework.
+	TiDBServiceScope string `toml:"tidb_service_scope" json:"tidb_service_scope"`
 }
 
 func (l *Log) getDisableTimestamp() bool {
@@ -957,6 +959,7 @@ var defaultConf = Config{
 		MaxConnections:    0,
 		TiDBEnableDDL:     *NewAtomicBool(true),
 		TiDBRCReadCheckTS: false,
+		TiDBServiceScope:  "",
 	},
 	Status: Status{
 		ReportStatus: true,
diff --git a/disttask/framework/BUILD.bazel b/disttask/framework/BUILD.bazel
index 604119b4b2..9099913c05 100644
--- a/disttask/framework/BUILD.bazel
+++ b/disttask/framework/BUILD.bazel
@@ -11,7 +11,7 @@ go_test(
     ],
     flaky = True,
     race = "off",
-    shard_count = 23,
+    shard_count = 24,
     deps = [
         "//disttask/framework/dispatcher",
         "//disttask/framework/mock",
diff --git a/disttask/framework/dispatcher/dispatcher.go b/disttask/framework/dispatcher/dispatcher.go
index 2e018330fe..3f166ef578 100644
--- a/disttask/framework/dispatcher/dispatcher.go
+++ b/disttask/framework/dispatcher/dispatcher.go
@@ -405,6 +405,14 @@ func (d *BaseDispatcher) dispatchSubTask(task *proto.Task, metas [][]byte) error
 	if err != nil {
 		return err
 	}
+	// 4. filter by role.
+	serverNodes, err = d.filterByRole(serverNodes)
+	if err != nil {
+		return err
+	}
+
+	logutil.Logger(d.logCtx).Info("eligible instances", zap.Int("num", len(serverNodes)))
+
 	if len(serverNodes) == 0 {
 		return errors.New("no available TiDB node to dispatch subtasks")
 	}
@@ -456,6 +464,30 @@ func GenerateSchedulerNodes(ctx context.Context) (serverNodes []*infosync.Server
 	return serverNodes, nil
 }
 
+func (d *BaseDispatcher) filterByRole(infos []*infosync.ServerInfo) ([]*infosync.ServerInfo, error) {
+	nodes, err := d.taskMgr.GetNodesByRole("background")
+	if err != nil {
+		return nil, err
+	}
+
+	if len(nodes) == 0 {
+		nodes, err = d.taskMgr.GetNodesByRole("")
+	}
+
+	if err != nil {
+		return nil, err
+	}
+
+	res := make([]*infosync.ServerInfo, 0, len(nodes))
+	for _, info := range infos {
+		_, ok := nodes[disttaskutil.GenerateExecID(info.IP, info.Port)]
+		if ok {
+			res = append(res, info)
+		}
+	}
+	return res, nil
+}
+
 // GetAllSchedulerIDs gets all the scheduler IDs.
 func (d *BaseDispatcher) GetAllSchedulerIDs(ctx context.Context, task *proto.Task) ([]string, error) {
 	serverInfos, err := d.GetEligibleInstances(ctx, task)
diff --git a/disttask/framework/dispatcher/dispatcher_test.go b/disttask/framework/dispatcher/dispatcher_test.go
index 15511cb919..38680b4e9a 100644
--- a/disttask/framework/dispatcher/dispatcher_test.go
+++ b/disttask/framework/dispatcher/dispatcher_test.go
@@ -226,6 +226,9 @@ func checkDispatch(t *testing.T, taskCnt int, isSucc bool, isCancel bool) {
 			dispatcher.DefaultDispatchConcurrency = originalConcurrency
 		}
 	}()
+
+	require.NoError(t, mgr.StartManager(":4000", "background"))
+
 	// 3s
 	cnt := 60
 	checkGetRunningTaskCnt := func(expected int) {
diff --git a/disttask/framework/framework_test.go b/disttask/framework/framework_test.go
index 5f2c0cb731..6aae0f6e41 100644
--- a/disttask/framework/framework_test.go
+++ b/disttask/framework/framework_test.go
@@ -586,6 +586,34 @@ func TestSchedulerDownManyNodes(t *testing.T) {
 	distContext.Close()
 }
 
+func TestFrameworkSetLabel(t *testing.T) {
+	var m sync.Map
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+	RegisterTaskMeta(t, ctrl, &m, &testDispatcherExt{})
+	distContext := testkit.NewDistExecutionContext(t, 3)
+	tk := testkit.NewTestKit(t, distContext.Store)
+	// 1. all nodes have the default "" role.
+	DispatchTaskAndCheckSuccess("😁", t, &m)
+	// 2. one "background" role.
+	tk.MustExec("set global tidb_service_scope=background")
+	tk.MustQuery("select @@global.tidb_service_scope").Check(testkit.Rows("background"))
+	tk.MustQuery("select @@tidb_service_scope").Check(testkit.Rows("background"))
+	DispatchTaskAndCheckSuccess("😊", t, &m)
+	// 3. two "background" roles.
+	tk.MustExec("update mysql.dist_framework_meta set role = \"background\" where host = \":4001\"")
+	DispatchTaskAndCheckSuccess("😆", t, &m)
+
+	// 4. set wrong sys var.
+	tk.MustMatchErrMsg("set global tidb_service_scope=wrong", `incorrect value: .*. tidb_service_scope options: "", background`)
+
+	// 5. set keyspace id.
+	tk.MustExec("update mysql.dist_framework_meta set keyspace_id = 16777216 where host = \":4001\"")
+	tk.MustQuery("select keyspace_id from mysql.dist_framework_meta where host = \":4001\"").Check(testkit.Rows("16777216"))
+
+	distContext.Close()
+}
+
 func TestMultiTasks(t *testing.T) {
 	defer dispatcher.ClearDispatcherFactory()
 	defer scheduler.ClearSchedulers()
diff --git a/disttask/framework/mock/scheduler_mock.go b/disttask/framework/mock/scheduler_mock.go
index 10a86c22db..593a834e8f 100644
--- a/disttask/framework/mock/scheduler_mock.go
+++ b/disttask/framework/mock/scheduler_mock.go
@@ -139,6 +139,20 @@ func (mr *MockTaskTableMockRecorder) IsSchedulerCanceled(arg0, arg1 interface{})
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsSchedulerCanceled", reflect.TypeOf((*MockTaskTable)(nil).IsSchedulerCanceled), arg0, arg1)
 }
 
+// StartManager mocks base method.
+func (m *MockTaskTable) StartManager(arg0, arg1 string) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "StartManager", arg0, arg1)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// StartManager indicates an expected call of StartManager.
+func (mr *MockTaskTableMockRecorder) StartManager(arg0, arg1 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartManager", reflect.TypeOf((*MockTaskTable)(nil).StartManager), arg0, arg1)
+}
+
 // StartSubtask mocks base method.
 func (m *MockTaskTable) StartSubtask(arg0 int64) error {
 	m.ctrl.T.Helper()
diff --git a/disttask/framework/scheduler/BUILD.bazel b/disttask/framework/scheduler/BUILD.bazel
index 585985c345..94c4d7af9a 100644
--- a/disttask/framework/scheduler/BUILD.bazel
+++ b/disttask/framework/scheduler/BUILD.bazel
@@ -11,6 +11,7 @@ go_library(
     importpath = "github.com/pingcap/tidb/disttask/framework/scheduler",
     visibility = ["//visibility:public"],
     deps = [
+        "//config",
         "//disttask/framework/proto",
         "//disttask/framework/scheduler/execute",
         "//disttask/framework/storage",
diff --git a/disttask/framework/scheduler/interface.go b/disttask/framework/scheduler/interface.go
index 13fbf6660f..0482de2a63 100644
--- a/disttask/framework/scheduler/interface.go
+++ b/disttask/framework/scheduler/interface.go
@@ -27,9 +27,11 @@ type TaskTable interface {
 	GetGlobalTaskByID(taskID int64) (task *proto.Task, err error)
 
 	GetSubtaskInStates(instanceID string, taskID int64, step int64, states ...interface{}) (*proto.Subtask, error)
+	StartManager(tidbID string, role string) error
 	StartSubtask(subtaskID int64) error
 	UpdateSubtaskStateAndError(subtaskID int64, state string, err error) error
 	FinishSubtask(subtaskID int64, meta []byte) error
+
 	HasSubtasksInStates(instanceID string, taskID int64, step int64, states ...interface{}) (bool, error)
 	UpdateErrorToSubtask(instanceID string, taskID int64, err error) error
 	IsSchedulerCanceled(taskID int64, instanceID string) (bool, error)
diff --git a/disttask/framework/scheduler/manager.go b/disttask/framework/scheduler/manager.go
index a47896435a..8546bac07f 100644
--- a/disttask/framework/scheduler/manager.go
+++ b/disttask/framework/scheduler/manager.go
@@ -22,6 +22,7 @@ import (
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/disttask/framework/proto"
 	"github.com/pingcap/tidb/domain/infosync"
 	"github.com/pingcap/tidb/resourcemanager/pool/spool"
@@ -34,7 +35,9 @@ var (
 	schedulerPoolSize       int32 = 4
 	subtaskExecutorPoolSize int32 = 10
 	// same as dispatcher
-	checkTime = 300 * time.Millisecond
+	checkTime        = 300 * time.Millisecond
+	retrySQLTimes    = 3
+	retrySQLInterval = 500 * time.Millisecond
 )
 
 // ManagerBuilder is used to build a Manager.
@@ -111,8 +114,24 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable
 }
 
 // Start starts the Manager.
-func (m *Manager) Start() {
+func (m *Manager) Start() error {
 	logutil.Logger(m.logCtx).Debug("manager start")
+	var err error
+	for i := 0; i < retrySQLTimes; i++ {
+		err = m.taskTable.StartManager(m.id, config.GetGlobalConfig().Instance.TiDBServiceScope)
+		if err == nil {
+			break
+		}
+		if i%10 == 0 {
+			logutil.Logger(m.logCtx).Warn("start manager failed", zap.String("scope", config.GetGlobalConfig().Instance.TiDBServiceScope),
+				zap.Int("retry times", retrySQLTimes), zap.Error(err))
+		}
+		time.Sleep(retrySQLInterval)
+	}
+	if err != nil {
+		return err
+	}
+
 	m.wg.Add(1)
 	go func() {
 		defer m.wg.Done()
@@ -124,6 +143,7 @@ func (m *Manager) Start() {
 		defer m.wg.Done()
 		m.fetchAndFastCancelTasks(m.ctx)
 	}()
+	return nil
 }
 
 // Stop stops the Manager.
diff --git a/disttask/framework/scheduler/manager_test.go b/disttask/framework/scheduler/manager_test.go
index 8c6f406e80..ac94536524 100644
--- a/disttask/framework/scheduler/manager_test.go
+++ b/disttask/framework/scheduler/manager_test.go
@@ -177,7 +177,7 @@ func TestManager(t *testing.T) {
 	taskID2 := int64(2)
 	task1 := &proto.Task{ID: taskID1, State: proto.TaskStateRunning, Step: proto.StepOne, Type: "type"}
 	task2 := &proto.Task{ID: taskID2, State: proto.TaskStateReverting, Step: proto.StepOne, Type: "type"}
-
+	mockTaskTable.EXPECT().StartManager("test", "").Return(nil).Times(1)
 	mockTaskTable.EXPECT().GetGlobalTasksInStates(proto.TaskStateRunning, proto.TaskStateReverting).
 		Return([]*proto.Task{task1, task2}, nil).AnyTimes()
 	mockTaskTable.EXPECT().GetGlobalTasksInStates(proto.TaskStateReverting).
@@ -215,7 +215,7 @@ func TestManager(t *testing.T) {
 		}).Times(2)
 	m, err := b.BuildManager(context.Background(), id, mockTaskTable)
 	require.NoError(t, err)
-	m.Start()
+	require.NoError(t, m.Start())
 	time.Sleep(5 * time.Second)
 	m.Stop()
 }
diff --git a/disttask/framework/storage/BUILD.bazel b/disttask/framework/storage/BUILD.bazel
index 5df3c504b8..a101545f4d 100644
--- a/disttask/framework/storage/BUILD.bazel
+++ b/disttask/framework/storage/BUILD.bazel
@@ -27,7 +27,7 @@ go_test(
     srcs = ["table_test.go"],
     flaky = True,
     race = "on",
-    shard_count = 3,
+    shard_count = 4,
     deps = [
         ":storage",
         "//disttask/framework/proto",
diff --git a/disttask/framework/storage/table_test.go b/disttask/framework/storage/table_test.go
index 9b539354f6..006a24c2de 100644
--- a/disttask/framework/storage/table_test.go
+++ b/disttask/framework/storage/table_test.go
@@ -416,3 +416,33 @@ func TestBothGlobalAndSubTaskTable(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, int64(0), cnt)
 }
+
+func TestDistFrameworkMeta(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	pool := pools.NewResourcePool(func() (pools.Resource, error) {
+		return tk.Session(), nil
+	}, 1, 1, time.Second)
+	defer pool.Close()
+	sm := storage.NewTaskManager(context.Background(), pool)
+
+	storage.SetTaskManager(sm)
+	sm, err := storage.GetTaskManager()
+	require.NoError(t, err)
+
+	require.NoError(t, sm.StartManager(":4000", "background"))
+	require.NoError(t, sm.StartManager(":4001", ""))
+	require.NoError(t, sm.StartManager(":4002", "background"))
+	nodes, err := sm.GetNodesByRole("background")
+	require.NoError(t, err)
+	require.Equal(t, map[string]bool{
+		":4000": true,
+		":4002": true,
+	}, nodes)
+
+	nodes, err = sm.GetNodesByRole("")
+	require.NoError(t, err)
+	require.Equal(t, map[string]bool{
+		":4001": true,
+	}, nodes)
+}
diff --git a/disttask/framework/storage/task_table.go b/disttask/framework/storage/task_table.go
index 97a029b785..b2d8b58e6e 100644
--- a/disttask/framework/storage/task_table.go
+++ b/disttask/framework/storage/task_table.go
@@ -477,6 +477,13 @@ func (stm *TaskManager) StartSubtask(subtaskID int64) error {
 	return err
 }
 
+// StartManager inserts the manager information into dist_framework_meta.
+func (stm *TaskManager) StartManager(tidbID string, role string) error {
+	_, err := stm.executeSQLWithNewSession(stm.ctx, `insert into mysql.dist_framework_meta values(%?, %?, DEFAULT)
+		on duplicate key update role = %?`, tidbID, role, role)
+	return err
+}
+
 // UpdateSubtaskStateAndError updates the subtask state.
 func (stm *TaskManager) UpdateSubtaskStateAndError(id int64, state string, subTaskErr error) error {
 	_, err := stm.executeSQLWithNewSession(stm.ctx, `update mysql.tidb_background_subtask
@@ -661,7 +668,7 @@ func serializeErr(err error) []byte {
 	return errBytes
 }
 
-// CancelGlobalTask cancels global task
+// CancelGlobalTask cancels global task.
 func (stm *TaskManager) CancelGlobalTask(taskID int64) error {
 	_, err := stm.executeSQLWithNewSession(stm.ctx, "update mysql.tidb_global_task set state=%? where id=%? and state in (%?, %?)",
 		proto.TaskStateCancelling, taskID, proto.TaskStatePending, proto.TaskStateRunning,
@@ -669,14 +676,14 @@ func (stm *TaskManager) CancelGlobalTask(taskID int64) error {
 	return err
 }
 
-// CancelGlobalTaskByKeySession cancels global task by key using input session
+// CancelGlobalTaskByKeySession cancels global task by key using input session.
func (stm *TaskManager) CancelGlobalTaskByKeySession(se sessionctx.Context, taskKey string) error { _, err := ExecSQL(stm.ctx, se, "update mysql.tidb_global_task set state=%? where task_key=%? and state in (%?, %?)", proto.TaskStateCancelling, taskKey, proto.TaskStatePending, proto.TaskStateRunning) return err } -// IsGlobalTaskCancelling checks whether the task state is cancelling +// IsGlobalTaskCancelling checks whether the task state is cancelling. func (stm *TaskManager) IsGlobalTaskCancelling(taskID int64) (bool, error) { rs, err := stm.executeSQLWithNewSession(stm.ctx, "select 1 from mysql.tidb_global_task where id=%? and state = %?", taskID, proto.TaskStateCancelling, @@ -689,7 +696,7 @@ func (stm *TaskManager) IsGlobalTaskCancelling(taskID int64) (bool, error) { return len(rs) > 0, nil } -// GetSubtasksByStep gets subtasks of global task by step +// GetSubtasksByStep gets subtasks of global task by step. func (stm *TaskManager) GetSubtasksByStep(taskID, step int64) ([]*proto.Subtask, error) { rs, err := stm.executeSQLWithNewSession(stm.ctx, "select * from mysql.tidb_background_subtask where task_key = %? and step = %?", @@ -706,3 +713,17 @@ func (stm *TaskManager) GetSubtasksByStep(taskID, step int64) ([]*proto.Subtask, } return subtasks, nil } + +// GetNodesByRole gets nodes map from dist_framework_meta by role. +func (stm *TaskManager) GetNodesByRole(role string) (map[string]bool, error) { + rs, err := stm.executeSQLWithNewSession(stm.ctx, + "select host from mysql.dist_framework_meta where role = %?", role) + if err != nil { + return nil, err + } + nodes := make(map[string]bool, len(rs)) + for _, r := range rs { + nodes[r.GetString(0)] = true + } + return nodes, nil +} diff --git a/domain/domain.go b/domain/domain.go index e5587391b1..7bc754cdb6 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1470,7 +1470,11 @@ func (do *Domain) InitDistTaskLoop(ctx context.Context) error { } func (do *Domain) distTaskFrameworkLoop(ctx context.Context, taskManager *storage.TaskManager, schedulerManager *scheduler.Manager, serverID string) { - schedulerManager.Start() + err := schedulerManager.Start() + if err != nil { + logutil.BgLogger().Error("dist task scheduler failed", zap.Error(err)) + return + } logutil.BgLogger().Info("dist task scheduler started") defer func() { logutil.BgLogger().Info("stopping dist task scheduler") @@ -1486,7 +1490,7 @@ func (do *Domain) distTaskFrameworkLoop(ctx context.Context, taskManager *storag var err error dispatcherManager, err = dispatcher.NewManager(ctx, taskManager, serverID) if err != nil { - logutil.BgLogger().Error("failed to create a disttask dispatcher", zap.Error(err)) + logutil.BgLogger().Error("failed to create a dist task dispatcher", zap.Error(err)) return } dispatcherManager.Start() diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index 560b59af9e..2444e59e0b 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -192,6 +192,7 @@ go_library( "//util/dbterror/exeerrors", "//util/deadlockhistory", "//util/disk", + "//util/disttask", "//util/etcd", "//util/execdetails", "//util/filter", diff --git a/executor/infoschema_cluster_table_test.go b/executor/infoschema_cluster_table_test.go index 73e668ade0..1237ad2f39 100644 --- a/executor/infoschema_cluster_table_test.go +++ b/executor/infoschema_cluster_table_test.go @@ -373,7 +373,7 @@ func TestTableStorageStats(t *testing.T) { "test 2", )) rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows() - result := 53 
+	result := 54
 	require.Len(t, rows, result)
 
 	// More tests about the privileges.
diff --git a/executor/set.go b/executor/set.go
index 72188f102c..fbc22f2319 100644
--- a/executor/set.go
+++ b/executor/set.go
@@ -34,9 +34,11 @@ import (
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/collate"
 	"github.com/pingcap/tidb/util/dbterror/exeerrors"
+	disttaskutil "github.com/pingcap/tidb/util/disttask"
 	"github.com/pingcap/tidb/util/gcutil"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/sem"
+	"github.com/pingcap/tidb/util/sqlexec"
 	"go.uber.org/zap"
 )
 
@@ -160,6 +162,14 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres
 			return nil
 		})
 		logutil.BgLogger().Info("set global var", zap.Uint64("conn", sessionVars.ConnectionID), zap.String("name", name), zap.String("val", valStr))
+		if name == variable.TiDBServiceScope {
+			dom := domain.GetDomain(e.Ctx())
+			serverID := disttaskutil.GenerateSubtaskExecID(ctx, dom.DDL().GetID())
+			_, err = e.Ctx().(sqlexec.SQLExecutor).ExecuteInternal(ctx,
+				`update mysql.dist_framework_meta
+					set role = %?
+					where host = %?`, valStr, serverID)
+		}
 		return err
 	}
 	// Set session variable
diff --git a/session/bootstrap.go b/session/bootstrap.go
index 4c200adab7..ed690b1219 100644
--- a/session/bootstrap.go
+++ b/session/bootstrap.go
@@ -582,6 +582,11 @@ const (
 		key(state),
 		UNIQUE KEY task_key(task_key)
 	);`
+	// CreateDistFrameworkMeta creates a system table that the distributed task framework uses to store meta information.
+	CreateDistFrameworkMeta = `CREATE TABLE IF NOT EXISTS mysql.dist_framework_meta (
+		host VARCHAR(100) NOT NULL PRIMARY KEY,
+		role VARCHAR(64),
+		keyspace_id bigint(8) NOT NULL DEFAULT -1);`
 
 	// CreateLoadDataJobs is a table that LOAD DATA uses
 	CreateLoadDataJobs = `CREATE TABLE IF NOT EXISTS mysql.load_data_jobs (
@@ -2841,6 +2846,8 @@ func doDDLWorks(s Session) {
 	mustExecute(s, CreateTimers)
 	// create runaway_watch done
 	mustExecute(s, CreateDoneRunawayWatchTable)
+	// create dist_framework_meta
+	mustExecute(s, CreateDistFrameworkMeta)
 }
 
 // doBootstrapSQLFile executes SQL commands in a file as the last stage of bootstrap.
@@ -2962,7 +2969,6 @@ func doDMLWorks(s Session) {
 	mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES(%?, %?, "Bootstrap version. Do not delete.")`,
 		mysql.SystemDB, mysql.TiDBTable, tidbServerVersionVar, currentBootstrapVersion,
 	)
-
 	writeSystemTZ(s)
 	writeNewCollationParameter(s, config.GetGlobalConfig().NewCollationsEnabledOnFirstBootstrap)
diff --git a/sessionctx/variable/BUILD.bazel b/sessionctx/variable/BUILD.bazel
index b655af627d..7b2073915b 100644
--- a/sessionctx/variable/BUILD.bazel
+++ b/sessionctx/variable/BUILD.bazel
@@ -45,6 +45,7 @@ go_library(
         "//util/collate",
         "//util/dbterror",
         "//util/disk",
+        "//util/distrole",
         "//util/execdetails",
         "//util/gctuner",
         "//util/kvcache",
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index 3c7e1aa4e3..836cbfa0e3 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -40,6 +40,7 @@ import (
 	_ "github.com/pingcap/tidb/types/parser_driver" // for parser driver
 	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/collate"
+	distroleutil "github.com/pingcap/tidb/util/distrole"
 	"github.com/pingcap/tidb/util/gctuner"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/mathutil"
@@ -2831,6 +2832,23 @@ var defaultSysVars = []*SysVar{
 			return nil
 		},
 	},
+	{Scope: ScopeInstance, Name: TiDBServiceScope, Value: "", Type: TypeStr,
+		Validation: func(_ *SessionVars, normalizedValue string, originalValue string, _ ScopeFlag) (string, error) {
+			_, ok := distroleutil.ToTiDBServiceScope(originalValue)
+			if !ok {
+				err := fmt.Errorf("incorrect value: `%s`. %s options: %s",
+					originalValue,
+					TiDBServiceScope, `"", background`)
+				return normalizedValue, err
+			}
+			return normalizedValue, nil
+		},
+		SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
+			ServiceScope.Store(strings.ToLower(s))
+			return nil
+		}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
+			return ServiceScope.Load(), nil
+		}},
 }
 
 func setTiFlashComputeDispatchPolicy(s *SessionVars, val string) error {
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index b7eebdead8..02e41cc7c8 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -1095,6 +1095,8 @@ const (
 	TiDBSkipMissingPartitionStats = "tidb_skip_missing_partition_stats"
 	// TiDBSessionAlias indicates the alias of a session which is used for tracing.
 	TiDBSessionAlias = "tidb_session_alias"
+	// TiDBServiceScope indicates the role of the TiDB node in the distributed task framework.
+ TiDBServiceScope = "tidb_service_scope" ) // TiDB intentional limits @@ -1497,6 +1499,7 @@ var ( EnableResourceControl = atomic.NewBool(false) EnableCheckConstraint = atomic.NewBool(DefTiDBEnableCheckConstraint) SkipMissingPartitionStats = atomic.NewBool(DefTiDBSkipMissingPartitionStats) + ServiceScope = atomic.NewString("") ) var ( diff --git a/tidb-server/BUILD.bazel b/tidb-server/BUILD.bazel index 33c9065fbb..213b5d82eb 100644 --- a/tidb-server/BUILD.bazel +++ b/tidb-server/BUILD.bazel @@ -40,6 +40,7 @@ go_library( "//util/cpuprofile", "//util/deadlockhistory", "//util/disk", + "//util/distrole", "//util/domainutil", "//util/kvcache", "//util/logutil", diff --git a/tidb-server/main.go b/tidb-server/main.go index 031d6e1475..bef6972431 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -64,6 +64,7 @@ import ( "github.com/pingcap/tidb/util/cpuprofile" "github.com/pingcap/tidb/util/deadlockhistory" "github.com/pingcap/tidb/util/disk" + distroleutil "github.com/pingcap/tidb/util/distrole" "github.com/pingcap/tidb/util/domainutil" "github.com/pingcap/tidb/util/kvcache" "github.com/pingcap/tidb/util/logutil" @@ -129,6 +130,7 @@ const ( nmInitializeSQLFile = "initialize-sql-file" nmDisconnectOnExpiredPassword = "disconnect-on-expired-password" nmKeyspaceName = "keyspace-name" + nmTiDBServiceScope = "tidb-service-scope" ) var ( @@ -179,6 +181,7 @@ var ( initializeSQLFile = flag.String(nmInitializeSQLFile, "", "SQL file to execute on first bootstrap") disconnectOnExpiredPassword = flagBoolean(nmDisconnectOnExpiredPassword, true, "the server disconnects the client when the password is expired") keyspaceName = flag.String(nmKeyspaceName, "", "keyspace name.") + serviceScope = flag.String(nmTiDBServiceScope, "", "tidb service scope") ) func main() { @@ -592,6 +595,17 @@ func overrideConfig(cfg *config.Config) { if actualFlags[nmKeyspaceName] { cfg.KeyspaceName = *keyspaceName } + + if actualFlags[nmTiDBServiceScope] { + scope, ok := distroleutil.ToTiDBServiceScope(*serviceScope) + if !ok { + err := fmt.Errorf("incorrect value: `%s`. %s options: %s", + *serviceScope, + nmTiDBServiceScope, `"", background`) + terror.MustNil(err) + } + cfg.Instance.TiDBServiceScope = scope + } } func setVersions() { @@ -773,6 +787,10 @@ func setGlobalVars() { txninfo.Recorder.ResizeSummaries(cfg.TrxSummary.TransactionSummaryCapacity) txninfo.Recorder.SetMinDuration(time.Duration(cfg.TrxSummary.TransactionIDDigestMinDuration) * time.Millisecond) chunk.InitChunkAllocSize(cfg.TiDBMaxReuseChunk, cfg.TiDBMaxReuseColumn) + + if len(cfg.Instance.TiDBServiceScope) > 0 { + variable.ServiceScope.Store(strings.ToLower(cfg.Instance.TiDBServiceScope)) + } } func setupLog() { diff --git a/util/distrole/BUILD.bazel b/util/distrole/BUILD.bazel new file mode 100644 index 0000000000..6282e64685 --- /dev/null +++ b/util/distrole/BUILD.bazel @@ -0,0 +1,8 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "distrole", + srcs = ["role.go"], + importpath = "github.com/pingcap/tidb/util/distrole", + visibility = ["//visibility:public"], +) diff --git a/util/distrole/role.go b/util/distrole/role.go new file mode 100644 index 0000000000..643580220c --- /dev/null +++ b/util/distrole/role.go @@ -0,0 +1,37 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package distroleutil
+
+import "strings"
+
+const (
+	// TiDBServiceScopeDefault indicates no role.
+	TiDBServiceScopeDefault = ""
+	// TiDBServiceScopeBackground indicates that the node can run background tasks in the distributed execution framework.
+	TiDBServiceScopeBackground = "background"
+)
+
+// ToTiDBServiceScope returns the TiDB service scope for the given name.
+func ToTiDBServiceScope(name string) (string, bool) {
+	name = strings.ToLower(name)
+	switch name {
+	case TiDBServiceScopeDefault:
+		return TiDBServiceScopeDefault, true
+	case TiDBServiceScopeBackground:
+		return TiDBServiceScopeBackground, true
+	default:
+		return "", false
+	}
+}
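
A note for readers tracing the new dispatch path end to end: GetEligibleInstances still returns every live TiDB server, and filterByRole then intersects that list with the hosts registered in mysql.dist_framework_meta, preferring nodes whose role is "background" and falling back to default-scope ("") nodes only when no background node is registered. The standalone sketch below restates that rule outside TiDB; ServerInfo and execID are simplified stand-ins for infosync.ServerInfo and disttaskutil.GenerateExecID (assumed here to join host and port as "ip:port", matching the host column written by StartManager), and the meta map stands in for the system table.

package main

import "fmt"

// ServerInfo is a simplified stand-in for infosync.ServerInfo.
type ServerInfo struct {
	IP   string
	Port uint
}

// execID joins host and port; the result is assumed to match the host
// column of mysql.dist_framework_meta, as GenerateExecID does in the patch.
func execID(ip string, port uint) string {
	return fmt.Sprintf("%s:%d", ip, port)
}

// filterByRole restates the dispatcher's rule: prefer registered
// "background" nodes; if none exist, fall back to default-scope ("")
// nodes; then keep only live servers present in the chosen set.
func filterByRole(infos []*ServerInfo, meta map[string]string) []*ServerInfo {
	pick := func(role string) map[string]bool {
		nodes := make(map[string]bool)
		for host, r := range meta {
			if r == role {
				nodes[host] = true
			}
		}
		return nodes
	}
	nodes := pick("background")
	if len(nodes) == 0 {
		nodes = pick("")
	}
	res := make([]*ServerInfo, 0, len(nodes))
	for _, info := range infos {
		if nodes[execID(info.IP, info.Port)] {
			res = append(res, info)
		}
	}
	return res
}

func main() {
	live := []*ServerInfo{{Port: 4000}, {Port: 4001}, {Port: 4002}}
	meta := map[string]string{":4000": "background", ":4001": "", ":4002": "background"}
	for _, n := range filterByRole(live, meta) {
		fmt.Println(execID(n.IP, n.Port)) // prints :4000 and :4002 (order not guaranteed)
	}
}

Once any node carries the "background" role, default-scope nodes such as :4001 stop receiving subtasks, which is exactly what step 3 of TestFrameworkSetLabel asserts.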
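The same metadata drives three statements in the patch; restated here as plain SQL with illustrative literal values in place of the %? placeholders (":4000" and "background" are example values, not fixed names):

-- StartManager: executed with retry when the scheduler manager starts;
-- the upsert keyed on host lets a restarted node re-register with a new role.
INSERT INTO mysql.dist_framework_meta VALUES (':4000', 'background', DEFAULT)
ON DUPLICATE KEY UPDATE role = 'background';

-- SET GLOBAL tidb_service_scope = 'background' also persists the change for
-- the local node (executor/set.go), besides updating the ServiceScope atomic.
UPDATE mysql.dist_framework_meta SET role = 'background' WHERE host = ':4000';

-- GetNodesByRole: the dispatcher reads candidate hosts back by role.
SELECT host FROM mysql.dist_framework_meta WHERE role = 'background';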