296 lines
8.0 KiB
Go
296 lines
8.0 KiB
Go
// Copyright 2024 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package join
|
|
|
|
import (
|
|
"unsafe"
|
|
|
|
"github.com/pingcap/tidb/pkg/expression"
|
|
"github.com/pingcap/tidb/pkg/util/chunk"
|
|
"github.com/pingcap/tidb/pkg/util/sqlkiller"
|
|
)
|
|
|
|
type semiJoinProbe struct {
|
|
baseSemiJoin
|
|
}
|
|
|
|
func newSemiJoinProbe(base baseJoinProbe, isLeftSideBuild bool) *semiJoinProbe {
|
|
ret := &semiJoinProbe{
|
|
baseSemiJoin: *newBaseSemiJoin(base, isLeftSideBuild),
|
|
}
|
|
return ret
|
|
}
|
|
|
|
func (s *semiJoinProbe) InitForScanRowTable() {
|
|
if !s.isLeftSideBuild {
|
|
panic("should not reach here")
|
|
}
|
|
s.rowIter = commonInitForScanRowTable(&s.baseJoinProbe)
|
|
}
|
|
|
|
func (s *semiJoinProbe) SetChunkForProbe(chk *chunk.Chunk) (err error) {
|
|
err = s.baseJoinProbe.SetChunkForProbe(chk)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.resetProbeState()
|
|
return nil
|
|
}
|
|
|
|
func (s *semiJoinProbe) SetRestoredChunkForProbe(chk *chunk.Chunk) error {
|
|
err := s.baseJoinProbe.SetRestoredChunkForProbe(chk)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.resetProbeState()
|
|
return nil
|
|
}
|
|
|
|
func (s *semiJoinProbe) NeedScanRowTable() bool {
|
|
return s.isLeftSideBuild
|
|
}
|
|
|
|
func (s *semiJoinProbe) IsScanRowTableDone() bool {
|
|
if !s.isLeftSideBuild {
|
|
panic("should not reach here")
|
|
}
|
|
return s.rowIter.isEnd()
|
|
}
|
|
|
|
func (s *semiJoinProbe) ScanRowTable(joinResult *hashjoinWorkerResult, sqlKiller *sqlkiller.SQLKiller) *hashjoinWorkerResult {
|
|
if !s.isLeftSideBuild {
|
|
panic("should not reach here")
|
|
}
|
|
if joinResult.chk.IsFull() {
|
|
return joinResult
|
|
}
|
|
if s.rowIter == nil {
|
|
panic("scanRowTable before init")
|
|
}
|
|
s.nextCachedBuildRowIndex = 0
|
|
meta := s.ctx.hashTableMeta
|
|
insertedRows := 0
|
|
remainCap := joinResult.chk.RequiredRows() - joinResult.chk.NumRows()
|
|
for insertedRows < remainCap && !s.rowIter.isEnd() {
|
|
currentRow := s.rowIter.getValue()
|
|
if meta.isCurrentRowUsed(currentRow) {
|
|
// append build side of this row
|
|
s.appendBuildRowToCachedBuildRowsV1(0, currentRow, joinResult.chk, 0, false)
|
|
insertedRows++
|
|
}
|
|
s.rowIter.next()
|
|
}
|
|
err := checkSQLKiller(sqlKiller, "killedDuringProbe")
|
|
if err != nil {
|
|
joinResult.err = err
|
|
return joinResult
|
|
}
|
|
if s.nextCachedBuildRowIndex > 0 {
|
|
s.batchConstructBuildRows(joinResult.chk, 0, false)
|
|
}
|
|
return joinResult
|
|
}
|
|
|
|
func (s *semiJoinProbe) ResetProbe() {
|
|
s.rowIter = nil
|
|
s.baseJoinProbe.ResetProbe()
|
|
}
|
|
|
|
func (s *semiJoinProbe) Probe(joinResult *hashjoinWorkerResult, sqlKiller *sqlkiller.SQLKiller) (ok bool, _ *hashjoinWorkerResult) {
|
|
if joinResult.chk.IsFull() {
|
|
return true, joinResult
|
|
}
|
|
|
|
joinedChk, remainCap, err := s.prepareForProbe(joinResult.chk)
|
|
if err != nil {
|
|
joinResult.err = err
|
|
return false, joinResult
|
|
}
|
|
|
|
hasOtherCondition := s.ctx.hasOtherCondition()
|
|
if s.isLeftSideBuild {
|
|
if hasOtherCondition {
|
|
err = s.probeForLeftSideBuildHasOtherCondition(joinedChk, sqlKiller)
|
|
} else {
|
|
err = s.probeForLeftSideBuildNoOtherCondition(sqlKiller)
|
|
}
|
|
} else {
|
|
if hasOtherCondition {
|
|
err = s.probeForRightSideBuildHasOtherCondition(joinResult.chk, joinedChk, remainCap, sqlKiller)
|
|
} else {
|
|
err = s.probeForRightSideBuildNoOtherCondition(joinResult.chk, remainCap, sqlKiller)
|
|
}
|
|
}
|
|
if err != nil {
|
|
joinResult.err = err
|
|
return false, joinResult
|
|
}
|
|
return true, joinResult
|
|
}
|
|
|
|
func (s *semiJoinProbe) setIsMatchedRows() {
|
|
for i, res := range s.selected {
|
|
if !res {
|
|
continue
|
|
}
|
|
|
|
s.isMatchedRows[s.rowIndexInfos[i].probeRowIndex] = true
|
|
}
|
|
}
|
|
|
|
func (s *semiJoinProbe) probeForLeftSideBuildHasOtherCondition(joinedChk *chunk.Chunk, sqlKiller *sqlkiller.SQLKiller) (err error) {
|
|
err = s.concatenateProbeAndBuildRows(joinedChk, sqlKiller, false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
meta := s.ctx.hashTableMeta
|
|
if joinedChk.NumRows() > 0 {
|
|
s.selected, err = expression.VectorizedFilter(s.ctx.SessCtx.GetExprCtx().GetEvalCtx(), s.ctx.SessCtx.GetSessionVars().EnableVectorizedExpression, s.ctx.OtherCondition, chunk.NewIterator4Chunk(joinedChk), s.selected)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for index, result := range s.selected {
|
|
if result {
|
|
meta.setUsedFlag(*(*unsafe.Pointer)(unsafe.Pointer(&s.rowIndexInfos[index].buildRowStart)))
|
|
}
|
|
}
|
|
}
|
|
|
|
if s.unFinishedProbeRowIdxQueue.IsEmpty() {
|
|
// To avoid `Previous chunk is not probed yet` error
|
|
s.currentProbeRow = s.chunkRows
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (s *semiJoinProbe) probeForLeftSideBuildNoOtherCondition(sqlKiller *sqlkiller.SQLKiller) (err error) {
|
|
meta := s.ctx.hashTableMeta
|
|
tagHelper := s.ctx.hashTableContext.tagHelper
|
|
|
|
loopCnt := 0
|
|
|
|
for s.currentProbeRow < s.chunkRows {
|
|
if s.matchedRowsHeaders[s.currentProbeRow] != 0 {
|
|
candidateRow := tagHelper.toUnsafePointer(s.matchedRowsHeaders[s.currentProbeRow])
|
|
if !meta.isCurrentRowUsedWithAtomic(candidateRow) {
|
|
if isKeyMatched(meta.keyMode, s.serializedKeys[s.currentProbeRow], candidateRow, meta) {
|
|
meta.setUsedFlag(candidateRow)
|
|
} else {
|
|
s.probeCollision++
|
|
}
|
|
}
|
|
s.matchedRowsHeaders[s.currentProbeRow] = getNextRowAddress(candidateRow, tagHelper, s.matchedRowsHashValue[s.currentProbeRow])
|
|
} else {
|
|
s.currentProbeRow++
|
|
}
|
|
|
|
loopCnt++
|
|
if loopCnt%2000 == 0 {
|
|
err = checkSQLKiller(sqlKiller, "killedDuringProbe")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
err = checkSQLKiller(sqlKiller, "killedDuringProbe")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (s *semiJoinProbe) produceResult(joinedChk *chunk.Chunk, sqlKiller *sqlkiller.SQLKiller) (err error) {
|
|
err = s.concatenateProbeAndBuildRows(joinedChk, sqlKiller, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if joinedChk.NumRows() > 0 {
|
|
s.selected = s.selected[:0]
|
|
s.selected, err = expression.VectorizedFilter(s.ctx.SessCtx.GetExprCtx().GetEvalCtx(), s.ctx.SessCtx.GetSessionVars().EnableVectorizedExpression, s.ctx.OtherCondition, chunk.NewIterator4Chunk(joinedChk), s.selected)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.setIsMatchedRows()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *semiJoinProbe) probeForRightSideBuildHasOtherCondition(chk, joinedChk *chunk.Chunk, remainCap int, sqlKiller *sqlkiller.SQLKiller) (err error) {
|
|
if !s.unFinishedProbeRowIdxQueue.IsEmpty() {
|
|
err = s.produceResult(joinedChk, sqlKiller)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.currentProbeRow = 0
|
|
}
|
|
|
|
if s.unFinishedProbeRowIdxQueue.IsEmpty() {
|
|
s.generateResultChkForRightBuildWithOtherCondition(remainCap, chk, s.isMatchedRows, true)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *semiJoinProbe) probeForRightSideBuildNoOtherCondition(chk *chunk.Chunk, remainCap int, sqlKiller *sqlkiller.SQLKiller) (err error) {
|
|
meta := s.ctx.hashTableMeta
|
|
tagHelper := s.ctx.hashTableContext.tagHelper
|
|
|
|
if cap(s.offsets) == 0 {
|
|
s.offsets = make([]int, 0, remainCap)
|
|
}
|
|
|
|
s.offsets = s.offsets[:0]
|
|
|
|
for remainCap > 0 && s.currentProbeRow < s.chunkRows {
|
|
if s.matchedRowsHeaders[s.currentProbeRow] != 0 {
|
|
candidateRow := tagHelper.toUnsafePointer(s.matchedRowsHeaders[s.currentProbeRow])
|
|
if isKeyMatched(meta.keyMode, s.serializedKeys[s.currentProbeRow], candidateRow, meta) {
|
|
s.matchedRowsHeaders[s.currentProbeRow] = 0
|
|
s.offsets = append(s.offsets, s.usedRows[s.currentProbeRow])
|
|
remainCap--
|
|
s.currentProbeRow++
|
|
} else {
|
|
s.probeCollision++
|
|
s.matchedRowsHeaders[s.currentProbeRow] = getNextRowAddress(candidateRow, tagHelper, s.matchedRowsHashValue[s.currentProbeRow])
|
|
}
|
|
} else {
|
|
s.currentProbeRow++
|
|
}
|
|
}
|
|
|
|
err = checkSQLKiller(sqlKiller, "killedDuringProbe")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.generateResultChkForRightBuildNoOtherCondition(chk)
|
|
return
|
|
}
|
|
|
|
func (s *semiJoinProbe) IsCurrentChunkProbeDone() bool {
|
|
if s.ctx.hasOtherCondition() && !s.unFinishedProbeRowIdxQueue.IsEmpty() {
|
|
return false
|
|
}
|
|
return s.baseJoinProbe.IsCurrentChunkProbeDone()
|
|
}
|