From 9f2ff293ddb3f22bdbdf9ffeba1760e2e7e2d036 Mon Sep 17 00:00:00 2001
From: Jian Zhang
Date: Tue, 16 Jan 2018 15:15:25 +0800
Subject: [PATCH] *: make MVMap.Get() be able to reuse the value buffer
 pre-allocated (#5644)

---
 executor/aggregate.go          | 13 +++++++++----
 executor/index_lookup_join.go  |  6 ++++--
 executor/join.go               |  8 ++++++--
 expression/aggregation/util.go | 13 +++++++------
 util/mvmap/mvmap.go            |  5 ++---
 util/mvmap/mvmap_test.go       |  8 +++++---
 6 files changed, 33 insertions(+), 20 deletions(-)

diff --git a/executor/aggregate.go b/executor/aggregate.go
index d3c8bb9715..416778a974 100644
--- a/executor/aggregate.go
+++ b/executor/aggregate.go
@@ -43,6 +43,8 @@ type HashAggExec struct {
 	mutableRow   chunk.MutRow
 	rowBuffer    []types.Datum
 	GroupByItems []expression.Expression
+	groupKey     []byte
+	groupVals    [][]byte
 }
 
 // Close implements the Executor Close interface.
@@ -67,6 +69,8 @@ func (e *HashAggExec) Open(goCtx goctx.Context) error {
 	e.aggCtxsMap = make(aggCtxsMapper, 0)
 	e.mutableRow = chunk.MutRowFromTypes(e.retTypes())
 	e.rowBuffer = make([]types.Datum, 0, e.Schema().Len())
+	e.groupKey = make([]byte, 0, 8)
+	e.groupVals = make([][]byte, 0, 8)
 	return nil
 }
 
@@ -122,7 +126,7 @@ func (e *HashAggExec) execute(goCtx goctx.Context) (err error) {
 			if err != nil {
 				return errors.Trace(err)
 			}
-			if e.groupMap.Get(groupKey) == nil {
+			if len(e.groupMap.Get(groupKey, e.groupVals[:0])) == 0 {
 				e.groupMap.Put(groupKey, []byte{})
 			}
 			aggCtxs := e.getContexts(groupKey)
@@ -183,11 +187,12 @@ func (e *HashAggExec) getGroupKey(row types.Row) ([]byte, error) {
 		}
 		vals = append(vals, v)
 	}
-	bs, err := codec.EncodeValue(e.sc, []byte{}, vals...)
+	var err error
+	e.groupKey, err = codec.EncodeValue(e.sc, e.groupKey[:0], vals...)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	return bs, nil
+	return e.groupKey, nil
 }
 
 // innerNext fetches a single row from src and update each aggregate function.
@@ -205,7 +210,7 @@ func (e *HashAggExec) innerNext(goCtx goctx.Context) (ret bool, err error) {
 	if err != nil {
 		return false, errors.Trace(err)
 	}
-	if e.groupMap.Get(groupKey) == nil {
+	if len(e.groupMap.Get(groupKey, e.groupVals[:0])) == 0 {
 		e.groupMap.Put(groupKey, []byte{})
 	}
 	aggCtxs := e.getContexts(groupKey)
diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go
index 7abede3aee..962bb1d8e4 100644
--- a/executor/index_lookup_join.go
+++ b/executor/index_lookup_join.go
@@ -60,6 +60,7 @@ type IndexLookUpJoin struct {
 
 	indexRanges   []*ranger.NewRange
 	keyOff2IdxOff []int
+	innerPtrBytes [][]byte
 }
 
 type outerCtx struct {
@@ -121,6 +122,7 @@ func (e *IndexLookUpJoin) Open(goCtx goctx.Context) error {
 	if err != nil {
 		return errors.Trace(err)
 	}
+	e.innerPtrBytes = make([][]byte, 0, 8)
 	e.startWorkers(goCtx)
 	return nil
 }
@@ -275,9 +277,9 @@ func (e *IndexLookUpJoin) getFinishedTask(goCtx goctx.Context) (*lookUpJoinTask,
 
 func (e *IndexLookUpJoin) lookUpMatchedInners(task *lookUpJoinTask, rowIdx int) {
 	outerKey := task.encodedLookUpKeys.GetRow(rowIdx).GetBytes(0)
-	innerPtrBytes := task.lookupMap.Get(outerKey)
+	e.innerPtrBytes = task.lookupMap.Get(outerKey, e.innerPtrBytes[:0])
 	task.matchedInners = task.matchedInners[:0]
-	for _, b := range innerPtrBytes {
+	for _, b := range e.innerPtrBytes {
 		ptr := *(*chunk.RowPtr)(unsafe.Pointer(&b[0]))
 		matchedInner := task.innerResult.GetRow(ptr)
 		task.matchedInners = append(task.matchedInners, matchedInner)
diff --git a/executor/join.go b/executor/join.go
index 87528c0789..08c2d13df5 100644
--- a/executor/join.go
+++ b/executor/join.go
@@ -75,6 +75,7 @@ type HashJoinExec struct {
 	outerResultChs    []chan *chunk.Chunk
 	joinChkResourceCh []chan *chunk.Chunk
 	joinResultCh      chan *hashjoinWorkerResult
+	hashTableValBufs  [][][]byte
 }
 
 // outerChkResource stores the result of the join outer fetch worker,
@@ -148,6 +149,7 @@ func (e *HashJoinExec) Open(goCtx goctx.Context) error {
 
 	e.prepared = false
 
+	e.hashTableValBufs = make([][][]byte, e.concurrency)
 	e.hashJoinBuffers = make([]*hashJoinBuffer, 0, e.concurrency)
 	for i := 0; i < e.concurrency; i++ {
 		buffer := &hashJoinBuffer{
@@ -633,7 +635,8 @@ func (e *HashJoinExec) joinOuterRow(workerID int, outerRow Row, resultBuffer *ex
 		return true
 	}
 
-	values := e.hashTable.Get(joinKey)
+	e.hashTableValBufs[workerID] = e.hashTable.Get(joinKey, e.hashTableValBufs[workerID][:0])
+	values := e.hashTableValBufs[workerID]
 	if len(values) == 0 {
 		resultBuffer.rows, resultBuffer.err = e.resultGenerators[0].emit(outerRow, nil, resultBuffer.rows)
 		resultBuffer.err = errors.Trace(resultBuffer.err)
@@ -672,7 +675,8 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID int, outerRow chunk.Ro
 		}
 		return nil
 	}
-	innerPtrs := e.hashTable.Get(joinKey)
+	e.hashTableValBufs[workerID] = e.hashTable.Get(joinKey, e.hashTableValBufs[workerID][:0])
+	innerPtrs := e.hashTableValBufs[workerID]
 	if len(innerPtrs) == 0 {
 		err = e.resultGenerators[workerID].emitToChunk(outerRow, nil, chk)
 		if err != nil {
diff --git a/expression/aggregation/util.go b/expression/aggregation/util.go
index dc395616da..10fed4a320 100644
--- a/expression/aggregation/util.go
+++ b/expression/aggregation/util.go
@@ -11,7 +11,8 @@ import (
 // distinctChecker stores existing keys and checks if given data is distinct.
 type distinctChecker struct {
 	existingKeys *mvmap.MVMap
-	buf          []byte
+	key          []byte
+	vals         [][]byte
 }
 
 // createDistinctChecker creates a new distinct checker.
@@ -23,17 +24,17 @@ func createDistinctChecker() *distinctChecker {
 
 // Check checks if values is distinct.
 func (d *distinctChecker) Check(sc *stmtctx.StatementContext, values []types.Datum) (bool, error) {
-	d.buf = d.buf[:0]
+	d.key = d.key[:0]
 	var err error
-	d.buf, err = codec.EncodeValue(sc, d.buf, values...)
+	d.key, err = codec.EncodeValue(sc, d.key, values...)
 	if err != nil {
 		return false, errors.Trace(err)
 	}
-	v := d.existingKeys.Get(d.buf)
-	if v != nil {
+	d.vals = d.existingKeys.Get(d.key, d.vals[:0])
+	if len(d.vals) > 0 {
 		return false, nil
 	}
-	d.existingKeys.Put(d.buf, []byte{})
+	d.existingKeys.Put(d.key, []byte{})
 	return true, nil
 }
 
diff --git a/util/mvmap/mvmap.go b/util/mvmap/mvmap.go
index 1239c85a1a..ef2535ece7 100644
--- a/util/mvmap/mvmap.go
+++ b/util/mvmap/mvmap.go
@@ -150,9 +150,8 @@ func (m *MVMap) Put(key, value []byte) {
 	m.length++
 }
 
-// Get gets the values of the key.
-func (m *MVMap) Get(key []byte) [][]byte {
-	var values [][]byte
+// Get gets the values of the "key" and appends them to "values".
+func (m *MVMap) Get(key []byte, values [][]byte) [][]byte {
 	hashKey := fnvHash64(key)
 	entryAddr := m.hashTable[hashKey]
 	for entryAddr != nullEntryAddr {
diff --git a/util/mvmap/mvmap_test.go b/util/mvmap/mvmap_test.go
index 0bf1a85d66..b39eec70a2 100644
--- a/util/mvmap/mvmap_test.go
+++ b/util/mvmap/mvmap_test.go
@@ -23,15 +23,16 @@ import (
 
 func TestMVMap(t *testing.T) {
 	m := NewMVMap()
+	vals := [][]byte{}
 	m.Put([]byte("abc"), []byte("abc1"))
 	m.Put([]byte("abc"), []byte("abc2"))
 	m.Put([]byte("def"), []byte("def1"))
 	m.Put([]byte("def"), []byte("def2"))
-	vals := m.Get([]byte("abc"))
+	vals = m.Get([]byte("abc"), vals[:0])
 	if fmt.Sprintf("%s", vals) != "[abc1 abc2]" {
 		t.FailNow()
 	}
-	vals = m.Get([]byte("def"))
+	vals = m.Get([]byte("def"), vals[:0])
 	if fmt.Sprintf("%s", vals) != "[def1 def2]" {
 		t.FailNow()
 	}
@@ -70,10 +71,11 @@ func BenchmarkMVMapGet(b *testing.B) {
 		binary.BigEndian.PutUint64(buffer, uint64(i))
 		m.Put(buffer, buffer)
 	}
+	val := make([][]byte, 0, 8)
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		binary.BigEndian.PutUint64(buffer, uint64(i))
-		val = m.Get(buffer, val[:0])
+		val = m.Get(buffer, val[:0])
 		if len(val) != 1 || bytes.Compare(val[0], buffer) != 0 {
 			b.FailNow()
 		}
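
Not part of the patch: a minimal caller sketch of the buffer-reuse pattern the new MVMap.Get(key, values) signature enables, mirroring the updated test above. It assumes the util/mvmap package path used in this repository; the keys and values are made up for illustration.

	package main

	import (
		"fmt"

		"github.com/pingcap/tidb/util/mvmap"
	)

	func main() {
		m := mvmap.NewMVMap()
		m.Put([]byte("k"), []byte("v1"))
		m.Put([]byte("k"), []byte("v2"))

		// Allocate the value buffer once and hand it to every Get call with its
		// length reset to zero; Get appends the matched values and returns the
		// (possibly grown) slice, so repeated lookups avoid per-call allocation.
		vals := make([][]byte, 0, 8)
		for i := 0; i < 3; i++ {
			vals = m.Get([]byte("k"), vals[:0])
			fmt.Printf("%s\n", vals) // [v1 v2]
		}

		// A miss returns the buffer unchanged (length 0) rather than nil, which
		// is why the patched callers test len(...) == 0 instead of comparing to nil.
		vals = m.Get([]byte("absent"), vals[:0])
		fmt.Println(len(vals)) // 0
	}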