// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sortexec

import (
	"slices"
	"sync"
	"sync/atomic"

	"github.com/pingcap/tidb/pkg/executor/internal/exec"
	"github.com/pingcap/tidb/pkg/types"
	"github.com/pingcap/tidb/pkg/util"
	"github.com/pingcap/tidb/pkg/util/chunk"
	"github.com/pingcap/tidb/pkg/util/disk"
	"github.com/pingcap/tidb/pkg/util/logutil"
	"github.com/pingcap/tidb/pkg/util/memory"
	"github.com/pingcap/tidb/pkg/util/sqlkiller"
	"go.uber.org/zap"
)
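
// topNSpillHelper coordinates spilling the in-memory data of all TopN
// workers to disk once the memory quota is exceeded. cond guards
// spillStatus and sortedRowsInDisk, and lets the OOM action wait for an
// in-flight spill to finish before deciding what to do next.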
type topNSpillHelper struct {
	cond             *sync.Cond
	spillStatus      int
	sortedRowsInDisk []*chunk.DataInDiskByChunks

	finishCh      <-chan struct{}
	errOutputChan chan<- rowWithError

	memTracker  *memory.Tracker
	diskTracker *disk.Tracker

	fieldTypes         []*types.FieldType
	tmpSpillChunksChan chan *chunk.Chunk

	workers []*topNWorker

	bytesConsumed atomic.Int64
	bytesLimit    atomic.Int64

	sqlKiller *sqlkiller.SQLKiller

	fileNamePrefixForTest string
}
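
// newTopNSpillerHelper creates a topNSpillHelper shared by all TopN workers
// and pre-fills tmpSpillChunksChan with one reusable spill chunk per worker,
// so spilling reuses a bounded set of temporary chunks instead of allocating
// new ones.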
func newTopNSpillerHelper(
	topn *TopNExec,
	finishCh <-chan struct{},
	errOutputChan chan<- rowWithError,
	memTracker *memory.Tracker,
	diskTracker *disk.Tracker,
	fieldTypes []*types.FieldType,
	workers []*topNWorker,
	concurrencyNum int,
	sqlKiller *sqlkiller.SQLKiller,
) *topNSpillHelper {
	lock := sync.Mutex{}
	tmpSpillChunksChan := make(chan *chunk.Chunk, concurrencyNum)
	for range workers {
		tmpSpillChunksChan <- exec.TryNewCacheChunk(topn.Children(0))
	}

	return &topNSpillHelper{
		cond:                  sync.NewCond(&lock),
		spillStatus:           notSpilled,
		sortedRowsInDisk:      make([]*chunk.DataInDiskByChunks, 0),
		finishCh:              finishCh,
		errOutputChan:         errOutputChan,
		memTracker:            memTracker,
		diskTracker:           diskTracker,
		fieldTypes:            fieldTypes,
		tmpSpillChunksChan:    tmpSpillChunksChan,
		workers:               workers,
		bytesConsumed:         atomic.Int64{},
		bytesLimit:            atomic.Int64{},
		sqlKiller:             sqlKiller,
		fileNamePrefixForTest: topn.FileNamePrefixForTest,
	}
}
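
// close releases every disk file produced by spilling.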
func (t *topNSpillHelper) close() {
	for _, inDisk := range t.sortedRowsInDisk {
		inDisk.Close()
	}
}
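
// isNotSpilledNoLock reports whether the status is notSpilled.
// The caller must hold cond.L.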
func (t *topNSpillHelper) isNotSpilledNoLock() bool {
	return t.spillStatus == notSpilled
}
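
// isInSpillingNoLock reports whether a spill is currently running.
// The caller must hold cond.L.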
func (t *topNSpillHelper) isInSpillingNoLock() bool {
	return t.spillStatus == inSpilling
}
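
// isSpillNeeded reports whether a spill has been requested but has not
// started yet.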
func (t *topNSpillHelper) isSpillNeeded() bool {
	t.cond.L.Lock()
	defer t.cond.L.Unlock()
	return t.spillStatus == needSpill
}
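
// isSpillTriggered reports whether at least one spill has completed, i.e.
// some sorted rows already reside on disk.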
func (t *topNSpillHelper) isSpillTriggered() bool {
	t.cond.L.Lock()
	defer t.cond.L.Unlock()
	return len(t.sortedRowsInDisk) > 0
}
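
// setInSpilling marks the spill as running and logs the consumed bytes and
// quota recorded when the spill was requested.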
func (t *topNSpillHelper) setInSpilling() {
	t.cond.L.Lock()
	defer t.cond.L.Unlock()
	t.spillStatus = inSpilling
	logutil.BgLogger().Info(spillInfo, zap.Int64("consumed", t.bytesConsumed.Load()), zap.Int64("quota", t.bytesLimit.Load()))
}
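
// setNotSpilled resets the status to notSpilled once a spill round has
// finished.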
func (t *topNSpillHelper) setNotSpilled() {
	t.cond.L.Lock()
	defer t.cond.L.Unlock()
	t.spillStatus = notSpilled
}
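
// setNeedSpillNoLock marks that a spill is required.
// The caller must hold cond.L.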
func (t *topNSpillHelper) setNeedSpillNoLock() {
	t.spillStatus = needSpill
}
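
// addInDisk appends a finished disk file to the list of spilled, sorted row
// sets so it can be read back and merged later.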
func (t *topNSpillHelper) addInDisk(inDisk *chunk.DataInDiskByChunks) {
	t.cond.L.Lock()
	defer t.cond.L.Unlock()
	t.sortedRowsInDisk = append(t.sortedRowsInDisk, inDisk)
}
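
// spillTmpSpillChunk flushes tmpSpillChunk into inDisk and resets the chunk
// for reuse.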
func (*topNSpillHelper) spillTmpSpillChunk(inDisk *chunk.DataInDiskByChunks, tmpSpillChunk *chunk.Chunk) error {
	err := inDisk.Add(tmpSpillChunk)
	if err != nil {
		return err
	}
	tmpSpillChunk.Reset()
	return nil
}
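
// spill dumps the heap of every worker to disk in parallel and waits for all
// spill goroutines to finish. It is a no-op if the query has already
// finished, and it returns the first error encountered, if any.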
func (t *topNSpillHelper) spill() (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = util.GetRecoverError(r)
		}
	}()

	select {
	case <-t.finishCh:
		return nil
	default:
	}

	t.setInSpilling()
	defer t.cond.Broadcast()
	defer t.setNotSpilled()

	workerNum := len(t.workers)
	errChan := make(chan error, workerNum)
	workerWaiter := &sync.WaitGroup{}
	workerWaiter.Add(workerNum)
	for i := range workerNum {
		go func(idx int) {
			defer func() {
				if r := recover(); r != nil {
					processPanicAndLog(t.errOutputChan, r)
				}
				workerWaiter.Done()
			}()

			spillErr := t.spillHeap(t.workers[idx].chkHeap)
			if spillErr != nil {
				errChan <- spillErr
			}
		}(i)
	}

	workerWaiter.Wait()
	close(errChan)

	// Fetching one error is enough, as any error fails the whole spill.
	spillErr := <-errChan
	if spillErr != nil {
		return spillErr
	}
	return nil
}
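
// spillHeap sorts the rows held by one worker's heap and writes them to a
// new disk file in sorted order, checking the kill signal periodically so a
// canceled query stops spilling promptly.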
func (t *topNSpillHelper) spillHeap(chkHeap *topNChunkHeap) error {
	if chkHeap.Len() <= 0 && chkHeap.rowChunks.Len() <= 0 {
		return nil
	}

	if !chkHeap.isRowPtrsInit {
		// Do not consume memory here, as doing so will hang the query.
		chkHeap.initPtrsImpl()
	}
	slices.SortFunc(chkHeap.rowPtrs, chkHeap.keyColumnsCompare)

	tmpSpillChunk := <-t.tmpSpillChunksChan
	tmpSpillChunk.Reset()
	defer func() {
		t.tmpSpillChunksChan <- tmpSpillChunk
	}()

	inDisk := chunk.NewDataInDiskByChunks(t.fieldTypes, t.fileNamePrefixForTest)
	inDisk.GetDiskTracker().AttachTo(t.diskTracker)

	isInDiskCollected := false
	defer func() {
		if !isInDiskCollected {
			inDisk.Close()
		}
	}()

	rowPtrNum := chkHeap.Len()
	for ; chkHeap.idx < rowPtrNum; chkHeap.idx++ {
		if chkHeap.idx%100 == 0 && t.sqlKiller != nil {
			err := t.sqlKiller.HandleSignal()
			if err != nil {
				return err
			}
		}

		if tmpSpillChunk.IsFull() {
			err := t.spillTmpSpillChunk(inDisk, tmpSpillChunk)
			if err != nil {
				return err
			}
		}
		tmpSpillChunk.AppendRow(chkHeap.rowChunks.GetRow(chkHeap.rowPtrs[chkHeap.idx]))
	}

	// Spill the remaining rows in tmpSpillChunk.
	if tmpSpillChunk.NumRows() > 0 {
		err := t.spillTmpSpillChunk(inDisk, tmpSpillChunk)
		if err != nil {
			return err
		}
	}

	injectTopNRandomFail(200)

	t.addInDisk(inDisk)
	isInDiskCollected = true

	chkHeap.clear()
	return nil
}
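
// topNSpillAction is the OOM action for TopN: when the tracked memory
// exceeds the quota, it asks spillHelper to spill to disk instead of
// aborting the query.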
type topNSpillAction struct {
	memory.BaseOOMAction
	spillHelper *topNSpillHelper
}
// GetPriority returns the priority of the spill action.
func (*topNSpillAction) GetPriority() int64 {
	return memory.DefSpillPriority
}
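
// Action is triggered when the tracked memory usage exceeds the quota. It
// waits for any in-flight spill to finish, then either requests a new spill
// or falls back to the next action when there is not enough data to spill.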
func (t *topNSpillAction) Action(tracker *memory.Tracker) {
	t.spillHelper.cond.L.Lock()
	defer t.spillHelper.cond.L.Unlock()

	for t.spillHelper.isInSpillingNoLock() {
		t.spillHelper.cond.Wait()
	}

	hasEnoughData := hasEnoughDataToSpill(t.spillHelper.memTracker, tracker)
	if tracker.CheckExceed() && t.spillHelper.isNotSpilledNoLock() && hasEnoughData {
		t.spillHelper.setNeedSpillNoLock()
		t.spillHelper.bytesConsumed.Store(tracker.BytesConsumed())
		t.spillHelper.bytesLimit.Store(tracker.GetBytesLimit())
		return
	}

	if tracker.CheckExceed() && !hasEnoughData {
		t.TriggerFallBackAction(tracker)
	}
}