diff --git a/executor/join.go b/executor/join.go index 244cf82c67..9480dfe5ef 100644 --- a/executor/join.go +++ b/executor/join.go @@ -263,20 +263,40 @@ func (e *HashJoinExec) wait4Inner() (finished bool, err error) { // fetchInnerRows fetches all rows from inner executor, // and append them to e.innerResult. -func (e *HashJoinExec) fetchInnerRows(ctx context.Context) (err error) { +func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh chan struct{}) { + defer func() { + close(chkCh) + if r := recover(); r != nil { + buf := make([]byte, 4096) + stackSize := runtime.Stack(buf, false) + buf = buf[:stackSize] + log.Errorf("hash join inner fetcher panic stack is:\n%s", buf) + } + }() e.innerResult = chunk.NewList(e.innerExec.retTypes(), e.maxChunkSize) e.innerResult.GetMemTracker().AttachTo(e.memTracker) e.innerResult.GetMemTracker().SetLabel("innerResult") + var err error for { - if e.finished.Load().(bool) { - return nil + select { + case <-doneCh: + return + default: + if e.finished.Load().(bool) { + return + } + chk := e.children[e.innerIdx].newChunk() + err = e.innerExec.Next(ctx, chk) + if err != nil { + e.innerFinished <- errors.Trace(err) + return + } + if chk.NumRows() == 0 { + return + } + chkCh <- chk + e.innerResult.Add(chk) } - chk := e.children[e.innerIdx].newChunk() - err = e.innerExec.Next(ctx, chk) - if err != nil || chk.NumRows() == 0 { - return errors.Trace(err) - } - e.innerResult.Add(chk) } } @@ -512,26 +532,31 @@ func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) { } close(e.innerFinished) }() - - if err := e.fetchInnerRows(ctx); err != nil { - e.innerFinished <- errors.Trace(err) - return - } + // innerResultCh transfer inner result chunk from inner fetch to build hash table. + innerResultCh := make(chan *chunk.Chunk, e.concurrency) + doneCh := make(chan struct{}) + go e.fetchInnerRows(ctx, innerResultCh, doneCh) if e.finished.Load().(bool) { return } - - if err := e.buildHashTableForList(); err != nil { + // TODO: Parallel build hash table. Currently not support because `mvmap` is not thread-safe. + err := e.buildHashTableForList(innerResultCh) + if err != nil { e.innerFinished <- errors.Trace(err) - return + close(doneCh) + // fetchInnerRows may be blocked by this channel, so read from the channel to unblock it. + select { + case <-innerResultCh: + default: + } } } // buildHashTableForList builds hash table from `list`. // key of hash table: hash value of key columns // value of hash table: RowPtr of the corresponded row -func (e *HashJoinExec) buildHashTableForList() error { +func (e *HashJoinExec) buildHashTableForList(innerResultCh chan *chunk.Chunk) error { e.hashTable = mvmap.NewMVMap() e.innerKeyColIdx = make([]int, len(e.innerKeys)) for i := range e.innerKeys { @@ -543,12 +568,14 @@ func (e *HashJoinExec) buildHashTableForList() error { keyBuf = make([]byte, 0, 64) valBuf = make([]byte, 8) ) - for i := 0; i < e.innerResult.NumChunks(); i++ { + + chkIdx := uint32(0) + for chk := range innerResultCh { if e.finished.Load().(bool) { return nil } - chk := e.innerResult.GetChunk(i) - for j := 0; j < chk.NumRows(); j++ { + numRows := chk.NumRows() + for j := 0; j < numRows; j++ { hasNull, keyBuf, err = e.getJoinKeyFromChkRow(false, chk.GetRow(j), keyBuf) if err != nil { return errors.Trace(err) @@ -556,10 +583,11 @@ func (e *HashJoinExec) buildHashTableForList() error { if hasNull { continue } - rowPtr := chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)} + rowPtr := chunk.RowPtr{ChkIdx: chkIdx, RowIdx: uint32(j)} *(*chunk.RowPtr)(unsafe.Pointer(&valBuf[0])) = rowPtr e.hashTable.Put(keyBuf, valBuf) } + chkIdx++ } return nil }