// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package join

import (
	"context"
	"hash"
	"math"
	"math/bits"
	"math/rand"
	"runtime/trace"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/pkg/executor/internal/exec"
	"github.com/pingcap/tidb/pkg/executor/join/joinversion"
	"github.com/pingcap/tidb/pkg/expression"
	"github.com/pingcap/tidb/pkg/parser/mysql"
	"github.com/pingcap/tidb/pkg/planner/core/base"
	"github.com/pingcap/tidb/pkg/sessionctx/vardef"
	"github.com/pingcap/tidb/pkg/types"
	"github.com/pingcap/tidb/pkg/util"
	"github.com/pingcap/tidb/pkg/util/channel"
	"github.com/pingcap/tidb/pkg/util/chunk"
	"github.com/pingcap/tidb/pkg/util/disk"
	"github.com/pingcap/tidb/pkg/util/intest"
	"github.com/pingcap/tidb/pkg/util/memory"
)

const minimalHashTableLen = 32

var (
	_ exec.Executor = &HashJoinV2Exec{}

	// EnableHashJoinV2 enables hash join v2, used for test
	EnableHashJoinV2 = "set tidb_hash_join_version = " + joinversion.HashJoinVersionOptimized
	// DisableHashJoinV2 disables hash join v2, used for test
	DisableHashJoinV2 = "set tidb_hash_join_version = " + joinversion.HashJoinVersionLegacy
	// HashJoinV2Strings is used for test
	HashJoinV2Strings = []string{DisableHashJoinV2, EnableHashJoinV2}

	// fakeSel is used when the chunk does not have a sel field
	fakeSel []int
	// fakeSelLength is the length of fakeSel; the default max_chunk_size is 1024,
	// and we set the fakeSel size to 4*max_chunk_size, which should be enough for most cases
	fakeSelLength = 4096
)

func init() {
	fakeSel = make([]int, fakeSelLength)
	for i := range fakeSel {
		fakeSel[i] = i
	}
}

type hashTableContext struct {
	// rowTables is used during the split partition stage; each buildWorker has
	// its own rowTable
	rowTables     [][]*rowTable
	hashTable     *hashTableV2
	tagHelper     *tagPtrHelper
	memoryTracker *memory.Tracker
}

func (htc *hashTableContext) reset() {
	htc.rowTables = nil
	htc.hashTable = nil
	htc.tagHelper = nil
	htc.memoryTracker.Detach()
}

func (htc *hashTableContext) getAllMemoryUsageInHashTable() int64 {
	partNum := len(htc.hashTable.tables)
	totalMemoryUsage := int64(0)
	for i := range partNum {
		mem := htc.hashTable.getPartitionMemoryUsage(i)
		totalMemoryUsage += mem
	}
	return totalMemoryUsage
}

func (htc *hashTableContext) clearHashTable() {
	partNum := len(htc.hashTable.tables)
	for i := range partNum {
		htc.hashTable.clearPartitionSegments(i)
	}
}

func (htc *hashTableContext) getPartitionMemoryUsage(partID int) int64 {
	totalMemoryUsage := int64(0)
	for _, tables := range htc.rowTables {
		if tables != nil && tables[partID] != nil {
			totalMemoryUsage += tables[partID].getTotalMemoryUsage()
		}
	}
	return totalMemoryUsage
}

func (htc *hashTableContext) getSegmentsInRowTable(workerID, partitionID int) []*rowTableSegment {
	if htc.rowTables[workerID] != nil && htc.rowTables[workerID][partitionID] != nil {
		return htc.rowTables[workerID][partitionID].getSegments()
	}
	return nil
}

func (htc *hashTableContext) getAllSegmentsMemoryUsageInRowTable() int64 {
	totalMemoryUsage := int64(0)
	for _, tables := range htc.rowTables {
		for _, table := range tables {
			if table != nil {
				totalMemoryUsage += table.getTotalMemoryUsage()
			}
		}
	}
	return totalMemoryUsage
}

func (htc *hashTableContext) clearAllSegmentsInRowTable() {
	for _, tables := range htc.rowTables {
		for _, table := range tables {
			if table != nil {
				table.clearSegments()
			}
		}
	}
}

func (htc *hashTableContext) clearSegmentsInRowTable(workerID, partitionID int) {
	if htc.rowTables[workerID] != nil && htc.rowTables[workerID][partitionID] != nil {
		htc.rowTables[workerID][partitionID].clearSegments()
	}
}

func (htc *hashTableContext) build(task *buildTask) {
	htc.hashTable.tables[task.partitionIdx].build(task.segStartIdx, task.segEndIdx, htc.tagHelper)
}

func (htc *hashTableContext) lookup(partitionIndex int, hashValue uint64) taggedPtr {
	return htc.hashTable.tables[partitionIndex].lookup(hashValue, htc.tagHelper)
}

func (htc *hashTableContext) appendRowSegment(workerID, partitionID int, seg *rowTableSegment) {
	if len(seg.hashValues) == 0 {
		return
	}
	if htc.rowTables[workerID][partitionID] == nil {
		htc.rowTables[workerID][partitionID] = newRowTable()
	}
	seg.initTaggedBits()
	htc.rowTables[workerID][partitionID].segments = append(htc.rowTables[workerID][partitionID].segments, seg)
}

func (*hashTableContext) calculateHashTableMemoryUsage(rowTables []*rowTable) (int64, []int64) {
	totalMemoryUsage := int64(0)
	partitionsMemoryUsage := make([]int64, 0)
	for _, table := range rowTables {
		hashTableLength := getHashTableLengthByRowTable(table)
		memoryUsage := getHashTableMemoryUsage(hashTableLength)
		partitionsMemoryUsage = append(partitionsMemoryUsage, memoryUsage)
		totalMemoryUsage += memoryUsage
	}
	return totalMemoryUsage, partitionsMemoryUsage
}

// To avoid unnecessary hash table allocation, we pre-calculate the memory usage
// in advance so that we know which hash tables need to be created.
func (htc *hashTableContext) tryToSpill(rowTables []*rowTable, spillHelper *hashJoinSpillHelper) ([]*rowTable, error) {
	totalMemoryUsage, hashTableMemoryUsage := htc.calculateHashTableMemoryUsage(rowTables)

	// Pre-consume the memory usage
	htc.memoryTracker.Consume(totalMemoryUsage)

	if spillHelper != nil && spillHelper.isSpillNeeded() {
		spillHelper.spillTriggeredBeforeBuildingHashTableForTest = true

		err := spillHelper.spillRowTable(hashTableMemoryUsage)
		if err != nil {
			return nil, err
		}

		spilledPartition := spillHelper.getSpilledPartitions()
		for _, partID := range spilledPartition {
			// Clear spilled row tables
			rowTables[partID].clearSegments()
		}

		// Though some partitions have been spilled or are empty, their hash tables
		// will still be created, because probe rows in these partitions may access them.
		// We need to account for this memory usage.
		totalDefaultMemUsage := getHashTableMemoryUsage(minimalHashTableLen) * int64(len(spilledPartition))

		// The hash table memory usage has already been released in the spill operation,
		// so it's unnecessary to release it again.
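		// Illustrative arithmetic for the line below (a restatement, not new
		// logic): with, say, 4 spilled partitions, totalDefaultMemUsage is
		// 4 * getHashTableMemoryUsage(minimalHashTableLen), i.e. the cost of
		// one minimal 32-slot hash table per spilled partition.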
		htc.memoryTracker.Consume(totalDefaultMemUsage)
	}
	return rowTables, nil
}

func (htc *hashTableContext) mergeRowTablesToHashTable(partitionNumber uint, spillHelper *hashJoinSpillHelper) (int, error) {
	rowTables := make([]*rowTable, partitionNumber)
	for i := range partitionNumber {
		rowTables[i] = newRowTable()
	}

	totalSegmentCnt := 0
	for _, rowTablesPerWorker := range htc.rowTables {
		for partIdx, rt := range rowTablesPerWorker {
			if rt == nil {
				continue
			}
			rowTables[partIdx].merge(rt)
			totalSegmentCnt += len(rt.segments)
		}
	}

	var err error
	// spillHelper may be nil in unit tests
	if spillHelper != nil {
		rowTables, err = htc.tryToSpill(rowTables, spillHelper)
		if err != nil {
			return 0, err
		}
		spillHelper.setCanSpillFlag(false)
	}

	taggedBits := uint8(maxTaggedBits)
	for i := range partitionNumber {
		for _, seg := range rowTables[i].segments {
			taggedBits = min(taggedBits, seg.taggedBits)
		}
		htc.hashTable.tables[i] = newSubTable(rowTables[i])
	}

	htc.tagHelper = &tagPtrHelper{}
	htc.tagHelper.init(taggedBits)

	htc.clearAllSegmentsInRowTable()
	return totalSegmentCnt, nil
}

// HashJoinCtxV2 is the hash join ctx used in hash join v2
type HashJoinCtxV2 struct {
	hashJoinCtxBase
	partitionNumber     uint
	partitionMaskOffset int
	ProbeKeyTypes       []*types.FieldType
	BuildKeyTypes       []*types.FieldType
	stats               *hashJoinRuntimeStatsV2

	RightAsBuildSide               bool
	BuildFilter                    expression.CNFExprs
	ProbeFilter                    expression.CNFExprs
	OtherCondition                 expression.CNFExprs
	hashTableContext               *hashTableContext
	hashTableMeta                  *joinTableMeta
	needScanRowTableAfterProbeDone bool

	LUsed, RUsed                                 []int
	LUsedInOtherCondition, RUsedInOtherCondition []int

	maxSpillRound int
	spillHelper   *hashJoinSpillHelper
	spillAction   *hashJoinSpillAction
}

func (hCtx *HashJoinCtxV2) resetHashTableContextForRestore() {
	memoryUsage := hCtx.hashTableContext.getAllSegmentsMemoryUsageInRowTable()
	if intest.InTest && memoryUsage != 0 {
		panic("All rowTables in hashTableContext should be cleared")
	}

	memoryUsage = hCtx.hashTableContext.getAllMemoryUsageInHashTable()
	hCtx.hashTableContext.clearHashTable()
	hCtx.hashTableContext.memoryTracker.Consume(-memoryUsage)
}

// partitionNumber is always a power of 2
func genHashJoinPartitionNumber(partitionHint uint) uint {
	partitionNumber := uint(1)
	for partitionNumber < partitionHint && partitionNumber < 16 {
		partitionNumber <<= 1
	}
	return partitionNumber
}

func getPartitionMaskOffset(partitionNumber uint) int {
	msbPos := bits.TrailingZeros64(uint64(partitionNumber))
	// the top MSB bits in the hash value will be used to partition data
	return 64 - msbPos
}

// SetupPartitionInfo sets up partitionNumber and partitionMaskOffset based on concurrency
func (hCtx *HashJoinCtxV2) SetupPartitionInfo() {
	hCtx.partitionNumber = genHashJoinPartitionNumber(hCtx.Concurrency)
	hCtx.partitionMaskOffset = getPartitionMaskOffset(hCtx.partitionNumber)
}

// initHashTableContext creates the hashTableContext for the current HashJoinCtxV2
func (hCtx *HashJoinCtxV2) initHashTableContext() {
	hCtx.hashTableContext = &hashTableContext{}
	hCtx.hashTableContext.rowTables = make([][]*rowTable, hCtx.Concurrency)
	for index := range hCtx.hashTableContext.rowTables {
		hCtx.hashTableContext.rowTables[index] = make([]*rowTable, hCtx.partitionNumber)
	}

	hCtx.hashTableContext.hashTable = &hashTableV2{
		tables:          make([]*subTable, hCtx.partitionNumber),
		partitionNumber: uint64(hCtx.partitionNumber),
	}
	hCtx.hashTableContext.memoryTracker = memory.NewTracker(memory.LabelForHashTableInHashJoinV2, -1)
}
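// examplePartitionRouting is an illustrative sketch (not used by the executor)
// of how the partitioning above works: the partition number is rounded up to a
// power of two, and the top log2(partitionNumber) bits of a row's 64-bit hash
// value select its partition.
func examplePartitionRouting() {
	partitionNumber := genHashJoinPartitionNumber(5) // the hint 5 is rounded up to 8
	maskOffset := getPartitionMaskOffset(partitionNumber)
	// For partitionNumber = 8, maskOffset is 64 - 3 = 61, so the top 3 bits
	// of the hash value select one of the 8 partitions.
	hashValue := uint64(0xF000_0000_0000_0000)
	partIdx := generatePartitionIndex(hashValue, maskOffset)
	_ = partIdx // 0xF000_0000_0000_0000 >> 61 == 7, i.e. the last partition
}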
// ProbeSideTupleFetcherV2 reads tuples from ProbeSideExec and sends them to ProbeWorkers.
type ProbeSideTupleFetcherV2 struct {
	probeSideTupleFetcherBase
	*HashJoinCtxV2
	canSkipProbeIfHashTableIsEmpty bool
}

// ProbeWorkerV2 is the probe worker used in hash join v2
type ProbeWorkerV2 struct {
	probeWorkerBase
	HashJoinCtx *HashJoinCtxV2
	// We build an individual joinProbe for each join worker when using chunk-based
	// execution, to avoid concurrent access to joiner.chk and joiner.selected.
	JoinProbe      ProbeV2
	restoredChkBuf *chunk.Chunk
}

func (w *ProbeWorkerV2) updateProbeStatistic(start time.Time, probeTime int64) {
	t := time.Since(start)
	atomic.AddInt64(&w.HashJoinCtx.stats.probe, probeTime)
	atomic.AddInt64(&w.HashJoinCtx.stats.workerFetchAndProbe, int64(t))
	setMaxValue(&w.HashJoinCtx.stats.maxProbeForCurrentRound, probeTime)
	setMaxValue(&w.HashJoinCtx.stats.maxWorkerFetchAndProbeForCurrentRound, int64(t))
}

func (w *ProbeWorkerV2) restoreAndProbe(inDisk *chunk.DataInDiskByChunks, start time.Time) {
	probeTime := int64(0)
	if w.HashJoinCtx.stats != nil {
		defer func() {
			w.updateProbeStatistic(start, probeTime)
		}()
	}

	ok, joinResult := w.getNewJoinResult()
	if !ok {
		return
	}

	chunkNum := inDisk.NumChunks()
	for i := range chunkNum {
		select {
		case <-w.HashJoinCtx.closeCh:
			return
		default:
		}

		failpoint.Inject("ConsumeRandomPanic", nil)

		err := inDisk.FillChunk(i, w.restoredChkBuf)
		if err != nil {
			joinResult.err = err
			break
		}

		err = triggerIntest(2)
		if err != nil {
			joinResult.err = err
			break
		}

		start := time.Now()
		waitTime := int64(0)
		ok, waitTime, joinResult = w.processOneRestoredProbeChunk(joinResult)
		probeTime += int64(time.Since(start)) - waitTime
		if !ok {
			break
		}
	}

	err := w.JoinProbe.SpillRemainingProbeChunks()
	if err != nil {
		joinResult.err = err
	}

	if joinResult.err != nil || (joinResult.chk != nil && joinResult.chk.NumRows() > 0) {
		w.HashJoinCtx.joinResultCh <- joinResult
	} else if joinResult.chk != nil && joinResult.chk.NumRows() == 0 {
		w.joinChkResourceCh <- joinResult.chk
	}
}

// BuildWorkerV2 is the build worker used in hash join v2
type BuildWorkerV2 struct {
	buildWorkerBase
	HashJoinCtx    *HashJoinCtxV2
	BuildTypes     []*types.FieldType
	HasNullableKey bool
	WorkerID       uint
	builder        *rowTableBuilder
	restoredChkBuf *chunk.Chunk
}

func (b *BuildWorkerV2) getSegmentsInRowTable(partID int) []*rowTableSegment {
	return b.HashJoinCtx.hashTableContext.getSegmentsInRowTable(int(b.WorkerID), partID)
}

func (b *BuildWorkerV2) clearSegmentsInRowTable(partID int) {
	b.HashJoinCtx.hashTableContext.clearSegmentsInRowTable(int(b.WorkerID), partID)
}

func (b *BuildWorkerV2) updatePartitionData(cost int64) {
	atomic.AddInt64(&b.HashJoinCtx.stats.partitionData, cost)
	setMaxValue(&b.HashJoinCtx.stats.maxPartitionDataForCurrentRound, cost)
}

func (b *BuildWorkerV2) processOneRestoredChunk(cost *int64) error {
	start := time.Now()
	err := b.builder.processOneRestoredChunk(b.restoredChkBuf, b.HashJoinCtx, int(b.WorkerID), int(b.HashJoinCtx.partitionNumber))
	if err != nil {
		return err
	}
	*cost += int64(time.Since(start))
	return nil
}

func (b *BuildWorkerV2) splitPartitionAndAppendToRowTableForRestoreImpl(i int, inDisk *chunk.DataInDiskByChunks, fetcherAndWorkerSyncer *sync.WaitGroup, hasErr bool, cost *int64) (err error) {
	defer func() {
		fetcherAndWorkerSyncer.Done()
		if r := recover(); r != nil {
			// We shouldn't throw the panic out of this function, or
			// we can't continue to consume the `syncCh` channel and call
			// the `Done` function of `fetcherAndWorkerSyncer`.
			// So it's necessary to handle it here.
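			// util.GetRecoverError converts the recovered panic value into an
			// ordinary error, so the failure can flow through the normal
			// error-propagation path instead of unwinding the goroutine.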
			err = util.GetRecoverError(r)
		}
	}()

	if hasErr {
		return nil
	}

	err = inDisk.FillChunk(i, b.restoredChkBuf)
	if err != nil {
		return err
	}

	err = triggerIntest(3)
	if err != nil {
		return err
	}

	err = b.processOneRestoredChunk(cost)
	if err != nil {
		return err
	}
	return nil
}

func (b *BuildWorkerV2) splitPartitionAndAppendToRowTableForRestore(inDisk *chunk.DataInDiskByChunks, syncCh chan *chunk.Chunk, fetcherAndWorkerSyncer *sync.WaitGroup, errCh chan error, doneCh chan struct{}) {
	cost := int64(0)
	defer func() {
		if b.HashJoinCtx.stats != nil {
			b.updatePartitionData(cost)
		}
	}()

	// When an error happens, hasErr will be set to true.
	// However, we should not directly exit the function, as we must
	// call `fetcherAndWorkerSyncer.Done()` in `splitPartitionAndAppendToRowTableForRestoreImpl`.
	// fetcherAndWorkerSyncer is a counter used for synchronization; `Done` must be
	// called on it once per chunk, `chunkNum` times in total.
	// When `hasErr` is set, `splitPartitionAndAppendToRowTableForRestoreImpl` can exit early.
	hasErr := false

	chunkNum := inDisk.NumChunks()
	for i := range chunkNum {
		_, ok := <-syncCh
		if !ok {
			break
		}

		err := b.splitPartitionAndAppendToRowTableForRestoreImpl(i, inDisk, fetcherAndWorkerSyncer, hasErr, &cost)
		if err != nil {
			hasErr = true
			handleErr(err, errCh, doneCh)
		}
	}
}

func (b *BuildWorkerV2) splitPartitionAndAppendToRowTable(typeCtx types.Context, fetcherAndWorkerSyncer *sync.WaitGroup, srcChkCh chan *chunk.Chunk, errCh chan error, doneCh chan struct{}) {
	cost := int64(0)
	defer func() {
		if b.HashJoinCtx.stats != nil {
			b.updatePartitionData(cost)
		}
	}()

	// When an error happens, hasErr will be set to true.
	// However, we should not directly exit the function, as we must
	// call `fetcherAndWorkerSyncer.Done()` in `splitPartitionAndAppendToRowTableImpl`.
	// fetcherAndWorkerSyncer is a counter used for synchronization; `Done` must be
	// called on it once per chunk, `chunkNum` times in total.
	// When `hasErr` is set, `splitPartitionAndAppendToRowTableImpl` can exit early.
	hasErr := false

	for chk := range srcChkCh {
		err := b.splitPartitionAndAppendToRowTableImpl(typeCtx, chk, fetcherAndWorkerSyncer, hasErr, &cost)
		if err != nil {
			hasErr = true
			handleErr(err, errCh, doneCh)
		}
	}
}

func (b *BuildWorkerV2) processOneChunk(typeCtx types.Context, chk *chunk.Chunk, cost *int64) error {
	start := time.Now()
	err := b.builder.processOneChunk(chk, typeCtx, b.HashJoinCtx, int(b.WorkerID))
	failpoint.Inject("splitPartitionPanic", nil)
	*cost += int64(time.Since(start))
	return err
}

func (b *BuildWorkerV2) splitPartitionAndAppendToRowTableImpl(typeCtx types.Context, chk *chunk.Chunk, fetcherAndWorkerSyncer *sync.WaitGroup, hasErr bool, cost *int64) error {
	defer func() {
		fetcherAndWorkerSyncer.Done()
	}()

	if hasErr {
		return nil
	}

	err := triggerIntest(5)
	if err != nil {
		return err
	}

	err = b.processOneChunk(typeCtx, chk, cost)
	if err != nil {
		return err
	}
	return nil
}
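// exampleSyncerPattern is an illustrative sketch (not part of the join code) of
// the fetcherAndWorkerSyncer contract used above: the producer calls Add(1) per
// chunk before handing it over, and the consumer calls Done() exactly once per
// chunk, even on the error path, so the producer's Wait() always returns.
func exampleSyncerPattern() {
	var syncer sync.WaitGroup
	chunks := make(chan int, 4)
	go func() {
		for i := 0; i < 4; i++ {
			syncer.Add(1) // one Add per produced chunk
			chunks <- i
		}
		close(chunks)
	}()
	for range chunks {
		// Process the chunk; Done must run even if processing fails.
		syncer.Done()
	}
	syncer.Wait() // safe: Adds and Dones are balanced per chunk
}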
// buildHashTable builds the hash table from the build tasks in taskCh.
func (b *BuildWorkerV2) buildHashTable(taskCh chan *buildTask) error {
	cost := int64(0)
	defer func() {
		if b.HashJoinCtx.stats != nil {
			atomic.AddInt64(&b.HashJoinCtx.stats.buildHashTable, cost)
			setMaxValue(&b.HashJoinCtx.stats.maxBuildHashTableForCurrentRound, cost)
		}
	}()

	for task := range taskCh {
		start := time.Now()
		b.HashJoinCtx.hashTableContext.build(task)
		failpoint.Inject("buildHashTablePanic", nil)
		cost += int64(time.Since(start))

		err := triggerIntest(5)
		if err != nil {
			return err
		}
	}
	return nil
}

// NewJoinBuildWorkerV2 creates a BuildWorkerV2
func NewJoinBuildWorkerV2(ctx *HashJoinCtxV2, workID uint, buildSideExec exec.Executor, buildKeyColIdx []int, buildTypes []*types.FieldType) *BuildWorkerV2 {
	hasNullableKey := false
	for _, idx := range buildKeyColIdx {
		if !mysql.HasNotNullFlag(buildTypes[idx].GetFlag()) {
			hasNullableKey = true
			break
		}
	}

	worker := &BuildWorkerV2{
		HashJoinCtx:    ctx,
		BuildTypes:     buildTypes,
		WorkerID:       workID,
		HasNullableKey: hasNullableKey,
	}
	worker.BuildSideExec = buildSideExec
	worker.BuildKeyColIdx = buildKeyColIdx
	return worker
}

// HashJoinV2Exec implements the hash join algorithm.
type HashJoinV2Exec struct {
	exec.BaseExecutor
	*HashJoinCtxV2

	ProbeSideTupleFetcher *ProbeSideTupleFetcherV2
	ProbeWorkers          []*ProbeWorkerV2
	BuildWorkers          []*BuildWorkerV2

	workerWg util.WaitGroupWrapper
	waiterWg util.WaitGroupWrapper

	restoredBuildInDisk []*chunk.DataInDiskByChunks
	restoredProbeInDisk []*chunk.DataInDiskByChunks

	prepared  bool
	inRestore bool

	IsGA bool

	isMemoryClearedForTest bool
	FileNamePrefixForTest  string
}

func (e *HashJoinV2Exec) isAllMemoryClearedForTest() bool {
	return e.isMemoryClearedForTest
}

func (e *HashJoinV2Exec) initMaxSpillRound() {
	if e.partitionNumber > 1024 {
		e.maxSpillRound = 1
		return
	}

	// Calculate the minimum number of rounds required for the total partitions to exceed 1024.
	// For example, with partitionNumber = 8 this is int(log(1024)/log(8)) = int(3.33) = 3.
	e.maxSpillRound = int(math.Log(1024) / math.Log(float64(e.partitionNumber)))
}

// Close implements the Executor Close interface.
func (e *HashJoinV2Exec) Close() error {
	if e.closeCh != nil {
		close(e.closeCh)
	}
	e.finished.Store(true)

	if e.prepared {
		if e.buildFinished != nil {
			channel.Clear(e.buildFinished)
		}
		if e.joinResultCh != nil {
			channel.Clear(e.joinResultCh)
		}
		if e.ProbeSideTupleFetcher.probeChkResourceCh != nil {
			close(e.ProbeSideTupleFetcher.probeChkResourceCh)
			channel.Clear(e.ProbeSideTupleFetcher.probeChkResourceCh)
		}
		for i := range e.ProbeSideTupleFetcher.probeResultChs {
			channel.Clear(e.ProbeSideTupleFetcher.probeResultChs[i])
		}
		for i := range e.ProbeWorkers {
			close(e.ProbeWorkers[i].joinChkResourceCh)
			channel.Clear(e.ProbeWorkers[i].joinChkResourceCh)
		}
		e.ProbeSideTupleFetcher.probeChkResourceCh = nil
		e.waiterWg.Wait()
		e.hashTableContext.reset()
	}
	for _, w := range e.ProbeWorkers {
		w.joinChkResourceCh = nil
	}

	if e.stats != nil {
		defer e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), e.stats)
	}
	e.releaseDisk()
	if e.spillHelper != nil {
		e.spillHelper.close()
	}

	err := e.BaseExecutor.Close()
	return err
}

// Open implements the Executor Open interface.
func (e *HashJoinV2Exec) Open(ctx context.Context) error {
	if err := e.BaseExecutor.Open(ctx); err != nil {
		e.closeCh = nil
		e.prepared = false
		return err
	}
	return e.OpenSelf()
}

// OpenSelf opens hash join itself and initializes the hash join context.
func (e *HashJoinV2Exec) OpenSelf() error {
	e.prepared = false
	e.inRestore = false
	needScanRowTableAfterProbeDone := e.ProbeWorkers[0].JoinProbe.NeedScanRowTable()
	e.HashJoinCtxV2.needScanRowTableAfterProbeDone = needScanRowTableAfterProbeDone
	if e.RightAsBuildSide {
		e.hashTableMeta = newTableMeta(e.BuildWorkers[0].BuildKeyColIdx, e.BuildWorkers[0].BuildTypes,
			e.BuildKeyTypes, e.ProbeKeyTypes, e.RUsedInOtherCondition, e.RUsed, needScanRowTableAfterProbeDone)
	} else {
		e.hashTableMeta = newTableMeta(e.BuildWorkers[0].BuildKeyColIdx, e.BuildWorkers[0].BuildTypes,
			e.BuildKeyTypes, e.ProbeKeyTypes, e.LUsedInOtherCondition, e.LUsed, needScanRowTableAfterProbeDone)
	}
	e.HashJoinCtxV2.ChunkAllocPool = e.AllocPool

	if e.memTracker != nil {
		e.memTracker.Reset()
	} else {
		e.memTracker = memory.NewTracker(e.ID(), -1)
	}
	e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker)

	if e.diskTracker != nil {
		e.diskTracker.Reset()
	} else {
		e.diskTracker = disk.NewTracker(e.ID(), -1)
	}
	e.diskTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.DiskTracker)

	e.spillHelper = newHashJoinSpillHelper(e, int(e.partitionNumber), e.ProbeSideTupleFetcher.ProbeSideExec.RetFieldTypes(), e.FileNamePrefixForTest)
	e.maxSpillRound = 1
	if vardef.EnableTmpStorageOnOOM.Load() && e.partitionNumber > 1 {
		e.initMaxSpillRound()
		e.spillAction = newHashJoinSpillAction(e.spillHelper)
		e.Ctx().GetSessionVars().MemTracker.FallbackOldAndSetNewAction(e.spillAction)
	}

	e.workerWg = util.WaitGroupWrapper{}
	e.waiterWg = util.WaitGroupWrapper{}
	e.closeCh = make(chan struct{})
	e.finished.Store(false)

	if e.RuntimeStats() != nil && e.stats == nil {
		e.stats = &hashJoinRuntimeStatsV2{}
		e.stats.concurrent = int(e.Concurrency)
	}

	if e.stats != nil {
		e.stats.reset()
		e.stats.spill.partitionNum = int(e.partitionNumber)
		e.stats.isHashJoinGA = e.IsGA
	}
	return nil
}

func (fetcher *ProbeSideTupleFetcherV2) shouldLimitProbeFetchSize() bool {
	if fetcher.JoinType == base.LeftOuterJoin && fetcher.RightAsBuildSide {
		return true
	}
	if fetcher.JoinType == base.RightOuterJoin && !fetcher.RightAsBuildSide {
		return true
	}
	return false
}

func (e *HashJoinV2Exec) canSkipProbeIfHashTableIsEmpty() bool {
	switch e.JoinType {
	case base.InnerJoin:
		return true
	case base.LeftOuterJoin:
		return !e.RightAsBuildSide
	case base.RightOuterJoin:
		return e.RightAsBuildSide
	case base.SemiJoin:
		return e.RightAsBuildSide
	default:
		return false
	}
}

func (e *HashJoinV2Exec) initializeForProbe() {
	e.ProbeSideTupleFetcher.HashJoinCtxV2 = e.HashJoinCtxV2
	// e.joinResultCh is for transmitting the join result chunks to the main thread.
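	// A note on the buffer size below: with a capacity of Concurrency+1, each of
	// the Concurrency probe workers can have one result chunk pending in the
	// channel without blocking, with one slot to spare.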
	e.joinResultCh = make(chan *hashjoinWorkerResult, e.Concurrency+1)
	e.ProbeSideTupleFetcher.initializeForProbeBase(e.Concurrency, e.joinResultCh)
	e.ProbeSideTupleFetcher.canSkipProbeIfHashTableIsEmpty = e.canSkipProbeIfHashTableIsEmpty()
	// Set buildSuccess to false by default; it will be set to true if the build finishes successfully.
	e.ProbeSideTupleFetcher.buildSuccess = false

	for i := range e.Concurrency {
		e.ProbeWorkers[i].initializeForProbe(e.ProbeSideTupleFetcher.probeChkResourceCh, e.ProbeSideTupleFetcher.probeResultChs[i], e)
		e.ProbeWorkers[i].JoinProbe.ResetProbeCollision()
	}
}

func (e *HashJoinV2Exec) startProbeFetcher(ctx context.Context) {
	if !e.inRestore {
		fetchProbeSideChunksFunc := func() {
			defer trace.StartRegion(ctx, "HashJoinProbeSideFetcher").End()
			e.ProbeSideTupleFetcher.fetchProbeSideChunks(
				ctx,
				e.MaxChunkSize(),
				func() bool { return e.ProbeSideTupleFetcher.hashTableContext.hashTable.isHashTableEmpty() },
				func() bool { return e.spillHelper.isSpillTriggered() },
				e.ProbeSideTupleFetcher.canSkipProbeIfHashTableIsEmpty,
				e.ProbeSideTupleFetcher.needScanRowTableAfterProbeDone,
				e.ProbeSideTupleFetcher.shouldLimitProbeFetchSize(),
				&e.ProbeSideTupleFetcher.hashJoinCtxBase)
		}
		e.workerWg.RunWithRecover(fetchProbeSideChunksFunc, e.ProbeSideTupleFetcher.handleProbeSideFetcherPanic)
	}
}

func (e *HashJoinV2Exec) startProbeJoinWorkers(ctx context.Context) {
	var start time.Time
	if e.HashJoinCtxV2.stats != nil {
		start = time.Now()
	}

	if e.inRestore {
		// Wait for the restore build
		err := <-e.buildFinished
		if err != nil {
			return
		}
		// In restore, there is no standalone probe fetcher goroutine, so set buildSuccess here.
		e.ProbeSideTupleFetcher.buildSuccess = true
	}

	for i := range e.Concurrency {
		workerID := i
		e.workerWg.RunWithRecover(func() {
			defer trace.StartRegion(ctx, "HashJoinWorker").End()
			if e.inRestore {
				e.ProbeWorkers[workerID].restoreAndProbe(e.restoredProbeInDisk[workerID], start)
			} else {
				e.ProbeWorkers[workerID].runJoinWorker(start)
			}
		}, e.ProbeWorkers[workerID].handleProbeWorkerPanic)
	}
}

func (e *HashJoinV2Exec) fetchAndProbeHashTable(ctx context.Context) {
	start := time.Now()
	e.startProbeFetcher(ctx)

	// Join workers directly read data from disk when we are in restore status,
	// and read data from the fetcher otherwise.
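	// restoredProbeInDisk is indexed per worker: in restore status each probe
	// worker reads back only its own spilled data, so the workers need no extra
	// coordination while reading from disk.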
	e.startProbeJoinWorkers(ctx)

	e.waiterWg.RunWithRecover(
		func() {
			e.waitJoinWorkers(start)
		}, nil)
}

func (w *ProbeWorkerV2) handleProbeWorkerPanic(r any) {
	if r != nil {
		w.HashJoinCtx.joinResultCh <- &hashjoinWorkerResult{err: util.GetRecoverError(r)}
	}
}

func (e *HashJoinV2Exec) handleJoinWorkerPanic(r any) {
	if r != nil {
		e.joinResultCh <- &hashjoinWorkerResult{err: util.GetRecoverError(r)}
	}
}

func (e *HashJoinV2Exec) waitJoinWorkers(start time.Time) {
	e.workerWg.Wait()
	if e.stats != nil {
		e.HashJoinCtxV2.stats.fetchAndProbe += int64(time.Since(start))
		for _, prober := range e.ProbeWorkers {
			e.stats.probeCollision += int64(prober.JoinProbe.GetProbeCollision())
		}
	}

	if e.ProbeSideTupleFetcher.buildSuccess {
		// only scan row table if build is successful
		if e.ProbeWorkers[0] != nil && e.ProbeWorkers[0].JoinProbe.NeedScanRowTable() {
			for i := range e.Concurrency {
				var workerID = i
				e.workerWg.RunWithRecover(func() {
					e.ProbeWorkers[workerID].scanRowTableAfterProbeDone()
				}, e.handleJoinWorkerPanic)
			}
			e.workerWg.Wait()
		}
	}
}

func (w *ProbeWorkerV2) scanRowTableAfterProbeDone() {
	w.JoinProbe.InitForScanRowTable()
	ok, joinResult := w.getNewJoinResult()
	if !ok {
		return
	}

	for !w.JoinProbe.IsScanRowTableDone() {
		joinResult = w.JoinProbe.ScanRowTable(joinResult, &w.HashJoinCtx.SessCtx.GetSessionVars().SQLKiller)
		if joinResult.err != nil {
			w.HashJoinCtx.joinResultCh <- joinResult
			return
		}

		err := triggerIntest(4)
		if err != nil {
			w.HashJoinCtx.joinResultCh <- &hashjoinWorkerResult{err: err}
			return
		}

		if joinResult.chk.IsFull() {
			w.HashJoinCtx.joinResultCh <- joinResult
			ok, joinResult = w.getNewJoinResult()
			if !ok {
				return
			}
		}
	}

	if joinResult.err != nil || (joinResult.chk != nil && joinResult.chk.NumRows() > 0) {
		w.HashJoinCtx.joinResultCh <- joinResult
	} else if joinResult.chk != nil && joinResult.chk.NumRows() == 0 {
		w.joinChkResourceCh <- joinResult.chk
	}
}

func (w *ProbeWorkerV2) processOneRestoredProbeChunk(joinResult *hashjoinWorkerResult) (ok bool, waitTime int64, _ *hashjoinWorkerResult) {
	joinResult.err = w.JoinProbe.SetRestoredChunkForProbe(w.restoredChkBuf)
	if joinResult.err != nil {
		return false, 0, joinResult
	}
	return w.probeAndSendResult(joinResult)
}

func (w *ProbeWorkerV2) processOneProbeChunk(probeChunk *chunk.Chunk, joinResult *hashjoinWorkerResult) (ok bool, waitTime int64, _ *hashjoinWorkerResult) {
	joinResult.err = w.JoinProbe.SetChunkForProbe(probeChunk)
	if joinResult.err != nil {
		return false, 0, joinResult
	}
	return w.probeAndSendResult(joinResult)
}

func (w *ProbeWorkerV2) probeAndSendResult(joinResult *hashjoinWorkerResult) (bool, int64, *hashjoinWorkerResult) {
	if w.HashJoinCtx.spillHelper.areAllPartitionsSpilled() {
		if intest.InTest && w.HashJoinCtx.spillHelper.hashJoinExec.inRestore {
			w.HashJoinCtx.spillHelper.skipProbeInRestoreForTest.Store(true)
		}
		return true, 0, joinResult
	}

	var ok bool
	waitTime := int64(0)
	for !w.JoinProbe.IsCurrentChunkProbeDone() {
		ok, joinResult = w.JoinProbe.Probe(joinResult, &w.HashJoinCtx.SessCtx.GetSessionVars().SQLKiller)
		if !ok || joinResult.err != nil {
			return ok, waitTime, joinResult
		}

		failpoint.Inject("processOneProbeChunkPanic", nil)

		if joinResult.chk.IsFull() {
			waitStart := time.Now()
			w.HashJoinCtx.joinResultCh <- joinResult
			ok, joinResult = w.getNewJoinResult()
			waitTime += int64(time.Since(waitStart))
			if !ok {
				return false, waitTime, joinResult
			}
		}
	}
	return true, waitTime, joinResult
}

func (w *ProbeWorkerV2) runJoinWorker(start time.Time) {
	probeTime := int64(0)
	if w.HashJoinCtx.stats != nil {
		defer func() {
			w.updateProbeStatistic(start, probeTime)
		}()
	}

	var (
		probeSideResult *chunk.Chunk
	)
	ok, joinResult := w.getNewJoinResult()
	if !ok {
		return
	}

	// Read and filter probeSideResult, and join the probeSideResult with the build side rows.
	emptyProbeSideResult := &probeChkResource{
		dest: w.probeResultCh,
	}
	for ok := true; ok; {
		select {
		case <-w.HashJoinCtx.closeCh:
			return
		case probeSideResult, ok = <-w.probeResultCh:
		}
		failpoint.Inject("ConsumeRandomPanic", nil)
		if !ok {
			break
		}

		err := triggerIntest(2)
		if err != nil {
			joinResult.err = err
			break
		}

		start := time.Now()
		waitTime := int64(0)
		ok, waitTime, joinResult = w.processOneProbeChunk(probeSideResult, joinResult)
		probeTime += int64(time.Since(start)) - waitTime
		if !ok {
			break
		}

		probeSideResult.Reset()
		emptyProbeSideResult.chk = probeSideResult
		// Give the chunk back to the probe fetcher.
		w.probeChkResourceCh <- emptyProbeSideResult
	}

	err := w.JoinProbe.SpillRemainingProbeChunks()
	if err != nil {
		joinResult.err = err
	}

	if joinResult.err != nil || (joinResult.chk != nil && joinResult.chk.NumRows() > 0) {
		w.HashJoinCtx.joinResultCh <- joinResult
	} else if joinResult.chk != nil && joinResult.chk.NumRows() == 0 {
		w.joinChkResourceCh <- joinResult.chk
	}
}

func (w *ProbeWorkerV2) getNewJoinResult() (bool, *hashjoinWorkerResult) {
	joinResult := &hashjoinWorkerResult{
		src: w.joinChkResourceCh,
	}
	ok := true
	select {
	case <-w.HashJoinCtx.closeCh:
		ok = false
	case joinResult.chk, ok = <-w.joinChkResourceCh:
	}
	return ok, joinResult
}

func (e *HashJoinV2Exec) reset() {
	e.resetProbeStatus()
	e.releaseDisk()
	// Set buildSuccess to false by default; it will be set to true if the build finishes successfully.
	e.ProbeSideTupleFetcher.buildSuccess = false
	e.resetHashTableContextForRestore()
	e.spillHelper.setCanSpillFlag(true)
	if e.HashJoinCtxV2.stats != nil {
		e.HashJoinCtxV2.stats.resetCurrentRound()
	}
}

func (e *HashJoinV2Exec) collectSpillStats() {
	if e.stats == nil || !e.spillHelper.isSpillTriggered() {
		return
	}

	round := e.spillHelper.round
	if len(e.stats.spill.totalSpillBytesPerRound) < round+1 {
		e.stats.spill.totalSpillBytesPerRound = append(e.stats.spill.totalSpillBytesPerRound, 0)
		e.stats.spill.spillBuildRowTableBytesPerRound = append(e.stats.spill.spillBuildRowTableBytesPerRound, 0)
		e.stats.spill.spillBuildHashTableBytesPerRound = append(e.stats.spill.spillBuildHashTableBytesPerRound, 0)
		e.stats.spill.spilledPartitionNumPerRound = append(e.stats.spill.spilledPartitionNumPerRound, 0)
	}

	buildRowTableSpillBytes := e.spillHelper.getBuildSpillBytes()
	buildHashTableSpillBytes := getHashTableMemoryUsage(getHashTableLengthByRowLen(e.spillHelper.spilledValidRowNum.Load()))
	probeSpillBytes := e.spillHelper.getProbeSpillBytes()
	spilledPartitionNum := e.spillHelper.getSpilledPartitionsNum()

	e.stats.spill.spillBuildRowTableBytesPerRound[round] += buildRowTableSpillBytes
	e.stats.spill.spillBuildHashTableBytesPerRound[round] += buildHashTableSpillBytes
	e.stats.spill.totalSpillBytesPerRound[round] += buildRowTableSpillBytes + probeSpillBytes
	e.stats.spill.spilledPartitionNumPerRound[round] += spilledPartitionNum
}

func (e *HashJoinV2Exec) startBuildAndProbe(ctx context.Context) {
	defer func() {
		if r := recover(); r != nil {
			e.joinResultCh <- &hashjoinWorkerResult{err: util.GetRecoverError(r)}
		}
		close(e.joinResultCh)
	}()

	lastRound := 0
	for {
		if e.finished.Load() {
			return
		}

		e.buildFinished = make(chan error, 1)
		e.fetchAndBuildHashTable(ctx)
		e.fetchAndProbeHashTable(ctx)
		e.waiterWg.Wait()

		e.collectSpillStats()
		e.reset()
		e.spillHelper.spillRoundForTest = max(e.spillHelper.spillRoundForTest, lastRound)
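		// Each iteration of this loop is one spill "round": after a build+probe
		// pass, partitions that were spilled to disk are popped from the spill
		// stack below and become the input of the next round.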
		err := e.spillHelper.prepareForRestoring(lastRound)
		if err != nil {
			e.joinResultCh <- &hashjoinWorkerResult{err: err}
			return
		}

		restoredPartition := e.spillHelper.stack.pop()
		if restoredPartition == nil {
			// No more data to restore
			return
		}

		e.spillHelper.round = restoredPartition.round
		if e.memTracker.BytesConsumed() != 0 {
			e.isMemoryClearedForTest = false
		}

		lastRound = restoredPartition.round
		e.restoredBuildInDisk = restoredPartition.buildSideChunks
		e.restoredProbeInDisk = restoredPartition.probeSideChunks

		if e.stats != nil && e.stats.spill.round < lastRound {
			e.stats.spill.round = lastRound
		}

		e.inRestore = true
	}
}

func (e *HashJoinV2Exec) resetProbeStatus() {
	for _, probe := range e.ProbeWorkers {
		probe.JoinProbe.ResetProbe()
	}
}

func (e *HashJoinV2Exec) releaseDisk() {
	if e.restoredBuildInDisk != nil {
		for _, inDisk := range e.restoredBuildInDisk {
			inDisk.Close()
		}
		e.restoredBuildInDisk = nil
	}

	if e.restoredProbeInDisk != nil {
		for _, inDisk := range e.restoredProbeInDisk {
			inDisk.Close()
		}
		e.restoredProbeInDisk = nil
	}
}

// Next implements the Executor Next interface.
// Hash join constructs the result following these steps:
// step 1. fetch data from the build side child and build a hash table;
// step 2. fetch data from the probe child in a background goroutine and probe the hash table in multiple join workers.
func (e *HashJoinV2Exec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
	if !e.prepared {
		e.initHashTableContext()
		e.initializeForProbe()
		e.spillHelper.setCanSpillFlag(true)
		e.buildFinished = make(chan error, 1)
		e.hashTableContext.memoryTracker.AttachTo(e.memTracker)
		go e.startBuildAndProbe(ctx)
		e.prepared = true
	}

	if e.ProbeSideTupleFetcher.shouldLimitProbeFetchSize() {
		atomic.StoreInt64(&e.ProbeSideTupleFetcher.requiredRows, int64(req.RequiredRows()))
	}
	req.Reset()

	result, ok := <-e.joinResultCh
	if !ok {
		return nil
	}
	if result.err != nil {
		e.finished.Store(true)
		return result.err
	}
	req.SwapColumns(result.chk)
	result.src <- result.chk
	return nil
}

func (e *HashJoinV2Exec) handleFetchAndBuildHashTablePanic(r any) {
	if r != nil {
		e.buildFinished <- util.GetRecoverError(r)
	}
	close(e.buildFinished)
}

// checkBalance checks whether the segment count of each partition is balanced.
func (e *HashJoinV2Exec) checkBalance(totalSegmentCnt int) bool {
	isBalanced := e.Concurrency == e.partitionNumber
	if !isBalanced {
		return false
	}

	avgSegCnt := totalSegmentCnt / int(e.partitionNumber)
	balanceThreshold := int(float64(avgSegCnt) * 0.8)
	subTables := e.HashJoinCtxV2.hashTableContext.hashTable.tables

	for _, subTable := range subTables {
		if math.Abs(float64(len(subTable.rowData.segments)-avgSegCnt)) > float64(balanceThreshold) {
			isBalanced = false
			break
		}
	}
	return isBalanced
}

func (e *HashJoinV2Exec) createTasks(buildTaskCh chan<- *buildTask, totalSegmentCnt int, doneCh chan struct{}) {
	isBalanced := e.checkBalance(totalSegmentCnt)
	segStep := max(1, totalSegmentCnt/int(e.Concurrency))
	subTables := e.HashJoinCtxV2.hashTableContext.hashTable.tables
	createBuildTask := func(partIdx int, segStartIdx int, segEndIdx int) *buildTask {
		return &buildTask{partitionIdx: partIdx, segStartIdx: segStartIdx, segEndIdx: segEndIdx}
	}
	failpoint.Inject("createTasksPanic", nil)

	if isBalanced {
		for partIdx, subTable := range subTables {
			_ = triggerIntest(5)
			segmentsLen := len(subTable.rowData.segments)
			select {
			case <-doneCh:
				return
			case buildTaskCh <- createBuildTask(partIdx, 0, segmentsLen):
			}
		}
		return
	}

	partitionStartIndex := make([]int, len(subTables))
	partitionSegmentLength := make([]int, len(subTables))
	for i := range subTables {
		partitionStartIndex[i] = 0
		partitionSegmentLength[i] = len(subTables[i].rowData.segments)
	}

	for {
		hasNewTask := false
		for partIdx := range subTables {
			// Create tasks by round-robin over all the partitions, so that the build
			// threads are likely to build different partitions at the same time.
			if partitionStartIndex[partIdx] < partitionSegmentLength[partIdx] {
				startIndex := partitionStartIndex[partIdx]
				endIndex := min(startIndex+segStep, partitionSegmentLength[partIdx])
				select {
				case <-doneCh:
					return
				case buildTaskCh <- createBuildTask(partIdx, startIndex, endIndex):
				}
				partitionStartIndex[partIdx] = endIndex
				hasNewTask = true
			}
		}
		if !hasNewTask {
			break
		}
	}
}

func (e *HashJoinV2Exec) fetchAndBuildHashTable(ctx context.Context) {
	e.workerWg.RunWithRecover(func() {
		defer trace.StartRegion(ctx, "HashJoinHashTableBuilder").End()
		e.fetchAndBuildHashTableImpl(ctx)
	}, e.handleFetchAndBuildHashTablePanic)
}

func (e *HashJoinV2Exec) fetchAndBuildHashTableImpl(ctx context.Context) {
	if e.stats != nil {
		start := time.Now()
		defer func() {
			e.stats.fetchAndBuildHashTable += int64(time.Since(start))
		}()
	}

	waitJobDone := func(wg *sync.WaitGroup, errCh chan error) bool {
		wg.Wait()
		close(errCh)
		if err := <-errCh; err != nil {
			e.buildFinished <- err
			return false
		}
		return true
	}

	// fetcherAndWorkerSyncer is useful when spill is triggered: it lets the
	// fetcher know when the workers have finished their work.
	fetcherAndWorkerSyncer := &sync.WaitGroup{}
	wg := new(sync.WaitGroup)
	errCh := make(chan error, 1+e.Concurrency)
	// doneCh is used by the consumer (splitAndAppendToRowTable) to inform the
	// producer (fetchBuildSideRows) that the consumer has met an error and has
	// stopped consuming data.
	doneCh := make(chan struct{}, e.Concurrency)

	// Init the builders. TODO: maybe the builder can be reused during the whole life cycle of the executor.
	hashJoinCtx := e.HashJoinCtxV2
	for _, worker := range e.BuildWorkers {
		worker.builder = createRowTableBuilder(worker.BuildKeyColIdx, hashJoinCtx.BuildKeyTypes, hashJoinCtx.partitionNumber, worker.HasNullableKey, hashJoinCtx.BuildFilter != nil, hashJoinCtx.needScanRowTableAfterProbeDone, hashJoinCtx.hashTableMeta.nullMapLength)
	}

	srcChkCh := e.fetchBuildSideRows(ctx, fetcherAndWorkerSyncer, wg, errCh, doneCh)
	e.splitAndAppendToRowTable(srcChkCh, fetcherAndWorkerSyncer, wg, errCh, doneCh)
	success := waitJobDone(wg, errCh)
	if !success {
		return
	}

	if e.spillHelper.spillTriggered {
		e.spillHelper.spillTriggedInBuildingStageForTest = true
	}

	totalSegmentCnt, err := e.hashTableContext.mergeRowTablesToHashTable(e.partitionNumber, e.spillHelper)
	if err != nil {
		e.buildFinished <- err
		return
	}

	wg = new(sync.WaitGroup)
	errCh = make(chan error, 1+e.Concurrency)
	// doneCh is used by the consumer (buildHashTable) to inform the producer
	// (createBuildTasks) that the consumer has met an error and has stopped
	// consuming data.
	doneCh = make(chan struct{}, e.Concurrency)
	buildTaskCh := e.createBuildTasks(totalSegmentCnt, wg, errCh, doneCh)
	e.buildHashTable(buildTaskCh, wg, errCh, doneCh)
	waitJobDone(wg, errCh)
}

func (e *HashJoinV2Exec) fetchBuildSideRows(ctx context.Context, fetcherAndWorkerSyncer *sync.WaitGroup, wg *sync.WaitGroup, errCh chan error, doneCh chan struct{}) chan *chunk.Chunk {
	srcChkCh := make(chan *chunk.Chunk, 1)
	wg.Add(1)
	e.workerWg.RunWithRecover(
		func() {
			defer trace.StartRegion(ctx, "HashJoinBuildSideFetcher").End()
			if e.inRestore {
				chunkNum := e.getRestoredBuildChunkNum()
				e.controlWorkersForRestore(chunkNum, srcChkCh, fetcherAndWorkerSyncer, errCh, doneCh)
			} else {
				fetcher := e.BuildWorkers[0]
				fetcher.fetchBuildSideRows(ctx, &fetcher.HashJoinCtx.hashJoinCtxBase, fetcherAndWorkerSyncer, e.spillHelper, srcChkCh, errCh, doneCh)
			}
		},
		func(r any) {
			if r != nil {
				errCh <- util.GetRecoverError(r)
			}
			wg.Done()
		},
	)
	return srcChkCh
}

func (e *HashJoinV2Exec) getRestoredBuildChunkNum() int {
	chunkNum := 0
	for _, inDisk := range e.restoredBuildInDisk {
		chunkNum += inDisk.NumChunks()
	}
	return chunkNum
}

func (e *HashJoinV2Exec) controlWorkersForRestore(chunkNum int, syncCh chan *chunk.Chunk, fetcherAndWorkerSyncer *sync.WaitGroup, errCh chan<- error, doneCh <-chan struct{}) {
	defer func() {
		close(syncCh)

		hasError := false
		if r := recover(); r != nil {
			errCh <- util.GetRecoverError(r)
			hasError = true
		}

		fetcherAndWorkerSyncer.Wait()
		// Spill remaining rows
		if !hasError && e.spillHelper.isSpillTriggered() {
			err := e.spillHelper.spillRemainingRows()
			if err != nil {
				errCh <- err
			}
		}
	}()

	for range chunkNum {
		if e.finished.Load() {
			return
		}

		err := checkAndSpillRowTableIfNeeded(fetcherAndWorkerSyncer, e.spillHelper)
		if err != nil {
			errCh <- err
			return
		}

		err = triggerIntest(2)
		if err != nil {
			errCh <- err
			return
		}

		fetcherAndWorkerSyncer.Add(1)
		select {
		case <-doneCh:
			fetcherAndWorkerSyncer.Done()
			return
		case <-e.hashJoinCtxBase.closeCh:
			fetcherAndWorkerSyncer.Done()
			return
		case syncCh <- nil:
		}
	}
}

func handleErr(err error, errCh chan error, doneCh chan struct{}) {
	errCh <- err
	doneCh <- struct{}{}
}
func (e *HashJoinV2Exec) splitAndAppendToRowTable(srcChkCh chan *chunk.Chunk, fetcherAndWorkerSyncer *sync.WaitGroup, wg *sync.WaitGroup, errCh chan error, doneCh chan struct{}) {
	wg.Add(int(e.Concurrency))
	for i := range e.Concurrency {
		workIndex := i
		e.workerWg.RunWithRecover(
			func() {
				if e.inRestore {
					e.BuildWorkers[workIndex].splitPartitionAndAppendToRowTableForRestore(e.restoredBuildInDisk[workIndex], srcChkCh, fetcherAndWorkerSyncer, errCh, doneCh)
				} else {
					e.BuildWorkers[workIndex].splitPartitionAndAppendToRowTable(e.SessCtx.GetSessionVars().StmtCtx.TypeCtx(), fetcherAndWorkerSyncer, srcChkCh, errCh, doneCh)
				}
			},
			func(r any) {
				if r != nil {
					errCh <- util.GetRecoverError(r)
					doneCh <- struct{}{}
				}
				wg.Done()
			},
		)
	}
}

func (e *HashJoinV2Exec) createBuildTasks(totalSegmentCnt int, wg *sync.WaitGroup, errCh chan error, doneCh chan struct{}) chan *buildTask {
	buildTaskCh := make(chan *buildTask, e.Concurrency)
	wg.Add(1)
	e.workerWg.RunWithRecover(
		func() {
			e.createTasks(buildTaskCh, totalSegmentCnt, doneCh)
		},
		func(r any) {
			if r != nil {
				errCh <- util.GetRecoverError(r)
			}
			close(buildTaskCh)
			wg.Done()
		},
	)
	return buildTaskCh
}

func (e *HashJoinV2Exec) buildHashTable(buildTaskCh chan *buildTask, wg *sync.WaitGroup, errCh chan error, doneCh chan struct{}) {
	for i := range e.Concurrency {
		wg.Add(1)
		workID := i
		e.workerWg.RunWithRecover(
			func() {
				err := e.BuildWorkers[workID].buildHashTable(buildTaskCh)
				if err != nil {
					errCh <- err
					doneCh <- struct{}{}
				}
			},
			func(r any) {
				if r != nil {
					errCh <- util.GetRecoverError(r)
					doneCh <- struct{}{}
				}
				wg.Done()
			},
		)
	}
}

type buildTask struct {
	partitionIdx int
	segStartIdx  int
	segEndIdx    int
}

func generatePartitionIndex(hashValue uint64, partitionMaskOffset int) uint64 {
	return hashValue >> uint64(partitionMaskOffset)
}

func getProbeSpillChunkFieldTypes(probeFieldTypes []*types.FieldType) []*types.FieldType {
	ret := make([]*types.FieldType, 0, len(probeFieldTypes)+2)
	hashValueField := types.NewFieldType(mysql.TypeLonglong)
	hashValueField.AddFlag(mysql.UnsignedFlag)
	ret = append(ret, hashValueField)                    // hash value
	ret = append(ret, types.NewFieldType(mysql.TypeBit)) // serialized key
	ret = append(ret, probeFieldTypes...)                // row data
	return ret
}

func rehash(oldHashValue uint64, rehashBuf []byte, hash hash.Hash64) uint64 {
	*(*uint64)(unsafe.Pointer(&rehashBuf[0])) = oldHashValue
	hash.Reset()
	hash.Write(rehashBuf)
	return hash.Sum64()
}

func issue59377Intest(err *error) {
	failpoint.Inject("Issue59377", func() {
		*err = errors.New("Random failpoint error is triggered")
	})
}

func triggerIntest(errProbability int) error {
	failpoint.Inject("slowWorkers", func(val failpoint.Value) {
		if val.(bool) {
			num := rand.Intn(100000)
			if num < 2 {
				time.Sleep(time.Duration(num) * time.Millisecond)
			}
		}
	})

	var err error
	failpoint.Inject("panicOrError", func(val failpoint.Value) {
		if val.(bool) {
			num := rand.Intn(100000)
			if num < errProbability/2 {
				panic("Random failpoint panic")
			} else if num < errProbability {
				err = errors.New("Random failpoint error is triggered")
			}
		}
	})
	return err
}
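// exampleSpillRoundCap is an illustrative sketch (not used by the executor) of
// the arithmetic behind initMaxSpillRound: restore rounds are capped once
// partitionNumber^rounds reaches roughly 1024 total partitions. For instance,
// with partitionNumber = 8 the cap is int(log(1024)/log(8)) = int(3.33) = 3.
func exampleSpillRoundCap(partitionNumber uint) int {
	if partitionNumber <= 1 {
		// Guard against log(1) = 0; the real code only computes the cap when
		// partitionNumber > 1.
		return 1
	}
	return int(math.Log(1024) / math.Log(float64(partitionNumber)))
}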