// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
	"bytes"
	"cmp"
	"container/heap"
	"context"
	"fmt"
	"runtime/trace"
	"slices"
	"sort"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/pkg/distsql"
	"github.com/pingcap/tidb/pkg/executor/internal/builder"
	"github.com/pingcap/tidb/pkg/executor/internal/exec"
	"github.com/pingcap/tidb/pkg/expression"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/parser/mysql"
	"github.com/pingcap/tidb/pkg/parser/terror"
	"github.com/pingcap/tidb/pkg/planner/core/base"
	"github.com/pingcap/tidb/pkg/planner/core/operator/physicalop"
	plannerutil "github.com/pingcap/tidb/pkg/planner/util"
	"github.com/pingcap/tidb/pkg/sessionctx"
	"github.com/pingcap/tidb/pkg/table"
	"github.com/pingcap/tidb/pkg/types"
	"github.com/pingcap/tidb/pkg/util"
	"github.com/pingcap/tidb/pkg/util/channel"
	"github.com/pingcap/tidb/pkg/util/chunk"
	"github.com/pingcap/tidb/pkg/util/execdetails"
	"github.com/pingcap/tidb/pkg/util/logutil"
	"github.com/pingcap/tidb/pkg/util/memory"
	"github.com/pingcap/tidb/pkg/util/ranger"
	"github.com/pingcap/tipb/go-tipb"
	"go.uber.org/zap"
)

var (
	// Compile-time assertion that IndexMergeReaderExecutor implements exec.Executor.
	_ exec.Executor = &IndexMergeReaderExecutor{}

	// IndexMergeCancelFuncForTest is used just for test
	IndexMergeCancelFuncForTest func()
)

// Worker-type names; used as trace-region labels and to identify which worker
// a panic came from in handleWorkerPanic.
const (
	partialIndexWorkerType        = "IndexMergePartialIndexWorker"
	partialTableWorkerType        = "IndexMergePartialTableWorker"
	processWorkerType             = "IndexMergeProcessWorker"
	partTblIntersectionWorkerType = "IndexMergePartTblIntersectionWorker"
	tableScanWorkerType           = "IndexMergeTableScanWorker"
)

// IndexMergeReaderExecutor accesses a table with multiple index/table scan.
// There are three types of workers:
// 1. partialTableWorker/partialIndexWorker, which are used to fetch the handles
// 2. indexMergeProcessWorker, which is used to do the `Union` operation.
// 3. indexMergeTableScanWorker, which is used to get the table tuples with the given handles.
//
// The execution flow is really like IndexLookUpReader. However, it uses multiple index scans
// or table scans to get the handles:
//  1. use the partialTableWorkers and partialIndexWorkers to fetch the handles (a batch per time)
//     and send them to the indexMergeProcessWorker.
//  2. indexMergeProcessWorker do the `Union` operation for a batch of handles it have got.
//     For every handle in the batch:
//     1. check whether it has been accessed.
//     2. if not, record it and send it to the indexMergeTableScanWorker.
//     3. if accessed, just ignore it.
type IndexMergeReaderExecutor struct {
	exec.BaseExecutor
	indexUsageReporter *exec.IndexUsageReporter

	table        table.Table
	// indexes[i] is nil when partial plan i is a table scan rather than an index scan.
	indexes      []*model.IndexInfo
	descs        []bool
	ranges       [][]*ranger.Range
	dagPBs       []*tipb.DAGRequest
	startTS      uint64
	tableRequest *tipb.DAGRequest

	keepOrder   bool
	pushedLimit *physicalop.PushedDownLimit
	byItems     []*plannerutil.ByItems

	// columns are only required by union scan.
	columns []*model.ColumnInfo
	// partitionIDMap are only required by union scan with global index.
	partitionIDMap map[int64]struct{}
	*dataReaderBuilder

	// fields about accessing partition tables
	partitionTableMode bool                  // if this IndexMerge is accessing a partition table
	prunedPartitions   []table.PhysicalTable // pruned partition tables need to access
	partitionKeyRanges [][][]kv.KeyRange     // [partialIndex][partitionIdx][ranges]

	// All fields above are immutable.

	tblWorkerWg     sync.WaitGroup
	idxWorkerWg     sync.WaitGroup
	processWorkerWg sync.WaitGroup
	// finished is closed in Close() to tell every worker goroutine to stop.
	finished chan struct{}

	workerStarted bool
	keyRanges     [][]kv.KeyRange

	resultCh   chan *indexMergeTableTask
	resultCurr *indexMergeTableTask

	// memTracker is used to track the memory usage of this executor.
	memTracker *memory.Tracker

	partialPlans        [][]base.PhysicalPlan
	tblPlans            []base.PhysicalPlan
	partialNetDataSizes []float64
	dataAvgRowSize      float64

	handleCols plannerutil.HandleCols
	stats      *IndexMergeRuntimeStat

	// Indicates whether there is correlated column in filter or table/index range.
	// We need to refresh dagPBs before send DAGReq to storage.
	isCorColInPartialFilters []bool
	isCorColInTableFilter    bool
	isCorColInPartialAccess  []bool

	// Whether it's intersection or union.
	isIntersection bool

	hasGlobalIndex bool
}

type indexMergeTableTask struct {
	lookupTableTask

	// parTblIdx are only used in indexMergeProcessWorker.fetchLoopIntersection.
	parTblIdx int

	// partialPlanID are only used for indexMergeProcessWorker.fetchLoopUnionWithOrderBy.
	partialPlanID int
}

// Table implements the dataSourceExecutor interface.
func (e *IndexMergeReaderExecutor) Table() table.Table {
	return e.table
}

// Open implements the Executor Open interface
func (e *IndexMergeReaderExecutor) Open(_ context.Context) (err error) {
	e.keyRanges = make([][]kv.KeyRange, 0, len(e.partialPlans))
	e.initRuntimeStats()
	if e.isCorColInTableFilter {
		// Correlated columns in the final table filter: rebuild the table DAG executors
		// so the pushed-down Selection carries the current outer values.
		e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.tblPlans)
		if err != nil {
			return err
		}
	}
	if err = e.rebuildRangeForCorCol(); err != nil {
		return err
	}

	if !e.partitionTableMode {
		if e.keyRanges, err = e.buildKeyRangesForTable(e.table); err != nil {
			return err
		}
	} else {
		// Build key ranges per partial plan and per pruned partition; a global index
		// gets a single range set built against the logical table ID instead.
		e.partitionKeyRanges = make([][][]kv.KeyRange, len(e.indexes))
		tmpPartitionKeyRanges := make([][][]kv.KeyRange, len(e.prunedPartitions))
		for i, p := range e.prunedPartitions {
			if tmpPartitionKeyRanges[i], err = e.buildKeyRangesForTable(p); err != nil {
				return err
			}
		}
		for i, idx := range e.indexes {
			if idx != nil && idx.Global {
				keyRange, _ := distsql.IndexRangesToKVRanges(e.ctx.GetDistSQLCtx(), e.table.Meta().ID, idx.ID, e.ranges[i])
				e.partitionKeyRanges[i] = [][]kv.KeyRange{keyRange.FirstPartitionRange()}
			} else {
				// Transpose [partition][partialPlan] -> [partialPlan][partition].
				for _, pKeyRanges := range tmpPartitionKeyRanges {
					e.partitionKeyRanges[i] = append(e.partitionKeyRanges[i], pKeyRanges[i])
				}
			}
		}
	}
	e.finished = make(chan struct{})
	e.resultCh = make(chan *indexMergeTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize))
	if e.memTracker != nil {
		e.memTracker.Reset()
	} else {
		e.memTracker = memory.NewTracker(e.ID(), -1)
	}
	e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker)
	return nil
}

// rebuildRangeForCorCol re-resolves the index/table ranges of every partial plan
// that contains a correlated column in its access condition, so the ranges reflect
// the current outer-row values.
func (e *IndexMergeReaderExecutor) rebuildRangeForCorCol() (err error) {
	len1 := len(e.partialPlans)
	len2 := len(e.isCorColInPartialAccess)
	if len1 != len2 {
		return errors.Errorf("unexpect length for partialPlans(%d) and isCorColInPartialAccess(%d)", len1, len2)
	}
	for i, plan := range e.partialPlans {
		if e.isCorColInPartialAccess[i] {
			switch x := plan[0].(type) {
			case *physicalop.PhysicalIndexScan:
				e.ranges[i], err = rebuildIndexRanges(e.Ctx().GetExprCtx(), e.Ctx().GetRangerCtx(), x, x.IdxCols, x.IdxColLens)
			case *physicalop.PhysicalTableScan:
				e.ranges[i], err = x.ResolveCorrelatedColumns()
			default:
				err = errors.Errorf("unsupported plan type %T", plan[0])
			}
			if err != nil {
				return err
			}
		}
	}
	return nil
}

// buildKeyRangesForTable converts e.ranges into KV key ranges against the given
// (physical) table, one []kv.KeyRange per partial plan. Table-scan partial plans
// are split across the signed/unsigned int64 boundary first; index-scan partial
// plans go through IndexRangesToKVRanges.
func (e *IndexMergeReaderExecutor) buildKeyRangesForTable(tbl table.Table) (ranges [][]kv.KeyRange, err error) {
	dctx := e.Ctx().GetDistSQLCtx()
	for i, plan := range e.partialPlans {
		_, ok := plan[0].(*physicalop.PhysicalIndexScan)
		if !ok {
			firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(e.ranges[i], false, e.descs[i], tbl.Meta().IsCommonHandle)
			firstKeyRanges, err := distsql.TableHandleRangesToKVRanges(dctx, []int64{getPhysicalTableID(tbl)}, tbl.Meta().IsCommonHandle, firstPartRanges)
			if err != nil {
				return nil, err
			}
			secondKeyRanges, err := distsql.TableHandleRangesToKVRanges(dctx, []int64{getPhysicalTableID(tbl)}, tbl.Meta().IsCommonHandle, secondPartRanges)
			if err != nil {
				return nil, err
			}
			keyRanges := append(firstKeyRanges.FirstPartitionRange(), secondKeyRanges.FirstPartitionRange()...)
			ranges = append(ranges, keyRanges)
			continue
		}
		keyRange, err := distsql.IndexRangesToKVRanges(dctx, getPhysicalTableID(tbl), e.indexes[i].ID, e.ranges[i])
		if err != nil {
			return nil, err
		}
		ranges = append(ranges, keyRange.FirstPartitionRange())
	}
	return ranges, nil
}

// startWorkers starts the process worker, one partial worker per partial plan,
// and the table-scan worker pool. Channel topology:
// partial workers -> fetchCh -> process worker -> workCh -> table-scan workers -> resultCh.
// If any partial worker fails to start, exitCh is closed to stop the ones
// already started, and the error is returned to the caller.
func (e *IndexMergeReaderExecutor) startWorkers(ctx context.Context) error {
	exitCh := make(chan struct{})
	workCh := make(chan *indexMergeTableTask, 1)
	fetchCh := make(chan *indexMergeTableTask, len(e.keyRanges))

	e.startIndexMergeProcessWorker(ctx, workCh, fetchCh)

	var err error
	for i := range e.partialPlans {
		e.idxWorkerWg.Add(1)
		if e.indexes[i] != nil {
			err = e.startPartialIndexWorker(ctx, exitCh, fetchCh, i)
		} else {
			err = e.startPartialTableWorker(ctx, exitCh, fetchCh, i)
		}
		if err != nil {
			e.idxWorkerWg.Done()
			break
		}
	}
	go e.waitPartialWorkersAndCloseFetchChan(fetchCh)
	if err != nil {
		close(exitCh)
		return err
	}
	e.startIndexMergeTableScanWorker(ctx, workCh)
	e.workerStarted = true
	return nil
}

// waitPartialWorkersAndCloseFetchChan closes fetchCh once every partial worker
// has finished, letting the process worker's fetch loop terminate.
func (e *IndexMergeReaderExecutor) waitPartialWorkersAndCloseFetchChan(fetchCh chan *indexMergeTableTask) {
	e.idxWorkerWg.Wait()
	close(fetchCh)
}

// startIndexMergeProcessWorker starts the single goroutine that merges handle
// batches from fetch (union or intersection) and forwards table tasks to workCh.
func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Context, workCh chan<- *indexMergeTableTask, fetch <-chan *indexMergeTableTask) {
	idxMergeProcessWorker := &indexMergeProcessWorker{
		indexMerge: e,
		stats:      e.stats,
	}
	e.processWorkerWg.Add(1)
	go func() {
		defer trace.StartRegion(ctx, "IndexMergeProcessWorker").End()
		util.WithRecovery(
			func() {
				if e.isIntersection {
					if e.keepOrder {
						// todo: implementing fetchLoopIntersectionWithOrderBy if necessary.
						panic("Not support intersection with keepOrder = true")
					}
					idxMergeProcessWorker.fetchLoopIntersection(ctx, fetch, workCh, e.resultCh, e.finished)
				} else if len(e.byItems) != 0 {
					idxMergeProcessWorker.fetchLoopUnionWithOrderBy(ctx, fetch, workCh, e.resultCh, e.finished)
				} else {
					idxMergeProcessWorker.fetchLoopUnion(ctx, fetch, workCh, e.resultCh, e.finished)
				}
			},
			handleWorkerPanic(ctx, e.finished, nil, e.resultCh, nil, processWorkerType),
		)
		e.processWorkerWg.Done()
	}()
}

// startPartialIndexWorker starts one goroutine that scans the workID-th partial
// index plan (one SelectResult per key-range set / partition) and streams handle
// batches into fetchCh. Errors inside the goroutine are reported through fetchCh
// via syncErr; the returned error only covers synchronous start-up failures.
func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *indexMergeTableTask, workID int) error {
	failpoint.Inject("testIndexMergeResultChCloseEarly", func(_ failpoint.Value) {
		// Wait for processWorker to close resultCh.
		time.Sleep(time.Second * 2)
		// Should use fetchCh instead of resultCh to send error.
		syncErr(ctx, e.finished, fetchCh, errors.New("testIndexMergeResultChCloseEarly"))
	})
	if e.RuntimeStats() != nil {
		collExec := true
		e.dagPBs[workID].CollectExecutionSummaries = &collExec
	}

	var keyRanges [][]kv.KeyRange
	if e.partitionTableMode {
		keyRanges = e.partitionKeyRanges[workID]
	} else {
		keyRanges = [][]kv.KeyRange{e.keyRanges[workID]}
	}

	failpoint.Inject("startPartialIndexWorkerErr", func() error {
		return errors.New("inject an error before start partialIndexWorker")
	})

	// for union case, the push-downLimit can be utilized to limit index fetched handles.
	// for intersection case, the push-downLimit can only be conducted after all index path/table finished.
	pushedIndexLimit := e.pushedLimit
	if e.isIntersection {
		pushedIndexLimit = nil
	}

	go func() {
		defer trace.StartRegion(ctx, "IndexMergePartialIndexWorker").End()
		defer e.idxWorkerWg.Done()
		util.WithRecovery(
			func() {
				failpoint.Inject("testIndexMergePanicPartialIndexWorker", nil)
				is := e.partialPlans[workID][0].(*physicalop.PhysicalIndexScan)
				worker := &partialIndexWorker{
					stats:              e.stats,
					idxID:              e.getPartitalPlanID(workID),
					sc:                 e.Ctx(),
					dagPB:              e.dagPBs[workID],
					plan:               e.partialPlans[workID],
					batchSize:          e.MaxChunkSize(),
					maxBatchSize:       e.Ctx().GetSessionVars().IndexLookupSize,
					maxChunkSize:       e.MaxChunkSize(),
					memTracker:         e.memTracker,
					partitionTableMode: e.partitionTableMode,
					prunedPartitions:   e.prunedPartitions,
					byItems:            is.ByItems,
					pushedLimit:        pushedIndexLimit,
				}
				if worker.stats != nil && worker.idxID != 0 {
					worker.sc.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(worker.idxID, true)
				}
				if e.isCorColInPartialFilters[workID] {
					// We got correlated column, so need to refresh Selection operator.
					var err error
					if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.partialPlans[workID]); err != nil {
						syncErr(ctx, e.finished, fetchCh, err)
						return
					}
				}
				worker.batchSize = CalculateBatchSize(int(is.StatsCount()), e.MaxChunkSize(), worker.maxBatchSize)
				tps := worker.getRetTpsForIndexScan(e.handleCols)
				results := make([]distsql.SelectResult, 0, len(keyRanges))
				defer func() {
					// To make sure SelectResult.Close() is called even got panic in fetchHandles().
					for _, result := range results {
						if err := result.Close(); err != nil {
							logutil.Logger(ctx).Error("close Select result failed", zap.Error(err))
						}
					}
				}()
				for _, keyRange := range keyRanges {
					// check if this executor is closed
					select {
					case <-ctx.Done():
						return
					case <-e.finished:
						return
					default:
					}
					var builder distsql.RequestBuilder
					builder.SetDAGRequest(e.dagPBs[workID]).
						SetStartTS(e.startTS).
						SetDesc(e.descs[workID]).
						SetKeepOrder(e.keepOrder).
						SetTxnScope(e.txnScope).
						SetReadReplicaScope(e.readReplicaScope).
						SetIsStaleness(e.isStaleness).
						SetFromSessionVars(e.Ctx().GetDistSQLCtx()).
						SetMemTracker(e.memTracker).
						SetFromInfoSchema(e.Ctx().GetInfoSchema()).
						SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx().GetDistSQLCtx(), &builder.Request, e.partialNetDataSizes[workID])).
						SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias)

					if builder.Request.Paging.Enable && builder.Request.Paging.MinPagingSize < uint64(worker.batchSize) {
						// when paging enabled and Paging.MinPagingSize less than initBatchSize, change Paging.MinPagingSize to
						// initial batchSize to avoid redundant paging RPC, see more detail in https://github.com/pingcap/tidb/issues/54066
						builder.Request.Paging.MinPagingSize = uint64(worker.batchSize)
						if builder.Request.Paging.MaxPagingSize < uint64(worker.batchSize) {
							builder.Request.Paging.MaxPagingSize = uint64(worker.batchSize)
						}
					}
					// init kvReq and worker for this partition
					// The key ranges should be ordered.
					slices.SortFunc(keyRange, func(i, j kv.KeyRange) int {
						return bytes.Compare(i.StartKey, j.StartKey)
					})
					kvReq, err := builder.SetKeyRanges(keyRange).Build()
					if err != nil {
						syncErr(ctx, e.finished, fetchCh, err)
						return
					}
					result, err := distsql.SelectWithRuntimeStats(ctx, e.Ctx().GetDistSQLCtx(), kvReq, tps, getPhysicalPlanIDs(e.partialPlans[workID]), e.getPartitalPlanID(workID))
					if err != nil {
						syncErr(ctx, e.finished, fetchCh, err)
						return
					}
					results = append(results, result)
					failpoint.Inject("testIndexMergePartialIndexWorkerCoprLeak", nil)
				}
				if len(results) > 1 && len(e.byItems) != 0 {
					// e.Schema() not the output schema for partialIndexReader, and we put byItems related column at first in `buildIndexReq`, so use nil here.
					ssr := distsql.NewSortedSelectResults(e.Ctx().GetExprCtx().GetEvalCtx(), results, nil, e.byItems, e.memTracker)
					results = []distsql.SelectResult{ssr}
				}
				ctx1, cancel := context.WithCancel(ctx)
				// this error is reported in fetchHandles(), so ignore it here.
				_, _ = worker.fetchHandles(ctx1, results, exitCh, fetchCh, e.finished, e.handleCols, workID)
				cancel()
			},
			handleWorkerPanic(ctx, e.finished, nil, fetchCh, nil, partialIndexWorkerType),
		)
	}()
	return nil
}

// startPartialTableWorker starts one goroutine that runs the workID-th partial
// table-scan plan (re-opening one TableReaderExecutor per pruned partition, or
// once for the whole table) and streams handle batches into fetchCh. As with
// startPartialIndexWorker, runtime errors are reported via syncErr on fetchCh.
func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *indexMergeTableTask, workID int) error {
	ts := e.partialPlans[workID][0].(*physicalop.PhysicalTableScan)

	tbls := make([]table.Table, 0, 1)
	if e.partitionTableMode && len(e.byItems) == 0 {
		for _, p := range e.prunedPartitions {
			tbls = append(tbls, p)
		}
	} else {
		tbls = append(tbls, e.table)
	}

	// for union case, the push-downLimit can be utilized to limit index fetched handles.
	// for intersection case, the push-downLimit can only be conducted after all index/table path finished.
	pushedTableLimit := e.pushedLimit
	if e.isIntersection {
		pushedTableLimit = nil
	}

	go func() {
		defer trace.StartRegion(ctx, "IndexMergePartialTableWorker").End()
		defer e.idxWorkerWg.Done()
		util.WithRecovery(
			func() {
				failpoint.Inject("testIndexMergePanicPartialTableWorker", nil)
				var err error
				partialTableReader := &TableReaderExecutor{
					BaseExecutorV2:             exec.NewBaseExecutorV2(e.Ctx().GetSessionVars(), ts.Schema(), e.getPartitalPlanID(workID)),
					tableReaderExecutorContext: newTableReaderExecutorContext(e.Ctx()),
					dagPB:                      e.dagPBs[workID],
					startTS:                    e.startTS,
					txnScope:                   e.txnScope,
					readReplicaScope:           e.readReplicaScope,
					isStaleness:                e.isStaleness,
					plans:                      e.partialPlans[workID],
					ranges:                     e.ranges[workID],
					netDataSize:                e.partialNetDataSizes[workID],
					keepOrder:                  ts.KeepOrder,
					byItems:                    ts.ByItems,
				}

				worker := &partialTableWorker{
					stats:              e.stats,
					sc:                 e.Ctx(),
					batchSize:          e.MaxChunkSize(),
					maxBatchSize:       e.Ctx().GetSessionVars().IndexLookupSize,
					maxChunkSize:       e.MaxChunkSize(),
					tableReader:        partialTableReader,
					memTracker:         e.memTracker,
					partitionTableMode: e.partitionTableMode,
					prunedPartitions:   e.prunedPartitions,
					byItems:            ts.ByItems,
					pushedLimit:        pushedTableLimit,
				}

				if len(e.prunedPartitions) != 0 && len(e.byItems) != 0 {
					// Sort partitions by physical ID so a single reader can scan them
					// through the kvRangeBuilder in a deterministic order.
					slices.SortFunc(worker.prunedPartitions, func(i, j table.PhysicalTable) int {
						return cmp.Compare(i.GetPhysicalID(), j.GetPhysicalID())
					})
					partialTableReader.kvRangeBuilder = kvRangeBuilderFromRangeAndPartition{
						partitions: worker.prunedPartitions,
					}
				}

				if e.isCorColInPartialFilters[workID] {
					if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.partialPlans[workID]); err != nil {
						syncErr(ctx, e.finished, fetchCh, err)
						return
					}
					partialTableReader.dagPB = e.dagPBs[workID]
				}

				var tableReaderClosed bool
				defer func() {
					// To make sure SelectResult.Close() is called even got panic in fetchHandles().
					if !tableReaderClosed {
						terror.Log(exec.Close(worker.tableReader))
					}
				}()
				for parTblIdx, tbl := range tbls {
					// check if this executor is closed
					select {
					case <-ctx.Done():
						return
					case <-e.finished:
						return
					default:
					}

					// init partialTableReader and partialTableWorker again for the next table
					partialTableReader.table = tbl
					if err = exec.Open(ctx, partialTableReader); err != nil {
						logutil.Logger(ctx).Error("open Select result failed:", zap.Error(err))
						syncErr(ctx, e.finished, fetchCh, err)
						break
					}
					failpoint.Inject("testIndexMergePartialTableWorkerCoprLeak", nil)
					tableReaderClosed = false
					worker.batchSize = min(e.MaxChunkSize(), worker.maxBatchSize)

					// fetch all handles from this table
					ctx1, cancel := context.WithCancel(ctx)
					_, fetchErr := worker.fetchHandles(ctx1, exitCh, fetchCh, e.finished, e.handleCols, parTblIdx, workID)

					// release related resources
					cancel()
					tableReaderClosed = true
					if err = exec.Close(worker.tableReader); err != nil {
						logutil.Logger(ctx).Error("close Select result failed:", zap.Error(err))
					}
					// this error is reported in fetchHandles(), so ignore it here.
					if fetchErr != nil {
						break
					}
				}
			},
			handleWorkerPanic(ctx, e.finished, nil, fetchCh, nil, partialTableWorkerType),
		)
	}()
	return nil
}

// initRuntimeStats lazily creates the IndexMergeRuntimeStat when runtime stats
// collection is enabled for this statement.
func (e *IndexMergeReaderExecutor) initRuntimeStats() {
	if e.RuntimeStats() != nil {
		e.stats = &IndexMergeRuntimeStat{
			Concurrency: e.Ctx().GetSessionVars().IndexLookupConcurrency(),
		}
	}
}

// getPartitalPlanID returns the plan ID of the last operator of the workID-th
// partial plan, or 0 when that partial plan is empty.
func (e *IndexMergeReaderExecutor) getPartitalPlanID(workID int) int {
	if len(e.partialPlans[workID]) > 0 {
		return e.partialPlans[workID][len(e.partialPlans[workID])-1].ID()
	}
	return 0
}

// getTablePlanRootID returns the plan ID of the root of the table-side plan,
// falling back to this executor's own ID when there is no table plan.
func (e *IndexMergeReaderExecutor) getTablePlanRootID() int {
	if len(e.tblPlans) > 0 {
		return e.tblPlans[len(e.tblPlans)-1].ID()
	}
	return e.ID()
}

// partialTableWorker fetches handles for one partial table-scan plan through an
// embedded TableReaderExecutor.
type partialTableWorker struct {
	stats              *IndexMergeRuntimeStat
	sc                 sessionctx.Context
	batchSize          int // current batch size; doubled per batch up to maxBatchSize
	maxBatchSize       int
	maxChunkSize       int
	tableReader        exec.Executor
	memTracker         *memory.Tracker
	partitionTableMode bool
	prunedPartitions   []table.PhysicalTable
	byItems            []*plannerutil.ByItems
	scannedKeys        uint64 // number of keys scanned so far, used with pushedLimit
	pushedLimit        *physicalop.PushedDownLimit
}

// needPartitionHandle indicates whether we need create a partitionHandle or not.
// If the schema from planner part contains ExtraPhysTblID,
// we need create a partitionHandle, otherwise create a normal handle.
// In TableRowIDScan, the partitionHandle will be used to create key ranges.
func (w *partialTableWorker) needPartitionHandle() (bool, error) {
	cols := w.tableReader.(*TableReaderExecutor).plans[0].Schema().Columns
	outputOffsets := w.tableReader.(*TableReaderExecutor).dagPB.OutputOffsets
	col := cols[outputOffsets[len(outputOffsets)-1]]

	needPartitionHandle := w.partitionTableMode && len(w.byItems) > 0
	hasExtraCol := col.ID == model.ExtraPhysTblID

	// There will be two needPartitionHandle != hasExtraCol situations.
	// Only `needPartitionHandle` == true and `hasExtraCol` == false are not allowed.
	// `ExtraPhysTblID` will be used in `SelectLock` when `needPartitionHandle` == false and `hasExtraCol` == true.
	if needPartitionHandle && !hasExtraCol {
		return needPartitionHandle, errors.Errorf("Internal error, needPartitionHandle != ret")
	}
	return needPartitionHandle, nil
}

// fetchHandles repeatedly extracts handle batches from the table reader and
// sends them to fetchCh until the reader is drained, the executor finishes, or
// an error occurs. Errors are also forwarded to fetchCh via syncErr so the
// process worker sees them; count is the total number of handles produced.
func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *indexMergeTableTask,
	finished <-chan struct{}, handleCols plannerutil.HandleCols, parTblIdx int, partialPlanIndex int) (count int64, err error) {
	chk := w.tableReader.NewChunkWithCapacity(w.getRetTpsForTableScan(), w.maxChunkSize, w.maxBatchSize)
	for {
		start := time.Now()
		handles, retChunk, err := w.extractTaskHandles(ctx, chk, handleCols)
		if err != nil {
			syncErr(ctx, finished, fetchCh, err)
			return count, err
		}
		if len(handles) == 0 {
			return count, nil
		}
		count += int64(len(handles))
		task := w.buildTableTask(handles, retChunk, parTblIdx, partialPlanIndex)
		if w.stats != nil {
			atomic.AddInt64(&w.stats.FetchIdxTime, int64(time.Since(start)))
		}
		select {
		case <-ctx.Done():
			return count, ctx.Err()
		case <-exitCh:
			return count, nil
		case <-finished:
			return count, nil
		case fetchCh <- task:
		}
	}
}

// getRetTpsForTableScan returns the field types produced by the partial table reader.
func (w *partialTableWorker) getRetTpsForTableScan() []*types.FieldType {
	return exec.RetTypes(w.tableReader)
}

// extractTaskHandles reads up to w.batchSize handles from the table reader,
// honoring the pushed-down limit via w.scannedKeys. When byItems is set it also
// accumulates the raw rows into retChk for later ordering. The batch size is
// doubled (capped at maxBatchSize) after a full batch.
func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, handleCols plannerutil.HandleCols) (
	handles []kv.Handle, retChk *chunk.Chunk, err error) {
	handles = make([]kv.Handle, 0, w.batchSize)
	if len(w.byItems) != 0 {
		retChk = chunk.NewChunkWithCapacity(w.getRetTpsForTableScan(), w.batchSize)
	}
	var memUsage int64
	var chunkRowOffset int
	defer w.memTracker.Consume(-memUsage)
	for len(handles) < w.batchSize {
		requiredRows := w.batchSize - len(handles)
		if w.pushedLimit != nil {
			if w.pushedLimit.Offset+w.pushedLimit.Count <= w.scannedKeys {
				return handles, retChk, nil
			}
			requiredRows = min(int(w.pushedLimit.Offset+w.pushedLimit.Count-w.scannedKeys), requiredRows)
		}
		chk.SetRequiredRows(requiredRows, w.maxChunkSize)
		start := time.Now()
		err = errors.Trace(w.tableReader.Next(ctx, chk))
		if err != nil {
			return nil, nil, err
		}
		if w.tableReader != nil && w.tableReader.RuntimeStats() != nil {
			w.tableReader.RuntimeStats().Record(time.Since(start), chk.NumRows())
		}
		if chk.NumRows() == 0 {
			failpoint.Inject("testIndexMergeErrorPartialTableWorker", func(v failpoint.Value) {
				failpoint.Return(handles, nil, errors.New(v.(string)))
			})
			return handles, retChk, nil
		}
		memDelta := chk.MemoryUsage()
		memUsage += memDelta
		w.memTracker.Consume(memDelta)
		// We want both to reset chunkRowOffset and keep it up-to-date after the loop
		// nolint:intrange
		for chunkRowOffset = 0; chunkRowOffset < chk.NumRows(); chunkRowOffset++ {
			if w.pushedLimit != nil {
				w.scannedKeys++
				if w.scannedKeys > (w.pushedLimit.Offset + w.pushedLimit.Count) {
					// Skip the handles after Offset+Count.
					break
				}
			}
			var handle kv.Handle
			ok, err1 := w.needPartitionHandle()
			if err1 != nil {
				return nil, nil, err1
			}
			if ok {
				handle, err = handleCols.BuildPartitionHandleFromIndexRow(w.sc.GetSessionVars().StmtCtx, chk.GetRow(chunkRowOffset))
			} else {
				handle, err = handleCols.BuildHandleFromIndexRow(w.sc.GetSessionVars().StmtCtx, chk.GetRow(chunkRowOffset))
			}
			if err != nil {
				return nil, nil, err
			}
			handles = append(handles, handle)
		}
		// used for order by
		if len(w.byItems) != 0 {
			retChk.Append(chk, 0, chunkRowOffset)
		}
	}
	w.batchSize *= 2
	if w.batchSize > w.maxBatchSize {
		w.batchSize = w.maxBatchSize
	}
	return handles, retChk, nil
}

// buildTableTask wraps a handle batch (plus the optional order-by rows) into an
// indexMergeTableTask with a fresh, buffered doneCh.
func (w *partialTableWorker) buildTableTask(handles []kv.Handle, retChk *chunk.Chunk, parTblIdx int, partialPlanID int) *indexMergeTableTask {
	task := &indexMergeTableTask{
		lookupTableTask: lookupTableTask{
			handles: handles,
			idxRows: retChk,
		},
		parTblIdx:     parTblIdx,
		partialPlanID: partialPlanID,
	}

	if w.prunedPartitions != nil {
		task.partitionTable = w.prunedPartitions[parTblIdx]
	}

	task.doneCh = make(chan error, 1)
	return task
}

// startIndexMergeTableScanWorker starts IndexLookupConcurrency goroutines that
// consume merged handle tasks from workCh and perform the final table lookups.
func (e *IndexMergeReaderExecutor) startIndexMergeTableScanWorker(ctx context.Context, workCh <-chan *indexMergeTableTask) {
	lookupConcurrencyLimit := e.Ctx().GetSessionVars().IndexLookupConcurrency()
	e.tblWorkerWg.Add(lookupConcurrencyLimit)
	for range lookupConcurrencyLimit {
		worker := &indexMergeTableScanWorker{
			stats:          e.stats,
			workCh:         workCh,
			finished:       e.finished,
			indexMergeExec: e,
			tblPlans:       e.tblPlans,
			memTracker:     e.memTracker,
		}
		ctx1, cancel := context.WithCancel(ctx)
		go func() {
			defer trace.StartRegion(ctx, tableScanWorkerType).End()
			var task *indexMergeTableTask
			util.WithRecovery(
				// Note we use the address of `task` as the argument of both `pickAndExecTask` and `handleTableScanWorkerPanic`
				// because `task` is expected to be assigned in `pickAndExecTask`, and this assignment should also be visible
				// in `handleTableScanWorkerPanic` since it will get `doneCh` from `task`. Golang always pass argument by value,
				// so if we don't use the address of `task` as the argument, the assignment to `task` in `pickAndExecTask` is
				// not visible in `handleTableScanWorkerPanic`
				func() { worker.pickAndExecTask(ctx1, &task) },
				worker.handleTableScanWorkerPanic(ctx1, e.finished, &task, tableScanWorkerType),
			)
			cancel()
			e.tblWorkerWg.Done()
		}()
	}
}

// buildFinalTableReader builds the TableReaderExecutor that reads the full rows
// for a batch of merged handles from the given (physical) table.
func (e *IndexMergeReaderExecutor) buildFinalTableReader(ctx context.Context, tbl table.Table, handles []kv.Handle) (_ exec.Executor, err error) {
	tableReaderExec := &TableReaderExecutor{
		BaseExecutorV2:             exec.NewBaseExecutorV2(e.Ctx().GetSessionVars(), e.Schema(), e.getTablePlanRootID()),
		tableReaderExecutorContext: newTableReaderExecutorContext(e.Ctx()),
		table:                      tbl,
		dagPB:                      e.tableRequest,
		startTS:                    e.startTS,
		txnScope:                   e.txnScope,
		readReplicaScope:           e.readReplicaScope,
		isStaleness:                e.isStaleness,
		columns:                    e.columns,
		plans:                      e.tblPlans,
		netDataSize:                e.dataAvgRowSize * float64(len(handles)),
	}
	tableReaderExec.buildVirtualColumnInfo()
	// Reorder handles because SplitKeyRangesByLocationsWith/WithoutBuckets() requires startKey of kvRanges is ordered.
	// Also it's good for performance.
	tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles, true)
	if err != nil {
		logutil.Logger(ctx).Error("build table reader from handles failed", zap.Error(err))
		return nil, err
	}
	return tableReader, nil
}

// Next implements Executor Next interface.
func (e *IndexMergeReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
	if !e.workerStarted {
		if err := e.startWorkers(ctx); err != nil {
			return err
		}
	}

	req.Reset()
	for {
		resultTask, err := e.getResultTask(ctx)
		if err != nil {
			return errors.Trace(err)
		}
		if resultTask == nil {
			// All tasks consumed: resultCh was closed by the process worker.
			return nil
		}
		if resultTask.cursor < len(resultTask.rows) {
			numToAppend := min(len(resultTask.rows)-resultTask.cursor, e.MaxChunkSize()-req.NumRows())
			req.AppendRows(resultTask.rows[resultTask.cursor : resultTask.cursor+numToAppend])
			resultTask.cursor += numToAppend
			if req.NumRows() >= e.MaxChunkSize() {
				return nil
			}
		}
	}
}

// getResultTask returns the current task if it still has unread rows, otherwise
// blocks for the next task from resultCh and waits for its doneCh before handing
// it out. A nil task (with nil error) means resultCh is closed and all data is
// consumed.
func (e *IndexMergeReaderExecutor) getResultTask(ctx context.Context) (*indexMergeTableTask, error) {
	failpoint.Inject("testIndexMergeMainReturnEarly", func(_ failpoint.Value) {
		// To make sure processWorker make resultCh to be full.
		// When main goroutine close finished, processWorker may be stuck when writing resultCh.
		time.Sleep(time.Second * 20)
		failpoint.Return(nil, errors.New("failpoint testIndexMergeMainReturnEarly"))
	})
	if e.resultCurr != nil && e.resultCurr.cursor < len(e.resultCurr.rows) {
		return e.resultCurr, nil
	}
	task, ok := <-e.resultCh
	if !ok {
		return nil, nil
	}

	select {
	case <-ctx.Done():
		return nil, errors.Trace(ctx.Err())
	case err := <-task.doneCh:
		if err != nil {
			return nil, errors.Trace(err)
		}
	}

	// Release the memory usage of last task before we handle a new task.
	if e.resultCurr != nil {
		e.resultCurr.memTracker.Consume(-e.resultCurr.memUsage)
	}
	e.resultCurr = task
	return e.resultCurr, nil
}

// handleWorkerPanic builds the recovery handler shared by all worker goroutines.
// On panic it converts the recovered value into an error task and pushes it to
// ch so the consumer surfaces the error; for the (single) process worker it also
// closes ch on exit, panicking or not, to unblock the main goroutine.
func handleWorkerPanic(ctx context.Context, finished, limitDone <-chan struct{}, ch chan<- *indexMergeTableTask, extraNotifyCh chan bool, worker string) func(r any) {
	return func(r any) {
		if worker == processWorkerType {
			// There is only one processWorker, so it's safe to close here.
			// No need to worry about "close on closed channel" error.
			defer close(ch)
		}
		if r == nil {
			logutil.BgLogger().Debug("worker finish without panic", zap.String("worker", worker))
			return
		}
		if extraNotifyCh != nil {
			extraNotifyCh <- true
		}

		err4Panic := util.GetRecoverError(r)
		logutil.Logger(ctx).Warn(err4Panic.Error())
		doneCh := make(chan error, 1)
		doneCh <- err4Panic
		task := &indexMergeTableTask{
			lookupTableTask: lookupTableTask{
				doneCh: doneCh,
			},
		}
		select {
		case <-ctx.Done():
			return
		case <-finished:
			return
		case <-limitDone:
			// once the process worker recovered from panic, once finding the limitDone signal, actually we can return.
			return
		case ch <- task:
			return
		}
	}
}

// Close implements Exec Close interface.
func (e *IndexMergeReaderExecutor) Close() error { if e.stats != nil { defer e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), e.stats) } if e.indexUsageReporter != nil { for _, p := range e.partialPlans { switch p := p[0].(type) { case *physicalop.PhysicalTableScan: e.indexUsageReporter.ReportCopIndexUsageForHandle(e.table, p.ID()) case *physicalop.PhysicalIndexScan: e.indexUsageReporter.ReportCopIndexUsageForTable(e.table, p.Index.ID, p.ID()) } } } if e.finished == nil { return nil } close(e.finished) e.tblWorkerWg.Wait() e.idxWorkerWg.Wait() e.processWorkerWg.Wait() e.finished = nil e.workerStarted = false return nil } type indexMergeProcessWorker struct { indexMerge *IndexMergeReaderExecutor stats *IndexMergeRuntimeStat } type rowIdx struct { partialID int taskID int rowID int } type handleHeap struct { // requiredCnt == 0 means need all handles requiredCnt uint64 tracker *memory.Tracker taskMap map[int][]*indexMergeTableTask idx []rowIdx compareFunc []chunk.CompareFunc byItems []*plannerutil.ByItems } func (h handleHeap) Len() int { return len(h.idx) } func (h handleHeap) Less(i, j int) bool { rowI := h.taskMap[h.idx[i].partialID][h.idx[i].taskID].idxRows.GetRow(h.idx[i].rowID) rowJ := h.taskMap[h.idx[j].partialID][h.idx[j].taskID].idxRows.GetRow(h.idx[j].rowID) for i, compFunc := range h.compareFunc { cmp := compFunc(rowI, i, rowJ, i) if !h.byItems[i].Desc { cmp = -cmp } if cmp < 0 { return true } else if cmp > 0 { return false } } return false } func (h handleHeap) Swap(i, j int) { h.idx[i], h.idx[j] = h.idx[j], h.idx[i] } func (h *handleHeap) Push(x any) { idx := x.(rowIdx) h.idx = append(h.idx, idx) if h.tracker != nil { h.tracker.Consume(int64(unsafe.Sizeof(h.idx))) } } func (h *handleHeap) Pop() any { idxRet := h.idx[len(h.idx)-1] h.idx = h.idx[:len(h.idx)-1] if h.tracker != nil { h.tracker.Consume(-int64(unsafe.Sizeof(h.idx))) } return idxRet } func (w *indexMergeProcessWorker) NewHandleHeap(taskMap map[int][]*indexMergeTableTask, 
memTracker *memory.Tracker) *handleHeap { compareFuncs := make([]chunk.CompareFunc, 0, len(w.indexMerge.byItems)) for _, item := range w.indexMerge.byItems { keyType := item.Expr.GetType(w.indexMerge.Ctx().GetExprCtx().GetEvalCtx()) compareFuncs = append(compareFuncs, chunk.GetCompareFunc(keyType)) } requiredCnt := uint64(0) if w.indexMerge.pushedLimit != nil { // Pre-allocate up to 1024 to avoid oom requiredCnt = min(1024, w.indexMerge.pushedLimit.Count+w.indexMerge.pushedLimit.Offset) } return &handleHeap{ requiredCnt: requiredCnt, tracker: memTracker, taskMap: taskMap, idx: make([]rowIdx, 0, requiredCnt), compareFunc: compareFuncs, byItems: w.indexMerge.byItems, } } // pruneTableWorkerTaskIdxRows prune idxRows and only keep columns that will be used in byItems. // e.g. the common handle is (`b`, `c`) and order by with column `c`, we should make column `c` at the first. func (w *indexMergeProcessWorker) pruneTableWorkerTaskIdxRows(task *indexMergeTableTask) { if task.idxRows == nil { return } // IndexScan no need to prune retChk, Columns required by byItems are always first. 
	if plan, ok := w.indexMerge.partialPlans[task.partialPlanID][0].(*physicalop.PhysicalTableScan); ok {
		prune := make([]int, 0, len(w.indexMerge.byItems))
		for _, item := range plan.ByItems {
			c, _ := item.Expr.(*expression.Column)
			idx := plan.Schema().ColumnIndex(c)
			// couldn't equals to -1 here, if idx == -1, just let it panic
			prune = append(prune, idx)
		}
		task.idxRows = task.idxRows.Prune(prune)
	}
}

// fetchLoopUnionWithOrderBy unions the handles coming from all partial workers
// while preserving the byItems order: handles are deduplicated, kept in a
// top-(Offset+Count) heap when a limit is pushed down, and finally re-batched
// into ordered table tasks sent to workCh/resultCh.
func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderBy(ctx context.Context, fetchCh <-chan *indexMergeTableTask,
	workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) {
	memTracker := memory.NewTracker(w.indexMerge.ID(), -1)
	memTracker.AttachTo(w.indexMerge.memTracker)
	defer memTracker.Detach()
	defer close(workCh)
	if w.stats != nil {
		start := time.Now()
		defer func() {
			w.stats.IndexMergeProcess += time.Since(start)
		}()
	}

	distinctHandles := kv.NewHandleMap()
	taskMap := make(map[int][]*indexMergeTableTask)
	// uselessMap records partial plans whose further output cannot change the
	// top-N result anymore.
	uselessMap := make(map[int]struct{})
	taskHeap := w.NewHandleHeap(taskMap, memTracker)

	for task := range fetchCh {
		select {
		case err := <-task.doneCh:
			// If got error from partialIndexWorker/partialTableWorker, stop processing.
			if err != nil {
				syncErr(ctx, finished, resultCh, err)
				return
			}
		default:
		}
		if _, ok := uselessMap[task.partialPlanID]; ok {
			continue
		}
		if _, ok := taskMap[task.partialPlanID]; !ok {
			taskMap[task.partialPlanID] = make([]*indexMergeTableTask, 0, 1)
		}
		w.pruneTableWorkerTaskIdxRows(task)
		taskMap[task.partialPlanID] = append(taskMap[task.partialPlanID], task)
		for i, h := range task.handles {
			if _, ok := distinctHandles.Get(h); !ok {
				distinctHandles.Set(h, true)
				heap.Push(taskHeap, rowIdx{task.partialPlanID, len(taskMap[task.partialPlanID]) - 1, i})
				// Trim the heap once it exceeds the pushed-down limit size.
				if int(taskHeap.requiredCnt) != 0 && taskHeap.Len() > int(taskHeap.requiredCnt) {
					top := heap.Pop(taskHeap).(rowIdx)
					// If the row just pushed is the one popped back out, this
					// partial plan's (sorted) stream can no longer contribute.
					if top.partialID == task.partialPlanID && top.taskID == len(taskMap[task.partialPlanID])-1 && top.rowID == i {
						uselessMap[task.partialPlanID] = struct{}{}
						task.handles = task.handles[:i]
						break
					}
				}
			}
			memTracker.Consume(int64(h.MemUsage()))
		}
		memTracker.Consume(task.idxRows.MemoryUsage())
		if len(uselessMap) == len(w.indexMerge.partialPlans) {
			// consume reset tasks
			go func() {
				channel.Clear(fetchCh)
			}()
			break
		}
	}

	needCount := taskHeap.Len()
	if w.indexMerge.pushedLimit != nil {
		// Rows covered by the limit offset are not returned.
		needCount = max(0, taskHeap.Len()-int(w.indexMerge.pushedLimit.Offset))
	}
	if needCount == 0 {
		return
	}
	// Pop from the heap backwards to obtain the handles in byItems order.
	fhs := make([]kv.Handle, needCount)
	for i := needCount - 1; i >= 0; i-- {
		idx := heap.Pop(taskHeap).(rowIdx)
		fhs[i] = taskMap[idx.partialID][idx.taskID].handles[idx.rowID]
	}

	batchSize := w.indexMerge.Ctx().GetSessionVars().IndexLookupSize
	tasks := make([]*indexMergeTableTask, 0, len(fhs)/batchSize+1)
	for len(fhs) > 0 {
		l := min(len(fhs), batchSize)
		// Save the index order.
		indexOrder := kv.NewHandleMap()
		for i, h := range fhs[:l] {
			indexOrder.Set(h, i)
		}
		tasks = append(tasks, &indexMergeTableTask{
			lookupTableTask: lookupTableTask{
				handles:    fhs[:l],
				indexOrder: indexOrder,
				doneCh:     make(chan error, 1),
			},
		})
		fhs = fhs[l:]
	}
	// Send every ordered task to resultCh first, then to workCh, honoring
	// cancellation at each step.
	for _, task := range tasks {
		select {
		case <-ctx.Done():
			return
		case <-finished:
			return
		case resultCh <- task:
			failpoint.Inject("testCancelContext", func() {
				IndexMergeCancelFuncForTest()
			})
			select {
			case <-ctx.Done():
				return
			case <-finished:
				return
			case workCh <- task:
				continue
			}
		}
	}
}

// pushedLimitCountingDown consumes the pushed-down limit against one batch of
// handles, mutating pushedLimit in place. It returns next=true when the whole
// batch falls inside the remaining offset (the caller should skip the batch);
// otherwise it returns the handles that survive the offset/count trimming.
func pushedLimitCountingDown(pushedLimit *physicalop.PushedDownLimit, handles []kv.Handle) (next bool, res []kv.Handle) {
	fhsLen := uint64(len(handles))
	// The number of handles is less than the offset, discard all handles.
	if fhsLen <= pushedLimit.Offset {
		pushedLimit.Offset -= fhsLen
		return true, nil
	}
	handles = handles[pushedLimit.Offset:]
	pushedLimit.Offset = 0
	fhsLen = uint64(len(handles))
	// The number of handles is greater than the limit, only keep limit count.
	if fhsLen > pushedLimit.Count {
		handles = handles[:pushedLimit.Count]
	}
	pushedLimit.Count -= min(pushedLimit.Count, fhsLen)
	return false, handles
}

// fetchLoopUnion deduplicates the handles arriving from all partial workers
// (set union) and forwards the surviving handles, in batches, to the
// table-scan workers and to resultCh. It stops once the pushed-down limit (if
// any) is satisfied.
func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-chan *indexMergeTableTask,
	workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) {
	failpoint.Inject("testIndexMergeResultChCloseEarly", func(_ failpoint.Value) {
		failpoint.Return()
	})
	memTracker := memory.NewTracker(w.indexMerge.ID(), -1)
	memTracker.AttachTo(w.indexMerge.memTracker)
	defer memTracker.Detach()
	defer close(workCh)
	failpoint.Inject("testIndexMergePanicProcessWorkerUnion", nil)

	var pushedLimit *physicalop.PushedDownLimit
	if w.indexMerge.pushedLimit != nil {
		// Clone so counting down does not mutate the plan's limit.
		pushedLimit = w.indexMerge.pushedLimit.Clone()
	}
	// hMap holds every handle seen so far, implementing the union semantics.
	hMap := kv.NewHandleMap()
	for {
		var ok bool
		var task *indexMergeTableTask
		if pushedLimit != nil && pushedLimit.Count == 0 {
			return
		}
		select {
		case <-ctx.Done():
			return
		case <-finished:
			return
		case task, ok = <-fetchCh:
			if !ok {
				return
			}
		}
		select {
		case err := <-task.doneCh:
			// If got error from partialIndexWorker/partialTableWorker, stop processing.
			if err != nil {
				syncErr(ctx, finished, resultCh, err)
				return
			}
		default:
		}
		start := time.Now()
		handles := task.handles
		fhs := make([]kv.Handle, 0, 8)

		memTracker.Consume(int64(cap(task.handles) * 8))
		for _, h := range handles {
			if w.indexMerge.partitionTableMode {
				// Wrap plain handles with the partition ID so handles from
				// different partitions never collide in hMap.
				if _, ok := h.(kv.PartitionHandle); !ok {
					h = kv.NewPartitionHandle(task.partitionTable.GetPhysicalID(), h)
				}
			}
			if _, ok := hMap.Get(h); !ok {
				fhs = append(fhs, h)
				hMap.Set(h, true)
			}
		}
		if len(fhs) == 0 {
			continue
		}
		if pushedLimit != nil {
			next, res := pushedLimitCountingDown(pushedLimit, fhs)
			if next {
				continue
			}
			fhs = res
		}
		task = &indexMergeTableTask{
			lookupTableTask: lookupTableTask{
				handles: fhs,
				doneCh:  make(chan error, 1),

				partitionTable: task.partitionTable,
			},
		}
		if w.stats != nil {
			w.stats.IndexMergeProcess += time.Since(start)
		}
		failpoint.Inject("testIndexMergeProcessWorkerUnionHang", func(_ failpoint.Value) {
			for range cap(resultCh) {
				select {
				case resultCh <- &indexMergeTableTask{}:
				default:
				}
			}
		})
		select {
		case <-ctx.Done():
			return
		case <-finished:
			return
		case resultCh <- task:
			failpoint.Inject("testCancelContext", func() {
				IndexMergeCancelFuncForTest()
			})
			select {
			case <-ctx.Done():
				return
			case <-finished:
				return
			case workCh <- task:
			}
		}
	}
}

// intersectionCollectWorker is used to dispatch index-merge-table-task to original workCh and resultCh.
// a kind of interceptor to control the pushed down limit restriction. (should be no performance impact)
type intersectionCollectWorker struct {
	pushedLimit *physicalop.PushedDownLimit
	collectCh   chan *indexMergeTableTask
	limitDone   chan struct{}
}

// doIntersectionLimitAndDispatch receives intersected tasks from collectCh,
// applies the pushed-down limit, and dispatches the surviving tasks to workCh
// and resultCh. Once the limit is exhausted it closes limitDone to shut down
// the upstream intersection workers.
func (w *intersectionCollectWorker) doIntersectionLimitAndDispatch(ctx context.Context, workCh chan<- *indexMergeTableTask,
	resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) {
	var (
		ok   bool
		task *indexMergeTableTask
	)
	for {
		select {
		case <-ctx.Done():
			return
		case <-finished:
			return
		case task, ok = <-w.collectCh:
			if !ok {
				return
			}
			// receive a new intersection task here, adding limit restriction logic
			if w.pushedLimit != nil {
				if w.pushedLimit.Count == 0 {
					// close limitDone channel to notify intersectionProcessWorkers * N to exit.
					close(w.limitDone)
					return
				}
				next, handles := pushedLimitCountingDown(w.pushedLimit, task.handles)
				if next {
					continue
				}
				task.handles = handles
			}
			// dispatch the new task to workCh and resultCh.
			select {
			case <-ctx.Done():
				return
			case <-finished:
				return
			case workCh <- task:
				select {
				case <-ctx.Done():
					return
				case <-finished:
					return
				case resultCh <- task:
				}
			}
		}
	}
}

// intersectionProcessWorker counts handle occurrences per partition; a handle
// seen by every partial path belongs to the intersection.
type intersectionProcessWorker struct {
	// key: parTblIdx, val: HandleMap
	// Value of MemAwareHandleMap is *int to avoid extra Get().
	handleMapsPerWorker map[int]*kv.MemAwareHandleMap[*int]
	workerID            int
	workerCh            chan *indexMergeTableTask
	indexMerge          *IndexMergeReaderExecutor
	memTracker          *memory.Tracker
	batchSize           int

	// When rowDelta == memConsumeBatchSize, Consume(memUsage)
	rowDelta      int64
	mapUsageDelta int64

	partitionIDMap map[int64]int
}

// consumeMemDelta flushes the batched memory accounting into the tracker.
func (w *intersectionProcessWorker) consumeMemDelta() {
	w.memTracker.Consume(w.mapUsageDelta + w.rowDelta*int64(unsafe.Sizeof(int(0))))
	w.mapUsageDelta = 0
	w.rowDelta = 0
}

// doIntersectionPerPartition fetch all the task from workerChannel, and after that, then do the intersection pruning, which
// will cause wasting a lot of time waiting for all the fetch task done.
func (w *intersectionProcessWorker) doIntersectionPerPartition(ctx context.Context, workCh chan<- *indexMergeTableTask,
	resultCh chan<- *indexMergeTableTask, finished, limitDone <-chan struct{}) {
	failpoint.Inject("testIndexMergePanicPartitionTableIntersectionWorker", nil)
	defer w.memTracker.Detach()

	// Phase 1: drain workerCh, counting how many partial paths produced each handle.
	for task := range w.workerCh {
		var ok bool
		var hMap *kv.MemAwareHandleMap[*int]
		if hMap, ok = w.handleMapsPerWorker[task.parTblIdx]; !ok {
			hMap = kv.NewMemAwareHandleMap[*int]()
			w.handleMapsPerWorker[task.parTblIdx] = hMap
		}
		var mapDelta, rowDelta int64
		for _, h := range task.handles {
			if w.indexMerge.hasGlobalIndex {
				if ph, ok := h.(kv.PartitionHandle); ok {
					// Route a global-index handle to the map of its own partition.
					if v, exists := w.partitionIDMap[ph.PartitionID]; exists {
						if hMap, ok = w.handleMapsPerWorker[v]; !ok {
							hMap = kv.NewMemAwareHandleMap[*int]()
							w.handleMapsPerWorker[v] = hMap
						}
					}
				} else {
					h = kv.NewPartitionHandle(task.partitionTable.GetPhysicalID(), h)
				}
			}
			// Use *int to avoid Get() again.
			if cntPtr, ok := hMap.Get(h); ok {
				(*cntPtr)++
			} else {
				cnt := 1
				mapDelta += hMap.Set(h, &cnt) + int64(h.ExtraMemSize())
				rowDelta++
			}
		}

		logutil.BgLogger().Debug("intersectionProcessWorker handle tasks", zap.Int("workerID", w.workerID),
			zap.Int("task.handles", len(task.handles)), zap.Int64("rowDelta", rowDelta))

		// Batch the memory accounting to reduce tracker contention.
		w.mapUsageDelta += mapDelta
		w.rowDelta += rowDelta
		if w.rowDelta >= int64(w.batchSize) {
			w.consumeMemDelta()
		}
		failpoint.Inject("testIndexMergeIntersectionWorkerPanic", nil)
	}
	if w.rowDelta > 0 {
		w.consumeMemDelta()
	}

	// Phase 2: collect the handles every partial path produced.
	// We assume the result of intersection is small, so no need to track memory.
	intersectedMap := make(map[int][]kv.Handle, len(w.handleMapsPerWorker))
	for parTblIdx, hMap := range w.handleMapsPerWorker {
		hMap.Range(func(h kv.Handle, val *int) bool {
			if *(val) == len(w.indexMerge.partialPlans) {
				// Means all partial paths have this handle.
				intersectedMap[parTblIdx] = append(intersectedMap[parTblIdx], h)
			}
			return true
		})
	}

	// Phase 3: re-batch the intersected handles into tasks and dispatch them.
	tasks := make([]*indexMergeTableTask, 0, len(w.handleMapsPerWorker))
	for parTblIdx, intersected := range intersectedMap {
		// Split intersected[parTblIdx] to avoid task is too large.
		for len(intersected) > 0 {
			length := min(w.batchSize, len(intersected))
			task := &indexMergeTableTask{
				lookupTableTask: lookupTableTask{
					handles: intersected[:length],
					doneCh:  make(chan error, 1),
				},
			}
			intersected = intersected[length:]
			if w.indexMerge.partitionTableMode {
				task.partitionTable = w.indexMerge.prunedPartitions[parTblIdx]
			}
			tasks = append(tasks, task)
			logutil.BgLogger().Debug("intersectionProcessWorker build tasks",
				zap.Int("parTblIdx", parTblIdx), zap.Int("task.handles", len(task.handles)))
		}
	}
	failpoint.Inject("testIndexMergeProcessWorkerIntersectionHang", func(_ failpoint.Value) {
		if resultCh != nil {
			for range cap(resultCh) {
				select {
				case resultCh <- &indexMergeTableTask{}:
				default:
				}
			}
		}
	})
	for _, task := range tasks {
		select {
		case <-ctx.Done():
			return
		case <-finished:
			return
		case <-limitDone:
			// limitDone has signal means the collectWorker has collected enough results, shutdown process workers quickly here.
			return
		case workCh <- task:
			// resultCh != nil means there is no collectWorker, and we should send task to resultCh too by ourselves here.
			if resultCh != nil {
				select {
				case <-ctx.Done():
					return
				case <-finished:
					return
				case resultCh <- task:
				}
			}
		}
	}
}

// for every index merge process worker, it should be feed on a sortedSelectResult for every partial index plan (constructed on all
// table partition ranges results on that index plan path). Since every partial index path is a sorted select result, we can utilize
// K-way merge to accelerate the intersection process.
//
// partialIndexPlan-1 ---> SSR ---> +
// partialIndexPlan-2 ---> SSR ---> + ---> SSR K-way Merge ---> output IndexMergeTableTask
// partialIndexPlan-3 ---> SSR ---> +
// ...                              +
// partialIndexPlan-N ---> SSR ---> +
//
// K-way merge detail: for every partial index plan, output one row as current its representative row. Then, comparing the N representative
// rows together:
//
// case 1: they are all the same, intersection succeed. --- Record current handle down (already in index order).
// case 2: distinguish among them, for the minimum value/values corresponded index plan/plans. --- Discard current representative row, fetch next.
//
// goto Loop start:
//
// encapsulate all the recorded handles (already in index order) as index merge table tasks, sending them out.
func (*indexMergeProcessWorker) fetchLoopIntersectionWithOrderBy(_ context.Context, _ <-chan *indexMergeTableTask,
	_ chan<- *indexMergeTableTask, _ chan<- *indexMergeTableTask, _ <-chan struct{}) {
	// todo: pushed sort property with partial index plan and limit.
}

// For each partition(dynamic mode), a map is used to do intersection. Key of the map is handle, and value is the number of times it occurs.
// If the value of handle equals the number of partial paths, it should be sent to final_table_scan_worker.
// To avoid too many goroutines, each intersectionProcessWorker can handle multiple partitions.
func (w *indexMergeProcessWorker) fetchLoopIntersection(ctx context.Context, fetchCh <-chan *indexMergeTableTask,
	workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) {
	defer close(workCh)

	if w.stats != nil {
		start := time.Now()
		defer func() {
			w.stats.IndexMergeProcess += time.Since(start)
		}()
	}

	failpoint.Inject("testIndexMergePanicProcessWorkerIntersection", nil)

	// One goroutine may handle one or multiple partitions.
	// Max number of partition number is 8192, we use ExecutorConcurrency to avoid too many goroutines.
	maxWorkerCnt := w.indexMerge.Ctx().GetSessionVars().IndexMergeIntersectionConcurrency()
	maxChannelSize := atomic.LoadInt32(&LookupTableTaskChannelSize)
	batchSize := w.indexMerge.Ctx().GetSessionVars().IndexLookupSize

	partCnt := 1
	// To avoid multi-threaded access the handle map, we only use one worker for indexMerge with global index.
	if w.indexMerge.partitionTableMode && !w.indexMerge.hasGlobalIndex {
		partCnt = len(w.indexMerge.prunedPartitions)
	}
	workerCnt := min(partCnt, maxWorkerCnt)
	failpoint.Inject("testIndexMergeIntersectionConcurrency", func(val failpoint.Value) {
		con := val.(int)
		if con != workerCnt {
			panic(fmt.Sprintf("unexpected workerCnt, expect %d, got %d", con, workerCnt))
		}
	})

	// Map physical partition ID -> index in prunedPartitions, used to route
	// global-index handles to the right per-partition map.
	partitionIDMap := make(map[int64]int)
	if w.indexMerge.hasGlobalIndex {
		for i, p := range w.indexMerge.prunedPartitions {
			partitionIDMap[p.GetPhysicalID()] = i
		}
	}

	workers := make([]*intersectionProcessWorker, 0, workerCnt)
	var collectWorker *intersectionCollectWorker
	wg := util.WaitGroupWrapper{}
	wg2 := util.WaitGroupWrapper{}
	errCh := make(chan bool, workerCnt)
	var limitDone chan struct{}
	if w.indexMerge.pushedLimit != nil {
		// no memory cost for this code logic.
		collectWorker = &intersectionCollectWorker{
			// same size of workCh/resultCh
			collectCh:   make(chan *indexMergeTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize)),
			pushedLimit: w.indexMerge.pushedLimit.Clone(),
			limitDone:   make(chan struct{}),
		}
		limitDone = collectWorker.limitDone
		wg2.RunWithRecover(func() {
			defer trace.StartRegion(ctx, "IndexMergeIntersectionProcessWorker").End()
			collectWorker.doIntersectionLimitAndDispatch(ctx, workCh, resultCh, finished)
		}, handleWorkerPanic(ctx, finished, nil, resultCh, errCh, partTblIntersectionWorkerType))
	}
	for i := range workerCnt {
		tracker := memory.NewTracker(w.indexMerge.ID(), -1)
		tracker.AttachTo(w.indexMerge.memTracker)
		worker := &intersectionProcessWorker{
			workerID:            i,
			handleMapsPerWorker: make(map[int]*kv.MemAwareHandleMap[*int]),
			workerCh:            make(chan *indexMergeTableTask, maxChannelSize),
			indexMerge:          w.indexMerge,
			memTracker:          tracker,
			batchSize:           batchSize,
			partitionIDMap:      partitionIDMap,
		}
		wg.RunWithRecover(func() {
			defer trace.StartRegion(ctx, "IndexMergeIntersectionProcessWorker").End()
			if collectWorker != nil {
				// workflow:
				// intersectionProcessWorker-1 --+ (limit restriction logic)
				// intersectionProcessWorker-2 --+--------- collectCh--> intersectionCollectWorker +--> workCh --> table worker
				// ...                           --+ <--- limitDone to shut inputs ------+         +-> resultCh --> upper parent
				// intersectionProcessWorker-N --+
				worker.doIntersectionPerPartition(ctx, collectWorker.collectCh, nil, finished, collectWorker.limitDone)
			} else {
				// workflow:
				// intersectionProcessWorker-1 --------------------------+--> workCh --> table worker
				// intersectionProcessWorker-2 ---(same as above)        +--> resultCh --> upper parent
				// ...                          ---(same as above)
				// intersectionProcessWorker-N ---(same as above)
				worker.doIntersectionPerPartition(ctx, workCh, resultCh, finished, nil)
			}
		}, handleWorkerPanic(ctx, finished, limitDone, resultCh, errCh, partTblIntersectionWorkerType))
		workers = append(workers, worker)
	}
	defer func() {
		for _, processWorker := range workers {
			close(processWorker.workerCh)
		}
		wg.Wait()
		// only after all the possible writer closed, can we shut down the collectCh.
		if collectWorker != nil {
			// you don't need to clear the channel before closing it, so discard all the remain tasks.
			close(collectWorker.collectCh)
		}
		wg2.Wait()
	}()
	// Main loop: shard incoming tasks to workers by partition index.
	for {
		var ok bool
		var task *indexMergeTableTask
		select {
		case <-ctx.Done():
			return
		case <-finished:
			return
		case task, ok = <-fetchCh:
			if !ok {
				return
			}
		}

		select {
		case err := <-task.doneCh:
			// If got error from partialIndexWorker/partialTableWorker, stop processing.
			if err != nil {
				syncErr(ctx, finished, resultCh, err)
				return
			}
		default:
		}

		select {
		case <-ctx.Done():
			return
		case <-finished:
			return
		case workers[task.parTblIdx%workerCnt].workerCh <- task:
		case <-errCh:
			// If got error from intersectionProcessWorker, stop processing.
			return
		}
	}
}

// partialIndexWorker fetches handles from one partial index scan and builds
// the tasks consumed by the process worker.
type partialIndexWorker struct {
	stats              *IndexMergeRuntimeStat
	sc                 sessionctx.Context
	idxID              int
	batchSize          int
	maxBatchSize       int
	maxChunkSize       int
	memTracker         *memory.Tracker
	partitionTableMode bool
	prunedPartitions   []table.PhysicalTable
	byItems            []*plannerutil.ByItems
	scannedKeys        uint64
	pushedLimit        *physicalop.PushedDownLimit
	dagPB              *tipb.DAGRequest
	plan               []base.PhysicalPlan
}

// syncErr wraps err into an error-carrying task and delivers it on errCh so
// the consumer side can observe the failure.
func syncErr(ctx context.Context, finished <-chan struct{}, errCh chan<- *indexMergeTableTask, err error) {
	logutil.BgLogger().Error("IndexMergeReaderExecutor.syncErr", zap.Error(err))
	doneCh := make(chan error, 1)
	doneCh <- err
	task := &indexMergeTableTask{
		lookupTableTask: lookupTableTask{
			doneCh: doneCh,
		},
	}

	// ctx.Done and finished is to avoid write channel is stuck.
	select {
	case <-ctx.Done():
		return
	case <-finished:
		return
	case errCh <- task:
		return
	}
}

// needPartitionHandle indicates whether we need create a partitionHandle or not.
// If the schema from planner part contains ExtraPhysTblID,
// we need create a partitionHandle, otherwise create a normal handle.
// In TableRowIDScan, the partitionHandle will be used to create key ranges.
func (w *partialIndexWorker) needPartitionHandle() (bool, error) {
	cols := w.plan[0].Schema().Columns
	outputOffsets := w.dagPB.OutputOffsets
	col := cols[outputOffsets[len(outputOffsets)-1]]

	is := w.plan[0].(*physicalop.PhysicalIndexScan)
	// Needed when scanning partitions with byItems set, or for a global index.
	needPartitionHandle := w.partitionTableMode && len(w.byItems) > 0 || is.Index.Global
	hasExtraCol := col.ID == model.ExtraPhysTblID

	// There will be two needPartitionHandle != hasExtraCol situations.
	// Only `needPartitionHandle` == true and `hasExtraCol` == false are not allowed.
	// `ExtraPhysTblID` will be used in `SelectLock` when `needPartitionHandle` == false and `hasExtraCol` == true.
	if needPartitionHandle && !hasExtraCol {
		return needPartitionHandle, errors.Errorf("Internal error, needPartitionHandle != ret")
	}
	return needPartitionHandle, nil
}

// fetchHandles drains every SelectResult in results, batching extracted
// handles into indexMergeTableTasks and sending them to fetchCh until all
// results are exhausted or the executor is canceled/finished. It returns the
// total number of handles fetched.
func (w *partialIndexWorker) fetchHandles(
	ctx context.Context,
	results []distsql.SelectResult,
	exitCh <-chan struct{},
	fetchCh chan<- *indexMergeTableTask,
	finished <-chan struct{},
	handleCols plannerutil.HandleCols,
	partialPlanIndex int) (count int64, err error) {
	tps := w.getRetTpsForIndexScan(handleCols)
	chk := chunk.NewChunkWithCapacity(tps, w.maxChunkSize)
	for i := 0; i < len(results); {
		start := time.Now()
		handles, retChunk, err := w.extractTaskHandles(ctx, chk, results[i], handleCols)
		if err != nil {
			syncErr(ctx, finished, fetchCh, err)
			return count, err
		}
		if len(handles) == 0 {
			// Current result set is drained; advance to the next one.
			i++
			continue
		}
		count += int64(len(handles))
		task := w.buildTableTask(handles, retChunk, i, partialPlanIndex)
		if w.stats != nil {
			atomic.AddInt64(&w.stats.FetchIdxTime, int64(time.Since(start)))
		}
		select {
		case <-ctx.Done():
			return count, ctx.Err()
		case <-exitCh:
			return count, nil
		case <-finished:
			return count, nil
		case fetchCh <- task:
		}
	}
	return count, nil
}

// getRetTpsForIndexScan returns the field types of the rows produced by the
// partial index scan: the byItems columns first (if any), then the handle
// columns, then an extra int64 column when a partition handle is required.
func (w *partialIndexWorker) getRetTpsForIndexScan(handleCols plannerutil.HandleCols) []*types.FieldType {
	var tps []*types.FieldType
	if len(w.byItems) != 0 {
		for _, item := range w.byItems {
			tps = append(tps, item.Expr.GetType(w.sc.GetExprCtx().GetEvalCtx()))
		}
	}
	tps = append(tps, handleCols.GetFieldsTypes()...)
	if ok, _ := w.needPartitionHandle(); ok {
		tps = append(tps, types.NewFieldType(mysql.TypeLonglong))
	}
	return tps
}

// extractTaskHandles reads up to w.batchSize handles from idxResult. When
// byItems is set it also returns the corresponding raw index rows (retChk)
// for later merge-sorting. The pushed-down limit is enforced incrementally
// through w.scannedKeys, and the batch size grows adaptively up to
// w.maxBatchSize.
func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult, handleCols plannerutil.HandleCols) (
	handles []kv.Handle, retChk *chunk.Chunk, err error) {
	handles = make([]kv.Handle, 0, w.batchSize)
	if len(w.byItems) != 0 {
		retChk = chunk.NewChunkWithCapacity(w.getRetTpsForIndexScan(handleCols), w.batchSize)
	}
	var memUsage int64
	var chunkRowOffset int
	defer w.memTracker.Consume(-memUsage)
	for len(handles) < w.batchSize {
		requiredRows := w.batchSize - len(handles)
		if w.pushedLimit != nil {
			if w.pushedLimit.Offset+w.pushedLimit.Count <= w.scannedKeys {
				// Already scanned Offset+Count rows; nothing more is needed.
				return handles, retChk, nil
			}
			requiredRows = min(int(w.pushedLimit.Offset+w.pushedLimit.Count-w.scannedKeys), requiredRows)
		}
		chk.SetRequiredRows(requiredRows, w.maxChunkSize)
		start := time.Now()
		err = errors.Trace(idxResult.Next(ctx, chk))
		if err != nil {
			return nil, nil, err
		}
		if w.stats != nil && w.idxID != 0 {
			w.sc.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(w.idxID, false).Record(time.Since(start), chk.NumRows())
		}
		if chk.NumRows() == 0 {
			failpoint.Inject("testIndexMergeErrorPartialIndexWorker", func(v failpoint.Value) {
				failpoint.Return(handles, nil, errors.New(v.(string)))
			})
			return handles, retChk, nil
		}
		memDelta := chk.MemoryUsage()
		memUsage += memDelta
		w.memTracker.Consume(memDelta)
		// We want both to reset chunkRowOffset and keep it up-to-date after the loop
		// nolint:intrange
		for chunkRowOffset = 0; chunkRowOffset < chk.NumRows(); chunkRowOffset++ {
			if w.pushedLimit != nil {
				w.scannedKeys++
				if w.scannedKeys > (w.pushedLimit.Offset + w.pushedLimit.Count) {
					// Skip the handles after Offset+Count.
					break
				}
			}
			var handle kv.Handle
			ok, err1 := w.needPartitionHandle()
			if err1 != nil {
				return nil, nil, err1
			}
			if ok {
				handle, err = handleCols.BuildPartitionHandleFromIndexRow(w.sc.GetSessionVars().StmtCtx, chk.GetRow(chunkRowOffset))
			} else {
				handle, err = handleCols.BuildHandleFromIndexRow(w.sc.GetSessionVars().StmtCtx, chk.GetRow(chunkRowOffset))
			}
			if err != nil {
				return nil, nil, err
			}
			handles = append(handles, handle)
		}
		// used for order by
		if len(w.byItems) != 0 {
			retChk.Append(chk, 0, chunkRowOffset)
		}
	}
	// Grow the batch adaptively for the next call, capped at maxBatchSize.
	w.batchSize *= 2
	if w.batchSize > w.maxBatchSize {
		w.batchSize = w.maxBatchSize
	}
	return handles, retChk, nil
}

// buildTableTask wraps a batch of handles (plus the optional index rows used
// for ORDER BY) into an indexMergeTableTask for the downstream workers.
func (w *partialIndexWorker) buildTableTask(handles []kv.Handle, retChk *chunk.Chunk, parTblIdx int, partialPlanID int) *indexMergeTableTask {
	task := &indexMergeTableTask{
		lookupTableTask: lookupTableTask{
			handles: handles,
			idxRows: retChk,
		},
		parTblIdx:     parTblIdx,
		partialPlanID: partialPlanID,
	}

	if w.prunedPartitions != nil {
		task.partitionTable = w.prunedPartitions[parTblIdx]
	}

	task.doneCh = make(chan error, 1)
	return task
}

// indexMergeTableScanWorker reads the table rows for the merged handles.
type indexMergeTableScanWorker struct {
	stats          *IndexMergeRuntimeStat
	workCh         <-chan *indexMergeTableTask
	finished       <-chan struct{}
	indexMergeExec *IndexMergeReaderExecutor
	tblPlans       []base.PhysicalPlan

	// memTracker is used to track the memory usage of this executor.
	memTracker *memory.Tracker
}

// pickAndExecTask pulls tasks from workCh, executes the table scan for each
// one, and reports completion (or failure) through the task's doneCh.
func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context, task **indexMergeTableTask) {
	var ok bool
	for {
		waitStart := time.Now()
		select {
		case <-ctx.Done():
			return
		case <-w.finished:
			return
		case *task, ok = <-w.workCh:
			if !ok {
				return
			}
		}
		// Make sure panic failpoint is after fetch task from workCh.
		// Otherwise, cannot send error to task.doneCh.
failpoint.Inject("testIndexMergePanicTableScanWorker", nil) execStart := time.Now() err := w.executeTask(ctx, *task) if w.stats != nil { atomic.AddInt64(&w.stats.WaitTime, int64(execStart.Sub(waitStart))) atomic.AddInt64(&w.stats.FetchRow, int64(time.Since(execStart))) atomic.AddInt64(&w.stats.TableTaskNum, 1) } failpoint.Inject("testIndexMergePickAndExecTaskPanic", nil) select { case <-ctx.Done(): return case <-w.finished: return case (*task).doneCh <- err: } } } func (*indexMergeTableScanWorker) handleTableScanWorkerPanic(ctx context.Context, finished <-chan struct{}, task **indexMergeTableTask, worker string) func(r any) { return func(r any) { if r == nil { logutil.BgLogger().Debug("worker finish without panic", zap.String("worker", worker)) return } err4Panic := errors.Errorf("%s: %v", worker, r) logutil.Logger(ctx).Warn(err4Panic.Error()) if *task != nil { select { case <-ctx.Done(): return case <-finished: return case (*task).doneCh <- err4Panic: return } } } } func (w *indexMergeTableScanWorker) executeTask(ctx context.Context, task *indexMergeTableTask) error { tbl := w.indexMergeExec.table if w.indexMergeExec.partitionTableMode && task.partitionTable != nil { tbl = task.partitionTable } tableReader, err := w.indexMergeExec.buildFinalTableReader(ctx, tbl, task.handles) if err != nil { logutil.Logger(ctx).Error("build table reader failed", zap.Error(err)) return err } defer func() { terror.Log(exec.Close(tableReader)) }() task.memTracker = w.memTracker memUsage := int64(cap(task.handles) * 8) task.memUsage = memUsage task.memTracker.Consume(memUsage) handleCnt := len(task.handles) task.rows = make([]chunk.Row, 0, handleCnt) for { chk := exec.TryNewCacheChunk(tableReader) err = exec.Next(ctx, tableReader, chk) if err != nil { logutil.Logger(ctx).Warn("table reader fetch next chunk failed", zap.Error(err)) return err } if chk.NumRows() == 0 { break } memUsage = chk.MemoryUsage() task.memUsage += memUsage task.memTracker.Consume(memUsage) iter := 
chunk.NewIterator4Chunk(chk) for row := iter.Begin(); row != iter.End(); row = iter.Next() { task.rows = append(task.rows, row) } } if w.indexMergeExec.keepOrder { // Because len(outputOffsets) == tableScan.Schema().Len(), // so we could use row.GetInt64(idx) to get partition ID here. // TODO: We could add plannercore.PartitionHandleCols to unify them. physicalTableIDIdx := -1 for i, c := range w.indexMergeExec.Schema().Columns { if c.ID == model.ExtraPhysTblID { physicalTableIDIdx = i break } } task.rowIdx = make([]int, 0, len(task.rows)) for _, row := range task.rows { handle, err := w.indexMergeExec.handleCols.BuildHandle(w.indexMergeExec.Ctx().GetSessionVars().StmtCtx, row) if err != nil { return err } if w.indexMergeExec.partitionTableMode && physicalTableIDIdx != -1 { handle = kv.NewPartitionHandle(row.GetInt64(physicalTableIDIdx), handle) } rowIdx, _ := task.indexOrder.Get(handle) task.rowIdx = append(task.rowIdx, rowIdx.(int)) } sort.Sort(task) } memUsage = int64(cap(task.rows)) * int64(unsafe.Sizeof(chunk.Row{})) task.memUsage += memUsage task.memTracker.Consume(memUsage) if handleCnt != len(task.rows) && len(w.tblPlans) == 1 { return errors.Errorf("handle count %d isn't equal to value count %d", handleCnt, len(task.rows)) } return nil } // IndexMergeRuntimeStat record the indexMerge runtime stat type IndexMergeRuntimeStat struct { IndexMergeProcess time.Duration FetchIdxTime int64 WaitTime int64 FetchRow int64 TableTaskNum int64 Concurrency int } func (e *IndexMergeRuntimeStat) String() string { var buf bytes.Buffer if e.FetchIdxTime != 0 { buf.WriteString(fmt.Sprintf("index_task:{fetch_handle:%s", time.Duration(e.FetchIdxTime))) if e.IndexMergeProcess != 0 { buf.WriteString(fmt.Sprintf(", merge:%s", e.IndexMergeProcess)) } buf.WriteByte('}') } if e.FetchRow != 0 { if buf.Len() > 0 { buf.WriteByte(',') } fmt.Fprintf(&buf, " table_task:{num:%d, concurrency:%d, fetch_row:%s, wait_time:%s}", e.TableTaskNum, e.Concurrency, time.Duration(e.FetchRow), 
time.Duration(e.WaitTime)) } return buf.String() } // Clone implements the RuntimeStats interface. func (e *IndexMergeRuntimeStat) Clone() execdetails.RuntimeStats { newRs := *e return &newRs } // Merge implements the RuntimeStats interface. func (e *IndexMergeRuntimeStat) Merge(other execdetails.RuntimeStats) { tmp, ok := other.(*IndexMergeRuntimeStat) if !ok { return } e.IndexMergeProcess += tmp.IndexMergeProcess e.FetchIdxTime += tmp.FetchIdxTime e.FetchRow += tmp.FetchRow e.WaitTime += e.WaitTime e.TableTaskNum += tmp.TableTaskNum } // Tp implements the RuntimeStats interface. func (*IndexMergeRuntimeStat) Tp() int { return execdetails.TpIndexMergeRunTimeStats }