diff --git a/executor/aggfuncs/func_group_concat.go b/executor/aggfuncs/func_group_concat.go
index 0eab8ff3ff..e3e2fa98a5 100644
--- a/executor/aggfuncs/func_group_concat.go
+++ b/executor/aggfuncs/func_group_concat.go
@@ -41,6 +41,11 @@ const (
 	DefPartialResult4GroupConcatOrderSize = int64(unsafe.Sizeof(partialResult4GroupConcatOrder{}))
 	// DefPartialResult4GroupConcatOrderDistinctSize is the size of partialResult4GroupConcatOrderDistinct
 	DefPartialResult4GroupConcatOrderDistinctSize = int64(unsafe.Sizeof(partialResult4GroupConcatOrderDistinct{}))
+
+	// DefBytesBufferSize is the size of bytes.Buffer.
+	DefBytesBufferSize = int64(unsafe.Sizeof(bytes.Buffer{}))
+	// DefTopNRowsSize is the size of topNRows.
+	DefTopNRowsSize = int64(unsafe.Sizeof(topNRows{}))
 )
 
 type baseGroupConcat4String struct {
@@ -99,7 +104,7 @@ type groupConcat struct {
 func (e *groupConcat) AllocPartialResult() (pr PartialResult, memDelta int64) {
 	p := new(partialResult4GroupConcat)
 	p.valsBuf = &bytes.Buffer{}
-	return PartialResult(p), DefPartialResult4GroupConcatSize
+	return PartialResult(p), DefPartialResult4GroupConcatSize + DefBytesBufferSize
 }
 
 func (e *groupConcat) ResetPartialResult(pr PartialResult) {
@@ -110,6 +115,18 @@ func (e *groupConcat) ResetPartialResult(pr PartialResult) {
 func (e *groupConcat) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (memDelta int64, err error) {
 	p := (*partialResult4GroupConcat)(pr)
 	v, isNull := "", false
+	memDelta += int64(-p.valsBuf.Cap())
+	if p.buffer != nil {
+		memDelta += int64(-p.buffer.Cap())
+	}
+
+	defer func() {
+		memDelta += int64(p.valsBuf.Cap())
+		if p.buffer != nil {
+			memDelta += int64(p.buffer.Cap())
+		}
+	}()
+
 	for _, row := range rowsInGroup {
 		p.valsBuf.Reset()
 		for _, arg := range e.args {
@@ -125,16 +142,13 @@ func (e *groupConcat) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup [
 		if isNull {
 			continue
 		}
-		var oldMem int
 		if p.buffer == nil {
 			p.buffer = &bytes.Buffer{}
+			memDelta += DefBytesBufferSize
 		} else {
-			oldMem = p.buffer.Cap()
 			p.buffer.WriteString(e.sep)
 		}
 		p.buffer.WriteString(p.valsBuf.String())
-		newMem := p.buffer.Cap()
-		memDelta += int64(newMem - oldMem)
 	}
 	if p.buffer != nil {
 		return memDelta, e.truncatePartialResultIfNeed(sctx, p.buffer)
@@ -151,9 +165,11 @@ func (e *groupConcat) MergePartialResult(sctx sessionctx.Context, src, dst Parti
 		p2.buffer = p1.buffer
 		return 0, nil
 	}
+	memDelta -= int64(p2.buffer.Cap())
 	p2.buffer.WriteString(e.sep)
 	p2.buffer.WriteString(p1.buffer.String())
-	return 0, e.truncatePartialResultIfNeed(sctx, p2.buffer)
+	memDelta += int64(p2.buffer.Cap())
+	return memDelta, e.truncatePartialResultIfNeed(sctx, p2.buffer)
 }
 
 // SetTruncated will be called in `executorBuilder#buildHashAgg` with duck-type.
@@ -180,7 +196,7 @@ func (e *groupConcatDistinct) AllocPartialResult() (pr PartialResult, memDelta i
 	p := new(partialResult4GroupConcatDistinct)
 	p.valsBuf = &bytes.Buffer{}
 	p.valSet = set.NewStringSet()
-	return PartialResult(p), DefPartialResult4GroupConcatDistinctSize
+	return PartialResult(p), DefPartialResult4GroupConcatDistinctSize + DefBytesBufferSize
 }
 
 func (e *groupConcatDistinct) ResetPartialResult(pr PartialResult) {
@@ -191,6 +207,16 @@ func (e *groupConcatDistinct) ResetPartialResult(pr PartialResult) {
 func (e *groupConcatDistinct) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (memDelta int64, err error) {
 	p := (*partialResult4GroupConcatDistinct)(pr)
 	v, isNull := "", false
+	memDelta += int64(-p.valsBuf.Cap()) + (int64(-cap(p.encodeBytesBuffer)))
+	if p.buffer != nil {
+		memDelta += int64(-p.buffer.Cap())
+	}
+	defer func() {
+		memDelta += int64(p.valsBuf.Cap()) + (int64(cap(p.encodeBytesBuffer)))
+		if p.buffer != nil {
+			memDelta += int64(p.buffer.Cap())
+		}
+	}()
 	for _, row := range rowsInGroup {
 		p.valsBuf.Reset()
 		p.encodeBytesBuffer = p.encodeBytesBuffer[:0]
@@ -214,18 +240,15 @@ func (e *groupConcatDistinct) UpdatePartialResult(sctx sessionctx.Context, rowsI
 		}
 		p.valSet.Insert(joinedVal)
 		memDelta += int64(len(joinedVal))
-		var oldMem int
 		// write separator
 		if p.buffer == nil {
 			p.buffer = &bytes.Buffer{}
+			memDelta += DefBytesBufferSize
 		} else {
-			oldMem = p.buffer.Cap()
 			p.buffer.WriteString(e.sep)
 		}
 		// write values
 		p.buffer.WriteString(p.valsBuf.String())
-		newMem := p.buffer.Cap()
-		memDelta += int64(newMem - oldMem)
 	}
 	if p.buffer != nil {
 		return memDelta, e.truncatePartialResultIfNeed(sctx, p.buffer)
@@ -299,19 +322,18 @@ func (h *topNRows) Pop() interface{} {
 	return x
 }
 
-func (h *topNRows) tryToAdd(row sortRow) (truncated bool, sortRowMemSize int64) {
-	oldSize := h.currSize
+func (h *topNRows) tryToAdd(row sortRow) (truncated bool, memDelta int64) {
 	h.currSize += uint64(row.buffer.Len())
 	if len(h.rows) > 0 {
 		h.currSize += h.sepSize
 	}
 	heap.Push(h, row)
+	memDelta += int64(row.buffer.Cap())
 	for _, dt := range row.byItems {
-		sortRowMemSize += GetDatumMemSize(dt)
+		memDelta += GetDatumMemSize(dt)
 	}
 	if h.currSize <= h.limitSize {
-		sortRowMemSize += int64(h.currSize - oldSize)
-		return false, sortRowMemSize
+		return false, memDelta
 	}
 
 	for h.currSize > h.limitSize {
@@ -321,14 +343,14 @@ func (h *topNRows) tryToAdd(row sortRow) (truncated bool, sortRowMemSize int64)
 			h.rows[0].buffer.Truncate(h.rows[0].buffer.Len() - int(debt))
 		} else {
 			h.currSize -= uint64(h.rows[0].buffer.Len()) + h.sepSize
+			memDelta -= int64(h.rows[0].buffer.Cap())
 			for _, dt := range h.rows[0].byItems {
-				sortRowMemSize -= GetDatumMemSize(dt)
+				memDelta -= GetDatumMemSize(dt)
 			}
 			heap.Pop(h)
 		}
 	}
-	sortRowMemSize += int64(h.currSize - oldSize)
-	return true, sortRowMemSize
+	return true, memDelta
 }
 
 func (h *topNRows) reset() {
@@ -385,7 +407,7 @@ func (e *groupConcatOrder) AllocPartialResult() (pr PartialResult, memDelta int6
 			sepSize: uint64(len(e.sep)),
 		},
 	}
-	return PartialResult(p), DefPartialResult4GroupConcatOrderSize
+	return PartialResult(p), DefPartialResult4GroupConcatOrderSize + DefTopNRowsSize
 }
 
 func (e *groupConcatOrder) ResetPartialResult(pr PartialResult) {
@@ -487,7 +509,7 @@ func (e *groupConcatDistinctOrder) AllocPartialResult() (pr PartialResult, memDe
 		},
 		valSet: set.NewStringSet(),
 	}
-	return PartialResult(p), DefPartialResult4GroupConcatOrderDistinctSize
+	return PartialResult(p), DefPartialResult4GroupConcatOrderDistinctSize + DefTopNRowsSize
 }
 
 func (e *groupConcatDistinctOrder) ResetPartialResult(pr PartialResult) {
@@ -500,6 +522,8 @@ func (e *groupConcatDistinctOrder) UpdatePartialResult(sctx sessionctx.Context,
 	p := (*partialResult4GroupConcatOrderDistinct)(pr)
 	p.topN.sctx = sctx
 	v, isNull := "", false
+	memDelta -= int64(cap(p.encodeBytesBuffer))
+	defer func() { memDelta += int64(cap(p.encodeBytesBuffer)) }()
 	for _, row := range rowsInGroup {
 		buffer := new(bytes.Buffer)
 		p.encodeBytesBuffer = p.encodeBytesBuffer[:0]
diff --git a/executor/aggfuncs/func_group_concat_test.go b/executor/aggfuncs/func_group_concat_test.go
index 1651a29c20..ae701d7bf9 100644
--- a/executor/aggfuncs/func_group_concat_test.go
+++ b/executor/aggfuncs/func_group_concat_test.go
@@ -59,15 +59,15 @@ func (s *testSuite) TestGroupConcat(c *C) {
 
 func (s *testSuite) TestMemGroupConcat(c *C) {
 	multiArgsTest1 := buildMultiArgsAggMemTester(ast.AggFuncGroupConcat, []byte{mysql.TypeString, mysql.TypeString}, mysql.TypeString, 5,
-		aggfuncs.DefPartialResult4GroupConcatSize, groupConcatMultiArgsUpdateMemDeltaGens, false)
+		aggfuncs.DefPartialResult4GroupConcatSize+aggfuncs.DefBytesBufferSize, groupConcatMultiArgsUpdateMemDeltaGens, false)
 	multiArgsTest2 := buildMultiArgsAggMemTester(ast.AggFuncGroupConcat, []byte{mysql.TypeString, mysql.TypeString}, mysql.TypeString, 5,
-		aggfuncs.DefPartialResult4GroupConcatDistinctSize, groupConcatDistinctMultiArgsUpdateMemDeltaGens, true)
+		aggfuncs.DefPartialResult4GroupConcatDistinctSize+aggfuncs.DefBytesBufferSize, groupConcatDistinctMultiArgsUpdateMemDeltaGens, true)
 	multiArgsTest3 := buildMultiArgsAggMemTester(ast.AggFuncGroupConcat, []byte{mysql.TypeString, mysql.TypeString}, mysql.TypeString, 5,
-		aggfuncs.DefPartialResult4GroupConcatOrderSize, groupConcatOrderMultiArgsUpdateMemDeltaGens, false)
+		aggfuncs.DefPartialResult4GroupConcatOrderSize+aggfuncs.DefTopNRowsSize, groupConcatOrderMultiArgsUpdateMemDeltaGens, false)
 	multiArgsTest3.multiArgsAggTest.orderBy = true
 	multiArgsTest4 := buildMultiArgsAggMemTester(ast.AggFuncGroupConcat, []byte{mysql.TypeString, mysql.TypeString}, mysql.TypeString, 5,
-		aggfuncs.DefPartialResult4GroupConcatOrderDistinctSize, groupConcatDistinctOrderMultiArgsUpdateMemDeltaGens, true)
+		aggfuncs.DefPartialResult4GroupConcatOrderDistinctSize+aggfuncs.DefTopNRowsSize, groupConcatDistinctOrderMultiArgsUpdateMemDeltaGens, true)
 	multiArgsTest4.multiArgsAggTest.orderBy = true
 
 	multiArgsTests := []multiArgsAggMemTest{multiArgsTest1, multiArgsTest2, multiArgsTest3, multiArgsTest4}
@@ -79,21 +79,27 @@ func (s *testSuite) TestMemGroupConcat(c *C) {
 func groupConcatMultiArgsUpdateMemDeltaGens(srcChk *chunk.Chunk, dataType []*types.FieldType, byItems []*util.ByItems) (memDeltas []int64, err error) {
 	memDeltas = make([]int64, 0)
 	buffer := new(bytes.Buffer)
+	valBuffer := new(bytes.Buffer)
 	for i := 0; i < srcChk.NumRows(); i++ {
+		valBuffer.Reset()
 		row := srcChk.GetRow(i)
 		if row.IsNull(0) {
 			memDeltas = append(memDeltas, int64(0))
 			continue
 		}
-		oldMemSize := buffer.Cap()
+		oldMemSize := buffer.Cap() + valBuffer.Cap()
 		if i != 0 {
 			buffer.WriteString(separator)
 		}
 		for j := 0; j < len(dataType); j++ {
 			curVal := row.GetString(j)
-			buffer.WriteString(curVal)
+			valBuffer.WriteString(curVal)
+		}
+		buffer.WriteString(valBuffer.String())
+		memDelta := int64(buffer.Cap() + valBuffer.Cap() - oldMemSize)
+		if i == 0 {
+			memDelta += aggfuncs.DefBytesBufferSize
 		}
-		memDelta := int64(buffer.Cap() - oldMemSize)
 		memDeltas = append(memDeltas, memDelta)
 	}
 	return memDeltas, nil
@@ -101,22 +107,19 @@ func groupConcatMultiArgsUpdateMemDeltaGens(srcChk *chunk.Chunk, dataType []*typ
 
 func groupConcatOrderMultiArgsUpdateMemDeltaGens(srcChk *chunk.Chunk, dataType []*types.FieldType, byItems []*util.ByItems) (memDeltas []int64, err error) {
 	memDeltas = make([]int64, 0)
-	buffer := new(bytes.Buffer)
 	for i := 0; i < srcChk.NumRows(); i++ {
+		buffer := new(bytes.Buffer)
 		row := srcChk.GetRow(i)
 		if row.IsNull(0) {
 			memDeltas = append(memDeltas, int64(0))
 			continue
 		}
-		oldMemSize := buffer.Len()
-		if i != 0 {
-			buffer.WriteString(separator)
-		}
+		oldMemSize := buffer.Cap()
 		for j := 0; j < len(dataType); j++ {
 			curVal := row.GetString(j)
 			buffer.WriteString(curVal)
 		}
-		memDelta := int64(buffer.Len() - oldMemSize)
+		memDelta := int64(buffer.Cap() - oldMemSize)
 		for _, byItem := range byItems {
 			fdt, _ := byItem.Expr.Eval(row)
 			datumMem := aggfuncs.GetDatumMemSize(&fdt)
@@ -139,6 +142,7 @@ func groupConcatDistinctMultiArgsUpdateMemDeltaGens(srcChk *chunk.Chunk, dataTyp
 			continue
 		}
 		valsBuf.Reset()
+		oldMemSize := buffer.Cap() + valsBuf.Cap() + cap(encodeBytesBuffer)
 		encodeBytesBuffer = encodeBytesBuffer[:0]
 		for j := 0; j < len(dataType); j++ {
 			curVal := row.GetString(j)
@@ -151,12 +155,14 @@ func groupConcatDistinctMultiArgsUpdateMemDeltaGens(srcChk *chunk.Chunk, dataTyp
 			continue
 		}
 		valSet.Insert(joinedVal)
-		oldMemSize := buffer.Cap()
 		if i != 0 {
 			buffer.WriteString(separator)
 		}
 		buffer.WriteString(valsBuf.String())
-		memDelta := int64(len(joinedVal) + (buffer.Cap() - oldMemSize))
+		memDelta := int64(len(joinedVal) + (buffer.Cap() + valsBuf.Cap() + cap(encodeBytesBuffer) - oldMemSize))
+		if i == 0 {
+			memDelta += aggfuncs.DefBytesBufferSize
+		}
 		memDeltas = append(memDeltas, memDelta)
 	}
 	return memDeltas, nil
@@ -164,10 +170,9 @@ func groupConcatDistinctMultiArgsUpdateMemDeltaGens(srcChk *chunk.Chunk, dataTyp
 
 func groupConcatDistinctOrderMultiArgsUpdateMemDeltaGens(srcChk *chunk.Chunk, dataType []*types.FieldType, byItems []*util.ByItems) (memDeltas []int64, err error) {
 	valSet := set.NewStringSet()
-	buffer := new(bytes.Buffer)
-	valsBuf := new(bytes.Buffer)
 	var encodeBytesBuffer []byte
 	for i := 0; i < srcChk.NumRows(); i++ {
+		valsBuf := new(bytes.Buffer)
 		row := srcChk.GetRow(i)
 		if row.IsNull(0) {
 			memDeltas = append(memDeltas, int64(0))
@@ -175,6 +180,7 @@ func groupConcatDistinctOrderMultiArgsUpdateMemDeltaGens(srcChk *chunk.Chunk, da
 		}
 		valsBuf.Reset()
 		encodeBytesBuffer = encodeBytesBuffer[:0]
+		oldMemSize := valsBuf.Cap() + cap(encodeBytesBuffer)
 		for j := 0; j < len(dataType); j++ {
 			curVal := row.GetString(j)
 			encodeBytesBuffer = codec.EncodeBytes(encodeBytesBuffer, hack.Slice(curVal))
@@ -185,13 +191,8 @@ func groupConcatDistinctOrderMultiArgsUpdateMemDeltaGens(srcChk *chunk.Chunk, da
 			memDeltas = append(memDeltas, int64(0))
 			continue
 		}
-		oldMemSize := buffer.Len()
-		if i != 0 {
-			buffer.WriteString(separator)
-		}
 		valSet.Insert(joinedVal)
-		buffer.WriteString(valsBuf.String())
-		memDelta := int64(len(joinedVal) + (buffer.Len() - oldMemSize))
+		memDelta := int64(len(joinedVal) + (valsBuf.Cap() + cap(encodeBytesBuffer) - oldMemSize))
 		for _, byItem := range byItems {
 			fdt, _ := byItem.Expr.Eval(row)
 			datumMem := aggfuncs.GetDatumMemSize(&fdt)
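
Reviewer note: the accounting pattern applied across the UpdatePartialResult methods in this patch is to subtract the current Cap() of every tracked buffer before processing the batch, re-add the (possibly grown) Cap() in a defer afterwards, and charge DefBytesBufferSize once when a buffer is lazily allocated, so memDelta follows capacity growth rather than bytes written. The standalone sketch below only illustrates that pattern under those assumptions; it is not code from this patch, and the appendValues helper and its names are hypothetical.

package main

import (
	"bytes"
	"fmt"
	"unsafe"
)

// defBytesBufferSize mirrors the DefBytesBufferSize constant added by the patch:
// the fixed size of an empty bytes.Buffer header, charged once on allocation.
const defBytesBufferSize = int64(unsafe.Sizeof(bytes.Buffer{}))

// appendValues sketches the Cap()-delta pattern: subtract the buffer's current
// capacity up front, re-add it in a defer, and charge the buffer header only
// when the buffer is created lazily. The named return memDelta carries the net
// change in tracked memory for this batch of values.
func appendValues(buf **bytes.Buffer, sep string, vals []string) (memDelta int64) {
	if *buf != nil {
		memDelta -= int64((*buf).Cap())
	}
	defer func() {
		if *buf != nil {
			memDelta += int64((*buf).Cap())
		}
	}()

	for _, v := range vals {
		if *buf == nil {
			*buf = &bytes.Buffer{}
			memDelta += defBytesBufferSize // struct header charged exactly once
		} else {
			(*buf).WriteString(sep)
		}
		(*buf).WriteString(v)
	}
	return memDelta
}

func main() {
	var buf *bytes.Buffer
	total := appendValues(&buf, ",", []string{"a", "bb", "ccc"})
	total += appendValues(&buf, ",", []string{"dddd"})
	// total ends up as the buffer header plus its final Cap(), not the bytes written.
	fmt.Println(buf.String(), total)
}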