executor: Fix crash during sort spill (#47581)
close pingcap/tidb#47538
This commit is contained in:
@ -238,7 +238,10 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
|
||||
}
|
||||
})
|
||||
if e.rowChunks.NumRow() > 0 {
|
||||
e.rowChunks.Sort()
|
||||
err := e.rowChunks.Sort()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.partitionList = append(e.partitionList, e.rowChunks)
|
||||
}
|
||||
return nil
|
||||
|
||||
@ -78,6 +78,11 @@ func (m *mutexForRowContainer) RUnlock() {
|
||||
m.rLock.RUnlock()
|
||||
}
|
||||
|
||||
// spillHelper is what baseSpillDiskAction.action needs from a container in
// order to spill it.
type spillHelper interface {
|
||||
// SpillToDisk performs the spill; action starts it in a new goroutine.
SpillToDisk()
|
||||
// hasEnoughDataToSpill reports whether a spill should be started for the
// given tracker; action checks it before triggering SpillToDisk.
hasEnoughDataToSpill(t *memory.Tracker) bool
|
||||
}
|
||||
|
||||
// RowContainer provides a place for many rows, so many that we might want to spill them into disk.
|
||||
// nolint:structcheck
|
||||
type RowContainer struct {
|
||||
@ -121,6 +126,14 @@ func (c *RowContainer) ShallowCopyWithNewMutex() *RowContainer {
|
||||
|
||||
// SpillToDisk spills data to disk. This function may be called in parallel.
|
||||
func (c *RowContainer) SpillToDisk() {
|
||||
c.spillToDisk(nil)
|
||||
}
|
||||
|
||||
// hasEnoughDataToSpill always reports true: a plain RowContainer imposes no
// minimum size before it may be spilled. The tracker argument is ignored.
func (*RowContainer) hasEnoughDataToSpill(_ *memory.Tracker) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *RowContainer) spillToDisk(preSpillError error) {
|
||||
c.m.Lock()
|
||||
defer c.m.Unlock()
|
||||
if c.alreadySpilled() {
|
||||
@ -153,6 +166,10 @@ func (c *RowContainer) SpillToDisk() {
|
||||
panic("out of disk quota when spilling")
|
||||
}
|
||||
})
|
||||
if preSpillError != nil {
|
||||
c.m.records.spillError = preSpillError
|
||||
return
|
||||
}
|
||||
for i := 0; i < n; i++ {
|
||||
chk := c.m.records.inMemory.GetChunk(i)
|
||||
err = c.m.records.inDisk.Add(chk)
|
||||
@ -331,8 +348,9 @@ func (c *RowContainer) Close() (err error) {
|
||||
func (c *RowContainer) ActionSpill() *SpillDiskAction {
|
||||
if c.actionSpill == nil {
|
||||
c.actionSpill = &SpillDiskAction{
|
||||
c: c,
|
||||
cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled}}
|
||||
c: c,
|
||||
baseSpillDiskAction: &baseSpillDiskAction{cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled}},
|
||||
}
|
||||
}
|
||||
return c.actionSpill
|
||||
}
|
||||
@ -341,23 +359,21 @@ func (c *RowContainer) ActionSpill() *SpillDiskAction {
|
||||
func (c *RowContainer) ActionSpillForTest() *SpillDiskAction {
|
||||
c.actionSpill = &SpillDiskAction{
|
||||
c: c,
|
||||
testSyncInputFunc: func() {
|
||||
c.actionSpill.testWg.Add(1)
|
||||
baseSpillDiskAction: &baseSpillDiskAction{
|
||||
testSyncInputFunc: func() {
|
||||
c.actionSpill.testWg.Add(1)
|
||||
},
|
||||
testSyncOutputFunc: func() {
|
||||
c.actionSpill.testWg.Done()
|
||||
},
|
||||
cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled},
|
||||
},
|
||||
testSyncOutputFunc: func() {
|
||||
c.actionSpill.testWg.Done()
|
||||
},
|
||||
cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled},
|
||||
}
|
||||
return c.actionSpill
|
||||
}
|
||||
|
||||
// SpillDiskAction implements memory.ActionOnExceed for chunk.List. If
|
||||
// the memory quota of a query is exceeded, SpillDiskAction.Action is
|
||||
// triggered.
|
||||
type SpillDiskAction struct {
|
||||
type baseSpillDiskAction struct {
|
||||
memory.BaseOOMAction
|
||||
c *RowContainer
|
||||
m sync.Mutex
|
||||
once sync.Once
|
||||
cond spillStatusCond
|
||||
@ -368,6 +384,20 @@ type SpillDiskAction struct {
|
||||
testWg sync.WaitGroup
|
||||
}
|
||||
|
||||
// SpillDiskAction implements memory.ActionOnExceed for chunk.List. If
|
||||
// the memory quota of a query is exceeded, SpillDiskAction.Action is
|
||||
// triggered.
|
||||
type SpillDiskAction struct {
|
||||
c *RowContainer
|
||||
*baseSpillDiskAction
|
||||
}
|
||||
|
||||
// Action sends a signal to trigger spillToDisk method of RowContainer
|
||||
// and if it is already triggered before, call its fallbackAction.
|
||||
func (a *SpillDiskAction) Action(t *memory.Tracker) {
|
||||
a.action(t, a.c)
|
||||
}
|
||||
|
||||
type spillStatusCond struct {
|
||||
*sync.Cond
|
||||
// status indicates different stages for the Action
|
||||
@ -385,38 +415,35 @@ const (
|
||||
spilledYet
|
||||
)
|
||||
|
||||
func (a *SpillDiskAction) setStatus(status spillStatus) {
|
||||
func (a *baseSpillDiskAction) setStatus(status spillStatus) {
|
||||
a.cond.L.Lock()
|
||||
defer a.cond.L.Unlock()
|
||||
a.cond.status = status
|
||||
}
|
||||
|
||||
func (a *SpillDiskAction) getStatus() spillStatus {
|
||||
func (a *baseSpillDiskAction) getStatus() spillStatus {
|
||||
a.cond.L.Lock()
|
||||
defer a.cond.L.Unlock()
|
||||
return a.cond.status
|
||||
}
|
||||
|
||||
// Action sends a signal to trigger spillToDisk method of RowContainer
|
||||
// and if it is already triggered before, call its fallbackAction.
|
||||
func (a *SpillDiskAction) Action(t *memory.Tracker) {
|
||||
func (a *baseSpillDiskAction) action(t *memory.Tracker, spillHelper spillHelper) {
|
||||
a.m.Lock()
|
||||
defer a.m.Unlock()
|
||||
|
||||
if a.getStatus() == notSpilled {
|
||||
if a.getStatus() == notSpilled && spillHelper.hasEnoughDataToSpill(t) {
|
||||
a.once.Do(func() {
|
||||
logutil.BgLogger().Info("memory exceeds quota, spill to disk now.",
|
||||
zap.Int64("consumed", t.BytesConsumed()), zap.Int64("quota", t.GetBytesLimit()))
|
||||
if a.testSyncInputFunc != nil {
|
||||
a.testSyncInputFunc()
|
||||
c := a.c
|
||||
go func() {
|
||||
c.SpillToDisk()
|
||||
spillHelper.SpillToDisk()
|
||||
a.testSyncOutputFunc()
|
||||
}()
|
||||
return
|
||||
}
|
||||
go a.c.SpillToDisk()
|
||||
go spillHelper.SpillToDisk()
|
||||
})
|
||||
return
|
||||
}
|
||||
@ -436,7 +463,7 @@ func (a *SpillDiskAction) Action(t *memory.Tracker) {
|
||||
}
|
||||
|
||||
// Reset resets the status for SpillDiskAction.
|
||||
func (a *SpillDiskAction) Reset() {
|
||||
func (a *baseSpillDiskAction) Reset() {
|
||||
a.m.Lock()
|
||||
defer a.m.Unlock()
|
||||
a.setStatus(notSpilled)
|
||||
@ -444,12 +471,12 @@ func (a *SpillDiskAction) Reset() {
|
||||
}
|
||||
|
||||
// GetPriority gets the priority of the Action.
|
||||
func (*SpillDiskAction) GetPriority() int64 {
|
||||
func (*baseSpillDiskAction) GetPriority() int64 {
|
||||
return memory.DefSpillPriority
|
||||
}
|
||||
|
||||
// WaitForTest waits until all goroutines have exited.
|
||||
func (a *SpillDiskAction) WaitForTest() {
|
||||
func (a *baseSpillDiskAction) WaitForTest() {
|
||||
a.testWg.Wait()
|
||||
}
|
||||
|
||||
@ -540,9 +567,15 @@ func (c *SortedRowContainer) keyColumnsLess(i, j int) bool {
|
||||
}
|
||||
|
||||
// Sort inits pointers and sorts the records.
|
||||
func (c *SortedRowContainer) Sort() {
|
||||
func (c *SortedRowContainer) Sort() (ret error) {
|
||||
c.ptrM.Lock()
|
||||
defer c.ptrM.Unlock()
|
||||
ret = nil
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
ret = fmt.Errorf("%v", r)
|
||||
}
|
||||
}()
|
||||
if c.ptrM.rowPtrs != nil {
|
||||
return
|
||||
}
|
||||
@ -557,12 +590,24 @@ func (c *SortedRowContainer) Sort() {
|
||||
c.ptrM.rowPtrs = append(c.ptrM.rowPtrs, RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
|
||||
}
|
||||
}
|
||||
failpoint.Inject("errorDuringSortRowContainer", func(val failpoint.Value) {
|
||||
if val.(bool) {
|
||||
panic("sort meet error")
|
||||
}
|
||||
})
|
||||
sort.Slice(c.ptrM.rowPtrs, c.keyColumnsLess)
|
||||
return
|
||||
}
|
||||
|
||||
func (c *SortedRowContainer) sortAndSpillToDisk() {
|
||||
c.Sort()
|
||||
c.RowContainer.SpillToDisk()
|
||||
// SpillToDisk sorts the records and then spills the data to disk. The error
// returned by Sort (which also covers a panic recovered inside Sort) is
// passed on to RowContainer.spillToDisk rather than being dropped.
// This function may be called in parallel.
|
||||
func (c *SortedRowContainer) SpillToDisk() {
|
||||
err := c.Sort()
|
||||
// Hand the sort result (nil on success) to the underlying container's spill.
c.RowContainer.spillToDisk(err)
|
||||
}
|
||||
|
||||
// hasEnoughDataToSpill reports whether the bytes consumed according to this
// container's own memory tracker exceed one tenth of t's byte limit.
func (c *SortedRowContainer) hasEnoughDataToSpill(t *memory.Tracker) bool {
|
||||
// Guarantee that each partition size is at least 10% of the threshold, to avoid opening too many files.
|
||||
return c.GetMemTracker().BytesConsumed() > t.GetBytesLimit()/10
|
||||
}
|
||||
|
||||
// Add appends a chunk into the SortedRowContainer.
|
||||
@ -597,8 +642,8 @@ func (c *SortedRowContainer) GetSortedRowAndAlwaysAppendToChunk(idx int, chk *Ch
|
||||
func (c *SortedRowContainer) ActionSpill() *SortAndSpillDiskAction {
|
||||
if c.actionSpill == nil {
|
||||
c.actionSpill = &SortAndSpillDiskAction{
|
||||
c: c,
|
||||
SpillDiskAction: c.RowContainer.ActionSpill(),
|
||||
c: c,
|
||||
baseSpillDiskAction: c.RowContainer.ActionSpill().baseSpillDiskAction,
|
||||
}
|
||||
}
|
||||
return c.actionSpill
|
||||
@ -607,8 +652,8 @@ func (c *SortedRowContainer) ActionSpill() *SortAndSpillDiskAction {
|
||||
// ActionSpillForTest returns a SortAndSpillDiskAction for sorting and spilling over to disk for test.
|
||||
func (c *SortedRowContainer) ActionSpillForTest() *SortAndSpillDiskAction {
|
||||
c.actionSpill = &SortAndSpillDiskAction{
|
||||
c: c,
|
||||
SpillDiskAction: c.RowContainer.ActionSpillForTest(),
|
||||
c: c,
|
||||
baseSpillDiskAction: c.RowContainer.ActionSpillForTest().baseSpillDiskAction,
|
||||
}
|
||||
return c.actionSpill
|
||||
}
|
||||
@ -623,45 +668,13 @@ func (c *SortedRowContainer) GetMemTracker() *memory.Tracker {
|
||||
// triggered.
|
||||
type SortAndSpillDiskAction struct {
|
||||
c *SortedRowContainer
|
||||
*SpillDiskAction
|
||||
*baseSpillDiskAction
|
||||
}
|
||||
|
||||
// Action sends a signal to trigger the SpillToDisk method of SortedRowContainer
|
||||
// and if it is already triggered before, call its fallbackAction.
|
||||
func (a *SortAndSpillDiskAction) Action(t *memory.Tracker) {
|
||||
a.m.Lock()
|
||||
defer a.m.Unlock()
|
||||
// Guarantee that each partition size is at least 10% of the threshold, to avoid opening too many files.
|
||||
if a.getStatus() == notSpilled && a.c.GetMemTracker().BytesConsumed() > t.GetBytesLimit()/10 {
|
||||
a.once.Do(func() {
|
||||
logutil.BgLogger().Info("memory exceeds quota, spill to disk now.",
|
||||
zap.Int64("consumed", t.BytesConsumed()), zap.Int64("quota", t.GetBytesLimit()))
|
||||
if a.testSyncInputFunc != nil {
|
||||
a.testSyncInputFunc()
|
||||
c := a.c
|
||||
go func() {
|
||||
c.sortAndSpillToDisk()
|
||||
a.testSyncOutputFunc()
|
||||
}()
|
||||
return
|
||||
}
|
||||
go a.c.sortAndSpillToDisk()
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
a.cond.L.Lock()
|
||||
for a.cond.status == spilling {
|
||||
a.cond.Wait()
|
||||
}
|
||||
a.cond.L.Unlock()
|
||||
|
||||
if !t.CheckExceed() {
|
||||
return
|
||||
}
|
||||
if fallback := a.GetFallback(); fallback != nil {
|
||||
fallback.Action(t)
|
||||
}
|
||||
a.action(t, a.c)
|
||||
}
|
||||
|
||||
// WaitForTest waits until all goroutines have exited.
|
||||
|
||||
@ -470,6 +470,42 @@ func TestPanicWhenSpillToDisk(t *testing.T) {
|
||||
require.EqualError(t, rc.Add(chk), "out of disk quota when spilling")
|
||||
}
|
||||
|
||||
// TestPanicDuringSortedRowContainerSpill checks that a panic raised while
// sorting as part of a spill does not crash the process but is reported as an
// error by a later GetRow call.
func TestPanicDuringSortedRowContainerSpill(t *testing.T) {
|
||||
fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
|
||||
byItemsDesc := []bool{false}
|
||||
keyColumns := []int{0}
|
||||
keyCmpFuncs := []CompareFunc{cmpInt64}
|
||||
sz := 20
|
||||
rc := NewSortedRowContainer(fields, sz, byItemsDesc, keyColumns, keyCmpFuncs)
|
||||
|
||||
// Build one chunk of sz int64 rows holding 0..sz-1.
chk := NewChunkWithCapacity(fields, sz)
|
||||
for i := 0; i < sz; i++ {
|
||||
chk.AppendInt64(0, int64(i))
|
||||
}
|
||||
var tracker *memory.Tracker
|
||||
var err error
|
||||
tracker = rc.GetMemTracker()
|
||||
// The limit is one byte above the chunk's memory usage plus 8 bytes per row.
// NOTE(review): the 8 bytes per row presumably account for one RowPtr each — confirm.
tracker.SetBytesLimit(chk.MemoryUsage() + int64(8*chk.NumRows()) + 1)
|
||||
tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest())
|
||||
require.False(t, rc.AlreadySpilledSafeForTest())
|
||||
// First Add, with the failpoint still disabled: the test asserts that
// nothing has been spilled afterwards.
err = rc.Add(chk)
|
||||
require.NoError(t, err)
|
||||
rc.actionSpill.WaitForTest()
|
||||
require.False(t, rc.AlreadySpilledSafeForTest())
|
||||
|
||||
// Enable the failpoint that makes SortedRowContainer.Sort panic with
// "sort meet error"; it is disabled again when the test returns.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/chunk/errorDuringSortRowContainer", "return(true)"))
|
||||
defer func() {
|
||||
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/chunk/errorDuringSortRowContainer"))
|
||||
}()
|
||||
// Second Add: Add itself must still succeed, and once the spill goroutine
// has finished the container is expected to report itself as spilled.
err = rc.Add(chk)
|
||||
require.NoError(t, err)
|
||||
rc.actionSpill.WaitForTest()
|
||||
require.True(t, rc.AlreadySpilledSafeForTest())
|
||||
|
||||
// The recovered sort panic must surface as the error of a later read.
_, err = rc.GetRow(RowPtr{})
|
||||
require.EqualError(t, err, "sort meet error")
|
||||
}
|
||||
|
||||
func BenchmarkRowContainerReaderInDiskWithRowSize512(b *testing.B) {
|
||||
benchmarkRowContainerReaderInDiskWithRowLength(b, 512)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user