importinto: fix zero update time during validation phase and refine logs (#63425)

ref pingcap/tidb#61702
Author: D3Hunter
Date: 2025-09-09 20:07:54 +08:00
Committed by: GitHub
Parent: b337e44c84
Commit: 37f8a14657
9 changed files with 59 additions and 26 deletions

View File

@ -690,6 +690,11 @@ func (*BaseScheduler) isStepSucceed(cntByStates map[proto.SubtaskState]int64) bo
return len(cntByStates) == 0 || (len(cntByStates) == 1 && ok)
}
// GetLogger returns the logger.
func (s *BaseScheduler) GetLogger() *zap.Logger {
return s.logger
}
// IsCancelledErr checks if the error is a cancelled error.
func IsCancelledErr(err error) bool {
return strings.Contains(err.Error(), taskCancelMsg)

View File

@ -55,6 +55,7 @@ type LogicalPlan struct {
Stmt string
EligibleInstances []*serverinfo.ServerInfo
ChunkMap map[int32][]importer.Chunk
Logger *zap.Logger
// summary for next step
summary importer.StepSummary
@ -294,7 +295,7 @@ func buildControllerForPlan(p *LogicalPlan) (*importer.LoadDataController, error
if err != nil {
return nil, err
}
controller, err := importer.NewLoadDataController(plan, tbl, astArgs)
controller, err := importer.NewLoadDataController(plan, tbl, astArgs, importer.WithLogger(p.Logger))
if err != nil {
return nil, err
}

View File

@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/lightning/verification"
"github.com/pingcap/tidb/pkg/meta/autoid"
"go.uber.org/zap"
)
// TaskMeta is the task of IMPORT INTO.
@ -162,6 +163,7 @@ type importStepMinimalTask struct {
Chunk importer.Chunk
SharedVars *SharedVars
panicked *atomic.Bool
logger *zap.Logger
}
// RecoverArgs implements workerpool.TaskMayPanic interface.

View File

@ -70,6 +70,7 @@ type taskInfo struct {
// initialized lazily in register()
etcdClient *clientv3.Client
taskRegister utils.TaskRegister
logger *zap.Logger
}
func (t *taskInfo) register(ctx context.Context) {
@ -80,7 +81,7 @@ func (t *taskInfo) register(ctx context.Context) {
if time.Since(t.lastRegisterTime) < refreshTaskTTLInterval {
return
}
logger := logutil.BgLogger().With(zap.Int64("task-id", t.taskID))
logger := t.logger
if t.taskRegister == nil {
client, err := store.NewEtcdCli(t.store)
if err != nil {
@ -104,7 +105,7 @@ func (t *taskInfo) register(ctx context.Context) {
}
func (t *taskInfo) close(ctx context.Context) {
logger := logutil.BgLogger().With(zap.Int64("task-id", t.taskID))
logger := t.logger
if t.taskRegister != nil {
timeoutCtx, cancel := context.WithTimeout(ctx, registerTimeout)
defer cancel()
@ -241,7 +242,7 @@ func (sch *importScheduler) switchTiKVMode(ctx context.Context, task *proto.Task
return
}
logger := logutil.BgLogger().With(zap.Int64("task-id", task.ID))
logger := sch.GetLogger()
// TODO: use the TLS object from TiDB server
tidbCfg := tidb.GetGlobalConfig()
tls, err := util.NewTLSConfig(
@ -260,7 +261,7 @@ func (sch *importScheduler) switchTiKVMode(ctx context.Context, task *proto.Task
}
func (sch *importScheduler) registerTask(ctx context.Context, task *proto.Task) {
val, _ := sch.taskInfoMap.LoadOrStore(task.ID, &taskInfo{store: sch.store, taskID: task.ID})
val, _ := sch.taskInfoMap.LoadOrStore(task.ID, &taskInfo{store: sch.store, taskID: task.ID, logger: sch.GetLogger()})
info := val.(*taskInfo)
info.register(ctx)
}
@ -287,18 +288,17 @@ func (sch *importScheduler) OnNextSubtasksBatch(
// available.
nodeCnt = task.MaxNodeCount
}
logger := logutil.BgLogger().With(
zap.Stringer("type", task.Type),
zap.Int64("task-id", task.ID),
zap.String("curr-step", proto.Step2Str(task.Type, task.Step)),
zap.String("next-step", proto.Step2Str(task.Type, nextStep)),
zap.Int("node-count", nodeCnt),
)
taskMeta := &TaskMeta{}
err = json.Unmarshal(task.Meta, taskMeta)
if err != nil {
return nil, errors.Trace(err)
}
logger := sch.GetLogger().With(
zap.String("curr-step", proto.Step2Str(task.Type, task.Step)),
zap.String("next-step", proto.Step2Str(task.Type, nextStep)),
zap.Int("node-count", nodeCnt),
zap.Int64("table-id", taskMeta.Plan.TableInfo.ID),
)
logger.Info("on next subtasks batch")
previousSubtaskMetas := make(map[proto.Step][][]byte, 1)
@ -373,7 +373,7 @@ func (sch *importScheduler) OnNextSubtasksBatch(
Store: sch.store,
ThreadCnt: task.Concurrency,
}
logicalPlan := &LogicalPlan{}
logicalPlan := &LogicalPlan{Logger: logger}
if err := logicalPlan.FromTaskMeta(task.Meta); err != nil {
return nil, err
}
@ -396,11 +396,7 @@ func (sch *importScheduler) OnNextSubtasksBatch(
// OnDone implements scheduler.Extension interface.
func (sch *importScheduler) OnDone(ctx context.Context, _ storage.TaskHandle, task *proto.Task) error {
logger := logutil.BgLogger().With(
zap.Stringer("type", task.Type),
zap.Int64("task-id", task.ID),
zap.String("step", proto.Step2Str(task.Type, task.Step)),
)
logger := sch.GetLogger().With(zap.String("step", proto.Step2Str(task.Type, task.Step)))
logger.Info("task done", zap.Stringer("state", task.State), zap.Error(task.Error))
taskMeta := &TaskMeta{}
err := json.Unmarshal(task.Meta, taskMeta)
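
The scheduler-side log refactor above replaces ad-hoc logutil.BgLogger().With(...) calls with the logger returned by GetLogger(), which presumably already carries the task-level fields (task type and task ID), so call sites only attach step- or table-level context. A minimal, self-contained sketch of that layering with zap; the field values are illustrative, not taken from a real task:

```go
package main

import "go.uber.org/zap"

func main() {
	// Base logger created once per task (standing in for what
	// BaseScheduler.GetLogger() returns); it already carries task-level fields.
	base := zap.NewExample().With(
		zap.String("type", "ImportInto"),
		zap.Int64("task-id", 42),
	)

	// Call sites derive from the base logger and add only step-specific
	// context, instead of rebuilding the full field set from the global
	// background logger every time.
	stepLogger := base.With(
		zap.String("curr-step", "import"),
		zap.String("next-step", "post-process"),
		zap.Int("node-count", 2),
	)
	stepLogger.Info("on next subtasks batch")
}
```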

View File

@ -21,7 +21,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config/kerneltype"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/executor/importer"
@ -63,8 +62,7 @@ func (e *importMinimalTaskExecutor) Run(
dataWriter, indexWriter backend.EngineWriter,
collector execute.Collector,
) error {
logger := logutil.BgLogger().With(zap.Stringer("type", proto.ImportInto), zap.Int64("table-id", e.mTtask.Plan.TableInfo.ID))
logger.Info("execute chunk")
logger := e.mTtask.logger
failpoint.Inject("beforeSortChunk", func() {})
failpoint.Inject("errorWhenSortChunk", func() {
failpoint.Return(errors.New("occur an error when sort chunk"))

View File

@ -84,6 +84,7 @@ func getTableImporter(
taskID int64,
taskMeta *TaskMeta,
store tidbkv.Storage,
logger *zap.Logger,
) (*importer.TableImporter, error) {
idAlloc := kv.NewPanickingAllocators(taskMeta.Plan.TableInfo.SepAutoInc())
tbl, err := tables.TableFromMeta(idAlloc, taskMeta.Plan.TableInfo)
@ -94,7 +95,7 @@ func getTableImporter(
if err != nil {
return nil, err
}
controller, err := importer.NewLoadDataController(&taskMeta.Plan, tbl, astArgs)
controller, err := importer.NewLoadDataController(&taskMeta.Plan, tbl, astArgs, importer.WithLogger(logger))
if err != nil {
return nil, err
}
@ -109,7 +110,7 @@ func (s *importStepExecutor) Init(ctx context.Context) (err error) {
s.logger.Info("init subtask env")
var tableImporter *importer.TableImporter
var taskManager *dxfstorage.TaskManager
tableImporter, err = getTableImporter(ctx, s.taskID, s.taskMeta, s.store)
tableImporter, err = getTableImporter(ctx, s.taskID, s.taskMeta, s.store, s.logger)
if err != nil {
return err
}
@ -241,6 +242,7 @@ outer:
Chunk: chunk,
SharedVars: sharedVars,
panicked: &panicked,
logger: logger,
}:
case <-op.Done():
break outer
@ -497,7 +499,7 @@ type writeAndIngestStepExecutor struct {
var _ execute.StepExecutor = &writeAndIngestStepExecutor{}
func (e *writeAndIngestStepExecutor) Init(ctx context.Context) error {
tableImporter, err := getTableImporter(ctx, e.taskID, e.taskMeta, e.store)
tableImporter, err := getTableImporter(ctx, e.taskID, e.taskMeta, e.store, e.logger)
if err != nil {
return err
}

View File

@ -551,8 +551,18 @@ func ASTArgsFromStmt(stmt string) (*ASTArgs, error) {
}, nil
}
// Option is used to set optional parameters for LoadDataController.
type Option func(c *LoadDataController)
// WithLogger sets the logger for LoadDataController.
func WithLogger(logger *zap.Logger) Option {
return func(c *LoadDataController) {
c.logger = logger
}
}
// NewLoadDataController creates a new controller.
func NewLoadDataController(plan *Plan, tbl table.Table, astArgs *ASTArgs) (*LoadDataController, error) {
func NewLoadDataController(plan *Plan, tbl table.Table, astArgs *ASTArgs, options ...Option) (*LoadDataController, error) {
fullTableName := tbl.Meta().Name.String()
logger := log.L().With(zap.String("table", fullTableName))
c := &LoadDataController{
@ -562,6 +572,10 @@ func NewLoadDataController(plan *Plan, tbl table.Table, astArgs *ASTArgs) (*Load
logger: logger,
ExecuteNodesCnt: 1,
}
for _, opt := range options {
opt(c)
}
if err := c.checkFieldParams(); err != nil {
return nil, err
}
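
NewLoadDataController now takes functional options so callers such as the scheduler and step executors can inject their task-scoped logger instead of the default table-scoped one. A minimal, self-contained sketch of the same pattern; the controller, option, and withLogger names below are illustrative stand-ins, not the real importer types:

```go
package main

import "go.uber.org/zap"

// controller stands in for LoadDataController; only the logger matters here.
type controller struct {
	logger *zap.Logger
}

// option mirrors the Option type added in this commit: a function that
// mutates the controller after its defaults are set.
type option func(*controller)

// withLogger overrides the default logger, like importer.WithLogger.
func withLogger(l *zap.Logger) option {
	return func(c *controller) { c.logger = l }
}

// newController builds the controller with defaults, then applies options,
// so existing callers that pass nothing keep the old behaviour.
func newController(opts ...option) *controller {
	c := &controller{logger: zap.NewNop()} // default logger
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	taskLogger := zap.NewExample().With(zap.Int64("task-id", 1))

	// Old call sites pass no options and keep the default; new call sites
	// inject the task-scoped logger, as this commit does via WithLogger.
	newController().logger.Info("default logger")
	newController(withLogger(taskLogger)).logger.Info("injected logger")
}
```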

View File

@ -2456,7 +2456,13 @@ func FillOneImportJobInfo(result *chunk.Chunk, info *importer.JobInfo, runInfo *
return
}
result.AppendTime(14, runInfo.UpdateTime)
// The update time of run info comes from the subtask summary, but the checksum
// step doesn't have a periodically updated summary.
updateTime := runInfo.UpdateTime
if updateTime.IsZero() {
updateTime = info.UpdateTime
}
result.AppendTime(14, updateTime)
result.AppendString(15, proto.Step2Str(proto.ImportInto, runInfo.Step))
result.AppendString(16, runInfo.ProcessedSize())
result.AppendString(17, runInfo.TotalSize())
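
The fix itself is a zero-value fallback: the run info's update time comes from the periodically reported subtask summary, and during the checksum/validation step there is no such summary, so the value stays zero and the job-level update time is used instead. A minimal sketch of the same fallback using the standard library's time.Time rather than TiDB's types.Time:

```go
package main

import (
	"fmt"
	"time"
)

// pickUpdateTime mirrors the fallback added in FillOneImportJobInfo: prefer the
// runtime (subtask summary) update time, but fall back to the job's update time
// when the runtime value is zero, e.g. during the checksum/validation step.
func pickUpdateTime(runtimeUpdate, jobUpdate time.Time) time.Time {
	if runtimeUpdate.IsZero() {
		return jobUpdate
	}
	return runtimeUpdate
}

func main() {
	job := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	run := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)

	fmt.Println(pickUpdateTime(time.Time{}, job)) // zero runtime time -> job time
	fmt.Println(pickUpdateTime(run, job))         // runtime time wins when set
}
```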

View File

@ -44,8 +44,11 @@ func TestFillOneImportJobInfo(t *testing.T) {
fieldTypes = append(fieldTypes, fieldType)
}
c := chunk.New(fieldTypes, 10, 10)
t2024 := types.NewTime(types.FromGoTime(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)), mysql.TypeTimestamp, 0)
t2025 := types.NewTime(types.FromGoTime(time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)), mysql.TypeTimestamp, 0)
jobInfo := &importer.JobInfo{
Parameters: importer.ImportParameters{},
UpdateTime: t2024,
}
fmap := plannercore.ImportIntoFieldMap
@ -63,6 +66,8 @@ func TestFillOneImportJobInfo(t *testing.T) {
require.Equal(t, uint64(0), c.GetRow(1).GetUint64(rowCntIdx))
require.True(t, c.GetRow(1).IsNull(startIdx))
require.True(t, c.GetRow(1).IsNull(endIdx))
// runtime info doesn't have an update time, so use the job info's update time
require.EqualValues(t, t2024, c.GetRow(1).GetTime(14))
jobInfo.Status = importer.JobStatusFinished
jobInfo.Summary = &importer.Summary{ImportedRows: 123}
@ -85,6 +90,10 @@ func TestFillOneImportJobInfo(t *testing.T) {
require.Equal(t, "97.66KiB", c.GetRow(3).GetString(fmap["CurStepTotalSize"]))
require.Equal(t, "0", c.GetRow(3).GetString(fmap["CurStepProgressPct"]))
require.Equal(t, "13:53:15", c.GetRow(3).GetString(fmap["CurStepETA"]))
// runtime info has an update time, so use it
executor.FillOneImportJobInfo(c, jobInfo, &importinto.RuntimeInfo{ImportRows: 0, UpdateTime: t2025})
require.EqualValues(t, t2025, c.GetRow(4).GetTime(14))
}
func TestShow(t *testing.T) {