executor: make the memory tracker of groupConcat more accurate. (#23034)

This commit is contained in:
Shenghui Wu
2021-03-04 10:50:54 +08:00
committed by GitHub
parent 953de47119
commit bee6d37c8b
2 changed files with 69 additions and 44 deletions

View File

@ -41,6 +41,11 @@ const (
DefPartialResult4GroupConcatOrderSize = int64(unsafe.Sizeof(partialResult4GroupConcatOrder{}))
// DefPartialResult4GroupConcatOrderDistinctSize is the size of partialResult4GroupConcatOrderDistinct
DefPartialResult4GroupConcatOrderDistinctSize = int64(unsafe.Sizeof(partialResult4GroupConcatOrderDistinct{}))
// DefBytesBufferSize is the size of bytes.Buffer.
DefBytesBufferSize = int64(unsafe.Sizeof(bytes.Buffer{}))
// DefTopNRowsSize is the size of topNRows.
DefTopNRowsSize = int64(unsafe.Sizeof(topNRows{}))
)
type baseGroupConcat4String struct {
@ -99,7 +104,7 @@ type groupConcat struct {
func (e *groupConcat) AllocPartialResult() (pr PartialResult, memDelta int64) {
p := new(partialResult4GroupConcat)
p.valsBuf = &bytes.Buffer{}
return PartialResult(p), DefPartialResult4GroupConcatSize
return PartialResult(p), DefPartialResult4GroupConcatSize + DefBytesBufferSize
}
func (e *groupConcat) ResetPartialResult(pr PartialResult) {
@ -110,6 +115,18 @@ func (e *groupConcat) ResetPartialResult(pr PartialResult) {
func (e *groupConcat) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (memDelta int64, err error) {
p := (*partialResult4GroupConcat)(pr)
v, isNull := "", false
memDelta += int64(-p.valsBuf.Cap())
if p.buffer != nil {
memDelta += int64(-p.buffer.Cap())
}
defer func() {
memDelta += int64(p.valsBuf.Cap())
if p.buffer != nil {
memDelta += int64(p.buffer.Cap())
}
}()
for _, row := range rowsInGroup {
p.valsBuf.Reset()
for _, arg := range e.args {
@ -125,16 +142,13 @@ func (e *groupConcat) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup [
if isNull {
continue
}
var oldMem int
if p.buffer == nil {
p.buffer = &bytes.Buffer{}
memDelta += DefBytesBufferSize
} else {
oldMem = p.buffer.Cap()
p.buffer.WriteString(e.sep)
}
p.buffer.WriteString(p.valsBuf.String())
newMem := p.buffer.Cap()
memDelta += int64(newMem - oldMem)
}
if p.buffer != nil {
return memDelta, e.truncatePartialResultIfNeed(sctx, p.buffer)
@ -151,9 +165,11 @@ func (e *groupConcat) MergePartialResult(sctx sessionctx.Context, src, dst Parti
p2.buffer = p1.buffer
return 0, nil
}
memDelta -= int64(p2.buffer.Cap())
p2.buffer.WriteString(e.sep)
p2.buffer.WriteString(p1.buffer.String())
return 0, e.truncatePartialResultIfNeed(sctx, p2.buffer)
memDelta += int64(p2.buffer.Cap())
return memDelta, e.truncatePartialResultIfNeed(sctx, p2.buffer)
}
// SetTruncated will be called in `executorBuilder#buildHashAgg` with duck-type.
@ -180,7 +196,7 @@ func (e *groupConcatDistinct) AllocPartialResult() (pr PartialResult, memDelta i
p := new(partialResult4GroupConcatDistinct)
p.valsBuf = &bytes.Buffer{}
p.valSet = set.NewStringSet()
return PartialResult(p), DefPartialResult4GroupConcatDistinctSize
return PartialResult(p), DefPartialResult4GroupConcatDistinctSize + DefBytesBufferSize
}
func (e *groupConcatDistinct) ResetPartialResult(pr PartialResult) {
@ -191,6 +207,16 @@ func (e *groupConcatDistinct) ResetPartialResult(pr PartialResult) {
func (e *groupConcatDistinct) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (memDelta int64, err error) {
p := (*partialResult4GroupConcatDistinct)(pr)
v, isNull := "", false
memDelta += int64(-p.valsBuf.Cap()) + (int64(-cap(p.encodeBytesBuffer)))
if p.buffer != nil {
memDelta += int64(-p.buffer.Cap())
}
defer func() {
memDelta += int64(p.valsBuf.Cap()) + (int64(cap(p.encodeBytesBuffer)))
if p.buffer != nil {
memDelta += int64(p.buffer.Cap())
}
}()
for _, row := range rowsInGroup {
p.valsBuf.Reset()
p.encodeBytesBuffer = p.encodeBytesBuffer[:0]
@ -214,18 +240,15 @@ func (e *groupConcatDistinct) UpdatePartialResult(sctx sessionctx.Context, rowsI
}
p.valSet.Insert(joinedVal)
memDelta += int64(len(joinedVal))
var oldMem int
// write separator
if p.buffer == nil {
p.buffer = &bytes.Buffer{}
memDelta += DefBytesBufferSize
} else {
oldMem = p.buffer.Cap()
p.buffer.WriteString(e.sep)
}
// write values
p.buffer.WriteString(p.valsBuf.String())
newMem := p.buffer.Cap()
memDelta += int64(newMem - oldMem)
}
if p.buffer != nil {
return memDelta, e.truncatePartialResultIfNeed(sctx, p.buffer)
@ -299,19 +322,18 @@ func (h *topNRows) Pop() interface{} {
return x
}
func (h *topNRows) tryToAdd(row sortRow) (truncated bool, sortRowMemSize int64) {
oldSize := h.currSize
func (h *topNRows) tryToAdd(row sortRow) (truncated bool, memDelta int64) {
h.currSize += uint64(row.buffer.Len())
if len(h.rows) > 0 {
h.currSize += h.sepSize
}
heap.Push(h, row)
memDelta += int64(row.buffer.Cap())
for _, dt := range row.byItems {
sortRowMemSize += GetDatumMemSize(dt)
memDelta += GetDatumMemSize(dt)
}
if h.currSize <= h.limitSize {
sortRowMemSize += int64(h.currSize - oldSize)
return false, sortRowMemSize
return false, memDelta
}
for h.currSize > h.limitSize {
@ -321,14 +343,14 @@ func (h *topNRows) tryToAdd(row sortRow) (truncated bool, sortRowMemSize int64)
h.rows[0].buffer.Truncate(h.rows[0].buffer.Len() - int(debt))
} else {
h.currSize -= uint64(h.rows[0].buffer.Len()) + h.sepSize
memDelta -= int64(h.rows[0].buffer.Cap())
for _, dt := range h.rows[0].byItems {
sortRowMemSize -= GetDatumMemSize(dt)
memDelta -= GetDatumMemSize(dt)
}
heap.Pop(h)
}
}
sortRowMemSize += int64(h.currSize - oldSize)
return true, sortRowMemSize
return true, memDelta
}
func (h *topNRows) reset() {
@ -385,7 +407,7 @@ func (e *groupConcatOrder) AllocPartialResult() (pr PartialResult, memDelta int6
sepSize: uint64(len(e.sep)),
},
}
return PartialResult(p), DefPartialResult4GroupConcatOrderSize
return PartialResult(p), DefPartialResult4GroupConcatOrderSize + DefTopNRowsSize
}
func (e *groupConcatOrder) ResetPartialResult(pr PartialResult) {
@ -487,7 +509,7 @@ func (e *groupConcatDistinctOrder) AllocPartialResult() (pr PartialResult, memDe
},
valSet: set.NewStringSet(),
}
return PartialResult(p), DefPartialResult4GroupConcatOrderDistinctSize
return PartialResult(p), DefPartialResult4GroupConcatOrderDistinctSize + DefTopNRowsSize
}
func (e *groupConcatDistinctOrder) ResetPartialResult(pr PartialResult) {
@ -500,6 +522,8 @@ func (e *groupConcatDistinctOrder) UpdatePartialResult(sctx sessionctx.Context,
p := (*partialResult4GroupConcatOrderDistinct)(pr)
p.topN.sctx = sctx
v, isNull := "", false
memDelta -= int64(cap(p.encodeBytesBuffer))
defer func() { memDelta += int64(cap(p.encodeBytesBuffer)) }()
for _, row := range rowsInGroup {
buffer := new(bytes.Buffer)
p.encodeBytesBuffer = p.encodeBytesBuffer[:0]

View File

@ -59,15 +59,15 @@ func (s *testSuite) TestGroupConcat(c *C) {
func (s *testSuite) TestMemGroupConcat(c *C) {
multiArgsTest1 := buildMultiArgsAggMemTester(ast.AggFuncGroupConcat, []byte{mysql.TypeString, mysql.TypeString}, mysql.TypeString, 5,
aggfuncs.DefPartialResult4GroupConcatSize, groupConcatMultiArgsUpdateMemDeltaGens, false)
aggfuncs.DefPartialResult4GroupConcatSize+aggfuncs.DefBytesBufferSize, groupConcatMultiArgsUpdateMemDeltaGens, false)
multiArgsTest2 := buildMultiArgsAggMemTester(ast.AggFuncGroupConcat, []byte{mysql.TypeString, mysql.TypeString}, mysql.TypeString, 5,
aggfuncs.DefPartialResult4GroupConcatDistinctSize, groupConcatDistinctMultiArgsUpdateMemDeltaGens, true)
aggfuncs.DefPartialResult4GroupConcatDistinctSize+aggfuncs.DefBytesBufferSize, groupConcatDistinctMultiArgsUpdateMemDeltaGens, true)
multiArgsTest3 := buildMultiArgsAggMemTester(ast.AggFuncGroupConcat, []byte{mysql.TypeString, mysql.TypeString}, mysql.TypeString, 5,
aggfuncs.DefPartialResult4GroupConcatOrderSize, groupConcatOrderMultiArgsUpdateMemDeltaGens, false)
aggfuncs.DefPartialResult4GroupConcatOrderSize+aggfuncs.DefTopNRowsSize, groupConcatOrderMultiArgsUpdateMemDeltaGens, false)
multiArgsTest3.multiArgsAggTest.orderBy = true
multiArgsTest4 := buildMultiArgsAggMemTester(ast.AggFuncGroupConcat, []byte{mysql.TypeString, mysql.TypeString}, mysql.TypeString, 5,
aggfuncs.DefPartialResult4GroupConcatOrderDistinctSize, groupConcatDistinctOrderMultiArgsUpdateMemDeltaGens, true)
aggfuncs.DefPartialResult4GroupConcatOrderDistinctSize+aggfuncs.DefTopNRowsSize, groupConcatDistinctOrderMultiArgsUpdateMemDeltaGens, true)
multiArgsTest4.multiArgsAggTest.orderBy = true
multiArgsTests := []multiArgsAggMemTest{multiArgsTest1, multiArgsTest2, multiArgsTest3, multiArgsTest4}
@ -79,21 +79,27 @@ func (s *testSuite) TestMemGroupConcat(c *C) {
func groupConcatMultiArgsUpdateMemDeltaGens(srcChk *chunk.Chunk, dataType []*types.FieldType, byItems []*util.ByItems) (memDeltas []int64, err error) {
memDeltas = make([]int64, 0)
buffer := new(bytes.Buffer)
valBuffer := new(bytes.Buffer)
for i := 0; i < srcChk.NumRows(); i++ {
valBuffer.Reset()
row := srcChk.GetRow(i)
if row.IsNull(0) {
memDeltas = append(memDeltas, int64(0))
continue
}
oldMemSize := buffer.Cap()
oldMemSize := buffer.Cap() + valBuffer.Cap()
if i != 0 {
buffer.WriteString(separator)
}
for j := 0; j < len(dataType); j++ {
curVal := row.GetString(j)
buffer.WriteString(curVal)
valBuffer.WriteString(curVal)
}
buffer.WriteString(valBuffer.String())
memDelta := int64(buffer.Cap() + valBuffer.Cap() - oldMemSize)
if i == 0 {
memDelta += aggfuncs.DefBytesBufferSize
}
memDelta := int64(buffer.Cap() - oldMemSize)
memDeltas = append(memDeltas, memDelta)
}
return memDeltas, nil
@ -101,22 +107,19 @@ func groupConcatMultiArgsUpdateMemDeltaGens(srcChk *chunk.Chunk, dataType []*typ
func groupConcatOrderMultiArgsUpdateMemDeltaGens(srcChk *chunk.Chunk, dataType []*types.FieldType, byItems []*util.ByItems) (memDeltas []int64, err error) {
memDeltas = make([]int64, 0)
buffer := new(bytes.Buffer)
for i := 0; i < srcChk.NumRows(); i++ {
buffer := new(bytes.Buffer)
row := srcChk.GetRow(i)
if row.IsNull(0) {
memDeltas = append(memDeltas, int64(0))
continue
}
oldMemSize := buffer.Len()
if i != 0 {
buffer.WriteString(separator)
}
oldMemSize := buffer.Cap()
for j := 0; j < len(dataType); j++ {
curVal := row.GetString(j)
buffer.WriteString(curVal)
}
memDelta := int64(buffer.Len() - oldMemSize)
memDelta := int64(buffer.Cap() - oldMemSize)
for _, byItem := range byItems {
fdt, _ := byItem.Expr.Eval(row)
datumMem := aggfuncs.GetDatumMemSize(&fdt)
@ -139,6 +142,7 @@ func groupConcatDistinctMultiArgsUpdateMemDeltaGens(srcChk *chunk.Chunk, dataTyp
continue
}
valsBuf.Reset()
oldMemSize := buffer.Cap() + valsBuf.Cap() + cap(encodeBytesBuffer)
encodeBytesBuffer = encodeBytesBuffer[:0]
for j := 0; j < len(dataType); j++ {
curVal := row.GetString(j)
@ -151,12 +155,14 @@ func groupConcatDistinctMultiArgsUpdateMemDeltaGens(srcChk *chunk.Chunk, dataTyp
continue
}
valSet.Insert(joinedVal)
oldMemSize := buffer.Cap()
if i != 0 {
buffer.WriteString(separator)
}
buffer.WriteString(valsBuf.String())
memDelta := int64(len(joinedVal) + (buffer.Cap() - oldMemSize))
memDelta := int64(len(joinedVal) + (buffer.Cap() + valsBuf.Cap() + cap(encodeBytesBuffer) - oldMemSize))
if i == 0 {
memDelta += aggfuncs.DefBytesBufferSize
}
memDeltas = append(memDeltas, memDelta)
}
return memDeltas, nil
@ -164,10 +170,9 @@ func groupConcatDistinctMultiArgsUpdateMemDeltaGens(srcChk *chunk.Chunk, dataTyp
func groupConcatDistinctOrderMultiArgsUpdateMemDeltaGens(srcChk *chunk.Chunk, dataType []*types.FieldType, byItems []*util.ByItems) (memDeltas []int64, err error) {
valSet := set.NewStringSet()
buffer := new(bytes.Buffer)
valsBuf := new(bytes.Buffer)
var encodeBytesBuffer []byte
for i := 0; i < srcChk.NumRows(); i++ {
valsBuf := new(bytes.Buffer)
row := srcChk.GetRow(i)
if row.IsNull(0) {
memDeltas = append(memDeltas, int64(0))
@ -175,6 +180,7 @@ func groupConcatDistinctOrderMultiArgsUpdateMemDeltaGens(srcChk *chunk.Chunk, da
}
valsBuf.Reset()
encodeBytesBuffer = encodeBytesBuffer[:0]
oldMemSize := valsBuf.Cap() + cap(encodeBytesBuffer)
for j := 0; j < len(dataType); j++ {
curVal := row.GetString(j)
encodeBytesBuffer = codec.EncodeBytes(encodeBytesBuffer, hack.Slice(curVal))
@ -185,13 +191,8 @@ func groupConcatDistinctOrderMultiArgsUpdateMemDeltaGens(srcChk *chunk.Chunk, da
memDeltas = append(memDeltas, int64(0))
continue
}
oldMemSize := buffer.Len()
if i != 0 {
buffer.WriteString(separator)
}
valSet.Insert(joinedVal)
buffer.WriteString(valsBuf.String())
memDelta := int64(len(joinedVal) + (buffer.Len() - oldMemSize))
memDelta := int64(len(joinedVal) + (valsBuf.Cap() + cap(encodeBytesBuffer) - oldMemSize))
for _, byItem := range byItems {
fdt, _ := byItem.Expr.Eval(row)
datumMem := aggfuncs.GetDatumMemSize(&fdt)