169 lines
4.9 KiB
Go
169 lines
4.9 KiB
Go
// Copyright 2023 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package proto
|
|
|
|
import (
|
|
"fmt"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/docker/go-units"
|
|
)
|
|
|
|
// see doc.go for more details.
|
|
const (
|
|
SubtaskStatePending SubtaskState = "pending"
|
|
SubtaskStateRunning SubtaskState = "running"
|
|
SubtaskStateSucceed SubtaskState = "succeed"
|
|
SubtaskStateFailed SubtaskState = "failed"
|
|
SubtaskStateCanceled SubtaskState = "canceled"
|
|
SubtaskStatePaused SubtaskState = "paused"
|
|
)
|
|
|
|
type (
|
|
// SubtaskState is the state of subtask.
|
|
SubtaskState string
|
|
)
|
|
|
|
func (s SubtaskState) String() string {
|
|
return string(s)
|
|
}
|
|
|
|
// SubtaskBase contains the basic information of a subtask.
|
|
// we define this to avoid load subtask meta which might be very large into memory.
|
|
type SubtaskBase struct {
|
|
ID int64
|
|
Step Step
|
|
Type TaskType
|
|
// taken from task_key of the subtask table
|
|
TaskID int64
|
|
State SubtaskState
|
|
// Concurrency is the concurrency of the subtask.
|
|
// it's initialized as the task's required slots, and it's NOT used now.
|
|
// if the required slot of task is modified, the concurrency of un-finished
|
|
// subtasks of the task will be updated too.
|
|
// some subtasks like post-process of import into, don't consume too many resources,
|
|
// can lower this value, can use this field to implement such feature later.
|
|
Concurrency int
|
|
// ExecID is the ID of target executor, right now it's the same as instance_id,
|
|
// its value is IP:PORT, see GenerateExecID
|
|
ExecID string
|
|
CreateTime time.Time
|
|
// StartTime is the time when the subtask is started.
|
|
// it's 0 if it hasn't started yet.
|
|
StartTime time.Time
|
|
// Ordinal is the ordinal of subtask, should be unique for some task and step.
|
|
// starts from 1.
|
|
Ordinal int
|
|
}
|
|
|
|
func (t *SubtaskBase) String() string {
|
|
return fmt.Sprintf("[ID=%d, Step=%d, Type=%s, TaskID=%d, State=%s, ExecID=%s]",
|
|
t.ID, t.Step, t.Type, t.TaskID, t.State, t.ExecID)
|
|
}
|
|
|
|
// IsDone checks if the subtask is done.
|
|
func (t *SubtaskBase) IsDone() bool {
|
|
return t.State == SubtaskStateSucceed || t.State == SubtaskStateCanceled ||
|
|
t.State == SubtaskStateFailed
|
|
}
|
|
|
|
// Subtask represents the subtask of distribute framework.
|
|
// subtasks of a task are run in parallel on different nodes, but on each node,
|
|
// at most 1 subtask can be run at the same time, see StepExecutor too.
|
|
type Subtask struct {
|
|
SubtaskBase
|
|
// UpdateTime is the time when the subtask is updated.
|
|
// it can be used as subtask end time if the subtask is finished.
|
|
// it's 0 if it hasn't started yet.
|
|
UpdateTime time.Time
|
|
// Meta is the metadata of subtask, should not be nil.
|
|
// meta of different subtasks of same step must be different too.
|
|
// NOTE: this field can be changed by StepExecutor.OnFinished method, to store
|
|
// some result, and framework will update the subtask meta in the storage.
|
|
// On other code path, this field should be read-only.
|
|
Meta []byte
|
|
Summary string
|
|
}
|
|
|
|
// NewSubtask create a new subtask.
|
|
func NewSubtask(step Step, taskID int64, tp TaskType, execID string, concurrency int, meta []byte, ordinal int) *Subtask {
|
|
s := &Subtask{
|
|
SubtaskBase: SubtaskBase{
|
|
Step: step,
|
|
Type: tp,
|
|
TaskID: taskID,
|
|
ExecID: execID,
|
|
Concurrency: concurrency,
|
|
Ordinal: ordinal,
|
|
},
|
|
Meta: meta,
|
|
}
|
|
return s
|
|
}
|
|
|
|
// Allocatable is a resource with capacity that can be allocated, it's routine safe.
|
|
type Allocatable struct {
|
|
capacity int64
|
|
used atomic.Int64
|
|
}
|
|
|
|
// NewAllocatable creates a new Allocatable.
|
|
func NewAllocatable(capacity int64) *Allocatable {
|
|
return &Allocatable{capacity: capacity}
|
|
}
|
|
|
|
// Capacity returns the capacity of the Allocatable.
|
|
func (a *Allocatable) Capacity() int64 {
|
|
return a.capacity
|
|
}
|
|
|
|
// Used returns the used resource of the Allocatable.
|
|
func (a *Allocatable) Used() int64 {
|
|
return a.used.Load()
|
|
}
|
|
|
|
// Alloc allocates v from the Allocatable.
|
|
func (a *Allocatable) Alloc(n int64) bool {
|
|
for {
|
|
used := a.used.Load()
|
|
if used+n > a.capacity {
|
|
return false
|
|
}
|
|
if a.used.CompareAndSwap(used, used+n) {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
|
|
// Free frees v from the Allocatable.
|
|
func (a *Allocatable) Free(n int64) {
|
|
a.used.Add(-n)
|
|
}
|
|
|
|
// StepResource is the max resource that a task step can use.
|
|
// it's also the max resource that a subtask can use, as we run subtasks of task
|
|
// step in sequence.
|
|
type StepResource struct {
|
|
CPU *Allocatable
|
|
Mem *Allocatable
|
|
}
|
|
|
|
// String implements Stringer interface.
|
|
func (s *StepResource) String() string {
|
|
return fmt.Sprintf("[CPU=%d, Mem=%s]", s.CPU.Capacity(),
|
|
units.BytesSize(float64(s.Mem.Capacity())))
|
|
}
|