statistics: better benchmark tests for merge topN (#48006)

This commit is contained in:
Rustin Liu
2023-10-27 19:18:34 +08:00
committed by GitHub
parent 6d64b7e89a
commit 2387127ea6
3 changed files with 103 additions and 97 deletions

View File

@@ -828,14 +828,12 @@ func MergeTopN(topNs []*TopN, n uint32) (*TopN, []TopNMeta) {
 // CheckEmptyTopNs checks whether all TopNs are empty.
 func CheckEmptyTopNs(topNs []*TopN) bool {
-	count := uint64(0)
 	for _, topN := range topNs {
-		count += topN.TotalCount()
-		if count != 0 {
+		if topN.TotalCount() != 0 {
 			return false
 		}
 	}
-	return count == 0
+	return true
 }

 // SortTopnMeta sort topnMeta
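
Both versions return true only when every TopN is empty, but the rewritten check bails out at the first non-empty one instead of summing every TotalCount before deciding. A minimal sketch of the new short-circuit logic, using plain uint64 totals in place of the real *TopN type:

package main

import "fmt"

// emptyCheck mirrors the new CheckEmptyTopNs: report false as soon as
// any TopN has a non-zero total count, true only if all are empty.
func emptyCheck(totals []uint64) bool {
	for _, t := range totals {
		if t != 0 {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(emptyCheck([]uint64{0, 0, 0})) // true: every TopN is empty
	fmt.Println(emptyCheck([]uint64{0, 7, 0})) // false: stops at the second entry
}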

View File

@ -45,12 +45,21 @@ func mergeGlobalStatsTopN(gp *gp.Pool, sc sessionctx.Context, wrapper *StatsWrap
return MergeGlobalStatsTopNByConcurrency(gp, mergeConcurrency, batchSize, wrapper, timeZone, version, n, isIndex, killer)
}
// MergeGlobalStatsTopNByConcurrency merge partition topN by concurrency
// To merge global stats topn by concurrency, we will separate the partition topn in concurrency part and deal it with different worker.
// mergeConcurrency is used to control the total concurrency of the running worker, and mergeBatchSize is sued to control
// the partition size for each worker to solve it
func MergeGlobalStatsTopNByConcurrency(gp *gp.Pool, mergeConcurrency, mergeBatchSize int, wrapper *StatsWrapper,
timeZone *time.Location, version int, n uint32, isIndex bool, killer *sqlkiller.SQLKiller) (*statistics.TopN,
// MergeGlobalStatsTopNByConcurrency merge partition topN by concurrency.
// To merge global stats topN by concurrency,
// we will separate the partition topN in concurrency part and deal it with different worker.
// mergeConcurrency is used to control the total concurrency of the running worker,
// and mergeBatchSize is sued to control the partition size for each worker to solve it
func MergeGlobalStatsTopNByConcurrency(
gp *gp.Pool,
mergeConcurrency, mergeBatchSize int,
wrapper *StatsWrapper,
timeZone *time.Location,
version int,
n uint32,
isIndex bool,
killer *sqlkiller.SQLKiller,
) (*statistics.TopN,
[]statistics.TopNMeta, []*statistics.Histogram, error) {
if len(wrapper.AllTopN) < mergeConcurrency {
mergeConcurrency = len(wrapper.AllTopN)
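
The doc comment describes the fan-out: the partition topNs are cut into batches of mergeBatchSize and each batch goes to one worker. A hedged sketch of just that batching arithmetic (splitBatches is a hypothetical helper, not the TiDB worker code):

package main

import "fmt"

// splitBatches cuts n partition indexes into [start, end) ranges of at
// most batchSize each, the shape of work a pool of workers would consume.
func splitBatches(n, batchSize int) [][2]int {
	var batches [][2]int
	for start := 0; start < n; start += batchSize {
		end := start + batchSize
		if end > n {
			end = n
		}
		batches = append(batches, [2]int{start, end})
	}
	return batches
}

func main() {
	// 10 partitions with batches of 4 -> [0 4] [4 8] [8 10]
	fmt.Println(splitBatches(10, 4))
}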
@@ -119,18 +128,31 @@ func MergeGlobalStatsTopNByConcurrency(gp *gp.Pool, mergeConcurrency, mergeBatch
 // MergePartTopN2GlobalTopN is used to merge the partition-level topN to global-level topN.
 // The input parameters:
 // 1. `topNs` are the partition-level topNs to be merged.
-// 2. `n` is the size of the global-level topN. Notice: This value can be 0 and has no default value, we must explicitly specify this value.
-// 3. `hists` are the partition-level histograms. Some values not in topN may be placed in the histogram. We need it here to make the value in the global-level TopN more accurate.
+// 2. `n` is the size of the global-level topN.
+//    Notice: this value can be 0 and has no default value; we must specify it explicitly.
+// 3. `hists` are the partition-level histograms.
+//    Some values not in a topN may be placed in the histogram.
+//    We need them here to make the counts in the global-level TopN more accurate.
 //
 // The output parameters:
 // 1. `*TopN` is the final global-level topN.
-// 2. `[]TopNMeta` is the left topN value from the partition-level TopNs, but is not placed to global-level TopN. We should put them back to histogram latter.
-// 3. `[]*Histogram` are the partition-level histograms which just delete some values when we merge the global-level topN.
-func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statistics.TopN, n uint32, hists []*statistics.Histogram,
-	isIndex bool, killer *sqlkiller.SQLKiller) (*statistics.TopN, []statistics.TopNMeta, []*statistics.Histogram, error) {
+// 2. `[]TopNMeta` are the leftover topN values from the partition-level TopNs
+//    that were not placed into the global-level TopN; we should put them back into the histograms later.
+// 3. `[]*Histogram` are the partition-level histograms, which
+//    just have some values deleted when we merge the global-level topN.
+func MergePartTopN2GlobalTopN(
+	loc *time.Location,
+	version int,
+	topNs []*statistics.TopN,
+	n uint32,
+	hists []*statistics.Histogram,
+	isIndex bool,
+	killer *sqlkiller.SQLKiller,
+) (*statistics.TopN, []statistics.TopNMeta, []*statistics.Histogram, error) {
 	if statistics.CheckEmptyTopNs(topNs) {
 		return nil, nil, hists, nil
 	}
+
 	partNum := len(topNs)
 	// Different TopN structures may hold the same value, we have to merge them.
 	counter := make(map[hack.MutableString]float64)
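
As the comment above counter says, the same value can appear in several partition TopNs, so the merge folds everything into one map keyed by the encoded value. A simplified sketch of that accumulation, with plain strings standing in for hack.MutableString keys:

package main

import "fmt"

// mergeCounts folds per-partition (encodedValue -> count) maps into one
// global counter, the same shape as the counter map in MergePartTopN2GlobalTopN.
func mergeCounts(parts []map[string]float64) map[string]float64 {
	counter := make(map[string]float64)
	for _, part := range parts {
		for val, cnt := range part {
			counter[val] += cnt // identical values from different partitions merge here
		}
	}
	return counter
}

func main() {
	parts := []map[string]float64{
		{"a": 10, "b": 5},
		{"a": 3, "c": 8},
	}
	fmt.Println(mergeCounts(parts)) // map[a:13 b:5 c:8]
}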
@@ -141,9 +163,11 @@ func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statisti
 		if err := killer.HandleSignal(); err != nil {
 			return nil, nil, nil, err
 		}
+
+		// Ignore the empty topN.
 		if topN.TotalCount() == 0 {
 			continue
 		}
 		for _, val := range topN.TopN {
 			encodedVal := hack.String(val.Encoded)
 			_, exists := counter[encodedVal]
@@ -152,6 +176,7 @@ func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statisti
 				// We have already calculated the encodedVal from the histogram, so just continue to next topN value.
 				continue
 			}
+
 			// We need to check whether the value corresponding to encodedVal is contained in other partition-level stats.
 			// 1. Check the topN first.
 			// 2. If the topN doesn't contain the value corresponding to encodedVal. We should check the histogram.
@@ -159,6 +184,7 @@ func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statisti
 				if err := killer.HandleSignal(); err != nil {
 					return nil, nil, nil, err
 				}
+
 				if (j == i && version >= 2) || topNs[j].FindTopN(val.Encoded) != -1 {
 					continue
 				}
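
The comment in the previous hunk spells out the lookup order for a value pulled from one partition's TopN: check each other partition's TopN first, and only fall back to its histogram when the TopN misses. A toy sketch of that two-step rule (the partition struct and its flat histEst estimate are invented for illustration):

package main

import "fmt"

// partition is a toy stand-in: its TopN is a map, and its histogram is
// approximated by a flat per-value estimate for values outside the TopN.
type partition struct {
	topN    map[string]float64
	histEst float64 // hypothetical histogram estimate
}

// globalCount follows the two-step rule: prefer the partition's TopN
// count, otherwise fall back to the histogram estimate.
func globalCount(parts []partition, val string) float64 {
	total := 0.0
	for _, p := range parts {
		if cnt, ok := p.topN[val]; ok {
			total += cnt // 1. the TopN contains the value
			continue
		}
		total += p.histEst // 2. the TopN misses; consult the histogram
	}
	return total
}

func main() {
	parts := []partition{
		{topN: map[string]float64{"k": 40}},
		{topN: map[string]float64{}, histEst: 5},
	}
	fmt.Println(globalCount(parts, "k")) // 45
}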
@@ -181,6 +207,7 @@ func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statisti
 			}
 		}
 	}
+
 	numTop := len(counter)
 	if numTop == 0 {
 		return nil, nil, hists, nil
View File

@@ -16,6 +16,7 @@ package globalstats

 import (
 	"fmt"
+	"math/rand"
 	"testing"
 	"time"
@@ -30,29 +31,22 @@ import (
 	"github.com/tiancaiamao/gp"
 )

-// cmd: go test -run=^$ -bench=BenchmarkMergePartTopN2GlobalTopNWithHists -benchmem github.com/pingcap/tidb/pkg/statistics/handle/globalstats
-func benchmarkMergePartTopN2GlobalTopNWithHists(partitions int, b *testing.B) {
-	loc := time.UTC
-	sc := stmtctx.NewStmtCtxWithTimeZone(loc)
-	version := 1
-	killer := sqlkiller.SQLKiller{}
+func prepareTopNsAndHists(b *testing.B, partitions int, tz *time.Location) ([]*statistics.TopN, []*statistics.Histogram) {
+	sc := stmtctx.NewStmtCtxWithTimeZone(tz)
 	// Prepare TopNs.
 	topNs := make([]*statistics.TopN, 0, partitions)
 	for i := 0; i < partitions; i++ {
-		// Construct TopN, should be key1 -> 2, key2 -> 2, key3 -> 3.
-		topN := statistics.NewTopN(3)
+		// Construct TopN, should be key1 -> rand(0, 1000), key2 -> rand(0, 1000), key3 -> rand(0, 1000)...
+		topN := statistics.NewTopN(500)
 		{
-			key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1))
-			require.NoError(b, err)
-			topN.AppendTopN(key1, 2)
-			key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(2))
-			require.NoError(b, err)
-			topN.AppendTopN(key2, 2)
-			if i%2 == 0 {
-				key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(3))
+			for j := 1; j <= 500; j++ {
+				// Randomly skip some keys for some partitions.
+				if i%2 == 0 && j%2 == 0 {
+					continue
+				}
+				key, err := codec.EncodeKey(sc, nil, types.NewIntDatum(int64(j)))
 				require.NoError(b, err)
-				topN.AppendTopN(key3, 3)
+				topN.AppendTopN(key, uint64(rand.Intn(1000)))
 			}
 		}
 		topNs = append(topNs, topN)
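
prepareTopNsAndHists now gives each partition up to 500 keys with random counts, and even-numbered partitions skip the even keys, so the partitions only partially overlap, which is exactly what the merge has to cope with. A standalone sketch of just that data shape (genPartition is an illustrative helper, not part of the patch):

package main

import (
	"fmt"
	"math/rand"
)

// genPartition mirrors the benchmark's data shape: partition i gets keys
// 1..500 with random counts in [0, 1000), but even-numbered partitions
// drop the even keys so the key sets only partially overlap.
func genPartition(i int) map[int]uint64 {
	part := make(map[int]uint64)
	for j := 1; j <= 500; j++ {
		if i%2 == 0 && j%2 == 0 {
			continue // this partition never saw this key
		}
		part[j] = uint64(rand.Intn(1000))
	}
	return part
}

func main() {
	// An even partition keeps only the 250 odd keys; an odd one keeps all 500.
	fmt.Println(len(genPartition(0)), len(genPartition(1))) // 250 500
}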
@@ -62,68 +56,55 @@ func benchmarkMergePartTopN2GlobalTopNWithHists(partitions int, b *testing.B) {
 	// Prepare Hists.
 	hists := make([]*statistics.Histogram, 0, partitions)
 	for i := 0; i < partitions; i++ {
 		// Construct Hist
-		h := statistics.NewHistogram(1, 10, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0)
-		h.Bounds.AppendInt64(0, 1)
-		h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 20})
-		h.Bounds.AppendInt64(0, 2)
-		h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30})
-		h.Bounds.AppendInt64(0, 3)
-		h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30})
-		h.Bounds.AppendInt64(0, 4)
-		h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 40})
+		h := statistics.NewHistogram(1, 500, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0)
+		for j := 1; j <= 500; j++ {
+			datum := types.NewIntDatum(int64(j))
+			h.AppendBucket(&datum, &datum, int64(10+j*10), 10)
+		}
 		hists = append(hists, h)
 	}
+	return topNs, hists
+}
+
+func benchmarkMergePartTopN2GlobalTopNWithHists(partitions int, b *testing.B) {
+	loc := time.UTC
+	version := 1
+	killer := sqlkiller.SQLKiller{}
+	topNs, hists := prepareTopNsAndHists(b, partitions, loc)
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		// Benchmark merge 10 topN.
-		_, _, _, _ = MergePartTopN2GlobalTopN(loc, version, topNs, 10, hists, false, &killer)
+		// Benchmark merge 100 topN.
+		_, _, _, _ = MergePartTopN2GlobalTopN(
+			loc,
+			version,
+			topNs,
+			100,
+			hists,
+			false,
+			&killer,
+		)
 	}
 }

+var benchmarkSizes = []int{100, 1000, 2000, 5000, 10000}
+
+// cmd: go test -run=^$ -bench=BenchmarkMergePartTopN2GlobalTopNWithHists -benchmem github.com/pingcap/tidb/pkg/statistics/handle/globalstats
+func BenchmarkMergePartTopN2GlobalTopNWithHists(b *testing.B) {
+	for _, size := range benchmarkSizes {
+		b.Run(fmt.Sprintf("Size%d", size), func(b *testing.B) {
+			benchmarkMergePartTopN2GlobalTopNWithHists(size, b)
+		})
+	}
+}
+
 // cmd: go test -run=^$ -bench=BenchmarkMergeGlobalStatsTopNByConcurrencyWithHists -benchmem github.com/pingcap/tidb/pkg/statistics/handle/globalstats
 func benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(partitions int, b *testing.B) {
 	loc := time.UTC
-	sc := stmtctx.NewStmtCtxWithTimeZone(loc)
 	version := 1
 	killer := sqlkiller.SQLKiller{}
-	// Prepare TopNs.
-	topNs := make([]*statistics.TopN, 0, partitions)
-	for i := 0; i < partitions; i++ {
-		// Construct TopN, should be key1 -> 2, key2 -> 2, key3 -> 3.
-		topN := statistics.NewTopN(3)
-		{
-			key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1))
-			require.NoError(b, err)
-			topN.AppendTopN(key1, 2)
-			key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(2))
-			require.NoError(b, err)
-			topN.AppendTopN(key2, 2)
-			if i%2 == 0 {
-				key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(3))
-				require.NoError(b, err)
-				topN.AppendTopN(key3, 3)
-			}
-		}
-		topNs = append(topNs, topN)
-	}
-	// Prepare Hists.
-	hists := make([]*statistics.Histogram, 0, partitions)
-	for i := 0; i < partitions; i++ {
-		// Construct Hist
-		h := statistics.NewHistogram(1, 10, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0)
-		h.Bounds.AppendInt64(0, 1)
-		h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 20})
-		h.Bounds.AppendInt64(0, 2)
-		h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30})
-		h.Bounds.AppendInt64(0, 3)
-		h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30})
-		h.Bounds.AppendInt64(0, 4)
-		h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 40})
-		hists = append(hists, h)
-	}
+	topNs, hists := prepareTopNsAndHists(b, partitions, loc)
 	wrapper := NewStatsWrapper(hists, topNs)
 	const mergeConcurrency = 4
 	batchSize := len(wrapper.AllTopN) / mergeConcurrency
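
One detail worth noting at this line: batchSize comes from integer division, so when the partition count is not a multiple of mergeConcurrency there is a remainder that the batching has to sweep up. A tiny illustration (ceilDiv is a hypothetical helper, not part of the patch):

package main

import "fmt"

// ceilDiv is the batch size that would cover every partition exactly.
func ceilDiv(a, b int) int { return (a + b - 1) / b }

func main() {
	partitions, workers := 10, 4
	// Plain integer division leaves a remainder of 2 partitions;
	// rounding up instead yields batches of 3 that cover everything.
	fmt.Println(partitions/workers, partitions%workers, ceilDiv(partitions, workers)) // 2 2 3
}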
@@ -136,24 +117,24 @@ func benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(partitions int, b *test
 	defer gpool.Close()
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		// Benchmark merge 10 topN.
-		_, _, _, _ = MergeGlobalStatsTopNByConcurrency(gpool, mergeConcurrency, batchSize, wrapper, loc, version, 10, false, &killer)
+		// Benchmark merge 100 topN.
+		_, _, _, _ = MergeGlobalStatsTopNByConcurrency(
+			gpool,
+			mergeConcurrency,
+			batchSize,
+			wrapper,
+			loc,
+			version,
+			100,
+			false,
+			&killer,
+		)
 	}
 }

-var benchmarkSizes = []int{100, 1000, 10000, 100000, 1000000, 10000000}
-var benchmarkConcurrencySizes = []int{100, 1000, 10000, 100000}
-
-func BenchmarkMergePartTopN2GlobalTopNWithHists(b *testing.B) {
-	for _, size := range benchmarkSizes {
-		b.Run(fmt.Sprintf("Size%d", size), func(b *testing.B) {
-			benchmarkMergePartTopN2GlobalTopNWithHists(size, b)
-		})
-	}
-}
-
+// cmd: go test -run=^$ -bench=BenchmarkMergeGlobalStatsTopNByConcurrencyWithHists -benchmem github.com/pingcap/tidb/pkg/statistics/handle/globalstats
 func BenchmarkMergeGlobalStatsTopNByConcurrencyWithHists(b *testing.B) {
-	for _, size := range benchmarkConcurrencySizes {
+	for _, size := range benchmarkSizes {
 		b.Run(fmt.Sprintf("Size%d", size), func(b *testing.B) {
 			benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(size, b)
 		})