executor: some clean up. (#5156)

This commit is contained in:
Yiding Cui
2017-11-20 15:41:12 +08:00
committed by Han Fei
parent a220cff85e
commit 4a75a103e4
2 changed files with 10 additions and 86 deletions

View File

@ -184,13 +184,9 @@ type AnalyzeIndexExec struct {
}
func (e *AnalyzeIndexExec) open() error {
fieldTypes := make([]*types.FieldType, len(e.idxInfo.Columns))
for i, v := range e.idxInfo.Columns {
fieldTypes[i] = &(e.tblInfo.Columns[v.Offset].FieldType)
}
idxRange := &types.IndexRange{LowVal: []types.Datum{types.MinNotNullDatum()}, HighVal: []types.Datum{types.MaxValueDatum()}}
var builder requestBuilder
kvReq, err := builder.SetIndexRanges(e.ctx.GetSessionVars().StmtCtx, e.tblInfo.ID, e.idxInfo.ID, []*types.IndexRange{idxRange}, fieldTypes).
kvReq, err := builder.SetIndexRanges(e.tblInfo.ID, e.idxInfo.ID, []*types.IndexRange{idxRange}).
SetAnalyzeRequest(e.analyzePB).
SetKeepOrder(true).
SetPriority(e.priority).

View File

@ -169,14 +169,9 @@ func indexValuesToKVRanges(tid, idxID int64, values [][]types.Datum) ([]kv.KeyRa
return krs, nil
}
func indexRangesToKVRanges(sc *variable.StatementContext, tid, idxID int64, ranges []*types.IndexRange, fieldTypes []*types.FieldType) ([]kv.KeyRange, error) {
func indexRangesToKVRanges(tid, idxID int64, ranges []*types.IndexRange) ([]kv.KeyRange, error) {
krs := make([]kv.KeyRange, 0, len(ranges))
for _, ran := range ranges {
err := convertIndexRangeTypes(sc, ran, fieldTypes)
if err != nil {
return nil, errors.Trace(err)
}
low, err := codec.EncodeKey(nil, ran.LowVal...)
if err != nil {
return nil, errors.Trace(err)
@ -198,66 +193,7 @@ func indexRangesToKVRanges(sc *variable.StatementContext, tid, idxID int64, rang
return krs, nil
}
func convertIndexRangeTypes(sc *variable.StatementContext, ran *types.IndexRange, fieldTypes []*types.FieldType) error {
for i := range ran.LowVal {
if ran.LowVal[i].Kind() == types.KindMinNotNull || ran.LowVal[i].Kind() == types.KindMaxValue {
continue
}
converted, err := ran.LowVal[i].ConvertTo(sc, fieldTypes[i])
if err != nil {
return errors.Trace(err)
}
cmp, err := converted.CompareDatum(sc, &ran.LowVal[i])
if err != nil {
return errors.Trace(err)
}
ran.LowVal[i] = converted
if cmp == 0 {
continue
}
if cmp < 0 && !ran.LowExclude {
// For int column a, a >= 1.1 is converted to a > 1.
ran.LowExclude = true
} else if cmp > 0 && ran.LowExclude {
// For int column a, a > 1.9 is converted to a >= 2.
ran.LowExclude = false
}
// The converted value has changed, the other column values doesn't matter.
// For equal condition, converted value changed means there will be no match.
// For non equal condition, this column would be the last one to build the range.
// Break here to prevent the rest columns modify LowExclude again.
break
}
for i := range ran.HighVal {
if ran.HighVal[i].Kind() == types.KindMaxValue || ran.LowVal[i].Kind() == types.KindNull {
continue
}
converted, err := ran.HighVal[i].ConvertTo(sc, fieldTypes[i])
if err != nil {
return errors.Trace(err)
}
cmp, err := converted.CompareDatum(sc, &ran.HighVal[i])
if err != nil {
return errors.Trace(err)
}
ran.HighVal[i] = converted
if cmp == 0 {
continue
}
// For int column a, a < 1.1 is converted to a <= 1.
if cmp < 0 && ran.HighExclude {
ran.HighExclude = false
}
// For int column a, a <= 1.9 is converted to a < 2.
if cmp > 0 && !ran.HighExclude {
ran.HighExclude = true
}
break
}
return nil
}
func extractHandlesFromNewIndexResult(idxResult distsql.SelectResult) (handles []int64, finish bool, err error) {
func extractHandlesFromIndexResult(idxResult distsql.SelectResult) (handles []int64, finish bool, err error) {
subResult, e0 := idxResult.Next()
if e0 != nil {
err = errors.Trace(e0)
@ -267,14 +203,14 @@ func extractHandlesFromNewIndexResult(idxResult distsql.SelectResult) (handles [
finish = true
return
}
handles, err = extractHandlesFromNewIndexSubResult(subResult)
handles, err = extractHandlesFromIndexSubResult(subResult)
if err != nil {
err = errors.Trace(err)
}
return
}
func extractHandlesFromNewIndexSubResult(subResult distsql.PartialResult) ([]int64, error) {
func extractHandlesFromIndexSubResult(subResult distsql.PartialResult) ([]int64, error) {
defer terror.Call(subResult.Close)
var (
handles []int64
@ -558,12 +494,8 @@ func (e *IndexReaderExecutor) Open() error {
span, goCtx := startSpanFollowsContext(e.ctx.GoCtx(), "executor.IndexReader.Open")
defer span.Finish()
fieldTypes := make([]*types.FieldType, len(e.index.Columns))
for i, v := range e.index.Columns {
fieldTypes[i] = &(e.table.Cols()[v.Offset].FieldType)
}
var builder requestBuilder
kvReq, err := builder.SetIndexRanges(e.ctx.GetSessionVars().StmtCtx, e.tableID, e.index.ID, e.ranges, fieldTypes).
kvReq, err := builder.SetIndexRanges(e.tableID, e.index.ID, e.ranges).
SetDAGRequest(e.dagPB).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
@ -661,7 +593,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(goCtx goctx.Context, kvRanges []k
// at the same time to keep data ordered.
func (worker *indexWorker) fetchHandles(e *IndexLookUpExecutor, result distsql.SelectResult, workCh chan<- *lookupTableTask, goCtx goctx.Context, finished <-chan struct{}) {
for {
handles, finish, err := extractHandlesFromNewIndexResult(result)
handles, finish, err := extractHandlesFromIndexResult(result)
if err != nil {
doneCh := make(chan error, 1)
doneCh <- errors.Trace(err)
@ -763,11 +695,7 @@ func (e *IndexLookUpExecutor) open(kvRanges []kv.KeyRange) error {
}
func (e *IndexLookUpExecutor) indexRangesToKVRanges() ([]kv.KeyRange, error) {
fieldTypes := make([]*types.FieldType, len(e.index.Columns))
for i, v := range e.index.Columns {
fieldTypes[i] = &(e.table.Cols()[v.Offset].FieldType)
}
return indexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, e.tableID, e.index.ID, e.ranges, fieldTypes)
return indexRangesToKVRanges(e.tableID, e.index.ID, e.ranges)
}
// executeTask executes the table look up tasks. We will construct a table reader and send request by handles.
@ -904,11 +832,11 @@ func (builder *requestBuilder) SetTableRanges(tid int64, tableRanges []types.Int
return builder
}
func (builder *requestBuilder) SetIndexRanges(sc *variable.StatementContext, tid, idxID int64, ranges []*types.IndexRange, fieldTypes []*types.FieldType) *requestBuilder {
func (builder *requestBuilder) SetIndexRanges(tid, idxID int64, ranges []*types.IndexRange) *requestBuilder {
if builder.err != nil {
return builder
}
builder.Request.KeyRanges, builder.err = indexRangesToKVRanges(sc, tid, idxID, ranges, fieldTypes)
builder.Request.KeyRanges, builder.err = indexRangesToKVRanges(tid, idxID, ranges)
return builder
}