996 lines
36 KiB
Go
996 lines
36 KiB
Go
// Copyright 2021 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package sessiontxn_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/pingcap/failpoint"
|
|
"github.com/pingcap/tidb/pkg/config"
|
|
"github.com/pingcap/tidb/pkg/config/kerneltype"
|
|
"github.com/pingcap/tidb/pkg/domain"
|
|
"github.com/pingcap/tidb/pkg/errno"
|
|
"github.com/pingcap/tidb/pkg/expression"
|
|
"github.com/pingcap/tidb/pkg/kv"
|
|
"github.com/pingcap/tidb/pkg/parser/ast"
|
|
"github.com/pingcap/tidb/pkg/sessionctx"
|
|
"github.com/pingcap/tidb/pkg/sessiontxn"
|
|
"github.com/pingcap/tidb/pkg/testkit"
|
|
"github.com/pingcap/tidb/pkg/testkit/testfork"
|
|
"github.com/pingcap/tidb/pkg/testkit/testsetup"
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/goleak"
|
|
)
|
|
|
|
func TestMain(m *testing.M) {
|
|
testsetup.SetupForCommonTest()
|
|
opts := []goleak.Option{
|
|
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
|
|
goleak.IgnoreTopFunction("github.com/bazelbuild/rules_go/go/tools/bzltestutil.RegisterTimeoutHandler.func1"),
|
|
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
|
|
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
|
|
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
|
|
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"),
|
|
}
|
|
goleak.VerifyTestMain(m, opts...)
|
|
}
|
|
|
|
func setupTxnContextTest(t *testing.T) (kv.Storage, *domain.Domain) {
|
|
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/assertTxnManagerInCompile", "return"))
|
|
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/assertTxnManagerInRebuildPlan", "return"))
|
|
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/assertTxnManagerAfterBuildExecutor", "return"))
|
|
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/assertTxnManagerAfterPessimisticLockErrorRetry", "return"))
|
|
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/assertTxnManagerInShortPointGetPlan", "return"))
|
|
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/session/assertTxnManagerInRunStmt", "return"))
|
|
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/session/assertTxnManagerInCachedPlanExec", "return"))
|
|
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/session/assertTxnManagerForUpdateTSEqual", "return"))
|
|
|
|
store, do := testkit.CreateMockStoreAndDomain(t)
|
|
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.Session().SetValue(sessiontxn.AssertRecordsKey, nil)
|
|
tk.Session().SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil)
|
|
|
|
tk.MustExec("use test")
|
|
tk.MustExec("drop table if exists t1,t2")
|
|
|
|
tk.MustExec("create table t1 (id int primary key, v int)")
|
|
tk.MustExec("insert into t1 values(1, 10)")
|
|
|
|
tk.MustExec("create table t2 (id int)")
|
|
|
|
tk.MustExec("create temporary table tmp (id int)")
|
|
tk.MustExec("insert into tmp values(10)")
|
|
|
|
t.Cleanup(func() {
|
|
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/assertTxnManagerInCompile"))
|
|
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/assertTxnManagerInRebuildPlan"))
|
|
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/assertTxnManagerAfterBuildExecutor"))
|
|
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/assertTxnManagerAfterPessimisticLockErrorRetry"))
|
|
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/assertTxnManagerInShortPointGetPlan"))
|
|
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/session/assertTxnManagerInRunStmt"))
|
|
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/session/assertTxnManagerInCachedPlanExec"))
|
|
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/session/assertTxnManagerForUpdateTSEqual"))
|
|
|
|
tk.Session().SetValue(sessiontxn.AssertRecordsKey, nil)
|
|
tk.Session().SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil)
|
|
tk.Session().SetValue(sessiontxn.AssertTxnInfoSchemaAfterRetryKey, nil)
|
|
})
|
|
return store, do
|
|
}
|
|
|
|
func checkAssertRecordExits(t *testing.T, se sessionctx.Context, name string) {
|
|
records, ok := se.Value(sessiontxn.AssertRecordsKey).(map[string]any)
|
|
require.True(t, ok, fmt.Sprintf("'%s' not in record, maybe failpoint not enabled?", name))
|
|
_, ok = records[name]
|
|
require.True(t, ok, fmt.Sprintf("'%s' not in record", name))
|
|
}
|
|
|
|
func doWithCheckPath(t *testing.T, se sessionctx.Context, names []string, do func()) {
|
|
se.SetValue(sessiontxn.AssertRecordsKey, nil)
|
|
do()
|
|
for _, name := range names {
|
|
checkAssertRecordExits(t, se, name)
|
|
}
|
|
}
|
|
|
|
var normalPathRecords = []string{
|
|
"assertTxnManagerInCompile",
|
|
"assertTxnManagerInRunStmt",
|
|
"assertTxnManagerAfterBuildExecutor",
|
|
}
|
|
|
|
func TestTxnContextForSimpleCases(t *testing.T) {
|
|
store, do := setupTxnContextTest(t)
|
|
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("use test")
|
|
se := tk.Session()
|
|
|
|
tk2 := testkit.NewTestKit(t, store)
|
|
tk2.MustExec("use test")
|
|
|
|
is1 := do.InfoSchema()
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
// test for write
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustExec("insert into t2 (id) values(3)")
|
|
})
|
|
// test for select
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10"))
|
|
})
|
|
// test for select for update
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 10"))
|
|
})
|
|
|
|
tk2.MustExec("alter table t2 add column(c1 int)")
|
|
is2 := do.InfoSchema()
|
|
require.True(t, is2.SchemaMetaVersion() > is1.SchemaMetaVersion())
|
|
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is2)
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10"))
|
|
})
|
|
}
|
|
|
|
func TestTxnContextInExplicitTxn(t *testing.T) {
|
|
if kerneltype.IsNextGen() {
|
|
t.Skip("MDL is always enabled and read only in nextgen")
|
|
}
|
|
store, do := setupTxnContextTest(t)
|
|
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("set global tidb_enable_metadata_lock=0")
|
|
tk.MustExec("use test")
|
|
se := tk.Session()
|
|
|
|
tk2 := testkit.NewTestKit(t, store)
|
|
tk2.MustExec("use test")
|
|
|
|
is1 := do.InfoSchema()
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
|
|
tk.MustExec("begin")
|
|
// test for write
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustExec("insert into t2 (id) values(2)")
|
|
})
|
|
// test for select
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10"))
|
|
})
|
|
// test for select for update
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 10"))
|
|
})
|
|
|
|
// info schema changed when txn not finish, the info schema in old txn should not change
|
|
tk2.MustExec("alter table t2 add column(c1 int)")
|
|
is2 := do.InfoSchema()
|
|
require.True(t, is2.SchemaMetaVersion() > is1.SchemaMetaVersion())
|
|
|
|
// test for write
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustExec("insert into t2 (id) values(2)")
|
|
})
|
|
// test for select
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10"))
|
|
})
|
|
// test for select for update
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 10"))
|
|
})
|
|
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustGetErrCode("commit", errno.ErrInfoSchemaChanged)
|
|
})
|
|
|
|
// the info schema in new txn should use the newest one
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is2)
|
|
tk.MustExec("begin")
|
|
// test for write
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustExec("insert into t2 (id) values(2)")
|
|
})
|
|
// test for select
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10"))
|
|
})
|
|
// test for select for update
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 10"))
|
|
})
|
|
}
|
|
|
|
func TestTxnContextBeginInUnfinishedTxn(t *testing.T) {
|
|
store, do := setupTxnContextTest(t)
|
|
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("use test")
|
|
se := tk.Session()
|
|
|
|
tk2 := testkit.NewTestKit(t, store)
|
|
tk2.MustExec("use test")
|
|
|
|
is1 := do.InfoSchema()
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
tk.MustExec("begin")
|
|
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10"))
|
|
})
|
|
|
|
tk2.MustExec("alter table t2 add column(c1 int)")
|
|
is2 := do.InfoSchema()
|
|
require.True(t, is2.SchemaMetaVersion() > is1.SchemaMetaVersion())
|
|
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10"))
|
|
})
|
|
|
|
tk.MustExec("begin")
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is2)
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10"))
|
|
})
|
|
tk.MustExec("rollback")
|
|
}
|
|
|
|
func TestTxnContextWithAutocommitFalse(t *testing.T) {
|
|
if kerneltype.IsNextGen() {
|
|
t.Skip("MDL is always enabled and read only in nextgen")
|
|
}
|
|
store, do := setupTxnContextTest(t)
|
|
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("use test")
|
|
tk.MustExec("set global tidb_enable_metadata_lock=0")
|
|
se := tk.Session()
|
|
|
|
tk2 := testkit.NewTestKit(t, store)
|
|
tk2.MustExec("use test")
|
|
|
|
is1 := do.InfoSchema()
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
tk.MustExec("begin")
|
|
|
|
tk.MustExec("set autocommit=0")
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, do.InfoSchema())
|
|
// test for write
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustExec("insert into t2 (id) values(2)")
|
|
})
|
|
|
|
// schema change should not affect because it is in txn
|
|
tk2.MustExec("alter table t2 add column(c1 int)")
|
|
|
|
// test for select
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10"))
|
|
})
|
|
// test for select for update
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 10"))
|
|
})
|
|
tk.MustExec("rollback")
|
|
}
|
|
|
|
func TestTxnContextInRC(t *testing.T) {
|
|
store, do := setupTxnContextTest(t)
|
|
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("use test")
|
|
se := tk.Session()
|
|
|
|
tk2 := testkit.NewTestKit(t, store)
|
|
tk2.MustExec("use test")
|
|
|
|
is1 := do.InfoSchema()
|
|
tk.MustExec("set tx_isolation = 'READ-COMMITTED'")
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10"))
|
|
})
|
|
|
|
tk.MustExec("begin pessimistic")
|
|
|
|
// schema change should not affect even in rc isolation
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil)
|
|
tk2.MustExec("alter table t2 add column(c1 int)")
|
|
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
// test for write
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustExec("insert into t2 (id) values(2)")
|
|
})
|
|
|
|
// test for select
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10"))
|
|
})
|
|
|
|
tk2.MustExec("update t1 set v=11 where id=1")
|
|
|
|
// test for select
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 11"))
|
|
})
|
|
|
|
// test for select for update
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 11"))
|
|
})
|
|
|
|
tk.MustExec("rollback")
|
|
}
|
|
|
|
func TestTxnContextInPessimisticKeyConflict(t *testing.T) {
|
|
store, do := setupTxnContextTest(t)
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("use test")
|
|
se := tk.Session()
|
|
is1 := do.InfoSchema()
|
|
|
|
tk.MustExec("begin pessimistic")
|
|
|
|
// trigger retry
|
|
tk2 := testkit.NewTestKit(t, store)
|
|
tk2.MustExec("use test")
|
|
tk2.MustExec("update t1 set v=11 where id=1")
|
|
tk2.MustExec("alter table t2 add column(c1 int)")
|
|
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
path := append([]string{"assertTxnManagerAfterPessimisticLockErrorRetry"}, normalPathRecords...)
|
|
doWithCheckPath(t, se, path, func() {
|
|
tk.MustExec("update t1 set v=12 where id=1")
|
|
})
|
|
|
|
tk.MustExec("rollback")
|
|
}
|
|
|
|
func TestTxnContextForHistoricalRead(t *testing.T) {
|
|
store, do := setupTxnContextTest(t)
|
|
setTxnTk := testkit.NewTestKit(t, store)
|
|
setTxnTk.MustExec("set global tidb_txn_mode=''")
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("use test")
|
|
se := tk.Session()
|
|
|
|
safePoint := "20160102-15:04:05 -0700"
|
|
tk.MustExec(fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%s', '') ON DUPLICATE KEY UPDATE variable_value = '%s', comment=''`, safePoint, safePoint))
|
|
|
|
is1 := do.InfoSchema()
|
|
tk.MustExec("do sleep(0.1)")
|
|
tk.MustExec("set @a=now(6)")
|
|
// change schema
|
|
tk.MustExec("alter table t2 add column(c1 int)")
|
|
tk.MustExec("update t1 set v=11 where id=1")
|
|
|
|
tk.MustExec("set @@tidb_snapshot=@a")
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10"))
|
|
})
|
|
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 10"))
|
|
})
|
|
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil)
|
|
tk.MustExec("set @@tidb_snapshot=''")
|
|
tk.MustExec("begin")
|
|
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, do.InfoSchema())
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 11"))
|
|
})
|
|
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 11"))
|
|
})
|
|
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil)
|
|
tk.MustExec("set @@tidb_snapshot=@a")
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10"))
|
|
})
|
|
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 10"))
|
|
})
|
|
|
|
tk.MustExec("rollback")
|
|
}
|
|
|
|
func TestTxnContextForStaleRead(t *testing.T) {
|
|
store, do := setupTxnContextTest(t)
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("use test")
|
|
se := tk.Session()
|
|
|
|
safePoint := "20160102-15:04:05 -0700"
|
|
tk.MustExec(fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%s', '') ON DUPLICATE KEY UPDATE variable_value = '%s', comment=''`, safePoint, safePoint))
|
|
|
|
is1 := do.InfoSchema()
|
|
tk.MustExec("do sleep(0.1)")
|
|
tk.MustExec("set @a=now(6)")
|
|
time.Sleep(time.Millisecond * 1200)
|
|
|
|
// change schema
|
|
tk.MustExec("alter table t2 add column(c1 int)")
|
|
tk.MustExec("update t1 set v=11 where id=1")
|
|
|
|
// @@tidb_read_staleness
|
|
tk.MustExec("set @@tidb_read_staleness=-1")
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 as of timestamp @a").Check(testkit.Rows("1 10"))
|
|
})
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil)
|
|
tk.MustExec("set @@tidb_read_staleness=''")
|
|
|
|
// select ... as of ...
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 as of timestamp @a").Check(testkit.Rows("1 10"))
|
|
})
|
|
|
|
// @@tx_read_ts
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil)
|
|
tk.MustExec("set @@tx_read_ts=@a")
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10"))
|
|
})
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, do.InfoSchema())
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 11"))
|
|
})
|
|
|
|
// txn begin with @tx_read_ts
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil)
|
|
tk.MustExec("set @@tx_read_ts=@a")
|
|
tk.MustExec("begin")
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10"))
|
|
})
|
|
tk.MustExec("rollback")
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, do.InfoSchema())
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 11"))
|
|
})
|
|
|
|
// txn begin ... as of ...
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil)
|
|
tk.MustExec("start transaction read only as of timestamp @a")
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10"))
|
|
})
|
|
tk.MustExec("rollback")
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, do.InfoSchema())
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 11"))
|
|
})
|
|
}
|
|
|
|
func TestTxnContextForPrepareExecute(t *testing.T) {
|
|
store, do := setupTxnContextTest(t)
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("use test")
|
|
tk.MustExec("set tidb_enable_prepared_plan_cache=ON")
|
|
se := tk.Session()
|
|
|
|
stmtID, _, _, err := se.PrepareStmt("select * from t1 where id=1")
|
|
require.NoError(t, err)
|
|
|
|
is1 := do.InfoSchema()
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
|
|
// Test prepare/execute in SQL
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustExec("prepare s from 'select * from t1 where id=1'")
|
|
})
|
|
doWithCheckPath(t, se, []string{"assertTxnManagerInCompile", "assertTxnManagerInShortPointGetPlan"}, func() {
|
|
tk.MustQuery("execute s").Check(testkit.Rows("1 10"))
|
|
})
|
|
|
|
// Test ExecutePreparedStmt
|
|
doWithCheckPath(t, se, []string{"assertTxnManagerInCompile", "assertTxnManagerInShortPointGetPlan"}, func() {
|
|
rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID, nil)
|
|
require.NoError(t, err)
|
|
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10"))
|
|
})
|
|
|
|
// Test PlanCache
|
|
doWithCheckPath(t, se, nil, func() {
|
|
rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID, nil)
|
|
require.NoError(t, err)
|
|
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10"))
|
|
})
|
|
|
|
// In txn
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil)
|
|
tk.MustExec("begin")
|
|
|
|
//change schema
|
|
tk2 := testkit.NewTestKit(t, store)
|
|
tk2.MustExec("use test")
|
|
tk2.MustExec("alter table t2 add column(c1 int)")
|
|
tk2.MustExec("update t1 set v=11 where id=1")
|
|
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustExec("prepare s from 'select * from t1 where id=1'")
|
|
})
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("execute s").Check(testkit.Rows("1 10"))
|
|
})
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID, nil)
|
|
require.NoError(t, err)
|
|
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10"))
|
|
})
|
|
|
|
tk.MustExec("rollback")
|
|
}
|
|
|
|
func TestStaleReadInPrepare(t *testing.T) {
|
|
store, _ := setupTxnContextTest(t)
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("use test")
|
|
se := tk.Session()
|
|
|
|
tk.MustExec(`create table tt (id int primary key, v int)`)
|
|
tk.MustExec(`insert into tt values(1, 10)`)
|
|
tk.MustExec("do sleep(0.1)")
|
|
tk.MustExec("set @a=now(6)")
|
|
tk.MustExec("do sleep(0.1)")
|
|
|
|
st, _, _, err := se.PrepareStmt("select v from tt where id=1")
|
|
require.NoError(t, err)
|
|
|
|
tk.MustExec(`update tt set v=11 where id=1`)
|
|
rs, err := se.ExecutePreparedStmt(context.TODO(), st, nil)
|
|
require.NoError(t, err)
|
|
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("11"))
|
|
|
|
tk.MustExec("set @@tx_read_ts=@a")
|
|
rs, err = se.ExecutePreparedStmt(context.TODO(), st, nil)
|
|
require.NoError(t, err)
|
|
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("10"))
|
|
|
|
tk.MustExec("update tt set v=12 where id=1")
|
|
tk.MustExec("set @@tx_read_ts=''")
|
|
rs, err = se.ExecutePreparedStmt(context.TODO(), st, nil)
|
|
require.NoError(t, err)
|
|
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("12"))
|
|
rs, err = se.ExecutePreparedStmt(context.TODO(), st, nil)
|
|
require.NoError(t, err)
|
|
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("12"))
|
|
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))
|
|
}
|
|
|
|
func TestTxnContextForStaleReadInPrepare(t *testing.T) {
|
|
store, _ := setupTxnContextTest(t)
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("use test")
|
|
se := tk.Session()
|
|
|
|
is1 := se.GetLatestInfoSchema()
|
|
tk.MustExec("do sleep(0.1)")
|
|
tk.MustExec("set @a=now(6)")
|
|
tk.MustExec("prepare s1 from 'select * from t1 where id=1'")
|
|
tk.MustExec("prepare s2 from 'select * from t1 as of timestamp @a where id=1 '")
|
|
|
|
stmtID1, _, _, err := se.PrepareStmt("select * from t1 where id=1")
|
|
require.NoError(t, err)
|
|
|
|
stmtID2, _, _, err := se.PrepareStmt("select * from t1 as of timestamp @a where id=1 ")
|
|
require.NoError(t, err)
|
|
|
|
//change schema
|
|
tk.MustExec("use test")
|
|
tk.MustExec("alter table t2 add column(c1 int)")
|
|
tk.MustExec("update t1 set v=11 where id=1")
|
|
|
|
tk.MustExec("set @@tx_read_ts=@a")
|
|
stmtID3, _, _, err := se.PrepareStmt("select * from t1 where id=1 ")
|
|
require.NoError(t, err)
|
|
tk.MustExec("set @@tx_read_ts=''")
|
|
|
|
tk.MustExec("set @@tx_read_ts=@a")
|
|
tk.MustExec("prepare s3 from 'select * from t1 where id=1 '")
|
|
tk.MustExec("set @@tx_read_ts=''")
|
|
|
|
// tx_read_ts
|
|
tk.MustExec("set @@tx_read_ts=@a")
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID1, nil)
|
|
require.NoError(t, err)
|
|
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10"))
|
|
})
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil)
|
|
tk.MustExec("set @@tx_read_ts=''")
|
|
|
|
tk.MustExec("set @@tx_read_ts=@a")
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustExec("execute s1")
|
|
})
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil)
|
|
tk.MustExec("set @@tx_read_ts=''")
|
|
|
|
// select ... as of ...
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID2, nil)
|
|
require.NoError(t, err)
|
|
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10"))
|
|
})
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustExec("execute s2")
|
|
})
|
|
|
|
// tx_read_ts in prepare
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID3, nil)
|
|
require.NoError(t, err)
|
|
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10"))
|
|
})
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustExec("execute s3")
|
|
})
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil)
|
|
|
|
// stale read should not use plan cache
|
|
is2 := se.GetLatestInfoSchema()
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil)
|
|
tk.MustExec("set @@tx_read_ts=''")
|
|
tk.MustExec("do sleep(0.1)")
|
|
tk.MustExec("set @b=now(6)")
|
|
tk.MustExec("do sleep(0.1)")
|
|
tk.MustExec("update t1 set v=v+1 where id=1")
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is2)
|
|
doWithCheckPath(t, se, []string{"assertTxnManagerInCompile", "assertTxnManagerInShortPointGetPlan"}, func() {
|
|
// stale-read is not used since `tx_read_ts` is empty, so the plan cache should be used in this case.
|
|
rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID1, nil)
|
|
require.NoError(t, err)
|
|
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 12"))
|
|
})
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil)
|
|
tk.MustExec("set @@tx_read_ts=@b")
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is2)
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID1, nil)
|
|
require.NoError(t, err)
|
|
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 11"))
|
|
})
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil)
|
|
tk.MustExec("set @@tx_read_ts=''")
|
|
}
|
|
|
|
func TestTxnContextPreparedStmtWithForUpdate(t *testing.T) {
|
|
if kerneltype.IsNextGen() {
|
|
t.Skip("MDL is always enabled and read only in nextgen")
|
|
}
|
|
store, do := setupTxnContextTest(t)
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("set global tidb_enable_metadata_lock=0")
|
|
tk.MustExec("use test")
|
|
se := tk.Session()
|
|
|
|
is1 := do.InfoSchema()
|
|
|
|
stmtID1, _, _, err := se.PrepareStmt("select * from t1 where id=1 for update")
|
|
require.NoError(t, err)
|
|
tk.MustExec("prepare s from 'select * from t1 where id=1 for update'")
|
|
tk.MustExec("begin pessimistic")
|
|
|
|
//change schema
|
|
tk2 := testkit.NewTestKit(t, store)
|
|
tk2.MustExec("use test")
|
|
tk2.MustExec("alter table t1 add column(c int default 100)")
|
|
tk2.MustExec("update t1 set v=11 where id=1")
|
|
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 11"))
|
|
})
|
|
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID1, nil)
|
|
require.NoError(t, err)
|
|
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 11"))
|
|
})
|
|
|
|
doWithCheckPath(t, se, normalPathRecords, func() {
|
|
tk.MustQuery("execute s").Check(testkit.Rows("1 11"))
|
|
})
|
|
|
|
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil)
|
|
tk.MustExec("rollback")
|
|
}
|
|
|
|
// See issue: https://github.com/pingcap/tidb/issues/35459
|
|
func TestStillWriteConflictAfterRetry(t *testing.T) {
|
|
store, _ := setupTxnContextTest(t)
|
|
|
|
queries := []string{
|
|
"select * from t1 for update",
|
|
"select * from t1 where id=1 for update",
|
|
"select * from t1 where id in (1, 2, 3) for update",
|
|
"select * from t1 where id=1 and v>0 for update",
|
|
"select * from t1 where id=1 for update union select * from t1 where id=1 for update",
|
|
"update t1 set v=v+1",
|
|
"update t1 set v=v+1 where id=1",
|
|
"update t1 set v=v+1 where id=1 and v>0",
|
|
"update t1 set v=v+1 where id in (1, 2, 3)",
|
|
"update t1 set v=v+1 where id in (1, 2, 3) and v>0",
|
|
}
|
|
|
|
testfork.RunTest(t, func(t *testfork.T) {
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("use test")
|
|
tk.MustExec("truncate table t1")
|
|
tk.MustExec("insert into t1 values(1, 10)")
|
|
// Fair locking avoids conflicting again after retry in this case. Disable it for this test.
|
|
tk.MustExec("set @@tidb_pessimistic_txn_fair_locking = 0")
|
|
tk2 := testkit.NewSteppedTestKit(t, store)
|
|
defer tk2.MustExec("rollback")
|
|
|
|
tk2.MustExec("use test")
|
|
tk2.MustExec("set @@tidb_txn_mode = 'pessimistic'")
|
|
tk2.MustExec("set @@tidb_pessimistic_txn_fair_locking = 0")
|
|
tk2.MustExec(fmt.Sprintf("set tx_isolation = '%s'", testfork.PickEnum(t, ast.RepeatableRead, ast.ReadCommitted)))
|
|
autocommit := testfork.PickEnum(t, 0, 1)
|
|
tk2.MustExec(fmt.Sprintf("set autocommit=%d", autocommit))
|
|
if autocommit == 1 {
|
|
tk2.MustExec("begin")
|
|
}
|
|
|
|
tk2.SetBreakPoints(
|
|
sessiontxn.BreakPointBeforeExecutorFirstRun,
|
|
sessiontxn.BreakPointOnStmtRetryAfterLockError,
|
|
)
|
|
|
|
var isSelect, isUpdate bool
|
|
query := testfork.Pick(t, queries)
|
|
switch {
|
|
case strings.HasPrefix(query, "select"):
|
|
isSelect = true
|
|
tk2.SteppedMustQuery(query)
|
|
case strings.HasPrefix(query, "update"):
|
|
isUpdate = true
|
|
tk2.SteppedMustExec(query)
|
|
default:
|
|
require.FailNowf(t, "invalid query: ", query)
|
|
}
|
|
|
|
// Pause the session before the executor first run and then update the record in another session
|
|
tk2.ExpectStopOnBreakPoint(sessiontxn.BreakPointBeforeExecutorFirstRun)
|
|
tk.MustExec("update t1 set v=v+1")
|
|
|
|
// Session continues, it should get a lock error and retry, we pause the session before the executor's next run
|
|
// and then update the record in another session again.
|
|
tk2.Continue().ExpectStopOnBreakPoint(sessiontxn.BreakPointOnStmtRetryAfterLockError)
|
|
tk.MustExec("update t1 set v=v+1")
|
|
|
|
// Because the record is updated by another session again, when this session continues, it will get a lock error again.
|
|
tk2.Continue().ExpectStopOnBreakPoint(sessiontxn.BreakPointOnStmtRetryAfterLockError)
|
|
tk2.Continue().ExpectIdle()
|
|
switch {
|
|
case isSelect:
|
|
tk2.GetQueryResult().Check(testkit.Rows("1 12"))
|
|
case isUpdate:
|
|
tk2.MustExec("commit")
|
|
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 13"))
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestOptimisticTxnRetryInPessimisticMode(t *testing.T) {
|
|
defer config.RestoreFunc()()
|
|
config.UpdateGlobal(func(conf *config.Config) {
|
|
conf.PessimisticTxn.PessimisticAutoCommit.Store(false)
|
|
})
|
|
store, _ := setupTxnContextTest(t)
|
|
|
|
queries := []string{
|
|
"update t1 set v=v+1",
|
|
"update t1 set v=v+1 where id=1",
|
|
"update t1 set v=v+1 where id=1 and v>0",
|
|
"update t1 set v=v+1 where id in (1, 2, 3)",
|
|
"update t1 set v=v+1 where id in (1, 2, 3) and v>0",
|
|
}
|
|
|
|
testfork.RunTest(t, func(t *testfork.T) {
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("use test")
|
|
tk.MustExec("truncate table t1")
|
|
tk.MustExec("insert into t1 values(1, 10)")
|
|
tk2 := testkit.NewSteppedTestKit(t, store)
|
|
defer tk2.MustExec("rollback")
|
|
|
|
tk2.MustExec("use test")
|
|
tk2.MustExec("set @@tidb_txn_mode = 'pessimistic'")
|
|
tk2.MustExec("set autocommit = 1")
|
|
|
|
// When autocommit meets write conflict, it will retry in pessimistic mode.
|
|
// conflictAfterTransfer being true means we encounter a write-conflict again during
|
|
// the pessimistic mode.
|
|
// doubleConflictAfterTransfer being true means we encounter a write-conflict again
|
|
// during the pessimistic retry phase.
|
|
// And only conflictAfterTransfer being true allows doubleConflictAfterTransfer being true.
|
|
conflictAfterTransfer := testfork.PickEnum(t, true, false)
|
|
doubleConflictAfterTransfer := testfork.PickEnum(t, true, false)
|
|
if !conflictAfterTransfer && doubleConflictAfterTransfer {
|
|
return
|
|
}
|
|
|
|
// If `tidb_pessimistic_txn_fair_locking` is enabled, the double conflict case is
|
|
// avoided. Disable it to run this test.
|
|
if doubleConflictAfterTransfer {
|
|
tk2.MustExec("set @@tidb_pessimistic_txn_fair_locking = 0")
|
|
}
|
|
|
|
tk2.SetBreakPoints(
|
|
sessiontxn.BreakPointBeforeExecutorFirstRun,
|
|
sessiontxn.BreakPointOnStmtRetryAfterLockError,
|
|
)
|
|
|
|
query := testfork.Pick(t, queries)
|
|
|
|
tk2.SteppedMustExec(query)
|
|
|
|
// Pause the session before the executor first run and then update the record in another session
|
|
tk2.ExpectStopOnBreakPoint(sessiontxn.BreakPointBeforeExecutorFirstRun)
|
|
// After this update, tk2's statement will encounter write conflict. As it's an autocommit transaction,
|
|
// it will transfer to pessimistic transaction mode.
|
|
tk.MustExec("update t1 set v=v+1")
|
|
|
|
if conflictAfterTransfer {
|
|
tk2.Continue().ExpectStopOnBreakPoint(sessiontxn.BreakPointBeforeExecutorFirstRun)
|
|
tk.MustExec("update t1 set v=v+1")
|
|
|
|
if doubleConflictAfterTransfer {
|
|
// Session continues, it should get a lock error and retry, we pause the session before the executor's next run
|
|
// and then update the record in another session again.
|
|
tk2.Continue().ExpectStopOnBreakPoint(sessiontxn.BreakPointOnStmtRetryAfterLockError)
|
|
tk.MustExec("update t1 set v=v+1")
|
|
}
|
|
|
|
// Because the record is updated by another session again, when this session continues, it will get a lock error again.
|
|
tk2.Continue().ExpectStopOnBreakPoint(sessiontxn.BreakPointOnStmtRetryAfterLockError)
|
|
tk2.Continue().ExpectIdle()
|
|
|
|
if doubleConflictAfterTransfer {
|
|
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 14"))
|
|
} else {
|
|
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 13"))
|
|
}
|
|
} else {
|
|
tk2.Continue().ExpectStopOnBreakPoint(sessiontxn.BreakPointBeforeExecutorFirstRun)
|
|
tk2.Continue().ExpectIdle()
|
|
|
|
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 12"))
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestTSOCmdCountForPrepareExecute(t *testing.T) {
|
|
// This is a mock workload mocks one which discovers that the tso request count is abnormal.
|
|
// After the bug fix, the tso request count recovers, so we use this workload to record the current tso request count
|
|
// to reject future works that accidentally causes tso request increasing.
|
|
// Note, we do not record all tso requests but some typical requests.
|
|
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/sessiontxn/isolation/requestTsoFromPD", "return"))
|
|
defer func() {
|
|
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/sessiontxn/isolation/requestTsoFromPD"))
|
|
}()
|
|
store := testkit.CreateMockStore(t)
|
|
|
|
ctx := context.Background()
|
|
tk := testkit.NewTestKit(t, store)
|
|
sctx := tk.Session()
|
|
|
|
tk.MustExec("use test")
|
|
tk.MustExec("drop table if exists t1")
|
|
tk.MustExec("drop table if exists t2")
|
|
tk.MustExec("drop table if exists t3")
|
|
|
|
tk.MustExec("create table t1(id int, v int, v2 int, primary key (id), unique key uk (v))")
|
|
tk.MustExec("create table t2(id int, v int, unique key i1(v))")
|
|
tk.MustExec("create table t3(id int, v int, key i1(v))")
|
|
|
|
sqlSelectID, _, _, _ := tk.Session().PrepareStmt("select * from t1 where id = ? for update")
|
|
sqlUpdateID, _, _, _ := tk.Session().PrepareStmt("update t1 set v = v + 10 where id = ?")
|
|
sqlInsertID1, _, _, _ := tk.Session().PrepareStmt("insert into t2 values(?, ?)")
|
|
sqlInsertID2, _, _, _ := tk.Session().PrepareStmt("insert into t3 values(?, ?)")
|
|
|
|
tk.MustExec("insert into t1 values (1, 1, 1)")
|
|
sctx.SetValue(sessiontxn.TsoRequestCount, 0)
|
|
|
|
for i := 1; i < 100; i++ {
|
|
tk.MustExec("begin pessimistic")
|
|
stmt, err := tk.Session().ExecutePreparedStmt(ctx, sqlSelectID, expression.Args2Expressions4Test(1))
|
|
require.NoError(t, err)
|
|
require.NoError(t, stmt.Close())
|
|
stmt, err = tk.Session().ExecutePreparedStmt(ctx, sqlUpdateID, expression.Args2Expressions4Test(1))
|
|
require.NoError(t, err)
|
|
require.Nil(t, stmt)
|
|
|
|
val := i * 10
|
|
stmt, err = tk.Session().ExecutePreparedStmt(ctx, sqlInsertID1, expression.Args2Expressions4Test(val, val))
|
|
require.NoError(t, err)
|
|
require.Nil(t, stmt)
|
|
stmt, err = tk.Session().ExecutePreparedStmt(ctx, sqlInsertID2, expression.Args2Expressions4Test(val, val))
|
|
require.NoError(t, err)
|
|
require.Nil(t, stmt)
|
|
tk.MustExec("commit")
|
|
}
|
|
count := sctx.Value(sessiontxn.TsoRequestCount)
|
|
require.Equal(t, uint64(99), count)
|
|
}
|
|
|
|
func TestTSOCmdCountForTextSql(t *testing.T) {
|
|
// This is a mock workload mocks one which discovers that the tso request count is abnormal.
|
|
// After the bug fix, the tso request count recovers, so we use this workload to record the current tso request count
|
|
// to reject future works that accidentally causes tso request increasing.
|
|
// Note, we do not record all tso requests but some typical requests.
|
|
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/sessiontxn/isolation/requestTsoFromPD", "return"))
|
|
defer func() {
|
|
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/sessiontxn/isolation/requestTsoFromPD"))
|
|
}()
|
|
store := testkit.CreateMockStore(t)
|
|
|
|
tk := testkit.NewTestKit(t, store)
|
|
sctx := tk.Session()
|
|
|
|
tk.MustExec("use test")
|
|
tk.MustExec("drop table if exists t1")
|
|
tk.MustExec("drop table if exists t2")
|
|
tk.MustExec("drop table if exists t3")
|
|
|
|
tk.MustExec("create table t1(id int, v int, v2 int, primary key (id), unique key uk (v))")
|
|
tk.MustExec("create table t2(id int, v int, unique key i1(v))")
|
|
tk.MustExec("create table t3(id int, v int, key i1(v))")
|
|
|
|
tk.MustExec("insert into t1 values (1, 1, 1)")
|
|
sctx.SetValue(sessiontxn.TsoRequestCount, 0)
|
|
for i := 1; i < 100; i++ {
|
|
tk.MustExec("begin pessimistic")
|
|
tk.MustQuery("select * from t1 where id = 1 for update")
|
|
tk.MustExec("update t1 set v = v + 10 where id = 1")
|
|
val := i * 10
|
|
tk.MustExec(fmt.Sprintf("insert into t2 values(%v, %v)", val, val))
|
|
tk.MustExec(fmt.Sprintf("insert into t3 values(%v, %v)", val, val))
|
|
tk.MustExec("commit")
|
|
}
|
|
count := sctx.Value(sessiontxn.TsoRequestCount)
|
|
require.Equal(t, uint64(99), count)
|
|
}
|