From 232ecbeca05ca92a45c0434d911ee52bc2314fae Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Fri, 22 Dec 2017 16:47:22 +0800 Subject: [PATCH] executor: support Chunk for MergeJoinExec (#5312) --- executor/builder.go | 15 ++- executor/merge_join.go | 293 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 273 insertions(+), 35 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 70c668e842..7185d69249 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -549,14 +549,14 @@ func (b *executorBuilder) buildMergeJoin(v *plan.PhysicalMergeJoin) Executor { rightKeys = append(rightKeys, rightKey) } - leftRowBlock := &rowBlockIterator{ + lhsIter := &readerIterator{ ctx: b.ctx, reader: leftExec, filter: v.LeftConditions, joinKeys: leftKeys, } - rightRowBlock := &rowBlockIterator{ + rhsIter := &readerIterator{ ctx: b.ctx, reader: rightExec, filter: v.RightConditions, @@ -567,20 +567,24 @@ func (b *executorBuilder) buildMergeJoin(v *plan.PhysicalMergeJoin) Executor { if defaultValues == nil { defaultValues = make([]types.Datum, rightExec.Schema().Len()) } + lhsColTypes := leftExec.Schema().GetTypes() + rhsColTypes := rightExec.Schema().GetTypes() e := &MergeJoinExec{ baseExecutor: newBaseExecutor(v.Schema(), b.ctx, leftExec, rightExec), - resultGenerator: newJoinResultGenerator(b.ctx, v.JoinType, false, defaultValues, v.OtherConditions, nil, nil), + resultGenerator: newJoinResultGenerator(b.ctx, v.JoinType, false, defaultValues, v.OtherConditions, lhsColTypes, rhsColTypes), stmtCtx: b.ctx.GetSessionVars().StmtCtx, // left is the outer side by default. + outerIdx: 0, outerKeys: leftKeys, innerKeys: rightKeys, - outerIter: leftRowBlock, - innerIter: rightRowBlock, + outerIter: lhsIter, + innerIter: rhsIter, } if v.JoinType == plan.RightOuterJoin { e.outerKeys, e.innerKeys = e.innerKeys, e.outerKeys e.outerIter, e.innerIter = e.innerIter, e.outerIter + e.outerIdx = 1 } if v.JoinType != plan.InnerJoin { @@ -588,6 +592,7 @@ func (b *executorBuilder) buildMergeJoin(v *plan.PhysicalMergeJoin) Executor { e.outerIter.filter = nil } + e.supportChk = true return e } diff --git a/executor/merge_join.go b/executor/merge_join.go index 2290d32d00..803ad3ddcc 100644 --- a/executor/merge_join.go +++ b/executor/merge_join.go @@ -14,10 +14,13 @@ package executor import ( + "github.com/cznic/mathutil" "github.com/juju/errors" "github.com/pingcap/tidb/context" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" goctx "golang.org/x/net/context" ) @@ -36,8 +39,8 @@ type MergeJoinExec struct { outerKeys []*expression.Column innerKeys []*expression.Column - outerIter *rowBlockIterator - innerIter *rowBlockIterator + outerIter *readerIterator + innerIter *readerIterator outerRows []Row innerRows []Row outerFilter []expression.Expression @@ -45,48 +48,68 @@ type MergeJoinExec struct { resultGenerator joinResultGenerator resultBuffer []Row resultCursor int + + // for chunk execution. + outerIdx int + resultChunk *chunk.Chunk + outerChunkRows []chunk.Row + innerChunkRows []chunk.Row } const rowBufferSize = 4096 -// rowBlockIterator represents a row block with the same join keys -type rowBlockIterator struct { +// readerIterator represents a row block with the same join keys +type readerIterator struct { stmtCtx *stmtctx.StatementContext ctx context.Context reader Executor filter []expression.Expression joinKeys []*expression.Column - peekedRow Row + peekedRow types.Row rowCache []Row + goCtx goctx.Context + + // for chunk executions + firstRow4Key chunk.Row + curRow chunk.Row + curResult *chunk.Chunk + curResultInUse bool + curSelected []bool + resultQueue []*chunk.Chunk + resourceQueue []*chunk.Chunk + + // joinResultGenerator is "nil" means this iterator works on the inner table, + // otherwise it works on the outer table and is used to emits the un-matched + // outer rows to "joinResult". + joinResultGenerator joinResultGenerator + joinResult *chunk.Chunk } -func (rb *rowBlockIterator) init() error { - if rb.reader == nil || rb.joinKeys == nil || len(rb.joinKeys) == 0 || rb.ctx == nil { +func (ri *readerIterator) init() error { + if ri.reader == nil || ri.joinKeys == nil || len(ri.joinKeys) == 0 || ri.ctx == nil { return errors.Errorf("Invalid arguments: Empty arguments detected.") } - rb.stmtCtx = rb.ctx.GetSessionVars().StmtCtx + ri.stmtCtx = ri.ctx.GetSessionVars().StmtCtx var err error - rb.peekedRow, err = rb.nextRow() + ri.peekedRow, err = ri.nextRow() if err != nil { return errors.Trace(err) } - rb.rowCache = make([]Row, 0, rowBufferSize) - + ri.rowCache = make([]Row, 0, rowBufferSize) return nil } -func (rb *rowBlockIterator) nextRow() (Row, error) { - goCtx := goctx.TODO() +func (ri *readerIterator) nextRow() (types.Row, error) { for { - row, err := rb.reader.Next(goCtx) + row, err := ri.reader.Next(ri.goCtx) if err != nil { return nil, errors.Trace(err) } if row == nil { return nil, nil } - if rb.filter != nil { - matched, err := expression.EvalBool(rb.filter, row, rb.ctx) + if ri.filter != nil { + matched, err := expression.EvalBool(ri.filter, row, ri.ctx) if err != nil { return nil, errors.Trace(err) } @@ -98,34 +121,130 @@ func (rb *rowBlockIterator) nextRow() (Row, error) { } } -func (rb *rowBlockIterator) nextBlock() ([]Row, error) { - if rb.peekedRow == nil { +func (ri *readerIterator) nextBlock() ([]Row, error) { + if ri.peekedRow == nil { return nil, nil } - rowCache := rb.rowCache[0:0:rowBufferSize] - rowCache = append(rowCache, rb.peekedRow) + rowCache := ri.rowCache[0:0:rowBufferSize] + rowCache = append(rowCache, ri.peekedRow.(types.DatumRow)) for { - curRow, err := rb.nextRow() + curRow, err := ri.nextRow() if err != nil { return nil, errors.Trace(err) } if curRow == nil { - rb.peekedRow = nil + ri.peekedRow = nil return rowCache, nil } - compareResult, err := compareKeys(rb.stmtCtx, curRow, rb.joinKeys, rb.peekedRow, rb.joinKeys) + compareResult, err := compareKeys(ri.stmtCtx, curRow, ri.joinKeys, ri.peekedRow, ri.joinKeys) if err != nil { return nil, errors.Trace(err) } if compareResult == 0 { - rowCache = append(rowCache, curRow) + rowCache = append(rowCache, curRow.(types.DatumRow)) } else { - rb.peekedRow = curRow + ri.peekedRow = curRow return rowCache, nil } } } +func (ri *readerIterator) initForChunk(chk4Reader *chunk.Chunk) (err error) { + if ri.reader == nil || ri.joinKeys == nil || len(ri.joinKeys) == 0 || ri.ctx == nil { + return errors.Errorf("Invalid arguments: Empty arguments detected.") + } + ri.stmtCtx = ri.ctx.GetSessionVars().StmtCtx + ri.curResult = chk4Reader + ri.curSelected = make([]bool, 0, ri.ctx.GetSessionVars().MaxChunkSize) + ri.curRow = chk4Reader.End() + ri.curResultInUse = false + ri.resultQueue = append(ri.resultQueue, chk4Reader) + ri.firstRow4Key, err = ri.nextSelectedRow() + return errors.Trace(err) +} + +func (ri *readerIterator) rowsWithSameKey() (rows []chunk.Row, err error) { + lastResultIdx := len(ri.resultQueue) - 1 + ri.resourceQueue = append(ri.resourceQueue, ri.resultQueue[0:lastResultIdx]...) + ri.resultQueue = ri.resultQueue[lastResultIdx:] + // no more data. + if ri.firstRow4Key == ri.curResult.End() { + return nil, nil + } + rows = append(rows, ri.firstRow4Key) + for { + selectedRow, err := ri.nextSelectedRow() + // error happens or no more data. + if err != nil || selectedRow == ri.curResult.End() { + ri.firstRow4Key = ri.curResult.End() + return rows, errors.Trace(err) + } + compareResult, err := compareKeys(ri.stmtCtx, selectedRow, ri.joinKeys, ri.firstRow4Key, ri.joinKeys) + if err != nil { + return nil, errors.Trace(err) + } + if compareResult == 0 { + rows = append(rows, selectedRow) + } else { + ri.firstRow4Key = selectedRow + return rows, nil + } + } +} + +func (ri *readerIterator) nextSelectedRow() (chunk.Row, error) { + for { + for ; ri.curRow != ri.curResult.End(); ri.curRow = ri.curRow.Next() { + if ri.curSelected[ri.curRow.Idx()] { + result := ri.curRow + ri.curResultInUse = true + ri.curRow = ri.curRow.Next() + return result, nil + } else if ri.joinResultGenerator != nil { + // If this iterator works on the outer table, we should emit the un-matched outer row to result Chunk. + err := ri.joinResultGenerator.emitToChunk(ri.curRow, nil, ri.joinResult) + if err != nil { + return ri.curResult.End(), errors.Trace(err) + } + } + } + ri.reallocReaderResult() + ri.curRow = ri.curResult.Begin() + err := ri.reader.NextChunk(ri.goCtx, ri.curResult) + // error happens or no more data. + if err != nil || ri.curResult.NumRows() == 0 { + return ri.curResult.End(), errors.Trace(err) + } + ri.curSelected, err = expression.VectorizedFilter(ri.ctx, ri.filter, ri.curResult, ri.curSelected) + if err != nil { + return ri.curResult.End(), errors.Trace(err) + } + } +} + +// reallocReaderResult resets "ri.curResult" to an empty Chunk to buffer the result of "ri.reader". +// It pops a Chunk from "ri.resourceQueue" and push it into "ri.resultQueue" immediately. +func (ri *readerIterator) reallocReaderResult() { + if !ri.curResultInUse { + // If "ri.curResult" is not in use, we can just reuse it. + ri.curResult.Reset() + return + } + + // Create a new Chunk and append it to "resourceQueue" if there is no more + // available chunk in "resourceQueue". + if len(ri.resourceQueue) == 0 { + ri.resourceQueue = append(ri.resourceQueue, ri.reader.newChunk()) + } + + // NOTE: "ri.curResult" is always the last element of "resultQueue". + ri.curResult = ri.resourceQueue[0] + ri.resourceQueue = ri.resourceQueue[1:] + ri.resultQueue = append(ri.resultQueue, ri.curResult) + ri.curResult.Reset() + ri.curResultInUse = false +} + // Close implements the Executor Close interface. func (e *MergeJoinExec) Close() error { e.resultBuffer = nil @@ -147,8 +266,8 @@ func (e *MergeJoinExec) Open(goCtx goctx.Context) error { } func compareKeys(stmtCtx *stmtctx.StatementContext, - leftRow Row, leftKeys []*expression.Column, - rightRow Row, rightKeys []*expression.Column) (int, error) { + leftRow types.Row, leftKeys []*expression.Column, + rightRow types.Row, rightKeys []*expression.Column) (int, error) { for i, leftKey := range leftKeys { lVal, err := leftKey.Eval(leftRow) if err != nil { @@ -250,7 +369,37 @@ func (e *MergeJoinExec) computeJoin() (bool, error) { } } -func (e *MergeJoinExec) prepare() error { +func (e *MergeJoinExec) prepare(goCtx goctx.Context, forChunk bool) error { + e.outerIter.goCtx = goCtx + e.innerIter.goCtx = goCtx + // prepare for chunk-oriented execution. + if forChunk { + e.resultChunk = e.newChunk() + e.outerIter.filter = e.outerFilter + e.outerIter.joinResultGenerator = e.resultGenerator + e.outerIter.joinResult = e.resultChunk + err := e.outerIter.initForChunk(e.childrenResults[e.outerIdx]) + if err != nil { + return errors.Trace(err) + } + err = e.innerIter.initForChunk(e.childrenResults[e.outerIdx^1]) + if err != nil { + return errors.Trace(err) + } + + e.outerChunkRows, err = e.outerIter.rowsWithSameKey() + if err != nil { + return errors.Trace(err) + } + e.innerChunkRows, err = e.innerIter.rowsWithSameKey() + if err != nil { + return errors.Trace(err) + } + e.prepared = true + return nil + } + + // prepare for row-oriented execution. err := e.outerIter.init() if err != nil { return errors.Trace(err) @@ -268,7 +417,6 @@ func (e *MergeJoinExec) prepare() error { if err != nil { return errors.Trace(err) } - e.prepared = true return nil } @@ -276,7 +424,7 @@ func (e *MergeJoinExec) prepare() error { // Next implements the Executor Next interface. func (e *MergeJoinExec) Next(goCtx goctx.Context) (Row, error) { if !e.prepared { - if err := e.prepare(); err != nil { + if err := e.prepare(goCtx, false); err != nil { return nil, errors.Trace(err) } } @@ -296,3 +444,88 @@ func (e *MergeJoinExec) Next(goCtx goctx.Context) (Row, error) { e.resultCursor++ return result, nil } + +// NextChunk implements the Executor NextChunk interface. +func (e *MergeJoinExec) NextChunk(goCtx goctx.Context, chk *chunk.Chunk) error { + chk.Reset() + if !e.prepared { + if err := e.prepare(goCtx, true); err != nil { + return errors.Trace(err) + } + } + for { + numRequiredRows := e.maxChunkSize - chk.NumRows() + numRetainedRows := e.resultChunk.NumRows() + numAppendedRows := mathutil.Min(numRequiredRows, numRetainedRows) + + chk.Append(e.resultChunk, e.resultChunk.NumRows()-numAppendedRows, e.resultChunk.NumRows()) + e.resultChunk.TruncateTo(e.resultChunk.NumRows() - numAppendedRows) + + if chk.NumRows() == e.maxChunkSize { + return nil + } + + // reach here means there is no more data in "e.resultChunk" + hasMore, err := e.joinToResultChunk() + if err != nil || !hasMore { + return errors.Trace(err) + } + } +} + +func (e *MergeJoinExec) joinToResultChunk() (bool, error) { + var ( + cmpResult int + err error + ) + for { + if e.outerChunkRows == nil { + return false, nil + } + + if e.innerChunkRows == nil { + // here we set cmpResult to -1 to emit unmatched outer rows. + cmpResult = -1 + } else { + cmpResult, err = compareKeys(e.stmtCtx, e.outerChunkRows[0], e.outerKeys, e.innerChunkRows[0], e.innerKeys) + if err != nil { + return false, errors.Trace(err) + } + if cmpResult > 0 { + e.innerChunkRows, err = e.innerIter.rowsWithSameKey() + if err != nil { + return false, errors.Trace(err) + } + continue + } + } + + // reach here, cmpResult <= 0 is guaranteed. + if cmpResult < 0 { + for _, unMatchedOuter := range e.outerChunkRows { + err = e.resultGenerator.emitToChunk(unMatchedOuter, nil, e.resultChunk) + if err != nil { + return false, errors.Trace(err) + } + } + } else { + for _, outer := range e.outerChunkRows { + err = e.resultGenerator.emitToChunk(outer, e.innerChunkRows, e.resultChunk) + if err != nil { + return false, errors.Trace(err) + } + } + e.innerChunkRows, err = e.innerIter.rowsWithSameKey() + if err != nil { + return false, errors.Trace(err) + } + } + e.outerChunkRows, err = e.outerIter.rowsWithSameKey() + if err != nil { + return false, errors.Trace(err) + } + if e.resultChunk.NumRows() > 0 { + return true, nil + } + } +}