store/tikv: pre-split regions during 2PC to avoid hotspot (#16920)

@@ -423,6 +423,9 @@ func txnLockTTL(startTime time.Time, txnSize int) uint64 {
    return lockTTL + uint64(elapsed)
}

var preSplitDetectThreshold uint32 = 100000
var preSplitSizeThreshold uint32 = 32 << 20

// doActionOnMutations groups keys into a primary batch and secondary batches. If the primary batch exists among the keys,
// it does the action on the primary batch first, then on the secondary batches. If the action is commit, the secondary
// batches are done in a background goroutine.
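
With these defaults, pre-splitting only triggers once a single region receives 100000 or more mutations from one transaction, and split keys are then placed roughly every 32 MiB of key+value data. A rough back-of-envelope sketch under hypothetical sizes (the mutation count and average row size below are illustrative, not from this patch):

package main

import "fmt"

func main() {
    const detectThreshold = 100000 // mutations on one region before pre-split triggers
    const sizeThreshold = 32 << 20 // key+value bytes per split chunk (32 MiB)

    mutations := 200000 // hypothetical: twice the detect threshold
    avgKVSize := 1024   // hypothetical: ~1 KiB per key+value pair

    fmt.Println(mutations >= detectThreshold)          // true: the region qualifies for pre-split
    fmt.Println(mutations * avgKVSize / sizeThreshold) // 6: about six split keys, so ~seven regions afterwards
}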

@@ -435,6 +438,68 @@ func (c *twoPhaseCommitter) doActionOnMutations(bo *Backoffer, action twoPhaseCo
        return errors.Trace(err)
    }

    // Pre-split regions to avoid putting too much write workload on a single region.
    // In the large-transaction case, this operation is important for avoiding the TiKV 'server is busy' error.
    var preSplited bool
    preSplitDetectThresholdVal := atomic.LoadUint32(&preSplitDetectThreshold)
    for _, group := range groups {
        if uint32(group.mutations.len()) >= preSplitDetectThresholdVal {
            logutil.BgLogger().Info("2PC detect large amount of mutations on a single region",
                zap.Uint64("region", group.region.GetID()),
                zap.Int("mutations count", group.mutations.len()))
            // Use context.Background: the time spent splitting should not count against the Backoffer's budget.
            if preSplitAndScatterIn2PC(context.Background(), c.store, group) {
                preSplited = true
            }
        }
    }
    // Regroup the mutations after splitting: the old grouping was computed against the pre-split region boundaries.
    if preSplited {
        groups, err = c.store.regionCache.GroupSortedMutationsByRegion(bo, mutations)
        if err != nil {
            return errors.Trace(err)
        }
    }

    return c.doActionOnGroupMutations(bo, action, groups)
}

func preSplitAndScatterIn2PC(ctx context.Context, store *tikvStore, group groupedMutations) bool {
    length := group.mutations.len()
    splitKeys := make([][]byte, 0, 4)

    preSplitSizeThresholdVal := atomic.LoadUint32(&preSplitSizeThreshold)
    regionSize := 0
    for i := 0; i < length; i++ {
        regionSize = regionSize + len(group.mutations.keys[i]) + len(group.mutations.values[i])
        // The size threshold is a variable rather than a constant so tests can lower it.
        if regionSize >= int(preSplitSizeThresholdVal) {
            regionSize = 0
            splitKeys = append(splitKeys, group.mutations.keys[i])
        }
    }
    if len(splitKeys) == 0 {
        return false
    }

    regionIDs, err := store.SplitRegions(ctx, splitKeys, true)
    if err != nil {
        logutil.BgLogger().Warn("2PC split regions failed", zap.Uint64("regionID", group.region.id), zap.Int("keys count", length), zap.Error(err))
        return false
    }

    for _, regionID := range regionIDs {
        err := store.WaitScatterRegionFinish(regionID, 0)
        if err != nil {
            logutil.BgLogger().Warn("2PC wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
        }
    }
    // Invalidate the stale region cache entry; it will be reloaded on the next access.
    store.regionCache.InvalidateCachedRegion(group.region)
    return true
}
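
The split-key selection is the heart of the function above: walk the sorted mutations, accumulate key+value bytes, and emit the current key as a split point each time the running size crosses the threshold. A minimal standalone sketch of that loop on toy data (pickSplitKeys and the inputs here are illustrative, not part of the patch):

package main

import "fmt"

// pickSplitKeys mirrors the selection loop in preSplitAndScatterIn2PC:
// accumulate key+value size and record a split key whenever the running
// total crosses the threshold, then reset the counter.
func pickSplitKeys(keys, values [][]byte, threshold int) [][]byte {
    var splitKeys [][]byte
    size := 0
    for i := range keys {
        size += len(keys[i]) + len(values[i])
        if size >= threshold {
            size = 0
            splitKeys = append(splitKeys, keys[i])
        }
    }
    return splitKeys
}

func main() {
    // Toy data: 10 mutations of 4 bytes each, split threshold of 10 bytes.
    var keys, values [][]byte
    for i := 0; i < 10; i++ {
        keys = append(keys, []byte{byte(i), 0})
        values = append(values, []byte{0, 0})
    }
    fmt.Printf("%x\n", pickSplitKeys(keys, values, 10)) // [0200 0500 0800]: a split key every ~10 bytes
}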

func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPhaseCommitAction, groups []groupedMutations) error {
    action.tiKVTxnRegionsNumHistogram().Observe(float64(len(groups)))

    var batches []batchMutations
@@ -467,6 +532,7 @@ func (c *twoPhaseCommitter) doActionOnMutations(bo *Backoffer, action twoPhaseCo
    _, actionIsCleanup := action.(actionCleanup)
    _, actionIsPessimiticLock := action.(actionPessimisticLock)

    var err error
    failpoint.Inject("skipKeyReturnOK", func(val failpoint.Value) {
        valStr, ok := val.(string)
        if ok && c.connID > 0 {

@@ -15,7 +15,12 @@

package tikv

-import . "github.com/pingcap/check"
+import (
+    "context"
+    "sync/atomic"
+
+    . "github.com/pingcap/check"
+)

// TestCommitMultipleRegions tests committing a transaction whose keys span multiple regions.
// The test takes too long under the race detector.
@@ -35,3 +40,38 @@ func (s *testCommitterSuite) TestCommitMultipleRegions(c *C) {
    }
    s.mustCommit(c, m)
}

func (s *testTiclientSuite) TestSplitRegionIn2PC(c *C) {
    const preSplitThresholdInTest = 500
    // defer arguments are evaluated immediately, so each restore captures the original value.
    old := atomic.LoadUint32(&preSplitDetectThreshold)
    defer atomic.StoreUint32(&preSplitDetectThreshold, old)
    atomic.StoreUint32(&preSplitDetectThreshold, preSplitThresholdInTest)

    old = atomic.LoadUint32(&preSplitSizeThreshold)
    defer atomic.StoreUint32(&preSplitSizeThreshold, old)
    atomic.StoreUint32(&preSplitSizeThreshold, 5000)

    bo := NewBackoffer(context.Background(), 1)
    startKey := encodeKey(s.prefix, s08d("key", 0))
    endKey := encodeKey(s.prefix, s08d("key", preSplitThresholdInTest))
    checkKeyRegion := func(bo *Backoffer, start, end []byte, checker Checker) {
        // Locate the regions the start and end keys belong to and compare their IDs.
        loc1, err := s.store.regionCache.LocateKey(bo, start)
        c.Assert(err, IsNil)
        loc2, err := s.store.regionCache.LocateKey(bo, end)
        c.Assert(err, IsNil)
        c.Assert(loc1.Region.id, checker, loc2.Region.id)
    }

    // Before the commit, the start and end keys live in the same region.
    checkKeyRegion(bo, startKey, endKey, Equals)
    txn := s.beginTxn(c)
    for i := 0; i < preSplitThresholdInTest; i++ {
        err := txn.Set(encodeKey(s.prefix, s08d("key", i)), valueBytes(i))
        c.Assert(err, IsNil)
    }
    err := txn.Commit(context.Background())
    c.Assert(err, IsNil)
    // After the commit, pre-splitting has run, so the two keys land in different regions.
    checkKeyRegion(bo, startKey, endKey, Not(Equals))
}
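
To exercise just this test, the standard Go test filter should work from the package directory, e.g. go test -run TestSplitRegionIn2PC (the store/tikv package path is assumed from the commit title).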