*: use chunk grow for simple executor (#7540)

lysu
2018-09-27 09:07:51 +08:00
committed by Zhang Jian
parent 081920da93
commit 05b37de16e
38 changed files with 194 additions and 140 deletions
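
The pattern this commit introduces, in outline: instead of allocating every executor chunk at the session's full MaxChunkSize via chunk.NewChunkWithCapacity, executors allocate their first chunk with a small (possibly zero) initial capacity through chunk.New(fields, initCap, maxChunkSize), and let the capacity grow toward maxChunkSize on later batches via GrowAndReset or chunk.Renew. The sketch below is illustrative only (the 32/1024 sizes and the main function are not part of the commit); it uses only the chunk APIs touched in this diff.

package main

import (
	"fmt"

	"github.com/pingcap/tidb/mysql"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/chunk"
)

func main() {
	fieldTypes := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}

	const maxChunkSize = 1024
	// Old: chunk.NewChunkWithCapacity(fieldTypes, maxChunkSize) allocated the
	// full capacity up front. New: start small and grow on demand.
	chk := chunk.New(fieldTypes, 32, maxChunkSize)

	for batch := 0; batch < 3; batch++ {
		// Producers fill the chunk up to its current capacity, mirroring the
		// chk.NumRows() < chk.Capacity() checks in the rewritten executors.
		for chk.NumRows() < chk.Capacity() {
			chk.AppendInt64(0, int64(chk.NumRows()))
		}
		fmt.Printf("batch %d: rows=%d cap=%d\n", batch, chk.NumRows(), chk.Capacity())

		// GrowAndReset clears the rows and, when the previous batch filled the
		// chunk, lets the capacity grow (bounded by maxChunkSize) for the next batch.
		chk.GrowAndReset(maxChunkSize)
	}
}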

View File

@ -282,7 +282,7 @@ func (d *ddlCtx) GetTableMaxRowID(startTS uint64, tbl table.PhysicalTable) (maxR
}
defer terror.Call(result.Close)
chk := chunk.NewChunkWithCapacity(getColumnsTypes(columns), 1)
chk := chunk.New(getColumnsTypes(columns), 1, 1)
err = result.Next(ctx, chk)
if err != nil {
return maxRowID, false, errors.Trace(err)

View File

@ -70,7 +70,7 @@ func (s *testSuite) TestSelectNormal(c *C) {
response.Fetch(context.TODO())
// Test Next.
chk := chunk.NewChunkWithCapacity(colTypes, 32)
chk := chunk.New(colTypes, 32, 32)
numAllRows := 0
for {
err = response.Next(context.TODO(), chk)
@ -122,7 +122,7 @@ func (s *testSuite) TestSelectStreaming(c *C) {
response.Fetch(context.TODO())
// Test Next.
chk := chunk.NewChunkWithCapacity(colTypes, 32)
chk := chunk.New(colTypes, 32, 32)
numAllRows := 0
for {
err = response.Next(context.TODO(), chk)
@ -259,7 +259,7 @@ func BenchmarkReadRowsData(b *testing.B) {
for i := 0; i < numCols; i++ {
colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong}
}
chk := chunk.NewChunkWithCapacity(colTypes, numRows)
chk := chunk.New(colTypes, numRows, numRows)
buffer := populateBuffer()
@ -277,7 +277,7 @@ func BenchmarkDecodeToChunk(b *testing.B) {
for i := 0; i < numCols; i++ {
colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong}
}
chk := chunk.NewChunkWithCapacity(colTypes, numRows)
chk := chunk.New(colTypes, numRows, numRows)
for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ {
for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ {

View File

@ -112,7 +112,7 @@ func (a *recordSet) Next(ctx context.Context, chk *chunk.Chunk) error {
// NewChunk creates a new chunk using the NewChunk function in the chunk package.
func (a *recordSet) NewChunk() *chunk.Chunk {
return a.executor.newChunk()
return a.executor.newFirstChunk()
}
func (a *recordSet) Close() error {
@ -270,7 +270,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
a.logSlowQuery(txnTS, err == nil)
}()
err = e.Next(ctx, e.newChunk())
err = e.Next(ctx, e.newFirstChunk())
if err != nil {
return nil, errors.Trace(err)
}

View File

@ -102,7 +102,7 @@ func (e *CheckIndexRangeExec) Open(ctx context.Context) error {
FieldType: *colTypeForHandle,
})
e.srcChunk = e.newChunk()
e.srcChunk = e.newFirstChunk()
dagPB, err := e.buildDAGPB()
if err != nil {
return errors.Trace(err)
@ -197,7 +197,7 @@ func (e *RecoverIndexExec) Open(ctx context.Context) error {
return errors.Trace(err)
}
e.srcChunk = chunk.NewChunkWithCapacity(e.columnsTypes(), e.maxChunkSize)
e.srcChunk = chunk.New(e.columnsTypes(), e.initCap, e.maxChunkSize)
e.batchSize = 2048
e.recoverRows = make([]recoverRows, 0, e.batchSize)
e.idxValsBufs = make([][]types.Datum, e.batchSize)
@ -636,7 +636,7 @@ func (e *CleanupIndexExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return errors.Trace(err)
}
e.idxChunk = chunk.NewChunkWithCapacity(e.getIdxColTypes(), e.maxChunkSize)
e.idxChunk = chunk.New(e.getIdxColTypes(), e.initCap, e.maxChunkSize)
e.idxValues = make(map[int64][][]types.Datum, e.batchSize)
e.batchKeys = make([]kv.Key, 0, e.batchSize)
e.idxValsBufs = make([][]types.Datum, e.batchSize)

View File

@ -235,7 +235,7 @@ func (e *HashAggExec) initForUnparallelExec() {
e.partialResultMap = make(aggPartialResultMapper, 0)
e.groupKeyBuffer = make([]byte, 0, 8)
e.groupValDatums = make([]types.Datum, 0, len(e.groupKeyBuffer))
e.childResult = e.children[0].newChunk()
e.childResult = e.children[0].newFirstChunk()
}
func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
@ -270,12 +270,12 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
partialResultsMap: make(aggPartialResultMapper, 0),
groupByItems: e.GroupByItems,
groupValDatums: make([]types.Datum, 0, len(e.GroupByItems)),
chk: e.children[0].newChunk(),
chk: e.children[0].newFirstChunk(),
}
e.partialWorkers[i] = w
e.inputCh <- &HashAggInput{
chk: e.children[0].newChunk(),
chk: e.children[0].newFirstChunk(),
giveBackCh: w.inputCh,
}
}
@ -292,7 +292,7 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
rowBuffer: make([]types.Datum, 0, e.Schema().Len()),
mutableRow: chunk.MutRowFromTypes(e.retTypes()),
}
e.finalWorkers[i].finalResultHolderCh <- e.newChunk()
e.finalWorkers[i].finalResultHolderCh <- e.newFirstChunk()
}
}
@ -734,7 +734,7 @@ func (e *StreamAggExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return errors.Trace(err)
}
e.childResult = e.children[0].newChunk()
e.childResult = e.children[0].newFirstChunk()
e.executed = false
e.isChildReturnEmpty = true
e.inputIter = chunk.NewIterator4Chunk(e.childResult)

View File

@ -22,6 +22,7 @@ import (
"sync"
"time"
"github.com/cznic/mathutil"
"github.com/cznic/sortutil"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/distsql"
@ -398,8 +399,10 @@ func (b *executorBuilder) buildChecksumTable(v *plannercore.ChecksumTable) Execu
}
func (b *executorBuilder) buildDeallocate(v *plannercore.Deallocate) Executor {
base := newBaseExecutor(b.ctx, nil, v.ExplainID())
base.initCap = chunk.ZeroCapacity
e := &DeallocateExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID()),
baseExecutor: base,
Name: v.Name,
}
return e
@ -431,8 +434,11 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor {
b.err = errors.Trace(b.err)
return nil
}
n := int(mathutil.MinUint64(v.Count, uint64(b.ctx.GetSessionVars().MaxChunkSize)))
base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec)
base.initCap = n
e := &LimitExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec),
baseExecutor: base,
begin: v.Offset,
end: v.Offset + v.Count,
}
@ -440,8 +446,10 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor {
}
func (b *executorBuilder) buildPrepare(v *plannercore.Prepare) Executor {
base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
base.initCap = chunk.ZeroCapacity
e := &PrepareExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
baseExecutor: base,
is: b.is,
name: v.Name,
sqlText: v.SQLText,
@ -495,8 +503,10 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor {
case *ast.RevokeStmt:
return b.buildRevoke(s)
}
base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
base.initCap = chunk.ZeroCapacity
e := &SimpleExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
baseExecutor: base,
Statement: v.Statement,
is: b.is,
}
@ -504,8 +514,10 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor {
}
func (b *executorBuilder) buildSet(v *plannercore.Set) Executor {
base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
base.initCap = chunk.ZeroCapacity
e := &SetExecutor{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
baseExecutor: base,
vars: v.VarAssigns,
}
return e
@ -523,6 +535,7 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
} else {
baseExec = newBaseExecutor(b.ctx, nil, v.ExplainID())
}
baseExec.initCap = chunk.ZeroCapacity
ivs := &InsertValues{
baseExecutor: baseExec,
@ -614,12 +627,13 @@ func (b *executorBuilder) buildGrant(grant *ast.GrantStmt) Executor {
func (b *executorBuilder) buildRevoke(revoke *ast.RevokeStmt) Executor {
e := &RevokeExec{
ctx: b.ctx,
Privs: revoke.Privs,
ObjectType: revoke.ObjectType,
Level: revoke.Level,
Users: revoke.Users,
is: b.is,
baseExecutor: newBaseExecutor(b.ctx, nil, "RevokeStmt"),
ctx: b.ctx,
Privs: revoke.Privs,
ObjectType: revoke.ObjectType,
Level: revoke.Level,
Users: revoke.Users,
is: b.is,
}
return e
}
@ -1091,8 +1105,10 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu
b.err = errors.Errorf("buildTableDual failed, invalid row count for dual table: %v", v.RowCount)
return nil
}
base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
base.initCap = v.RowCount
e := &TableDualExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
baseExecutor: base,
numDualRows: v.RowCount,
}
// Init the startTS for later use.
@ -1209,9 +1225,10 @@ func (b *executorBuilder) buildMaxOneRow(v *plannercore.PhysicalMaxOneRow) Execu
b.err = errors.Trace(b.err)
return nil
}
e := &MaxOneRowExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec),
}
base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec)
base.initCap = 2
base.maxChunkSize = 2
e := &MaxOneRowExec{baseExecutor: base}
return e
}
@ -1241,8 +1258,10 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
return nil
}
columns2Handle := buildColumns2Handle(v.SelectPlan.Schema(), tblID2table)
base := newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec)
base.initCap = chunk.ZeroCapacity
updateExec := &UpdateExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec),
baseExecutor: base,
SelectExec: selExec,
OrderedList: v.OrderedList,
tblID2table: tblID2table,
@ -1320,8 +1339,10 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
b.err = errors.Trace(b.err)
return nil
}
base := newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec)
base.initCap = chunk.ZeroCapacity
deleteExec := &DeleteExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec),
baseExecutor: base,
SelectExec: selExec,
Tables: v.Tables,
IsMultiTable: v.IsMultiTable,
@ -1542,7 +1563,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
innerKeyCols[i] = v.InnerJoinKeys[i].Index
}
e.innerCtx.keyCols = innerKeyCols
e.joinResult = e.newChunk()
e.joinResult = e.newFirstChunk()
metrics.ExecutorCounter.WithLabelValues("IndexLookUpJoin").Inc()
return e
}

View File

@ -104,8 +104,8 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
batchDelete := e.ctx.GetSessionVars().BatchDelete && !e.ctx.GetSessionVars().InTxn()
batchDMLSize := e.ctx.GetSessionVars().DMLBatchSize
fields := e.children[0].retTypes()
chk := e.children[0].newFirstChunk()
for {
chk := e.children[0].newChunk()
iter := chunk.NewIterator4Chunk(chk)
err := e.children[0].Next(ctx, chk)
@ -133,6 +133,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
}
rowCount++
}
chk = chunk.Renew(chk, e.maxChunkSize)
}
return nil
@ -184,10 +185,9 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
colPosInfos := e.getColPosInfos(e.children[0].Schema())
tblRowMap := make(tableRowMapType)
fields := e.children[0].retTypes()
chk := e.children[0].newFirstChunk()
for {
chk := e.children[0].newChunk()
iter := chunk.NewIterator4Chunk(chk)
err := e.children[0].Next(ctx, chk)
if err != nil {
return errors.Trace(err)
@ -200,6 +200,7 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
joinedDatumRow := joinedChunkRow.GetDatumRow(fields)
e.composeTblRowMap(tblRowMap, colPosInfos, joinedDatumRow)
}
chk = chunk.Renew(chk, e.maxChunkSize)
}
return errors.Trace(e.removeRowsInTblRowMap(tblRowMap))

View File

@ -711,7 +711,7 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
handleCnt := len(task.handles)
task.rows = make([]chunk.Row, 0, handleCnt)
for {
chk := tableReader.newChunk()
chk := tableReader.newFirstChunk()
err = tableReader.Next(ctx, chk)
if err != nil {
log.Error(err)

View File

@ -67,6 +67,7 @@ type baseExecutor struct {
ctx sessionctx.Context
id string
schema *expression.Schema
initCap int
maxChunkSize int
children []Executor
retFieldTypes []*types.FieldType
@ -102,9 +103,9 @@ func (e *baseExecutor) Schema() *expression.Schema {
return e.schema
}
// newChunk creates a new chunk to buffer current executor's result.
func (e *baseExecutor) newChunk() *chunk.Chunk {
return chunk.NewChunkWithCapacity(e.retTypes(), e.maxChunkSize)
// newFirstChunk creates a new chunk to buffer current executor's result.
func (e *baseExecutor) newFirstChunk() *chunk.Chunk {
return chunk.New(e.retTypes(), e.initCap, e.maxChunkSize)
}
// retTypes returns all output column types.
@ -123,6 +124,7 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id strin
ctx: ctx,
id: id,
schema: schema,
initCap: ctx.GetSessionVars().MaxChunkSize,
maxChunkSize: ctx.GetSessionVars().MaxChunkSize,
}
if schema != nil {
@ -152,7 +154,7 @@ type Executor interface {
Schema() *expression.Schema
retTypes() []*types.FieldType
newChunk() *chunk.Chunk
newFirstChunk() *chunk.Chunk
}
// CancelDDLJobsExec represents a cancel DDL jobs executor.
@ -166,11 +168,11 @@ type CancelDDLJobsExec struct {
// Next implements the Executor Next interface.
func (e *CancelDDLJobsExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
chk.GrowAndReset(e.maxChunkSize)
if e.cursor >= len(e.jobIDs) {
return nil
}
numCurBatch := mathutil.Min(e.maxChunkSize, len(e.jobIDs)-e.cursor)
numCurBatch := mathutil.Min(chk.Capacity(), len(e.jobIDs)-e.cursor)
for i := e.cursor; i < e.cursor+numCurBatch; i++ {
chk.AppendString(0, fmt.Sprintf("%d", e.jobIDs[i]))
if e.errs[i] != nil {
@ -259,14 +261,14 @@ func (e *ShowDDLJobQueriesExec) Open(ctx context.Context) error {
// Next implements the Executor Next interface.
func (e *ShowDDLJobQueriesExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
chk.GrowAndReset(e.maxChunkSize)
if e.cursor >= len(e.jobs) {
return nil
}
if len(e.jobIDs) >= len(e.jobs) {
return nil
}
numCurBatch := mathutil.Min(e.maxChunkSize, len(e.jobs)-e.cursor)
numCurBatch := mathutil.Min(chk.Capacity(), len(e.jobs)-e.cursor)
for _, id := range e.jobIDs {
for i := e.cursor; i < e.cursor+numCurBatch; i++ {
if id == e.jobs[i].ID {
@ -302,11 +304,11 @@ func (e *ShowDDLJobsExec) Open(ctx context.Context) error {
// Next implements the Executor Next interface.
func (e *ShowDDLJobsExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
chk.GrowAndReset(e.maxChunkSize)
if e.cursor >= len(e.jobs) {
return nil
}
numCurBatch := mathutil.Min(e.maxChunkSize, len(e.jobs)-e.cursor)
numCurBatch := mathutil.Min(chk.Capacity(), len(e.jobs)-e.cursor)
for i := e.cursor; i < e.cursor+numCurBatch; i++ {
chk.AppendInt64(0, e.jobs[i].ID)
chk.AppendString(1, getSchemaName(e.is, e.jobs[i].SchemaID))
@ -461,7 +463,7 @@ func (e *CheckIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if err != nil {
return errors.Trace(err)
}
chk = e.src.newChunk()
chk = e.src.newFirstChunk()
for {
err := e.src.Next(ctx, chk)
if err != nil {
@ -564,7 +566,7 @@ func (e *SelectLockExec) Open(ctx context.Context) error {
// Next implements the Executor Next interface.
func (e *SelectLockExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
chk.GrowAndReset(e.maxChunkSize)
err := e.children[0].Next(ctx, chk)
if err != nil {
return errors.Trace(err)
@ -659,7 +661,7 @@ func (e *LimitExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return errors.Trace(err)
}
e.childResult = e.children[0].newChunk()
e.childResult = e.children[0].newFirstChunk()
e.cursor = 0
e.meetFirstBatch = e.begin == 0
return nil
@ -691,8 +693,8 @@ func init() {
if err != nil {
return rows, errors.Trace(err)
}
chk := exec.newFirstChunk()
for {
chk := exec.newChunk()
err = exec.Next(ctx, chk)
if err != nil {
return rows, errors.Trace(err)
@ -705,6 +707,7 @@ func init() {
row := r.GetDatumRow(exec.retTypes())
rows = append(rows, row)
}
chk = chunk.Renew(chk, sctx.GetSessionVars().MaxChunkSize)
}
}
}
@ -758,7 +761,7 @@ func (e *SelectionExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return errors.Trace(err)
}
e.childResult = e.children[0].newChunk()
e.childResult = e.children[0].newFirstChunk()
e.batched = expression.Vectorizable(e.filters)
if e.batched {
e.selected = make([]bool, 0, chunk.InitialCapacity)
@ -777,7 +780,7 @@ func (e *SelectionExec) Close() error {
// Next implements the Executor Next interface.
func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
chk.GrowAndReset(e.maxChunkSize)
if !e.batched {
return errors.Trace(e.unBatchedNext(ctx, chk))
@ -788,7 +791,7 @@ func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if !e.selected[e.inputRow.Idx()] {
continue
}
if chk.NumRows() == e.maxChunkSize {
if chk.NumRows() >= chk.Capacity() {
return nil
}
chk.AppendRow(e.inputRow)
@ -852,7 +855,7 @@ type TableScanExec struct {
// Next implements the Executor Next interface.
func (e *TableScanExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
chk.GrowAndReset(e.maxChunkSize)
if e.isVirtualTable {
return errors.Trace(e.nextChunk4InfoSchema(ctx, chk))
}
@ -862,7 +865,7 @@ func (e *TableScanExec) Next(ctx context.Context, chk *chunk.Chunk) error {
}
mutableRow := chunk.MutRowFromTypes(e.retTypes())
for chk.NumRows() < e.maxChunkSize {
for chk.NumRows() < chk.Capacity() {
row, err := e.getRow(handle)
if err != nil {
return errors.Trace(err)
@ -875,9 +878,9 @@ func (e *TableScanExec) Next(ctx context.Context, chk *chunk.Chunk) error {
}
func (e *TableScanExec) nextChunk4InfoSchema(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
chk.GrowAndReset(e.maxChunkSize)
if e.virtualTableChunkList == nil {
e.virtualTableChunkList = chunk.NewList(e.retTypes(), e.maxChunkSize)
e.virtualTableChunkList = chunk.NewList(e.retTypes(), e.initCap, e.maxChunkSize)
columns := make([]*table.Column, e.schema.Len())
for i, colInfo := range e.columns {
columns[i] = table.ToColumn(colInfo)
@ -971,7 +974,7 @@ func (e *MaxOneRowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
return errors.New("subquery returns more than 1 row")
}
childChunk := e.children[0].newChunk()
childChunk := e.children[0].newFirstChunk()
err = e.children[0].Next(ctx, childChunk)
if childChunk.NumRows() != 0 {
return errors.New("subquery returns more than 1 row")
@ -1033,7 +1036,7 @@ func (e *UnionExec) Open(ctx context.Context) error {
return errors.Trace(err)
}
for _, child := range e.children {
e.childrenResults = append(e.childrenResults, child.newChunk())
e.childrenResults = append(e.childrenResults, child.newFirstChunk())
}
e.stopFetchData.Store(false)
e.initialized = false
@ -1094,7 +1097,7 @@ func (e *UnionExec) resultPuller(ctx context.Context, childID int) {
// Next implements the Executor Next interface.
func (e *UnionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
chk.GrowAndReset(e.maxChunkSize)
if !e.initialized {
e.initialize(ctx)
e.initialized = true

View File

@ -89,7 +89,7 @@ func (s *testExecSuite) TestShowProcessList(c *C) {
err := e.Open(ctx)
c.Assert(err, IsNil)
chk := e.newChunk()
chk := e.newFirstChunk()
it := chunk.NewIterator4Chunk(chk)
// Run test and check results.
for _, p := range ps {

View File

@ -35,12 +35,12 @@ func (e *ExplainExec) Close() error {
// Next implements the Executor Next interface.
func (e *ExplainExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
chk.GrowAndReset(e.maxChunkSize)
if e.cursor >= len(e.rows) {
return nil
}
numCurRows := mathutil.Min(e.maxChunkSize, len(e.rows)-e.cursor)
numCurRows := mathutil.Min(chk.Capacity(), len(e.rows)-e.cursor)
for i := e.cursor; i < e.cursor+numCurRows; i++ {
for j := range e.rows[i] {
chk.AppendString(j, e.rows[i][j])

View File

@ -317,11 +317,11 @@ func (ow *outerWorker) pushToChan(ctx context.Context, task *lookUpJoinTask, dst
// buildTask builds a lookUpJoinTask and reads outer rows.
// When err is not nil, task must not be nil to send the error to the main thread via task.
func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
ow.executor.newChunk()
ow.executor.newFirstChunk()
task := &lookUpJoinTask{
doneCh: make(chan error, 1),
outerResult: ow.executor.newChunk(),
outerResult: ow.executor.newFirstChunk(),
encodedLookUpKeys: chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}, ow.ctx.GetSessionVars().MaxChunkSize),
lookupMap: mvmap.NewMVMap(),
}
@ -511,7 +511,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
return errors.Trace(err)
}
defer terror.Call(innerExec.Close)
innerResult := chunk.NewList(innerExec.retTypes(), iw.ctx.GetSessionVars().MaxChunkSize)
innerResult := chunk.NewList(innerExec.retTypes(), iw.ctx.GetSessionVars().MaxChunkSize, iw.ctx.GetSessionVars().MaxChunkSize)
innerResult.GetMemTracker().SetLabel("inner result")
innerResult.GetMemTracker().AttachTo(task.memTracker)
for {
@ -523,7 +523,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
break
}
innerResult.Add(iw.executorChk)
iw.executorChk = innerExec.newChunk()
iw.executorChk = innerExec.newFirstChunk()
}
task.innerResult = innerResult
return nil

View File

@ -250,9 +250,9 @@ func (e *InsertValues) insertRowsFromSelect(ctx context.Context, cols []*table.C
// process `insert|replace into ... select ... from ...`
selectExec := e.children[0]
fields := selectExec.retTypes()
chk := selectExec.newChunk()
chk := selectExec.newFirstChunk()
iter := chunk.NewIterator4Chunk(chk)
rows := make([][]types.Datum, 0, e.ctx.GetSessionVars().MaxChunkSize)
rows := make([][]types.Datum, 0, chk.Capacity())
sessVars := e.ctx.GetSessionVars()
batchInsert := sessVars.BatchInsert && !sessVars.InTxn()

View File

@ -250,7 +250,7 @@ func (e *HashJoinExec) wait4Inner() (finished bool, err error) {
// and append them to e.innerResult.
func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh chan struct{}) {
defer close(chkCh)
e.innerResult = chunk.NewList(e.innerExec.retTypes(), e.maxChunkSize)
e.innerResult = chunk.NewList(e.innerExec.retTypes(), e.initCap, e.maxChunkSize)
e.innerResult.GetMemTracker().AttachTo(e.memTracker)
e.innerResult.GetMemTracker().SetLabel("innerResult")
var err error
@ -262,7 +262,7 @@ func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.C
if e.finished.Load().(bool) {
return
}
chk := e.children[e.innerIdx].newChunk()
chk := e.children[e.innerIdx].newFirstChunk()
err = e.innerExec.Next(ctx, chk)
if err != nil {
e.innerFinished <- errors.Trace(err)
@ -289,7 +289,7 @@ func (e *HashJoinExec) initializeForProbe() {
e.outerChkResourceCh = make(chan *outerChkResource, e.concurrency)
for i := uint(0); i < e.concurrency; i++ {
e.outerChkResourceCh <- &outerChkResource{
chk: e.outerExec.newChunk(),
chk: e.outerExec.newFirstChunk(),
dest: e.outerResultChs[i],
}
}
@ -299,7 +299,7 @@ func (e *HashJoinExec) initializeForProbe() {
e.joinChkResourceCh = make([]chan *chunk.Chunk, e.concurrency)
for i := uint(0); i < e.concurrency; i++ {
e.joinChkResourceCh[i] = make(chan *chunk.Chunk, 1)
e.joinChkResourceCh[i] <- e.newChunk()
e.joinChkResourceCh[i] <- e.newFirstChunk()
}
// e.joinResultCh is for transmitting the join result chunks to the main thread.
@ -620,9 +620,9 @@ func (e *NestedLoopApplyExec) Open(ctx context.Context) error {
}
e.cursor = 0
e.innerRows = e.innerRows[:0]
e.outerChunk = e.outerExec.newChunk()
e.innerChunk = e.innerExec.newChunk()
e.innerList = chunk.NewList(e.innerExec.retTypes(), e.maxChunkSize)
e.outerChunk = e.outerExec.newFirstChunk()
e.innerChunk = e.innerExec.newFirstChunk()
e.innerList = chunk.NewList(e.innerExec.retTypes(), e.initCap, e.maxChunkSize)
e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaNestedLoopApply)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

View File

@ -50,7 +50,7 @@ func NewLoadDataInfo(ctx sessionctx.Context, row []types.Datum, tbl table.Table,
// Next implements the Executor Next interface.
func (e *LoadDataExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
chk.GrowAndReset(e.maxChunkSize)
// TODO: support load data without local field.
if !e.IsLocal {
return errors.New("Load Data: don't support load data without local field")

View File

@ -51,7 +51,7 @@ const LoadStatsVarKey loadStatsVarKeyType = 0
// Next implements the Executor Next interface.
func (e *LoadStatsExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
chk.GrowAndReset(e.maxChunkSize)
if len(e.info.Path) == 0 {
return errors.New("Load Stats: file path is empty")
}

View File

@ -179,7 +179,7 @@ func (t *mergeJoinInnerTable) reallocReaderResult() {
// Create a new Chunk and append it to "resourceQueue" if there is no more
// available chunk in "resourceQueue".
if len(t.resourceQueue) == 0 {
newChunk := t.reader.newChunk()
newChunk := t.reader.newFirstChunk()
t.memTracker.Consume(newChunk.MemoryUsage())
t.resourceQueue = append(t.resourceQueue, newChunk)
}
@ -214,7 +214,7 @@ func (e *MergeJoinExec) Open(ctx context.Context) error {
e.childrenResults = make([]*chunk.Chunk, 0, len(e.children))
for _, child := range e.children {
e.childrenResults = append(e.childrenResults, child.newChunk())
e.childrenResults = append(e.childrenResults, child.newFirstChunk())
}
e.innerTable.memTracker = memory.NewTracker("innerTable", -1)

View File

@ -29,7 +29,7 @@ type MockExec struct {
func (m *MockExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
colTypes := m.retTypes()
for ; m.curRowIdx < len(m.Rows) && chk.NumRows() < m.maxChunkSize; m.curRowIdx++ {
for ; m.curRowIdx < len(m.Rows) && chk.NumRows() < chk.Capacity(); m.curRowIdx++ {
curRow := m.Rows[m.curRowIdx]
for i := 0; i < curRow.Len(); i++ {
curDatum := curRow.ToRow().GetDatum(i, colTypes[i])
@ -91,10 +91,10 @@ func (s *pkgTestSuite) TestNestedLoopApply(c *C) {
innerFilter: []expression.Expression{innerFilter},
joiner: joiner,
}
join.innerList = chunk.NewList(innerExec.retTypes(), innerExec.maxChunkSize)
join.innerChunk = innerExec.newChunk()
join.outerChunk = outerExec.newChunk()
joinChk := join.newChunk()
join.innerList = chunk.NewList(innerExec.retTypes(), innerExec.initCap, innerExec.maxChunkSize)
join.innerChunk = innerExec.newFirstChunk()
join.outerChunk = outerExec.newFirstChunk()
joinChk := join.newFirstChunk()
it := chunk.NewIterator4Chunk(joinChk)
for rowIdx := 1; ; {
err := join.Next(ctx, joinChk)

View File

@ -195,6 +195,6 @@ func (e *PointGetExecutor) retTypes() []*types.FieldType {
return e.tps
}
func (e *PointGetExecutor) newChunk() *chunk.Chunk {
return chunk.NewChunkWithCapacity(e.retTypes(), 1)
func (e *PointGetExecutor) newFirstChunk() *chunk.Chunk {
return chunk.New(e.retTypes(), 1, 1)
}

View File

@ -89,8 +89,10 @@ type PrepareExec struct {
// NewPrepareExec creates a new PrepareExec.
func NewPrepareExec(ctx sessionctx.Context, is infoschema.InfoSchema, sqlTxt string) *PrepareExec {
base := newBaseExecutor(ctx, nil, "PrepareStmt")
base.initCap = chunk.ZeroCapacity
return &PrepareExec{
baseExecutor: newBaseExecutor(ctx, nil, "PrepareStmt"),
baseExecutor: base,
is: is,
sqlText: sqlTxt,
}

View File

@ -75,7 +75,7 @@ func (e *ProjectionExec) Open(ctx context.Context) error {
}
if e.isUnparallelExec() {
e.childResult = e.children[0].newChunk()
e.childResult = e.children[0].newFirstChunk()
}
return nil
@ -139,7 +139,7 @@ func (e *ProjectionExec) Open(ctx context.Context) error {
// +------------------------------+ +----------------------+
//
func (e *ProjectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
chk.GrowAndReset(e.maxChunkSize)
if e.isUnparallelExec() {
return errors.Trace(e.unParallelExecute(ctx, chk))
}
@ -207,11 +207,11 @@ func (e *ProjectionExec) prepare(ctx context.Context) {
})
e.fetcher.inputCh <- &projectionInput{
chk: e.children[0].newChunk(),
chk: e.children[0].newFirstChunk(),
targetWorker: e.workers[i],
}
e.fetcher.outputCh <- &projectionOutput{
chk: e.newChunk(),
chk: e.newFirstChunk(),
done: make(chan error, 1),
}
}

View File

@ -64,9 +64,9 @@ type ShowExec struct {
// Next implements the Executor Next interface.
func (e *ShowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
chk.GrowAndReset(e.maxChunkSize)
if e.result == nil {
e.result = e.newChunk()
e.result = e.newFirstChunk()
err := e.fetchAll()
if err != nil {
return errors.Trace(err)
@ -87,7 +87,7 @@ func (e *ShowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.cursor >= e.result.NumRows() {
return nil
}
numCurBatch := mathutil.Min(e.maxChunkSize, e.result.NumRows()-e.cursor)
numCurBatch := mathutil.Min(chk.Capacity(), e.result.NumRows()-e.cursor)
chk.Append(e.result, e.cursor, e.cursor+numCurBatch)
e.cursor += numCurBatch
return nil

View File

@ -107,11 +107,11 @@ func (e *SortExec) Next(ctx context.Context, chk *chunk.Chunk) error {
func (e *SortExec) fetchRowChunks(ctx context.Context) error {
fields := e.retTypes()
e.rowChunks = chunk.NewList(fields, e.maxChunkSize)
e.rowChunks = chunk.NewList(fields, e.initCap, e.maxChunkSize)
e.rowChunks.GetMemTracker().AttachTo(e.memTracker)
e.rowChunks.GetMemTracker().SetLabel("rowChunks")
for {
chk := e.children[0].newChunk()
chk := e.children[0].newFirstChunk()
err := e.children[0].Next(ctx, chk)
if err != nil {
return errors.Trace(err)
@ -171,7 +171,7 @@ func (e *SortExec) buildKeyExprsAndTypes() {
}
func (e *SortExec) buildKeyChunks() error {
e.keyChunks = chunk.NewList(e.keyTypes, e.maxChunkSize)
e.keyChunks = chunk.NewList(e.keyTypes, e.initCap, e.maxChunkSize)
e.keyChunks.GetMemTracker().SetLabel("keyChunks")
e.keyChunks.GetMemTracker().AttachTo(e.memTracker)
@ -323,11 +323,11 @@ func (e *TopNExec) Next(ctx context.Context, chk *chunk.Chunk) error {
func (e *TopNExec) loadChunksUntilTotalLimit(ctx context.Context) error {
e.chkHeap = &topNChunkHeap{e}
e.rowChunks = chunk.NewList(e.retTypes(), e.maxChunkSize)
e.rowChunks = chunk.NewList(e.retTypes(), e.initCap, e.maxChunkSize)
e.rowChunks.GetMemTracker().AttachTo(e.memTracker)
e.rowChunks.GetMemTracker().SetLabel("rowChunks")
for e.rowChunks.Len() < e.totalLimit {
srcChk := e.children[0].newChunk()
srcChk := e.children[0].newFirstChunk()
err := e.children[0].Next(ctx, srcChk)
if err != nil {
return errors.Trace(err)
@ -362,7 +362,7 @@ func (e *TopNExec) executeTopN(ctx context.Context) error {
if e.keyChunks != nil {
childKeyChk = chunk.NewChunkWithCapacity(e.keyTypes, e.maxChunkSize)
}
childRowChk := e.children[0].newChunk()
childRowChk := e.children[0].newFirstChunk()
for {
err := e.children[0].Next(ctx, childRowChk)
if err != nil {
@ -425,7 +425,7 @@ func (e *TopNExec) processChildChk(childRowChk, childKeyChk *chunk.Chunk) error
// but we want descending top N, then we will keep all data in memory.
// But if data is distributed randomly, this function will be called log(n) times.
func (e *TopNExec) doCompaction() error {
newRowChunks := chunk.NewList(e.retTypes(), e.maxChunkSize)
newRowChunks := chunk.NewList(e.retTypes(), e.initCap, e.maxChunkSize)
newRowPtrs := make([]chunk.RowPtr, 0, e.rowChunks.Len())
for _, rowPtr := range e.rowPtrs {
newRowPtr := newRowChunks.AppendRow(e.rowChunks.GetRow(rowPtr))
@ -436,7 +436,7 @@ func (e *TopNExec) doCompaction() error {
e.rowChunks = newRowChunks
if e.keyChunks != nil {
newKeyChunks := chunk.NewList(e.keyTypes, e.maxChunkSize)
newKeyChunks := chunk.NewList(e.keyTypes, e.initCap, e.maxChunkSize)
for _, rowPtr := range e.rowPtrs {
newKeyChunks.AppendRow(e.keyChunks.GetRow(rowPtr))
}

View File

@ -72,7 +72,7 @@ func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if err != nil {
return errors.Trace(err)
}
stmtExecChk := stmtExec.newChunk()
stmtExecChk := stmtExec.newFirstChunk()
// store span into context
ctx = opentracing.ContextWithSpan(ctx, e.rootTrace)

View File

@ -119,15 +119,15 @@ func (us *UnionScanExec) Open(ctx context.Context) error {
if err := us.baseExecutor.Open(ctx); err != nil {
return errors.Trace(err)
}
us.snapshotChunkBuffer = us.newChunk()
us.snapshotChunkBuffer = us.newFirstChunk()
return nil
}
// Next implements the Executor Next interface.
func (us *UnionScanExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
chk.GrowAndReset(us.maxChunkSize)
mutableRow := chunk.MutRowFromTypes(us.retTypes())
for i, batchSize := 0, us.ctx.GetSessionVars().MaxChunkSize; i < batchSize; i++ {
for i, batchSize := 0, chk.Capacity(); i < batchSize; i++ {
row, err := us.getOneRow(ctx)
if err != nil {
return errors.Trace(err)

View File

@ -140,8 +140,8 @@ func (e *UpdateExec) Next(ctx context.Context, chk *chunk.Chunk) error {
func (e *UpdateExec) fetchChunkRows(ctx context.Context) error {
fields := e.children[0].retTypes()
globalRowIdx := 0
chk := e.children[0].newFirstChunk()
for {
chk := e.children[0].newChunk()
err := e.children[0].Next(ctx, chk)
if err != nil {
return errors.Trace(err)
@ -162,6 +162,7 @@ func (e *UpdateExec) fetchChunkRows(ctx context.Context) error {
e.newRowsData = append(e.newRowsData, newRow)
globalRowIdx++
}
chk = chunk.Renew(chk, e.maxChunkSize)
}
return nil
}

View File

@ -277,11 +277,8 @@ func (p *MySQLPrivilege) loadTable(sctx sessionctx.Context, sql string,
defer terror.Call(rs.Close)
fs := rs.Fields()
chk := rs.NewChunk()
for {
// NOTE: decodeTableRow decodes data from a chunk Row, which is a shallow copy.
// The result will reference memory in the chunk, so the chunk must not be reused
// here, otherwise some weird bug will happen!
chk := rs.NewChunk()
err = rs.Next(context.TODO(), chk)
if err != nil {
return errors.Trace(err)
@ -296,6 +293,10 @@ func (p *MySQLPrivilege) loadTable(sctx sessionctx.Context, sql string,
return errors.Trace(err)
}
}
// NOTE: decodeTableRow decodes data from a chunk Row, which is a shallow copy.
// The result will reference memory in the chunk, so the chunk must not be reused
// here, otherwise some weird bug will happen!
chk = chunk.Renew(chk, sctx.GetSessionVars().MaxChunkSize)
}
}

View File

@ -1033,8 +1033,8 @@ func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet
fetchedRows := rs.GetFetchedRows()
// If fetchedRows is not enough, get data from the recordSet.
chk := rs.NewChunk()
for len(fetchedRows) < fetchSize {
chk := rs.NewChunk()
// Here server.tidbResultSet implements Next method.
err := rs.Next(ctx, chk)
if err != nil {
@ -1048,6 +1048,7 @@ func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet
for i := 0; i < rowCount; i++ {
fetchedRows = append(fetchedRows, chk.GetRow(i))
}
chk = chunk.Renew(chk, cc.ctx.GetSessionVars().MaxChunkSize)
}
// tell the client COM_STMT_FETCH has finished by setting proper serverStatus,

View File

@ -17,6 +17,7 @@ import (
"crypto/tls"
"fmt"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/auth"
"github.com/pingcap/tidb/util/chunk"
@ -82,6 +83,9 @@ type QueryCtx interface {
// ShowProcess shows the information about the session.
ShowProcess() util.ProcessInfo
// GetSessionVars returns SessionVars.
GetSessionVars() *variable.SessionVars
SetSessionManager(util.SessionManager)
}

View File

@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
@ -324,6 +325,11 @@ func (tc *TiDBContext) ShowProcess() util.ProcessInfo {
return tc.session.ShowProcess()
}
// GetSessionVars returns SessionVars.
func (tc *TiDBContext) GetSessionVars() *variable.SessionVars {
return tc.session.GetSessionVars()
}
type tidbResultSet struct {
recordSet ast.RecordSet
columns []*ColumnInfo

View File

@ -555,7 +555,7 @@ func (s *session) ExecRestrictedSQL(sctx sessionctx.Context, sql string) ([]chun
)
// Execute all recordset, take out the first one as result.
for i, rs := range recordSets {
tmp, err := drainRecordSet(ctx, rs)
tmp, err := drainRecordSet(ctx, se, rs)
if err != nil {
return nil, nil, errors.Trace(err)
}
@ -604,10 +604,10 @@ func createSessionWithDomainFunc(store kv.Storage) func(*domain.Domain) (pools.R
}
}
func drainRecordSet(ctx context.Context, rs ast.RecordSet) ([]chunk.Row, error) {
func drainRecordSet(ctx context.Context, se *session, rs ast.RecordSet) ([]chunk.Row, error) {
var rows []chunk.Row
chk := rs.NewChunk()
for {
chk := rs.NewChunk()
err := rs.Next(ctx, chk)
if err != nil || chk.NumRows() == 0 {
return rows, errors.Trace(err)
@ -616,6 +616,7 @@ func drainRecordSet(ctx context.Context, rs ast.RecordSet) ([]chunk.Row, error)
for r := iter.Begin(); r != iter.End(); r = iter.Next() {
rows = append(rows, r)
}
chk = chunk.Renew(chk, se.sessionVars.MaxChunkSize)
}
}

View File

@ -199,9 +199,9 @@ func GetRows4Test(ctx context.Context, sctx sessionctx.Context, rs ast.RecordSet
return nil, nil
}
var rows []chunk.Row
chk := rs.NewChunk()
for {
// Since we collect all the rows, we can not reuse the chunk.
chk := rs.NewChunk()
iter := chunk.NewIterator4Chunk(chk)
err := rs.Next(ctx, chk)
@ -215,6 +215,7 @@ func GetRows4Test(ctx context.Context, sctx sessionctx.Context, rs ast.RecordSet
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
rows = append(rows, row)
}
chk = chunk.Renew(chk, sctx.GetSessionVars().MaxChunkSize)
}
return rows, nil
}

View File

@ -38,6 +38,7 @@ type Chunk struct {
// Capacity constants.
const (
InitialCapacity = 32
ZeroCapacity = 0
)
// NewChunkWithCapacity creates a new chunk with field types and capacity.
@ -70,8 +71,11 @@ func New(fields []*types.FieldType, cap, maxChunkSize int) *Chunk {
// chk: old chunk(often used in previous call).
// maxChunkSize: the limit for the max number of rows.
func Renew(chk *Chunk, maxChunkSize int) *Chunk {
newCap := reCalcCapacity(chk, maxChunkSize)
newChk := new(Chunk)
if chk.columns == nil {
return newChk
}
newCap := reCalcCapacity(chk, maxChunkSize)
newChk.columns = renewColumns(chk.columns, newCap)
newChk.numVirtualRows = 0
newChk.capacity = newCap
@ -153,6 +157,9 @@ func (c *Chunk) SetNumVirtualRows(numVirtualRows int) {
// Reset resets the chunk, so the memory it allocated can be reused.
// Make sure all the data in the chunk is not used anymore before you reuse this chunk.
func (c *Chunk) Reset() {
if c.columns == nil {
return
}
for _, col := range c.columns {
col.reset()
}
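
Most call sites above follow the same consumer pattern when draining an executor or record set: the chunk is created once before the loop, and chunk.Renew replaces it after each batch has been copied out, because rows handed to callers are shallow references into the chunk's memory (see the NOTE in the loadTable hunk above). A hedged sketch of that pattern; drainAllRows is an illustrative name, not part of this commit.

package example

import (
	"context"

	"github.com/pingcap/tidb/ast"
	"github.com/pingcap/tidb/util/chunk"
)

// drainAllRows drains a record set into materialized rows, mirroring the
// drainRecordSet change above: one chunk is allocated up front and Renew'd
// per batch instead of allocating a fresh chunk on every loop iteration.
func drainAllRows(ctx context.Context, rs ast.RecordSet, maxChunkSize int) ([]chunk.Row, error) {
	var rows []chunk.Row
	chk := rs.NewChunk()
	for {
		if err := rs.Next(ctx, chk); err != nil {
			return nil, err
		}
		if chk.NumRows() == 0 {
			return rows, nil
		}
		iter := chunk.NewIterator4Chunk(chk)
		for r := iter.Begin(); r != iter.End(); r = iter.Next() {
			rows = append(rows, r)
		}
		// Renew instead of Reset: the rows appended above still reference this
		// chunk's columns, so reusing it in place would corrupt them.
		chk = chunk.Renew(chk, maxChunkSize)
	}
}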

View File

@ -21,7 +21,7 @@ import (
func (s *testChunkSuite) TestIterator(c *check.C) {
fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
chk := NewChunkWithCapacity(fields, 32)
chk := New(fields, 32, 1024)
n := 10
var expected []int64
for i := 0; i < n; i++ {
@ -29,8 +29,8 @@ func (s *testChunkSuite) TestIterator(c *check.C) {
expected = append(expected, int64(i))
}
var rows []Row
li := NewList(fields, 1)
li2 := NewList(fields, 5)
li := NewList(fields, 1, 2)
li2 := NewList(fields, 8, 16)
var ptrs []RowPtr
var ptrs2 []RowPtr
for i := 0; i < n; i++ {

View File

@ -21,11 +21,12 @@ import (
// List holds a slice of chunks, used to append rows with the max chunk size properly handled.
type List struct {
fieldTypes []*types.FieldType
maxChunkSize int
length int
chunks []*Chunk
freelist []*Chunk
fieldTypes []*types.FieldType
initChunkSize int
maxChunkSize int
length int
chunks []*Chunk
freelist []*Chunk
memTracker *memory.Tracker // track memory usage.
consumedIdx int // chunk index in "chunks", has been consumed.
@ -38,13 +39,14 @@ type RowPtr struct {
RowIdx uint32
}
// NewList creates a new List with field types and max chunk size.
func NewList(fieldTypes []*types.FieldType, maxChunkSize int) *List {
// NewList creates a new List with field types, init chunk size and max chunk size.
func NewList(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int) *List {
l := &List{
fieldTypes: fieldTypes,
maxChunkSize: maxChunkSize,
memTracker: memory.NewTracker("chunk.List", -1),
consumedIdx: -1,
fieldTypes: fieldTypes,
initChunkSize: initChunkSize,
maxChunkSize: maxChunkSize,
memTracker: memory.NewTracker("chunk.List", -1),
consumedIdx: -1,
}
return l
}
@ -72,7 +74,7 @@ func (l *List) GetChunk(chkIdx int) *Chunk {
// AppendRow appends a row to the List, the row is copied to the List.
func (l *List) AppendRow(row Row) RowPtr {
chkIdx := len(l.chunks) - 1
if chkIdx == -1 || l.chunks[chkIdx].NumRows() >= l.maxChunkSize || chkIdx == l.consumedIdx {
if chkIdx == -1 || l.chunks[chkIdx].NumRows() >= l.chunks[chkIdx].Capacity() || chkIdx == l.consumedIdx {
newChk := l.allocChunk()
l.chunks = append(l.chunks, newChk)
if chkIdx != l.consumedIdx {
@ -115,7 +117,10 @@ func (l *List) allocChunk() (chk *Chunk) {
chk.Reset()
return
}
return NewChunkWithCapacity(l.fieldTypes, l.maxChunkSize)
if len(l.chunks) > 0 {
return Renew(l.chunks[len(l.chunks)-1], l.maxChunkSize)
}
return New(l.fieldTypes, l.initChunkSize, l.maxChunkSize)
}
// GetRow gets a Row from the list by RowPtr.
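
With the List change above, NewList takes an initial chunk size as well as the max chunk size, and allocChunk derives each new chunk from the previous one via Renew, so per-chunk capacity can grow as rows accumulate; AppendRow accordingly checks the current chunk's own Capacity() rather than maxChunkSize. A small usage sketch (the 2/1024 sizes and the function name are illustrative, not from the diff).

package example

import (
	"fmt"

	"github.com/pingcap/tidb/mysql"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/chunk"
)

// listGrowthSketch appends the same row many times and reports where the last
// row landed; later chunks may have a larger capacity than the first one.
func listGrowthSketch() {
	fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
	li := chunk.NewList(fields, 2, 1024) // initChunkSize=2, maxChunkSize=1024

	src := chunk.New(fields, 1, 1)
	src.AppendInt64(0, 42)
	row := src.GetRow(0)

	var last chunk.RowPtr
	for i := 0; i < 64; i++ {
		last = li.AppendRow(row)
	}
	fmt.Println("rows:", li.Len(),
		"last chunk:", last.ChkIdx,
		"its capacity:", li.GetChunk(int(last.ChkIdx)).Capacity())
}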

View File

@ -28,7 +28,7 @@ func (s *testChunkSuite) TestList(c *check.C) {
fields := []*types.FieldType{
types.NewFieldType(mysql.TypeLonglong),
}
l := NewList(fields, 2)
l := NewList(fields, 2, 2)
srcChunk := NewChunkWithCapacity(fields, 32)
srcChunk.AppendInt64(0, 1)
srcRow := srcChunk.GetRow(0)
@ -100,7 +100,7 @@ func (s *testChunkSuite) TestListMemoryUsage(c *check.C) {
srcChk.AppendTime(3, timeObj)
srcChk.AppendDuration(4, durationObj)
list := NewList(fieldTypes, maxChunkSize)
list := NewList(fieldTypes, maxChunkSize, maxChunkSize*2)
c.Assert(list.GetMemTracker().BytesConsumed(), check.Equals, int64(0))
list.AppendRow(srcChk.GetRow(0))
@ -131,7 +131,7 @@ func BenchmarkListMemoryUsage(b *testing.B) {
row := chk.GetRow(0)
initCap := 50
list := NewList(fieldTypes, 2)
list := NewList(fieldTypes, 2, 8)
for i := 0; i < initCap; i++ {
list.AppendRow(row)
}

View File

@ -82,7 +82,7 @@ func BenchmarkDecodeOneToChunk(b *testing.B) {
raw = EncodeBytes(raw, str.GetBytes())
intType := types.NewFieldType(mysql.TypeLonglong)
b.ResetTimer()
decoder := NewDecoder(chunk.NewChunkWithCapacity([]*types.FieldType{intType}, 32), nil)
decoder := NewDecoder(chunk.New([]*types.FieldType{intType}, 32, 32), nil)
for i := 0; i < b.N; i++ {
decoder.DecodeOne(raw, 0, intType)
}

View File

@ -935,7 +935,7 @@ func (s *testCodecSuite) TestDecodeOneToChunk(c *C) {
datums = append(datums, types.NewDatum(t.value))
}
rowCount := 3
decoder := NewDecoder(chunk.NewChunkWithCapacity(tps, 32), time.Local)
decoder := NewDecoder(chunk.New(tps, 32, 32), time.Local)
for rowIdx := 0; rowIdx < rowCount; rowIdx++ {
encoded, err := EncodeValue(sc, nil, datums...)
c.Assert(err, IsNil)