// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
|
|
|
|
package proto
|
|
|
|
import (
|
|
"cmp"
|
|
"fmt"
|
|
"time"
|
|
)
|
|
|
|
// see doc.go for more details.
|
|
const (
|
|
TaskStatePending TaskState = "pending"
|
|
TaskStateRunning TaskState = "running"
|
|
TaskStateSucceed TaskState = "succeed"
|
|
TaskStateFailed TaskState = "failed"
|
|
TaskStateReverting TaskState = "reverting"
|
|
TaskStateAwaitingResolution TaskState = "awaiting-resolution"
|
|
TaskStateReverted TaskState = "reverted"
|
|
TaskStateCancelling TaskState = "cancelling"
|
|
TaskStatePausing TaskState = "pausing"
|
|
TaskStatePaused TaskState = "paused"
|
|
TaskStateResuming TaskState = "resuming"
|
|
TaskStateModifying TaskState = "modifying"
|
|
)
|
|
|
|
type (
	// TaskState is the state of a task, its valid values are the
	// TaskStateXXX constants.
	TaskState string
	// TaskType is the type of a task.
	TaskType string
)
|
|
|
|
func (t TaskType) String() string {
|
|
return string(t)
|
|
}
|
|
|
|
func (s TaskState) String() string {
|
|
return string(s)
|
|
}
|
|
|
|
// CanMoveToModifying checks if current state can move to 'modifying' state.
|
|
func (s TaskState) CanMoveToModifying() bool {
|
|
return s == TaskStatePending || s == TaskStateRunning || s == TaskStatePaused
|
|
}
|
|
|
|
const (
	// TaskIDLabelName is the label name of task id.
	TaskIDLabelName = "task_id"
	// NormalPriority represents the normal priority of task, it's also the
	// default value of TaskBase.Priority.
	NormalPriority = 512
)
|
|
|
|
// MaxConcurrentTask is the max concurrency of task.
// TODO: remove this limit later.
var MaxConcurrentTask = 16
|
|
|
|
// ExtraParams is the extra params of task.
|
|
// Note: only store params that's not used for filter or sort in this struct.
|
|
type ExtraParams struct {
|
|
// ManualRecovery indicates whether the task can be recovered manually.
|
|
// if enabled, the task will enter 'awaiting-resolution' state when it failed,
|
|
// then the user can recover the task manually or fail it if it's not recoverable.
|
|
ManualRecovery bool `json:"manual_recovery,omitempty"`
|
|
// MaxRuntimeSlots is the max slots when running subtasks of this task in
|
|
// TargetSteps steps.
|
|
// normally it's 0, means we will use the RequiredSlots to run the subtasks.
|
|
// if set, we will use the min of RequiredSlots and MaxRuntimeSlots as the
|
|
// execution effective slots of the task step defined in TargetSteps.
|
|
// this field is used to workaround OOM issue where TiDB might repeatedly
|
|
// restart. the DXF framework won't detect changes in this field, so it's not
|
|
// part of normal schedule workflow, when TiDB restarts the newest value will
|
|
// be used.
|
|
// RequiredSlots might be modified, and MaxRuntimeSlots is not touched in this
|
|
// case due to above reason, so MaxRuntimeSlots might > RequiredSlots.
|
|
MaxRuntimeSlots int `json:"max_runtime_slots,omitempty"`
|
|
// TargetSteps indicates the steps that MaxRuntimeSlots takes effect.
|
|
// if empty or nil, MaxRuntimeSlots takes effect in all steps.
|
|
// normally OOM only happens in some specific steps, so we can just limit the
|
|
// concurrency in those steps to reduce the impact on the overall performance.
|
|
TargetSteps []Step `json:"target_steps,omitempty"`
|
|
}
|
|
|
|
// TaskBase contains the basic information of a task.
|
|
// we define this to avoid load task meta which might be very large into memory.
|
|
type TaskBase struct {
|
|
ID int64
|
|
Key string
|
|
Type TaskType
|
|
State TaskState
|
|
Step Step
|
|
// Priority is the priority of task, the smaller value means the higher priority.
|
|
// valid range is [1, 1024], default is NormalPriority.
|
|
Priority int
|
|
// RequiredSlots is the required slots of the task.
|
|
// we use this field to allocate slots when scheduling and creating the task
|
|
// executor, but the effective slots when running the task is determined by
|
|
// GetRuntimeSlots.
|
|
// in normal case, they are the same. but when meeting OOM and TiDB repeatedly
|
|
// restarts, we might set a lower MaxRuntimeSlots in ExtraParams, then the
|
|
// effective slots is smaller than RequiredSlots.
|
|
// Note: in application layer, don't use this field directly, use GetRuntimeSlots
|
|
// or GetResource of step executor instead.
|
|
// Note: in the system table, we store it inside 'concurrency' column as
|
|
// required slots is introduced later.
|
|
RequiredSlots int
|
|
// TargetScope indicates that the task should be running on tidb nodes which
|
|
// contain the tidb_service_scope=TargetScope label.
|
|
// To be compatible with previous version, if it's "" or "background", the
|
|
// task try run on nodes of "background" scope,
|
|
// if there is no such nodes, will try nodes of "" scope.
|
|
TargetScope string
|
|
CreateTime time.Time
|
|
MaxNodeCount int
|
|
ExtraParams ExtraParams
|
|
// keyspace name is the keyspace that the task belongs to.
|
|
// it's only useful for nextgen cluster.
|
|
Keyspace string
|
|
}
|
|
|
|
// IsDone checks if the task is done.
|
|
func (t *TaskBase) IsDone() bool {
|
|
return t.State == TaskStateSucceed || t.State == TaskStateReverted ||
|
|
t.State == TaskStateFailed
|
|
}
|
|
|
|
// CompareTask a wrapper of Compare.
|
|
func (t *TaskBase) CompareTask(other *Task) int {
|
|
return t.Compare(&other.TaskBase)
|
|
}
|
|
|
|
// Compare compares two tasks by task rank.
|
|
// returns < 0 represents rank of t is higher than 'other'.
|
|
func (t *TaskBase) Compare(other *TaskBase) int {
|
|
if r := cmp.Compare(t.Priority, other.Priority); r != 0 {
|
|
return r
|
|
}
|
|
if r := t.CreateTime.Compare(other.CreateTime); r != 0 {
|
|
return r
|
|
}
|
|
return cmp.Compare(t.ID, other.ID)
|
|
}
|
|
|
|
// GetRuntimeSlots gets the runtime slots of current task step.
|
|
// application layer might use this as the concurrency of the task step.
|
|
func (t *TaskBase) GetRuntimeSlots() int {
|
|
if t.ExtraParams.MaxRuntimeSlots > 0 {
|
|
if len(t.ExtraParams.TargetSteps) == 0 {
|
|
return min(t.ExtraParams.MaxRuntimeSlots, t.RequiredSlots)
|
|
}
|
|
for _, step := range t.ExtraParams.TargetSteps {
|
|
if step == t.Step {
|
|
return min(t.ExtraParams.MaxRuntimeSlots, t.RequiredSlots)
|
|
}
|
|
}
|
|
}
|
|
return t.RequiredSlots
|
|
}
|
|
|
|
// String implements fmt.Stringer interface.
|
|
func (t *TaskBase) String() string {
|
|
return fmt.Sprintf("{id: %d, key: %s, type: %s, state: %s, step: %s, priority: %d, required slots: %d, target scope: %s, create time: %s}",
|
|
t.ID, t.Key, t.Type, t.State, Step2Str(t.Type, t.Step), t.Priority, t.RequiredSlots, t.TargetScope, t.CreateTime.Format(time.RFC3339Nano))
|
|
}
|
|
|
|
// Task represents the task of distributed framework, see doc.go for more details.
|
|
type Task struct {
|
|
TaskBase
|
|
// SchedulerID is not used now.
|
|
SchedulerID string
|
|
StartTime time.Time
|
|
StateUpdateTime time.Time
|
|
// Meta is the metadata of task, it's read-only in most cases, but it can be
|
|
// changed in below case, and framework will update the task meta in the storage.
|
|
// - task switches to next step in Scheduler.OnNextSubtasksBatch
|
|
// - on task cleanup, we might do some redaction on the meta.
|
|
// - on task 'modifying', params inside the meta can be changed.
|
|
Meta []byte
|
|
Error error
|
|
ModifyParam ModifyParam
|
|
}
|
|
|
|
var (
	// EmptyMeta is the empty meta of task/subtask, i.e. an empty JSON object.
	EmptyMeta = []byte("{}")
)
|