From e5ea06dec804626fb94fb3dff22b373e20b1602b Mon Sep 17 00:00:00 2001 From: Yiding Cui Date: Thu, 17 Aug 2017 13:07:51 +0800 Subject: [PATCH] executor, plan: union scan reuse pk when it's handle. (#4185) --- executor/builder.go | 14 +++++++++++++- executor/union_scan.go | 37 ++++++++++++++++++------------------ plan/column_pruning.go | 2 +- plan/initialize.go | 2 +- plan/logical_plan_builder.go | 23 ++++++++++++---------- plan/resolve_indices.go | 1 + 6 files changed, 47 insertions(+), 32 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index c6f96f49b4..7e386f892e 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -370,7 +370,19 @@ func (b *executorBuilder) buildUnionScanExec(v *plan.PhysicalUnionScan) Executor if b.err != nil { return nil } - us := &UnionScanExec{baseExecutor: newBaseExecutor(v.Schema(), b.ctx, src), needColHandle: v.NeedColHandle} + us := &UnionScanExec{baseExecutor: newBaseExecutor(v.Schema(), b.ctx, src)} + // Get the handle column index of the below plan. + // We can guarantee that there must be only one col in the map. + for _, cols := range v.Children()[0].Schema().TblID2Handle { + for _, col := range cols { + us.belowHandleIndex = col.Index + // If we don't found the handle column in the union scan's schema, + // we need to remove it when output. + if us.schema.ColumnIndex(col) != -1 { + us.handleColIsUsed = true + } + } + } switch x := src.(type) { case *XSelectTableExec: us.desc = x.desc diff --git a/executor/union_scan.go b/executor/union_scan.go index dccc8df052..dba0a78654 100644 --- a/executor/union_scan.go +++ b/executor/union_scan.go @@ -91,11 +91,15 @@ type UnionScanExec struct { dirty *dirtyTable // usedIndex is the column offsets of the index which Src executor has used. - usedIndex []int - desc bool - conditions []expression.Expression - columns []*model.ColumnInfo - needColHandle bool + usedIndex []int + desc bool + conditions []expression.Expression + columns []*model.ColumnInfo + + // belowHandleIndex is the handle's position of the below scan plan. + belowHandleIndex int + // handleColIsUsed checks whether this executor need to output handle column in its output row. + handleColIsUsed bool addedRows []Row cursor int @@ -134,8 +138,8 @@ func (us *UnionScanExec) Next() (Row, error) { } else { us.cursor++ } - if !us.needColHandle { - row = row[:len(row)-1] + if !us.handleColIsUsed { + row = append(row[:us.belowHandleIndex], row[us.belowHandleIndex+1:]...) } return row, nil } @@ -172,7 +176,7 @@ func (us *UnionScanExec) getSnapshotRow() (Row, error) { if us.snapshotRow == nil { break } - snapshotHandle := us.snapshotRow[len(us.snapshotRow)-1].GetInt64() + snapshotHandle := us.snapshotRow[us.belowHandleIndex].GetInt64() if _, ok := us.dirty.deletedRows[snapshotHandle]; ok { continue } @@ -231,9 +235,8 @@ func (us *UnionScanExec) compare(a, b Row) (int, error) { return cmp, nil } } - dataLen := len(a) - aHandle := a[dataLen-1].GetInt64() - bHandle := b[dataLen-1].GetInt64() + aHandle := a[us.belowHandleIndex].GetInt64() + bHandle := b[us.belowHandleIndex].GetInt64() var cmp int if aHandle == bHandle { cmp = 0 @@ -247,17 +250,14 @@ func (us *UnionScanExec) compare(a, b Row) (int, error) { func (us *UnionScanExec) buildAndSortAddedRows(t table.Table, asName *model.CIStr) error { us.addedRows = make([]Row, 0, len(us.dirty.addedRows)) - newLen := us.schema.Len() - if !us.needColHandle { - newLen++ - } for h, data := range us.dirty.addedRows { - newData := make([]types.Datum, 0, newLen) + newData := make([]types.Datum, 0, us.schema.Len()) for _, col := range us.columns { if col.ID == model.ExtraHandleID { - continue + newData = append(newData, types.NewIntDatum(h)) + } else { + newData = append(newData, data[col.Offset]) } - newData = append(newData, data[col.Offset]) } matched, err := expression.EvalBool(us.conditions, newData, us.ctx) if err != nil { @@ -266,7 +266,6 @@ func (us *UnionScanExec) buildAndSortAddedRows(t table.Table, asName *model.CISt if !matched { continue } - newData = append(newData, types.NewIntDatum(h)) row := newData us.addedRows = append(us.addedRows, row) diff --git a/plan/column_pruning.go b/plan/column_pruning.go index 79e615bce7..da6aa0d23d 100644 --- a/plan/column_pruning.go +++ b/plan/column_pruning.go @@ -157,6 +157,7 @@ func (p *Union) PruneColumns(parentUsedCols []*expression.Column) { // PruneColumns implements LogicalPlan interface. func (p *DataSource) PruneColumns(parentUsedCols []*expression.Column) { used := getUsedList(parentUsedCols, p.schema) + p.pruneUnionScanSchema(used) handleIdx := -1 // -1 for not found. for _, col := range p.schema.TblID2Handle { handleIdx = col[0].Index @@ -174,7 +175,6 @@ func (p *DataSource) PruneColumns(parentUsedCols []*expression.Column) { p.Columns = append(p.Columns[:i], p.Columns[i+1:]...) } } - p.pruneUnionScanSchema(used) } func (p *DataSource) pruneUnionScanSchema(usedMask []bool) { diff --git a/plan/initialize.go b/plan/initialize.go index 4fc87cd0c9..f722a8d7a1 100644 --- a/plan/initialize.go +++ b/plan/initialize.go @@ -307,7 +307,7 @@ func (p PhysicalIndexReader) init(allocator *idAllocator, ctx context.Context) * is := p.IndexPlans[0].(*PhysicalIndexScan) p.schema = is.dataSourceSchema } - p.OutputColumns = p.schema.Columns + p.OutputColumns = p.schema.Clone().Columns return &p } diff --git a/plan/logical_plan_builder.go b/plan/logical_plan_builder.go index 7daa7270ff..eabec1f8e9 100644 --- a/plan/logical_plan_builder.go +++ b/plan/logical_plan_builder.go @@ -1217,7 +1217,13 @@ func (b *planBuilder) buildDataSource(tn *ast.TableName) LogicalPlan { p.SetSchema(schema) return p } - if pkCol == nil || needUnionScan { + if needUnionScan { + p.unionScanSchema = expression.NewSchema(make([]*expression.Column, 0, len(tableInfo.Columns))...) + for _, col := range schema.Columns { + p.unionScanSchema.Append(col) + } + } + if pkCol == nil { idCol := &expression.Column{ FromID: p.id, DBName: schemaName, @@ -1228,15 +1234,9 @@ func (b *planBuilder) buildDataSource(tn *ast.TableName) LogicalPlan { Index: schema.Len(), ID: model.ExtraHandleID, } - if needUnionScan { - p.unionScanSchema = expression.NewSchema(make([]*expression.Column, 0, len(tableInfo.Columns))...) - for _, col := range schema.Columns { - p.unionScanSchema.Append(col) - } - if b.needColHandle > 0 { - p.unionScanSchema.Columns = append(p.unionScanSchema.Columns, idCol) - p.unionScanSchema.TblID2Handle[tableInfo.ID] = []*expression.Column{idCol} - } + if needUnionScan && b.needColHandle > 0 { + p.unionScanSchema.Columns = append(p.unionScanSchema.Columns, idCol) + p.unionScanSchema.TblID2Handle[tableInfo.ID] = []*expression.Column{idCol} } p.Columns = append(p.Columns, &model.ColumnInfo{ ID: model.ExtraHandleID, @@ -1245,6 +1245,9 @@ func (b *planBuilder) buildDataSource(tn *ast.TableName) LogicalPlan { schema.Append(idCol) schema.TblID2Handle[tableInfo.ID] = []*expression.Column{idCol} } else { + if needUnionScan && b.needColHandle > 0 { + p.unionScanSchema.TblID2Handle[tableInfo.ID] = []*expression.Column{pkCol} + } schema.TblID2Handle[tableInfo.ID] = []*expression.Column{pkCol} } p.SetSchema(schema) diff --git a/plan/resolve_indices.go b/plan/resolve_indices.go index 5b47015e07..e0f6fcb641 100644 --- a/plan/resolve_indices.go +++ b/plan/resolve_indices.go @@ -141,6 +141,7 @@ func (p *PhysicalTableReader) ResolveIndices() { // ResolveIndices implements Plan interface. func (p *PhysicalIndexReader) ResolveIndices() { + p.basePlan.ResolveIndices() p.indexPlan.ResolveIndices() for _, col := range p.OutputColumns { if col.ID != model.ExtraHandleID {