From 11837925248564fd17cdce8ce93ebc57cc3b0df5 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Thu, 1 Jun 2017 15:50:51 +0800 Subject: [PATCH] executor: stop fetch handles after closing. (#3365) --- executor/distsql.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/executor/distsql.go b/executor/distsql.go index e6625d3cac..4b232d17fd 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -365,6 +365,8 @@ type XSelectIndexExec struct { taskCurr *lookupTableTask handleCount uint64 // returned handle count in double read. + closeCh chan struct{} + where *tipb.Expr startTS uint64 returnedRows uint64 // returned row count @@ -398,6 +400,7 @@ type XSelectIndexExec struct { func (e *XSelectIndexExec) Open() error { e.returnedRows = 0 e.partialCount = 0 + e.closeCh = make(chan struct{}) return nil } @@ -414,6 +417,7 @@ func (e *XSelectIndexExec) Close() error { e.taskCurr = nil if e.taskChan != nil { + close(e.closeCh) // Consume the task channel in case channel is full. for range e.taskChan { } @@ -616,6 +620,8 @@ func (e *XSelectIndexExec) fetchHandles(idxResult distsql.SelectResult, ch chan< select { case <-txnCtx.Done(): return + case <-e.closeCh: + return case workCh <- task: default: e.addWorker(workCh, &concurrency, lookupConcurrencyLimit)