feat: add formatted output to influx query (#87)

This commit is contained in:
Daniel Moran 2021-05-13 10:54:45 -04:00 committed by GitHub
parent d22fb717c6
commit 6a9f17d100
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 1890 additions and 3 deletions

View File

@ -76,9 +76,11 @@ func newQueryCmd() *cli.Command {
Profilers: profilers,
}
printer := query.RawResultPrinter
if !ctx.Bool("raw") {
return errors.New("--raw or -r must be specified for now")
var printer query.ResultPrinter
if ctx.Bool("raw") {
printer = query.RawResultPrinter
} else {
printer = query.NewFormattingPrinter()
}
client := query.Client{

View File

@ -0,0 +1,269 @@
package query
import (
"encoding/base64"
"io"
"sort"
"strconv"
"strings"
"time"
"github.com/influxdata/influx-cli/v2/pkg/fluxcsv"
)
// formattingPrinter formats query results into a structured table before printing.
// It is stateful: column layout and width buffers are carried across rows and
// reused between tables within a single PrintQueryResults call.
type formattingPrinter struct {
	widths    []int // current column widths, indexed in display (sorted) order
	maxWidth  int   // largest width seen so far; sizes the shared pad/dash buffers
	newWidths []int // widths observed in the current table's rows
	pad       []byte // reusable run of spaces used for alignment padding
	dash      []byte // reusable run of dashes used for the header separator
	// fmtBuf is used to format values
	fmtBuf [64]byte
	cols       []fluxcsv.FluxColumn // columns of the current table, group-key columns first
	lastColIdx int                  // index of the last column; no separator is written after it
}
// NewFormattingPrinter returns a printer that renders query results as
// aligned, human-readable tables. The zero value of the printer is usable;
// all internal buffers are allocated lazily.
func NewFormattingPrinter() *formattingPrinter {
	p := new(formattingPrinter)
	return p
}
// PrintQueryResults consumes the raw annotated-CSV query response stream,
// parses it with fluxcsv, and writes a formatted table rendering to out.
// The response stream is always closed before returning.
func (f *formattingPrinter) PrintQueryResults(resultStream io.ReadCloser, out io.Writer) error {
	res := fluxcsv.NewQueryTableResult(resultStream)
	defer res.Close()
	return f.write(res, out)
}
// fixedWidthTimeFmt renders timestamps at fixed nanosecond precision so every
// time value occupies the same number of characters within a column.
const fixedWidthTimeFmt = "2006-01-02T15:04:05.000000000Z"

// eol is the line terminator written after every emitted line.
var eol = []byte{'\n'}
type writeHelper struct {
w io.Writer
err error
}
func (w *writeHelper) write(data []byte) {
if w.err != nil {
return
}
_, err := w.w.Write(data)
w.err = err
}
// minWidthsByType gives the minimum column width for each data type, sized so
// that typical values (and the "<label>:<type>" header) fit without truncation.
var minWidthsByType = map[fluxcsv.ColType]int{
	fluxcsv.BoolDatatype:         12,
	fluxcsv.LongDatatype:         26,
	fluxcsv.ULongDatatype:        27,
	fluxcsv.DoubleDatatype:       28,
	fluxcsv.StringDatatype:       22,
	fluxcsv.TimeDatatypeRFC:      len(fixedWidthTimeFmt),
	fluxcsv.TimeDatatypeRFCNano:  len(fixedWidthTimeFmt),
	fluxcsv.Base64BinaryDataType: 22,
	fluxcsv.DurationDatatype:     22,
}
// write writes the formatted table data to out.
//
// For each table it emits a "Result:" line (when the result changes), a
// "Table: keys: [...]" line, a header row of "<label>:<type>" cells, a dashed
// separator, and then the right-aligned data rows. Column widths are fixed
// when the header is printed, using the annotations plus the table's first row.
//
// BUG FIX: parse errors reported by res.Err() were previously dropped — Next()
// returns false on a parse error and only the writer's error was returned, so
// malformed CSV silently produced truncated output with a nil error.
func (f *formattingPrinter) write(res *fluxcsv.QueryTableResult, out io.Writer) error {
	w := &writeHelper{w: out}
	// r counts data rows printed since the last header; r == 0 forces a fresh
	// header before the next row.
	r := 0
	for res.Next() {
		record := res.Record()
		if res.AnnotationsChanged() {
			// Reset and sort cols: group-key columns first (in group-key
			// order), remaining columns keep their original relative order.
			f.cols = res.Metadata().Columns()
			f.lastColIdx = len(f.cols) - 1
			groupKeys := make(map[string]int, len(res.Metadata().GroupKeyCols()))
			for i, k := range res.Metadata().GroupKeyCols() {
				groupKeys[k] = i
			}
			sort.Slice(f.cols, func(i, j int) bool {
				iCol, jCol := f.cols[i], f.cols[j]
				iGroupIdx, iIsGroup := groupKeys[iCol.Name()]
				jGroupIdx, jIsGroup := groupKeys[jCol.Name()]
				if iIsGroup && jIsGroup {
					return iGroupIdx < jGroupIdx
				}
				if !iIsGroup && !jIsGroup {
					return i < j
				}
				return iIsGroup && !jIsGroup
			})
			// Compute header widths. Column header is "<label>:<type>".
			f.widths = make([]int, len(f.cols))
			for i, c := range f.cols {
				l := len(c.Name()) + len(display(c.DataType())) + 1
				min := minWidthsByType[c.DataType()]
				if min > l {
					l = min
				}
				f.widths[i] = l
				if l > f.maxWidth {
					f.maxWidth = l
				}
			}
		}
		if res.ResultChanged() {
			w.write([]byte("Result: "))
			w.write([]byte(record.Result()))
			w.write(eol)
		}
		if res.TableIdChanged() || res.AnnotationsChanged() {
			w.write([]byte("Table: keys: ["))
			labels := make([]string, len(res.Metadata().GroupKeyCols()))
			for i, c := range res.Metadata().GroupKeyCols() {
				labels[i] = c
			}
			w.write([]byte(strings.Join(labels, ", ")))
			w.write([]byte("]"))
			w.write(eol)
			// Check err and return early.
			if w.err != nil {
				return w.err
			}
			// Force a fresh header for the new table.
			r = 0
		}
		if r == 0 {
			// Widen columns to fit the table's first row before printing the
			// header so the header and data stay aligned.
			for i, c := range f.cols {
				buf := f.valueBuf(c.DataType(), record.ValueByKey(c.Name()))
				l := len(buf)
				if l > f.widths[i] {
					f.widths[i] = l
				}
				if l > f.maxWidth {
					f.maxWidth = l
				}
			}
			f.makePaddingBuffers()
			f.writeHeader(w)
			f.writeHeaderSeparator(w)
			f.newWidths = make([]int, len(f.widths))
			copy(f.newWidths, f.widths)
		}
		// Print the row: right-align each value, truncating with "..." when it
		// exceeds the column width fixed at header time.
		for i, c := range f.cols {
			buf := f.valueBuf(c.DataType(), record.ValueByKey(c.Name()))
			l := len(buf)
			padding := f.widths[i] - l
			if padding >= 0 {
				w.write(f.pad[:padding])
				w.write(buf)
			} else {
				//TODO make unicode friendly
				w.write(buf[:f.widths[i]-3])
				w.write([]byte{'.', '.', '.'})
			}
			if i != f.lastColIdx {
				w.write(f.pad[:2])
			}
			// Track the widths actually seen in this table.
			if l > f.newWidths[i] {
				f.newWidths[i] = l
			}
			if l > f.maxWidth {
				f.maxWidth = l
			}
		}
		w.write(eol)
		r++
	}
	// Surface parse errors from the CSV stream; previously these were lost.
	if err := res.Err(); err != nil {
		return err
	}
	return w.err
}
// makePaddingBuffers (re)builds the shared space and dash buffers whenever
// the maximum column width has changed, so slices of them can be used for
// padding and separators without further allocation.
func (f *formattingPrinter) makePaddingBuffers() {
	if len(f.pad) != f.maxWidth {
		f.pad = []byte(strings.Repeat(" ", f.maxWidth))
	}
	if len(f.dash) != f.maxWidth {
		f.dash = []byte(strings.Repeat("-", f.maxWidth))
	}
}
// writeHeader prints one right-aligned "<label>:<type>" cell per column,
// with two spaces between adjacent cells.
func (f *formattingPrinter) writeHeader(w *writeHelper) {
	for i, c := range f.cols {
		cell := []byte(c.Name() + ":" + display(c.DataType()))
		w.write(f.pad[:f.widths[i]-len(cell)])
		w.write(cell)
		if i < f.lastColIdx {
			w.write(f.pad[:2])
		}
	}
	w.write(eol)
}
// writeHeaderSeparator prints a dashed rule under the header: one run of
// dashes per column, sized to the column width, two spaces between runs.
func (f *formattingPrinter) writeHeaderSeparator(w *writeHelper) {
	for i, width := range f.widths {
		w.write(f.dash[:width])
		if i < f.lastColIdx {
			w.write(f.pad[:2])
		}
	}
	w.write(eol)
}
// display returns the short, human-readable type label shown in column
// headers for the given flux column type. It panics on an unknown type,
// which indicates a programming error (ParseType rejects unknown types).
func display(t fluxcsv.ColType) string {
	switch t {
	case fluxcsv.StringDatatype:
		return "string"
	case fluxcsv.DoubleDatatype:
		return "float"
	case fluxcsv.BoolDatatype:
		return "boolean"
	case fluxcsv.LongDatatype:
		return "int"
	case fluxcsv.ULongDatatype:
		return "uint"
	case fluxcsv.TimeDatatypeRFC, fluxcsv.TimeDatatypeRFCNano:
		return "time"
	case fluxcsv.DurationDatatype:
		return "duration"
	case fluxcsv.Base64BinaryDataType:
		return "bytes"
	default:
		panic("shouldn't happen")
	}
}
// valueBuf renders a single record value as display bytes. A nil value
// (missing cell) renders as an empty slice, which the caller pads to the full
// column width. Numeric and boolean values reuse f.fmtBuf to avoid a
// per-value allocation; the returned slice is only valid until the next call.
func (f *formattingPrinter) valueBuf(typ fluxcsv.ColType, v interface{}) []byte {
	var buf []byte
	if v == nil {
		return buf
	}
	switch typ {
	case fluxcsv.StringDatatype:
		buf = []byte(v.(string))
	case fluxcsv.DoubleDatatype:
		buf = strconv.AppendFloat(f.fmtBuf[0:0], v.(float64), 'f', -1, 64)
	case fluxcsv.BoolDatatype:
		buf = strconv.AppendBool(f.fmtBuf[0:0], v.(bool))
	case fluxcsv.LongDatatype:
		buf = strconv.AppendInt(f.fmtBuf[0:0], v.(int64), 10)
	case fluxcsv.ULongDatatype:
		buf = strconv.AppendUint(f.fmtBuf[0:0], v.(uint64), 10)
	case fluxcsv.TimeDatatypeRFC, fluxcsv.TimeDatatypeRFCNano:
		// Fixed-width format keeps time columns aligned across rows.
		buf = []byte(v.(time.Time).Format(fixedWidthTimeFmt))
	case fluxcsv.DurationDatatype:
		buf = []byte(v.(time.Duration).String())
	case fluxcsv.Base64BinaryDataType:
		// BUG FIX: Encode writes into a caller-sized destination; the previous
		// code passed a nil slice, which panics for any non-empty input and
		// would have returned an empty cell regardless.
		b := v.([]byte)
		buf = make([]byte, base64.StdEncoding.EncodedLen(len(b)))
		base64.StdEncoding.Encode(buf, b)
	}
	return buf
}

View File

@ -0,0 +1,150 @@
package query_test
import (
"bytes"
"io/ioutil"
"strings"
"testing"
"github.com/influxdata/influx-cli/v2/internal/cmd/query"
"github.com/stretchr/testify/require"
)
// TestFormatterPrinter_PrintQueryResults feeds annotated flux CSV through the
// formatting printer and compares the rendered output byte-for-byte. Cases
// cover empty input, a single table, nil (defaulted) values, multiple tables
// across multiple results, and multiple tables within one result.
func TestFormatterPrinter_PrintQueryResults(t *testing.T) {
	t.Parallel()
	testCases := []struct {
		name     string
		in       string // raw annotated-CSV query response
		expected string // exact formatted output
	}{
		{
			name:     "empty",
			in:       "",
			expected: "",
		},
		{
			name: "single table",
			in: `#group,false,false,true,true,false,false,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
#default,_result,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,bar
,,0,1921-05-08T15:46:22.507379Z,2021-05-08T14:46:22.507379Z,2021-05-04T18:29:52.764702Z,12345,qux,foo,"""baz"""
,,0,1921-05-08T15:46:22.507379Z,2021-05-08T14:46:22.507379Z,2021-05-04T19:30:59.67555Z,12345,qux,foo,"""baz"""
,,0,1921-05-08T15:46:22.507379Z,2021-05-08T14:46:22.507379Z,2021-05-04T19:31:01.876079Z,12345,qux,foo,"""baz"""
,,0,1921-05-08T15:46:22.507379Z,2021-05-08T14:46:22.507379Z,2021-05-04T19:31:02.499461Z,12345,qux,foo,"""baz"""
`,
			expected: `Result: _result
Table: keys: [_start, _stop, _field, _measurement, bar]
_start:time _stop:time _field:string _measurement:string bar:string _time:time _value:float
------------------------------ ------------------------------ ---------------------- ---------------------- ---------------------- ------------------------------ ----------------------------
1921-05-08T15:46:22.507379000Z 2021-05-08T14:46:22.507379000Z qux foo "baz" 2021-05-04T18:29:52.764702000Z 12345
1921-05-08T15:46:22.507379000Z 2021-05-08T14:46:22.507379000Z qux foo "baz" 2021-05-04T19:30:59.675550000Z 12345
1921-05-08T15:46:22.507379000Z 2021-05-08T14:46:22.507379000Z qux foo "baz" 2021-05-04T19:31:01.876079000Z 12345
1921-05-08T15:46:22.507379000Z 2021-05-08T14:46:22.507379000Z qux foo "baz" 2021-05-04T19:31:02.499461000Z 12345
`,
		},
		{
			// Empty cells with an empty #default stay nil and render blank.
			name: "nil values",
			in: `#group,false,false,false,false,true,false,false
#datatype,string,long,string,string,string,string,long
#default,_result,,,,,,
,result,table,name,id,organizationID,retentionPolicy,retentionPeriod
,,0,_monitoring,1aa1e247d56a143f,b6b9cb281ae9583d,,604800000000000
,,0,_tasks,e03361698294077c,b6b9cb281ae9583d,,259200000000000
,,0,dan,57de01a0f4825d94,b6b9cb281ae9583d,,259200000000000
`,
			expected: `Result: _result
Table: keys: [organizationID]
organizationID:string name:string id:string retentionPolicy:string retentionPeriod:int
---------------------- ---------------------- ---------------------- ---------------------- --------------------------
b6b9cb281ae9583d _monitoring 1aa1e247d56a143f 604800000000000
b6b9cb281ae9583d _tasks e03361698294077c 259200000000000
b6b9cb281ae9583d dan 57de01a0f4825d94 259200000000000
`,
		},
		{
			// New annotation blocks trigger new headers; a new result value
			// (here "_profiler") triggers a new "Result:" line.
			name: "multi table",
			in: `#group,false,false,true,true,false,false,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
#default,_result,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,bar
,,0,1921-05-08T15:42:58.218436Z,2021-05-08T15:42:58.218436Z,2021-05-04T18:29:52.764702Z,12345,qux,foo,"""baz"""
,,0,1921-05-08T15:42:58.218436Z,2021-05-08T15:42:58.218436Z,2021-05-04T19:30:59.67555Z,12345,qux,foo,"""baz"""
,,0,1921-05-08T15:42:58.218436Z,2021-05-08T15:42:58.218436Z,2021-05-04T19:31:01.876079Z,12345,qux,foo,"""baz"""
,,0,1921-05-08T15:42:58.218436Z,2021-05-08T15:42:58.218436Z,2021-05-04T19:31:02.499461Z,12345,qux,foo,"""baz"""
#group,false,false,true,true,false,false,true,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,bar,is_foo
,,1,1921-05-08T15:42:58.218436Z,2021-05-08T15:42:58.218436Z,2021-05-08T15:42:19.567667Z,12345,qux,foo,"""baz""",t
#group,false,false,true,false,false,false,false,false,false,false
#datatype,string,long,string,string,string,long,long,long,long,double
#default,_profiler,,,,,,,,,
,result,table,_measurement,Type,Label,Count,MinDuration,MaxDuration,DurationSum,MeanDuration
,,0,profiler/operator,*influxdb.readFilterSource,ReadRange2,1,367331,367331,367331,367331
`,
			expected: `Result: _result
Table: keys: [_start, _stop, _field, _measurement, bar]
_start:time _stop:time _field:string _measurement:string bar:string _time:time _value:float
------------------------------ ------------------------------ ---------------------- ---------------------- ---------------------- ------------------------------ ----------------------------
1921-05-08T15:42:58.218436000Z 2021-05-08T15:42:58.218436000Z qux foo "baz" 2021-05-04T18:29:52.764702000Z 12345
1921-05-08T15:42:58.218436000Z 2021-05-08T15:42:58.218436000Z qux foo "baz" 2021-05-04T19:30:59.675550000Z 12345
1921-05-08T15:42:58.218436000Z 2021-05-08T15:42:58.218436000Z qux foo "baz" 2021-05-04T19:31:01.876079000Z 12345
1921-05-08T15:42:58.218436000Z 2021-05-08T15:42:58.218436000Z qux foo "baz" 2021-05-04T19:31:02.499461000Z 12345
Table: keys: [_start, _stop, _field, _measurement, bar, is_foo]
_start:time _stop:time _field:string _measurement:string bar:string is_foo:string _time:time _value:float
------------------------------ ------------------------------ ---------------------- ---------------------- ---------------------- ---------------------- ------------------------------ ----------------------------
1921-05-08T15:42:58.218436000Z 2021-05-08T15:42:58.218436000Z qux foo "baz" t 2021-05-08T15:42:19.567667000Z 12345
Result: _profiler
Table: keys: [_measurement]
_measurement:string Type:string Label:string Count:int MinDuration:int MaxDuration:int DurationSum:int MeanDuration:float
---------------------- -------------------------- ---------------------- -------------------------- -------------------------- -------------------------- -------------------------- ----------------------------
profiler/operator *influxdb.readFilterSource ReadRange2 1 367331 367331 367331 367331
`,
		},
		{
			// Several table ids within one annotation block: each id change
			// re-prints the "Table:" line and header.
			name: "multi table single result",
			in: `#group,false,false,true,true,false,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
#default,_result,,,,,,,
,result,table,_start,_stop,_value,_field,_measurement,bar
,,0,2021-05-12T20:11:00Z,2021-05-12T20:12:00Z,518490,qux,foo,"""baz"""
,,1,2021-05-12T20:12:00Z,2021-05-12T20:13:00Z,703665,qux,foo,"""baz"""
,,2,2021-05-12T20:13:00Z,2021-05-12T20:14:00Z,703665,qux,foo,"""baz"""
,,3,2021-05-12T20:14:00Z,2021-05-12T20:15:00Z,444420,qux,foo,"""baz"""`,
			expected: `Result: _result
Table: keys: [_start, _stop, _field, _measurement, bar]
_start:time _stop:time _field:string _measurement:string bar:string _value:float
------------------------------ ------------------------------ ---------------------- ---------------------- ---------------------- ----------------------------
2021-05-12T20:11:00.000000000Z 2021-05-12T20:12:00.000000000Z qux foo "baz" 518490
Table: keys: [_start, _stop, _field, _measurement, bar]
_start:time _stop:time _field:string _measurement:string bar:string _value:float
------------------------------ ------------------------------ ---------------------- ---------------------- ---------------------- ----------------------------
2021-05-12T20:12:00.000000000Z 2021-05-12T20:13:00.000000000Z qux foo "baz" 703665
Table: keys: [_start, _stop, _field, _measurement, bar]
_start:time _stop:time _field:string _measurement:string bar:string _value:float
------------------------------ ------------------------------ ---------------------- ---------------------- ---------------------- ----------------------------
2021-05-12T20:13:00.000000000Z 2021-05-12T20:14:00.000000000Z qux foo "baz" 703665
Table: keys: [_start, _stop, _field, _measurement, bar]
_start:time _stop:time _field:string _measurement:string bar:string _value:float
------------------------------ ------------------------------ ---------------------- ---------------------- ---------------------- ----------------------------
2021-05-12T20:14:00.000000000Z 2021-05-12T20:15:00.000000000Z qux foo "baz" 444420
`,
		},
	}
	for _, tc := range testCases {
		tc := tc // capture range variable for the parallel subtest
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			in := ioutil.NopCloser(strings.NewReader(tc.in))
			out := bytes.Buffer{}
			require.NoError(t, query.NewFormattingPrinter().PrintQueryResults(in, &out))
			require.Equal(t, tc.expected, out.String())
		})
	}
}

297
pkg/fluxcsv/query_result.go Normal file
View File

@ -0,0 +1,297 @@
package fluxcsv
import (
"encoding/base64"
"encoding/csv"
"errors"
"fmt"
"io"
"strconv"
"strings"
"time"
"github.com/influxdata/influx-cli/v2/internal/duration"
)
// ColType identifies the data type of a flux CSV column, as declared by the
// #datatype annotation row.
type ColType int

const (
	StringDatatype ColType = iota
	DoubleDatatype
	BoolDatatype
	LongDatatype
	ULongDatatype
	DurationDatatype
	Base64BinaryDataType
	TimeDatatypeRFC
	TimeDatatypeRFCNano
	// InvalidDatatype is returned by ParseType for unrecognized type names.
	InvalidDatatype
)
// colTypesByName maps #datatype annotation values to their ColType.
var colTypesByName = map[string]ColType{
	"string":               StringDatatype,
	"double":               DoubleDatatype,
	"boolean":              BoolDatatype,
	"long":                 LongDatatype,
	"unsignedLong":         ULongDatatype,
	"duration":             DurationDatatype,
	"base64Binary":         Base64BinaryDataType,
	"dateTime:RFC3339":     TimeDatatypeRFC,
	"dateTime:RFC3339Nano": TimeDatatypeRFCNano,
}

// ParseType converts a #datatype annotation value into its ColType, or
// returns InvalidDatatype and an error when the name is not recognized.
func ParseType(s string) (ColType, error) {
	if t, ok := colTypesByName[s]; ok {
		return t, nil
	}
	return InvalidDatatype, fmt.Errorf("unknown data type %s", s)
}
// QueryTableResult parses a streamed flux query response into structures representing flux table parts.
// Walking through the result is done by repeatedly calling Next() until it returns false.
// Actual flux table info (columns with names, data types, etc) is returned by the Metadata() method.
// Data are acquired by the Record() method.
// A preliminary end can be caused by an error, so when Next() returns false, check Err() for an error.
type QueryTableResult struct {
	io.Closer                      // closes the underlying response stream
	csvReader          *csv.Reader // reader over the raw annotated-CSV stream
	resultChanged      bool        // the "result" value differed from the previous row
	tableIdChanged     bool        // the table id differed from the previous row
	annotationsChanged bool        // a new annotation block preceded the current row
	record             *FluxRecord // most recently parsed data row
	baseColumns        []*FluxColumn // columns built up from the current annotation block
	metadata           *FluxTableMetadata
	err                error // first parse error, surfaced via Err()
}
// NewQueryTableResult wraps a raw annotated-CSV response stream in a parser.
// Closing the returned result closes the underlying stream.
func NewQueryTableResult(rawResponse io.ReadCloser) *QueryTableResult {
	r := csv.NewReader(rawResponse)
	// Annotation rows and data rows carry different field counts, so disable
	// the reader's per-record field-count validation.
	r.FieldsPerRecord = -1
	return &QueryTableResult{Closer: rawResponse, csvReader: r}
}
// ResultChanged returns true if the last call of Next() found a new query result
// (i.e. the value of the "result" column differs from the previous row).
func (q *QueryTableResult) ResultChanged() bool {
	return q.resultChanged
}

// TableIdChanged returns true if the last call of Next() found a new table within the query result.
func (q *QueryTableResult) TableIdChanged() bool {
	return q.tableIdChanged
}

// AnnotationsChanged returns true if the last call of Next() found new CSV annotations.
func (q *QueryTableResult) AnnotationsChanged() bool {
	return q.annotationsChanged
}

// Record returns the last parsed flux table data row.
// Use Record methods to access value and row properties.
func (q *QueryTableResult) Record() *FluxRecord {
	return q.record
}

// Metadata returns table-level info for the last parsed flux table data row.
func (q *QueryTableResult) Metadata() *FluxTableMetadata {
	return q.metadata
}
// parsingState tracks where the parser is within the annotated-CSV grammar.
type parsingState int

const (
	parsingStateNormal     parsingState = iota // expecting data rows
	parsingStateAnnotation                     // inside a block of '#'-prefixed annotation rows
	parsingStateNameRow                        // annotations done; next plain row is the column-name row
	parsingStateError                          // name row declared an "error" column; next row holds the message
)
// Next advances to the next row in the query result.
// The first time it is called, Next also creates the table metadata.
// The actual parsed row is available through the Record() function.
// Returns false in case of end or an error, otherwise true.
//
// BUG FIX: the error from NewFluxRecord is now checked before q.record is
// dereferenced; previously a failed record construction left q.record nil and
// the subsequent Result()/TableId() calls panicked.
func (q *QueryTableResult) Next() bool {
	var row []string
	// Close the stream on any preliminary return; replaced by a no-op just
	// before a successful return so the stream stays open between rows.
	closer := func() {
		if err := q.Close(); err != nil {
			message := err.Error()
			if q.err != nil {
				message = fmt.Sprintf("%s,%s", message, q.err.Error())
			}
			q.err = errors.New(message)
		}
	}
	// Defer through a wrapper so reassignments of closer take effect.
	defer func() {
		closer()
	}()
	parsingState := parsingStateNormal
	q.annotationsChanged = false
	dataTypeAnnotationFound := false
readRow:
	row, q.err = q.csvReader.Read()
	if q.err == io.EOF {
		q.err = nil
		return false
	}
	if q.err != nil {
		return false
	}
	if len(row) <= 1 {
		// Skip blank/separator lines between tables.
		goto readRow
	}
	if len(row[0]) > 0 && row[0][0] == '#' {
		if parsingState == parsingStateNormal {
			// First annotation row of a new block: reset the column set.
			q.annotationsChanged = true
			q.baseColumns = nil
			for range row[1:] {
				q.baseColumns = append(q.baseColumns, NewFluxColumn())
			}
			parsingState = parsingStateAnnotation
		}
	}
	expectedNcol := len(q.baseColumns)
	if expectedNcol == 0 {
		q.err = errors.New("parsing error, annotations not found")
		return false
	}
	ncol := len(row) - 1
	if ncol != expectedNcol {
		q.err = fmt.Errorf("parsing error, row has different number of columns than the table: %d vs %d", ncol, expectedNcol)
		return false
	}
	switch row[0] {
	case "":
		switch parsingState {
		case parsingStateAnnotation:
			if !dataTypeAnnotationFound {
				q.err = errors.New("parsing error, datatype annotation not found")
				return false
			}
			// First plain row after annotations is the column-name row.
			parsingState = parsingStateNameRow
			fallthrough
		case parsingStateNameRow:
			if row[1] == "error" {
				// The server reported a query error in-band; the next row
				// carries the message (and optional reference code).
				parsingState = parsingStateError
			} else {
				for i, n := range row[1:] {
					q.baseColumns[i].SetName(n)
				}
				q.metadata = NewFluxTableMetadataFull(q.baseColumns...)
				parsingState = parsingStateNormal
			}
			goto readRow
		case parsingStateError:
			var message string
			if len(row) > 1 && len(row[1]) > 0 {
				message = row[1]
			} else {
				message = "unknown query error"
			}
			reference := ""
			if len(row) > 2 && len(row[2]) > 0 {
				reference = fmt.Sprintf(",%s", row[2])
			}
			q.err = fmt.Errorf("%s%s", message, reference)
			return false
		}
		// Data row: convert each cell, substituting the column default for
		// empty cells, and build the record.
		values := make(map[string]interface{})
		for i, v := range row[1:] {
			values[q.baseColumns[i].Name()], q.err = toValue(
				stringTernary(v, q.baseColumns[i].DefaultValue()), q.baseColumns[i].DataType(), q.baseColumns[i].Name())
			if q.err != nil {
				return false
			}
		}
		var prevRes string
		var prevId int64
		if q.record != nil {
			prevRes, prevId = q.record.Result(), q.record.TableId()
		}
		q.record, q.err = NewFluxRecord(q.metadata, values)
		// Check the error before touching q.record: on failure NewFluxRecord
		// returns a nil record, and the old order of operations panicked here.
		if q.err != nil {
			return false
		}
		q.resultChanged = q.record.Result() != prevRes
		q.tableIdChanged = q.record.TableId() != prevId
	case "#datatype":
		dataTypeAnnotationFound = true
		for i, d := range row[1:] {
			t, err := ParseType(d)
			if err != nil {
				q.err = err
				return false
			}
			q.baseColumns[i].SetDataType(t)
		}
		goto readRow
	case "#group":
		for i, g := range row[1:] {
			q.baseColumns[i].SetGroup(g == "true")
		}
		goto readRow
	case "#default":
		for i, c := range row[1:] {
			q.baseColumns[i].SetDefaultValue(c)
		}
		goto readRow
	}
	// don't close query
	closer = func() {}
	return true
}
// Err returns any error raised during flux query response parsing.
// Check it once Next() has returned false.
func (q *QueryTableResult) Err() error {
	return q.err
}
// stringTernary returns a if not empty, otherwise b.
func stringTernary(a, b string) string {
	if a != "" {
		return a
	}
	return b
}
// toValue converts the raw cell string into a Go value according to the
// column type. An empty string yields nil (a missing value); colName is used
// only in the error message for unknown types.
func toValue(raw string, typ ColType, colName string) (interface{}, error) {
	if raw == "" {
		return nil, nil
	}
	switch typ {
	case StringDatatype:
		return raw, nil
	case TimeDatatypeRFC:
		return time.Parse(time.RFC3339, raw)
	case TimeDatatypeRFCNano:
		return time.Parse(time.RFC3339Nano, raw)
	case DurationDatatype:
		return duration.RawDurationToTimeDuration(raw)
	case DoubleDatatype:
		return strconv.ParseFloat(raw, 64)
	case BoolDatatype:
		// Anything other than (case-insensitive) "false" counts as true.
		return strings.ToLower(raw) != "false", nil
	case LongDatatype:
		return strconv.ParseInt(raw, 10, 64)
	case ULongDatatype:
		return strconv.ParseUint(raw, 10, 64)
	case Base64BinaryDataType:
		return base64.StdEncoding.DecodeString(raw)
	default:
		return nil, fmt.Errorf("%s has unknown data type %v", colName, typ)
	}
}

View File

@ -0,0 +1,784 @@
package fluxcsv_test
import (
"fmt"
"io/ioutil"
"strings"
"testing"
"time"
"github.com/influxdata/influx-cli/v2/pkg/fluxcsv"
"github.com/stretchr/testify/require"
)
// TestQueryCVSResultSingleTable parses a two-row, single-table annotated CSV
// response and checks each parsed record against a fully-specified expected
// value, plus the AnnotationsChanged transitions and clean EOF behavior.
func TestQueryCVSResultSingleTable(t *testing.T) {
	csvTable := `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
#group,false,false,true,true,false,false,true,true,true,true
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf
,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,1,adsfasdf
`
	// Expected metadata mirrors the #datatype/#group annotations above.
	expectedTable := fluxcsv.NewFluxTableMetadataFull(
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "_result", "result", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.LongDatatype, "", "table", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_start", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_stop", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_time", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.DoubleDatatype, "", "_value", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "_field", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "_measurement", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "a", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "b", true),
	)
	expectedRecord1, err := fluxcsv.NewFluxRecord(
		expectedTable,
		map[string]interface{}{
			"result":       "_result",
			"table":        int64(0),
			"_start":       mustParseTime(t, "2020-02-17T22:19:49.747562847Z"),
			"_stop":        mustParseTime(t, "2020-02-18T22:19:49.747562847Z"),
			"_time":        mustParseTime(t, "2020-02-18T10:34:08.135814545Z"),
			"_value":       1.4,
			"_field":       "f",
			"_measurement": "test",
			"a":            "1",
			"b":            "adsfasdf",
		},
	)
	require.NoError(t, err)
	expectedRecord2, err := fluxcsv.NewFluxRecord(
		expectedTable,
		map[string]interface{}{
			"result":       "_result",
			"table":        int64(0),
			"_start":       mustParseTime(t, "2020-02-17T22:19:49.747562847Z"),
			"_stop":        mustParseTime(t, "2020-02-18T22:19:49.747562847Z"),
			"_time":        mustParseTime(t, "2020-02-18T22:08:44.850214724Z"),
			"_value":       6.6,
			"_field":       "f",
			"_measurement": "test",
			"a":            "1",
			"b":            "adsfasdf",
		},
	)
	require.NoError(t, err)
	reader := strings.NewReader(csvTable)
	queryResult := fluxcsv.NewQueryTableResult(ioutil.NopCloser(reader))
	// First row: annotations were just parsed, so AnnotationsChanged is true.
	require.True(t, queryResult.Next(), queryResult.Err())
	require.Nil(t, queryResult.Err())
	require.True(t, queryResult.AnnotationsChanged())
	require.NotNil(t, queryResult.Record())
	require.Equal(t, expectedRecord1, queryResult.Record())
	// Second row: same table, no new annotations.
	require.True(t, queryResult.Next(), queryResult.Err())
	require.Nil(t, queryResult.Err())
	require.False(t, queryResult.AnnotationsChanged())
	require.NotNil(t, queryResult.Record())
	require.Equal(t, expectedRecord2, queryResult.Record())
	// Clean end-of-stream: Next is false with no error.
	require.False(t, queryResult.Next())
	require.Nil(t, queryResult.Err())
}
// TestQueryCVSResultMultiTables parses a response containing four annotated
// tables (float, int, bool, and uint _value columns; the last using
// RFC3339Nano timestamps) and verifies every record plus the
// AnnotationsChanged flag at each table boundary.
func TestQueryCVSResultMultiTables(t *testing.T) {
	csvTable := `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
#group,false,false,true,true,false,false,true,true,true,true
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf
,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,1,adsfasdf
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string
#group,false,false,true,true,false,false,true,true,true,true
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
,,1,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,4,i,test,1,adsfasdf
,,1,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,-1,i,test,1,adsfasdf
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,boolean,string,string,string,string
#group,false,false,true,true,false,false,true,true,true,true
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
,,2,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.62797864Z,false,f,test,0,adsfasdf
,,2,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.969100374Z,true,f,test,0,adsfasdf
#datatype,string,long,dateTime:RFC3339Nano,dateTime:RFC3339Nano,dateTime:RFC3339Nano,unsignedLong,string,string,string,string
#group,false,false,true,true,false,false,true,true,true,true
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
,,3,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.62797864Z,0,i,test,0,adsfasdf
,,3,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.969100374Z,2,i,test,0,adsfasdf
`
	// Table 1: double _value column.
	expectedTable1 := fluxcsv.NewFluxTableMetadataFull(
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "_result", "result", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.LongDatatype, "", "table", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_start", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_stop", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_time", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.DoubleDatatype, "", "_value", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "_field", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "_measurement", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "a", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "b", true),
	)
	expectedRecord11, err := fluxcsv.NewFluxRecord(
		expectedTable1,
		map[string]interface{}{
			"result":       "_result",
			"table":        int64(0),
			"_start":       mustParseTime(t, "2020-02-17T22:19:49.747562847Z"),
			"_stop":        mustParseTime(t, "2020-02-18T22:19:49.747562847Z"),
			"_time":        mustParseTime(t, "2020-02-18T10:34:08.135814545Z"),
			"_value":       1.4,
			"_field":       "f",
			"_measurement": "test",
			"a":            "1",
			"b":            "adsfasdf",
		},
	)
	require.NoError(t, err)
	expectedRecord12, err := fluxcsv.NewFluxRecord(
		expectedTable1,
		map[string]interface{}{
			"result":       "_result",
			"table":        int64(0),
			"_start":       mustParseTime(t, "2020-02-17T22:19:49.747562847Z"),
			"_stop":        mustParseTime(t, "2020-02-18T22:19:49.747562847Z"),
			"_time":        mustParseTime(t, "2020-02-18T22:08:44.850214724Z"),
			"_value":       6.6,
			"_field":       "f",
			"_measurement": "test",
			"a":            "1",
			"b":            "adsfasdf",
		},
	)
	require.NoError(t, err)
	// Table 2: long _value column.
	expectedTable2 := fluxcsv.NewFluxTableMetadataFull(
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "_result", "result", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.LongDatatype, "", "table", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_start", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_stop", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_time", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.LongDatatype, "", "_value", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "_field", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "_measurement", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "a", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "b", true),
	)
	expectedRecord21, err := fluxcsv.NewFluxRecord(
		expectedTable2,
		map[string]interface{}{
			"result":       "_result",
			"table":        int64(1),
			"_start":       mustParseTime(t, "2020-02-17T22:19:49.747562847Z"),
			"_stop":        mustParseTime(t, "2020-02-18T22:19:49.747562847Z"),
			"_time":        mustParseTime(t, "2020-02-18T10:34:08.135814545Z"),
			"_value":       int64(4),
			"_field":       "i",
			"_measurement": "test",
			"a":            "1",
			"b":            "adsfasdf",
		},
	)
	require.NoError(t, err)
	expectedRecord22, err := fluxcsv.NewFluxRecord(
		expectedTable2,
		map[string]interface{}{
			"result":       "_result",
			"table":        int64(1),
			"_start":       mustParseTime(t, "2020-02-17T22:19:49.747562847Z"),
			"_stop":        mustParseTime(t, "2020-02-18T22:19:49.747562847Z"),
			"_time":        mustParseTime(t, "2020-02-18T22:08:44.850214724Z"),
			"_value":       int64(-1),
			"_field":       "i",
			"_measurement": "test",
			"a":            "1",
			"b":            "adsfasdf",
		},
	)
	require.NoError(t, err)
	// Table 3: boolean _value column.
	expectedTable3 := fluxcsv.NewFluxTableMetadataFull(
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "_result", "result", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.LongDatatype, "", "table", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_start", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_stop", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_time", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.BoolDatatype, "", "_value", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "_field", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "_measurement", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "a", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "b", true),
	)
	expectedRecord31, err := fluxcsv.NewFluxRecord(
		expectedTable3,
		map[string]interface{}{
			"result":       "_result",
			"table":        int64(2),
			"_start":       mustParseTime(t, "2020-02-17T22:19:49.747562847Z"),
			"_stop":        mustParseTime(t, "2020-02-18T22:19:49.747562847Z"),
			"_time":        mustParseTime(t, "2020-02-18T22:08:44.62797864Z"),
			"_value":       false,
			"_field":       "f",
			"_measurement": "test",
			"a":            "0",
			"b":            "adsfasdf",
		},
	)
	require.NoError(t, err)
	expectedRecord32, err := fluxcsv.NewFluxRecord(
		expectedTable3,
		map[string]interface{}{
			"result":       "_result",
			"table":        int64(2),
			"_start":       mustParseTime(t, "2020-02-17T22:19:49.747562847Z"),
			"_stop":        mustParseTime(t, "2020-02-18T22:19:49.747562847Z"),
			"_time":        mustParseTime(t, "2020-02-18T22:08:44.969100374Z"),
			"_value":       true,
			"_field":       "f",
			"_measurement": "test",
			"a":            "0",
			"b":            "adsfasdf",
		},
	)
	require.NoError(t, err)
	// Table 4: unsignedLong _value column with RFC3339Nano time columns.
	expectedTable4 := fluxcsv.NewFluxTableMetadataFull(
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "_result", "result", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.LongDatatype, "", "table", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFCNano, "", "_start", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFCNano, "", "_stop", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFCNano, "", "_time", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.ULongDatatype, "", "_value", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "_field", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "_measurement", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "a", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "b", true),
	)
	expectedRecord41, err := fluxcsv.NewFluxRecord(
		expectedTable4,
		map[string]interface{}{
			"result":       "_result",
			"table":        int64(3),
			"_start":       mustParseTime(t, "2020-02-17T22:19:49.747562847Z"),
			"_stop":        mustParseTime(t, "2020-02-18T22:19:49.747562847Z"),
			"_time":        mustParseTime(t, "2020-02-18T22:08:44.62797864Z"),
			"_value":       uint64(0),
			"_field":       "i",
			"_measurement": "test",
			"a":            "0",
			"b":            "adsfasdf",
		},
	)
	require.NoError(t, err)
	expectedRecord42, err := fluxcsv.NewFluxRecord(
		expectedTable4,
		map[string]interface{}{
			"result":       "_result",
			"table":        int64(3),
			"_start":       mustParseTime(t, "2020-02-17T22:19:49.747562847Z"),
			"_stop":        mustParseTime(t, "2020-02-18T22:19:49.747562847Z"),
			"_time":        mustParseTime(t, "2020-02-18T22:08:44.969100374Z"),
			"_value":       uint64(2),
			"_field":       "i",
			"_measurement": "test",
			"a":            "0",
			"b":            "adsfasdf",
		},
	)
	require.NoError(t, err)
	reader := strings.NewReader(csvTable)
	queryResult := fluxcsv.NewQueryTableResult(ioutil.NopCloser(reader))
	// AnnotationsChanged must be true on the first row of each table and
	// false on the second.
	require.True(t, queryResult.Next(), queryResult.Err())
	require.Nil(t, queryResult.Err())
	require.NotNil(t, queryResult.Record())
	require.Equal(t, expectedRecord11, queryResult.Record())
	require.True(t, queryResult.AnnotationsChanged())
	require.True(t, queryResult.Next(), queryResult.Err())
	require.Nil(t, queryResult.Err())
	require.False(t, queryResult.AnnotationsChanged())
	require.NotNil(t, queryResult.Record())
	require.Equal(t, expectedRecord12, queryResult.Record())
	require.True(t, queryResult.Next(), queryResult.Err())
	require.Nil(t, queryResult.Err())
	require.True(t, queryResult.AnnotationsChanged())
	require.NotNil(t, queryResult.Record())
	require.Equal(t, expectedRecord21, queryResult.Record())
	require.True(t, queryResult.Next(), queryResult.Err())
	require.Nil(t, queryResult.Err())
	require.False(t, queryResult.AnnotationsChanged())
	require.NotNil(t, queryResult.Record())
	require.Equal(t, expectedRecord22, queryResult.Record())
	require.True(t, queryResult.Next(), queryResult.Err())
	require.Nil(t, queryResult.Err(), queryResult.Err())
	require.True(t, queryResult.AnnotationsChanged())
	require.NotNil(t, queryResult.Record())
	require.Equal(t, expectedRecord31, queryResult.Record())
	require.True(t, queryResult.Next(), queryResult.Err())
	require.Nil(t, queryResult.Err())
	require.False(t, queryResult.AnnotationsChanged())
	require.NotNil(t, queryResult.Record())
	require.Equal(t, expectedRecord32, queryResult.Record())
	require.True(t, queryResult.Next(), queryResult.Err())
	require.Nil(t, queryResult.Err())
	require.True(t, queryResult.AnnotationsChanged())
	require.NotNil(t, queryResult.Record())
	require.Equal(t, expectedRecord41, queryResult.Record())
	require.True(t, queryResult.Next(), queryResult.Err())
	require.Nil(t, queryResult.Err())
	require.False(t, queryResult.AnnotationsChanged())
	require.NotNil(t, queryResult.Record())
	require.Equal(t, expectedRecord42, queryResult.Record())
	// Clean end-of-stream: Next is false with no error.
	require.False(t, queryResult.Next())
	require.Nil(t, queryResult.Err())
}
// TestQueryCVSResultSingleTableMultiColumnsNoValue decodes a result that has
// no "_value" column and exercises long, string, duration, base64Binary and
// dateTime:RFC3339 column decoding across two single-row tables.
// NOTE(review): "CVS" in the name looks like a typo for "CSV".
func TestQueryCVSResultSingleTableMultiColumnsNoValue(t *testing.T) {
	csvTable := `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,duration,base64Binary,dateTime:RFC3339
#group,false,false,true,true,false,true,true,false,false,false
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start
,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z
,,1,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:39:36.330153686Z,1467463,BME280,1h20m30.13245s,eHh4eHhjY2NjY2NkZGRkZA==,2020-04-28T00:00:00Z
`
	// Column metadata shared by both tables (same annotation block).
	expectedTable := fluxcsv.NewFluxTableMetadataFull(
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "_result", "result", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.LongDatatype, "", "table", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_start", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_stop", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_time", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.LongDatatype, "", "deviceId", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "sensor", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.DurationDatatype, "", "elapsed", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.Base64BinaryDataType, "", "note", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "start", false),
	)
	// Row of table 0: "1m1s" duration and base64 "datainbase64".
	expectedRecord1, err := fluxcsv.NewFluxRecord(
		expectedTable,
		map[string]interface{}{
			"result":   "_result",
			"table":    int64(0),
			"_start":   mustParseTime(t, "2020-04-28T12:36:50.990018157Z"),
			"_stop":    mustParseTime(t, "2020-04-28T12:51:50.990018157Z"),
			"_time":    mustParseTime(t, "2020-04-28T12:38:11.480545389Z"),
			"deviceId": int64(1467463),
			"sensor":   "BME280",
			"elapsed":  time.Minute + time.Second,
			"note":     []byte("datainbase64"),
			"start":    time.Date(2020, 4, 27, 0, 0, 0, 0, time.UTC),
		},
	)
	require.NoError(t, err)
	// Row of table 1: fractional-second duration "1h20m30.13245s".
	expectedRecord2, err := fluxcsv.NewFluxRecord(
		expectedTable,
		map[string]interface{}{
			"result":   "_result",
			"table":    int64(1),
			"_start":   mustParseTime(t, "2020-04-28T12:36:50.990018157Z"),
			"_stop":    mustParseTime(t, "2020-04-28T12:51:50.990018157Z"),
			"_time":    mustParseTime(t, "2020-04-28T12:39:36.330153686Z"),
			"deviceId": int64(1467463),
			"sensor":   "BME280",
			"elapsed":  time.Hour + 20*time.Minute + 30*time.Second + 132450000*time.Nanosecond,
			"note":     []byte("xxxxxccccccddddd"),
			"start":    time.Date(2020, 4, 28, 0, 0, 0, 0, time.UTC),
		},
	)
	require.NoError(t, err)
	reader := strings.NewReader(csvTable)
	queryResult := fluxcsv.NewQueryTableResult(ioutil.NopCloser(reader))
	// First record: annotations just appeared, so AnnotationsChanged is true.
	require.True(t, queryResult.Next(), queryResult.Err())
	require.Nil(t, queryResult.Err())
	require.True(t, queryResult.AnnotationsChanged())
	require.NotNil(t, queryResult.Record())
	require.Equal(t, expectedRecord1, queryResult.Record())
	// No "_value" column exists, so the default Value() accessor returns nil.
	require.Nil(t, queryResult.Record().Value())
	// Second record: same annotation block, new table index.
	require.True(t, queryResult.Next(), queryResult.Err())
	require.Nil(t, queryResult.Err())
	require.False(t, queryResult.AnnotationsChanged())
	require.NotNil(t, queryResult.Record())
	require.Equal(t, expectedRecord2, queryResult.Record())
	// Stream exhausted without error.
	require.False(t, queryResult.Next())
	require.Nil(t, queryResult.Err())
}
// TestErrorInRow verifies that an "error" table in the CSV stream surfaces as
// a query error: with a reference code appended, without a reference code,
// and with a generic fallback message when the error cell itself is empty.
func TestErrorInRow(t *testing.T) {
	header := []string{
		`#datatype,string,string`,
		`#group,true,true`,
		`#default,,`,
		`,error,reference`,
	}
	cases := []struct {
		errRow  string
		wantErr string
	}{
		{
			errRow:  `,failed to create physical plan: invalid time bounds from procedure from: bounds contain zero time,897`,
			wantErr: "failed to create physical plan: invalid time bounds from procedure from: bounds contain zero time,897",
		},
		{
			errRow:  `,failed to create physical plan: invalid time bounds from procedure from: bounds contain zero time,`,
			wantErr: "failed to create physical plan: invalid time bounds from procedure from: bounds contain zero time",
		},
		{
			// Empty error message falls back to a generic error string.
			errRow:  `,,`,
			wantErr: "unknown query error",
		},
	}
	for _, tc := range cases {
		rows := append(append([]string{}, header...), tc.errRow)
		csvTable := makeCSVstring(rows)
		queryResult := fluxcsv.NewQueryTableResult(ioutil.NopCloser(strings.NewReader(csvTable)))
		require.False(t, queryResult.Next())
		require.NotNil(t, queryResult.Err())
		require.Equal(t, tc.wantErr, queryResult.Err().Error())
	}
}
// TestInvalidDataType verifies that an unsupported column type in the
// #datatype annotation ("int" instead of "long") is reported as a parse error.
func TestInvalidDataType(t *testing.T) {
	const input = `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,int,string,duration,base64Binary,dateTime:RFC3339
#group,false,false,true,true,false,true,true,false,false,false
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start
,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z
,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:39:36.330153686Z,1467463,BME280,1h20m30.13245s,eHh4eHhjY2NjY2NkZGRkZA==,2020-04-28T00:00:00Z
`
	result := fluxcsv.NewQueryTableResult(ioutil.NopCloser(strings.NewReader(input)))
	require.False(t, result.Next())
	require.NotNil(t, result.Err())
	require.Equal(t, "unknown data type int", result.Err().Error())
}
// TestReorderedAnnotations verifies that the three annotation rows
// (#group, #datatype, #default) may appear in any order: the same two
// records must be decoded from both orderings below.
func TestReorderedAnnotations(t *testing.T) {
	expectedTable := fluxcsv.NewFluxTableMetadataFull(
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "_result", "result", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.LongDatatype, "", "table", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_start", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_stop", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_time", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.DoubleDatatype, "", "_value", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "_field", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "_measurement", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "a", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "b", true),
	)
	expectedRecord1, err := fluxcsv.NewFluxRecord(
		expectedTable,
		map[string]interface{}{
			"result":       "_result",
			"table":        int64(0),
			"_start":       mustParseTime(t, "2020-02-17T22:19:49.747562847Z"),
			"_stop":        mustParseTime(t, "2020-02-18T22:19:49.747562847Z"),
			"_time":        mustParseTime(t, "2020-02-18T10:34:08.135814545Z"),
			"_value":       1.4,
			"_field":       "f",
			"_measurement": "test",
			"a":            "1",
			"b":            "adsfasdf",
		},
	)
	require.NoError(t, err)
	expectedRecord2, err := fluxcsv.NewFluxRecord(
		expectedTable,
		map[string]interface{}{
			"result":       "_result",
			"table":        int64(0),
			"_start":       mustParseTime(t, "2020-02-17T22:19:49.747562847Z"),
			"_stop":        mustParseTime(t, "2020-02-18T22:19:49.747562847Z"),
			"_time":        mustParseTime(t, "2020-02-18T22:08:44.850214724Z"),
			"_value":       6.6,
			"_field":       "f",
			"_measurement": "test",
			"a":            "1",
			"b":            "adsfasdf",
		},
	)
	require.NoError(t, err)
	// Ordering 1: #group before #datatype.
	csvTable1 := `#group,false,false,true,true,false,false,true,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf
,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,1,adsfasdf
`
	reader := strings.NewReader(csvTable1)
	queryResult := fluxcsv.NewQueryTableResult(ioutil.NopCloser(reader))
	require.True(t, queryResult.Next(), queryResult.Err())
	require.Nil(t, queryResult.Err())
	require.True(t, queryResult.AnnotationsChanged())
	require.NotNil(t, queryResult.Record())
	require.Equal(t, expectedRecord1, queryResult.Record())
	require.True(t, queryResult.Next(), queryResult.Err())
	require.Nil(t, queryResult.Err())
	require.False(t, queryResult.AnnotationsChanged())
	require.NotNil(t, queryResult.Record())
	require.Equal(t, expectedRecord2, queryResult.Record())
	require.False(t, queryResult.Next())
	require.Nil(t, queryResult.Err())
	// Ordering 2: #default first — must decode identically.
	csvTable2 := `#default,_result,,,,,,,,,
#group,false,false,true,true,false,false,true,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf
,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,1,adsfasdf
`
	reader = strings.NewReader(csvTable2)
	queryResult = fluxcsv.NewQueryTableResult(ioutil.NopCloser(reader))
	require.True(t, queryResult.Next(), queryResult.Err())
	require.Nil(t, queryResult.Err())
	require.True(t, queryResult.AnnotationsChanged())
	require.NotNil(t, queryResult.Record())
	require.Equal(t, expectedRecord1, queryResult.Record())
	require.True(t, queryResult.Next(), queryResult.Err())
	require.Nil(t, queryResult.Err())
	require.False(t, queryResult.AnnotationsChanged())
	require.NotNil(t, queryResult.Record())
	require.Equal(t, expectedRecord2, queryResult.Record())
	require.False(t, queryResult.Next())
	require.Nil(t, queryResult.Err())
}
// TestDatatypeOnlyAnnotation verifies parsing when only the #datatype
// annotation is present: all columns get empty defaults and a false group
// flag, and the empty "result" cells decode as nil.
func TestDatatypeOnlyAnnotation(t *testing.T) {
	expectedTable := fluxcsv.NewFluxTableMetadataFull(
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "result", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.LongDatatype, "", "table", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_start", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_stop", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_time", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.DoubleDatatype, "", "_value", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "_field", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "_measurement", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "a", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "b", false),
	)
	// "result" is nil: no #default annotation supplies a fallback.
	expectedRecord1, err := fluxcsv.NewFluxRecord(
		expectedTable,
		map[string]interface{}{
			"result":       nil,
			"table":        int64(0),
			"_start":       mustParseTime(t, "2020-02-17T22:19:49.747562847Z"),
			"_stop":        mustParseTime(t, "2020-02-18T22:19:49.747562847Z"),
			"_time":        mustParseTime(t, "2020-02-18T10:34:08.135814545Z"),
			"_value":       1.4,
			"_field":       "f",
			"_measurement": "test",
			"a":            "1",
			"b":            "adsfasdf",
		},
	)
	require.NoError(t, err)
	expectedRecord2, err := fluxcsv.NewFluxRecord(
		expectedTable,
		map[string]interface{}{
			"result":       nil,
			"table":        int64(0),
			"_start":       mustParseTime(t, "2020-02-17T22:19:49.747562847Z"),
			"_stop":        mustParseTime(t, "2020-02-18T22:19:49.747562847Z"),
			"_time":        mustParseTime(t, "2020-02-18T22:08:44.850214724Z"),
			"_value":       6.6,
			"_field":       "f",
			"_measurement": "test",
			"a":            "1",
			"b":            "adsfasdf",
		},
	)
	require.NoError(t, err)
	csvTable1 := `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf
,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,1,adsfasdf
`
	reader := strings.NewReader(csvTable1)
	queryResult := fluxcsv.NewQueryTableResult(ioutil.NopCloser(reader))
	require.True(t, queryResult.Next(), queryResult.Err())
	require.Nil(t, queryResult.Err())
	require.True(t, queryResult.AnnotationsChanged())
	require.NotNil(t, queryResult.Record())
	require.Equal(t, expectedRecord1, queryResult.Record())
	require.True(t, queryResult.Next(), queryResult.Err())
	require.Nil(t, queryResult.Err())
	require.False(t, queryResult.AnnotationsChanged())
	require.NotNil(t, queryResult.Record())
	require.Equal(t, expectedRecord2, queryResult.Record())
	require.False(t, queryResult.Next())
	require.Nil(t, queryResult.Err())
}
// TestMissingDatatypeAnnotation verifies that parsing fails when the
// #datatype annotation is absent, regardless of which annotations remain
// or their order.
func TestMissingDatatypeAnnotation(t *testing.T) {
	tables := []string{
		`
#group,false,false,true,true,false,true,true,false,false,false
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start
,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z
,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:39:36.330153686Z,1467463,BME280,1h20m30.13245s,eHh4eHhjY2NjY2NkZGRkZA==,2020-04-28T00:00:00Z
`,
		`
#default,_result,,,,,,,,,
#group,false,false,true,true,false,true,true,false,false,false
,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start
,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z
,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:39:36.330153686Z,1467463,BME280,1h20m30.13245s,eHh4eHhjY2NjY2NkZGRkZA==,2020-04-28T00:00:00Z
`,
	}
	for _, table := range tables {
		queryResult := fluxcsv.NewQueryTableResult(ioutil.NopCloser(strings.NewReader(table)))
		require.False(t, queryResult.Next())
		require.NotNil(t, queryResult.Err())
		require.Equal(t, "parsing error, datatype annotation not found", queryResult.Err().Error())
	}
}
// TestMissingAnnotations verifies that a CSV stream containing only a header
// and data rows — no annotation rows at all — is rejected.
func TestMissingAnnotations(t *testing.T) {
	input := `
,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start
,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z
,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:39:36.330153686Z,1467463,BME280,1h20m30.13245s,eHh4eHhjY2NjY2NkZGRkZA==,2020-04-28T00:00:00Z
`
	res := fluxcsv.NewQueryTableResult(ioutil.NopCloser(strings.NewReader(input)))
	require.False(t, res.Next())
	require.NotNil(t, res.Err())
	require.Equal(t, "parsing error, annotations not found", res.Err().Error())
}
// TestDifferentNumberOfColumns verifies that a parse error is reported when a
// data row or annotation row disagrees with the table's column count,
// whichever mismatch is seen first.
func TestDifferentNumberOfColumns(t *testing.T) {
	cases := []struct {
		name     string
		csvTable string
		wantErr  string
	}{
		{
			name: "data row has extra column",
			csvTable: `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,duration,base64Binary,dateTime:RFC3339
#group,false,false,true,true,false,true,true,false,false,
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start
,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z,2345234
`,
			wantErr: "parsing error, row has different number of columns than the table: 11 vs 10",
		},
		{
			name: "default annotation too short",
			csvTable: `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,duration,base64Binary,dateTime:RFC3339
#group,false,false,true,true,false,true,true,false,false,
#default,_result,,,,,,,
,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start
,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z,2345234
`,
			wantErr: "parsing error, row has different number of columns than the table: 8 vs 10",
		},
		{
			name: "short default annotation comes first",
			csvTable: `#default,_result,,,,,,,
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,duration,base64Binary,dateTime:RFC3339
#group,false,false,true,true,false,true,true,false,false,
,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start
,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z,2345234
`,
			wantErr: "parsing error, row has different number of columns than the table: 10 vs 8",
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			queryResult := fluxcsv.NewQueryTableResult(ioutil.NopCloser(strings.NewReader(tc.csvTable)))
			require.False(t, queryResult.Next())
			require.NotNil(t, queryResult.Err())
			require.Equal(t, tc.wantErr, queryResult.Err().Error())
		})
	}
}
// TestEmptyValue verifies that empty CSV cells whose #default annotation is
// also empty decode as nil values.
func TestEmptyValue(t *testing.T) {
	csvTable := `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
#group,false,false,true,true,false,false,true,true,true,true
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,,f,test,1,adsfasdf
,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,,adsfasdf
,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:11:32.225467895Z,1122.45,f,test,3,
`
	reader := strings.NewReader(csvTable)
	queryResult := fluxcsv.NewQueryTableResult(ioutil.NopCloser(reader))
	// Row 1: empty "_value" cell.
	require.True(t, queryResult.Next(), queryResult.Err())
	require.Nil(t, queryResult.Err())
	require.NotNil(t, queryResult.Record())
	require.Nil(t, queryResult.Record().Value())
	// Row 2: empty "a" cell.
	require.True(t, queryResult.Next(), queryResult.Err())
	require.NotNil(t, queryResult.Record())
	require.Nil(t, queryResult.Record().ValueByKey("a"))
	// Row 3: empty "b" cell.
	require.True(t, queryResult.Next(), queryResult.Err())
	require.NotNil(t, queryResult.Record())
	require.Nil(t, queryResult.Record().ValueByKey("b"))
	require.False(t, queryResult.Next())
	require.Nil(t, queryResult.Err())
}
// makeCSVstring joins rows with CRLF line endings and terminates the output
// with a trailing CRLF, matching the line-ending convention of the annotated
// CSV wire format.
func makeCSVstring(rows []string) string {
	// Plain concatenation avoids the needless fmt.Sprintf("%s\r\n", ...)
	// boxing/formatting; an empty/nil slice still yields "\r\n" as before.
	return strings.Join(rows, "\r\n") + "\r\n"
}

275
pkg/fluxcsv/table.go Normal file
View File

@ -0,0 +1,275 @@
package fluxcsv
import (
	"fmt"
	"sort"
	"strconv"
	"strings"
	"time"
)
const (
	// ResultCol is the name of the special column labeling which query result a row belongs to.
	ResultCol = "result"
	// TableIdCol is the name of the special column carrying the table index within a result.
	TableIdCol = "table"
)
// FluxTableMetadata holds flux query result table information represented by a collection of columns.
// Each new table is introduced by annotations.
type FluxTableMetadata struct {
	resultColumn  *FluxColumn  // the special "result" column, if present
	tableIdColumn *FluxColumn  // the special "table" column, if present
	columns       []FluxColumn // remaining data columns, in CSV order
	groupKeyCols  []string     // names of columns whose group flag is set
}

// FluxColumn holds flux query table column properties.
type FluxColumn struct {
	name         string  // column name from the CSV header row
	dataType     ColType // type from the #datatype annotation
	group        bool    // true if the column is part of the group key (#group annotation)
	defaultValue string  // fallback value from the #default annotation
}

// FluxRecord represents a row in the flux query result table.
type FluxRecord struct {
	metadata *FluxTableMetadata     // column metadata for the row's table
	result   string                 // result name, possibly filled from the column default
	tableId  int64                  // table index within the result
	values   map[string]interface{} // decoded cell values keyed by column name
}
// NewFluxTableMetadataFull builds a FluxTableMetadata from the given columns.
// The special "result" and "table" columns are pulled into dedicated fields;
// every other column is stored in order, and names of grouping columns are
// collected as the group key.
func NewFluxTableMetadataFull(columns ...*FluxColumn) *FluxTableMetadata {
	meta := &FluxTableMetadata{}
	for _, col := range columns {
		name := col.Name()
		if name == ResultCol {
			meta.resultColumn = col
			continue
		}
		if name == TableIdCol {
			meta.tableIdColumn = col
			continue
		}
		meta.columns = append(meta.columns, *col)
		if col.IsGroup() {
			meta.groupKeyCols = append(meta.groupKeyCols, name)
		}
	}
	return meta
}
// ResultColumn returns metadata about the column naming results as
// specified by the query, or nil if no "result" column was present.
func (f *FluxTableMetadata) ResultColumn() *FluxColumn {
	return f.resultColumn
}

// TableIdColumn returns metadata about the column tracking table IDs
// within a result, or nil if no "table" column was present.
func (f *FluxTableMetadata) TableIdColumn() *FluxColumn {
	return f.tableIdColumn
}

// Columns returns the data columns of the flux query result table
// (the special "result" and "table" columns are excluded).
func (f *FluxTableMetadata) Columns() []FluxColumn {
	return f.columns
}
// Column returns the flux table column at the given index (the special
// "result" and "table" columns are not addressable here).
// Returns nil if index is out of bounds.
func (f *FluxTableMetadata) Column(index int) *FluxColumn {
	// The bounds check alone covers the empty-slice case the original
	// tested separately (any index fails index >= 0 when len == 0).
	if index < 0 || index >= len(f.columns) {
		return nil
	}
	return &f.columns[index]
}
// GroupKeyCols returns the names of the grouping columns in the table.
// NOTE(review): the original comment claimed ascending sort order, but
// NewFluxTableMetadataFull appends names in column order without sorting;
// confirm where (if anywhere) these are sorted before relying on order.
func (f *FluxTableMetadata) GroupKeyCols() []string {
	return f.groupKeyCols
}
// String returns a string dump of the table metadata: one "col{...}" entry
// per data column, comma-separated.
func (f *FluxTableMetadata) String() string {
	parts := make([]string, len(f.columns))
	for i := range f.columns {
		parts[i] = "col" + f.columns[i].String()
	}
	return strings.Join(parts, ",")
}
// NewFluxColumn creates an empty FluxColumn to be populated via the setters.
func NewFluxColumn() *FluxColumn {
	return &FluxColumn{}
}

// NewFluxColumnFull creates a FluxColumn with all properties set.
func NewFluxColumnFull(dataType ColType, defaultValue string, name string, group bool) *FluxColumn {
	return &FluxColumn{name: name, dataType: dataType, group: group, defaultValue: defaultValue}
}
// SetDefaultValue sets the default value for the column.
func (f *FluxColumn) SetDefaultValue(defaultValue string) {
	f.defaultValue = defaultValue
}

// SetGroup sets the group flag for the column.
func (f *FluxColumn) SetGroup(group bool) {
	f.group = group
}

// SetDataType sets the data type for the column.
func (f *FluxColumn) SetDataType(dataType ColType) {
	f.dataType = dataType
}

// SetName sets the name of the column.
func (f *FluxColumn) SetName(name string) {
	f.name = name
}

// DefaultValue returns the default value of the column.
func (f *FluxColumn) DefaultValue() string {
	return f.defaultValue
}

// IsGroup returns true if the column is a grouping column.
func (f *FluxColumn) IsGroup() bool {
	return f.group
}

// DataType returns the data type of the column.
func (f *FluxColumn) DataType() ColType {
	return f.dataType
}

// Name returns the name of the column.
func (f *FluxColumn) Name() string {
	return f.name
}

// String returns a FluxColumn string dump.
func (f *FluxColumn) String() string {
	return fmt.Sprintf("{name: %s, datatype: %v, defaultValue: %s, group: %v}", f.name, f.dataType, f.defaultValue, f.group)
}
// NewFluxRecord returns a new record for the table with the given values.
// The "result" entry is removed from values and stored in a dedicated field,
// falling back to the column default from metadata when absent. The table ID
// is taken from values (which must then hold an int64), or parsed from the
// table column's default value.
// Note: the values map is retained by the record and mutated here; callers
// must not reuse it afterwards.
func NewFluxRecord(metadata *FluxTableMetadata, values map[string]interface{}) (*FluxRecord, error) {
	res := stringValue(values, ResultCol)
	if res == "" && metadata.ResultColumn() != nil {
		res = metadata.ResultColumn().DefaultValue()
	}
	delete(values, ResultCol)
	var tid int64
	if v, ok := values[TableIdCol]; ok {
		if tid, ok = v.(int64); !ok {
			// %v: the invalid value may be any type; %s would mangle non-strings.
			return nil, fmt.Errorf("invalid value for table ID: %v", v)
		}
	} else if metadata.TableIdColumn() != nil {
		if did := metadata.TableIdColumn().DefaultValue(); did != "" {
			// ParseInt keeps the full int64 range (Atoi is int-sized, which
			// is 32 bits on some platforms).
			parsedId, err := strconv.ParseInt(did, 10, 64)
			if err != nil {
				return nil, fmt.Errorf("invalid default value for table ID: %s", did)
			}
			tid = parsedId
		}
	}
	return &FluxRecord{metadata: metadata, result: res, tableId: tid, values: values}, nil
}
// Result returns the name of the result containing this record as specified by the query.
func (r *FluxRecord) Result() string {
	return r.result
}

// TableId returns the index of the table this record belongs to within its result.
func (r *FluxRecord) TableId() int64 {
	return r.tableId
}

// Start returns the inclusive lower time bound of all records in the current table.
// Returns an empty time.Time if there is no column "_start" (or it is not a time).
func (r *FluxRecord) Start() time.Time {
	return timeValue(r.values, "_start")
}

// Stop returns the exclusive upper time bound of all records in the current table.
// Returns an empty time.Time if there is no column "_stop" (or it is not a time).
func (r *FluxRecord) Stop() time.Time {
	return timeValue(r.values, "_stop")
}

// Time returns the time of the record.
// Returns an empty time.Time if there is no column "_time" (or it is not a time).
func (r *FluxRecord) Time() time.Time {
	return timeValue(r.values, "_time")
}

// Value returns the default "_value" column value, or nil if not present.
func (r *FluxRecord) Value() interface{} {
	return r.ValueByKey("_value")
}

// Field returns the field name.
// Returns an empty string if there is no column "_field" (or it is not a string).
func (r *FluxRecord) Field() string {
	return stringValue(r.values, "_field")
}

// Measurement returns the measurement name of the record.
// Returns an empty string if there is no column "_measurement" (or it is not a string).
func (r *FluxRecord) Measurement() string {
	return stringValue(r.values, "_measurement")
}

// Values returns the map of values keyed by column name.
// Note: this is the record's internal map, not a copy; mutating it mutates the record.
func (r *FluxRecord) Values() map[string]interface{} {
	return r.values
}

// ValueByKey returns the value for the given column key, or nil if the record has no value for that key.
func (r *FluxRecord) ValueByKey(key string) interface{} {
	return r.values[key]
}
// String returns a string dump of the record's values as comma-separated
// "key:value" pairs. Keys are sorted so the output is deterministic — the
// original iterated the map directly, whose order is randomized in Go, so
// two dumps of the same record could differ.
func (r *FluxRecord) String() string {
	keys := make([]string, 0, len(r.values))
	for k := range r.values {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var buffer strings.Builder
	for i, k := range keys {
		if i > 0 {
			buffer.WriteString(",")
		}
		fmt.Fprintf(&buffer, "%s:%v", k, r.values[k])
	}
	return buffer.String()
}
// timeValue returns time.Time value from values map according to the key
// Empty time.Time value is returned if key is not found
func timeValue(values map[string]interface{}, key string) time.Time {
if val, ok := values[key]; ok {
if t, ok := val.(time.Time); ok {
return t
}
}
return time.Time{}
}
// stringValue looks up key in values and returns it as a string.
// An empty string is returned when the key is absent or the value is not a
// string.
func stringValue(values map[string]interface{}, key string) string {
	v, ok := values[key]
	if !ok {
		return ""
	}
	s, _ := v.(string)
	return s
}

110
pkg/fluxcsv/table_test.go Normal file
View File

@ -0,0 +1,110 @@
package fluxcsv_test
import (
"testing"
"time"
"github.com/influxdata/influx-cli/v2/pkg/fluxcsv"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// mustParseTime parses s as an RFC3339 timestamp, failing the test
// immediately on error.
func mustParseTime(t *testing.T, s string) time.Time {
	t.Helper()
	// Named ts rather than "time": the original shadowed the time package,
	// making the package unusable for the rest of the function body.
	ts, err := time.Parse(time.RFC3339, s)
	require.NoError(t, err)
	return ts
}
// TestTable verifies that NewFluxTableMetadataFull routes the special
// "result" and "table" columns into dedicated accessors while the remaining
// three columns stay addressable by index, preserving all column properties.
func TestTable(t *testing.T) {
	t.Parallel()
	table := fluxcsv.NewFluxTableMetadataFull(
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "_result", "result", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.LongDatatype, "10", "table", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.TimeDatatypeRFC, "", "_start", true),
		fluxcsv.NewFluxColumnFull(fluxcsv.DoubleDatatype, "1.1", "_value", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "", "_field", true),
	)
	// Five columns in, two pulled out as special, three remain.
	require.Len(t, table.Columns(), 3)
	require.NotNil(t, table.ResultColumn())
	require.Equal(t, "_result", table.ResultColumn().DefaultValue())
	require.Equal(t, fluxcsv.StringDatatype, table.ResultColumn().DataType())
	require.Equal(t, "result", table.ResultColumn().Name())
	require.Equal(t, false, table.ResultColumn().IsGroup())
	require.NotNil(t, table.TableIdColumn())
	require.Equal(t, "10", table.TableIdColumn().DefaultValue())
	require.Equal(t, fluxcsv.LongDatatype, table.TableIdColumn().DataType())
	require.Equal(t, "table", table.TableIdColumn().Name())
	require.Equal(t, false, table.TableIdColumn().IsGroup())
	// Indexes count only the non-special columns, in input order.
	require.NotNil(t, table.Column(0))
	require.Equal(t, "", table.Column(0).DefaultValue())
	require.Equal(t, fluxcsv.TimeDatatypeRFC, table.Column(0).DataType())
	require.Equal(t, "_start", table.Column(0).Name())
	require.Equal(t, true, table.Column(0).IsGroup())
	require.NotNil(t, table.Column(1))
	require.Equal(t, "1.1", table.Column(1).DefaultValue())
	require.Equal(t, fluxcsv.DoubleDatatype, table.Column(1).DataType())
	require.Equal(t, "_value", table.Column(1).Name())
	require.Equal(t, false, table.Column(1).IsGroup())
	require.NotNil(t, table.Column(2))
	require.Equal(t, "", table.Column(2).DefaultValue())
	require.Equal(t, fluxcsv.StringDatatype, table.Column(2).DataType())
	require.Equal(t, "_field", table.Column(2).Name())
	require.Equal(t, true, table.Column(2).IsGroup())
}
// TestRecord verifies FluxRecord construction and its convenience accessors:
// well-known columns (_start/_stop/_time/_value/_field/_measurement), the
// table-ID fallback to the column default, and nil/zero results for columns
// the record does not have.
func TestRecord(t *testing.T) {
	t.Parallel()
	table := fluxcsv.NewFluxTableMetadataFull(
		fluxcsv.NewFluxColumnFull(fluxcsv.StringDatatype, "_result", "result", false),
		fluxcsv.NewFluxColumnFull(fluxcsv.LongDatatype, "10", "table", false),
	)
	record, err := fluxcsv.NewFluxRecord(table, map[string]interface{}{
		"table":        int64(0),
		"_start":       mustParseTime(t, "2020-02-17T22:19:49.747562847Z"),
		"_stop":        mustParseTime(t, "2020-02-18T22:19:49.747562847Z"),
		"_time":        mustParseTime(t, "2020-02-18T10:34:08.135814545Z"),
		"_value":       1.4,
		"_field":       "f",
		"_measurement": "test",
		"a":            "1",
		"b":            "adsfasdf",
	})
	require.NoError(t, err)
	require.Len(t, record.Values(), 9)
	require.Equal(t, mustParseTime(t, "2020-02-17T22:19:49.747562847Z"), record.Start())
	require.Equal(t, mustParseTime(t, "2020-02-18T22:19:49.747562847Z"), record.Stop())
	require.Equal(t, mustParseTime(t, "2020-02-18T10:34:08.135814545Z"), record.Time())
	require.Equal(t, "f", record.Field())
	require.Equal(t, 1.4, record.Value())
	require.Equal(t, "test", record.Measurement())
	// An explicit "table" value wins over the column default.
	require.Equal(t, int64(0), record.TableId())
	// Aggregate-style record without a "table" value or well-known columns.
	agRec, err := fluxcsv.NewFluxRecord(table, map[string]interface{}{
		"result": "foo",
		"room":   "bathroom",
		"sensor": "SHT",
		"temp":   24.3,
		"hum":    42,
	})
	require.NoError(t, err)
	// "result" is removed from the values map during construction.
	require.Len(t, agRec.Values(), 4)
	require.Equal(t, time.Time{}, agRec.Start())
	require.Equal(t, time.Time{}, agRec.Stop())
	require.Equal(t, time.Time{}, agRec.Time())
	require.Equal(t, "", agRec.Field())
	assert.Nil(t, agRec.Value())
	require.Equal(t, "", agRec.Measurement())
	// Falls back to the "table" column default ("10").
	require.Equal(t, int64(10), agRec.TableId())
	require.Equal(t, 24.3, agRec.ValueByKey("temp"))
	require.Equal(t, 42, agRec.ValueByKey("hum"))
	assert.Nil(t, agRec.ValueByKey("notexist"))
}