// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"bytes"
"context"
"fmt"
"runtime/trace"
"slices"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/distsql"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
"github.com/pingcap/tidb/pkg/executor/internal/builder"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/executor/metrics"
"github.com/pingcap/tidb/pkg/expression"
isctx "github.com/pingcap/tidb/pkg/infoschema/context"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/charset"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/physicalop"
"github.com/pingcap/tidb/pkg/planner/planctx"
plannerutil "github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/channel"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/collate"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/logutil/consistency"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/ranger"
rangerctx "github.com/pingcap/tidb/pkg/util/ranger/context"
"github.com/pingcap/tidb/pkg/util/size"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
)
var (
_ exec.Executor = &TableReaderExecutor{}
_ exec.Executor = &IndexReaderExecutor{}
_ exec.Executor = &IndexLookUpExecutor{}
)
// LookupTableTaskChannelSize represents the channel size of the index double read taskChan.
var LookupTableTaskChannelSize int32 = 50
// lookupTableTask is created from a partial result of an index request which
// contains the handles in those index keys.
type lookupTableTask struct {
id int
handles []kv.Handle
rowIdx []int // rowIdx represents the handle index for every row. Only used when keeping order.
rows []chunk.Row
idxRows *chunk.Chunk
cursor int
// buildDoneTime is set to the instant when the cop task is built; it is used for the Next wait duration statistics.
buildDoneTime time.Time
doneCh chan error
// indexOrder map is used to save the original index order for the handles.
// Without this map, the original index order might be lost.
// The handles fetched from the index are originally ordered by the index, but we need the handles to be
// ordered by handle value to build the table request.
indexOrder *kv.HandleMap
// duplicatedIndexOrder map is like indexOrder, but it is used when checkIndexValue isn't nil and
// the same handle appears in the index with multiple values.
duplicatedIndexOrder *kv.HandleMap
// partitionTable indicates whether this task belongs to a partition table and which partition table it is.
partitionTable table.PhysicalTable
// memUsage records the memory usage of this task calculated by table worker.
// memTracker is used to release memUsage after task is done and unused.
//
// The sequence of function calls is:
// 1. calculate task.memUsage.
// 2. task.memTracker = tableWorker.memTracker
// 3. task.memTracker.Consume(task.memUsage)
// 4. task.memTracker.Consume(-task.memUsage)
//
// Step 1~3 are completed in "tableWorker.executeTask".
// Step 4 is completed in "IndexLookUpExecutor.Next".
memUsage int64
memTracker *memory.Tracker
}
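// lookupTableTask implements sort.Interface so that, when keepOrder is set, the fetched rows can be
// sorted back into the original index order by rowIdx (see tableWorker.executeTask).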
func (task *lookupTableTask) Len() int {
return len(task.rows)
}
func (task *lookupTableTask) Less(i, j int) bool {
return task.rowIdx[i] < task.rowIdx[j]
}
func (task *lookupTableTask) Swap(i, j int) {
task.rowIdx[i], task.rowIdx[j] = task.rowIdx[j], task.rowIdx[i]
task.rows[i], task.rows[j] = task.rows[j], task.rows[i]
}
// Closeable is an interface for closeable structures.
type Closeable interface {
// Close closes the object.
Close() error
}
// closeAll closes all objects even if an object returns an error.
// If multiple objects return errors, the first error will be returned.
func closeAll(objs ...Closeable) error {
var err error
for _, obj := range objs {
if obj != nil {
err1 := obj.Close()
if err == nil && err1 != nil {
err = err1
}
}
}
if err != nil {
return errors.Trace(err)
}
return nil
}
// rebuildIndexRanges will be called if there's a correlated column in the access conditions. We rebuild the ranges
// by substituting correlated columns with constants.
func rebuildIndexRanges(ectx expression.BuildContext, rctx *rangerctx.RangerContext, is *physicalop.PhysicalIndexScan, idxCols []*expression.Column, colLens []int) (ranges []*ranger.Range, err error) {
access := make([]expression.Expression, 0, len(is.AccessCondition))
for _, cond := range is.AccessCondition {
newCond, err1 := expression.SubstituteCorCol2Constant(ectx, cond)
if err1 != nil {
return nil, err1
}
access = append(access, newCond)
}
// All access conditions must be used to build ranges, so we don't limit the range memory usage.
ranges, _, err = ranger.DetachSimpleCondAndBuildRangeForIndex(rctx, access, idxCols, colLens, 0)
return ranges, err
}
type indexReaderExecutorContext struct {
rctx *rangerctx.RangerContext
dctx *distsqlctx.DistSQLContext
ectx expression.BuildContext
infoSchema isctx.MetaOnlyInfoSchema
buildPBCtx *planctx.BuildPBContext
stmtMemTracker *memory.Tracker
}
func newIndexReaderExecutorContext(sctx sessionctx.Context) indexReaderExecutorContext {
pctx := sctx.GetPlanCtx()
return indexReaderExecutorContext{
rctx: pctx.GetRangerCtx(),
dctx: sctx.GetDistSQLCtx(),
ectx: sctx.GetExprCtx(),
infoSchema: pctx.GetInfoSchema(),
buildPBCtx: pctx.GetBuildPBCtx(),
stmtMemTracker: sctx.GetSessionVars().StmtCtx.MemTracker,
}
}
// IndexReaderExecutor sends dag request and reads index data from kv layer.
type IndexReaderExecutor struct {
indexReaderExecutorContext
exec.BaseExecutorV2
indexUsageReporter *exec.IndexUsageReporter
// For a partitioned table, the IndexReaderExecutor works on a partition, so
// the type of this table field is actually `table.PhysicalTable`.
table table.Table
index *model.IndexInfo
physicalTableID int64
ranges []*ranger.Range
// groupedRanges is from AccessPath.groupedRanges, please see the comment there for more details.
// In brief, it splits IndexReaderExecutor.ranges into groups. When it's set, we need to access each group
// separately and use a merge sort to combine the results.
groupedRanges [][]*ranger.Range
partitions []table.PhysicalTable
partRangeMap map[int64][]*ranger.Range // each partition may have different ranges
// kvRanges are only used for union scan.
kvRanges []kv.KeyRange
dagPB *tipb.DAGRequest
startTS uint64
txnScope string
readReplicaScope string
isStaleness bool
netDataSize float64
// result returns one or more distsql.PartialResult and each PartialResult is returned by one region.
result distsql.SelectResult
// columns are only required by union scan.
columns []*model.ColumnInfo
// outputColumns are only required by union scan.
outputColumns []*expression.Column
// partitionIDMap is only required by union scan with global index.
partitionIDMap map[int64]struct{}
paging bool
keepOrder bool
desc bool
// byItems only for partition table with orderBy + pushedLimit
byItems []*plannerutil.ByItems
corColInFilter bool
corColInAccess bool
idxCols []*expression.Column
colLens []int
plans []base.PhysicalPlan
memTracker *memory.Tracker
selectResultHook // for testing
// If dummy flag is set, this is not a real IndexReader, it just provides the KV ranges for UnionScan.
// Used by the temporary table, cached table.
dummy bool
}
// Table implements the dataSourceExecutor interface.
func (e *IndexReaderExecutor) Table() table.Table {
return e.table
}
func (e *IndexReaderExecutor) setDummy() {
e.dummy = true
}
// Close clears all resources held by the current object.
func (e *IndexReaderExecutor) Close() (err error) {
if e.indexUsageReporter != nil {
e.indexUsageReporter.ReportCopIndexUsageForTable(e.table, e.index.ID, e.plans[0].ID())
}
if e.result != nil {
err = e.result.Close()
}
e.result = nil
e.kvRanges = e.kvRanges[:0]
if e.dummy {
return nil
}
return err
}
// Next implements the Executor Next interface.
func (e *IndexReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if e.dummy {
req.Reset()
return nil
}
return e.result.Next(ctx, req)
}
// Open implements the Executor Open interface.
func (e *IndexReaderExecutor) Open(ctx context.Context) error {
var err error
if e.corColInAccess {
is := e.plans[0].(*physicalop.PhysicalIndexScan)
e.ranges, err = rebuildIndexRanges(e.ectx, e.rctx, is, e.idxCols, e.colLens)
if err != nil {
return err
}
// Rebuild groupedRanges if it was originally set
if len(is.GroupByColIdxs) != 0 {
e.groupedRanges, err = plannercore.GroupRangesByCols(e.ranges, is.GroupByColIdxs)
if err != nil {
return err
}
}
}
// partRangeMap comes from the index join code path, while groupedRanges will not be set in that case.
// They are two different sources of ranges, and should not appear together.
intest.Assert(!(len(e.partRangeMap) > 0 && len(e.groupedRanges) > 0), "partRangeMap and groupedRanges should not appear together")
// Build kvRanges considering both partitions and groupedRanges
kvRanges, err := e.buildKVRangesForIndexReader()
if err != nil {
return err
}
return e.open(ctx, kvRanges)
}
// buildKVRangesForIndexReader builds kvRanges for IndexReaderExecutor considering both partitions and groupedRanges.
func (e *IndexReaderExecutor) buildKVRangesForIndexReader() ([]kv.KeyRange, error) {
tableIDs := make([]int64, 0, len(e.partitions))
for _, p := range e.partitions {
tableIDs = append(tableIDs, p.GetPhysicalID())
}
if len(e.partitions) == 0 {
tableIDs = append(tableIDs, e.physicalTableID)
}
groupedRanges := e.groupedRanges
if len(groupedRanges) == 0 {
groupedRanges = [][]*ranger.Range{e.ranges}
}
results := make([]kv.KeyRange, 0, len(groupedRanges))
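// buildKeyRanges returns one slice of KV ranges per physical table ID; flatten all of them into a single
// slice, since the index reader sends its request over the whole set of ranges.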
for _, ranges := range groupedRanges {
kvRanges, err := buildKeyRanges(e.dctx, ranges, e.partRangeMap, tableIDs, e.index.ID, nil)
if err != nil {
return nil, err
}
for _, kvRange := range kvRanges {
results = append(results, kvRange...)
}
}
return results, nil
}
func (e *IndexReaderExecutor) buildKVReq(r []kv.KeyRange) (*kv.Request, error) {
var builder distsql.RequestBuilder
builder.SetKeyRanges(r).
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetTxnScope(e.txnScope).
SetReadReplicaScope(e.readReplicaScope).
SetIsStaleness(e.isStaleness).
SetFromSessionVars(e.dctx).
SetFromInfoSchema(e.infoSchema).
SetMemTracker(e.memTracker).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.dctx, &builder.Request, e.netDataSize)).
SetConnIDAndConnAlias(e.dctx.ConnectionID, e.dctx.SessionAlias)
kvReq, err := builder.Build()
return kvReq, err
}
func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
var err error
if e.corColInFilter {
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.buildPBCtx, e.plans)
if err != nil {
return err
}
}
if e.RuntimeStats() != nil {
collExec := true
e.dagPB.CollectExecutionSummaries = &collExec
}
e.kvRanges = kvRanges
// Treat the temporary table as a dummy table and avoid sending the distsql request to TiKV.
// In some test cases IndexReaderExecutor is mocked and e.table is nil.
if e.dummy {
return nil
}
if e.memTracker != nil {
e.memTracker.Reset()
} else {
e.memTracker = memory.NewTracker(e.ID(), -1)
}
e.memTracker.AttachTo(e.stmtMemTracker)
slices.SortFunc(kvRanges, func(i, j kv.KeyRange) int {
return bytes.Compare(i.StartKey, j.StartKey)
})
if !needMergeSort(e.byItems, len(kvRanges)) {
kvReq, err := e.buildKVReq(kvRanges)
if err != nil {
return err
}
e.result, err = e.SelectResult(ctx, e.dctx, kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID())
if err != nil {
return err
}
} else {
// Use sortedSelectResults for merge sort
kvReqs := make([]*kv.Request, 0, len(kvRanges))
for _, kvRange := range kvRanges {
kvReq, err := e.buildKVReq([]kv.KeyRange{kvRange})
if err != nil {
return err
}
kvReqs = append(kvReqs, kvReq)
}
var results []distsql.SelectResult
for _, kvReq := range kvReqs {
result, err := e.SelectResult(ctx, e.dctx, kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID())
if err != nil {
return err
}
results = append(results, result)
}
e.result = distsql.NewSortedSelectResults(e.ectx.GetEvalCtx(), results, e.Schema(), e.byItems, e.memTracker)
}
return nil
}
type indexLookUpExecutorContext struct {
tableReaderExecutorContext
stmtRuntimeStatsColl *execdetails.RuntimeStatsColl
indexLookupSize int
indexLookupConcurrency int
enableRedactLog string
storage kv.Storage
weakConsistency bool
}
func newIndexLookUpExecutorContext(sctx sessionctx.Context) indexLookUpExecutorContext {
return indexLookUpExecutorContext{
tableReaderExecutorContext: newTableReaderExecutorContext(sctx),
stmtRuntimeStatsColl: sctx.GetSessionVars().StmtCtx.RuntimeStatsColl,
indexLookupSize: sctx.GetSessionVars().IndexLookupSize,
indexLookupConcurrency: sctx.GetSessionVars().IndexLookupConcurrency(),
enableRedactLog: sctx.GetSessionVars().EnableRedactLog,
storage: sctx.GetStore(),
weakConsistency: sctx.GetSessionVars().StmtCtx.WeakConsistency,
}
}
// IndexLookUpExecutor implements double read for index scan.
type IndexLookUpExecutor struct {
indexLookUpExecutorContext
exec.BaseExecutorV2
indexUsageReporter *exec.IndexUsageReporter
table table.Table
index *model.IndexInfo
ranges []*ranger.Range
// groupedRanges is from AccessPath.groupedRanges, please see the comment there for more details.
// In brief, it splits IndexLookUpExecutor.ranges into groups. When it's set, we need to access each group
// separately and use a merge sort to combine the results.
groupedRanges [][]*ranger.Range
dagPB *tipb.DAGRequest
startTS uint64
// handleIdx holds the offsets of the handle columns, which are only used when keeping order.
handleIdx []int
handleCols []*expression.Column
primaryKeyIndex *model.IndexInfo
tableRequest *tipb.DAGRequest
// columns are only required by union scan.
columns []*model.ColumnInfo
// partitionIDMap is only required by union scan with global index.
partitionIDMap map[int64]struct{}
*dataReaderBuilder
idxNetDataSize float64
avgRowSize float64
// fields about accessing partition tables
partitionTableMode bool // whether this executor is accessing a local index on a partitioned table
prunedPartitions []table.PhysicalTable // the partitions that need to be accessed
partitionRangeMap map[int64][]*ranger.Range
// All fields above are immutable.
idxWorkerWg *sync.WaitGroup
tblWorkerWg *sync.WaitGroup
finished chan struct{}
resultCh chan *lookupTableTask
resultCurr *lookupTableTask
// memTracker is used to track the memory usage of this executor.
memTracker *memory.Tracker
// checkIndexValue is used to check the consistency of the index data.
*checkIndexValue
// groupedKVRanges is built from ranger.Range and is needed to access TiKV. It's a unified form that considers both
// ranges and groupedRanges, and also considers the partitioned table.
// The extra PhysicalTableID is needed by the memIndexLookUpReader because it can't get it from PartitionHandle like
// the IndexLookUpExecutor here.
groupedKVRanges []*kvRangesWithPhysicalTblID
workerStarted bool
byItems []*plannerutil.ByItems
keepOrder bool
desc bool
indexPaging bool
corColInIdxSide bool
corColInTblSide bool
corColInAccess bool
idxPlans []base.PhysicalPlan
tblPlans []base.PhysicalPlan
idxCols []*expression.Column
colLens []int
// PushedLimit is used to skip the preceding and trailing handles when Limit is pushed down into IndexLookUpReader.
PushedLimit *physicalop.PushedDownLimit
stats *IndexLookUpRunTimeStats
// cancelFunc is called when closing the executor
cancelFunc context.CancelFunc
workerCtx context.Context
pool *workerPool
// If dummy flag is set, this is not a real IndexLookUpReader, it just provides the KV ranges for UnionScan.
// Used by the temporary table, cached table.
dummy bool
// Whether to push down the index lookup to TiKV
indexLookUpPushDown bool
}
type kvRangesWithPhysicalTblID struct {
PhysicalTableID int64
KeyRanges []kv.KeyRange
}
type getHandleType int8
const (
getHandleFromIndex getHandleType = iota
getHandleFromTable
)
// nolint:structcheck
type checkIndexValue struct {
idxColTps []*types.FieldType
idxTblCols []*table.Column
}
// Table implements the dataSourceExecutor interface.
func (e *IndexLookUpExecutor) Table() table.Table {
return e.table
}
func (e *IndexLookUpExecutor) setDummy() {
e.dummy = true
}
// Open implements the Executor Open interface.
func (e *IndexLookUpExecutor) Open(ctx context.Context) error {
var err error
if e.corColInAccess {
is := e.idxPlans[0].(*physicalop.PhysicalIndexScan)
e.ranges, err = rebuildIndexRanges(e.ectx, e.rctx, is, e.idxCols, e.colLens)
if err != nil {
return err
}
// Rebuild groupedRanges if it was originally set
if len(is.GroupByColIdxs) != 0 {
e.groupedRanges, err = plannercore.GroupRangesByCols(e.ranges, is.GroupByColIdxs)
if err != nil {
return err
}
}
}
if e.memTracker != nil {
e.memTracker.Reset()
} else {
e.memTracker = memory.NewTracker(e.ID(), -1)
}
e.memTracker.AttachTo(e.stmtMemTracker)
// partitionRangeMap comes from the index join code path, while groupedRanges will not be set in that case.
// They are two different sources of ranges, and should not appear together.
intest.Assert(!(len(e.partitionRangeMap) > 0 && len(e.groupedRanges) > 0),
"partitionRangeMap and groupedRanges should not appear together")
err = e.buildTableKeyRanges()
if err != nil {
return err
}
// Treat the temporary table as a dummy table and avoid sending the distsql request to TiKV.
if e.dummy {
return nil
}
return e.open(ctx)
}
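// buildKeyRanges converts the ranger ranges into KV ranges, returning one slice per physical table ID.
// An indexID of -1 means the ranges are built on the common handle (clustered primary key) instead of an index.
// rangeOverrideForPartitionID optionally overrides the ranges for specific partitions (index join code path).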
func buildKeyRanges(dctx *distsqlctx.DistSQLContext,
ranges []*ranger.Range,
rangeOverrideForPartitionID map[int64][]*ranger.Range,
physicalIDs []int64,
indexID int64,
memTracker *memory.Tracker,
) ([][]kv.KeyRange, error) {
results := make([][]kv.KeyRange, 0, len(physicalIDs))
for _, physicalID := range physicalIDs {
if pRange, ok := rangeOverrideForPartitionID[physicalID]; ok {
ranges = pRange
}
if indexID == -1 {
rRanges, err := distsql.CommonHandleRangesToKVRanges(dctx, []int64{physicalID}, ranges)
if err != nil {
return nil, err
}
results = append(results, rRanges.FirstPartitionRange())
} else {
singleRanges, err := distsql.IndexRangesToKVRangesWithInterruptSignal(dctx, physicalID, indexID, ranges, memTracker, nil)
if err != nil {
return nil, err
}
results = append(results, singleRanges.FirstPartitionRange())
}
}
return results, nil
}
func (e *IndexLookUpExecutor) buildTableKeyRanges() (err error) {
tableIDs := make([]int64, 0, len(e.prunedPartitions))
if e.partitionTableMode {
for _, p := range e.prunedPartitions {
tableIDs = append(tableIDs, p.GetPhysicalID())
}
} else {
tableIDs = append(tableIDs, getPhysicalTableID(e.table))
}
groupedRanges := e.groupedRanges
if len(groupedRanges) == 0 {
groupedRanges = [][]*ranger.Range{e.ranges}
}
kvRanges := make([][]kv.KeyRange, 0, len(groupedRanges))
physicalTblIDsForPartitionKVRanges := make([]int64, 0, len(tableIDs)*len(groupedRanges))
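// kvRanges and physicalTblIDsForPartitionKVRanges are kept parallel: the i-th KV range slice belongs to
// the i-th physical table ID.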
for _, ranges := range groupedRanges {
kvRange, err := buildKeyRanges(e.dctx, ranges, e.partitionRangeMap, tableIDs, e.index.ID, e.memTracker)
if err != nil {
return err
}
kvRanges = append(kvRanges, kvRange...)
physicalTblIDsForPartitionKVRanges = append(physicalTblIDsForPartitionKVRanges, tableIDs...)
}
if len(kvRanges) > 1 {
// If there is more than one kv range, it must come from a partitioned table, from groupedRanges, or both.
intest.Assert(e.partitionTableMode || len(e.groupedRanges) > 0)
}
e.groupedKVRanges = make([]*kvRangesWithPhysicalTblID, 0, len(kvRanges))
for i, kvRange := range kvRanges {
partitionKVRange := &kvRangesWithPhysicalTblID{
PhysicalTableID: physicalTblIDsForPartitionKVRanges[i],
KeyRanges: kvRange,
}
e.groupedKVRanges = append(e.groupedKVRanges, partitionKVRange)
}
return nil
}
func (e *IndexLookUpExecutor) open(_ context.Context) error {
// We have to initialize "memTracker" and other execution resources here
// instead of in the "Open" function, because this "IndexLookUpExecutor" may be
// constructed by an "IndexLookUpJoin" and "Open" will not be called in that
// situation.
e.initRuntimeStats()
if e.memTracker != nil {
e.memTracker.Reset()
} else {
e.memTracker = memory.NewTracker(e.ID(), -1)
}
e.memTracker.AttachTo(e.stmtMemTracker)
e.finished = make(chan struct{})
e.resultCh = make(chan *lookupTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize))
var err error
if e.corColInIdxSide {
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.buildPBCtx, e.idxPlans)
if err != nil {
return err
}
}
if e.corColInTblSide {
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.buildPBCtx, e.tblPlans)
if err != nil {
return err
}
}
e.idxWorkerWg = &sync.WaitGroup{}
e.tblWorkerWg = &sync.WaitGroup{}
return nil
}
func (e *IndexLookUpExecutor) startWorkers(ctx context.Context, initBatchSize int) error {
// indexWorker will submit lookup-table tasks (processed by tableWorker) to the pool,
// so fetching index data and fetching table data can run concurrently.
e.workerCtx, e.cancelFunc = context.WithCancel(ctx)
e.pool = &workerPool{
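// Spawn another worker only while the worker count is below indexLookupConcurrency and more than one
// task is queued.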
needSpawn: func(workers, tasks uint32) bool {
return workers < uint32(e.indexLookupConcurrency) && tasks > 1
},
}
if err := e.startIndexWorker(ctx, initBatchSize); err != nil {
return err
}
e.workerStarted = true
return nil
}
func (e *IndexLookUpExecutor) needPartitionHandle(tp getHandleType) (bool, error) {
if e.indexLookUpPushDown {
// For index lookup push down, needPartitionHandle should always return false because
// neither global indexes nor keep-order on partitioned tables are supported yet.
intest.Assert(!e.index.Global && !e.keepOrder)
return false, nil
}
var col *expression.Column
var needPartitionHandle bool
if tp == getHandleFromIndex {
cols := e.idxPlans[0].Schema().Columns
outputOffsets := e.dagPB.OutputOffsets
col = cols[outputOffsets[len(outputOffsets)-1]]
// For IndexScan, the partition handle is needed when using a global index, or when keepOrder is set on a partitioned table.
needPartitionHandle = e.index.Global || e.partitionTableMode && e.keepOrder
} else {
cols := e.tblPlans[0].Schema().Columns
outputOffsets := e.tableRequest.OutputOffsets
col = cols[outputOffsets[len(outputOffsets)-1]]
// For TableScan, the partition handle is needed in `indexOrder` when e.keepOrder == true, or when executing `admin check [table|index]` with a global index.
needPartitionHandle = ((e.index.Global || e.partitionTableMode) && e.keepOrder) || (e.index.Global && e.checkIndexValue != nil)
}
hasExtraCol := col.ID == model.ExtraPhysTblID
// There are two possible needPartitionHandle != hasExtraCol situations.
// Only `needPartitionHandle` == true with `hasExtraCol` == false is not allowed.
// `ExtraPhysTblID` will be used in `SelectLock` when `needPartitionHandle` == false and `hasExtraCol` == true.
if needPartitionHandle && !hasExtraCol {
return needPartitionHandle, errors.Errorf("Internal error, needPartitionHandle != ret, tp(%d)", tp)
}
return needPartitionHandle, nil
}
func (e *IndexLookUpExecutor) isCommonHandle() bool {
return !(len(e.handleCols) == 1 && e.handleCols[0].ID == model.ExtraHandleID) && e.table.Meta() != nil && e.table.Meta().IsCommonHandle
}
func (e *IndexLookUpExecutor) getRetTpsForIndexReader() []*types.FieldType {
if e.checkIndexValue != nil {
return e.idxColTps
}
var tps []*types.FieldType
if len(e.byItems) != 0 {
for _, item := range e.byItems {
tps = append(tps, item.Expr.GetType(e.ectx.GetEvalCtx()))
}
}
if e.isCommonHandle() {
for _, handleCol := range e.handleCols {
tps = append(tps, handleCol.RetType)
}
} else {
tps = append(tps, types.NewFieldType(mysql.TypeLonglong))
}
if ok, _ := e.needPartitionHandle(getHandleFromIndex); ok {
tps = append(tps, types.NewFieldType(mysql.TypeLonglong))
}
return tps
}
// startIndexWorker launches a background goroutine to fetch handles and submit lookup-table tasks to the pool.
func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, initBatchSize int) error {
if e.RuntimeStats() != nil {
collExec := true
e.dagPB.CollectExecutionSummaries = &collExec
}
tracker := memory.NewTracker(memory.LabelForIndexWorker, -1)
tracker.AttachTo(e.memTracker)
kvRanges := make([][]kv.KeyRange, 0, len(e.groupedKVRanges))
for _, ranges := range e.groupedKVRanges {
kvRanges = append(kvRanges, ranges.KeyRanges)
}
// When len(kvRanges) == 1, no sorting is required,
// so remove byItems and the unnecessary output columns.
if len(kvRanges) == 1 {
e.dagPB.OutputOffsets = e.dagPB.OutputOffsets[len(e.byItems):]
e.byItems = nil
}
var tps []*types.FieldType
tblScanIdxForRewritePartitionID := -1
if e.indexLookUpPushDown {
tps = e.RetFieldTypes()
if e.partitionTableMode {
for idx, executor := range e.dagPB.Executors {
if executor.Tp == tipb.ExecType_TypeTableScan {
tblScanIdxForRewritePartitionID = idx
break
}
}
if tblScanIdxForRewritePartitionID < 0 {
intest.Assert(false)
return errors.New("cannot find table scan executor in for partition index lookup push down")
}
}
} else {
tps = e.getRetTpsForIndexReader()
}
idxID := e.getIndexPlanRootID()
e.idxWorkerWg.Add(1)
e.pool.submit(func() {
defer trace.StartRegion(ctx, "IndexLookUpIndexTask").End()
growWorkerStack16K()
worker := &indexWorker{
idxLookup: e,
finished: e.finished,
resultCh: e.resultCh,
keepOrder: e.keepOrder,
checkIndexValue: e.checkIndexValue,
maxBatchSize: e.indexLookupSize,
maxChunkSize: e.MaxChunkSize(),
PushedLimit: e.PushedLimit,
}
worker.batchSize = e.calculateBatchSize(initBatchSize, worker.maxBatchSize)
results := make([]distsql.SelectResult, 0, len(kvRanges))
for idx, kvRange := range kvRanges {
// check if executor is closed
finished := false
select {
case <-e.finished:
finished = true
default:
}
if finished {
break
}
if tblScanIdxForRewritePartitionID >= 0 {
// We should set the TblScan's TableID to the partition physical ID to make sure
// the push-down index lookup can encode the table handle key correctly.
e.dagPB.Executors[tblScanIdxForRewritePartitionID].TblScan.TableId = e.prunedPartitions[idx].GetPhysicalID()
}
var builder distsql.RequestBuilder
builder.SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetTxnScope(e.txnScope).
SetReadReplicaScope(e.readReplicaScope).
SetIsStaleness(e.isStaleness).
SetFromSessionVars(e.dctx).
SetFromInfoSchema(e.infoSchema).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.dctx, &builder.Request, e.idxNetDataSize/float64(len(kvRanges)))).
SetMemTracker(tracker).
SetConnIDAndConnAlias(e.dctx.ConnectionID, e.dctx.SessionAlias)
if e.indexLookUpPushDown {
// Paging and cop-cache are not supported in index lookup push down.
builder.Request.Paging.Enable = false
builder.Request.Cacheable = false
}
if builder.Request.Paging.Enable && builder.Request.Paging.MinPagingSize < uint64(worker.batchSize) {
// When paging is enabled and Paging.MinPagingSize is less than initBatchSize, change Paging.MinPagingSize to
// initBatchSize to avoid redundant paging RPCs, see more details in https://github.com/pingcap/tidb/issues/53827
builder.Request.Paging.MinPagingSize = uint64(worker.batchSize)
if builder.Request.Paging.MaxPagingSize < uint64(worker.batchSize) {
builder.Request.Paging.MaxPagingSize = uint64(worker.batchSize)
}
}
// init kvReq, result and worker for this partition
// The key ranges should be ordered.
slices.SortFunc(kvRange, func(i, j kv.KeyRange) int {
return bytes.Compare(i.StartKey, j.StartKey)
})
kvReq, err := builder.SetKeyRanges(kvRange).Build()
if err != nil {
worker.syncErr(err)
break
}
result, err := distsql.SelectWithRuntimeStats(ctx, e.dctx, kvReq, tps, getPhysicalPlanIDs(e.idxPlans), idxID)
if err != nil {
worker.syncErr(err)
break
}
results = append(results, result)
}
if needMergeSort(e.byItems, len(results)) {
// e.Schema() is not the output schema of the index reader, and the byItems-related columns are put first in `buildIndexReq`, so use nil here.
ssr := distsql.NewSortedSelectResults(e.ectx.GetEvalCtx(), results, nil, e.byItems, e.memTracker)
results = []distsql.SelectResult{ssr}
}
ctx1, cancel := context.WithCancel(ctx)
// this error is synced in fetchHandles(), don't sync it again
var selResultList selectResultList
indexTypes := e.getRetTpsForIndexReader()
if e.indexLookUpPushDown {
var err error
if selResultList, err = newSelectResultRowIterList(results, [][]*types.FieldType{indexTypes}); err != nil {
cancel()
worker.syncErr(err)
return
}
} else {
selResultList = newSelectResultList(results)
}
_ = worker.fetchHandles(ctx1, selResultList, indexTypes)
cancel()
selResultList.Close()
close(e.resultCh)
e.idxWorkerWg.Done()
})
return nil
}
// calculateBatchSize calculates a suitable initial batch size.
func (e *IndexLookUpExecutor) calculateBatchSize(initBatchSize, maxBatchSize int) int {
if e.indexPaging {
// If indexPaging is true, this query has a limit, so use initBatchSize to avoid scanning unnecessary data.
return min(initBatchSize, maxBatchSize)
}
var estRows int
if len(e.idxPlans) > 0 {
estRows = int(e.idxPlans[0].StatsCount())
}
return CalculateBatchSize(estRows, initBatchSize, maxBatchSize)
}
// CalculateBatchSize calculates a suitable initial batch size. It is exported for testing.
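// For example, with initBatchSize=32, estRows=100 and maxBatchSize=256, the batch size doubles
// 32 -> 64 -> 128 and 128 is returned; when estRows >= maxBatchSize, maxBatchSize is returned directly.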
func CalculateBatchSize(estRows, initBatchSize, maxBatchSize int) int {
batchSize := min(initBatchSize, maxBatchSize)
if estRows >= maxBatchSize {
return maxBatchSize
}
for batchSize < estRows {
// If batchSize is less than estRows, increase the batch size to avoid unnecessary RPCs.
batchSize = batchSize * 2
if batchSize >= maxBatchSize {
return maxBatchSize
}
}
return batchSize
}
func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookupTableTask) (*TableReaderExecutor, error) {
table := e.table
if e.partitionTableMode && task.partitionTable != nil {
table = task.partitionTable
}
tableReaderExec := &TableReaderExecutor{
BaseExecutorV2: e.BuildNewBaseExecutorV2(e.stmtRuntimeStatsColl, e.Schema(), e.getTableRootPlanID()),
tableReaderExecutorContext: e.tableReaderExecutorContext,
table: table,
dagPB: e.tableRequest,
startTS: e.startTS,
txnScope: e.txnScope,
readReplicaScope: e.readReplicaScope,
isStaleness: e.isStaleness,
columns: e.columns,
corColInFilter: e.corColInTblSide,
plans: e.tblPlans,
netDataSize: e.avgRowSize * float64(len(task.handles)),
byItems: e.byItems,
}
tableReaderExec.buildVirtualColumnInfo()
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, task.handles, true)
if err != nil {
if ctx.Err() != context.Canceled {
logutil.Logger(ctx).Error("build table reader from handles failed", zap.Error(err))
}
return nil, err
}
return tableReader, nil
}
// Close implements the Executor Close interface.
func (e *IndexLookUpExecutor) Close() error {
if e.stats != nil {
defer func() {
e.stmtRuntimeStatsColl.RegisterStats(e.ID(), e.stats)
var indexScanCopTasks int32
if copStats := e.stmtRuntimeStatsColl.GetCopStats(e.getIndexPlanRootID()); copStats != nil {
indexScanCopTasks = copStats.GetTasks()
}
if e.indexLookUpPushDown {
metrics.IndexLookUpExecutorWithPushDownEnabledRowNumber.Observe(float64(e.stats.indexScanBasicStats.GetActRows()))
metrics.IndexLookUpIndexScanCopTasksWithPushDownEnabled.Add(float64(indexScanCopTasks))
} else {
metrics.IndexLookUpIndexScanCopTasksNormal.Add(float64(indexScanCopTasks))
}
}()
}
if stats := e.RuntimeStats(); stats != nil {
if e.indexLookUpPushDown {
defer func() {
metrics.IndexLookUpExecutorWithPushDownEnabledDuration.Observe(time.Duration(stats.GetTime()).Seconds())
}()
}
}
if e.indexUsageReporter != nil {
e.indexUsageReporter.ReportCopIndexUsageForTable(
e.table,
e.index.ID,
e.idxPlans[0].ID())
}
if e.dummy {
return nil
}
if !e.workerStarted || e.finished == nil {
return nil
}
if e.cancelFunc != nil {
e.cancelFunc()
e.cancelFunc = nil
}
close(e.finished)
// Drain the resultCh and discard the results, in case Next() doesn't fully
// consume the data and a background worker is still writing to resultCh and would block forever.
channel.Clear(e.resultCh)
e.idxWorkerWg.Wait()
e.tblWorkerWg.Wait()
e.finished = nil
e.workerStarted = false
e.resultCurr = nil
return nil
}
// Next implements the Executor Next interface.
func (e *IndexLookUpExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if e.dummy {
req.Reset()
return nil
}
if !e.workerStarted {
if err := e.startWorkers(ctx, req.RequiredRows()); err != nil {
return err
}
}
req.Reset()
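// Fill the chunk from the current and following result tasks until it holds the required number of rows
// or all tasks are exhausted.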
for {
resultTask, err := e.getResultTask()
if err != nil {
return err
}
if resultTask == nil {
return nil
}
if resultTask.cursor < len(resultTask.rows) {
numToAppend := min(len(resultTask.rows)-resultTask.cursor, req.RequiredRows()-req.NumRows())
req.AppendRows(resultTask.rows[resultTask.cursor : resultTask.cursor+numToAppend])
resultTask.cursor += numToAppend
if req.IsFull() {
return nil
}
}
}
}
func (e *IndexLookUpExecutor) getResultTask() (*lookupTableTask, error) {
if e.resultCurr != nil && e.resultCurr.cursor < len(e.resultCurr.rows) {
return e.resultCurr, nil
}
var (
enableStats = e.stats != nil
start time.Time
indexFetchedInstant time.Time
)
if enableStats {
start = time.Now()
}
task, ok := <-e.resultCh
if !ok {
return nil, nil
}
if enableStats {
indexFetchedInstant = time.Now()
}
if err := <-task.doneCh; err != nil {
return nil, err
}
if enableStats {
e.stats.NextWaitIndexScan += indexFetchedInstant.Sub(start)
if task.buildDoneTime.After(indexFetchedInstant) {
e.stats.NextWaitTableLookUpBuild += task.buildDoneTime.Sub(indexFetchedInstant)
indexFetchedInstant = task.buildDoneTime
}
e.stats.NextWaitTableLookUpResp += time.Since(indexFetchedInstant)
}
// Release the memory usage of last task before we handle a new task.
if e.resultCurr != nil {
e.resultCurr.memTracker.Consume(-e.resultCurr.memUsage)
}
e.resultCurr = task
return e.resultCurr, nil
}
func (e *IndexLookUpExecutor) initRuntimeStats() {
if e.RuntimeStats() != nil {
e.stats = &IndexLookUpRunTimeStats{
indexScanBasicStats: &execdetails.BasicRuntimeStats{},
Concurrency: e.indexLookupConcurrency,
}
}
}
func (e *IndexLookUpExecutor) getIndexPlanRootID() int {
if len(e.idxPlans) > 0 {
return e.idxPlans[len(e.idxPlans)-1].ID()
}
return e.ID()
}
func (e *IndexLookUpExecutor) getTableRootPlanID() int {
if len(e.tblPlans) > 0 {
return e.tblPlans[len(e.tblPlans)-1].ID()
}
return e.ID()
}
// indexWorker is used by IndexLookUpExecutor to maintain index lookup background goroutines.
type indexWorker struct {
idxLookup *IndexLookUpExecutor
finished <-chan struct{}
resultCh chan<- *lookupTableTask
keepOrder bool
// batchSize is for lightweight startup. It will be increased exponentially until it reaches the max batch size.
batchSize int
maxBatchSize int
maxChunkSize int
// checkIndexValue is used to check the consistency of the index data.
*checkIndexValue
// PushedLimit is used to skip the preceding and trailing handles when Limit is pushed down into IndexLookUpReader.
PushedLimit *physicalop.PushedDownLimit
// scannedKeys indicates how many keys have been scanned
scannedKeys uint64
}
func (w *indexWorker) syncErr(err error) {
doneCh := make(chan error, 1)
doneCh <- err
w.resultCh <- &lookupTableTask{
doneCh: doneCh,
}
}
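// selectResultList holds one entry per partial SelectResult. In index lookup push down mode each entry is
// converted into a RowIter (see newSelectResultRowIterList) and its Result field is cleared; otherwise only
// the Result field is set.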
type selectResultList []struct {
Result distsql.SelectResult
RowIter distsql.SelectResultIter
}
func newSelectResultList(results []distsql.SelectResult) selectResultList {
l := make(selectResultList, len(results))
for i, r := range results {
l[i].Result = r
}
return l
}
func newSelectResultRowIterList(results []distsql.SelectResult, intermediateResultTypes [][]*types.FieldType) (selectResultList, error) {
ret := newSelectResultList(results)
for i, r := range ret {
rowIter, err := r.Result.IntoIter(intermediateResultTypes)
if err != nil {
ret.Close()
return nil, err
}
ret[i].RowIter = rowIter
ret[i].Result = nil
}
return ret, nil
}
func (l selectResultList) Close() {
for _, r := range l {
var err error
if r.RowIter != nil {
err = r.RowIter.Close()
} else if r.Result != nil {
err = r.Result.Close()
}
if err != nil {
logutil.BgLogger().Error("close Select result failed", zap.Error(err))
}
}
}
// fetchHandles fetches a batch of handles from index data and builds the index lookup tasks.
// The tasks are submitted to the pool and processed by tableWorker, and sent to e.resultCh
// at the same time to keep data ordered.
func (w *indexWorker) fetchHandles(ctx context.Context, results selectResultList, indexTps []*types.FieldType) (err error) {
defer func() {
if r := recover(); r != nil {
logutil.Logger(ctx).Warn("indexWorker in IndexLookupExecutor panicked", zap.Any("recover", r), zap.Stack("stack"))
err4Panic := util.GetRecoverError(r)
w.syncErr(err4Panic)
if err != nil {
err = errors.Trace(err4Panic)
}
}
}()
var chk *chunk.Chunk
if !w.idxLookup.indexLookUpPushDown {
// chk is only used in non-indexLookUpPushDown mode for memory reuse
chk = w.idxLookup.AllocPool.Alloc(indexTps, w.idxLookup.MaxChunkSize(), w.idxLookup.MaxChunkSize())
}
handleOffsets, err := w.getHandleOffsets(len(indexTps))
if err != nil {
w.syncErr(err)
return err
}
idxID := w.idxLookup.getIndexPlanRootID()
if w.idxLookup.stmtRuntimeStatsColl != nil {
if idxID != w.idxLookup.ID() && w.idxLookup.stats != nil {
w.idxLookup.stats.indexScanBasicStats = w.idxLookup.stmtRuntimeStatsColl.GetBasicRuntimeStats(idxID, true)
}
}
taskID := 0
for i := 0; i < len(results); {
curResultIdx := i
result := results[curResultIdx]
if w.PushedLimit != nil && w.scannedKeys >= w.PushedLimit.Count+w.PushedLimit.Offset {
break
}
startTime := time.Now()
var completedRows []chunk.Row
var handles []kv.Handle
var retChunk *chunk.Chunk
var curResultExhausted bool
if w.idxLookup.indexLookUpPushDown {
completedRows, handles, curResultExhausted, err = w.extractLookUpPushDownRowsOrHandles(ctx, result.RowIter, handleOffsets)
} else {
handles, retChunk, err = w.extractTaskHandles(ctx, chk, result.Result, handleOffsets)
curResultExhausted = len(handles) == 0
}
finishFetch := time.Now()
if err != nil {
w.syncErr(err)
return err
}
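// Only advance to the next result after the current one is exhausted; a non-exhausted result keeps being
// drained in the following iterations.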
if curResultExhausted {
i++
}
if len(handles) == 0 && len(completedRows) == 0 {
continue
}
var completedTask *lookupTableTask
if rowCnt := len(completedRows); rowCnt > 0 {
metrics.IndexLookUpPushDownRowsCounterHit.Add(float64(rowCnt))
// Currently, completedRows is only produced by index lookup push down, which does not support keep order.
// For non-keep-order requests, the completed rows can be sent to resultCh directly.
completedTask = w.buildCompletedTask(taskID, completedRows)
taskID++
}
var tableLookUpTask *lookupTableTask
if rowCnt := len(handles); rowCnt > 0 {
if w.idxLookup.indexLookUpPushDown {
metrics.IndexLookUpPushDownRowsCounterMiss.Add(float64(rowCnt))
} else {
metrics.IndexLookUpNormalRowsCounter.Add(float64(rowCnt))
}
tableLookUpTask = w.buildTableTask(taskID, handles, retChunk)
if w.idxLookup.partitionTableMode {
tableLookUpTask.partitionTable = w.idxLookup.prunedPartitions[curResultIdx]
}
taskID++
}
finishBuild := time.Now()
select {
case <-ctx.Done():
return nil
case <-w.finished:
return nil
default:
if completedTask != nil {
w.resultCh <- completedTask
}
if tableLookUpTask != nil {
e := w.idxLookup
e.tblWorkerWg.Add(1)
e.pool.submit(func() {
defer e.tblWorkerWg.Done()
select {
case <-e.finished:
return
default:
growWorkerStack16K()
execTableTask(e, tableLookUpTask)
}
})
w.resultCh <- tableLookUpTask
}
}
if w.idxLookup.stats != nil {
atomic.AddInt64(&w.idxLookup.stats.FetchHandle, int64(finishFetch.Sub(startTime)))
atomic.AddInt64(&w.idxLookup.stats.TaskWait, int64(time.Since(finishBuild)))
atomic.AddInt64(&w.idxLookup.stats.FetchHandleTotal, int64(time.Since(startTime)))
}
}
return nil
}
func (w *indexWorker) getHandleOffsets(indexTpsLen int) ([]int, error) {
numColsWithoutPid := indexTpsLen
ok, err := w.idxLookup.needPartitionHandle(getHandleFromIndex)
if err != nil {
return nil, err
}
if ok {
numColsWithoutPid = numColsWithoutPid - 1
}
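// The handle columns are the last len(handleCols) columns of the index output, just before the optional
// partition ID column; for an int handle (no handleCols) the last such column is used.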
handleOffset := make([]int, 0, len(w.idxLookup.handleCols))
for i := range w.idxLookup.handleCols {
handleOffset = append(handleOffset, numColsWithoutPid-len(w.idxLookup.handleCols)+i)
}
if len(handleOffset) == 0 {
handleOffset = []int{numColsWithoutPid - 1}
}
return handleOffset, nil
}
func (w *indexWorker) extractLookUpPushDownRowsOrHandles(ctx context.Context, iter distsql.SelectResultIter, handleOffset []int) (rows []chunk.Row, handles []kv.Handle, exhausted bool, err error) {
intest.Assert(w.checkIndexValue == nil, "CheckIndex or CheckTable should not use index lookup push down")
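// The row iterator multiplexes two channels: channelIdxIndex carries index rows from which only the handle
// is extracted (a table lookup is still needed), while channelIdxRow carries rows that the pushed-down
// lookup has already completed.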
const channelIdxIndex = 0
const channelIdxRow = 1
startTime := time.Now()
startScanKeys := w.scannedKeys
defer func() {
if cnt := w.scannedKeys - startScanKeys; w.idxLookup.stats != nil {
w.idxLookup.stats.indexScanBasicStats.Record(time.Since(startTime), int(cnt))
}
}()
checkLimit := w.PushedLimit != nil
for len(handles)+len(rows) < w.batchSize {
var row distsql.SelectResultRow
row, err = iter.Next(ctx)
if err != nil {
return nil, nil, false, errors.Trace(err)
}
if row.IsEmpty() {
exhausted = true
return
}
w.scannedKeys++
if checkLimit {
if w.scannedKeys <= w.PushedLimit.Offset {
continue
}
if w.scannedKeys > (w.PushedLimit.Offset + w.PushedLimit.Count) {
// Skip the handles after Offset+Count.
return
}
}
switch row.ChannelIndex {
case channelIdxRow:
rows = append(rows, row.Row)
case channelIdxIndex:
h, err := w.idxLookup.getHandle(row.Row, handleOffset, w.idxLookup.isCommonHandle(), getHandleFromIndex)
if err != nil {
return nil, nil, false, errors.Trace(err)
}
handles = append(handles, h)
default:
return nil, nil, false, errors.Errorf("unexpected channel index %d", row.ChannelIndex)
}
}
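// The whole batch was filled, so grow the batch size exponentially for the next fetch, capped at maxBatchSize.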
w.batchSize *= 2
if w.batchSize > w.maxBatchSize {
w.batchSize = w.maxBatchSize
}
return
}
func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult, handleOffset []int) (
handles []kv.Handle, retChk *chunk.Chunk, err error) {
// PushedLimit would always be nil for CheckIndex or CheckTable, we add this check just for insurance.
checkLimit := (w.PushedLimit != nil) && (w.checkIndexValue == nil)
for len(handles) < w.batchSize {
requiredRows := w.batchSize - len(handles)
if checkLimit {
if w.PushedLimit.Offset+w.PushedLimit.Count <= w.scannedKeys {
return handles, nil, nil
}
leftCnt := w.PushedLimit.Offset + w.PushedLimit.Count - w.scannedKeys
if uint64(requiredRows) > leftCnt {
requiredRows = int(leftCnt)
}
}
chk.SetRequiredRows(requiredRows, w.maxChunkSize)
startTime := time.Now()
err = errors.Trace(idxResult.Next(ctx, chk))
if err != nil {
return handles, nil, err
}
if w.idxLookup.stats != nil {
w.idxLookup.stats.indexScanBasicStats.Record(time.Since(startTime), chk.NumRows())
}
if chk.NumRows() == 0 {
return handles, retChk, nil
}
if handles == nil {
handles = make([]kv.Handle, 0, chk.NumRows())
}
for i := range chk.NumRows() {
w.scannedKeys++
if checkLimit {
if w.scannedKeys <= w.PushedLimit.Offset {
continue
}
if w.scannedKeys > (w.PushedLimit.Offset + w.PushedLimit.Count) {
// Skip the handles after Offset+Count.
return handles, nil, nil
}
}
h, err := w.idxLookup.getHandle(chk.GetRow(i), handleOffset, w.idxLookup.isCommonHandle(), getHandleFromIndex)
if err != nil {
return handles, retChk, err
}
handles = append(handles, h)
}
if w.checkIndexValue != nil {
if retChk == nil {
retChk = chunk.NewChunkWithCapacity(w.idxColTps, w.batchSize)
}
retChk.Append(chk, 0, chk.NumRows())
}
}
w.batchSize *= 2
if w.batchSize > w.maxBatchSize {
w.batchSize = w.maxBatchSize
}
return handles, retChk, nil
}
func (*indexWorker) buildCompletedTask(taskID int, rows []chunk.Row) *lookupTableTask {
task := &lookupTableTask{
id: taskID,
rows: rows,
doneCh: make(chan error, 1),
}
task.doneCh <- nil
return task
}
func (w *indexWorker) buildTableTask(taskID int, handles []kv.Handle, retChk *chunk.Chunk) *lookupTableTask {
var indexOrder *kv.HandleMap
var duplicatedIndexOrder *kv.HandleMap
if w.keepOrder {
// Save the index order.
indexOrder = kv.NewHandleMap()
for i, h := range handles {
indexOrder.Set(h, i)
}
}
if w.checkIndexValue != nil {
// Save the index order.
indexOrder = kv.NewHandleMap()
duplicatedIndexOrder = kv.NewHandleMap()
for i, h := range handles {
if _, ok := indexOrder.Get(h); ok {
duplicatedIndexOrder.Set(h, i)
} else {
indexOrder.Set(h, i)
}
}
}
task := &lookupTableTask{
id: taskID,
handles: handles,
indexOrder: indexOrder,
duplicatedIndexOrder: duplicatedIndexOrder,
idxRows: retChk,
}
task.doneCh = make(chan error, 1)
return task
}
func execTableTask(e *IndexLookUpExecutor, task *lookupTableTask) {
var (
ctx = e.workerCtx
region *trace.Region
)
if trace.IsEnabled() {
region = trace.StartRegion(ctx, "IndexLookUpTableTask"+strconv.Itoa(task.id))
}
defer func() {
if r := recover(); r != nil {
logutil.Logger(ctx).Warn("TableWorker in IndexLookUpExecutor panicked", zap.Any("recover", r), zap.Stack("stack"))
err := util.GetRecoverError(r)
task.doneCh <- err
}
if region != nil {
region.End()
}
}()
tracker := memory.NewTracker(task.id, -1)
tracker.AttachTo(e.memTracker)
w := &tableWorker{
idxLookup: e,
finished: e.finished,
keepOrder: e.keepOrder,
handleIdx: e.handleIdx,
checkIndexValue: e.checkIndexValue,
memTracker: tracker,
}
startTime := time.Now()
err := w.executeTask(ctx, task)
if e.stats != nil {
atomic.AddInt64(&e.stats.TableRowScan, int64(time.Since(startTime)))
atomic.AddInt64(&e.stats.TableTaskNum, 1)
}
task.doneCh <- err
}
// tableWorker is used by IndexLookUpExecutor to maintain table lookup background goroutines.
type tableWorker struct {
idxLookup *IndexLookUpExecutor
finished <-chan struct{}
keepOrder bool
handleIdx []int
// memTracker is used to track the memory usage of this executor.
memTracker *memory.Tracker
// checkIndexValue is used to check the consistency of the index data.
*checkIndexValue
}
func (e *IndexLookUpExecutor) getHandle(row chunk.Row, handleIdx []int,
isCommonHandle bool, tp getHandleType) (handle kv.Handle, err error) {
if isCommonHandle {
var handleEncoded []byte
var datums []types.Datum
for i, idx := range handleIdx {
// If the new collation is enabled and the handle contains a non-binary string,
// the handle in the index is encoded as its "sortKey", so we cannot restore its
// original value (the primary key) here.
// We use a trick to avoid encoding the "sortKey" again: change the charset
// collation to `binary`.
rtp := e.handleCols[i].RetType
if collate.NewCollationEnabled() && e.table.Meta().CommonHandleVersion == 0 && rtp.EvalType() == types.ETString &&
!mysql.HasBinaryFlag(rtp.GetFlag()) && tp == getHandleFromIndex {
rtp = rtp.Clone()
rtp.SetCollate(charset.CollationBin)
datums = append(datums, row.GetDatum(idx, rtp))
continue
}
datums = append(datums, row.GetDatum(idx, e.handleCols[i].RetType))
}
tablecodec.TruncateIndexValues(e.table.Meta(), e.primaryKeyIndex, datums)
ectx := e.ectx.GetEvalCtx()
handleEncoded, err = codec.EncodeKey(ectx.Location(), nil, datums...)
errCtx := ectx.ErrCtx()
err = errCtx.HandleError(err)
if err != nil {
return nil, err
}
handle, err = kv.NewCommonHandle(handleEncoded)
if err != nil {
return nil, err
}
} else {
if len(handleIdx) == 0 {
handle = kv.IntHandle(row.GetInt64(0))
} else {
handle = kv.IntHandle(row.GetInt64(handleIdx[0]))
}
}
ok, err := e.needPartitionHandle(tp)
if err != nil {
return nil, err
}
if ok {
pid := row.GetInt64(row.Len() - 1)
handle = kv.NewPartitionHandle(pid, handle)
}
return
}
// IndexLookUpRunTimeStats records the runtime stats of the index lookup executor.
type IndexLookUpRunTimeStats struct {
// indexScanBasicStats is used to record the basic runtime stats of the index scan.
indexScanBasicStats *execdetails.BasicRuntimeStats
FetchHandleTotal int64
FetchHandle int64
TaskWait int64
TableRowScan int64
TableTaskNum int64
Concurrency int
// The following fields record the wait duration details observed by `Next` calls.
NextWaitIndexScan time.Duration
NextWaitTableLookUpBuild time.Duration
NextWaitTableLookUpResp time.Duration
}
func (e *IndexLookUpRunTimeStats) String() string {
var buf bytes.Buffer
fetchHandle := atomic.LoadInt64(&e.FetchHandleTotal)
indexScan := atomic.LoadInt64(&e.FetchHandle)
taskWait := atomic.LoadInt64(&e.TaskWait)
tableScan := atomic.LoadInt64(&e.TableRowScan)
tableTaskNum := atomic.LoadInt64(&e.TableTaskNum)
concurrency := e.Concurrency
if indexScan != 0 {
buf.WriteString(fmt.Sprintf("index_task: {total_time: %s, fetch_handle: %s, build: %s, wait: %s}",
execdetails.FormatDuration(time.Duration(fetchHandle)),
execdetails.FormatDuration(time.Duration(indexScan)),
execdetails.FormatDuration(time.Duration(fetchHandle-indexScan-taskWait)),
execdetails.FormatDuration(time.Duration(taskWait))))
}
if tableScan != 0 {
if buf.Len() > 0 {
buf.WriteByte(',')
}
buf.WriteString(fmt.Sprintf(" table_task: {total_time: %v, num: %d, concurrency: %d}", execdetails.FormatDuration(time.Duration(tableScan)), tableTaskNum, concurrency))
}
if e.NextWaitIndexScan > 0 || e.NextWaitTableLookUpBuild > 0 || e.NextWaitTableLookUpResp > 0 {
if buf.Len() > 0 {
buf.WriteByte(',')
fmt.Fprintf(&buf, " next: {wait_index: %s, wait_table_lookup_build: %s, wait_table_lookup_resp: %s}",
execdetails.FormatDuration(e.NextWaitIndexScan),
execdetails.FormatDuration(e.NextWaitTableLookUpBuild),
execdetails.FormatDuration(e.NextWaitTableLookUpResp))
}
}
return buf.String()
}
// Clone implements the RuntimeStats interface.
func (e *IndexLookUpRunTimeStats) Clone() execdetails.RuntimeStats {
newRs := *e
return &newRs
}
// Merge implements the RuntimeStats interface.
func (e *IndexLookUpRunTimeStats) Merge(other execdetails.RuntimeStats) {
tmp, ok := other.(*IndexLookUpRunTimeStats)
if !ok {
return
}
e.FetchHandleTotal += tmp.FetchHandleTotal
e.FetchHandle += tmp.FetchHandle
e.TaskWait += tmp.TaskWait
e.TableRowScan += tmp.TableRowScan
e.TableTaskNum += tmp.TableTaskNum
e.NextWaitIndexScan += tmp.NextWaitIndexScan
e.NextWaitTableLookUpBuild += tmp.NextWaitTableLookUpBuild
e.NextWaitTableLookUpResp += tmp.NextWaitTableLookUpResp
}
// Tp implements the RuntimeStats interface.
func (*IndexLookUpRunTimeStats) Tp() int {
return execdetails.TpIndexLookUpRunTimeStats
}
func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, tableReader exec.Executor) error {
chk := exec.TryNewCacheChunk(tableReader)
tblInfo := w.idxLookup.table.Meta()
vals := make([]types.Datum, 0, len(w.idxTblCols))
// Prepare collator for compare.
collators := make([]collate.Collator, 0, len(w.idxColTps))
for _, tp := range w.idxColTps {
collators = append(collators, collate.GetCollator(tp.GetCollate()))
}
ir := func() *consistency.Reporter {
return &consistency.Reporter{
HandleEncode: func(handle kv.Handle) kv.Key {
return tablecodec.EncodeRecordKey(w.idxLookup.table.RecordPrefix(), handle)
},
IndexEncode: func(idxRow *consistency.RecordData) kv.Key {
var idx table.Index
for _, v := range w.idxLookup.table.Indices() {
if strings.EqualFold(v.Meta().Name.String(), w.idxLookup.index.Name.O) {
idx = v
break
}
}
if idx == nil {
return nil
}
ectx := w.idxLookup.ectx.GetEvalCtx()
k, _, err := idx.GenIndexKey(ectx.ErrCtx(), ectx.Location(), idxRow.Values[:len(idx.Meta().Columns)], idxRow.Handle, nil)
if err != nil {
return nil
}
return k
},
Tbl: tblInfo,
Idx: w.idxLookup.index,
EnableRedactLog: w.idxLookup.enableRedactLog,
Storage: w.idxLookup.storage,
}
}
for {
err := exec.Next(ctx, tableReader, chk)
if err != nil {
return errors.Trace(err)
}
// If ctx is cancelled, `Next` may return an empty result even when the actual data is not empty. To avoid
// producing false-positive error logs that cause confusion, exit in this case.
select {
case <-ctx.Done():
return nil
default:
}
if chk.NumRows() == 0 {
task.indexOrder.Range(func(h kv.Handle, val any) bool {
idxRow := task.idxRows.GetRow(val.(int))
err = ir().ReportAdminCheckInconsistent(ctx, h, &consistency.RecordData{Handle: h, Values: getDatumRow(&idxRow, w.idxColTps)}, nil)
return false
})
if err != nil {
return err
}
break
}
iter := chunk.NewIterator4Chunk(chk)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
handle, err := w.idxLookup.getHandle(row, w.handleIdx, w.idxLookup.isCommonHandle(), getHandleFromTable)
if err != nil {
return err
}
v, ok := task.indexOrder.Get(handle)
if !ok {
v, _ = task.duplicatedIndexOrder.Get(handle)
}
offset, _ := v.(int)
task.indexOrder.Delete(handle)
idxRow := task.idxRows.GetRow(offset)
vals = vals[:0]
for i, col := range w.idxTblCols {
vals = append(vals, row.GetDatum(i, &col.FieldType))
}
tablecodec.TruncateIndexValues(tblInfo, w.idxLookup.index, vals)
tc := w.idxLookup.ectx.GetEvalCtx().TypeCtx()
for i := range vals {
col := w.idxTblCols[i]
idxVal := idxRow.GetDatum(i, w.idxColTps[i])
tablecodec.TruncateIndexValue(&idxVal, w.idxLookup.index.Columns[i], col.ColumnInfo)
cmpRes, err := tables.CompareIndexAndVal(tc, vals[i], idxVal, collators[i], col.FieldType.IsArray() && vals[i].Kind() == types.KindMysqlJSON)
if err != nil {
return ir().ReportAdminCheckInconsistentWithColInfo(ctx,
handle,
col.Name.O,
idxVal,
vals[i],
err,
&consistency.RecordData{Handle: handle, Values: getDatumRow(&idxRow, w.idxColTps)},
)
}
if cmpRes != 0 {
return ir().ReportAdminCheckInconsistentWithColInfo(ctx,
handle,
col.Name.O,
idxRow.GetDatum(i, w.idxColTps[i]),
vals[i],
err,
&consistency.RecordData{Handle: handle, Values: getDatumRow(&idxRow, w.idxColTps)},
)
}
}
}
}
return nil
}
func getDatumRow(r *chunk.Row, fields []*types.FieldType) []types.Datum {
datumRow := make([]types.Datum, 0, r.Chunk().NumCols())
for colIdx := range r.Chunk().NumCols() {
if colIdx >= len(fields) {
break
}
datum := r.GetDatum(colIdx, fields[colIdx])
datumRow = append(datumRow, datum)
}
return datumRow
}
// executeTask executes the table lookup task. We construct a table reader and send the request by handles.
// Then we hold the returned rows and finish this task.
func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) error {
tableReader, err := w.idxLookup.buildTableReader(ctx, task)
task.buildDoneTime = time.Now()
if err != nil {
if ctx.Err() != context.Canceled {
logutil.Logger(ctx).Error("build table reader failed", zap.Error(err))
}
return err
}
defer func() { terror.Log(exec.Close(tableReader)) }()
if w.checkIndexValue != nil {
return w.compareData(ctx, task, tableReader)
}
{
task.memTracker = w.memTracker
memUsage := int64(cap(task.handles))*size.SizeOfInterface + tableReader.memUsage()
for _, h := range task.handles {
memUsage += int64(h.MemUsage())
}
if task.indexOrder != nil {
memUsage += task.indexOrder.MemUsage()
}
if task.duplicatedIndexOrder != nil {
memUsage += task.duplicatedIndexOrder.MemUsage()
}
memUsage += task.idxRows.MemoryUsage()
task.memUsage = memUsage
task.memTracker.Consume(memUsage)
}
handleCnt := len(task.handles)
task.rows = make([]chunk.Row, 0, handleCnt)
for {
chk := exec.TryNewCacheChunk(tableReader)
err = exec.Next(ctx, tableReader, chk)
if err != nil {
if ctx.Err() != context.Canceled {
logutil.Logger(ctx).Warn("table reader fetch next chunk failed", zap.Error(err))
}
return err
}
if chk.NumRows() == 0 {
break
}
{
memUsage := chk.MemoryUsage()
task.memUsage += memUsage
task.memTracker.Consume(memUsage)
}
iter := chunk.NewIterator4Chunk(chk)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
task.rows = append(task.rows, row)
}
}
defer trace.StartRegion(ctx, "IndexLookUpTableCompute").End()
{
memUsage := int64(cap(task.rows)) * int64(unsafe.Sizeof(chunk.Row{}))
task.memUsage += memUsage
task.memTracker.Consume(memUsage)
}
if w.keepOrder {
task.rowIdx = make([]int, 0, len(task.rows))
for i := range task.rows {
handle, err := w.idxLookup.getHandle(task.rows[i], w.handleIdx, w.idxLookup.isCommonHandle(), getHandleFromTable)
if err != nil {
return err
}
rowIdx, _ := task.indexOrder.Get(handle)
task.rowIdx = append(task.rowIdx, rowIdx.(int))
}
{
memUsage := int64(cap(task.rowIdx) * int(size.SizeOfInt))
task.memUsage += memUsage
task.memTracker.Consume(memUsage)
}
sort.Sort(task)
}
if handleCnt != len(task.rows) && !util.HasCancelled(ctx) &&
!w.idxLookup.weakConsistency {
if len(w.idxLookup.tblPlans) == 1 {
obtainedHandlesMap := kv.NewHandleMap()
for _, row := range task.rows {
handle, err := w.idxLookup.getHandle(row, w.handleIdx, w.idxLookup.isCommonHandle(), getHandleFromTable)
if err != nil {
return err
}
obtainedHandlesMap.Set(handle, true)
}
missHds := GetLackHandles(task.handles, obtainedHandlesMap)
return (&consistency.Reporter{
HandleEncode: func(hd kv.Handle) kv.Key {
return tablecodec.EncodeRecordKey(w.idxLookup.table.RecordPrefix(), hd)
},
Tbl: w.idxLookup.table.Meta(),
Idx: w.idxLookup.index,
EnableRedactLog: w.idxLookup.enableRedactLog,
Storage: w.idxLookup.storage,
}).ReportLookupInconsistent(ctx,
handleCnt,
len(task.rows),
missHds,
task.handles,
nil,
//missRecords,
)
}
}
return nil
}
// GetLackHandles gets the handles in expectedHandles but not in obtainedHandlesMap.
func GetLackHandles(expectedHandles []kv.Handle, obtainedHandlesMap *kv.HandleMap) []kv.Handle {
diffCnt := len(expectedHandles) - obtainedHandlesMap.Len()
diffHandles := make([]kv.Handle, 0, diffCnt)
var cnt int
for _, handle := range expectedHandles {
isExist := false
if _, ok := obtainedHandlesMap.Get(handle); ok {
obtainedHandlesMap.Delete(handle)
isExist = true
}
if !isExist {
diffHandles = append(diffHandles, handle)
cnt++
if cnt == diffCnt {
break
}
}
}
return diffHandles
}
func getPhysicalPlanIDs(plans []base.PhysicalPlan) []int {
planIDs := make([]int, 0, len(plans))
for _, p := range plans {
planIDs = append(planIDs, p.ID())
}
return planIDs
}
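// needMergeSort reports whether the partial results need to be merge-sorted by byItems, which is the case
// only when byItems is non-empty and there is more than one KV range (multiple partitions or range groups).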
func needMergeSort(byItems []*plannerutil.ByItems, kvRangesCount int) bool {
return len(byItems) > 0 && kvRangesCount > 1
}