diff --git a/executor/analyze_col_v2.go b/executor/analyze_col_v2.go index 204ec95e09..2b9157fc2a 100644 --- a/executor/analyze_col_v2.go +++ b/executor/analyze_col_v2.go @@ -127,17 +127,27 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2() *statistics.AnalyzeRes specialIndexes = append(specialIndexes, idx) } } + samplingStatsConcurrency, err := getBuildSamplingStatsConcurrency(e.ctx) + if err != nil { + e.memTracker.Release(e.memTracker.BytesConsumed()) + return &statistics.AnalyzeResults{Err: err, Job: e.job} + } + statsConcurrncy, err := getBuildStatsConcurrency(e.ctx) + if err != nil { + e.memTracker.Release(e.memTracker.BytesConsumed()) + return &statistics.AnalyzeResults{Err: err, Job: e.job} + } idxNDVPushDownCh := make(chan analyzeIndexNDVTotalResult, 1) // subIndexWorkerWg is better to be initialized in handleNDVForSpecialIndexes, however if we do so, golang would // report unexpected/unreasonable data race error on subIndexWorkerWg when running TestAnalyzeVirtualCol test // case with `-race` flag now. var wg util.WaitGroupWrapper wg.Run(func() { - e.handleNDVForSpecialIndexes(specialIndexes, idxNDVPushDownCh) + e.handleNDVForSpecialIndexes(specialIndexes, idxNDVPushDownCh, statsConcurrncy) }) defer wg.Wait() - count, hists, topNs, fmSketches, extStats, err := e.buildSamplingStats(ranges, collExtStats, specialIndexesOffsets, idxNDVPushDownCh) + count, hists, topNs, fmSketches, extStats, err := e.buildSamplingStats(ranges, collExtStats, specialIndexesOffsets, idxNDVPushDownCh, samplingStatsConcurrency) if err != nil { e.memTracker.Release(e.memTracker.BytesConsumed()) return &statistics.AnalyzeResults{Err: err, Job: e.job} @@ -239,6 +249,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( needExtStats bool, indexesWithVirtualColOffsets []int, idxNDVPushDownCh chan analyzeIndexNDVTotalResult, + samplingStatsConcurrency int, ) ( count int64, hists []*statistics.Histogram, @@ -264,10 +275,6 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( } sc := e.ctx.GetSessionVars().StmtCtx - samplingStatsConcurrency, err := getBuildSamplingStatsConcurrency(e.ctx) - if err != nil { - return 0, nil, nil, nil, nil, err - } // Start workers to merge the result from collectors. mergeResultCh := make(chan *samplingMergeResult, samplingStatsConcurrency) @@ -469,7 +476,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( } // handleNDVForSpecialIndexes deals with the logic to analyze the index containing the virtual column when the mode is full sampling. -func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.IndexInfo, totalResultCh chan analyzeIndexNDVTotalResult) { +func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.IndexInfo, totalResultCh chan analyzeIndexNDVTotalResult, statsConcurrncy int) { defer func() { if r := recover(); r != nil { logutil.BgLogger().Error("analyze ndv for special index panicked", zap.Any("recover", r), zap.Stack("stack")) @@ -480,7 +487,6 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.In } }() tasks := e.buildSubIndexJobForSpecialIndex(indexInfos) - statsConcurrncy, err := getBuildStatsConcurrency(e.ctx) taskCh := make(chan *analyzeTask, len(tasks)) for _, task := range tasks { AddNewAnalyzeJob(e.ctx, task.job) @@ -502,6 +508,7 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.In totalResult := analyzeIndexNDVTotalResult{ results: make(map[int64]*statistics.AnalyzeResults, len(indexInfos)), } + var err error for panicCnt < statsConcurrncy { results, ok := <-resultsCh if !ok {