feat: port most of influx query from influxdb (#86)

This commit is contained in:
Daniel Moran 2021-05-12 15:18:08 -04:00 committed by GitHub
parent 223ae250a9
commit d22fb717c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 582 additions and 0 deletions

View File

@ -199,6 +199,7 @@ var app = cli.App{
newBucketCmd(),
newCompletionCmd(),
newBucketSchemaCmd(),
newQueryCmd(),
},
}

138
cmd/influx/query.go Normal file
View File

@ -0,0 +1,138 @@
package main
import (
"errors"
"fmt"
"io/ioutil"
"os"
"strings"
"github.com/influxdata/influx-cli/v2/internal/cmd"
"github.com/influxdata/influx-cli/v2/internal/cmd/query"
"github.com/influxdata/influx-cli/v2/pkg/cli/middleware"
"github.com/urfave/cli/v2"
)
func newQueryCmd() *cli.Command {
var orgParams cmd.OrgParams
return &cli.Command{
Name: "query",
Usage: "Execute a Flux query",
Description: "Execute a Flux query provided via the first argument, a file, or stdin",
ArgsUsage: "[query literal or '-' for stdin]",
Before: middleware.WithBeforeFns(withCli(), withApi(true)),
Flags: append(
commonFlagsNoPrint,
&cli.GenericFlag{
Name: "org-id",
Usage: "The ID of the organization",
EnvVars: []string{"INFLUX_ORG_ID"},
Value: &orgParams.OrgID,
},
&cli.StringFlag{
Name: "org",
Usage: "The name of the organization",
Aliases: []string{"o"},
EnvVars: []string{"INFLUX_ORG"},
Destination: &orgParams.OrgName,
},
&cli.StringFlag{
Name: "file",
Usage: "Path to Flux query file",
Aliases: []string{"f"},
},
&cli.BoolFlag{
Name: "raw",
Usage: "Display raw query results",
Aliases: []string{"r"},
},
&cli.StringSliceFlag{
Name: "profilers",
Usage: "Names of Flux profilers to enable",
Aliases: []string{"p"},
},
),
Action: func(ctx *cli.Context) error {
queryString, err := readQuery(ctx)
if err != nil {
return err
}
queryString = strings.TrimSpace(queryString)
if queryString == "" {
return errors.New("no query provided")
}
// The old CLI allowed specifying this either via repeated flags or
// via a single flag w/ a comma-separated value.
rawProfilers := ctx.StringSlice("profilers")
var profilers []string
for _, p := range rawProfilers {
profilers = append(profilers, strings.Split(p, ",")...)
}
params := query.Params{
OrgParams: orgParams,
Query: queryString,
Profilers: profilers,
}
printer := query.RawResultPrinter
if !ctx.Bool("raw") {
return errors.New("--raw or -r must be specified for now")
}
client := query.Client{
CLI: getCLI(ctx),
QueryApi: getAPI(ctx).QueryApi,
ResultPrinter: printer,
}
return client.Query(ctx.Context, &params)
},
}
}
// readQuery reads a Flux query into memory from a file, args, or stdin based on CLI parameters.
func readQuery(ctx *cli.Context) (string, error) {
nargs := ctx.NArg()
file := ctx.String("file")
if nargs > 1 {
return "", fmt.Errorf("at most 1 query string can be specified over the CLI, got %d", ctx.NArg())
}
if nargs == 1 && file != "" {
return "", errors.New("query can be specified via --file or over the CLI, not both")
}
readFile := func(path string) (string, error) {
queryBytes, err := ioutil.ReadFile(path)
if err != nil {
return "", fmt.Errorf("failed to read query from %q: %w", path, err)
}
return string(queryBytes), nil
}
readStdin := func() (string, error) {
queryBytes, err := ioutil.ReadAll(os.Stdin)
if err != nil {
return "", fmt.Errorf("failed to read query from stdin: %w", err)
}
return string(queryBytes), err
}
if file != "" {
return readFile(file)
}
if nargs == 0 {
return readStdin()
}
arg := ctx.Args().Get(0)
// Backwards compatibility.
if strings.HasPrefix(arg, "@") {
return readFile(arg[1:])
} else if arg == "-" {
return readStdin()
} else {
return arg, nil
}
}

151
internal/cmd/query/query.go Normal file
View File

@ -0,0 +1,151 @@
package query
import (
"context"
"errors"
"fmt"
"io"
"github.com/influxdata/influx-cli/v2/internal/api"
"github.com/influxdata/influx-cli/v2/internal/cmd"
)
var ErrMustSpecifyOrg = errors.New("must specify org ID or org name")
type ResultPrinter interface {
PrintQueryResults(resultStream io.ReadCloser, out io.Writer) error
}
type rawResultPrinter struct{}
// RawResultPrinter streams query results directly to the output without
// any parsing or formatting.
var RawResultPrinter ResultPrinter = &rawResultPrinter{}
func (r *rawResultPrinter) PrintQueryResults(resultStream io.ReadCloser, out io.Writer) error {
_, err := io.Copy(out, resultStream)
return err
}
type Client struct {
cmd.CLI
api.QueryApi
ResultPrinter
}
type Params struct {
cmd.OrgParams
Query string
Profilers []string
}
// BuildDefaultAST wraps a raw query string in the AST structure expected
// by the query API, injecting default values expected by the CLI formatter.
func BuildDefaultAST(query string) api.Query {
return api.Query{
Query: query,
Type: api.PtrString("flux"),
Dialect: &api.Dialect{
Annotations: &[]string{"group", "datatype", "default"},
Delimiter: api.PtrString(","),
Header: api.PtrBool(true),
},
}
}
// BuildExternAST constructs a Flux AST tree to import and set the profilers option.
//
// See the docs for more info: https://docs.influxdata.com/influxdb/cloud/reference/flux/stdlib/profiler/
func BuildExternAST(profilers []string) *api.Extern {
// Construct AST statements to import and set the 'profilers' option.
// NOTE: We've purposefully codegen'd a map[string]interface{} schema
// for the field populated here because the API spec for our Flux AST
// generates very hard-to-use models, and our attempts to change
// that have all (so far) broken the codegen for the UI.
//
// We assume that this logic will be changed infrequently enough that
// the lack of type-safety won't be a frequent pain point.
// import "profiler"
profilersImport := map[string]interface{}{
"type": "ImportDeclaration",
"path": map[string]interface{}{
"type": "StringLiteral",
"value": "profiler",
},
}
// "<profiler>" for each profiler
profilerExprs := make([]interface{}, len(profilers))
for i, profiler := range profilers {
profilerExprs[i] = map[string]interface{}{
"type": "StringLiteral",
"value": profiler,
}
}
// ["<profiler>" for each profiler]
profilersArrayExpr := map[string]interface{}{
"type": "ArrayExpression",
"elements": profilerExprs,
}
// profiler.enabledProfilers
profilersMemberExpr := map[string]interface{}{
"type": "MemberExpression",
"object": map[string]interface{}{
"name": "profiler",
"type": "Identifier",
},
"property": map[string]interface{}{
"name": "enabledProfilers",
"type": "Identifier",
},
}
// profiler.enabledProfilers = ["<profiler>" for each profiler]
profilersAssignmentExpr := map[string]interface{}{
"type": "MemberAssignment",
"member": profilersMemberExpr,
"init": profilersArrayExpr,
}
// option profiler.enabledProfilers = ["<profiler>" for each profiler]
profilersOptionExpr := map[string]interface{}{
"type": "OptionStatement",
"assignment": profilersAssignmentExpr,
}
// import "profiler"
// option profiler.enabledProfilers = ["<profiler>" for each profiler]
profilersExternExpr := map[string]interface{}{
"imports": []interface{}{profilersImport},
"body": []interface{}{profilersOptionExpr},
}
extern := api.NewExternWithDefaults()
extern.AdditionalProperties = profilersExternExpr
return extern
}
func (c Client) Query(ctx context.Context, params *Params) error {
if !params.OrgID.Valid() && params.OrgName == "" && c.ActiveConfig.Org == "" {
return ErrMustSpecifyOrg
}
query := BuildDefaultAST(params.Query)
if len(params.Profilers) > 0 {
query.Extern = BuildExternAST(params.Profilers)
}
req := c.PostQuery(ctx).Query(query).AcceptEncoding("gzip")
if params.OrgID.Valid() {
req = req.OrgID(params.OrgID.String())
} else if params.OrgName != "" {
req = req.Org(params.OrgName)
} else {
req = req.Org(c.ActiveConfig.Org)
}
resp, err := req.Execute()
if err != nil {
return fmt.Errorf("failed to execute query: %w", err)
}
defer resp.Close()
return c.PrintQueryResults(resp, c.StdIO)
}

View File

@ -0,0 +1,225 @@
package query_test
import (
"bytes"
"context"
"io/ioutil"
"strings"
"testing"
"github.com/golang/mock/gomock"
"github.com/influxdata/influx-cli/v2/internal/api"
"github.com/influxdata/influx-cli/v2/internal/cmd"
"github.com/influxdata/influx-cli/v2/internal/cmd/query"
"github.com/influxdata/influx-cli/v2/internal/config"
"github.com/influxdata/influx-cli/v2/internal/mock"
"github.com/influxdata/influx-cli/v2/pkg/influxid"
"github.com/stretchr/testify/assert"
tmock "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestRawResultPrinter_PrintQueryResults(t *testing.T) {
t.Parallel()
testCases := []struct {
name string
rawTable string
}{
{
name: "empty",
rawTable: "",
},
{
name: "single table",
rawTable: `#group,false,false,true,true,false,false,true,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
#default,_result,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,bar
,,0,1921-05-08T15:29:32.475078Z,2021-05-08T15:29:32.475078Z,2021-05-04T18:29:52.764702Z,12345,qux,foo,"""baz"""
,,0,1921-05-08T15:29:32.475078Z,2021-05-08T15:29:32.475078Z,2021-05-04T19:30:59.67555Z,12345,qux,foo,"""baz"""
,,0,1921-05-08T15:29:32.475078Z,2021-05-08T15:29:32.475078Z,2021-05-04T19:31:01.876079Z,12345,qux,foo,"""baz"""
,,0,1921-05-08T15:29:32.475078Z,2021-05-08T15:29:32.475078Z,2021-05-04T19:31:02.499461Z,12345,qux,foo,"""baz"""
`,
},
{
name: "multi table",
rawTable: `#group,false,false,true,true,false,false,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
#default,_result,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,bar
,,0,1921-05-08T15:42:58.218436Z,2021-05-08T15:42:58.218436Z,2021-05-04T18:29:52.764702Z,12345,qux,foo,"""baz"""
,,0,1921-05-08T15:42:58.218436Z,2021-05-08T15:42:58.218436Z,2021-05-04T19:30:59.67555Z,12345,qux,foo,"""baz"""
,,0,1921-05-08T15:42:58.218436Z,2021-05-08T15:42:58.218436Z,2021-05-04T19:31:01.876079Z,12345,qux,foo,"""baz"""
,,0,1921-05-08T15:42:58.218436Z,2021-05-08T15:42:58.218436Z,2021-05-04T19:31:02.499461Z,12345,qux,foo,"""baz"""
#group,false,false,true,true,false,false,true,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,bar,is_foo
,,1,1921-05-08T15:42:58.218436Z,2021-05-08T15:42:58.218436Z,2021-05-08T15:42:19.567667Z,12345,qux,foo,"""baz""",t
#group,false,false,true,false,false,false,false,false,false,false
#datatype,string,long,string,string,string,long,long,long,long,double
#default,_profiler,,,,,,,,,
,result,table,_measurement,Type,Label,Count,MinDuration,MaxDuration,DurationSum,MeanDuration
,,0,profiler/operator,*influxdb.readFilterSource,ReadRange2,1,367331,367331,367331,367331
`,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
in := ioutil.NopCloser(strings.NewReader(tc.rawTable))
out := bytes.Buffer{}
require.NoError(t, query.RawResultPrinter.PrintQueryResults(in, &out))
require.Equal(t, tc.rawTable, out.String())
})
}
}
func TestQuery(t *testing.T) {
t.Parallel()
// Use dummy data here + the raw output printer to keep things
// focused on the business logic of the Query command; details
// of how results are formatted are tested elsewhere.
fakeQuery := query.BuildDefaultAST("I'm a query!")
fakeResults := "data data data"
orgID, err := influxid.IDFromString("1111111111111111")
require.NoError(t, err)
testCases := []struct {
name string
params query.Params
configOrgName string
registerExpectations func(t *testing.T, queryApi *mock.MockQueryApi)
expectInErr string
}{
{
name: "by org ID",
params: query.Params{
OrgParams: cmd.OrgParams{
OrgID: orgID,
},
Query: fakeQuery.Query,
},
configOrgName: "default-org",
registerExpectations: func(t *testing.T, queryApi *mock.MockQueryApi) {
queryApi.EXPECT().PostQuery(gomock.Any()).Return(api.ApiPostQueryRequest{ApiService: queryApi})
queryApi.EXPECT().PostQueryExecute(tmock.MatchedBy(func(in api.ApiPostQueryRequest) bool {
body := in.GetQuery()
return assert.NotNil(t, body) &&
assert.Equal(t, fakeQuery, *body) &&
assert.Equal(t, orgID.String(), *in.GetOrgID()) &&
assert.Nil(t, in.GetOrg())
})).Return(ioutil.NopCloser(strings.NewReader(fakeResults)), nil)
},
},
{
name: "by org name",
params: query.Params{
OrgParams: cmd.OrgParams{
OrgName: "my-org",
},
Query: fakeQuery.Query,
},
configOrgName: "default-org",
registerExpectations: func(t *testing.T, queryApi *mock.MockQueryApi) {
queryApi.EXPECT().PostQuery(gomock.Any()).Return(api.ApiPostQueryRequest{ApiService: queryApi})
queryApi.EXPECT().PostQueryExecute(tmock.MatchedBy(func(in api.ApiPostQueryRequest) bool {
body := in.GetQuery()
return assert.NotNil(t, body) &&
assert.Equal(t, fakeQuery, *body) &&
assert.Equal(t, "my-org", *in.GetOrg()) &&
assert.Nil(t, in.GetOrgID())
})).Return(ioutil.NopCloser(strings.NewReader(fakeResults)), nil)
},
},
{
name: "by org name from config",
params: query.Params{
OrgParams: cmd.OrgParams{},
Query: fakeQuery.Query,
},
configOrgName: "default-org",
registerExpectations: func(t *testing.T, queryApi *mock.MockQueryApi) {
queryApi.EXPECT().PostQuery(gomock.Any()).Return(api.ApiPostQueryRequest{ApiService: queryApi})
queryApi.EXPECT().PostQueryExecute(tmock.MatchedBy(func(in api.ApiPostQueryRequest) bool {
body := in.GetQuery()
return assert.NotNil(t, body) &&
assert.Equal(t, fakeQuery, *body) &&
assert.Equal(t, "default-org", *in.GetOrg()) &&
assert.Nil(t, in.GetOrgID())
})).Return(ioutil.NopCloser(strings.NewReader(fakeResults)), nil)
},
},
{
name: "no org specified",
params: query.Params{
OrgParams: cmd.OrgParams{},
Query: fakeQuery.Query,
},
expectInErr: query.ErrMustSpecifyOrg.Error(),
},
{
name: "with profilers",
params: query.Params{
OrgParams: cmd.OrgParams{},
Query: fakeQuery.Query,
Profilers: []string{"foo", "bar"},
},
configOrgName: "default-org",
registerExpectations: func(t *testing.T, queryApi *mock.MockQueryApi) {
queryApi.EXPECT().PostQuery(gomock.Any()).Return(api.ApiPostQueryRequest{ApiService: queryApi})
expectedBody := fakeQuery
expectedBody.Extern = query.BuildExternAST([]string{"foo", "bar"})
queryApi.EXPECT().PostQueryExecute(tmock.MatchedBy(func(in api.ApiPostQueryRequest) bool {
body := in.GetQuery()
return assert.NotNil(t, body) &&
assert.Equal(t, expectedBody, *body) &&
assert.Equal(t, "default-org", *in.GetOrg()) &&
assert.Nil(t, in.GetOrgID())
})).Return(ioutil.NopCloser(strings.NewReader(fakeResults)), nil)
},
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ctrl := gomock.NewController(t)
stdio := mock.NewMockStdIO(ctrl)
writtenBytes := bytes.Buffer{}
stdio.EXPECT().Write(gomock.Any()).DoAndReturn(writtenBytes.Write).AnyTimes()
queryApi := mock.NewMockQueryApi(ctrl)
if tc.registerExpectations != nil {
tc.registerExpectations(t, queryApi)
}
cli := query.Client{
CLI: cmd.CLI{ActiveConfig: config.Config{Org: tc.configOrgName}, StdIO: stdio},
QueryApi: queryApi,
ResultPrinter: query.RawResultPrinter,
}
err := cli.Query(context.Background(), &tc.params)
if tc.expectInErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.expectInErr)
require.Empty(t, writtenBytes.String())
return
}
require.NoError(t, err)
require.Equal(t, fakeResults, writtenBytes.String())
})
}
}

View File

@ -0,0 +1,66 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/influxdata/influx-cli/v2/internal/api (interfaces: QueryApi)
// Package mock is a generated GoMock package.
package mock
import (
context "context"
io "io"
reflect "reflect"
gomock "github.com/golang/mock/gomock"
api "github.com/influxdata/influx-cli/v2/internal/api"
)
// MockQueryApi is a mock of QueryApi interface.
type MockQueryApi struct {
ctrl *gomock.Controller
recorder *MockQueryApiMockRecorder
}
// MockQueryApiMockRecorder is the mock recorder for MockQueryApi.
type MockQueryApiMockRecorder struct {
mock *MockQueryApi
}
// NewMockQueryApi creates a new mock instance.
func NewMockQueryApi(ctrl *gomock.Controller) *MockQueryApi {
mock := &MockQueryApi{ctrl: ctrl}
mock.recorder = &MockQueryApiMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockQueryApi) EXPECT() *MockQueryApiMockRecorder {
return m.recorder
}
// PostQuery mocks base method.
func (m *MockQueryApi) PostQuery(arg0 context.Context) api.ApiPostQueryRequest {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PostQuery", arg0)
ret0, _ := ret[0].(api.ApiPostQueryRequest)
return ret0
}
// PostQuery indicates an expected call of PostQuery.
func (mr *MockQueryApiMockRecorder) PostQuery(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PostQuery", reflect.TypeOf((*MockQueryApi)(nil).PostQuery), arg0)
}
// PostQueryExecute mocks base method.
func (m *MockQueryApi) PostQueryExecute(arg0 api.ApiPostQueryRequest) (io.ReadCloser, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PostQueryExecute", arg0)
ret0, _ := ret[0].(io.ReadCloser)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// PostQueryExecute indicates an expected call of PostQueryExecute.
func (mr *MockQueryApiMockRecorder) PostQueryExecute(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PostQueryExecute", reflect.TypeOf((*MockQueryApi)(nil).PostQueryExecute), arg0)
}

View File

@ -7,6 +7,7 @@ package mock
//go:generate go run github.com/golang/mock/mockgen -package mock -destination api_organizations.gen.go github.com/influxdata/influx-cli/v2/internal/api OrganizationsApi
//go:generate go run github.com/golang/mock/mockgen -package mock -destination api_setup.gen.go github.com/influxdata/influx-cli/v2/internal/api SetupApi
//go:generate go run github.com/golang/mock/mockgen -package mock -destination api_write.gen.go github.com/influxdata/influx-cli/v2/internal/api WriteApi
//go:generate go run github.com/golang/mock/mockgen -package mock -destination api_query.gen.go github.com/influxdata/influx-cli/v2/internal/api QueryApi
// Other mocks
//go:generate go run github.com/golang/mock/mockgen -package mock -destination config.gen.go -mock_names Service=MockConfigService github.com/influxdata/influx-cli/v2/internal/config Service