ddl, disttask: collect summary for each subtask (#50838)

ref pingcap/tidb#48795
This commit is contained in:
tangenta
2024-02-01 15:58:54 +08:00
committed by GitHub
parent 134d2bfbe0
commit cac449b337
19 changed files with 135 additions and 203 deletions

View File

@ -61,7 +61,7 @@ type BackfillSubTaskMeta struct {
// NewBackfillSubtaskExecutor creates a new backfill subtask executor.
func NewBackfillSubtaskExecutor(taskMeta []byte, d *ddl,
bc ingest.BackendCtx, stage proto.Step, summary *execute.Summary) (execute.StepExecutor, error) {
bc ingest.BackendCtx, stage proto.Step) (execute.StepExecutor, error) {
bgm := &BackfillTaskMeta{}
err := json.Unmarshal(taskMeta, bgm)
if err != nil {
@ -90,7 +90,7 @@ func NewBackfillSubtaskExecutor(taskMeta []byte, d *ddl,
d.setDDLLabelForTopSQL(jobMeta.ID, jobMeta.Query)
d.setDDLSourceForDiagnosis(jobMeta.ID, jobMeta.Type)
return newReadIndexExecutor(
d, &bgm.Job, indexInfos, tbl.(table.PhysicalTable), jc, bc, summary, bgm.CloudStorageURI), nil
d, &bgm.Job, indexInfos, tbl.(table.PhysicalTable), jc, bc, bgm.CloudStorageURI), nil
case proto.BackfillStepMergeSort:
return newMergeSortExecutor(jobMeta.ID, len(indexInfos), tbl.(table.PhysicalTable), bc, bgm.CloudStorageURI)
case proto.BackfillStepWriteAndIngest:
@ -167,10 +167,10 @@ func decodeIndexUniqueness(job *model.Job) (bool, error) {
return unique[0], nil
}
func (s *backfillDistExecutor) GetStepExecutor(task *proto.Task, summary *execute.Summary, _ *proto.StepResource) (execute.StepExecutor, error) {
func (s *backfillDistExecutor) GetStepExecutor(task *proto.Task, _ *proto.StepResource) (execute.StepExecutor, error) {
switch task.Step {
case proto.BackfillStepReadIndex, proto.BackfillStepMergeSort, proto.BackfillStepWriteAndIngest:
return NewBackfillSubtaskExecutor(task.Meta, s.d, s.backendCtx, task.Step, summary)
return NewBackfillSubtaskExecutor(task.Meta, s.d, s.backendCtx, task.Step)
default:
return nil, errors.Errorf("unknown backfill step %d for task %d", task.Step, task.ID)
}

View File

@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/logutil"
@ -30,6 +31,7 @@ import (
)
type cloudImportExecutor struct {
taskexecutor.EmptyStepExecutor
job *model.Job
jobID int64
index *model.IndexInfo

View File

@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/logutil"
@ -34,6 +35,7 @@ import (
)
type mergeSortExecutor struct {
taskexecutor.EmptyStepExecutor
jobID int64
idxNum int
ptbl table.PhysicalTable

View File

@ -46,8 +46,8 @@ type readIndexExecutor struct {
cloudStorageURI string
bc ingest.BackendCtx
summary *execute.Summary
bc ingest.BackendCtx
curRowCount *atomic.Int64
subtaskSummary sync.Map // subtaskID => readIndexSummary
}
@ -67,7 +67,6 @@ func newReadIndexExecutor(
ptbl table.PhysicalTable,
jc *JobContext,
bc ingest.BackendCtx,
summary *execute.Summary,
cloudStorageURI string,
) *readIndexExecutor {
return &readIndexExecutor{
@ -77,8 +76,8 @@ func newReadIndexExecutor(
ptbl: ptbl,
jc: jc,
bc: bc,
summary: summary,
cloudStorageURI: cloudStorageURI,
curRowCount: &atomic.Int64{},
}
}
@ -117,13 +116,13 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
opCtx := NewOperatorCtx(ctx)
defer opCtx.Cancel()
totalRowCount := &atomic.Int64{}
r.curRowCount.Store(0)
var pipe *operator.AsyncPipeline
if len(r.cloudStorageURI) > 0 {
pipe, err = r.buildExternalStorePipeline(opCtx, subtask.ID, sessCtx, tbl, startKey, endKey, totalRowCount)
pipe, err = r.buildExternalStorePipeline(opCtx, subtask.ID, sessCtx, tbl, startKey, endKey, r.curRowCount)
} else {
pipe, err = r.buildLocalStorePipeline(opCtx, sessCtx, tbl, startKey, endKey, totalRowCount)
pipe, err = r.buildLocalStorePipeline(opCtx, sessCtx, tbl, startKey, endKey, r.curRowCount)
}
if err != nil {
return err
@ -142,10 +141,15 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
}
r.bc.ResetWorkers(r.job.ID)
r.summary.UpdateRowCount(subtask.ID, totalRowCount.Load())
return nil
}
func (r *readIndexExecutor) RealtimeSummary() *execute.SubtaskSummary {
return &execute.SubtaskSummary{
RowCount: r.curRowCount.Load(),
}
}
func (*readIndexExecutor) Cleanup(ctx context.Context) error {
logutil.Logger(ctx).Info("read index executor cleanup subtask exec env",
zap.String("category", "ddl"))

View File

@ -742,7 +742,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
taskexecutor.RegisterTaskType(proto.Backfill,
func(ctx context.Context, id string, task *proto.Task, taskTable taskexecutor.TaskTable) taskexecutor.TaskExecutor {
return newBackfillDistExecutor(ctx, id, task, taskTable, d)
}, taskexecutor.WithSummary,
},
)
scheduler.RegisterSchedulerFactory(proto.Backfill,

View File

@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/taskexecutor/execute",
"@org_uber_go_mock//gomock",
],
)

View File

@ -13,6 +13,7 @@ import (
reflect "reflect"
proto "github.com/pingcap/tidb/pkg/disttask/framework/proto"
execute "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
gomock "go.uber.org/mock/gomock"
)
@ -81,6 +82,20 @@ func (mr *MockStepExecutorMockRecorder) OnFinished(arg0, arg1 any) *gomock.Call
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnFinished", reflect.TypeOf((*MockStepExecutor)(nil).OnFinished), arg0, arg1)
}
// RealtimeSummary mocks base method.
func (m *MockStepExecutor) RealtimeSummary() *execute.SubtaskSummary {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RealtimeSummary")
ret0, _ := ret[0].(*execute.SubtaskSummary)
return ret0
}
// RealtimeSummary indicates an expected call of RealtimeSummary.
func (mr *MockStepExecutorMockRecorder) RealtimeSummary() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RealtimeSummary", reflect.TypeOf((*MockStepExecutor)(nil).RealtimeSummary))
}
// RunSubtask mocks base method.
func (m *MockStepExecutor) RunSubtask(arg0 context.Context, arg1 *proto.Subtask) error {
m.ctrl.T.Helper()

View File

@ -477,18 +477,18 @@ func (m *MockExtension) EXPECT() *MockExtensionMockRecorder {
}
// GetStepExecutor mocks base method.
func (m *MockExtension) GetStepExecutor(arg0 *proto.Task, arg1 *execute.Summary, arg2 *proto.StepResource) (execute.StepExecutor, error) {
func (m *MockExtension) GetStepExecutor(arg0 *proto.Task, arg1 *proto.StepResource) (execute.StepExecutor, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetStepExecutor", arg0, arg1, arg2)
ret := m.ctrl.Call(m, "GetStepExecutor", arg0, arg1)
ret0, _ := ret[0].(execute.StepExecutor)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetStepExecutor indicates an expected call of GetStepExecutor.
func (mr *MockExtensionMockRecorder) GetStepExecutor(arg0, arg1, arg2 any) *gomock.Call {
func (mr *MockExtensionMockRecorder) GetStepExecutor(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStepExecutor", reflect.TypeOf((*MockExtension)(nil).GetStepExecutor), arg0, arg1, arg2)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStepExecutor", reflect.TypeOf((*MockExtension)(nil).GetStepExecutor), arg0, arg1)
}
// IsIdempotent mocks base method.

View File

@ -2,16 +2,8 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "execute",
srcs = [
"interface.go",
"summary.go",
],
srcs = ["interface.go"],
importpath = "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute",
visibility = ["//visibility:public"],
deps = [
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/storage",
"//pkg/util/logutil",
"@org_uber_go_zap//:zap",
],
deps = ["//pkg/disttask/framework/proto"],
)

View File

@ -33,9 +33,18 @@ type StepExecutor interface {
Init(context.Context) error
// RunSubtask is used to run the subtask.
RunSubtask(ctx context.Context, subtask *proto.Subtask) error
// RealtimeSummary returns the realtime summary of the running subtask by this executor.
RealtimeSummary() *SubtaskSummary
// OnFinished is used to handle the subtask when it is finished.
// The subtask meta can be updated in place.
OnFinished(ctx context.Context, subtask *proto.Subtask) error
// Cleanup is used to clean up the environment for the subtask executor.
Cleanup(context.Context) error
}
// SubtaskSummary contains the summary of a subtask.
type SubtaskSummary struct {
RowCount int64
}

View File

@ -1,93 +0,0 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package execute
import (
"context"
"sync"
"time"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)
// Summary is used to collect the summary of subtasks execution.
type Summary struct {
mu struct {
sync.Mutex
RowCount map[int64]int64 // subtask ID -> row count
}
}
// NewSummary creates a new Summary.
func NewSummary() *Summary {
return &Summary{
mu: struct {
sync.Mutex
RowCount map[int64]int64
}{
RowCount: map[int64]int64{},
},
}
}
// UpdateRowCount updates the row count of the subtask.
func (s *Summary) UpdateRowCount(subtaskID int64, rowCount int64) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.RowCount[subtaskID] = rowCount
}
// UpdateRowCountLoop updates the row count of the subtask periodically.
func (s *Summary) UpdateRowCountLoop(ctx context.Context, taskMgr *storage.TaskManager) {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.PersistRowCount(ctx, taskMgr)
}
}
}
// PersistRowCount persists the row count of the subtask to the storage.
func (s *Summary) PersistRowCount(ctx context.Context, taskMgr *storage.TaskManager) {
var copiedRowCount map[int64]int64
s.mu.Lock()
if len(s.mu.RowCount) == 0 {
s.mu.Unlock()
return
}
copiedRowCount = make(map[int64]int64, len(s.mu.RowCount))
for subtaskID, rowCount := range s.mu.RowCount {
copiedRowCount[subtaskID] = rowCount
}
s.mu.Unlock()
for subtaskID, rowCount := range copiedRowCount {
err := taskMgr.UpdateSubtaskRowCount(ctx, subtaskID, rowCount)
if err != nil {
logutil.Logger(ctx).Warn("update subtask row count failed", zap.Error(err))
}
}
s.mu.Lock()
for subtaskID := range copiedRowCount {
delete(s.mu.RowCount, subtaskID)
}
s.mu.Unlock()
}

View File

@ -113,7 +113,7 @@ type Extension interface {
// Note:
// 1. summary is the summary manager of all subtask of the same type now.
// 2. should not retry the error from it.
GetStepExecutor(task *proto.Task, summary *execute.Summary, resource *proto.StepResource) (execute.StepExecutor, error)
GetStepExecutor(task *proto.Task, resource *proto.StepResource) (execute.StepExecutor, error)
// IsRetryableError returns whether the error is transient.
// When error is transient, the framework won't mark subtasks as failed,
// then the TaskExecutor can load the subtask again and redo it.
@ -137,6 +137,11 @@ func (*EmptyStepExecutor) RunSubtask(context.Context, *proto.Subtask) error {
return nil
}
// RealtimeSummary implements the StepExecutor interface.
func (*EmptyStepExecutor) RealtimeSummary() *execute.SubtaskSummary {
return nil
}
// Cleanup implements the StepExecutor interface.
func (*EmptyStepExecutor) Cleanup(context.Context) error {
return nil

View File

@ -18,13 +18,9 @@ import (
"context"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
)
type taskTypeOptions struct {
// Summary is the summary of all tasks of the task type.
// TODO: better have a summary per task/subtask.
Summary *execute.Summary
}
// TaskTypeOption is the option of TaskType.
@ -58,8 +54,3 @@ func ClearTaskExecutors() {
taskTypes = make(map[proto.TaskType]taskTypeOptions)
taskExecutorFactories = make(map[proto.TaskType]taskExecutorFactoryFn)
}
// WithSummary is the option of TaskExecutor to set the summary.
var WithSummary TaskTypeOption = func(opts *taskTypeOptions) {
opts.Summary = execute.NewSummary()
}

View File

@ -43,6 +43,10 @@ var (
// checkBalanceSubtaskInterval is the default check interval for checking
// subtasks balance to/away from this node.
checkBalanceSubtaskInterval = 2 * time.Second
// updateSubtaskSummaryInterval is the interval for updating the subtask summary to
// subtask table.
updateSubtaskSummaryInterval = 3 * time.Second
)
var (
@ -158,6 +162,30 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) {
}
}
func (e *BaseTaskExecutor) updateSubtaskSummaryLoop(
checkCtx, runStepCtx context.Context, stepExec execute.StepExecutor) {
taskMgr := e.taskTable.(*storage.TaskManager)
ticker := time.NewTicker(updateSubtaskSummaryInterval)
defer ticker.Stop()
curSubtaskID := e.currSubtaskID.Load()
update := func() {
summary := stepExec.RealtimeSummary()
err := taskMgr.UpdateSubtaskRowCount(runStepCtx, curSubtaskID, summary.RowCount)
if err != nil {
e.logger.Info("update subtask row count failed", zap.Error(err))
}
}
for {
select {
case <-checkCtx.Done():
update()
return
case <-ticker.C:
}
update()
}
}
// Init implements the TaskExecutor interface.
func (*BaseTaskExecutor) Init(_ context.Context) error {
return nil
@ -287,13 +315,7 @@ func (e *BaseTaskExecutor) runStep(resource *proto.StepResource) (resErr error)
stepLogger.End(zap.InfoLevel, resErr)
}()
summary, cleanup, err := runSummaryCollectLoop(runStepCtx, task, e.taskTable)
if err != nil {
e.onError(err)
return e.getError()
}
defer cleanup()
stepExecutor, err := e.GetStepExecutor(task, summary, resource)
stepExecutor, err := e.GetStepExecutor(task, resource)
if err != nil {
e.onError(err)
return e.getError()
@ -376,6 +398,11 @@ func (e *BaseTaskExecutor) runStep(resource *proto.StepResource) (resErr error)
return e.getError()
}
func (e *BaseTaskExecutor) hasRealtimeSummary(stepExecutor execute.StepExecutor) bool {
_, ok := e.taskTable.(*storage.TaskManager)
return ok && stepExecutor.RealtimeSummary() != nil
}
func (e *BaseTaskExecutor) runSubtask(ctx context.Context, stepExecutor execute.StepExecutor, subtask *proto.Subtask) {
err := func() error {
e.currSubtaskID.Store(subtask.ID)
@ -385,11 +412,16 @@ func (e *BaseTaskExecutor) runSubtask(ctx context.Context, stepExecutor execute.
wg.RunWithLog(func() {
e.checkBalanceSubtask(checkCtx)
})
if e.hasRealtimeSummary(stepExecutor) {
wg.RunWithLog(func() {
e.updateSubtaskSummaryLoop(checkCtx, ctx, stepExecutor)
})
}
defer func() {
checkCancel()
wg.Wait()
}()
return stepExecutor.RunSubtask(ctx, subtask)
}()
failpoint.Inject("MockRunSubtaskCancel", func(val failpoint.Value) {
@ -557,31 +589,6 @@ func (e *BaseTaskExecutor) refreshTask() error {
return nil
}
func runSummaryCollectLoop(
ctx context.Context,
task *proto.Task,
taskTable TaskTable,
) (summary *execute.Summary, cleanup func(), err error) {
failpoint.Inject("mockSummaryCollectErr", func() {
failpoint.Return(nil, func() {}, errors.New("summary collect err"))
})
taskMgr, ok := taskTable.(*storage.TaskManager)
if !ok {
return nil, func() {}, nil
}
opt, ok := taskTypes[task.Type]
if !ok {
return nil, func() {}, errors.Errorf("taskExecutor option for type %s not found", task.Type)
}
if opt.Summary != nil {
go opt.Summary.UpdateRowCountLoop(ctx, taskMgr)
return opt.Summary, func() {
opt.Summary.PersistRowCount(ctx, taskMgr)
}, nil
}
return nil, func() {}, nil
}
func (e *BaseTaskExecutor) registerRunStepCancelFunc(cancel context.CancelCauseFunc) {
e.mu.Lock()
defer e.mu.Unlock()

View File

@ -20,7 +20,6 @@ import (
"time"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/mock"
mockexecute "github.com/pingcap/tidb/pkg/disttask/framework/mock/execute"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
@ -56,7 +55,7 @@ func TestTaskExecutorRun(t *testing.T) {
// 1. no taskExecutor constructor
taskExecutorRegisterErr := errors.Errorf("constructor of taskExecutor for key not found")
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, taskExecutorRegisterErr).Times(2)
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(nil, taskExecutorRegisterErr).Times(2)
taskExecutor := NewBaseTaskExecutor(ctx, "id", task1, mockSubtaskTable)
taskExecutor.Extension = mockExtension
err := taskExecutor.runStep(nil)
@ -67,7 +66,7 @@ func TestTaskExecutorRun(t *testing.T) {
require.True(t, ctrl.Satisfied())
// 2. init subtask exec env failed
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil).AnyTimes()
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil).AnyTimes()
initErr := errors.New("init error")
mockStepExecutor.EXPECT().Init(gomock.Any()).Return(initErr)
@ -311,7 +310,7 @@ func TestTaskExecutorRollback(t *testing.T) {
taskExecutor := NewBaseTaskExecutor(ctx, "id", task1, mockSubtaskTable)
taskExecutor.Extension = mockExtension
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil).AnyTimes()
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil).AnyTimes()
// 2. get subtask failed
getSubtaskErr := errors.New("get subtask error")
@ -357,7 +356,7 @@ func TestTaskExecutor(t *testing.T) {
mockStepExecutor := mockexecute.NewMockStepExecutor(ctrl)
mockExtension := mock.NewMockExtension(ctrl)
mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "id", taskID, proto.SubtaskStateFailed, gomock.Any()).Return(nil)
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil).AnyTimes()
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil).AnyTimes()
mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false).AnyTimes()
// mock for checkBalanceSubtask
mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id",
@ -424,7 +423,7 @@ func TestRunStepCurrentSubtaskScheduledAway(t *testing.T) {
mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1",
task.ID, proto.StepOne, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil)
// mock for runStep
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil)
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil)
mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false)
mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "tidb1", task.ID, proto.StepOne,
unfinishedNormalSubtaskStates...).Return(subtasks[0], nil)
@ -512,39 +511,39 @@ func TestExecutorErrHandling(t *testing.T) {
taskExecutor := NewBaseTaskExecutor(ctx, "id", task, mockSubtaskTable)
taskExecutor.Extension = mockExtension
// 1. GetStepExecutor meet retryable error.
// GetStepExecutor meet retryable error.
getSubtaskExecutorErr := errors.New("get executor err")
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, getSubtaskExecutorErr)
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(nil, getSubtaskExecutorErr)
mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(true)
require.NoError(t, taskExecutor.RunStep(nil))
require.True(t, ctrl.Satisfied())
// 2. GetStepExecutor meet non retryable error.
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, getSubtaskExecutorErr)
// GetStepExecutor meet non retryable error.
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(nil, getSubtaskExecutorErr)
mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false)
mockSubtaskTable.EXPECT().FailSubtask(taskExecutor.ctx, taskExecutor.id, gomock.Any(), getSubtaskExecutorErr)
require.NoError(t, taskExecutor.RunStep(nil))
require.True(t, ctrl.Satisfied())
// 3. Init meet retryable error.
// Init meet retryable error.
initErr := errors.New("executor init err")
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil)
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil)
mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(initErr)
mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(true)
require.NoError(t, taskExecutor.RunStep(nil))
require.True(t, ctrl.Satisfied())
// 4. Init meet non retryable error.
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil)
// Init meet non retryable error.
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil)
mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(initErr)
mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false)
mockSubtaskTable.EXPECT().FailSubtask(taskExecutor.ctx, taskExecutor.id, gomock.Any(), initErr)
require.NoError(t, taskExecutor.RunStep(nil))
require.True(t, ctrl.Satisfied())
// 5. Cleanup meet retryable error.
// Cleanup meet retryable error.
cleanupErr := errors.New("cleanup err")
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil)
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil)
mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil)
mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task.ID, proto.StepOne,
unfinishedNormalSubtaskStates...).Return(&proto.Subtask{
@ -560,8 +559,8 @@ func TestExecutorErrHandling(t *testing.T) {
require.NoError(t, taskExecutor.RunStep(nil))
require.True(t, ctrl.Satisfied())
// 6. Cleanup meet non retryable error.
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil)
// Cleanup meet non retryable error.
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil)
mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil)
mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task.ID, proto.StepOne,
unfinishedNormalSubtaskStates...).Return(&proto.Subtask{
@ -578,21 +577,8 @@ func TestExecutorErrHandling(t *testing.T) {
require.NoError(t, taskExecutor.RunStep(nil))
require.True(t, ctrl.Satisfied())
// 7. runSummaryCollectLoop meet retryable error.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockSummaryCollectErr", "return()"))
mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(true)
require.NoError(t, taskExecutor.RunStep(nil))
require.True(t, ctrl.Satisfied())
// 8. runSummaryCollectLoop meet non retryable error.
mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false)
mockSubtaskTable.EXPECT().FailSubtask(taskExecutor.ctx, taskExecutor.id, gomock.Any(), gomock.Any())
require.NoError(t, taskExecutor.RunStep(nil))
require.True(t, ctrl.Satisfied())
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockSummaryCollectErr"))
// 9. subtask succeed.
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil)
// subtask succeed.
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil)
mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil)
mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task.ID, proto.StepOne,
unfinishedNormalSubtaskStates...).Return(&proto.Subtask{

View File

@ -46,11 +46,12 @@ func RegisterTaskMeta(t *testing.T, ctrl *gomock.Controller, schedulerHandle sch
}
return nil
}).AnyTimes()
mockStepExecutor.EXPECT().RealtimeSummary().Return(nil).AnyTimes()
} else {
mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn(runSubtaskFn).AnyTimes()
}
mockExtension.EXPECT().IsIdempotent(gomock.Any()).Return(true).AnyTimes()
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil).AnyTimes()
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil).AnyTimes()
mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false).AnyTimes()
registerTaskMetaInner(t, proto.TaskTypeExample, mockExtension, mockCleanupRountine, schedulerHandle)
}
@ -95,9 +96,10 @@ func RegisterRollbackTaskMeta(t *testing.T, ctrl *gomock.Controller, mockSchedul
testContext.CollectSubtask(subtask)
return nil
}).AnyTimes()
mockExecutor.EXPECT().RealtimeSummary().Return(nil).AnyTimes()
mockExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
mockExtension.EXPECT().IsIdempotent(gomock.Any()).Return(true).AnyTimes()
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockExecutor, nil).AnyTimes()
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockExecutor, nil).AnyTimes()
mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false).AnyTimes()
registerTaskMetaInner(t, proto.TaskTypeExample, mockExtension, mockCleanupRountine, mockScheduler)

View File

@ -37,7 +37,7 @@ func GetMockStepExecutor(ctrl *gomock.Controller) *mockexecute.MockStepExecutor
func GetMockTaskExecutorExtension(ctrl *gomock.Controller, mockStepExecutor *mockexecute.MockStepExecutor) *mock.MockExtension {
mockExtension := mock.NewMockExtension(ctrl)
mockExtension.EXPECT().
GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).
GetStepExecutor(gomock.Any(), gomock.Any()).
Return(mockStepExecutor, nil).AnyTimes()
mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false).AnyTimes()
return mockExtension
@ -49,6 +49,7 @@ func InitTaskExecutor(ctrl *gomock.Controller, runSubtaskFn func(ctx context.Con
mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn(
runSubtaskFn,
).AnyTimes()
mockStepExecutor.EXPECT().RealtimeSummary().Return(nil).AnyTimes()
mockExtension := GetMockTaskExecutorExtension(ctrl, mockStepExecutor)
taskexecutor.RegisterTaskType(proto.TaskTypeExample,

View File

@ -180,6 +180,10 @@ outer:
return pipeline.Close()
}
func (*importStepExecutor) RealtimeSummary() *execute.SubtaskSummary {
return nil
}
func (s *importStepExecutor) OnFinished(ctx context.Context, subtask *proto.Subtask) error {
var subtaskMeta ImportStepMeta
if err := json.Unmarshal(subtask.Meta, &subtaskMeta); err != nil {
@ -390,6 +394,10 @@ func (e *writeAndIngestStepExecutor) RunSubtask(ctx context.Context, subtask *pr
return localBackend.ImportEngine(ctx, engineUUID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
}
func (*writeAndIngestStepExecutor) RealtimeSummary() *execute.SubtaskSummary {
return nil
}
func (e *writeAndIngestStepExecutor) OnFinished(ctx context.Context, subtask *proto.Subtask) error {
var subtaskMeta WriteIngestStepMeta
if err := json.Unmarshal(subtask.Meta, &subtaskMeta); err != nil {
@ -481,7 +489,7 @@ func (*importExecutor) IsRetryableError(err error) bool {
return common.IsRetryableError(err)
}
func (*importExecutor) GetStepExecutor(task *proto.Task, _ *execute.Summary, _ *proto.StepResource) (execute.StepExecutor, error) {
func (*importExecutor) GetStepExecutor(task *proto.Task, _ *proto.StepResource) (execute.StepExecutor, error) {
taskMeta := TaskMeta{}
if err := json.Unmarshal(task.Meta, &taskMeta); err != nil {
return nil, errors.Trace(err)

View File

@ -43,12 +43,12 @@ func TestImportTaskExecutor(t *testing.T) {
proto.ImportStepWriteAndIngest,
proto.ImportStepPostProcess,
} {
exe, err := executor.GetStepExecutor(&proto.Task{Step: step, Meta: []byte("{}")}, nil, nil)
exe, err := executor.GetStepExecutor(&proto.Task{Step: step, Meta: []byte("{}")}, nil)
require.NoError(t, err)
require.NotNil(t, exe)
}
_, err := executor.GetStepExecutor(&proto.Task{Step: proto.StepInit, Meta: []byte("{}")}, nil, nil)
_, err := executor.GetStepExecutor(&proto.Task{Step: proto.StepInit, Meta: []byte("{}")}, nil)
require.Error(t, err)
_, err = executor.GetStepExecutor(&proto.Task{Step: proto.ImportStepImport, Meta: []byte("")}, nil, nil)
_, err = executor.GetStepExecutor(&proto.Task{Step: proto.ImportStepImport, Meta: []byte("")}, nil)
require.Error(t, err)
}