store/tikv: split prewrite/commit request to prevent large RPC requests (#1183)
This commit is contained in:
@ -112,6 +112,18 @@ func (c *txnCommitter) iterKeysByRegion(keys [][]byte, f func([][]byte) error) e
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *txnCommitter) keyValueSize(key []byte) int {
|
||||
size := c.keySize(key)
|
||||
if mutation := c.mutations[string(key)]; mutation != nil {
|
||||
size += len(mutation.Value)
|
||||
}
|
||||
return size
|
||||
}
|
||||
|
||||
func (c *txnCommitter) keySize(key []byte) int {
|
||||
return len(key)
|
||||
}
|
||||
|
||||
func (c *txnCommitter) prewriteSingleRegion(keys [][]byte) error {
|
||||
mutations := make([]*pb.Mutation, len(keys))
|
||||
for i, k := range keys {
|
||||
@ -141,7 +153,7 @@ func (c *txnCommitter) prewriteSingleRegion(keys [][]byte) error {
|
||||
// TODO: The recursive maybe not able to exit if TiKV &
|
||||
// PD are implemented incorrectly. A possible fix is
|
||||
// introducing a 'max backoff time'.
|
||||
err = c.iterKeysByRegion(keys, c.prewriteSingleRegion)
|
||||
err = c.prewriteKeys(keys)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
prewriteResp := resp.GetCmdPrewriteResp()
|
||||
@ -190,7 +202,7 @@ func (c *txnCommitter) commitSingleRegion(keys [][]byte) error {
|
||||
}
|
||||
if regionErr := resp.GetRegionError(); regionErr != nil {
|
||||
// re-split keys and commit again.
|
||||
err = c.iterKeysByRegion(keys, c.commitSingleRegion)
|
||||
err = c.commitKeys(keys)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
commitResp := resp.GetCmdCommitResp()
|
||||
@ -238,11 +250,23 @@ func (c *txnCommitter) cleanupSingleRegion(keys [][]byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *txnCommitter) prewriteKeys(keys [][]byte) error {
|
||||
return c.iterKeysByRegion(keys, batchIterFn(c.prewriteSingleRegion, c.keyValueSize))
|
||||
}
|
||||
|
||||
func (c *txnCommitter) commitKeys(keys [][]byte) error {
|
||||
return c.iterKeysByRegion(keys, batchIterFn(c.commitSingleRegion, c.keySize))
|
||||
}
|
||||
|
||||
func (c *txnCommitter) cleanupKeys(keys [][]byte) error {
|
||||
return c.iterKeysByRegion(keys, batchIterFn(c.cleanupSingleRegion, c.keySize))
|
||||
}
|
||||
|
||||
func (c *txnCommitter) Commit() error {
|
||||
err := c.iterKeysByRegion(c.keys, c.prewriteSingleRegion)
|
||||
err := c.prewriteKeys(c.keys)
|
||||
if err != nil {
|
||||
log.Warnf("txn commit failed on prewrite: %v", err)
|
||||
c.iterKeysByRegion(c.writtenKeys, c.cleanupSingleRegion)
|
||||
c.cleanupKeys(c.writtenKeys)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -252,13 +276,35 @@ func (c *txnCommitter) Commit() error {
|
||||
}
|
||||
c.commitTS = commitTS
|
||||
|
||||
err = c.iterKeysByRegion(c.keys, c.commitSingleRegion)
|
||||
err = c.commitKeys(c.keys)
|
||||
if err != nil {
|
||||
if !c.committed {
|
||||
c.iterKeysByRegion(c.writtenKeys, c.cleanupSingleRegion)
|
||||
c.cleanupKeys(c.writtenKeys)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
log.Warnf("txn commit succeed with error: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// TiKV recommends each RPC packet should be less than ~1MB. We keep each packet's
|
||||
// Key+Value size below 512KB.
|
||||
const txnCommitBatchSize = 512 * 1024
|
||||
|
||||
// batchIterfn wraps an iteration function and returns a new one that iterates
|
||||
// keys by batch size.
|
||||
func batchIterFn(f func([][]byte) error, sizeFn func([]byte) int) func([][]byte) error {
|
||||
return func(keys [][]byte) error {
|
||||
var start, end int
|
||||
for start = 0; start < len(keys); start = end {
|
||||
var size int
|
||||
for end = start; end < len(keys) && size < txnCommitBatchSize; end++ {
|
||||
size += sizeFn(keys[end])
|
||||
}
|
||||
if err := f(keys[start:end]); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user