diff --git a/go.mod b/go.mod index 8ae7280..26e5947 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/AlecAivazis/survey/v2 v2.2.9 github.com/BurntSushi/toml v0.3.1 github.com/daixiang0/gci v0.2.8 + github.com/fujiwara/shapeio v1.0.0 github.com/kr/pretty v0.1.0 // indirect github.com/stretchr/testify v1.7.0 github.com/urfave/cli/v2 v2.3.0 diff --git a/go.sum b/go.sum index cbbc2f3..ae78736 100644 --- a/go.sum +++ b/go.sum @@ -11,6 +11,10 @@ github.com/daixiang0/gci v0.2.8/go.mod h1:+4dZ7TISfSmqfAGv59ePaHfNzgGtIkHAhhdKgg github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/fujiwara/shapeio v1.0.0 h1:xG5D9oNqCSUUbryZ/jQV3cqe1v2suEjwPIcEg1gKM8M= +github.com/fujiwara/shapeio v1.0.0/go.mod h1:LmEmu6L/8jetyj1oewewFb7bZCNRwE7wLCUNzDLaLVA= github.com/hinshun/vt10x v0.0.0-20180616224451-1954e6464174 h1:WlZsjVhE8Af9IcZDGgJGQpNflI3+MJSBhsgT5PCtzBQ= github.com/hinshun/vt10x v0.0.0-20180616224451-1954e6464174/go.mod h1:DqJ97dSdRW1W22yXSB90986pcOyQ7r45iio1KN2ez1A= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= @@ -64,6 +68,8 @@ golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba h1:O8mE0/t419eoIwhTFpKVkHiTs/Igowgfkj25AcZrtiE= +golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20201118003311-bd56c0adb394/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= diff --git a/internal/throttler/throttler.go b/internal/throttler/throttler.go new file mode 100644 index 0000000..7175994 --- /dev/null +++ b/internal/throttler/throttler.go @@ -0,0 +1,78 @@ +package throttler + +import ( + "context" + "fmt" + "io" + "regexp" + "strconv" + "strings" + + "github.com/fujiwara/shapeio" + "github.com/influxdata/influx-cli/v2/pkg/csv2lp" +) + +type Throttler struct { + bytesPerSecond float64 +} + +func NewThrottler(rate string) (*Throttler, error) { + bytesPerSec, err := ToBytesPerSecond(rate) + if err != nil { + return nil, err + } + return &Throttler{bytesPerSecond: bytesPerSec}, nil +} + +func (t *Throttler) Throttle(ctx context.Context, in io.Reader) io.Reader { + if t.bytesPerSecond == 0.0 { + return in + } + + // LineReader ensures that original reader is consumed in the smallest possible + // units (at most one protocol line) to avoid bigger pauses in throttling + throttledReader := shapeio.NewReaderWithContext(csv2lp.NewLineReader(in), ctx) + throttledReader.SetRateLimit(t.bytesPerSecond) + + return throttledReader +} + +var rateLimitRegexp = regexp.MustCompile(`^(\d*\.?\d*)(B|kB|MB)/?(\d*)?(s|sec|m|min)$`) +var bytesUnitMultiplier = map[string]float64{"B": 1, "kB": 1024, "MB": 1_048_576} +var timeUnitMultiplier = map[string]float64{"s": 1, "sec": 1, "m": 60, "min": 60} + +// ToBytesPerSecond converts rate from string to number. The supplied string +// value format must be COUNT(B|kB|MB)/TIME(s|sec|m|min) with / and TIME being optional. +// All spaces are ignored, they can help with formatting. Examples: "5 MB / 5 min", 17kbs. 5.1MB5m. +func ToBytesPerSecond(rateLimit string) (float64, error) { + // ignore all spaces + strVal := strings.ReplaceAll(rateLimit, " ", "") + if len(strVal) == 0 { + return 0, nil + } + + matches := rateLimitRegexp.FindStringSubmatch(strVal) + if matches == nil { + return 0, fmt.Errorf("invalid rate limit %q: it does not match format COUNT(B|kB|MB)/TIME(s|sec|m|min) with / and TIME being optional, rexpexp: %v", strVal, rateLimitRegexp) + } + bytes, err := strconv.ParseFloat(matches[1], 64) + if err != nil { + return 0, fmt.Errorf("invalid rate limit %q: '%v' is not count of bytes: %v", strVal, matches[1], err) + } + bytes = bytes * bytesUnitMultiplier[matches[2]] + var time float64 + if len(matches[3]) == 0 { + time = 1 // number is not specified, for example 5kbs or 1Mb/s + } else { + int64Val, err := strconv.ParseUint(matches[3], 10, 32) + if err != nil { + return 0, fmt.Errorf("invalid rate limit %q: time is out of range: %v", strVal, err) + } + if int64Val <= 0 { + return 0, fmt.Errorf("invalid rate limit %q: positive time expected but %v supplied", strVal, matches[3]) + } + time = float64(int64Val) + } + time = time * timeUnitMultiplier[matches[4]] + return bytes / time, nil +} diff --git a/internal/throttler/throttler_test.go b/internal/throttler/throttler_test.go new file mode 100644 index 0000000..6a1caa1 --- /dev/null +++ b/internal/throttler/throttler_test.go @@ -0,0 +1,84 @@ +package throttler_test + +import ( + "bytes" + "context" + "strings" + "testing" + + "github.com/influxdata/influx-cli/v2/internal/throttler" + "github.com/stretchr/testify/require" +) + +func TestThrottlerPassthrough(t *testing.T) { + // Hard to test that rate-limiting actually works, so we just check + // that no data is lost. + in := "Hello world!" + throttler, err := throttler.NewThrottler("1B/s") + require.NoError(t, err) + r := throttler.Throttle(context.Background(), strings.NewReader(in)) + + out := bytes.Buffer{} + _, err = out.ReadFrom(r) + require.NoError(t, err) + + require.Equal(t, in, out.String()) +} + +func TestToBytesPerSecond(t *testing.T) { + var tests = []struct { + in string + out float64 + error string + }{ + { + in: "5 MB / 5 min", + out: float64(5*1024*1024) / float64(5*60), + }, + { + in: "17kBs", + out: float64(17 * 1024), + }, + { + in: "1B/m", + out: float64(1) / float64(60), + }, + { + in: "1B/2sec", + out: float64(1) / float64(2), + }, + { + in: "", + out: 0, + }, + { + in: "1B/munite", + error: `invalid rate limit "1B/munite": it does not match format COUNT(B|kB|MB)/TIME(s|sec|m|min) with / and TIME being optional`, + }, + { + in: ".B/s", + error: `invalid rate limit ".B/s": '.' is not count of bytes:`, + }, + { + in: "1B0s", + error: `invalid rate limit "1B0s": positive time expected but 0 supplied`, + }, + { + in: "1MB/42949672950s", + error: `invalid rate limit "1MB/42949672950s": time is out of range`, + }, + } + for _, test := range tests { + t.Run(test.in, func(t *testing.T) { + bytesPerSec, err := throttler.ToBytesPerSecond(test.in) + if len(test.error) == 0 { + require.Equal(t, test.out, bytesPerSec) + require.Nil(t, err) + } else { + require.NotNil(t, err) + // contains is used, since the error messages contains root cause that may evolve with go versions + require.Contains(t, err.Error(), test.error) + } + }) + } +}