executor: fix data race in IndexMergeReaderExec (#31230)
close pingcap/tidb#31229
This commit is contained in:
@ -253,7 +253,7 @@ func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Cont
|
||||
}()
|
||||
}
|
||||
|
||||
func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, workID int) (err error) {
|
||||
func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, workID int) error {
|
||||
if e.runtimeStats != nil {
|
||||
collExec := true
|
||||
e.dagPBs[workID].CollectExecutionSummaries = &collExec
|
||||
@ -288,6 +288,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
|
||||
|
||||
if e.isCorColInPartialFilters[workID] {
|
||||
// We got correlated column, so need to refresh Selection operator.
|
||||
var err error
|
||||
if e.dagPBs[workID].Executors, _, err = constructDistExec(e.ctx, e.partialPlans[workID]); err != nil {
|
||||
worker.syncErr(e.resultCh, err)
|
||||
return
|
||||
@ -359,7 +360,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, workID int) (err error) {
|
||||
func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, workID int) error {
|
||||
ts := e.partialPlans[workID][0].(*plannercore.PhysicalTableScan)
|
||||
|
||||
tbls := make([]table.Table, 0, 1)
|
||||
@ -376,6 +377,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
|
||||
defer e.idxWorkerWg.Done()
|
||||
util.WithRecovery(
|
||||
func() {
|
||||
var err error
|
||||
partialTableReader := &TableReaderExecutor{
|
||||
baseExecutor: newBaseExecutor(e.ctx, ts.Schema(), e.getPartitalPlanID(workID)),
|
||||
dagPB: e.dagPBs[workID],
|
||||
@ -415,8 +417,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
|
||||
|
||||
// init partialTableReader and partialTableWorker again for the next table
|
||||
partialTableReader.table = tbl
|
||||
err := partialTableReader.Open(ctx)
|
||||
if err != nil {
|
||||
if err = partialTableReader.Open(ctx); err != nil {
|
||||
logutil.Logger(ctx).Error("open Select result failed:", zap.Error(err))
|
||||
worker.syncErr(e.resultCh, err)
|
||||
break
|
||||
@ -438,7 +439,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
|
||||
|
||||
// release related resources
|
||||
cancel()
|
||||
if err := worker.tableReader.Close(); err != nil {
|
||||
if err = worker.tableReader.Close(); err != nil {
|
||||
logutil.Logger(ctx).Error("close Select result failed:", zap.Error(err))
|
||||
}
|
||||
e.ctx.StoreQueryFeedback(e.feedbacks[workID])
|
||||
|
||||
Reference in New Issue
Block a user