Files
tidb/pkg/executor/join/base_semi_join.go
2025-05-07 14:32:26 +00:00

190 lines
6.0 KiB
Go

// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package join
import (
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/queue"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
)
// The following described case has other condition.
// During the probe, when a probe matches one build row, we need to put the probe and build rows
// together and generate a new row. If one probe row could match n build row, then we will get
// n new rows. If n is very big, there will generate too much rows. In order to avoid this case
// we need to limit the max generated row number. This variable describe this max number.
// NOTE: Suppose probe chunk has n rows and n*maxMatchedRowNum << chunkRemainingCapacity.
// We will keep on join probe rows that have been matched before with build rows, though
// probe row with idx i may have produced `maxMatchedRowNum` number rows before. So that
// we can process as many rows as possible.
var maxMatchedRowNum = 4
type baseSemiJoin struct {
baseJoinProbe
isLeftSideBuild bool
// isMatchedRows marks whether the left side row is matched
// It's used only when right side is build side.
isMatchedRows []bool
isNulls []bool
// used when left side is build side
rowIter *rowIter
// used in other condition to record which rows need to be processed
unFinishedProbeRowIdxQueue *queue.Queue[int]
// Used for right side build without other condition in semi and anti semi join
offsets []int
}
func newBaseSemiJoin(base baseJoinProbe, isLeftSideBuild bool) *baseSemiJoin {
ret := &baseSemiJoin{
baseJoinProbe: base,
isLeftSideBuild: isLeftSideBuild,
isNulls: make([]bool, 0),
}
return ret
}
func (b *baseSemiJoin) resetProbeState() {
if !b.isLeftSideBuild {
b.isMatchedRows = b.isMatchedRows[:0]
for range b.chunkRows {
b.isMatchedRows = append(b.isMatchedRows, false)
}
}
if b.ctx.hasOtherCondition() {
if b.unFinishedProbeRowIdxQueue == nil {
b.unFinishedProbeRowIdxQueue = queue.NewQueue[int](b.chunkRows)
} else {
b.unFinishedProbeRowIdxQueue.ClearAndExpandIfNeed(b.chunkRows)
}
for i := range b.chunkRows {
if b.matchedRowsHeaders[i] != 0 {
b.unFinishedProbeRowIdxQueue.Push(i)
}
}
}
}
func (b *baseSemiJoin) matchMultiBuildRows(joinedChk *chunk.Chunk, joinedChkRemainCap *int, isRightSideBuild bool) {
tagHelper := b.ctx.hashTableContext.tagHelper
meta := b.ctx.hashTableMeta
for b.matchedRowsHeaders[b.currentProbeRow] != 0 && *joinedChkRemainCap > 0 && b.matchedRowsForCurrentProbeRow < maxMatchedRowNum {
candidateRow := tagHelper.toUnsafePointer(b.matchedRowsHeaders[b.currentProbeRow])
if isRightSideBuild || !meta.isCurrentRowUsedWithAtomic(candidateRow) {
if isKeyMatched(meta.keyMode, b.serializedKeys[b.currentProbeRow], candidateRow, meta) {
b.appendBuildRowToCachedBuildRowsV1(b.currentProbeRow, candidateRow, joinedChk, 0, true)
b.matchedRowsForCurrentProbeRow++
*joinedChkRemainCap--
} else {
b.probeCollision++
}
}
b.matchedRowsHeaders[b.currentProbeRow] = getNextRowAddress(candidateRow, tagHelper, b.matchedRowsHashValue[b.currentProbeRow])
}
b.finishLookupCurrentProbeRow()
}
func (b *baseSemiJoin) concatenateProbeAndBuildRows(joinedChk *chunk.Chunk, sqlKiller *sqlkiller.SQLKiller, isRightSideBuild bool) error {
joinedChkRemainCap := joinedChk.Capacity()
for joinedChkRemainCap > 0 && !b.unFinishedProbeRowIdxQueue.IsEmpty() {
probeRowIdx := b.unFinishedProbeRowIdxQueue.Pop()
if isRightSideBuild && b.isMatchedRows[probeRowIdx] {
continue
}
b.currentProbeRow = probeRowIdx
b.matchMultiBuildRows(joinedChk, &joinedChkRemainCap, isRightSideBuild)
if b.matchedRowsHeaders[probeRowIdx] == 0 {
continue
}
b.unFinishedProbeRowIdxQueue.Push(probeRowIdx)
}
err := checkSQLKiller(sqlKiller, "killedDuringProbe")
if err != nil {
return err
}
b.finishCurrentLookupLoop(joinedChk)
return nil
}
// Only used for semi and anti semi join
func (b *baseSemiJoin) generateResultChkForRightBuildNoOtherCondition(resultChk *chunk.Chunk) {
if len(b.offsets) == 0 {
return
}
for index, colIndex := range b.lUsed {
srcCol := b.currentChunk.Column(colIndex)
dstCol := resultChk.Column(index)
chunk.CopyRows(dstCol, srcCol, b.offsets)
}
if len(b.lUsed) == 0 {
resultChk.SetNumVirtualRows(resultChk.NumRows() + len(b.offsets))
} else {
resultChk.SetNumVirtualRows(resultChk.NumRows())
}
}
// Only used for semi and anti semi join
func (b *baseSemiJoin) generateResultChkForRightBuildWithOtherCondition(remainCap int, chk *chunk.Chunk, resultRows []bool, expectedResult bool) {
for remainCap > 0 && (b.currentProbeRow < b.chunkRows) {
rowNumToTryAppend := min(remainCap, b.chunkRows-b.currentProbeRow)
start := b.currentProbeRow
end := b.currentProbeRow + rowNumToTryAppend
for index, usedColIdx := range b.lUsed {
dstCol := chk.Column(index)
srcCol := b.currentChunk.Column(usedColIdx)
chunk.CopyExpectedRowsWithRowIDFunc(dstCol, srcCol, resultRows, expectedResult, start, end, func(i int) int {
return b.usedRows[i]
})
}
if len(b.lUsed) == 0 {
// For calculating virtual row num
virtualRowNum := chk.GetNumVirtualRows()
for i := start; i < end; i++ {
if resultRows[i] == expectedResult {
virtualRowNum++
}
}
// When `len(b.lUsed) == 0`, column number in chk is 0
// We need to manually calculate virtual row number.
chk.SetNumVirtualRows(virtualRowNum)
} else {
chk.SetNumVirtualRows(chk.NumRows())
}
b.currentProbeRow += rowNumToTryAppend
remainCap = chk.RequiredRows() - chk.NumRows()
}
}