// Copyright 2017 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package statistics import ( "math/rand" "testing" "time" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/mock" "github.com/stretchr/testify/require" ) func recordSetForWeightSamplingTest(size int) *recordSet { r := &recordSet{ data: make([]types.Datum, 0, size), count: size, } for i := 0; i < size; i++ { r.data = append(r.data, types.NewIntDatum(int64(i))) } r.setFields(mysql.TypeLonglong) return r } func recordSetForDistributedSamplingTest(size, batch int) []*recordSet { sets := make([]*recordSet, 0, batch) batchSize := size / batch for i := 0; i < batch; i++ { r := &recordSet{ data: make([]types.Datum, 0, batchSize), count: batchSize, } for j := 0; j < size/batch; j++ { r.data = append(r.data, types.NewIntDatum(int64(j+batchSize*i))) } r.setFields(mysql.TypeLonglong) sets = append(sets, r) } return sets } func TestWeightedSampling(t *testing.T) { t.Parallel() sampleNum := int64(20) rowNum := 100 loopCnt := 1000 rs := recordSetForWeightSamplingTest(rowNum) sc := mock.NewContext().GetSessionVars().StmtCtx // The loop which is commented out is used for stability test. // This test can run 800 times in a row without any failure. // for x := 0; x < 800; x++ { itemCnt := make([]int, rowNum) for loopI := 0; loopI < loopCnt; loopI++ { builder := &ReservoirRowSampleBuilder{ Sc: sc, RecordSet: rs, ColsFieldType: []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, Collators: make([]collate.Collator, 1), ColGroups: nil, MaxSampleSize: int(sampleNum), MaxFMSketchSize: 1000, Rng: rand.New(rand.NewSource(time.Now().UnixNano())), } collector, err := builder.Collect() require.NoError(t, err) for i := 0; i < collector.MaxSampleSize; i++ { a := collector.Samples[i].Columns[0].GetInt64() itemCnt[a]++ } require.Nil(t, rs.Close()) } expFrequency := float64(sampleNum) * float64(loopCnt) / float64(rowNum) delta := 0.5 for _, cnt := range itemCnt { if float64(cnt) < expFrequency/(1+delta) || float64(cnt) > expFrequency*(1+delta) { require.Truef(t, false, "The frequency %v is exceed the Chernoff Bound", cnt) } } } func TestDistributedWeightedSampling(t *testing.T) { t.Parallel() sampleNum := int64(10) rowNum := 100 loopCnt := 1500 batch := 5 sets := recordSetForDistributedSamplingTest(rowNum, batch) sc := mock.NewContext().GetSessionVars().StmtCtx // The loop which is commented out is used for stability test. // This test can run 800 times in a row without any failure. // for x := 0; x < 800; x++ { itemCnt := make([]int, rowNum) for loopI := 1; loopI < loopCnt; loopI++ { rootRowCollector := NewReservoirRowSampleCollector(int(sampleNum), 1) rootRowCollector.FMSketches = append(rootRowCollector.FMSketches, NewFMSketch(1000)) for i := 0; i < batch; i++ { builder := &ReservoirRowSampleBuilder{ Sc: sc, RecordSet: sets[i], ColsFieldType: []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, Collators: make([]collate.Collator, 1), ColGroups: nil, MaxSampleSize: int(sampleNum), MaxFMSketchSize: 1000, Rng: rand.New(rand.NewSource(time.Now().UnixNano())), } collector, err := builder.Collect() require.NoError(t, err) rootRowCollector.MergeCollector(collector) require.Nil(t, sets[i].Close()) } for _, sample := range rootRowCollector.Samples { itemCnt[sample.Columns[0].GetInt64()]++ } } expFrequency := float64(sampleNum) * float64(loopCnt) / float64(rowNum) delta := 0.5 for _, cnt := range itemCnt { if float64(cnt) < expFrequency/(1+delta) || float64(cnt) > expFrequency*(1+delta) { require.Truef(t, false, "the frequency %v is exceed the Chernoff Bound", cnt) } } } func TestBuildStatsOnRowSample(t *testing.T) { t.Parallel() ctx := mock.NewContext() sketch := NewFMSketch(1000) data := make([]*SampleItem, 0, 8) for i := 1; i <= 1000; i++ { d := types.NewIntDatum(int64(i)) err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) require.NoError(t, err) data = append(data, &SampleItem{Value: d}) } for i := 1; i < 10; i++ { d := types.NewIntDatum(int64(2)) err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) require.NoError(t, err) data = append(data, &SampleItem{Value: d}) } for i := 1; i < 7; i++ { d := types.NewIntDatum(int64(4)) err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) require.NoError(t, err) data = append(data, &SampleItem{Value: d}) } for i := 1; i < 5; i++ { d := types.NewIntDatum(int64(7)) err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) require.NoError(t, err) data = append(data, &SampleItem{Value: d}) } for i := 1; i < 3; i++ { d := types.NewIntDatum(int64(11)) err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) require.NoError(t, err) data = append(data, &SampleItem{Value: d}) } collector := &SampleCollector{ Samples: data, NullCount: 0, Count: int64(len(data)), FMSketch: sketch, TotalSize: int64(len(data)) * 8, } tp := types.NewFieldType(mysql.TypeLonglong) hist, topN, err := BuildHistAndTopN(ctx, 5, 4, 1, collector, tp, true) require.Nilf(t, err, "%+v", err) topNStr, err := topN.DecodedString(ctx, []byte{tp.Tp}) require.NoError(t, err) require.Equal(t, "TopN{length: 4, [(2, 10), (4, 7), (7, 5), (11, 3)]}", topNStr) require.Equal(t, "column:1 ndv:1000 totColSize:8168\n"+ "num: 200 lower_bound: 1 upper_bound: 204 repeats: 1 ndv: 0\n"+ "num: 200 lower_bound: 205 upper_bound: 404 repeats: 1 ndv: 0\n"+ "num: 200 lower_bound: 405 upper_bound: 604 repeats: 1 ndv: 0\n"+ "num: 200 lower_bound: 605 upper_bound: 804 repeats: 1 ndv: 0\n"+ "num: 196 lower_bound: 805 upper_bound: 1000 repeats: 1 ndv: 0", hist.ToString(0)) }