// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package join

import (
	"context"
	"fmt"
	"hash"
	"hash/fnv"
	"runtime"
	"runtime/trace"
	"sync"
	"sync/atomic"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/pkg/executor/internal/exec"
	"github.com/pingcap/tidb/pkg/expression"
	"github.com/pingcap/tidb/pkg/parser/terror"
	"github.com/pingcap/tidb/pkg/planner/core/base"
	"github.com/pingcap/tidb/pkg/types"
	"github.com/pingcap/tidb/pkg/util"
	"github.com/pingcap/tidb/pkg/util/channel"
	"github.com/pingcap/tidb/pkg/util/chunk"
	"github.com/pingcap/tidb/pkg/util/codec"
	"github.com/pingcap/tidb/pkg/util/memory"
	"github.com/pingcap/tidb/pkg/util/ranger"
)

// numResChkHold indicates the number of resource chunks that an inner worker
// holds at the same time.
// It is used in two independent cases:
//  1. IndexMergeJoin
//  2. IndexNestedLoopHashJoin: it takes effect when
//     IndexNestedLoopHashJoin.KeepOuterOrder is true. Otherwise, there are at
//     most `concurrency` resource chunks throughout the execution of
//     IndexNestedLoopHashJoin.
const numResChkHold = 4

// maxRowsPerFetch limits the number of inner rows fetched per batch when the
// joiner supports incremental lookup (see supportIncrementalLookUp).
const maxRowsPerFetch = 4096

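// Result chunks are pooled rather than allocated per batch. Roughly, a chunk
// cycles as follows (see getNewJoinResult and handleResult below):
//
//	joinChkResourceCh -> joinResult.chk -> resultCh -> main thread, which
//	swaps the columns into the caller's chunk and hands the empty chunk back
//	through result.src (the same joinChkResourceCh).
//
// With KeepOuterOrder enabled, each worker pools numResChkHold chunks so a
// task can buffer several results without blocking the worker.
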
// IndexNestedLoopHashJoin employs one outer worker and N inner workers to
// execute concurrently. The output order is not guaranteed.
//
// The execution flow is very similar to IndexLookUpReader:
//  1. The outer worker reads N outer rows, builds a task and sends it to the
//     inner worker channel.
//  2. The inner worker receives the tasks and does 3 things for every task:
//     1. builds a hash table from the outer rows
//     2. builds key ranges from the outer rows and fetches inner rows
//     3. probes the hash table and sends the join result to the main thread channel.
//     Note: steps 1 and 2 run concurrently.
//  3. The main thread receives the join results.
type IndexNestedLoopHashJoin struct {
	IndexLookUpJoin
	resultCh          chan *indexHashJoinResult
	joinChkResourceCh []chan *chunk.Chunk
	// We build an individual joiner for each inner worker when using chunk-based
	// execution, to avoid concurrent access to joiner.chk and joiner.selected.
	Joiners        []Joiner
	KeepOuterOrder bool
	curTask        *indexHashJoinTask
	// taskCh is only used when `KeepOuterOrder` is true.
	taskCh chan *indexHashJoinTask

	stats    *indexLookUpJoinRuntimeStats
	prepared bool
	// panicErr records the error generated by panic recover. This is introduced to
	// return the actual error message instead of `context canceled` to the client.
	panicErr struct {
		sync.Mutex
		atomic.Bool
		error
	}
	ctxWithCancel context.Context
}

type indexHashJoinOuterWorker struct {
	outerWorker
	innerCh        chan *indexHashJoinTask
	keepOuterOrder bool
	// taskCh is only used when the outer order needs to be preserved.
	taskCh chan *indexHashJoinTask
}

type indexHashJoinInnerWorker struct {
	innerWorker
	joiner            Joiner
	joinChkResourceCh chan *chunk.Chunk
	// resultCh is valid only when IndexNestedLoopHashJoin does not need to
	// keep the outer order. Otherwise, it will be nil.
	resultCh       chan *indexHashJoinResult
	taskCh         <-chan *indexHashJoinTask
	wg             *sync.WaitGroup
	joinKeyBuf     []byte
	outerRowStatus []outerRowStatusFlag
	rowIter        *chunk.Iterator4Slice
}

type indexHashJoinResult struct {
	chk *chunk.Chunk
	err error
	// src is the resource channel that chk is handed back to once the main
	// thread has consumed it.
	src chan<- *chunk.Chunk
}

type indexHashJoinTask struct {
	*lookUpJoinTask
	outerRowStatus [][]outerRowStatusFlag
	lookupMap      BaseHashTable
	err            error
	keepOuterOrder bool
	// resultCh is only used when the outer order needs to be preserved.
	resultCh chan *indexHashJoinResult
	// matchedInnerRowPtrs is valid only when the outer order needs to be
	// preserved. Otherwise, it will be nil.
	// len(matchedInnerRowPtrs) equals lookUpJoinTask.outerResult.NumChunks(),
	// and matchedInnerRowPtrs[chkIdx][rowIdx] records the pointers to the
	// matched inner rows of the corresponding outer row.
	matchedInnerRowPtrs [][][]chunk.RowPtr
}

// Open implements the IndexNestedLoopHashJoin Executor interface.
func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error {
	err := exec.Open(ctx, e.Children(0))
	if err != nil {
		return err
	}
	if e.memTracker != nil {
		e.memTracker.Reset()
	} else {
		e.memTracker = memory.NewTracker(e.ID(), -1)
	}
	e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker)
	e.cancelFunc = nil
	e.innerPtrBytes = make([][]byte, 0, 8)
	if e.RuntimeStats() != nil {
		e.stats = &indexLookUpJoinRuntimeStats{}
	}
	e.Finished.Store(false)
	return nil
}

func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context, initBatchSize int) {
	concurrency := e.Ctx().GetSessionVars().IndexLookupJoinConcurrency()
	if e.stats != nil {
		e.stats.concurrency = concurrency
	}
	workerCtx, cancelFunc := context.WithCancel(ctx)
	e.ctxWithCancel, e.cancelFunc = workerCtx, cancelFunc
	innerCh := make(chan *indexHashJoinTask, concurrency)
	if e.KeepOuterOrder {
		e.taskCh = make(chan *indexHashJoinTask, concurrency)
		// When `KeepOuterOrder` is true, each task holds its own `resultCh`,
		// thus we do not need a global resultCh.
		e.resultCh = nil
	} else {
		e.resultCh = make(chan *indexHashJoinResult, concurrency)
	}
	e.joinChkResourceCh = make([]chan *chunk.Chunk, concurrency)
	e.WorkerWg.Add(1)
	ow := e.newOuterWorker(innerCh, initBatchSize)
	go util.WithRecovery(func() { ow.run(e.ctxWithCancel) }, e.finishJoinWorkers)

	for i := range concurrency {
		if !e.KeepOuterOrder {
			e.joinChkResourceCh[i] = make(chan *chunk.Chunk, 1)
			e.joinChkResourceCh[i] <- exec.NewFirstChunk(e)
		} else {
			e.joinChkResourceCh[i] = make(chan *chunk.Chunk, numResChkHold)
			for range numResChkHold {
				e.joinChkResourceCh[i] <- exec.NewFirstChunk(e)
			}
		}
	}

	e.WorkerWg.Add(concurrency)
	for i := range concurrency {
		workerID := i
		go util.WithRecovery(func() { e.newInnerWorker(innerCh, workerID).run(e.ctxWithCancel, cancelFunc) }, e.finishJoinWorkers)
	}
	go e.wait4JoinWorkers()
}

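// Channel topology set up by startWorkers, sketched for reference:
//
//	outer worker --innerCh--> inner workers --e.resultCh--> main thread (unordered)
//	outer worker --taskCh---> main thread, which drains each task.resultCh
//	                          in task order (KeepOuterOrder)
//
// In the ordered case the outer worker publishes every task to both innerCh
// and taskCh, so the main thread can consume results task by task.
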
func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r any) {
	if r != nil {
		// Record the panic error first to avoid returning `context canceled`.
		err := fmt.Errorf("%v", r)
		if recoveredErr, ok := r.(error); ok {
			err = recoveredErr
		}

		// Double-checked locking: only the first panic error is recorded.
		if !e.panicErr.Load() {
			e.panicErr.Lock()
			if !e.panicErr.Load() {
				e.panicErr.error = err
				e.panicErr.Store(true)
			}
			e.panicErr.Unlock()
		}

		// Then cancel the workers.
		e.IndexLookUpJoin.Finished.Store(true)
		if e.cancelFunc != nil {
			e.cancelFunc()
		}

		if !e.KeepOuterOrder {
			e.resultCh <- &indexHashJoinResult{err: err}
		} else {
			task := &indexHashJoinTask{err: err}
			e.taskCh <- task
		}
	}
	e.WorkerWg.Done()
}

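// Note on panicErr: the atomic.Bool is stored only after the error field is
// written while holding the mutex, and Go's sync/atomic operations establish
// happens-before edges, so a reader that observes panicErr.Load() == true may
// safely read panicErr.error without taking the lock (as runInOrder and
// getResultFromChannel do).
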
func (e *IndexNestedLoopHashJoin) wait4JoinWorkers() {
	e.WorkerWg.Wait()
	if e.resultCh != nil {
		close(e.resultCh)
	}
	if e.taskCh != nil {
		close(e.taskCh)
	}
}

// Next implements the IndexNestedLoopHashJoin Executor interface.
func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) error {
	if !e.prepared {
		e.startWorkers(ctx, req.RequiredRows())
		e.prepared = true
	}
	req.Reset()
	if e.KeepOuterOrder {
		return e.runInOrder(e.ctxWithCancel, req)
	}
	return e.runUnordered(e.ctxWithCancel, req)
}

func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chunk) error {
	for {
		if e.isDryUpTasks(ctx) {
			if e.panicErr.Load() {
				return e.panicErr.error
			}
			return nil
		}
		if e.curTask.err != nil {
			return e.curTask.err
		}
		result, err := e.getResultFromChannel(ctx, e.curTask.resultCh)
		if err != nil {
			return err
		}
		if result == nil {
			e.curTask = nil
			continue
		}
		return e.handleResult(req, result)
	}
}

func (e *IndexNestedLoopHashJoin) runUnordered(ctx context.Context, req *chunk.Chunk) error {
	result, err := e.getResultFromChannel(ctx, e.resultCh)
	if err != nil {
		return err
	}
	return e.handleResult(req, result)
}

// isDryUpTasks indicates whether all the tasks have been processed.
func (e *IndexNestedLoopHashJoin) isDryUpTasks(ctx context.Context) bool {
	if e.curTask != nil {
		return false
	}
	var ok bool
	select {
	case e.curTask, ok = <-e.taskCh:
		if !ok {
			return true
		}
	case <-ctx.Done():
		return true
	}
	return false
}

func (e *IndexNestedLoopHashJoin) getResultFromChannel(ctx context.Context, resultCh <-chan *indexHashJoinResult) (*indexHashJoinResult, error) {
	var (
		result *indexHashJoinResult
		ok     bool
	)
	select {
	case result, ok = <-resultCh:
		if !ok {
			return nil, nil
		}
		if result.err != nil {
			return nil, result.err
		}
	case <-ctx.Done():
		failpoint.Inject("TestIssue49692", func() {
			for !e.panicErr.Load() {
				runtime.Gosched()
			}
		})

		// Prefer the recorded panic error over the bare context error.
		err := error(nil)
		if e.panicErr.Load() {
			err = e.panicErr.error
		}
		if err == nil {
			err = ctx.Err()
		}
		return nil, err
	}
	return result, nil
}

func (*IndexNestedLoopHashJoin) handleResult(req *chunk.Chunk, result *indexHashJoinResult) error {
	if result == nil {
		return nil
	}
	req.SwapColumns(result.chk)
	result.src <- result.chk
	return nil
}

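// A caller drives this executor with the usual volcano-style loop; a minimal
// sketch (error handling elided, surrounding harness names assumed):
//
//	chk := exec.NewFirstChunk(e)
//	for {
//		if err := e.Next(ctx, chk); err != nil || chk.NumRows() == 0 {
//			break
//		}
//		// consume chk; its columns were swapped in from a pooled result chunk
//	}
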
// Close implements the IndexNestedLoopHashJoin Executor interface.
func (e *IndexNestedLoopHashJoin) Close() error {
	if e.stats != nil {
		defer e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), e.stats)
	}
	if e.cancelFunc != nil {
		e.cancelFunc()
	}
	if e.resultCh != nil {
		channel.Clear(e.resultCh)
		e.resultCh = nil
	}
	if e.taskCh != nil {
		channel.Clear(e.taskCh)
		e.taskCh = nil
	}
	for i := range e.joinChkResourceCh {
		close(e.joinChkResourceCh[i])
	}
	e.joinChkResourceCh = nil
	e.Finished.Store(false)
	e.prepared = false
	e.ctxWithCancel = nil
	return e.BaseExecutor.Close()
}

func (ow *indexHashJoinOuterWorker) run(ctx context.Context) {
	defer trace.StartRegion(ctx, "IndexHashJoinOuterWorker").End()
	defer close(ow.innerCh)
	for {
		failpoint.Inject("TestIssue30211", nil)
		task, err := ow.buildTask(ctx)
		failpoint.Inject("testIndexHashJoinOuterWorkerErr", func() {
			err = errors.New("mockIndexHashJoinOuterWorkerErr")
		})
		failpoint.Inject("testIssue54055_1", func(val failpoint.Value) {
			if val.(bool) {
				err = errors.New("testIssue54055_1")
			}
		})
		if err != nil {
			task = &indexHashJoinTask{err: err}
			if ow.keepOuterOrder {
				// The outer builder and the inner fetcher run concurrently, so
				// we may get two errors simultaneously. Thus the capacity of
				// task.resultCh needs to be initialized to 2 to avoid blocking.
				task.keepOuterOrder, task.resultCh = true, make(chan *indexHashJoinResult, 2)
				ow.pushToChan(ctx, task, ow.taskCh)
			}
			ow.pushToChan(ctx, task, ow.innerCh)
			return
		}
		if task == nil {
			return
		}
		if finished := ow.pushToChan(ctx, task, ow.innerCh); finished {
			return
		}
		if ow.keepOuterOrder {
			failpoint.Inject("testIssue20779", func() {
				panic("testIssue20779")
			})
			if finished := ow.pushToChan(ctx, task, ow.taskCh); finished {
				return
			}
		}
	}
}

func (ow *indexHashJoinOuterWorker) buildTask(ctx context.Context) (*indexHashJoinTask, error) {
	task, err := ow.outerWorker.buildTask(ctx)
	if task == nil || err != nil {
		return nil, err
	}
	var (
		resultCh            chan *indexHashJoinResult
		matchedInnerRowPtrs [][][]chunk.RowPtr
	)
	if ow.keepOuterOrder {
		resultCh = make(chan *indexHashJoinResult, numResChkHold)
		matchedInnerRowPtrs = make([][][]chunk.RowPtr, task.outerResult.NumChunks())
		for i := range matchedInnerRowPtrs {
			matchedInnerRowPtrs[i] = make([][]chunk.RowPtr, task.outerResult.GetChunk(i).NumRows())
		}
	}
	numChks := task.outerResult.NumChunks()
	outerRowStatus := make([][]outerRowStatusFlag, numChks)
	for i := range numChks {
		outerRowStatus[i] = make([]outerRowStatusFlag, task.outerResult.GetChunk(i).NumRows())
	}
	return &indexHashJoinTask{
		lookUpJoinTask:      task,
		outerRowStatus:      outerRowStatus,
		keepOuterOrder:      ow.keepOuterOrder,
		resultCh:            resultCh,
		matchedInnerRowPtrs: matchedInnerRowPtrs,
	}, nil
}

func (*indexHashJoinOuterWorker) pushToChan(ctx context.Context, task *indexHashJoinTask, dst chan<- *indexHashJoinTask) bool {
	select {
	case <-ctx.Done():
		return true
	case dst <- task:
	}
	return false
}

func (e *IndexNestedLoopHashJoin) newOuterWorker(innerCh chan *indexHashJoinTask, initBatchSize int) *indexHashJoinOuterWorker {
	maxBatchSize := e.Ctx().GetSessionVars().IndexJoinBatchSize
	batchSize := min(initBatchSize, maxBatchSize)
	ow := &indexHashJoinOuterWorker{
		outerWorker: outerWorker{
			OuterCtx:         e.OuterCtx,
			ctx:              e.Ctx(),
			executor:         e.Children(0),
			batchSize:        batchSize,
			maxBatchSize:     maxBatchSize,
			parentMemTracker: e.memTracker,
			lookup:           &e.IndexLookUpJoin,
		},
		innerCh:        innerCh,
		keepOuterOrder: e.KeepOuterOrder,
		taskCh:         e.taskCh,
	}
	return ow
}

func (e *IndexNestedLoopHashJoin) supportIncrementalLookUp() bool {
	return !e.KeepOuterOrder &&
		(JoinerType(e.Joiners[0]) == base.InnerJoin ||
			JoinerType(e.Joiners[0]) == base.LeftOuterJoin ||
			JoinerType(e.Joiners[0]) == base.RightOuterJoin ||
			JoinerType(e.Joiners[0]) == base.AntiSemiJoin)
}

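// A plausible reading of the restriction above (not spelled out in the code):
// these join types can probe each inner batch as it arrives, because emitting
// matched output needs no global view of the inner side, and the
// unmatched-outer pass in doJoinUnordered is deferred until task.innerExec is
// exhausted. Keeping the outer order, by contrast, requires collecting all
// matched inner row pointers per outer row before joining (see doJoinInOrder).
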
func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask, workerID int) *indexHashJoinInnerWorker {
	// Since multiple inner workers run concurrently, we should copy the join's
	// IndexRanges for every worker to avoid data races.
	copiedRanges := make([]*ranger.Range, 0, len(e.IndexRanges.Range()))
	for _, ran := range e.IndexRanges.Range() {
		copiedRanges = append(copiedRanges, ran.Clone())
	}
	var innerStats *innerWorkerRuntimeStats
	if e.stats != nil {
		innerStats = &e.stats.innerWorker
	}
	var maxRows int
	// If the joiner supports incremental lookup, we can fetch inner results in
	// batches, retrieving partial results multiple times until all are fetched.
	// Otherwise, we fetch all inner results in one batch.
	if e.supportIncrementalLookUp() {
		maxRows = maxRowsPerFetch
	}
	iw := &indexHashJoinInnerWorker{
		innerWorker: innerWorker{
			InnerCtx:      e.InnerCtx,
			outerCtx:      e.OuterCtx,
			ctx:           e.Ctx(),
			maxFetchSize:  maxRows,
			indexRanges:   copiedRanges,
			keyOff2IdxOff: e.KeyOff2IdxOff,
			stats:         innerStats,
			lookup:        &e.IndexLookUpJoin,
			memTracker:    memory.NewTracker(memory.LabelForIndexJoinInnerWorker, -1),
		},
		taskCh:            taskCh,
		joiner:            e.Joiners[workerID],
		joinChkResourceCh: e.joinChkResourceCh[workerID],
		resultCh:          e.resultCh,
		joinKeyBuf:        make([]byte, 1),
		outerRowStatus:    make([]outerRowStatusFlag, 0, e.MaxChunkSize()),
		rowIter:           chunk.NewIterator4Slice([]chunk.Row{}),
	}
	iw.memTracker.AttachTo(e.memTracker)
	if len(copiedRanges) != 0 {
		// We should not consume this memory usage in `iw.memTracker`. The
		// memory usage of the inner worker is reset at the end of
		// iw.handleTask, while this consumption lives throughout the whole
		// active period of the inner worker.
		e.Ctx().GetSessionVars().StmtCtx.MemTracker.Consume(2 * types.EstimatedMemUsage(copiedRanges[0].LowVal, len(copiedRanges)))
	}
	if e.LastColHelper != nil {
		// nextCwf.TmpConstant needs to be reset for every individual inner
		// worker to avoid data races when the inner workers run concurrently.
		nextCwf := *e.LastColHelper
		nextCwf.TmpConstant = make([]*expression.Constant, len(e.LastColHelper.TmpConstant))
		for i := range e.LastColHelper.TmpConstant {
			nextCwf.TmpConstant[i] = &expression.Constant{RetType: nextCwf.TargetCol.RetType}
		}
		iw.nextColCompareFilters = &nextCwf
	}
	return iw
}

func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context.CancelFunc) {
	defer trace.StartRegion(ctx, "IndexHashJoinInnerWorker").End()
	var task *indexHashJoinTask
	joinResult, ok := iw.getNewJoinResult(ctx)
	if !ok {
		cancelFunc()
		return
	}
	h, resultCh := fnv.New64(), iw.resultCh
	for {
		// The previous task has been processed, so release the occupied memory.
		if task != nil {
			task.memTracker.Detach()
		}
		select {
		case <-ctx.Done():
			return
		case task, ok = <-iw.taskCh:
		}
		if !ok {
			break
		}
		// We need to init resultCh before the err is returned.
		if task.keepOuterOrder {
			resultCh = task.resultCh
		}
		if task.err != nil {
			joinResult.err = task.err
			break
		}
		err := iw.handleTask(ctx, task, joinResult, h, resultCh)
		if err != nil && !task.keepOuterOrder {
			// Only the non-keep-outer-order case needs checking here, because
			// in the keep-outer-order case `joinResult` has already been sent
			// to `resultCh` when err != nil.
			joinResult.err = err
			break
		}
		if task.keepOuterOrder {
			// We need to get a new result holder here because the old
			// `joinResult` has been sent to the `resultCh` or to the
			// `joinChkResourceCh`.
			joinResult, ok = iw.getNewJoinResult(ctx)
			if !ok {
				cancelFunc()
				return
			}
		}
	}
	failpoint.Inject("testIndexHashJoinInnerWorkerErr", func() {
		joinResult.err = errors.New("mockIndexHashJoinInnerWorkerErr")
	})
	// When task.keepOuterOrder is true (resultCh != iw.resultCh):
	//   - the last joinResult is handled when the task has been processed,
	//     thus we do NOT need to check it here again;
	//   - we do NOT check the error here either, because:
	//     - if the error is from task.err, the main thread checks the error of each task;
	//     - if the error is from handleTask, it is handled in handleTask.
	// We should not check `task != nil && !task.keepOuterOrder` here since it
	// is possible that `joinResult.chk.NumRows() > 0` even if task == nil.
	if resultCh == iw.resultCh {
		if joinResult.err != nil {
			resultCh <- joinResult
			return
		}
		if joinResult.chk != nil && joinResult.chk.NumRows() > 0 {
			select {
			case resultCh <- joinResult:
			case <-ctx.Done():
				return
			}
		}
	}
}

func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*indexHashJoinResult, bool) {
	joinResult := &indexHashJoinResult{
		src: iw.joinChkResourceCh,
	}
	ok := true
	select {
	case joinResult.chk, ok = <-iw.joinChkResourceCh:
	case <-ctx.Done():
		joinResult.err = ctx.Err()
		return joinResult, false
	}
	return joinResult, ok
}

func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(task *indexHashJoinTask, h hash.Hash64) {
	failpoint.Inject("IndexHashJoinBuildHashTablePanic", nil)
	failpoint.Inject("ConsumeRandomPanic", nil)
	if iw.stats != nil {
		start := time.Now()
		defer func() {
			atomic.AddInt64(&iw.stats.build, int64(time.Since(start)))
		}()
	}
	buf, numChks := make([]byte, 1), task.outerResult.NumChunks()
	task.lookupMap = newUnsafeHashTable(task.outerResult.Len())
	for chkIdx := range numChks {
		chk := task.outerResult.GetChunk(chkIdx)
		numRows := chk.NumRows()
		if iw.lookup.Finished.Load().(bool) {
			return
		}
	OUTER:
		for rowIdx := range numRows {
			if task.outerMatch != nil && !task.outerMatch[chkIdx][rowIdx] {
				continue
			}
			row := chk.GetRow(rowIdx)
			hashColIdx := iw.outerCtx.HashCols
			// Rows with a NULL join key never match, so skip them.
			for _, i := range hashColIdx {
				if row.IsNull(i) {
					continue OUTER
				}
			}
			h.Reset()
			err := codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx.TypeCtx(), h, row, iw.outerCtx.HashTypes, hashColIdx, buf)
			failpoint.Inject("testIndexHashJoinBuildErr", func() {
				err = errors.New("mockIndexHashJoinBuildErr")
			})
			if err != nil {
				// This panic will be recovered by the invoker.
				panic(err.Error())
			}
			rowPtr := chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}
			task.lookupMap.Put(h.Sum64(), rowPtr)
		}
	}
}

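// The table built above maps 64-bit FNV hashes to row pointers only; it
// stores no key values. Probes therefore re-verify every candidate with
// codec.EqualChunkRow (see getMatchedOuterRows) so that hash collisions
// cannot produce false matches.
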
func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(resultCh chan *indexHashJoinResult, err error) {
	defer func() {
		iw.wg.Done()
		iw.lookup.WorkerWg.Done()
	}()
	if err != nil {
		resultCh <- &indexHashJoinResult{err: err}
	}
}

func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) (err error) {
	defer func() {
		iw.memTracker.Consume(-iw.memTracker.BytesConsumed())
		if task.keepOuterOrder {
			if err != nil {
				joinResult.err = err
				select {
				case <-ctx.Done():
				case resultCh <- joinResult:
				}
			}
			close(resultCh)
		}
	}()
	failpoint.Inject("testIssue54055_2", func(val failpoint.Value) {
		if val.(bool) {
			time.Sleep(10 * time.Millisecond)
			panic("testIssue54055_2")
		}
	})
	var joinStartTime time.Time
	if iw.stats != nil {
		start := time.Now()
		defer func() {
			endTime := time.Now()
			atomic.AddInt64(&iw.stats.totalTime, int64(endTime.Sub(start)))
			// Only used for doJoinInOrder; doJoinUnordered measures the join
			// time itself. fetchInnerResults may return an error early, in
			// which case joinStartTime is never initialized.
			if !joinStartTime.IsZero() {
				atomic.AddInt64(&iw.stats.join, int64(endTime.Sub(joinStartTime)))
			}
		}()
	}

	iw.wg = &sync.WaitGroup{}
	iw.wg.Add(1)
	iw.lookup.WorkerWg.Add(1)
	var buildHashTableErr error
	// TODO(XuHuaiyu): we may always use the smaller side to build the hashtable.
	go util.WithRecovery(
		func() {
			iw.buildHashTableForOuterResult(task, h)
		},
		func(r any) {
			if r != nil {
				buildHashTableErr = errors.Errorf("%v", r)
			}
			iw.handleHashJoinInnerWorkerPanic(resultCh, buildHashTableErr)
		},
	)
	lookUpContents, err := iw.constructLookupContent(task.lookUpJoinTask)
	if err == nil {
		err = iw.innerWorker.fetchInnerResults(ctx, task.lookUpJoinTask, lookUpContents)
	}
	iw.wg.Wait()
	// Check the error after wg.Wait to make sure the error message can be sent
	// to resultCh even if a panic happens in buildHashTableForOuterResult.
	failpoint.Inject("IndexHashJoinFetchInnerResultsErr", func() {
		err = errors.New("IndexHashJoinFetchInnerResultsErr")
	})
	failpoint.Inject("ConsumeRandomPanic", nil)
	if err != nil {
		return err
	}

	if buildHashTableErr != nil {
		return buildHashTableErr
	}

	if !task.keepOuterOrder {
		for {
			err = iw.doJoinUnordered(ctx, task, joinResult, h, resultCh)
			if err != nil {
				if task.innerExec != nil {
					terror.Log(exec.Close(task.innerExec))
					task.innerExec = nil
				}
				return err
			}
			// innerExec is not finished yet: fetch more inner results and join again.
			if task.innerExec != nil {
				err = iw.innerWorker.fetchInnerResults(ctx, task.lookUpJoinTask, lookUpContents)
				if err != nil {
					return err
				}
				continue
			}
			break
		}
		return nil
	}
	joinStartTime = time.Now()
	return iw.doJoinInOrder(ctx, task, joinResult, h, resultCh)
}

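// handleTask realizes the "steps 1 and 2 run concurrently" note from the type
// doc: the hash table over the outer rows is built in a recovered goroutine
// while the same worker fetches inner rows, and iw.wg.Wait() joins the two
// before probing. In the incremental (non-keep-order) path, doJoinUnordered
// and fetchInnerResults alternate until task.innerExec is drained.
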
func (iw *indexHashJoinInnerWorker) doJoinUnordered(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error {
	if iw.stats != nil {
		start := time.Now()
		defer func() {
			atomic.AddInt64(&iw.stats.join, int64(time.Since(start)))
		}()
	}
	var ok bool
	iter := chunk.NewIterator4List(task.innerResult)
	for row := iter.Begin(); row != iter.End(); row = iter.Next() {
		ok, joinResult = iw.joinMatchedInnerRow2Chunk(ctx, row, task, joinResult, h, iw.joinKeyBuf)
		if !ok {
			return joinResult.err
		}
	}
	if task.innerExec != nil {
		return nil
	}
	// Only when innerExec is finished can we call OnMissMatch for the
	// unmatched outer rows.
	for chkIdx, outerRowStatus := range task.outerRowStatus {
		chk := task.outerResult.GetChunk(chkIdx)
		for rowIdx, val := range outerRowStatus {
			if val == outerRowMatched {
				continue
			}
			iw.joiner.OnMissMatch(val == outerRowHasNull, chk.GetRow(rowIdx), joinResult.chk)
			if joinResult.chk.IsFull() {
				select {
				case resultCh <- joinResult:
				case <-ctx.Done():
					return ctx.Err()
				}
				joinResult, ok = iw.getNewJoinResult(ctx)
				if !ok {
					return errors.New("indexHashJoinInnerWorker.doJoinUnordered failed")
				}
			}
		}
	}
	return nil
}

func (iw *indexHashJoinInnerWorker) getMatchedOuterRows(innerRow chunk.Row, task *indexHashJoinTask, h hash.Hash64, buf []byte) (matchedRows []chunk.Row, matchedRowPtr []chunk.RowPtr, err error) {
	h.Reset()
	err = codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx.TypeCtx(), h, innerRow, iw.HashTypes, iw.HashCols, buf)
	if err != nil {
		return nil, nil, err
	}
	matchedOuterEntry := task.lookupMap.Get(h.Sum64())
	if matchedOuterEntry == nil {
		return nil, nil, nil
	}
	joinType := JoinerType(iw.joiner)
	isSemiJoin := joinType.IsSemiJoin()
	for ; matchedOuterEntry != nil; matchedOuterEntry = matchedOuterEntry.Next {
		ptr := matchedOuterEntry.Ptr
		outerRow := task.outerResult.GetRow(ptr)
		ok, err := codec.EqualChunkRow(iw.ctx.GetSessionVars().StmtCtx.TypeCtx(), innerRow, iw.HashTypes, iw.HashCols, outerRow, iw.outerCtx.HashTypes, iw.outerCtx.HashCols)
		if err != nil {
			return nil, nil, err
		}
		// Skip hash collisions, and for semi joins skip outer rows that have
		// already matched.
		if !ok || (task.outerRowStatus[ptr.ChkIdx][ptr.RowIdx] == outerRowMatched && isSemiJoin) {
			continue
		}
		matchedRows = append(matchedRows, outerRow)
		matchedRowPtr = append(matchedRowPtr, chunk.RowPtr{ChkIdx: ptr.ChkIdx, RowIdx: ptr.RowIdx})
	}
	return matchedRows, matchedRowPtr, nil
}

func (iw *indexHashJoinInnerWorker) joinMatchedInnerRow2Chunk(ctx context.Context, innerRow chunk.Row, task *indexHashJoinTask,
	joinResult *indexHashJoinResult, h hash.Hash64, buf []byte) (bool, *indexHashJoinResult) {
	matchedOuterRows, matchedOuterRowPtr, err := iw.getMatchedOuterRows(innerRow, task, h, buf)
	if err != nil {
		joinResult.err = err
		return false, joinResult
	}
	if len(matchedOuterRows) == 0 {
		return true, joinResult
	}
	var ok bool
	cursor := 0
	iw.rowIter.Reset(matchedOuterRows)
	iter := iw.rowIter
	for iw.rowIter.Begin(); iter.Current() != iter.End(); {
		iw.outerRowStatus, err = iw.joiner.TryToMatchOuters(iter, innerRow, joinResult.chk, iw.outerRowStatus)
		if err != nil {
			joinResult.err = err
			return false, joinResult
		}
		for _, status := range iw.outerRowStatus {
			chkIdx, rowIdx := matchedOuterRowPtr[cursor].ChkIdx, matchedOuterRowPtr[cursor].RowIdx
			if status == outerRowMatched || task.outerRowStatus[chkIdx][rowIdx] == outerRowUnmatched {
				task.outerRowStatus[chkIdx][rowIdx] = status
			}
			cursor++
		}
		if joinResult.chk.IsFull() {
			select {
			case iw.resultCh <- joinResult:
			case <-ctx.Done():
				joinResult.err = ctx.Err()
				return false, joinResult
			}
			failpoint.InjectCall("joinMatchedInnerRow2Chunk")
			joinResult, ok = iw.getNewJoinResult(ctx)
			if !ok {
				return false, joinResult
			}
		}
	}
	return true, joinResult
}

func (iw *indexHashJoinInnerWorker) collectMatchedInnerPtrs4OuterRows(innerRow chunk.Row, innerRowPtr chunk.RowPtr,
	task *indexHashJoinTask, h hash.Hash64, buf []byte) error {
	_, matchedOuterRowIdx, err := iw.getMatchedOuterRows(innerRow, task, h, buf)
	if err != nil {
		return err
	}
	for _, outerRowPtr := range matchedOuterRowIdx {
		chkIdx, rowIdx := outerRowPtr.ChkIdx, outerRowPtr.RowIdx
		task.matchedInnerRowPtrs[chkIdx][rowIdx] = append(task.matchedInnerRowPtrs[chkIdx][rowIdx], innerRowPtr)
	}
	return nil
}

// doJoinInOrder performs the following steps:
//  1. collect all the matched inner row ptrs for every outer row
//  2. do the join work:
//     2.1 collect all the matched inner rows using the collected ptrs for every outer row
//     2.2 call TryToMatchInners for every outer row
//     2.3 call OnMissMatch when no inner rows are matched
func (iw *indexHashJoinInnerWorker) doJoinInOrder(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) (err error) {
	defer func() {
		if err == nil && joinResult.chk != nil {
			if joinResult.chk.NumRows() > 0 {
				select {
				case resultCh <- joinResult:
				case <-ctx.Done():
					return
				}
			} else {
				joinResult.src <- joinResult.chk
			}
		}
	}()
	for i, numChunks := 0, task.innerResult.NumChunks(); i < numChunks; i++ {
		for j, chk := 0, task.innerResult.GetChunk(i); j < chk.NumRows(); j++ {
			row := chk.GetRow(j)
			ptr := chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)}
			err = iw.collectMatchedInnerPtrs4OuterRows(row, ptr, task, h, iw.joinKeyBuf)
			failpoint.Inject("TestIssue31129", func() {
				err = errors.New("TestIssue31129")
			})
			if err != nil {
				return err
			}
		}
	}
	// TODO: matchedInnerRowPtrs and matchedInnerRows can be moved to the inner worker.
	matchedInnerRows := make([]chunk.Row, 0, len(task.matchedInnerRowPtrs))
	var hasMatched, hasNull, ok bool
	for chkIdx, innerRowPtrs4Chk := range task.matchedInnerRowPtrs {
		for outerRowIdx, innerRowPtrs := range innerRowPtrs4Chk {
			matchedInnerRows, hasMatched, hasNull = matchedInnerRows[:0], false, false
			outerRow := task.outerResult.GetChunk(chkIdx).GetRow(outerRowIdx)
			for _, ptr := range innerRowPtrs {
				matchedInnerRows = append(matchedInnerRows, task.innerResult.GetRow(ptr))
			}
			iw.rowIter.Reset(matchedInnerRows)
			iter := iw.rowIter
			for iter.Begin(); iter.Current() != iter.End(); {
				matched, isNull, err := iw.joiner.TryToMatchInners(outerRow, iter, joinResult.chk)
				if err != nil {
					return err
				}
				hasMatched, hasNull = matched || hasMatched, isNull || hasNull
				if joinResult.chk.IsFull() {
					select {
					case resultCh <- joinResult:
					case <-ctx.Done():
						return ctx.Err()
					}
					joinResult, ok = iw.getNewJoinResult(ctx)
					if !ok {
						return errors.New("indexHashJoinInnerWorker.doJoinInOrder failed")
					}
				}
			}
			if !hasMatched {
				iw.joiner.OnMissMatch(hasNull, outerRow, joinResult.chk)
			}
		}
	}
	return nil
}