236 lines
6.8 KiB
Go
236 lines
6.8 KiB
Go
// Copyright 2024 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package join
|
|
|
|
import (
|
|
"github.com/pingcap/tidb/pkg/expression"
|
|
"github.com/pingcap/tidb/pkg/util/chunk"
|
|
"github.com/pingcap/tidb/pkg/util/sqlkiller"
|
|
)
|
|
|
|
type leftOuterSemiJoinProbe struct {
|
|
baseSemiJoin
|
|
|
|
// isNullRows marks whether the left side row matched result is null
|
|
isNullRows []bool
|
|
// isAnti marks whether the join is anti semi join
|
|
isAnti bool
|
|
}
|
|
|
|
var _ ProbeV2 = &leftOuterSemiJoinProbe{}
|
|
|
|
func newLeftOuterSemiJoinProbe(base baseJoinProbe, isAnti bool) *leftOuterSemiJoinProbe {
|
|
probe := &leftOuterSemiJoinProbe{
|
|
baseSemiJoin: *newBaseSemiJoin(base, false),
|
|
isAnti: isAnti,
|
|
}
|
|
return probe
|
|
}
|
|
|
|
func (j *leftOuterSemiJoinProbe) SetChunkForProbe(chunk *chunk.Chunk) (err error) {
|
|
err = j.baseJoinProbe.SetChunkForProbe(chunk)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
j.resetProbeState()
|
|
return nil
|
|
}
|
|
|
|
func (j *leftOuterSemiJoinProbe) SetRestoredChunkForProbe(chunk *chunk.Chunk) (err error) {
|
|
err = j.baseJoinProbe.SetRestoredChunkForProbe(chunk)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
j.resetProbeState()
|
|
return nil
|
|
}
|
|
|
|
func (j *leftOuterSemiJoinProbe) resetProbeState() {
|
|
j.isNullRows = j.isNullRows[:0]
|
|
for range j.chunkRows {
|
|
j.isNullRows = append(j.isNullRows, false)
|
|
}
|
|
j.baseSemiJoin.resetProbeState()
|
|
}
|
|
|
|
func (*leftOuterSemiJoinProbe) NeedScanRowTable() bool {
|
|
return false
|
|
}
|
|
|
|
func (*leftOuterSemiJoinProbe) IsScanRowTableDone() bool {
|
|
panic("should not reach here")
|
|
}
|
|
|
|
func (*leftOuterSemiJoinProbe) InitForScanRowTable() {
|
|
panic("should not reach here")
|
|
}
|
|
|
|
func (*leftOuterSemiJoinProbe) ScanRowTable(joinResult *hashjoinWorkerResult, _ *sqlkiller.SQLKiller) *hashjoinWorkerResult {
|
|
return joinResult
|
|
}
|
|
|
|
func (j *leftOuterSemiJoinProbe) Probe(joinResult *hashjoinWorkerResult, sqlKiller *sqlkiller.SQLKiller) (ok bool, _ *hashjoinWorkerResult) {
|
|
joinedChk, remainCap, err := j.prepareForProbe(joinResult.chk)
|
|
if err != nil {
|
|
joinResult.err = err
|
|
return false, joinResult
|
|
}
|
|
|
|
if j.ctx.hasOtherCondition() {
|
|
err = j.probeWithOtherCondition(joinResult.chk, joinedChk, remainCap, sqlKiller)
|
|
} else {
|
|
err = j.probeWithoutOtherCondition(joinResult.chk, joinedChk, remainCap, sqlKiller)
|
|
}
|
|
if err != nil {
|
|
joinResult.err = err
|
|
return false, joinResult
|
|
}
|
|
return true, joinResult
|
|
}
|
|
|
|
func (j *leftOuterSemiJoinProbe) probeWithOtherCondition(chk, joinedChk *chunk.Chunk, remainCap int, sqlKiller *sqlkiller.SQLKiller) (err error) {
|
|
if !j.unFinishedProbeRowIdxQueue.IsEmpty() {
|
|
err = j.produceResult(joinedChk, sqlKiller)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
j.currentProbeRow = 0
|
|
}
|
|
|
|
if j.unFinishedProbeRowIdxQueue.IsEmpty() {
|
|
startProbeRow := j.currentProbeRow
|
|
j.currentProbeRow = min(startProbeRow+remainCap, j.chunkRows)
|
|
j.buildResult(chk, startProbeRow)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (j *leftOuterSemiJoinProbe) produceResult(joinedChk *chunk.Chunk, sqlKiller *sqlkiller.SQLKiller) (err error) {
|
|
// The third parameter is always true, as left outer semi join now only support right side as build
|
|
err = j.concatenateProbeAndBuildRows(joinedChk, sqlKiller, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if joinedChk.NumRows() > 0 {
|
|
j.selected, j.isNulls, err = expression.VecEvalBool(j.ctx.SessCtx.GetExprCtx().GetEvalCtx(), j.ctx.SessCtx.GetSessionVars().EnableVectorizedExpression, j.ctx.OtherCondition, joinedChk, j.selected, j.isNulls)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for i := range joinedChk.NumRows() {
|
|
if j.selected[i] {
|
|
j.isMatchedRows[j.rowIndexInfos[i].probeRowIndex] = true
|
|
}
|
|
if j.isNulls[i] {
|
|
j.isNullRows[j.rowIndexInfos[i].probeRowIndex] = true
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (j *leftOuterSemiJoinProbe) probeWithoutOtherCondition(_, joinedChk *chunk.Chunk, remainCap int, sqlKiller *sqlkiller.SQLKiller) (err error) {
|
|
meta := j.ctx.hashTableMeta
|
|
startProbeRow := j.currentProbeRow
|
|
tagHelper := j.ctx.hashTableContext.tagHelper
|
|
|
|
for remainCap > 0 && j.currentProbeRow < j.chunkRows {
|
|
if j.matchedRowsHeaders[j.currentProbeRow] != 0 {
|
|
candidateRow := tagHelper.toUnsafePointer(j.matchedRowsHeaders[j.currentProbeRow])
|
|
if !isKeyMatched(meta.keyMode, j.serializedKeys[j.currentProbeRow], candidateRow, meta) {
|
|
j.probeCollision++
|
|
j.matchedRowsHeaders[j.currentProbeRow] = getNextRowAddress(candidateRow, tagHelper, j.matchedRowsHashValue[j.currentProbeRow])
|
|
continue
|
|
}
|
|
j.isMatchedRows[j.currentProbeRow] = true
|
|
}
|
|
j.matchedRowsHeaders[j.currentProbeRow] = 0
|
|
remainCap--
|
|
j.currentProbeRow++
|
|
}
|
|
|
|
err = checkSQLKiller(sqlKiller, "killedDuringProbe")
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
j.buildResult(joinedChk, startProbeRow)
|
|
return nil
|
|
}
|
|
|
|
func (j *leftOuterSemiJoinProbe) buildResult(chk *chunk.Chunk, startProbeRow int) {
|
|
var selected []bool
|
|
if startProbeRow == 0 && j.currentProbeRow == j.chunkRows && j.currentChunk.Sel() == nil && chk.NumRows() == 0 && len(j.spilledIdx) == 0 {
|
|
// TODO: Can do a shallow copy by directly copying the Column pointers
|
|
for index, colIndex := range j.lUsed {
|
|
j.currentChunk.Column(colIndex).CopyConstruct(chk.Column(index))
|
|
}
|
|
} else {
|
|
selected = make([]bool, j.chunkRows)
|
|
for i := startProbeRow; i < j.currentProbeRow; i++ {
|
|
selected[i] = true
|
|
}
|
|
for _, spilledIdx := range j.spilledIdx {
|
|
selected[spilledIdx] = false // ignore spilled rows
|
|
}
|
|
for index, colIndex := range j.lUsed {
|
|
dstCol := chk.Column(index)
|
|
srcCol := j.currentChunk.Column(colIndex)
|
|
chunk.CopySelectedRowsWithRowIDFunc(dstCol, srcCol, selected, 0, len(selected), func(i int) int {
|
|
return j.usedRows[i]
|
|
})
|
|
}
|
|
}
|
|
|
|
if j.isAnti {
|
|
for i := startProbeRow; i < j.currentProbeRow; i++ {
|
|
if selected != nil && !selected[i] {
|
|
continue
|
|
}
|
|
if j.isMatchedRows[i] {
|
|
chk.AppendInt64(len(j.lUsed), 0)
|
|
} else if j.isNullRows[i] {
|
|
chk.AppendNull(len(j.lUsed))
|
|
} else {
|
|
chk.AppendInt64(len(j.lUsed), 1)
|
|
}
|
|
}
|
|
} else {
|
|
for i := startProbeRow; i < j.currentProbeRow; i++ {
|
|
if selected != nil && !selected[i] {
|
|
continue
|
|
}
|
|
if j.isMatchedRows[i] {
|
|
chk.AppendInt64(len(j.lUsed), 1)
|
|
} else if j.isNullRows[i] {
|
|
chk.AppendNull(len(j.lUsed))
|
|
} else {
|
|
chk.AppendInt64(len(j.lUsed), 0)
|
|
}
|
|
}
|
|
}
|
|
|
|
chk.SetNumVirtualRows(chk.NumRows())
|
|
}
|
|
|
|
func (j *leftOuterSemiJoinProbe) IsCurrentChunkProbeDone() bool {
|
|
if j.ctx.hasOtherCondition() && !j.unFinishedProbeRowIdxQueue.IsEmpty() {
|
|
return false
|
|
}
|
|
return j.baseJoinProbe.IsCurrentChunkProbeDone()
|
|
}
|