From 37f8a14657bcdd2b45bb575af6fa8bf16cb45d7d Mon Sep 17 00:00:00 2001
From: D3Hunter
Date: Tue, 9 Sep 2025 20:07:54 +0800
Subject: [PATCH] importinto: fix zero update time during validation phase and
 refine logs (#63425)

ref pingcap/tidb#61702
---
 pkg/disttask/framework/scheduler/scheduler.go |  5 ++++
 pkg/disttask/importinto/planner.go            |  3 +-
 pkg/disttask/importinto/proto.go              |  2 ++
 pkg/disttask/importinto/scheduler.go          | 30 ++++++++-----------
 pkg/disttask/importinto/subtask_executor.go   |  4 +--
 pkg/disttask/importinto/task_executor.go      |  8 +++--
 pkg/executor/importer/import.go               | 16 +++++++++-
 pkg/executor/show.go                          |  8 ++++-
 pkg/executor/show_test.go                     |  9 ++++++
 9 files changed, 59 insertions(+), 26 deletions(-)

diff --git a/pkg/disttask/framework/scheduler/scheduler.go b/pkg/disttask/framework/scheduler/scheduler.go
index f23e52a5ea..77ae61d067 100644
--- a/pkg/disttask/framework/scheduler/scheduler.go
+++ b/pkg/disttask/framework/scheduler/scheduler.go
@@ -690,6 +690,11 @@ func (*BaseScheduler) isStepSucceed(cntByStates map[proto.SubtaskState]int64) bo
 	return len(cntByStates) == 0 || (len(cntByStates) == 1 && ok)
 }
 
+// GetLogger returns the logger.
+func (s *BaseScheduler) GetLogger() *zap.Logger {
+	return s.logger
+}
+
 // IsCancelledErr checks if the error is a cancelled error.
 func IsCancelledErr(err error) bool {
 	return strings.Contains(err.Error(), taskCancelMsg)
diff --git a/pkg/disttask/importinto/planner.go b/pkg/disttask/importinto/planner.go
index 43419ff6f7..ca81ff336e 100644
--- a/pkg/disttask/importinto/planner.go
+++ b/pkg/disttask/importinto/planner.go
@@ -55,6 +55,7 @@ type LogicalPlan struct {
 	Stmt              string
 	EligibleInstances []*serverinfo.ServerInfo
 	ChunkMap          map[int32][]importer.Chunk
+	Logger            *zap.Logger
 
 	// summary for next step
 	summary importer.StepSummary
@@ -294,7 +295,7 @@ func buildControllerForPlan(p *LogicalPlan) (*importer.LoadDataController, error
 	if err != nil {
 		return nil, err
 	}
-	controller, err := importer.NewLoadDataController(plan, tbl, astArgs)
+	controller, err := importer.NewLoadDataController(plan, tbl, astArgs, importer.WithLogger(p.Logger))
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/disttask/importinto/proto.go b/pkg/disttask/importinto/proto.go
index c32f5e9d29..9808b65d9b 100644
--- a/pkg/disttask/importinto/proto.go
+++ b/pkg/disttask/importinto/proto.go
@@ -25,6 +25,7 @@ import (
 	"github.com/pingcap/tidb/pkg/lightning/backend/external"
 	"github.com/pingcap/tidb/pkg/lightning/verification"
 	"github.com/pingcap/tidb/pkg/meta/autoid"
+	"go.uber.org/zap"
 )
 
 // TaskMeta is the task of IMPORT INTO.
@@ -162,6 +163,7 @@ type importStepMinimalTask struct {
 	Chunk      importer.Chunk
 	SharedVars *SharedVars
 	panicked   *atomic.Bool
+	logger     *zap.Logger
 }
 
 // RecoverArgs implements workerpool.TaskMayPanic interface.
diff --git a/pkg/disttask/importinto/scheduler.go b/pkg/disttask/importinto/scheduler.go
index 9df2563ee5..bbe9c7e620 100644
--- a/pkg/disttask/importinto/scheduler.go
+++ b/pkg/disttask/importinto/scheduler.go
@@ -70,6 +70,7 @@ type taskInfo struct {
 	// initialized lazily in register()
 	etcdClient   *clientv3.Client
 	taskRegister utils.TaskRegister
+	logger       *zap.Logger
 }
 
 func (t *taskInfo) register(ctx context.Context) {
@@ -80,7 +81,7 @@ func (t *taskInfo) register(ctx context.Context) {
 	if time.Since(t.lastRegisterTime) < refreshTaskTTLInterval {
 		return
 	}
-	logger := logutil.BgLogger().With(zap.Int64("task-id", t.taskID))
+	logger := t.logger
 	if t.taskRegister == nil {
 		client, err := store.NewEtcdCli(t.store)
 		if err != nil {
@@ -104,7 +105,7 @@ func (t *taskInfo) register(ctx context.Context) {
 }
 
 func (t *taskInfo) close(ctx context.Context) {
-	logger := logutil.BgLogger().With(zap.Int64("task-id", t.taskID))
+	logger := t.logger
 	if t.taskRegister != nil {
 		timeoutCtx, cancel := context.WithTimeout(ctx, registerTimeout)
 		defer cancel()
@@ -241,7 +242,7 @@ func (sch *importScheduler) switchTiKVMode(ctx context.Context, task *proto.Task
 		return
 	}
 
-	logger := logutil.BgLogger().With(zap.Int64("task-id", task.ID))
+	logger := sch.GetLogger()
 	// TODO: use the TLS object from TiDB server
 	tidbCfg := tidb.GetGlobalConfig()
 	tls, err := util.NewTLSConfig(
@@ -260,7 +261,7 @@ func (sch *importScheduler) switchTiKVMode(ctx context.Context, task *proto.Task
 }
 
 func (sch *importScheduler) registerTask(ctx context.Context, task *proto.Task) {
-	val, _ := sch.taskInfoMap.LoadOrStore(task.ID, &taskInfo{store: sch.store, taskID: task.ID})
+	val, _ := sch.taskInfoMap.LoadOrStore(task.ID, &taskInfo{store: sch.store, taskID: task.ID, logger: sch.GetLogger()})
 	info := val.(*taskInfo)
 	info.register(ctx)
 }
@@ -287,18 +288,17 @@ func (sch *importScheduler) OnNextSubtasksBatch(
 		// available.
 		nodeCnt = task.MaxNodeCount
 	}
-	logger := logutil.BgLogger().With(
-		zap.Stringer("type", task.Type),
-		zap.Int64("task-id", task.ID),
-		zap.String("curr-step", proto.Step2Str(task.Type, task.Step)),
-		zap.String("next-step", proto.Step2Str(task.Type, nextStep)),
-		zap.Int("node-count", nodeCnt),
-	)
 	taskMeta := &TaskMeta{}
 	err = json.Unmarshal(task.Meta, taskMeta)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
+	logger := sch.GetLogger().With(
+		zap.String("curr-step", proto.Step2Str(task.Type, task.Step)),
+		zap.String("next-step", proto.Step2Str(task.Type, nextStep)),
+		zap.Int("node-count", nodeCnt),
+		zap.Int64("table-id", taskMeta.Plan.TableInfo.ID),
+	)
 	logger.Info("on next subtasks batch")
 
 	previousSubtaskMetas := make(map[proto.Step][][]byte, 1)
@@ -373,7 +373,7 @@ func (sch *importScheduler) OnNextSubtasksBatch(
 		Store:     sch.store,
 		ThreadCnt: task.Concurrency,
 	}
-	logicalPlan := &LogicalPlan{}
+	logicalPlan := &LogicalPlan{Logger: logger}
 	if err := logicalPlan.FromTaskMeta(task.Meta); err != nil {
 		return nil, err
 	}
@@ -396,11 +396,7 @@
 
 // OnDone implements scheduler.Extension interface.
 func (sch *importScheduler) OnDone(ctx context.Context, _ storage.TaskHandle, task *proto.Task) error {
-	logger := logutil.BgLogger().With(
-		zap.Stringer("type", task.Type),
-		zap.Int64("task-id", task.ID),
-		zap.String("step", proto.Step2Str(task.Type, task.Step)),
-	)
+	logger := sch.GetLogger().With(zap.String("step", proto.Step2Str(task.Type, task.Step)))
 	logger.Info("task done", zap.Stringer("state", task.State), zap.Error(task.Error))
 	taskMeta := &TaskMeta{}
 	err := json.Unmarshal(task.Meta, taskMeta)
diff --git a/pkg/disttask/importinto/subtask_executor.go b/pkg/disttask/importinto/subtask_executor.go
index 8770c6eb30..026cbfed96 100644
--- a/pkg/disttask/importinto/subtask_executor.go
+++ b/pkg/disttask/importinto/subtask_executor.go
@@ -21,7 +21,6 @@ import (
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/pkg/config/kerneltype"
 	"github.com/pingcap/tidb/pkg/ddl"
-	"github.com/pingcap/tidb/pkg/disttask/framework/proto"
 	"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
 	"github.com/pingcap/tidb/pkg/domain"
 	"github.com/pingcap/tidb/pkg/executor/importer"
@@ -63,8 +62,7 @@ func (e *importMinimalTaskExecutor) Run(
 	dataWriter, indexWriter backend.EngineWriter,
 	collector execute.Collector,
 ) error {
-	logger := logutil.BgLogger().With(zap.Stringer("type", proto.ImportInto), zap.Int64("table-id", e.mTtask.Plan.TableInfo.ID))
-	logger.Info("execute chunk")
+	logger := e.mTtask.logger
 	failpoint.Inject("beforeSortChunk", func() {})
 	failpoint.Inject("errorWhenSortChunk", func() {
 		failpoint.Return(errors.New("occur an error when sort chunk"))
diff --git a/pkg/disttask/importinto/task_executor.go b/pkg/disttask/importinto/task_executor.go
index b774fe62df..a60d3a3e61 100644
--- a/pkg/disttask/importinto/task_executor.go
+++ b/pkg/disttask/importinto/task_executor.go
@@ -84,6 +84,7 @@ func getTableImporter(
 	taskID int64,
 	taskMeta *TaskMeta,
 	store tidbkv.Storage,
+	logger *zap.Logger,
 ) (*importer.TableImporter, error) {
 	idAlloc := kv.NewPanickingAllocators(taskMeta.Plan.TableInfo.SepAutoInc())
 	tbl, err := tables.TableFromMeta(idAlloc, taskMeta.Plan.TableInfo)
@@ -94,7 +95,7 @@ func getTableImporter(
 	if err != nil {
 		return nil, err
 	}
-	controller, err := importer.NewLoadDataController(&taskMeta.Plan, tbl, astArgs)
+	controller, err := importer.NewLoadDataController(&taskMeta.Plan, tbl, astArgs, importer.WithLogger(logger))
 	if err != nil {
 		return nil, err
 	}
@@ -109,7 +110,7 @@ func (s *importStepExecutor) Init(ctx context.Context) (err error) {
 	s.logger.Info("init subtask env")
 	var tableImporter *importer.TableImporter
 	var taskManager *dxfstorage.TaskManager
-	tableImporter, err = getTableImporter(ctx, s.taskID, s.taskMeta, s.store)
+	tableImporter, err = getTableImporter(ctx, s.taskID, s.taskMeta, s.store, s.logger)
 	if err != nil {
 		return err
 	}
@@ -241,6 +242,7 @@ outer:
 				Chunk:      chunk,
 				SharedVars: sharedVars,
 				panicked:   &panicked,
+				logger:     logger,
 			}:
 			case <-op.Done():
 				break outer
@@ -497,7 +499,7 @@ type writeAndIngestStepExecutor struct {
 var _ execute.StepExecutor = &writeAndIngestStepExecutor{}
 
 func (e *writeAndIngestStepExecutor) Init(ctx context.Context) error {
-	tableImporter, err := getTableImporter(ctx, e.taskID, e.taskMeta, e.store)
+	tableImporter, err := getTableImporter(ctx, e.taskID, e.taskMeta, e.store, e.logger)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go
index db539176ec..99521e8090 100644
--- a/pkg/executor/importer/import.go
+++ b/pkg/executor/importer/import.go
@@ -551,8 +551,18 @@ func ASTArgsFromStmt(stmt string) (*ASTArgs, error) {
 	}, nil
 }
 
+// Option is used to set optional parameters for LoadDataController.
+type Option func(c *LoadDataController)
+
+// WithLogger sets the logger for LoadDataController.
+func WithLogger(logger *zap.Logger) Option {
+	return func(c *LoadDataController) {
+		c.logger = logger
+	}
+}
+
 // NewLoadDataController create new controller.
-func NewLoadDataController(plan *Plan, tbl table.Table, astArgs *ASTArgs) (*LoadDataController, error) {
+func NewLoadDataController(plan *Plan, tbl table.Table, astArgs *ASTArgs, options ...Option) (*LoadDataController, error) {
 	fullTableName := tbl.Meta().Name.String()
 	logger := log.L().With(zap.String("table", fullTableName))
 	c := &LoadDataController{
@@ -562,6 +572,10 @@ func NewLoadDataController(plan *Plan, tbl table.Table, astArgs *ASTArgs) (*Load
 		logger:          logger,
 		ExecuteNodesCnt: 1,
 	}
+	for _, opt := range options {
+		opt(c)
+	}
+
 	if err := c.checkFieldParams(); err != nil {
 		return nil, err
 	}
diff --git a/pkg/executor/show.go b/pkg/executor/show.go
index fd83d5bf64..7e7a5484ab 100644
--- a/pkg/executor/show.go
+++ b/pkg/executor/show.go
@@ -2456,7 +2456,13 @@ func FillOneImportJobInfo(result *chunk.Chunk, info *importer.JobInfo, runInfo *
 		return
 	}
 
-	result.AppendTime(14, runInfo.UpdateTime)
+	// The update time of run info comes from the subtask summary, but the
+	// checksum step doesn't have a periodically updated summary.
+	updateTime := runInfo.UpdateTime
+	if updateTime.IsZero() {
+		updateTime = info.UpdateTime
+	}
+	result.AppendTime(14, updateTime)
 	result.AppendString(15, proto.Step2Str(proto.ImportInto, runInfo.Step))
 	result.AppendString(16, runInfo.ProcessedSize())
 	result.AppendString(17, runInfo.TotalSize())
diff --git a/pkg/executor/show_test.go b/pkg/executor/show_test.go
index e872cf1385..c3196e2615 100644
--- a/pkg/executor/show_test.go
+++ b/pkg/executor/show_test.go
@@ -44,8 +44,11 @@ func TestFillOneImportJobInfo(t *testing.T) {
 		fieldTypes = append(fieldTypes, fieldType)
 	}
 	c := chunk.New(fieldTypes, 10, 10)
+	t2024 := types.NewTime(types.FromGoTime(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)), mysql.TypeTimestamp, 0)
+	t2025 := types.NewTime(types.FromGoTime(time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)), mysql.TypeTimestamp, 0)
 	jobInfo := &importer.JobInfo{
 		Parameters: importer.ImportParameters{},
+		UpdateTime: t2024,
 	}
 
 	fmap := plannercore.ImportIntoFieldMap
@@ -63,6 +66,8 @@ func TestFillOneImportJobInfo(t *testing.T) {
 	require.Equal(t, uint64(0), c.GetRow(1).GetUint64(rowCntIdx))
 	require.True(t, c.GetRow(1).IsNull(startIdx))
 	require.True(t, c.GetRow(1).IsNull(endIdx))
+	// runtime info doesn't have an update time, so use job info's update time
+	require.EqualValues(t, t2024, c.GetRow(1).GetTime(14))
 
 	jobInfo.Status = importer.JobStatusFinished
 	jobInfo.Summary = &importer.Summary{ImportedRows: 123}
@@ -85,6 +90,10 @@ func TestFillOneImportJobInfo(t *testing.T) {
 	require.Equal(t, "97.66KiB", c.GetRow(3).GetString(fmap["CurStepTotalSize"]))
 	require.Equal(t, "0", c.GetRow(3).GetString(fmap["CurStepProgressPct"]))
 	require.Equal(t, "13:53:15", c.GetRow(3).GetString(fmap["CurStepETA"]))
+
+	// runtime info has an update time, so use it
+	executor.FillOneImportJobInfo(c, jobInfo, &importinto.RuntimeInfo{ImportRows: 0, UpdateTime: t2025})
+	require.EqualValues(t, t2025, c.GetRow(4).GetTime(14))
 }
 
 func TestShow(t *testing.T) {
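
Note on the import.go hunk above: it introduces a functional-options API so DXF callers (the planner in planner.go and the executor in task_executor.go) can inject a task-scoped logger rather than the table-scoped default derived from log.L(). What follows is a minimal, self-contained sketch of that pattern; the types are simplified stand-ins for the real LoadDataController and its fields, not the actual TiDB definitions, and it assumes callers pass a non-nil logger.

package main

import (
	"go.uber.org/zap"
)

// LoadDataController is a simplified stand-in for the real controller.
type LoadDataController struct {
	logger *zap.Logger
}

// Option sets an optional parameter on the controller.
type Option func(c *LoadDataController)

// WithLogger overrides the controller's default logger.
func WithLogger(logger *zap.Logger) Option {
	return func(c *LoadDataController) { c.logger = logger }
}

// NewLoadDataController builds the controller with its defaults first,
// then applies the options so they can override those defaults.
func NewLoadDataController(options ...Option) *LoadDataController {
	c := &LoadDataController{
		logger: zap.L(), // default; the real code derives one from log.L()
	}
	for _, opt := range options {
		opt(c)
	}
	return c
}

func main() {
	// A caller that owns a task-scoped logger injects it via the option.
	taskLogger := zap.NewExample().With(zap.Int64("task-id", 1))
	c := NewLoadDataController(WithLogger(taskLogger))
	c.logger.Info("controller ready")
}

Because the options run after the defaults are assigned, existing callers that pass no options keep the old behavior unchanged.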
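
Note on the show.go hunk, which carries the actual bug fix: the CurStep update time shown for an import job comes from the periodically flushed subtask summary, and the validation (checksum) step never flushes one, so RuntimeInfo.UpdateTime can be the zero value; the fix falls back to the job-level update time. A sketch of that decision, using time.Time in place of TiDB's types.Time and a hypothetical displayUpdateTime helper:

package main

import (
	"fmt"
	"time"
)

// displayUpdateTime picks the timestamp to display: prefer the runtime
// (subtask-summary) update time, but fall back to the job-level update
// time when no summary has been written yet.
func displayUpdateTime(runInfoUpdate, jobUpdate time.Time) time.Time {
	if runInfoUpdate.IsZero() {
		return jobUpdate
	}
	return runInfoUpdate
}

func main() {
	jobUpdate := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)

	// During the checksum step no periodic summary exists, so the runtime
	// update time is the zero value and the job time is shown instead.
	fmt.Println(displayUpdateTime(time.Time{}, jobUpdate)) // 2024-01-01

	// Once a subtask summary exists, its timestamp is shown as-is.
	runUpdate := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
	fmt.Println(displayUpdateTime(runUpdate, jobUpdate)) // 2025-01-01
}

The two assertions added in show_test.go exercise exactly these two branches: row 1 (no runtime update time) expects t2024 from the job info, while row 4 (runtime update time set) expects t2025.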