// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
	"context"
	stderrors "errors"
	"slices"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/pkg/domain"
	"github.com/pingcap/tidb/pkg/expression"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/metrics"
	"github.com/pingcap/tidb/pkg/parser/ast"
	"github.com/pingcap/tidb/pkg/parser/mysql"
	"github.com/pingcap/tidb/pkg/sessionctx"
	"github.com/pingcap/tidb/pkg/statistics"
	handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
	"github.com/pingcap/tidb/pkg/table"
	"github.com/pingcap/tidb/pkg/tablecodec"
	"github.com/pingcap/tidb/pkg/types"
	"github.com/pingcap/tidb/pkg/util"
	"github.com/pingcap/tidb/pkg/util/chunk"
	"github.com/pingcap/tidb/pkg/util/codec"
	"github.com/pingcap/tidb/pkg/util/collate"
	"github.com/pingcap/tidb/pkg/util/intest"
	"github.com/pingcap/tidb/pkg/util/logutil"
	"github.com/pingcap/tidb/pkg/util/memory"
	"github.com/pingcap/tidb/pkg/util/ranger"
	"github.com/pingcap/tidb/pkg/util/timeutil"
	"github.com/pingcap/tipb/go-tipb"
	"github.com/tiancaiamao/gp"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
)

// AnalyzeColumnsExecV2 is used to maintain the v2 analyze process.
type AnalyzeColumnsExecV2 struct {
	*AnalyzeColumnsExec
}

func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2(gp *gp.Pool) *statistics.AnalyzeResults {
	var ranges []*ranger.Range
	if hc := e.handleCols; hc != nil {
		if hc.IsInt() {
			ranges = ranger.FullIntRange(mysql.HasUnsignedFlag(hc.GetCol(0).RetType.GetFlag()))
		} else {
			ranges = ranger.FullNotNullRange()
		}
	} else {
		ranges = ranger.FullIntRange(false)
	}

	collExtStats := e.ctx.GetSessionVars().EnableExtendedStats
	// specialIndexes holds indexes that include virtual or prefix columns. For these indexes,
	// only the number of distinct values (NDV) is computed using TiKV. The other statistics
	// are derived from the sample data processed within TiDB.
	// The reason is that we want to keep the same row sampling for all columns.
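	// For illustration only (hypothetical schema, not taken from this executor): given
	//
	//	CREATE TABLE t (a VARCHAR(64), b INT AS (a + 0) VIRTUAL, KEY idx_prefix (a(10)), KEY idx_virtual (b));
	//
	// both idx_prefix and idx_virtual would be collected below as special indexes: their NDV comes from a
	// separate pushed-down request, while their histograms and TopN are still built from the shared row samples.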
	specialIndexes := make([]*model.IndexInfo, 0, len(e.indexes))
	specialIndexesOffsets := make([]int, 0, len(e.indexes))
	for i, idx := range e.indexes {
		isSpecial := false
		for _, col := range idx.Columns {
			colInfo := e.colsInfo[col.Offset]
			isPrefixCol := col.Length != types.UnspecifiedLength
			if colInfo.IsVirtualGenerated() || isPrefixCol {
				isSpecial = true
				break
			}
		}
		if isSpecial {
			specialIndexesOffsets = append(specialIndexesOffsets, i)
			specialIndexes = append(specialIndexes, idx)
		}
	}
	samplingStatsConcurrency, err := getBuildSamplingStatsConcurrency(e.ctx)
	if err != nil {
		e.memTracker.Release(e.memTracker.BytesConsumed())
		return &statistics.AnalyzeResults{Err: err, Job: e.job}
	}
	idxNDVPushDownCh := make(chan analyzeIndexNDVTotalResult, 1)
	e.handleNDVForSpecialIndexes(specialIndexes, idxNDVPushDownCh, samplingStatsConcurrency)
	count, hists, topNs, fmSketches, extStats, err := e.buildSamplingStats(gp, ranges, collExtStats, specialIndexesOffsets, idxNDVPushDownCh, samplingStatsConcurrency)
	if err != nil {
		e.memTracker.Release(e.memTracker.BytesConsumed())
		return &statistics.AnalyzeResults{Err: err, Job: e.job}
	}
	cLen := len(e.analyzePB.ColReq.ColumnsInfo)
	colGroupResult := &statistics.AnalyzeResult{
		Hist:    hists[cLen:],
		TopNs:   topNs[cLen:],
		Fms:     fmSketches[cLen:],
		IsIndex: 1,
	}
	// Discard the stats of _tidb_rowid.
	// The analyze process keeps the order of the results the same as colsInfo in the analyze task,
	// and `buildAnalyzeFullSamplingTask` always places _tidb_rowid at the end of colsInfo, so if there are
	// stats for _tidb_rowid, they must be at the end of the column stats.
	// Virtual columns have no histogram yet, so we check for nil here.
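	// Illustrative layout (hypothetical: columns a, b, c plus a trailing _tidb_rowid, and one column group
	// for an index): hists = [a, b, c, _tidb_rowid, idxGroup], where the first cLen entries are the column
	// histograms and the rest belong to the column groups. The check below shrinks cLen by one so that the
	// _tidb_rowid histogram is excluded from colResult.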
	if hists[cLen-1] != nil && hists[cLen-1].ID == -1 {
		cLen--
	}
	colResult := &statistics.AnalyzeResult{
		Hist:  hists[:cLen],
		TopNs: topNs[:cLen],
		Fms:   fmSketches[:cLen],
	}

	return &statistics.AnalyzeResults{
		TableID:       e.tableID,
		Ars:           []*statistics.AnalyzeResult{colResult, colGroupResult},
		Job:           e.job,
		StatsVer:      e.StatsVersion,
		Count:         count,
		Snapshot:      e.snapshot,
		ExtStats:      extStats,
		BaseCount:     e.baseCount,
		BaseModifyCnt: e.baseModifyCnt,
	}
}

// decodeSampleDataWithVirtualColumn constructs the virtual columns by evaluating them from the decoded normal columns.
func (e *AnalyzeColumnsExecV2) decodeSampleDataWithVirtualColumn(
	collector statistics.RowSampleCollector,
	fieldTps []*types.FieldType,
	virtualColIdx []int,
	schema *expression.Schema,
) error {
	totFts := make([]*types.FieldType, 0, e.schemaForVirtualColEval.Len())
	for _, col := range e.schemaForVirtualColEval.Columns {
		totFts = append(totFts, col.RetType)
	}
	chk := chunk.NewChunkWithCapacity(totFts, len(collector.Base().Samples))
	decoder := codec.NewDecoder(chk, e.ctx.GetSessionVars().Location())
	for _, sample := range collector.Base().Samples {
		for i, columns := range sample.Columns {
			// Virtual columns will be decoded as null first.
			_, err := decoder.DecodeOne(columns.GetBytes(), i, e.schemaForVirtualColEval.Columns[i].RetType)
			if err != nil {
				return err
			}
		}
	}
	intest.AssertFunc(func() bool {
		// Ensure all columns in the chunk have the same number of rows.
		// Checking for virtual columns.
		for i := 1; i < chk.NumCols(); i++ {
			if chk.Column(i).Rows() != chk.Column(0).Rows() {
				return false
			}
		}
		return true
	}, "all columns in chunk should have the same number of rows")
	err := table.FillVirtualColumnValue(fieldTps, virtualColIdx, schema.Columns, e.colsInfo, e.ctx.GetExprCtx(), chk)
	if err != nil {
		return err
	}
	iter := chunk.NewIterator4Chunk(chk)
	for row, i := iter.Begin(), 0; row != iter.End(); row, i = iter.Next(), i+1 {
		datums := row.GetDatumRow(totFts)
		collector.Base().Samples[i].Columns = datums
	}
	return nil
}

func printAnalyzeMergeCollectorLog(oldRootCount, newRootCount, subCount, tableID, partitionID int64, isPartition bool, info string, index int) {
	if index < 0 {
		logutil.BgLogger().Debug(info,
			zap.Int64("tableID", tableID),
			zap.Int64("partitionID", partitionID),
			zap.Bool("isPartitionTable", isPartition),
			zap.Int64("oldRootCount", oldRootCount),
			zap.Int64("newRootCount", newRootCount),
			zap.Int64("subCount", subCount))
	} else {
		logutil.BgLogger().Debug(info,
			zap.Int64("tableID", tableID),
			zap.Int64("partitionID", partitionID),
			zap.Bool("isPartitionTable", isPartition),
			zap.Int64("oldRootCount", oldRootCount),
			zap.Int64("newRootCount", newRootCount),
			zap.Int64("subCount", subCount),
			zap.Int("subCollectorIndex", index))
	}
}

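// buildSamplingStats reads the pushed-down sampling responses and builds the final statistics.
// In brief, the flow below is:
//  1. readDataAndSendTask streams raw responses from e.resultHandler into mergeTaskCh.
//  2. samplingStatsConcurrency subMergeWorkers unmarshal each response into a sub-collector and send it to mergeResultCh.
//  3. A merging goroutine folds every sub-collector into rootRowCollector.
//  4. The merged samples are decoded (evaluating virtual columns when needed), sorted by handle, and handed to
//     subBuildWorkers, which build the histogram, TopN, and FM sketch for each column and index.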
func (e *AnalyzeColumnsExecV2) buildSamplingStats(
	gp *gp.Pool,
	ranges []*ranger.Range,
	needExtStats bool,
	indexesWithVirtualColOffsets []int,
	idxNDVPushDownCh chan analyzeIndexNDVTotalResult,
	samplingStatsConcurrency int,
) (
	count int64,
	hists []*statistics.Histogram,
	topns []*statistics.TopN,
	fmSketches []*statistics.FMSketch,
	extStats *statistics.ExtendedStatsColl,
	err error,
) {
	// Open the memory tracker and the resultHandler.
	if err = e.open(ranges); err != nil {
		return 0, nil, nil, nil, nil, err
	}
	defer func() {
		if err1 := e.resultHandler.Close(); err1 != nil {
			err = err1
		}
	}()

	l := len(e.analyzePB.ColReq.ColumnsInfo) + len(e.analyzePB.ColReq.ColumnGroups)
	rootRowCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), e.analyzePB.ColReq.GetSampleRate(), l)
	for range l {
		rootRowCollector.Base().FMSketches = append(rootRowCollector.Base().FMSketches, statistics.NewFMSketch(statistics.MaxSketchSize))
	}

	sc := e.ctx.GetSessionVars().StmtCtx

	// Start workers to merge the results from the collectors.
	mergeResultCh := make(chan *samplingMergeResult, 1)
	mergeTaskCh := make(chan []byte, 1)
	var taskEg errgroup.Group
	// Start reading data from the resultHandler and sending it to mergeTaskCh.
	taskEg.Go(func() (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = getAnalyzePanicErr(r)
			}
		}()
		return readDataAndSendTask(e.ctx, e.resultHandler, mergeTaskCh, e.memTracker)
	})
	e.samplingMergeWg = &util.WaitGroupWrapper{}
	e.samplingMergeWg.Add(samplingStatsConcurrency)
	for i := range samplingStatsConcurrency {
		id := i
		gp.Go(func() {
			e.subMergeWorker(mergeResultCh, mergeTaskCh, l, id)
		})
	}
	// Merge the results from the collectors.
	mergeWorkerPanicCnt := 0
	mergeEg, mergeCtx := errgroup.WithContext(context.Background())
	mergeEg.Go(func() (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = getAnalyzePanicErr(r)
			}
		}()
		for mergeWorkerPanicCnt < samplingStatsConcurrency {
			mergeResult, ok := <-mergeResultCh
			if !ok {
				break
			}
			if mergeResult.err != nil {
				err = mergeResult.err
				if isAnalyzeWorkerPanic(mergeResult.err) {
					mergeWorkerPanicCnt++
				}
				continue
			}
			oldRootCollectorSize := rootRowCollector.Base().MemSize
			oldRootCollectorCount := rootRowCollector.Base().Count
			// Merge the result from the sub-collectors.
			rootRowCollector.MergeCollector(mergeResult.collector)
			newRootCollectorCount := rootRowCollector.Base().Count
			printAnalyzeMergeCollectorLog(oldRootCollectorCount, newRootCollectorCount,
				mergeResult.collector.Base().Count, e.tableID.TableID, e.tableID.PartitionID, e.tableID.IsPartitionTable(),
				"merge subMergeWorker in AnalyzeColumnsExecV2", -1)
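			// Account only for the net change in tracked memory: the sub-collector's bytes were already
			// consumed in subMergeWorker, so subtracting its MemSize here effectively hands that quota over
			// to the root collector before the sub-collector is destroyed.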
			e.memTracker.Consume(rootRowCollector.Base().MemSize - oldRootCollectorSize - mergeResult.collector.Base().MemSize)
			mergeResult.collector.DestroyAndPutToPool()
		}
		return err
	})
	err = taskEg.Wait()
	if err != nil {
		mergeCtx.Done()
		if err1 := mergeEg.Wait(); err1 != nil {
			err = stderrors.Join(err, err1)
		}
		return 0, nil, nil, nil, nil, getAnalyzePanicErr(err)
	}
	err = mergeEg.Wait()
	defer e.memTracker.Release(rootRowCollector.Base().MemSize)
	if err != nil {
		return 0, nil, nil, nil, nil, err
	}

	// Decode the data from the sample collectors.
	virtualColIdx := buildVirtualColumnIndex(e.schemaForVirtualColEval, e.colsInfo)
	// Filling virtual columns is necessary here because these samples are used to build statistics for indexes that are constructed from virtual columns.
	if len(virtualColIdx) > 0 {
		fieldTps := make([]*types.FieldType, 0, len(virtualColIdx))
		for _, colOffset := range virtualColIdx {
			fieldTps = append(fieldTps, e.schemaForVirtualColEval.Columns[colOffset].RetType)
		}
		err = e.decodeSampleDataWithVirtualColumn(rootRowCollector, fieldTps, virtualColIdx, e.schemaForVirtualColEval)
		if err != nil {
			return 0, nil, nil, nil, nil, err
		}
	} else {
		// If there is no virtual column, the normal decoding way is enough.
		for _, sample := range rootRowCollector.Base().Samples {
			for i := range sample.Columns {
				sample.Columns[i], err = tablecodec.DecodeColumnValue(sample.Columns[i].GetBytes(), &e.colsInfo[i].FieldType, sc.TimeZone())
				if err != nil {
					return 0, nil, nil, nil, nil, err
				}
			}
		}
	}

	// Calculate the handle from the row data for each row. It will be used to sort the samples.
	for _, sample := range rootRowCollector.Base().Samples {
		sample.Handle, err = e.handleCols.BuildHandleByDatums(sc, sample.Columns)
		if err != nil {
			return 0, nil, nil, nil, nil, err
		}
	}
	colLen := len(e.colsInfo)
	// The order of the samples is broken when merging samples from the sub-collectors.
	// So now we need to sort the samples according to the handle in order to calculate the correlation.
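	// (The correlation statistic compares the value order of a column with the physical row order given by
	// the handle, so the samples must be back in handle order before it is computed.)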
	slices.SortFunc(rootRowCollector.Base().Samples, func(i, j *statistics.ReservoirRowSampleItem) int {
		return i.Handle.Compare(j.Handle)
	})

	totalLen := len(e.colsInfo) + len(e.indexes)
	hists = make([]*statistics.Histogram, totalLen)
	topns = make([]*statistics.TopN, totalLen)
	fmSketches = make([]*statistics.FMSketch, 0, totalLen)
	buildResultChan := make(chan error, totalLen)
	buildTaskChan := make(chan *samplingBuildTask, totalLen)
	if totalLen < samplingStatsConcurrency {
		samplingStatsConcurrency = totalLen
	}
	e.samplingBuilderWg = newNotifyErrorWaitGroupWrapper(gp, buildResultChan)
	sampleCollectors := make([]*statistics.SampleCollector, len(e.colsInfo))
	exitCh := make(chan struct{})
	e.samplingBuilderWg.Add(samplingStatsConcurrency)

	// Start workers to build stats.
	for range samplingStatsConcurrency {
		e.samplingBuilderWg.Run(func() {
			e.subBuildWorker(buildResultChan, buildTaskChan, hists, topns, sampleCollectors, exitCh)
		})
	}
	// Generate tasks for building stats.
	for i, col := range e.colsInfo {
		buildTaskChan <- &samplingBuildTask{
			id:               col.ID,
			rootRowCollector: rootRowCollector,
			tp:               &col.FieldType,
			isColumn:         true,
			slicePos:         i,
		}
		fmSketches = append(fmSketches, rootRowCollector.Base().FMSketches[i])
	}

	indexPushedDownResult := <-idxNDVPushDownCh
	if indexPushedDownResult.err != nil {
		close(exitCh)
		e.samplingBuilderWg.Wait()
		return 0, nil, nil, nil, nil, indexPushedDownResult.err
	}
	for _, offset := range indexesWithVirtualColOffsets {
		ret := indexPushedDownResult.results[e.indexes[offset].ID]
		rootRowCollector.Base().NullCount[colLen+offset] = ret.Count
		rootRowCollector.Base().FMSketches[colLen+offset] = ret.Ars[0].Fms[0]
	}

	// Generate tasks for building stats for indexes.
	for i, idx := range e.indexes {
		buildTaskChan <- &samplingBuildTask{
			id:               idx.ID,
			rootRowCollector: rootRowCollector,
			tp:               types.NewFieldType(mysql.TypeBlob),
			isColumn:         false,
			slicePos:         colLen + i,
		}
		fmSketches = append(fmSketches, rootRowCollector.Base().FMSketches[colLen+i])
	}
	close(buildTaskChan)

	panicCnt := 0
	for panicCnt < samplingStatsConcurrency {
		err1, ok := <-buildResultChan
		if !ok {
			break
		}
		if err1 != nil {
			err = err1
			if isAnalyzeWorkerPanic(err1) {
				panicCnt++
			}
			continue
		}
	}
	defer func() {
		totalSampleCollectorSize := int64(0)
		for _, sampleCollector := range sampleCollectors {
			if sampleCollector != nil {
				totalSampleCollectorSize += sampleCollector.MemSize
			}
		}
		e.memTracker.Release(totalSampleCollectorSize)
	}()
	if err != nil {
		return 0, nil, nil, nil, nil, err
	}

	count = rootRowCollector.Base().Count
	if needExtStats {
		extStats, err = statistics.BuildExtendedStats(e.ctx, e.TableID.GetStatisticsID(), e.colsInfo, sampleCollectors)
		if err != nil {
			return 0, nil, nil, nil, nil, err
		}
	}

	return
}

// handleNDVForSpecialIndexes deals with the logic to analyze the indexes containing virtual columns when the mode is full sampling.
func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.IndexInfo, totalResultCh chan analyzeIndexNDVTotalResult, samplingStatsConcurrency int) {
	defer func() {
		if r := recover(); r != nil {
			logutil.BgLogger().Warn("analyze ndv for special index panicked", zap.Any("recover", r), zap.Stack("stack"))
			metrics.PanicCounter.WithLabelValues(metrics.LabelAnalyze).Inc()
			totalResultCh <- analyzeIndexNDVTotalResult{
				err: getAnalyzePanicErr(r),
			}
		}
	}()
	tasks := e.buildSubIndexJobForSpecialIndex(indexInfos)
	taskCh := make(chan *analyzeTask, len(tasks))
	for _, task := range tasks {
		AddNewAnalyzeJob(e.ctx, task.job)
	}
	resultsCh := make(chan *statistics.AnalyzeResults, len(tasks))
	if len(tasks) < samplingStatsConcurrency {
		samplingStatsConcurrency = len(tasks)
	}
	var subIndexWorkerWg = NewAnalyzeResultsNotifyWaitGroupWrapper(resultsCh)
	subIndexWorkerWg.Add(samplingStatsConcurrency)
	for range samplingStatsConcurrency {
		subIndexWorkerWg.Run(func() { e.subIndexWorkerForNDV(taskCh, resultsCh) })
	}
	for _, task := range tasks {
		taskCh <- task
	}
	close(taskCh)
	panicCnt := 0
	totalResult := analyzeIndexNDVTotalResult{
		results: make(map[int64]*statistics.AnalyzeResults, len(indexInfos)),
	}
	var err error
	statsHandle := domain.GetDomain(e.ctx).StatsHandle()
	for panicCnt < samplingStatsConcurrency {
		results, ok := <-resultsCh
		if !ok {
			break
		}
		if results.Err != nil {
			err = results.Err
			statsHandle.FinishAnalyzeJob(results.Job, err, statistics.TableAnalysisJob)
			if isAnalyzeWorkerPanic(err) {
				panicCnt++
			}
			continue
		}
		statsHandle.FinishAnalyzeJob(results.Job, nil, statistics.TableAnalysisJob)
		totalResult.results[results.Ars[0].Hist[0].ID] = results
	}
	if err != nil {
		totalResult.err = err
	}
	totalResultCh <- totalResult
}

// subIndexWorkerForNDV receives the tasks for each index and returns the results for them.
func (e *AnalyzeColumnsExecV2) subIndexWorkerForNDV(taskCh chan *analyzeTask, resultsCh chan *statistics.AnalyzeResults) {
	var task *analyzeTask
	statsHandle := domain.GetDomain(e.ctx).StatsHandle()
	defer func() {
		if r := recover(); r != nil {
			logutil.BgLogger().Warn("analyze worker panicked", zap.Any("recover", r), zap.Stack("stack"))
			metrics.PanicCounter.WithLabelValues(metrics.LabelAnalyze).Inc()
			resultsCh <- &statistics.AnalyzeResults{
				Err: getAnalyzePanicErr(r),
				Job: task.job,
			}
		}
	}()
	for {
		var ok bool
		task, ok = <-taskCh
		if !ok {
			break
		}
		statsHandle.StartAnalyzeJob(task.job)
		if task.taskType != idxTask {
			resultsCh <- &statistics.AnalyzeResults{
				Err: errors.Errorf("incorrect analyze type"),
				Job: task.job,
			}
			continue
		}
		task.idxExec.job = task.job
		resultsCh <- analyzeIndexNDVPushDown(task.idxExec)
	}
}

// buildSubIndexJobForSpecialIndex builds the pushed-down sub-index tasks that calculate the NDV information for indexes containing virtual columns.
// This is because we cannot push the calculation of the virtual column down to the TiKV side.
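// Only the FM sketch (for the NDV) and the null count of these tasks are consumed back in buildSamplingStats,
// which is why the histogram buckets, TopN, and CM sketch are disabled below.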
func (e *AnalyzeColumnsExecV2) buildSubIndexJobForSpecialIndex(indexInfos []*model.IndexInfo) []*analyzeTask {
	_, offset := timeutil.Zone(e.ctx.GetSessionVars().Location())
	tasks := make([]*analyzeTask, 0, len(indexInfos))
	sc := e.ctx.GetSessionVars().StmtCtx
	concurrency := adaptiveAnlayzeDistSQLConcurrency(context.Background(), e.ctx)
	for _, indexInfo := range indexInfos {
		base := baseAnalyzeExec{
			ctx:         e.ctx,
			tableID:     e.TableID,
			concurrency: concurrency,
			analyzePB: &tipb.AnalyzeReq{
				Tp:             tipb.AnalyzeType_TypeIndex,
				Flags:          sc.PushDownFlags(),
				TimeZoneOffset: offset,
			},
			snapshot: e.snapshot,
		}
		idxExec := &AnalyzeIndexExec{
			baseAnalyzeExec: base,
			isCommonHandle:  e.tableInfo.IsCommonHandle,
			idxInfo:         indexInfo,
		}
		idxExec.opts = make(map[ast.AnalyzeOptionType]uint64, len(ast.AnalyzeOptionString))
		idxExec.opts[ast.AnalyzeOptNumTopN] = 0
		idxExec.opts[ast.AnalyzeOptCMSketchDepth] = 0
		idxExec.opts[ast.AnalyzeOptCMSketchWidth] = 0
		idxExec.opts[ast.AnalyzeOptNumSamples] = 0
		idxExec.opts[ast.AnalyzeOptNumBuckets] = 1
		statsVersion := new(int32)
		*statsVersion = statistics.Version1
		// No Top-N.
		topnSize := int32(0)
		idxExec.analyzePB.IdxReq = &tipb.AnalyzeIndexReq{
			// One bucket to store the null values for the null histogram.
			BucketSize: 1,
			NumColumns: int32(len(indexInfo.Columns)),
			TopNSize:   &topnSize,
			Version:    statsVersion,
			SketchSize: statistics.MaxSketchSize,
		}
		if idxExec.isCommonHandle && indexInfo.Primary {
			idxExec.analyzePB.Tp = tipb.AnalyzeType_TypeCommonHandle
		}
		// No CM-Sketch.
		depth := int32(0)
		width := int32(0)
		idxExec.analyzePB.IdxReq.CmsketchDepth = &depth
		idxExec.analyzePB.IdxReq.CmsketchWidth = &width
		autoAnalyze := ""
		if e.ctx.GetSessionVars().InRestrictedSQL {
			autoAnalyze = "auto "
		}
		job := &statistics.AnalyzeJob{DBName: e.job.DBName, TableName: e.job.TableName, PartitionName: e.job.PartitionName, JobInfo: autoAnalyze + "analyze ndv for index " + indexInfo.Name.O}
		idxExec.job = job
		tasks = append(tasks, &analyzeTask{
			taskType: idxTask,
			idxExec:  idxExec,
			job:      job,
		})
	}
	return tasks
}

func (e *AnalyzeColumnsExecV2) subMergeWorker(resultCh chan<- *samplingMergeResult, taskCh <-chan []byte, l int, index int) {
	// Only close the resultCh in the first worker.
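	// Worker 0 waits for all merge workers to finish via samplingMergeWg and then closes resultCh, so the
	// merging goroutine in buildSamplingStats can exit its receive loop.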
	closeTheResultCh := index == 0
	defer func() {
		if r := recover(); r != nil {
			logutil.BgLogger().Warn("analyze worker panicked", zap.Any("recover", r), zap.Stack("stack"))
			metrics.PanicCounter.WithLabelValues(metrics.LabelAnalyze).Inc()
			resultCh <- &samplingMergeResult{err: getAnalyzePanicErr(r)}
		}
		// Consume the remaining tasks.
		for {
			_, ok := <-taskCh
			if !ok {
				break
			}
		}
		e.samplingMergeWg.Done()
		if closeTheResultCh {
			e.samplingMergeWg.Wait()
			close(resultCh)
		}
	}()
	failpoint.Inject("mockAnalyzeSamplingMergeWorkerPanic", func() {
		panic("failpoint triggered")
	})
	failpoint.Inject("mockAnalyzeMergeWorkerSlowConsume", func(val failpoint.Value) {
		times := val.(int)
		for range times {
			e.memTracker.Consume(5 << 20)
			time.Sleep(100 * time.Millisecond)
		}
	})
	retCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), e.analyzePB.ColReq.GetSampleRate(), l)
	for range l {
		retCollector.Base().FMSketches = append(retCollector.Base().FMSketches, statistics.NewFMSketch(statistics.MaxSketchSize))
	}
	statsHandle := domain.GetDomain(e.ctx).StatsHandle()
	for {
		data, ok := <-taskCh
		if !ok {
			break
		}

		// Unmarshal the data.
		dataSize := int64(cap(data))
		colResp := &tipb.AnalyzeColumnsResp{}
		err := colResp.Unmarshal(data)
		if err != nil {
			resultCh <- &samplingMergeResult{err: err}
			return
		}
		// Consume the memory of the data.
		colRespSize := int64(colResp.Size())
		e.memTracker.Consume(colRespSize)

		// Update processed rows.
		subCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), e.analyzePB.ColReq.GetSampleRate(), l)
		subCollector.Base().FromProto(colResp.RowCollector, e.memTracker)
		statsHandle.UpdateAnalyzeJobProgress(e.job, subCollector.Base().Count)

		// Print the collect log.
		oldRetCollectorSize := retCollector.Base().MemSize
		oldRetCollectorCount := retCollector.Base().Count
		retCollector.MergeCollector(subCollector)
		newRetCollectorCount := retCollector.Base().Count
		printAnalyzeMergeCollectorLog(oldRetCollectorCount, newRetCollectorCount, subCollector.Base().Count,
			e.tableID.TableID, e.tableID.PartitionID, e.TableID.IsPartitionTable(),
			"merge subCollector in concurrency in AnalyzeColumnsExecV2", index)

		// Consume the memory of the result.
		newRetCollectorSize := retCollector.Base().MemSize
		subCollectorSize := subCollector.Base().MemSize
		e.memTracker.Consume(newRetCollectorSize - oldRetCollectorSize - subCollectorSize)
		e.memTracker.Release(dataSize + colRespSize)
		subCollector.DestroyAndPutToPool()
	}

	resultCh <- &samplingMergeResult{collector: retCollector}
}

func (e *AnalyzeColumnsExecV2) subBuildWorker(resultCh chan error, taskCh chan *samplingBuildTask, hists []*statistics.Histogram, topns []*statistics.TopN, collectors []*statistics.SampleCollector, exitCh chan struct{}) {
	defer func() {
		if r := recover(); r != nil {
			logutil.BgLogger().Warn("analyze worker panicked", zap.Any("recover", r), zap.Stack("stack"))
			metrics.PanicCounter.WithLabelValues(metrics.LabelAnalyze).Inc()
			resultCh <- getAnalyzePanicErr(r)
		}
	}()
	failpoint.Inject("mockAnalyzeSamplingBuildWorkerPanic", func() {
		panic("failpoint triggered")
	})

	colLen := len(e.colsInfo)
	bufferedMemSize := int64(0)
	bufferedReleaseSize := int64(0)
	defer func() {
		e.memTracker.Consume(bufferedMemSize)
		e.memTracker.Release(bufferedReleaseSize)
	}()

workLoop:
	for {
		select {
		case task, ok := <-taskCh:
			if !ok {
				break workLoop
			}
			var collector *statistics.SampleCollector
			if task.isColumn {
				if e.colsInfo[task.slicePos].IsGenerated() && !e.colsInfo[task.slicePos].GeneratedStored {
					hists[task.slicePos] = nil
					topns[task.slicePos] = nil
					continue
				}
				sampleNum := task.rootRowCollector.Base().Samples.Len()
				sampleItems := make([]*statistics.SampleItem, 0, sampleNum)
				// Consume the mandatory memory at the beginning, including the empty SampleItems for all
				// sample rows. If the quota is exceeded, fail fast.
				collectorMemSize := int64(sampleNum) * (8 + statistics.EmptySampleItemSize)
				e.memTracker.Consume(collectorMemSize)
				var collator collate.Collator
				ft := e.colsInfo[task.slicePos].FieldType
				// When the data uses new collation, we need to use its collate key instead of the original
				// value because only the collate key can ensure the correct ordering.
				// This corresponds to the similar operation in (*statistics.Column).GetColumnRowCount().
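				// For example (illustrative, assuming a case-insensitive collation such as utf8mb4_general_ci):
				// the values "Foo" and "foo" produce the same collate key, so they are counted as one value
				// when the histogram and TopN are built.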
				if ft.EvalType() == types.ETString && ft.GetType() != mysql.TypeEnum && ft.GetType() != mysql.TypeSet {
					collator = collate.GetCollator(ft.GetCollate())
				}
				for j, row := range task.rootRowCollector.Base().Samples {
					if row.Columns[task.slicePos].IsNull() {
						continue
					}
					val := row.Columns[task.slicePos]
					// If the value is very large, we assume that it cannot occur many times, so we don't record it.
					if len(val.GetBytes()) > statistics.MaxSampleValueLength {
						continue
					}
					if collator != nil {
						val.SetBytes(collator.Key(val.GetString()))
						deltaSize := int64(cap(val.GetBytes()))
						collectorMemSize += deltaSize
						e.memTracker.BufferedConsume(&bufferedMemSize, deltaSize)
					}
					sampleItems = append(sampleItems, &statistics.SampleItem{
						Value:   val,
						Ordinal: j,
					})
					// Temporary memory usage.
					deltaSize := val.MemUsage() + 4 // The content of the SampleItem is copied.
					e.memTracker.BufferedConsume(&bufferedMemSize, deltaSize)
					e.memTracker.BufferedRelease(&bufferedReleaseSize, deltaSize)
				}
				collector = &statistics.SampleCollector{
					Samples:   sampleItems,
					NullCount: task.rootRowCollector.Base().NullCount[task.slicePos],
					Count:     task.rootRowCollector.Base().Count - task.rootRowCollector.Base().NullCount[task.slicePos],
					FMSketch:  task.rootRowCollector.Base().FMSketches[task.slicePos],
					TotalSize: task.rootRowCollector.Base().TotalSizes[task.slicePos],
					MemSize:   collectorMemSize,
				}
			} else {
				var tmpDatum types.Datum
				var err error
				idx := e.indexes[task.slicePos-colLen]
				sampleNum := task.rootRowCollector.Base().Samples.Len()
				sampleItems := make([]*statistics.SampleItem, 0, sampleNum)
				// Consume the mandatory memory at the beginning, including all SampleItems. If the quota is
				// exceeded, fail fast.
				// 8 is the size of the reference; the other 8 is the size of "b := make([]byte, 0, 8)".
				collectorMemSize := int64(sampleNum) * (8 + statistics.EmptySampleItemSize + 8)
				e.memTracker.Consume(collectorMemSize)
				errCtx := e.ctx.GetSessionVars().StmtCtx.ErrCtx()
			indexSampleCollectLoop:
				for _, row := range task.rootRowCollector.Base().Samples {
					if len(idx.Columns) == 1 && row.Columns[idx.Columns[0].Offset].IsNull() {
						continue
					}
					b := make([]byte, 0, 8)
					for _, col := range idx.Columns {
						// If the index value contains a column value that is too long, we assume that it cannot occur many times.
						if len(row.Columns[col.Offset].GetBytes()) > statistics.MaxSampleValueLength {
							continue indexSampleCollectLoop
						}
						if col.Length != types.UnspecifiedLength {
							row.Columns[col.Offset].Copy(&tmpDatum)
							ranger.CutDatumByPrefixLen(&tmpDatum, col.Length, &e.colsInfo[col.Offset].FieldType)
							b, err = codec.EncodeKey(e.ctx.GetSessionVars().StmtCtx.TimeZone(), b, tmpDatum)
							err = errCtx.HandleError(err)
							if err != nil {
								resultCh <- err
								continue workLoop
							}
							continue
						}
						b, err = codec.EncodeKey(e.ctx.GetSessionVars().StmtCtx.TimeZone(), b, row.Columns[col.Offset])
						err = errCtx.HandleError(err)
						if err != nil {
							resultCh <- err
							continue workLoop
						}
					}
					sampleItems = append(sampleItems, &statistics.SampleItem{
						Value: types.NewBytesDatum(b),
					})
					// Temporary memory usage.
					deltaSize := sampleItems[len(sampleItems)-1].Value.MemUsage()
					e.memTracker.BufferedConsume(&bufferedMemSize, deltaSize)
					e.memTracker.BufferedRelease(&bufferedReleaseSize, deltaSize)
				}
				collector = &statistics.SampleCollector{
					Samples:   sampleItems,
					NullCount: task.rootRowCollector.Base().NullCount[task.slicePos],
					Count:     task.rootRowCollector.Base().Count - task.rootRowCollector.Base().NullCount[task.slicePos],
					FMSketch:  task.rootRowCollector.Base().FMSketches[task.slicePos],
					TotalSize: task.rootRowCollector.Base().TotalSizes[task.slicePos],
					MemSize:   collectorMemSize,
				}
			}
			if task.isColumn {
				collectors[task.slicePos] = collector
			}
			releaseCollectorMemory := func() {
				if !task.isColumn {
					e.memTracker.Release(collector.MemSize)
				}
			}
			hist, topn, err := statistics.BuildHistAndTopN(e.ctx, int(e.opts[ast.AnalyzeOptNumBuckets]), int(e.opts[ast.AnalyzeOptNumTopN]), task.id, collector, task.tp, task.isColumn, e.memTracker, e.ctx.GetSessionVars().EnableExtendedStats)
			if err != nil {
				resultCh <- err
				releaseCollectorMemory()
				continue
			}
			finalMemSize := hist.MemoryUsage() + topn.MemoryUsage()
			e.memTracker.Consume(finalMemSize)
			hists[task.slicePos] = hist
			topns[task.slicePos] = topn
			resultCh <- nil
			releaseCollectorMemory()
		case <-exitCh:
			return
		}
	}
}

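// analyzeIndexNDVTotalResult collects the pushed-down NDV analyze results for the special indexes, keyed by index ID.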
type analyzeIndexNDVTotalResult struct {
	results map[int64]*statistics.AnalyzeResults
	err     error
}

type samplingMergeResult struct {
	collector statistics.RowSampleCollector
	err       error
}

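// samplingBuildTask describes one build job handed to subBuildWorker: build the histogram and TopN for the
// column (when isColumn is true) or index identified by id, using the samples in rootRowCollector, and store
// the result at slicePos in the shared result slices.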
type samplingBuildTask struct {
	id               int64
	rootRowCollector statistics.RowSampleCollector
	tp               *types.FieldType
	isColumn         bool
	slicePos         int
}

func readDataAndSendTask(ctx sessionctx.Context, handler *tableResultHandler, mergeTaskCh chan []byte, memTracker *memory.Tracker) error {
	// After all tasks are sent, close the mergeTaskCh to notify the merge workers that all tasks have been sent.
	defer close(mergeTaskCh)
	for {
		failpoint.Inject("mockKillRunningV2AnalyzeJob", func() {
			dom := domain.GetDomain(ctx)
			for _, id := range handleutil.GlobalAutoAnalyzeProcessList.All() {
				dom.SysProcTracker().KillSysProcess(id)
			}
		})
		if err := ctx.GetSessionVars().SQLKiller.HandleSignal(); err != nil {
			return err
		}
		failpoint.Inject("mockSlowAnalyzeV2", func() {
			time.Sleep(1000 * time.Second)
		})

		data, err := handler.nextRaw(context.TODO())
		if err != nil {
			return errors.Trace(err)
		}
		if data == nil {
			break
		}

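		// The memory consumed here for the raw data is released by subMergeWorker once the response has
		// been unmarshalled and merged.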
		memTracker.Consume(int64(cap(data)))
		mergeTaskCh <- data
	}

	return nil
}