Files
tidb/pkg/executor/join/left_outer_semi_join_probe.go
2025-05-07 14:32:26 +00:00

236 lines
6.8 KiB
Go

// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package join
import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
)
// leftOuterSemiJoinProbe is the probe-side implementation for left outer semi
// join and, when isAnti is set, its anti variant. Each output row is made of
// the used probe-side columns followed by one extra column holding 1, 0 or NULL.
type leftOuterSemiJoinProbe struct {
baseSemiJoin
// isNullRows has one entry per row of the current probe chunk. An entry is
// set to true when the other condition evaluated to NULL for a joined row
// belonging to that probe row; an unmatched row with this flag set gets NULL
// in the extra result column.
isNullRows []bool
// isAnti selects the anti variant: matched rows output 0 and unmatched rows
// output 1, instead of 1 and 0.
isAnti bool
}
// Compile-time assertion that *leftOuterSemiJoinProbe implements ProbeV2.
var _ ProbeV2 = &leftOuterSemiJoinProbe{}
// newLeftOuterSemiJoinProbe builds a leftOuterSemiJoinProbe on top of base.
// isAnti selects the anti variant of the join. The embedded baseSemiJoin is
// created with its second argument fixed to false
// (NOTE(review): presumably "is left side build" — confirm against newBaseSemiJoin).
func newLeftOuterSemiJoinProbe(base baseJoinProbe, isAnti bool) *leftOuterSemiJoinProbe {
	return &leftOuterSemiJoinProbe{
		baseSemiJoin: *newBaseSemiJoin(base, false),
		isAnti:       isAnti,
	}
}
// SetChunkForProbe hands the next probe chunk to the embedded baseJoinProbe
// and, if that succeeds, resets the per-chunk probe state of this probe.
// Any error from the base implementation is returned unchanged.
func (j *leftOuterSemiJoinProbe) SetChunkForProbe(chunk *chunk.Chunk) (err error) {
	if err = j.baseJoinProbe.SetChunkForProbe(chunk); err != nil {
		return err
	}
	j.resetProbeState()
	return nil
}
// SetRestoredChunkForProbe hands a restored probe chunk to the embedded
// baseJoinProbe and, if that succeeds, resets the per-chunk probe state of
// this probe. Any error from the base implementation is returned unchanged.
func (j *leftOuterSemiJoinProbe) SetRestoredChunkForProbe(chunk *chunk.Chunk) (err error) {
	if err = j.baseJoinProbe.SetRestoredChunkForProbe(chunk); err != nil {
		return err
	}
	j.resetProbeState()
	return nil
}
// resetProbeState rebuilds isNullRows so that it holds exactly j.chunkRows
// entries, all false, reusing the existing backing array where possible, and
// then resets the state kept by the embedded baseSemiJoin.
func (j *leftOuterSemiJoinProbe) resetProbeState() {
	nullFlags := j.isNullRows[:0]
	for remaining := j.chunkRows; remaining > 0; remaining-- {
		nullFlags = append(nullFlags, false)
	}
	j.isNullRows = nullFlags
	j.baseSemiJoin.resetProbeState()
}
// NeedScanRowTable always reports false for this probe.
func (*leftOuterSemiJoinProbe) NeedScanRowTable() bool {
	return false
}
// IsScanRowTableDone always panics. NeedScanRowTable returns false for this
// probe, so callers are presumably never expected to reach this method.
func (*leftOuterSemiJoinProbe) IsScanRowTableDone() bool {
	panic("should not reach here")
}
// InitForScanRowTable always panics. NeedScanRowTable returns false for this
// probe, so callers are presumably never expected to reach this method.
func (*leftOuterSemiJoinProbe) InitForScanRowTable() {
	panic("should not reach here")
}
// ScanRowTable does no work for this probe: it returns joinResult unchanged
// and ignores the SQL killer argument.
func (*leftOuterSemiJoinProbe) ScanRowTable(joinResult *hashjoinWorkerResult, _ *sqlkiller.SQLKiller) *hashjoinWorkerResult {
	return joinResult
}
// Probe runs one probe step for the current chunk and appends output to
// joinResult.chk.
//
// It first calls prepareForProbe, then dispatches to probeWithOtherCondition
// or probeWithoutOtherCondition depending on j.ctx.hasOtherCondition().
// If any step fails, the error is stored in joinResult.err and ok is false;
// otherwise ok is true. joinResult itself is returned in both cases.
func (j *leftOuterSemiJoinProbe) Probe(joinResult *hashjoinWorkerResult, sqlKiller *sqlkiller.SQLKiller) (ok bool, _ *hashjoinWorkerResult) {
	joinedChk, remainCap, err := j.prepareForProbe(joinResult.chk)
	if err == nil {
		// Only probe when the preparation succeeded; both failure kinds share
		// the single error exit below.
		if j.ctx.hasOtherCondition() {
			err = j.probeWithOtherCondition(joinResult.chk, joinedChk, remainCap, sqlKiller)
		} else {
			err = j.probeWithoutOtherCondition(joinResult.chk, joinedChk, remainCap, sqlKiller)
		}
	}
	if err != nil {
		joinResult.err = err
		return false, joinResult
	}
	return true, joinResult
}
// probeWithOtherCondition performs one probe step when the join has an
// "other condition".
//
// While unFinishedProbeRowIdxQueue is non-empty, it calls produceResult on
// joinedChk and resets currentProbeRow to 0. Once the queue is empty, it
// advances currentProbeRow by at most remainCap rows (bounded by chunkRows)
// and writes the output for that row range into chk via buildResult.
func (j *leftOuterSemiJoinProbe) probeWithOtherCondition(chk, joinedChk *chunk.Chunk, remainCap int, sqlKiller *sqlkiller.SQLKiller) (err error) {
	if !j.unFinishedProbeRowIdxQueue.IsEmpty() {
		if err = j.produceResult(joinedChk, sqlKiller); err != nil {
			return err
		}
		j.currentProbeRow = 0
	}

	// The queue is re-checked on purpose: produceResult may or may not have
	// drained it during this call.
	if !j.unFinishedProbeRowIdxQueue.IsEmpty() {
		return nil
	}

	firstRow := j.currentProbeRow
	j.currentProbeRow = min(firstRow+remainCap, j.chunkRows)
	j.buildResult(chk, firstRow)
	return nil
}
// produceResult fills joinedChk with concatenated probe/build rows, evaluates
// the other condition on them, and records the outcome per probe row:
// isMatchedRows is set where the condition is true, isNullRows where it
// evaluated to NULL. It returns the first error from either step.
func (j *leftOuterSemiJoinProbe) produceResult(joinedChk *chunk.Chunk, sqlKiller *sqlkiller.SQLKiller) (err error) {
	// The third argument is always true because left outer semi join currently
	// only supports the right side as the build side.
	if err = j.concatenateProbeAndBuildRows(joinedChk, sqlKiller, true); err != nil {
		return err
	}
	if joinedChk.NumRows() <= 0 {
		return nil
	}

	sessCtx := j.ctx.SessCtx
	j.selected, j.isNulls, err = expression.VecEvalBool(
		sessCtx.GetExprCtx().GetEvalCtx(),
		sessCtx.GetSessionVars().EnableVectorizedExpression,
		j.ctx.OtherCondition,
		joinedChk,
		j.selected,
		j.isNulls,
	)
	if err != nil {
		return err
	}

	for rowIdx, rowCnt := 0, joinedChk.NumRows(); rowIdx < rowCnt; rowIdx++ {
		matched, evaluatedNull := j.selected[rowIdx], j.isNulls[rowIdx]
		if !matched && !evaluatedNull {
			continue
		}
		// Map the joined row back to the probe row it was produced from.
		probeIdx := j.rowIndexInfos[rowIdx].probeRowIndex
		if matched {
			j.isMatchedRows[probeIdx] = true
		}
		if evaluatedNull {
			j.isNullRows[probeIdx] = true
		}
	}
	return nil
}
// probeWithoutOtherCondition performs one probe step when the join has no
// "other condition".
//
// It handles up to remainCap probe rows starting at currentProbeRow. For each
// row it walks the candidate rows reachable from matchedRowsHeaders until a
// key match is found (the row is then flagged in isMatchedRows) or the header
// becomes 0. After the loop it checks the SQL killer and writes the output for
// the processed row range into joinedChk via buildResult.
func (j *leftOuterSemiJoinProbe) probeWithoutOtherCondition(_, joinedChk *chunk.Chunk, remainCap int, sqlKiller *sqlkiller.SQLKiller) (err error) {
	firstRow := j.currentProbeRow
	meta := j.ctx.hashTableMeta
	tagHelper := j.ctx.hashTableContext.tagHelper

	for remainCap > 0 && j.currentProbeRow < j.chunkRows {
		row := j.currentProbeRow
		if header := j.matchedRowsHeaders[row]; header != 0 {
			candidateRow := tagHelper.toUnsafePointer(header)
			if !isKeyMatched(meta.keyMode, j.serializedKeys[row], candidateRow, meta) {
				// Key mismatch: move to the next candidate and retry the same
				// probe row without consuming any of remainCap.
				j.probeCollision++
				j.matchedRowsHeaders[row] = getNextRowAddress(candidateRow, tagHelper, j.matchedRowsHashValue[row])
				continue
			}
			j.isMatchedRows[row] = true
		}
		// This probe row is finished, whether it matched or ran out of candidates.
		j.matchedRowsHeaders[row] = 0
		remainCap--
		j.currentProbeRow++
	}

	if err = checkSQLKiller(sqlKiller, "killedDuringProbe"); err != nil {
		return err
	}
	j.buildResult(joinedChk, firstRow)
	return nil
}
// buildResult appends the output for probe rows [startProbeRow, j.currentProbeRow)
// to chk.
//
// The used probe-side columns (j.lUsed) are copied first. When the range
// covers the whole chunk, the chunk has no selection vector, chk is still
// empty and no rows were spilled, whole columns are copied; otherwise only the
// rows in range that are not listed in j.spilledIdx are copied.
// Then one value per emitted row is appended to the column at index
// len(j.lUsed): 1 for a matched row, NULL for an unmatched row flagged in
// isNullRows, and 0 otherwise. With isAnti the 1 and 0 are swapped.
func (j *leftOuterSemiJoinProbe) buildResult(chk *chunk.Chunk, startProbeRow int) {
	endProbeRow := j.currentProbeRow
	copyWholeChunk := startProbeRow == 0 && endProbeRow == j.chunkRows && j.currentChunk.Sel() == nil && chk.NumRows() == 0 && len(j.spilledIdx) == 0

	var selected []bool
	if copyWholeChunk {
		// TODO: Can do a shallow copy by directly copying the Column pointers
		for dstIdx, srcIdx := range j.lUsed {
			j.currentChunk.Column(srcIdx).CopyConstruct(chk.Column(dstIdx))
		}
	} else {
		selected = make([]bool, j.chunkRows)
		for row := startProbeRow; row < endProbeRow; row++ {
			selected[row] = true
		}
		// Spilled rows are excluded from the output.
		for _, spilledIdx := range j.spilledIdx {
			selected[spilledIdx] = false
		}
		rowIDOf := func(i int) int {
			return j.usedRows[i]
		}
		for dstIdx, srcIdx := range j.lUsed {
			chunk.CopySelectedRowsWithRowIDFunc(chk.Column(dstIdx), j.currentChunk.Column(srcIdx), selected, 0, len(selected), rowIDOf)
		}
	}

	// The anti variant swaps the values written for matched and unmatched rows.
	matchedVal, unmatchedVal := int64(1), int64(0)
	if j.isAnti {
		matchedVal, unmatchedVal = 0, 1
	}
	resultCol := len(j.lUsed)
	for row := startProbeRow; row < endProbeRow; row++ {
		if selected != nil && !selected[row] {
			continue
		}
		switch {
		case j.isMatchedRows[row]:
			chk.AppendInt64(resultCol, matchedVal)
		case j.isNullRows[row]:
			chk.AppendNull(resultCol)
		default:
			chk.AppendInt64(resultCol, unmatchedVal)
		}
	}
	chk.SetNumVirtualRows(chk.NumRows())
}
// IsCurrentChunkProbeDone reports whether probing of the current chunk has
// finished. With an other condition, the chunk is not done while
// unFinishedProbeRowIdxQueue still has entries; in every other case the
// answer comes from the embedded baseJoinProbe.
func (j *leftOuterSemiJoinProbe) IsCurrentChunkProbeDone() bool {
	hasPendingRows := j.ctx.hasOtherCondition() && !j.unFinishedProbeRowIdxQueue.IsEmpty()
	return !hasPendingRows && j.baseJoinProbe.IsCurrentChunkProbeDone()
}