disttask/ddl: refine manager error handling (#48095)
ref pingcap/tidb#46258, close pingcap/tidb#48064

@@ -79,6 +79,8 @@ var retryableErrorIDs = map[errors.ErrorID]struct{}{
 	ErrKVReadIndexNotReady.ID(): {},
 	ErrKVIngestFailed.ID(): {},
 	ErrKVRaftProposalDropped.ID(): {},
+	// litBackendCtxMgr.Register may return the error.
+	ErrCreatePDClient.ID(): {},
 	// during checksum coprocessor will transform error into driver error in handleCopResponse using ToTiDBErr
 	// met ErrRegionUnavailable on free-tier import during checksum, others hasn't met yet
 	drivererr.ErrRegionUnavailable.ID(): {},
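
For context, a minimal sketch of how such an error-ID set can be consulted when deciding whether a reorg error is retryable, built on github.com/pingcap/errors. The helper name and the fallback behaviour are assumptions for illustration, not the exact DDL implementation:

import "github.com/pingcap/errors"

// isRetryableByID is a hypothetical helper: it unwraps the error and looks up
// its normalized ID in a retryable set. The real check in ddl also takes the
// job's accumulated error count into account.
func isRetryableByID(err error, retryable map[errors.ErrorID]struct{}) bool {
	origin := errors.Cause(err) // unwrap annotations added by errors.Trace/Annotate
	if tErr, ok := origin.(*errors.Error); ok {
		_, found := retryable[tErr.ID()]
		return found
	}
	// Unclassified errors are treated as retryable here (an assumption) so that
	// transient failures are not converted to a rollback too eagerly.
	return true
}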

@@ -1088,9 +1088,7 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
 			// TODO(tangenta): get duplicate column and match index.
 			err = convertToKeyExistsErr(err, allIndexInfos[0], tbl.Meta())
 		}
-		if !errorIsRetryable(err, job) ||
-			// TODO: Remove this check make it can be retry. Related test is TestModifyColumnReorgInfo.
-			job.ReorgMeta.IsDistReorg {
+		if !errorIsRetryable(err, job) {
 			logutil.BgLogger().Warn("run add index job failed, convert job to rollback", zap.String("category", "ddl"), zap.String("job", job.String()), zap.Error(err))
 			ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), allIndexInfos, err)
 			if err1 := rh.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil {
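
The effect of the condition change above, as a hedged sketch: with the job.ReorgMeta.IsDistReorg clause removed, distributed reorg jobs follow the same retry policy as local ones, and only a non-retryable error converts the add-index job into a rollback job. The helper below is hypothetical and only captures that decision:

// shouldRollback reports whether a failed reorg job should be converted to a
// rollback job after this change: only non-retryable errors trigger a rollback,
// regardless of whether the job runs as a distributed reorg.
func shouldRollback(err error, retryable func(error) bool) bool {
	return err != nil && !retryable(err)
}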

@@ -22,6 +22,7 @@ import (
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/br/pkg/lightning/common"
 	"github.com/pingcap/tidb/pkg/config"
 	"github.com/pingcap/tidb/pkg/disttask/framework/proto"
 	"github.com/pingcap/tidb/pkg/domain/infosync"

@@ -416,13 +417,18 @@ func (m *Manager) removeHandlingTask(id int64) {
 }
 
 func (m *Manager) logErr(err error) {
-	logutil.Logger(m.logCtx).Error("task manager error", zap.Error(err), zap.Stack("stack"))
+	logutil.Logger(m.logCtx).Error("task manager met error", zap.Error(err), zap.Stack("stack"))
 }
 
 func (m *Manager) logErrAndPersist(err error, taskID int64) {
 	m.logErr(err)
+	// TODO: use interface if each business to retry
+	if common.IsRetryableError(err) || isRetryableError(err) {
+		return
+	}
 	err1 := m.taskTable.UpdateErrorToSubtask(m.id, taskID, err)
 	if err1 != nil {
 		logutil.Logger(m.logCtx).Error("update to subtask failed", zap.Error(err1), zap.Stack("stack"))
 	}
+	logutil.Logger(m.logCtx).Error("update error to subtask", zap.Int64("task-id", taskID), zap.Error(err1), zap.Stack("stack"))
 }
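
The new early return means retryable errors are only logged and never written to the subtask table. A self-contained sketch of that pattern; the names and the retryable classification below are illustrative assumptions, not the framework's API:

package main

import (
	"context"
	"errors"
	"log"
)

// persistIfFatal mirrors, as an assumption, the shape of logErrAndPersist above:
// every error is logged, but only non-retryable errors are persisted so the
// task is eventually marked as failed.
func persistIfFatal(err error, isRetryable func(error) bool, persist func(error) error) {
	log.Println("task manager met error:", err)
	if isRetryable(err) {
		return // transient: let the scheduler retry instead of failing the task
	}
	if err1 := persist(err); err1 != nil {
		log.Println("update to subtask failed:", err1)
	}
}

func main() {
	isRetryable := func(err error) bool {
		// purely illustrative classification
		return errors.Is(err, context.DeadlineExceeded)
	}
	persist := func(err error) error { log.Println("persisted:", err); return nil }

	persistIfFatal(context.DeadlineExceeded, isRetryable, persist)    // logged only
	persistIfFatal(errors.New("schema mismatch"), isRetryable, persist) // persisted
}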

@@ -474,13 +474,13 @@ func (s *BaseScheduler) onError(err error) {
 		return
 	}
 	err = errors.Trace(err)
-	logutil.Logger(s.logCtx).Error("onError", zap.Error(err))
+	logutil.Logger(s.logCtx).Error("onError", zap.Error(err), zap.Stack("stack"))
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
 	if s.mu.err == nil {
 		s.mu.err = err
-		logutil.Logger(s.logCtx).Error("scheduler error", zap.Error(err))
+		logutil.Logger(s.logCtx).Error("scheduler met first error", zap.Error(err))
 	}
 
 	if s.mu.runtimeCancel != nil {
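
The surrounding onError logic records only the first error and cancels the running step. A minimal sketch of that pattern; the type and field names are assumptions, not BaseScheduler's actual layout:

import (
	"context"
	"sync"
)

// runState sketches the "remember the first error, cancel the rest" pattern.
type runState struct {
	mu     sync.Mutex
	err    error // first error reported by any goroutine
	cancel context.CancelFunc
}

func (s *runState) onError(err error) {
	if err == nil {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.err == nil {
		s.err = err // keep only the first error, later ones are dropped
	}
	if s.cancel != nil {
		s.cancel() // stop the remaining work for this task run
	}
}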

@@ -620,5 +620,8 @@ func (s *BaseScheduler) updateErrorToSubtask(ctx context.Context, taskID int64,
 			return true, s.taskTable.UpdateErrorToSubtask(s.id, taskID, err)
 		},
 	)
+	if err1 == nil {
+		logger.Warn("update error to subtask success", zap.Error(err))
+	}
 	return err1
 }
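
The closure above returns a bool alongside the error, suggesting it runs inside a retry wrapper, which is why the warning is logged only once the update finally succeeds. A generic sketch of such a wrapper, treating the bool as a should-retry flag and assuming a fixed backoff (the framework's actual helper may differ):

import (
	"context"
	"time"
)

// runWithRetry is a hypothetical stand-in for the retry helper used above:
// fn reports whether a failure is worth retrying; the wrapper backs off
// between attempts and stops early when the context is cancelled.
func runWithRetry(ctx context.Context, attempts int, backoff time.Duration,
	fn func(context.Context) (bool, error)) error {
	var err error
	for i := 0; i < attempts; i++ {
		var retryable bool
		retryable, err = fn(ctx)
		if err == nil || !retryable {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
		}
	}
	return err
}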