txn: force locking temp index keys in dml in next-gen (#64398)
ref pingcap/tidb#64241
This commit is contained in:
@ -9,7 +9,7 @@ go_test(
|
||||
],
|
||||
flaky = True,
|
||||
race = "on",
|
||||
shard_count = 23,
|
||||
shard_count = 24,
|
||||
deps = [
|
||||
"//pkg/config",
|
||||
"//pkg/config/kerneltype",
|
||||
|
||||
@ -17,6 +17,7 @@ package indexmergetest
|
||||
import (
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -476,6 +477,62 @@ func TestAddIndexMergeConflictWithPessimistic(t *testing.T) {
|
||||
tk.MustQuery("select * from t;").Check(testkit.Rows("1 2"))
|
||||
}
|
||||
|
||||
func TestNextGenPessimisticTxnNotFailWithTempIndex(t *testing.T) {
|
||||
if !kerneltype.IsNextGen() {
|
||||
t.Skip("only meaningful when next-gen kernel enforces temp index locking")
|
||||
}
|
||||
store, dom := testkit.CreateMockStoreAndDomain(t)
|
||||
tk := testkit.NewTestKit(t, store)
|
||||
tk.MustExec("use test")
|
||||
tk.MustExec("drop table if exists t_pess")
|
||||
tk.MustExec("create table t_pess (id int primary key, b int);")
|
||||
tk.MustExec("insert into t_pess values (1, 1);")
|
||||
|
||||
origLit := ingest.LitInitialized
|
||||
ingest.LitInitialized = false
|
||||
t.Cleanup(func() { ingest.LitInitialized = origLit })
|
||||
|
||||
txnReady := make(chan struct{})
|
||||
txnDone := make(chan error, 1)
|
||||
var startTxnOnce sync.Once
|
||||
var txnStarted atomic.Bool
|
||||
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterWaitSchemaSynced", func(job *model.Job) {
|
||||
if t.Failed() || job.Type != model.ActionAddIndex || job.SchemaState != model.StateWriteReorganization {
|
||||
return
|
||||
}
|
||||
idx := testutil.FindIdxInfo(dom, "test", "t_pess", "idx_b")
|
||||
if idx == nil || idx.BackfillState != model.BackfillStateRunning {
|
||||
return
|
||||
}
|
||||
triggered := false
|
||||
startTxnOnce.Do(func() {
|
||||
triggered = true
|
||||
txnStarted.Store(true)
|
||||
// let the user transaction acquire pessimistic lock first
|
||||
go func() {
|
||||
tk2 := testkit.NewTestKit(t, store)
|
||||
tk2.MustExec("use test")
|
||||
tk2.MustExec("begin pessimistic")
|
||||
tk2.MustExec("update t_pess set b = b + 10 where id = 1")
|
||||
txnReady <- struct{}{}
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
txnDone <- tk2.ExecToErr("commit")
|
||||
}()
|
||||
})
|
||||
// Wait for the txn to hold the pessimistic lock
|
||||
if triggered {
|
||||
<-txnReady
|
||||
}
|
||||
})
|
||||
|
||||
// Assert: both user transaction and DDL will succeed, and data are consistent
|
||||
tk.MustExec("alter table t_pess add index idx_b(b);")
|
||||
require.True(t, txnStarted.Load())
|
||||
require.NoError(t, <-txnDone)
|
||||
tk.MustExec("admin check table t_pess")
|
||||
tk.MustQuery("select * from t_pess").Check(testkit.Rows("1 11"))
|
||||
}
|
||||
|
||||
func TestAddIndexMergeInsertOnMerging(t *testing.T) {
|
||||
store := testkit.CreateMockStore(t)
|
||||
|
||||
|
||||
@ -18,6 +18,7 @@ import (
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
"github.com/pingcap/tidb/pkg/config/kerneltype"
|
||||
"github.com/pingcap/tidb/pkg/domain"
|
||||
"github.com/pingcap/tidb/pkg/kv"
|
||||
"github.com/pingcap/tidb/pkg/planner/util/coretestsdk"
|
||||
@ -44,6 +45,8 @@ func TestKeysNeedLock(t *testing.T) {
|
||||
rowKey := tablecodec.EncodeRowKeyWithHandle(1, kv.IntHandle(1))
|
||||
uniqueIndexKey := tablecodec.EncodeIndexSeekKey(1, 1, []byte{1})
|
||||
nonUniqueIndexKey := tablecodec.EncodeIndexSeekKey(1, 2, []byte{1})
|
||||
tempIndexKey := tablecodec.EncodeIndexSeekKey(1, 3, []byte{1})
|
||||
tablecodec.IndexKey2TempIndexKey(tempIndexKey)
|
||||
uniqueValue := make([]byte, 8)
|
||||
uniqueUntouched := append(uniqueValue, '1')
|
||||
nonUniqueVal := []byte{'0'}
|
||||
@ -79,4 +82,18 @@ func TestKeysNeedLock(t *testing.T) {
|
||||
require.True(t, session.KeyNeedToLock(test.key, test.val, flag))
|
||||
}
|
||||
}
|
||||
|
||||
tempIdxValue := (&tablecodec.TempIndexValueElem{
|
||||
Value: nonUniqueVal,
|
||||
KeyVer: tablecodec.TempIndexKeyTypeBackfill,
|
||||
}).Encode(nil)
|
||||
need := session.KeyNeedToLock(tempIndexKey, tempIdxValue, 0)
|
||||
if kerneltype.IsNextGen() {
|
||||
require.True(t, need)
|
||||
} else {
|
||||
require.False(t, need)
|
||||
}
|
||||
|
||||
flag := kv.KeyFlags(1)
|
||||
require.True(t, session.KeyNeedToLock(tempIndexKey, tempIdxValue, flag))
|
||||
}
|
||||
|
||||
@ -26,6 +26,7 @@ import (
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/failpoint"
|
||||
"github.com/pingcap/tidb/pkg/config"
|
||||
"github.com/pingcap/tidb/pkg/config/kerneltype"
|
||||
"github.com/pingcap/tidb/pkg/kv"
|
||||
"github.com/pingcap/tidb/pkg/meta/model"
|
||||
"github.com/pingcap/tidb/pkg/parser/terror"
|
||||
@ -652,6 +653,11 @@ func KeyNeedToLock(k, v []byte, flags kv.KeyFlags) bool {
|
||||
}
|
||||
|
||||
if tablecodec.IsTempIndexKey(k) {
|
||||
// We force DMLs to lock all temporary index keys in next-gen, because
|
||||
// next-gen enforces conflict check on all keys, including non-unique index keys.
|
||||
if kerneltype.IsNextGen() {
|
||||
return true
|
||||
}
|
||||
tmpVal, err := tablecodec.DecodeTempIndexValue(v)
|
||||
if err != nil {
|
||||
logutil.BgLogger().Warn("decode temp index value failed", zap.Error(err))
|
||||
|
||||
Reference in New Issue
Block a user