diff --git a/pkg/executor/check_table_index.go b/pkg/executor/check_table_index.go index 3ff0c0dd7e..d955b41c1a 100644 --- a/pkg/executor/check_table_index.go +++ b/pkg/executor/check_table_index.go @@ -179,37 +179,36 @@ func (e *CheckTableExec) Next(ctx context.Context, _ *chunk.Chunk) error { taskCh <- src } for i := 0; i < concurrency; i++ { - wg.Run(func() { - util.WithRecovery(func() { - for { - if fail := failure.Load(); fail { - return - } - select { - case src := <-taskCh: - err1 := e.checkIndexHandle(ctx, src) - if err1 == nil && src.index.MVIndex { - for offset, idx := range e.indexInfos { - if idx.ID == src.index.ID { - err1 = e.checkTableRecord(ctx, offset) - break - } + wg.RunWithRecover(func() { + for { + if fail := failure.Load(); fail { + return + } + select { + case src := <-taskCh: + err1 := e.checkIndexHandle(ctx, src) + if err1 == nil && src.index.MVIndex { + for offset, idx := range e.indexInfos { + if idx.ID == src.index.ID { + err1 = e.checkTableRecord(ctx, offset) + break } } - if err1 != nil { - failure.Store(true) - logutil.Logger(ctx).Info("check index handle failed", zap.Error(err1)) - return - } - case <-e.exitCh: - return - default: + } + if err1 != nil { + failure.Store(true) + logutil.Logger(ctx).Info("check index handle failed", zap.Error(err1)) return } + case <-e.exitCh: + return + default: + return } - }, e.handlePanic) - }) + } + }, e.handlePanic) } + wg.Wait() select { case err := <-e.retCh: diff --git a/pkg/util/wait_group_wrapper.go b/pkg/util/wait_group_wrapper.go index c2e3e7f2d3..3fccd66810 100644 --- a/pkg/util/wait_group_wrapper.go +++ b/pkg/util/wait_group_wrapper.go @@ -175,7 +175,7 @@ func (w *WaitGroupWrapper) RunWithLog(exec func()) { defer w.Done() defer func() { if r := recover(); r != nil { - logutil.BgLogger().Error("panic in wait group", zap.Any("recover", r), zap.Stack("stack")) + logutil.BgLogger().Error("panic in the wait group", zap.Any("recover", r), zap.Stack("stack")) } }() exec() @@ -194,6 +194,9 @@ func (w *WaitGroupWrapper) RunWithRecover(exec func(), recoverFn func(r any)) { if recoverFn != nil { recoverFn(r) } + if r != nil { + logutil.BgLogger().Error("panic in the wait group", zap.Any("recover", r), zap.Stack("stack")) + } w.Done() }() exec() @@ -253,7 +256,7 @@ func (g *ErrorGroupWithRecover) Go(fn func() error) { g.Group.Go(func() (err error) { defer func() { if r := recover(); r != nil { - logutil.BgLogger().Error("panic in error group", zap.Any("recover", r), zap.Stack("stack")) + logutil.BgLogger().Error("panic in the error group", zap.Any("recover", r), zap.Stack("stack")) err = GetRecoverError(r) } }()