executor: fix data race at the getIntFromSessionVars (#47614)
close pingcap/tidb#47613
This commit is contained in:
@ -127,17 +127,27 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2() *statistics.AnalyzeRes
|
||||
specialIndexes = append(specialIndexes, idx)
|
||||
}
|
||||
}
|
||||
samplingStatsConcurrency, err := getBuildSamplingStatsConcurrency(e.ctx)
|
||||
if err != nil {
|
||||
e.memTracker.Release(e.memTracker.BytesConsumed())
|
||||
return &statistics.AnalyzeResults{Err: err, Job: e.job}
|
||||
}
|
||||
statsConcurrncy, err := getBuildStatsConcurrency(e.ctx)
|
||||
if err != nil {
|
||||
e.memTracker.Release(e.memTracker.BytesConsumed())
|
||||
return &statistics.AnalyzeResults{Err: err, Job: e.job}
|
||||
}
|
||||
idxNDVPushDownCh := make(chan analyzeIndexNDVTotalResult, 1)
|
||||
// subIndexWorkerWg is better to be initialized in handleNDVForSpecialIndexes, however if we do so, golang would
|
||||
// report unexpected/unreasonable data race error on subIndexWorkerWg when running TestAnalyzeVirtualCol test
|
||||
// case with `-race` flag now.
|
||||
var wg util.WaitGroupWrapper
|
||||
wg.Run(func() {
|
||||
e.handleNDVForSpecialIndexes(specialIndexes, idxNDVPushDownCh)
|
||||
e.handleNDVForSpecialIndexes(specialIndexes, idxNDVPushDownCh, statsConcurrncy)
|
||||
})
|
||||
defer wg.Wait()
|
||||
|
||||
count, hists, topNs, fmSketches, extStats, err := e.buildSamplingStats(ranges, collExtStats, specialIndexesOffsets, idxNDVPushDownCh)
|
||||
count, hists, topNs, fmSketches, extStats, err := e.buildSamplingStats(ranges, collExtStats, specialIndexesOffsets, idxNDVPushDownCh, samplingStatsConcurrency)
|
||||
if err != nil {
|
||||
e.memTracker.Release(e.memTracker.BytesConsumed())
|
||||
return &statistics.AnalyzeResults{Err: err, Job: e.job}
|
||||
@ -239,6 +249,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
|
||||
needExtStats bool,
|
||||
indexesWithVirtualColOffsets []int,
|
||||
idxNDVPushDownCh chan analyzeIndexNDVTotalResult,
|
||||
samplingStatsConcurrency int,
|
||||
) (
|
||||
count int64,
|
||||
hists []*statistics.Histogram,
|
||||
@ -264,10 +275,6 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
|
||||
}
|
||||
|
||||
sc := e.ctx.GetSessionVars().StmtCtx
|
||||
samplingStatsConcurrency, err := getBuildSamplingStatsConcurrency(e.ctx)
|
||||
if err != nil {
|
||||
return 0, nil, nil, nil, nil, err
|
||||
}
|
||||
|
||||
// Start workers to merge the result from collectors.
|
||||
mergeResultCh := make(chan *samplingMergeResult, samplingStatsConcurrency)
|
||||
@ -469,7 +476,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
|
||||
}
|
||||
|
||||
// handleNDVForSpecialIndexes deals with the logic to analyze the index containing the virtual column when the mode is full sampling.
|
||||
func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.IndexInfo, totalResultCh chan analyzeIndexNDVTotalResult) {
|
||||
func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.IndexInfo, totalResultCh chan analyzeIndexNDVTotalResult, statsConcurrncy int) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
logutil.BgLogger().Error("analyze ndv for special index panicked", zap.Any("recover", r), zap.Stack("stack"))
|
||||
@ -480,7 +487,6 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.In
|
||||
}
|
||||
}()
|
||||
tasks := e.buildSubIndexJobForSpecialIndex(indexInfos)
|
||||
statsConcurrncy, err := getBuildStatsConcurrency(e.ctx)
|
||||
taskCh := make(chan *analyzeTask, len(tasks))
|
||||
for _, task := range tasks {
|
||||
AddNewAnalyzeJob(e.ctx, task.job)
|
||||
@ -502,6 +508,7 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.In
|
||||
totalResult := analyzeIndexNDVTotalResult{
|
||||
results: make(map[int64]*statistics.AnalyzeResults, len(indexInfos)),
|
||||
}
|
||||
var err error
|
||||
for panicCnt < statsConcurrncy {
|
||||
results, ok := <-resultsCh
|
||||
if !ok {
|
||||
|
||||
Reference in New Issue
Block a user