Files
tidb/pkg/dxf/framework/storage/nodes.go

242 lines
7.5 KiB
Go

// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"fmt"
"strings"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/dxf/framework/proto"
"github.com/pingcap/tidb/pkg/dxf/framework/schstatus"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/cpu"
"github.com/pingcap/tidb/pkg/util/injectfailpoint"
"github.com/pingcap/tidb/pkg/util/sqlescape"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/pingcap/tidb/pkg/util/tracing"
)
// InitMeta registers this node in mysql.dist_framework_meta using a freshly
// acquired session. The actual upsert is delegated to InitMetaSession, so an
// existing record for tidbID gets its cpu_count and role refreshed.
func (mgr *TaskManager) InitMeta(ctx context.Context, tidbID string, role string) error {
	register := func(se sessionctx.Context) error {
		return mgr.InitMetaSession(ctx, se, tidbID, role)
	}
	return mgr.WithNewSession(register)
}
// InitMetaSession inserts this node's record into mysql.dist_framework_meta
// through the caller-provided session, with keyspace_id fixed to -1.
// If a record for execID already exists, its cpu_count and role are
// overwritten with the current values instead.
func (*TaskManager) InitMetaSession(ctx context.Context, se sessionctx.Context, execID string, role string) error {
	if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
		return err
	}
	const upsertSQL = `
insert into mysql.dist_framework_meta(host, role, cpu_count, keyspace_id)
values (%?, %?, %?, -1)
on duplicate key
update cpu_count = %?, role = %?`
	cpuCount := cpu.GetCPUCount()
	// The insert values come first, then the two "update" values.
	if _, err := sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), upsertSQL,
		execID, role, cpuCount, cpuCount, role); err != nil {
		return err
	}
	return nil
}
// RecoverMeta inserts this node's record into mysql.dist_framework_meta on a
// new session, with keyspace_id fixed to -1. If the record already exists,
// only cpu_count is refreshed.
// The role is deliberately not updated here: it is only changed by
// `set global tidb_service_scope`, and updating it from two places could race.
func (mgr *TaskManager) RecoverMeta(ctx context.Context, execID string, role string) error {
	if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
		return err
	}
	const upsertSQL = `
insert into mysql.dist_framework_meta(host, role, cpu_count, keyspace_id)
values (%?, %?, %?, -1)
on duplicate key
update cpu_count = %?`
	cpuCount := cpu.GetCPUCount()
	if _, err := mgr.ExecuteSQLWithNewSession(ctx, upsertSQL,
		execID, role, cpuCount, cpuCount); err != nil {
		return err
	}
	return nil
}
// DeleteDeadNodes deletes the given nodes from mysql.dist_framework_meta in a
// single transaction. It is a no-op when nodes is empty.
//
// Each node ID is bound through a `%?` placeholder and rendered by sqlescape,
// so it is quoted and escaped. Splicing raw IDs into the statement would break
// on quotes or backslashes and would allow SQL injection.
func (mgr *TaskManager) DeleteDeadNodes(ctx context.Context, nodes []string) error {
	if len(nodes) == 0 {
		return nil
	}
	if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
		return err
	}
	// One placeholder per node, e.g. "%?, %?, %?". Only placeholders, never
	// node values, are formatted into the template.
	placeholders := strings.TrimSuffix(strings.Repeat("%?, ", len(nodes)), ", ")
	args := make([]any, 0, len(nodes))
	for _, node := range nodes {
		args = append(args, node)
	}
	template := fmt.Sprintf("delete from mysql.dist_framework_meta where host in (%s)", placeholders)
	return mgr.WithNewTxn(ctx, func(se sessionctx.Context) error {
		deleteSQL := new(strings.Builder)
		if err := sqlescape.FormatSQL(deleteSQL, template, args...); err != nil {
			return err
		}
		_, err := sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), deleteSQL.String())
		return err
	})
}
// GetAllNodes returns every node recorded in mysql.dist_framework_meta,
// ordered by host. The query runs on a newly acquired session.
func (mgr *TaskManager) GetAllNodes(ctx context.Context) ([]proto.ManagedNode, error) {
	region := tracing.StartRegion(ctx, "TaskManager.GetAllNodes")
	defer region.End()
	if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
		return nil, err
	}
	var result []proto.ManagedNode
	err := mgr.WithNewSession(func(se sessionctx.Context) (innerErr error) {
		result, innerErr = mgr.getAllNodesWithSession(ctx, se)
		return innerErr
	})
	return result, err
}
// getAllNodesWithSession reads host, role and cpu_count for every row of
// mysql.dist_framework_meta through se, ordered by host. On success it
// returns a non-nil slice, which may be empty.
func (*TaskManager) getAllNodesWithSession(ctx context.Context, se sessionctx.Context) ([]proto.ManagedNode, error) {
	region := tracing.StartRegion(ctx, "TaskManager.getAllNodesWithSession")
	defer region.End()
	rows, err := sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), `
select host, role, cpu_count
from mysql.dist_framework_meta
order by host`)
	if err != nil {
		return nil, err
	}
	nodes := make([]proto.ManagedNode, len(rows))
	for i, row := range rows {
		nodes[i] = proto.ManagedNode{
			ID:       row.GetString(0),
			Role:     row.GetString(1),
			CPUCount: int(row.GetInt64(2)),
		}
	}
	return nodes, nil
}
// GetBusyNodes returns the distinct exec_id values of subtasks whose state is
// pending or running, each wrapped in a schstatus.Node.
// The query runs on a newly acquired session.
func (mgr *TaskManager) GetBusyNodes(ctx context.Context) ([]schstatus.Node, error) {
	if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
		return nil, err
	}
	var busy []schstatus.Node
	err := mgr.WithNewSession(func(se sessionctx.Context) error {
		rows, err := sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), `
select distinct exec_id from mysql.tidb_background_subtask
where state in (%?, %?)`,
			proto.SubtaskStatePending, proto.SubtaskStateRunning)
		if err != nil {
			return err
		}
		busy = make([]schstatus.Node, 0, len(rows))
		for _, row := range rows {
			busy = append(busy, schstatus.Node{ID: row.GetString(0)})
		}
		return nil
	})
	return busy, err
}
// GetUsedSlotsOnNodes implements the scheduler.TaskManager interface.
// It maps each exec_id to the number of slots in use on it. Only pending or
// running subtasks are counted. For each (exec_id, task_key) pair the maximum
// concurrency is used, and those maxima are summed per exec_id.
func (mgr *TaskManager) GetUsedSlotsOnNodes(ctx context.Context) (map[string]int, error) {
	if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
		return nil, err
	}
	// All subtasks of one step share the same concurrency, so max() simply picks
	// that value and lets the inner query group by (exec_id, task_key).
	rows, err := mgr.ExecuteSQLWithNewSession(ctx, `
select
exec_id, sum(concurrency)
from (
select exec_id, task_key, max(concurrency) concurrency
from mysql.tidb_background_subtask
where state in (%?, %?)
group by exec_id, task_key
) a
group by exec_id`,
		proto.SubtaskStatePending, proto.SubtaskStateRunning,
	)
	if err != nil {
		return nil, err
	}
	used := make(map[string]int, len(rows))
	for _, row := range rows {
		// sum() comes back as a decimal. The conversion error is intentionally
		// discarded. NOTE(review): this assumes the sum is always integral and in
		// range — confirm.
		total, _ := row.GetMyDecimal(1).ToInt()
		used[row.GetString(0)] = int(total)
	}
	return used, nil
}
// GetCPUCountOfNode returns the CPU count of the first managed node, in host
// order, that reports a positive value, regardless of that node's role.
// It runs on a newly acquired session.
func (mgr *TaskManager) GetCPUCountOfNode(ctx context.Context) (int, error) {
	var count int
	lookup := func(se sessionctx.Context) (err error) {
		count, err = mgr.getCPUCountOfNodeByRole(ctx, se, "", true)
		return err
	}
	err := mgr.WithNewSession(lookup)
	return count, err
}
// GetCPUCountOfNodeByRole returns the CPU count of the first managed node, in
// host order, whose role equals role and which reports a positive value.
// It runs on a newly acquired session.
func (mgr *TaskManager) GetCPUCountOfNodeByRole(ctx context.Context, role string) (int, error) {
	var count int
	lookup := func(se sessionctx.Context) (err error) {
		count, err = mgr.getCPUCountOfNodeByRole(ctx, se, role, false)
		return err
	}
	err := mgr.WithNewSession(lookup)
	return count, err
}
// getCPUCountOfNodeByRole scans the managed nodes, ordered by host, and
// returns the CPU count of the first candidate that reports a positive value.
// A node is a candidate when anyRole is true or when its role equals role.
// It returns an error if there are no managed nodes at all, or if no
// candidate has a positive CPU count.
func (mgr *TaskManager) getCPUCountOfNodeByRole(
	ctx context.Context,
	se sessionctx.Context,
	role string,
	anyRole bool,
) (int, error) {
	nodes, err := mgr.getAllNodesWithSession(ctx, se)
	if err != nil {
		return 0, err
	}
	if len(nodes) == 0 {
		return 0, errors.New("no managed nodes")
	}
	for _, node := range nodes {
		isCandidate := anyRole || node.Role == role
		if isCandidate && node.CPUCount > 0 {
			return node.CPUCount, nil
		}
	}
	return 0, errors.New("no managed node have enough resource for dist task")
}