feat: port influx write from influxdb (#47)

This commit is contained in:
Daniel Moran 2021-04-26 12:32:54 -04:00 committed by GitHub
parent bd97d2c7f3
commit 0e1db1e782
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 1417 additions and 1 deletions

View File

@ -85,6 +85,14 @@ var commonFlagsNoToken = []cli.Flag{
},
}
// Most commands use this form of the token flag.
var commonFlags = append(commonFlagsNoToken, &cli.StringFlag{
Name: tokenFlag,
Usage: "Authentication token",
Aliases: []string{"t"},
EnvVars: []string{"INFLUX_TOKEN"},
})
// newCli builds a CLI core that reads from stdin, writes to stdout/stderr, manages a local config store,
// and optionally tracks a trace ID specified over the CLI.
func newCli(ctx *cli.Context) (*internal.CLI, error) {
@ -164,6 +172,7 @@ var app = cli.App{
&versionCmd,
&pingCmd,
&setupCmd,
&writeCmd,
},
}

281
cmd/influx/write.go Normal file
View File

@ -0,0 +1,281 @@
package main
import (
"fmt"
"io"
"net/http"
"os"
"github.com/influxdata/influx-cli/v2/internal"
"github.com/influxdata/influx-cli/v2/internal/api"
"github.com/influxdata/influx-cli/v2/internal/batcher"
"github.com/influxdata/influx-cli/v2/internal/linereader"
"github.com/influxdata/influx-cli/v2/internal/throttler"
"github.com/urfave/cli/v2"
)
var writeFlags = append(
commonFlags,
&cli.StringFlag{
Name: "bucket-id",
Usage: "The ID of destination bucket",
EnvVars: []string{"INFLUX_BUCKET_ID"},
},
&cli.StringFlag{
Name: "bucket",
Usage: "The name of destination bucket",
Aliases: []string{"b"},
EnvVars: []string{"INFLUX_BUCKET_NAME"},
},
&cli.StringFlag{
Name: "org-id",
Usage: "The ID of the organization",
EnvVars: []string{"INFLUX_ORG_ID"},
},
&cli.StringFlag{
Name: "org",
Usage: "The name of the organization",
Aliases: []string{"o"},
EnvVars: []string{"INFLUX_ORG"},
},
&cli.StringFlag{
Name: "precision",
Usage: "Precision of the timestamps of the lines",
Aliases: []string{"p"},
EnvVars: []string{"INFLUX_PRECISION"},
Value: "ns",
},
&cli.StringFlag{
Name: "format",
Usage: "Input format, either 'lp' (Line Protocol) or 'csv' (Comma Separated Values)",
DefaultText: "'lp' unless '.csv' extension",
},
&cli.StringSliceFlag{
Name: "header",
Usage: "Header prepends lines to input data",
},
&cli.StringSliceFlag{
Name: "file",
Usage: "The path to the file to import",
Aliases: []string{"f"},
TakesFile: true,
},
&cli.StringSliceFlag{
Name: "url",
Usage: "The URL to import data from",
Aliases: []string{"u"},
},
&cli.BoolFlag{
Name: "debug",
Usage: "Log CSV columns to stderr before reading data rows",
},
&cli.BoolFlag{
Name: "skipRowOnError",
Usage: "Log CSV data errors to stderr and continue with CSV processing",
},
// NOTE: The old CLI allowed this flag to be used as an int _or_ a bool, with the bool form being
// short-hand for N=1. urfave/cli isn't that flexible.
&cli.IntFlag{
Name: "skipHeader",
Usage: "Skip the first <n> rows from input data",
},
&cli.IntFlag{
Name: "max-line-length",
Usage: "Specifies the maximum number of bytes that can be read for a single line",
Value: 16_000_000,
},
&cli.BoolFlag{
Name: "xIgnoreDataTypeInColumnName",
Usage: "Ignores dataType which could be specified after ':' in column name",
Hidden: true,
},
&cli.StringFlag{
Name: "encoding",
Usage: "Character encoding of input files or stdin",
Value: "UTF-8",
},
&cli.StringFlag{
Name: "errors-file",
Usage: "The path to the file to write rejected rows to",
TakesFile: true,
},
&cli.StringFlag{
Name: "rate-limit",
Usage: `Throttles write, examples: "5 MB / 5 min" , "17kBs"`,
DefaultText: "no throttling",
},
&cli.StringFlag{
Name: "compression",
Usage: "Input compression, either 'none' or 'gzip'",
DefaultText: "'none' unless an input has a '.gz' extension",
},
)
var writeCmd = cli.Command{
Name: "write",
Usage: "Write points to InfluxDB",
Description: "Write data to InfluxDB via stdin, or add an entire file specified with the -f flag",
Flags: writeFlags,
Action: func(ctx *cli.Context) error {
format, err := parseFormat(ctx.String("format"))
if err != nil {
return err
}
compression, err := parseCompression(ctx.String("compression"))
if err != nil {
return err
}
precision, err := parsePrecision(ctx.String("precision"))
if err != nil {
return err
}
var errorOut io.Writer
if ctx.IsSet("errors-file") {
errorFile, err := os.Open(ctx.String("errors-file"))
if err != nil {
return fmt.Errorf("failed to open errors-file: %w", err)
}
defer errorFile.Close()
errorOut = errorFile
}
throttler, err := throttler.NewThrottler(ctx.String("rate-limit"))
if err != nil {
return err
}
cli, err := newCli(ctx)
if err != nil {
return err
}
client, err := newApiClient(ctx, cli, true)
if err != nil {
return err
}
writeClients := &internal.WriteClients{
Client: client.WriteApi,
Reader: &linereader.MultiInputLineReader{
StdIn: os.Stdin,
HttpClient: http.DefaultClient,
ErrorOut: errorOut,
Args: ctx.Args().Slice(),
Files: ctx.StringSlice("file"),
URLs: ctx.StringSlice("url"),
Format: format,
Compression: compression,
Encoding: ctx.String("encoding"),
Headers: ctx.StringSlice("header"),
SkipRowOnError: ctx.Bool("skipRowOnError"),
SkipHeader: ctx.Int("skipHeader"),
IgnoreDataTypeInColumnName: ctx.Bool("xIgnoreDataTypeInColumnName"),
Debug: ctx.Bool("debug"),
},
Throttler: throttler,
Writer: &batcher.BufferBatcher{
MaxFlushBytes: batcher.DefaultMaxBytes,
MaxFlushInterval: batcher.DefaultInterval,
MaxLineLength: ctx.Int("max-line-length"),
},
}
return cli.Write(standardCtx(ctx), writeClients, &internal.WriteParams{
BucketID: ctx.String("bucket-id"),
BucketName: ctx.String("bucket"),
OrgID: ctx.String("org-id"),
OrgName: ctx.String("org"),
Precision: precision,
})
},
Subcommands: []*cli.Command{
{
Name: "dryrun",
Usage: "Write to stdout instead of InfluxDB",
Description: "Write protocol lines to stdout instead of InfluxDB. Troubleshoot conversion from CSV to line protocol",
Flags: writeFlags,
Action: func(ctx *cli.Context) error {
format, err := parseFormat(ctx.String("format"))
if err != nil {
return err
}
compression, err := parseCompression(ctx.String("compression"))
if err != nil {
return err
}
var errorOut io.Writer
if ctx.IsSet("errors-file") {
errorFile, err := os.Open(ctx.String("errors-file"))
if err != nil {
return fmt.Errorf("failed to open errors-file: %w", err)
}
defer errorFile.Close()
errorOut = errorFile
}
cli, err := newCli(ctx)
if err != nil {
return err
}
reader := &linereader.MultiInputLineReader{
StdIn: os.Stdin,
HttpClient: http.DefaultClient,
ErrorOut: errorOut,
Args: ctx.Args().Slice(),
Files: ctx.StringSlice("file"),
URLs: ctx.StringSlice("url"),
Format: format,
Compression: compression,
Encoding: ctx.String("encoding"),
Headers: ctx.StringSlice("header"),
SkipRowOnError: ctx.Bool("skipRowOnError"),
SkipHeader: ctx.Int("skipHeader"),
IgnoreDataTypeInColumnName: ctx.Bool("xIgnoreDataTypeInColumnName"),
Debug: ctx.Bool("debug"),
}
return cli.WriteDryRun(standardCtx(ctx), reader)
},
},
},
}
func parseFormat(f string) (linereader.InputFormat, error) {
switch f {
case "":
return linereader.InputFormatDerived, nil
case "lp":
return linereader.InputFormatLP, nil
case "csv":
return linereader.InputFormatCSV, nil
default:
return 0, fmt.Errorf("unsupported format: %q", f)
}
}
func parseCompression(c string) (linereader.InputCompression, error) {
switch c {
case "":
return linereader.InputCompressionDerived, nil
case "none":
return linereader.InputCompressionNone, nil
case "gzip":
return linereader.InputCompressionGZIP, nil
default:
return 0, fmt.Errorf("unsupported compression: %q", c)
}
}
func parsePrecision(p string) (api.WritePrecision, error) {
switch p {
case "ms":
return api.WRITEPRECISION_MS, nil
case "s":
return api.WRITEPRECISION_S, nil
case "us":
return api.WRITEPRECISION_US, nil
case "ns":
return api.WRITEPRECISION_NS, nil
default:
return "", fmt.Errorf("unsupported precision: %q", p)
}
}

View File

@ -4,7 +4,7 @@ declare -r ETC_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &> /dev/null && pwd)"
declare -r ROOT_DIR="$(dirname ${ETC_DIR})"
declare -r GENERATOR_DOCKER_IMG=openapitools/openapi-generator-cli:v5.1.0
declare -r OPENAPI_COMMIT=5ab3ef0b9a6aee68b3b34e1858ca6d55c153650a
declare -r OPENAPI_COMMIT=4b75803e472eadb7be101fbe84f1f120f0a008b0
# Download our target API spec.
# NOTE: openapi-generator supports HTTP references to API docs, but using that feature

302
internal/api/api_write.go Normal file
View File

@ -0,0 +1,302 @@
/*
* Subset of Influx API covered by Influx CLI
*
* No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
*
* API version: 2.0.0
*/
// Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT.
package api
import (
"bytes"
_context "context"
_ioutil "io/ioutil"
_nethttp "net/http"
_neturl "net/url"
)
// Linger please
var (
_ _context.Context
)
type WriteApi interface {
/*
* PostWrite Write time series data into InfluxDB
* @param ctx _context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background().
* @return ApiPostWriteRequest
*/
PostWrite(ctx _context.Context) ApiPostWriteRequest
/*
* PostWriteExecute executes the request
*/
PostWriteExecute(r ApiPostWriteRequest) (*_nethttp.Response, error)
}
// WriteApiService WriteApi service
type WriteApiService service
type ApiPostWriteRequest struct {
ctx _context.Context
ApiService WriteApi
org *string
bucket *string
body []byte
zapTraceSpan *string
contentEncoding *string
contentType *string
contentLength *int32
accept *string
orgID *string
precision *WritePrecision
}
func (r ApiPostWriteRequest) Org(org string) ApiPostWriteRequest {
r.org = &org
return r
}
func (r ApiPostWriteRequest) GetOrg() *string {
return r.org
}
func (r ApiPostWriteRequest) Bucket(bucket string) ApiPostWriteRequest {
r.bucket = &bucket
return r
}
func (r ApiPostWriteRequest) GetBucket() *string {
return r.bucket
}
func (r ApiPostWriteRequest) Body(body []byte) ApiPostWriteRequest {
r.body = body
return r
}
func (r ApiPostWriteRequest) GetBody() []byte {
return r.body
}
func (r ApiPostWriteRequest) ZapTraceSpan(zapTraceSpan string) ApiPostWriteRequest {
r.zapTraceSpan = &zapTraceSpan
return r
}
func (r ApiPostWriteRequest) GetZapTraceSpan() *string {
return r.zapTraceSpan
}
func (r ApiPostWriteRequest) ContentEncoding(contentEncoding string) ApiPostWriteRequest {
r.contentEncoding = &contentEncoding
return r
}
func (r ApiPostWriteRequest) GetContentEncoding() *string {
return r.contentEncoding
}
func (r ApiPostWriteRequest) ContentType(contentType string) ApiPostWriteRequest {
r.contentType = &contentType
return r
}
func (r ApiPostWriteRequest) GetContentType() *string {
return r.contentType
}
func (r ApiPostWriteRequest) ContentLength(contentLength int32) ApiPostWriteRequest {
r.contentLength = &contentLength
return r
}
func (r ApiPostWriteRequest) GetContentLength() *int32 {
return r.contentLength
}
func (r ApiPostWriteRequest) Accept(accept string) ApiPostWriteRequest {
r.accept = &accept
return r
}
func (r ApiPostWriteRequest) GetAccept() *string {
return r.accept
}
func (r ApiPostWriteRequest) OrgID(orgID string) ApiPostWriteRequest {
r.orgID = &orgID
return r
}
func (r ApiPostWriteRequest) GetOrgID() *string {
return r.orgID
}
func (r ApiPostWriteRequest) Precision(precision WritePrecision) ApiPostWriteRequest {
r.precision = &precision
return r
}
func (r ApiPostWriteRequest) GetPrecision() *WritePrecision {
return r.precision
}
func (r ApiPostWriteRequest) Execute() (*_nethttp.Response, error) {
return r.ApiService.PostWriteExecute(r)
}
/*
* PostWrite Write time series data into InfluxDB
* @param ctx _context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background().
* @return ApiPostWriteRequest
*/
func (a *WriteApiService) PostWrite(ctx _context.Context) ApiPostWriteRequest {
return ApiPostWriteRequest{
ApiService: a,
ctx: ctx,
}
}
/*
* Execute executes the request
*/
func (a *WriteApiService) PostWriteExecute(r ApiPostWriteRequest) (*_nethttp.Response, error) {
var (
localVarHTTPMethod = _nethttp.MethodPost
localVarPostBody interface{}
localVarFormFileName string
localVarFileName string
localVarFileBytes []byte
)
localBasePath, err := a.client.cfg.ServerURLWithContext(r.ctx, "WriteApiService.PostWrite")
if err != nil {
return nil, GenericOpenAPIError{error: err.Error()}
}
localVarPath := localBasePath + "/write"
localVarHeaderParams := make(map[string]string)
localVarQueryParams := _neturl.Values{}
localVarFormParams := _neturl.Values{}
if r.org == nil {
return nil, reportError("org is required and must be specified")
}
if r.bucket == nil {
return nil, reportError("bucket is required and must be specified")
}
if r.body == nil {
return nil, reportError("body is required and must be specified")
}
localVarQueryParams.Add("org", parameterToString(*r.org, ""))
if r.orgID != nil {
localVarQueryParams.Add("orgID", parameterToString(*r.orgID, ""))
}
localVarQueryParams.Add("bucket", parameterToString(*r.bucket, ""))
if r.precision != nil {
localVarQueryParams.Add("precision", parameterToString(*r.precision, ""))
}
// to determine the Content-Type header
localVarHTTPContentTypes := []string{"text/plain"}
// set Content-Type header
localVarHTTPContentType := selectHeaderContentType(localVarHTTPContentTypes)
if localVarHTTPContentType != "" {
localVarHeaderParams["Content-Type"] = localVarHTTPContentType
}
// to determine the Accept header
localVarHTTPHeaderAccepts := []string{"application/json"}
// set Accept header
localVarHTTPHeaderAccept := selectHeaderAccept(localVarHTTPHeaderAccepts)
if localVarHTTPHeaderAccept != "" {
localVarHeaderParams["Accept"] = localVarHTTPHeaderAccept
}
if r.zapTraceSpan != nil {
localVarHeaderParams["Zap-Trace-Span"] = parameterToString(*r.zapTraceSpan, "")
}
if r.contentEncoding != nil {
localVarHeaderParams["Content-Encoding"] = parameterToString(*r.contentEncoding, "")
}
if r.contentType != nil {
localVarHeaderParams["Content-Type"] = parameterToString(*r.contentType, "")
}
if r.contentLength != nil {
localVarHeaderParams["Content-Length"] = parameterToString(*r.contentLength, "")
}
if r.accept != nil {
localVarHeaderParams["Accept"] = parameterToString(*r.accept, "")
}
// body params
localVarPostBody = r.body
req, err := a.client.prepareRequest(r.ctx, localVarPath, localVarHTTPMethod, localVarPostBody, localVarHeaderParams, localVarQueryParams, localVarFormParams, localVarFormFileName, localVarFileName, localVarFileBytes)
if err != nil {
return nil, err
}
localVarHTTPResponse, err := a.client.callAPI(req)
if err != nil || localVarHTTPResponse == nil {
return localVarHTTPResponse, err
}
localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body)
localVarHTTPResponse.Body.Close()
localVarHTTPResponse.Body = _ioutil.NopCloser(bytes.NewBuffer(localVarBody))
if err != nil {
return localVarHTTPResponse, err
}
if localVarHTTPResponse.StatusCode >= 300 {
newErr := GenericOpenAPIError{
body: localVarBody,
error: localVarHTTPResponse.Status,
}
if localVarHTTPResponse.StatusCode == 400 {
var v LineProtocolError
err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type"))
if err != nil {
newErr.error = err.Error()
return localVarHTTPResponse, newErr
}
newErr.model = &v
return localVarHTTPResponse, newErr
}
if localVarHTTPResponse.StatusCode == 401 {
var v Error
err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type"))
if err != nil {
newErr.error = err.Error()
return localVarHTTPResponse, newErr
}
newErr.model = &v
return localVarHTTPResponse, newErr
}
if localVarHTTPResponse.StatusCode == 403 {
var v Error
err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type"))
if err != nil {
newErr.error = err.Error()
return localVarHTTPResponse, newErr
}
newErr.model = &v
return localVarHTTPResponse, newErr
}
if localVarHTTPResponse.StatusCode == 413 {
var v LineProtocolLengthError
err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type"))
if err != nil {
newErr.error = err.Error()
return localVarHTTPResponse, newErr
}
newErr.model = &v
return localVarHTTPResponse, newErr
}
var v Error
err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type"))
if err != nil {
newErr.error = err.Error()
return localVarHTTPResponse, newErr
}
newErr.model = &v
return localVarHTTPResponse, newErr
}
return localVarHTTPResponse, nil
}

View File

@ -50,6 +50,8 @@ type APIClient struct {
HealthApi HealthApi
SetupApi SetupApi
WriteApi WriteApi
}
type service struct {
@ -70,6 +72,7 @@ func NewAPIClient(cfg *Configuration) *APIClient {
// API Services
c.HealthApi = (*HealthApiService)(&c.common)
c.SetupApi = (*SetupApiService)(&c.common)
c.WriteApi = (*WriteApiService)(&c.common)
return c
}

View File

@ -35,3 +35,11 @@ func (o *HealthCheck) Error() string {
}
return fmt.Sprintf("health check failed: %s", message)
}
func (o *LineProtocolError) Error() string {
return o.Message
}
func (o *LineProtocolLengthError) Error() string {
return o.Message
}

View File

@ -0,0 +1,234 @@
/*
* Subset of Influx API covered by Influx CLI
*
* No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
*
* API version: 2.0.0
*/
// Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT.
package api
import (
"encoding/json"
)
// LineProtocolError struct for LineProtocolError
type LineProtocolError struct {
// Code is the machine-readable error code.
Code string `json:"code"`
// Message is a human-readable message.
Message string `json:"message"`
// Op describes the logical code operation during error. Useful for debugging.
Op string `json:"op"`
// Err is a stack of errors that occurred during processing of the request. Useful for debugging.
Err string `json:"err"`
// First line within sent body containing malformed data
Line *int32 `json:"line,omitempty"`
}
// NewLineProtocolError instantiates a new LineProtocolError object
// This constructor will assign default values to properties that have it defined,
// and makes sure properties required by API are set, but the set of arguments
// will change when the set of required properties is changed
func NewLineProtocolError(code string, message string, op string, err string) *LineProtocolError {
this := LineProtocolError{}
this.Code = code
this.Message = message
this.Op = op
this.Err = err
return &this
}
// NewLineProtocolErrorWithDefaults instantiates a new LineProtocolError object
// This constructor will only assign default values to properties that have it defined,
// but it doesn't guarantee that properties required by API are set
func NewLineProtocolErrorWithDefaults() *LineProtocolError {
this := LineProtocolError{}
return &this
}
// GetCode returns the Code field value
func (o *LineProtocolError) GetCode() string {
if o == nil {
var ret string
return ret
}
return o.Code
}
// GetCodeOk returns a tuple with the Code field value
// and a boolean to check if the value has been set.
func (o *LineProtocolError) GetCodeOk() (*string, bool) {
if o == nil {
return nil, false
}
return &o.Code, true
}
// SetCode sets field value
func (o *LineProtocolError) SetCode(v string) {
o.Code = v
}
// GetMessage returns the Message field value
func (o *LineProtocolError) GetMessage() string {
if o == nil {
var ret string
return ret
}
return o.Message
}
// GetMessageOk returns a tuple with the Message field value
// and a boolean to check if the value has been set.
func (o *LineProtocolError) GetMessageOk() (*string, bool) {
if o == nil {
return nil, false
}
return &o.Message, true
}
// SetMessage sets field value
func (o *LineProtocolError) SetMessage(v string) {
o.Message = v
}
// GetOp returns the Op field value
func (o *LineProtocolError) GetOp() string {
if o == nil {
var ret string
return ret
}
return o.Op
}
// GetOpOk returns a tuple with the Op field value
// and a boolean to check if the value has been set.
func (o *LineProtocolError) GetOpOk() (*string, bool) {
if o == nil {
return nil, false
}
return &o.Op, true
}
// SetOp sets field value
func (o *LineProtocolError) SetOp(v string) {
o.Op = v
}
// GetErr returns the Err field value
func (o *LineProtocolError) GetErr() string {
if o == nil {
var ret string
return ret
}
return o.Err
}
// GetErrOk returns a tuple with the Err field value
// and a boolean to check if the value has been set.
func (o *LineProtocolError) GetErrOk() (*string, bool) {
if o == nil {
return nil, false
}
return &o.Err, true
}
// SetErr sets field value
func (o *LineProtocolError) SetErr(v string) {
o.Err = v
}
// GetLine returns the Line field value if set, zero value otherwise.
func (o *LineProtocolError) GetLine() int32 {
if o == nil || o.Line == nil {
var ret int32
return ret
}
return *o.Line
}
// GetLineOk returns a tuple with the Line field value if set, nil otherwise
// and a boolean to check if the value has been set.
func (o *LineProtocolError) GetLineOk() (*int32, bool) {
if o == nil || o.Line == nil {
return nil, false
}
return o.Line, true
}
// HasLine returns a boolean if a field has been set.
func (o *LineProtocolError) HasLine() bool {
if o != nil && o.Line != nil {
return true
}
return false
}
// SetLine gets a reference to the given int32 and assigns it to the Line field.
func (o *LineProtocolError) SetLine(v int32) {
o.Line = &v
}
func (o LineProtocolError) MarshalJSON() ([]byte, error) {
toSerialize := map[string]interface{}{}
if true {
toSerialize["code"] = o.Code
}
if true {
toSerialize["message"] = o.Message
}
if true {
toSerialize["op"] = o.Op
}
if true {
toSerialize["err"] = o.Err
}
if o.Line != nil {
toSerialize["line"] = o.Line
}
return json.Marshal(toSerialize)
}
type NullableLineProtocolError struct {
value *LineProtocolError
isSet bool
}
func (v NullableLineProtocolError) Get() *LineProtocolError {
return v.value
}
func (v *NullableLineProtocolError) Set(val *LineProtocolError) {
v.value = val
v.isSet = true
}
func (v NullableLineProtocolError) IsSet() bool {
return v.isSet
}
func (v *NullableLineProtocolError) Unset() {
v.value = nil
v.isSet = false
}
func NewNullableLineProtocolError(val *LineProtocolError) *NullableLineProtocolError {
return &NullableLineProtocolError{value: val, isSet: true}
}
func (v NullableLineProtocolError) MarshalJSON() ([]byte, error) {
return json.Marshal(v.value)
}
func (v *NullableLineProtocolError) UnmarshalJSON(src []byte) error {
v.isSet = true
return json.Unmarshal(src, &v.value)
}

View File

@ -0,0 +1,167 @@
/*
* Subset of Influx API covered by Influx CLI
*
* No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
*
* API version: 2.0.0
*/
// Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT.
package api
import (
"encoding/json"
)
// LineProtocolLengthError struct for LineProtocolLengthError
type LineProtocolLengthError struct {
// Code is the machine-readable error code.
Code string `json:"code"`
// Message is a human-readable message.
Message string `json:"message"`
// Max length in bytes for a body of line-protocol.
MaxLength int32 `json:"maxLength"`
}
// NewLineProtocolLengthError instantiates a new LineProtocolLengthError object
// This constructor will assign default values to properties that have it defined,
// and makes sure properties required by API are set, but the set of arguments
// will change when the set of required properties is changed
func NewLineProtocolLengthError(code string, message string, maxLength int32) *LineProtocolLengthError {
this := LineProtocolLengthError{}
this.Code = code
this.Message = message
this.MaxLength = maxLength
return &this
}
// NewLineProtocolLengthErrorWithDefaults instantiates a new LineProtocolLengthError object
// This constructor will only assign default values to properties that have it defined,
// but it doesn't guarantee that properties required by API are set
func NewLineProtocolLengthErrorWithDefaults() *LineProtocolLengthError {
this := LineProtocolLengthError{}
return &this
}
// GetCode returns the Code field value
func (o *LineProtocolLengthError) GetCode() string {
if o == nil {
var ret string
return ret
}
return o.Code
}
// GetCodeOk returns a tuple with the Code field value
// and a boolean to check if the value has been set.
func (o *LineProtocolLengthError) GetCodeOk() (*string, bool) {
if o == nil {
return nil, false
}
return &o.Code, true
}
// SetCode sets field value
func (o *LineProtocolLengthError) SetCode(v string) {
o.Code = v
}
// GetMessage returns the Message field value
func (o *LineProtocolLengthError) GetMessage() string {
if o == nil {
var ret string
return ret
}
return o.Message
}
// GetMessageOk returns a tuple with the Message field value
// and a boolean to check if the value has been set.
func (o *LineProtocolLengthError) GetMessageOk() (*string, bool) {
if o == nil {
return nil, false
}
return &o.Message, true
}
// SetMessage sets field value
func (o *LineProtocolLengthError) SetMessage(v string) {
o.Message = v
}
// GetMaxLength returns the MaxLength field value
func (o *LineProtocolLengthError) GetMaxLength() int32 {
if o == nil {
var ret int32
return ret
}
return o.MaxLength
}
// GetMaxLengthOk returns a tuple with the MaxLength field value
// and a boolean to check if the value has been set.
func (o *LineProtocolLengthError) GetMaxLengthOk() (*int32, bool) {
if o == nil {
return nil, false
}
return &o.MaxLength, true
}
// SetMaxLength sets field value
func (o *LineProtocolLengthError) SetMaxLength(v int32) {
o.MaxLength = v
}
func (o LineProtocolLengthError) MarshalJSON() ([]byte, error) {
toSerialize := map[string]interface{}{}
if true {
toSerialize["code"] = o.Code
}
if true {
toSerialize["message"] = o.Message
}
if true {
toSerialize["maxLength"] = o.MaxLength
}
return json.Marshal(toSerialize)
}
type NullableLineProtocolLengthError struct {
value *LineProtocolLengthError
isSet bool
}
func (v NullableLineProtocolLengthError) Get() *LineProtocolLengthError {
return v.value
}
func (v *NullableLineProtocolLengthError) Set(val *LineProtocolLengthError) {
v.value = val
v.isSet = true
}
func (v NullableLineProtocolLengthError) IsSet() bool {
return v.isSet
}
func (v *NullableLineProtocolLengthError) Unset() {
v.value = nil
v.isSet = false
}
func NewNullableLineProtocolLengthError(val *LineProtocolLengthError) *NullableLineProtocolLengthError {
return &NullableLineProtocolLengthError{value: val, isSet: true}
}
func (v NullableLineProtocolLengthError) MarshalJSON() ([]byte, error) {
return json.Marshal(v.value)
}
func (v *NullableLineProtocolLengthError) UnmarshalJSON(src []byte) error {
v.isSet = true
return json.Unmarshal(src, &v.value)
}

View File

@ -0,0 +1,85 @@
/*
* Subset of Influx API covered by Influx CLI
*
* No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
*
* API version: 2.0.0
*/
// Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT.
package api
import (
"encoding/json"
"fmt"
)
// WritePrecision the model 'WritePrecision'
type WritePrecision string
// List of WritePrecision
const (
WRITEPRECISION_MS WritePrecision = "ms"
WRITEPRECISION_S WritePrecision = "s"
WRITEPRECISION_US WritePrecision = "us"
WRITEPRECISION_NS WritePrecision = "ns"
)
func (v *WritePrecision) UnmarshalJSON(src []byte) error {
var value string
err := json.Unmarshal(src, &value)
if err != nil {
return err
}
enumTypeValue := WritePrecision(value)
for _, existing := range []WritePrecision{"ms", "s", "us", "ns"} {
if existing == enumTypeValue {
*v = enumTypeValue
return nil
}
}
return fmt.Errorf("%+v is not a valid WritePrecision", value)
}
// Ptr returns reference to WritePrecision value
func (v WritePrecision) Ptr() *WritePrecision {
return &v
}
type NullableWritePrecision struct {
value *WritePrecision
isSet bool
}
func (v NullableWritePrecision) Get() *WritePrecision {
return v.value
}
func (v *NullableWritePrecision) Set(val *WritePrecision) {
v.value = val
v.isSet = true
}
func (v NullableWritePrecision) IsSet() bool {
return v.isSet
}
func (v *NullableWritePrecision) Unset() {
v.value = nil
v.isSet = false
}
func NewNullableWritePrecision(val *WritePrecision) *NullableWritePrecision {
return &NullableWritePrecision{value: val, isSet: true}
}
func (v NullableWritePrecision) MarshalJSON() ([]byte, error) {
return json.Marshal(v.value)
}
func (v *NullableWritePrecision) UnmarshalJSON(src []byte) error {
v.isSet = true
return json.Unmarshal(src, &v.value)
}

View File

@ -0,0 +1,23 @@
package mock
import (
"context"
"net/http"
"github.com/influxdata/influx-cli/v2/internal/api"
)
var _ api.WriteApi = (*WriteApi)(nil)
type WriteApi struct {
PostWriteExecuteFn func(api.ApiPostWriteRequest) (*http.Response, error)
}
func (w *WriteApi) PostWrite(context.Context) api.ApiPostWriteRequest {
return api.ApiPostWriteRequest{
ApiService: w,
}
}
func (w *WriteApi) PostWriteExecute(req api.ApiPostWriteRequest) (*http.Response, error) {
return w.PostWriteExecuteFn(req)
}

105
internal/write.go Normal file
View File

@ -0,0 +1,105 @@
package internal
import (
"context"
"errors"
"fmt"
"io"
"github.com/influxdata/influx-cli/v2/internal/api"
)
type LineReader interface {
Open(ctx context.Context) (io.Reader, io.Closer, error)
}
type Throttler interface {
Throttle(ctx context.Context, in io.Reader) io.Reader
}
type Batcher interface {
WriteBatches(ctx context.Context, r io.Reader, writeFn func(batch []byte) error) error
}
type WriteClients struct {
Reader LineReader
Throttler Throttler
Writer Batcher
Client api.WriteApi
}
type WriteParams struct {
BucketID string
BucketName string
OrgID string
OrgName string
Precision api.WritePrecision
}
var ErrWriteCanceled = errors.New("write canceled")
func (c *CLI) Write(ctx context.Context, clients *WriteClients, params *WriteParams) error {
if params.OrgID == "" && params.OrgName == "" && c.ActiveConfig.Org == "" {
return errors.New("must specify org ID or org name")
}
if params.BucketID == "" && params.BucketName == "" {
return errors.New("must specify bucket ID or bucket name")
}
r, closer, err := clients.Reader.Open(ctx)
if closer != nil {
defer closer.Close()
}
if err != nil {
return err
}
writeBatch := func(batch []byte) error {
req := clients.Client.PostWrite(ctx).Body(batch).ContentEncoding("gzip").Precision(params.Precision)
if c.TraceId != "" {
req = req.ZapTraceSpan(c.TraceId)
}
if params.BucketID != "" {
req = req.Bucket(params.BucketID)
} else {
req = req.Bucket(params.BucketName)
}
if params.OrgID != "" {
req = req.Org(params.OrgID)
} else if params.OrgName != "" {
req = req.Org(params.OrgName)
} else {
req = req.Org(c.ActiveConfig.Org)
}
if _, err := clients.Client.PostWriteExecute(req); err != nil {
return err
}
return nil
}
if err := clients.Writer.WriteBatches(ctx, clients.Throttler.Throttle(ctx, r), writeBatch); err == context.Canceled {
return ErrWriteCanceled
} else if err != nil {
return fmt.Errorf("failed to write data: %w", err)
}
return nil
}
func (c *CLI) WriteDryRun(ctx context.Context, reader LineReader) error {
r, closer, err := reader.Open(ctx)
if closer != nil {
defer closer.Close()
}
if err != nil {
return err
}
if _, err := io.Copy(c.StdIO, r); err != nil {
return err
}
return nil
}

199
internal/write_test.go Normal file
View File

@ -0,0 +1,199 @@
package internal_test
import (
"bytes"
"context"
"io"
"io/ioutil"
"net/http"
"strings"
"testing"
"github.com/influxdata/influx-cli/v2/internal"
"github.com/influxdata/influx-cli/v2/internal/api"
"github.com/influxdata/influx-cli/v2/internal/config"
"github.com/influxdata/influx-cli/v2/internal/mock"
"github.com/stretchr/testify/require"
)
type bufferReader struct {
buf bytes.Buffer
}
func (pr *bufferReader) Open(context.Context) (io.Reader, io.Closer, error) {
return &pr.buf, ioutil.NopCloser(nil), nil
}
type noopThrottler struct {
used bool
}
func (nt *noopThrottler) Throttle(_ context.Context, in io.Reader) io.Reader {
nt.used = true
return in
}
type lineBatcher struct{}
func (pb *lineBatcher) WriteBatches(_ context.Context, r io.Reader, writeFn func(batch []byte) error) error {
buf := bytes.Buffer{}
if _, err := io.Copy(&buf, r); err != nil {
return err
}
for _, l := range strings.Split(buf.String(), "\n") {
if l != "" {
if err := writeFn([]byte(l)); err != nil {
return err
}
}
}
return nil
}
func TestWriteByIDs(t *testing.T) {
inLines := []string{"fake line protocol 1", "fake line protocol 2", "fake line protocol 3"}
mockReader := bufferReader{}
for _, l := range inLines {
_, err := io.Copy(&mockReader.buf, strings.NewReader(l+"\n"))
require.NoError(t, err)
}
mockThrottler := noopThrottler{}
mockBatcher := lineBatcher{}
params := internal.WriteParams{
OrgID: "12345",
BucketID: "98765",
Precision: api.WRITEPRECISION_S,
}
cli := internal.CLI{ActiveConfig: config.Config{Org: "my-default-org"}}
var writtenLines []string
client := mock.WriteApi{
PostWriteExecuteFn: func(req api.ApiPostWriteRequest) (*http.Response, error) {
// Make sure query params are set.
require.Equal(t, params.OrgID, *req.GetOrg())
require.Equal(t, params.BucketID, *req.GetBucket())
require.Equal(t, params.Precision, *req.GetPrecision())
// Make sure the body is properly marked for compression, and record what was sent.
require.Equal(t, "gzip", *req.GetContentEncoding())
writtenLines = append(writtenLines, string(req.GetBody()))
return nil, nil
},
}
clients := internal.WriteClients{
Reader: &mockReader,
Throttler: &mockThrottler,
Writer: &mockBatcher,
Client: &client,
}
require.NoError(t, cli.Write(context.Background(), &clients, &params))
require.Equal(t, inLines, writtenLines)
require.True(t, mockThrottler.used)
}
func TestWriteByNames(t *testing.T) {
inLines := []string{"fake line protocol 1", "fake line protocol 2", "fake line protocol 3"}
mockReader := bufferReader{}
for _, l := range inLines {
_, err := io.Copy(&mockReader.buf, strings.NewReader(l+"\n"))
require.NoError(t, err)
}
mockThrottler := noopThrottler{}
mockBatcher := lineBatcher{}
params := internal.WriteParams{
OrgName: "my-org",
BucketName: "my-bucket",
Precision: api.WRITEPRECISION_US,
}
cli := internal.CLI{TraceId: "my-trace-id", ActiveConfig: config.Config{Org: "my-default-org"}}
var writtenLines []string
client := mock.WriteApi{
PostWriteExecuteFn: func(req api.ApiPostWriteRequest) (*http.Response, error) {
// Make sure query params are set.
require.Equal(t, params.OrgName, *req.GetOrg())
require.Equal(t, params.BucketName, *req.GetBucket())
require.Equal(t, params.Precision, *req.GetPrecision())
require.Equal(t, cli.TraceId, *req.GetZapTraceSpan())
// Make sure the body is properly marked for compression, and record what was sent.
require.Equal(t, "gzip", *req.GetContentEncoding())
writtenLines = append(writtenLines, string(req.GetBody()))
return nil, nil
},
}
clients := internal.WriteClients{
Reader: &mockReader,
Throttler: &mockThrottler,
Writer: &mockBatcher,
Client: &client,
}
require.NoError(t, cli.Write(context.Background(), &clients, &params))
require.Equal(t, inLines, writtenLines)
require.True(t, mockThrottler.used)
}
func TestWriteOrgFromConfig(t *testing.T) {
inLines := []string{"fake line protocol 1", "fake line protocol 2", "fake line protocol 3"}
mockReader := bufferReader{}
for _, l := range inLines {
_, err := io.Copy(&mockReader.buf, strings.NewReader(l+"\n"))
require.NoError(t, err)
}
mockThrottler := noopThrottler{}
mockBatcher := lineBatcher{}
params := internal.WriteParams{
BucketName: "my-bucket",
Precision: api.WRITEPRECISION_US,
}
cli := internal.CLI{ActiveConfig: config.Config{Org: "my-default-org"}}
var writtenLines []string
client := mock.WriteApi{
PostWriteExecuteFn: func(req api.ApiPostWriteRequest) (*http.Response, error) {
// Make sure query params are set.
require.Equal(t, cli.ActiveConfig.Org, *req.GetOrg())
require.Equal(t, params.BucketName, *req.GetBucket())
require.Equal(t, params.Precision, *req.GetPrecision())
// Make sure the body is properly marked for compression, and record what was sent.
require.Equal(t, "gzip", *req.GetContentEncoding())
writtenLines = append(writtenLines, string(req.GetBody()))
return nil, nil
},
}
clients := internal.WriteClients{
Reader: &mockReader,
Throttler: &mockThrottler,
Writer: &mockBatcher,
Client: &client,
}
require.NoError(t, cli.Write(context.Background(), &clients, &params))
require.Equal(t, inLines, writtenLines)
require.True(t, mockThrottler.used)
}
func TestWriteDryRun(t *testing.T) {
inLines := `
fake line protocol 1
fake line protocol 2
fake line protocol 3
`
mockReader := bufferReader{}
_, err := io.Copy(&mockReader.buf, strings.NewReader(inLines))
require.NoError(t, err)
stdio := mock.NewMockStdio(nil, true)
cli := internal.CLI{ActiveConfig: config.Config{Org: "my-default-org"}, StdIO: stdio}
require.NoError(t, cli.WriteDryRun(context.Background(), &mockReader))
require.Equal(t, inLines, stdio.Stdout())
}