Files
tidb/pkg/table/tables/bench_test.go

227 lines
6.5 KiB
Go

// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tables_test
import (
"context"
"testing"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/benchdaily"
_ "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/stretchr/testify/require"
)
const batchSize = 5000
func BenchmarkAddRecordInPipelinedDML(b *testing.B) {
logutil.InitLogger(&logutil.LogConfig{Config: log.Config{Level: "fatal"}})
store, dom := testkit.CreateMockStoreAndDomain(b)
tk := testkit.NewTestKit(b, store)
// Create the table
_, err := tk.Session().Execute(
context.Background(),
"CREATE TABLE IF NOT EXISTS test.t (a int primary key auto_increment, b varchar(255))",
)
require.NoError(b, err)
tb, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t"))
require.NoError(b, err)
vardef.SetEnableMDL(true)
// Pre-create data to be inserted
records := make([][]types.Datum, batchSize)
for j := range batchSize {
records[j] = types.MakeDatums(j, "test")
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Reset environment for each batch
b.StopTimer()
ctx := tk.Session()
vars := ctx.GetSessionVars()
vars.BulkDMLEnabled = true
vars.TxnCtx.EnableMDL = true
vars.StmtCtx.InUpdateStmt = true
require.Nil(b, sessiontxn.NewTxn(context.Background(), tk.Session()))
txn, _ := ctx.Txn(true)
require.True(b, txn.IsPipelined())
b.StartTimer()
for j := range batchSize {
_, err := tb.AddRecord(ctx.GetTableCtx(), txn, records[j], table.DupKeyCheckLazy)
if err != nil {
b.Fatal(err)
}
}
b.StopTimer()
// Rollback the transaction to avoid interference between batches
ctx.RollbackTxn(context.Background())
}
totalRecords := batchSize * b.N
avgTimePerRecord := float64(b.Elapsed().Nanoseconds()) / float64(totalRecords)
b.ReportMetric(avgTimePerRecord, "ns/record")
}
func BenchmarkRemoveRecordInPipelinedDML(b *testing.B) {
logutil.InitLogger(&logutil.LogConfig{Config: log.Config{Level: "fatal"}})
store, dom := testkit.CreateMockStoreAndDomain(b)
tk := testkit.NewTestKit(b, store)
// Create the table
_, err := tk.Session().Execute(
context.Background(),
"CREATE TABLE IF NOT EXISTS test.t (a int primary key clustered, b varchar(255))",
)
require.NoError(b, err)
tb, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t"))
require.NoError(b, err)
vardef.SetEnableMDL(true)
// Pre-create and add initial records
records := make([][]types.Datum, batchSize)
for j := range batchSize {
records[j] = types.MakeDatums(j, "test")
}
// Add initial records
se := tk.Session()
for j := range batchSize {
tk.MustExec("INSERT INTO test.t VALUES (?, ?)", j, "test")
}
b.StopTimer()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Reset environment for each batch
vars := se.GetSessionVars()
vars.BulkDMLEnabled = true
vars.TxnCtx.EnableMDL = true
vars.StmtCtx.InUpdateStmt = true
require.Nil(b, sessiontxn.NewTxn(context.Background(), tk.Session()))
txn, _ := se.Txn(true)
require.True(b, txn.IsPipelined())
b.StartTimer()
for j := range batchSize {
// Remove record
handle := kv.IntHandle(j)
err := tb.RemoveRecord(se.GetTableCtx(), txn, handle, records[j])
if err != nil {
b.Fatal(err)
}
}
b.StopTimer()
// Rollback the transaction to avoid interference between batches
se.RollbackTxn(context.Background())
require.NoError(b, err)
}
totalRecords := batchSize * b.N
avgTimePerRecord := float64(b.Elapsed().Nanoseconds()) / float64(totalRecords)
b.ReportMetric(avgTimePerRecord, "ns/record")
}
func BenchmarkUpdateRecordInPipelinedDML(b *testing.B) {
logutil.InitLogger(&logutil.LogConfig{Config: log.Config{Level: "fatal"}})
store, dom := testkit.CreateMockStoreAndDomain(b)
tk := testkit.NewTestKit(b, store)
// Create the table
_, err := tk.Session().Execute(
context.Background(),
"CREATE TABLE IF NOT EXISTS test.t (a int primary key clustered, b varchar(255))",
)
require.NoError(b, err)
tb, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t"))
require.NoError(b, err)
// Pre-create data to be inserted and then updated
records := make([][]types.Datum, batchSize)
for j := range batchSize {
records[j] = types.MakeDatums(j, "test")
}
// Pre-create new data
newData := make([][]types.Datum, batchSize)
for j := range batchSize {
newData[j] = types.MakeDatums(j, "updated")
}
// Add initial records
se := tk.Session()
for j := range batchSize {
tk.MustExec("INSERT INTO test.t VALUES (?, ?)", j, "test")
}
touched := make([]bool, len(tb.Meta().Columns))
touched[1] = true
b.StopTimer()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Reset environment for each batch
vars := se.GetSessionVars()
vars.BulkDMLEnabled = true
vars.TxnCtx.EnableMDL = true
vars.StmtCtx.InUpdateStmt = true
require.Nil(b, sessiontxn.NewTxn(context.Background(), tk.Session()))
txn, _ := se.Txn(true)
require.True(b, txn.IsPipelined())
b.StartTimer()
for j := range batchSize {
// Update record
handle := kv.IntHandle(j)
err := tb.UpdateRecord(se.GetTableCtx(), txn, handle, records[j], newData[j], touched, table.WithCtx(context.TODO()))
if err != nil {
b.Fatal(err)
}
}
b.StopTimer()
// Rollback the transaction to avoid interference between batches
se.RollbackTxn(context.Background())
require.NoError(b, err)
}
totalRecords := batchSize * b.N
avgTimePerRecord := float64(b.Elapsed().Nanoseconds()) / float64(totalRecords)
b.ReportMetric(avgTimePerRecord, "ns/record")
}
func TestBenchDaily(t *testing.T) {
benchdaily.Run(
BenchmarkAddRecordInPipelinedDML,
BenchmarkRemoveRecordInPipelinedDML,
BenchmarkUpdateRecordInPipelinedDML,
)
}