From e2d1d89c9eee8dd5ca29a8aec7b19fd78df9e85a Mon Sep 17 00:00:00 2001 From: wjHuang Date: Thu, 27 Mar 2025 20:38:13 +0800 Subject: [PATCH] *: inject random error for DXF (part1) (#60278) ref pingcap/tidb#60277 --- pkg/disttask/framework/storage/BUILD.bazel | 1 + pkg/disttask/framework/storage/history.go | 7 +++ pkg/disttask/framework/storage/nodes.go | 10 +++++ .../framework/storage/subtask_state.go | 4 ++ pkg/disttask/framework/storage/task_state.go | 31 +++++++++++++ pkg/disttask/framework/storage/task_table.go | 34 ++++++++++++++ .../checkpointspb/file_checkpoints.pb.go | 5 ++- pkg/parser/reserved_words_test.go | 1 - pkg/util/injectfailpoint/BUILD.bazel | 12 +++++ pkg/util/injectfailpoint/random_retry.go | 44 +++++++++++++++++++ tests/realtikvtest/txntest/stale_read_test.go | 16 +++---- 11 files changed, 154 insertions(+), 11 deletions(-) create mode 100644 pkg/util/injectfailpoint/BUILD.bazel create mode 100644 pkg/util/injectfailpoint/random_retry.go diff --git a/pkg/disttask/framework/storage/BUILD.bazel b/pkg/disttask/framework/storage/BUILD.bazel index b96e1cf7de..0315dd04b3 100644 --- a/pkg/disttask/framework/storage/BUILD.bazel +++ b/pkg/disttask/framework/storage/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//pkg/util", "//pkg/util/chunk", "//pkg/util/cpu", + "//pkg/util/injectfailpoint", "//pkg/util/logutil", "//pkg/util/sqlescape", "//pkg/util/sqlexec", diff --git a/pkg/disttask/framework/storage/history.go b/pkg/disttask/framework/storage/history.go index 47655d05ff..3f854dcb03 100644 --- a/pkg/disttask/framework/storage/history.go +++ b/pkg/disttask/framework/storage/history.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/util/injectfailpoint" "github.com/pingcap/tidb/pkg/util/sqlexec" ) @@ -46,6 +47,9 @@ func (mgr *TaskManager) TransferTasks2History(ctx context.Context, tasks []*prot for _, task := range tasks { taskIDStrs = append(taskIDStrs, fmt.Sprintf("%d", task.ID)) } + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return err + } return mgr.WithNewTxn(ctx, func(se sessionctx.Context) error { // sensitive data in meta might be redacted, need update first. exec := se.GetSQLExecutor() @@ -84,6 +88,9 @@ func (mgr *TaskManager) TransferTasks2History(ctx context.Context, tasks []*prot func (mgr *TaskManager) GCSubtasks(ctx context.Context) error { subtaskHistoryKeepSeconds := defaultSubtaskKeepDays * 24 * 60 * 60 failpoint.InjectCall("subtaskHistoryKeepSeconds", &subtaskHistoryKeepSeconds) + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return err + } _, err := mgr.ExecuteSQLWithNewSession( ctx, fmt.Sprintf("DELETE FROM mysql.tidb_background_subtask_history WHERE state_update_time < UNIX_TIMESTAMP() - %d ;", subtaskHistoryKeepSeconds), diff --git a/pkg/disttask/framework/storage/nodes.go b/pkg/disttask/framework/storage/nodes.go index b8c2c1b3af..986e7f4b27 100644 --- a/pkg/disttask/framework/storage/nodes.go +++ b/pkg/disttask/framework/storage/nodes.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/util/cpu" + "github.com/pingcap/tidb/pkg/util/injectfailpoint" "github.com/pingcap/tidb/pkg/util/sqlescape" "github.com/pingcap/tidb/pkg/util/sqlexec" ) @@ -67,6 +68,9 @@ func (mgr *TaskManager) DeleteDeadNodes(ctx context.Context, nodes []string) err if len(nodes) == 0 { return nil } + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return err + } return mgr.WithNewTxn(ctx, func(se sessionctx.Context) error { deleteSQL := new(strings.Builder) if err := sqlescape.FormatSQL(deleteSQL, "delete from mysql.dist_framework_meta where host in("); err != nil { @@ -87,6 +91,9 @@ func (mgr *TaskManager) DeleteDeadNodes(ctx context.Context, nodes []string) err // GetAllNodes gets nodes in dist_framework_meta. func (mgr *TaskManager) GetAllNodes(ctx context.Context) ([]proto.ManagedNode, error) { var nodes []proto.ManagedNode + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return nodes, err + } err := mgr.WithNewSession(func(se sessionctx.Context) error { var err2 error nodes, err2 = mgr.getAllNodesWithSession(ctx, se) @@ -116,6 +123,9 @@ func (*TaskManager) getAllNodesWithSession(ctx context.Context, se sessionctx.Co // GetUsedSlotsOnNodes implements the scheduler.TaskManager interface. func (mgr *TaskManager) GetUsedSlotsOnNodes(ctx context.Context) (map[string]int, error) { + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return nil, err + } // concurrency of subtasks of some step is the same, we use max(concurrency) // to make group by works. rs, err := mgr.ExecuteSQLWithNewSession(ctx, ` diff --git a/pkg/disttask/framework/storage/subtask_state.go b/pkg/disttask/framework/storage/subtask_state.go index a0acbd0c19..2717e120ab 100644 --- a/pkg/disttask/framework/storage/subtask_state.go +++ b/pkg/disttask/framework/storage/subtask_state.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/util/injectfailpoint" "github.com/pingcap/tidb/pkg/util/sqlexec" ) @@ -108,6 +109,9 @@ func (mgr *TaskManager) PauseSubtasks(ctx context.Context, execID string, taskID // ResumeSubtasks update all paused subtasks to pending state. func (mgr *TaskManager) ResumeSubtasks(ctx context.Context, taskID int64) error { + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return err + } _, err := mgr.ExecuteSQLWithNewSession(ctx, `update mysql.tidb_background_subtask set state = "pending", error = null where task_key = %? and state = "paused"`, taskID) return err diff --git a/pkg/disttask/framework/storage/task_state.go b/pkg/disttask/framework/storage/task_state.go index 6b208d656c..9ffeae72f2 100644 --- a/pkg/disttask/framework/storage/task_state.go +++ b/pkg/disttask/framework/storage/task_state.go @@ -22,11 +22,15 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/util/injectfailpoint" "github.com/pingcap/tidb/pkg/util/sqlexec" ) // CancelTask cancels task. func (mgr *TaskManager) CancelTask(ctx context.Context, taskID int64) error { + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return err + } _, err := mgr.ExecuteSQLWithNewSession(ctx, `update mysql.tidb_global_task set state = %?, @@ -53,6 +57,9 @@ func (*TaskManager) CancelTaskByKeySession(ctx context.Context, se sessionctx.Co // FailTask implements the scheduler.TaskManager interface. func (mgr *TaskManager) FailTask(ctx context.Context, taskID int64, currentState proto.TaskState, taskErr error) error { + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return err + } _, err := mgr.ExecuteSQLWithNewSession(ctx, `update mysql.tidb_global_task set state = %?, @@ -67,6 +74,9 @@ func (mgr *TaskManager) FailTask(ctx context.Context, taskID int64, currentState // RevertTask implements the scheduler.TaskManager interface. func (mgr *TaskManager) RevertTask(ctx context.Context, taskID int64, taskState proto.TaskState, taskErr error) error { + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return err + } return mgr.transitTaskStateOnErr(ctx, taskID, taskState, proto.TaskStateReverting, taskErr) } @@ -84,11 +94,17 @@ func (mgr *TaskManager) transitTaskStateOnErr(ctx context.Context, taskID int64, // AwaitingResolveTask implements the scheduler.TaskManager interface. func (mgr *TaskManager) AwaitingResolveTask(ctx context.Context, taskID int64, taskState proto.TaskState, taskErr error) error { + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return err + } return mgr.transitTaskStateOnErr(ctx, taskID, taskState, proto.TaskStateAwaitingResolution, taskErr) } // RevertedTask implements the scheduler.TaskManager interface. func (mgr *TaskManager) RevertedTask(ctx context.Context, taskID int64) error { + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return err + } _, err := mgr.ExecuteSQLWithNewSession(ctx, `update mysql.tidb_global_task set state = %?, @@ -103,6 +119,9 @@ func (mgr *TaskManager) RevertedTask(ctx context.Context, taskID int64) error { // PauseTask pauses the task. func (mgr *TaskManager) PauseTask(ctx context.Context, taskKey string) (bool, error) { found := false + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return false, err + } err := mgr.WithNewSession(func(se sessionctx.Context) error { _, err := sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), `update mysql.tidb_global_task @@ -127,6 +146,9 @@ func (mgr *TaskManager) PauseTask(ctx context.Context, taskKey string) (bool, er // PausedTask update the task state from pausing to paused. func (mgr *TaskManager) PausedTask(ctx context.Context, taskID int64) error { + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return err + } _, err := mgr.ExecuteSQLWithNewSession(ctx, `update mysql.tidb_global_task set state = %?, @@ -164,6 +186,9 @@ func (mgr *TaskManager) ResumeTask(ctx context.Context, taskKey string) (bool, e // ResumedTask implements the scheduler.TaskManager interface. func (mgr *TaskManager) ResumedTask(ctx context.Context, taskID int64) error { + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return err + } _, err := mgr.ExecuteSQLWithNewSession(ctx, ` update mysql.tidb_global_task set state = %?, @@ -213,6 +238,9 @@ func (mgr *TaskManager) ModifyTaskByID(ctx context.Context, taskID int64, param // ModifiedTask implements the scheduler.TaskManager interface. func (mgr *TaskManager) ModifiedTask(ctx context.Context, task *proto.Task) error { + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return err + } prevState := task.ModifyParam.PrevState return mgr.WithNewTxn(ctx, func(se sessionctx.Context) error { failpoint.InjectCall("beforeModifiedTask") @@ -248,6 +276,9 @@ func (mgr *TaskManager) ModifiedTask(ctx context.Context, task *proto.Task) erro // SucceedTask update task state from running to succeed. func (mgr *TaskManager) SucceedTask(ctx context.Context, taskID int64) error { + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return err + } return mgr.WithNewSession(func(se sessionctx.Context) error { _, err := sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), ` update mysql.tidb_global_task diff --git a/pkg/disttask/framework/storage/task_table.go b/pkg/disttask/framework/storage/task_table.go index 7c2174d875..871a20cf6a 100644 --- a/pkg/disttask/framework/storage/task_table.go +++ b/pkg/disttask/framework/storage/task_table.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx/vardef" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/injectfailpoint" "github.com/pingcap/tidb/pkg/util/sqlexec" clitutil "github.com/tikv/client-go/v2/util" ) @@ -276,6 +277,9 @@ func (mgr *TaskManager) CreateTaskWithSession( // GetTopUnfinishedTasks implements the scheduler.TaskManager interface. func (mgr *TaskManager) GetTopUnfinishedTasks(ctx context.Context) ([]*proto.TaskBase, error) { + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return nil, err + } rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select `+basicTaskColumns+` from mysql.tidb_global_task t where state in (%?, %?, %?, %?, %?, %?, %?) @@ -360,6 +364,9 @@ func (mgr *TaskManager) GetTasksInStates(ctx context.Context, states ...any) (ta return task, nil } + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return nil, err + } rs, err := mgr.ExecuteSQLWithNewSession(ctx, "select "+TaskColumns+" from mysql.tidb_global_task t "+ "where state in ("+strings.Repeat("%?,", len(states)-1)+"%?)"+ @@ -389,6 +396,9 @@ func (mgr *TaskManager) GetTaskByID(ctx context.Context, taskID int64) (task *pr // GetTaskBaseByID implements the TaskManager.GetTaskBaseByID interface. func (mgr *TaskManager) GetTaskBaseByID(ctx context.Context, taskID int64) (task *proto.TaskBase, err error) { + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return nil, err + } err = mgr.WithNewSession(func(se sessionctx.Context) error { var err2 error task, err2 = mgr.getTaskBaseByID(ctx, se.GetSQLExecutor(), taskID) @@ -519,6 +529,9 @@ func (mgr *TaskManager) GetFirstSubtaskInStates(ctx context.Context, tidbID stri // GetActiveSubtasks implements TaskManager.GetActiveSubtasks. func (mgr *TaskManager) GetActiveSubtasks(ctx context.Context, taskID int64) ([]*proto.SubtaskBase, error) { + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return nil, err + } rs, err := mgr.ExecuteSQLWithNewSession(ctx, ` select `+basicSubtaskColumns+` from mysql.tidb_background_subtask where task_key = %? and state in (%?, %?)`, @@ -535,6 +548,9 @@ func (mgr *TaskManager) GetActiveSubtasks(ctx context.Context, taskID int64) ([] // GetAllSubtasksByStepAndState gets the subtask by step and state. func (mgr *TaskManager) GetAllSubtasksByStepAndState(ctx context.Context, taskID int64, step proto.Step, state proto.SubtaskState) ([]*proto.Subtask, error) { + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return nil, err + } rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select `+SubtaskColumns+` from mysql.tidb_background_subtask where task_key = %? and state = %? and step = %?`, taskID, state, step) @@ -581,6 +597,9 @@ func (mgr *TaskManager) UpdateSubtaskRowCount(ctx context.Context, subtaskID int // GetSubtaskCntGroupByStates gets the subtask count by states. func (mgr *TaskManager) GetSubtaskCntGroupByStates(ctx context.Context, taskID int64, step proto.Step) (map[proto.SubtaskState]int64, error) { + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return nil, err + } rs, err := mgr.ExecuteSQLWithNewSession(ctx, ` select state, count(*) from mysql.tidb_background_subtask @@ -602,6 +621,9 @@ func (mgr *TaskManager) GetSubtaskCntGroupByStates(ctx context.Context, taskID i // GetSubtaskErrors gets subtasks' errors. func (mgr *TaskManager) GetSubtaskErrors(ctx context.Context, taskID int64) ([]error, error) { + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return nil, err + } rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select error from mysql.tidb_background_subtask where task_key = %? AND state in (%?, %?)`, taskID, proto.SubtaskStateFailed, proto.SubtaskStateCanceled) @@ -636,6 +658,9 @@ func (mgr *TaskManager) UpdateSubtasksExecIDs(ctx context.Context, subtasks []*p if len(subtasks) == 0 { return nil } + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return err + } err := mgr.WithNewTxn(ctx, func(se sessionctx.Context) error { for _, subtask := range subtasks { _, err := sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), ` @@ -660,6 +685,9 @@ func (mgr *TaskManager) SwitchTaskStep( nextStep proto.Step, subtasks []*proto.Subtask, ) error { + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return err + } return mgr.WithNewTxn(ctx, func(se sessionctx.Context) error { vars := se.GetSessionVars() if vars.MemQuotaQuery < vardef.DefTiDBMemQuotaQuery { @@ -741,6 +769,9 @@ func (mgr *TaskManager) SwitchTaskStepInBatch( nextStep proto.Step, subtasks []*proto.Subtask, ) error { + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return err + } return mgr.WithNewSession(func(se sessionctx.Context) error { // some subtasks may be inserted by other schedulers, we can skip them. rs, err := sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), ` @@ -862,6 +893,9 @@ func (mgr *TaskManager) GetAllTasks(ctx context.Context) ([]*proto.TaskBase, err // GetAllSubtasks gets all subtasks with basic columns. func (mgr *TaskManager) GetAllSubtasks(ctx context.Context) ([]*proto.SubtaskBase, error) { + if err := injectfailpoint.DXFRandomErrorWithOnePercent(); err != nil { + return nil, err + } rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select `+basicSubtaskColumns+` from mysql.tidb_background_subtask`) if err != nil { return nil, err diff --git a/pkg/lightning/checkpoints/checkpointspb/file_checkpoints.pb.go b/pkg/lightning/checkpoints/checkpointspb/file_checkpoints.pb.go index 464d3acb93..d3f5f4a9d4 100644 --- a/pkg/lightning/checkpoints/checkpointspb/file_checkpoints.pb.go +++ b/pkg/lightning/checkpoints/checkpointspb/file_checkpoints.pb.go @@ -6,11 +6,12 @@ package checkpointspb import ( encoding_binary "encoding/binary" fmt "fmt" - _ "github.com/gogo/protobuf/gogoproto" - proto "github.com/gogo/protobuf/proto" io "io" math "math" math_bits "math/bits" + + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" ) // Reference imports to suppress errors if they are not otherwise used. diff --git a/pkg/parser/reserved_words_test.go b/pkg/parser/reserved_words_test.go index f8a82c9315..9d7f0b2f01 100644 --- a/pkg/parser/reserved_words_test.go +++ b/pkg/parser/reserved_words_test.go @@ -26,7 +26,6 @@ package parser import ( // needed to connect to MySQL - dbsql "database/sql" gio "io" "os" diff --git a/pkg/util/injectfailpoint/BUILD.bazel b/pkg/util/injectfailpoint/BUILD.bazel new file mode 100644 index 0000000000..0000306239 --- /dev/null +++ b/pkg/util/injectfailpoint/BUILD.bazel @@ -0,0 +1,12 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "injectfailpoint", + srcs = ["random_retry.go"], + importpath = "github.com/pingcap/tidb/pkg/util/injectfailpoint", + visibility = ["//visibility:public"], + deps = [ + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + ], +) diff --git a/pkg/util/injectfailpoint/random_retry.go b/pkg/util/injectfailpoint/random_retry.go new file mode 100644 index 0000000000..deeeb4ded4 --- /dev/null +++ b/pkg/util/injectfailpoint/random_retry.go @@ -0,0 +1,44 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package injectfailpoint + +import ( + "math/rand" + "runtime" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" +) + +func getFunctionName() string { + pc, _, _, _ := runtime.Caller(2) + return runtime.FuncForPC(pc).Name() +} + +// DXFRandomErrorWithOnePercent returns an error with probability 0.01. It controls the DXF's failpoint. +func DXFRandomErrorWithOnePercent() error { + failpoint.Inject("DXFRandomError", func() { + failpoint.Return(RandomError(0.01, errors.Errorf("injected random error, caller: %s", getFunctionName()))) + }) + return nil +} + +// RandomError returns an error with the given probability. +func RandomError(probability float64, err error) error { + if rand.Float64() < probability { + return err + } + return nil +} diff --git a/tests/realtikvtest/txntest/stale_read_test.go b/tests/realtikvtest/txntest/stale_read_test.go index d12ce23c59..4105ab0a1d 100644 --- a/tests/realtikvtest/txntest/stale_read_test.go +++ b/tests/realtikvtest/txntest/stale_read_test.go @@ -18,24 +18,24 @@ import ( "bytes" "context" "fmt" - "github.com/docker/go-units" - "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/pkg/ddl/placement" - "github.com/pingcap/tidb/pkg/sessiontxn" - "github.com/pingcap/tidb/pkg/sessiontxn/staleread" - "github.com/pingcap/tidb/pkg/types" - "github.com/tikv/client-go/v2/oracle" "strconv" "testing" "time" + "github.com/docker/go-units" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/ddl/placement" + "github.com/pingcap/tidb/pkg/sessiontxn" + "github.com/pingcap/tidb/pkg/sessiontxn/staleread" "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/tests/realtikvtest" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/tikvrpc/interceptor" )