190 lines
6.0 KiB
Go
190 lines
6.0 KiB
Go
// Copyright 2024 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package join
|
|
|
|
import (
|
|
"github.com/pingcap/tidb/pkg/util/chunk"
|
|
"github.com/pingcap/tidb/pkg/util/queue"
|
|
"github.com/pingcap/tidb/pkg/util/sqlkiller"
|
|
)
|
|
|
|
// maxMatchedRowNum applies to the probe path that has an other condition.
// During the probe, each build row matched by a probe row is put together with
// that probe row to generate a new row, so a probe row that matches n build rows
// produces n new rows. If n is very large, too many rows would be generated at
// once. To avoid this, the number of rows generated for one probe row in a single
// pass is limited, and this variable holds that limit.
// NOTE: Suppose the probe chunk has n rows and n*maxMatchedRowNum << chunkRemainingCapacity.
// Probe rows that have been matched before keep being joined with build rows, even
// though the probe row with idx i may already have produced `maxMatchedRowNum` rows,
// so that as many rows as possible are processed.
var maxMatchedRowNum = 4
|
|
|
|
// baseSemiJoin embeds baseJoinProbe and carries the extra probe-side state used by
// the semi join helper methods defined on it.
type baseSemiJoin struct {
	baseJoinProbe
	// isLeftSideBuild is set from the constructor argument; resetProbeState only
	// refills isMatchedRows when it is false.
	isLeftSideBuild bool

	// isMatchedRows marks whether the left side row is matched.
	// It's used only when right side is build side.
	isMatchedRows []bool

	// isNulls is initialised to an empty slice by newBaseSemiJoin.
	// NOTE(review): no method visible here reads or writes it; presumably it records
	// per-probe-row NULL results for callers elsewhere — confirm.
	isNulls []bool

	// used when left side is build side
	rowIter *rowIter

	// used in other condition to record which rows need to be processed
	unFinishedProbeRowIdxQueue *queue.Queue[int]

	// Used for right side build without other condition in semi and anti semi join.
	// It holds the probe-chunk row offsets that are copied into the result chunk.
	offsets []int
}
|
|
|
|
func newBaseSemiJoin(base baseJoinProbe, isLeftSideBuild bool) *baseSemiJoin {
|
|
ret := &baseSemiJoin{
|
|
baseJoinProbe: base,
|
|
isLeftSideBuild: isLeftSideBuild,
|
|
isNulls: make([]bool, 0),
|
|
}
|
|
|
|
return ret
|
|
}
|
|
|
|
func (b *baseSemiJoin) resetProbeState() {
|
|
if !b.isLeftSideBuild {
|
|
b.isMatchedRows = b.isMatchedRows[:0]
|
|
for range b.chunkRows {
|
|
b.isMatchedRows = append(b.isMatchedRows, false)
|
|
}
|
|
}
|
|
|
|
if b.ctx.hasOtherCondition() {
|
|
if b.unFinishedProbeRowIdxQueue == nil {
|
|
b.unFinishedProbeRowIdxQueue = queue.NewQueue[int](b.chunkRows)
|
|
} else {
|
|
b.unFinishedProbeRowIdxQueue.ClearAndExpandIfNeed(b.chunkRows)
|
|
}
|
|
|
|
for i := range b.chunkRows {
|
|
if b.matchedRowsHeaders[i] != 0 {
|
|
b.unFinishedProbeRowIdxQueue.Push(i)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *baseSemiJoin) matchMultiBuildRows(joinedChk *chunk.Chunk, joinedChkRemainCap *int, isRightSideBuild bool) {
|
|
tagHelper := b.ctx.hashTableContext.tagHelper
|
|
meta := b.ctx.hashTableMeta
|
|
for b.matchedRowsHeaders[b.currentProbeRow] != 0 && *joinedChkRemainCap > 0 && b.matchedRowsForCurrentProbeRow < maxMatchedRowNum {
|
|
candidateRow := tagHelper.toUnsafePointer(b.matchedRowsHeaders[b.currentProbeRow])
|
|
if isRightSideBuild || !meta.isCurrentRowUsedWithAtomic(candidateRow) {
|
|
if isKeyMatched(meta.keyMode, b.serializedKeys[b.currentProbeRow], candidateRow, meta) {
|
|
b.appendBuildRowToCachedBuildRowsV1(b.currentProbeRow, candidateRow, joinedChk, 0, true)
|
|
b.matchedRowsForCurrentProbeRow++
|
|
*joinedChkRemainCap--
|
|
} else {
|
|
b.probeCollision++
|
|
}
|
|
}
|
|
|
|
b.matchedRowsHeaders[b.currentProbeRow] = getNextRowAddress(candidateRow, tagHelper, b.matchedRowsHashValue[b.currentProbeRow])
|
|
}
|
|
|
|
b.finishLookupCurrentProbeRow()
|
|
}
|
|
|
|
func (b *baseSemiJoin) concatenateProbeAndBuildRows(joinedChk *chunk.Chunk, sqlKiller *sqlkiller.SQLKiller, isRightSideBuild bool) error {
|
|
joinedChkRemainCap := joinedChk.Capacity()
|
|
|
|
for joinedChkRemainCap > 0 && !b.unFinishedProbeRowIdxQueue.IsEmpty() {
|
|
probeRowIdx := b.unFinishedProbeRowIdxQueue.Pop()
|
|
if isRightSideBuild && b.isMatchedRows[probeRowIdx] {
|
|
continue
|
|
}
|
|
|
|
b.currentProbeRow = probeRowIdx
|
|
b.matchMultiBuildRows(joinedChk, &joinedChkRemainCap, isRightSideBuild)
|
|
|
|
if b.matchedRowsHeaders[probeRowIdx] == 0 {
|
|
continue
|
|
}
|
|
|
|
b.unFinishedProbeRowIdxQueue.Push(probeRowIdx)
|
|
}
|
|
|
|
err := checkSQLKiller(sqlKiller, "killedDuringProbe")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
b.finishCurrentLookupLoop(joinedChk)
|
|
return nil
|
|
}
|
|
|
|
// Only used for semi and anti semi join
|
|
func (b *baseSemiJoin) generateResultChkForRightBuildNoOtherCondition(resultChk *chunk.Chunk) {
|
|
if len(b.offsets) == 0 {
|
|
return
|
|
}
|
|
|
|
for index, colIndex := range b.lUsed {
|
|
srcCol := b.currentChunk.Column(colIndex)
|
|
dstCol := resultChk.Column(index)
|
|
chunk.CopyRows(dstCol, srcCol, b.offsets)
|
|
}
|
|
|
|
if len(b.lUsed) == 0 {
|
|
resultChk.SetNumVirtualRows(resultChk.NumRows() + len(b.offsets))
|
|
} else {
|
|
resultChk.SetNumVirtualRows(resultChk.NumRows())
|
|
}
|
|
}
|
|
|
|
// Only used for semi and anti semi join
|
|
func (b *baseSemiJoin) generateResultChkForRightBuildWithOtherCondition(remainCap int, chk *chunk.Chunk, resultRows []bool, expectedResult bool) {
|
|
for remainCap > 0 && (b.currentProbeRow < b.chunkRows) {
|
|
rowNumToTryAppend := min(remainCap, b.chunkRows-b.currentProbeRow)
|
|
start := b.currentProbeRow
|
|
end := b.currentProbeRow + rowNumToTryAppend
|
|
|
|
for index, usedColIdx := range b.lUsed {
|
|
dstCol := chk.Column(index)
|
|
srcCol := b.currentChunk.Column(usedColIdx)
|
|
chunk.CopyExpectedRowsWithRowIDFunc(dstCol, srcCol, resultRows, expectedResult, start, end, func(i int) int {
|
|
return b.usedRows[i]
|
|
})
|
|
}
|
|
|
|
if len(b.lUsed) == 0 {
|
|
// For calculating virtual row num
|
|
virtualRowNum := chk.GetNumVirtualRows()
|
|
for i := start; i < end; i++ {
|
|
if resultRows[i] == expectedResult {
|
|
virtualRowNum++
|
|
}
|
|
}
|
|
|
|
// When `len(b.lUsed) == 0`, column number in chk is 0
|
|
// We need to manually calculate virtual row number.
|
|
chk.SetNumVirtualRows(virtualRowNum)
|
|
} else {
|
|
chk.SetNumVirtualRows(chk.NumRows())
|
|
}
|
|
|
|
b.currentProbeRow += rowNumToTryAppend
|
|
remainCap = chk.RequiredRows() - chk.NumRows()
|
|
}
|
|
}
|