Files
tidb/pkg/executor/join/semi_join_probe.go
2024-12-13 09:21:00 +00:00

296 lines
8.0 KiB
Go

// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package join
import (
"unsafe"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
)
type semiJoinProbe struct {
baseSemiJoin
}
func newSemiJoinProbe(base baseJoinProbe, isLeftSideBuild bool) *semiJoinProbe {
ret := &semiJoinProbe{
baseSemiJoin: *newBaseSemiJoin(base, isLeftSideBuild),
}
return ret
}
func (s *semiJoinProbe) InitForScanRowTable() {
if !s.isLeftSideBuild {
panic("should not reach here")
}
s.rowIter = commonInitForScanRowTable(&s.baseJoinProbe)
}
func (s *semiJoinProbe) SetChunkForProbe(chk *chunk.Chunk) (err error) {
err = s.baseJoinProbe.SetChunkForProbe(chk)
if err != nil {
return err
}
s.resetProbeState()
return nil
}
func (s *semiJoinProbe) SetRestoredChunkForProbe(chk *chunk.Chunk) error {
err := s.baseJoinProbe.SetRestoredChunkForProbe(chk)
if err != nil {
return err
}
s.resetProbeState()
return nil
}
func (s *semiJoinProbe) NeedScanRowTable() bool {
return s.isLeftSideBuild
}
func (s *semiJoinProbe) IsScanRowTableDone() bool {
if !s.isLeftSideBuild {
panic("should not reach here")
}
return s.rowIter.isEnd()
}
func (s *semiJoinProbe) ScanRowTable(joinResult *hashjoinWorkerResult, sqlKiller *sqlkiller.SQLKiller) *hashjoinWorkerResult {
if !s.isLeftSideBuild {
panic("should not reach here")
}
if joinResult.chk.IsFull() {
return joinResult
}
if s.rowIter == nil {
panic("scanRowTable before init")
}
s.nextCachedBuildRowIndex = 0
meta := s.ctx.hashTableMeta
insertedRows := 0
remainCap := joinResult.chk.RequiredRows() - joinResult.chk.NumRows()
for insertedRows < remainCap && !s.rowIter.isEnd() {
currentRow := s.rowIter.getValue()
if meta.isCurrentRowUsed(currentRow) {
// append build side of this row
s.appendBuildRowToCachedBuildRowsV1(0, currentRow, joinResult.chk, 0, false)
insertedRows++
}
s.rowIter.next()
}
err := checkSQLKiller(sqlKiller, "killedDuringProbe")
if err != nil {
joinResult.err = err
return joinResult
}
if s.nextCachedBuildRowIndex > 0 {
s.batchConstructBuildRows(joinResult.chk, 0, false)
}
return joinResult
}
func (s *semiJoinProbe) ResetProbe() {
s.rowIter = nil
s.baseJoinProbe.ResetProbe()
}
func (s *semiJoinProbe) Probe(joinResult *hashjoinWorkerResult, sqlKiller *sqlkiller.SQLKiller) (ok bool, _ *hashjoinWorkerResult) {
if joinResult.chk.IsFull() {
return true, joinResult
}
joinedChk, remainCap, err := s.prepareForProbe(joinResult.chk)
if err != nil {
joinResult.err = err
return false, joinResult
}
hasOtherCondition := s.ctx.hasOtherCondition()
if s.isLeftSideBuild {
if hasOtherCondition {
err = s.probeForLeftSideBuildHasOtherCondition(joinedChk, sqlKiller)
} else {
err = s.probeForLeftSideBuildNoOtherCondition(sqlKiller)
}
} else {
if hasOtherCondition {
err = s.probeForRightSideBuildHasOtherCondition(joinResult.chk, joinedChk, remainCap, sqlKiller)
} else {
err = s.probeForRightSideBuildNoOtherCondition(joinResult.chk, remainCap, sqlKiller)
}
}
if err != nil {
joinResult.err = err
return false, joinResult
}
return true, joinResult
}
func (s *semiJoinProbe) setIsMatchedRows() {
for i, res := range s.selected {
if !res {
continue
}
s.isMatchedRows[s.rowIndexInfos[i].probeRowIndex] = true
}
}
func (s *semiJoinProbe) probeForLeftSideBuildHasOtherCondition(joinedChk *chunk.Chunk, sqlKiller *sqlkiller.SQLKiller) (err error) {
err = s.concatenateProbeAndBuildRows(joinedChk, sqlKiller, false)
if err != nil {
return err
}
meta := s.ctx.hashTableMeta
if joinedChk.NumRows() > 0 {
s.selected, err = expression.VectorizedFilter(s.ctx.SessCtx.GetExprCtx().GetEvalCtx(), s.ctx.SessCtx.GetSessionVars().EnableVectorizedExpression, s.ctx.OtherCondition, chunk.NewIterator4Chunk(joinedChk), s.selected)
if err != nil {
return err
}
for index, result := range s.selected {
if result {
meta.setUsedFlag(*(*unsafe.Pointer)(unsafe.Pointer(&s.rowIndexInfos[index].buildRowStart)))
}
}
}
if s.unFinishedProbeRowIdxQueue.IsEmpty() {
// To avoid `Previous chunk is not probed yet` error
s.currentProbeRow = s.chunkRows
}
return
}
func (s *semiJoinProbe) probeForLeftSideBuildNoOtherCondition(sqlKiller *sqlkiller.SQLKiller) (err error) {
meta := s.ctx.hashTableMeta
tagHelper := s.ctx.hashTableContext.tagHelper
loopCnt := 0
for s.currentProbeRow < s.chunkRows {
if s.matchedRowsHeaders[s.currentProbeRow] != 0 {
candidateRow := tagHelper.toUnsafePointer(s.matchedRowsHeaders[s.currentProbeRow])
if !meta.isCurrentRowUsedWithAtomic(candidateRow) {
if isKeyMatched(meta.keyMode, s.serializedKeys[s.currentProbeRow], candidateRow, meta) {
meta.setUsedFlag(candidateRow)
} else {
s.probeCollision++
}
}
s.matchedRowsHeaders[s.currentProbeRow] = getNextRowAddress(candidateRow, tagHelper, s.matchedRowsHashValue[s.currentProbeRow])
} else {
s.currentProbeRow++
}
loopCnt++
if loopCnt%2000 == 0 {
err = checkSQLKiller(sqlKiller, "killedDuringProbe")
if err != nil {
return err
}
}
}
err = checkSQLKiller(sqlKiller, "killedDuringProbe")
if err != nil {
return err
}
return
}
func (s *semiJoinProbe) produceResult(joinedChk *chunk.Chunk, sqlKiller *sqlkiller.SQLKiller) (err error) {
err = s.concatenateProbeAndBuildRows(joinedChk, sqlKiller, true)
if err != nil {
return err
}
if joinedChk.NumRows() > 0 {
s.selected = s.selected[:0]
s.selected, err = expression.VectorizedFilter(s.ctx.SessCtx.GetExprCtx().GetEvalCtx(), s.ctx.SessCtx.GetSessionVars().EnableVectorizedExpression, s.ctx.OtherCondition, chunk.NewIterator4Chunk(joinedChk), s.selected)
if err != nil {
return err
}
s.setIsMatchedRows()
}
return nil
}
func (s *semiJoinProbe) probeForRightSideBuildHasOtherCondition(chk, joinedChk *chunk.Chunk, remainCap int, sqlKiller *sqlkiller.SQLKiller) (err error) {
if !s.unFinishedProbeRowIdxQueue.IsEmpty() {
err = s.produceResult(joinedChk, sqlKiller)
if err != nil {
return err
}
s.currentProbeRow = 0
}
if s.unFinishedProbeRowIdxQueue.IsEmpty() {
s.generateResultChkForRightBuildWithOtherCondition(remainCap, chk, s.isMatchedRows, true)
}
return
}
func (s *semiJoinProbe) probeForRightSideBuildNoOtherCondition(chk *chunk.Chunk, remainCap int, sqlKiller *sqlkiller.SQLKiller) (err error) {
meta := s.ctx.hashTableMeta
tagHelper := s.ctx.hashTableContext.tagHelper
if cap(s.offsets) == 0 {
s.offsets = make([]int, 0, remainCap)
}
s.offsets = s.offsets[:0]
for remainCap > 0 && s.currentProbeRow < s.chunkRows {
if s.matchedRowsHeaders[s.currentProbeRow] != 0 {
candidateRow := tagHelper.toUnsafePointer(s.matchedRowsHeaders[s.currentProbeRow])
if isKeyMatched(meta.keyMode, s.serializedKeys[s.currentProbeRow], candidateRow, meta) {
s.matchedRowsHeaders[s.currentProbeRow] = 0
s.offsets = append(s.offsets, s.usedRows[s.currentProbeRow])
remainCap--
s.currentProbeRow++
} else {
s.probeCollision++
s.matchedRowsHeaders[s.currentProbeRow] = getNextRowAddress(candidateRow, tagHelper, s.matchedRowsHashValue[s.currentProbeRow])
}
} else {
s.currentProbeRow++
}
}
err = checkSQLKiller(sqlKiller, "killedDuringProbe")
if err != nil {
return err
}
s.generateResultChkForRightBuildNoOtherCondition(chk)
return
}
func (s *semiJoinProbe) IsCurrentChunkProbeDone() bool {
if s.ctx.hasOtherCondition() && !s.unFinishedProbeRowIdxQueue.IsEmpty() {
return false
}
return s.baseJoinProbe.IsCurrentChunkProbeDone()
}