@@ -547,6 +547,8 @@ type Instance struct {
MaxConnections uint32 `toml:"max_connections" json:"max_connections"`
TiDBEnableDDL AtomicBool `toml:"tidb_enable_ddl" json:"tidb_enable_ddl"`
TiDBRCReadCheckTS bool `toml:"tidb_rc_read_check_ts" json:"tidb_rc_read_check_ts"`
// TiDBServiceScope indicates the role for tidb for distributed task framework.
TiDBServiceScope string `toml:"tidb_service_scope" json:"tidb_service_scope"`
}

func (l *Log) getDisableTimestamp() bool {

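The new instance-level field is consumed through the global config elsewhere in this change (for example by the scheduler manager further down). A minimal sketch of that access pattern, assuming only the `config` package path already used in this repository:

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/config"
)

func main() {
	// The instance-level service scope defaults to "" and can become
	// "background" via the config file, the --tidb-service-scope flag,
	// or SET GLOBAL tidb_service_scope.
	scope := config.GetGlobalConfig().Instance.TiDBServiceScope
	fmt.Println("tidb_service_scope:", scope)
}
```
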
@@ -957,6 +959,7 @@ var defaultConf = Config{
MaxConnections: 0,
TiDBEnableDDL: *NewAtomicBool(true),
TiDBRCReadCheckTS: false,
TiDBServiceScope: "",
},
Status: Status{
ReportStatus: true,

@@ -11,7 +11,7 @@ go_test(
],
flaky = True,
race = "off",
-shard_count = 23,
+shard_count = 24,
deps = [
"//disttask/framework/dispatcher",
"//disttask/framework/mock",

@@ -405,6 +405,14 @@ func (d *BaseDispatcher) dispatchSubTask(task *proto.Task, metas [][]byte) error
if err != nil {
return err
}
// 4. filter by role.
serverNodes, err = d.filterByRole(serverNodes)
if err != nil {
return err
}

logutil.Logger(d.logCtx).Info("eligible instances", zap.Int("num", len(serverNodes)))

if len(serverNodes) == 0 {
return errors.New("no available TiDB node to dispatch subtasks")
}

@@ -456,6 +464,30 @@ func GenerateSchedulerNodes(ctx context.Context) (serverNodes []*infosync.Server
return serverNodes, nil
}

func (d *BaseDispatcher) filterByRole(infos []*infosync.ServerInfo) ([]*infosync.ServerInfo, error) {
nodes, err := d.taskMgr.GetNodesByRole("background")
if err != nil {
return nil, err
}

if len(nodes) == 0 {
nodes, err = d.taskMgr.GetNodesByRole("")
}

if err != nil {
return nil, err
}

res := make([]*infosync.ServerInfo, 0, len(nodes))
for _, info := range infos {
_, ok := nodes[disttaskutil.GenerateExecID(info.IP, info.Port)]
if ok {
res = append(res, info)
}
}
return res, nil
}

// GetAllSchedulerIDs gets all the scheduler IDs.
func (d *BaseDispatcher) GetAllSchedulerIDs(ctx context.Context, task *proto.Task) ([]string, error) {
serverInfos, err := d.GetEligibleInstances(ctx, task)

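The filter above prefers nodes registered with the "background" role and only falls back to unscoped nodes when no background node is registered. A small self-contained sketch of that selection rule, with plain maps and host strings standing in for the task manager and server-info types (the helper name is illustrative, not part of the PR):

```go
package main

import "fmt"

// pickByRole mirrors the fallback in filterByRole: use hosts registered as
// "background" when any exist, otherwise use hosts registered with no role.
func pickByRole(background, unscoped map[string]bool, candidates []string) []string {
	nodes := background
	if len(nodes) == 0 {
		nodes = unscoped
	}
	res := make([]string, 0, len(candidates))
	for _, host := range candidates {
		if nodes[host] {
			res = append(res, host)
		}
	}
	return res
}

func main() {
	background := map[string]bool{":4000": true}
	unscoped := map[string]bool{":4001": true}
	fmt.Println(pickByRole(background, unscoped, []string{":4000", ":4001"})) // [:4000]
	fmt.Println(pickByRole(nil, unscoped, []string{":4000", ":4001"}))        // [:4001]
}
```
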
@@ -226,6 +226,9 @@ func checkDispatch(t *testing.T, taskCnt int, isSucc bool, isCancel bool) {
dispatcher.DefaultDispatchConcurrency = originalConcurrency
}
}()

require.NoError(t, mgr.StartManager(":4000", "background"))

// 3s
cnt := 60
checkGetRunningTaskCnt := func(expected int) {

@@ -586,6 +586,34 @@ func TestSchedulerDownManyNodes(t *testing.T) {
distContext.Close()
}

func TestFrameworkSetLabel(t *testing.T) {
var m sync.Map
ctrl := gomock.NewController(t)
defer ctrl.Finish()
RegisterTaskMeta(t, ctrl, &m, &testDispatcherExt{})
distContext := testkit.NewDistExecutionContext(t, 3)
tk := testkit.NewTestKit(t, distContext.Store)
// 1. all "" role.
DispatchTaskAndCheckSuccess("😁", t, &m)
// 2. one "background" role.
tk.MustExec("set global tidb_service_scope=background")
tk.MustQuery("select @@global.tidb_service_scope").Check(testkit.Rows("background"))
tk.MustQuery("select @@tidb_service_scope").Check(testkit.Rows("background"))
DispatchTaskAndCheckSuccess("😊", t, &m)
// 3. 2 "background" role.
tk.MustExec("update mysql.dist_framework_meta set role = \"background\" where host = \":4001\"")
DispatchTaskAndCheckSuccess("😆", t, &m)

// 4. set wrong sys var.
tk.MustMatchErrMsg("set global tidb_service_scope=wrong", `incorrect value: .*. tidb_service_scope options: "", background`)

// 5. set keyspace id.
tk.MustExec("update mysql.dist_framework_meta set keyspace_id = 16777216 where host = \":4001\"")
tk.MustQuery("select keyspace_id from mysql.dist_framework_meta where host = \":4001\"").Check(testkit.Rows("16777216"))

distContext.Close()
}

func TestMultiTasks(t *testing.T) {
defer dispatcher.ClearDispatcherFactory()
defer scheduler.ClearSchedulers()

@@ -139,6 +139,20 @@ func (mr *MockTaskTableMockRecorder) IsSchedulerCanceled(arg0, arg1 interface{})
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsSchedulerCanceled", reflect.TypeOf((*MockTaskTable)(nil).IsSchedulerCanceled), arg0, arg1)
}

// StartManager mocks base method.
func (m *MockTaskTable) StartManager(arg0, arg1 string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StartManager", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}

// StartManager indicates an expected call of StartManager.
func (mr *MockTaskTableMockRecorder) StartManager(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartManager", reflect.TypeOf((*MockTaskTable)(nil).StartManager), arg0, arg1)
}

// StartSubtask mocks base method.
func (m *MockTaskTable) StartSubtask(arg0 int64) error {
m.ctrl.T.Helper()

@@ -11,6 +11,7 @@ go_library(
importpath = "github.com/pingcap/tidb/disttask/framework/scheduler",
visibility = ["//visibility:public"],
deps = [
"//config",
"//disttask/framework/proto",
"//disttask/framework/scheduler/execute",
"//disttask/framework/storage",

@@ -27,9 +27,11 @@ type TaskTable interface {
GetGlobalTaskByID(taskID int64) (task *proto.Task, err error)

GetSubtaskInStates(instanceID string, taskID int64, step int64, states ...interface{}) (*proto.Subtask, error)
StartManager(tidbID string, role string) error
StartSubtask(subtaskID int64) error
UpdateSubtaskStateAndError(subtaskID int64, state string, err error) error
FinishSubtask(subtaskID int64, meta []byte) error

HasSubtasksInStates(instanceID string, taskID int64, step int64, states ...interface{}) (bool, error)
UpdateErrorToSubtask(instanceID string, taskID int64, err error) error
IsSchedulerCanceled(taskID int64, instanceID string) (bool, error)

@@ -22,6 +22,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/resourcemanager/pool/spool"
@@ -34,7 +35,9 @@ var (
schedulerPoolSize int32 = 4
subtaskExecutorPoolSize int32 = 10
// same as dispatcher
checkTime = 300 * time.Millisecond
retrySQLTimes = 3
retrySQLInterval = 500 * time.Millisecond
)

// ManagerBuilder is used to build a Manager.

@@ -111,8 +114,24 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable
}

// Start starts the Manager.
-func (m *Manager) Start() {
+func (m *Manager) Start() error {
logutil.Logger(m.logCtx).Debug("manager start")
var err error
for i := 0; i < retrySQLTimes; i++ {
err = m.taskTable.StartManager(m.id, config.GetGlobalConfig().Instance.TiDBServiceScope)
if err == nil {
break
}
if i%10 == 0 {
logutil.Logger(m.logCtx).Warn("start manager failed", zap.String("scope", config.GetGlobalConfig().Instance.TiDBServiceScope),
zap.Int("retry times", retrySQLTimes), zap.Error(err))
}
time.Sleep(retrySQLInterval)
}
if err != nil {
return err
}

m.wg.Add(1)
go func() {
defer m.wg.Done()
@@ -124,6 +143,7 @@ func (m *Manager) Start() {
defer m.wg.Done()
m.fetchAndFastCancelTasks(m.ctx)
}()
return nil
}

// Stop stops the Manager.

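Start now registers the node's scope in dist_framework_meta before launching its worker goroutines, retrying the insert a few times and returning the last error if every attempt fails. A stripped-down sketch of that retry shape, as a generic helper (names here are illustrative, not part of the PR):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retry calls fn up to attempts times, sleeping between failures, and
// returns the last error if every attempt fails.
func retry(attempts int, interval time.Duration, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		time.Sleep(interval)
	}
	return err
}

func main() {
	calls := 0
	err := retry(3, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("meta table not ready")
		}
		return nil // succeeds on the third attempt
	})
	fmt.Println(calls, err) // 3 <nil>
}
```
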
@@ -177,7 +177,7 @@ func TestManager(t *testing.T) {
taskID2 := int64(2)
task1 := &proto.Task{ID: taskID1, State: proto.TaskStateRunning, Step: proto.StepOne, Type: "type"}
task2 := &proto.Task{ID: taskID2, State: proto.TaskStateReverting, Step: proto.StepOne, Type: "type"}

mockTaskTable.EXPECT().StartManager("test", "").Return(nil).Times(1)
mockTaskTable.EXPECT().GetGlobalTasksInStates(proto.TaskStateRunning, proto.TaskStateReverting).
Return([]*proto.Task{task1, task2}, nil).AnyTimes()
mockTaskTable.EXPECT().GetGlobalTasksInStates(proto.TaskStateReverting).
@@ -215,7 +215,7 @@ func TestManager(t *testing.T) {
}).Times(2)
m, err := b.BuildManager(context.Background(), id, mockTaskTable)
require.NoError(t, err)
-m.Start()
+require.NoError(t, m.Start())
time.Sleep(5 * time.Second)
m.Stop()
}

@@ -27,7 +27,7 @@ go_test(
srcs = ["table_test.go"],
flaky = True,
race = "on",
-shard_count = 3,
+shard_count = 4,
deps = [
":storage",
"//disttask/framework/proto",

@@ -416,3 +416,33 @@ func TestBothGlobalAndSubTaskTable(t *testing.T) {
require.NoError(t, err)
require.Equal(t, int64(0), cnt)
}

func TestDistFrameworkMeta(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
pool := pools.NewResourcePool(func() (pools.Resource, error) {
return tk.Session(), nil
}, 1, 1, time.Second)
defer pool.Close()
sm := storage.NewTaskManager(context.Background(), pool)

storage.SetTaskManager(sm)
sm, err := storage.GetTaskManager()
require.NoError(t, err)

require.NoError(t, sm.StartManager(":4000", "background"))
require.NoError(t, sm.StartManager(":4001", ""))
require.NoError(t, sm.StartManager(":4002", "background"))
nodes, err := sm.GetNodesByRole("background")
require.NoError(t, err)
require.Equal(t, map[string]bool{
":4000": true,
":4002": true,
}, nodes)

nodes, err = sm.GetNodesByRole("")
require.NoError(t, err)
require.Equal(t, map[string]bool{
":4001": true,
}, nodes)
}

@@ -477,6 +477,13 @@ func (stm *TaskManager) StartSubtask(subtaskID int64) error {
return err
}

// StartManager insert the manager information into dist_framework_meta.
func (stm *TaskManager) StartManager(tidbID string, role string) error {
_, err := stm.executeSQLWithNewSession(stm.ctx, `insert into mysql.dist_framework_meta values(%?, %?, DEFAULT)
on duplicate key update role = %?`, tidbID, role, role)
return err
}

// UpdateSubtaskStateAndError updates the subtask state.
func (stm *TaskManager) UpdateSubtaskStateAndError(id int64, state string, subTaskErr error) error {
_, err := stm.executeSQLWithNewSession(stm.ctx, `update mysql.tidb_background_subtask

@@ -661,7 +668,7 @@ func serializeErr(err error) []byte {
return errBytes
}

-// CancelGlobalTask cancels global task
+// CancelGlobalTask cancels global task.
func (stm *TaskManager) CancelGlobalTask(taskID int64) error {
_, err := stm.executeSQLWithNewSession(stm.ctx, "update mysql.tidb_global_task set state=%? where id=%? and state in (%?, %?)",
proto.TaskStateCancelling, taskID, proto.TaskStatePending, proto.TaskStateRunning,
@@ -669,14 +676,14 @@ func (stm *TaskManager) CancelGlobalTask(taskID int64) error {
return err
}

-// CancelGlobalTaskByKeySession cancels global task by key using input session
+// CancelGlobalTaskByKeySession cancels global task by key using input session.
func (stm *TaskManager) CancelGlobalTaskByKeySession(se sessionctx.Context, taskKey string) error {
_, err := ExecSQL(stm.ctx, se, "update mysql.tidb_global_task set state=%? where task_key=%? and state in (%?, %?)",
proto.TaskStateCancelling, taskKey, proto.TaskStatePending, proto.TaskStateRunning)
return err
}

-// IsGlobalTaskCancelling checks whether the task state is cancelling
+// IsGlobalTaskCancelling checks whether the task state is cancelling.
func (stm *TaskManager) IsGlobalTaskCancelling(taskID int64) (bool, error) {
rs, err := stm.executeSQLWithNewSession(stm.ctx, "select 1 from mysql.tidb_global_task where id=%? and state = %?",
taskID, proto.TaskStateCancelling,
@@ -689,7 +696,7 @@ func (stm *TaskManager) IsGlobalTaskCancelling(taskID int64) (bool, error) {
return len(rs) > 0, nil
}

-// GetSubtasksByStep gets subtasks of global task by step
+// GetSubtasksByStep gets subtasks of global task by step.
func (stm *TaskManager) GetSubtasksByStep(taskID, step int64) ([]*proto.Subtask, error) {
rs, err := stm.executeSQLWithNewSession(stm.ctx,
"select * from mysql.tidb_background_subtask where task_key = %? and step = %?",
@@ -706,3 +713,17 @@ func (stm *TaskManager) GetSubtasksByStep(taskID, step int64) ([]*proto.Subtask,
}
return subtasks, nil
}

// GetNodesByRole gets nodes map from dist_framework_meta by role.
func (stm *TaskManager) GetNodesByRole(role string) (map[string]bool, error) {
rs, err := stm.executeSQLWithNewSession(stm.ctx,
"select host from mysql.dist_framework_meta where role = %?", role)
if err != nil {
return nil, err
}
nodes := make(map[string]bool, len(rs))
for _, r := range rs {
nodes[r.GetString(0)] = true
}
return nodes, nil
}

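Together, StartManager and GetNodesByRole give the framework a small node registry: each TiDB node upserts its host and role, and dispatchers read the registry back as a host set. A minimal in-memory sketch of those semantics, using nothing beyond plain maps (type and method names are illustrative):

```go
package main

import "fmt"

// registry mimics mysql.dist_framework_meta keyed by host.
type registry map[string]string // host -> role

// startManager mirrors the INSERT ... ON DUPLICATE KEY UPDATE role semantics.
func (r registry) startManager(host, role string) { r[host] = role }

// nodesByRole mirrors GetNodesByRole: the set of hosts registered with the given role.
func (r registry) nodesByRole(role string) map[string]bool {
	nodes := make(map[string]bool)
	for host, got := range r {
		if got == role {
			nodes[host] = true
		}
	}
	return nodes
}

func main() {
	r := registry{}
	r.startManager(":4000", "background")
	r.startManager(":4001", "")
	r.startManager(":4001", "background") // re-registering updates the role
	fmt.Println(r.nodesByRole("background")) // map[:4000:true :4001:true]
	fmt.Println(r.nodesByRole(""))           // map[]
}
```
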
@@ -1470,7 +1470,11 @@ func (do *Domain) InitDistTaskLoop(ctx context.Context) error {
}

func (do *Domain) distTaskFrameworkLoop(ctx context.Context, taskManager *storage.TaskManager, schedulerManager *scheduler.Manager, serverID string) {
-schedulerManager.Start()
+err := schedulerManager.Start()
if err != nil {
logutil.BgLogger().Error("dist task scheduler failed", zap.Error(err))
return
}
logutil.BgLogger().Info("dist task scheduler started")
defer func() {
logutil.BgLogger().Info("stopping dist task scheduler")
@@ -1486,7 +1490,7 @@ func (do *Domain) distTaskFrameworkLoop(ctx context.Context, taskManager *storag
var err error
dispatcherManager, err = dispatcher.NewManager(ctx, taskManager, serverID)
if err != nil {
-logutil.BgLogger().Error("failed to create a disttask dispatcher", zap.Error(err))
+logutil.BgLogger().Error("failed to create a dist task dispatcher", zap.Error(err))
return
}
dispatcherManager.Start()

@@ -192,6 +192,7 @@ go_library(
"//util/dbterror/exeerrors",
"//util/deadlockhistory",
"//util/disk",
"//util/disttask",
"//util/etcd",
"//util/execdetails",
"//util/filter",

@@ -373,7 +373,7 @@ func TestTableStorageStats(t *testing.T) {
"test 2",
))
rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()
-result := 53
+result := 54
require.Len(t, rows, result)

// More tests about the privileges.

@@ -34,9 +34,11 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/dbterror/exeerrors"
disttaskutil "github.com/pingcap/tidb/util/disttask"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sem"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)

@@ -160,6 +162,14 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres
return nil
})
logutil.BgLogger().Info("set global var", zap.Uint64("conn", sessionVars.ConnectionID), zap.String("name", name), zap.String("val", valStr))
if name == variable.TiDBServiceScope {
dom := domain.GetDomain(e.Ctx())
serverID := disttaskutil.GenerateSubtaskExecID(ctx, dom.DDL().GetID())
_, err = e.Ctx().(sqlexec.SQLExecutor).ExecuteInternal(ctx,
`update mysql.dist_framework_meta
set role = %?
where host = %?`, valStr, serverID)
}
return err
}
// Set session variable

@@ -582,6 +582,11 @@ const (
key(state),
UNIQUE KEY task_key(task_key)
);`
// CreateDistFrameworkMeta create a system table that distributed task framework use to store meta information
CreateDistFrameworkMeta = `CREATE TABLE IF NOT EXISTS mysql.dist_framework_meta (
host VARCHAR(100) NOT NULL PRIMARY KEY,
role VARCHAR(64),
keyspace_id bigint(8) NOT NULL DEFAULT -1);`

// CreateLoadDataJobs is a table that LOAD DATA uses
CreateLoadDataJobs = `CREATE TABLE IF NOT EXISTS mysql.load_data_jobs (

@@ -2841,6 +2846,8 @@ func doDDLWorks(s Session) {
mustExecute(s, CreateTimers)
// create runaway_watch done
mustExecute(s, CreateDoneRunawayWatchTable)
// create dist_framework_meta
mustExecute(s, CreateDistFrameworkMeta)
}

// doBootstrapSQLFile executes SQL commands in a file as the last stage of bootstrap.
@@ -2962,7 +2969,6 @@ func doDMLWorks(s Session) {
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES(%?, %?, "Bootstrap version. Do not delete.")`,
mysql.SystemDB, mysql.TiDBTable, tidbServerVersionVar, currentBootstrapVersion,
)

writeSystemTZ(s)

writeNewCollationParameter(s, config.GetGlobalConfig().NewCollationsEnabledOnFirstBootstrap)

@@ -45,6 +45,7 @@ go_library(
"//util/collate",
"//util/dbterror",
"//util/disk",
"//util/distrole",
"//util/execdetails",
"//util/gctuner",
"//util/kvcache",

@@ -40,6 +40,7 @@ import (
_ "github.com/pingcap/tidb/types/parser_driver" // for parser driver
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/collate"
distroleutil "github.com/pingcap/tidb/util/distrole"
"github.com/pingcap/tidb/util/gctuner"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
@@ -2831,6 +2832,23 @@ var defaultSysVars = []*SysVar{
return nil
},
},
{Scope: ScopeInstance, Name: TiDBServiceScope, Value: "", Type: TypeStr,
Validation: func(_ *SessionVars, normalizedValue string, originalValue string, _ ScopeFlag) (string, error) {
_, ok := distroleutil.ToTiDBServiceScope(originalValue)
if !ok {
err := fmt.Errorf("incorrect value: `%s`. %s options: %s",
originalValue,
TiDBServiceScope, `"", background`)
return normalizedValue, err
}
return normalizedValue, nil
},
SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
ServiceScope.Store(strings.ToLower(s))
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
return ServiceScope.Load(), nil
}},
}

func setTiFlashComputeDispatchPolicy(s *SessionVars, val string) error {

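The new variable is instance-scoped and backed by an atomic string rather than session state: SetGlobal lower-cases and stores the value, GetGlobal loads it. A tiny standalone sketch of that storage shape using go.uber.org/atomic, which the surrounding code already depends on (the local names here are illustrative):

```go
package main

import (
	"fmt"
	"strings"

	"go.uber.org/atomic"
)

// serviceScope mimics variable.ServiceScope: one canonical, lower-cased
// value shared by every session on this instance.
var serviceScope = atomic.NewString("")

func setScope(s string) { serviceScope.Store(strings.ToLower(s)) }
func getScope() string  { return serviceScope.Load() }

func main() {
	setScope("Background")
	fmt.Println(getScope()) // background
}
```
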
@@ -1095,6 +1095,8 @@ const (
TiDBSkipMissingPartitionStats = "tidb_skip_missing_partition_stats"
// TiDBSessionAlias indicates the alias of a session which is used for tracing.
TiDBSessionAlias = "tidb_session_alias"
// TiDBServiceScope indicates the role for tidb for distributed task framework.
TiDBServiceScope = "tidb_service_scope"
)

// TiDB intentional limits
@@ -1497,6 +1499,7 @@ var (
EnableResourceControl = atomic.NewBool(false)
EnableCheckConstraint = atomic.NewBool(DefTiDBEnableCheckConstraint)
SkipMissingPartitionStats = atomic.NewBool(DefTiDBSkipMissingPartitionStats)
ServiceScope = atomic.NewString("")
)

var (

@@ -40,6 +40,7 @@ go_library(
"//util/cpuprofile",
"//util/deadlockhistory",
"//util/disk",
"//util/distrole",
"//util/domainutil",
"//util/kvcache",
"//util/logutil",

@@ -64,6 +64,7 @@ import (
"github.com/pingcap/tidb/util/cpuprofile"
"github.com/pingcap/tidb/util/deadlockhistory"
"github.com/pingcap/tidb/util/disk"
distroleutil "github.com/pingcap/tidb/util/distrole"
"github.com/pingcap/tidb/util/domainutil"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/logutil"
@@ -129,6 +130,7 @@ const (
nmInitializeSQLFile = "initialize-sql-file"
nmDisconnectOnExpiredPassword = "disconnect-on-expired-password"
nmKeyspaceName = "keyspace-name"
nmTiDBServiceScope = "tidb-service-scope"
)

var (
@@ -179,6 +181,7 @@ var (
initializeSQLFile = flag.String(nmInitializeSQLFile, "", "SQL file to execute on first bootstrap")
disconnectOnExpiredPassword = flagBoolean(nmDisconnectOnExpiredPassword, true, "the server disconnects the client when the password is expired")
keyspaceName = flag.String(nmKeyspaceName, "", "keyspace name.")
serviceScope = flag.String(nmTiDBServiceScope, "", "tidb service scope")
)

func main() {

@@ -592,6 +595,17 @@ func overrideConfig(cfg *config.Config) {
if actualFlags[nmKeyspaceName] {
cfg.KeyspaceName = *keyspaceName
}

if actualFlags[nmTiDBServiceScope] {
scope, ok := distroleutil.ToTiDBServiceScope(*serviceScope)
if !ok {
err := fmt.Errorf("incorrect value: `%s`. %s options: %s",
*serviceScope,
nmTiDBServiceScope, `"", background`)
terror.MustNil(err)
}
cfg.Instance.TiDBServiceScope = scope
}
}

func setVersions() {
@@ -773,6 +787,10 @@ func setGlobalVars() {
txninfo.Recorder.ResizeSummaries(cfg.TrxSummary.TransactionSummaryCapacity)
txninfo.Recorder.SetMinDuration(time.Duration(cfg.TrxSummary.TransactionIDDigestMinDuration) * time.Millisecond)
chunk.InitChunkAllocSize(cfg.TiDBMaxReuseChunk, cfg.TiDBMaxReuseColumn)

if len(cfg.Instance.TiDBServiceScope) > 0 {
variable.ServiceScope.Store(strings.ToLower(cfg.Instance.TiDBServiceScope))
}
}

func setupLog() {

util/distrole/BUILD.bazel (new file)
@@ -0,0 +1,8 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "distrole",
srcs = ["role.go"],
importpath = "github.com/pingcap/tidb/util/distrole",
visibility = ["//visibility:public"],
)

util/distrole/role.go (new file)
@@ -0,0 +1,37 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package distroleutil

import "strings"

const (
// TiDBServiceScopeDefault indicates no role.
TiDBServiceScopeDefault = ""
// TiDBServiceScopeBackground indicates node can run background tasks in distributed execution framework.
TiDBServiceScopeBackground = "background"
)

// ToTiDBServiceScope returns the TiDBServiceScope from name
func ToTiDBServiceScope(name string) (string, bool) {
name = strings.ToLower(name)
switch name {
case TiDBServiceScopeDefault:
return TiDBServiceScopeDefault, true
case TiDBServiceScopeBackground:
return TiDBServiceScopeBackground, true
default:
return "", false
}
}

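ToTiDBServiceScope lower-cases its input and accepts only the two known scopes; both the sysvar validation and the --tidb-service-scope flag rely on it. A short usage sketch (illustrative only, built against the new package added by this change):

```go
package main

import (
	"fmt"

	distroleutil "github.com/pingcap/tidb/util/distrole"
)

func main() {
	for _, in := range []string{"", "Background", "BACKGROUND", "wrong"} {
		scope, ok := distroleutil.ToTiDBServiceScope(in)
		// "" and any casing of "background" normalize to a valid scope;
		// anything else is rejected and surfaces the "incorrect value" error.
		fmt.Printf("%q -> %q %v\n", in, scope, ok)
	}
}
```
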