Files
tidb/pkg/resourcemanager/poolmanager/task_manager.go

123 lines
2.9 KiB
Go

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package poolmanager
import (
"sync"
"time"
"go.uber.org/atomic"
)
const shard int = 8
func getShardID(id uint64) uint64 {
return id % uint64(shard)
}
// Meta is a container that can control or watch the task in the pool.
type Meta struct {
createTS time.Time
exitCh chan struct{}
taskCh chan func()
taskID uint64
running atomic.Int32
initialConcurrency int32
}
// NewMeta create a new meta.
func NewMeta(taskID uint64, exitCh chan struct{}, taskCh chan func(), concurrency int32) *Meta {
s := &Meta{
createTS: time.Now(),
initialConcurrency: concurrency,
taskID: taskID,
exitCh: exitCh,
taskCh: taskCh,
}
return s
}
// TaskID is to get the task id.
func (m *Meta) TaskID() uint64 {
return m.taskID
}
// IncTask is to add running task count.
func (m *Meta) IncTask() {
m.running.Add(1)
}
// DecTask is to minus running task count.
func (m *Meta) DecTask() {
m.running.Add(-1)
}
// GetTaskCh is to get the task channel.
func (m *Meta) GetTaskCh() chan func() {
return m.taskCh
}
// GetExitCh is to get the exit channel.
func (m *Meta) GetExitCh() chan struct{} {
return m.exitCh
}
// TaskStatusContainer is a container that can control or watch the pool.
type TaskStatusContainer struct {
stats map[uint64]*Meta
rw sync.RWMutex
}
// TaskManager is a manager that can control or watch the pool.
type TaskManager struct {
task []TaskStatusContainer
concurrency int32
}
// NewTaskManager create a new pool task manager.
func NewTaskManager(c int32) TaskManager {
task := make([]TaskStatusContainer, shard)
for i := range shard {
task[i] = TaskStatusContainer{
stats: make(map[uint64]*Meta),
}
}
return TaskManager{
task: task,
concurrency: c,
}
}
// RegisterTask register a task to the manager.
func (t *TaskManager) RegisterTask(task *Meta) {
id := getShardID(task.taskID)
t.task[id].rw.Lock()
t.task[id].stats[task.taskID] = task
t.task[id].rw.Unlock()
}
// DeleteTask delete a task from the manager.
func (t *TaskManager) DeleteTask(taskID uint64) {
shardID := getShardID(taskID)
t.task[shardID].rw.Lock()
delete(t.task[shardID].stats, taskID)
t.task[shardID].rw.Unlock()
}
// GetOriginConcurrency return the concurrency of the pool at the init.
func (t *TaskManager) GetOriginConcurrency() int32 {
return t.concurrency
}