From ecbee2e81abe3fcd8aa4f36748544dbc45bd535b Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 10 Nov 2017 00:21:35 -0600 Subject: [PATCH] executor: don't reuse Executor in IndexLookUpJoin, remove doRequestForDatums() (#5031) --- executor/builder.go | 118 ++++++++++++++++++++++++++++++---- executor/distsql.go | 89 ++----------------------- executor/index_lookup_join.go | 17 +++-- 3 files changed, 122 insertions(+), 102 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 00afd2be9a..2edbd475d7 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -15,8 +15,10 @@ package executor import ( "math" + "sort" "time" + "github.com/cznic/sortutil" "github.com/juju/errors" "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/context" @@ -41,6 +43,7 @@ type executorBuilder struct { ctx context.Context is infoschema.InfoSchema priority int + startTS uint64 // cached when the first time getStartTS() is called // err is set when there is error happened during Executor building process. err error } @@ -647,10 +650,16 @@ func (b *executorBuilder) buildTableDual(v *plan.TableDual) Executor { } func (b *executorBuilder) getStartTS() uint64 { + if b.startTS != 0 { + // Return the cached value. + return b.startTS + } + startTS := b.ctx.GetSessionVars().SnapshotTS if startTS == 0 { startTS = b.ctx.Txn().StartTS() } + b.startTS = startTS return startTS } @@ -950,17 +959,11 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plan.PhysicalIndexJoin) Execut b.err = errors.Trace(b.err) return nil } - - innerExec := b.build(v.Children()[1]).(DataReader) - if b.err != nil { - b.err = errors.Trace(b.err) - return nil - } - + innerExecBuilder := &dataReaderBuilder{v.Children()[1], b} return &IndexLookUpJoin{ baseExecutor: newBaseExecutor(v.Schema(), b.ctx, outerExec), outerExec: outerExec, - innerExec: innerExec, + innerExecBuilder: innerExecBuilder, outerKeys: v.OuterJoinKeys, innerKeys: v.InnerJoinKeys, outerFilter: v.LeftConditions, @@ -972,7 +975,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plan.PhysicalIndexJoin) Execut } } -func (b *executorBuilder) buildTableReader(v *plan.PhysicalTableReader) Executor { +func (b *executorBuilder) buildTableReader(v *plan.PhysicalTableReader) *TableReaderExecutor { dagReq := b.constructDAGReq(v.TablePlans) if b.err != nil { return nil @@ -1003,7 +1006,7 @@ func (b *executorBuilder) buildTableReader(v *plan.PhysicalTableReader) Executor return e } -func (b *executorBuilder) buildIndexReader(v *plan.PhysicalIndexReader) Executor { +func (b *executorBuilder) buildIndexReader(v *plan.PhysicalIndexReader) *IndexReaderExecutor { dagReq := b.constructDAGReq(v.IndexPlans) if b.err != nil { return nil @@ -1035,7 +1038,7 @@ func (b *executorBuilder) buildIndexReader(v *plan.PhysicalIndexReader) Executor return e } -func (b *executorBuilder) buildIndexLookUpReader(v *plan.PhysicalIndexLookUpReader) Executor { +func (b *executorBuilder) buildIndexLookUpReader(v *plan.PhysicalIndexLookUpReader) *IndexLookUpExecutor { indexReq := b.constructDAGReq(v.IndexPlans) if b.err != nil { return nil @@ -1094,6 +1097,99 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plan.PhysicalIndexLookUpRead handleCol: handleCol, priority: b.priority, tableReaderSchema: tableReaderSchema, + dataReaderBuilder: &dataReaderBuilder{executorBuilder: b}, } return e } + +// dataReaderBuilder build an executor. +// The executor can be used to read data in the ranges which are constructed by datums. +// Differences from executorBuilder: +// 1. dataReaderBuilder calculate data range from argument, rather than plan. +// 2. the result executor is already opened. +type dataReaderBuilder struct { + plan.Plan + *executorBuilder +} + +func (builder *dataReaderBuilder) buildExecutorForDatums(goCtx goctx.Context, datums [][]types.Datum) (Executor, error) { + switch v := builder.Plan.(type) { + case *plan.PhysicalIndexReader: + return builder.buildIndexReaderForDatums(goCtx, v, datums) + case *plan.PhysicalTableReader: + return builder.buildTableReaderForDatums(goCtx, v, datums) + case *plan.PhysicalIndexLookUpReader: + return builder.buildIndexLookUpReaderForDatums(goCtx, v, datums) + } + return nil, errors.New("Wrong plan type for dataReaderBuilder") +} + +func (builder *dataReaderBuilder) buildTableReaderForDatums(goCtx goctx.Context, v *plan.PhysicalTableReader, datums [][]types.Datum) (Executor, error) { + e := builder.executorBuilder.buildTableReader(v) + if err := builder.executorBuilder.err; err != nil { + return nil, errors.Trace(err) + } + handles := make([]int64, 0, len(datums)) + for _, datum := range datums { + handles = append(handles, datum[0].GetInt64()) + } + return builder.buildTableReaderFromHandles(goCtx, e, handles) +} + +func (builder *dataReaderBuilder) buildTableReaderFromHandles(goCtx goctx.Context, e *TableReaderExecutor, handles []int64) (Executor, error) { + sort.Sort(sortutil.Int64Slice(handles)) + var b requestBuilder + kvReq, err := b.SetTableHandles(e.tableID, handles). + SetDAGRequest(e.dagPB). + SetDesc(e.desc). + SetKeepOrder(e.keepOrder). + SetPriority(e.priority). + SetFromSessionVars(e.ctx.GetSessionVars()). + Build() + if err != nil { + return nil, errors.Trace(err) + } + e.result, err = distsql.SelectDAG(goCtx, e.ctx.GetClient(), kvReq, e.schema.Len()) + if err != nil { + return nil, errors.Trace(err) + } + e.result.Fetch(goCtx) + return e, nil +} + +func (builder *dataReaderBuilder) buildIndexReaderForDatums(goCtx goctx.Context, v *plan.PhysicalIndexReader, values [][]types.Datum) (Executor, error) { + e := builder.executorBuilder.buildIndexReader(v) + if err := builder.executorBuilder.err; err != nil { + return nil, errors.Trace(err) + } + var b requestBuilder + kvReq, err := b.SetIndexValues(e.tableID, e.index.ID, values). + SetDAGRequest(e.dagPB). + SetDesc(e.desc). + SetKeepOrder(e.keepOrder). + SetPriority(e.priority). + SetFromSessionVars(e.ctx.GetSessionVars()). + Build() + if err != nil { + return nil, errors.Trace(err) + } + e.result, err = distsql.SelectDAG(e.ctx.GoCtx(), e.ctx.GetClient(), kvReq, e.schema.Len()) + if err != nil { + return nil, errors.Trace(err) + } + e.result.Fetch(goCtx) + return e, nil +} + +func (builder *dataReaderBuilder) buildIndexLookUpReaderForDatums(goCtx goctx.Context, v *plan.PhysicalIndexLookUpReader, values [][]types.Datum) (Executor, error) { + e := builder.executorBuilder.buildIndexLookUpReader(v) + if err := builder.executorBuilder.err; err != nil { + return nil, errors.Trace(err) + } + kvRanges, err := indexValuesToKVRanges(e.tableID, e.index.ID, values) + if err != nil { + return nil, errors.Trace(err) + } + err = e.open(kvRanges) + return e, errors.Trace(err) +} diff --git a/executor/distsql.go b/executor/distsql.go index 9f7da5b8dd..7de37cc308 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -44,10 +44,6 @@ var ( _ Executor = &TableReaderExecutor{} _ Executor = &IndexReaderExecutor{} _ Executor = &IndexLookUpExecutor{} - - _ DataReader = &TableReaderExecutor{} - _ DataReader = &IndexReaderExecutor{} - _ DataReader = &IndexLookUpExecutor{} ) // LookupTableTaskChannelSize represents the channel size of the index double read taskChan. @@ -302,12 +298,6 @@ func extractHandlesFromNewIndexSubResult(subResult distsql.PartialResult) ([]int return handles, nil } -type int64Slice []int64 - -func (p int64Slice) Len() int { return len(p) } -func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] } -func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } - // Closeable is a interface for closeable structures. type Closeable interface { // Close closes the object. @@ -394,13 +384,6 @@ func setPBColumnsDefaultValue(ctx context.Context, pbColumns []*tipb.ColumnInfo, return nil } -// DataReader can send requests which ranges are constructed by datums. -type DataReader interface { - Executor - - doRequestForDatums(goCtx goctx.Context, datums [][]types.Datum) error -} - // handleIsExtra checks whether this column is a extra handle column generated during plan building phase. func handleIsExtra(col *expression.Column) bool { if col != nil && col.ID == model.ExtraHandleID { @@ -512,38 +495,6 @@ func startSpanFollowsContext(goCtx goctx.Context, operationName string) (opentra return span, opentracing.ContextWithSpan(goCtx, span) } -// doRequestForHandles constructs kv ranges by handles. It is used by index look up executor. -func (e *TableReaderExecutor) doRequestForHandles(goCtx goctx.Context, handles []int64) error { - sort.Sort(int64Slice(handles)) - var builder requestBuilder - kvReq, err := builder.SetTableHandles(e.tableID, handles). - SetDAGRequest(e.dagPB). - SetDesc(e.desc). - SetKeepOrder(e.keepOrder). - SetPriority(e.priority). - SetFromSessionVars(e.ctx.GetSessionVars()). - Build() - if err != nil { - return errors.Trace(err) - } - e.result, err = distsql.SelectDAG(goCtx, e.ctx.GetClient(), kvReq, e.schema.Len()) - if err != nil { - return errors.Trace(err) - } - e.result.Fetch(goCtx) - return nil -} - -// doRequestForDatums constructs kv ranges by Datums. It is used by index look up join. -// Every lens for `datums` will always be one and must be type of int64. -func (e *TableReaderExecutor) doRequestForDatums(goCtx goctx.Context, datums [][]types.Datum) error { - handles := make([]int64, 0, len(datums)) - for _, datum := range datums { - handles = append(handles, datum[0].GetInt64()) - } - return errors.Trace(e.doRequestForHandles(goCtx, handles)) -} - // IndexReaderExecutor sends dag request and reads index data from kv layer. type IndexReaderExecutor struct { table table.Table @@ -640,27 +591,6 @@ func (e *IndexReaderExecutor) Open() error { return nil } -// doRequestForDatums constructs kv ranges by datums. It is used by index look up executor. -func (e *IndexReaderExecutor) doRequestForDatums(goCtx goctx.Context, values [][]types.Datum) error { - var builder requestBuilder - kvReq, err := builder.SetIndexValues(e.tableID, e.index.ID, values). - SetDAGRequest(e.dagPB). - SetDesc(e.desc). - SetKeepOrder(e.keepOrder). - SetPriority(e.priority). - SetFromSessionVars(e.ctx.GetSessionVars()). - Build() - if err != nil { - return errors.Trace(err) - } - e.result, err = distsql.SelectDAG(e.ctx.GoCtx(), e.ctx.GetClient(), kvReq, e.schema.Len()) - if err != nil { - return errors.Trace(err) - } - e.result.Fetch(goCtx) - return nil -} - // IndexLookUpExecutor implements double read for index scan. type IndexLookUpExecutor struct { table table.Table @@ -682,6 +612,7 @@ type IndexLookUpExecutor struct { // columns are only required by union scan. columns []*model.ColumnInfo priority int + *dataReaderBuilder // All fields above is immutable. indexWorker @@ -846,15 +777,6 @@ func (e *IndexLookUpExecutor) indexRangesToKVRanges() ([]kv.KeyRange, error) { return indexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, e.tableID, e.index.ID, e.ranges, fieldTypes) } -// doRequestForDatums constructs kv ranges by datums. It is used by index look up join. -func (e *IndexLookUpExecutor) doRequestForDatums(goCtx goctx.Context, values [][]types.Datum) error { - kvRanges, err := indexValuesToKVRanges(e.tableID, e.index.ID, values) - if err != nil { - return errors.Trace(err) - } - return e.open(kvRanges) -} - // executeTask executes the table look up tasks. We will construct a table reader and send request by handles. // Then we hold the returning rows and finish this task. func (e *IndexLookUpExecutor) executeTask(task *lookupTableTask, goCtx goctx.Context) { @@ -868,18 +790,21 @@ func (e *IndexLookUpExecutor) executeTask(task *lookupTableTask, goCtx goctx.Con } else { schema = e.schema } - tableReader := &TableReaderExecutor{ + + var tableReader Executor + tableReader, err = e.dataReaderBuilder.buildTableReaderFromHandles(goCtx, &TableReaderExecutor{ table: e.table, tableID: e.tableID, dagPB: e.tableRequest, schema: schema, ctx: e.ctx, - } - err = tableReader.doRequestForHandles(goCtx, task.handles) + }, task.handles) if err != nil { + log.Error(err) return } defer terror.Call(tableReader.Close) + for { var row Row row, err = tableReader.Next() diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 95287cb9dc..aa5306f3bf 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -86,7 +86,7 @@ type IndexLookUpJoin struct { baseExecutor outerExec Executor - innerExec DataReader + innerExecBuilder *dataReaderBuilder outerKeys []*expression.Column innerKeys []*expression.Column outerFilter expression.CNFExprs @@ -129,7 +129,7 @@ func (e *IndexLookUpJoin) Close() error { // release all resource references. e.outerExec = nil - e.innerExec = nil + e.innerExecBuilder = nil e.outerKeys = nil e.innerKeys = nil e.outerFilter = nil @@ -262,16 +262,16 @@ func (e *IndexLookUpJoin) deDuplicateRequestRows(requestRows [][]types.Datum, re // fetchSortedInners will join the outer rows and inner rows and store them to resultBuffer. func (e *IndexLookUpJoin) fetchSortedInners(requestRows [][]types.Datum) error { - if err := e.innerExec.doRequestForDatums(e.ctx.GoCtx(), requestRows); err != nil { + innerExec, err := e.innerExecBuilder.buildExecutorForDatums(e.ctx.GoCtx(), requestRows) + if err != nil { return errors.Trace(err) } - - defer terror.Call(e.innerExec.Close) + defer terror.Call(innerExec.Close) for { - innerRow, err := e.innerExec.Next() - if err != nil { - return errors.Trace(err) + innerRow, err1 := innerExec.Next() + if err1 != nil { + return errors.Trace(err1) } else if innerRow == nil { break } @@ -298,7 +298,6 @@ func (e *IndexLookUpJoin) fetchSortedInners(requestRows [][]types.Datum) error { innerJoinKey = innerJoinKey[len(innerJoinKey):] } - var err error e.innerOrderedRows.keys, err = e.constructJoinKeys(innerJoinKeys) if err != nil { return errors.Trace(err)