executor: tiny cleanup for WithRecovery (#60425)
This commit is contained in:
@ -179,37 +179,36 @@ func (e *CheckTableExec) Next(ctx context.Context, _ *chunk.Chunk) error {
|
||||
taskCh <- src
|
||||
}
|
||||
for i := 0; i < concurrency; i++ {
|
||||
wg.Run(func() {
|
||||
util.WithRecovery(func() {
|
||||
for {
|
||||
if fail := failure.Load(); fail {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case src := <-taskCh:
|
||||
err1 := e.checkIndexHandle(ctx, src)
|
||||
if err1 == nil && src.index.MVIndex {
|
||||
for offset, idx := range e.indexInfos {
|
||||
if idx.ID == src.index.ID {
|
||||
err1 = e.checkTableRecord(ctx, offset)
|
||||
break
|
||||
}
|
||||
wg.RunWithRecover(func() {
|
||||
for {
|
||||
if fail := failure.Load(); fail {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case src := <-taskCh:
|
||||
err1 := e.checkIndexHandle(ctx, src)
|
||||
if err1 == nil && src.index.MVIndex {
|
||||
for offset, idx := range e.indexInfos {
|
||||
if idx.ID == src.index.ID {
|
||||
err1 = e.checkTableRecord(ctx, offset)
|
||||
break
|
||||
}
|
||||
}
|
||||
if err1 != nil {
|
||||
failure.Store(true)
|
||||
logutil.Logger(ctx).Info("check index handle failed", zap.Error(err1))
|
||||
return
|
||||
}
|
||||
case <-e.exitCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
if err1 != nil {
|
||||
failure.Store(true)
|
||||
logutil.Logger(ctx).Info("check index handle failed", zap.Error(err1))
|
||||
return
|
||||
}
|
||||
case <-e.exitCh:
|
||||
return
|
||||
default:
|
||||
return
|
||||
}
|
||||
}, e.handlePanic)
|
||||
})
|
||||
}
|
||||
}, e.handlePanic)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
select {
|
||||
case err := <-e.retCh:
|
||||
|
||||
@ -175,7 +175,7 @@ func (w *WaitGroupWrapper) RunWithLog(exec func()) {
|
||||
defer w.Done()
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
logutil.BgLogger().Error("panic in wait group", zap.Any("recover", r), zap.Stack("stack"))
|
||||
logutil.BgLogger().Error("panic in the wait group", zap.Any("recover", r), zap.Stack("stack"))
|
||||
}
|
||||
}()
|
||||
exec()
|
||||
@ -194,6 +194,9 @@ func (w *WaitGroupWrapper) RunWithRecover(exec func(), recoverFn func(r any)) {
|
||||
if recoverFn != nil {
|
||||
recoverFn(r)
|
||||
}
|
||||
if r != nil {
|
||||
logutil.BgLogger().Error("panic in the wait group", zap.Any("recover", r), zap.Stack("stack"))
|
||||
}
|
||||
w.Done()
|
||||
}()
|
||||
exec()
|
||||
@ -253,7 +256,7 @@ func (g *ErrorGroupWithRecover) Go(fn func() error) {
|
||||
g.Group.Go(func() (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
logutil.BgLogger().Error("panic in error group", zap.Any("recover", r), zap.Stack("stack"))
|
||||
logutil.BgLogger().Error("panic in the error group", zap.Any("recover", r), zap.Stack("stack"))
|
||||
err = GetRecoverError(r)
|
||||
}
|
||||
}()
|
||||
|
||||
Reference in New Issue
Block a user