// Copyright 2018 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. package executor import ( "context" "fmt" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/kv" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/ranger" "github.com/pingcap/tipb/go-tipb" ) // make sure `TableReaderExecutor` implements `Executor`. var _ Executor = &TableReaderExecutor{} // selectResultHook is used to hack distsql.SelectWithRuntimeStats safely for testing. type selectResultHook struct { selectResultFunc func(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer) (distsql.SelectResult, error) } func (sr selectResultHook) SelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer) (distsql.SelectResult, error) { if sr.selectResultFunc == nil { return distsql.SelectWithRuntimeStats(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs) } return sr.selectResultFunc(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs) } // TableReaderExecutor sends DAG request and reads table data from kv layer. 
type TableReaderExecutor struct {
	baseExecutor

	table     table.Table
	keepOrder bool
	desc      bool
	ranges    []*ranger.Range
	// kvRanges are only use for union scan.
	kvRanges []kv.KeyRange
	dagPB    *tipb.DAGRequest
	// columns are only required by union scan.
	columns []*model.ColumnInfo
	// resultHandler handles the order of the result. Since (MaxInt64, MaxUint64] stores before [0, MaxInt64] physically
	// for unsigned int.
	resultHandler *tableResultHandler
	streaming     bool
	feedback      *statistics.QueryFeedback

	// corColInFilter tells whether there's correlated column in filter.
	corColInFilter bool
	// corColInAccess tells whether there's correlated column in access conditions.
	corColInAccess bool
	plans          []plannercore.PhysicalPlan

	memTracker *memory.Tracker

	selectResultHook // for testing
}

// Open initializes necessary variables for using this executor: it attaches the
// memory tracker, rebuilds the DAG executors / table ranges when correlated
// columns are present, optionally splits ranges by feedback histogram, and
// opens the result handler over one or two distsql results (two when the
// unsigned-PK range is split into signed and unsigned parts).
func (e *TableReaderExecutor) Open(ctx context.Context) error {
	// Track distsql memory usage under the statement-level tracker.
	e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaDistSQL)
	e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

	var err error
	if e.corColInFilter {
		// Filter conditions reference correlated columns: the pushed-down
		// executors must be rebuilt with the current outer-row values.
		e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans)
		if err != nil {
			return err
		}
	}
	if e.runtimeStats != nil {
		// Ask the coprocessor to report execution summaries for EXPLAIN ANALYZE.
		collExec := true
		e.dagPB.CollectExecutionSummaries = &collExec
	}
	if e.corColInAccess {
		// Access conditions reference correlated columns: re-derive the table
		// ranges from the (now-bound) access conditions on the PK column.
		ts := e.plans[0].(*plannercore.PhysicalTableScan)
		access := ts.AccessCondition
		pkTP := ts.Table.GetPkColInfo().FieldType
		e.ranges, err = ranger.BuildTableRange(access, e.ctx.GetSessionVars().StmtCtx, &pkTP)
		if err != nil {
			return err
		}
	}

	e.resultHandler = &tableResultHandler{}
	if e.feedback != nil && e.feedback.Hist != nil {
		// EncodeInt don't need *statement.Context.
		var ok bool
		e.ranges, ok = e.feedback.Hist.SplitRange(nil, e.ranges, false)
		if !ok {
			// Splitting failed; mark the feedback invalid so it is not
			// used to update statistics.
			e.feedback.Invalidate()
		}
	}
	// For an unsigned PK with keepOrder, ranges may be split into a signed part
	// and an unsigned part that must be fetched by separate requests.
	firstPartRanges, secondPartRanges := splitRanges(e.ranges, e.keepOrder, e.desc)
	firstResult, err := e.buildResp(ctx, firstPartRanges)
	if err != nil {
		e.feedback.Invalidate()
		return err
	}
	if len(secondPartRanges) == 0 {
		e.resultHandler.open(nil, firstResult)
		return nil
	}
	var secondResult distsql.SelectResult
	secondResult, err = e.buildResp(ctx, secondPartRanges)
	if err != nil {
		e.feedback.Invalidate()
		return err
	}
	e.resultHandler.open(firstResult, secondResult)
	return nil
}

// Next fills data into the chunk passed by its caller.
// The task was actually done by tableReaderHandler.
// On error the query feedback is invalidated before returning.
func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
	if err := e.resultHandler.nextChunk(ctx, req); err != nil {
		e.feedback.Invalidate()
		return err
	}
	return nil
}

// Close implements the Executor Close interface. It closes the result handler,
// records the actual row count into runtime stats (when collected), and stores
// the query feedback for the statistics system.
func (e *TableReaderExecutor) Close() error {
	var err error
	if e.resultHandler != nil {
		err = e.resultHandler.Close()
	}
	if e.runtimeStats != nil {
		copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.plans[0].ExplainID().String())
		copStats.SetRowNum(e.feedback.Actual())
	}
	e.ctx.StoreQueryFeedback(e.feedback)
	return err
}

// buildResp first builds request and sends it to tikv using distsql.Select. It uses SelectResult returned by the callee
// to fetch all results.
func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) {
	var builder distsql.RequestBuilder
	kvReq, err := builder.SetTableRanges(getPhysicalTableID(e.table), ranges, e.feedback).
		SetDAGRequest(e.dagPB).
		SetDesc(e.desc).
		SetKeepOrder(e.keepOrder).
		SetStreaming(e.streaming).
		SetFromSessionVars(e.ctx.GetSessionVars()).
		SetMemTracker(e.memTracker).
		Build()
	if err != nil {
		return nil, err
	}
	// Accumulate the key ranges; they are consumed by union scan.
	e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)

	// Goes through the (possibly test-hooked) selectResultHook.
	result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans))
	if err != nil {
		return nil, err
	}
	// Start fetching asynchronously before the first Next call.
	result.Fetch(ctx)
	return result, nil
}

// tableResultHandler serializes one or two SelectResults so rows come out in
// the order the caller asked for.
type tableResultHandler struct {
	// If the pk is unsigned and we have KeepOrder=true and want ascending order,
	// `optionalResult` handles the request whose range is in the signed int range, and
	// `result` handles the request whose range exceeds the signed int range.
	// If we want descending order, `optionalResult` handles the request whose range exceeds signed, and
	// the `result` handles the request whose range is in signed.
	// Otherwise, we just set `optionalFinished` true and the `result` handles the whole ranges.
	optionalResult distsql.SelectResult
	result         distsql.SelectResult

	optionalFinished bool
}

// open installs the results; a nil optionalResult means there is only a single
// result to drain.
func (tr *tableResultHandler) open(optionalResult, result distsql.SelectResult) {
	if optionalResult == nil {
		tr.optionalFinished = true
		tr.result = result
		return
	}
	tr.optionalResult = optionalResult
	tr.result = result
	tr.optionalFinished = false
}

// nextChunk drains optionalResult first; once it yields an empty chunk it is
// considered finished and result takes over.
func (tr *tableResultHandler) nextChunk(ctx context.Context, chk *chunk.Chunk) error {
	if !tr.optionalFinished {
		err := tr.optionalResult.Next(ctx, chk)
		if err != nil {
			return err
		}
		if chk.NumRows() > 0 {
			return nil
		}
		tr.optionalFinished = true
	}
	return tr.result.Next(ctx, chk)
}

// nextRaw is the raw-bytes counterpart of nextChunk: nil data from
// optionalResult signals it is exhausted.
func (tr *tableResultHandler) nextRaw(ctx context.Context) (data []byte, err error) {
	if !tr.optionalFinished {
		data, err = tr.optionalResult.NextRaw(ctx)
		if err != nil {
			return nil, err
		}
		if data != nil {
			return data, nil
		}
		tr.optionalFinished = true
	}
	data, err = tr.result.NextRaw(ctx)
	if err != nil {
		return nil, err
	}
	return data, nil
}

// Close closes both underlying results (closeAll tolerates nil entries) and
// clears the references.
func (tr *tableResultHandler) Close() error {
	err := closeAll(tr.optionalResult, tr.result)
	tr.optionalResult, tr.result = nil, nil
	return err
}