tidb/pkg/executor/join/hash_table_v1.go
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package join
import (
"hash"
"hash/fnv"
"sync/atomic"
"time"
"unsafe"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/bitmap"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/disk"
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/pingcap/tidb/pkg/util/memory"
)
// HashContext keeps the hash context needed by a table in hash join.
type HashContext struct {
// AllTypes has a one-to-one correspondence with KeyColIdx
AllTypes []*types.FieldType
KeyColIdx []int
NaKeyColIdx []int
Buf []byte
HashVals []hash.Hash64
HasNull []bool
naHasNull []bool
naColNullBitMap []*bitmap.ConcurrentBitmap
}
// InitHash initializes the HashContext for the given number of rows.
func (hc *HashContext) InitHash(rows int) {
if hc.Buf == nil {
hc.Buf = make([]byte, 1)
}
if len(hc.HashVals) < rows {
hc.HasNull = make([]bool, rows)
hc.HashVals = make([]hash.Hash64, rows)
for i := range rows {
hc.HashVals[i] = fnv.New64()
}
} else {
for i := range rows {
hc.HasNull[i] = false
hc.HashVals[i].Reset()
}
}
if len(hc.NaKeyColIdx) > 0 {
// isNAAJ
if len(hc.naColNullBitMap) < rows {
hc.naHasNull = make([]bool, rows)
hc.naColNullBitMap = make([]*bitmap.ConcurrentBitmap, rows)
for i := range rows {
hc.naColNullBitMap[i] = bitmap.NewConcurrentBitmap(len(hc.NaKeyColIdx))
}
} else {
for i := range rows {
hc.naHasNull[i] = false
hc.naColNullBitMap[i].Reset(len(hc.NaKeyColIdx))
}
}
}
}
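// exampleInitHashReuse is an illustrative sketch (the function name and the literal key
// bytes are made up for illustration): InitHash sizes HashVals/HasNull to the incoming
// row count and resets them on reuse, after which each row's hasher can be fed the join
// key bytes (normally written by codec.HashChunkSelected) and produce a 64-bit hash key.
// nolint: unused
func exampleInitHashReuse() uint64 {
	hc := &HashContext{}
	hc.InitHash(4) // allocates 4 per-row fnv hashers
	_, _ = hc.HashVals[0].Write([]byte("k1")) // the build/probe code writes the encoded key bytes here
	key := hc.HashVals[0].Sum64()
	hc.InitHash(2) // a later, smaller batch reuses and resets the existing buffers
	return key
}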
type hashNANullBucket struct {
entries []*naEntry
}
// hashRowContainer handles the rows and the hash map of a table.
// NOTE: a hashRowContainer may be shallow copied by the invoker, so define all the
// member attributes as pointer types to avoid unexpected problems.
type hashRowContainer struct {
sc *stmtctx.StatementContext
hCtx *HashContext
stat *hashStatistic
// hashTable stores the mapping from hash key to RowPtr
hashTable BaseHashTable
// hashNANullBucket stores the rows that have any null value in the NAAJ join key columns.
// After the build process, hashNANullBucket is read-only and shared by multiple probe workers.
hashNANullBucket *hashNANullBucket
rowContainer *chunk.RowContainer
memTracker *memory.Tracker
// chkBuf buffers the data read from disk if rowContainer has spilled.
chkBuf *chunk.Chunk
chkBufSizeForOneProbe int64
}
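// newHashRowContainer creates a hashRowContainer backed by a chunk.RowContainer and a
// concurrent map hash table. When NA join key columns exist (NAAJ), it also prepares the
// NA-NULL bucket, and it attaches the row container's memory tracker to its own tracker.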
func newHashRowContainer(sCtx sessionctx.Context, hCtx *HashContext, allTypes []*types.FieldType) *hashRowContainer {
maxChunkSize := sCtx.GetSessionVars().MaxChunkSize
rc := chunk.NewRowContainer(allTypes, maxChunkSize)
c := &hashRowContainer{
sc: sCtx.GetSessionVars().StmtCtx,
hCtx: hCtx,
stat: new(hashStatistic),
hashTable: NewConcurrentMapHashTable(),
rowContainer: rc,
memTracker: memory.NewTracker(memory.LabelForRowContainer, -1),
}
if isNAAJ := len(hCtx.NaKeyColIdx) > 0; isNAAJ {
c.hashNANullBucket = &hashNANullBucket{}
}
rc.GetMemTracker().AttachTo(c.GetMemTracker())
return c
}
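// ShallowCopy returns a shallow copy for a probe worker: the copy gets its own mutex on
// the row container, while the hash table and the NA-NULL bucket stay shared with the
// original container.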
func (c *hashRowContainer) ShallowCopy() *hashRowContainer {
newHRC := *c
newHRC.rowContainer = c.rowContainer.ShallowCopyWithNewMutex()
// Multiple hashRowContainers reference one single NA-NULL bucket slice; the struct copy
// above already shares it, so the explicit assignment below is unnecessary:
// newHRC.hashNANullBucket = c.hashNANullBucket
return &newHRC
}
// GetMatchedRows gets the matched rows for probeRow. It can be called
// from multiple goroutines, but each goroutine should keep its own
// hash context and buffer.
func (c *hashRowContainer) GetMatchedRows(probeKey uint64, probeRow chunk.Row, hCtx *HashContext, matched []chunk.Row) ([]chunk.Row, error) {
matchedRows, _, err := c.GetMatchedRowsAndPtrs(probeKey, probeRow, hCtx, matched, nil, false)
return matchedRows, err
}
// GetOneMatchedRow gets one matched row for probeRow.
func (c *hashRowContainer) GetOneMatchedRow(probeKey uint64, probeRow chunk.Row, hCtx *HashContext) (*chunk.Row, error) {
var err error
innerEntry := c.hashTable.Get(probeKey)
if innerEntry == nil {
return nil, err
}
var matchedRow chunk.Row
if c.chkBuf != nil {
c.chkBuf.Reset()
}
capacity := 0
for i := 0; innerEntry != nil; i, innerEntry = i+1, innerEntry.Next {
ptr := innerEntry.Ptr
matchedRow, c.chkBuf, err = c.rowContainer.GetRowAndAppendToChunkIfInDisk(ptr, c.chkBuf)
if err != nil {
return nil, err
}
var ok bool
ok, err = c.matchJoinKey(matchedRow, probeRow, hCtx)
if err != nil {
return nil, err
}
if ok {
return &matchedRow, nil
}
atomic.AddInt64(&c.stat.probeCollision, 1)
if i == 0 {
capacity = max(c.chkBuf.Capacity(), 128)
} else if (i+1)%capacity == 0 {
c.chkBuf.Reset()
}
}
return nil, err
}
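// GetAllMatchedRows returns all build rows stored in the hash table as candidates for an
// NAAJ probe row whose join key contains null. When there is more than one NA join key
// column, the non-null probe key columns are used to filter the candidates first.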
func (c *hashRowContainer) GetAllMatchedRows(probeHCtx *HashContext, probeSideRow chunk.Row,
probeKeyNullBits *bitmap.ConcurrentBitmap, matched []chunk.Row, needCheckBuildColPos, needCheckProbeColPos []int, needCheckBuildTypes, needCheckProbeTypes []*types.FieldType) ([]chunk.Row, error) {
// For an NAAJ probe row that contains null, we should match it against all build rows.
var (
ok bool
err error
innerPtrs []chunk.RowPtr
)
c.hashTable.Iter(
func(_ uint64, e *entry) {
entryAddr := e
for entryAddr != nil {
innerPtrs = append(innerPtrs, entryAddr.Ptr)
entryAddr = entryAddr.Next
}
})
matched = matched[:0]
if len(innerPtrs) == 0 {
return matched, nil
}
// All build rows in the buckets come from the hash table, so their null bitmaps are nil (they
// contain no null); we only need the probe null bits to filter valid rows.
if probeKeyNullBits != nil && len(probeHCtx.NaKeyColIdx) > 1 {
// If len(probeHCtx.NaKeyColIdx) == 1, the NA-Join probe key is simply (null), which fetches all
// buckets, and there is nothing more to do.
// Otherwise, e.g. for (null, 1, 2), we should use the not-null probe bits to filter rows: only
// fetch rows like (?, 1, 2), i.e. rows whose second and third join key columns are exactly 1 and 2.
needCheckProbeColPos = needCheckProbeColPos[:0]
needCheckBuildColPos = needCheckBuildColPos[:0]
needCheckBuildTypes = needCheckBuildTypes[:0]
needCheckProbeTypes = needCheckProbeTypes[:0]
keyColLen := len(c.hCtx.NaKeyColIdx)
for i := range keyColLen {
// Since all rows here come from the hash table (not null), the build-side null bits check is skipped.
if probeKeyNullBits.UnsafeIsSet(i) {
continue
}
needCheckBuildColPos = append(needCheckBuildColPos, c.hCtx.NaKeyColIdx[i])
needCheckBuildTypes = append(needCheckBuildTypes, c.hCtx.AllTypes[i])
needCheckProbeColPos = append(needCheckProbeColPos, probeHCtx.NaKeyColIdx[i])
needCheckProbeTypes = append(needCheckProbeTypes, probeHCtx.AllTypes[i])
}
}
var mayMatchedRow chunk.Row
for _, ptr := range innerPtrs {
mayMatchedRow, c.chkBuf, err = c.rowContainer.GetRowAndAppendToChunkIfInDisk(ptr, c.chkBuf)
if err != nil {
return nil, err
}
if probeKeyNullBits != nil && len(probeHCtx.NaKeyColIdx) > 1 {
// Check the values of the join key columns at the collected positions.
ok, err = codec.EqualChunkRow(c.sc.TypeCtx(), mayMatchedRow, needCheckBuildTypes, needCheckBuildColPos, probeSideRow, needCheckProbeTypes, needCheckProbeColPos)
if err != nil {
return nil, err
}
if !ok {
continue
}
// Once equal, just append the (maybe) valid build row for the later other-condition check, if any.
}
matched = append(matched, mayMatchedRow)
}
return matched, nil
}
// signalCheckpointForJoinMask indicates after how many probed rows a signal detection is triggered.
const signalCheckpointForJoinMask int = 1<<17 - 1
// rowSize is the size of Row.
const rowSize = int64(unsafe.Sizeof(chunk.Row{}))
// rowPtrSize is the size of RowPtr.
const rowPtrSize = int64(unsafe.Sizeof(chunk.RowPtr{}))
// GetMatchedRowsAndPtrs gets the matched rows and their RowPtrs for probeRow. It can be
// called from multiple goroutines, but each goroutine should keep its own
// hash context and buffer.
func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk.Row, hCtx *HashContext, matched []chunk.Row, matchedPtrs []chunk.RowPtr, needPtr bool) ([]chunk.Row, []chunk.RowPtr, error) {
var err error
entry := c.hashTable.Get(probeKey)
var innerPtrs []chunk.RowPtr
for ; entry != nil; entry = entry.Next {
innerPtrs = append(innerPtrs, entry.Ptr)
}
if len(innerPtrs) == 0 {
return nil, nil, err
}
matched = matched[:0]
var matchedRow chunk.Row
matchedPtrs = matchedPtrs[:0]
// Some variables used for memTracker.
var (
matchedDataSize = int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize
needTrackMemUsage = cap(innerPtrs) > signalCheckpointForJoinMask
lastChunkBufPointer = c.chkBuf
memDelta int64
)
c.memTracker.Consume(-c.chkBufSizeForOneProbe)
defer func() { c.memTracker.Consume(memDelta) }()
if needTrackMemUsage {
c.memTracker.Consume(int64(cap(innerPtrs)) * rowPtrSize)
defer c.memTracker.Consume(-int64(cap(innerPtrs)) * rowPtrSize)
}
c.chkBufSizeForOneProbe = 0
for i, ptr := range innerPtrs {
matchedRow, c.chkBuf, err = c.rowContainer.GetRowAndAppendToChunkIfInDisk(ptr, c.chkBuf)
if err != nil {
return nil, nil, err
}
var ok bool
ok, err = c.matchJoinKey(matchedRow, probeRow, hCtx)
if err != nil {
return nil, nil, err
}
if c.chkBuf != lastChunkBufPointer && lastChunkBufPointer != nil {
lastChunkSize := lastChunkBufPointer.MemoryUsage()
c.chkBufSizeForOneProbe += lastChunkSize
memDelta += lastChunkSize
}
lastChunkBufPointer = c.chkBuf
if needTrackMemUsage && (i&signalCheckpointForJoinMask == signalCheckpointForJoinMask) {
// Trigger Consume for checking the OOM Action signal
memDelta += int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize - matchedDataSize
matchedDataSize = int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize
c.memTracker.Consume(memDelta + 1)
memDelta = 0
}
if !ok {
atomic.AddInt64(&c.stat.probeCollision, 1)
continue
}
matched = append(matched, matchedRow)
if needPtr {
matchedPtrs = append(matchedPtrs, ptr)
}
}
return matched, matchedPtrs, err
}
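// GetNullBucketRows returns the candidate build rows from the NA-NULL bucket for the given
// NAAJ probe row, keeping only the rows whose join key values match the probe row on every
// position that is non-null on both sides.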
func (c *hashRowContainer) GetNullBucketRows(probeHCtx *HashContext, probeSideRow chunk.Row,
probeKeyNullBits *bitmap.ConcurrentBitmap, matched []chunk.Row, needCheckBuildColPos, needCheckProbeColPos []int, needCheckBuildTypes, needCheckProbeTypes []*types.FieldType) ([]chunk.Row, error) {
var (
ok bool
err error
mayMatchedRow chunk.Row
)
matched = matched[:0]
for _, nullEntry := range c.hashNANullBucket.entries {
mayMatchedRow, c.chkBuf, err = c.rowContainer.GetRowAndAppendToChunkIfInDisk(nullEntry.ptr, c.chkBuf)
if err != nil {
return nil, err
}
// Since the null bucket is a unified bucket, there are cases like the following:
// case 1: the left side (probe side) has null.
// For a left-side key <1, null>, we can fetch all bucket rows <1, ?> and filter on 1 at the first join key;
// once a valid right row passes the other conditions, we can just return.
// case 2: the left side (probe side) does not have null.
// For a left-side key <1, 2>, we should fetch <1, null>, <null, 2> and <null, null> from the null bucket,
// because a case like <3, null> obviously does not match the probe key.
needCheckProbeColPos = needCheckProbeColPos[:0]
needCheckBuildColPos = needCheckBuildColPos[:0]
needCheckBuildTypes = needCheckBuildTypes[:0]
needCheckProbeTypes = needCheckProbeTypes[:0]
keyColLen := len(c.hCtx.NaKeyColIdx)
if probeKeyNullBits != nil {
// When probeKeyNullBits is not nil, the probe key has null values, and we need to distinguish
// whether the result is the empty set or not. In other words, we should fetch at least one valid
// row from the null bucket here. For the join key positions where both sides are not null, the
// values must be exactly the same.
//
// Step: compute probeKeyNullBits | buildKeyNullBits; for the positions whose bit is 0, check whether
// both values are the same. We can just use UnsafeIsSet here, because all inserts on the build side
// have finished.
//
// 1 0 1 0 means the left join key : null ? null ?
// 1 0 0 0 means the right join key: null ? ? ?
// ---------------------------------------------
// left | right = 1 0 1 0: do the explicit column value check only for the positions whose bit is 0
// (meaning no null on either side).
for i := range keyColLen {
if probeKeyNullBits.UnsafeIsSet(i) || nullEntry.nullBitMap.UnsafeIsSet(i) {
continue
}
needCheckBuildColPos = append(needCheckBuildColPos, c.hCtx.NaKeyColIdx[i])
needCheckBuildTypes = append(needCheckBuildTypes, c.hCtx.AllTypes[i])
needCheckProbeColPos = append(needCheckProbeColPos, probeHCtx.NaKeyColIdx[i])
needCheckProbeTypes = append(needCheckProbeTypes, probeHCtx.AllTypes[i])
}
// Check the values of the join key columns at the collected positions.
ok, err = codec.EqualChunkRow(c.sc.TypeCtx(), mayMatchedRow, needCheckBuildTypes, needCheckBuildColPos, probeSideRow, needCheckProbeTypes, needCheckProbeColPos)
if err != nil {
return nil, err
}
if !ok {
continue
}
} else {
// When probeKeyNullBits is nil, the probe key has no null. But when matching against the null bucket,
// we still need to do the explicit check on the non-null values.
//
// e.g. the probe key is <1, 2> and we only get <2, null> in the null bucket: even if we take the null
// as a wildcard symbol, the first value of this tuple is obviously not a match, so we need to filter it here.
for i := range keyColLen {
if nullEntry.nullBitMap.UnsafeIsSet(i) {
continue
}
needCheckBuildColPos = append(needCheckBuildColPos, c.hCtx.NaKeyColIdx[i])
needCheckBuildTypes = append(needCheckBuildTypes, c.hCtx.AllTypes[i])
needCheckProbeColPos = append(needCheckProbeColPos, probeHCtx.NaKeyColIdx[i])
needCheckProbeTypes = append(needCheckProbeTypes, probeHCtx.AllTypes[i])
}
// Check the values of the join key columns at the collected positions.
ok, err = codec.EqualChunkRow(c.sc.TypeCtx(), mayMatchedRow, needCheckBuildTypes, needCheckBuildColPos, probeSideRow, needCheckProbeTypes, needCheckProbeColPos)
if err != nil {
return nil, err
}
if !ok {
continue
}
}
// Once equal, just append the (maybe) valid build row for the later other-condition check, if any.
matched = append(matched, mayMatchedRow)
}
return matched, err
}
// matchJoinKey checks if join keys of buildRow and probeRow are logically equal.
func (c *hashRowContainer) matchJoinKey(buildRow, probeRow chunk.Row, probeHCtx *HashContext) (ok bool, err error) {
if len(c.hCtx.NaKeyColIdx) > 0 {
return codec.EqualChunkRow(c.sc.TypeCtx(),
buildRow, c.hCtx.AllTypes, c.hCtx.NaKeyColIdx,
probeRow, probeHCtx.AllTypes, probeHCtx.NaKeyColIdx)
}
return codec.EqualChunkRow(c.sc.TypeCtx(),
buildRow, c.hCtx.AllTypes, c.hCtx.KeyColIdx,
probeRow, probeHCtx.AllTypes, probeHCtx.KeyColIdx)
}
// AlreadySpilledSafeForTest indicates whether records have spilled to disk. It's thread-safe.
// nolint: unused
func (c *hashRowContainer) AlreadySpilledSafeForTest() bool {
return c.rowContainer.AlreadySpilledSafeForTest()
}
// PutChunk puts a chunk into the hashRowContainer and builds the hash map. It's not thread-safe.
// key of hash table: hash value of the key columns
// value of hash table: RowPtr of the corresponding row
func (c *hashRowContainer) PutChunk(chk *chunk.Chunk, ignoreNulls []bool) error {
return c.PutChunkSelected(chk, nil, ignoreNulls)
}
// PutChunkSelected selectively puts a chunk into the hashRowContainer and builds the hash map. It's not thread-safe.
// key of hash table: hash value of the key columns
// value of hash table: RowPtr of the corresponding row
func (c *hashRowContainer) PutChunkSelected(chk *chunk.Chunk, selected, ignoreNulls []bool) error {
start := time.Now()
defer func() { c.stat.buildTableElapse += time.Since(start) }()
chkIdx := uint32(c.rowContainer.NumChunks())
err := c.rowContainer.Add(chk)
if err != nil {
return err
}
numRows := chk.NumRows()
c.hCtx.InitHash(numRows)
hCtx := c.hCtx
// For now, step 1 and step 2 cannot both apply at the same time.
// 1: write the row data of the join keys to hashVals. (A normal EQ key should ignore the null values;
// null-EQ for the EXCEPT statement is an exception.)
for keyIdx, colIdx := range c.hCtx.KeyColIdx {
ignoreNull := len(ignoreNulls) > keyIdx && ignoreNulls[keyIdx]
err := codec.HashChunkSelected(c.sc.TypeCtx(), hCtx.HashVals, chk, hCtx.AllTypes[keyIdx], colIdx, hCtx.Buf, hCtx.HasNull, selected, ignoreNull)
if err != nil {
return errors.Trace(err)
}
}
// 2: write the row data of the NA join keys to hashVals. (An NA-EQ key should collect all rows, including those with null values, as one bucket.)
isNAAJ := len(c.hCtx.NaKeyColIdx) > 0
hasNullMark := make([]bool, len(hCtx.HasNull))
for keyIdx, colIdx := range c.hCtx.NaKeyColIdx {
// NAAJ doesn't ignore any null values; it collects them as one hash bucket.
err := codec.HashChunkSelected(c.sc.TypeCtx(), hCtx.HashVals, chk, hCtx.AllTypes[keyIdx], colIdx, hCtx.Buf, hCtx.HasNull, selected, false)
if err != nil {
return errors.Trace(err)
}
// TODO: we could collect the bitmap in codec.HashChunkSelected to avoid the loop here, but the parameter change is quite big.
// After fetching one NA column, collect its null values into the null bitmap for every row (use the hasNull flag to accelerate).
// e.g. if the NA join columns are (a, b, c), we maintain a 3-bit map for every build row to mark which columns are null.
for rowIdx := range numRows {
if hCtx.HasNull[rowIdx] {
hCtx.naColNullBitMap[rowIdx].UnsafeSet(keyIdx)
// Clear the flag and move on to the next NA join column.
hCtx.HasNull[rowIdx] = false
// hasNullMark marks whether at least one NA join column of this row is null.
hasNullMark[rowIdx] = true
}
}
}
for i := range numRows {
if isNAAJ {
if selected != nil && !selected[i] {
continue
}
if hasNullMark[i] {
// Collect the rows with nulls into the NA-NULL bucket.
rowPtr := chunk.RowPtr{ChkIdx: chkIdx, RowIdx: uint32(i)}
// Do not directly reference the null bitmap here, because it will be reset and reused for the next batch of chunk data.
c.hashNANullBucket.entries = append(c.hashNANullBucket.entries, &naEntry{rowPtr, c.hCtx.naColNullBitMap[i].Clone()})
} else {
// Insert the rows without nulls into the hash table.
key := c.hCtx.HashVals[i].Sum64()
rowPtr := chunk.RowPtr{ChkIdx: chkIdx, RowIdx: uint32(i)}
c.hashTable.Put(key, rowPtr)
}
} else {
if (selected != nil && !selected[i]) || c.hCtx.HasNull[i] {
continue
}
key := c.hCtx.HashVals[i].Sum64()
rowPtr := chunk.RowPtr{ChkIdx: chkIdx, RowIdx: uint32(i)}
c.hashTable.Put(key, rowPtr)
}
}
c.GetMemTracker().Consume(c.hashTable.GetAndCleanMemoryDelta())
return nil
}
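// examplePutRow is an illustrative sketch (the function and parameter names are made up)
// of the per-row insert performed above for a selected, non-null row: take the accumulated
// 64-bit hash of the join key and store the row's position in the hash table.
// nolint: unused
func examplePutRow(ht BaseHashTable, hc *HashContext, chkIdx, rowIdx int) {
	key := hc.HashVals[rowIdx].Sum64()
	ht.Put(key, chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
}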
// NumChunks returns the number of chunks in the RowContainer
func (c *hashRowContainer) NumChunks() int {
return c.rowContainer.NumChunks()
}
// NumRowsOfChunk returns the number of rows of a chunk
func (c *hashRowContainer) NumRowsOfChunk(chkID int) int {
return c.rowContainer.NumRowsOfChunk(chkID)
}
// GetChunk returns the chkIdx-th chunk of the in-memory records; it only works if the RowContainer has not spilled.
func (c *hashRowContainer) GetChunk(chkIdx int) (*chunk.Chunk, error) {
return c.rowContainer.GetChunk(chkIdx)
}
// GetRow returns the row that ptr points to in the RowContainer.
func (c *hashRowContainer) GetRow(ptr chunk.RowPtr) (chunk.Row, error) {
return c.rowContainer.GetRow(ptr)
}
// Len returns number of records in the hash table.
func (c *hashRowContainer) Len() uint64 {
return c.hashTable.Len()
}
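// Close releases the hashRowContainer: it drops the chunk buffer, closes the underlying
// row container, and detaches the memory tracker.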
func (c *hashRowContainer) Close() error {
failpoint.Inject("issue60926", nil)
defer c.memTracker.Detach()
c.chkBuf = nil
return c.rowContainer.Close()
}
// GetMemTracker returns the underlying memory usage tracker in hashRowContainer.
func (c *hashRowContainer) GetMemTracker() *memory.Tracker { return c.memTracker }
// GetDiskTracker returns the underlying disk usage tracker in hashRowContainer.
func (c *hashRowContainer) GetDiskTracker() *disk.Tracker { return c.rowContainer.GetDiskTracker() }
// ActionSpill returns a memory.ActionOnExceed for spilling over to disk.
func (c *hashRowContainer) ActionSpill() memory.ActionOnExceed {
return c.rowContainer.ActionSpill()
}
const (
initialEntrySliceLen = 64
maxEntrySliceLen = 8192
)
type entry struct {
Ptr chunk.RowPtr
Next *entry
}
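// naEntry records a build row that has at least one null NA join key column: the row's
// position and a bitmap marking which key columns are null.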
type naEntry struct {
ptr chunk.RowPtr
nullBitMap *bitmap.ConcurrentBitmap
}
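// entryStore allocates entries in slabs to reduce GC overhead: each new slab doubles the
// previous one, capped at maxEntrySliceLen.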
type entryStore struct {
slices [][]entry
cursor int
}
func newEntryStore() *entryStore {
es := new(entryStore)
es.slices = [][]entry{make([]entry, initialEntrySliceLen)}
es.cursor = 0
return es
}
func (es *entryStore) GetStore() (e *entry, memDelta int64) {
sliceIdx := uint32(len(es.slices) - 1)
slice := es.slices[sliceIdx]
if es.cursor >= cap(slice) {
size := min(cap(slice)*2, maxEntrySliceLen)
slice = make([]entry, size)
es.slices = append(es.slices, slice)
sliceIdx++
es.cursor = 0
memDelta = int64(unsafe.Sizeof(entry{})) * int64(size)
}
e = &es.slices[sliceIdx][es.cursor]
es.cursor++
return
}
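// exampleEntryStoreGrowth is an illustrative sketch (the function name and loop bound are
// made up): entries are handed out from the current slab, and memDelta is non-zero only
// when GetStore has to allocate a new, doubled slab.
// nolint: unused
func exampleEntryStoreGrowth() int64 {
	es := newEntryStore()
	var total int64
	for i := 0; i < initialEntrySliceLen+1; i++ { // one more than the first slab forces a growth
		e, memDelta := es.GetStore()
		e.Ptr = chunk.RowPtr{ChkIdx: 0, RowIdx: uint32(i)}
		total += memDelta
	}
	return total // the size of the second (doubled) slab
}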
// BaseHashTable is the interface of the hash table used in hash join
type BaseHashTable interface {
Put(hashKey uint64, rowPtr chunk.RowPtr)
// e := Get(hashKey)
// for ; e != nil; e = e.Next {
// rowPtr := e.Ptr
// ...
// }
Get(hashKey uint64) *entry
Len() uint64
// GetAndCleanMemoryDelta gets and cleans the memDelta of the BaseHashTable. The memory delta is cleared after each fetch.
// It indicates the memory delta of the BaseHashTable since the last call to GetAndCleanMemoryDelta().
GetAndCleanMemoryDelta() int64
Iter(func(uint64, *entry))
}
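// collectRowPtrsSketch is an illustrative sketch (the function name is made up) of the
// Get loop described above: walk the entry list for a hash key and gather every RowPtr
// stored under it. The probe methods of hashRowContainer do this inline.
// nolint: unused
func collectRowPtrsSketch(ht BaseHashTable, hashKey uint64) []chunk.RowPtr {
	var ptrs []chunk.RowPtr
	for e := ht.Get(hashKey); e != nil; e = e.Next {
		ptrs = append(ptrs, e.Ptr)
	}
	return ptrs
}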
// TODO (fangzhuhe) remove unsafeHashTable later if it is not used anymore
// unsafeHashTable stores multiple rowPtr of rows for a given key with minimum GC overhead.
// A given key can store multiple values.
// It is not thread-safe, should only be used in one goroutine.
type unsafeHashTable struct {
entryStore *entryStore
length uint64
hashMap hack.MemAwareMap[uint64, *entry]
memDelta int64 // the memory delta of the unsafeHashTable since the last calling GetAndCleanMemoryDelta()
}
// newUnsafeHashTable creates a new unsafeHashTable. estCount means the estimated size of the hashMap.
// If unknown, set it to 0.
func newUnsafeHashTable(estCount int) *unsafeHashTable {
ht := &unsafeHashTable{}
ht.hashMap.Init(make(map[uint64]*entry, estCount))
ht.entryStore = newEntryStore()
return ht
}
// Put puts the key/rowPtr pairs to the unsafeHashTable, multiple rowPtrs are stored in a list.
func (ht *unsafeHashTable) Put(hashKey uint64, rowPtr chunk.RowPtr) {
oldEntry := ht.hashMap.M[hashKey]
newEntry, memDelta := ht.entryStore.GetStore()
newEntry.Ptr = rowPtr
newEntry.Next = oldEntry
memDelta += ht.hashMap.Set(hashKey, newEntry)
ht.length++
ht.memDelta += memDelta
}
// Get returns the head of the entry list stored for hashKey.
func (ht *unsafeHashTable) Get(hashKey uint64) *entry {
entryAddr := ht.hashMap.M[hashKey]
return entryAddr
}
// Len returns the number of rowPtrs in the unsafeHashTable; the number of keys may be less than Len
// if the same key is put more than once.
func (ht *unsafeHashTable) Len() uint64 { return ht.length }
// GetAndCleanMemoryDelta gets and cleans the memDelta of the unsafeHashTable.
func (ht *unsafeHashTable) GetAndCleanMemoryDelta() int64 {
memDelta := ht.memDelta
ht.memDelta = 0
return memDelta
}
func (ht *unsafeHashTable) Iter(traverse func(key uint64, e *entry)) {
for k, entryAddr := range ht.hashMap.M {
traverse(k, entryAddr)
}
}
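// exampleUnsafeHashTable is an illustrative sketch (the function name and literal values
// are made up): Put chains multiple RowPtrs under the same hash key, Get returns the head
// of that chain, and Len counts rows rather than distinct keys. It must only be used from
// a single goroutine.
// nolint: unused
func exampleUnsafeHashTable() uint64 {
	ht := newUnsafeHashTable(0)
	ht.Put(42, chunk.RowPtr{ChkIdx: 0, RowIdx: 1})
	ht.Put(42, chunk.RowPtr{ChkIdx: 0, RowIdx: 7}) // same key, chained via entry.Next
	for e := ht.Get(42); e != nil; e = e.Next {
		_ = e.Ptr
	}
	_ = ht.GetAndCleanMemoryDelta() // the growth since the last call, to feed a memory tracker
	return ht.Len()                 // 2, because two rowPtrs were put
}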
// concurrentMapHashTable is a concurrent hash table built on concurrentMap
type concurrentMapHashTable struct {
hashMap concurrentMap
entryStore *entryStore
length uint64
memDelta int64 // the memory delta of the concurrentMapHashTable since the last calling GetAndCleanMemoryDelta()
}
// NewConcurrentMapHashTable creates a concurrentMapHashTable
func NewConcurrentMapHashTable() *concurrentMapHashTable {
ht := &concurrentMapHashTable{}
ht.hashMap = newConcurrentMap()
ht.entryStore = newEntryStore()
ht.length = 0
ht.memDelta = int64(unsafe.Sizeof(concurrentMapHashTable{})) + int64(len(ht.hashMap))*int64((unsafe.Sizeof(concurrentMapShared{})))
for _, m := range ht.hashMap {
ht.memDelta += int64(m.items.Bytes)
}
ht.memDelta += int64(unsafe.Sizeof(entryStore{})) + int64(unsafe.Sizeof(entry{}))*initialEntrySliceLen
return ht
}
// Len returns the number of rowPtrs in the concurrentMapHashTable.
func (ht *concurrentMapHashTable) Len() uint64 {
return ht.length
}
// Put puts the key/rowPtr pairs to the concurrentMapHashTable, multiple rowPtrs are stored in a list.
func (ht *concurrentMapHashTable) Put(hashKey uint64, rowPtr chunk.RowPtr) {
newEntry, memDelta := ht.entryStore.GetStore()
newEntry.Ptr = rowPtr
newEntry.Next = nil
memDelta += ht.hashMap.Insert(hashKey, newEntry)
if memDelta != 0 {
atomic.AddInt64(&ht.memDelta, memDelta)
}
atomic.AddUint64(&ht.length, 1)
}
// Get returns the head of the entry list stored for hashKey.
func (ht *concurrentMapHashTable) Get(hashKey uint64) *entry {
entryAddr, _ := ht.hashMap.Get(hashKey)
return entryAddr
}
// Iter iterates over every entry of the hash table.
func (ht *concurrentMapHashTable) Iter(traverse func(key uint64, e *entry)) {
ht.hashMap.IterCb(traverse)
}
// GetAndCleanMemoryDelta gets and cleans the memDelta of the concurrentMapHashTable. Memory delta will be cleared after each fetch.
func (ht *concurrentMapHashTable) GetAndCleanMemoryDelta() int64 {
return atomic.SwapInt64(&ht.memDelta, 0)
}
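// exampleConcurrentMapHashTableMem is an illustrative sketch (the function name and literal
// values are made up) of the memory accounting contract: the constructor pre-charges the
// shard and entry-slab overhead, and GetAndCleanMemoryDelta atomically returns the growth
// since the last call exactly once (PutChunkSelected feeds it into the container's tracker).
// nolint: unused
func exampleConcurrentMapHashTableMem() int64 {
	ht := NewConcurrentMapHashTable()
	ht.Put(42, chunk.RowPtr{ChkIdx: 0, RowIdx: 0})
	delta := ht.GetAndCleanMemoryDelta() // non-zero: includes the initial table overhead
	return delta + ht.GetAndCleanMemoryDelta() // the second call returns 0 until more memory grows
}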