executor, plan: union scan reuse pk when it's handle. (#4185)
This commit is contained in:
@ -370,7 +370,19 @@ func (b *executorBuilder) buildUnionScanExec(v *plan.PhysicalUnionScan) Executor
|
||||
if b.err != nil {
|
||||
return nil
|
||||
}
|
||||
us := &UnionScanExec{baseExecutor: newBaseExecutor(v.Schema(), b.ctx, src), needColHandle: v.NeedColHandle}
|
||||
us := &UnionScanExec{baseExecutor: newBaseExecutor(v.Schema(), b.ctx, src)}
|
||||
// Get the handle column index of the below plan.
|
||||
// We can guarantee that there must be only one col in the map.
|
||||
for _, cols := range v.Children()[0].Schema().TblID2Handle {
|
||||
for _, col := range cols {
|
||||
us.belowHandleIndex = col.Index
|
||||
// If we don't found the handle column in the union scan's schema,
|
||||
// we need to remove it when output.
|
||||
if us.schema.ColumnIndex(col) != -1 {
|
||||
us.handleColIsUsed = true
|
||||
}
|
||||
}
|
||||
}
|
||||
switch x := src.(type) {
|
||||
case *XSelectTableExec:
|
||||
us.desc = x.desc
|
||||
|
||||
@ -91,11 +91,15 @@ type UnionScanExec struct {
|
||||
|
||||
dirty *dirtyTable
|
||||
// usedIndex is the column offsets of the index which Src executor has used.
|
||||
usedIndex []int
|
||||
desc bool
|
||||
conditions []expression.Expression
|
||||
columns []*model.ColumnInfo
|
||||
needColHandle bool
|
||||
usedIndex []int
|
||||
desc bool
|
||||
conditions []expression.Expression
|
||||
columns []*model.ColumnInfo
|
||||
|
||||
// belowHandleIndex is the handle's position of the below scan plan.
|
||||
belowHandleIndex int
|
||||
// handleColIsUsed checks whether this executor need to output handle column in its output row.
|
||||
handleColIsUsed bool
|
||||
|
||||
addedRows []Row
|
||||
cursor int
|
||||
@ -134,8 +138,8 @@ func (us *UnionScanExec) Next() (Row, error) {
|
||||
} else {
|
||||
us.cursor++
|
||||
}
|
||||
if !us.needColHandle {
|
||||
row = row[:len(row)-1]
|
||||
if !us.handleColIsUsed {
|
||||
row = append(row[:us.belowHandleIndex], row[us.belowHandleIndex+1:]...)
|
||||
}
|
||||
return row, nil
|
||||
}
|
||||
@ -172,7 +176,7 @@ func (us *UnionScanExec) getSnapshotRow() (Row, error) {
|
||||
if us.snapshotRow == nil {
|
||||
break
|
||||
}
|
||||
snapshotHandle := us.snapshotRow[len(us.snapshotRow)-1].GetInt64()
|
||||
snapshotHandle := us.snapshotRow[us.belowHandleIndex].GetInt64()
|
||||
if _, ok := us.dirty.deletedRows[snapshotHandle]; ok {
|
||||
continue
|
||||
}
|
||||
@ -231,9 +235,8 @@ func (us *UnionScanExec) compare(a, b Row) (int, error) {
|
||||
return cmp, nil
|
||||
}
|
||||
}
|
||||
dataLen := len(a)
|
||||
aHandle := a[dataLen-1].GetInt64()
|
||||
bHandle := b[dataLen-1].GetInt64()
|
||||
aHandle := a[us.belowHandleIndex].GetInt64()
|
||||
bHandle := b[us.belowHandleIndex].GetInt64()
|
||||
var cmp int
|
||||
if aHandle == bHandle {
|
||||
cmp = 0
|
||||
@ -247,17 +250,14 @@ func (us *UnionScanExec) compare(a, b Row) (int, error) {
|
||||
|
||||
func (us *UnionScanExec) buildAndSortAddedRows(t table.Table, asName *model.CIStr) error {
|
||||
us.addedRows = make([]Row, 0, len(us.dirty.addedRows))
|
||||
newLen := us.schema.Len()
|
||||
if !us.needColHandle {
|
||||
newLen++
|
||||
}
|
||||
for h, data := range us.dirty.addedRows {
|
||||
newData := make([]types.Datum, 0, newLen)
|
||||
newData := make([]types.Datum, 0, us.schema.Len())
|
||||
for _, col := range us.columns {
|
||||
if col.ID == model.ExtraHandleID {
|
||||
continue
|
||||
newData = append(newData, types.NewIntDatum(h))
|
||||
} else {
|
||||
newData = append(newData, data[col.Offset])
|
||||
}
|
||||
newData = append(newData, data[col.Offset])
|
||||
}
|
||||
matched, err := expression.EvalBool(us.conditions, newData, us.ctx)
|
||||
if err != nil {
|
||||
@ -266,7 +266,6 @@ func (us *UnionScanExec) buildAndSortAddedRows(t table.Table, asName *model.CISt
|
||||
if !matched {
|
||||
continue
|
||||
}
|
||||
newData = append(newData, types.NewIntDatum(h))
|
||||
|
||||
row := newData
|
||||
us.addedRows = append(us.addedRows, row)
|
||||
|
||||
@ -157,6 +157,7 @@ func (p *Union) PruneColumns(parentUsedCols []*expression.Column) {
|
||||
// PruneColumns implements LogicalPlan interface.
|
||||
func (p *DataSource) PruneColumns(parentUsedCols []*expression.Column) {
|
||||
used := getUsedList(parentUsedCols, p.schema)
|
||||
p.pruneUnionScanSchema(used)
|
||||
handleIdx := -1 // -1 for not found.
|
||||
for _, col := range p.schema.TblID2Handle {
|
||||
handleIdx = col[0].Index
|
||||
@ -174,7 +175,6 @@ func (p *DataSource) PruneColumns(parentUsedCols []*expression.Column) {
|
||||
p.Columns = append(p.Columns[:i], p.Columns[i+1:]...)
|
||||
}
|
||||
}
|
||||
p.pruneUnionScanSchema(used)
|
||||
}
|
||||
|
||||
func (p *DataSource) pruneUnionScanSchema(usedMask []bool) {
|
||||
|
||||
@ -307,7 +307,7 @@ func (p PhysicalIndexReader) init(allocator *idAllocator, ctx context.Context) *
|
||||
is := p.IndexPlans[0].(*PhysicalIndexScan)
|
||||
p.schema = is.dataSourceSchema
|
||||
}
|
||||
p.OutputColumns = p.schema.Columns
|
||||
p.OutputColumns = p.schema.Clone().Columns
|
||||
return &p
|
||||
}
|
||||
|
||||
|
||||
@ -1217,7 +1217,13 @@ func (b *planBuilder) buildDataSource(tn *ast.TableName) LogicalPlan {
|
||||
p.SetSchema(schema)
|
||||
return p
|
||||
}
|
||||
if pkCol == nil || needUnionScan {
|
||||
if needUnionScan {
|
||||
p.unionScanSchema = expression.NewSchema(make([]*expression.Column, 0, len(tableInfo.Columns))...)
|
||||
for _, col := range schema.Columns {
|
||||
p.unionScanSchema.Append(col)
|
||||
}
|
||||
}
|
||||
if pkCol == nil {
|
||||
idCol := &expression.Column{
|
||||
FromID: p.id,
|
||||
DBName: schemaName,
|
||||
@ -1228,15 +1234,9 @@ func (b *planBuilder) buildDataSource(tn *ast.TableName) LogicalPlan {
|
||||
Index: schema.Len(),
|
||||
ID: model.ExtraHandleID,
|
||||
}
|
||||
if needUnionScan {
|
||||
p.unionScanSchema = expression.NewSchema(make([]*expression.Column, 0, len(tableInfo.Columns))...)
|
||||
for _, col := range schema.Columns {
|
||||
p.unionScanSchema.Append(col)
|
||||
}
|
||||
if b.needColHandle > 0 {
|
||||
p.unionScanSchema.Columns = append(p.unionScanSchema.Columns, idCol)
|
||||
p.unionScanSchema.TblID2Handle[tableInfo.ID] = []*expression.Column{idCol}
|
||||
}
|
||||
if needUnionScan && b.needColHandle > 0 {
|
||||
p.unionScanSchema.Columns = append(p.unionScanSchema.Columns, idCol)
|
||||
p.unionScanSchema.TblID2Handle[tableInfo.ID] = []*expression.Column{idCol}
|
||||
}
|
||||
p.Columns = append(p.Columns, &model.ColumnInfo{
|
||||
ID: model.ExtraHandleID,
|
||||
@ -1245,6 +1245,9 @@ func (b *planBuilder) buildDataSource(tn *ast.TableName) LogicalPlan {
|
||||
schema.Append(idCol)
|
||||
schema.TblID2Handle[tableInfo.ID] = []*expression.Column{idCol}
|
||||
} else {
|
||||
if needUnionScan && b.needColHandle > 0 {
|
||||
p.unionScanSchema.TblID2Handle[tableInfo.ID] = []*expression.Column{pkCol}
|
||||
}
|
||||
schema.TblID2Handle[tableInfo.ID] = []*expression.Column{pkCol}
|
||||
}
|
||||
p.SetSchema(schema)
|
||||
|
||||
@ -141,6 +141,7 @@ func (p *PhysicalTableReader) ResolveIndices() {
|
||||
|
||||
// ResolveIndices implements Plan interface.
|
||||
func (p *PhysicalIndexReader) ResolveIndices() {
|
||||
p.basePlan.ResolveIndices()
|
||||
p.indexPlan.ResolveIndices()
|
||||
for _, col := range p.OutputColumns {
|
||||
if col.ID != model.ExtraHandleID {
|
||||
|
||||
Reference in New Issue
Block a user