
* refactor: take clients out of internal * refactor: move stdio to pkg * Move internal/api to api * refactor: final changes for Kapacitor to access shared functionality * chore: regenerate mocks * fix: bad automated refactor * chore: extra formatting not caught by make fmt
90 lines
2.7 KiB
Go
90 lines
2.7 KiB
Go
package write
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/fujiwara/shapeio"
|
|
"github.com/influxdata/influx-cli/v2/pkg/csv2lp"
|
|
)
|
|
|
|
type Throttler struct {
|
|
bytesPerSecond float64
|
|
}
|
|
|
|
func NewThrottler(bytesPerSec BytesPerSec) *Throttler {
|
|
return &Throttler{bytesPerSecond: float64(bytesPerSec)}
|
|
}
|
|
|
|
func (t *Throttler) Throttle(ctx context.Context, in io.Reader) io.Reader {
|
|
if t.bytesPerSecond == 0.0 {
|
|
return in
|
|
}
|
|
|
|
// LineReader ensures that original reader is consumed in the smallest possible
|
|
// units (at most one protocol line) to avoid bigger pauses in throttling
|
|
throttledReader := shapeio.NewReaderWithContext(csv2lp.NewLineReader(in), ctx)
|
|
throttledReader.SetRateLimit(t.bytesPerSecond)
|
|
|
|
return throttledReader
|
|
}
|
|
|
|
var rateLimitRegexp = regexp.MustCompile(`^(\d*\.?\d*)(B|kB|MB)/?(\d*)?(s|sec|m|min)$`)
|
|
var bytesUnitMultiplier = map[string]float64{"B": 1, "kB": 1024, "MB": 1_048_576}
|
|
var timeUnitMultiplier = map[string]float64{"s": 1, "sec": 1, "m": 60, "min": 60}
|
|
|
|
type BytesPerSec float64
|
|
|
|
func (b *BytesPerSec) Set(v string) (err error) {
|
|
bps, err := ToBytesPerSecond(v)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*b = BytesPerSec(bps)
|
|
return nil
|
|
}
|
|
|
|
func (b BytesPerSec) String() string {
|
|
return strconv.FormatFloat(float64(b), 'E', -1, 64)
|
|
}
|
|
|
|
// ToBytesPerSecond converts rate from string to number. The supplied string
|
|
// value format must be COUNT(B|kB|MB)/TIME(s|sec|m|min) with / and TIME being optional.
|
|
// All spaces are ignored, they can help with formatting. Examples: "5 MB / 5 min", 17kbs. 5.1MB5m.
|
|
func ToBytesPerSecond(rateLimit string) (BytesPerSec, error) {
|
|
// ignore all spaces
|
|
strVal := strings.ReplaceAll(rateLimit, " ", "")
|
|
if len(strVal) == 0 {
|
|
return 0, nil
|
|
}
|
|
|
|
matches := rateLimitRegexp.FindStringSubmatch(strVal)
|
|
if matches == nil {
|
|
return 0, fmt.Errorf("invalid rate limit %q: it does not match format COUNT(B|kB|MB)/TIME(s|sec|m|min) with / and TIME being optional, rexpexp: %v", strVal, rateLimitRegexp)
|
|
}
|
|
bytes, err := strconv.ParseFloat(matches[1], 64)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("invalid rate limit %q: '%v' is not count of bytes: %v", strVal, matches[1], err)
|
|
}
|
|
bytes = bytes * bytesUnitMultiplier[matches[2]]
|
|
var time float64
|
|
if len(matches[3]) == 0 {
|
|
time = 1 // number is not specified, for example 5kbs or 1Mb/s
|
|
} else {
|
|
int64Val, err := strconv.ParseUint(matches[3], 10, 32)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("invalid rate limit %q: time is out of range: %v", strVal, err)
|
|
}
|
|
if int64Val <= 0 {
|
|
return 0, fmt.Errorf("invalid rate limit %q: positive time expected but %v supplied", strVal, matches[3])
|
|
}
|
|
time = float64(int64Val)
|
|
}
|
|
time = time * timeUnitMultiplier[matches[4]]
|
|
return BytesPerSec(bytes / time), nil
|
|
}
|