diff --git a/cmd/influx/main.go b/cmd/influx/main.go index 089b437..dd815dc 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -199,6 +199,7 @@ var app = cli.App{ newBucketCmd(), newCompletionCmd(), newBucketSchemaCmd(), + newQueryCmd(), }, } diff --git a/cmd/influx/query.go b/cmd/influx/query.go new file mode 100644 index 0000000..4d6bc81 --- /dev/null +++ b/cmd/influx/query.go @@ -0,0 +1,138 @@ +package main + +import ( + "errors" + "fmt" + "io/ioutil" + "os" + "strings" + + "github.com/influxdata/influx-cli/v2/internal/cmd" + "github.com/influxdata/influx-cli/v2/internal/cmd/query" + "github.com/influxdata/influx-cli/v2/pkg/cli/middleware" + "github.com/urfave/cli/v2" +) + +func newQueryCmd() *cli.Command { + var orgParams cmd.OrgParams + return &cli.Command{ + Name: "query", + Usage: "Execute a Flux query", + Description: "Execute a Flux query provided via the first argument, a file, or stdin", + ArgsUsage: "[query literal or '-' for stdin]", + Before: middleware.WithBeforeFns(withCli(), withApi(true)), + Flags: append( + commonFlagsNoPrint, + &cli.GenericFlag{ + Name: "org-id", + Usage: "The ID of the organization", + EnvVars: []string{"INFLUX_ORG_ID"}, + Value: &orgParams.OrgID, + }, + &cli.StringFlag{ + Name: "org", + Usage: "The name of the organization", + Aliases: []string{"o"}, + EnvVars: []string{"INFLUX_ORG"}, + Destination: &orgParams.OrgName, + }, + &cli.StringFlag{ + Name: "file", + Usage: "Path to Flux query file", + Aliases: []string{"f"}, + }, + &cli.BoolFlag{ + Name: "raw", + Usage: "Display raw query results", + Aliases: []string{"r"}, + }, + &cli.StringSliceFlag{ + Name: "profilers", + Usage: "Names of Flux profilers to enable", + Aliases: []string{"p"}, + }, + ), + Action: func(ctx *cli.Context) error { + queryString, err := readQuery(ctx) + if err != nil { + return err + } + queryString = strings.TrimSpace(queryString) + if queryString == "" { + return errors.New("no query provided") + } + + // The old CLI allowed specifying this either via repeated flags or + // via a single flag w/ a comma-separated value. + rawProfilers := ctx.StringSlice("profilers") + var profilers []string + for _, p := range rawProfilers { + profilers = append(profilers, strings.Split(p, ",")...) + } + + params := query.Params{ + OrgParams: orgParams, + Query: queryString, + Profilers: profilers, + } + + printer := query.RawResultPrinter + if !ctx.Bool("raw") { + return errors.New("--raw or -r must be specified for now") + } + + client := query.Client{ + CLI: getCLI(ctx), + QueryApi: getAPI(ctx).QueryApi, + ResultPrinter: printer, + } + return client.Query(ctx.Context, ¶ms) + }, + } +} + +// readQuery reads a Flux query into memory from a file, args, or stdin based on CLI parameters. +func readQuery(ctx *cli.Context) (string, error) { + nargs := ctx.NArg() + file := ctx.String("file") + + if nargs > 1 { + return "", fmt.Errorf("at most 1 query string can be specified over the CLI, got %d", ctx.NArg()) + } + if nargs == 1 && file != "" { + return "", errors.New("query can be specified via --file or over the CLI, not both") + } + + readFile := func(path string) (string, error) { + queryBytes, err := ioutil.ReadFile(path) + if err != nil { + return "", fmt.Errorf("failed to read query from %q: %w", path, err) + } + return string(queryBytes), nil + } + + readStdin := func() (string, error) { + queryBytes, err := ioutil.ReadAll(os.Stdin) + if err != nil { + return "", fmt.Errorf("failed to read query from stdin: %w", err) + } + return string(queryBytes), err + } + + if file != "" { + return readFile(file) + } + if nargs == 0 { + return readStdin() + } + + arg := ctx.Args().Get(0) + // Backwards compatibility. + if strings.HasPrefix(arg, "@") { + return readFile(arg[1:]) + } else if arg == "-" { + return readStdin() + } else { + return arg, nil + } +} diff --git a/internal/cmd/query/query.go b/internal/cmd/query/query.go new file mode 100644 index 0000000..0421a6b --- /dev/null +++ b/internal/cmd/query/query.go @@ -0,0 +1,151 @@ +package query + +import ( + "context" + "errors" + "fmt" + "io" + + "github.com/influxdata/influx-cli/v2/internal/api" + "github.com/influxdata/influx-cli/v2/internal/cmd" +) + +var ErrMustSpecifyOrg = errors.New("must specify org ID or org name") + +type ResultPrinter interface { + PrintQueryResults(resultStream io.ReadCloser, out io.Writer) error +} + +type rawResultPrinter struct{} + +// RawResultPrinter streams query results directly to the output without +// any parsing or formatting. +var RawResultPrinter ResultPrinter = &rawResultPrinter{} + +func (r *rawResultPrinter) PrintQueryResults(resultStream io.ReadCloser, out io.Writer) error { + _, err := io.Copy(out, resultStream) + return err +} + +type Client struct { + cmd.CLI + api.QueryApi + ResultPrinter +} + +type Params struct { + cmd.OrgParams + Query string + Profilers []string +} + +// BuildDefaultAST wraps a raw query string in the AST structure expected +// by the query API, injecting default values expected by the CLI formatter. +func BuildDefaultAST(query string) api.Query { + return api.Query{ + Query: query, + Type: api.PtrString("flux"), + Dialect: &api.Dialect{ + Annotations: &[]string{"group", "datatype", "default"}, + Delimiter: api.PtrString(","), + Header: api.PtrBool(true), + }, + } +} + +// BuildExternAST constructs a Flux AST tree to import and set the profilers option. +// +// See the docs for more info: https://docs.influxdata.com/influxdb/cloud/reference/flux/stdlib/profiler/ +func BuildExternAST(profilers []string) *api.Extern { + // Construct AST statements to import and set the 'profilers' option. + // NOTE: We've purposefully codegen'd a map[string]interface{} schema + // for the field populated here because the API spec for our Flux AST + // generates very hard-to-use models, and our attempts to change + // that have all (so far) broken the codegen for the UI. + // + // We assume that this logic will be changed infrequently enough that + // the lack of type-safety won't be a frequent pain point. + + // import "profiler" + profilersImport := map[string]interface{}{ + "type": "ImportDeclaration", + "path": map[string]interface{}{ + "type": "StringLiteral", + "value": "profiler", + }, + } + // "" for each profiler + profilerExprs := make([]interface{}, len(profilers)) + for i, profiler := range profilers { + profilerExprs[i] = map[string]interface{}{ + "type": "StringLiteral", + "value": profiler, + } + } + // ["" for each profiler] + profilersArrayExpr := map[string]interface{}{ + "type": "ArrayExpression", + "elements": profilerExprs, + } + // profiler.enabledProfilers + profilersMemberExpr := map[string]interface{}{ + "type": "MemberExpression", + "object": map[string]interface{}{ + "name": "profiler", + "type": "Identifier", + }, + "property": map[string]interface{}{ + "name": "enabledProfilers", + "type": "Identifier", + }, + } + // profiler.enabledProfilers = ["" for each profiler] + profilersAssignmentExpr := map[string]interface{}{ + "type": "MemberAssignment", + "member": profilersMemberExpr, + "init": profilersArrayExpr, + } + // option profiler.enabledProfilers = ["" for each profiler] + profilersOptionExpr := map[string]interface{}{ + "type": "OptionStatement", + "assignment": profilersAssignmentExpr, + } + // import "profiler" + // option profiler.enabledProfilers = ["" for each profiler] + profilersExternExpr := map[string]interface{}{ + "imports": []interface{}{profilersImport}, + "body": []interface{}{profilersOptionExpr}, + } + + extern := api.NewExternWithDefaults() + extern.AdditionalProperties = profilersExternExpr + return extern +} + +func (c Client) Query(ctx context.Context, params *Params) error { + if !params.OrgID.Valid() && params.OrgName == "" && c.ActiveConfig.Org == "" { + return ErrMustSpecifyOrg + } + + query := BuildDefaultAST(params.Query) + if len(params.Profilers) > 0 { + query.Extern = BuildExternAST(params.Profilers) + } + + req := c.PostQuery(ctx).Query(query).AcceptEncoding("gzip") + if params.OrgID.Valid() { + req = req.OrgID(params.OrgID.String()) + } else if params.OrgName != "" { + req = req.Org(params.OrgName) + } else { + req = req.Org(c.ActiveConfig.Org) + } + + resp, err := req.Execute() + if err != nil { + return fmt.Errorf("failed to execute query: %w", err) + } + defer resp.Close() + + return c.PrintQueryResults(resp, c.StdIO) +} diff --git a/internal/cmd/query/query_test.go b/internal/cmd/query/query_test.go new file mode 100644 index 0000000..5cc2ca6 --- /dev/null +++ b/internal/cmd/query/query_test.go @@ -0,0 +1,225 @@ +package query_test + +import ( + "bytes" + "context" + "io/ioutil" + "strings" + "testing" + + "github.com/golang/mock/gomock" + "github.com/influxdata/influx-cli/v2/internal/api" + "github.com/influxdata/influx-cli/v2/internal/cmd" + "github.com/influxdata/influx-cli/v2/internal/cmd/query" + "github.com/influxdata/influx-cli/v2/internal/config" + "github.com/influxdata/influx-cli/v2/internal/mock" + "github.com/influxdata/influx-cli/v2/pkg/influxid" + "github.com/stretchr/testify/assert" + tmock "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestRawResultPrinter_PrintQueryResults(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + rawTable string + }{ + { + name: "empty", + rawTable: "", + }, + { + name: "single table", + rawTable: `#group,false,false,true,true,false,false,true,true,true,true +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string +#default,_result,,,,,,,, +,result,table,_start,_stop,_time,_value,_field,_measurement,bar +,,0,1921-05-08T15:29:32.475078Z,2021-05-08T15:29:32.475078Z,2021-05-04T18:29:52.764702Z,12345,qux,foo,"""baz""" +,,0,1921-05-08T15:29:32.475078Z,2021-05-08T15:29:32.475078Z,2021-05-04T19:30:59.67555Z,12345,qux,foo,"""baz""" +,,0,1921-05-08T15:29:32.475078Z,2021-05-08T15:29:32.475078Z,2021-05-04T19:31:01.876079Z,12345,qux,foo,"""baz""" +,,0,1921-05-08T15:29:32.475078Z,2021-05-08T15:29:32.475078Z,2021-05-04T19:31:02.499461Z,12345,qux,foo,"""baz""" +`, + }, + { + name: "multi table", + rawTable: `#group,false,false,true,true,false,false,true,true,true +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string +#default,_result,,,,,,,, +,result,table,_start,_stop,_time,_value,_field,_measurement,bar +,,0,1921-05-08T15:42:58.218436Z,2021-05-08T15:42:58.218436Z,2021-05-04T18:29:52.764702Z,12345,qux,foo,"""baz""" +,,0,1921-05-08T15:42:58.218436Z,2021-05-08T15:42:58.218436Z,2021-05-04T19:30:59.67555Z,12345,qux,foo,"""baz""" +,,0,1921-05-08T15:42:58.218436Z,2021-05-08T15:42:58.218436Z,2021-05-04T19:31:01.876079Z,12345,qux,foo,"""baz""" +,,0,1921-05-08T15:42:58.218436Z,2021-05-08T15:42:58.218436Z,2021-05-04T19:31:02.499461Z,12345,qux,foo,"""baz""" + +#group,false,false,true,true,false,false,true,true,true,true +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string +#default,_result,,,,,,,,, +,result,table,_start,_stop,_time,_value,_field,_measurement,bar,is_foo +,,1,1921-05-08T15:42:58.218436Z,2021-05-08T15:42:58.218436Z,2021-05-08T15:42:19.567667Z,12345,qux,foo,"""baz""",t + +#group,false,false,true,false,false,false,false,false,false,false +#datatype,string,long,string,string,string,long,long,long,long,double +#default,_profiler,,,,,,,,, +,result,table,_measurement,Type,Label,Count,MinDuration,MaxDuration,DurationSum,MeanDuration +,,0,profiler/operator,*influxdb.readFilterSource,ReadRange2,1,367331,367331,367331,367331 +`, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + in := ioutil.NopCloser(strings.NewReader(tc.rawTable)) + out := bytes.Buffer{} + require.NoError(t, query.RawResultPrinter.PrintQueryResults(in, &out)) + require.Equal(t, tc.rawTable, out.String()) + }) + } +} + +func TestQuery(t *testing.T) { + t.Parallel() + + // Use dummy data here + the raw output printer to keep things + // focused on the business logic of the Query command; details + // of how results are formatted are tested elsewhere. + fakeQuery := query.BuildDefaultAST("I'm a query!") + fakeResults := "data data data" + + orgID, err := influxid.IDFromString("1111111111111111") + require.NoError(t, err) + + testCases := []struct { + name string + params query.Params + configOrgName string + registerExpectations func(t *testing.T, queryApi *mock.MockQueryApi) + expectInErr string + }{ + { + name: "by org ID", + params: query.Params{ + OrgParams: cmd.OrgParams{ + OrgID: orgID, + }, + Query: fakeQuery.Query, + }, + configOrgName: "default-org", + registerExpectations: func(t *testing.T, queryApi *mock.MockQueryApi) { + queryApi.EXPECT().PostQuery(gomock.Any()).Return(api.ApiPostQueryRequest{ApiService: queryApi}) + queryApi.EXPECT().PostQueryExecute(tmock.MatchedBy(func(in api.ApiPostQueryRequest) bool { + body := in.GetQuery() + return assert.NotNil(t, body) && + assert.Equal(t, fakeQuery, *body) && + assert.Equal(t, orgID.String(), *in.GetOrgID()) && + assert.Nil(t, in.GetOrg()) + })).Return(ioutil.NopCloser(strings.NewReader(fakeResults)), nil) + }, + }, + { + name: "by org name", + params: query.Params{ + OrgParams: cmd.OrgParams{ + OrgName: "my-org", + }, + Query: fakeQuery.Query, + }, + configOrgName: "default-org", + registerExpectations: func(t *testing.T, queryApi *mock.MockQueryApi) { + queryApi.EXPECT().PostQuery(gomock.Any()).Return(api.ApiPostQueryRequest{ApiService: queryApi}) + queryApi.EXPECT().PostQueryExecute(tmock.MatchedBy(func(in api.ApiPostQueryRequest) bool { + body := in.GetQuery() + return assert.NotNil(t, body) && + assert.Equal(t, fakeQuery, *body) && + assert.Equal(t, "my-org", *in.GetOrg()) && + assert.Nil(t, in.GetOrgID()) + })).Return(ioutil.NopCloser(strings.NewReader(fakeResults)), nil) + }, + }, + { + name: "by org name from config", + params: query.Params{ + OrgParams: cmd.OrgParams{}, + Query: fakeQuery.Query, + }, + configOrgName: "default-org", + registerExpectations: func(t *testing.T, queryApi *mock.MockQueryApi) { + queryApi.EXPECT().PostQuery(gomock.Any()).Return(api.ApiPostQueryRequest{ApiService: queryApi}) + queryApi.EXPECT().PostQueryExecute(tmock.MatchedBy(func(in api.ApiPostQueryRequest) bool { + body := in.GetQuery() + return assert.NotNil(t, body) && + assert.Equal(t, fakeQuery, *body) && + assert.Equal(t, "default-org", *in.GetOrg()) && + assert.Nil(t, in.GetOrgID()) + })).Return(ioutil.NopCloser(strings.NewReader(fakeResults)), nil) + }, + }, + { + name: "no org specified", + params: query.Params{ + OrgParams: cmd.OrgParams{}, + Query: fakeQuery.Query, + }, + expectInErr: query.ErrMustSpecifyOrg.Error(), + }, + { + name: "with profilers", + params: query.Params{ + OrgParams: cmd.OrgParams{}, + Query: fakeQuery.Query, + Profilers: []string{"foo", "bar"}, + }, + configOrgName: "default-org", + registerExpectations: func(t *testing.T, queryApi *mock.MockQueryApi) { + queryApi.EXPECT().PostQuery(gomock.Any()).Return(api.ApiPostQueryRequest{ApiService: queryApi}) + + expectedBody := fakeQuery + expectedBody.Extern = query.BuildExternAST([]string{"foo", "bar"}) + + queryApi.EXPECT().PostQueryExecute(tmock.MatchedBy(func(in api.ApiPostQueryRequest) bool { + body := in.GetQuery() + return assert.NotNil(t, body) && + assert.Equal(t, expectedBody, *body) && + assert.Equal(t, "default-org", *in.GetOrg()) && + assert.Nil(t, in.GetOrgID()) + })).Return(ioutil.NopCloser(strings.NewReader(fakeResults)), nil) + }, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + stdio := mock.NewMockStdIO(ctrl) + writtenBytes := bytes.Buffer{} + stdio.EXPECT().Write(gomock.Any()).DoAndReturn(writtenBytes.Write).AnyTimes() + + queryApi := mock.NewMockQueryApi(ctrl) + if tc.registerExpectations != nil { + tc.registerExpectations(t, queryApi) + } + cli := query.Client{ + CLI: cmd.CLI{ActiveConfig: config.Config{Org: tc.configOrgName}, StdIO: stdio}, + QueryApi: queryApi, + ResultPrinter: query.RawResultPrinter, + } + + err := cli.Query(context.Background(), &tc.params) + if tc.expectInErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.expectInErr) + require.Empty(t, writtenBytes.String()) + return + } + require.NoError(t, err) + require.Equal(t, fakeResults, writtenBytes.String()) + }) + } +} diff --git a/internal/mock/api_query.gen.go b/internal/mock/api_query.gen.go new file mode 100644 index 0000000..c7c5f01 --- /dev/null +++ b/internal/mock/api_query.gen.go @@ -0,0 +1,66 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/influxdata/influx-cli/v2/internal/api (interfaces: QueryApi) + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + io "io" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + api "github.com/influxdata/influx-cli/v2/internal/api" +) + +// MockQueryApi is a mock of QueryApi interface. +type MockQueryApi struct { + ctrl *gomock.Controller + recorder *MockQueryApiMockRecorder +} + +// MockQueryApiMockRecorder is the mock recorder for MockQueryApi. +type MockQueryApiMockRecorder struct { + mock *MockQueryApi +} + +// NewMockQueryApi creates a new mock instance. +func NewMockQueryApi(ctrl *gomock.Controller) *MockQueryApi { + mock := &MockQueryApi{ctrl: ctrl} + mock.recorder = &MockQueryApiMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockQueryApi) EXPECT() *MockQueryApiMockRecorder { + return m.recorder +} + +// PostQuery mocks base method. +func (m *MockQueryApi) PostQuery(arg0 context.Context) api.ApiPostQueryRequest { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PostQuery", arg0) + ret0, _ := ret[0].(api.ApiPostQueryRequest) + return ret0 +} + +// PostQuery indicates an expected call of PostQuery. +func (mr *MockQueryApiMockRecorder) PostQuery(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PostQuery", reflect.TypeOf((*MockQueryApi)(nil).PostQuery), arg0) +} + +// PostQueryExecute mocks base method. +func (m *MockQueryApi) PostQueryExecute(arg0 api.ApiPostQueryRequest) (io.ReadCloser, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PostQueryExecute", arg0) + ret0, _ := ret[0].(io.ReadCloser) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PostQueryExecute indicates an expected call of PostQueryExecute. +func (mr *MockQueryApiMockRecorder) PostQueryExecute(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PostQueryExecute", reflect.TypeOf((*MockQueryApi)(nil).PostQueryExecute), arg0) +} diff --git a/internal/mock/gen.go b/internal/mock/gen.go index 4927b92..59e52df 100644 --- a/internal/mock/gen.go +++ b/internal/mock/gen.go @@ -7,6 +7,7 @@ package mock //go:generate go run github.com/golang/mock/mockgen -package mock -destination api_organizations.gen.go github.com/influxdata/influx-cli/v2/internal/api OrganizationsApi //go:generate go run github.com/golang/mock/mockgen -package mock -destination api_setup.gen.go github.com/influxdata/influx-cli/v2/internal/api SetupApi //go:generate go run github.com/golang/mock/mockgen -package mock -destination api_write.gen.go github.com/influxdata/influx-cli/v2/internal/api WriteApi +//go:generate go run github.com/golang/mock/mockgen -package mock -destination api_query.gen.go github.com/influxdata/influx-cli/v2/internal/api QueryApi // Other mocks //go:generate go run github.com/golang/mock/mockgen -package mock -destination config.gen.go -mock_names Service=MockConfigService github.com/influxdata/influx-cli/v2/internal/config Service