// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
	"context"
	"math"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	"github.com/pingcap/errors"
	"github.com/pingcap/parser/terror"
	"github.com/pingcap/tidb/expression"
	plannercore "github.com/pingcap/tidb/planner/core"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util"
	"github.com/pingcap/tidb/util/chunk"
	"github.com/pingcap/tidb/util/codec"
	"github.com/pingcap/tidb/util/memory"
	"github.com/pingcap/tidb/util/mvmap"
	log "github.com/sirupsen/logrus"
	"github.com/spaolacci/murmur3"
)

// Compile-time checks that both executors satisfy the Executor interface.
var (
	_ Executor = &HashJoinExec{}
	_ Executor = &NestedLoopApplyExec{}
)

// HashJoinExec implements the hash join algorithm.
type HashJoinExec struct {
	baseExecutor

	outerExec Executor
	innerExec Executor
	// outerFilter is applied to outer rows before they probe the hash table.
	outerFilter expression.CNFExprs
	outerKeys   []*expression.Column
	innerKeys   []*expression.Column

	// prepared is set once Next has started the inner fetch/build goroutine
	// and the outer probe pipeline.
	prepared bool
	// concurrency is the number of partition, build and join workers.
	concurrency     uint
	globalHashTable *mvmap.MVMap
	// innerFinished carries at most one error from the inner fetch/build
	// phase and is closed when that phase ends.
	innerFinished   chan error
	hashJoinBuffers []*hashJoinBuffer
	// joinWorkerWaitGroup is for sync multiple join workers.
	joinWorkerWaitGroup sync.WaitGroup
	// finished stores a bool; set to true on error or early termination so
	// background goroutines can observe it and stop.
	finished atomic.Value
	// closeCh add a lock for closing executor.
	closeCh  chan struct{}
	joinType plannercore.JoinType
	// innerIdx is the index of the inner child in e.children
	// (see fetchInnerRows).
	innerIdx int

	// We build individual joiner for each join worker when use chunk-based
	// execution, to avoid the concurrency of joiner.chk and joiner.selected.
	joiners []joiner

	outerKeyColIdx []int
	innerKeyColIdx []int
	// innerResult buffers every chunk fetched from the inner child.
	innerResult        *chunk.List
	outerChkResourceCh chan *outerChkResource
	outerResultChs     []chan *chunk.Chunk
	joinChkResourceCh  []chan *chunk.Chunk
	joinResultCh       chan *hashjoinWorkerResult
	// hashTableValBufs caches, per join worker, the value buffers returned
	// by hash-table lookups so they can be reused across probes.
	hashTableValBufs [][][]byte

	memTracker *memory.Tracker // track memory usage.

	// radixBits indicates the bits using for radix partitioning. Inner relation
	// will be split to 2^radixBitsNumber sub-relations before building the hash
	// tables. If the complete inner relation can be hold in L2Cache in which
	// case radixBits will be 1, we can skip the partition phase.
	// Note: We actually check whether `size of sub inner relation < 3/4 * L2
	// cache size` to make sure one inner sub-relation, hash table, one outer
	// sub-relation and join result of the sub-relations can be totally loaded
	// in L2 cache size. `3/4` is a magic number, we may adjust it after
	// benchmark.
	radixBits  uint32
	innerParts []partition
	// innerRowPrts indicates the position in corresponding partition of every
	// row in innerResult.
	innerRowPrts [][]partRowPtr
	// hashTables stores the hash tables built from the partitions of inner relation.
	hashTables []*mvmap.MVMap
	// numNonEmptyPart counts the partitions actually allocated in innerParts.
	numNonEmptyPart int
}

// partition stores the sub-relations of inner relation and outer relation after
// partition phase. Every partition can be fully stored in L2 cache thus can
// reduce the cache miss ratio when building and probing the hash table.
type partition = *chunk.Chunk

// partRowPtr stores the actual index in `innerParts` or `outerParts`.
type partRowPtr struct {
	partitionIdx uint32
	rowIdx       uint32
}

// partPtr4NullKey indicates a partition pointer which points to a row with null-join-key.
var partPtr4NullKey = partRowPtr{math.MaxUint32, math.MaxUint32}

// outerChkResource stores the result of the join outer fetch worker,
// `dest` is for Chunk reuse: after join workers process the outer chunk which is read from `dest`,
// they'll store the used chunk as `chk`, and then the outer fetch worker will put new data into `chk` and write `chk` into dest.
type outerChkResource struct {
	chk  *chunk.Chunk
	dest chan<- *chunk.Chunk
}

// hashjoinWorkerResult stores the result of join workers,
// `src` is for Chunk reuse: the main goroutine will get the join result chunk `chk`,
// and push `chk` into `src` after processing, join worker goroutines get the empty chunk from `src`
// and push new data into this chunk.
type hashjoinWorkerResult struct {
	chk *chunk.Chunk
	err error
	src chan<- *chunk.Chunk
}

// hashJoinBuffer is a per-worker scratch buffer used while evaluating join keys.
type hashJoinBuffer struct {
	data  []types.Datum
	bytes []byte
}

// Close implements the Executor Close interface.
func (e *HashJoinExec) Close() error {
	close(e.closeCh)
	e.finished.Store(true)
	if e.prepared {
		// Drain every channel so that background goroutines blocked on a
		// send can make progress, observe the close signal, and exit.
		if e.innerFinished != nil {
			for range e.innerFinished {
			}
		}
		if e.joinResultCh != nil {
			for range e.joinResultCh {
			}
		}
		if e.outerChkResourceCh != nil {
			close(e.outerChkResourceCh)
			for range e.outerChkResourceCh {
			}
		}
		for i := range e.outerResultChs {
			for range e.outerResultChs[i] {
			}
		}
		for i := range e.joinChkResourceCh {
			close(e.joinChkResourceCh[i])
			for range e.joinChkResourceCh[i] {
			}
		}
		e.outerChkResourceCh = nil
		e.joinChkResourceCh = nil
	}
	// Detach this executor's tracker from the statement-level tracker tree.
	e.memTracker.Detach()
	e.memTracker = nil

	err := e.baseExecutor.Close()
	return errors.Trace(err)
}

// Open implements the Executor Open interface.
func (e *HashJoinExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return errors.Trace(err) } e.prepared = false e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaHashJoin) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) e.hashTableValBufs = make([][][]byte, e.concurrency) e.hashJoinBuffers = make([]*hashJoinBuffer, 0, e.concurrency) for i := uint(0); i < e.concurrency; i++ { buffer := &hashJoinBuffer{ data: make([]types.Datum, len(e.outerKeys)), bytes: make([]byte, 0, 10000), } e.hashJoinBuffers = append(e.hashJoinBuffers, buffer) } e.innerKeyColIdx = make([]int, len(e.innerKeys)) for i := range e.innerKeys { e.innerKeyColIdx[i] = e.innerKeys[i].Index } e.closeCh = make(chan struct{}) e.finished.Store(false) e.joinWorkerWaitGroup = sync.WaitGroup{} return nil } func (e *HashJoinExec) getJoinKeyFromChkRow(isOuterKey bool, row chunk.Row, keyBuf []byte) (hasNull bool, _ []byte, err error) { var keyColIdx []int var allTypes []*types.FieldType if isOuterKey { keyColIdx = e.outerKeyColIdx allTypes = e.outerExec.retTypes() } else { keyColIdx = e.innerKeyColIdx allTypes = e.innerExec.retTypes() } for _, i := range keyColIdx { if row.IsNull(i) { return true, keyBuf, nil } } keyBuf = keyBuf[:0] keyBuf, err = codec.HashChunkRow(e.ctx.GetSessionVars().StmtCtx, keyBuf, row, allTypes, keyColIdx) if err != nil { err = errors.Trace(err) } return false, keyBuf, err } // fetchOuterChunks get chunks from fetches chunks from the big table in a background goroutine // and sends the chunks to multiple channels which will be read by multiple join workers. 
func (e *HashJoinExec) fetchOuterChunks(ctx context.Context) { hasWaitedForInner := false for { if e.finished.Load().(bool) { return } var outerResource *outerChkResource ok := true select { case <-e.closeCh: return case outerResource, ok = <-e.outerChkResourceCh: if !ok { return } } outerResult := outerResource.chk err := e.outerExec.Next(ctx, outerResult) if err != nil { e.joinResultCh <- &hashjoinWorkerResult{ err: errors.Trace(err), } return } if !hasWaitedForInner { if outerResult.NumRows() == 0 { e.finished.Store(true) return } jobFinished, innerErr := e.wait4Inner() if innerErr != nil { e.joinResultCh <- &hashjoinWorkerResult{ err: errors.Trace(innerErr), } return } else if jobFinished { return } hasWaitedForInner = true } if outerResult.NumRows() == 0 { return } outerResource.dest <- outerResult } } func (e *HashJoinExec) wait4Inner() (finished bool, err error) { select { case <-e.closeCh: return true, nil case err, _ := <-e.innerFinished: if err != nil { return false, errors.Trace(err) } } if e.innerResult.Len() == 0 && (e.joinType == plannercore.InnerJoin || e.joinType == plannercore.SemiJoin) { return true, nil } return false, nil } // fetchInnerRows fetches all rows from inner executor, and append them to // e.innerResult. func (e *HashJoinExec) fetchInnerRows(ctx context.Context) error { e.innerResult = chunk.NewList(e.innerExec.retTypes(), e.initCap, e.maxChunkSize) e.innerResult.GetMemTracker().AttachTo(e.memTracker) e.innerResult.GetMemTracker().SetLabel("innerResult") var err error for { if e.finished.Load().(bool) { return nil } chk := e.children[e.innerIdx].newFirstChunk() err = e.innerExec.Next(ctx, chk) if err != nil || chk.NumRows() == 0 { return err } e.innerResult.Add(chk) } } // partitionInnerRows re-order e.innerResults into sub-relations. 
func (e *HashJoinExec) partitionInnerRows() error {
	// First compute the destination slot of every inner row and pre-allocate
	// the partition storage; then copy rows concurrently.
	if err := e.preAlloc4InnerParts(); err != nil {
		return err
	}

	wg := sync.WaitGroup{}
	// Deferred so this function does not return before all partition workers
	// have finished.
	defer wg.Wait()
	wg.Add(int(e.concurrency))
	for i := 0; i < int(e.concurrency); i++ {
		workerID := i // capture the loop variable for the closure
		go util.WithRecovery(func() {
			defer wg.Done()
			e.doInnerPartition(workerID)
		}, e.handlePartitionPanic)
	}
	return nil
}

// handlePartitionPanic forwards a panic recovered in a partition worker to the
// main goroutine as an error on joinResultCh.
func (e *HashJoinExec) handlePartitionPanic(r interface{}) {
	if r != nil {
		e.joinResultCh <- &hashjoinWorkerResult{err: errors.Errorf("%v", r)}
	}
}

// doInnerPartition runs concurrently, partitions and copies the inner relation
// to several pre-allocated data partitions. The input inner Chunk idx for each
// partitioner is workerId + x*numPartitioners.
func (e *HashJoinExec) doInnerPartition(workerID int) {
	chkIdx, chkNum := workerID, e.innerResult.NumChunks()
	for ; chkIdx < chkNum; chkIdx += int(e.concurrency) {
		if e.finished.Load().(bool) {
			return
		}
		chk := e.innerResult.GetChunk(chkIdx)
		for srcRowIdx, partPtr := range e.innerRowPrts[chkIdx] {
			// Rows with a NULL join key were never assigned a partition.
			if partPtr == partPtr4NullKey {
				continue
			}
			partIdx, destRowIdx := partPtr.partitionIdx, partPtr.rowIdx
			part := e.innerParts[partIdx]
			// Copy the row into the slot reserved by preAlloc4InnerParts.
			part.Insert(int(destRowIdx), chk.GetRow(srcRowIdx))
		}
	}
}

// preAlloc4InnerParts evaluates partRowPtr and pre-alloc the memory space
// for every inner row to help re-order the inner relation.
// TODO: we need to evaluate the skewness for the partitions size, if the
// skewness exceeds a threshold, we do not use partition phase.
func (e *HashJoinExec) preAlloc4InnerParts() (err error) {
	hasNull, keyBuf := false, make([]byte, 0, 64)
	for chkIdx, chkNum := 0, e.innerResult.NumChunks(); chkIdx < chkNum; chkIdx++ {
		chk := e.innerResult.GetChunk(chkIdx)
		partPtrs := make([]partRowPtr, chk.NumRows())
		for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ {
			row := chk.GetRow(rowIdx)
			hasNull, keyBuf, err = e.getJoinKeyFromChkRow(false, row, keyBuf)
			if err != nil {
				return err
			}
			if hasNull {
				// Null-key rows cannot match and are excluded from every
				// partition.
				partPtrs[rowIdx] = partPtr4NullKey
				continue
			}
			joinHash := murmur3.Sum32(keyBuf)
			// radixBits is a bitmask over the low bits of the hash, so this
			// picks the target partition index.
			partIdx := e.radixBits & joinHash
			partPtrs[rowIdx].partitionIdx = partIdx
			// Reserve a slot in the partition; the row is copied later by
			// doInnerPartition.
			partPtrs[rowIdx].rowIdx = e.getPartition(partIdx).PreAlloc(row)
		}
		e.innerRowPrts = append(e.innerRowPrts, partPtrs)
	}
	if e.numNonEmptyPart < len(e.innerParts) {
		numTotalPart := len(e.innerParts)
		numEmptyPart := numTotalPart - e.numNonEmptyPart
		log.Debugf("[EMPTY_PART_IN_RADIX_HASH_JOIN] txn_start_ts:%v, num_empty_parts:%v, "+
			"num_total_parts:%v, empty_ratio:%v", e.ctx.GetSessionVars().TxnCtx.StartTS,
			numEmptyPart, numTotalPart, float64(numEmptyPart)/float64(numTotalPart))
	}
	return
}

// getPartition returns the partition for idx, lazily allocating it on first
// use and counting it as non-empty.
func (e *HashJoinExec) getPartition(idx uint32) partition {
	if e.innerParts[idx] == nil {
		e.numNonEmptyPart++
		e.innerParts[idx] = chunk.New(e.innerExec.retTypes(), e.initCap, e.maxChunkSize)
	}
	return e.innerParts[idx]
}

// evalRadixBit evaluates the radix bit numbers.
// To ensure that one partition of inner relation, one hash table, one partition
// of outer relation and the join result of these two partitions fit into the L2
// cache when the input data obeys the uniform distribution, we suppose every
// sub-partition of inner relation using three quarters of the L2 cache size.
func (e *HashJoinExec) evalRadixBit() (needPartition bool) {
	sv := e.ctx.GetSessionVars()
	innerResultSize := float64(e.innerResult.GetMemTracker().BytesConsumed())
	// Budget three quarters of the L2 cache for one inner sub-relation.
	l2CacheSize := float64(sv.L2CacheSize) * 3 / 4
	radixBitsNum := uint(math.Ceil(math.Log2(innerResultSize / l2CacheSize)))
	if radixBitsNum <= 0 {
		return false
	}
	// Take the rightmost radixBitsNum bits as the bitmask.
	e.radixBits = ^(math.MaxUint32 << radixBitsNum)
	// NOTE(review): the rest of this statement appears garbled/truncated in
	// this copy of the file ("1< 0)" is not valid Go); the probe-side
	// functions referenced elsewhere (fetchOuterAndProbeHashTable,
	// runJoinWorker) seem to have been lost in the same span. The text below
	// is kept byte-identical on purpose — restore it from the upstream
	// source before building.
	e.innerParts = make([]partition, 1< 0) { e.joinResultCh <- joinResult } }

// joinMatchedOuterRow2Chunk probes the global hash table with one outer row
// and appends the join results to joinResult.chk, flushing full chunks to
// joinResultCh. It returns false when the worker should stop (error or close).
func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.Row,
	joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) {
	buffer := e.hashJoinBuffers[workerID]
	hasNull, joinKey, err := e.getJoinKeyFromChkRow(true, outerRow, buffer.bytes)
	if err != nil {
		joinResult.err = errors.Trace(err)
		return false, joinResult
	}
	if hasNull {
		// A NULL join key can never match any inner row.
		e.joiners[workerID].onMissMatch(outerRow, joinResult.chk)
		return true, joinResult
	}
	// Reuse the per-worker value buffer for the hash table lookup.
	e.hashTableValBufs[workerID] = e.globalHashTable.Get(joinKey, e.hashTableValBufs[workerID][:0])
	innerPtrs := e.hashTableValBufs[workerID]
	if len(innerPtrs) == 0 {
		e.joiners[workerID].onMissMatch(outerRow, joinResult.chk)
		return true, joinResult
	}
	// Decode the stored RowPtr values back into inner rows.
	innerRows := make([]chunk.Row, 0, len(innerPtrs))
	for _, b := range innerPtrs {
		ptr := *(*chunk.RowPtr)(unsafe.Pointer(&b[0]))
		matchedInner := e.innerResult.GetRow(ptr)
		innerRows = append(innerRows, matchedInner)
	}
	iter := chunk.NewIterator4Slice(innerRows)
	hasMatch := false
	for iter.Begin(); iter.Current() != iter.End(); {
		matched, err := e.joiners[workerID].tryToMatch(outerRow, iter, joinResult.chk)
		if err != nil {
			joinResult.err = errors.Trace(err)
			return false, joinResult
		}
		hasMatch = hasMatch || matched
		if joinResult.chk.NumRows() == e.maxChunkSize {
			// Flush the full chunk and fetch an empty one to continue.
			ok := true
			e.joinResultCh <- joinResult
			ok, joinResult = e.getNewJoinResult(workerID)
			if !ok {
				return false, joinResult
			}
		}
	}
	if !hasMatch {
		e.joiners[workerID].onMissMatch(outerRow, joinResult.chk)
	}
	return true, joinResult
}

// getNewJoinResult fetches an empty result chunk for the worker from its
// resource channel; ok is false when the executor is being closed.
func (e *HashJoinExec) getNewJoinResult(workerID uint) (bool, *hashjoinWorkerResult) {
	joinResult := &hashjoinWorkerResult{
		src: e.joinChkResourceCh[workerID],
	}
	ok := true
	select {
	case <-e.closeCh:
		ok = false
	case joinResult.chk, ok = <-e.joinChkResourceCh[workerID]:
	}
	return ok, joinResult
}

// join2Chunk filters one outer chunk and probes the hash table with every
// selected row; unselected rows are handed to the joiner as miss-matches.
func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResult *hashjoinWorkerResult,
	selected []bool) (ok bool, _ *hashjoinWorkerResult) {
	var err error
	selected, err = expression.VectorizedFilter(e.ctx, e.outerFilter, chunk.NewIterator4Chunk(outerChk), selected)
	if err != nil {
		joinResult.err = errors.Trace(err)
		return false, joinResult
	}
	for i := range selected {
		if !selected[i] { // process unmatched outer rows
			e.joiners[workerID].onMissMatch(outerChk.GetRow(i), joinResult.chk)
		} else { // process matched outer rows
			ok, joinResult = e.joinMatchedOuterRow2Chunk(workerID, outerChk.GetRow(i), joinResult)
			if !ok {
				return false, joinResult
			}
		}
		if joinResult.chk.NumRows() == e.maxChunkSize {
			e.joinResultCh <- joinResult
			ok, joinResult = e.getNewJoinResult(workerID)
			if !ok {
				return false, joinResult
			}
		}
	}
	return true, joinResult
}

// Next implements the Executor Next interface.
// hash join constructs the result following these steps:
// step 1. fetch data from inner child and build a hash table;
// step 2. fetch data from outer child in a background goroutine and probe the hash table in multiple join workers.
func (e *HashJoinExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) { if e.runtimeStats != nil { start := time.Now() defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }() } if !e.prepared { e.innerFinished = make(chan error, 1) go util.WithRecovery(func() { e.fetchInnerAndBuildHashTable(ctx) }, e.handleFetchInnerAndBuildHashTablePanic) e.fetchOuterAndProbeHashTable(ctx) e.prepared = true } chk.Reset() if e.joinResultCh == nil { return nil } result, ok := <-e.joinResultCh if !ok { return nil } if result.err != nil { e.finished.Store(true) return errors.Trace(result.err) } chk.SwapColumns(result.chk) result.src <- result.chk return nil } func (e *HashJoinExec) handleFetchInnerAndBuildHashTablePanic(r interface{}) { if r != nil { e.innerFinished <- errors.Errorf("%v", r) } close(e.innerFinished) } func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) { // Fetch all the data of the inner relation into memory. if err := e.fetchInnerRows(ctx); err != nil { e.innerFinished <- err return } // Do parallel partition if needed. 
needPartition := e.ctx.GetSessionVars().EnableRadixJoin && e.evalRadixBit() // non-partitioned hash join if !needPartition { if err := e.buildGlobalHashTable(); err != nil { e.innerFinished <- err } return } // radix-partitioned hash join if err := e.partitionInnerRows(); err != nil { e.innerFinished <- err return } if err := e.buildHashTable4Partitions(); err != nil { e.innerFinished <- err } } func (e *HashJoinExec) wait4BuildHashTable(wg *sync.WaitGroup, finishedCh chan error) { wg.Wait() close(finishedCh) } func (e *HashJoinExec) buildHashTable4Partitions() error { e.hashTables = make([]*mvmap.MVMap, len(e.innerParts)) buildFinishedCh := make(chan error, e.concurrency) wg := &sync.WaitGroup{} wg.Add(int(e.concurrency)) go e.wait4BuildHashTable(wg, buildFinishedCh) for i := 0; i < int(e.concurrency); i++ { workerID := i go util.WithRecovery(func() { defer wg.Done() e.doBuild(workerID, buildFinishedCh) }, nil) } return <-buildFinishedCh } func (e *HashJoinExec) doBuild(workerID int, finishedCh chan error) { var err error keyBuf, valBuf := make([]byte, 0, 64), make([]byte, 4) for i := workerID; i < len(e.innerParts); i += int(e.concurrency) { if e.innerParts[i] == nil { continue } e.hashTables[i] = mvmap.NewMVMap() keyBuf = keyBuf[:0] for rowIdx, numRows := 0, e.innerParts[i].NumRows(); rowIdx < numRows; rowIdx++ { // Join-key can be promised to be NOT NULL in a partition(see `partPtr4NullKey`), so we do not check it. _, keyBuf, err = e.getJoinKeyFromChkRow(false, e.innerParts[i].GetRow(rowIdx), keyBuf) if err != nil { e.finished.Store(true) finishedCh <- err return } *(*uint32)(unsafe.Pointer(&valBuf[0])) = uint32(rowIdx) e.hashTables[i].Put(keyBuf, valBuf) } } } // buildGlobalHashTable builds a global hash table for the inner relation. 
// key of hash table: hash value of key columns
// value of hash table: RowPtr of the corresponded row
func (e *HashJoinExec) buildGlobalHashTable() error {
	e.globalHashTable = mvmap.NewMVMap()
	var (
		hasNull bool
		err     error
		keyBuf  = make([]byte, 0, 64)
		valBuf  = make([]byte, 8)
	)
	for chkIdx := 0; chkIdx < e.innerResult.NumChunks(); chkIdx++ {
		if e.finished.Load().(bool) {
			return nil
		}
		chk := e.innerResult.GetChunk(chkIdx)
		for j, numRows := 0, chk.NumRows(); j < numRows; j++ {
			hasNull, keyBuf, err = e.getJoinKeyFromChkRow(false, chk.GetRow(j), keyBuf)
			if err != nil {
				return err
			}
			if hasNull {
				// Rows with a NULL join key can never match; skip them.
				continue
			}
			rowPtr := chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(j)}
			// Reinterpret valBuf as a chunk.RowPtr to serialize it without
			// allocating.
			*(*chunk.RowPtr)(unsafe.Pointer(&valBuf[0])) = rowPtr
			e.globalHashTable.Put(keyBuf, valBuf)
		}
	}
	return nil
}

// NestedLoopApplyExec is the executor for apply.
type NestedLoopApplyExec struct {
	baseExecutor

	innerRows   []chunk.Row
	cursor      int
	innerExec   Executor
	outerExec   Executor
	innerFilter expression.CNFExprs
	outerFilter expression.CNFExprs
	// outer indicates outer-join semantics: outer rows rejected by
	// outerFilter still produce a miss-match result row.
	outer bool

	joiner joiner

	// outerSchema holds the correlated columns whose Data is refilled from
	// the current outer row before the inner side is re-evaluated.
	outerSchema []*expression.CorrelatedColumn

	outerChunk       *chunk.Chunk
	outerChunkCursor int
	outerSelected    []bool
	innerList        *chunk.List
	innerChunk       *chunk.Chunk
	innerSelected    []bool
	innerIter        chunk.Iterator
	outerRow         *chunk.Row
	hasMatch         bool

	memTracker *memory.Tracker // track memory usage.
}

// Close implements the Executor interface.
func (e *NestedLoopApplyExec) Close() error {
	e.innerRows = nil

	e.memTracker.Detach()
	e.memTracker = nil
	// Only the outer child is closed here; the inner executor is opened and
	// closed once per outer row in fetchAllInners.
	return errors.Trace(e.outerExec.Close())
}

// Open implements the Executor interface.
func (e *NestedLoopApplyExec) Open(ctx context.Context) error {
	err := e.outerExec.Open(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	e.cursor = 0
	e.innerRows = e.innerRows[:0]
	e.outerChunk = e.outerExec.newFirstChunk()
	e.innerChunk = e.innerExec.newFirstChunk()
	e.innerList = chunk.NewList(e.innerExec.retTypes(), e.initCap, e.maxChunkSize)

	e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaNestedLoopApply)
	e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

	e.innerList.GetMemTracker().SetLabel("innerList")
	e.innerList.GetMemTracker().AttachTo(e.memTracker)

	return nil
}

// fetchSelectedOuterRow returns the next outer row accepted by e.outerFilter,
// refilling e.outerChunk from the outer child as needed. It returns nil when
// the outer relation is exhausted, or (for outer joins) when chk fills up
// with miss-match result rows.
func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *chunk.Chunk) (*chunk.Row, error) {
	outerIter := chunk.NewIterator4Chunk(e.outerChunk)
	for {
		if e.outerChunkCursor >= e.outerChunk.NumRows() {
			// Current chunk exhausted: fetch the next one and evaluate the
			// outer filter over it.
			err := e.outerExec.Next(ctx, e.outerChunk)
			if err != nil {
				return nil, errors.Trace(err)
			}
			if e.outerChunk.NumRows() == 0 {
				return nil, nil
			}
			e.outerSelected, err = expression.VectorizedFilter(e.ctx, e.outerFilter, outerIter, e.outerSelected)
			if err != nil {
				return nil, errors.Trace(err)
			}
			e.outerChunkCursor = 0
		}
		outerRow := e.outerChunk.GetRow(e.outerChunkCursor)
		selected := e.outerSelected[e.outerChunkCursor]
		e.outerChunkCursor++
		if selected {
			return &outerRow, nil
		} else if e.outer {
			// Outer-join semantics: a filtered-out outer row still emits a
			// miss-match result row.
			e.joiner.onMissMatch(outerRow, chk)
			if chk.NumRows() == e.maxChunkSize {
				return nil, nil
			}
		}
	}
}

// fetchAllInners reads all data from the inner table and stores them in a List.
func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error { err := e.innerExec.Open(ctx) defer terror.Call(e.innerExec.Close) if err != nil { return errors.Trace(err) } e.innerList.Reset() innerIter := chunk.NewIterator4Chunk(e.innerChunk) for { err := e.innerExec.Next(ctx, e.innerChunk) if err != nil { return errors.Trace(err) } if e.innerChunk.NumRows() == 0 { return nil } e.innerSelected, err = expression.VectorizedFilter(e.ctx, e.innerFilter, innerIter, e.innerSelected) if err != nil { return errors.Trace(err) } for row := innerIter.Begin(); row != innerIter.End(); row = innerIter.Next() { if e.innerSelected[row.Idx()] { e.innerList.AppendRow(row) } } } } // Next implements the Executor interface. func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) { if e.runtimeStats != nil { start := time.Now() defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }() } chk.Reset() for { if e.innerIter == nil || e.innerIter.Current() == e.innerIter.End() { if e.outerRow != nil && !e.hasMatch { e.joiner.onMissMatch(*e.outerRow, chk) } e.outerRow, err = e.fetchSelectedOuterRow(ctx, chk) if e.outerRow == nil || err != nil { return errors.Trace(err) } e.hasMatch = false for _, col := range e.outerSchema { *col.Data = e.outerRow.GetDatum(col.Index, col.RetType) } err = e.fetchAllInners(ctx) if err != nil { return errors.Trace(err) } e.innerIter = chunk.NewIterator4List(e.innerList) e.innerIter.Begin() } matched, err := e.joiner.tryToMatch(*e.outerRow, e.innerIter, chk) e.hasMatch = e.hasMatch || matched if err != nil || chk.NumRows() == e.maxChunkSize { return errors.Trace(err) } } }