Files
tidb/pkg/server/conn_test.go
2025-11-14 13:32:11 +00:00

2230 lines
78 KiB
Go

// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"bufio"
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/binary"
"fmt"
"io"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/extension"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/server/internal"
"github.com/pingcap/tidb/pkg/server/internal/handshake"
"github.com/pingcap/tidb/pkg/server/internal/parse"
"github.com/pingcap/tidb/pkg/server/internal/testutil"
serverutil "github.com/pingcap/tidb/pkg/server/internal/util"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/store/mockstore"
"github.com/pingcap/tidb/pkg/store/mockstore/unistore"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/external"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/arena"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/plancodec"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
promtestutils "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/testutils"
)
type Issue33699CheckType struct {
name string
defVal string
setVal string
isSessionVariable bool
}
func (c *Issue33699CheckType) toSetSessionVar() string {
if c.isSessionVariable {
return fmt.Sprintf("set session %s=%s", c.name, c.setVal)
}
return fmt.Sprintf("set @%s=%s", c.name, c.setVal)
}
func (c *Issue33699CheckType) toGetSessionVar() string {
if c.isSessionVariable {
return fmt.Sprintf("select @@session.%s", c.name)
}
return fmt.Sprintf("select @%s", c.name)
}
func TestIssue33699(t *testing.T) {
store := testkit.CreateMockStore(t)
var outBuffer bytes.Buffer
tidbdrv := NewTiDBDriver(store)
cfg := serverutil.NewTestConfig()
cfg.Port, cfg.Status.StatusPort = 0, 0
cfg.Status.ReportStatus = false
server, err := NewServer(cfg, tidbdrv)
require.NoError(t, err)
defer server.Close()
cc := &clientConn{
connectionID: 1,
salt: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14},
server: server,
pkt: internal.NewPacketIOForTest(bufio.NewWriter(&outBuffer)),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
alloc: arena.NewAllocator(512),
chunkAlloc: chunk.NewAllocator(),
capability: mysql.ClientProtocol41,
}
tk := testkit.NewTestKit(t, store)
ctx := &TiDBContext{Session: tk.Session()}
cc.SetCtx(ctx)
// change user.
doChangeUser := func() {
userData := append([]byte("root"), 0x0, 0x0)
userData = append(userData, []byte("test")...)
userData = append(userData, 0x0)
changeUserReq := dispatchInput{
com: mysql.ComChangeUser,
in: userData,
err: nil,
out: []byte{0x7, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0},
}
inBytes := append([]byte{changeUserReq.com}, changeUserReq.in...)
err = cc.dispatch(context.Background(), inBytes)
require.Equal(t, changeUserReq.err, err)
if err == nil {
err = cc.flush(context.TODO())
require.NoError(t, err)
require.Equal(t, changeUserReq.out, outBuffer.Bytes())
} else {
_ = cc.flush(context.TODO())
}
outBuffer.Reset()
}
// check variable.
checks := []Issue33699CheckType{
{ // self define.
"a",
"<nil>",
"1",
false,
},
{ // session variable
"net_read_timeout",
"30",
"1234",
true,
},
{
"net_write_timeout",
"60",
"1234",
true,
},
}
// default;
for _, ck := range checks {
tk.MustQuery(ck.toGetSessionVar()).Check(testkit.Rows(ck.defVal))
}
// set;
for _, ck := range checks {
tk.MustExec(ck.toSetSessionVar())
}
// check after set.
for _, ck := range checks {
tk.MustQuery(ck.toGetSessionVar()).Check(testkit.Rows(ck.setVal))
}
// check for issue-33892: maybe trigger panic when ChangeUser before fix.
var stop uint32
go func(stop *uint32) {
for {
if atomic.LoadUint32(stop) == 1 {
break
}
cc.getCtx().ShowProcess()
}
}(&stop)
time.Sleep(time.Millisecond)
doChangeUser()
atomic.StoreUint32(&stop, 1)
time.Sleep(time.Millisecond)
require.NotEqual(t, ctx, cc.getCtx())
require.NotEqual(t, ctx.Session, cc.ctx.Session)
// new session,so values is defaults;
tk.SetSession(cc.ctx.Session) // set new session.
for _, ck := range checks {
tk.MustQuery(ck.toGetSessionVar()).Check(testkit.Rows(ck.defVal))
}
}
func TestMalformHandshakeHeader(t *testing.T) {
data := []byte{0x00}
var p handshake.Response41
_, err := parse.HandshakeResponseHeader(context.Background(), &p, data)
require.Error(t, err)
}
func TestParseHandshakeResponse(t *testing.T) {
// test data from http://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::HandshakeResponse41
data := []byte{
0x85, 0xa2, 0x1e, 0x00, 0x00, 0x00, 0x00, 0x40, 0x08, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x72, 0x6f, 0x6f, 0x74, 0x00, 0x14, 0x22, 0x50, 0x79, 0xa2, 0x12, 0xd4,
0xe8, 0x82, 0xe5, 0xb3, 0xf4, 0x1a, 0x97, 0x75, 0x6b, 0xc8, 0xbe, 0xdb, 0x9f, 0x80, 0x6d, 0x79,
0x73, 0x71, 0x6c, 0x5f, 0x6e, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x73, 0x73, 0x77,
0x6f, 0x72, 0x64, 0x00, 0x61, 0x03, 0x5f, 0x6f, 0x73, 0x09, 0x64, 0x65, 0x62, 0x69, 0x61, 0x6e,
0x36, 0x2e, 0x30, 0x0c, 0x5f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x6e, 0x61, 0x6d, 0x65,
0x08, 0x6c, 0x69, 0x62, 0x6d, 0x79, 0x73, 0x71, 0x6c, 0x04, 0x5f, 0x70, 0x69, 0x64, 0x05, 0x32,
0x32, 0x33, 0x34, 0x34, 0x0f, 0x5f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x76, 0x65, 0x72,
0x73, 0x69, 0x6f, 0x6e, 0x08, 0x35, 0x2e, 0x36, 0x2e, 0x36, 0x2d, 0x6d, 0x39, 0x09, 0x5f, 0x70,
0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x06, 0x78, 0x38, 0x36, 0x5f, 0x36, 0x34, 0x03, 0x66,
0x6f, 0x6f, 0x03, 0x62, 0x61, 0x72,
}
var p handshake.Response41
offset, err := parse.HandshakeResponseHeader(context.Background(), &p, data)
require.NoError(t, err)
require.Equal(t, mysql.ClientConnectAtts, p.Capability&mysql.ClientConnectAtts)
err = parse.HandshakeResponseBody(context.Background(), &p, data, offset)
require.NoError(t, err)
eq := mapIdentical(p.Attrs, map[string]string{
"_client_version": "5.6.6-m9",
"_platform": "x86_64",
"foo": "bar",
"_os": "debian6.0",
"_client_name": "libmysql",
"_pid": "22344"})
require.True(t, eq)
data = []byte{
0x8d, 0xa6, 0x0f, 0x00, 0x00, 0x00, 0x00, 0x01, 0x08, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x70, 0x61, 0x6d, 0x00, 0x14, 0xab, 0x09, 0xee, 0xf6, 0xbc, 0xb1, 0x32,
0x3e, 0x61, 0x14, 0x38, 0x65, 0xc0, 0x99, 0x1d, 0x95, 0x7d, 0x75, 0xd4, 0x47, 0x74, 0x65, 0x73,
0x74, 0x00, 0x6d, 0x79, 0x73, 0x71, 0x6c, 0x5f, 0x6e, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70,
0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x00,
}
p = handshake.Response41{}
offset, err = parse.HandshakeResponseHeader(context.Background(), &p, data)
require.NoError(t, err)
capability := mysql.ClientProtocol41 |
mysql.ClientPluginAuth |
mysql.ClientSecureConnection |
mysql.ClientConnectWithDB
require.Equal(t, capability, p.Capability&capability)
err = parse.HandshakeResponseBody(context.Background(), &p, data, offset)
require.NoError(t, err)
require.Equal(t, "pam", p.User)
require.Equal(t, "test", p.DBName)
}
func TestIssue1768(t *testing.T) {
// this data is from captured handshake packet, using mysql client.
// TiDB should handle authorization correctly, even mysql client set
// the ClientPluginAuthLenencClientData capability.
data := []byte{
0x85, 0xa6, 0xff, 0x01, 0x00, 0x00, 0x00, 0x01, 0x21, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x74, 0x65, 0x73, 0x74, 0x00, 0x14, 0xe9, 0x7a, 0x2b, 0xec, 0x4a, 0xa8,
0xea, 0x67, 0x8a, 0xc2, 0x46, 0x4d, 0x32, 0xa4, 0xda, 0x39, 0x77, 0xe5, 0x61, 0x1a, 0x65, 0x03,
0x5f, 0x6f, 0x73, 0x05, 0x4c, 0x69, 0x6e, 0x75, 0x78, 0x0c, 0x5f, 0x63, 0x6c, 0x69, 0x65, 0x6e,
0x74, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x08, 0x6c, 0x69, 0x62, 0x6d, 0x79, 0x73, 0x71, 0x6c, 0x04,
0x5f, 0x70, 0x69, 0x64, 0x04, 0x39, 0x30, 0x33, 0x30, 0x0f, 0x5f, 0x63, 0x6c, 0x69, 0x65, 0x6e,
0x74, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x06, 0x35, 0x2e, 0x37, 0x2e, 0x31, 0x34,
0x09, 0x5f, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x06, 0x78, 0x38, 0x36, 0x5f, 0x36,
0x34, 0x0c, 0x70, 0x72, 0x6f, 0x67, 0x72, 0x61, 0x6d, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x05, 0x6d,
0x79, 0x73, 0x71, 0x6c,
}
p := handshake.Response41{}
offset, err := parse.HandshakeResponseHeader(context.Background(), &p, data)
require.NoError(t, err)
require.Equal(t, mysql.ClientPluginAuthLenencClientData, p.Capability&mysql.ClientPluginAuthLenencClientData)
err = parse.HandshakeResponseBody(context.Background(), &p, data, offset)
require.NoError(t, err)
require.NotEmpty(t, p.Auth)
}
func TestInitialHandshake(t *testing.T) {
store := testkit.CreateMockStore(t)
var outBuffer bytes.Buffer
cfg := serverutil.NewTestConfig()
cfg.Port = 0
cfg.Status.StatusPort = 0
drv := NewTiDBDriver(store)
srv, err := NewServer(cfg, drv)
require.NoError(t, err)
cc := &clientConn{
connectionID: 1,
salt: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14},
server: srv,
pkt: internal.NewPacketIOForTest(bufio.NewWriter(&outBuffer)),
}
err = cc.writeInitialHandshake(context.TODO())
require.NoError(t, err)
expected := new(bytes.Buffer)
expected.WriteByte(0x0a) // Protocol
expected.WriteString(mysql.ServerVersion) // Version
expected.WriteByte(0x00) // NULL
err = binary.Write(expected, binary.LittleEndian, uint32(1)) // Connection ID
require.NoError(t, err)
expected.Write([]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x00}) // Salt
err = binary.Write(expected, binary.LittleEndian, uint16(defaultCapability&0xFFFF)) // Server Capability
require.NoError(t, err)
expected.WriteByte(uint8(mysql.DefaultCollationID)) // Server Language
err = binary.Write(expected, binary.LittleEndian, mysql.ServerStatusAutocommit) // Server Status
require.NoError(t, err)
err = binary.Write(expected, binary.LittleEndian, uint16((defaultCapability>>16)&0xFFFF)) // Extended Server Capability
require.NoError(t, err)
expected.WriteByte(0x15) // Authentication Plugin Length
expected.Write([]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}) // Unused
expected.Write([]byte{0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14, 0x00}) // Salt
expected.WriteString("mysql_native_password") // Authentication Plugin
expected.WriteByte(0x00) // NULL
require.Equal(t, expected.Bytes(), outBuffer.Bytes()[4:])
}
type dispatchInput struct {
com byte
in []byte
err error
out []byte
}
func TestDispatch(t *testing.T) {
userData := append([]byte("root"), 0x0, 0x0)
userData = append(userData, []byte("test")...)
userData = append(userData, 0x0)
inputs := []dispatchInput{
{
com: mysql.ComSleep,
in: nil,
err: mysql.NewErrf(mysql.ErrUnknown, "command %d not supported now", nil, mysql.ComSleep),
out: nil,
},
{
com: mysql.ComQuit,
in: nil,
err: io.EOF,
out: nil,
},
{
com: mysql.ComQuery,
in: []byte("do 1"),
err: nil,
out: []byte{0x3, 0x0, 0x0, 0x0, 0x0, 0x00, 0x0},
},
{
com: mysql.ComInitDB,
in: []byte("test"),
err: nil,
out: []byte{0x3, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0},
},
{
com: mysql.ComPing,
in: nil,
err: nil,
out: []byte{0x3, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0},
},
{
com: mysql.ComStmtPrepare,
in: []byte("select 1"),
err: nil,
out: []byte{
0xc, 0x0, 0x0, 0x3, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x17,
0x0, 0x0, 0x4, 0x3, 0x64, 0x65, 0x66, 0x0, 0x0, 0x0, 0x1, 0x31, 0x0, 0xc, 0x3f,
0x0, 0x1, 0x0, 0x0, 0x0, 0x8, 0x81, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x5, 0xfe,
},
},
{
com: mysql.ComStmtExecute,
in: []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x1, 0x0},
err: nil,
out: []byte{
0x1, 0x0, 0x0, 0x6, 0x1, 0x17, 0x0, 0x0, 0x7, 0x3, 0x64, 0x65, 0x66, 0x0, 0x0, 0x0,
0x1, 0x31, 0x0, 0xc, 0x3f, 0x0, 0x1, 0x0, 0x0, 0x0, 0x8, 0x81, 0x0, 0x0, 0x0,
0x0, 0x1, 0x0, 0x0, 0x8, 0xfe,
},
},
{
com: mysql.ComStmtFetch,
in: []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
err: nil,
out: []byte{0x1, 0x0, 0x0, 0x9, 0xfe},
},
{
com: mysql.ComStmtReset,
in: []byte{0x1, 0x0, 0x0, 0x0},
err: nil,
out: []byte{0x3, 0x0, 0x0, 0xa, 0x0, 0x0, 0x0},
},
{
com: mysql.ComSetOption,
in: []byte{0x1, 0x0, 0x0, 0x0},
err: nil,
out: []byte{0x1, 0x0, 0x0, 0xb, 0xfe},
},
{
com: mysql.ComStmtClose,
in: []byte{0x1, 0x0, 0x0, 0x0},
err: nil,
out: []byte{},
},
{
com: mysql.ComFieldList,
in: []byte("t"),
err: nil,
out: []byte{
0x1f, 0x0, 0x0, 0xc, 0x3, 0x64, 0x65, 0x66, 0x4, 0x74, 0x65, 0x73, 0x74, 0x1, 0x74,
0x1, 0x74, 0x1, 0x61, 0x1, 0x61, 0xc, 0x3f, 0x0, 0xb, 0x0, 0x0, 0x0, 0x3, 0x0, 0x0,
0x0, 0x0, 0x0, 0xfb, 0x1, 0x0, 0x0, 0xd, 0xfe,
},
},
{
com: mysql.ComChangeUser,
in: userData,
err: nil,
out: []byte{0x3, 0x0, 0x0, 0xe, 0x0, 0x0, 0x0},
},
{
com: mysql.ComRefresh, // flush privileges
in: []byte{0x01},
err: nil,
out: []byte{0x3, 0x0, 0x0, 0xf, 0x0, 0x0, 0x0, 0x3, 0x0, 0x0, 0x10, 0x0, 0x0, 0x0},
},
{
com: mysql.ComRefresh, // flush logs etc
in: []byte{0x02},
err: nil,
out: []byte{0x3, 0x0, 0x0, 0x11, 0x0, 0x0, 0x0},
},
{
com: mysql.ComResetConnection,
in: nil,
err: nil,
out: []byte{0x3, 0x0, 0x0, 0x12, 0x0, 0x0, 0x0},
},
}
testDispatch(t, inputs, 0)
}
func TestDispatchClientProtocol41(t *testing.T) {
userData := append([]byte("root"), 0x0, 0x0)
userData = append(userData, []byte("test")...)
userData = append(userData, 0x0)
inputs := []dispatchInput{
{
com: mysql.ComSleep,
in: nil,
err: mysql.NewErrf(mysql.ErrUnknown, "command %d not supported now", nil, mysql.ComSleep),
out: nil,
},
{
com: mysql.ComQuit,
in: nil,
err: io.EOF,
out: nil,
},
{
com: mysql.ComQuery,
in: []byte("do 1"),
err: nil,
out: []byte{0x7, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0},
},
{
com: mysql.ComInitDB,
in: []byte("test"),
err: nil,
out: []byte{0x7, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0},
},
{
com: mysql.ComPing,
in: nil,
err: nil,
out: []byte{0x7, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0},
},
{
com: mysql.ComStmtPrepare,
in: []byte("select 1"),
err: nil,
out: []byte{
0xc, 0x0, 0x0, 0x3, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x17,
0x0, 0x0, 0x4, 0x3, 0x64, 0x65, 0x66, 0x0, 0x0, 0x0, 0x1, 0x31, 0x0, 0xc, 0x3f,
0x0, 0x1, 0x0, 0x0, 0x0, 0x8, 0x81, 0x0, 0x0, 0x0, 0x0, 0x5, 0x0, 0x0, 0x5, 0xfe,
0x0, 0x0, 0x2, 0x0,
},
},
{
com: mysql.ComStmtExecute,
in: []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x1, 0x0},
err: nil,
out: []byte{
0x1, 0x0, 0x0, 0x6, 0x1, 0x17, 0x0, 0x0, 0x7, 0x3, 0x64, 0x65, 0x66, 0x0, 0x0, 0x0,
0x1, 0x31, 0x0, 0xc, 0x3f, 0x0, 0x1, 0x0, 0x0, 0x0, 0x8, 0x81, 0x0, 0x0, 0x0,
0x0, 0x5, 0x0, 0x0, 0x8, 0xfe, 0x0, 0x0, 0x42, 0x0,
},
},
{
com: mysql.ComStmtFetch,
in: []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
err: nil,
out: []byte{0x5, 0x0, 0x0, 0x9, 0xfe, 0x0, 0x0, 0x42, 0x0},
},
{
com: mysql.ComStmtReset,
in: []byte{0x1, 0x0, 0x0, 0x0},
err: nil,
out: []byte{0x7, 0x0, 0x0, 0xa, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0},
},
{
com: mysql.ComSetOption,
in: []byte{0x1, 0x0, 0x0, 0x0},
err: nil,
out: []byte{0x5, 0x0, 0x0, 0xb, 0xfe, 0x0, 0x0, 0x2, 0x0},
},
{
com: mysql.ComStmtClose,
in: []byte{0x1, 0x0, 0x0, 0x0},
err: nil,
out: []byte{},
},
{
com: mysql.ComFieldList,
in: []byte("t"),
err: nil,
out: []byte{
0x1f, 0x0, 0x0, 0xc, 0x3, 0x64, 0x65, 0x66, 0x4, 0x74, 0x65, 0x73, 0x74, 0x1, 0x74,
0x1, 0x74, 0x1, 0x61, 0x1, 0x61, 0xc, 0x3f, 0x0, 0xb, 0x0, 0x0, 0x0, 0x3, 0x0, 0x0,
0x0, 0x0, 0x0, 0xfb, 0x5, 0x0, 0x0, 0x0d, 0xfe, 0x0, 0x0, 0x2, 0x0,
},
},
{
com: mysql.ComChangeUser,
in: userData,
err: nil,
out: []byte{0x7, 0x0, 0x0, 0xe, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0},
},
{
com: mysql.ComRefresh, // flush privileges
in: []byte{0x01},
err: nil,
out: []byte{0x7, 0x0, 0x0, 0xf, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x7, 0x0, 0x0, 0x10, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0},
},
{
com: mysql.ComRefresh, // flush logs etc
in: []byte{0x02},
err: nil,
out: []byte{0x7, 0x0, 0x0, 0x11, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0},
},
{
com: mysql.ComResetConnection,
in: nil,
err: nil,
out: []byte{0x7, 0x0, 0x0, 0x12, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0},
},
}
testDispatch(t, inputs, mysql.ClientProtocol41)
}
func TestQueryEndWithZero(t *testing.T) {
inputs := []dispatchInput{
{
com: mysql.ComStmtPrepare,
in: append([]byte("select 1"), 0x0),
err: nil,
out: []byte{
0xc, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x17,
0x0, 0x0, 0x1, 0x3, 0x64, 0x65, 0x66, 0x0, 0x0, 0x0, 0x1, 0x31, 0x0, 0xc, 0x3f,
0x0, 0x1, 0x0, 0x0, 0x0, 0x8, 0x81, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x2, 0xfe,
},
},
{
com: mysql.ComQuery,
in: append([]byte("select 1"), 0x0),
err: nil,
out: []byte{
0x1, 0x0, 0x0, 0x3, 0x1, 0x17, 0x0, 0x0, 0x4, 0x3, 0x64, 0x65, 0x66, 0x0, 0x0, 0x0,
0x1, 0x31, 0x0, 0xc, 0x3f, 0x0, 0x1, 0x0, 0x0, 0x0, 0x8, 0x81, 0x0, 0x0, 0x0,
0x0, 0x1, 0x0, 0x0, 0x5, 0xfe, 0x2, 0x0, 0x0, 0x6, 0x1, 0x31, 0x1, 0x0, 0x0, 0x7, 0xfe,
},
},
}
testDispatch(t, inputs, 0)
}
func testDispatch(t *testing.T, inputs []dispatchInput, capability uint32) {
store := testkit.CreateMockStore(t)
se, err := session.CreateSession4Test(store)
require.NoError(t, err)
tc := &TiDBContext{
Session: se,
stmts: make(map[int]*TiDBStatement),
}
_, err = se.Execute(context.Background(), "create table test.t(a int)")
require.NoError(t, err)
_, err = se.Execute(context.Background(), "insert into test.t values (1)")
require.NoError(t, err)
var outBuffer bytes.Buffer
tidbdrv := NewTiDBDriver(store)
cfg := serverutil.NewTestConfig()
cfg.Port, cfg.Status.StatusPort = 0, 0
cfg.Status.ReportStatus = false
server, err := NewServer(cfg, tidbdrv)
require.NoError(t, err)
// Set healthy. This is used by graceful shutdown
// and is used in the response for ComPing and the
// /status HTTP endpoint
server.health.Store(true)
defer server.Close()
cc := &clientConn{
connectionID: 1,
salt: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14},
server: server,
pkt: internal.NewPacketIOForTest(bufio.NewWriter(&outBuffer)),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
alloc: arena.NewAllocator(512),
chunkAlloc: chunk.NewAllocator(),
capability: capability,
}
cc.SetCtx(tc)
for _, cs := range inputs {
inBytes := append([]byte{cs.com}, cs.in...)
err := cc.dispatch(context.Background(), inBytes)
require.Equal(t, cs.err, err)
if err == nil {
err = cc.flush(context.TODO())
require.NoError(t, err)
require.Equal(t, cs.out, outBuffer.Bytes())
} else {
_ = cc.flush(context.TODO())
}
outBuffer.Reset()
}
}
func TestGetSessionVarsWaitTimeout(t *testing.T) {
store := testkit.CreateMockStore(t)
se, err := session.CreateSession4Test(store)
require.NoError(t, err)
tc := &TiDBContext{
Session: se,
stmts: make(map[int]*TiDBStatement),
}
cc := &clientConn{
connectionID: 1,
server: &Server{
capability: defaultCapability,
},
}
cc.SetCtx(tc)
require.Equal(t, uint64(vardef.DefWaitTimeout), cc.getSessionVarsWaitTimeout(context.Background()))
}
func mapIdentical(m1, m2 map[string]string) bool {
return mapBelong(m1, m2) && mapBelong(m2, m1)
}
func mapBelong(m1, m2 map[string]string) bool {
for k1, v1 := range m1 {
v2, ok := m2[k1]
if !ok && v1 != v2 {
return false
}
}
return true
}
func TestConnExecutionTimeout(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
// There is no underlying netCon, use failpoint to avoid panic
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/FakeClientConn", "return(1)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/FakeClientConn"))
}()
tk := testkit.NewTestKit(t, store)
connID := uint64(1)
tk.Session().SetConnectionID(connID)
tc := &TiDBContext{
Session: tk.Session(),
stmts: make(map[int]*TiDBStatement),
}
cc := &clientConn{
connectionID: connID,
server: &Server{
capability: defaultCapability,
},
alloc: arena.NewAllocator(32 * 1024),
chunkAlloc: chunk.NewAllocator(),
}
cc.SetCtx(tc)
srv := &Server{
clients: map[uint64]*clientConn{
connID: cc,
},
dom: dom,
}
handle := dom.ExpensiveQueryHandle().SetSessionManager(srv)
go handle.Run()
tk.MustExec("use test;")
tk.MustExec("CREATE TABLE testTable2 (id bigint PRIMARY KEY, age int)")
for i := range 10 {
str := fmt.Sprintf("insert into testTable2 values(%d, %d)", i, i%80)
tk.MustExec(str)
}
tk.MustQuery("select SLEEP(1);").Check(testkit.Rows("0"))
tk.MustExec("set @@max_execution_time = 500;")
tk.MustQuery("select SLEEP(1);").Check(testkit.Rows("1"))
tk.MustExec("set @@max_execution_time = 1500;")
tk.MustExec("set @@tidb_expensive_query_time_threshold = 1;")
tk.MustQuery("select SLEEP(1);").Check(testkit.Rows("0"))
err := tk.QueryToErr("select * FROM testTable2 WHERE SLEEP(1);")
require.Equal(t, "[executor:3024]Query execution was interrupted, maximum statement execution time exceeded", err.Error())
// Test executor stats when execution time exceeded.
tk.MustExec("set @@tidb_slow_log_threshold=300")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCSlowByInjestSleep", `return(150)`))
err = tk.QueryToErr("select /*+ max_execution_time(600), set_var(tikv_client_read_timeout=100) */ * from testTable2")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCSlowByInjestSleep"))
require.Error(t, err)
require.Equal(t, "[executor:3024]Query execution was interrupted, maximum statement execution time exceeded", err.Error())
planInfo, err := plancodec.DecodePlan(tk.Session().GetSessionVars().StmtCtx.GetEncodedPlan())
require.NoError(t, err)
require.Regexp(t, "TableReader.*cop_task: {num: .*num_rpc:.*, total_time:.*", planInfo)
// Killed because of max execution time, reset Killed to 0.
tk.Session().GetSessionVars().SQLKiller.SendKillSignal(sqlkiller.MaxExecTimeExceeded)
tk.MustExec("set @@max_execution_time = 0;")
tk.MustQuery("select * FROM testTable2 WHERE SLEEP(1);").Check(testkit.Rows())
err = tk.QueryToErr("select /*+ MAX_EXECUTION_TIME(100)*/ * FROM testTable2 WHERE SLEEP(1);")
require.Equal(t, "[executor:3024]Query execution was interrupted, maximum statement execution time exceeded", err.Error())
// Killed because of max execution time, reset Killed to 0.
tk.Session().GetSessionVars().SQLKiller.SendKillSignal(sqlkiller.MaxExecTimeExceeded)
err = cc.handleQuery(context.Background(), "select * FROM testTable2 WHERE SLEEP(1);")
require.NoError(t, err)
err = cc.handleQuery(context.Background(), "select /*+ MAX_EXECUTION_TIME(100)*/ * FROM testTable2 WHERE SLEEP(1);")
require.Equal(t, "[executor:3024]Query execution was interrupted, maximum statement execution time exceeded", err.Error())
err = cc.handleQuery(context.Background(), "select /*+ set_var(max_execution_time=100) */ age, sleep(1) from testTable2 union all select age, 1 from testTable2")
require.Equal(t, "[executor:3024]Query execution was interrupted, maximum statement execution time exceeded", err.Error())
// Killed because of max execution time, reset Killed to 0.
tk.Session().GetSessionVars().SQLKiller.SendKillSignal(sqlkiller.MaxExecTimeExceeded)
tk.MustExec("set @@max_execution_time = 500;")
err = cc.handleQuery(context.Background(), "alter table testTable2 add index idx(age);")
require.NoError(t, err)
}
func TestShutDown(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
cc := &clientConn{}
se, err := session.CreateSession4Test(store)
require.NoError(t, err)
tc := &TiDBContext{Session: se}
cc.SetCtx(tc)
// set killed flag
cc.status = connStatusShutdown
// assert ErrQueryInterrupted
err = cc.handleQuery(context.Background(), "select 1")
require.Equal(t, exeerrors.ErrQueryInterrupted, err)
cfg := serverutil.NewTestConfig()
cfg.Port = 0
cfg.Status.StatusPort = 0
drv := NewTiDBDriver(store)
srv, err := NewServer(cfg, drv)
require.NoError(t, err)
srv.SetDomain(dom)
cc = &clientConn{server: srv}
cc.SetCtx(tc)
waitMap := [][]bool{
// Reading, Not Reading
{false, true}, // Not InTxn
{true, true}, // InTxn
}
for idx, waitMap := range waitMap {
inTxn := idx > 0
for idx, shouldWait := range waitMap {
reading := idx == 0
if inTxn {
cc.getCtx().GetSessionVars().SetInTxn(true)
} else {
cc.getCtx().GetSessionVars().SetInTxn(false)
}
if reading {
cc.CompareAndSwapStatus(cc.getStatus(), connStatusReading)
} else {
cc.CompareAndSwapStatus(cc.getStatus(), connStatusDispatching)
}
srv.clients[dom.NextConnID()] = cc
waitTime := 100 * time.Millisecond
begin := time.Now()
srv.DrainClients(waitTime, waitTime)
if shouldWait {
require.Greater(t, time.Since(begin), waitTime)
} else {
require.Less(t, time.Since(begin), waitTime)
}
}
}
}
func TestCommitWaitGroup(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
se, err := session.CreateSession4Test(store)
require.NoError(t, err)
tc := &TiDBContext{Session: se}
cfg := serverutil.NewTestConfig()
cfg.Port = 0
cfg.Status.StatusPort = 0
drv := NewTiDBDriver(store)
srv, err := NewServer(cfg, drv)
require.NoError(t, err)
srv.SetDomain(dom)
cc := &clientConn{server: srv}
cc.SetCtx(tc)
cc.CompareAndSwapStatus(cc.getStatus(), connStatusReading)
cc.getCtx().GetSessionVars().SetInTxn(false)
srv.clients[dom.NextConnID()] = cc
wg := cc.getCtx().GetCommitWaitGroup()
wg.Add(1)
go func() {
time.Sleep(100 * time.Millisecond)
wg.Done()
}()
begin := time.Now()
srv.DrainClients(time.Second, time.Second)
require.Greater(t, time.Since(begin), 100*time.Millisecond)
require.Less(t, time.Since(begin), time.Second)
}
type snapshotCache interface {
SnapCacheHitCount() int
}
func TestPrefetchPointKeys4Update(t *testing.T) {
store := testkit.CreateMockStore(t)
cc := &clientConn{
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
}
tk := testkit.NewTestKit(t, store)
cc.SetCtx(&TiDBContext{Session: tk.Session()})
ctx := context.Background()
tk.Session().GetSessionVars().EnableClusteredIndex = vardef.ClusteredIndexDefModeIntOnly
tk.MustExec("use test")
tk.MustExec("create table prefetch (a int, b int, c int, primary key (a, b))")
tk.MustExec("insert prefetch values (1, 1, 1), (2, 2, 2), (3, 3, 3)")
tk.MustExec("begin optimistic")
tk.MustExec("update prefetch set c = c + 1 where a = 2 and b = 2")
// enable multi-statement
capabilities := cc.ctx.GetSessionVars().ClientCapability
capabilities ^= mysql.ClientMultiStatements
cc.ctx.SetClientCapability(capabilities)
query := "update prefetch set c = c + 1 where a = 1 and b = 1;" +
"update prefetch set c = c + 1 where a = 2 and b = 2;" +
"update prefetch set c = c + 1 where a = 3 and b = 3;"
err := cc.handleQuery(ctx, query)
require.NoError(t, err)
txn, err := tk.Session().Txn(false)
require.NoError(t, err)
require.True(t, txn.Valid())
snap := txn.GetSnapshot()
//nolint:forcetypeassert
require.Equal(t, 6, snap.(snapshotCache).SnapCacheHitCount())
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows("1 1 2", "2 2 4", "3 3 4"))
tk.MustExec("begin pessimistic")
tk.MustExec("update prefetch set c = c + 1 where a = 2 and b = 2")
require.Equal(t, 2, tk.Session().GetSessionVars().TxnCtx.PessimisticCacheHit)
err = cc.handleQuery(ctx, query)
require.NoError(t, err)
txn, err = tk.Session().Txn(false)
require.NoError(t, err)
require.True(t, txn.Valid())
require.Equal(t, 6, tk.Session().GetSessionVars().TxnCtx.PessimisticCacheHit)
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows("1 1 3", "2 2 6", "3 3 5"))
}
func TestPrefetchPointKeys4Delete(t *testing.T) {
store := testkit.CreateMockStore(t)
cc := &clientConn{
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
}
tk := testkit.NewTestKit(t, store)
cc.SetCtx(&TiDBContext{Session: tk.Session()})
ctx := context.Background()
tk.Session().GetSessionVars().EnableClusteredIndex = vardef.ClusteredIndexDefModeIntOnly
tk.MustExec("use test")
tk.MustExec("create table prefetch (a int, b int, c int, primary key (a, b))")
tk.MustExec("insert prefetch values (1, 1, 1), (2, 2, 2), (3, 3, 3)")
tk.MustExec("insert prefetch values (4, 4, 4), (5, 5, 5), (6, 6, 6)")
tk.MustExec("begin optimistic")
tk.MustExec("delete from prefetch where a = 2 and b = 2")
// enable multi-statement
capabilities := cc.ctx.GetSessionVars().ClientCapability
capabilities ^= mysql.ClientMultiStatements
cc.ctx.SetClientCapability(capabilities)
query := "delete from prefetch where a = 1 and b = 1;" +
"delete from prefetch where a = 2 and b = 2;" +
"delete from prefetch where a = 3 and b = 3;"
err := cc.handleQuery(ctx, query)
require.NoError(t, err)
txn, err := tk.Session().Txn(false)
require.NoError(t, err)
require.True(t, txn.Valid())
snap := txn.GetSnapshot()
//nolint:forcetypeassert
require.Equal(t, 6, snap.(snapshotCache).SnapCacheHitCount())
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows("4 4 4", "5 5 5", "6 6 6"))
tk.MustExec("begin pessimistic")
tk.MustExec("delete from prefetch where a = 5 and b = 5")
require.Equal(t, 2, tk.Session().GetSessionVars().TxnCtx.PessimisticCacheHit)
query = "delete from prefetch where a = 4 and b = 4;" +
"delete from prefetch where a = 5 and b = 5;" +
"delete from prefetch where a = 6 and b = 6;"
err = cc.handleQuery(ctx, query)
require.NoError(t, err)
txn, err = tk.Session().Txn(false)
require.NoError(t, err)
require.True(t, txn.Valid())
require.Equal(t, 6, tk.Session().GetSessionVars().TxnCtx.PessimisticCacheHit)
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows())
}
func TestPrefetchBatchPointGet(t *testing.T) {
store := testkit.CreateMockStore(t)
cc := &clientConn{
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
}
tk := testkit.NewTestKit(t, store)
cc.SetCtx(&TiDBContext{Session: tk.Session()})
ctx := context.Background()
tk.MustExec("use test")
tk.MustExec("create table prefetch (a int primary key, b int)")
tk.MustExec("insert prefetch values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)")
tk.MustExec("begin optimistic")
tk.MustExec("delete from prefetch where a = 1")
// enable multi-statement
capabilities := cc.ctx.GetSessionVars().ClientCapability
capabilities ^= mysql.ClientMultiStatements
cc.ctx.SetClientCapability(capabilities)
query := "delete from prefetch where a in (2,3);" +
"delete from prefetch where a in (4,5);"
err := cc.handleQuery(ctx, query)
require.NoError(t, err)
txn, err := tk.Session().Txn(false)
require.NoError(t, err)
require.True(t, txn.Valid())
snap := txn.GetSnapshot()
//nolint:forcetypeassert
require.Equal(t, 4, snap.(snapshotCache).SnapCacheHitCount())
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows())
}
func TestPrefetchPartitionTable(t *testing.T) {
store := testkit.CreateMockStore(t)
cc := &clientConn{
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
}
tk := testkit.NewTestKit(t, store)
cc.SetCtx(&TiDBContext{Session: tk.Session()})
ctx := context.Background()
tk.MustExec("use test")
tk.MustExec("create table prefetch (a int primary key, b int) partition by hash(a) partitions 4")
tk.MustExec("insert prefetch values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)")
tk.MustExec("begin optimistic")
tk.MustExec("delete from prefetch where a = 1")
// enable multi-statement
capabilities := cc.ctx.GetSessionVars().ClientCapability
capabilities ^= mysql.ClientMultiStatements
cc.ctx.SetClientCapability(capabilities)
query := "delete from prefetch where a = 2;" +
"delete from prefetch where a = 3;" +
"delete from prefetch where a in (4,5);"
err := cc.handleQuery(ctx, query)
require.NoError(t, err)
txn, err := tk.Session().Txn(false)
require.NoError(t, err)
require.True(t, txn.Valid())
snap := txn.GetSnapshot()
//nolint:forcetypeassert
require.Equal(t, 4, snap.(snapshotCache).SnapCacheHitCount())
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows())
}
func TestTiFlashFallback(t *testing.T) {
store := testkit.CreateMockStore(t,
mockstore.WithClusterInspector(func(c testutils.Cluster) {
//nolint:forcetypeassert
mockCluster := c.(*unistore.Cluster)
_, _, region1 := mockstore.BootstrapWithSingleStore(c)
store := c.AllocID()
peer := c.AllocID()
mockCluster.AddStore(store, "tiflash0", &metapb.StoreLabel{Key: "engine", Value: "tiflash"})
mockCluster.AddPeer(region1, store, peer)
}),
mockstore.WithStoreType(mockstore.EmbedUnistore),
)
cc := &clientConn{
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
}
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@session.tidb_allow_tiflash_cop=ON")
cc.SetCtx(&TiDBContext{Session: tk.Session(), stmts: make(map[int]*TiDBStatement)})
tk.MustExec("drop table if exists t")
tk.MustExec("set tidb_cost_model_version=1")
tk.MustExec("create table t(a int not null primary key, b int not null)")
tk.MustExec("alter table t set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "t")
err := domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
dml := "insert into t values"
for i := range 50 {
dml += fmt.Sprintf("(%v, 0)", i)
if i != 49 {
dml += ","
}
}
tk.MustExec(dml)
tk.MustQuery("select count(*) from t").Check(testkit.Rows("50"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/internal/mpp/ReduceCopNextMaxBackoff", `return(true)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/ReduceCopNextMaxBackoff", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/internal/mpp/ReduceCopNextMaxBackoff"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/ReduceCopNextMaxBackoff"))
}()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/BatchCopRpcErrtiflash0", "return(\"tiflash0\")"))
// test COM_STMT_EXECUTE
ctx := context.Background()
tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'")
tk.MustExec("set @@tidb_allow_mpp=OFF")
require.NoError(t, cc.HandleStmtPrepare(ctx, "select sum(a) from t"))
require.NoError(t, cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}))
tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))
// test COM_STMT_FETCH (cursor mode)
require.Error(t, cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x1, 0x0, 0x0, 0x0}))
tk.MustExec("set @@tidb_allow_fallback_to_tikv=''")
require.Error(t, cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/BatchCopRpcErrtiflash0"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/fetchNextErr", "return(\"firstNext\")"))
// test COM_STMT_EXECUTE (cursor mode)
tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'")
require.NoError(t, cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x1, 0x0, 0x0, 0x0}))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/fetchNextErr"))
// test that TiDB would not retry if the first execution already sends data to client
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/fetchNextErr", "return(\"secondNext\")"))
tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'")
require.Error(t, cc.handleQuery(ctx, "select * from t t1 join t t2 on t1.a = t2.a"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/fetchNextErr"))
// simple TiFlash query (unary + non-streaming)
tk.MustExec("set @@tidb_allow_batch_cop=0; set @@tidb_allow_mpp=0;")
require.NoError(t, failpoint.Enable("tikvclient/tikvStoreSendReqResult", "return(\"requestTiFlashError\")"))
testFallbackWork(t, tk, cc, "select sum(a) from t")
require.NoError(t, failpoint.Disable("tikvclient/tikvStoreSendReqResult"))
// TiFlash query based on batch cop (batch + streaming)
tk.MustExec("set @@tidb_allow_batch_cop=1; set @@tidb_allow_mpp=0;")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/BatchCopRpcErrtiflash0", "return(\"tiflash0\")"))
testFallbackWork(t, tk, cc, "select count(*) from t")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/BatchCopRpcErrtiflash0"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/batchCopRecvTimeout", "return(true)"))
testFallbackWork(t, tk, cc, "select count(*) from t")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/batchCopRecvTimeout"))
// TiFlash MPP query (MPP + streaming)
tk.MustExec("set @@tidb_allow_batch_cop=0; set @@tidb_allow_mpp=1;")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/mppDispatchTimeout", "return(true)"))
testFallbackWork(t, tk, cc, "select * from t t1 join t t2 on t1.a = t2.a")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/mppDispatchTimeout"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/mppRecvTimeout", "return(-1)"))
testFallbackWork(t, tk, cc, "select * from t t1 join t t2 on t1.a = t2.a")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/mppRecvTimeout"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/establishMppConnectionErr", "return(true)"))
testFallbackWork(t, tk, cc, "select * from t t1 join t t2 on t1.a = t2.a")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/establishMppConnectionErr"))
// When fallback is not set, TiFlash mpp will return the original error message
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/mppDispatchTimeout", "return(true)"))
tk.MustExec("set @@tidb_allow_fallback_to_tikv=''")
tk.MustExec("set @@tidb_allow_mpp=ON")
tk.MustExec("set @@tidb_enforce_mpp=ON")
tk.MustExec("set @@tidb_isolation_read_engines='tiflash,tidb'")
err = cc.handleQuery(ctx, "select count(*) from t")
require.Error(t, err)
require.NotEqual(t, err.Error(), tikverr.ErrTiFlashServerTimeout.Error())
}
func testFallbackWork(t *testing.T, tk *testkit.TestKit, cc *clientConn, sql string) {
ctx := context.Background()
tk.MustExec("set @@tidb_allow_fallback_to_tikv=''")
require.Error(t, tk.QueryToErr(sql))
tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'")
require.NoError(t, cc.handleQuery(ctx, sql))
tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))
}
// For issue https://github.com/pingcap/tidb/issues/25069
func TestShowErrors(t *testing.T) {
store := testkit.CreateMockStore(t)
cc := &clientConn{
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
}
ctx := context.Background()
tk := testkit.NewTestKit(t, store)
cc.SetCtx(&TiDBContext{Session: tk.Session(), stmts: make(map[int]*TiDBStatement)})
err := cc.handleQuery(ctx, "create database if not exists test;")
require.NoError(t, err)
err = cc.handleQuery(ctx, "use test;")
require.NoError(t, err)
stmts, err := cc.ctx.Parse(ctx, "drop table idontexist")
require.NoError(t, err)
_, err = cc.ctx.ExecuteStmt(ctx, stmts[0])
require.Error(t, err)
tk.MustQuery("show errors").Check(testkit.Rows("Error 1051 Unknown table 'test.idontexist'"))
}
func TestHandleAuthPlugin(t *testing.T) {
store := testkit.CreateMockStore(t)
cfg := serverutil.NewTestConfig()
cfg.Port = 0
cfg.Status.StatusPort = 0
drv := NewTiDBDriver(store)
srv, err := NewServer(cfg, drv)
require.NoError(t, err)
ctx := context.Background()
tk := testkit.NewTestKit(t, store)
tk.MustExec("CREATE USER unativepassword")
defer func() {
tk.MustExec("DROP USER unativepassword")
}()
// 5.7 or newer client trying to authenticate with mysql_native_password
cc := &clientConn{
connectionID: 1,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
server: srv,
user: "unativepassword",
}
resp := handshake.Response41{
Capability: mysql.ClientProtocol41 | mysql.ClientPluginAuth,
AuthPlugin: mysql.AuthNativePassword,
}
err = cc.handleAuthPlugin(ctx, &resp)
require.NoError(t, err)
// 8.0 or newer client trying to authenticate with caching_sha2_password
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch", "return(1)"))
cc = &clientConn{
connectionID: 1,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
server: srv,
user: "unativepassword",
}
resp = handshake.Response41{
Capability: mysql.ClientProtocol41 | mysql.ClientPluginAuth,
AuthPlugin: mysql.AuthCachingSha2Password,
}
err = cc.handleAuthPlugin(ctx, &resp)
require.NoError(t, err)
require.Equal(t, []byte(mysql.AuthNativePassword), resp.Auth)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch"))
// client trying to authenticate with tidb_sm3_password
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch", "return(1)"))
cc = &clientConn{
connectionID: 1,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
server: srv,
user: "unativepassword",
}
resp = handshake.Response41{
Capability: mysql.ClientProtocol41 | mysql.ClientPluginAuth,
AuthPlugin: mysql.AuthTiDBSM3Password,
}
err = cc.handleAuthPlugin(ctx, &resp)
require.NoError(t, err)
require.Equal(t, []byte(mysql.AuthNativePassword), resp.Auth)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch"))
// MySQL 5.1 or older client, without authplugin support
cc = &clientConn{
connectionID: 1,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
server: srv,
user: "unativepassword",
}
resp = handshake.Response41{
Capability: mysql.ClientProtocol41,
}
err = cc.handleAuthPlugin(ctx, &resp)
require.NoError(t, err)
// === Target account has mysql_native_password ===
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/FakeUser", "return(\"mysql_native_password\")"))
// 5.7 or newer client trying to authenticate with mysql_native_password
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch", "return(1)"))
cc = &clientConn{
connectionID: 1,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
server: srv,
user: "unativepassword",
}
resp = handshake.Response41{
Capability: mysql.ClientProtocol41 | mysql.ClientPluginAuth,
AuthPlugin: mysql.AuthNativePassword,
}
err = cc.handleAuthPlugin(ctx, &resp)
require.NoError(t, err)
require.Equal(t, []byte(mysql.AuthNativePassword), resp.Auth)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch"))
// 8.0 or newer client trying to authenticate with caching_sha2_password
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch", "return(1)"))
cc = &clientConn{
connectionID: 1,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
server: srv,
user: "unativepassword",
}
resp = handshake.Response41{
Capability: mysql.ClientProtocol41 | mysql.ClientPluginAuth,
AuthPlugin: mysql.AuthCachingSha2Password,
}
err = cc.handleAuthPlugin(ctx, &resp)
require.NoError(t, err)
require.Equal(t, []byte(mysql.AuthNativePassword), resp.Auth)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch"))
// client trying to authenticate with tidb_sm3_password
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch", "return(1)"))
cc = &clientConn{
connectionID: 1,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
server: srv,
user: "unativepassword",
}
resp = handshake.Response41{
Capability: mysql.ClientProtocol41 | mysql.ClientPluginAuth,
AuthPlugin: mysql.AuthTiDBSM3Password,
}
err = cc.handleAuthPlugin(ctx, &resp)
require.NoError(t, err)
require.Equal(t, []byte(mysql.AuthNativePassword), resp.Auth)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch"))
// MySQL 5.1 or older client, without authplugin support
cc = &clientConn{
connectionID: 1,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
server: srv,
user: "unativepassword",
}
resp = handshake.Response41{
Capability: mysql.ClientProtocol41,
}
err = cc.handleAuthPlugin(ctx, &resp)
require.NoError(t, err)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/FakeUser"))
// === Target account has caching_sha2_password ===
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/FakeUser", "return(\"caching_sha2_password\")"))
// 5.7 or newer client trying to authenticate with mysql_native_password
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch", "return(1)"))
cc = &clientConn{
connectionID: 1,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
server: srv,
user: "unativepassword",
}
resp = handshake.Response41{
Capability: mysql.ClientProtocol41 | mysql.ClientPluginAuth,
AuthPlugin: mysql.AuthNativePassword,
}
err = cc.handleAuthPlugin(ctx, &resp)
require.NoError(t, err)
require.Equal(t, []byte(mysql.AuthCachingSha2Password), resp.Auth)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch"))
// 8.0 or newer client trying to authenticate with caching_sha2_password
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch", "return(1)"))
cc = &clientConn{
connectionID: 1,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
server: srv,
user: "unativepassword",
}
resp = handshake.Response41{
Capability: mysql.ClientProtocol41 | mysql.ClientPluginAuth,
AuthPlugin: mysql.AuthCachingSha2Password,
}
err = cc.handleAuthPlugin(ctx, &resp)
require.NoError(t, err)
require.Equal(t, []byte(mysql.AuthCachingSha2Password), resp.Auth)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch"))
// client trying to authenticate with tidb_sm3_password
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch", "return(1)"))
cc = &clientConn{
connectionID: 1,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
server: srv,
user: "unativepassword",
}
resp = handshake.Response41{
Capability: mysql.ClientProtocol41 | mysql.ClientPluginAuth,
AuthPlugin: mysql.AuthTiDBSM3Password,
}
err = cc.handleAuthPlugin(ctx, &resp)
require.NoError(t, err)
require.Equal(t, []byte(mysql.AuthCachingSha2Password), resp.Auth)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch"))
// MySQL 5.1 or older client, without authplugin support
cc = &clientConn{
connectionID: 1,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
server: srv,
user: "unativepassword",
}
resp = handshake.Response41{
Capability: mysql.ClientProtocol41,
}
err = cc.handleAuthPlugin(ctx, &resp)
require.Error(t, err)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/FakeUser"))
// === Target account has tidb_sm3_password ===
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/FakeUser", "return(\"tidb_sm3_password\")"))
// 5.7 or newer client trying to authenticate with mysql_native_password
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch", "return(1)"))
cc = &clientConn{
connectionID: 1,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
server: srv,
user: "unativepassword",
}
resp = handshake.Response41{
Capability: mysql.ClientProtocol41 | mysql.ClientPluginAuth,
AuthPlugin: mysql.AuthNativePassword,
}
err = cc.handleAuthPlugin(ctx, &resp)
require.NoError(t, err)
require.Equal(t, []byte(mysql.AuthTiDBSM3Password), resp.Auth)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch"))
// 8.0 or newer client trying to authenticate with caching_sha2_password
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch", "return(1)"))
cc = &clientConn{
connectionID: 1,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
server: srv,
user: "unativepassword",
}
resp = handshake.Response41{
Capability: mysql.ClientProtocol41 | mysql.ClientPluginAuth,
AuthPlugin: mysql.AuthCachingSha2Password,
}
err = cc.handleAuthPlugin(ctx, &resp)
require.NoError(t, err)
require.Equal(t, []byte(mysql.AuthTiDBSM3Password), resp.Auth)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch"))
// client trying to authenticate with tidb_sm3_password
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch", "return(1)"))
cc = &clientConn{
connectionID: 1,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
server: srv,
user: "unativepassword",
}
resp = handshake.Response41{
Capability: mysql.ClientProtocol41 | mysql.ClientPluginAuth,
AuthPlugin: mysql.AuthTiDBSM3Password,
}
err = cc.handleAuthPlugin(ctx, &resp)
require.NoError(t, err)
require.Equal(t, []byte(mysql.AuthTiDBSM3Password), resp.Auth)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch"))
// MySQL 5.1 or older client, without authplugin support
cc = &clientConn{
connectionID: 1,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
server: srv,
user: "unativepassword",
}
resp = handshake.Response41{
Capability: mysql.ClientProtocol41,
}
err = cc.handleAuthPlugin(ctx, &resp)
require.Error(t, err)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/FakeUser"))
}
func TestChangeUserAuth(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("create user user1")
cfg := serverutil.NewTestConfig()
cfg.Port = 0
cfg.Status.StatusPort = 0
drv := NewTiDBDriver(store)
srv, err := NewServer(cfg, drv)
require.NoError(t, err)
cc := &clientConn{
connectionID: 1,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
peerHost: "localhost",
collation: mysql.DefaultCollationID,
capability: defaultCapability,
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
server: srv,
user: "root",
}
ctx := context.Background()
se, _ := session.CreateSession4Test(store)
tc := &TiDBContext{
Session: se,
stmts: make(map[int]*TiDBStatement),
}
cc.SetCtx(tc)
data := []byte{}
data = append(data, "user1"...)
data = append(data, 0)
data = append(data, 1)
data = append(data, 0)
data = append(data, "test"...)
data = append(data, 0)
data = append(data, 0, 0)
data = append(data, "unknown"...)
data = append(data, 0)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/ChangeUserAuthSwitch", fmt.Sprintf("return(\"%s\")", t.Name())))
err = cc.handleChangeUser(ctx, data)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/ChangeUserAuthSwitch"))
require.EqualError(t, err, t.Name())
}
func TestAuthPlugin2(t *testing.T) {
store := testkit.CreateMockStore(t)
cfg := serverutil.NewTestConfig()
cfg.Port = 0
cfg.Status.StatusPort = 0
drv := NewTiDBDriver(store)
srv, err := NewServer(cfg, drv)
require.NoError(t, err)
cc := &clientConn{
connectionID: 1,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
server: srv,
user: "root",
}
ctx := context.Background()
se, _ := session.CreateSession4Test(store)
tc := &TiDBContext{
Session: se,
stmts: make(map[int]*TiDBStatement),
}
cc.SetCtx(tc)
resp := handshake.Response41{
Capability: mysql.ClientProtocol41 | mysql.ClientPluginAuth,
}
cc.isUnixSocket = true
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch", "return(1)"))
respAuthSwitch, err := cc.checkAuthPlugin(ctx, &resp)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch"))
require.Equal(t, []byte(mysql.AuthNativePassword), respAuthSwitch)
require.NoError(t, err)
}
func TestAuthSessionTokenPlugin(t *testing.T) {
// create the cert
tempDir := t.TempDir()
certPath := filepath.Join(tempDir, "test1_cert.pem")
keyPath := filepath.Join(tempDir, "test1_key.pem")
err := util.CreateCertificates(certPath, keyPath, 1024, x509.RSA, x509.UnknownSignatureAlgorithm)
require.NoError(t, err)
cfg := config.GetGlobalConfig()
cfg.Security.SessionTokenSigningCert = certPath
cfg.Security.SessionTokenSigningKey = keyPath
cfg.Port = 0
cfg.Status.StatusPort = 0
// The global config is read during creating the store.
store := testkit.CreateMockStore(t)
drv := NewTiDBDriver(store)
srv, err := NewServer(cfg, drv)
require.NoError(t, err)
ctx := context.Background()
tk := testkit.NewTestKit(t, store)
tk.MustExec("CREATE USER auth_session_token")
tk.MustExec("CREATE USER another_user")
tc, err := drv.OpenCtx(uint64(0), 0, uint8(mysql.DefaultCollationID), "", nil, nil)
require.NoError(t, err)
cc := &clientConn{
connectionID: 1,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
server: srv,
user: "auth_session_token",
}
cc.SetCtx(tc)
// create a token without TLS
tk1 := testkit.NewTestKitWithSession(t, store, tc.Session)
tc.Session.GetSessionVars().ConnectionInfo = cc.connectInfo()
tk1.Session().Auth(&auth.UserIdentity{Username: "auth_session_token", Hostname: "localhost"}, nil, nil, nil)
tk1.MustQuery("show session_states")
// create a token with TLS
cc.tlsConn = tls.Client(nil, &tls.Config{})
tc.Session.GetSessionVars().ConnectionInfo = cc.connectInfo()
tk1.Session().Auth(&auth.UserIdentity{Username: "auth_session_token", Hostname: "localhost"}, nil, nil, nil)
tk1.MustQuery("show session_states")
// create a token with UnixSocket
cc.tlsConn = nil
cc.isUnixSocket = true
tc.Session.GetSessionVars().ConnectionInfo = cc.connectInfo()
rows := tk1.MustQuery("show session_states").Rows()
//nolint:forcetypeassert
tokenBytes := []byte(rows[0][1].(string))
// auth with the token
resp := handshake.Response41{
Capability: mysql.ClientProtocol41 | mysql.ClientPluginAuth,
AuthPlugin: mysql.AuthTiDBSessionToken,
Auth: tokenBytes,
}
err = cc.handleAuthPlugin(ctx, &resp)
require.NoError(t, err)
err = cc.openSessionAndDoAuth(resp.Auth, resp.AuthPlugin, resp.ZstdLevel)
require.NoError(t, err)
// login succeeds even if the password expires now
tk.MustExec("ALTER USER auth_session_token PASSWORD EXPIRE")
err = cc.openSessionAndDoAuth([]byte{}, mysql.AuthNativePassword, 0)
require.ErrorContains(t, err, "Your password has expired")
err = cc.openSessionAndDoAuth(resp.Auth, resp.AuthPlugin, resp.ZstdLevel)
require.NoError(t, err)
// wrong token should fail
tokenBytes[0] ^= 0xff
err = cc.openSessionAndDoAuth(resp.Auth, resp.AuthPlugin, resp.ZstdLevel)
require.ErrorContains(t, err, "Access denied")
tokenBytes[0] ^= 0xff
// using the token to auth with another user should fail
cc.user = "another_user"
err = cc.openSessionAndDoAuth(resp.Auth, resp.AuthPlugin, resp.ZstdLevel)
require.ErrorContains(t, err, "Access denied")
}
func TestMaxAllowedPacket(t *testing.T) {
// Test cases from issue 31422: https://github.com/pingcap/tidb/issues/31422
// The string "SELECT length('') as len;" has 25 chars,
// so if the string inside '' has a length of 999, the total query reaches the max allowed packet size.
const maxAllowedPacket = 1024
var (
inBuffer bytes.Buffer
readBytes []byte
)
// The length of total payload is (25 + 999 = 1024).
bytes := append([]byte{0x00, 0x04, 0x00, 0x00}, fmt.Appendf(nil, "SELECT length('%s') as len;", strings.Repeat("a", 999))...)
_, err := inBuffer.Write(bytes)
require.NoError(t, err)
brc := serverutil.NewBufferedReadConn(&testutil.BytesConn{Buffer: inBuffer})
pkt := internal.NewPacketIO(brc)
pkt.SetMaxAllowedPacket(maxAllowedPacket)
readBytes, err = pkt.ReadPacket()
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("SELECT length('%s') as len;", strings.Repeat("a", 999)), string(readBytes))
require.Equal(t, uint8(1), pkt.Sequence())
// The length of total payload is (25 + 1000 = 1025).
inBuffer.Reset()
bytes = append([]byte{0x01, 0x04, 0x00, 0x00}, fmt.Appendf(nil, "SELECT length('%s') as len;", strings.Repeat("a", 1000))...)
_, err = inBuffer.Write(bytes)
require.NoError(t, err)
brc = serverutil.NewBufferedReadConn(&testutil.BytesConn{Buffer: inBuffer})
pkt = internal.NewPacketIO(brc)
pkt.SetMaxAllowedPacket(maxAllowedPacket)
_, err = pkt.ReadPacket()
require.Error(t, err)
// The length of total payload is (25 + 488 = 513).
// Two separate packets would NOT exceed the limitation of maxAllowedPacket.
inBuffer.Reset()
bytes = append([]byte{0x01, 0x02, 0x00, 0x00}, fmt.Appendf(nil, "SELECT length('%s') as len;", strings.Repeat("a", 488))...)
_, err = inBuffer.Write(bytes)
require.NoError(t, err)
brc = serverutil.NewBufferedReadConn(&testutil.BytesConn{Buffer: inBuffer})
pkt = internal.NewPacketIO(brc)
pkt.SetMaxAllowedPacket(maxAllowedPacket)
readBytes, err = pkt.ReadPacket()
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("SELECT length('%s') as len;", strings.Repeat("a", 488)), string(readBytes))
require.Equal(t, uint8(1), pkt.Sequence())
inBuffer.Reset()
bytes = append([]byte{0x01, 0x02, 0x00, 0x01}, fmt.Appendf(nil, "SELECT length('%s') as len;", strings.Repeat("b", 488))...)
_, err = inBuffer.Write(bytes)
require.NoError(t, err)
brc = serverutil.NewBufferedReadConn(&testutil.BytesConn{Buffer: inBuffer})
pkt.SetBufferedReadConn(brc)
readBytes, err = pkt.ReadPacket()
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("SELECT length('%s') as len;", strings.Repeat("b", 488)), string(readBytes))
require.Equal(t, uint8(2), pkt.Sequence())
}
func TestOkEof(t *testing.T) {
store := testkit.CreateMockStore(t)
var outBuffer bytes.Buffer
tidbdrv := NewTiDBDriver(store)
cfg := serverutil.NewTestConfig()
cfg.Port, cfg.Status.StatusPort = 0, 0
cfg.Status.ReportStatus = false
server, err := NewServer(cfg, tidbdrv)
require.NoError(t, err)
defer server.Close()
cc := &clientConn{
connectionID: 1,
salt: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14},
server: server,
pkt: internal.NewPacketIOForTest(bufio.NewWriter(&outBuffer)),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
alloc: arena.NewAllocator(512),
chunkAlloc: chunk.NewAllocator(),
capability: mysql.ClientProtocol41 | mysql.ClientDeprecateEOF,
}
tk := testkit.NewTestKit(t, store)
ctx := &TiDBContext{Session: tk.Session()}
cc.SetCtx(ctx)
err = cc.writeOK(context.Background())
require.NoError(t, err)
require.Equal(t, []byte{0x7, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0}, outBuffer.Bytes())
outBuffer.Reset()
err = cc.writeEOF(context.Background(), cc.ctx.Status())
require.NoError(t, err)
err = cc.flush(context.TODO())
require.NoError(t, err)
require.Equal(t, mysql.EOFHeader, outBuffer.Bytes()[4])
require.Equal(t, []byte{0x7, 0x0, 0x0, 0x1, 0xfe, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0}, outBuffer.Bytes())
}
func TestExtensionChangeUser(t *testing.T) {
defer extension.Reset()
extension.Reset()
logged := false
var logTp extension.ConnEventTp
var logInfo *extension.ConnEventInfo
require.NoError(t, extension.Register("test", extension.WithSessionHandlerFactory(func() *extension.SessionHandler {
return &extension.SessionHandler{
OnConnectionEvent: func(tp extension.ConnEventTp, info *extension.ConnEventInfo) {
require.False(t, logged)
logTp = tp
logInfo = info
logged = true
},
}
})))
extensions, err := extension.GetExtensions()
require.NoError(t, err)
store := testkit.CreateMockStore(t)
var outBuffer bytes.Buffer
tidbdrv := NewTiDBDriver(store)
cfg := serverutil.NewTestConfig()
cfg.Port, cfg.Status.StatusPort = 0, 0
cfg.Status.ReportStatus = false
server, err := NewServer(cfg, tidbdrv)
require.NoError(t, err)
defer server.Close()
cc := &clientConn{
connectionID: 1,
salt: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14},
server: server,
pkt: internal.NewPacketIOForTest(bufio.NewWriter(&outBuffer)),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
alloc: arena.NewAllocator(512),
chunkAlloc: chunk.NewAllocator(),
capability: mysql.ClientProtocol41,
extensions: extensions.NewSessionExtensions(),
}
tk := testkit.NewTestKit(t, store)
ctx := &TiDBContext{Session: tk.Session()}
cc.SetCtx(ctx)
tk.MustExec("create user user1")
tk.MustExec("create user user2")
tk.MustExec("create database db1")
tk.MustExec("create database db2")
tk.MustExec("grant select on db1.* to user1@'%'")
tk.MustExec("grant select on db2.* to user2@'%'")
// change user.
doDispatch := func(req dispatchInput) {
inBytes := append([]byte{req.com}, req.in...)
err = cc.dispatch(context.Background(), inBytes)
require.Equal(t, req.err, err)
if err == nil {
err = cc.flush(context.TODO())
require.NoError(t, err)
require.Equal(t, req.out, outBuffer.Bytes())
} else {
_ = cc.flush(context.TODO())
}
outBuffer.Reset()
}
expectedConnInfo := extension.ConnEventInfo{
ConnectionInfo: cc.connectInfo(),
ActiveRoles: []*auth.RoleIdentity{},
}
expectedConnInfo.User = "user1"
expectedConnInfo.DB = "db1"
require.False(t, logged)
userData := append([]byte("user1"), 0x0, 0x0)
userData = append(userData, []byte("db1")...)
userData = append(userData, 0x0)
doDispatch(dispatchInput{
com: mysql.ComChangeUser,
in: userData,
err: nil,
out: []byte{0x7, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0},
})
require.True(t, logged)
require.Equal(t, extension.ConnReset, logTp)
require.Equal(t, expectedConnInfo.ActiveRoles, logInfo.ActiveRoles)
require.Equal(t, expectedConnInfo.Error, logInfo.Error)
require.Equal(t, *(expectedConnInfo.ConnectionInfo), *(logInfo.ConnectionInfo))
logged = false
logTp = 0
logInfo = nil
expectedConnInfo.User = "user2"
expectedConnInfo.DB = "db2"
userData = append([]byte("user2"), 0x0, 0x0)
userData = append(userData, []byte("db2")...)
userData = append(userData, 0x0)
doDispatch(dispatchInput{
com: mysql.ComChangeUser,
in: userData,
err: nil,
out: []byte{0x7, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0},
})
require.True(t, logged)
require.Equal(t, extension.ConnReset, logTp)
require.Equal(t, expectedConnInfo.ActiveRoles, logInfo.ActiveRoles)
require.Equal(t, expectedConnInfo.Error, logInfo.Error)
require.Equal(t, *(expectedConnInfo.ConnectionInfo), *(logInfo.ConnectionInfo))
logged = false
logTp = 0
logInfo = nil
doDispatch(dispatchInput{
com: mysql.ComResetConnection,
in: nil,
err: nil,
out: []byte{0x7, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0},
})
require.True(t, logged)
require.Equal(t, extension.ConnReset, logTp)
require.Equal(t, expectedConnInfo.ActiveRoles, logInfo.ActiveRoles)
require.Equal(t, expectedConnInfo.Error, logInfo.Error)
require.Equal(t, *(expectedConnInfo.ConnectionInfo), *(logInfo.ConnectionInfo))
}
func TestAuthSha(t *testing.T) {
store := testkit.CreateMockStore(t)
var outBuffer bytes.Buffer
tidbdrv := NewTiDBDriver(store)
cfg := serverutil.NewTestConfig()
cfg.Port, cfg.Status.StatusPort = 0, 0
cfg.Status.ReportStatus = false
server, err := NewServer(cfg, tidbdrv)
require.NoError(t, err)
defer server.Close()
cc := &clientConn{
connectionID: 1,
salt: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14},
server: server,
pkt: internal.NewPacketIOForTest(bufio.NewWriter(&outBuffer)),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
alloc: arena.NewAllocator(512),
chunkAlloc: chunk.NewAllocator(),
capability: mysql.ClientProtocol41,
}
tk := testkit.NewTestKit(t, store)
ctx := &TiDBContext{Session: tk.Session()}
cc.SetCtx(ctx)
resp := handshake.Response41{
Capability: mysql.ClientProtocol41 | mysql.ClientPluginAuth,
AuthPlugin: mysql.AuthCachingSha2Password,
Auth: []byte{}, // No password
}
authData, err := cc.authSha(context.Background(), resp)
require.NoError(t, err)
// If no password is specified authSha() should return an empty byte slice
// which differs from when a password is specified as that should trigger
// fastAuthFail and the rest of the auth process.
require.Equal(t, authData, []byte{})
}
func TestProcessInfoForExecuteCommand(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
cc := &clientConn{
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
}
ctx := context.Background()
tk.MustExec("use test")
cc.SetCtx(&TiDBContext{Session: tk.Session(), stmts: make(map[int]*TiDBStatement)})
tk.MustExec("create table t (col1 int)")
// simple prepare and execute
require.NoError(t, cc.HandleStmtPrepare(ctx, "select sum(col1) from t"))
require.NoError(t, cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}))
require.Equal(t, cc.ctx.Session.ShowProcess().Info, "select sum(col1) from t")
// prepare and execute with params
require.NoError(t, cc.HandleStmtPrepare(ctx, "select sum(col1) from t where col1 < ? and col1 > 100"))
// 1 params, length of nullBitMap is 1, `0x8, 0x0` represents the type, and the following `0x10, 0x0....` is the param
// 10
require.NoError(t, cc.handleStmtExecute(ctx, []byte{0x2, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0,
0x1, 0x8, 0x0,
0x0A, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}))
require.Equal(t, cc.ctx.Session.ShowProcess().Info, "select sum(col1) from t where col1 < ? and col1 > 100")
}
func TestLDAPAuthSwitch(t *testing.T) {
store := testkit.CreateMockStore(t)
cfg := serverutil.NewTestConfig()
cfg.Port = 0
cfg.Status.StatusPort = 0
drv := NewTiDBDriver(store)
srv, err := NewServer(cfg, drv)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("CREATE USER test_simple_ldap IDENTIFIED WITH authentication_ldap_simple AS 'uid=test_simple_ldap,dc=example,dc=com'")
cc := &clientConn{
connectionID: 1,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
server: srv,
user: "test_simple_ldap",
}
se, _ := session.CreateSession4Test(store)
tc := &TiDBContext{
Session: se,
stmts: make(map[int]*TiDBStatement),
}
cc.SetCtx(tc)
cc.isUnixSocket = true
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch", "return(1)"))
respAuthSwitch, err := cc.checkAuthPlugin(context.Background(), &handshake.Response41{
Capability: mysql.ClientProtocol41 | mysql.ClientPluginAuth,
User: "test_simple_ldap",
})
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/FakeAuthSwitch"))
require.NoError(t, err)
require.Equal(t, []byte(mysql.AuthMySQLClearPassword), respAuthSwitch)
}
func TestEmptyOrgName(t *testing.T) {
inputs := []dispatchInput{
{
com: mysql.ComQuery,
in: append([]byte("SELECT DATE_FORMAT(CONCAT('2023-07-0', a), '%Y') AS 'YEAR' FROM test.t"), 0x0),
err: nil,
out: []byte{0x1, 0x0, 0x0, 0x0, 0x1, // 1 column
0x1a, 0x0, 0x0,
0x1, 0x3, 0x64, 0x65, 0x66, // catalog
0x0, // schema
0x0, // table name
0x0, // org table
0x4, 0x59, 0x45, 0x41, 0x52, // name 'YEAR'
0x0, // org name
0xc, 0x2e, 0x0, 0x2c, 0x0, 0x0, 0x0, 0xfd, 0x0, 0x0, 0x1f, 0x0, 0x0, 0x1, 0x0, 0x0, 0x2, 0xfe, 0x5, 0x0,
0x0, 0x3, 0x4, 0x32, 0x30, 0x32, 0x33, 0x1, 0x0, 0x0, 0x4, 0xfe},
},
}
testDispatch(t, inputs, 0)
}
func TestStats(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
stats := &compressionStats{}
// No compression
vars := tk.Session().GetSessionVars()
m, err := stats.Stats(vars)
require.NoError(t, err)
require.Equal(t, "OFF", m["Compression"])
require.Equal(t, "", m["Compression_algorithm"])
require.Equal(t, 0, m["Compression_level"])
// zlib compression
vars.CompressionAlgorithm = mysql.CompressionZlib
m, err = stats.Stats(vars)
require.NoError(t, err)
require.Equal(t, "ON", m["Compression"])
require.Equal(t, "zlib", m["Compression_algorithm"])
require.Equal(t, mysql.ZlibCompressDefaultLevel, m["Compression_level"])
// zstd compression, with level 1
vars.CompressionAlgorithm = mysql.CompressionZstd
vars.CompressionLevel = 1
m, err = stats.Stats(vars)
require.NoError(t, err)
require.Equal(t, "ON", m["Compression"])
require.Equal(t, "zstd", m["Compression_algorithm"])
require.Equal(t, 1, m["Compression_level"])
}
func TestCloseConn(t *testing.T) {
var outBuffer bytes.Buffer
store, _ := testkit.CreateMockStoreAndDomain(t)
cfg := serverutil.NewTestConfig()
cfg.Port = 0
cfg.Status.StatusPort = 0
drv := NewTiDBDriver(store)
server, err := NewServer(cfg, drv)
require.NoError(t, err)
cc := &clientConn{
connectionID: 0,
salt: []byte{
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A,
0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14,
},
server: server,
pkt: internal.NewPacketIOForTest(bufio.NewWriter(&outBuffer)),
collation: mysql.DefaultCollationID,
peerHost: "localhost",
alloc: arena.NewAllocator(512),
chunkAlloc: chunk.NewAllocator(),
capability: mysql.ClientProtocol41,
}
var wg sync.WaitGroup
const numGoroutines = 10
wg.Add(numGoroutines)
for range numGoroutines {
go func() {
defer wg.Done()
err := closeConn(cc)
require.NoError(t, err)
}()
}
wg.Wait()
}
func TestConnAddMetrics(t *testing.T) {
store := testkit.CreateMockStore(t)
re := require.New(t)
tk := testkit.NewTestKit(t, store)
cc := &clientConn{
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
pkt: internal.NewPacketIOForTest(bufio.NewWriter(bytes.NewBuffer(nil))),
}
tk.MustExec("use test")
cc.SetCtx(&TiDBContext{Session: tk.Session(), stmts: make(map[int]*TiDBStatement)})
// default
cc.addQueryMetrics(mysql.ComQuery, time.Now(), nil)
counter := metrics.QueryTotalCounter
v := promtestutils.ToFloat64(counter.WithLabelValues("Query", "OK", "default"))
require.Equal(t, 1.0, v)
// rg1
cc.getCtx().GetSessionVars().ResourceGroupName = "test_rg1"
cc.addQueryMetrics(mysql.ComQuery, time.Now(), nil)
re.Equal(promtestutils.ToFloat64(counter.WithLabelValues("Query", "OK", "default")), 1.0)
re.Equal(promtestutils.ToFloat64(counter.WithLabelValues("Query", "OK", "test_rg1")), 1.0)
/// inc the counter again
cc.addQueryMetrics(mysql.ComQuery, time.Now(), nil)
re.Equal(promtestutils.ToFloat64(counter.WithLabelValues("Query", "OK", "test_rg1")), 2.0)
// rg2
cc.getCtx().GetSessionVars().ResourceGroupName = "test_rg2"
// error
cc.addQueryMetrics(mysql.ComQuery, time.Now(), errors.New("unknown error"))
re.Equal(promtestutils.ToFloat64(counter.WithLabelValues("Query", "Error", "test_rg2")), 1.0)
// ok
cc.addQueryMetrics(mysql.ComStmtExecute, time.Now(), nil)
re.Equal(promtestutils.ToFloat64(counter.WithLabelValues("StmtExecute", "OK", "test_rg2")), 1.0)
}
func TestIssue54335(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
// There is no underlying netCon, use failpoint to avoid panic
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/FakeClientConn", "return(1)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/FakeClientConn"))
}()
tk := testkit.NewTestKit(t, store)
connID := uint64(1)
tk.Session().SetConnectionID(connID)
tc := &TiDBContext{
Session: tk.Session(),
stmts: make(map[int]*TiDBStatement),
}
cc := &clientConn{
connectionID: connID,
server: &Server{
capability: defaultCapability,
},
alloc: arena.NewAllocator(32 * 1024),
chunkAlloc: chunk.NewAllocator(),
}
cc.SetCtx(tc)
srv := &Server{
clients: map[uint64]*clientConn{
connID: cc,
},
dom: dom,
}
handle := dom.ExpensiveQueryHandle().SetSessionManager(srv)
go handle.Run()
tk.MustExec("use test;")
tk.MustExec("CREATE TABLE testTable2 (id bigint, age int)")
str := fmt.Sprintf("insert into testTable2 values(%d, %d)", 1, 1)
tk.MustExec(str)
for range 14 {
tk.MustExec("insert into testTable2 select * from testTable2")
}
times := 100
for ; times > 0; times-- {
// Test with -race
_ = cc.handleQuery(context.Background(), "select /*+ MAX_EXECUTION_TIME(1)*/ * FROM testTable2;")
}
}