Files
tidb/executor/benchmark_test.go

351 lines
9.6 KiB
Go

// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"context"
"fmt"
"math/rand"
"sort"
"testing"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/mock"
)
var (
_ Executor = &mockDataSource{}
)
type mockDataSourceParameters struct {
schema *expression.Schema
ndvs []int // number of distinct values on columns[i] and zero represents no limit
orders []bool // columns[i] should be ordered if orders[i] is true
rows int // number of rows the DataSource should output
ctx sessionctx.Context
}
type mockDataSource struct {
baseExecutor
p mockDataSourceParameters
genData []*chunk.Chunk
chunks []*chunk.Chunk
chunkPtr int
}
func (mds *mockDataSource) genColDatums(col int) (results []interface{}) {
typ := mds.retFieldTypes[col]
order := mds.p.orders[col]
rows := mds.p.rows
NDV := mds.p.ndvs[col]
results = make([]interface{}, 0, rows)
if NDV == 0 {
for i := 0; i < rows; i++ {
results = append(results, mds.randDatum(typ))
}
} else {
datumSet := make(map[string]bool, NDV)
datums := make([]interface{}, 0, NDV)
for len(datums) < NDV {
d := mds.randDatum(typ)
str := fmt.Sprintf("%v", d)
if datumSet[str] {
continue
}
datumSet[str] = true
datums = append(datums, d)
}
for i := 0; i < rows; i++ {
results = append(results, datums[rand.Intn(NDV)])
}
}
if order {
sort.Slice(results, func(i, j int) bool {
switch typ.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
return results[i].(int64) < results[j].(int64)
case mysql.TypeDouble:
return results[i].(float64) < results[j].(float64)
default:
panic("not implement")
}
})
}
return
}
func (mds *mockDataSource) randDatum(typ *types.FieldType) interface{} {
switch typ.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(rand.Int())
case mysql.TypeDouble:
return rand.Float64()
default:
panic("not implement")
}
}
func (mds *mockDataSource) prepareChunks() {
mds.chunks = make([]*chunk.Chunk, len(mds.genData))
for i := range mds.chunks {
mds.chunks[i] = mds.genData[i].CopyConstruct()
}
mds.chunkPtr = 0
}
func (mds *mockDataSource) Next(ctx context.Context, req *chunk.RecordBatch) error {
if mds.chunkPtr >= len(mds.chunks) {
req.Reset()
return nil
}
dataChk := mds.chunks[mds.chunkPtr]
dataChk.SwapColumns(req.Chunk)
mds.chunkPtr++
return nil
}
func buildMockDataSource(opt mockDataSourceParameters) *mockDataSource {
baseExec := newBaseExecutor(opt.ctx, opt.schema, "")
m := &mockDataSource{baseExec, opt, nil, nil, 0}
types := m.retTypes()
colData := make([][]interface{}, len(types))
for i := 0; i < len(types); i++ {
colData[i] = m.genColDatums(i)
}
m.genData = make([]*chunk.Chunk, (m.p.rows+m.initCap-1)/m.initCap)
for i := range m.genData {
m.genData[i] = chunk.NewChunkWithCapacity(m.retTypes(), m.ctx.GetSessionVars().MaxChunkSize)
}
for i := 0; i < m.p.rows; i++ {
idx := i / m.maxChunkSize
retTypes := m.retTypes()
for colIdx := 0; colIdx < len(types); colIdx++ {
switch retTypes[colIdx].Tp {
case mysql.TypeLong, mysql.TypeLonglong:
m.genData[idx].AppendInt64(colIdx, colData[colIdx][i].(int64))
case mysql.TypeDouble:
m.genData[idx].AppendFloat64(colIdx, colData[colIdx][i].(float64))
default:
panic("not implement")
}
}
}
return m
}
type aggTestCase struct {
// The test table's schema is fixed (aggCol Double, groupBy LongLong).
execType string // "hash" or "stream"
aggFunc string // sum, avg, count ....
groupByNDV int // the number of distinct group-by keys
hasDistinct bool
rows int
concurrency int
ctx sessionctx.Context
}
func (a aggTestCase) columns() []*expression.Column {
return []*expression.Column{
{Index: 0, RetType: types.NewFieldType(mysql.TypeDouble)},
{Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)}}
}
func (a aggTestCase) String() string {
return fmt.Sprintf("(execType:%v, aggFunc:%v, groupByNDV:%v, hasDistinct:%v, rows:%v, concruuency:%v)",
a.execType, a.aggFunc, a.groupByNDV, a.hasDistinct, a.rows, a.concurrency)
}
func defaultAggTestCase(exec string) *aggTestCase {
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
return &aggTestCase{exec, ast.AggFuncSum, 1000, false, 10000000, 4, ctx}
}
func buildHashAggExecutor(ctx sessionctx.Context, src Executor, schema *expression.Schema,
aggFuncs []*aggregation.AggFuncDesc, groupItems []expression.Expression) Executor {
plan := new(core.PhysicalHashAgg)
plan.AggFuncs = aggFuncs
plan.GroupByItems = groupItems
plan.SetSchema(schema)
plan.Init(ctx, nil)
plan.SetChildren(nil)
b := newExecutorBuilder(ctx, nil)
exec := b.build(plan)
hashAgg := exec.(*HashAggExec)
hashAgg.children[0] = src
return exec
}
func buildStreamAggExecutor(ctx sessionctx.Context, src Executor, schema *expression.Schema,
aggFuncs []*aggregation.AggFuncDesc, groupItems []expression.Expression) Executor {
plan := new(core.PhysicalStreamAgg)
plan.AggFuncs = aggFuncs
plan.GroupByItems = groupItems
plan.SetSchema(schema)
plan.Init(ctx, nil)
plan.SetChildren(nil)
b := newExecutorBuilder(ctx, nil)
exec := b.build(plan)
streamAgg := exec.(*StreamAggExec)
streamAgg.children[0] = src
return exec
}
func buildAggExecutor(b *testing.B, testCase *aggTestCase, child Executor) Executor {
ctx := testCase.ctx
if err := ctx.GetSessionVars().SetSystemVar(variable.TiDBHashAggFinalConcurrency, fmt.Sprintf("%v", testCase.concurrency)); err != nil {
b.Fatal(err)
}
if err := ctx.GetSessionVars().SetSystemVar(variable.TiDBHashAggPartialConcurrency, fmt.Sprintf("%v", testCase.concurrency)); err != nil {
b.Fatal(err)
}
childCols := testCase.columns()
schema := expression.NewSchema(childCols...)
groupBy := []expression.Expression{childCols[1]}
aggFunc := aggregation.NewAggFuncDesc(testCase.ctx, testCase.aggFunc, []expression.Expression{childCols[0]}, testCase.hasDistinct)
aggFuncs := []*aggregation.AggFuncDesc{aggFunc}
var aggExec Executor
switch testCase.execType {
case "hash":
aggExec = buildHashAggExecutor(testCase.ctx, child, schema, aggFuncs, groupBy)
case "stream":
aggExec = buildStreamAggExecutor(testCase.ctx, child, schema, aggFuncs, groupBy)
default:
b.Fatal("not implement")
}
return aggExec
}
func benchmarkAggExecWithCase(b *testing.B, casTest *aggTestCase) {
cols := casTest.columns()
orders := []bool{false, casTest.execType == "stream"}
dataSource := buildMockDataSource(mockDataSourceParameters{
schema: expression.NewSchema(cols...),
ndvs: []int{0, casTest.groupByNDV},
orders: orders,
rows: casTest.rows,
ctx: casTest.ctx,
})
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer() // prepare a new agg-executor
aggExec := buildAggExecutor(b, casTest, dataSource)
tmpCtx := context.Background()
chk := aggExec.newFirstChunk()
dataSource.prepareChunks()
b.StartTimer()
if err := aggExec.Open(tmpCtx); err != nil {
b.Fatal(err)
}
batch := chunk.NewRecordBatch(chk)
for {
if err := aggExec.Next(tmpCtx, batch); err != nil {
b.Fatal(b)
}
if chk.NumRows() == 0 {
break
}
}
if err := aggExec.Close(); err != nil {
b.Fatal(err)
}
b.StopTimer()
}
}
func BenchmarkAggRows(b *testing.B) {
rows := []int{100000, 1000000, 10000000}
concurrencies := []int{1, 4, 8, 15, 20, 30, 40}
for _, row := range rows {
for _, con := range concurrencies {
for _, exec := range []string{"hash", "stream"} {
if exec == "stream" && con > 1 {
continue
}
cas := defaultAggTestCase(exec)
cas.rows = row
cas.concurrency = con
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkAggExecWithCase(b, cas)
})
}
}
}
}
func BenchmarkAggGroupByNDV(b *testing.B) {
NDVs := []int{10, 100, 1000, 10000, 100000, 1000000, 10000000}
for _, NDV := range NDVs {
for _, exec := range []string{"hash", "stream"} {
cas := defaultAggTestCase(exec)
cas.groupByNDV = NDV
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkAggExecWithCase(b, cas)
})
}
}
}
func BenchmarkAggConcurrency(b *testing.B) {
concs := []int{1, 4, 8, 15, 20, 30, 40}
for _, con := range concs {
for _, exec := range []string{"hash", "stream"} {
if exec == "stream" && con > 1 {
continue
}
cas := defaultAggTestCase(exec)
cas.concurrency = con
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkAggExecWithCase(b, cas)
})
}
}
}
func BenchmarkAggDistinct(b *testing.B) {
rows := []int{100000, 1000000, 10000000}
distincts := []bool{false, true}
for _, row := range rows {
for _, exec := range []string{"hash", "stream"} {
for _, distinct := range distincts {
cas := defaultAggTestCase(exec)
cas.rows = row
cas.hasDistinct = distinct
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkAggExecWithCase(b, cas)
})
}
}
}
}