From 76eaed8c2ef1afa89dbd3d5f330b179f3d1df5ec Mon Sep 17 00:00:00 2001 From: disksing Date: Sat, 30 Apr 2016 10:33:22 +0800 Subject: [PATCH] store/tikv: split prewrite/commit request to prevent large RPC requests (#1183) --- store/tikv/txn_committer.go | 58 +++++++++++++++++++++++++++++++++---- 1 file changed, 52 insertions(+), 6 deletions(-) diff --git a/store/tikv/txn_committer.go b/store/tikv/txn_committer.go index 847334bb2b..caf46248d8 100644 --- a/store/tikv/txn_committer.go +++ b/store/tikv/txn_committer.go @@ -112,6 +112,18 @@ func (c *txnCommitter) iterKeysByRegion(keys [][]byte, f func([][]byte) error) e return nil } +func (c *txnCommitter) keyValueSize(key []byte) int { + size := c.keySize(key) + if mutation := c.mutations[string(key)]; mutation != nil { + size += len(mutation.Value) + } + return size +} + +func (c *txnCommitter) keySize(key []byte) int { + return len(key) +} + func (c *txnCommitter) prewriteSingleRegion(keys [][]byte) error { mutations := make([]*pb.Mutation, len(keys)) for i, k := range keys { @@ -141,7 +153,7 @@ func (c *txnCommitter) prewriteSingleRegion(keys [][]byte) error { // TODO: The recursive maybe not able to exit if TiKV & // PD are implemented incorrectly. A possible fix is // introducing a 'max backoff time'. - err = c.iterKeysByRegion(keys, c.prewriteSingleRegion) + err = c.prewriteKeys(keys) return errors.Trace(err) } prewriteResp := resp.GetCmdPrewriteResp() @@ -190,7 +202,7 @@ func (c *txnCommitter) commitSingleRegion(keys [][]byte) error { } if regionErr := resp.GetRegionError(); regionErr != nil { // re-split keys and commit again. - err = c.iterKeysByRegion(keys, c.commitSingleRegion) + err = c.commitKeys(keys) return errors.Trace(err) } commitResp := resp.GetCmdCommitResp() @@ -238,11 +250,23 @@ func (c *txnCommitter) cleanupSingleRegion(keys [][]byte) error { return nil } +func (c *txnCommitter) prewriteKeys(keys [][]byte) error { + return c.iterKeysByRegion(keys, batchIterFn(c.prewriteSingleRegion, c.keyValueSize)) +} + +func (c *txnCommitter) commitKeys(keys [][]byte) error { + return c.iterKeysByRegion(keys, batchIterFn(c.commitSingleRegion, c.keySize)) +} + +func (c *txnCommitter) cleanupKeys(keys [][]byte) error { + return c.iterKeysByRegion(keys, batchIterFn(c.cleanupSingleRegion, c.keySize)) +} + func (c *txnCommitter) Commit() error { - err := c.iterKeysByRegion(c.keys, c.prewriteSingleRegion) + err := c.prewriteKeys(c.keys) if err != nil { log.Warnf("txn commit failed on prewrite: %v", err) - c.iterKeysByRegion(c.writtenKeys, c.cleanupSingleRegion) + c.cleanupKeys(c.writtenKeys) return errors.Trace(err) } @@ -252,13 +276,35 @@ func (c *txnCommitter) Commit() error { } c.commitTS = commitTS - err = c.iterKeysByRegion(c.keys, c.commitSingleRegion) + err = c.commitKeys(c.keys) if err != nil { if !c.committed { - c.iterKeysByRegion(c.writtenKeys, c.cleanupSingleRegion) + c.cleanupKeys(c.writtenKeys) return errors.Trace(err) } log.Warnf("txn commit succeed with error: %v", err) } return nil } + +// TiKV recommends each RPC packet should be less than ~1MB. We keep each packet's +// Key+Value size below 512KB. +const txnCommitBatchSize = 512 * 1024 + +// batchIterfn wraps an iteration function and returns a new one that iterates +// keys by batch size. +func batchIterFn(f func([][]byte) error, sizeFn func([]byte) int) func([][]byte) error { + return func(keys [][]byte) error { + var start, end int + for start = 0; start < len(keys); start = end { + var size int + for end = start; end < len(keys) && size < txnCommitBatchSize; end++ { + size += sizeFn(keys[end]) + } + if err := f(keys[start:end]); err != nil { + return errors.Trace(err) + } + } + return nil + } +}