executor: parallel read inner table and build hash table. (#7544)
This commit is contained in:
@ -263,20 +263,40 @@ func (e *HashJoinExec) wait4Inner() (finished bool, err error) {
|
||||
|
||||
// fetchInnerRows fetches all rows from inner executor,
|
||||
// and append them to e.innerResult.
|
||||
func (e *HashJoinExec) fetchInnerRows(ctx context.Context) (err error) {
|
||||
func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh chan struct{}) {
|
||||
defer func() {
|
||||
close(chkCh)
|
||||
if r := recover(); r != nil {
|
||||
buf := make([]byte, 4096)
|
||||
stackSize := runtime.Stack(buf, false)
|
||||
buf = buf[:stackSize]
|
||||
log.Errorf("hash join inner fetcher panic stack is:\n%s", buf)
|
||||
}
|
||||
}()
|
||||
e.innerResult = chunk.NewList(e.innerExec.retTypes(), e.maxChunkSize)
|
||||
e.innerResult.GetMemTracker().AttachTo(e.memTracker)
|
||||
e.innerResult.GetMemTracker().SetLabel("innerResult")
|
||||
var err error
|
||||
for {
|
||||
if e.finished.Load().(bool) {
|
||||
return nil
|
||||
select {
|
||||
case <-doneCh:
|
||||
return
|
||||
default:
|
||||
if e.finished.Load().(bool) {
|
||||
return
|
||||
}
|
||||
chk := e.children[e.innerIdx].newChunk()
|
||||
err = e.innerExec.Next(ctx, chk)
|
||||
if err != nil {
|
||||
e.innerFinished <- errors.Trace(err)
|
||||
return
|
||||
}
|
||||
if chk.NumRows() == 0 {
|
||||
return
|
||||
}
|
||||
chkCh <- chk
|
||||
e.innerResult.Add(chk)
|
||||
}
|
||||
chk := e.children[e.innerIdx].newChunk()
|
||||
err = e.innerExec.Next(ctx, chk)
|
||||
if err != nil || chk.NumRows() == 0 {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
e.innerResult.Add(chk)
|
||||
}
|
||||
}
|
||||
|
||||
@ -512,26 +532,31 @@ func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) {
|
||||
}
|
||||
close(e.innerFinished)
|
||||
}()
|
||||
|
||||
if err := e.fetchInnerRows(ctx); err != nil {
|
||||
e.innerFinished <- errors.Trace(err)
|
||||
return
|
||||
}
|
||||
// innerResultCh transfer inner result chunk from inner fetch to build hash table.
|
||||
innerResultCh := make(chan *chunk.Chunk, e.concurrency)
|
||||
doneCh := make(chan struct{})
|
||||
go e.fetchInnerRows(ctx, innerResultCh, doneCh)
|
||||
|
||||
if e.finished.Load().(bool) {
|
||||
return
|
||||
}
|
||||
|
||||
if err := e.buildHashTableForList(); err != nil {
|
||||
// TODO: Parallel build hash table. Currently not support because `mvmap` is not thread-safe.
|
||||
err := e.buildHashTableForList(innerResultCh)
|
||||
if err != nil {
|
||||
e.innerFinished <- errors.Trace(err)
|
||||
return
|
||||
close(doneCh)
|
||||
// fetchInnerRows may be blocked by this channel, so read from the channel to unblock it.
|
||||
select {
|
||||
case <-innerResultCh:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// buildHashTableForList builds hash table from `list`.
|
||||
// key of hash table: hash value of key columns
|
||||
// value of hash table: RowPtr of the corresponded row
|
||||
func (e *HashJoinExec) buildHashTableForList() error {
|
||||
func (e *HashJoinExec) buildHashTableForList(innerResultCh chan *chunk.Chunk) error {
|
||||
e.hashTable = mvmap.NewMVMap()
|
||||
e.innerKeyColIdx = make([]int, len(e.innerKeys))
|
||||
for i := range e.innerKeys {
|
||||
@ -543,12 +568,14 @@ func (e *HashJoinExec) buildHashTableForList() error {
|
||||
keyBuf = make([]byte, 0, 64)
|
||||
valBuf = make([]byte, 8)
|
||||
)
|
||||
for i := 0; i < e.innerResult.NumChunks(); i++ {
|
||||
|
||||
chkIdx := uint32(0)
|
||||
for chk := range innerResultCh {
|
||||
if e.finished.Load().(bool) {
|
||||
return nil
|
||||
}
|
||||
chk := e.innerResult.GetChunk(i)
|
||||
for j := 0; j < chk.NumRows(); j++ {
|
||||
numRows := chk.NumRows()
|
||||
for j := 0; j < numRows; j++ {
|
||||
hasNull, keyBuf, err = e.getJoinKeyFromChkRow(false, chk.GetRow(j), keyBuf)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
@ -556,10 +583,11 @@ func (e *HashJoinExec) buildHashTableForList() error {
|
||||
if hasNull {
|
||||
continue
|
||||
}
|
||||
rowPtr := chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)}
|
||||
rowPtr := chunk.RowPtr{ChkIdx: chkIdx, RowIdx: uint32(j)}
|
||||
*(*chunk.RowPtr)(unsafe.Pointer(&valBuf[0])) = rowPtr
|
||||
e.hashTable.Put(keyBuf, valBuf)
|
||||
}
|
||||
chkIdx++
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user