936 lines
26 KiB
Go

package v1shell
import (
"bufio"
"bytes"
"compress/gzip"
"context"
"encoding/csv"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"reflect"
"regexp"
"runtime"
"sort"
"strings"
"text/tabwriter"
tea "github.com/charmbracelet/bubbletea"
"github.com/fatih/color"
"github.com/influxdata/go-prompt"
"github.com/influxdata/influx-cli/v2/api"
"github.com/influxdata/influx-cli/v2/clients"
"github.com/muesli/termenv"
)
type Client struct {
clients.CLI
PersistentQueryParams
api.LegacyQueryApi
api.PingApi
api.OrganizationsApi
api.LegacyWriteApi
api.DBRPsApi
}
type PersistentQueryParams struct {
clients.OrgParams
Database string
RetentionPolicy string
Precision string
Format FormatType
Pretty bool
Scientific bool
// Autocompletion Storage
historyFilePath string
historyLimit int
Databases []string
RetentionPolicies []string
Measurements []string
}
func DefaultPersistentQueryParams() PersistentQueryParams {
return PersistentQueryParams{
Format: TableFormat,
Precision: "ns",
historyLimit: 1000,
}
}
func (c *Client) readHistory() []string {
// Attempt to load the history file.
if c.historyFilePath != "" {
if historyFile, err := os.Open(c.historyFilePath); err == nil {
var history []string
scanner := bufio.NewScanner(historyFile)
for scanner.Scan() {
history = append(history, scanner.Text())
}
historyFile.Close()
// Limit to last n elements
if len(history) > c.historyLimit {
history = history[len(history)-c.historyLimit:]
}
return history
}
}
return []string{}
}
func (c *Client) rewriteHistoryFile(history []string) {
if c.historyFilePath != "" {
if historyFile, err := os.Create(c.historyFilePath); err == nil {
historyFile.WriteString(strings.Join(history, "\n"))
historyFile.Close()
}
}
}
func (c *Client) writeCommandToHistory(cmd string) {
if c.historyFilePath != "" {
if historyFile, err := os.OpenFile(c.historyFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666); err == nil {
historyFile.WriteString(strings.TrimSpace(cmd) + "\n")
historyFile.Close()
}
}
}
func (c *Client) clear(cmd string) {
args := strings.Split(strings.TrimSuffix(strings.TrimSpace(cmd), ";"), " ")
v := strings.ToLower(strings.Join(args[1:], " "))
switch v {
case "database", "db":
c.Database = ""
c.RetentionPolicies = []string{}
fmt.Println("database context cleared")
return
case "retention policy", "rp":
c.RetentionPolicy = ""
fmt.Println("retention policy context cleared")
return
default:
if len(args) > 1 {
fmt.Printf("invalid command %q.\n", v)
}
fmt.Println(`Possible commands for 'clear' are:
# Clear the database context
clear database
clear db
# Clear the retention policy context
clear retention policy
clear rp
`)
}
}
func (c *Client) Create(ctx context.Context) error {
res, err := c.GetPing(ctx).ExecuteWithHttpInfo()
if err != nil {
color.Red("Unable to connect to InfluxDB")
return err
}
build := res.Header.Get("X-Influxdb-Build")
version := res.Header.Get("X-Influxdb-Version")
color.Cyan("Connected to InfluxDB %s %s", build, version)
// compute historyFilePath at REPL start
// Only load/write history if HOME environment variable is set.
var historyDir string
if runtime.GOOS == "windows" {
if userDir := os.Getenv("USERPROFILE"); userDir != "" {
historyDir = userDir
}
}
if homeDir := os.Getenv("HOME"); homeDir != "" {
historyDir = homeDir
}
var history []string
if historyDir != "" {
c.historyFilePath = filepath.Join(historyDir, ".influx_history")
history = c.readHistory()
// rewriting history now truncates the history file down to c.historyLimit lines of history
c.rewriteHistoryFile(history)
}
p := prompt.New(c.executor,
c.completer,
prompt.OptionTitle("InfluxQL Shell"),
prompt.OptionHistory(history),
prompt.OptionDescriptionTextColor(prompt.Cyan),
prompt.OptionPrefixTextColor(prompt.Green),
prompt.OptionCompletionWordSeparator(" ", "."),
)
c.Databases, _ = c.GetDatabases(ctx)
p.Run()
return nil
}
func (c *Client) gopher() {
color.Cyan(Gopher)
}
// The logic for the main prompt that is run in the REPL loop
func (c *Client) executor(cmd string) {
if cmd == "" {
return
}
defer c.writeCommandToHistory(cmd)
cmdArgs := strings.Split(cmd, " ")
switch strings.ToLower(cmdArgs[0]) {
case "quit", "exit":
color.HiBlack("Goodbye!")
os.Exit(0)
case "gopher":
c.gopher()
case "node":
color.Yellow("The 'node' command is enterprise only, not available in the influx 2.x CLI - were you looking for the 1.x InfluxDB CLI?")
case "consistency":
color.Yellow("The 'consistency' command is not available in the influx 2.x CLI - were you looking for the 1.x InfluxDB CLI?")
case "help":
c.help()
case "history":
color.HiBlack(strings.Join(c.readHistory(), "\n"))
case "format":
c.setFormat(cmdArgs)
case "precision":
c.setPrecision(cmdArgs)
case "settings":
c.settings()
case "pretty":
c.togglePretty()
case "scientific":
c.toggleScientific()
case "use":
c.use(cmdArgs)
case "insert":
c.insert(cmd)
case "clear":
c.clear(cmd)
default:
c.runAndShowQuery(cmd)
}
}
// Create a regex string for a named InfluxQL identifier, quoted or unquoted
func identRegex(name string) string {
return `((?P<` + name + `>\w+)|(\"(?P<` + name + `_quote>.+?)\"))`
}
// Get the value of a named InfluxQL identifier from a regex match map.
// Returns empty string if no match
func getIdentFromMatches(matches *map[string]string, name string) string {
if val, ok := (*matches)[name]; ok && val != "" {
return val
} else if val, ok := (*matches)[name+"_quote"]; ok && val != "" {
return val
}
return ""
}
// Create a regex match map from a regexp with named groups.
// Returns nil if no match.
func reSubMatchMap(r *regexp.Regexp, str string) *map[string]string {
match := r.FindStringSubmatch(str)
if match == nil {
return nil
}
subMatchMap := make(map[string]string)
for i, name := range r.SubexpNames() {
if i != 0 {
subMatchMap[name] = match[i]
}
}
return &subMatchMap
}
// Returns parsed database, retention policy, point, and if the command was an INSERT statement
// if db and rp are both blank and command was INSERT statement, it was an "INSERT <point>" statement
func ParseInsert(cmd string) (string, string, string, bool) {
// the (?i) clause makes the regex match case-insensitive
var insertIntoStart string = `^(?i)INSERT(\s+)INTO`
var insertIntoRegex string = insertIntoStart + `(\s+)` + identRegex("db") + `(\.` + identRegex("rp") + `)?(\s+)(?P<point>.+)$`
var insertRegex string = `^(?i)INSERT(\s+)(?P<point>.+)$`
var db string
var rp string
var point string
insertRgx := regexp.MustCompile(insertRegex)
insertIntoStartRgx := regexp.MustCompile(insertIntoStart)
insertIntoRgx := regexp.MustCompile(insertIntoRegex)
insertMatches := reSubMatchMap(insertRgx, cmd)
insertIntoMatches := reSubMatchMap(insertIntoRgx, cmd)
if insertIntoMatches != nil {
db = getIdentFromMatches(insertIntoMatches, "db")
rp = getIdentFromMatches(insertIntoMatches, "rp")
point = getIdentFromMatches(insertIntoMatches, "point")
} else if !insertIntoStartRgx.Match([]byte(cmd)) && insertMatches != nil {
point = getIdentFromMatches(insertMatches, "point")
} else {
return "", "", "", false
}
return db, rp, point, true
}
func (c Client) insert(cmd string) {
db, rp, point, isInsertCmd := ParseInsert(cmd)
if !isInsertCmd || point == "" {
color.Red("Expected \"INSERT INTO <database>.<retention_policy> <point>\" OR \"INSERT <point>\".")
return
} else if db == "" && rp == "" { // this is an "INSERT <point>" command
db = c.Database
rp = c.RetentionPolicy
}
buf := bytes.Buffer{}
gzw := gzip.NewWriter(&buf)
_, err := gzw.Write([]byte(point))
gzw.Close()
if err != nil {
color.Red("Failed to gzip points")
return
}
switch c.Precision {
case "h", "m", "rfc3339":
color.Red("Current precision %q unsupported for writes. Use precision [s, ms, ns, us]", c.Precision)
return
}
ctx := context.Background()
writeReq := c.PostLegacyWrite(ctx).
Db(db).
Rp(rp).
Precision(c.Precision).
ContentEncoding("gzip").
Body(buf.String())
if err := writeReq.Execute(); err != nil {
if err.Error() == "" {
err = ctx.Err()
if err == context.Canceled {
err = errors.New("aborted by user")
} else if err == nil {
err = errors.New("no data received")
}
}
color.Red("ERR: %v", err)
if c.Database == "" {
color.Yellow("Note: error may be due to not setting a database or retention policy.")
color.Yellow(`Please set a database with the command "use <database>"`)
return
}
}
}
type FormatType string
type FormatFunc func(api.InfluxqlJsonResponse)
var (
CsvFormat FormatType = "csv"
JsonFormat FormatType = "json"
ColumnFormat FormatType = "column"
TableFormat FormatType = "table"
)
func (c *Client) runAndShowQuery(query string) {
// TODO: guide users trying to use deprecated InfluxQL queries: https://github.com/influxdata/influx-cli/issues/397
ctx := context.Background()
responseStr, err := c.query(ctx, query)
if err != nil {
if err.Error() == "" {
err = ctx.Err()
if err == context.Canceled {
err = errors.New("aborted by user")
} else if err == nil {
err = errors.New("no data received")
}
}
color.Red("ERR: %v", err)
return
}
var response api.InfluxqlJsonResponse
if err := json.Unmarshal([]byte(responseStr), &response); err != nil {
color.Red("Failed to parse JSON response: %v", err)
if c.Database == "" {
color.Yellow("Warning: It is possible this error is due to not setting a database.")
color.Yellow(`Please set a database with the command "use <database>".`)
}
return
}
displayMap := map[FormatType]FormatFunc{
CsvFormat: c.outputCsv,
JsonFormat: c.outputJson,
ColumnFormat: c.outputColumns,
TableFormat: c.outputTable,
}
displayFunc := displayMap[c.Format]
displayFunc(response)
}
func (c *Client) help() {
fmt.Println(`Usage:
use <db_name> sets current database
format <format> specifies the format of the server responses: json, csv, column, table
pretty toggles pretty print for the json format
scientific toggles scientific numeric format for table format
precision <format> specifies the format of the timestamp: rfc3339, h, m, s, ms, u or ns
history displays command history
settings outputs the current settings for the shell
clear clears settings such as database or retention policy. run 'clear' for help
exit/quit/ctrl+d quits the influx shell
show databases show database names
show series show series information
show measurements show measurement information
show tag keys show tag key information
show field keys show field key information
insert <point> insert point into currently-used database
A full list of influxql commands can be found at:
https://docs.influxdata.com/influxdb/latest/query_language/spec/
Keybindings:
<CTRL+D> exit
<CTRL+L> clear screen
<UP ARROW> previous command
<DOWN ARROW> next command
<TAB> next suggestion
<SHIFT+TAB> previous suggestion`)
}
func (c *Client) settings() {
w := new(tabwriter.Writer)
w.Init(os.Stdout, 0, 1, 1, ' ', 0)
fmt.Fprintln(w, "Setting\tValue")
fmt.Fprintln(w, "--------\t--------")
fmt.Fprintf(w, "Database\t%s\n", c.Database)
fmt.Fprintf(w, "RetentionPolicy\t%s\n", c.RetentionPolicy)
fmt.Fprintf(w, "Pretty\t%v\n", c.Pretty)
fmt.Fprintf(w, "Scientific\t%v\n", c.Scientific)
fmt.Fprintf(w, "Format\t%s\n", c.Format)
fmt.Fprintf(w, "Precision\t%s\n", c.Precision)
fmt.Fprintln(w)
w.Flush()
}
func (c *Client) query(ctx context.Context, query string) (string, error) {
res := c.GetLegacyQuery(ctx).
Db(c.Database).
Q(query).
Rp(c.RetentionPolicy).
Accept("application/json")
// when precision is blank, the API uses RFC339 timestamps
if c.Precision != "rfc3339" && c.Precision != "" {
res = res.Epoch(c.Precision)
}
resBody, err := res.Execute()
if err != nil {
return "", err
}
return resBody, nil
}
func (c *Client) setFormat(args []string) {
// args[0] is "format"
if len(args) != 2 {
color.Red("Expected a format [csv, json, column, table]")
return
}
newFormat := FormatType(args[1])
switch newFormat {
case CsvFormat, JsonFormat, ColumnFormat, TableFormat:
c.Format = newFormat
default:
color.HiRed("Unimplemented format %q, keeping %s format.", newFormat, c.Format)
color.HiBlack("Choose a format from [csv, json, column, table]")
}
}
func (c *Client) setPrecision(args []string) {
// args[0] is "precision"
if len(args) != 2 {
color.Red("Expected a precision [rfc3339, ns, u, ms, s, m, or h]")
return
}
precision := args[1]
switch precision {
case "rfc3339", "ns", "u", "µ", "ms", "s", "m", "h":
c.Precision = precision
default:
color.HiRed("Unimplemented precision %q, keeping %s precision.", precision, c.Precision)
color.HiBlack("Choose a precision from [rfc3339, ns, u, ms, s, m, or h]")
}
}
func tagsEqual(prev, current map[string]string) bool {
return reflect.DeepEqual(prev, current)
}
func columnsEqual(prev, current []string) bool {
return reflect.DeepEqual(prev, current)
}
func headersEqual(prev, current api.InfluxqlJsonResponseSeries) bool {
if prev.Name != current.Name {
return false
}
return tagsEqual(prev.GetTags(), current.GetTags()) && columnsEqual(prev.GetColumns(), current.GetColumns())
}
// formatResults will behave differently if you are formatting for columns or csv
func (c *Client) formatResults(result api.InfluxqlJsonResponseResults, separator string, suppressHeaders bool) []string {
rows := []string{}
// Create a tabbed writer for each result as they won't always line up
for i, row := range result.GetSeries() {
// gather tags
tags := []string{}
for k, v := range row.GetTags() {
tags = append(tags, fmt.Sprintf("%s=%s", k, v))
sort.Strings(tags)
}
columnNames := []string{}
// Only put name/tags in a column if format is csv
if c.Format == CsvFormat {
if len(tags) > 0 {
columnNames = append([]string{"tags"}, columnNames...)
}
if row.GetName() != "" {
columnNames = append([]string{"name"}, columnNames...)
}
}
columnNames = append(columnNames, row.GetColumns()...)
// Output a line separator if we have more than one set or results and format is column
if i > 0 && c.Format == ColumnFormat && !suppressHeaders {
rows = append(rows, "")
}
// If we are column format, we break out the name/tag to separate lines
if c.Format == ColumnFormat && !suppressHeaders {
if row.GetName() != "" {
n := fmt.Sprintf("name: %s", row.GetName())
rows = append(rows, n)
}
if len(tags) > 0 {
t := fmt.Sprintf("tags: %s", (strings.Join(tags, ", ")))
rows = append(rows, t)
}
}
if !suppressHeaders {
rows = append(rows, strings.Join(columnNames, separator))
}
// if format is column, write dashes under each column
if c.Format == ColumnFormat && !suppressHeaders {
lines := []string{}
for _, columnName := range columnNames {
lines = append(lines, strings.Repeat("-", len(columnName)))
}
rows = append(rows, strings.Join(lines, separator))
}
for _, v := range row.GetValues() {
var values []string
if c.Format == CsvFormat {
if row.GetName() != "" {
values = append(values, row.GetName())
}
if len(tags) > 0 {
values = append(values, strings.Join(tags, ","))
}
}
for _, vv := range v {
values = append(values, interfaceToString(vv))
}
rows = append(rows, strings.Join(values, separator))
}
}
return rows
}
func interfaceToString(v interface{}) string {
switch t := v.(type) {
case nil:
return ""
case bool:
return fmt.Sprintf("%v", v)
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, uintptr:
return fmt.Sprintf("%d", t)
case float32, float64:
return fmt.Sprintf("%v", t)
default:
return fmt.Sprintf("%v", t)
}
}
func (c *Client) outputCsv(response api.InfluxqlJsonResponse) {
csvw := csv.NewWriter(os.Stdout)
var previousHeaders api.InfluxqlJsonResponseSeries
for _, result := range response.GetResults() {
if result.Error != nil {
color.Red("Error: %v", result.GetError())
continue
}
series := result.GetSeries()
suppressHeaders := len(series) > 0 && headersEqual(previousHeaders, series[0])
if !suppressHeaders && len(result.GetSeries()) > 0 {
previousHeaders = result.GetSeries()[0]
}
// Create a tabbed writer for each result as they won't always line up
rows := c.formatResults(result, "\t", suppressHeaders)
for _, r := range rows {
csvw.Write(strings.Split(r, "\t"))
}
}
csvw.Flush()
}
func (c *Client) outputJson(response api.InfluxqlJsonResponse) {
var data []byte
var err error
if c.Pretty {
data, err = json.MarshalIndent(response, "", " ")
} else {
data, err = json.Marshal(response)
}
if err != nil {
color.Red("Unable to parse json: %s\n", err)
return
}
fmt.Println(string(data))
}
func (c *Client) outputColumns(response api.InfluxqlJsonResponse) {
// Create a tabbed writer for each result as they won't always line up
writer := new(tabwriter.Writer)
writer.Init(os.Stdin, 0, 8, 1, ' ', 0)
var previousHeaders api.InfluxqlJsonResponseSeries
for i, result := range response.GetResults() {
if result.Error != nil {
color.Red("Error: %v", result.GetError())
continue
}
// Check to see if the headers are the same as the previous row. If so, suppress them in the output
suppressHeaders := len(result.GetSeries()) > 0 && headersEqual(previousHeaders, result.GetSeries()[0])
if !suppressHeaders && len(result.GetSeries()) > 0 {
previousHeaders = result.GetSeries()[0]
}
// If we are suppressing headers, don't output the extra line return. If we
// aren't suppressing headers, then we put out line returns between results
// (not before the first result, and not after the last result).
if !suppressHeaders && i > 0 {
fmt.Fprintln(writer, "")
}
rows := c.formatResults(result, "\t", suppressHeaders)
for _, r := range rows {
fmt.Fprintln(writer, r)
}
}
writer.Flush()
}
func (c *Client) outputTable(response api.InfluxqlJsonResponse) {
allResults := response.GetResults()
resIdx := 0
seriesIdx := 0
jumpToLastPage := false
outer:
for resIdx < len(allResults) {
res := allResults[resIdx]
if res.Error != nil {
color.Red("Error: %v", res.GetError())
resIdx++
}
allSeries := res.GetSeries()
for seriesIdx < len(allSeries) {
termenv.AltScreen()
defer termenv.ExitAltScreen()
series := allSeries[seriesIdx]
p := tea.NewProgram(NewModel(series,
jumpToLastPage,
series.GetName(),
series.GetTags(),
resIdx+1,
len(allResults),
seriesIdx+1,
len(allSeries),
c.Scientific),
)
model, err := p.StartReturningModel()
jumpToLastPage = false
if err != nil {
color.Red("Failed to display table")
seriesIdx++
} else {
if tableModel, ok := model.(Model); ok {
switch tableModel.endingStatus {
case goToNextTableStatus:
seriesIdx++
case goToPrevTableStatus:
jumpToLastPage = true
seriesIdx--
case goToPrevTableJumpFirstPageStatus:
seriesIdx--
case quitStatus:
break outer
}
}
}
fmt.Printf("\n")
if seriesIdx >= len(allSeries) {
resIdx++
seriesIdx = 0
break
} else if seriesIdx < 0 {
resIdx--
seriesIdx = len(allResults[resIdx].GetSeries()) - 1
break
}
}
if len(allSeries) == 0 {
color.HiBlack("No results")
resIdx++
}
}
}
func (c *Client) togglePretty() {
c.Pretty = !c.Pretty
color.HiBlack("Pretty: %v", c.Pretty)
}
func (c *Client) toggleScientific() {
c.Scientific = !c.Scientific
color.HiBlack("Scientific: %v", c.Scientific)
}
func (c *Client) use(args []string) {
if len(args) != 2 {
color.Red("wrong number of args for \"use <database>\"")
return
}
parsedDb, parsedRp, err := parseDatabaseAndRetentionPolicy([]byte(args[1]))
if err != nil {
color.Red("Unable to parse: %v", err)
return
}
dbs, err := c.GetDatabases(context.Background())
if err != nil {
color.Red("Unable to check databases: %v", err)
return
}
// discover if the parsedDb is a valid database
for _, db := range dbs {
if parsedDb == db {
exists := false
prevDb := c.Database
c.Database = parsedDb
rps, _ := c.getRetentionPolicies(context.Background())
// discover if the parsedRp is a valid retention policy
for _, rp := range rps {
switch parsedRp {
case "":
c.RetentionPolicy, _ = c.getDefaultRetentionPolicy(context.Background(), c.Database)
case rp:
c.RetentionPolicy = parsedRp
default:
continue
}
c.RetentionPolicies = rps
exists = true
c.Measurements, _ = c.GetMeasurements(context.Background())
break
}
if !exists {
color.Red("No such retention policy %q exists on %q", parsedRp, c.Database)
color.HiBlack("Available retention policies on %q:", parsedDb)
for _, rp := range rps {
color.HiBlack("- %q", rp)
}
c.Database = prevDb
return
}
c.Database = parsedDb
c.Databases = dbs
return
}
}
color.Red("No such database %q exists", parsedDb)
color.HiBlack("Available databases:")
for _, db := range dbs {
color.HiBlack("- %q", db)
}
}
// Get retention policies from the currently used database
func (c *Client) getRetentionPolicies(ctx context.Context) ([]string, error) {
singleSeries, err := c.getDataSingleSeries(ctx,
fmt.Sprintf("SHOW RETENTION POLICIES ON %q", c.Database))
if err != nil {
return []string{}, err
}
nameIndex := -1
for i, colName := range singleSeries.GetColumns() {
if colName == "name" {
nameIndex = i
}
}
if nameIndex == -1 {
return []string{}, fmt.Errorf("expected a \"name\" column for retention policies")
}
var retentionPolicies []string
for _, value := range singleSeries.GetValues() {
if name, ok := value[nameIndex].(string); ok {
retentionPolicies = append(retentionPolicies, name)
} else {
return []string{}, fmt.Errorf("expected \"name\" column to contain string value")
}
}
return retentionPolicies, nil
}
// Get the default retention policy for a given database
func (c *Client) getDefaultRetentionPolicy(ctx context.Context, db string) (string, error) {
singleSeries, err := c.getDataSingleSeries(ctx,
fmt.Sprintf("SHOW RETENTION POLICIES ON %q", db))
if err != nil {
return "", err
}
nameIndex := -1
defaultIndex := -1
for i, colName := range singleSeries.GetColumns() {
if colName == "default" {
defaultIndex = i
} else if colName == "name" {
nameIndex = i
}
}
if nameIndex == -1 {
return "", fmt.Errorf("expected a \"name\" column for retention policies")
}
if defaultIndex == -1 {
return "", fmt.Errorf("expected a \"default\" column for retention policies")
}
for _, value := range singleSeries.GetValues() {
isDefault := value[defaultIndex]
if isDefault, ok := isDefault.(bool); ok {
if isDefault {
if name, ok := value[nameIndex].(string); ok {
return name, nil
} else {
return "", fmt.Errorf("expected \"name\" column to contain string value")
}
}
} else {
return "", fmt.Errorf("expected \"default\" column to contain boolean value")
}
}
return "", fmt.Errorf("no default retention policy")
}
// Get list of database names
func (c *Client) GetDatabases(ctx context.Context) ([]string, error) {
singleSeries, err := c.getDataSingleSeries(ctx, "SHOW DATABASES")
if err != nil {
return []string{}, err
}
values := singleSeries.GetValues()
var databases []string
for _, value := range values {
for _, db := range value {
if db, ok := db.(string); ok {
databases = append(databases, db)
} else {
return []string{}, fmt.Errorf("expected database names to be strings")
}
}
}
return databases, nil
}
// Get list of measurements for currently used database and retention policy
func (c *Client) GetMeasurements(ctx context.Context) ([]string, error) {
singleSeries, err := c.getDataSingleSeries(ctx, "SHOW MEASUREMENTS")
if err != nil {
return []string{}, err
}
var measures []string
for _, measureArr := range singleSeries.GetValues() {
if len(measureArr) != 1 {
return []string{}, fmt.Errorf("expected a single measurement name in each array in values array")
}
if measure, ok := measureArr[0].(string); ok {
measures = append(measures, measure)
} else {
return []string{}, fmt.Errorf("expected measurement name to be a string")
}
}
return measures, nil
}
// Helper function to execute query & parse response, expecting a single series
func (c *Client) getDataSingleSeries(ctx context.Context, query string) (*api.InfluxqlJsonResponseSeries, error) {
res := c.GetLegacyQuery(ctx).
Db(c.Database).
Q(query).
Rp(c.RetentionPolicy).
Accept("application/json")
// when c.Precision is empty, the API returns timestamps in RFC3339 format
if c.Precision != "rfc3339" && c.Precision != "" {
res.Epoch(c.Precision)
}
resBody, err := res.Execute()
if err != nil {
return nil, err
}
var responses api.InfluxqlJsonResponse
if err := json.Unmarshal([]byte(resBody), &responses); err != nil {
return nil, err
}
results := responses.GetResults()
if len(results) != 1 {
return nil, fmt.Errorf("expected a single result from single query")
}
result := results[0]
series := result.GetSeries()
if len(series) != 1 {
return nil, fmt.Errorf("expected a single series from single result")
}
return &series[0], nil
}
// Parse database and retention policy from byte slice.
// Expects format like "db"."rp", db.rp, db, "db".
func parseDatabaseAndRetentionPolicy(stmt []byte) (string, string, error) {
var db, rp []byte
var quoted bool
var seperatorCount int
stmt = bytes.TrimSpace(stmt)
for _, b := range stmt {
if b == '"' {
quoted = !quoted
continue
}
if b == '.' && !quoted {
seperatorCount++
if seperatorCount > 1 {
return "", "", fmt.Errorf("unable to parse database and retention policy from %s", string(stmt))
}
continue
}
if seperatorCount == 1 {
rp = append(rp, b)
continue
}
db = append(db, b)
}
return string(db), string(rp), nil
}