411 lines
15 KiB
Go
411 lines
15 KiB
Go
// Copyright 2018 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package chunk
|
|
|
|
import (
|
|
"io"
|
|
"os"
|
|
"sync/atomic"
|
|
|
|
"github.com/pingcap/errors"
|
|
"github.com/pingcap/tidb/pkg/config"
|
|
"github.com/pingcap/tidb/pkg/util/checksum"
|
|
"github.com/pingcap/tidb/pkg/util/disjointset"
|
|
"github.com/pingcap/tidb/pkg/util/encrypt"
|
|
"github.com/pingcap/tidb/pkg/util/intest"
|
|
)
|
|
|
|
// CopySelectedJoinRowsDirect directly copies the selected joined rows from the source Chunk
|
|
// to the destination Chunk.
|
|
// Return true if at least one joined row was selected.
|
|
func CopySelectedJoinRowsDirect(src *Chunk, selected []bool, dst *Chunk) (bool, error) {
|
|
if src.NumRows() == 0 {
|
|
return false, nil
|
|
}
|
|
if src.sel != nil || dst.sel != nil {
|
|
return false, errors.New(msgErrSelNotNil)
|
|
}
|
|
if len(src.columns) == 0 {
|
|
numSelected := 0
|
|
for _, s := range selected {
|
|
if s {
|
|
numSelected++
|
|
}
|
|
}
|
|
dst.numVirtualRows += numSelected
|
|
return numSelected > 0, nil
|
|
}
|
|
|
|
oldLen := dst.columns[0].length
|
|
for j, srcCol := range src.columns {
|
|
dstCol := dst.columns[j]
|
|
if srcCol.IsFixed() {
|
|
for i := range selected {
|
|
if !selected[i] {
|
|
continue
|
|
}
|
|
dstCol.appendNullBitmap(!srcCol.IsNull(i))
|
|
dstCol.length++
|
|
|
|
elemLen := len(srcCol.elemBuf)
|
|
offset := i * elemLen
|
|
dstCol.data = append(dstCol.data, srcCol.data[offset:offset+elemLen]...)
|
|
}
|
|
} else {
|
|
for i := range selected {
|
|
if !selected[i] {
|
|
continue
|
|
}
|
|
dstCol.appendNullBitmap(!srcCol.IsNull(i))
|
|
dstCol.length++
|
|
|
|
start, end := srcCol.offsets[i], srcCol.offsets[i+1]
|
|
dstCol.data = append(dstCol.data, srcCol.data[start:end]...)
|
|
dstCol.offsets = append(dstCol.offsets, int64(len(dstCol.data)))
|
|
}
|
|
}
|
|
}
|
|
numSelected := dst.columns[0].length - oldLen
|
|
dst.numVirtualRows += numSelected
|
|
return numSelected > 0, nil
|
|
}
|
|
|
|
// CopySelectedJoinRowsWithSameOuterRows copies the selected joined rows from the source Chunk
|
|
// to the destination Chunk.
|
|
// Return true if at least one joined row was selected.
|
|
//
|
|
// NOTE: All the outer rows in the source Chunk should be the same.
|
|
func CopySelectedJoinRowsWithSameOuterRows(src *Chunk, innerColOffset, innerColLen, outerColOffset, outerColLen int, selected []bool, dst *Chunk) (bool, error) {
|
|
if src.NumRows() == 0 {
|
|
return false, nil
|
|
}
|
|
if src.sel != nil || dst.sel != nil {
|
|
return false, errors.New(msgErrSelNotNil)
|
|
}
|
|
|
|
numSelected := copySelectedInnerRows(innerColOffset, innerColLen, src, selected, dst)
|
|
copySameOuterRows(outerColOffset, outerColLen, src, numSelected, dst)
|
|
dst.numVirtualRows += numSelected
|
|
return numSelected > 0, nil
|
|
}
|
|
|
|
// CopySelectedRows copies the selected rows in srcCol to dstCol
|
|
func CopySelectedRows(dstCol *Column, srcCol *Column, selected []bool) {
|
|
CopySelectedRowsWithRowIDFunc(dstCol, srcCol, selected, 0, len(selected), func(i int) int {
|
|
return i
|
|
})
|
|
}
|
|
|
|
// CopySelectedRowsWithRowIDFunc copies the selected rows in srcCol to dstCol
|
|
func CopySelectedRowsWithRowIDFunc(dstCol *Column, srcCol *Column, selected []bool, start int, end int, rowIDFunc func(int) int) {
|
|
CopyExpectedRowsWithRowIDFunc(dstCol, srcCol, selected, true, start, end, rowIDFunc)
|
|
}
|
|
|
|
// CopyExpectedRowsWithRowIDFunc copies the expected rows in srcCol to dstCol
|
|
func CopyExpectedRowsWithRowIDFunc(dstCol *Column, srcCol *Column, selected []bool, expectedResult bool, start int, end int, rowIDFunc func(int) int) {
|
|
if srcCol.IsFixed() {
|
|
for i := start; i < end; i++ {
|
|
if selected[i] != expectedResult {
|
|
continue
|
|
}
|
|
rowID := rowIDFunc(i)
|
|
dstCol.appendNullBitmap(!srcCol.IsNull(rowID))
|
|
dstCol.length++
|
|
|
|
elemLen := len(srcCol.elemBuf)
|
|
offset := rowID * elemLen
|
|
dstCol.data = append(dstCol.data, srcCol.data[offset:offset+elemLen]...)
|
|
}
|
|
} else {
|
|
for i := start; i < end; i++ {
|
|
if selected[i] != expectedResult {
|
|
continue
|
|
}
|
|
rowID := rowIDFunc(i)
|
|
dstCol.appendNullBitmap(!srcCol.IsNull(rowID))
|
|
dstCol.length++
|
|
|
|
start, end := srcCol.offsets[rowID], srcCol.offsets[rowID+1]
|
|
dstCol.data = append(dstCol.data, srcCol.data[start:end]...)
|
|
dstCol.offsets = append(dstCol.offsets, int64(len(dstCol.data)))
|
|
}
|
|
}
|
|
}
|
|
|
|
// CopyRows copies all rows in srcCol to dstCol
|
|
func CopyRows(dstCol *Column, srcCol *Column, selected []int) {
|
|
selectedLen := len(selected)
|
|
|
|
if srcCol.IsFixed() {
|
|
for i := range selectedLen {
|
|
rowID := selected[i]
|
|
dstCol.appendNullBitmap(!srcCol.IsNull(rowID))
|
|
dstCol.length++
|
|
|
|
elemLen := len(srcCol.elemBuf)
|
|
offset := rowID * elemLen
|
|
dstCol.data = append(dstCol.data, srcCol.data[offset:offset+elemLen]...)
|
|
}
|
|
} else {
|
|
for i := range selectedLen {
|
|
rowID := selected[i]
|
|
dstCol.appendNullBitmap(!srcCol.IsNull(rowID))
|
|
dstCol.length++
|
|
|
|
start, end := srcCol.offsets[rowID], srcCol.offsets[rowID+1]
|
|
dstCol.data = append(dstCol.data, srcCol.data[start:end]...)
|
|
dstCol.offsets = append(dstCol.offsets, int64(len(dstCol.data)))
|
|
}
|
|
}
|
|
}
|
|
|
|
// copySelectedInnerRows copies the selected inner rows from the source Chunk
|
|
// to the destination Chunk.
|
|
// return the number of rows which is selected.
|
|
func copySelectedInnerRows(innerColOffset, innerColLen int, src *Chunk, selected []bool, dst *Chunk) int {
|
|
srcCols := src.columns[innerColOffset : innerColOffset+innerColLen]
|
|
if len(srcCols) == 0 {
|
|
numSelected := 0
|
|
for _, s := range selected {
|
|
if s {
|
|
numSelected++
|
|
}
|
|
}
|
|
return numSelected
|
|
}
|
|
oldLen := dst.columns[innerColOffset].length
|
|
for j, srcCol := range srcCols {
|
|
dstCol := dst.columns[innerColOffset+j]
|
|
CopySelectedRows(dstCol, srcCol, selected)
|
|
}
|
|
return dst.columns[innerColOffset].length - oldLen
|
|
}
|
|
|
|
// copySameOuterRows copies the continuous 'numRows' outer rows in the source Chunk
|
|
// to the destination Chunk.
|
|
func copySameOuterRows(outerColOffset, outerColLen int, src *Chunk, numRows int, dst *Chunk) {
|
|
if numRows <= 0 || outerColLen <= 0 {
|
|
return
|
|
}
|
|
row := src.GetRow(0)
|
|
srcCols := src.columns[outerColOffset : outerColOffset+outerColLen]
|
|
for i, srcCol := range srcCols {
|
|
dstCol := dst.columns[outerColOffset+i]
|
|
dstCol.appendMultiSameNullBitmap(!srcCol.IsNull(row.idx), numRows)
|
|
dstCol.length += numRows
|
|
if srcCol.IsFixed() {
|
|
elemLen := len(srcCol.elemBuf)
|
|
start := row.idx * elemLen
|
|
end := start + numRows*elemLen
|
|
dstCol.data = append(dstCol.data, srcCol.data[start:end]...)
|
|
} else {
|
|
start, end := srcCol.offsets[row.idx], srcCol.offsets[row.idx+numRows]
|
|
dstCol.data = append(dstCol.data, srcCol.data[start:end]...)
|
|
offsets := dstCol.offsets
|
|
elemLen := srcCol.offsets[row.idx+1] - srcCol.offsets[row.idx]
|
|
for range numRows {
|
|
offsets = append(offsets, offsets[len(offsets)-1]+elemLen)
|
|
}
|
|
dstCol.offsets = offsets
|
|
}
|
|
}
|
|
}
|
|
|
|
// diskFileReaderWriter represents a Reader and a Writer for the temporary disk file.
// Data written through it passes through a checksum layer and, when spilled-file
// encryption is enabled, an AES-CTR encryption layer before it reaches file.
type diskFileReaderWriter struct {
	// file is the temporary file created by initWithFileName.
	file *os.File
	// writer is the top of the write pipeline (set to checksumWriter by initWithFileName).
	writer io.WriteCloser

	// offWrite is the current offset for writing, i.e. the number of bytes
	// accepted so far by write.
	offWrite int64

	// checksumWriter wraps cipherWriter when encryption is enabled, otherwise file.
	checksumWriter *checksum.Writer
	cipherWriter   *encrypt.Writer // cipherWriter is only set when config SpilledFileEncryptionMethod is not "plaintext" (i.e. "aes128-ctr")

	// ctrCipher stores the key and nonce used by the AES encryption I/O layer;
	// it is nil when encryption is disabled.
	ctrCipher *encrypt.CtrCipher
}
|
|
|
|
func (l *diskFileReaderWriter) initWithFileName(fileName string) (err error) {
|
|
// `os.CreateTemp` will insert random string so that a random file name will be generated.
|
|
l.file, err = os.CreateTemp(config.GetGlobalConfig().TempStoragePath, fileName)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
var underlying io.WriteCloser = l.file
|
|
if config.GetGlobalConfig().Security.SpilledFileEncryptionMethod != config.SpilledFileEncryptionMethodPlaintext {
|
|
// The possible values of SpilledFileEncryptionMethod are "plaintext", "aes128-ctr"
|
|
l.ctrCipher, err = encrypt.NewCtrCipher()
|
|
if err != nil {
|
|
return
|
|
}
|
|
l.cipherWriter = encrypt.NewWriter(l.file, l.ctrCipher)
|
|
underlying = l.cipherWriter
|
|
}
|
|
l.checksumWriter = checksum.NewWriter(underlying)
|
|
l.writer = l.checksumWriter
|
|
l.offWrite = 0
|
|
return
|
|
}
|
|
|
|
func (l *diskFileReaderWriter) getReader() io.ReaderAt {
|
|
var underlying io.ReaderAt = l.file
|
|
if l.ctrCipher != nil {
|
|
underlying = NewReaderWithCache(encrypt.NewReader(l.file, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset())
|
|
}
|
|
if l.checksumWriter != nil {
|
|
underlying = NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset())
|
|
}
|
|
return underlying
|
|
}
|
|
|
|
func (l *diskFileReaderWriter) getSectionReader(off int64) *io.SectionReader {
|
|
checksumReader := l.getReader()
|
|
r := io.NewSectionReader(checksumReader, off, l.offWrite-off)
|
|
return r
|
|
}
|
|
|
|
func (l *diskFileReaderWriter) getWriter() io.Writer {
|
|
return l.writer
|
|
}
|
|
|
|
func (l *diskFileReaderWriter) write(writeData []byte) (n int, err error) {
|
|
writeNum, err := l.writer.Write(writeData)
|
|
l.offWrite += int64(writeNum)
|
|
return writeNum, err
|
|
}
|
|
|
|
// ColumnSwapHelper is used to help swap columns from an input chunk into an
// output chunk (see SwapColumns).
type ColumnSwapHelper struct {
	// InputIdxToOutputIdxes maps an input column index to the output column
	// indexes that are fed by that input column.
	InputIdxToOutputIdxes map[int][]int
	// mergedInputIdxToOutputIdxes is InputIdxToOutputIdxes with input columns
	// that share the same underlying column merged together. It can only be
	// computed at runtime once an input chunk has been seen, and it is
	// published atomically so concurrent callers agree on a single value.
	mergedInputIdxToOutputIdxes atomic.Pointer[map[int][]int]
}
|
|
|
|
// SwapColumns evaluates "Column" expressions.
|
|
// it will change the content of the input Chunk.
|
|
func (helper *ColumnSwapHelper) SwapColumns(input, output *Chunk) error {
|
|
// mergedInputIdxToOutputIdxes only can be determined in runtime when we saw the input chunk structure.
|
|
if helper.mergedInputIdxToOutputIdxes.Load() == nil {
|
|
helper.mergeInputIdxToOutputIdxes(input, helper.InputIdxToOutputIdxes)
|
|
}
|
|
for inputIdx, outputIdxes := range *helper.mergedInputIdxToOutputIdxes.Load() {
|
|
if err := output.swapColumn(outputIdxes[0], input, inputIdx); err != nil {
|
|
return err
|
|
}
|
|
for i, length := 1, len(outputIdxes); i < length; i++ {
|
|
output.MakeRef(outputIdxes[0], outputIdxes[i])
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// mergeInputIdxToOutputIdxes merges separate inputIdxToOutputIdxes entries when column references
|
|
// are detected within the input chunk. This process ensures consistent handling of columns derived
|
|
// from the same original source.
|
|
//
|
|
// Consider the following scenario:
|
|
//
|
|
// Initial scan operation produces a column 'a':
|
|
//
|
|
// scan: a (addr: ???)
|
|
//
|
|
// This column 'a' is used in the first projection (proj1) to create two columns a1 and a2, both referencing 'a':
|
|
//
|
|
// proj1
|
|
// / \
|
|
// / \
|
|
// / \
|
|
// a1 (addr: 0xe) a2 (addr: 0xe)
|
|
// / \
|
|
// / \
|
|
// / \
|
|
// proj2 proj2
|
|
// / \ / \
|
|
// / \ / \
|
|
// a3 a4 a5 a6
|
|
//
|
|
// (addr: 0xe) (addr: 0xe) (addr: 0xe) (addr: 0xe)
|
|
//
|
|
// Here, a1 and a2 share the same address (0xe), indicating they reference the same data from the original 'a'.
|
|
//
|
|
// When moving to the second projection (proj2), the system tries to project these columns further:
|
|
// - The first set (left side) consists of a3 and a4, derived from a1, both retaining the address (0xe).
|
|
// - The second set (right side) consists of a5 and a6, derived from a2, also starting with address (0xe).
|
|
//
|
|
// When proj1 is complete, the output chunk contains two columns [a1, a2], both derived from the single column 'a' from the scan.
|
|
// Since both a1 and a2 are column references with the same address (0xe), they are treated as referencing the same data.
|
|
//
|
|
// In proj2, two separate <inputIdx, []outputIdxes> items are created:
|
|
// - <0, [0,1]>: This means the 0th input column (a1) is projected twice, into the 0th and 1st columns of the output chunk.
|
|
// - <1, [2,3]>: This means the 1st input column (a2) is projected twice, into the 2nd and 3rd columns of the output chunk.
|
|
//
|
|
// Due to the column swapping logic in each projection, after applying the <0, [0,1]> projection,
|
|
// the addresses for a1 and a2 may become swapped or invalid:
|
|
//
|
|
// proj1: a1 (addr: invalid) a2 (addr: invalid)
|
|
//
|
|
// This can lead to issues in proj2, where further operations on these columns may be unsafe:
|
|
//
|
|
// proj2: a3 (addr: 0xe) a4 (addr: 0xe) a5 (addr: ???) a6 (addr: ???)
|
|
//
|
|
// Therefore, it's crucial to identify and merge the original column references early, ensuring
|
|
// the final inputIdxToOutputIdxes mapping accurately reflects the shared origins of the data.
|
|
// For instance, <0, [0,1,2,3]> indicates that the 0th input column (original 'a') is referenced
|
|
// by all four output columns in the final output.
|
|
//
|
|
// mergeInputIdxToOutputIdxes merges inputIdxToOutputIdxes based on detected column references.
|
|
// This ensures that columns with the same reference are correctly handled in the output chunk.
|
|
func (helper *ColumnSwapHelper) mergeInputIdxToOutputIdxes(input *Chunk, inputIdxToOutputIdxes map[int][]int) {
|
|
originalDJSet := disjointset.NewSet[int](4)
|
|
flag := make([]bool, input.NumCols())
|
|
// Detect self column-references inside the input chunk by comparing column addresses
|
|
for i := range input.NumCols() {
|
|
if flag[i] {
|
|
continue
|
|
}
|
|
for j := i + 1; j < input.NumCols(); j++ {
|
|
if input.Column(i) == input.Column(j) {
|
|
flag[j] = true
|
|
originalDJSet.Union(i, j)
|
|
}
|
|
}
|
|
}
|
|
// Merge inputIdxToOutputIdxes based on the detected column references.
|
|
newInputIdxToOutputIdxes := make(map[int][]int, len(inputIdxToOutputIdxes))
|
|
for inputIdx := range inputIdxToOutputIdxes {
|
|
// Root idx is internal offset, not the right column index.
|
|
originalRootIdx := originalDJSet.FindRoot(inputIdx)
|
|
originalVal, ok := originalDJSet.FindVal(originalRootIdx)
|
|
intest.Assert(ok)
|
|
mergedOutputIdxes := newInputIdxToOutputIdxes[originalVal]
|
|
mergedOutputIdxes = append(mergedOutputIdxes, inputIdxToOutputIdxes[inputIdx]...)
|
|
newInputIdxToOutputIdxes[originalVal] = mergedOutputIdxes
|
|
}
|
|
// Update the merged inputIdxToOutputIdxes automatically.
|
|
// Once failed, it means other worker has done this job at meantime.
|
|
helper.mergedInputIdxToOutputIdxes.CompareAndSwap(nil, &newInputIdxToOutputIdxes)
|
|
}
|
|
|
|
// NewColumnSwapHelper creates a new ColumnSwapHelper.
|
|
func NewColumnSwapHelper(usedColumnIndex []int) *ColumnSwapHelper {
|
|
helper := &ColumnSwapHelper{InputIdxToOutputIdxes: make(map[int][]int)}
|
|
for outputIndex, inputIndex := range usedColumnIndex {
|
|
helper.InputIdxToOutputIdxes[inputIndex] = append(helper.InputIdxToOutputIdxes[inputIndex], outputIndex)
|
|
}
|
|
return helper
|
|
}
|