From 2387127ea6859d4fa89504ff52b20cc02fa86d2e Mon Sep 17 00:00:00 2001 From: Rustin Liu Date: Fri, 27 Oct 2023 19:18:34 +0800 Subject: [PATCH] statistics: better benchmark tests for merge topN (#48006) --- pkg/statistics/cmsketch.go | 6 +- pkg/statistics/handle/globalstats/topn.go | 51 +++++-- .../handle/globalstats/topn_bench_test.go | 143 ++++++++---------- 3 files changed, 103 insertions(+), 97 deletions(-) diff --git a/pkg/statistics/cmsketch.go b/pkg/statistics/cmsketch.go index b5fe744118..9643e6b718 100644 --- a/pkg/statistics/cmsketch.go +++ b/pkg/statistics/cmsketch.go @@ -828,14 +828,12 @@ func MergeTopN(topNs []*TopN, n uint32) (*TopN, []TopNMeta) { // CheckEmptyTopNs checks whether all TopNs are empty. func CheckEmptyTopNs(topNs []*TopN) bool { - count := uint64(0) for _, topN := range topNs { - count += topN.TotalCount() - if count != 0 { + if topN.TotalCount() != 0 { return false } } - return count == 0 + return true } // SortTopnMeta sort topnMeta diff --git a/pkg/statistics/handle/globalstats/topn.go b/pkg/statistics/handle/globalstats/topn.go index d07ea09bae..9e9f14a068 100644 --- a/pkg/statistics/handle/globalstats/topn.go +++ b/pkg/statistics/handle/globalstats/topn.go @@ -45,12 +45,21 @@ func mergeGlobalStatsTopN(gp *gp.Pool, sc sessionctx.Context, wrapper *StatsWrap return MergeGlobalStatsTopNByConcurrency(gp, mergeConcurrency, batchSize, wrapper, timeZone, version, n, isIndex, killer) } -// MergeGlobalStatsTopNByConcurrency merge partition topN by concurrency -// To merge global stats topn by concurrency, we will separate the partition topn in concurrency part and deal it with different worker. 
-// mergeConcurrency is used to control the total concurrency of the running worker, and mergeBatchSize is sued to control -// the partition size for each worker to solve it -func MergeGlobalStatsTopNByConcurrency(gp *gp.Pool, mergeConcurrency, mergeBatchSize int, wrapper *StatsWrapper, - timeZone *time.Location, version int, n uint32, isIndex bool, killer *sqlkiller.SQLKiller) (*statistics.TopN, +// MergeGlobalStatsTopNByConcurrency merge partition topN by concurrency. +// To merge global stats topN by concurrency, +// we will separate the partition topN in concurrency part and deal it with different worker. +// mergeConcurrency is used to control the total concurrency of the running worker, +// and mergeBatchSize is used to control the partition size for each worker to solve it +func MergeGlobalStatsTopNByConcurrency( + gp *gp.Pool, + mergeConcurrency, mergeBatchSize int, + wrapper *StatsWrapper, + timeZone *time.Location, + version int, + n uint32, + isIndex bool, + killer *sqlkiller.SQLKiller, +) (*statistics.TopN, []statistics.TopNMeta, []*statistics.Histogram, error) { if len(wrapper.AllTopN) < mergeConcurrency { mergeConcurrency = len(wrapper.AllTopN) @@ -119,18 +128,31 @@ func MergeGlobalStatsTopNByConcurrency(gp *gp.Pool, mergeConcurrency, mergeBatch // MergePartTopN2GlobalTopN is used to merge the partition-level topN to global-level topN. // The input parameters: // 1. `topNs` are the partition-level topNs to be merged. -// 2. `n` is the size of the global-level topN. Notice: This value can be 0 and has no default value, we must explicitly specify this value. -// 3. `hists` are the partition-level histograms. Some values not in topN may be placed in the histogram. We need it here to make the value in the global-level TopN more accurate. +// 2. `n` is the size of the global-level topN. +// Notice: This value can be 0 and has no default value, we must explicitly specify this value. +// 3. `hists` are the partition-level histograms. 
+// Some values not in topN may be placed in the histogram. +// We need it here to make the value in the global-level TopN more accurate. // // The output parameters: // 1. `*TopN` is the final global-level topN. -// 2. `[]TopNMeta` is the left topN value from the partition-level TopNs, but is not placed to global-level TopN. We should put them back to histogram latter. -// 3. `[]*Histogram` are the partition-level histograms which just delete some values when we merge the global-level topN. -func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statistics.TopN, n uint32, hists []*statistics.Histogram, - isIndex bool, killer *sqlkiller.SQLKiller) (*statistics.TopN, []statistics.TopNMeta, []*statistics.Histogram, error) { +// 2. `[]TopNMeta` is the left topN value from the partition-level TopNs, +// but is not placed to global-level TopN. We should put them back to histogram later. +// 3. `[]*Histogram` are the partition-level histograms which +// just delete some values when we merge the global-level topN. +func MergePartTopN2GlobalTopN( + loc *time.Location, + version int, + topNs []*statistics.TopN, + n uint32, + hists []*statistics.Histogram, + isIndex bool, + killer *sqlkiller.SQLKiller, +) (*statistics.TopN, []statistics.TopNMeta, []*statistics.Histogram, error) { if statistics.CheckEmptyTopNs(topNs) { return nil, nil, hists, nil } + partNum := len(topNs) // Different TopN structures may hold the same value, we have to merge them. counter := make(map[hack.MutableString]float64) @@ -141,9 +163,11 @@ func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statisti if err := killer.HandleSignal(); err != nil { return nil, nil, nil, err } + // Ignore the empty topN. 
if topN.TotalCount() == 0 { continue } + for _, val := range topN.TopN { encodedVal := hack.String(val.Encoded) _, exists := counter[encodedVal] @@ -152,6 +176,7 @@ func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statisti // We have already calculated the encodedVal from the histogram, so just continue to next topN value. continue } + // We need to check whether the value corresponding to encodedVal is contained in other partition-level stats. // 1. Check the topN first. // 2. If the topN doesn't contain the value corresponding to encodedVal. We should check the histogram. @@ -159,6 +184,7 @@ func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statisti if err := killer.HandleSignal(); err != nil { return nil, nil, nil, err } + if (j == i && version >= 2) || topNs[j].FindTopN(val.Encoded) != -1 { continue } @@ -181,6 +207,7 @@ func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statisti } } } + numTop := len(counter) if numTop == 0 { return nil, nil, hists, nil diff --git a/pkg/statistics/handle/globalstats/topn_bench_test.go b/pkg/statistics/handle/globalstats/topn_bench_test.go index ffad91ec22..a272bfbd4b 100644 --- a/pkg/statistics/handle/globalstats/topn_bench_test.go +++ b/pkg/statistics/handle/globalstats/topn_bench_test.go @@ -16,6 +16,7 @@ package globalstats import ( "fmt" + "math/rand" "testing" "time" @@ -30,29 +31,22 @@ import ( "github.com/tiancaiamao/gp" ) -// cmd: go test -run=^$ -bench=BenchmarkMergePartTopN2GlobalTopNWithHists -benchmem github.com/pingcap/tidb/pkg/statistics/handle/globalstats -func benchmarkMergePartTopN2GlobalTopNWithHists(partitions int, b *testing.B) { - loc := time.UTC - sc := stmtctx.NewStmtCtxWithTimeZone(loc) - version := 1 - killer := sqlkiller.SQLKiller{} - +func prepareTopNsAndHists(b *testing.B, partitions int, tz *time.Location) ([]*statistics.TopN, []*statistics.Histogram) { + sc := stmtctx.NewStmtCtxWithTimeZone(tz) // Prepare TopNs. 
topNs := make([]*statistics.TopN, 0, partitions) for i := 0; i < partitions; i++ { - // Construct TopN, should be key1 -> 2, key2 -> 2, key3 -> 3. - topN := statistics.NewTopN(3) + // Construct TopN, should be key1 -> rand(0, 1000), key2 -> rand(0, 1000), key3 -> rand(0, 1000)... + topN := statistics.NewTopN(500) { - key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1)) - require.NoError(b, err) - topN.AppendTopN(key1, 2) - key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(2)) - require.NoError(b, err) - topN.AppendTopN(key2, 2) - if i%2 == 0 { - key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(3)) + for j := 1; j <= 500; j++ { + // Skip even-numbered keys for even-numbered partitions. + if i%2 == 0 && j%2 == 0 { + continue + } + key, err := codec.EncodeKey(sc, nil, types.NewIntDatum(int64(j))) require.NoError(b, err) - topN.AppendTopN(key3, 3) + topN.AppendTopN(key, uint64(rand.Intn(1000))) } } topNs = append(topNs, topN) @@ -62,68 +56,55 @@ func benchmarkMergePartTopN2GlobalTopNWithHists(partitions int, b *testing.B) { hists := make([]*statistics.Histogram, 0, partitions) for i := 0; i < partitions; i++ { // Construct Hist - h := statistics.NewHistogram(1, 10, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0) - h.Bounds.AppendInt64(0, 1) - h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 20}) - h.Bounds.AppendInt64(0, 2) - h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30}) - h.Bounds.AppendInt64(0, 3) - h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30}) - h.Bounds.AppendInt64(0, 4) - h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 40}) + h := statistics.NewHistogram(1, 500, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0) + for j := 1; j <= 500; j++ { + datum := types.NewIntDatum(int64(j)) + h.AppendBucket(&datum, &datum, int64(10+j*10), 10) + } hists = append(hists, h) } + return topNs, hists +} + +func 
benchmarkMergePartTopN2GlobalTopNWithHists(partitions int, b *testing.B) { + loc := time.UTC + version := 1 + killer := sqlkiller.SQLKiller{} + topNs, hists := prepareTopNsAndHists(b, partitions, loc) + b.ResetTimer() for i := 0; i < b.N; i++ { - // Benchmark merge 10 topN. - _, _, _, _ = MergePartTopN2GlobalTopN(loc, version, topNs, 10, hists, false, &killer) + // Benchmark merge 100 topN. + _, _, _, _ = MergePartTopN2GlobalTopN( + loc, + version, + topNs, + 100, + hists, + false, + &killer, + ) + } +} + +var benchmarkSizes = []int{100, 1000, 2000, 5000, 10000} + +// cmd: go test -run=^$ -bench=BenchmarkMergePartTopN2GlobalTopNWithHists -benchmem github.com/pingcap/tidb/pkg/statistics/handle/globalstats +func BenchmarkMergePartTopN2GlobalTopNWithHists(b *testing.B) { + for _, size := range benchmarkSizes { + b.Run(fmt.Sprintf("Size%d", size), func(b *testing.B) { + benchmarkMergePartTopN2GlobalTopNWithHists(size, b) + }) } } -// cmd: go test -run=^$ -bench=BenchmarkMergeGlobalStatsTopNByConcurrencyWithHists -benchmem github.com/pingcap/tidb/pkg/statistics/handle/globalstats func benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(partitions int, b *testing.B) { loc := time.UTC - sc := stmtctx.NewStmtCtxWithTimeZone(loc) version := 1 killer := sqlkiller.SQLKiller{} - // Prepare TopNs. - topNs := make([]*statistics.TopN, 0, partitions) - for i := 0; i < partitions; i++ { - // Construct TopN, should be key1 -> 2, key2 -> 2, key3 -> 3. - topN := statistics.NewTopN(3) - { - key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1)) - require.NoError(b, err) - topN.AppendTopN(key1, 2) - key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(2)) - require.NoError(b, err) - topN.AppendTopN(key2, 2) - if i%2 == 0 { - key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(3)) - require.NoError(b, err) - topN.AppendTopN(key3, 3) - } - } - topNs = append(topNs, topN) - } - - // Prepare Hists. 
- hists := make([]*statistics.Histogram, 0, partitions) - for i := 0; i < partitions; i++ { - // Construct Hist - h := statistics.NewHistogram(1, 10, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0) - h.Bounds.AppendInt64(0, 1) - h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 20}) - h.Bounds.AppendInt64(0, 2) - h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30}) - h.Bounds.AppendInt64(0, 3) - h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30}) - h.Bounds.AppendInt64(0, 4) - h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 40}) - hists = append(hists, h) - } + topNs, hists := prepareTopNsAndHists(b, partitions, loc) wrapper := NewStatsWrapper(hists, topNs) const mergeConcurrency = 4 batchSize := len(wrapper.AllTopN) / mergeConcurrency @@ -136,24 +117,24 @@ func benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(partitions int, b *test defer gpool.Close() b.ResetTimer() for i := 0; i < b.N; i++ { - // Benchmark merge 10 topN. - _, _, _, _ = MergeGlobalStatsTopNByConcurrency(gpool, mergeConcurrency, batchSize, wrapper, loc, version, 10, false, &killer) - } -} - -var benchmarkSizes = []int{100, 1000, 10000, 100000, 1000000, 10000000} -var benchmarkConcurrencySizes = []int{100, 1000, 10000, 100000} - -func BenchmarkMergePartTopN2GlobalTopNWithHists(b *testing.B) { - for _, size := range benchmarkSizes { - b.Run(fmt.Sprintf("Size%d", size), func(b *testing.B) { - benchmarkMergePartTopN2GlobalTopNWithHists(size, b) - }) + // Benchmark merge 100 topN. 
+ _, _, _, _ = MergeGlobalStatsTopNByConcurrency( + gpool, + mergeConcurrency, + batchSize, + wrapper, + loc, + version, + 100, + false, + &killer, + ) } } +// cmd: go test -run=^$ -bench=BenchmarkMergeGlobalStatsTopNByConcurrencyWithHists -benchmem github.com/pingcap/tidb/pkg/statistics/handle/globalstats func BenchmarkMergeGlobalStatsTopNByConcurrencyWithHists(b *testing.B) { - for _, size := range benchmarkConcurrencySizes { + for _, size := range benchmarkSizes { b.Run(fmt.Sprintf("Size%d", size), func(b *testing.B) { benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(size, b) })