diff --git a/bootstrap.go b/bootstrap.go
index e3051f91b1..c1971c9c19 100644
--- a/bootstrap.go
+++ b/bootstrap.go
@@ -160,6 +160,7 @@ const (
 		null_count bigint(64) NOT NULL DEFAULT 0,
 		modify_count bigint(64) NOT NULL DEFAULT 0,
 		version bigint(64) unsigned NOT NULL DEFAULT 0,
+		cm_sketch blob,
 		unique index tbl(table_id, is_index, hist_id)
 	);`
@@ -227,6 +228,7 @@ const (
 	version13 = 13
 	version14 = 14
 	version15 = 15
+	version16 = 16
 )
 
 func checkBootstrapped(s Session) (bool, error) {
@@ -339,6 +341,10 @@ func upgrade(s Session) {
 		upgradeToVer15(s)
 	}
 
+	if ver < version16 {
+		upgradeToVer16(s)
+	}
+
 	updateBootstrapVer(s)
 	_, err = s.Execute(goctx.Background(), "COMMIT")
@@ -540,6 +546,10 @@ func upgradeToVer15(s Session) {
 	}
 }
 
+func upgradeToVer16(s Session) {
+	doReentrantDDL(s, "ALTER TABLE mysql.stats_histograms ADD COLUMN `cm_sketch` blob", infoschema.ErrColumnExists)
+}
+
 // updateBootstrapVer updates bootstrap version variable in mysql.TiDB table.
 func updateBootstrapVer(s Session) {
 	// Update bootstrap version.
diff --git a/domain/domain.go b/domain/domain.go
index 1307d21205..8384dfebb5 100644
--- a/domain/domain.go
+++ b/domain/domain.go
@@ -594,8 +594,8 @@ func (do *Domain) updateStatsWorker(ctx context.Context, lease time.Duration) {
 				log.Error("[stats] handle ddl event fail: ", errors.ErrorStack(err))
 			}
 		case t := <-statsHandle.AnalyzeResultCh():
-			for _, hg := range t.Hist {
-				err := hg.SaveToStorage(ctx, t.TableID, t.Count, t.IsIndex)
+			for i, hg := range t.Hist {
+				err := statistics.SaveStatsToStorage(ctx, t.TableID, t.Count, t.IsIndex, hg, t.Cms[i])
 				if err != nil {
 					log.Error("[stats] save histogram to storage fail: ", errors.ErrorStack(err))
 				}
 			}
diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go
index 66f621dd0c..dc905f4fec 100644
--- a/executor/aggregate_test.go
+++ b/executor/aggregate_test.go
@@ -276,7 +276,7 @@ func (s *testSuite) TestAggregation(c *C) {
 	result = tk.MustQuery("select count(*) from information_schema.columns")
 	// When adding new memory table in information_schema, please update this variable.
-	columnCountOfAllInformationSchemaTables := "742"
+	columnCountOfAllInformationSchemaTables := "743"
 	result.Check(testkit.Rows(columnCountOfAllInformationSchemaTables))
 
 	tk.MustExec("drop table if exists t1")
diff --git a/executor/analyze.go b/executor/analyze.go
index 4c6fb16279..81847095d5 100644
--- a/executor/analyze.go
+++ b/executor/analyze.go
@@ -43,10 +43,12 @@ type AnalyzeExec struct {
 }
 
 const (
-	maxSampleSize       = 10000
-	maxRegionSampleSize = 1000
-	maxSketchSize       = 10000
-	maxBucketSize       = 256
+	maxSampleSize        = 10000
+	maxRegionSampleSize  = 1000
+	maxSketchSize        = 10000
+	maxBucketSize        = 256
+	defaultCMSketchDepth = 8
+	defaultCMSketchWidth = 2048
 )
 
 // Schema implements the Executor Schema interface.
@@ -111,8 +113,8 @@ func (e *AnalyzeExec) Next() (Row, error) {
 			return nil, errors.Trace(err1)
 		}
 		for _, result := range results {
-			for _, hg := range result.Hist {
-				err = hg.SaveToStorage(e.ctx, result.TableID, result.Count, result.IsIndex)
+			for i, hg := range result.Hist {
+				err = statistics.SaveStatsToStorage(e.ctx, result.TableID, result.Count, result.IsIndex, hg, result.Cms[i])
 				if err != nil {
 					return nil, errors.Trace(err)
 				}
@@ -160,13 +162,14 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultCh chan<-
 }
 
 func analyzeIndexPushdown(idxExec *AnalyzeIndexExec) statistics.AnalyzeResult {
-	hist, err := idxExec.buildHistogram()
+	hist, cms, err := idxExec.buildStats()
 	if err != nil {
 		return statistics.AnalyzeResult{Err: err}
 	}
 	result := statistics.AnalyzeResult{
 		TableID: idxExec.tblInfo.ID,
 		Hist:    []*statistics.Histogram{hist},
+		Cms:     []*statistics.CMSketch{cms},
 		IsIndex: 1,
 	}
 	if len(hist.Buckets) > 0 {
@@ -208,21 +211,23 @@ func (e *AnalyzeIndexExec) open() error {
 	return nil
 }
 
-func (e *AnalyzeIndexExec) buildHistogram() (hist *statistics.Histogram, err error) {
+func (e *AnalyzeIndexExec) buildStats() (hist *statistics.Histogram, cms *statistics.CMSketch, err error) {
 	if err = e.open(); err != nil {
-		return nil, errors.Trace(err)
+		return nil, nil, errors.Trace(err)
 	}
 	defer func() {
 		if err1 := e.result.Close(); err1 != nil {
 			hist = nil
+			cms = nil
 			err = errors.Trace(err1)
 		}
 	}()
 	hist = &statistics.Histogram{}
+	cms = statistics.NewCMSketch(defaultCMSketchDepth, defaultCMSketchWidth)
 	for {
 		data, err := e.result.NextRaw()
 		if err != nil {
-			return nil, errors.Trace(err)
+			return nil, nil, errors.Trace(err)
 		}
 		if data == nil {
 			break
@@ -230,25 +235,32 @@ func (e *AnalyzeIndexExec) buildHistogram() (hist *statistics.Histogram, err err
 		resp := &tipb.AnalyzeIndexResp{}
 		err = resp.Unmarshal(data)
 		if err != nil {
-			return nil, errors.Trace(err)
+			return nil, nil, errors.Trace(err)
 		}
 		hist, err = statistics.MergeHistograms(e.ctx.GetSessionVars().StmtCtx, hist, statistics.HistogramFromProto(resp.Hist), maxBucketSize)
 		if err != nil {
-			return nil, errors.Trace(err)
+			return nil, nil, errors.Trace(err)
+		}
+		if resp.Cms != nil {
+			err := cms.MergeCMSketch(statistics.CMSketchFromProto(resp.Cms))
+			if err != nil {
+				return nil, nil, errors.Trace(err)
+			}
 		}
 	}
 	hist.ID = e.idxInfo.ID
-	return hist, nil
+	return hist, cms, nil
 }
 
 func analyzeColumnsPushdown(colExec *AnalyzeColumnsExec) statistics.AnalyzeResult {
-	hists, err := colExec.buildHistograms()
+	hists, cms, err := colExec.buildStats()
 	if err != nil {
 		return statistics.AnalyzeResult{Err: err}
 	}
 	result := statistics.AnalyzeResult{
 		TableID: colExec.tblInfo.ID,
 		Hist:    hists,
+		Cms:     cms,
 	}
 	hist := hists[0]
 	result.Count = hist.NullCount
@@ -292,13 +304,14 @@ func (e *AnalyzeColumnsExec) open() error {
 	return nil
 }
 
-func (e *AnalyzeColumnsExec) buildHistograms() (hists []*statistics.Histogram, err error) {
+func (e *AnalyzeColumnsExec) buildStats() (hists []*statistics.Histogram, cms []*statistics.CMSketch, err error) {
 	if err = e.open(); err != nil {
-		return nil, errors.Trace(err)
+		return nil, nil, errors.Trace(err)
 	}
 	defer func() {
 		if err1 := e.result.Close(); err1 != nil {
 			hists = nil
+			cms = nil
 			err = errors.Trace(err1)
 		}
 	}()
@@ -307,14 +320,15 @@ func (e *AnalyzeColumnsExec) buildHistograms() (hists []*statistics.Histogram, e
 	for i := range collectors {
 		collectors[i] = &statistics.SampleCollector{
 			IsMerger:      true,
-			Sketch:        statistics.NewFMSketch(maxSketchSize),
+			FMSketch:      statistics.NewFMSketch(maxSketchSize),
 			MaxSampleSize: maxSampleSize,
+			CMSketch:      statistics.NewCMSketch(defaultCMSketchDepth, defaultCMSketchWidth),
 		}
 	}
 	for {
 		data, err1 := e.result.NextRaw()
 		if err1 != nil {
-			return nil, errors.Trace(err1)
+			return nil, nil, errors.Trace(err1)
 		}
 		if data == nil {
 			break
@@ -322,12 +336,12 @@ func (e *AnalyzeColumnsExec) buildHistograms() (hists []*statistics.Histogram, e
 		resp := &tipb.AnalyzeColumnsResp{}
 		err = resp.Unmarshal(data)
 		if err != nil {
-			return nil, errors.Trace(err)
+			return nil, nil, errors.Trace(err)
 		}
 		if e.pkInfo != nil {
 			pkHist, err = statistics.MergeHistograms(e.ctx.GetSessionVars().StmtCtx, pkHist, statistics.HistogramFromProto(resp.PkHist), maxBucketSize)
 			if err != nil {
-				return nil, errors.Trace(err)
+				return nil, nil, errors.Trace(err)
 			}
 		}
 		for i, rc := range resp.Collectors {
@@ -340,27 +354,29 @@ func (e *AnalyzeColumnsExec) buildHistograms() (hists []*statistics.Histogram, e
 		for i, bkt := range pkHist.Buckets {
 			pkHist.Buckets[i].LowerBound, err = tablecodec.DecodeColumnValue(bkt.LowerBound.GetBytes(), &e.pkInfo.FieldType, timeZone)
 			if err != nil {
-				return nil, errors.Trace(err)
+				return nil, nil, errors.Trace(err)
 			}
 			pkHist.Buckets[i].UpperBound, err = tablecodec.DecodeColumnValue(bkt.UpperBound.GetBytes(), &e.pkInfo.FieldType, timeZone)
 			if err != nil {
-				return nil, errors.Trace(err)
+				return nil, nil, errors.Trace(err)
 			}
 		}
 		hists = append(hists, pkHist)
+		cms = append(cms, nil)
 	}
 	for i, col := range e.colsInfo {
 		for j, s := range collectors[i].Samples {
 			collectors[i].Samples[j], err = tablecodec.DecodeColumnValue(s.GetBytes(), &col.FieldType, timeZone)
 			if err != nil {
-				return nil, errors.Trace(err)
+				return nil, nil, errors.Trace(err)
 			}
 		}
 		hg, err := statistics.BuildColumn(e.ctx, maxBucketSize, col.ID, collectors[i])
 		if err != nil {
-			return nil, errors.Trace(err)
+			return nil, nil, errors.Trace(err)
 		}
 		hists = append(hists, hg)
+		cms = append(cms, collectors[i].CMSketch)
 	}
-	return hists, nil
+	return hists, cms, nil
 }
diff --git a/executor/builder.go b/executor/builder.go
index 395a299762..b5fc0bbd2a 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -834,6 +834,12 @@ func (b *executorBuilder) buildAnalyzeIndexPushdown(task plan.AnalyzeIndexTask)
 		BucketSize: maxBucketSize,
 		NumColumns: int32(len(task.IndexInfo.Columns)),
 	}
+	if !task.IndexInfo.Unique {
+		depth := int32(defaultCMSketchDepth)
+		width := int32(defaultCMSketchWidth)
+		e.analyzePB.IdxReq.CmsketchDepth = &depth
+		e.analyzePB.IdxReq.CmsketchWidth = &width
+	}
 	return e
 }
@@ -859,11 +865,15 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plan.AnalyzeColumnsTa
 			TimeZoneOffset: timeZoneOffset(b.ctx),
 		},
 	}
+	depth := int32(defaultCMSketchDepth)
+	width := int32(defaultCMSketchWidth)
 	e.analyzePB.ColReq = &tipb.AnalyzeColumnsReq{
-		BucketSize:  maxBucketSize,
-		SampleSize:  maxRegionSampleSize,
-		SketchSize:  maxSketchSize,
-		ColumnsInfo: distsql.ColumnsToProto(cols, task.TableInfo.PKIsHandle),
+		BucketSize:    maxBucketSize,
+		SampleSize:    maxRegionSampleSize,
+		SketchSize:    maxSketchSize,
+		ColumnsInfo:   distsql.ColumnsToProto(cols, task.TableInfo.PKIsHandle),
+		CmsketchDepth: &depth,
+		CmsketchWidth: &width,
 	}
 	b.err = setPBColumnsDefaultValue(b.ctx, e.analyzePB.ColReq.ColumnsInfo, cols)
 	return e
diff --git a/session.go b/session.go
index 2a58325a5d..5740e7fb41 100644
--- a/session.go
+++ b/session.go
@@ -1125,7 +1125,7 @@ func createSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er
 
 const (
 	notBootstrapped         = 0
-	currentBootstrapVersion = 15
+	currentBootstrapVersion = 16
 )
 
 func getStoreBootstrapVersion(store kv.Storage) int64 {
diff --git a/statistics/builder.go b/statistics/builder.go
index b5b622a8d0..ea446e0f24 100644
--- a/statistics/builder.go
+++ b/statistics/builder.go
@@ -15,11 +15,9 @@ package statistics
 
 import (
 	"github.com/juju/errors"
-	"github.com/pingcap/tidb/ast"
 	"github.com/pingcap/tidb/context"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/types"
-	"github.com/pingcap/tidb/util/codec"
 )
 
 // SortedBuilder is used to build histograms for PK and index.
@@ -108,30 +106,6 @@ func (b *SortedBuilder) Iterate(data types.Datum) error {
 	return nil
 }
 
-// BuildIndex builds histogram for index.
-func BuildIndex(ctx context.Context, numBuckets, id int64, records ast.RecordSet) (int64, *Histogram, error) {
-	b := NewSortedBuilder(ctx.GetSessionVars().StmtCtx, numBuckets, id)
-	for {
-		row, err := records.Next()
-		if err != nil {
-			return 0, nil, errors.Trace(err)
-		}
-		if row == nil {
-			break
-		}
-		bytes, err := codec.EncodeKey(nil, row.Data...)
-		if err != nil {
-			return 0, nil, errors.Trace(err)
-		}
-		data := types.NewBytesDatum(bytes)
-		err = b.Iterate(data)
-		if err != nil {
-			return 0, nil, errors.Trace(err)
-		}
-	}
-	return b.Count, b.Hist(), nil
-}
-
 // BuildColumn builds histogram from samples for column.
 func BuildColumn(ctx context.Context, numBuckets, id int64, collector *SampleCollector) (*Histogram, error) {
 	count := collector.Count
@@ -144,7 +118,7 @@ func BuildColumn(ctx context.Context, numBuckets, id int64, collector *SampleCol
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	ndv := collector.Sketch.NDV()
+	ndv := collector.FMSketch.NDV()
 	if ndv > count {
 		ndv = count
 	}
@@ -205,6 +179,7 @@ func BuildColumn(ctx context.Context, numBuckets, id int64, collector *SampleCol
 type AnalyzeResult struct {
 	TableID int64
 	Hist    []*Histogram
+	Cms     []*CMSketch
 	Count   int64
 	IsIndex int
 	Err     error
diff --git a/statistics/cmsketch.go b/statistics/cmsketch.go
index 25435391a1..a2b7cebc1f 100644
--- a/statistics/cmsketch.go
+++ b/statistics/cmsketch.go
@@ -21,6 +21,7 @@ import (
 	"github.com/juju/errors"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/codec"
+	"github.com/pingcap/tipb/go-tipb"
 	"github.com/spaolacci/murmur3"
 )
@@ -33,7 +34,8 @@ type CMSketch struct {
 	table [][]uint32
 }
 
-func newCMSketch(d, w int32) *CMSketch {
+// NewCMSketch returns a new CM sketch.
+func NewCMSketch(d, w int32) *CMSketch {
 	tbl := make([][]uint32, d)
 	for i := range tbl {
 		tbl[i] = make([]uint32, w)
@@ -41,25 +43,25 @@ func newCMSketch(d, w int32) *CMSketch {
 	return &CMSketch{depth: d, width: w, table: tbl}
 }
 
-func (c *CMSketch) insert(val *types.Datum) error {
-	bytes, err := codec.EncodeValue(nil, *val)
-	if err != nil {
-		return errors.Trace(err)
-	}
+// InsertBytes inserts the bytes value into the CM Sketch.
+func (c *CMSketch) InsertBytes(bytes []byte) {
 	c.count++
 	h1, h2 := murmur3.Sum128(bytes)
 	for i := range c.table {
 		j := (h1 + h2*uint64(i)) % uint64(c.width)
 		c.table[i][j]++
 	}
-	return nil
 }
 
-func (c *CMSketch) query(val *types.Datum) (uint32, error) {
-	bytes, err := codec.EncodeValue(nil, *val)
+func (c *CMSketch) queryValue(val types.Datum) (uint32, error) {
+	bytes, err := codec.EncodeValue(nil, val)
 	if err != nil {
 		return 0, errors.Trace(err)
 	}
+	return c.queryBytes(bytes), nil
+}
+
+func (c *CMSketch) queryBytes(bytes []byte) uint32 {
 	h1, h2 := murmur3.Sum128(bytes)
 	vals := make([]uint32, c.depth)
 	min := uint32(math.MaxUint32)
@@ -78,12 +80,13 @@ func (c *CMSketch) query(val *types.Datum) (uint32, error) {
 	sort.Sort(sortutil.Uint32Slice(vals))
 	res := vals[(c.depth-1)/2] + (vals[c.depth/2]-vals[(c.depth-1)/2])/2
 	if res > min {
-		return min, nil
+		return min
 	}
-	return res, nil
+	return res
 }
 
-func (c *CMSketch) mergeCMSketch(rc *CMSketch) error {
+// MergeCMSketch merges two CM Sketch.
+func (c *CMSketch) MergeCMSketch(rc *CMSketch) error {
 	if c.depth != rc.depth || c.width != rc.width {
 		return errors.New("Dimensions of Count-Min Sketch should be the same")
 	}
@@ -95,3 +98,71 @@ func (c *CMSketch) mergeCMSketch(rc *CMSketch) error {
 	}
 	return nil
 }
+
+// CMSketchToProto converts CMSketch to its protobuf representation.
+func CMSketchToProto(c *CMSketch) *tipb.CMSketch {
+	protoSketch := &tipb.CMSketch{Rows: make([]*tipb.CMSketchRow, c.depth)}
+	for i := range c.table {
+		protoSketch.Rows[i] = &tipb.CMSketchRow{Counters: make([]uint32, c.width)}
+		for j := range c.table[i] {
+			protoSketch.Rows[i].Counters[j] = c.table[i][j]
+		}
+	}
+	return protoSketch
+}
+
+// CMSketchFromProto converts CMSketch from its protobuf representation.
+func CMSketchFromProto(protoSketch *tipb.CMSketch) *CMSketch {
+	c := NewCMSketch(int32(len(protoSketch.Rows)), int32(len(protoSketch.Rows[0].Counters)))
+	for i, row := range protoSketch.Rows {
+		c.count = 0
+		for j, counter := range row.Counters {
+			c.table[i][j] = counter
+			c.count = c.count + uint64(counter)
+		}
+	}
+	return c
+}
+
+func encodeCMSketch(c *CMSketch) ([]byte, error) {
+	if c == nil || c.count == 0 {
+		return nil, nil
+	}
+	p := CMSketchToProto(c)
+	return p.Marshal()
+}
+
+func decodeCMSketch(data []byte) (*CMSketch, error) {
+	if data == nil {
+		return nil, nil
+	}
+	p := &tipb.CMSketch{}
+	err := p.Unmarshal(data)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	if len(p.Rows) == 0 {
+		return nil, nil
+	}
+	return CMSketchFromProto(p), nil
+}
+
+// TotalCount returns the count, it is only used for test.
+func (c *CMSketch) TotalCount() uint64 {
+	return c.count
+}
+
+// Equal tests if two CM Sketch equal, it is only used for test.
+func (c *CMSketch) Equal(rc *CMSketch) bool {
+	if c.width != rc.width || c.depth != rc.depth {
+		return false
+	}
+	for i := range c.table {
+		for j := range c.table[i] {
+			if c.table[i][j] != rc.table[i][j] {
+				return false
+			}
+		}
+	}
+	return true
+}
diff --git a/statistics/cmsketch_test.go b/statistics/cmsketch_test.go
index f1b75be804..e917cc582f 100644
--- a/statistics/cmsketch_test.go
+++ b/statistics/cmsketch_test.go
@@ -20,10 +20,20 @@ import (
 	"github.com/juju/errors"
 	. "github.com/pingcap/check"
 	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util/codec"
 )
 
+func (c *CMSketch) insert(val *types.Datum) error {
+	bytes, err := codec.EncodeValue(nil, *val)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	c.InsertBytes(bytes)
+	return nil
+}
+
 func buildCMSketchAndMap(d, w int32, total, imax uint64, s float64) (*CMSketch, map[int64]uint32, error) {
-	cms := newCMSketch(d, w)
+	cms := NewCMSketch(d, w)
 	mp := make(map[int64]uint32)
 	zipf := rand.NewZipf(rand.New(rand.NewSource(time.Now().UnixNano())), s, 1, imax)
 	for i := uint64(0); i < total; i++ {
@@ -40,8 +50,7 @@ func buildCMSketchAndMap(d, w int32, total, imax uint64, s float64) (*CMSketch,
 func averageAbsoluteError(cms *CMSketch, mp map[int64]uint32) (uint64, error) {
 	var total uint64
 	for num, count := range mp {
-		val := types.NewIntDatum(num)
-		estimate, err := cms.query(&val)
+		estimate, err := cms.queryValue(types.NewIntDatum(num))
 		if err != nil {
 			return 0, errors.Trace(err)
 		}
@@ -89,7 +98,7 @@ func (s *testStatisticsSuite) TestCMSketch(c *C) {
 		c.Assert(err, IsNil)
 		c.Check(avg, Less, t.avgError)
 
-		err = lSketch.mergeCMSketch(rSketch)
+		err = lSketch.MergeCMSketch(rSketch)
 		c.Assert(err, IsNil)
 		for val, count := range rMap {
 			lMap[val] += count
@@ -99,3 +108,13 @@ func (s *testStatisticsSuite) TestCMSketch(c *C) {
 		c.Check(avg, Less, t.avgError*2)
 	}
 }
+
+func (s *testStatisticsSuite) TestCMSketchCoding(c *C) {
+	lSketch, _, err := buildCMSketchAndMap(8, 2048, 1000, 1000, 1.1)
+	c.Assert(err, IsNil)
+	bytes, err := encodeCMSketch(lSketch)
+	c.Assert(err, IsNil)
+	rSketch, err := decodeCMSketch(bytes)
+	c.Assert(err, IsNil)
+	c.Assert(lSketch.Equal(rSketch), IsTrue)
+}
diff --git a/statistics/handle.go b/statistics/handle.go
index 0f77007744..b767bafcb3 100644
--- a/statistics/handle.go
+++ b/statistics/handle.go
@@ -171,7 +171,11 @@ func (h *Handle) LoadNeededHistograms() error {
 		if err != nil {
 			return errors.Trace(err)
 		}
-		tbl.Columns[c.ID] = &Column{Histogram: *hg, Info: c.Info, Count: int64(hg.totalRowCount())}
+		cms, err := h.cmSketchFromStorage(col.tableID, 0, col.columnID)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		tbl.Columns[c.ID] = &Column{Histogram: *hg, Info: c.Info, CMSketch: cms, Count: int64(hg.totalRowCount())}
 		h.UpdateTableStats([]*Table{tbl}, nil)
 		histogramNeededColumns.delete(col)
 	}
diff --git a/statistics/handle_test.go b/statistics/handle_test.go
index e5f5229d00..53654877cc 100644
--- a/statistics/handle_test.go
+++ b/statistics/handle_test.go
@@ -114,10 +114,12 @@ func assertTableEqual(c *C, a *statistics.Table, b *statistics.Table) {
 	c.Assert(len(a.Columns), Equals, len(b.Columns))
 	for i := range a.Columns {
 		assertHistogramEqual(c, a.Columns[i].Histogram, b.Columns[i].Histogram)
+		c.Assert(a.Columns[i].CMSketch.Equal(b.Columns[i].CMSketch), IsTrue)
 	}
 	c.Assert(len(a.Indices), Equals, len(b.Indices))
 	for i := range a.Indices {
 		assertHistogramEqual(c, a.Indices[i].Histogram, b.Indices[i].Histogram)
+		c.Assert(a.Indices[i].CMSketch.Equal(b.Indices[i].CMSketch), IsTrue)
 	}
 }
@@ -338,7 +340,7 @@ func (s *testStatsCacheSuite) TestLoadHist(c *C) {
 	c.Assert(newStatsTbl2.Columns[int64(3)].LastUpdateVersion, Greater, newStatsTbl2.Columns[int64(1)].LastUpdateVersion)
 }
 
-func (s *testStatsUpdateSuite) TestLoadHistogram(c *C) {
+func (s *testStatsUpdateSuite) TestLoadStats(c *C) {
 	store, do, err := newStoreWithBootstrap(10 * time.Millisecond)
 	c.Assert(err, IsNil)
 	defer store.Close()
@@ -358,10 +360,16 @@ func (s *testStatsUpdateSuite) TestLoadHistogram(c *C) {
 	stat := h.GetTableStats(tableInfo.ID)
 	hg := stat.Columns[tableInfo.Columns[0].ID].Histogram
 	c.Assert(len(hg.Buckets), Greater, 0)
+	cms := stat.Columns[tableInfo.Columns[0].ID].CMSketch
+	c.Assert(cms, IsNil)
 	hg = stat.Indices[tableInfo.Indices[0].ID].Histogram
 	c.Assert(len(hg.Buckets), Greater, 0)
+	cms = stat.Indices[tableInfo.Indices[0].ID].CMSketch
+	c.Assert(cms.TotalCount(), Greater, uint64(0))
 	hg = stat.Columns[tableInfo.Columns[2].ID].Histogram
 	c.Assert(len(hg.Buckets), Equals, 0)
+	cms = stat.Columns[tableInfo.Columns[2].ID].CMSketch
+	c.Assert(cms, IsNil)
 	_, err = stat.ColumnEqualRowCount(testKit.Se.GetSessionVars().StmtCtx, types.NewIntDatum(1), tableInfo.Columns[2].ID)
 	c.Assert(err, IsNil)
 	time.Sleep(1 * time.Second)
diff --git a/statistics/histogram.go b/statistics/histogram.go
index 2b775c04dd..a256d005b6 100644
--- a/statistics/histogram.go
+++ b/statistics/histogram.go
@@ -63,8 +63,8 @@ type Bucket struct {
 	commonPfxLen int // when the bucket value type is KindString or KindBytes, commonPfxLen is the common prefix length of the lower bound and upper bound.
 }
 
-// SaveToStorage saves the histogram to storage.
-func (hg *Histogram) SaveToStorage(ctx context.Context, tableID int64, count int64, isIndex int) error {
+// SaveStatsToStorage saves the stats to storage.
+func SaveStatsToStorage(ctx context.Context, tableID int64, count int64, isIndex int, hg *Histogram, cms *CMSketch) error {
 	goCtx := ctx.GoCtx()
 	if goCtx == nil {
 		goCtx = goctx.Background()
 	}
@@ -81,7 +81,12 @@ func (hg *Histogram) SaveToStorage(ctx context.Context, tableID int64, count int
 	if err != nil {
 		return errors.Trace(err)
 	}
-	replaceSQL = fmt.Sprintf("replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count) values (%d, %d, %d, %d, %d, %d)", tableID, isIndex, hg.ID, hg.NDV, version, hg.NullCount)
+	data, err := encodeCMSketch(cms)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	replaceSQL = fmt.Sprintf("replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch) values (%d, %d, %d, %d, %d, %d, X'%X')",
+		tableID, isIndex, hg.ID, hg.NDV, version, hg.NullCount, data)
 	_, err = exec.Execute(goCtx, replaceSQL)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -461,6 +466,7 @@ func MergeHistograms(sc *variable.StatementContext, lh *Histogram, rh *Histogram
 // Column represents a column histogram.
 type Column struct {
 	Histogram
+	*CMSketch
 	Count int64
 	Info  *model.ColumnInfo
 }
@@ -469,6 +475,15 @@ func (c *Column) String() string {
 	return c.Histogram.toString(false)
 }
 
+func (c *Column) equalRowCount(sc *variable.StatementContext, val types.Datum) (float64, error) {
+	if c.CMSketch != nil {
+		count, err := c.CMSketch.queryValue(val)
+		return float64(count), errors.Trace(err)
+	}
+	count, err := c.Histogram.equalRowCount(sc, val)
+	return count, errors.Trace(err)
+}
+
 // getIntColumnRowCount estimates the row count by a slice of IntColumnRange.
 func (c *Column) getIntColumnRowCount(sc *variable.StatementContext, intRanges []types.IntColumnRange,
 	totalRowCount float64) (float64, error) {
@@ -555,6 +570,7 @@ func (c *Column) getColumnRowCount(sc *variable.StatementContext, ranges []*type
 
 // Index represents an index histogram.
 type Index struct {
 	Histogram
+	*CMSketch
 	Info *model.IndexInfo
 }
@@ -562,6 +578,14 @@ func (idx *Index) String() string {
 	return idx.Histogram.toString(true)
 }
 
+func (idx *Index) equalRowCount(sc *variable.StatementContext, b []byte) (float64, error) {
+	if idx.CMSketch != nil {
+		return float64(idx.CMSketch.queryBytes(b)), nil
+	}
+	count, err := idx.Histogram.equalRowCount(sc, types.NewBytesDatum(b))
+	return count, errors.Trace(err)
+}
+
 func (idx *Index) getRowCount(sc *variable.StatementContext, indexRanges []*types.IndexRange) (float64, error) {
 	totalCount := float64(0)
 	for _, indexRange := range indexRanges {
@@ -576,7 +600,7 @@ func (idx *Index) getRowCount(sc *variable.StatementContext, indexRanges []*type
 		}
 		if bytes.Equal(lb, rb) {
 			if !indexRange.LowExclude && !indexRange.HighExclude {
-				rowCount, err1 := idx.equalRowCount(sc, types.NewBytesDatum(lb))
+				rowCount, err1 := idx.equalRowCount(sc, lb)
 				if err1 != nil {
 					return 0, errors.Trace(err1)
 				}
diff --git a/statistics/sample.go b/statistics/sample.go
index ebaf65541d..6f3f6fb315 100644
--- a/statistics/sample.go
+++ b/statistics/sample.go
@@ -32,14 +32,19 @@ type SampleCollector struct {
 	NullCount     int64
 	Count         int64 // Count is the number of non-null rows.
 	MaxSampleSize int64
-	Sketch        *FMSketch
+	FMSketch      *FMSketch
+	CMSketch      *CMSketch
 }
 
 // MergeSampleCollector merges two sample collectors.
 func (c *SampleCollector) MergeSampleCollector(rc *SampleCollector) {
 	c.NullCount += rc.NullCount
 	c.Count += rc.Count
-	c.Sketch.mergeFMSketch(rc.Sketch)
+	c.FMSketch.mergeFMSketch(rc.FMSketch)
+	if rc.CMSketch != nil {
+		err := c.CMSketch.MergeCMSketch(rc.CMSketch)
+		terror.Log(errors.Trace(err))
+	}
 	for _, val := range rc.Samples {
 		err := c.collect(val)
 		terror.Log(errors.Trace(err))
 	}
@@ -51,7 +56,10 @@ func SampleCollectorToProto(c *SampleCollector) *tipb.SampleCollector {
 	collector := &tipb.SampleCollector{
 		NullCount: c.NullCount,
 		Count:     c.Count,
-		FmSketch:  FMSketchToProto(c.Sketch),
+		FmSketch:  FMSketchToProto(c.FMSketch),
+	}
+	if c.CMSketch != nil {
+		collector.CmSketch = CMSketchToProto(c.CMSketch)
 	}
 	for _, sample := range c.Samples {
 		collector.Samples = append(collector.Samples, sample.GetBytes())
 	}
@@ -64,7 +72,10 @@ func SampleCollectorFromProto(collector *tipb.SampleCollector) *SampleCollector
 	s := &SampleCollector{
 		NullCount: collector.NullCount,
 		Count:     collector.Count,
-		Sketch:    FMSketchFromProto(collector.FmSketch),
+		FMSketch:  FMSketchFromProto(collector.FmSketch),
+	}
+	if collector.CmSketch != nil {
+		s.CMSketch = CMSketchFromProto(collector.CmSketch)
 	}
 	for _, val := range collector.Samples {
 		s.Samples = append(s.Samples, types.NewBytesDatum(val))
 	}
@@ -79,9 +90,12 @@ func (c *SampleCollector) collect(d types.Datum) error {
 			return nil
 		}
 		c.Count++
-		if err := c.Sketch.InsertValue(d); err != nil {
+		if err := c.FMSketch.InsertValue(d); err != nil {
 			return errors.Trace(err)
 		}
+		if c.CMSketch != nil {
+			c.CMSketch.InsertBytes(d.GetBytes())
+		}
 	}
 	c.seenValues++
 	// The following code use types.CopyDatum(d) because d may have a deep reference
@@ -102,21 +116,23 @@ func (c *SampleCollector) collect(d types.Datum) error {
 // SampleBuilder is used to build samples for columns.
 // Also, if primary key is handle, it will directly build histogram for it.
 type SampleBuilder struct {
-	Sc            *variable.StatementContext
-	RecordSet     ast.RecordSet
-	ColLen        int   // ColLen is the number of columns need to be sampled.
-	PkID          int64 // If primary key is handle, the PkID is the id of the primary key. If not exists, it is -1.
-	MaxBucketSize int64
-	MaxSampleSize int64
-	MaxSketchSize int64
+	Sc              *variable.StatementContext
+	RecordSet       ast.RecordSet
+	ColLen          int   // ColLen is the number of columns need to be sampled.
+	PkID            int64 // If primary key is handle, the PkID is the id of the primary key. If not exists, it is -1.
+	MaxBucketSize   int64
+	MaxSampleSize   int64
+	MaxFMSketchSize int64
+	CMSketchDepth   int32
+	CMSketchWidth   int32
 }
 
-// CollectSamplesAndEstimateNDVs collects sample from the result set using Reservoir Sampling algorithm,
+// CollectColumnStats collects sample from the result set using Reservoir Sampling algorithm,
 // and estimates NDVs using FM Sketch during the collecting process.
-// It returns the sample collectors which contain total count, null count and distinct values count.
+// It returns the sample collectors which contain total count, null count, distinct values count and CM Sketch.
 // It also returns the statistic builder for PK which contains the histogram.
 // See https://en.wikipedia.org/wiki/Reservoir_sampling
-func (s SampleBuilder) CollectSamplesAndEstimateNDVs() ([]*SampleCollector, *SortedBuilder, error) {
+func (s SampleBuilder) CollectColumnStats() ([]*SampleCollector, *SortedBuilder, error) {
 	var pkBuilder *SortedBuilder
 	if s.PkID != -1 {
 		pkBuilder = NewSortedBuilder(s.Sc, s.MaxBucketSize, s.PkID)
 	}
@@ -125,7 +141,12 @@ func (s SampleBuilder) CollectSamplesAndEstimateNDVs() ([]*SampleCollector, *Sor
 	for i := range collectors {
 		collectors[i] = &SampleCollector{
 			MaxSampleSize: s.MaxSampleSize,
-			Sketch:        NewFMSketch(int(s.MaxSketchSize)),
+			FMSketch:      NewFMSketch(int(s.MaxFMSketchSize)),
+		}
+	}
+	if s.CMSketchDepth > 0 && s.CMSketchWidth > 0 {
+		for i := range collectors {
+			collectors[i].CMSketch = NewCMSketch(s.CMSketchDepth, s.CMSketchWidth)
 		}
 	}
 	for {
diff --git a/statistics/sample_test.go b/statistics/sample_test.go
index 275a484aeb..2ad1aa6f57 100644
--- a/statistics/sample_test.go
+++ b/statistics/sample_test.go
@@ -71,60 +71,68 @@ func (s *testSampleSuite) SetUpSuite(c *C) {
 	s.rs = rs
 }
 
-func (s *testSampleSuite) TestCollectSamplesAndEstimateNDVs(c *C) {
+func (s *testSampleSuite) TestCollectColumnStats(c *C) {
 	builder := statistics.SampleBuilder{
-		Sc:            mock.NewContext().GetSessionVars().StmtCtx,
-		RecordSet:     s.rs,
-		ColLen:        1,
-		PkID:          1,
-		MaxSampleSize: 10000,
-		MaxBucketSize: 256,
-		MaxSketchSize: 1000,
+		Sc:              mock.NewContext().GetSessionVars().StmtCtx,
+		RecordSet:       s.rs,
+		ColLen:          1,
+		PkID:            1,
+		MaxSampleSize:   10000,
+		MaxBucketSize:   256,
+		MaxFMSketchSize: 1000,
+		CMSketchWidth:   2048,
+		CMSketchDepth:   8,
 	}
 	s.rs.Close()
-	collectors, pkBuilder, err := builder.CollectSamplesAndEstimateNDVs()
+	collectors, pkBuilder, err := builder.CollectColumnStats()
 	c.Assert(err, IsNil)
 	c.Assert(collectors[0].NullCount+collectors[0].Count, Equals, int64(s.count))
-	c.Assert(collectors[0].Sketch.NDV(), Equals, int64(6232))
+	c.Assert(collectors[0].FMSketch.NDV(), Equals, int64(6232))
+	c.Assert(collectors[0].CMSketch.TotalCount(), Equals, uint64(collectors[0].Count))
 	c.Assert(int64(pkBuilder.Count), Equals, int64(s.count))
 	c.Assert(pkBuilder.Hist().NDV, Equals, int64(s.count))
 }
 
 func (s *testSampleSuite) TestMergeSampleCollector(c *C) {
 	builder := statistics.SampleBuilder{
-		Sc:            mock.NewContext().GetSessionVars().StmtCtx,
-		RecordSet:     s.rs,
-		ColLen:        2,
-		PkID:          -1,
-		MaxSampleSize: 1000,
-		MaxBucketSize: 256,
-		MaxSketchSize: 1000,
+		Sc:              mock.NewContext().GetSessionVars().StmtCtx,
+		RecordSet:       s.rs,
+		ColLen:          2,
+		PkID:            -1,
+		MaxSampleSize:   1000,
+		MaxBucketSize:   256,
+		MaxFMSketchSize: 1000,
+		CMSketchWidth:   2048,
+		CMSketchDepth:   8,
 	}
 	s.rs.Close()
-	collectors, pkBuilder, err := builder.CollectSamplesAndEstimateNDVs()
+	collectors, pkBuilder, err := builder.CollectColumnStats()
 	c.Assert(err, IsNil)
 	c.Assert(pkBuilder, IsNil)
 	c.Assert(len(collectors), Equals, 2)
 	collectors[0].IsMerger = true
 	collectors[0].MergeSampleCollector(collectors[1])
-	c.Assert(collectors[0].Sketch.NDV(), Equals, int64(9280))
+	c.Assert(collectors[0].FMSketch.NDV(), Equals, int64(9280))
 	c.Assert(len(collectors[0].Samples), Equals, 1000)
 	c.Assert(collectors[0].NullCount, Equals, int64(1000))
 	c.Assert(collectors[0].Count, Equals, int64(19000))
+	c.Assert(collectors[0].CMSketch.TotalCount(), Equals, uint64(collectors[0].Count))
 }
 
 func (s *testSampleSuite) TestCollectorProtoConversion(c *C) {
 	builder := statistics.SampleBuilder{
-		Sc:            mock.NewContext().GetSessionVars().StmtCtx,
-		RecordSet:     s.rs,
-		ColLen:        2,
-		PkID:          -1,
-		MaxSampleSize: 10000,
-		MaxBucketSize: 256,
-		MaxSketchSize: 1000,
+		Sc:              mock.NewContext().GetSessionVars().StmtCtx,
+		RecordSet:       s.rs,
+		ColLen:          2,
+		PkID:            -1,
+		MaxSampleSize:   10000,
+		MaxBucketSize:   256,
+		MaxFMSketchSize: 1000,
+		CMSketchWidth:   2048,
+		CMSketchDepth:   8,
 	}
 	s.rs.Close()
-	collectors, pkBuilder, err := builder.CollectSamplesAndEstimateNDVs()
+	collectors, pkBuilder, err := builder.CollectColumnStats()
 	c.Assert(err, IsNil)
 	c.Assert(pkBuilder, IsNil)
 	for _, collector := range collectors {
@@ -132,7 +140,8 @@ func (s *testSampleSuite) TestCollectorProtoConversion(c *C) {
 		s := statistics.SampleCollectorFromProto(p)
 		c.Assert(collector.Count, Equals, s.Count)
 		c.Assert(collector.NullCount, Equals, s.NullCount)
-		c.Assert(collector.Sketch.NDV(), Equals, s.Sketch.NDV())
+		c.Assert(collector.CMSketch.TotalCount(), Equals, s.CMSketch.TotalCount())
+		c.Assert(collector.FMSketch.NDV(), Equals, s.FMSketch.NDV())
 		c.Assert(len(collector.Samples), Equals, len(s.Samples))
 	}
 }
diff --git a/statistics/statistics_test.go b/statistics/statistics_test.go
index 239784a97d..8a345b67ee 100644
--- a/statistics/statistics_test.go
+++ b/statistics/statistics_test.go
@@ -149,6 +149,31 @@ func buildPK(ctx context.Context, numBuckets, id int64, records ast.RecordSet) (
 	return b.Count, b.hist, nil
 }
 
+func buildIndex(ctx context.Context, numBuckets, id int64, records ast.RecordSet) (int64, *Histogram, *CMSketch, error) {
+	b := NewSortedBuilder(ctx.GetSessionVars().StmtCtx, numBuckets, id)
+	cms := NewCMSketch(8, 2048)
+	for {
+		row, err := records.Next()
+		if err != nil {
+			return 0, nil, nil, errors.Trace(err)
+		}
+		if row == nil {
+			break
+		}
+		bytes, err := codec.EncodeKey(nil, row.Data...)
+		if err != nil {
+			return 0, nil, nil, errors.Trace(err)
+		}
+		data := types.NewBytesDatum(bytes)
+		err = b.Iterate(data)
+		if err != nil {
+			return 0, nil, nil, errors.Trace(err)
+		}
+		cms.InsertBytes(bytes)
+	}
+	return b.Count, b.Hist(), cms, nil
+}
+
 func calculateScalar(hist *Histogram) {
 	for i, bkt := range hist.Buckets {
 		bkt.lowerScalar, bkt.upperScalar, bkt.commonPfxLen = preCalculateDatumScalar(&bkt.LowerBound, &bkt.UpperBound)
 	}
@@ -172,7 +197,7 @@ func (s *testStatisticsSuite) TestBuild(c *C) {
 		Count:     s.count,
 		NullCount: 0,
 		Samples:   s.samples,
-		Sketch:    sketch,
+		FMSketch:  sketch,
 	}
 	col, err := BuildColumn(ctx, bucketCount, 2, collector)
 	checkRepeats(c, col)
@@ -208,22 +233,22 @@ func (s *testStatisticsSuite) TestBuild(c *C) {
 	c.Check(int(count), Equals, 9)
 
 	builder := SampleBuilder{
-		Sc:            mock.NewContext().GetSessionVars().StmtCtx,
-		RecordSet:     s.pk,
-		ColLen:        1,
-		PkID:          -1,
-		MaxSampleSize: 1000,
-		MaxSketchSize: 1000,
+		Sc:              mock.NewContext().GetSessionVars().StmtCtx,
+		RecordSet:       s.pk,
+		ColLen:          1,
+		PkID:            -1,
+		MaxSampleSize:   1000,
+		MaxFMSketchSize: 1000,
 	}
 	s.pk.Close()
-	collectors, _, err := builder.CollectSamplesAndEstimateNDVs()
+	collectors, _, err := builder.CollectColumnStats()
 	c.Assert(err, IsNil)
 	c.Assert(len(collectors), Equals, 1)
 	col, err = BuildColumn(mock.NewContext(), 256, 2, collectors[0])
 	c.Assert(err, IsNil)
 	checkRepeats(c, col)
-	tblCount, col, err := BuildIndex(ctx, bucketCount, 1, ast.RecordSet(s.rc))
+	tblCount, col, _, err := buildIndex(ctx, bucketCount, 1, ast.RecordSet(s.rc))
 	checkRepeats(c, col)
 	calculateScalar(col)
 	c.Check(err, IsNil)
@@ -276,7 +301,7 @@ func (s *testStatisticsSuite) TestBuild(c *C) {
 func (s *testStatisticsSuite) TestHistogramProtoConversion(c *C) {
 	ctx := mock.NewContext()
 	s.rc.Close()
-	tblCount, col, err := BuildIndex(ctx, 256, 1, ast.RecordSet(s.rc))
+	tblCount, col, _, err := buildIndex(ctx, 256, 1, ast.RecordSet(s.rc))
 	c.Check(err, IsNil)
 	c.Check(int(tblCount), Equals, 100000)
@@ -384,6 +409,14 @@ func (s *testStatisticsSuite) TestPseudoTable(c *C) {
 	c.Assert(int(count), Equals, 250)
 }
 
+func buildCMSketch(values []types.Datum) *CMSketch {
+	cms := NewCMSketch(8, 2048)
+	for _, val := range values {
+		cms.insert(&val)
+	}
+	return cms
+}
+
 func (s *testStatisticsSuite) TestColumnRange(c *C) {
 	bucketCount := int64(256)
 	sketch, _, _ := buildFMSketch(s.rc.(*recordSet).data, 1000)
@@ -394,12 +427,12 @@ func (s *testStatisticsSuite) TestColumnRange(c *C) {
 		Count:     s.count,
 		NullCount: 0,
 		Samples:   s.samples,
-		Sketch:    sketch,
+		FMSketch:  sketch,
 	}
 	hg, err := BuildColumn(ctx, bucketCount, 2, collector)
 	calculateScalar(hg)
 	c.Check(err, IsNil)
-	col := &Column{Histogram: *hg}
+	col := &Column{Histogram: *hg, CMSketch: buildCMSketch(s.rc.(*recordSet).data)}
 	tbl := &Table{
 		Count:   int64(col.totalRowCount()),
 		Columns: make(map[int64]*Column),
@@ -444,7 +477,7 @@ func (s *testStatisticsSuite) TestColumnRange(c *C) {
 	ran[0].HighExcl = true
 	count, err = tbl.GetRowCountByColumnRanges(sc, 0, ran)
 	c.Assert(err, IsNil)
-	c.Assert(int(count), Equals, 9995)
+	c.Assert(int(count), Equals, 9994)
 	ran[0].LowExcl = false
 	ran[0].HighExcl = false
 	count, err = tbl.GetRowCountByColumnRanges(sc, 0, ran)
@@ -523,12 +556,12 @@ func (s *testStatisticsSuite) TestIndexRanges(c *C) {
 	sc := ctx.GetSessionVars().StmtCtx
 
 	s.rc.(*recordSet).cursor = 0
-	rowCount, hg, err := BuildIndex(ctx, bucketCount, 0, s.rc)
+	rowCount, hg, cms, err := buildIndex(ctx, bucketCount, 0, s.rc)
 	calculateScalar(hg)
 	c.Check(err, IsNil)
 	c.Check(rowCount, Equals, int64(100000))
 	idxInfo := &model.IndexInfo{Columns: []*model.IndexColumn{{Offset: 0}}}
-	idx := &Index{Histogram: *hg, Info: idxInfo}
+	idx := &Index{Histogram: *hg, CMSketch: cms, Info: idxInfo}
 	tbl := &Table{
 		Count:   int64(idx.totalRowCount()),
 		Indices: make(map[int64]*Index),
@@ -576,5 +609,5 @@ func (s *testStatisticsSuite) TestIndexRanges(c *C) {
 	ran[0].HighVal[0] = types.NewIntDatum(1000)
 	count, err = tbl.GetRowCountByIndexRanges(sc, 0, ran)
 	c.Assert(err, IsNil)
-	c.Assert(int(count), Equals, 1)
+	c.Assert(int(count), Equals, 0)
 }
diff --git a/statistics/table.go b/statistics/table.go
index b254850f29..ae27ba1938 100644
--- a/statistics/table.go
+++ b/statistics/table.go
@@ -70,7 +70,19 @@ func (t *Table) copy() *Table {
 	return nt
 }
 
-func (h *Handle) indexHistogramFromStorage(row *ast.Row, table *Table, tableInfo *model.TableInfo) error {
+func (h *Handle) cmSketchFromStorage(tblID int64, isIndex, histID int64) (*CMSketch, error) {
+	selSQL := fmt.Sprintf("select cm_sketch from mysql.stats_histograms where table_id = %d and is_index = %d and hist_id = %d", tblID, isIndex, histID)
+	rows, _, err := h.ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(h.ctx, selSQL)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	if len(rows) == 0 {
+		return nil, nil
+	}
+	return decodeCMSketch(rows[0].Data[0].GetBytes())
+}
+
+func (h *Handle) indexStatsFromStorage(row *ast.Row, table *Table, tableInfo *model.TableInfo) error {
 	histID, distinct := row.Data[2].GetInt64(), row.Data[3].GetInt64()
 	histVer, nullCount := row.Data[4].GetUint64(), row.Data[5].GetInt64()
 	idx := table.Indices[histID]
@@ -83,7 +95,11 @@ func (h *Handle) indexHistogramFromStorage(row *ast.Row, table *Table, tableInfo
 			if err != nil {
 				return errors.Trace(err)
 			}
-			idx = &Index{Histogram: *hg, Info: idxInfo}
+			cms, err := h.cmSketchFromStorage(tableInfo.ID, 1, idxInfo.ID)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			idx = &Index{Histogram: *hg, CMSketch: cms, Info: idxInfo}
 		}
 		break
 	}
@@ -95,7 +111,7 @@ func (h *Handle) indexHistogramFromStorage(row *ast.Row, table *Table, tableInfo
 	return nil
 }
 
-func (h *Handle) columnHistogramFromStorage(row *ast.Row, table *Table, tableInfo *model.TableInfo) error {
+func (h *Handle) columnStatsFromStorage(row *ast.Row, table *Table, tableInfo *model.TableInfo) error {
 	histID, distinct := row.Data[2].GetInt64(), row.Data[3].GetInt64()
 	histVer, nullCount := row.Data[4].GetUint64(), row.Data[5].GetInt64()
 	col := table.Columns[histID]
@@ -121,7 +137,11 @@ func (h *Handle) columnHistogramFromStorage(row *ast.Row, table *Table, tableInf
 			if err != nil {
 				return errors.Trace(err)
 			}
-			col = &Column{Histogram: *hg, Info: colInfo, Count: int64(hg.totalRowCount())}
+			cms, err := h.cmSketchFromStorage(tableInfo.ID, 0, colInfo.ID)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			col = &Column{Histogram: *hg, Info: colInfo, CMSketch: cms, Count: int64(hg.totalRowCount())}
 		}
 		break
 	}
@@ -160,11 +180,11 @@ func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo) (*Table, erro
 	}
 	for _, row := range rows {
 		if row.Data[1].GetInt64() > 0 {
-			if err := h.indexHistogramFromStorage(row, table, tableInfo); err != nil {
+			if err := h.indexStatsFromStorage(row, table, tableInfo); err != nil {
 				return nil, errors.Trace(err)
 			}
 		} else {
-			if err := h.columnHistogramFromStorage(row, table, tableInfo); err != nil {
+			if err := h.columnStatsFromStorage(row, table, tableInfo); err != nil {
 				return nil, errors.Trace(err)
 			}
 		}
diff --git a/store/tikv/mock-tikv/analyze.go b/store/tikv/mock-tikv/analyze.go
index 4ed7ce3dfa..d90c4c3a82 100644
--- a/store/tikv/mock-tikv/analyze.go
+++ b/store/tikv/mock-tikv/analyze.go
@@ -66,6 +66,10 @@ func (h *rpcHandler) handleAnalyzeIndexReq(req *coprocessor.Request, analyzeReq
 		IndexScan: &tipb.IndexScan{Desc: false},
 	}
 	statsBuilder := statistics.NewSortedBuilder(flagsToStatementContext(analyzeReq.Flags), analyzeReq.IdxReq.BucketSize, 0)
+	var cms *statistics.CMSketch
+	if analyzeReq.IdxReq.CmsketchDepth != nil && analyzeReq.IdxReq.CmsketchWidth != nil {
+		cms = statistics.NewCMSketch(*analyzeReq.IdxReq.CmsketchDepth, *analyzeReq.IdxReq.CmsketchWidth)
+	}
 	for {
 		values, err := e.Next()
 		if err != nil {
@@ -82,9 +86,16 @@ func (h *rpcHandler) handleAnalyzeIndexReq(req *coprocessor.Request, analyzeReq
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
+		if cms != nil {
+			cms.InsertBytes(value)
+		}
 	}
 	hg := statistics.HistogramToProto(statsBuilder.Hist())
-	data, err := proto.Marshal(&tipb.AnalyzeIndexResp{Hist: hg})
+	var cm *tipb.CMSketch
+	if cms != nil {
+		cm = statistics.CMSketchToProto(cms)
+	}
+	data, err := proto.Marshal(&tipb.AnalyzeIndexResp{Hist: hg, Cms: cm})
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -119,15 +130,19 @@ func (h *rpcHandler) handleAnalyzeColumnsReq(req *coprocessor.Request, analyzeRe
 	}
 	colReq := analyzeReq.ColReq
 	builder := statistics.SampleBuilder{
-		Sc:            sc,
-		RecordSet:     e,
-		ColLen:        numCols,
-		PkID:          pkID,
-		MaxBucketSize: colReq.BucketSize,
-		MaxSketchSize: colReq.SketchSize,
-		MaxSampleSize: colReq.SampleSize,
+		Sc:              sc,
+		RecordSet:       e,
+		ColLen:          numCols,
+		PkID:            pkID,
+		MaxBucketSize:   colReq.BucketSize,
+		MaxFMSketchSize: colReq.SketchSize,
+		MaxSampleSize:   colReq.SampleSize,
 	}
-	collectors, pkBuilder, err := builder.CollectSamplesAndEstimateNDVs()
+	if colReq.CmsketchWidth != nil && colReq.CmsketchDepth != nil {
+		builder.CMSketchWidth = *colReq.CmsketchWidth
+		builder.CMSketchDepth = *colReq.CmsketchDepth
+	}
+	collectors, pkBuilder, err := builder.CollectColumnStats()
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
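
The patch stores one CM sketch per column and index and answers equal-condition row-count queries from it (Column.equalRowCount / Index.equalRowCount above). For readers unfamiliar with the structure, here is a small self-contained Go sketch of the idea. It is illustrative only, not code from this PR: the lowercase type and function names are stand-ins, and the noise-corrected counters inside the query loop are an assumption on my part, since that loop body is elided from the cmsketch.go hunk above (only the final median/min combination is visible). Hashing uses murmur3.Sum128, as the patched InsertBytes does.

// Illustrative count-min sketch, mirroring the insert/query shape of statistics/cmsketch.go.
package main

import (
	"fmt"
	"math"
	"sort"

	"github.com/spaolacci/murmur3" // same hash as the patched code
)

type cmSketch struct {
	depth, width int32
	count        uint64
	table        [][]uint32
}

func newCMSketch(d, w int32) *cmSketch {
	tbl := make([][]uint32, d)
	for i := range tbl {
		tbl[i] = make([]uint32, w)
	}
	return &cmSketch{depth: d, width: w, table: tbl}
}

// insertBytes bumps one counter per row; each row uses a different slot derived
// from a single 128-bit hash, as in CMSketch.InsertBytes above.
func (c *cmSketch) insertBytes(b []byte) {
	c.count++
	h1, h2 := murmur3.Sum128(b)
	for i := range c.table {
		j := (h1 + h2*uint64(i)) % uint64(c.width)
		c.table[i][j]++
	}
}

// queryBytes returns min(corrected median, minimum counter), matching the
// "if res > min { return min }" shape of the patched queryBytes.
func (c *cmSketch) queryBytes(b []byte) uint32 {
	h1, h2 := murmur3.Sum128(b)
	vals := make([]uint32, c.depth)
	min := uint32(math.MaxUint32)
	for i := range c.table {
		j := (h1 + h2*uint64(i)) % uint64(c.width)
		if c.table[i][j] < min {
			min = c.table[i][j]
		}
		// Assumed correction (count-mean-min style): subtract the average
		// contribution of other values that hash into the same counter.
		noise := uint32((c.count - uint64(c.table[i][j])) / uint64(c.width-1))
		if c.table[i][j] < noise {
			vals[i] = 0
		} else {
			vals[i] = c.table[i][j] - noise
		}
	}
	sort.Slice(vals, func(i, j int) bool { return vals[i] < vals[j] })
	res := vals[(c.depth-1)/2] + (vals[c.depth/2]-vals[(c.depth-1)/2])/2
	if res > min {
		return min
	}
	return res
}

func main() {
	cms := newCMSketch(8, 2048) // defaultCMSketchDepth / defaultCMSketchWidth from executor/analyze.go
	for i := 0; i < 1000; i++ {
		cms.insertBytes([]byte(fmt.Sprintf("key-%d", i%100))) // 100 distinct keys, 10 rows each
	}
	fmt.Println(cms.queryBytes([]byte("key-7"))) // expect an estimate close to 10
}

With the defaults added in executor/analyze.go (depth 8, width 2048), the in-memory counter table is 8 × 2048 × 4 bytes = 64 KiB per column or index; the cm_sketch blob written to mysql.stats_histograms is the protobuf encoding of those counters, which is typically smaller because most counters stay small.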