diff --git a/cmd/influx/main.go b/cmd/influx/main.go index 7ee115d..a8c80df 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -85,6 +85,14 @@ var commonFlagsNoToken = []cli.Flag{ }, } +// Most commands use this form of the token flag. +var commonFlags = append(commonFlagsNoToken, &cli.StringFlag{ + Name: tokenFlag, + Usage: "Authentication token", + Aliases: []string{"t"}, + EnvVars: []string{"INFLUX_TOKEN"}, +}) + // newCli builds a CLI core that reads from stdin, writes to stdout/stderr, manages a local config store, // and optionally tracks a trace ID specified over the CLI. func newCli(ctx *cli.Context) (*internal.CLI, error) { @@ -164,6 +172,7 @@ var app = cli.App{ &versionCmd, &pingCmd, &setupCmd, + &writeCmd, }, } diff --git a/cmd/influx/write.go b/cmd/influx/write.go new file mode 100644 index 0000000..1a21e32 --- /dev/null +++ b/cmd/influx/write.go @@ -0,0 +1,281 @@ +package main + +import ( + "fmt" + "io" + "net/http" + "os" + + "github.com/influxdata/influx-cli/v2/internal" + "github.com/influxdata/influx-cli/v2/internal/api" + "github.com/influxdata/influx-cli/v2/internal/batcher" + "github.com/influxdata/influx-cli/v2/internal/linereader" + "github.com/influxdata/influx-cli/v2/internal/throttler" + "github.com/urfave/cli/v2" +) + +var writeFlags = append( + commonFlags, + &cli.StringFlag{ + Name: "bucket-id", + Usage: "The ID of destination bucket", + EnvVars: []string{"INFLUX_BUCKET_ID"}, + }, + &cli.StringFlag{ + Name: "bucket", + Usage: "The name of destination bucket", + Aliases: []string{"b"}, + EnvVars: []string{"INFLUX_BUCKET_NAME"}, + }, + &cli.StringFlag{ + Name: "org-id", + Usage: "The ID of the organization", + EnvVars: []string{"INFLUX_ORG_ID"}, + }, + &cli.StringFlag{ + Name: "org", + Usage: "The name of the organization", + Aliases: []string{"o"}, + EnvVars: []string{"INFLUX_ORG"}, + }, + &cli.StringFlag{ + Name: "precision", + Usage: "Precision of the timestamps of the lines", + Aliases: []string{"p"}, + EnvVars: []string{"INFLUX_PRECISION"}, + Value: "ns", + }, + &cli.StringFlag{ + Name: "format", + Usage: "Input format, either 'lp' (Line Protocol) or 'csv' (Comma Separated Values)", + DefaultText: "'lp' unless '.csv' extension", + }, + &cli.StringSliceFlag{ + Name: "header", + Usage: "Header prepends lines to input data", + }, + &cli.StringSliceFlag{ + Name: "file", + Usage: "The path to the file to import", + Aliases: []string{"f"}, + TakesFile: true, + }, + &cli.StringSliceFlag{ + Name: "url", + Usage: "The URL to import data from", + Aliases: []string{"u"}, + }, + &cli.BoolFlag{ + Name: "debug", + Usage: "Log CSV columns to stderr before reading data rows", + }, + &cli.BoolFlag{ + Name: "skipRowOnError", + Usage: "Log CSV data errors to stderr and continue with CSV processing", + }, + // NOTE: The old CLI allowed this flag to be used as an int _or_ a bool, with the bool form being + // short-hand for N=1. urfave/cli isn't that flexible. + &cli.IntFlag{ + Name: "skipHeader", + Usage: "Skip the first rows from input data", + }, + &cli.IntFlag{ + Name: "max-line-length", + Usage: "Specifies the maximum number of bytes that can be read for a single line", + Value: 16_000_000, + }, + &cli.BoolFlag{ + Name: "xIgnoreDataTypeInColumnName", + Usage: "Ignores dataType which could be specified after ':' in column name", + Hidden: true, + }, + &cli.StringFlag{ + Name: "encoding", + Usage: "Character encoding of input files or stdin", + Value: "UTF-8", + }, + &cli.StringFlag{ + Name: "errors-file", + Usage: "The path to the file to write rejected rows to", + TakesFile: true, + }, + &cli.StringFlag{ + Name: "rate-limit", + Usage: `Throttles write, examples: "5 MB / 5 min" , "17kBs"`, + DefaultText: "no throttling", + }, + &cli.StringFlag{ + Name: "compression", + Usage: "Input compression, either 'none' or 'gzip'", + DefaultText: "'none' unless an input has a '.gz' extension", + }, +) + +var writeCmd = cli.Command{ + Name: "write", + Usage: "Write points to InfluxDB", + Description: "Write data to InfluxDB via stdin, or add an entire file specified with the -f flag", + Flags: writeFlags, + Action: func(ctx *cli.Context) error { + format, err := parseFormat(ctx.String("format")) + if err != nil { + return err + } + compression, err := parseCompression(ctx.String("compression")) + if err != nil { + return err + } + precision, err := parsePrecision(ctx.String("precision")) + if err != nil { + return err + } + + var errorOut io.Writer + if ctx.IsSet("errors-file") { + errorFile, err := os.Open(ctx.String("errors-file")) + if err != nil { + return fmt.Errorf("failed to open errors-file: %w", err) + } + defer errorFile.Close() + errorOut = errorFile + } + + throttler, err := throttler.NewThrottler(ctx.String("rate-limit")) + if err != nil { + return err + } + cli, err := newCli(ctx) + if err != nil { + return err + } + client, err := newApiClient(ctx, cli, true) + if err != nil { + return err + } + writeClients := &internal.WriteClients{ + Client: client.WriteApi, + Reader: &linereader.MultiInputLineReader{ + StdIn: os.Stdin, + HttpClient: http.DefaultClient, + ErrorOut: errorOut, + Args: ctx.Args().Slice(), + Files: ctx.StringSlice("file"), + URLs: ctx.StringSlice("url"), + Format: format, + Compression: compression, + Encoding: ctx.String("encoding"), + Headers: ctx.StringSlice("header"), + SkipRowOnError: ctx.Bool("skipRowOnError"), + SkipHeader: ctx.Int("skipHeader"), + IgnoreDataTypeInColumnName: ctx.Bool("xIgnoreDataTypeInColumnName"), + Debug: ctx.Bool("debug"), + }, + Throttler: throttler, + Writer: &batcher.BufferBatcher{ + MaxFlushBytes: batcher.DefaultMaxBytes, + MaxFlushInterval: batcher.DefaultInterval, + MaxLineLength: ctx.Int("max-line-length"), + }, + } + + return cli.Write(standardCtx(ctx), writeClients, &internal.WriteParams{ + BucketID: ctx.String("bucket-id"), + BucketName: ctx.String("bucket"), + OrgID: ctx.String("org-id"), + OrgName: ctx.String("org"), + Precision: precision, + }) + }, + Subcommands: []*cli.Command{ + { + Name: "dryrun", + Usage: "Write to stdout instead of InfluxDB", + Description: "Write protocol lines to stdout instead of InfluxDB. Troubleshoot conversion from CSV to line protocol", + Flags: writeFlags, + Action: func(ctx *cli.Context) error { + format, err := parseFormat(ctx.String("format")) + if err != nil { + return err + } + compression, err := parseCompression(ctx.String("compression")) + if err != nil { + return err + } + + var errorOut io.Writer + if ctx.IsSet("errors-file") { + errorFile, err := os.Open(ctx.String("errors-file")) + if err != nil { + return fmt.Errorf("failed to open errors-file: %w", err) + } + defer errorFile.Close() + errorOut = errorFile + } + + cli, err := newCli(ctx) + if err != nil { + return err + } + reader := &linereader.MultiInputLineReader{ + StdIn: os.Stdin, + HttpClient: http.DefaultClient, + ErrorOut: errorOut, + Args: ctx.Args().Slice(), + Files: ctx.StringSlice("file"), + URLs: ctx.StringSlice("url"), + Format: format, + Compression: compression, + Encoding: ctx.String("encoding"), + Headers: ctx.StringSlice("header"), + SkipRowOnError: ctx.Bool("skipRowOnError"), + SkipHeader: ctx.Int("skipHeader"), + IgnoreDataTypeInColumnName: ctx.Bool("xIgnoreDataTypeInColumnName"), + Debug: ctx.Bool("debug"), + } + + return cli.WriteDryRun(standardCtx(ctx), reader) + }, + }, + }, +} + +func parseFormat(f string) (linereader.InputFormat, error) { + switch f { + case "": + return linereader.InputFormatDerived, nil + case "lp": + return linereader.InputFormatLP, nil + case "csv": + return linereader.InputFormatCSV, nil + default: + return 0, fmt.Errorf("unsupported format: %q", f) + } +} + +func parseCompression(c string) (linereader.InputCompression, error) { + switch c { + case "": + return linereader.InputCompressionDerived, nil + case "none": + return linereader.InputCompressionNone, nil + case "gzip": + return linereader.InputCompressionGZIP, nil + default: + return 0, fmt.Errorf("unsupported compression: %q", c) + } +} + +func parsePrecision(p string) (api.WritePrecision, error) { + switch p { + case "ms": + return api.WRITEPRECISION_MS, nil + case "s": + return api.WRITEPRECISION_S, nil + case "us": + return api.WRITEPRECISION_US, nil + case "ns": + return api.WRITEPRECISION_NS, nil + default: + return "", fmt.Errorf("unsupported precision: %q", p) + } +} diff --git a/etc/generate-openapi.sh b/etc/generate-openapi.sh index 7c903c5..46e491e 100755 --- a/etc/generate-openapi.sh +++ b/etc/generate-openapi.sh @@ -4,7 +4,7 @@ declare -r ETC_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &> /dev/null && pwd)" declare -r ROOT_DIR="$(dirname ${ETC_DIR})" declare -r GENERATOR_DOCKER_IMG=openapitools/openapi-generator-cli:v5.1.0 -declare -r OPENAPI_COMMIT=5ab3ef0b9a6aee68b3b34e1858ca6d55c153650a +declare -r OPENAPI_COMMIT=4b75803e472eadb7be101fbe84f1f120f0a008b0 # Download our target API spec. # NOTE: openapi-generator supports HTTP references to API docs, but using that feature diff --git a/internal/api/api_write.go b/internal/api/api_write.go new file mode 100644 index 0000000..5810be2 --- /dev/null +++ b/internal/api/api_write.go @@ -0,0 +1,302 @@ +/* + * Subset of Influx API covered by Influx CLI + * + * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) + * + * API version: 2.0.0 + */ + +// Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. + +package api + +import ( + "bytes" + _context "context" + _ioutil "io/ioutil" + _nethttp "net/http" + _neturl "net/url" +) + +// Linger please +var ( + _ _context.Context +) + +type WriteApi interface { + + /* + * PostWrite Write time series data into InfluxDB + * @param ctx _context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background(). + * @return ApiPostWriteRequest + */ + PostWrite(ctx _context.Context) ApiPostWriteRequest + + /* + * PostWriteExecute executes the request + */ + PostWriteExecute(r ApiPostWriteRequest) (*_nethttp.Response, error) +} + +// WriteApiService WriteApi service +type WriteApiService service + +type ApiPostWriteRequest struct { + ctx _context.Context + ApiService WriteApi + org *string + bucket *string + body []byte + zapTraceSpan *string + contentEncoding *string + contentType *string + contentLength *int32 + accept *string + orgID *string + precision *WritePrecision +} + +func (r ApiPostWriteRequest) Org(org string) ApiPostWriteRequest { + r.org = &org + return r +} +func (r ApiPostWriteRequest) GetOrg() *string { + return r.org +} + +func (r ApiPostWriteRequest) Bucket(bucket string) ApiPostWriteRequest { + r.bucket = &bucket + return r +} +func (r ApiPostWriteRequest) GetBucket() *string { + return r.bucket +} + +func (r ApiPostWriteRequest) Body(body []byte) ApiPostWriteRequest { + r.body = body + return r +} +func (r ApiPostWriteRequest) GetBody() []byte { + return r.body +} + +func (r ApiPostWriteRequest) ZapTraceSpan(zapTraceSpan string) ApiPostWriteRequest { + r.zapTraceSpan = &zapTraceSpan + return r +} +func (r ApiPostWriteRequest) GetZapTraceSpan() *string { + return r.zapTraceSpan +} + +func (r ApiPostWriteRequest) ContentEncoding(contentEncoding string) ApiPostWriteRequest { + r.contentEncoding = &contentEncoding + return r +} +func (r ApiPostWriteRequest) GetContentEncoding() *string { + return r.contentEncoding +} + +func (r ApiPostWriteRequest) ContentType(contentType string) ApiPostWriteRequest { + r.contentType = &contentType + return r +} +func (r ApiPostWriteRequest) GetContentType() *string { + return r.contentType +} + +func (r ApiPostWriteRequest) ContentLength(contentLength int32) ApiPostWriteRequest { + r.contentLength = &contentLength + return r +} +func (r ApiPostWriteRequest) GetContentLength() *int32 { + return r.contentLength +} + +func (r ApiPostWriteRequest) Accept(accept string) ApiPostWriteRequest { + r.accept = &accept + return r +} +func (r ApiPostWriteRequest) GetAccept() *string { + return r.accept +} + +func (r ApiPostWriteRequest) OrgID(orgID string) ApiPostWriteRequest { + r.orgID = &orgID + return r +} +func (r ApiPostWriteRequest) GetOrgID() *string { + return r.orgID +} + +func (r ApiPostWriteRequest) Precision(precision WritePrecision) ApiPostWriteRequest { + r.precision = &precision + return r +} +func (r ApiPostWriteRequest) GetPrecision() *WritePrecision { + return r.precision +} + +func (r ApiPostWriteRequest) Execute() (*_nethttp.Response, error) { + return r.ApiService.PostWriteExecute(r) +} + +/* + * PostWrite Write time series data into InfluxDB + * @param ctx _context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background(). + * @return ApiPostWriteRequest + */ +func (a *WriteApiService) PostWrite(ctx _context.Context) ApiPostWriteRequest { + return ApiPostWriteRequest{ + ApiService: a, + ctx: ctx, + } +} + +/* + * Execute executes the request + */ +func (a *WriteApiService) PostWriteExecute(r ApiPostWriteRequest) (*_nethttp.Response, error) { + var ( + localVarHTTPMethod = _nethttp.MethodPost + localVarPostBody interface{} + localVarFormFileName string + localVarFileName string + localVarFileBytes []byte + ) + + localBasePath, err := a.client.cfg.ServerURLWithContext(r.ctx, "WriteApiService.PostWrite") + if err != nil { + return nil, GenericOpenAPIError{error: err.Error()} + } + + localVarPath := localBasePath + "/write" + + localVarHeaderParams := make(map[string]string) + localVarQueryParams := _neturl.Values{} + localVarFormParams := _neturl.Values{} + if r.org == nil { + return nil, reportError("org is required and must be specified") + } + if r.bucket == nil { + return nil, reportError("bucket is required and must be specified") + } + if r.body == nil { + return nil, reportError("body is required and must be specified") + } + + localVarQueryParams.Add("org", parameterToString(*r.org, "")) + if r.orgID != nil { + localVarQueryParams.Add("orgID", parameterToString(*r.orgID, "")) + } + localVarQueryParams.Add("bucket", parameterToString(*r.bucket, "")) + if r.precision != nil { + localVarQueryParams.Add("precision", parameterToString(*r.precision, "")) + } + // to determine the Content-Type header + localVarHTTPContentTypes := []string{"text/plain"} + + // set Content-Type header + localVarHTTPContentType := selectHeaderContentType(localVarHTTPContentTypes) + if localVarHTTPContentType != "" { + localVarHeaderParams["Content-Type"] = localVarHTTPContentType + } + + // to determine the Accept header + localVarHTTPHeaderAccepts := []string{"application/json"} + + // set Accept header + localVarHTTPHeaderAccept := selectHeaderAccept(localVarHTTPHeaderAccepts) + if localVarHTTPHeaderAccept != "" { + localVarHeaderParams["Accept"] = localVarHTTPHeaderAccept + } + if r.zapTraceSpan != nil { + localVarHeaderParams["Zap-Trace-Span"] = parameterToString(*r.zapTraceSpan, "") + } + if r.contentEncoding != nil { + localVarHeaderParams["Content-Encoding"] = parameterToString(*r.contentEncoding, "") + } + if r.contentType != nil { + localVarHeaderParams["Content-Type"] = parameterToString(*r.contentType, "") + } + if r.contentLength != nil { + localVarHeaderParams["Content-Length"] = parameterToString(*r.contentLength, "") + } + if r.accept != nil { + localVarHeaderParams["Accept"] = parameterToString(*r.accept, "") + } + // body params + localVarPostBody = r.body + req, err := a.client.prepareRequest(r.ctx, localVarPath, localVarHTTPMethod, localVarPostBody, localVarHeaderParams, localVarQueryParams, localVarFormParams, localVarFormFileName, localVarFileName, localVarFileBytes) + if err != nil { + return nil, err + } + + localVarHTTPResponse, err := a.client.callAPI(req) + if err != nil || localVarHTTPResponse == nil { + return localVarHTTPResponse, err + } + + localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) + localVarHTTPResponse.Body.Close() + localVarHTTPResponse.Body = _ioutil.NopCloser(bytes.NewBuffer(localVarBody)) + if err != nil { + return localVarHTTPResponse, err + } + + if localVarHTTPResponse.StatusCode >= 300 { + newErr := GenericOpenAPIError{ + body: localVarBody, + error: localVarHTTPResponse.Status, + } + if localVarHTTPResponse.StatusCode == 400 { + var v LineProtocolError + err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type")) + if err != nil { + newErr.error = err.Error() + return localVarHTTPResponse, newErr + } + newErr.model = &v + return localVarHTTPResponse, newErr + } + if localVarHTTPResponse.StatusCode == 401 { + var v Error + err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type")) + if err != nil { + newErr.error = err.Error() + return localVarHTTPResponse, newErr + } + newErr.model = &v + return localVarHTTPResponse, newErr + } + if localVarHTTPResponse.StatusCode == 403 { + var v Error + err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type")) + if err != nil { + newErr.error = err.Error() + return localVarHTTPResponse, newErr + } + newErr.model = &v + return localVarHTTPResponse, newErr + } + if localVarHTTPResponse.StatusCode == 413 { + var v LineProtocolLengthError + err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type")) + if err != nil { + newErr.error = err.Error() + return localVarHTTPResponse, newErr + } + newErr.model = &v + return localVarHTTPResponse, newErr + } + var v Error + err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type")) + if err != nil { + newErr.error = err.Error() + return localVarHTTPResponse, newErr + } + newErr.model = &v + return localVarHTTPResponse, newErr + } + + return localVarHTTPResponse, nil +} diff --git a/internal/api/client.go b/internal/api/client.go index b575d56..7577c4d 100644 --- a/internal/api/client.go +++ b/internal/api/client.go @@ -50,6 +50,8 @@ type APIClient struct { HealthApi HealthApi SetupApi SetupApi + + WriteApi WriteApi } type service struct { @@ -70,6 +72,7 @@ func NewAPIClient(cfg *Configuration) *APIClient { // API Services c.HealthApi = (*HealthApiService)(&c.common) c.SetupApi = (*SetupApiService)(&c.common) + c.WriteApi = (*WriteApiService)(&c.common) return c } diff --git a/internal/api/error.go b/internal/api/error.go index 82e0910..465183d 100644 --- a/internal/api/error.go +++ b/internal/api/error.go @@ -35,3 +35,11 @@ func (o *HealthCheck) Error() string { } return fmt.Sprintf("health check failed: %s", message) } + +func (o *LineProtocolError) Error() string { + return o.Message +} + +func (o *LineProtocolLengthError) Error() string { + return o.Message +} diff --git a/internal/api/model_line_protocol_error.go b/internal/api/model_line_protocol_error.go new file mode 100644 index 0000000..97ce791 --- /dev/null +++ b/internal/api/model_line_protocol_error.go @@ -0,0 +1,234 @@ +/* + * Subset of Influx API covered by Influx CLI + * + * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) + * + * API version: 2.0.0 + */ + +// Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. + +package api + +import ( + "encoding/json" +) + +// LineProtocolError struct for LineProtocolError +type LineProtocolError struct { + // Code is the machine-readable error code. + Code string `json:"code"` + // Message is a human-readable message. + Message string `json:"message"` + // Op describes the logical code operation during error. Useful for debugging. + Op string `json:"op"` + // Err is a stack of errors that occurred during processing of the request. Useful for debugging. + Err string `json:"err"` + // First line within sent body containing malformed data + Line *int32 `json:"line,omitempty"` +} + +// NewLineProtocolError instantiates a new LineProtocolError object +// This constructor will assign default values to properties that have it defined, +// and makes sure properties required by API are set, but the set of arguments +// will change when the set of required properties is changed +func NewLineProtocolError(code string, message string, op string, err string) *LineProtocolError { + this := LineProtocolError{} + this.Code = code + this.Message = message + this.Op = op + this.Err = err + return &this +} + +// NewLineProtocolErrorWithDefaults instantiates a new LineProtocolError object +// This constructor will only assign default values to properties that have it defined, +// but it doesn't guarantee that properties required by API are set +func NewLineProtocolErrorWithDefaults() *LineProtocolError { + this := LineProtocolError{} + return &this +} + +// GetCode returns the Code field value +func (o *LineProtocolError) GetCode() string { + if o == nil { + var ret string + return ret + } + + return o.Code +} + +// GetCodeOk returns a tuple with the Code field value +// and a boolean to check if the value has been set. +func (o *LineProtocolError) GetCodeOk() (*string, bool) { + if o == nil { + return nil, false + } + return &o.Code, true +} + +// SetCode sets field value +func (o *LineProtocolError) SetCode(v string) { + o.Code = v +} + +// GetMessage returns the Message field value +func (o *LineProtocolError) GetMessage() string { + if o == nil { + var ret string + return ret + } + + return o.Message +} + +// GetMessageOk returns a tuple with the Message field value +// and a boolean to check if the value has been set. +func (o *LineProtocolError) GetMessageOk() (*string, bool) { + if o == nil { + return nil, false + } + return &o.Message, true +} + +// SetMessage sets field value +func (o *LineProtocolError) SetMessage(v string) { + o.Message = v +} + +// GetOp returns the Op field value +func (o *LineProtocolError) GetOp() string { + if o == nil { + var ret string + return ret + } + + return o.Op +} + +// GetOpOk returns a tuple with the Op field value +// and a boolean to check if the value has been set. +func (o *LineProtocolError) GetOpOk() (*string, bool) { + if o == nil { + return nil, false + } + return &o.Op, true +} + +// SetOp sets field value +func (o *LineProtocolError) SetOp(v string) { + o.Op = v +} + +// GetErr returns the Err field value +func (o *LineProtocolError) GetErr() string { + if o == nil { + var ret string + return ret + } + + return o.Err +} + +// GetErrOk returns a tuple with the Err field value +// and a boolean to check if the value has been set. +func (o *LineProtocolError) GetErrOk() (*string, bool) { + if o == nil { + return nil, false + } + return &o.Err, true +} + +// SetErr sets field value +func (o *LineProtocolError) SetErr(v string) { + o.Err = v +} + +// GetLine returns the Line field value if set, zero value otherwise. +func (o *LineProtocolError) GetLine() int32 { + if o == nil || o.Line == nil { + var ret int32 + return ret + } + return *o.Line +} + +// GetLineOk returns a tuple with the Line field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *LineProtocolError) GetLineOk() (*int32, bool) { + if o == nil || o.Line == nil { + return nil, false + } + return o.Line, true +} + +// HasLine returns a boolean if a field has been set. +func (o *LineProtocolError) HasLine() bool { + if o != nil && o.Line != nil { + return true + } + + return false +} + +// SetLine gets a reference to the given int32 and assigns it to the Line field. +func (o *LineProtocolError) SetLine(v int32) { + o.Line = &v +} + +func (o LineProtocolError) MarshalJSON() ([]byte, error) { + toSerialize := map[string]interface{}{} + if true { + toSerialize["code"] = o.Code + } + if true { + toSerialize["message"] = o.Message + } + if true { + toSerialize["op"] = o.Op + } + if true { + toSerialize["err"] = o.Err + } + if o.Line != nil { + toSerialize["line"] = o.Line + } + return json.Marshal(toSerialize) +} + +type NullableLineProtocolError struct { + value *LineProtocolError + isSet bool +} + +func (v NullableLineProtocolError) Get() *LineProtocolError { + return v.value +} + +func (v *NullableLineProtocolError) Set(val *LineProtocolError) { + v.value = val + v.isSet = true +} + +func (v NullableLineProtocolError) IsSet() bool { + return v.isSet +} + +func (v *NullableLineProtocolError) Unset() { + v.value = nil + v.isSet = false +} + +func NewNullableLineProtocolError(val *LineProtocolError) *NullableLineProtocolError { + return &NullableLineProtocolError{value: val, isSet: true} +} + +func (v NullableLineProtocolError) MarshalJSON() ([]byte, error) { + return json.Marshal(v.value) +} + +func (v *NullableLineProtocolError) UnmarshalJSON(src []byte) error { + v.isSet = true + return json.Unmarshal(src, &v.value) +} diff --git a/internal/api/model_line_protocol_length_error.go b/internal/api/model_line_protocol_length_error.go new file mode 100644 index 0000000..f820cd9 --- /dev/null +++ b/internal/api/model_line_protocol_length_error.go @@ -0,0 +1,167 @@ +/* + * Subset of Influx API covered by Influx CLI + * + * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) + * + * API version: 2.0.0 + */ + +// Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. + +package api + +import ( + "encoding/json" +) + +// LineProtocolLengthError struct for LineProtocolLengthError +type LineProtocolLengthError struct { + // Code is the machine-readable error code. + Code string `json:"code"` + // Message is a human-readable message. + Message string `json:"message"` + // Max length in bytes for a body of line-protocol. + MaxLength int32 `json:"maxLength"` +} + +// NewLineProtocolLengthError instantiates a new LineProtocolLengthError object +// This constructor will assign default values to properties that have it defined, +// and makes sure properties required by API are set, but the set of arguments +// will change when the set of required properties is changed +func NewLineProtocolLengthError(code string, message string, maxLength int32) *LineProtocolLengthError { + this := LineProtocolLengthError{} + this.Code = code + this.Message = message + this.MaxLength = maxLength + return &this +} + +// NewLineProtocolLengthErrorWithDefaults instantiates a new LineProtocolLengthError object +// This constructor will only assign default values to properties that have it defined, +// but it doesn't guarantee that properties required by API are set +func NewLineProtocolLengthErrorWithDefaults() *LineProtocolLengthError { + this := LineProtocolLengthError{} + return &this +} + +// GetCode returns the Code field value +func (o *LineProtocolLengthError) GetCode() string { + if o == nil { + var ret string + return ret + } + + return o.Code +} + +// GetCodeOk returns a tuple with the Code field value +// and a boolean to check if the value has been set. +func (o *LineProtocolLengthError) GetCodeOk() (*string, bool) { + if o == nil { + return nil, false + } + return &o.Code, true +} + +// SetCode sets field value +func (o *LineProtocolLengthError) SetCode(v string) { + o.Code = v +} + +// GetMessage returns the Message field value +func (o *LineProtocolLengthError) GetMessage() string { + if o == nil { + var ret string + return ret + } + + return o.Message +} + +// GetMessageOk returns a tuple with the Message field value +// and a boolean to check if the value has been set. +func (o *LineProtocolLengthError) GetMessageOk() (*string, bool) { + if o == nil { + return nil, false + } + return &o.Message, true +} + +// SetMessage sets field value +func (o *LineProtocolLengthError) SetMessage(v string) { + o.Message = v +} + +// GetMaxLength returns the MaxLength field value +func (o *LineProtocolLengthError) GetMaxLength() int32 { + if o == nil { + var ret int32 + return ret + } + + return o.MaxLength +} + +// GetMaxLengthOk returns a tuple with the MaxLength field value +// and a boolean to check if the value has been set. +func (o *LineProtocolLengthError) GetMaxLengthOk() (*int32, bool) { + if o == nil { + return nil, false + } + return &o.MaxLength, true +} + +// SetMaxLength sets field value +func (o *LineProtocolLengthError) SetMaxLength(v int32) { + o.MaxLength = v +} + +func (o LineProtocolLengthError) MarshalJSON() ([]byte, error) { + toSerialize := map[string]interface{}{} + if true { + toSerialize["code"] = o.Code + } + if true { + toSerialize["message"] = o.Message + } + if true { + toSerialize["maxLength"] = o.MaxLength + } + return json.Marshal(toSerialize) +} + +type NullableLineProtocolLengthError struct { + value *LineProtocolLengthError + isSet bool +} + +func (v NullableLineProtocolLengthError) Get() *LineProtocolLengthError { + return v.value +} + +func (v *NullableLineProtocolLengthError) Set(val *LineProtocolLengthError) { + v.value = val + v.isSet = true +} + +func (v NullableLineProtocolLengthError) IsSet() bool { + return v.isSet +} + +func (v *NullableLineProtocolLengthError) Unset() { + v.value = nil + v.isSet = false +} + +func NewNullableLineProtocolLengthError(val *LineProtocolLengthError) *NullableLineProtocolLengthError { + return &NullableLineProtocolLengthError{value: val, isSet: true} +} + +func (v NullableLineProtocolLengthError) MarshalJSON() ([]byte, error) { + return json.Marshal(v.value) +} + +func (v *NullableLineProtocolLengthError) UnmarshalJSON(src []byte) error { + v.isSet = true + return json.Unmarshal(src, &v.value) +} diff --git a/internal/api/model_write_precision.go b/internal/api/model_write_precision.go new file mode 100644 index 0000000..2f1c3bd --- /dev/null +++ b/internal/api/model_write_precision.go @@ -0,0 +1,85 @@ +/* + * Subset of Influx API covered by Influx CLI + * + * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) + * + * API version: 2.0.0 + */ + +// Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. + +package api + +import ( + "encoding/json" + "fmt" +) + +// WritePrecision the model 'WritePrecision' +type WritePrecision string + +// List of WritePrecision +const ( + WRITEPRECISION_MS WritePrecision = "ms" + WRITEPRECISION_S WritePrecision = "s" + WRITEPRECISION_US WritePrecision = "us" + WRITEPRECISION_NS WritePrecision = "ns" +) + +func (v *WritePrecision) UnmarshalJSON(src []byte) error { + var value string + err := json.Unmarshal(src, &value) + if err != nil { + return err + } + enumTypeValue := WritePrecision(value) + for _, existing := range []WritePrecision{"ms", "s", "us", "ns"} { + if existing == enumTypeValue { + *v = enumTypeValue + return nil + } + } + + return fmt.Errorf("%+v is not a valid WritePrecision", value) +} + +// Ptr returns reference to WritePrecision value +func (v WritePrecision) Ptr() *WritePrecision { + return &v +} + +type NullableWritePrecision struct { + value *WritePrecision + isSet bool +} + +func (v NullableWritePrecision) Get() *WritePrecision { + return v.value +} + +func (v *NullableWritePrecision) Set(val *WritePrecision) { + v.value = val + v.isSet = true +} + +func (v NullableWritePrecision) IsSet() bool { + return v.isSet +} + +func (v *NullableWritePrecision) Unset() { + v.value = nil + v.isSet = false +} + +func NewNullableWritePrecision(val *WritePrecision) *NullableWritePrecision { + return &NullableWritePrecision{value: val, isSet: true} +} + +func (v NullableWritePrecision) MarshalJSON() ([]byte, error) { + return json.Marshal(v.value) +} + +func (v *NullableWritePrecision) UnmarshalJSON(src []byte) error { + v.isSet = true + return json.Unmarshal(src, &v.value) +} diff --git a/internal/mock/api_write.go b/internal/mock/api_write.go new file mode 100644 index 0000000..3c5201e --- /dev/null +++ b/internal/mock/api_write.go @@ -0,0 +1,23 @@ +package mock + +import ( + "context" + "net/http" + + "github.com/influxdata/influx-cli/v2/internal/api" +) + +var _ api.WriteApi = (*WriteApi)(nil) + +type WriteApi struct { + PostWriteExecuteFn func(api.ApiPostWriteRequest) (*http.Response, error) +} + +func (w *WriteApi) PostWrite(context.Context) api.ApiPostWriteRequest { + return api.ApiPostWriteRequest{ + ApiService: w, + } +} +func (w *WriteApi) PostWriteExecute(req api.ApiPostWriteRequest) (*http.Response, error) { + return w.PostWriteExecuteFn(req) +} diff --git a/internal/write.go b/internal/write.go new file mode 100644 index 0000000..dd9250a --- /dev/null +++ b/internal/write.go @@ -0,0 +1,105 @@ +package internal + +import ( + "context" + "errors" + "fmt" + "io" + + "github.com/influxdata/influx-cli/v2/internal/api" +) + +type LineReader interface { + Open(ctx context.Context) (io.Reader, io.Closer, error) +} + +type Throttler interface { + Throttle(ctx context.Context, in io.Reader) io.Reader +} + +type Batcher interface { + WriteBatches(ctx context.Context, r io.Reader, writeFn func(batch []byte) error) error +} + +type WriteClients struct { + Reader LineReader + Throttler Throttler + Writer Batcher + Client api.WriteApi +} + +type WriteParams struct { + BucketID string + BucketName string + OrgID string + OrgName string + Precision api.WritePrecision +} + +var ErrWriteCanceled = errors.New("write canceled") + +func (c *CLI) Write(ctx context.Context, clients *WriteClients, params *WriteParams) error { + if params.OrgID == "" && params.OrgName == "" && c.ActiveConfig.Org == "" { + return errors.New("must specify org ID or org name") + } + if params.BucketID == "" && params.BucketName == "" { + return errors.New("must specify bucket ID or bucket name") + } + + r, closer, err := clients.Reader.Open(ctx) + if closer != nil { + defer closer.Close() + } + if err != nil { + return err + } + + writeBatch := func(batch []byte) error { + req := clients.Client.PostWrite(ctx).Body(batch).ContentEncoding("gzip").Precision(params.Precision) + if c.TraceId != "" { + req = req.ZapTraceSpan(c.TraceId) + } + if params.BucketID != "" { + req = req.Bucket(params.BucketID) + } else { + req = req.Bucket(params.BucketName) + } + if params.OrgID != "" { + req = req.Org(params.OrgID) + } else if params.OrgName != "" { + req = req.Org(params.OrgName) + } else { + req = req.Org(c.ActiveConfig.Org) + } + + if _, err := clients.Client.PostWriteExecute(req); err != nil { + return err + } + + return nil + } + + if err := clients.Writer.WriteBatches(ctx, clients.Throttler.Throttle(ctx, r), writeBatch); err == context.Canceled { + return ErrWriteCanceled + } else if err != nil { + return fmt.Errorf("failed to write data: %w", err) + } + + return nil +} + +func (c *CLI) WriteDryRun(ctx context.Context, reader LineReader) error { + r, closer, err := reader.Open(ctx) + if closer != nil { + defer closer.Close() + } + if err != nil { + return err + } + + if _, err := io.Copy(c.StdIO, r); err != nil { + return err + } + + return nil +} diff --git a/internal/write_test.go b/internal/write_test.go new file mode 100644 index 0000000..9d2e88a --- /dev/null +++ b/internal/write_test.go @@ -0,0 +1,199 @@ +package internal_test + +import ( + "bytes" + "context" + "io" + "io/ioutil" + "net/http" + "strings" + "testing" + + "github.com/influxdata/influx-cli/v2/internal" + "github.com/influxdata/influx-cli/v2/internal/api" + "github.com/influxdata/influx-cli/v2/internal/config" + "github.com/influxdata/influx-cli/v2/internal/mock" + "github.com/stretchr/testify/require" +) + +type bufferReader struct { + buf bytes.Buffer +} + +func (pr *bufferReader) Open(context.Context) (io.Reader, io.Closer, error) { + return &pr.buf, ioutil.NopCloser(nil), nil +} + +type noopThrottler struct { + used bool +} + +func (nt *noopThrottler) Throttle(_ context.Context, in io.Reader) io.Reader { + nt.used = true + return in +} + +type lineBatcher struct{} + +func (pb *lineBatcher) WriteBatches(_ context.Context, r io.Reader, writeFn func(batch []byte) error) error { + buf := bytes.Buffer{} + if _, err := io.Copy(&buf, r); err != nil { + return err + } + for _, l := range strings.Split(buf.String(), "\n") { + if l != "" { + if err := writeFn([]byte(l)); err != nil { + return err + } + } + } + return nil +} + +func TestWriteByIDs(t *testing.T) { + inLines := []string{"fake line protocol 1", "fake line protocol 2", "fake line protocol 3"} + mockReader := bufferReader{} + for _, l := range inLines { + _, err := io.Copy(&mockReader.buf, strings.NewReader(l+"\n")) + require.NoError(t, err) + } + mockThrottler := noopThrottler{} + mockBatcher := lineBatcher{} + + params := internal.WriteParams{ + OrgID: "12345", + BucketID: "98765", + Precision: api.WRITEPRECISION_S, + } + cli := internal.CLI{ActiveConfig: config.Config{Org: "my-default-org"}} + + var writtenLines []string + client := mock.WriteApi{ + PostWriteExecuteFn: func(req api.ApiPostWriteRequest) (*http.Response, error) { + // Make sure query params are set. + require.Equal(t, params.OrgID, *req.GetOrg()) + require.Equal(t, params.BucketID, *req.GetBucket()) + require.Equal(t, params.Precision, *req.GetPrecision()) + + // Make sure the body is properly marked for compression, and record what was sent. + require.Equal(t, "gzip", *req.GetContentEncoding()) + writtenLines = append(writtenLines, string(req.GetBody())) + return nil, nil + }, + } + + clients := internal.WriteClients{ + Reader: &mockReader, + Throttler: &mockThrottler, + Writer: &mockBatcher, + Client: &client, + } + + require.NoError(t, cli.Write(context.Background(), &clients, ¶ms)) + require.Equal(t, inLines, writtenLines) + require.True(t, mockThrottler.used) +} + +func TestWriteByNames(t *testing.T) { + inLines := []string{"fake line protocol 1", "fake line protocol 2", "fake line protocol 3"} + mockReader := bufferReader{} + for _, l := range inLines { + _, err := io.Copy(&mockReader.buf, strings.NewReader(l+"\n")) + require.NoError(t, err) + } + mockThrottler := noopThrottler{} + mockBatcher := lineBatcher{} + + params := internal.WriteParams{ + OrgName: "my-org", + BucketName: "my-bucket", + Precision: api.WRITEPRECISION_US, + } + cli := internal.CLI{TraceId: "my-trace-id", ActiveConfig: config.Config{Org: "my-default-org"}} + + var writtenLines []string + client := mock.WriteApi{ + PostWriteExecuteFn: func(req api.ApiPostWriteRequest) (*http.Response, error) { + // Make sure query params are set. + require.Equal(t, params.OrgName, *req.GetOrg()) + require.Equal(t, params.BucketName, *req.GetBucket()) + require.Equal(t, params.Precision, *req.GetPrecision()) + require.Equal(t, cli.TraceId, *req.GetZapTraceSpan()) + + // Make sure the body is properly marked for compression, and record what was sent. + require.Equal(t, "gzip", *req.GetContentEncoding()) + writtenLines = append(writtenLines, string(req.GetBody())) + return nil, nil + }, + } + + clients := internal.WriteClients{ + Reader: &mockReader, + Throttler: &mockThrottler, + Writer: &mockBatcher, + Client: &client, + } + + require.NoError(t, cli.Write(context.Background(), &clients, ¶ms)) + require.Equal(t, inLines, writtenLines) + require.True(t, mockThrottler.used) +} + +func TestWriteOrgFromConfig(t *testing.T) { + inLines := []string{"fake line protocol 1", "fake line protocol 2", "fake line protocol 3"} + mockReader := bufferReader{} + for _, l := range inLines { + _, err := io.Copy(&mockReader.buf, strings.NewReader(l+"\n")) + require.NoError(t, err) + } + mockThrottler := noopThrottler{} + mockBatcher := lineBatcher{} + + params := internal.WriteParams{ + BucketName: "my-bucket", + Precision: api.WRITEPRECISION_US, + } + cli := internal.CLI{ActiveConfig: config.Config{Org: "my-default-org"}} + + var writtenLines []string + client := mock.WriteApi{ + PostWriteExecuteFn: func(req api.ApiPostWriteRequest) (*http.Response, error) { + // Make sure query params are set. + require.Equal(t, cli.ActiveConfig.Org, *req.GetOrg()) + require.Equal(t, params.BucketName, *req.GetBucket()) + require.Equal(t, params.Precision, *req.GetPrecision()) + + // Make sure the body is properly marked for compression, and record what was sent. + require.Equal(t, "gzip", *req.GetContentEncoding()) + writtenLines = append(writtenLines, string(req.GetBody())) + return nil, nil + }, + } + + clients := internal.WriteClients{ + Reader: &mockReader, + Throttler: &mockThrottler, + Writer: &mockBatcher, + Client: &client, + } + + require.NoError(t, cli.Write(context.Background(), &clients, ¶ms)) + require.Equal(t, inLines, writtenLines) + require.True(t, mockThrottler.used) +} + +func TestWriteDryRun(t *testing.T) { + inLines := ` +fake line protocol 1 +fake line protocol 2 +fake line protocol 3 +` + mockReader := bufferReader{} + _, err := io.Copy(&mockReader.buf, strings.NewReader(inLines)) + require.NoError(t, err) + stdio := mock.NewMockStdio(nil, true) + cli := internal.CLI{ActiveConfig: config.Config{Org: "my-default-org"}, StdIO: stdio} + + require.NoError(t, cli.WriteDryRun(context.Background(), &mockReader)) + require.Equal(t, inLines, stdio.Stdout()) +}