2021-05-13 10:54:45 -04:00

276 lines
7.2 KiB
Go

package fluxcsv
import (
	"fmt"
	"sort"
	"strconv"
	"strings"
	"time"
)
// Names of the two columns that get special treatment in this file:
// NewFluxTableMetadataFull stores them apart from the regular column list,
// and NewFluxRecord reads the record's result name and table ID from them.
const (
// ResultCol is the name of the column holding the result name.
ResultCol = "result"
// TableIdCol is the name of the column holding the table ID.
TableIdCol = "table"
)
// FluxTableMetadata holds flux query result table information, represented
// as a collection of columns.
// Each new table is introduced by annotations.
type FluxTableMetadata struct {
// resultColumn is the column named ResultCol; nil when none was supplied.
resultColumn *FluxColumn
// tableIdColumn is the column named TableIdCol; nil when none was supplied.
tableIdColumn *FluxColumn
// columns holds every other column, in the order it was supplied.
columns []FluxColumn
// groupKeyCols holds the names of the entries of columns whose group flag is set.
groupKeyCols []string
}
// FluxColumn holds the properties of a single flux query table column.
type FluxColumn struct {
// name is the column name.
name string
// dataType is the column's data type (ColType is declared outside this file).
dataType ColType
// group reports whether the column is a grouping column.
group bool
// defaultValue is the column's default value in string form. NewFluxRecord
// falls back to it for the result and table ID columns.
defaultValue string
}
// FluxRecord represents a row in the flux query result table.
type FluxRecord struct {
// metadata describes the table this record belongs to.
metadata *FluxTableMetadata
// result is the name of the result containing this record.
result string
// tableId is the ID of the table this record belongs to.
tableId int64
// values maps column names to the record's values. NewFluxRecord removes
// the ResultCol entry before storing the map.
values map[string]interface{}
}
// NewFluxTableMetadataFull creates FluxTableMetadata containing the given columns.
//
// The column named ResultCol and the column named TableIdCol are stored
// separately (the given pointers are retained as-is). Every other column is
// copied into the regular column list in the order given, and the names of
// those flagged as grouping columns are collected and sorted in ascending
// order, which is the order GroupKeyCols documents. Nil entries are skipped.
func NewFluxTableMetadataFull(columns ...*FluxColumn) *FluxTableMetadata {
	m := FluxTableMetadata{}
	for _, c := range columns {
		if c == nil {
			// A nil entry carries no column information; skipping it avoids
			// a nil dereference in c.Name().
			continue
		}
		switch n := c.Name(); n {
		case ResultCol:
			m.resultColumn = c
		case TableIdCol:
			m.tableIdColumn = c
		default:
			m.columns = append(m.columns, *c)
			if c.IsGroup() {
				m.groupKeyCols = append(m.groupKeyCols, n)
			}
		}
	}
	// Keep the group key names in the ascending order promised to callers.
	sort.Strings(m.groupKeyCols)
	return &m
}
// ResultColumn returns the column that names the result (the column called
// "result"), or nil if the metadata holds no such column.
func (f *FluxTableMetadata) ResultColumn() *FluxColumn {
	return f.resultColumn
}
// TableIdColumn returns the column that tracks table IDs within a result
// (the column called "table"), or nil if the metadata holds no such column.
func (f *FluxTableMetadata) TableIdColumn() *FluxColumn {
	return f.tableIdColumn
}
// Columns returns the columns of the flux query result table. The result
// and table ID columns are held separately and are not part of this slice.
// The slice is the metadata's own storage, not a copy.
func (f *FluxTableMetadata) Columns() []FluxColumn {
	return f.columns
}
// Column returns a pointer to the table column at the given index.
// It returns nil when index is negative or not less than the column count.
func (f *FluxTableMetadata) Column(index int) *FluxColumn {
	if index >= 0 && index < len(f.columns) {
		return &f.columns[index]
	}
	return nil
}
// GroupKeyCols returns the names of the grouping columns in the table,
// in the order established when the metadata was built.
// The slice is the metadata's own storage, not a copy.
func (f *FluxTableMetadata) GroupKeyCols() []string {
	return f.groupKeyCols
}
// String returns a dump of the table's regular columns: each column is
// rendered as "col" followed by the column's own String output, and the
// entries are separated by commas.
func (f *FluxTableMetadata) String() string {
	parts := make([]string, len(f.columns))
	for i := range f.columns {
		parts[i] = "col" + f.columns[i].String()
	}
	return strings.Join(parts, ",")
}
// NewFluxColumn creates an empty FluxColumn with every property at its
// zero value; use the setters to fill it in.
func NewFluxColumn() *FluxColumn {
	return new(FluxColumn)
}
// NewFluxColumnFull creates a FluxColumn with all properties set.
// Note the parameter order: data type, default value, name, group flag.
func NewFluxColumnFull(dataType ColType, defaultValue string, name string, group bool) *FluxColumn {
	col := FluxColumn{
		name:         name,
		dataType:     dataType,
		group:        group,
		defaultValue: defaultValue,
	}
	return &col
}
// SetDefaultValue replaces the column's default value.
func (f *FluxColumn) SetDefaultValue(defaultValue string) {
	f.defaultValue = defaultValue
}
// SetGroup sets the flag marking the column as a grouping column.
func (f *FluxColumn) SetGroup(group bool) {
	f.group = group
}
// SetDataType replaces the column's data type.
func (f *FluxColumn) SetDataType(dataType ColType) {
	f.dataType = dataType
}
// SetName replaces the column's name.
func (f *FluxColumn) SetName(name string) {
	f.name = name
}
// DefaultValue returns the column's default value in string form.
func (f *FluxColumn) DefaultValue() string {
	return f.defaultValue
}
// IsGroup reports whether the column is a grouping column.
func (f *FluxColumn) IsGroup() bool {
	return f.group
}
// DataType returns the column's data type.
func (f *FluxColumn) DataType() ColType {
	return f.dataType
}
// Name returns the column's name.
func (f *FluxColumn) Name() string {
	return f.name
}
// String returns a dump of the column's name, data type, default value and
// group flag, in that order.
func (f *FluxColumn) String() string {
	const layout = "{name: %s, datatype: %v, defaultValue: %s, group: %v}"
	return fmt.Sprintf(layout, f.name, f.dataType, f.defaultValue, f.group)
}
// NewFluxRecord returns a new record for the table described by metadata,
// backed by values (column name -> value).
//
// The result name is taken from the "result" entry of values when that is a
// non-empty string; otherwise the default value of the metadata's result
// column is used, if there is one. The "result" entry is then deleted from
// values. The map is not copied: the caller's map is modified and retained
// by the record.
//
// The table ID is taken from the "table" entry of values, which must be an
// int64. When the entry is absent, a non-empty default value of the
// metadata's table ID column is parsed as a base-10 64-bit integer. If
// neither is available the table ID is 0.
//
// An error is returned when the "table" entry is not an int64 or when the
// default table ID cannot be parsed. A nil metadata is treated as having
// neither a result nor a table ID column.
func NewFluxRecord(metadata *FluxTableMetadata, values map[string]interface{}) (*FluxRecord, error) {
	var resultCol, tableIDCol *FluxColumn
	if metadata != nil {
		resultCol = metadata.ResultColumn()
		tableIDCol = metadata.TableIdColumn()
	}

	res := stringValue(values, ResultCol)
	if res == "" && resultCol != nil {
		res = resultCol.DefaultValue()
	}
	delete(values, ResultCol)

	var tid int64
	if v, ok := values[TableIdCol]; ok {
		if tid, ok = v.(int64); !ok {
			// %v rather than %s: v is not an int64 here and is often not a
			// string either, in which case %s would print "%!s(...)".
			return nil, fmt.Errorf("invalid value for table ID: %v", v)
		}
	} else if tableIDCol != nil {
		if did := tableIDCol.DefaultValue(); did != "" {
			// Parse straight into 64 bits so the accepted range does not
			// depend on the platform's int size.
			parsedID, err := strconv.ParseInt(did, 10, 64)
			if err != nil {
				return nil, fmt.Errorf("invalid default value for table ID: %s", did)
			}
			tid = parsedID
		}
	}
	return &FluxRecord{metadata: metadata, result: res, tableId: tid, values: values}, nil
}
// Result returns the name of the result that contains this record, as
// specified by the query.
func (r *FluxRecord) Result() string {
	return r.result
}
// TableId returns the index, within its result, of the table this record
// belongs to.
func (r *FluxRecord) TableId() int64 {
	return r.tableId
}
// Start returns the inclusive lower time bound of all records in the
// current table, read from the "_start" column.
// It returns the zero time.Time if that column is missing or does not hold
// a time.Time.
func (r *FluxRecord) Start() time.Time {
	return timeValue(r.values, "_start")
}
// Stop returns the exclusive upper time bound of all records in the
// current table, read from the "_stop" column.
// It returns the zero time.Time if that column is missing or does not hold
// a time.Time.
func (r *FluxRecord) Stop() time.Time {
	return timeValue(r.values, "_stop")
}
// Time returns the time of the record, read from the "_time" column.
// It returns the zero time.Time if that column is missing or does not hold
// a time.Time.
func (r *FluxRecord) Time() time.Time {
	return timeValue(r.values, "_time")
}
// Value returns the value of the "_value" column, or nil if the record has
// no such column.
func (r *FluxRecord) Value() interface{} {
	return r.ValueByKey("_value")
}
// Field returns the field name, read from the "_field" column.
// It returns the empty string if that column is missing or does not hold a
// string.
func (r *FluxRecord) Field() string {
	return stringValue(r.values, "_field")
}
// Measurement returns the measurement name of the record, read from the
// "_measurement" column.
// It returns the empty string if that column is missing or does not hold a
// string.
func (r *FluxRecord) Measurement() string {
	return stringValue(r.values, "_measurement")
}
// Values returns the record's values keyed by column name.
// The map is the record's own storage, not a copy.
func (r *FluxRecord) Values() map[string]interface{} {
	return r.values
}
// ValueByKey returns the record's value for the given column key, or nil
// if the record has no value under that key.
func (r *FluxRecord) ValueByKey(key string) interface{} {
	return r.values[key]
}
// String returns a dump of the record's values as comma-separated
// "column:value" pairs. Pairs are ordered by column name so that the output
// is the same on every call; ranging over the map directly would produce
// them in random order.
func (r *FluxRecord) String() string {
	keys := make([]string, 0, len(r.values))
	for k := range r.values {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var buffer strings.Builder
	for i, k := range keys {
		if i > 0 {
			buffer.WriteString(",")
		}
		fmt.Fprintf(&buffer, "%s:%v", k, r.values[k])
	}
	return buffer.String()
}
// timeValue returns the time.Time stored in values under key.
// The zero time.Time is returned if the key is not found or its value is
// not a time.Time.
func timeValue(values map[string]interface{}, key string) time.Time {
	// A failed assertion (missing key or other type) leaves t at its zero value.
	t, _ := values[key].(time.Time)
	return t
}
// stringValue returns the string stored in values under key.
// The empty string is returned if the key is not found or its value is not
// a string.
func stringValue(values map[string]interface{}, key string) string {
	// A failed assertion (missing key or other type) leaves s empty.
	s, _ := values[key].(string)
	return s
}