diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index 68b9083355..fb40a600aa 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -61,7 +61,7 @@ type BackfillSubTaskMeta struct { // NewBackfillSubtaskExecutor creates a new backfill subtask executor. func NewBackfillSubtaskExecutor(taskMeta []byte, d *ddl, - bc ingest.BackendCtx, stage proto.Step, summary *execute.Summary) (execute.StepExecutor, error) { + bc ingest.BackendCtx, stage proto.Step) (execute.StepExecutor, error) { bgm := &BackfillTaskMeta{} err := json.Unmarshal(taskMeta, bgm) if err != nil { @@ -90,7 +90,7 @@ func NewBackfillSubtaskExecutor(taskMeta []byte, d *ddl, d.setDDLLabelForTopSQL(jobMeta.ID, jobMeta.Query) d.setDDLSourceForDiagnosis(jobMeta.ID, jobMeta.Type) return newReadIndexExecutor( - d, &bgm.Job, indexInfos, tbl.(table.PhysicalTable), jc, bc, summary, bgm.CloudStorageURI), nil + d, &bgm.Job, indexInfos, tbl.(table.PhysicalTable), jc, bc, bgm.CloudStorageURI), nil case proto.BackfillStepMergeSort: return newMergeSortExecutor(jobMeta.ID, len(indexInfos), tbl.(table.PhysicalTable), bc, bgm.CloudStorageURI) case proto.BackfillStepWriteAndIngest: @@ -167,10 +167,10 @@ func decodeIndexUniqueness(job *model.Job) (bool, error) { return unique[0], nil } -func (s *backfillDistExecutor) GetStepExecutor(task *proto.Task, summary *execute.Summary, _ *proto.StepResource) (execute.StepExecutor, error) { +func (s *backfillDistExecutor) GetStepExecutor(task *proto.Task, _ *proto.StepResource) (execute.StepExecutor, error) { switch task.Step { case proto.BackfillStepReadIndex, proto.BackfillStepMergeSort, proto.BackfillStepWriteAndIngest: - return NewBackfillSubtaskExecutor(task.Meta, s.d, s.backendCtx, task.Step, summary) + return NewBackfillSubtaskExecutor(task.Meta, s.d, s.backendCtx, task.Step) default: return nil, errors.Errorf("unknown backfill step %d for task %d", task.Step, task.ID) } diff --git a/pkg/ddl/backfilling_import_cloud.go b/pkg/ddl/backfilling_import_cloud.go index 77003923cc..4b590ab6f3 100644 --- a/pkg/ddl/backfilling_import_cloud.go +++ b/pkg/ddl/backfilling_import_cloud.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/pkg/ddl/ingest" "github.com/pingcap/tidb/pkg/disttask/framework/proto" + "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/util/logutil" @@ -30,6 +31,7 @@ import ( ) type cloudImportExecutor struct { + taskexecutor.EmptyStepExecutor job *model.Job jobID int64 index *model.IndexInfo diff --git a/pkg/ddl/backfilling_merge_sort.go b/pkg/ddl/backfilling_merge_sort.go index e93443285f..16b6480a67 100644 --- a/pkg/ddl/backfilling_merge_sort.go +++ b/pkg/ddl/backfilling_merge_sort.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/pkg/ddl/ingest" "github.com/pingcap/tidb/pkg/disttask/framework/proto" + "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/util/logutil" @@ -34,6 +35,7 @@ import ( ) type mergeSortExecutor struct { + taskexecutor.EmptyStepExecutor jobID int64 idxNum int ptbl table.PhysicalTable diff --git a/pkg/ddl/backfilling_read_index.go b/pkg/ddl/backfilling_read_index.go index ce207c5394..3e3fd6ce3c 100644 --- a/pkg/ddl/backfilling_read_index.go +++ b/pkg/ddl/backfilling_read_index.go @@ -46,8 +46,8 @@ type readIndexExecutor struct { cloudStorageURI string - bc ingest.BackendCtx - summary *execute.Summary + bc ingest.BackendCtx + curRowCount *atomic.Int64 subtaskSummary sync.Map // subtaskID => readIndexSummary } @@ -67,7 +67,6 @@ func newReadIndexExecutor( ptbl table.PhysicalTable, jc *JobContext, bc ingest.BackendCtx, - summary *execute.Summary, cloudStorageURI string, ) *readIndexExecutor { return &readIndexExecutor{ @@ -77,8 +76,8 @@ func newReadIndexExecutor( ptbl: ptbl, jc: jc, bc: bc, - summary: summary, cloudStorageURI: cloudStorageURI, + curRowCount: &atomic.Int64{}, } } @@ -117,13 +116,13 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta opCtx := NewOperatorCtx(ctx) defer opCtx.Cancel() - totalRowCount := &atomic.Int64{} + r.curRowCount.Store(0) var pipe *operator.AsyncPipeline if len(r.cloudStorageURI) > 0 { - pipe, err = r.buildExternalStorePipeline(opCtx, subtask.ID, sessCtx, tbl, startKey, endKey, totalRowCount) + pipe, err = r.buildExternalStorePipeline(opCtx, subtask.ID, sessCtx, tbl, startKey, endKey, r.curRowCount) } else { - pipe, err = r.buildLocalStorePipeline(opCtx, sessCtx, tbl, startKey, endKey, totalRowCount) + pipe, err = r.buildLocalStorePipeline(opCtx, sessCtx, tbl, startKey, endKey, r.curRowCount) } if err != nil { return err @@ -142,10 +141,15 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta } r.bc.ResetWorkers(r.job.ID) - r.summary.UpdateRowCount(subtask.ID, totalRowCount.Load()) return nil } +func (r *readIndexExecutor) RealtimeSummary() *execute.SubtaskSummary { + return &execute.SubtaskSummary{ + RowCount: r.curRowCount.Load(), + } +} + func (*readIndexExecutor) Cleanup(ctx context.Context) error { logutil.Logger(ctx).Info("read index executor cleanup subtask exec env", zap.String("category", "ddl")) diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index 0877d681ac..f8d1b6c5c0 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -742,7 +742,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl { taskexecutor.RegisterTaskType(proto.Backfill, func(ctx context.Context, id string, task *proto.Task, taskTable taskexecutor.TaskTable) taskexecutor.TaskExecutor { return newBackfillDistExecutor(ctx, id, task, taskTable, d) - }, taskexecutor.WithSummary, + }, ) scheduler.RegisterSchedulerFactory(proto.Backfill, diff --git a/pkg/disttask/framework/mock/execute/BUILD.bazel b/pkg/disttask/framework/mock/execute/BUILD.bazel index 5d4e46271f..31ea22ceb0 100644 --- a/pkg/disttask/framework/mock/execute/BUILD.bazel +++ b/pkg/disttask/framework/mock/execute/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/disttask/framework/proto", + "//pkg/disttask/framework/taskexecutor/execute", "@org_uber_go_mock//gomock", ], ) diff --git a/pkg/disttask/framework/mock/execute/execute_mock.go b/pkg/disttask/framework/mock/execute/execute_mock.go index cd6ca330a3..cae959b911 100644 --- a/pkg/disttask/framework/mock/execute/execute_mock.go +++ b/pkg/disttask/framework/mock/execute/execute_mock.go @@ -13,6 +13,7 @@ import ( reflect "reflect" proto "github.com/pingcap/tidb/pkg/disttask/framework/proto" + execute "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute" gomock "go.uber.org/mock/gomock" ) @@ -81,6 +82,20 @@ func (mr *MockStepExecutorMockRecorder) OnFinished(arg0, arg1 any) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnFinished", reflect.TypeOf((*MockStepExecutor)(nil).OnFinished), arg0, arg1) } +// RealtimeSummary mocks base method. +func (m *MockStepExecutor) RealtimeSummary() *execute.SubtaskSummary { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RealtimeSummary") + ret0, _ := ret[0].(*execute.SubtaskSummary) + return ret0 +} + +// RealtimeSummary indicates an expected call of RealtimeSummary. +func (mr *MockStepExecutorMockRecorder) RealtimeSummary() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RealtimeSummary", reflect.TypeOf((*MockStepExecutor)(nil).RealtimeSummary)) +} + // RunSubtask mocks base method. func (m *MockStepExecutor) RunSubtask(arg0 context.Context, arg1 *proto.Subtask) error { m.ctrl.T.Helper() diff --git a/pkg/disttask/framework/mock/task_executor_mock.go b/pkg/disttask/framework/mock/task_executor_mock.go index 68c52ac702..54343ddd2e 100644 --- a/pkg/disttask/framework/mock/task_executor_mock.go +++ b/pkg/disttask/framework/mock/task_executor_mock.go @@ -477,18 +477,18 @@ func (m *MockExtension) EXPECT() *MockExtensionMockRecorder { } // GetStepExecutor mocks base method. -func (m *MockExtension) GetStepExecutor(arg0 *proto.Task, arg1 *execute.Summary, arg2 *proto.StepResource) (execute.StepExecutor, error) { +func (m *MockExtension) GetStepExecutor(arg0 *proto.Task, arg1 *proto.StepResource) (execute.StepExecutor, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetStepExecutor", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "GetStepExecutor", arg0, arg1) ret0, _ := ret[0].(execute.StepExecutor) ret1, _ := ret[1].(error) return ret0, ret1 } // GetStepExecutor indicates an expected call of GetStepExecutor. -func (mr *MockExtensionMockRecorder) GetStepExecutor(arg0, arg1, arg2 any) *gomock.Call { +func (mr *MockExtensionMockRecorder) GetStepExecutor(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStepExecutor", reflect.TypeOf((*MockExtension)(nil).GetStepExecutor), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStepExecutor", reflect.TypeOf((*MockExtension)(nil).GetStepExecutor), arg0, arg1) } // IsIdempotent mocks base method. diff --git a/pkg/disttask/framework/taskexecutor/execute/BUILD.bazel b/pkg/disttask/framework/taskexecutor/execute/BUILD.bazel index 3049283bd4..43ba5d6f84 100644 --- a/pkg/disttask/framework/taskexecutor/execute/BUILD.bazel +++ b/pkg/disttask/framework/taskexecutor/execute/BUILD.bazel @@ -2,16 +2,8 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "execute", - srcs = [ - "interface.go", - "summary.go", - ], + srcs = ["interface.go"], importpath = "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute", visibility = ["//visibility:public"], - deps = [ - "//pkg/disttask/framework/proto", - "//pkg/disttask/framework/storage", - "//pkg/util/logutil", - "@org_uber_go_zap//:zap", - ], + deps = ["//pkg/disttask/framework/proto"], ) diff --git a/pkg/disttask/framework/taskexecutor/execute/interface.go b/pkg/disttask/framework/taskexecutor/execute/interface.go index beaef2b86a..9be1fb5db5 100644 --- a/pkg/disttask/framework/taskexecutor/execute/interface.go +++ b/pkg/disttask/framework/taskexecutor/execute/interface.go @@ -33,9 +33,18 @@ type StepExecutor interface { Init(context.Context) error // RunSubtask is used to run the subtask. RunSubtask(ctx context.Context, subtask *proto.Subtask) error + + // RealtimeSummary returns the realtime summary of the running subtask by this executor. + RealtimeSummary() *SubtaskSummary + // OnFinished is used to handle the subtask when it is finished. // The subtask meta can be updated in place. OnFinished(ctx context.Context, subtask *proto.Subtask) error // Cleanup is used to clean up the environment for the subtask executor. Cleanup(context.Context) error } + +// SubtaskSummary contains the summary of a subtask. +type SubtaskSummary struct { + RowCount int64 +} diff --git a/pkg/disttask/framework/taskexecutor/execute/summary.go b/pkg/disttask/framework/taskexecutor/execute/summary.go deleted file mode 100644 index 5bd1924800..0000000000 --- a/pkg/disttask/framework/taskexecutor/execute/summary.go +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package execute - -import ( - "context" - "sync" - "time" - - "github.com/pingcap/tidb/pkg/disttask/framework/storage" - "github.com/pingcap/tidb/pkg/util/logutil" - "go.uber.org/zap" -) - -// Summary is used to collect the summary of subtasks execution. -type Summary struct { - mu struct { - sync.Mutex - RowCount map[int64]int64 // subtask ID -> row count - } -} - -// NewSummary creates a new Summary. -func NewSummary() *Summary { - return &Summary{ - mu: struct { - sync.Mutex - RowCount map[int64]int64 - }{ - RowCount: map[int64]int64{}, - }, - } -} - -// UpdateRowCount updates the row count of the subtask. -func (s *Summary) UpdateRowCount(subtaskID int64, rowCount int64) { - s.mu.Lock() - defer s.mu.Unlock() - s.mu.RowCount[subtaskID] = rowCount -} - -// UpdateRowCountLoop updates the row count of the subtask periodically. -func (s *Summary) UpdateRowCountLoop(ctx context.Context, taskMgr *storage.TaskManager) { - ticker := time.NewTicker(3 * time.Second) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - s.PersistRowCount(ctx, taskMgr) - } - } -} - -// PersistRowCount persists the row count of the subtask to the storage. -func (s *Summary) PersistRowCount(ctx context.Context, taskMgr *storage.TaskManager) { - var copiedRowCount map[int64]int64 - s.mu.Lock() - if len(s.mu.RowCount) == 0 { - s.mu.Unlock() - return - } - copiedRowCount = make(map[int64]int64, len(s.mu.RowCount)) - for subtaskID, rowCount := range s.mu.RowCount { - copiedRowCount[subtaskID] = rowCount - } - s.mu.Unlock() - - for subtaskID, rowCount := range copiedRowCount { - err := taskMgr.UpdateSubtaskRowCount(ctx, subtaskID, rowCount) - if err != nil { - logutil.Logger(ctx).Warn("update subtask row count failed", zap.Error(err)) - } - } - s.mu.Lock() - for subtaskID := range copiedRowCount { - delete(s.mu.RowCount, subtaskID) - } - s.mu.Unlock() -} diff --git a/pkg/disttask/framework/taskexecutor/interface.go b/pkg/disttask/framework/taskexecutor/interface.go index 5e36cafb55..1066815045 100644 --- a/pkg/disttask/framework/taskexecutor/interface.go +++ b/pkg/disttask/framework/taskexecutor/interface.go @@ -113,7 +113,7 @@ type Extension interface { // Note: // 1. summary is the summary manager of all subtask of the same type now. // 2. should not retry the error from it. - GetStepExecutor(task *proto.Task, summary *execute.Summary, resource *proto.StepResource) (execute.StepExecutor, error) + GetStepExecutor(task *proto.Task, resource *proto.StepResource) (execute.StepExecutor, error) // IsRetryableError returns whether the error is transient. // When error is transient, the framework won't mark subtasks as failed, // then the TaskExecutor can load the subtask again and redo it. @@ -137,6 +137,11 @@ func (*EmptyStepExecutor) RunSubtask(context.Context, *proto.Subtask) error { return nil } +// RealtimeSummary implements the StepExecutor interface. +func (*EmptyStepExecutor) RealtimeSummary() *execute.SubtaskSummary { + return nil +} + // Cleanup implements the StepExecutor interface. func (*EmptyStepExecutor) Cleanup(context.Context) error { return nil diff --git a/pkg/disttask/framework/taskexecutor/register.go b/pkg/disttask/framework/taskexecutor/register.go index 6d2f4efac0..caa6f0d9b8 100644 --- a/pkg/disttask/framework/taskexecutor/register.go +++ b/pkg/disttask/framework/taskexecutor/register.go @@ -18,13 +18,9 @@ import ( "context" "github.com/pingcap/tidb/pkg/disttask/framework/proto" - "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute" ) type taskTypeOptions struct { - // Summary is the summary of all tasks of the task type. - // TODO: better have a summary per task/subtask. - Summary *execute.Summary } // TaskTypeOption is the option of TaskType. @@ -58,8 +54,3 @@ func ClearTaskExecutors() { taskTypes = make(map[proto.TaskType]taskTypeOptions) taskExecutorFactories = make(map[proto.TaskType]taskExecutorFactoryFn) } - -// WithSummary is the option of TaskExecutor to set the summary. -var WithSummary TaskTypeOption = func(opts *taskTypeOptions) { - opts.Summary = execute.NewSummary() -} diff --git a/pkg/disttask/framework/taskexecutor/task_executor.go b/pkg/disttask/framework/taskexecutor/task_executor.go index 09cc122b74..187ef414ff 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor.go +++ b/pkg/disttask/framework/taskexecutor/task_executor.go @@ -43,6 +43,10 @@ var ( // checkBalanceSubtaskInterval is the default check interval for checking // subtasks balance to/away from this node. checkBalanceSubtaskInterval = 2 * time.Second + + // updateSubtaskSummaryInterval is the interval for updating the subtask summary to + // subtask table. + updateSubtaskSummaryInterval = 3 * time.Second ) var ( @@ -158,6 +162,30 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) { } } +func (e *BaseTaskExecutor) updateSubtaskSummaryLoop( + checkCtx, runStepCtx context.Context, stepExec execute.StepExecutor) { + taskMgr := e.taskTable.(*storage.TaskManager) + ticker := time.NewTicker(updateSubtaskSummaryInterval) + defer ticker.Stop() + curSubtaskID := e.currSubtaskID.Load() + update := func() { + summary := stepExec.RealtimeSummary() + err := taskMgr.UpdateSubtaskRowCount(runStepCtx, curSubtaskID, summary.RowCount) + if err != nil { + e.logger.Info("update subtask row count failed", zap.Error(err)) + } + } + for { + select { + case <-checkCtx.Done(): + update() + return + case <-ticker.C: + } + update() + } +} + // Init implements the TaskExecutor interface. func (*BaseTaskExecutor) Init(_ context.Context) error { return nil @@ -287,13 +315,7 @@ func (e *BaseTaskExecutor) runStep(resource *proto.StepResource) (resErr error) stepLogger.End(zap.InfoLevel, resErr) }() - summary, cleanup, err := runSummaryCollectLoop(runStepCtx, task, e.taskTable) - if err != nil { - e.onError(err) - return e.getError() - } - defer cleanup() - stepExecutor, err := e.GetStepExecutor(task, summary, resource) + stepExecutor, err := e.GetStepExecutor(task, resource) if err != nil { e.onError(err) return e.getError() @@ -376,6 +398,11 @@ func (e *BaseTaskExecutor) runStep(resource *proto.StepResource) (resErr error) return e.getError() } +func (e *BaseTaskExecutor) hasRealtimeSummary(stepExecutor execute.StepExecutor) bool { + _, ok := e.taskTable.(*storage.TaskManager) + return ok && stepExecutor.RealtimeSummary() != nil +} + func (e *BaseTaskExecutor) runSubtask(ctx context.Context, stepExecutor execute.StepExecutor, subtask *proto.Subtask) { err := func() error { e.currSubtaskID.Store(subtask.ID) @@ -385,11 +412,16 @@ func (e *BaseTaskExecutor) runSubtask(ctx context.Context, stepExecutor execute. wg.RunWithLog(func() { e.checkBalanceSubtask(checkCtx) }) + + if e.hasRealtimeSummary(stepExecutor) { + wg.RunWithLog(func() { + e.updateSubtaskSummaryLoop(checkCtx, ctx, stepExecutor) + }) + } defer func() { checkCancel() wg.Wait() }() - return stepExecutor.RunSubtask(ctx, subtask) }() failpoint.Inject("MockRunSubtaskCancel", func(val failpoint.Value) { @@ -557,31 +589,6 @@ func (e *BaseTaskExecutor) refreshTask() error { return nil } -func runSummaryCollectLoop( - ctx context.Context, - task *proto.Task, - taskTable TaskTable, -) (summary *execute.Summary, cleanup func(), err error) { - failpoint.Inject("mockSummaryCollectErr", func() { - failpoint.Return(nil, func() {}, errors.New("summary collect err")) - }) - taskMgr, ok := taskTable.(*storage.TaskManager) - if !ok { - return nil, func() {}, nil - } - opt, ok := taskTypes[task.Type] - if !ok { - return nil, func() {}, errors.Errorf("taskExecutor option for type %s not found", task.Type) - } - if opt.Summary != nil { - go opt.Summary.UpdateRowCountLoop(ctx, taskMgr) - return opt.Summary, func() { - opt.Summary.PersistRowCount(ctx, taskMgr) - }, nil - } - return nil, func() {}, nil -} - func (e *BaseTaskExecutor) registerRunStepCancelFunc(cancel context.CancelCauseFunc) { e.mu.Lock() defer e.mu.Unlock() diff --git a/pkg/disttask/framework/taskexecutor/task_executor_test.go b/pkg/disttask/framework/taskexecutor/task_executor_test.go index 8bdc219259..e0c6dbe8dc 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor_test.go +++ b/pkg/disttask/framework/taskexecutor/task_executor_test.go @@ -20,7 +20,6 @@ import ( "time" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/disttask/framework/mock" mockexecute "github.com/pingcap/tidb/pkg/disttask/framework/mock/execute" "github.com/pingcap/tidb/pkg/disttask/framework/proto" @@ -56,7 +55,7 @@ func TestTaskExecutorRun(t *testing.T) { // 1. no taskExecutor constructor taskExecutorRegisterErr := errors.Errorf("constructor of taskExecutor for key not found") - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, taskExecutorRegisterErr).Times(2) + mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(nil, taskExecutorRegisterErr).Times(2) taskExecutor := NewBaseTaskExecutor(ctx, "id", task1, mockSubtaskTable) taskExecutor.Extension = mockExtension err := taskExecutor.runStep(nil) @@ -67,7 +66,7 @@ func TestTaskExecutorRun(t *testing.T) { require.True(t, ctrl.Satisfied()) // 2. init subtask exec env failed - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil).AnyTimes() + mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil).AnyTimes() initErr := errors.New("init error") mockStepExecutor.EXPECT().Init(gomock.Any()).Return(initErr) @@ -311,7 +310,7 @@ func TestTaskExecutorRollback(t *testing.T) { taskExecutor := NewBaseTaskExecutor(ctx, "id", task1, mockSubtaskTable) taskExecutor.Extension = mockExtension - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil).AnyTimes() + mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil).AnyTimes() // 2. get subtask failed getSubtaskErr := errors.New("get subtask error") @@ -357,7 +356,7 @@ func TestTaskExecutor(t *testing.T) { mockStepExecutor := mockexecute.NewMockStepExecutor(ctrl) mockExtension := mock.NewMockExtension(ctrl) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "id", taskID, proto.SubtaskStateFailed, gomock.Any()).Return(nil) - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil).AnyTimes() + mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil).AnyTimes() mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false).AnyTimes() // mock for checkBalanceSubtask mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id", @@ -424,7 +423,7 @@ func TestRunStepCurrentSubtaskScheduledAway(t *testing.T) { mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", task.ID, proto.StepOne, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil) // mock for runStep - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil) + mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil) mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "tidb1", task.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(subtasks[0], nil) @@ -512,39 +511,39 @@ func TestExecutorErrHandling(t *testing.T) { taskExecutor := NewBaseTaskExecutor(ctx, "id", task, mockSubtaskTable) taskExecutor.Extension = mockExtension - // 1. GetStepExecutor meet retryable error. + // GetStepExecutor meet retryable error. getSubtaskExecutorErr := errors.New("get executor err") - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, getSubtaskExecutorErr) + mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(nil, getSubtaskExecutorErr) mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(true) require.NoError(t, taskExecutor.RunStep(nil)) require.True(t, ctrl.Satisfied()) - // 2. GetStepExecutor meet non retryable error. - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, getSubtaskExecutorErr) + // GetStepExecutor meet non retryable error. + mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(nil, getSubtaskExecutorErr) mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false) mockSubtaskTable.EXPECT().FailSubtask(taskExecutor.ctx, taskExecutor.id, gomock.Any(), getSubtaskExecutorErr) require.NoError(t, taskExecutor.RunStep(nil)) require.True(t, ctrl.Satisfied()) - // 3. Init meet retryable error. + // Init meet retryable error. initErr := errors.New("executor init err") - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil) + mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil) mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(initErr) mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(true) require.NoError(t, taskExecutor.RunStep(nil)) require.True(t, ctrl.Satisfied()) - // 4. Init meet non retryable error. - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil) + // Init meet non retryable error. + mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil) mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(initErr) mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false) mockSubtaskTable.EXPECT().FailSubtask(taskExecutor.ctx, taskExecutor.id, gomock.Any(), initErr) require.NoError(t, taskExecutor.RunStep(nil)) require.True(t, ctrl.Satisfied()) - // 5. Cleanup meet retryable error. + // Cleanup meet retryable error. cleanupErr := errors.New("cleanup err") - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil) + mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil) mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ @@ -560,8 +559,8 @@ func TestExecutorErrHandling(t *testing.T) { require.NoError(t, taskExecutor.RunStep(nil)) require.True(t, ctrl.Satisfied()) - // 6. Cleanup meet non retryable error. - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil) + // Cleanup meet non retryable error. + mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil) mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ @@ -578,21 +577,8 @@ func TestExecutorErrHandling(t *testing.T) { require.NoError(t, taskExecutor.RunStep(nil)) require.True(t, ctrl.Satisfied()) - // 7. runSummaryCollectLoop meet retryable error. - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockSummaryCollectErr", "return()")) - mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(true) - require.NoError(t, taskExecutor.RunStep(nil)) - require.True(t, ctrl.Satisfied()) - - // 8. runSummaryCollectLoop meet non retryable error. - mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false) - mockSubtaskTable.EXPECT().FailSubtask(taskExecutor.ctx, taskExecutor.id, gomock.Any(), gomock.Any()) - require.NoError(t, taskExecutor.RunStep(nil)) - require.True(t, ctrl.Satisfied()) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockSummaryCollectErr")) - - // 9. subtask succeed. - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil) + // subtask succeed. + mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil) mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ diff --git a/pkg/disttask/framework/testutil/disttest_util.go b/pkg/disttask/framework/testutil/disttest_util.go index 181e939aad..b1e6990377 100644 --- a/pkg/disttask/framework/testutil/disttest_util.go +++ b/pkg/disttask/framework/testutil/disttest_util.go @@ -46,11 +46,12 @@ func RegisterTaskMeta(t *testing.T, ctrl *gomock.Controller, schedulerHandle sch } return nil }).AnyTimes() + mockStepExecutor.EXPECT().RealtimeSummary().Return(nil).AnyTimes() } else { mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn(runSubtaskFn).AnyTimes() } mockExtension.EXPECT().IsIdempotent(gomock.Any()).Return(true).AnyTimes() - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil).AnyTimes() + mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil).AnyTimes() mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false).AnyTimes() registerTaskMetaInner(t, proto.TaskTypeExample, mockExtension, mockCleanupRountine, schedulerHandle) } @@ -95,9 +96,10 @@ func RegisterRollbackTaskMeta(t *testing.T, ctrl *gomock.Controller, mockSchedul testContext.CollectSubtask(subtask) return nil }).AnyTimes() + mockExecutor.EXPECT().RealtimeSummary().Return(nil).AnyTimes() mockExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() mockExtension.EXPECT().IsIdempotent(gomock.Any()).Return(true).AnyTimes() - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockExecutor, nil).AnyTimes() + mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockExecutor, nil).AnyTimes() mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false).AnyTimes() registerTaskMetaInner(t, proto.TaskTypeExample, mockExtension, mockCleanupRountine, mockScheduler) diff --git a/pkg/disttask/framework/testutil/executor_util.go b/pkg/disttask/framework/testutil/executor_util.go index eeade2d1f9..dd25c288f3 100644 --- a/pkg/disttask/framework/testutil/executor_util.go +++ b/pkg/disttask/framework/testutil/executor_util.go @@ -37,7 +37,7 @@ func GetMockStepExecutor(ctrl *gomock.Controller) *mockexecute.MockStepExecutor func GetMockTaskExecutorExtension(ctrl *gomock.Controller, mockStepExecutor *mockexecute.MockStepExecutor) *mock.MockExtension { mockExtension := mock.NewMockExtension(ctrl) mockExtension.EXPECT(). - GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()). + GetStepExecutor(gomock.Any(), gomock.Any()). Return(mockStepExecutor, nil).AnyTimes() mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false).AnyTimes() return mockExtension @@ -49,6 +49,7 @@ func InitTaskExecutor(ctrl *gomock.Controller, runSubtaskFn func(ctx context.Con mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn( runSubtaskFn, ).AnyTimes() + mockStepExecutor.EXPECT().RealtimeSummary().Return(nil).AnyTimes() mockExtension := GetMockTaskExecutorExtension(ctrl, mockStepExecutor) taskexecutor.RegisterTaskType(proto.TaskTypeExample, diff --git a/pkg/disttask/importinto/task_executor.go b/pkg/disttask/importinto/task_executor.go index 3c03f483e2..304b77b346 100644 --- a/pkg/disttask/importinto/task_executor.go +++ b/pkg/disttask/importinto/task_executor.go @@ -180,6 +180,10 @@ outer: return pipeline.Close() } +func (*importStepExecutor) RealtimeSummary() *execute.SubtaskSummary { + return nil +} + func (s *importStepExecutor) OnFinished(ctx context.Context, subtask *proto.Subtask) error { var subtaskMeta ImportStepMeta if err := json.Unmarshal(subtask.Meta, &subtaskMeta); err != nil { @@ -390,6 +394,10 @@ func (e *writeAndIngestStepExecutor) RunSubtask(ctx context.Context, subtask *pr return localBackend.ImportEngine(ctx, engineUUID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) } +func (*writeAndIngestStepExecutor) RealtimeSummary() *execute.SubtaskSummary { + return nil +} + func (e *writeAndIngestStepExecutor) OnFinished(ctx context.Context, subtask *proto.Subtask) error { var subtaskMeta WriteIngestStepMeta if err := json.Unmarshal(subtask.Meta, &subtaskMeta); err != nil { @@ -481,7 +489,7 @@ func (*importExecutor) IsRetryableError(err error) bool { return common.IsRetryableError(err) } -func (*importExecutor) GetStepExecutor(task *proto.Task, _ *execute.Summary, _ *proto.StepResource) (execute.StepExecutor, error) { +func (*importExecutor) GetStepExecutor(task *proto.Task, _ *proto.StepResource) (execute.StepExecutor, error) { taskMeta := TaskMeta{} if err := json.Unmarshal(task.Meta, &taskMeta); err != nil { return nil, errors.Trace(err) diff --git a/pkg/disttask/importinto/task_executor_test.go b/pkg/disttask/importinto/task_executor_test.go index 4c992918df..ef41622bc3 100644 --- a/pkg/disttask/importinto/task_executor_test.go +++ b/pkg/disttask/importinto/task_executor_test.go @@ -43,12 +43,12 @@ func TestImportTaskExecutor(t *testing.T) { proto.ImportStepWriteAndIngest, proto.ImportStepPostProcess, } { - exe, err := executor.GetStepExecutor(&proto.Task{Step: step, Meta: []byte("{}")}, nil, nil) + exe, err := executor.GetStepExecutor(&proto.Task{Step: step, Meta: []byte("{}")}, nil) require.NoError(t, err) require.NotNil(t, exe) } - _, err := executor.GetStepExecutor(&proto.Task{Step: proto.StepInit, Meta: []byte("{}")}, nil, nil) + _, err := executor.GetStepExecutor(&proto.Task{Step: proto.StepInit, Meta: []byte("{}")}, nil) require.Error(t, err) - _, err = executor.GetStepExecutor(&proto.Task{Step: proto.ImportStepImport, Meta: []byte("")}, nil, nil) + _, err = executor.GetStepExecutor(&proto.Task{Step: proto.ImportStepImport, Meta: []byte("")}, nil) require.Error(t, err) }