From 2fb314537a190b4e9e5af4cb258b540bbc3e400a Mon Sep 17 00:00:00 2001 From: lysu Date: Tue, 28 Aug 2018 20:08:13 +0800 Subject: [PATCH] chunk: support capacity grow (#7473) --- util/chunk/chunk.go | 128 ++++++++++++++++++++++++++++++--------- util/chunk/chunk_test.go | 98 ++++++++++++++++++++++++++++-- util/chunk/codec.go | 5 +- 3 files changed, 198 insertions(+), 33 deletions(-) diff --git a/util/chunk/chunk.go b/util/chunk/chunk.go index ccdcdc7219..a9211cb79a 100644 --- a/util/chunk/chunk.go +++ b/util/chunk/chunk.go @@ -17,6 +17,7 @@ import ( "encoding/binary" "unsafe" + "github.com/cznic/mathutil" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/types/json" ) @@ -30,6 +31,8 @@ type Chunk struct { // numVirtualRows indicates the number of virtual rows, which have zero column. // It is used only when this Chunk doesn't hold any data, i.e. "len(columns)==0". numVirtualRows int + // capacity indicates the max number of rows this chunk can hold. + capacity int } // Capacity constants. @@ -39,15 +42,56 @@ const ( // NewChunkWithCapacity creates a new chunk with field types and capacity. func NewChunkWithCapacity(fields []*types.FieldType, cap int) *Chunk { + return New(fields, cap, cap) //FIXME: in following PR. +} + +// New creates a new chunk. +// cap: the limit for the max number of rows. +// maxChunkSize: the max limit for the number of rows. +func New(fields []*types.FieldType, cap, maxChunkSize int) *Chunk { chk := new(Chunk) chk.columns = make([]*column, 0, len(fields)) - chk.numVirtualRows = 0 + chk.capacity = mathutil.Min(cap, maxChunkSize) for _, f := range fields { - chk.addColumnByFieldType(f, cap) + elemLen := getFixedLen(f) + if elemLen == varElemLen { + chk.columns = append(chk.columns, newVarLenColumn(chk.capacity, nil)) + } else { + chk.columns = append(chk.columns, newFixedLenColumn(elemLen, chk.capacity)) + } } + chk.numVirtualRows = 0 return chk } +// Renew creates a new Chunk based on an existing Chunk. The newly created Chunk +// has the same data schema with the old Chunk. The capacity of the new Chunk +// might be doubled based on the capacity of the old Chunk and the maxChunkSize. +// chk: old chunk(often used in previous call). +// maxChunkSize: the limit for the max number of rows. +func Renew(chk *Chunk, maxChunkSize int) *Chunk { + newCap := reCalcCapacity(chk, maxChunkSize) + newChk := new(Chunk) + newChk.columns = renewColumns(chk.columns, newCap) + newChk.numVirtualRows = 0 + newChk.capacity = newCap + return newChk +} + +// renewColumns creates the columns of a Chunk. The capacity of the newly +// created columns is equal to cap. +func renewColumns(oldCol []*column, cap int) []*column { + columns := make([]*column, 0, len(oldCol)) + for _, col := range oldCol { + if col.isFixed() { + columns = append(columns, newFixedLenColumn(len(col.elemBuf), cap)) + } else { + columns = append(columns, newVarLenColumn(cap, col)) + } + } + return columns +} + // MemoryUsage returns the total memory usage of a Chunk in B. // We ignore the size of column.length and column.nullCount // since they have little effect of the total memory usage. @@ -59,32 +103,29 @@ func (c *Chunk) MemoryUsage() (sum int64) { return } -// addFixedLenColumn adds a fixed length column with elemLen and initial data capacity. -func (c *Chunk) addFixedLenColumn(elemLen, initCap int) { - c.columns = append(c.columns, &column{ +// newFixedLenColumn creates a fixed length column with elemLen and initial data capacity. +func newFixedLenColumn(elemLen, cap int) *column { + return &column{ elemBuf: make([]byte, elemLen), - data: make([]byte, 0, initCap*elemLen), - nullBitmap: make([]byte, 0, initCap>>3), - }) -} - -// addVarLenColumn adds a variable length column with initial data capacity. -func (c *Chunk) addVarLenColumn(initCap int) { - c.columns = append(c.columns, &column{ - offsets: make([]int32, 1, initCap+1), - data: make([]byte, 0, initCap*4), - nullBitmap: make([]byte, 0, initCap>>3), - }) -} - -// addColumnByFieldType adds a column by field type. -func (c *Chunk) addColumnByFieldType(fieldTp *types.FieldType, initCap int) { - numFixedBytes := getFixedLen(fieldTp) - if numFixedBytes != -1 { - c.addFixedLenColumn(numFixedBytes, initCap) - return + data: make([]byte, 0, cap*elemLen), + nullBitmap: make([]byte, 0, cap>>3), + } +} + +// newVarLenColumn creates a variable length column with initial data capacity. +func newVarLenColumn(cap int, old *column) *column { + estimatedElemLen := 8 + // For varLenColumn (e.g. varchar), the accurate length of an element is unknown. + // Therefore, in the first executor.Next we use an experience value -- 8 (so it may make runtime.growslice) + // but in the following Next call we estimate the length as AVG x 1.125 elemLen of the previous call. + if old != nil && old.length != 0 { + estimatedElemLen = (len(old.data) + len(old.data)/8) / old.length + } + return &column{ + offsets: make([]int32, 1, cap+1), + data: make([]byte, 0, cap*estimatedElemLen), + nullBitmap: make([]byte, 0, cap>>3), } - c.addVarLenColumn(initCap) } // MakeRef makes column in "dstColIdx" reference to column in "srcColIdx". @@ -112,12 +153,43 @@ func (c *Chunk) SetNumVirtualRows(numVirtualRows int) { // Reset resets the chunk, so the memory it allocated can be reused. // Make sure all the data in the chunk is not used anymore before you reuse this chunk. func (c *Chunk) Reset() { - for _, c := range c.columns { - c.reset() + for _, col := range c.columns { + col.reset() } c.numVirtualRows = 0 } +// GrowAndReset resets the Chunk and doubles the capacity of the Chunk. +// The doubled capacity should not be larger than maxChunkSize. +// TODO: this method will be used in following PR. +func (c *Chunk) GrowAndReset(maxChunkSize int) { + if c.columns == nil { + return + } + newCap := reCalcCapacity(c, maxChunkSize) + if newCap <= c.capacity { + c.Reset() + return + } + c.capacity = newCap + c.columns = renewColumns(c.columns, newCap) + c.numVirtualRows = 0 +} + +// reCalcCapacity calculates the capacity for another Chunk based on the current +// Chunk. The new capacity is doubled only when the current Chunk is full. +func reCalcCapacity(c *Chunk, maxChunkSize int) int { + if c.NumRows() < c.capacity { + return c.capacity + } + return mathutil.Min(c.capacity*2, maxChunkSize) +} + +// Capacity returns the capacity of the Chunk. +func (c *Chunk) Capacity() int { + return c.capacity +} + // NumCols returns the number of columns in the chunk. func (c *Chunk) NumCols() int { return len(c.columns) diff --git a/util/chunk/chunk_test.go b/util/chunk/chunk_test.go index 7c0feea0e1..d6d7a1344a 100644 --- a/util/chunk/chunk_test.go +++ b/util/chunk/chunk_test.go @@ -14,8 +14,10 @@ package chunk import ( + "bytes" "fmt" "math" + "strconv" "testing" "time" "unsafe" @@ -248,9 +250,9 @@ func newChunk(elemLen ...int) *Chunk { chk := &Chunk{} for _, l := range elemLen { if l > 0 { - chk.addFixedLenColumn(l, 0) + chk.columns = append(chk.columns, newFixedLenColumn(l, 0)) } else { - chk.addVarLenColumn(0) + chk.columns = append(chk.columns, newVarLenColumn(0, nil)) } } return chk @@ -416,8 +418,8 @@ func (s *testChunkSuite) TestChunkMemoryUsage(c *check.C) { //cap(c.nullBitmap) + cap(c.offsets)*4 + cap(c.data) + cap(c.elemBuf) colUsage := make([]int, len(fieldTypes)) colUsage[0] = initCap>>3 + 0 + initCap*4 + 4 - colUsage[1] = initCap>>3 + (initCap+1)*4 + initCap*4 + 0 - colUsage[2] = initCap>>3 + (initCap+1)*4 + initCap*4 + 0 + colUsage[1] = initCap>>3 + (initCap+1)*4 + initCap*8 + 0 + colUsage[2] = initCap>>3 + (initCap+1)*4 + initCap*8 + 0 colUsage[3] = initCap>>3 + 0 + initCap*16 + 16 colUsage[4] = initCap>>3 + 0 + initCap*8 + 8 @@ -642,3 +644,91 @@ func BenchmarkChunkMemoryUsage(b *testing.B) { chk.MemoryUsage() } } + +type seqNumberGenerateExec struct { + seq int + genCountSize int +} + +func (x *seqNumberGenerateExec) Next(chk *Chunk, resize bool) { + if resize { + chk.GrowAndReset(1024) + } else { + chk.Reset() + } + for chk.NumRows() < chk.Capacity() { + x.seq++ + if x.seq > x.genCountSize { + break + } + chk.AppendInt64(0, 1) + } +} + +type benchChunkGrowCase struct { + tag string + reuse bool + newReset bool + cntPerCall int + initCap int + maxCap int +} + +func (b *benchChunkGrowCase) String() string { + var buff bytes.Buffer + if b.reuse { + buff.WriteString("renew,") + } else { + buff.WriteString("reset,") + } + buff.WriteString("cntPerCall:" + strconv.Itoa(b.cntPerCall) + ",") + buff.WriteString("cap from:" + strconv.Itoa(b.initCap) + " to " + strconv.Itoa(b.maxCap) + ",") + if b.tag != "" { + buff.WriteString("[" + b.tag + "]") + } + return buff.String() +} + +func BenchmarkChunkGrowSuit(b *testing.B) { + tests := []benchChunkGrowCase{ + {reuse: true, newReset: false, cntPerCall: 10000000, initCap: 1024, maxCap: 1024}, + {reuse: true, newReset: false, cntPerCall: 10000000, initCap: 32, maxCap: 32}, + {reuse: true, newReset: true, cntPerCall: 10000000, initCap: 32, maxCap: 1024, tag: "grow"}, + {reuse: false, newReset: false, cntPerCall: 10000000, initCap: 1024, maxCap: 1024}, + {reuse: false, newReset: false, cntPerCall: 10000000, initCap: 32, maxCap: 32}, + {reuse: false, newReset: true, cntPerCall: 10000000, initCap: 32, maxCap: 1024, tag: "grow"}, + {reuse: true, newReset: false, cntPerCall: 10, initCap: 1024, maxCap: 1024}, + {reuse: true, newReset: false, cntPerCall: 10, initCap: 32, maxCap: 32}, + {reuse: true, newReset: true, cntPerCall: 10, initCap: 32, maxCap: 1024, tag: "grow"}, + {reuse: false, newReset: false, cntPerCall: 10, initCap: 1024, maxCap: 1024}, + {reuse: false, newReset: false, cntPerCall: 10, initCap: 32, maxCap: 32}, + {reuse: false, newReset: true, cntPerCall: 10, initCap: 32, maxCap: 1024, tag: "grow"}, + } + for _, test := range tests { + b.Run(test.String(), benchmarkChunkGrow(test)) + } +} + +func benchmarkChunkGrow(t benchChunkGrowCase) func(b *testing.B) { + return func(b *testing.B) { + b.ReportAllocs() + chk := New([]*types.FieldType{{Tp: mysql.TypeLong}}, t.initCap, t.maxCap) + b.ResetTimer() + for i := 0; i < b.N; i++ { + e := &seqNumberGenerateExec{genCountSize: t.cntPerCall} + for { + e.Next(chk, t.newReset) + if chk.NumRows() == 0 { + break + } + if !t.reuse { + if t.newReset { + chk = Renew(chk, t.maxCap) + } else { + chk = New([]*types.FieldType{{Tp: mysql.TypeLong}}, t.initCap, t.maxCap) + } + } + } + } + } +} diff --git a/util/chunk/codec.go b/util/chunk/codec.go index a9a3df7587..20401f95f7 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -163,6 +163,9 @@ func (c *Codec) bytesToI32Slice(b []byte) (i32s []int32) { return i32s } +// varElemLen indicates this column is a variable length column. +const varElemLen = -1 + func getFixedLen(colType *types.FieldType) int { switch colType.Tp { case mysql.TypeFloat: @@ -175,7 +178,7 @@ func getFixedLen(colType *types.FieldType) int { case mysql.TypeNewDecimal: return types.MyDecimalStructSize default: - return -1 + return varElemLen } }