123 lines
2.9 KiB
Go
123 lines
2.9 KiB
Go
// Copyright 2023 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package poolmanager
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/atomic"
|
|
)
|
|
|
|
const shard int = 8
|
|
|
|
func getShardID(id uint64) uint64 {
|
|
return id % uint64(shard)
|
|
}
|
|
|
|
// Meta is a container that can control or watch the task in the pool.
|
|
type Meta struct {
|
|
createTS time.Time
|
|
exitCh chan struct{}
|
|
taskCh chan func()
|
|
taskID uint64
|
|
running atomic.Int32
|
|
initialConcurrency int32
|
|
}
|
|
|
|
// NewMeta create a new meta.
|
|
func NewMeta(taskID uint64, exitCh chan struct{}, taskCh chan func(), concurrency int32) *Meta {
|
|
s := &Meta{
|
|
createTS: time.Now(),
|
|
initialConcurrency: concurrency,
|
|
taskID: taskID,
|
|
exitCh: exitCh,
|
|
taskCh: taskCh,
|
|
}
|
|
return s
|
|
}
|
|
|
|
// TaskID is to get the task id.
|
|
func (m *Meta) TaskID() uint64 {
|
|
return m.taskID
|
|
}
|
|
|
|
// IncTask is to add running task count.
|
|
func (m *Meta) IncTask() {
|
|
m.running.Add(1)
|
|
}
|
|
|
|
// DecTask is to minus running task count.
|
|
func (m *Meta) DecTask() {
|
|
m.running.Add(-1)
|
|
}
|
|
|
|
// GetTaskCh is to get the task channel.
|
|
func (m *Meta) GetTaskCh() chan func() {
|
|
return m.taskCh
|
|
}
|
|
|
|
// GetExitCh is to get the exit channel.
|
|
func (m *Meta) GetExitCh() chan struct{} {
|
|
return m.exitCh
|
|
}
|
|
|
|
// TaskStatusContainer is a container that can control or watch the pool.
|
|
type TaskStatusContainer struct {
|
|
stats map[uint64]*Meta
|
|
rw sync.RWMutex
|
|
}
|
|
|
|
// TaskManager is a manager that can control or watch the pool.
|
|
type TaskManager struct {
|
|
task []TaskStatusContainer
|
|
concurrency int32
|
|
}
|
|
|
|
// NewTaskManager create a new pool task manager.
|
|
func NewTaskManager(c int32) TaskManager {
|
|
task := make([]TaskStatusContainer, shard)
|
|
for i := range shard {
|
|
task[i] = TaskStatusContainer{
|
|
stats: make(map[uint64]*Meta),
|
|
}
|
|
}
|
|
return TaskManager{
|
|
task: task,
|
|
concurrency: c,
|
|
}
|
|
}
|
|
|
|
// RegisterTask register a task to the manager.
|
|
func (t *TaskManager) RegisterTask(task *Meta) {
|
|
id := getShardID(task.taskID)
|
|
t.task[id].rw.Lock()
|
|
t.task[id].stats[task.taskID] = task
|
|
t.task[id].rw.Unlock()
|
|
}
|
|
|
|
// DeleteTask delete a task from the manager.
|
|
func (t *TaskManager) DeleteTask(taskID uint64) {
|
|
shardID := getShardID(taskID)
|
|
t.task[shardID].rw.Lock()
|
|
delete(t.task[shardID].stats, taskID)
|
|
t.task[shardID].rw.Unlock()
|
|
}
|
|
|
|
// GetOriginConcurrency return the concurrency of the pool at the init.
|
|
func (t *TaskManager) GetOriginConcurrency() int32 {
|
|
return t.concurrency
|
|
}
|