*: inject random error for DXF (part1) (#60278)

ref pingcap/tidb#60277
This commit is contained in:
wjHuang
2025-03-27 20:38:13 +08:00
committed by GitHub
parent 979f142e50
commit e2d1d89c9e
11 changed files with 154 additions and 11 deletions

View File

@ -20,6 +20,7 @@ go_library(
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/cpu",
"//pkg/util/injectfailpoint",
"//pkg/util/logutil",
"//pkg/util/sqlescape",
"//pkg/util/sqlexec",

View File

@ -22,6 +22,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/injectfailpoint"
"github.com/pingcap/tidb/pkg/util/sqlexec"
)
@ -46,6 +47,9 @@ func (mgr *TaskManager) TransferTasks2History(ctx context.Context, tasks []*prot
for _, task := range tasks {
taskIDStrs = append(taskIDStrs, fmt.Sprintf("%d", task.ID))
}
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return err
}
return mgr.WithNewTxn(ctx, func(se sessionctx.Context) error {
// sensitive data in meta might be redacted, need update first.
exec := se.GetSQLExecutor()
@ -84,6 +88,9 @@ func (mgr *TaskManager) TransferTasks2History(ctx context.Context, tasks []*prot
func (mgr *TaskManager) GCSubtasks(ctx context.Context) error {
subtaskHistoryKeepSeconds := defaultSubtaskKeepDays * 24 * 60 * 60
failpoint.InjectCall("subtaskHistoryKeepSeconds", &subtaskHistoryKeepSeconds)
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return err
}
_, err := mgr.ExecuteSQLWithNewSession(
ctx,
fmt.Sprintf("DELETE FROM mysql.tidb_background_subtask_history WHERE state_update_time < UNIX_TIMESTAMP() - %d ;", subtaskHistoryKeepSeconds),

View File

@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/cpu"
"github.com/pingcap/tidb/pkg/util/injectfailpoint"
"github.com/pingcap/tidb/pkg/util/sqlescape"
"github.com/pingcap/tidb/pkg/util/sqlexec"
)
@ -67,6 +68,9 @@ func (mgr *TaskManager) DeleteDeadNodes(ctx context.Context, nodes []string) err
if len(nodes) == 0 {
return nil
}
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return err
}
return mgr.WithNewTxn(ctx, func(se sessionctx.Context) error {
deleteSQL := new(strings.Builder)
if err := sqlescape.FormatSQL(deleteSQL, "delete from mysql.dist_framework_meta where host in("); err != nil {
@ -87,6 +91,9 @@ func (mgr *TaskManager) DeleteDeadNodes(ctx context.Context, nodes []string) err
// GetAllNodes gets nodes in dist_framework_meta.
func (mgr *TaskManager) GetAllNodes(ctx context.Context) ([]proto.ManagedNode, error) {
var nodes []proto.ManagedNode
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return nodes, err
}
err := mgr.WithNewSession(func(se sessionctx.Context) error {
var err2 error
nodes, err2 = mgr.getAllNodesWithSession(ctx, se)
@ -116,6 +123,9 @@ func (*TaskManager) getAllNodesWithSession(ctx context.Context, se sessionctx.Co
// GetUsedSlotsOnNodes implements the scheduler.TaskManager interface.
func (mgr *TaskManager) GetUsedSlotsOnNodes(ctx context.Context) (map[string]int, error) {
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return nil, err
}
// concurrency of subtasks of some step is the same, we use max(concurrency)
// to make group by works.
rs, err := mgr.ExecuteSQLWithNewSession(ctx, `

View File

@ -19,6 +19,7 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/injectfailpoint"
"github.com/pingcap/tidb/pkg/util/sqlexec"
)
@ -108,6 +109,9 @@ func (mgr *TaskManager) PauseSubtasks(ctx context.Context, execID string, taskID
// ResumeSubtasks update all paused subtasks to pending state.
func (mgr *TaskManager) ResumeSubtasks(ctx context.Context, taskID int64) error {
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return err
}
_, err := mgr.ExecuteSQLWithNewSession(ctx,
`update mysql.tidb_background_subtask set state = "pending", error = null where task_key = %? and state = "paused"`, taskID)
return err

View File

@ -22,11 +22,15 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/injectfailpoint"
"github.com/pingcap/tidb/pkg/util/sqlexec"
)
// CancelTask cancels task.
func (mgr *TaskManager) CancelTask(ctx context.Context, taskID int64) error {
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return err
}
_, err := mgr.ExecuteSQLWithNewSession(ctx,
`update mysql.tidb_global_task
set state = %?,
@ -53,6 +57,9 @@ func (*TaskManager) CancelTaskByKeySession(ctx context.Context, se sessionctx.Co
// FailTask implements the scheduler.TaskManager interface.
func (mgr *TaskManager) FailTask(ctx context.Context, taskID int64, currentState proto.TaskState, taskErr error) error {
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return err
}
_, err := mgr.ExecuteSQLWithNewSession(ctx,
`update mysql.tidb_global_task
set state = %?,
@ -67,6 +74,9 @@ func (mgr *TaskManager) FailTask(ctx context.Context, taskID int64, currentState
// RevertTask implements the scheduler.TaskManager interface.
func (mgr *TaskManager) RevertTask(ctx context.Context, taskID int64, taskState proto.TaskState, taskErr error) error {
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return err
}
return mgr.transitTaskStateOnErr(ctx, taskID, taskState, proto.TaskStateReverting, taskErr)
}
@ -84,11 +94,17 @@ func (mgr *TaskManager) transitTaskStateOnErr(ctx context.Context, taskID int64,
// AwaitingResolveTask implements the scheduler.TaskManager interface.
func (mgr *TaskManager) AwaitingResolveTask(ctx context.Context, taskID int64, taskState proto.TaskState, taskErr error) error {
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return err
}
return mgr.transitTaskStateOnErr(ctx, taskID, taskState, proto.TaskStateAwaitingResolution, taskErr)
}
// RevertedTask implements the scheduler.TaskManager interface.
func (mgr *TaskManager) RevertedTask(ctx context.Context, taskID int64) error {
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return err
}
_, err := mgr.ExecuteSQLWithNewSession(ctx,
`update mysql.tidb_global_task
set state = %?,
@ -103,6 +119,9 @@ func (mgr *TaskManager) RevertedTask(ctx context.Context, taskID int64) error {
// PauseTask pauses the task.
func (mgr *TaskManager) PauseTask(ctx context.Context, taskKey string) (bool, error) {
found := false
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return false, err
}
err := mgr.WithNewSession(func(se sessionctx.Context) error {
_, err := sqlexec.ExecSQL(ctx, se.GetSQLExecutor(),
`update mysql.tidb_global_task
@ -127,6 +146,9 @@ func (mgr *TaskManager) PauseTask(ctx context.Context, taskKey string) (bool, er
// PausedTask update the task state from pausing to paused.
func (mgr *TaskManager) PausedTask(ctx context.Context, taskID int64) error {
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return err
}
_, err := mgr.ExecuteSQLWithNewSession(ctx,
`update mysql.tidb_global_task
set state = %?,
@ -164,6 +186,9 @@ func (mgr *TaskManager) ResumeTask(ctx context.Context, taskKey string) (bool, e
// ResumedTask implements the scheduler.TaskManager interface.
func (mgr *TaskManager) ResumedTask(ctx context.Context, taskID int64) error {
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return err
}
_, err := mgr.ExecuteSQLWithNewSession(ctx, `
update mysql.tidb_global_task
set state = %?,
@ -213,6 +238,9 @@ func (mgr *TaskManager) ModifyTaskByID(ctx context.Context, taskID int64, param
// ModifiedTask implements the scheduler.TaskManager interface.
func (mgr *TaskManager) ModifiedTask(ctx context.Context, task *proto.Task) error {
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return err
}
prevState := task.ModifyParam.PrevState
return mgr.WithNewTxn(ctx, func(se sessionctx.Context) error {
failpoint.InjectCall("beforeModifiedTask")
@ -248,6 +276,9 @@ func (mgr *TaskManager) ModifiedTask(ctx context.Context, task *proto.Task) erro
// SucceedTask update task state from running to succeed.
func (mgr *TaskManager) SucceedTask(ctx context.Context, taskID int64) error {
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return err
}
return mgr.WithNewSession(func(se sessionctx.Context) error {
_, err := sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), `
update mysql.tidb_global_task

View File

@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/injectfailpoint"
"github.com/pingcap/tidb/pkg/util/sqlexec"
clitutil "github.com/tikv/client-go/v2/util"
)
@ -276,6 +277,9 @@ func (mgr *TaskManager) CreateTaskWithSession(
// GetTopUnfinishedTasks implements the scheduler.TaskManager interface.
func (mgr *TaskManager) GetTopUnfinishedTasks(ctx context.Context) ([]*proto.TaskBase, error) {
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return nil, err
}
rs, err := mgr.ExecuteSQLWithNewSession(ctx,
`select `+basicTaskColumns+` from mysql.tidb_global_task t
where state in (%?, %?, %?, %?, %?, %?, %?)
@ -360,6 +364,9 @@ func (mgr *TaskManager) GetTasksInStates(ctx context.Context, states ...any) (ta
return task, nil
}
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return nil, err
}
rs, err := mgr.ExecuteSQLWithNewSession(ctx,
"select "+TaskColumns+" from mysql.tidb_global_task t "+
"where state in ("+strings.Repeat("%?,", len(states)-1)+"%?)"+
@ -389,6 +396,9 @@ func (mgr *TaskManager) GetTaskByID(ctx context.Context, taskID int64) (task *pr
// GetTaskBaseByID implements the TaskManager.GetTaskBaseByID interface.
func (mgr *TaskManager) GetTaskBaseByID(ctx context.Context, taskID int64) (task *proto.TaskBase, err error) {
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return nil, err
}
err = mgr.WithNewSession(func(se sessionctx.Context) error {
var err2 error
task, err2 = mgr.getTaskBaseByID(ctx, se.GetSQLExecutor(), taskID)
@ -519,6 +529,9 @@ func (mgr *TaskManager) GetFirstSubtaskInStates(ctx context.Context, tidbID stri
// GetActiveSubtasks implements TaskManager.GetActiveSubtasks.
func (mgr *TaskManager) GetActiveSubtasks(ctx context.Context, taskID int64) ([]*proto.SubtaskBase, error) {
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return nil, err
}
rs, err := mgr.ExecuteSQLWithNewSession(ctx, `
select `+basicSubtaskColumns+` from mysql.tidb_background_subtask
where task_key = %? and state in (%?, %?)`,
@ -535,6 +548,9 @@ func (mgr *TaskManager) GetActiveSubtasks(ctx context.Context, taskID int64) ([]
// GetAllSubtasksByStepAndState gets the subtask by step and state.
func (mgr *TaskManager) GetAllSubtasksByStepAndState(ctx context.Context, taskID int64, step proto.Step, state proto.SubtaskState) ([]*proto.Subtask, error) {
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return nil, err
}
rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select `+SubtaskColumns+` from mysql.tidb_background_subtask
where task_key = %? and state = %? and step = %?`,
taskID, state, step)
@ -581,6 +597,9 @@ func (mgr *TaskManager) UpdateSubtaskRowCount(ctx context.Context, subtaskID int
// GetSubtaskCntGroupByStates gets the subtask count by states.
func (mgr *TaskManager) GetSubtaskCntGroupByStates(ctx context.Context, taskID int64, step proto.Step) (map[proto.SubtaskState]int64, error) {
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return nil, err
}
rs, err := mgr.ExecuteSQLWithNewSession(ctx, `
select state, count(*)
from mysql.tidb_background_subtask
@ -602,6 +621,9 @@ func (mgr *TaskManager) GetSubtaskCntGroupByStates(ctx context.Context, taskID i
// GetSubtaskErrors gets subtasks' errors.
func (mgr *TaskManager) GetSubtaskErrors(ctx context.Context, taskID int64) ([]error, error) {
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return nil, err
}
rs, err := mgr.ExecuteSQLWithNewSession(ctx,
`select error from mysql.tidb_background_subtask
where task_key = %? AND state in (%?, %?)`, taskID, proto.SubtaskStateFailed, proto.SubtaskStateCanceled)
@ -636,6 +658,9 @@ func (mgr *TaskManager) UpdateSubtasksExecIDs(ctx context.Context, subtasks []*p
if len(subtasks) == 0 {
return nil
}
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return err
}
err := mgr.WithNewTxn(ctx, func(se sessionctx.Context) error {
for _, subtask := range subtasks {
_, err := sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), `
@ -660,6 +685,9 @@ func (mgr *TaskManager) SwitchTaskStep(
nextStep proto.Step,
subtasks []*proto.Subtask,
) error {
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return err
}
return mgr.WithNewTxn(ctx, func(se sessionctx.Context) error {
vars := se.GetSessionVars()
if vars.MemQuotaQuery < vardef.DefTiDBMemQuotaQuery {
@ -741,6 +769,9 @@ func (mgr *TaskManager) SwitchTaskStepInBatch(
nextStep proto.Step,
subtasks []*proto.Subtask,
) error {
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return err
}
return mgr.WithNewSession(func(se sessionctx.Context) error {
// some subtasks may be inserted by other schedulers, we can skip them.
rs, err := sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), `
@ -862,6 +893,9 @@ func (mgr *TaskManager) GetAllTasks(ctx context.Context) ([]*proto.TaskBase, err
// GetAllSubtasks gets all subtasks with basic columns.
func (mgr *TaskManager) GetAllSubtasks(ctx context.Context) ([]*proto.SubtaskBase, error) {
if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil {
return nil, err
}
rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select `+basicSubtaskColumns+` from mysql.tidb_background_subtask`)
if err != nil {
return nil, err

View File

@ -6,11 +6,12 @@ package checkpointspb
import (
encoding_binary "encoding/binary"
fmt "fmt"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
)
// Reference imports to suppress errors if they are not otherwise used.

View File

@ -26,7 +26,6 @@ package parser
import (
// needed to connect to MySQL
dbsql "database/sql"
gio "io"
"os"

View File

@ -0,0 +1,12 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "injectfailpoint",
srcs = ["random_retry.go"],
importpath = "github.com/pingcap/tidb/pkg/util/injectfailpoint",
visibility = ["//visibility:public"],
deps = [
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
],
)

View File

@ -0,0 +1,44 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package injectfailpoint
import (
"math/rand"
"runtime"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
)
func getFunctionName() string {
pc, _, _, _ := runtime.Caller(2)
return runtime.FuncForPC(pc).Name()
}
// DXFRandomErrorWithOnePercent returns an error with probability 0.01. It controls the DXF's failpoint.
func DXFRandomErrorWithOnePercent() error {
failpoint.Inject("DXFRandomError", func() {
failpoint.Return(RandomError(0.01, errors.Errorf("injected random error, caller: %s", getFunctionName())))
})
return nil
}
// RandomError returns an error with the given probability.
func RandomError(probability float64, err error) error {
if rand.Float64() < probability {
return err
}
return nil
}

View File

@ -18,24 +18,24 @@ import (
"bytes"
"context"
"fmt"
"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl/placement"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/sessiontxn/staleread"
"github.com/pingcap/tidb/pkg/types"
"github.com/tikv/client-go/v2/oracle"
"strconv"
"testing"
"time"
"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl/placement"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/sessiontxn/staleread"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/tests/realtikvtest"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
)