store/tikv: remove canceled requests before sending (#10634)
--- a/store/tikv/backoff.go
+++ b/store/tikv/backoff.go
@@ -110,12 +110,12 @@ func NewBackoffFn(base, cap, jitter int) func(ctx context.Context, maxSleepMs int) int {
 		}
 		select {
 		case <-time.After(time.Duration(realSleep) * time.Millisecond):
+			attempts++
+			lastSleep = sleep
+			return lastSleep
 		case <-ctx.Done():
+			return 0
 		}
-
-		attempts++
-		lastSleep = sleep
-		return lastSleep
 	}
 }
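The backoff hunk above folds the sleep bookkeeping into the `time.After` case and returns 0 from the `ctx.Done()` case, so a backoff interrupted by cancellation is no longer recorded as a completed sleep. A minimal, self-contained sketch of that select pattern, using a hypothetical `sleepWithContext` helper that is not part of this commit:

package main

import (
	"context"
	"fmt"
	"time"
)

// sleepWithContext sleeps for d and reports how long it actually
// slept: d after a full sleep, 0 if ctx was canceled first. A
// hypothetical helper illustrating the pattern, not the commit's code.
func sleepWithContext(ctx context.Context, d time.Duration) time.Duration {
	select {
	case <-time.After(d):
		return d
	case <-ctx.Done():
		return 0
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	// The 50ms sleep loses to the 10ms deadline, so this prints 0s.
	fmt.Println(sleepWithContext(ctx, 50*time.Millisecond))
}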
--- a/store/tikv/client.go
+++ b/store/tikv/client.go
@@ -336,6 +336,10 @@ type batchCommandsEntry struct {
 	err error
 }
 
+func (b *batchCommandsEntry) isCanceled() bool {
+	return atomic.LoadInt32(&b.canceled) == 1
+}
+
 const idleTimeout = 3 * time.Minute
 
 // fetchAllPendingRequests fetches all pending requests from the channel.
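`isCanceled` reads the entry's `canceled` flag with an atomic load because the flag is written by the goroutine waiting on the request and read by the batch send loop. The writer side is not in this hunk; presumably it is an `atomic.StoreInt32` performed when the waiter's context ends, along these lines (the names below are illustrative, not the commit's):

package main

import (
	"fmt"
	"sync/atomic"
)

// entry mirrors the canceled flag of batchCommandsEntry.
type entry struct {
	canceled int32
}

// markCanceled is what the waiting side would run once its context
// ends; the batch send loop then observes the flag atomically.
func (e *entry) markCanceled() { atomic.StoreInt32(&e.canceled, 1) }

func (e *entry) isCanceled() bool { return atomic.LoadInt32(&e.canceled) == 1 }

func main() {
	e := new(entry)
	fmt.Println(e.isCanceled()) // false
	e.markCanceled()
	fmt.Println(e.isCanceled()) // true
}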
@@ -476,6 +480,10 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) {
 				bestBatchWaitSize += 1
 			}
 
+			length = removeCanceledRequests(&entries, &requests)
+			if length == 0 {
+				continue // All requests are canceled.
+			}
 			maxBatchID := atomic.AddUint64(&batchCommandsClient.idAlloc, uint64(length))
 			for i := 0; i < length; i++ {
 				requestID := uint64(i) + maxBatchID - uint64(length)
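The ID arithmetic in the last line works because `atomic.AddUint64` returns the counter value after the add: a batch of `length` requests owns the half-open ID range `[maxBatchID-length, maxBatchID)`. For example, with `idAlloc` at 100 and `length` 3, `maxBatchID` is 103 and the requests get IDs 100, 101, 102:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	// atomic.AddUint64 returns the counter value *after* the add, so a
	// batch of n requests owns the half-open ID range [new-n, new).
	var idAlloc uint64 = 100
	const n = 3
	maxBatchID := atomic.AddUint64(&idAlloc, n)
	for i := uint64(0); i < n; i++ {
		fmt.Println(i + maxBatchID - n) // prints 100, 101, 102
	}
}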
@@ -506,6 +514,23 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) {
 	}
 }
 
+// removeCanceledRequests removes canceled requests before sending.
+func removeCanceledRequests(
+	entries *[]*batchCommandsEntry,
+	requests *[]*tikvpb.BatchCommandsRequest_Request) int {
+	validEntries := (*entries)[:0]
+	validRequests := (*requests)[:0]
+	for _, e := range *entries {
+		if !e.isCanceled() {
+			validEntries = append(validEntries, e)
+			validRequests = append(validRequests, e.req)
+		}
+	}
+	*entries = validEntries
+	*requests = validRequests
+	return len(*entries)
+}
+
 // rpcClient is RPC client struct.
 // TODO: Add flow control between RPC clients in TiDB and RPC servers in TiKV.
 // Since we use shared client connection to communicate to the same TiKV, it's possible
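`removeCanceledRequests` filters both slices in place: reslicing with `[:0]` yields a zero-length slice that shares the original backing array, and appending the surviving elements never allocates because the write index can never outrun the read index. A standalone sketch of the idiom (this helper itself is not in the commit):

package main

import "fmt"

// filterInPlace keeps the elements of s that satisfy keep, reusing
// s's backing array; the same [:0] idiom used by removeCanceledRequests.
func filterInPlace(s []int, keep func(int) bool) []int {
	out := s[:0] // length 0, but same backing array as s
	for _, v := range s {
		if keep(v) {
			// The write position trails the read position, so the
			// append never clobbers an element not yet visited.
			out = append(out, v)
		}
	}
	return out
}

func main() {
	s := []int{1, 2, 3, 4, 5}
	s = filterInPlace(s, func(v int) bool { return v%2 == 0 })
	fmt.Println(s) // [2 4]
}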
--- a/store/tikv/client_test.go
+++ b/store/tikv/client_test.go
@@ -17,6 +17,7 @@ import (
 	"testing"
 
 	. "github.com/pingcap/check"
+	"github.com/pingcap/kvproto/pkg/tikvpb"
 	"github.com/pingcap/tidb/config"
 )
@@ -52,3 +53,27 @@ func (s *testClientSuite) TestConn(c *C) {
 	c.Assert(err, NotNil)
 	c.Assert(conn3, IsNil)
 }
+
+func (s *testClientSuite) TestRemoveCanceledRequests(c *C) {
+	req := new(tikvpb.BatchCommandsRequest_Request)
+	entries := []*batchCommandsEntry{
+		{canceled: 1, req: req},
+		{canceled: 0, req: req},
+		{canceled: 1, req: req},
+		{canceled: 1, req: req},
+		{canceled: 0, req: req},
+	}
+	entryPtr := &entries[0]
+	requests := make([]*tikvpb.BatchCommandsRequest_Request, len(entries))
+	for i := range entries {
+		requests[i] = entries[i].req
+	}
+	length := removeCanceledRequests(&entries, &requests)
+	c.Assert(length, Equals, 2)
+	for _, e := range entries {
+		c.Assert(e.isCanceled(), IsFalse)
+	}
+	c.Assert(len(requests), Equals, 2)
+	newEntryPtr := &entries[0]
+	c.Assert(entryPtr, Equals, newEntryPtr)
+}
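The final assertion is the telling one: `entryPtr` and `newEntryPtr` are both taken as `&entries[0]`, before and after the call, so `c.Assert(entryPtr, Equals, newEntryPtr)` verifies that `removeCanceledRequests` compacted the slice within its original backing array rather than allocating a new one.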