influx-cli/pkg/fluxcsv/query_result.go

298 lines
7.2 KiB
Go

package fluxcsv
import (
"encoding/base64"
"encoding/csv"
"errors"
"fmt"
"io"
"strconv"
"strings"
"time"
"github.com/influxdata/influx-cli/v2/pkg/duration"
)
// ColType identifies the data type of a column in a flux CSV response.
type ColType int

// Column data types. The numeric values follow declaration order, starting
// at zero; the quoted names are the type strings accepted by ParseType.
const (
	// StringDatatype is the "string" type.
	StringDatatype ColType = iota
	// DoubleDatatype is the "double" type.
	DoubleDatatype
	// BoolDatatype is the "boolean" type.
	BoolDatatype
	// LongDatatype is the "long" type.
	LongDatatype
	// ULongDatatype is the "unsignedLong" type.
	ULongDatatype
	// DurationDatatype is the "duration" type.
	DurationDatatype
	// Base64BinaryDataType is the "base64Binary" type.
	Base64BinaryDataType
	// TimeDatatypeRFC is the "dateTime:RFC3339" type.
	TimeDatatypeRFC
	// TimeDatatypeRFCNano is the "dateTime:RFC3339Nano" type.
	TimeDatatypeRFCNano
	// InvalidDatatype is returned by ParseType for an unrecognised type name.
	InvalidDatatype
)
// ParseType maps a type name, as found in a #datatype annotation row, to its
// ColType. It returns InvalidDatatype together with an error when s is not
// one of the recognised names. Matching is exact and case-sensitive.
func ParseType(s string) (ColType, error) {
	var parsed ColType
	switch s {
	case "string":
		parsed = StringDatatype
	case "double":
		parsed = DoubleDatatype
	case "boolean":
		parsed = BoolDatatype
	case "long":
		parsed = LongDatatype
	case "unsignedLong":
		parsed = ULongDatatype
	case "duration":
		parsed = DurationDatatype
	case "base64Binary":
		parsed = Base64BinaryDataType
	case "dateTime:RFC3339":
		parsed = TimeDatatypeRFC
	case "dateTime:RFC3339Nano":
		parsed = TimeDatatypeRFCNano
	default:
		return InvalidDatatype, fmt.Errorf("unknown data type %s", s)
	}
	return parsed, nil
}
// QueryTableResult parses a streamed flux query response into structures
// representing the parts of flux tables.
// Walk through the result by calling Next() repeatedly until it returns false.
// Table info (columns with names, data types, etc.) for the current row is
// returned by Metadata(); the row itself is returned by Record().
// Next() also returns false when parsing ends early because of an error, so
// check Err() once Next() has returned false.
type QueryTableResult struct {
	// Closer closes the raw response the CSV is read from.
	io.Closer
	// csvReader reads rows from the raw response.
	csvReader *csv.Reader
	// resultChanged, tableIdChanged and annotationsChanged describe what the
	// last call of Next() observed; see the accessor methods of the same names.
	resultChanged      bool
	tableIdChanged     bool
	annotationsChanged bool
	// record is the last parsed data row.
	record *FluxRecord
	// baseColumns holds one column per CSV field after the first one; it is
	// rebuilt whenever a new group of annotation rows starts.
	baseColumns []*FluxColumn
	// metadata is built from baseColumns when the column-name row is read.
	metadata *FluxTableMetadata
	// err is the error reported by Err().
	err error
}
// NewQueryTableResult returns a QueryTableResult that reads CSV rows from
// rawResponse and closes rawResponse when the result is closed.
func NewQueryTableResult(rawResponse io.ReadCloser) *QueryTableResult {
	result := &QueryTableResult{
		Closer:    rawResponse,
		csvReader: csv.NewReader(rawResponse),
	}
	// A negative FieldsPerRecord disables the reader's own field-count check;
	// Next() validates the column count of each row itself.
	result.csvReader.FieldsPerRecord = -1
	return result
}
// ResultChanged reports whether the row parsed by the last call of Next()
// has a different result name than the row parsed before it. For the first
// row the comparison is made against an empty result name.
func (q *QueryTableResult) ResultChanged() bool {
	return q.resultChanged
}
// TableIdChanged reports whether the row parsed by the last call of Next()
// has a different table id than the row parsed before it. For the first row
// the comparison is made against table id 0.
func (q *QueryTableResult) TableIdChanged() bool {
	return q.tableIdChanged
}
// AnnotationsChanged reports whether the last call of Next() read a new group
// of CSV annotation rows. The flag is cleared at the start of every Next() call.
func (q *QueryTableResult) AnnotationsChanged() bool {
	return q.annotationsChanged
}
// Record returns the most recently parsed flux table data row, or nil if no
// row has been parsed yet.
// Use the FluxRecord methods to access the row's values and properties.
func (q *QueryTableResult) Record() *FluxRecord {
	return q.record
}
// Metadata returns the table-level info built from the most recently read
// column-name row, or nil if no such row has been read yet.
func (q *QueryTableResult) Metadata() *FluxTableMetadata {
	return q.metadata
}
// parsingState tracks which kind of CSV row Next() expects to handle next.
type parsingState int

const (
	// parsingStateNormal: rows with an empty first field are data rows.
	parsingStateNormal parsingState = iota
	// parsingStateAnnotation: a group of annotation rows (first field starting
	// with '#') is being read.
	parsingStateAnnotation
	// parsingStateNameRow: the next row with an empty first field holds the
	// column names.
	parsingStateNameRow
	// parsingStateError: the name row started with an "error" column, so the
	// next row with an empty first field carries the query error message.
	parsingStateError
)
// Next advances to the next data row in the query result.
//
// It reads CSV rows until one data row has been parsed. Annotation rows
// (#datatype, #group, #default) and the column-name row met on the way are
// consumed and used to rebuild the column set and the table metadata. The
// parsed row is then available through Record() and its table info through
// Metadata().
//
// Next returns true when a row was parsed. It returns false at the end of the
// stream or on an error, and in both cases closes the underlying stream; use
// Err() afterwards to tell the two apart.
func (q *QueryTableResult) Next() bool {
	// Close the stream on every exit path except a successfully parsed row.
	keepOpen := false
	defer func() {
		if keepOpen {
			return
		}
		if err := q.Close(); err != nil {
			message := err.Error()
			if q.err != nil {
				// Keep the parsing error visible next to the close error.
				message = fmt.Sprintf("%s,%s", message, q.err.Error())
			}
			q.err = errors.New(message)
		}
	}()

	state := parsingStateNormal
	q.annotationsChanged = false
	dataTypeAnnotationFound := false

	for {
		var row []string
		row, q.err = q.csvReader.Read()
		if q.err == io.EOF {
			// A clean end of the stream is not reported as an error.
			q.err = nil
			return false
		}
		if q.err != nil {
			return false
		}
		// Rows with at most one field carry no column data; skip them.
		if len(row) <= 1 {
			continue
		}
		if len(row[0]) > 0 && row[0][0] == '#' {
			if state == parsingStateNormal {
				// First annotation row of a new group: start a fresh column
				// set with one column per field after the first.
				q.annotationsChanged = true
				q.baseColumns = nil
				for range row[1:] {
					q.baseColumns = append(q.baseColumns, NewFluxColumn())
				}
				state = parsingStateAnnotation
			}
		}
		expectedNcol := len(q.baseColumns)
		if expectedNcol == 0 {
			q.err = errors.New("parsing error, annotations not found")
			return false
		}
		ncol := len(row) - 1
		if ncol != expectedNcol {
			q.err = fmt.Errorf("parsing error, row has different number of columns than the table: %d vs %d", ncol, expectedNcol)
			return false
		}
		// NOTE(review): a row whose first field is neither empty nor one of
		// the annotations handled below falls out of this switch and is
		// reported as success without updating the record - confirm whether
		// that is intended.
		switch row[0] {
		case "":
			switch state {
			case parsingStateAnnotation:
				if !dataTypeAnnotationFound {
					q.err = errors.New("parsing error, datatype annotation not found")
					return false
				}
				// The first row after the annotations is the column-name row.
				state = parsingStateNameRow
				fallthrough
			case parsingStateNameRow:
				if row[1] == "error" {
					// The following row carries the query error message.
					state = parsingStateError
				} else {
					for i, n := range row[1:] {
						q.baseColumns[i].SetName(n)
					}
					q.metadata = NewFluxTableMetadataFull(q.baseColumns...)
					state = parsingStateNormal
				}
				continue
			case parsingStateError:
				var message string
				if len(row) > 1 && len(row[1]) > 0 {
					message = row[1]
				} else {
					message = "unknown query error"
				}
				reference := ""
				if len(row) > 2 && len(row[2]) > 0 {
					reference = fmt.Sprintf(",%s", row[2])
				}
				q.err = fmt.Errorf("%s%s", message, reference)
				return false
			}
			// Data row: convert every field using its column's type, falling
			// back to the column's default value for empty fields.
			values := make(map[string]interface{}, ncol)
			for i, v := range row[1:] {
				col := q.baseColumns[i]
				values[col.Name()], q.err = toValue(
					stringTernary(v, col.DefaultValue()), col.DataType(), col.Name())
				if q.err != nil {
					return false
				}
			}
			var prevRes string
			var prevId int64
			if q.record != nil {
				prevRes, prevId = q.record.Result(), q.record.TableId()
			}
			q.record, q.err = NewFluxRecord(q.metadata, values)
			// Check the error before touching the new record: it may be nil
			// when construction fails, and calling its methods would panic.
			if q.err != nil {
				return false
			}
			q.resultChanged = q.record.Result() != prevRes
			q.tableIdChanged = q.record.TableId() != prevId
		case "#datatype":
			dataTypeAnnotationFound = true
			for i, d := range row[1:] {
				t, err := ParseType(d)
				if err != nil {
					q.err = err
					return false
				}
				q.baseColumns[i].SetDataType(t)
			}
			continue
		case "#group":
			for i, g := range row[1:] {
				q.baseColumns[i].SetGroup(g == "true")
			}
			continue
		case "#default":
			for i, c := range row[1:] {
				q.baseColumns[i].SetDefaultValue(c)
			}
			continue
		}
		// A row was produced; leave the stream open for the next call.
		keepOpen = true
		return true
	}
}
// Err returns the error, if any, raised while parsing the flux query response
// or while closing the underlying stream. Reaching the end of the response is
// not reported as an error.
func (q *QueryTableResult) Err() error {
	return q.err
}
// stringTernary picks the first non-empty choice: it returns a unless a is
// the empty string, in which case it returns b (which may itself be empty).
func stringTernary(a, b string) string {
	if len(a) > 0 {
		return a
	}
	return b
}
// toValue converts the textual CSV field s into a Go value according to the
// column type t. An empty s yields a nil value for every type. name is the
// column name; it is used only in the error returned for an unknown type.
//
// Parse failures are returned unchanged from the underlying conversion
// function, together with whatever value that function produced.
func toValue(s string, t ColType, name string) (interface{}, error) {
	if len(s) == 0 {
		return nil, nil
	}
	switch t {
	case StringDatatype:
		return s, nil
	case BoolDatatype:
		// Anything other than a case-insensitive "false" counts as true.
		return strings.ToLower(s) != "false", nil
	case LongDatatype:
		return strconv.ParseInt(s, 10, 64)
	case ULongDatatype:
		return strconv.ParseUint(s, 10, 64)
	case DoubleDatatype:
		return strconv.ParseFloat(s, 64)
	case TimeDatatypeRFC:
		return time.Parse(time.RFC3339, s)
	case TimeDatatypeRFCNano:
		return time.Parse(time.RFC3339Nano, s)
	case DurationDatatype:
		return duration.RawDurationToTimeDuration(s)
	case Base64BinaryDataType:
		return base64.StdEncoding.DecodeString(s)
	}
	return nil, fmt.Errorf("%s has unknown data type %v", name, t)
}