executor: Fix query hang on hash aggregate operator (#6982)

This commit is contained in:
Zhang Jian
2018-07-04 17:44:40 +08:00
committed by Shen Li
parent 1310996667
commit 5601c45189

View File

@ -481,8 +481,8 @@ func (w *HashAggFinalWorker) consumeIntermData(sc *stmtctx.StatementContext) (er
func (w *HashAggFinalWorker) getFinalResult(sc *stmtctx.StatementContext) {
groupIter := w.groupSet.NewIterator()
result, ok := <-w.finalResultHolderCh
if !ok {
result, finished := w.receiveFinalResultHolder()
if finished {
return
}
result.Reset()
@ -503,8 +503,8 @@ func (w *HashAggFinalWorker) getFinalResult(sc *stmtctx.StatementContext) {
result.AppendRow(w.mutableRow.ToRow())
if result.NumRows() == w.maxChunkSize {
w.outputCh <- &AfFinalResult{chk: result, giveBackCh: w.finalResultHolderCh}
result, ok = <-w.finalResultHolderCh
if !ok {
result, finished = w.receiveFinalResultHolder()
if finished {
return
}
result.Reset()
@ -512,6 +512,15 @@ func (w *HashAggFinalWorker) getFinalResult(sc *stmtctx.StatementContext) {
}
}
func (w *HashAggFinalWorker) receiveFinalResultHolder() (*chunk.Chunk, bool) {
select {
case <-w.finishCh:
return nil, true
case result, ok := <-w.finalResultHolderCh:
return result, !ok
}
}
func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGroup) {
defer func() {
waitGroup.Done()