diff --git a/executor/aggregate.go b/executor/aggregate.go index 63241b2b52..12db7390a8 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -481,8 +481,8 @@ func (w *HashAggFinalWorker) consumeIntermData(sc *stmtctx.StatementContext) (er func (w *HashAggFinalWorker) getFinalResult(sc *stmtctx.StatementContext) { groupIter := w.groupSet.NewIterator() - result, ok := <-w.finalResultHolderCh - if !ok { + result, finished := w.receiveFinalResultHolder() + if finished { return } result.Reset() @@ -503,8 +503,8 @@ func (w *HashAggFinalWorker) getFinalResult(sc *stmtctx.StatementContext) { result.AppendRow(w.mutableRow.ToRow()) if result.NumRows() == w.maxChunkSize { w.outputCh <- &AfFinalResult{chk: result, giveBackCh: w.finalResultHolderCh} - result, ok = <-w.finalResultHolderCh - if !ok { + result, finished = w.receiveFinalResultHolder() + if finished { return } result.Reset() @@ -512,6 +512,15 @@ func (w *HashAggFinalWorker) getFinalResult(sc *stmtctx.StatementContext) { } } +func (w *HashAggFinalWorker) receiveFinalResultHolder() (*chunk.Chunk, bool) { + select { + case <-w.finishCh: + return nil, true + case result, ok := <-w.finalResultHolderCh: + return result, !ok + } +} + func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGroup) { defer func() { waitGroup.Done()