// Copyright 2024 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package sortexec import ( "slices" "sync" "sync/atomic" "github.com/pingcap/tidb/pkg/executor/internal/exec" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/disk" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/memory" "github.com/pingcap/tidb/pkg/util/sqlkiller" "go.uber.org/zap" ) type topNSpillHelper struct { cond *sync.Cond spillStatus int sortedRowsInDisk []*chunk.DataInDiskByChunks finishCh <-chan struct{} errOutputChan chan<- rowWithError memTracker *memory.Tracker diskTracker *disk.Tracker fieldTypes []*types.FieldType tmpSpillChunksChan chan *chunk.Chunk workers []*topNWorker bytesConsumed atomic.Int64 bytesLimit atomic.Int64 sqlKiller *sqlkiller.SQLKiller fileNamePrefixForTest string } func newTopNSpillerHelper( topn *TopNExec, finishCh <-chan struct{}, errOutputChan chan<- rowWithError, memTracker *memory.Tracker, diskTracker *disk.Tracker, fieldTypes []*types.FieldType, workers []*topNWorker, concurrencyNum int, sqlKiller *sqlkiller.SQLKiller, ) *topNSpillHelper { lock := sync.Mutex{} tmpSpillChunksChan := make(chan *chunk.Chunk, concurrencyNum) for range workers { tmpSpillChunksChan <- exec.TryNewCacheChunk(topn.Children(0)) } return &topNSpillHelper{ cond: sync.NewCond(&lock), spillStatus: notSpilled, sortedRowsInDisk: make([]*chunk.DataInDiskByChunks, 0), finishCh: finishCh, errOutputChan: errOutputChan, memTracker: memTracker, diskTracker: diskTracker, fieldTypes: fieldTypes, tmpSpillChunksChan: tmpSpillChunksChan, workers: workers, bytesConsumed: atomic.Int64{}, bytesLimit: atomic.Int64{}, sqlKiller: sqlKiller, fileNamePrefixForTest: topn.FileNamePrefixForTest, } } func (t *topNSpillHelper) close() { for _, inDisk := range t.sortedRowsInDisk { inDisk.Close() } } func (t *topNSpillHelper) isNotSpilledNoLock() bool { return t.spillStatus == notSpilled } func (t *topNSpillHelper) isInSpillingNoLock() bool { return t.spillStatus == inSpilling } func (t *topNSpillHelper) isSpillNeeded() bool { t.cond.L.Lock() defer t.cond.L.Unlock() return t.spillStatus == needSpill } func (t *topNSpillHelper) isSpillTriggered() bool { t.cond.L.Lock() defer t.cond.L.Unlock() return len(t.sortedRowsInDisk) > 0 } func (t *topNSpillHelper) setInSpilling() { t.cond.L.Lock() defer t.cond.L.Unlock() t.spillStatus = inSpilling logutil.BgLogger().Info(spillInfo, zap.Int64("consumed", t.bytesConsumed.Load()), zap.Int64("quota", t.bytesLimit.Load())) } func (t *topNSpillHelper) setNotSpilled() { t.cond.L.Lock() defer t.cond.L.Unlock() t.spillStatus = notSpilled } func (t *topNSpillHelper) setNeedSpillNoLock() { t.spillStatus = needSpill } func (t *topNSpillHelper) addInDisk(inDisk *chunk.DataInDiskByChunks) { t.cond.L.Lock() defer t.cond.L.Unlock() t.sortedRowsInDisk = append(t.sortedRowsInDisk, inDisk) } func (*topNSpillHelper) spillTmpSpillChunk(inDisk *chunk.DataInDiskByChunks, tmpSpillChunk *chunk.Chunk) error { err := inDisk.Add(tmpSpillChunk) if err != nil { return err } tmpSpillChunk.Reset() return nil } func (t *topNSpillHelper) spill() (err error) { defer func() { if r := recover(); r != nil { err = util.GetRecoverError(r) } }() select { case <-t.finishCh: return nil default: } t.setInSpilling() defer t.cond.Broadcast() defer t.setNotSpilled() workerNum := len(t.workers) errChan := make(chan error, workerNum) workerWaiter := &sync.WaitGroup{} workerWaiter.Add(workerNum) for i := range workerNum { go func(idx int) { defer func() { if r := recover(); r != nil { processPanicAndLog(t.errOutputChan, r) } workerWaiter.Done() }() spillErr := t.spillHeap(t.workers[idx].chkHeap) if spillErr != nil { errChan <- spillErr } }(i) } workerWaiter.Wait() close(errChan) // Fetch only one error is enough spillErr := <-errChan if spillErr != nil { return spillErr } return nil } func (t *topNSpillHelper) spillHeap(chkHeap *topNChunkHeap) error { if chkHeap.Len() <= 0 && chkHeap.rowChunks.Len() <= 0 { return nil } if !chkHeap.isRowPtrsInit { // Do not consume memory here, as it will hang chkHeap.initPtrsImpl() } slices.SortFunc(chkHeap.rowPtrs, chkHeap.keyColumnsCompare) tmpSpillChunk := <-t.tmpSpillChunksChan tmpSpillChunk.Reset() defer func() { t.tmpSpillChunksChan <- tmpSpillChunk }() inDisk := chunk.NewDataInDiskByChunks(t.fieldTypes, t.fileNamePrefixForTest) inDisk.GetDiskTracker().AttachTo(t.diskTracker) isInDiskCollected := false defer func() { if !isInDiskCollected { inDisk.Close() } }() rowPtrNum := chkHeap.Len() for ; chkHeap.idx < rowPtrNum; chkHeap.idx++ { if chkHeap.idx%100 == 0 && t.sqlKiller != nil { err := t.sqlKiller.HandleSignal() if err != nil { return err } } if tmpSpillChunk.IsFull() { err := t.spillTmpSpillChunk(inDisk, tmpSpillChunk) if err != nil { return err } } tmpSpillChunk.AppendRow(chkHeap.rowChunks.GetRow(chkHeap.rowPtrs[chkHeap.idx])) } // Spill remaining rows in tmpSpillChunk if tmpSpillChunk.NumRows() > 0 { err := t.spillTmpSpillChunk(inDisk, tmpSpillChunk) if err != nil { return err } } injectTopNRandomFail(200) t.addInDisk(inDisk) isInDiskCollected = true chkHeap.clear() return nil } type topNSpillAction struct { memory.BaseOOMAction spillHelper *topNSpillHelper } // GetPriority get the priority of the Action. func (*topNSpillAction) GetPriority() int64 { return memory.DefSpillPriority } func (t *topNSpillAction) Action(tracker *memory.Tracker) { t.spillHelper.cond.L.Lock() defer t.spillHelper.cond.L.Unlock() for t.spillHelper.isInSpillingNoLock() { t.spillHelper.cond.Wait() } hasEnoughData := hasEnoughDataToSpill(t.spillHelper.memTracker, tracker) if tracker.CheckExceed() && t.spillHelper.isNotSpilledNoLock() && hasEnoughData { t.spillHelper.setNeedSpillNoLock() t.spillHelper.bytesConsumed.Store(tracker.BytesConsumed()) t.spillHelper.bytesLimit.Store(tracker.GetBytesLimit()) return } if tracker.CheckExceed() && !hasEnoughData { t.TriggerFallBackAction(tracker) } }