feat: port rate-limiter from influxdb (#44)
This commit is contained in:
parent
1aa715df05
commit
74d622bece
1
go.mod
1
go.mod
@ -6,6 +6,7 @@ require (
|
||||
github.com/AlecAivazis/survey/v2 v2.2.9
|
||||
github.com/BurntSushi/toml v0.3.1
|
||||
github.com/daixiang0/gci v0.2.8
|
||||
github.com/fujiwara/shapeio v1.0.0
|
||||
github.com/kr/pretty v0.1.0 // indirect
|
||||
github.com/stretchr/testify v1.7.0
|
||||
github.com/urfave/cli/v2 v2.3.0
|
||||
|
6
go.sum
6
go.sum
@ -11,6 +11,10 @@ github.com/daixiang0/gci v0.2.8/go.mod h1:+4dZ7TISfSmqfAGv59ePaHfNzgGtIkHAhhdKgg
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
|
||||
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
|
||||
github.com/fujiwara/shapeio v1.0.0 h1:xG5D9oNqCSUUbryZ/jQV3cqe1v2suEjwPIcEg1gKM8M=
|
||||
github.com/fujiwara/shapeio v1.0.0/go.mod h1:LmEmu6L/8jetyj1oewewFb7bZCNRwE7wLCUNzDLaLVA=
|
||||
github.com/hinshun/vt10x v0.0.0-20180616224451-1954e6464174 h1:WlZsjVhE8Af9IcZDGgJGQpNflI3+MJSBhsgT5PCtzBQ=
|
||||
github.com/hinshun/vt10x v0.0.0-20180616224451-1954e6464174/go.mod h1:DqJ97dSdRW1W22yXSB90986pcOyQ7r45iio1KN2ez1A=
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
|
||||
@ -64,6 +68,8 @@ golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba h1:O8mE0/t419eoIwhTFpKVkHiTs/Igowgfkj25AcZrtiE=
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20201118003311-bd56c0adb394/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
|
78
internal/throttler/throttler.go
Normal file
78
internal/throttler/throttler.go
Normal file
@ -0,0 +1,78 @@
|
||||
package throttler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/fujiwara/shapeio"
|
||||
"github.com/influxdata/influx-cli/v2/pkg/csv2lp"
|
||||
)
|
||||
|
||||
type Throttler struct {
|
||||
bytesPerSecond float64
|
||||
}
|
||||
|
||||
func NewThrottler(rate string) (*Throttler, error) {
|
||||
bytesPerSec, err := ToBytesPerSecond(rate)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Throttler{bytesPerSecond: bytesPerSec}, nil
|
||||
}
|
||||
|
||||
func (t *Throttler) Throttle(ctx context.Context, in io.Reader) io.Reader {
|
||||
if t.bytesPerSecond == 0.0 {
|
||||
return in
|
||||
}
|
||||
|
||||
// LineReader ensures that original reader is consumed in the smallest possible
|
||||
// units (at most one protocol line) to avoid bigger pauses in throttling
|
||||
throttledReader := shapeio.NewReaderWithContext(csv2lp.NewLineReader(in), ctx)
|
||||
throttledReader.SetRateLimit(t.bytesPerSecond)
|
||||
|
||||
return throttledReader
|
||||
}
|
||||
|
||||
var rateLimitRegexp = regexp.MustCompile(`^(\d*\.?\d*)(B|kB|MB)/?(\d*)?(s|sec|m|min)$`)
|
||||
var bytesUnitMultiplier = map[string]float64{"B": 1, "kB": 1024, "MB": 1_048_576}
|
||||
var timeUnitMultiplier = map[string]float64{"s": 1, "sec": 1, "m": 60, "min": 60}
|
||||
|
||||
// ToBytesPerSecond converts rate from string to number. The supplied string
|
||||
// value format must be COUNT(B|kB|MB)/TIME(s|sec|m|min) with / and TIME being optional.
|
||||
// All spaces are ignored, they can help with formatting. Examples: "5 MB / 5 min", 17kbs. 5.1MB5m.
|
||||
func ToBytesPerSecond(rateLimit string) (float64, error) {
|
||||
// ignore all spaces
|
||||
strVal := strings.ReplaceAll(rateLimit, " ", "")
|
||||
if len(strVal) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
matches := rateLimitRegexp.FindStringSubmatch(strVal)
|
||||
if matches == nil {
|
||||
return 0, fmt.Errorf("invalid rate limit %q: it does not match format COUNT(B|kB|MB)/TIME(s|sec|m|min) with / and TIME being optional, rexpexp: %v", strVal, rateLimitRegexp)
|
||||
}
|
||||
bytes, err := strconv.ParseFloat(matches[1], 64)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("invalid rate limit %q: '%v' is not count of bytes: %v", strVal, matches[1], err)
|
||||
}
|
||||
bytes = bytes * bytesUnitMultiplier[matches[2]]
|
||||
var time float64
|
||||
if len(matches[3]) == 0 {
|
||||
time = 1 // number is not specified, for example 5kbs or 1Mb/s
|
||||
} else {
|
||||
int64Val, err := strconv.ParseUint(matches[3], 10, 32)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("invalid rate limit %q: time is out of range: %v", strVal, err)
|
||||
}
|
||||
if int64Val <= 0 {
|
||||
return 0, fmt.Errorf("invalid rate limit %q: positive time expected but %v supplied", strVal, matches[3])
|
||||
}
|
||||
time = float64(int64Val)
|
||||
}
|
||||
time = time * timeUnitMultiplier[matches[4]]
|
||||
return bytes / time, nil
|
||||
}
|
84
internal/throttler/throttler_test.go
Normal file
84
internal/throttler/throttler_test.go
Normal file
@ -0,0 +1,84 @@
|
||||
package throttler_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influx-cli/v2/internal/throttler"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestThrottlerPassthrough(t *testing.T) {
|
||||
// Hard to test that rate-limiting actually works, so we just check
|
||||
// that no data is lost.
|
||||
in := "Hello world!"
|
||||
throttler, err := throttler.NewThrottler("1B/s")
|
||||
require.NoError(t, err)
|
||||
r := throttler.Throttle(context.Background(), strings.NewReader(in))
|
||||
|
||||
out := bytes.Buffer{}
|
||||
_, err = out.ReadFrom(r)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, in, out.String())
|
||||
}
|
||||
|
||||
func TestToBytesPerSecond(t *testing.T) {
|
||||
var tests = []struct {
|
||||
in string
|
||||
out float64
|
||||
error string
|
||||
}{
|
||||
{
|
||||
in: "5 MB / 5 min",
|
||||
out: float64(5*1024*1024) / float64(5*60),
|
||||
},
|
||||
{
|
||||
in: "17kBs",
|
||||
out: float64(17 * 1024),
|
||||
},
|
||||
{
|
||||
in: "1B/m",
|
||||
out: float64(1) / float64(60),
|
||||
},
|
||||
{
|
||||
in: "1B/2sec",
|
||||
out: float64(1) / float64(2),
|
||||
},
|
||||
{
|
||||
in: "",
|
||||
out: 0,
|
||||
},
|
||||
{
|
||||
in: "1B/munite",
|
||||
error: `invalid rate limit "1B/munite": it does not match format COUNT(B|kB|MB)/TIME(s|sec|m|min) with / and TIME being optional`,
|
||||
},
|
||||
{
|
||||
in: ".B/s",
|
||||
error: `invalid rate limit ".B/s": '.' is not count of bytes:`,
|
||||
},
|
||||
{
|
||||
in: "1B0s",
|
||||
error: `invalid rate limit "1B0s": positive time expected but 0 supplied`,
|
||||
},
|
||||
{
|
||||
in: "1MB/42949672950s",
|
||||
error: `invalid rate limit "1MB/42949672950s": time is out of range`,
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
t.Run(test.in, func(t *testing.T) {
|
||||
bytesPerSec, err := throttler.ToBytesPerSecond(test.in)
|
||||
if len(test.error) == 0 {
|
||||
require.Equal(t, test.out, bytesPerSec)
|
||||
require.Nil(t, err)
|
||||
} else {
|
||||
require.NotNil(t, err)
|
||||
// contains is used, since the error messages contains root cause that may evolve with go versions
|
||||
require.Contains(t, err.Error(), test.error)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user