feat: port csv2lp from influxdb (#37)

Co-authored-by: William Baker <55118525+wbaker85@users.noreply.github.com>
Daniel Moran 2021-04-26 09:12:29 -04:00 committed by GitHub
parent 68ad797ab7
commit f2d10e34f9
17 changed files with 3845 additions and 0 deletions

go.mod (+1 line)

@@ -9,6 +9,7 @@ require (
 	github.com/kr/pretty v0.1.0 // indirect
 	github.com/stretchr/testify v1.7.0
 	github.com/urfave/cli/v2 v2.3.0
+	golang.org/x/text v0.3.3
 	gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
 	honnef.co/go/tools v0.1.3
 )

pkg/csv2lp/README.md (new file, +189 lines)

@@ -0,0 +1,189 @@
# CSV to Line Protocol
The csv2lp library converts CSV (comma-separated values) to InfluxDB line protocol.
1. it can process the CSV result of a (simple) flux query that exports data from a bucket
2. it allows the processing of existing CSV files
## Usage
The entry point is the `CsvToLineProtocol` function, which accepts a (UTF-8) reader with CSV data and returns a reader with line protocol data.
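For instance, here is a minimal sketch of the conversion, written as if inside the `csv2lp` package so that no module import path has to be assumed:
```go
package csv2lp

import (
	"io"
	"os"
	"strings"
)

// convertToStdout converts a small annotated CSV document to line protocol
// and copies the result to standard output.
func convertToStdout() error {
	csv := "#datatype measurement,tag,double,dateTime:number\n" +
		"m,host,usage,time\n" +
		"cpu,server01,2.7,1482669077000000000\n"
	lineReader := CsvToLineProtocol(strings.NewReader(csv))
	// prints: cpu,host=server01 usage=2.7 1482669077000000000
	_, err := io.Copy(os.Stdout, lineReader)
	return err
}
```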
## Examples
#### Example 1 - Flux Query Result
csv:
```bash
#group,false,false,true,true,false,false,true,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
,,0,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:17:57Z,0,time_steal,cpu,cpu1,rsavage.prod
,,0,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:07Z,0,time_steal,cpu,cpu1,rsavage.prod
#group,false,false,true,true,false,false,true,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
,,1,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:01Z,2.7263631815907954,usage_user,cpu,cpu-total,tahoecity.prod
,,1,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:11Z,2.247752247752248,usage_user,cpu,cpu-total,tahoecity.prod
```
line protocol data:
```
cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669077000000000
cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669087000000000
cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.7263631815907954 1582669081000000000
cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.247752247752248 1582669091000000000
```
#### Example 2 - Simple CSV file
csv:
```bash
#datatype measurement,tag,tag,double,double,ignored,dateTime:number
m,cpu,host,time_steal,usage_user,nothing,time
cpu,cpu1,rsavage.prod,0,2.7,a,1482669077000000000
cpu,cpu1,rsavage.prod,0,2.2,b,1482669087000000000
```
line protocol data:
```
cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.7 1482669077000000000
cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.2 1482669087000000000
```
Data types can also be supplied in the column names, so the CSV can be shortened to:
```
m|measurement,cpu|tag,host|tag,time_steal|double,usage_user|double,nothing|ignored,time|dateTime:number
cpu,cpu1,rsavage.prod,0,2.7,a,1482669077000000000
cpu,cpu1,rsavage.prod,0,2.2,b,1482669087000000000
```
#### Example 3 - Data Types with default values
csv:
```bash
#datatype measurement,tag,string,double,boolean,long,unsignedLong,duration,dateTime
#default test,annotatedDatatypes,,,,,,
m,name,s,d,b,l,ul,dur,time
,,str1,1.0,true,1,1,1ms,1
,,str2,2.0,false,2,2,2us,2020-01-11T10:10:10Z
```
line protocol data:
```
test,name=annotatedDatatypes s="str1",d=1,b=true,l=1i,ul=1u,dur=1000000i 1
test,name=annotatedDatatypes s="str2",d=2,b=false,l=2i,ul=2u,dur=2000i 1578737410000000000
```
A default value can be supplied in the column label after the data type, so the CSV could also be:
```
m|measurement|test,name|tag|annotatedDatatypes,s|string,d|double,b|boolean,l|long,ul|unsignedLong,dur|duration,time|dateTime
,,str1,1.0,true,1,1,1ms,1
,,str2,2.0,false,2,2,2us,2020-01-11T10:10:10Z
```
#### Example 4 - Advanced usage
csv:
```
#constant measurement,test
#constant tag,name,datetypeFormats
#timezone -0500
t|dateTime:2006-01-02|1970-01-02,"d|double:,. ","b|boolean:y,Y:n,N|y"
1970-01-01,"123.456,78",
,"123 456,78",Y
```
- the measurement and an extra tag are defined using the `#constant` annotation
- the timezone for dateTime parsing is set to `-0500` (EST)
- the `t` column is of the `dateTime` data type with format `2006-01-02`; its default value is _January 2nd 1970_
- the `d` column is of the `double` data type with `,` as a fraction delimiter and `. ` as ignored separators that are used to visually separate large numbers into groups
- the `b` column is of the `boolean` data type that considers `y` or `Y` truthy and `n` or `N` falsy; empty column values are truthy here because the default value is `y`
line protocol data:
```
test,name=datetypeFormats d=123456.78,b=true 18000000000000
test,name=datetypeFormats d=123456.78,b=true 104400000000000
```
#### Example 5 - Custom column separator
```
sep=;
m|measurement;available|boolean:y,Y:|n;dt|dateTime:number
test;nil;1
test;N;2
test;";";3
test;;4
test;Y;5
```
- the first line can define the column separator character for subsequent lines, here `;`
- the other lines then use this separator; `available|boolean:y,Y` does not have to be wrapped in double quotes, because `,` is no longer the column separator
line protocol data:
```
test available=false 1
test available=false 2
test available=false 3
test available=false 4
test available=true 5
```
## CSV Data On Input
This library supports all the concepts of [flux result annotated CSV](https://docs.influxdata.com/influxdb/latest/reference/syntax/annotated-csv/#tables) and provides a few extensions that allow processing of existing/custom CSV files. The conversion to line protocol is driven by the contents of the annotation rows and the layout of the header row.
#### New data types
Existing [data types](https://docs.influxdata.com/influxdb/latest/reference/syntax/annotated-csv/#data-types) are supported. The CSV input can also contain the following data types, which are used to associate a column value with a part of a protocol line:
- `measurement` data type identifies a column that carries the measurement name
- `tag` data type identifies a column with a tag value, column label (from the header row) is the tag name
- `time` is an alias for the existing `dateTime` type; there is at most one such column in a CSV row
- `ignore` and `ignored` data types are used to identify columns that are ignored when creating a protocol line
- `field` data type is used to copy the column data to a protocol line as-is
#### New CSV annotations
- `#constant` annotation adds a constant column to the data, so you can set the measurement, time, field, or tag of every row you import
   - the format of a constant annotation row is `#constant,datatype,name,value`; it contains a supported datatype, a column name, and a constant value
   - _column name_ can be omitted for _dateTime_ or _measurement_ columns, so the annotation can be simply `#constant,measurement,cpu`
- `#concat` annotation adds a new column that is concatenated from existing columns according to a template
   - the format of a concat annotation row is `#concat,datatype,name,template`; it contains a supported datatype, a column name, and a template value
   - the `template` is a string with `${columnName}` placeholders, in which the placeholders are replaced by the values of existing columns
   - for example: `#concat,string,fullName,${firstName} ${lastName}` (see the sketch after this list)
   - _column name_ can be omitted for _dateTime_ or _measurement_ columns
- `#timezone` annotation specifies the time zone of the data using an offset, which is either `+hhmm` or `-hhmm`, or `Local` to use the local/computer time zone. Examples: _#timezone,+0100_, _#timezone -0500_, _#timezone Local_
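As referenced above, a minimal in-package sketch of `#constant` and `#concat` together; the `firstName`/`lastName`/`age` column names are made up for the example:
```go
package csv2lp

import (
	"io/ioutil"
	"strings"
)

// concatAnnotationExample computes a fullName tag from two data columns
// and adds a constant measurement to every row.
func concatAnnotationExample() (string, error) {
	csv := "#constant,measurement,users\n" +
		"#concat,tag,fullName,${firstName} ${lastName}\n" +
		"firstName|tag,lastName|ignored,age|long\n" +
		"John,Doe,42\n"
	lines, err := ioutil.ReadAll(CsvToLineProtocol(strings.NewReader(csv)))
	// lines: users,firstName=John,fullName=John\ Doe age=42i
	return string(lines), err
}
```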
#### Data type with data format
All data types can include the format that is used to parse column data. It is specified as `datatype:format`. The following data types support formats:
- `dateTime:format`
   - the following formats are predefined:
      - `dateTime:RFC3339` format is `2006-01-02T15:04:05Z07:00`
      - `dateTime:RFC3339Nano` format is `2006-01-02T15:04:05.999999999Z07:00`
      - `dateTime:number` represents UTC time since the epoch in nanoseconds
   - a custom layout as described in the [time](https://golang.org/pkg/time) package; for example, `dateTime:2006-01-02` parses a 4-digit year, '-', a 2-digit month, '-', and a 2-digit day of the month
   - if the time format includes a time zone, the parsed date time respects the time zone; otherwise the timezone depends on the presence of the `#timezone` annotation; if there is no `#timezone` annotation, UTC is used
- `double:format`
   - the format's first character is used to separate the integer and fractional parts (usually `.` or `,`); the second and following characters (such as `,` or `_`) are removed from the column value; these removed characters are typically used to visually separate large numbers into groups
   - for example:
      - a Spanish-locale value `3.494.826.157,123` is of the `double:,.` type; the same `double` value is _3494826157.123_
      - `1_000_000` is of the `double:._` type, representing one million as a `double`
   - note that you have to quote column delimiters whenever they appear in a CSV column value, for example:
      - `#constant,"double:,.",myColumn,"1.234,011"`
- `long:format` and `unsignedLong:format` support the same format as `double`, but everything after and including a fraction character is ignored
   - the format can be appended with `strict` to fail when a fraction digit is present, for example:
      - `1000.000` is `1000` when parsed as `long`, but fails when parsed as `long:strict`
      - `1_000,000` is `1000` when parsed as `long:,_`, but fails when parsed as `long:strict,_`
- `boolean:truthy:falsy`
   - `truthy` and `falsy` are comma-separated lists of values; they can be empty to assume all other values as truthy/falsy; for example `boolean:sí,yes,ja,oui,ano,да:no,nein,non,ne,нет`
   - a `boolean` data type (without a format) parses column values that start with any of _tTyY1_ as `true` values and _fFnN0_ as `false` values, and fails on other values
- a column with an empty value is excluded from the protocol line unless a default value is supplied, either using the `#default` annotation or in a header line (see below)
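These formats compose with the header-row syntax described below; a short in-package sketch (column names are illustrative):
```go
package csv2lp

import (
	"io/ioutil"
	"strings"
)

// formatsExample parses a Spanish-locale double and a custom boolean;
// the price column definition is quoted because it contains a comma.
func formatsExample() (string, error) {
	csv := "m|measurement,\"price|double:,.\",ok|boolean:sí:no\n" +
		"stock,\"3.494.826.157,123\",sí\n"
	lines, err := ioutil.ReadAll(CsvToLineProtocol(strings.NewReader(csv)))
	// lines: stock price=3494826157.123,ok=true
	return string(lines), err
}
```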
#### Header row with data types and default values
The header row (i.e. the row that defines column names) can also define column data types when supplied as `name|datatype`; for example, `cpu|tag` defines a tag column named _cpu_. Moreover, it can also specify a default value when supplied as `name|datatype|default`; for example, `count|long|0` defines a field column named _count_ of the _long_ data type that does not skip the field if a column value is empty, but uses `0` as the column value instead.
- this approach helps to easily specify column names, types, and defaults in a single row
- this is an alternative to using three lines: a `#datatype` annotation, a `#default` annotation, and a simple header row
#### Custom CSV column separator
A CSV file can start with a line `sep=;` to declare the character that is used to separate columns on the following lines; by default, `,` is used as the column separator. This convention is frequently used (for example, by Excel).
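A small in-package sketch of the separator handling:
```go
package csv2lp

import (
	"io/ioutil"
	"strings"
)

// separatorExample shows that a leading sep=; line switches the column
// separator for all subsequent lines.
func separatorExample() (string, rune, error) {
	csv := "sep=;\nm|measurement;a|long\ncpu;1\n"
	reader := CsvToLineProtocol(strings.NewReader(csv))
	lines, err := ioutil.ReadAll(reader)
	// lines: "cpu a=1i\n"; reader.Comma() now reports ';'
	return string(lines), reader.Comma(), err
}
```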
#### Error handling
The CSV conversion stops on the first error by default; the line and column are reported together with the error. The CsvToLineReader's `SkipRowOnError` function can change this to skip error rows and log the errors instead.
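A tolerant conversion might then look like this in-package sketch:
```go
package csv2lp

import (
	"io/ioutil"
	"log"
	"strings"
)

// tolerantRead converts CSV to line protocol, skipping rows that fail to
// convert; failed rows are reported through the RowSkipped callback
// instead of stopping the whole conversion.
func tolerantRead(csv string) (string, error) {
	reader := CsvToLineProtocol(strings.NewReader(csv)).SkipRowOnError(true)
	reader.RowSkipped = func(src *CsvToLineReader, err error, row []string) {
		log.Printf("skipped row %v: %v", row, err)
	}
	lines, err := ioutil.ReadAll(reader)
	return string(lines), err
}
```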
#### Support Existing CSV files
The majority of existing CSV files can be imported by skipping the first X lines of existing data (so that a custom header line can be provided) and prepending extra annotation/header lines that tell this library how to convert the CSV to line protocol. The following functions help to change the data on input; a sketch combining them follows this list.
- [csv2lp.SkipHeaderLinesReader](./skip_header_lines.go) returns a reader that skips the first X lines of the supplied reader
- [io.MultiReader](https://golang.org/pkg/io/#MultiReader) joins multiple readers; custom header line(s) and new lines can be prepended as [strings.NewReader](https://golang.org/pkg/strings/#NewReader)s
- [csv2lp.MultiCloser](./multi_closer.go) helps with closing multiple io.Closers (files) on input; [it is not available OOTB](https://github.com/golang/go/issues/20136)
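The sketch below combines these helpers in-package; it assumes the incoming file has exactly four columns holding the measurement, a cpu tag, a usage_user value, and a numeric timestamp, in that order:
```go
package csv2lp

import (
	"io"
	"strings"
)

// convertExistingCsv drops the file's own header row and prepends a typed
// header so that a plain CSV file can be converted to line protocol.
func convertExistingCsv(file io.Reader) io.Reader {
	header := "m|measurement,cpu|tag,usage_user|double,time|dateTime:number\n"
	return CsvToLineProtocol(io.MultiReader(
		strings.NewReader(header),
		SkipHeaderLinesReader(1, file), // skip the original header line
	))
}
```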

pkg/csv2lp/csv2lp.go (new file, +163 lines)

@@ -0,0 +1,163 @@
// Package csv2lp transforms CSV data to InfluxDB line protocol
package csv2lp
import (
"encoding/csv"
"fmt"
"io"
"log"
"strings"
)
// CsvLineError is returned for csv conversion errors
type CsvLineError struct {
// 1 is the first line
Line int
Err error
}
func (e CsvLineError) Error() string {
if e.Line > 0 {
return fmt.Sprintf("line %d: %v", e.Line, e.Err)
}
return fmt.Sprintf("%v", e.Err)
}
// CreateRowColumnError wraps an existing error to add line and column coordinates
func CreateRowColumnError(line int, columnLabel string, err error) CsvLineError {
return CsvLineError{
Line: line,
Err: CsvColumnError{
Column: columnLabel,
Err: err,
},
}
}
// CsvToLineReader represents state of transformation from csv data to line protocol reader
type CsvToLineReader struct {
// csv reading
csv *csv.Reader
// lineReader is used to report line number of the last read CSV line
lineReader *LineReader
// Table collects information about used columns
Table CsvTable
// LineNumber represents line number of csv.Reader, 1 is the first
LineNumber int
// when true, log table data columns before reading data rows
logTableDataColumns bool
// state variable that indicates whether any data row was read
dataRowAdded bool
// log CSV data errors to stderr and continue with CSV processing
skipRowOnError bool
// RowSkipped is called when a row is skipped because of data parsing error
RowSkipped func(source *CsvToLineReader, lineError error, row []string)
// reader results
buffer []byte
lineBuffer []byte
index int
finished error
}
// LogTableColumns turns on/off logging of table data columns before reading data rows
func (state *CsvToLineReader) LogTableColumns(val bool) *CsvToLineReader {
state.logTableDataColumns = val
return state
}
// SkipRowOnError controls whether to fail on every CSV conversion error (false) or to log the error and continue (true)
func (state *CsvToLineReader) SkipRowOnError(val bool) *CsvToLineReader {
state.skipRowOnError = val
return state
}
// Comma returns a field delimiter used in an input CSV file
func (state *CsvToLineReader) Comma() rune {
return state.csv.Comma
}
// Read implements io.Reader that returns protocol lines
func (state *CsvToLineReader) Read(p []byte) (n int, err error) {
// state1: finished
if state.finished != nil {
return 0, state.finished
}
// state2: some data are in the buffer to copy
if len(state.buffer) > state.index {
// we have remaining bytes to copy
if len(state.buffer)-state.index > len(p) {
// copy a part of the buffer
copy(p, state.buffer[state.index:state.index+len(p)])
state.index += len(p)
return len(p), nil
}
// copy the entire buffer
n = len(state.buffer) - state.index
copy(p[:n], state.buffer[state.index:])
state.buffer = state.buffer[:0]
state.index = 0
return n, nil
}
// state3: fill buffer with data to read from
for {
// Read each record from csv
row, err := state.csv.Read()
state.LineNumber = state.lineReader.LastLineNumber
if parseError, ok := err.(*csv.ParseError); ok && parseError.Err == csv.ErrFieldCount {
// every row can have a different number of columns
err = nil
}
if err != nil {
state.finished = err
return state.Read(p)
}
if state.LineNumber == 1 && len(row) == 1 && strings.HasPrefix(row[0], "sep=") && len(row[0]) > 4 {
// separator specified in the first line
state.csv.Comma = rune(row[0][4])
continue
}
if state.Table.AddRow(row) {
var err error
state.lineBuffer = state.lineBuffer[:0] // reuse line buffer
state.lineBuffer, err = state.Table.AppendLine(state.lineBuffer, row, state.LineNumber)
if !state.dataRowAdded && state.logTableDataColumns {
log.Println(state.Table.DataColumnsInfo())
}
state.dataRowAdded = true
if err != nil {
lineError := CsvLineError{state.LineNumber, err}
if state.RowSkipped != nil {
state.RowSkipped(state, lineError, row)
continue
}
if state.skipRowOnError {
log.Println(lineError)
continue
}
state.finished = lineError
return state.Read(p)
}
state.buffer = append(state.buffer, state.lineBuffer...)
state.buffer = append(state.buffer, '\n')
break
} else {
state.dataRowAdded = false
}
}
return state.Read(p)
}
// CsvToLineProtocol transforms csv data into line protocol data
func CsvToLineProtocol(reader io.Reader) *CsvToLineReader {
lineReader := NewLineReader(reader)
lineReader.LineNumber = 1 // start counting from 1
csv := csv.NewReader(lineReader)
csv.ReuseRecord = true
return &CsvToLineReader{
csv: csv,
lineReader: lineReader,
}
}

pkg/csv2lp/csv2lp_test.go (new file, +426 lines)

@@ -0,0 +1,426 @@
package csv2lp
import (
"bytes"
"errors"
"io"
"io/ioutil"
"log"
"os"
"strconv"
"strings"
"testing"
"github.com/stretchr/testify/require"
)
// Test_CsvToLineProtocol_variousBufferSize tests conversion of annotated CSV data to line protocol data on various buffer sizes
func Test_CsvToLineProtocol_variousBufferSize(t *testing.T) {
var tests = []struct {
name string
csv string
lines string
err string
}{
{
"simple1",
"_measurement,a,b\ncpu,1,1\ncpu,b2\n",
"cpu a=1,b=1\ncpu a=b2\n",
"",
},
{
"simple1_withSep",
"sep=;\n_measurement;a;b\ncpu;1;1\ncpu;b2\n",
"cpu a=1,b=1\ncpu a=b2\n",
"",
},
{
"simple2",
"_measurement,a,b\ncpu,1,1\ncpu,\n",
"",
"no field data",
},
{
"simple3",
"_measurement,a,_time\ncpu,1,1\ncpu,2,invalidTime\n",
"",
"_time", // error in _time column
},
{
"constant_annotations",
"#constant,measurement,,cpu\n" +
"#constant,tag,xpu,xpu1\n" +
"#constant,tag,cpu,cpu1\n" +
"#constant,long,of,100\n" +
"#constant,dateTime,,2\n" +
"x,y\n" +
"1,2\n" +
"3,4\n",
"cpu,cpu=cpu1,xpu=xpu1 x=1,y=2,of=100i 2\n" +
"cpu,cpu=cpu1,xpu=xpu1 x=3,y=4,of=100i 2\n",
"", // no error
},
{
"timezone_annotation-0100",
"#timezone,-0100\n" +
"#constant,measurement,cpu\n" +
"#constant,dateTime:2006-01-02,1970-01-01\n" +
"x,y\n" +
"1,2\n",
"cpu x=1,y=2 3600000000000\n",
"", // no error
},
{
"timezone_annotation_EST",
"#timezone,EST\n" +
"#constant,measurement,cpu\n" +
"#constant,dateTime:2006-01-02,1970-01-01\n" +
"x,y\n" +
"1,2\n",
"cpu x=1,y=2 18000000000000\n",
"", // no error
},
}
bufferSizes := []int{40, 7, 3, 1}
for _, test := range tests {
for _, bufferSize := range bufferSizes {
t.Run(test.name+"_"+strconv.Itoa(bufferSize), func(t *testing.T) {
reader := CsvToLineProtocol(strings.NewReader(test.csv))
buffer := make([]byte, bufferSize)
lines := make([]byte, 0, 100)
for {
n, err := reader.Read(buffer)
if err != nil {
if err == io.EOF {
break
}
if test.err != "" {
// fmt.Println(err)
if err := err.Error(); !strings.Contains(err, test.err) {
require.Equal(t, err, test.err)
}
return
}
require.Nil(t, err.Error())
break
}
lines = append(lines, buffer[:n]...)
}
if test.err == "" {
require.Equal(t, test.lines, string(lines))
} else {
require.Fail(t, "error message with '"+test.err+"' expected")
}
})
}
}
}
// Test_CsvToLineProtocol_samples tests conversion of annotated CSV data to line protocol data
func Test_CsvToLineProtocol_samples(t *testing.T) {
var tests = []struct {
name string
csv string
lines string
err string
}{
{
"queryResult_19452", // https://github.com/influxdata/influxdb/issues/19452
"#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n" +
"#group,false,false,true,true,false,false,true,true,true\n" +
"#default,_result,,,,,,,,\n" +
",result,table,_start,_stop,_time,_value,_field,_measurement,host\n" +
",,0,2020-08-26T22:59:23.598653Z,2020-08-26T23:00:23.598653Z,2020-08-26T22:59:30Z,15075651584,active,mem,ip-192-168-86-25.ec2.internal\n",
"mem,host=ip-192-168-86-25.ec2.internal active=15075651584i 1598482770000000000\n",
"", // no error
},
{
"queryResult_19452_group_first", // issue 19452, but with group annotation first
"#group,false,false,true,true,false,false,true,true,true\n" +
"#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n" +
"#default,_result,,,,,,,,\n" +
",result,table,_start,_stop,_time,_value,_field,_measurement,host\n" +
",,0,2020-08-26T22:59:23.598653Z,2020-08-26T23:00:23.598653Z,2020-08-26T22:59:30Z,15075651584,active,mem,ip-192-168-86-25.ec2.internal\n",
"mem,host=ip-192-168-86-25.ec2.internal active=15075651584i 1598482770000000000\n",
"", // no error
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
reader := CsvToLineProtocol(strings.NewReader(test.csv))
buffer := make([]byte, 100)
lines := make([]byte, 0, 100)
for {
n, err := reader.Read(buffer)
if err != nil {
if err == io.EOF {
break
}
if test.err != "" {
// fmt.Println(err)
if err := err.Error(); !strings.Contains(err, test.err) {
require.Equal(t, err, test.err)
}
return
}
require.Nil(t, err.Error())
break
}
lines = append(lines, buffer[:n]...)
}
if test.err == "" {
require.Equal(t, test.lines, string(lines))
} else {
require.Fail(t, "error message with '"+test.err+"' expected")
}
})
}
}
// Test_CsvToLineProtocol_LogTableColumns checks correct logging of table columns
func Test_CsvToLineProtocol_LogTableColumns(t *testing.T) {
var buf bytes.Buffer
log.SetOutput(&buf)
oldFlags := log.Flags()
log.SetFlags(0)
oldPrefix := log.Prefix()
prefix := "::PREFIX::"
log.SetPrefix(prefix)
defer func() {
log.SetOutput(os.Stderr)
log.SetFlags(oldFlags)
log.SetPrefix(oldPrefix)
}()
csv := "_measurement,a,b\ncpu,1,1\ncpu,b2\n"
reader := CsvToLineProtocol(strings.NewReader(csv)).LogTableColumns(true)
require.False(t, reader.skipRowOnError)
require.True(t, reader.logTableDataColumns)
// read all the data
ioutil.ReadAll(reader)
out := buf.String()
// fmt.Println(out)
messages := strings.Count(out, prefix)
require.Equal(t, messages, 1)
}
// Test_CsvToLineProtocol_LogTimeZoneWarning checks correct logging of timezone warning
func Test_CsvToLineProtocol_LogTimeZoneWarning(t *testing.T) {
var buf bytes.Buffer
log.SetOutput(&buf)
oldFlags := log.Flags()
log.SetFlags(0)
oldPrefix := log.Prefix()
prefix := "::PREFIX::"
log.SetPrefix(prefix)
defer func() {
log.SetOutput(os.Stderr)
log.SetFlags(oldFlags)
log.SetPrefix(oldPrefix)
}()
csv := "#timezone 1\n" +
"#constant,dateTime:2006-01-02,1970-01-01\n" +
"_measurement,a,b\ncpu,1,1"
reader := CsvToLineProtocol(strings.NewReader(csv))
bytes, _ := ioutil.ReadAll(reader)
out := buf.String()
// fmt.Println(out) // "::PREFIX::WARNING: #timezone annotation: unknown time zone 1
messages := strings.Count(out, prefix)
require.Equal(t, messages, 1)
require.Equal(t, string(bytes), "cpu a=1,b=1 0\n")
}
// Test_CsvToLineProtocol_SkipRowOnError tests that error rows are skipped
func Test_CsvToLineProtocol_SkipRowOnError(t *testing.T) {
var buf bytes.Buffer
log.SetOutput(&buf)
oldFlags := log.Flags()
log.SetFlags(0)
oldPrefix := log.Prefix()
prefix := "::PREFIX::"
log.SetPrefix(prefix)
defer func() {
log.SetOutput(os.Stderr)
log.SetFlags(oldFlags)
log.SetPrefix(oldPrefix)
}()
csv := "_measurement,a,_time\n,1,1\ncpu,2,2\ncpu,3,3a\n"
reader := CsvToLineProtocol(strings.NewReader(csv)).SkipRowOnError(true)
require.Equal(t, reader.skipRowOnError, true)
require.Equal(t, reader.logTableDataColumns, false)
// read all the data
ioutil.ReadAll(reader)
out := buf.String()
// fmt.Println(out)
messages := strings.Count(out, prefix)
require.Equal(t, messages, 2)
}
// Test_CsvToLineProtocol_RowSkipped tests that error rows are reported to the configured RowSkipped listener
func Test_CsvToLineProtocol_RowSkipped(t *testing.T) {
var buf bytes.Buffer
log.SetOutput(&buf)
oldFlags := log.Flags()
log.SetFlags(0)
defer func() {
log.SetOutput(os.Stderr)
log.SetFlags(oldFlags)
}()
type ActualArguments = struct {
src *CsvToLineReader
err error
row []string
}
type ExpectedArguments = struct {
errorString string
row []string
}
csv := "sep=;\n_measurement;a|long:strict\n;1\ncpu;2.1\ncpu;3a\n"
calledArgs := []ActualArguments{}
expectedArgs := []ExpectedArguments{
{
"line 3: column '_measurement': no measurement supplied",
[]string{"", "1"},
},
{
"line 4: column 'a': '2.1' cannot fit into long data type",
[]string{"cpu", "2.1"},
},
{
"line 5: column 'a': strconv.ParseInt:",
[]string{"cpu", "3a"},
},
}
reader := CsvToLineProtocol(strings.NewReader(csv)).SkipRowOnError(true)
reader.RowSkipped = func(src *CsvToLineReader, err error, _row []string) {
// make a copy of _row
row := make([]string, len(_row))
copy(row, _row)
// remember for comparison
calledArgs = append(calledArgs, ActualArguments{
src, err, row,
})
}
// read all the data
ioutil.ReadAll(reader)
out := buf.String()
require.Empty(t, out, "No log messages expected because RowSkipped handler is set")
require.Len(t, calledArgs, 3)
for i, expected := range expectedArgs {
require.Equal(t, reader, calledArgs[i].src)
require.Contains(t, calledArgs[i].err.Error(), expected.errorString)
require.Equal(t, expected.row, calledArgs[i].row)
}
}
// Test_CsvLineError tests CsvLineError error format
func Test_CsvLineError(t *testing.T) {
var tests = []struct {
err CsvLineError
value string
}{
{
CsvLineError{Line: 1, Err: errors.New("cause")},
"line 1: cause",
},
{
CsvLineError{Line: 2, Err: CsvColumnError{"a", errors.New("cause")}},
"line 2: column 'a': cause",
},
{
CsvLineError{Line: -1, Err: CsvColumnError{"a", errors.New("cause")}},
"column 'a': cause",
},
}
for _, test := range tests {
require.Equal(t, test.value, test.err.Error())
}
}
// Test_CsvToLineProtocol_lineNumbers tests that correct line numbers are reported
func Test_CsvToLineProtocol_lineNumbers(t *testing.T) {
var buf bytes.Buffer
log.SetOutput(&buf)
oldFlags := log.Flags()
log.SetFlags(0)
defer func() {
log.SetOutput(os.Stderr)
log.SetFlags(oldFlags)
}()
type ActualArguments = struct {
src *CsvToLineReader
err error
row []string
}
type ExpectedArguments = struct {
errorString string
row []string
}
// note: csv contains a field with a newline and extra empty lines
csv := `sep=;
_measurement;a|long:strict

;1
cpu;"2
.1"

cpu;3a
`
calledArgs := []ActualArguments{}
expectedArgs := []ExpectedArguments{
{
"line 4: column '_measurement': no measurement supplied",
[]string{"", "1"},
},
{
"line 6: column 'a': '2\n.1' cannot fit into long data type",
[]string{"cpu", "2\n.1"},
},
{
"line 8: column 'a': strconv.ParseInt:",
[]string{"cpu", "3a"},
},
}
reader := CsvToLineProtocol(strings.NewReader(csv)).SkipRowOnError(true)
reader.RowSkipped = func(src *CsvToLineReader, err error, _row []string) {
// make a copy of _row
row := make([]string, len(_row))
copy(row, _row)
// remember for comparison
calledArgs = append(calledArgs, ActualArguments{
src, err, row,
})
}
// read all the data
ioutil.ReadAll(reader)
out := buf.String()
require.Empty(t, out, "No log messages expected because RowSkipped handler is set")
require.Len(t, calledArgs, 3)
for i, expected := range expectedArgs {
require.Equal(t, reader, calledArgs[i].src)
require.Contains(t, calledArgs[i].err.Error(), expected.errorString)
require.Equal(t, expected.row, calledArgs[i].row)
}
// 8 lines were read
require.Equal(t, 8, reader.LineNumber)
}

pkg/csv2lp/csv_annotations.go (new file, +222 lines)

@@ -0,0 +1,222 @@
package csv2lp
import (
"fmt"
"log"
"regexp"
"strconv"
"strings"
"time"
)
// annotationComment describes CSV annotation
type annotationComment struct {
// prefix in a CSV row that recognizes this annotation
prefix string
// flag is 0 to represent an annotation that applies to all data rows,
// or a unique bit (>0) among supported annotation prefixes
flag uint8
// setupColumn sets up metadata that drives how column data
// is parsed; mandatory when flag > 0
setupColumn func(column *CsvTableColumn, columnValue string)
// setupTable sets up metadata that drives how the table data
// is parsed; mandatory when flag == 0
setupTable func(table *CsvTable, row []string) error
}
// isTableAnnotation returns true for a table-wide annotation, false for column-based annotations
func (a annotationComment) isTableAnnotation() bool {
return a.setupTable != nil
}
// matches tests whether an annotationComment can process the CSV comment row
func (a annotationComment) matches(comment string) bool {
return strings.HasPrefix(strings.ToLower(comment), a.prefix)
}
func createConstantOrConcatColumn(table *CsvTable, row []string, annotationName string) CsvTableColumn {
// adds a virtual column with constant value to all data rows
// supported types of constant annotation rows are:
// 1. "#constant,datatype,label,defaultValue"
// 2. "#constant,measurement,value"
// 3. "#constant,dateTime,value"
// 4. "#constant datatype,label,defaultValue"
// 5. "#constant measurement,value"
// 6. "#constant dateTime,value"
// defaultValue is optional, additional columns are ignored
col := CsvTableColumn{}
col.Index = -1 // this is a virtual column that never extracts data from data rows
// setup column data type
col.setupDataType(row[0])
var dataTypeIndex int
if len(col.DataType) == 0 && col.LinePart == 0 {
// type 1,2,3
dataTypeIndex = 1
if len(row) > 1 {
col.setupDataType(row[1])
}
} else {
// type 4,5,6
dataTypeIndex = 0
}
// setup label if available
if len(row) > dataTypeIndex+1 {
col.Label = row[dataTypeIndex+1]
}
// setup defaultValue if available
if len(row) > dataTypeIndex+2 {
col.DefaultValue = row[dataTypeIndex+2]
}
// support type 2,3,5,6 syntax for measurement and timestamp
if col.LinePart == linePartMeasurement || col.LinePart == linePartTime {
if col.DefaultValue == "" && col.Label != "" {
// type 2,3,5,6
col.DefaultValue = col.Label
col.Label = annotationName + " " + col.DataType
} else if col.Label == "" {
// setup a label if no label is supplied for focused error messages
col.Label = annotationName + " " + col.DataType
}
}
// add a virtual column to the table
return col
}
// constantSetupTable sets up the supplied CSV table from a #constant annotation
func constantSetupTable(table *CsvTable, row []string) error {
col := createConstantOrConcatColumn(table, row, "#constant")
// add a virtual column to the table
table.extraColumns = append(table.extraColumns, &col)
return nil
}
// computedReplacer is used to replace values in computed columns
var computedReplacer *regexp.Regexp = regexp.MustCompile(`\$\{[^}]+\}`)
// concatSetupTable sets up the supplied CSV table from a #concat annotation
func concatSetupTable(table *CsvTable, row []string) error {
col := createConstantOrConcatColumn(table, row, "#concat")
template := col.DefaultValue
col.ComputeValue = func(row []string) string {
return computedReplacer.ReplaceAllStringFunc(template, func(text string) string {
columnLabel := text[2 : len(text)-1] // ${columnLabel}
if placeholderColumn := table.Column(columnLabel); placeholderColumn != nil {
return placeholderColumn.Value(row)
}
log.Printf("WARNING: column %s: column '%s' cannot be replaced, no such column available", col.Label, columnLabel)
return ""
})
}
// add a virtual column to the table
table.extraColumns = append(table.extraColumns, &col)
// add validator to report error when no placeholder column is available
table.validators = append(table.validators, func(table *CsvTable) error {
placeholders := computedReplacer.FindAllString(template, len(template))
for _, placeholder := range placeholders {
columnLabel := placeholder[2 : len(placeholder)-1] // ${columnLabel}
if placeholderColumn := table.Column(columnLabel); placeholderColumn == nil {
return CsvColumnError{
Column: col.Label,
Err: fmt.Errorf("'%s' references an unknown column '%s', available columns are: %v",
template, columnLabel, strings.Join(table.ColumnLabels(), ",")),
}
}
}
return nil
})
return nil
}
// supportedAnnotations contains all supported CSV annotations comments
var supportedAnnotations = []annotationComment{
{
prefix: "#group",
flag: 1,
setupColumn: func(column *CsvTableColumn, value string) {
// standard flux query result annotation
if strings.HasSuffix(value, "true") {
// setup column's line part unless it is already set (#19452)
if column.LinePart == 0 {
column.LinePart = linePartTag
}
}
},
},
{
prefix: "#datatype",
flag: 2,
setupColumn: func(column *CsvTableColumn, value string) {
// standard flux query result annotation
column.setupDataType(value)
},
},
{
prefix: "#default",
flag: 4,
setupColumn: func(column *CsvTableColumn, value string) {
// standard flux query result annotation
column.DefaultValue = ignoreLeadingComment(value)
},
},
{
prefix: "#constant",
setupTable: constantSetupTable,
},
{
prefix: "#timezone",
setupTable: func(table *CsvTable, row []string) error {
// setup timezone for parsing timestamps, UTC by default
val := ignoreLeadingComment(row[0])
if val == "" && len(row) > 1 {
val = row[1] // #timezone,Local
}
tz, err := parseTimeZone(val)
if err != nil {
return fmt.Errorf("#timezone annotation: %v", err)
}
table.timeZone = tz
return nil
},
},
{
prefix: "#concat",
setupTable: concatSetupTable,
},
}
// ignoreLeadingComment returns a value without '#anyComment ' prefix
func ignoreLeadingComment(value string) string {
if len(value) > 0 && value[0] == '#' {
pos := strings.Index(value, " ")
if pos > 0 {
return strings.TrimLeft(value[pos+1:], " ")
}
return ""
}
return value
}
// parseTimeZone parses the supplied timezone from a string into a time.Location
//
// parseTimeZone("") // time.UTC
// parseTimeZone("local") // time.Local
// parseTimeZone("-0500") // time.FixedZone(-5*3600 + 0*60)
// parseTimeZone("+0200") // time.FixedZone(2*3600 + 0*60)
// parseTimeZone("EST") // time.LoadLocation("EST")
func parseTimeZone(val string) (*time.Location, error) {
switch {
case val == "":
return time.UTC, nil
case strings.ToLower(val) == "local":
return time.Local, nil
case val[0] == '-' || val[0] == '+':
if matched, _ := regexp.MatchString("[+-][0-9][0-9][0-9][0-9]", val); !matched {
return nil, fmt.Errorf("timezone '%s' is not +hhmm or -hhmm", val)
}
intVal, _ := strconv.Atoi(val)
offset := (intVal/100)*3600 + (intVal%100)*60
return time.FixedZone(val, offset), nil
default:
return time.LoadLocation(val)
}
}

pkg/csv2lp/csv_annotations_test.go (new file, +305 lines)

@@ -0,0 +1,305 @@
package csv2lp
import (
"bytes"
"fmt"
"log"
"os"
"strconv"
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func annotation(name string) annotationComment {
for _, a := range supportedAnnotations {
if a.prefix == name {
return a
}
}
panic("no annotation named " + name + " found!")
}
// Test_GroupAnnotation tests #group annotation
func Test_GroupAnnotation(t *testing.T) {
subject := annotation("#group")
require.True(t, subject.matches("#Group"))
require.False(t, subject.isTableAnnotation())
var tests = []struct {
value string
expect int
}{
{"#group true", linePartTag},
{"#group false", 0},
{"false", 0},
{"unknown", 0},
}
for _, test := range tests {
t.Run(test.value, func(t *testing.T) {
col := &CsvTableColumn{}
subject.setupColumn(col, test.value)
require.Equal(t, test.expect, col.LinePart)
})
}
}
// Test_DefaultAnnotation tests #default annotation
func Test_DefaultAnnotation(t *testing.T) {
subject := annotation("#default")
require.True(t, subject.matches("#Default"))
require.False(t, subject.isTableAnnotation())
var tests = []struct {
value string
expect string
}{
{"#default 1", "1"},
{"#default ", ""},
{"whatever", "whatever"},
}
for _, test := range tests {
t.Run(test.value, func(t *testing.T) {
col := &CsvTableColumn{}
subject.setupColumn(col, test.value)
require.Equal(t, test.expect, col.DefaultValue)
})
}
}
// Test_DatatypeAnnotation tests #datatype annotation
func Test_DatatypeAnnotation(t *testing.T) {
subject := annotation("#datatype")
require.True(t, subject.matches("#dataType"))
require.False(t, subject.isTableAnnotation())
var tests = []struct {
value string
expectType string
expectFormat string
expectLinePart int
}{
{"#datatype long", "long", "", 0},
{"#datatype ", "", "", 0},
{"#datatype measurement", "_", "", linePartMeasurement},
{"#datatype tag", "_", "", linePartTag},
{"#datatype field", "", "", linePartField},
{"dateTime", "dateTime", "", linePartTime},
{"dateTime:RFC3339", "dateTime", "RFC3339", linePartTime},
{"#datatype dateTime:RFC3339", "dateTime", "RFC3339", linePartTime},
{"whatever:format", "whatever", "format", 0},
}
for _, test := range tests {
t.Run(test.value, func(t *testing.T) {
col := &CsvTableColumn{}
subject.setupColumn(col, test.value)
if test.expectType != "_" {
require.Equal(t, test.expectType, col.DataType)
}
require.Equal(t, test.expectFormat, col.DataFormat)
})
}
}
// Test_ConstantAnnotation tests #constant annotation
func Test_ConstantAnnotation(t *testing.T) {
subject := annotation("#constant")
require.True(t, subject.matches("#Constant"))
require.True(t, subject.isTableAnnotation())
var tests = []struct {
value []string
expectLabel string
expectDefault string
expectLinePart int
}{
{[]string{"#constant "}, "", "", 0}, // means literally nothing
{[]string{"#constant measurement", "a"}, "_", "a", linePartMeasurement},
{[]string{"#constant measurement", "a", "b"}, "_", "b", linePartMeasurement},
{[]string{"#constant measurement", "a", ""}, "_", "a", linePartMeasurement},
{[]string{"#constant tag", "tgName", "tgValue"}, "tgName", "tgValue", linePartTag},
{[]string{"#constant", "tag", "tgName", "tgValue"}, "tgName", "tgValue", linePartTag},
{[]string{"#constant field", "fName", "fVal"}, "fName", "fVal", linePartField},
{[]string{"#constant", "field", "fName", "fVal"}, "fName", "fVal", linePartField},
{[]string{"dateTime", "1"}, "_", "1", linePartTime},
{[]string{"dateTime", "1", "2"}, "_", "2", linePartTime},
{[]string{"dateTime", "", "2"}, "_", "2", linePartTime},
{[]string{"dateTime", "3", ""}, "_", "3", linePartTime},
{[]string{"long", "fN", "fV"}, "fN", "fV", 0},
}
for i, test := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
table := &CsvTable{}
subject.setupTable(table, test.value)
require.Equal(t, 1, len(table.extraColumns))
col := table.extraColumns[0]
require.Equal(t, test.expectLinePart, col.LinePart)
require.Greater(t, 0, col.Index)
if test.expectLabel != "_" {
require.Equal(t, test.expectLabel, col.Label)
} else {
require.NotEqual(t, "", col.Label)
}
require.Equal(t, test.expectDefault, col.DefaultValue)
})
}
}
// Test_ConcatAnnotation tests #concat annotation
func Test_ConcatAnnotation(t *testing.T) {
subject := annotation("#concat")
require.True(t, subject.matches("#Concat"))
require.True(t, subject.isTableAnnotation())
var tests = []struct {
value []string
expectLabel string
expectValue string
expectLinePart int
}{
// all possible specifications
{[]string{"#concat "}, "", "", 0}, // means literally nothing
{[]string{"#concat measurement", "a"}, "_", "a", linePartMeasurement},
{[]string{"#concat measurement", "a", "b"}, "_", "b", linePartMeasurement},
{[]string{"#concat measurement", "a", ""}, "_", "a", linePartMeasurement},
{[]string{"#concat tag", "tgName", "tgValue"}, "tgName", "tgValue", linePartTag},
{[]string{"#concat", "tag", "tgName", "tgValue"}, "tgName", "tgValue", linePartTag},
{[]string{"#concat field", "fName", "fVal"}, "fName", "fVal", linePartField},
{[]string{"#concat", "field", "fName", "fVal"}, "fName", "fVal", linePartField},
{[]string{"dateTime", "1"}, "_", "1", linePartTime},
{[]string{"dateTime", "1", "2"}, "_", "2", linePartTime},
{[]string{"dateTime", "", "2"}, "_", "2", linePartTime},
{[]string{"dateTime", "3", ""}, "_", "3", linePartTime},
{[]string{"long", "fN", "fV"}, "fN", "fV", 0},
// concat values
{[]string{"string", "fN", "$-${b}-${a}"}, "fN", "$-2-1", 0},
}
exampleRow := []string{"1", "2"}
for i, test := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
table := &CsvTable{columns: []*CsvTableColumn{
{Label: "a", Index: 0},
{Label: "b", Index: 1},
}}
subject.setupTable(table, test.value)
// validator
require.Equal(t, 1, len(table.validators))
require.Equal(t, table.validators[0](table), nil)
// columns
require.Equal(t, 1, len(table.extraColumns))
col := table.extraColumns[0]
require.Equal(t, test.expectLinePart, col.LinePart)
require.Greater(t, 0, col.Index)
if test.expectLabel != "_" {
require.Equal(t, test.expectLabel, col.Label)
} else {
require.NotEqual(t, "", col.Label)
}
require.Equal(t, test.expectValue, col.Value(exampleRow))
})
}
t.Run("concat template references unknown column", func(t *testing.T) {
var buf bytes.Buffer
log.SetOutput(&buf)
oldFlags := log.Flags()
log.SetFlags(0)
oldPrefix := log.Prefix()
prefix := "::PREFIX::"
log.SetPrefix(prefix)
defer func() {
log.SetOutput(os.Stderr)
log.SetFlags(oldFlags)
log.SetPrefix(oldPrefix)
}()
table := &CsvTable{columns: []*CsvTableColumn{
{Label: "x", Index: 0},
}}
subject.setupTable(table, []string{"string", "fN", "a${y}-${x}z"})
require.Equal(t, 1, len(table.validators))
require.NotNil(t, table.validators[0](table))
require.Equal(t,
"column 'fN': 'a${y}-${x}z' references an unknown column 'y', available columns are: x",
table.validators[0](table).Error())
// columns
require.Equal(t, 1, len(table.extraColumns))
col := table.extraColumns[0]
require.Greater(t, 0, col.Index)
require.Equal(t, "a-1z", col.Value(exampleRow))
// a warning is printed to console
require.Equal(t,
"::PREFIX::WARNING: column fN: column 'y' cannot be replaced, no such column available",
strings.TrimSpace(buf.String()))
})
}
// Test_TimeZoneAnnotation tests #timezone annotation
func Test_TimeZoneAnnotation(t *testing.T) {
subject := annotation("#timezone")
require.True(t, subject.matches("#timeZone"))
require.True(t, subject.isTableAnnotation())
var tests = []struct {
value string
err string
}{
{"#timezone ", ""},
{"#timezone EST", ""},
{"#timezone,EST", ""},
{"#timezone,+0100", ""},
{"#timezone,whatever", "#timezone annotation"},
}
for i, test := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
table := &CsvTable{}
err := subject.setupTable(table, strings.Split(test.value, ","))
if test.err == "" {
require.Nil(t, err)
require.NotNil(t, table.timeZone)
} else {
require.NotNil(t, err)
require.True(t, strings.Contains(fmt.Sprintf("%v", err), test.err))
}
})
}
}
// Test_ParseTimeZone tests parseTimeZone fn
func Test_ParseTimeZone(t *testing.T) {
now := time.Now()
_, localOffset := now.Zone()
var tests = []struct {
value string
offset int
}{
{"local", localOffset},
{"Local", localOffset},
{"-0000", 0},
{"+0000", 0},
{"-0100", -3600},
{"+0100", 3600},
{"+0130", 3600 + 3600/2},
{"", 0},
{"-01", -1},
{"0000", -1},
{"UTC", 0},
{"EST", -5 * 3600},
}
for i, test := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
tz, err := parseTimeZone(test.value)
require.NotEqual(t, tz, err) // both cannot be nil
if err != nil {
require.Nil(t, tz)
// fmt.Println(err)
if test.offset >= 0 {
require.Fail(t, "offset expected")
}
return
}
require.NotNil(t, tz)
testDate := fmt.Sprintf("%d-%02d-%02d", now.Year(), now.Month(), now.Day())
result, err := time.ParseInLocation("2006-01-02", testDate, tz)
require.Nil(t, err)
_, offset := result.Zone()
require.Equal(t, test.offset, offset)
})
}
}

pkg/csv2lp/csv_table.go (new file, +586 lines)

@@ -0,0 +1,586 @@
package csv2lp
import (
"errors"
"fmt"
"log"
"sort"
"strings"
"time"
"unsafe"
)
// column labels used in flux CSV result
const (
labelFieldName = "_field"
labelFieldValue = "_value"
labelTime = "_time"
labelStart = "_start"
labelStop = "_stop"
labelMeasurement = "_measurement"
)
// types of columns with respect to line protocol
const (
linePartIgnored = iota + 1 // ignored in line protocol
linePartMeasurement
linePartTag
linePartField
linePartTime
)
// CsvTableColumn represents processing metadata about a csv column
type CsvTableColumn struct {
// Label is a column label from the header row, such as "_start", "_stop", "_time"
Label string
// DataType such as "string", "long", "dateTime" ...
DataType string
// DataFormat is a format of DataType, such as "RFC3339", "2006-01-02"
DataFormat string
// LinePart is a line part of the column (0 means not determined yet), see linePart constants
LinePart int
// DefaultValue is used when column's value is an empty string.
DefaultValue string
// Index of this column when reading rows, -1 indicates a virtual column with DefaultValue data
Index int
// TimeZone of dateTime column, applied when parsing dateTime DataType
TimeZone *time.Location
// ParseF is an optional function used to convert column's string value to interface{}
ParseF func(value string) (interface{}, error)
// ComputeValue is an optional function used to compute column value out of row data
ComputeValue func(row []string) string
// escapedLabel contains escaped label that can be directly used in line protocol
escapedLabel string
}
// LineLabel returns escaped name of the column so it can be then used as a tag name or field name in line protocol
func (c *CsvTableColumn) LineLabel() string {
if len(c.escapedLabel) > 0 {
return c.escapedLabel
}
return c.Label
}
// Value returns the value of the column for the supplied row
func (c *CsvTableColumn) Value(row []string) string {
if c.Index < 0 || c.Index >= len(row) {
if c.ComputeValue != nil {
return c.ComputeValue(row)
}
return c.DefaultValue
}
val := row[c.Index]
if len(val) > 0 {
return val
}
return c.DefaultValue
}
// setupDataType sets up the data type from the supplied value
//
// columnValue contains typeName and possibly additional column metadata,
// it can be
// 1. typeName
// 2. typeName:format
// 3. typeName|defaultValue
// 4. typeName:format|defaultValue
// 5. #anycomment (all options above)
func (c *CsvTableColumn) setupDataType(columnValue string) {
// ignoreLeadingComment is required to specify datatype together with CSV annotation
// in annotations (such as #constant)
columnValue = ignoreLeadingComment(columnValue)
// | adds a default value to column
pipeIndex := strings.Index(columnValue, "|")
if pipeIndex > 1 {
if c.DefaultValue == "" {
c.DefaultValue = columnValue[pipeIndex+1:]
columnValue = columnValue[:pipeIndex]
}
}
// setup column format
colonIndex := strings.Index(columnValue, ":")
if colonIndex > 1 {
c.DataFormat = columnValue[colonIndex+1:]
columnValue = columnValue[:colonIndex]
}
// set up column linePart depending on dataType
switch {
case columnValue == "tag":
c.LinePart = linePartTag
case strings.HasPrefix(columnValue, "ignore"):
// ignore or ignored
c.LinePart = linePartIgnored
case columnValue == "dateTime":
// dateTime field is used at most once in a protocol line
c.LinePart = linePartTime
case columnValue == "measurement":
c.LinePart = linePartMeasurement
case columnValue == "field":
c.LinePart = linePartField
columnValue = "" // this a generic field without a data type specified
case columnValue == "time": // time is an alias for dateTime
c.LinePart = linePartTime
columnValue = dateTimeDatatype
default:
// nothing to do since we don't know the linePart yet
// the line part is decided in recomputeLineProtocolColumns
}
// setup column data type
c.DataType = columnValue
// setup custom parsing
if c.DataType == boolDatatype && c.DataFormat != "" {
c.ParseF = createBoolParseFn(c.DataFormat)
return
}
if c.DataType == longDatatype && strings.HasPrefix(c.DataFormat, "strict") {
c.ParseF = createStrictLongParseFn(c.DataFormat[6:])
return
}
if c.DataType == uLongDatatype && strings.HasPrefix(c.DataFormat, "strict") {
c.ParseF = createStrictUnsignedLongParseFn(c.DataFormat[6:])
return
}
}
// CsvColumnError indicates conversion error in a specific column
type CsvColumnError struct {
Column string
Err error
}
// Error interface implementation
func (e CsvColumnError) Error() string {
return fmt.Sprintf("column '%s': %v", e.Column, e.Err)
}
// CsvTable contains metadata about columns and a state of the CSV processing
type CsvTable struct {
// columns contains columns that extract values from data rows
columns []*CsvTableColumn
// partBits is a bitmap that is used to remember that a particular column annotation
// (#group, #datatype and #default) was already processed for the table;
// it is used to detect start of a new table in CSV flux results, a repeated annotation
// is detected and a new CsvTable can be then created
partBits uint8
// readTableData indicates that the table is ready to read table data, which
// is after reading annotation and header rows
readTableData bool
// lpColumnsValid indicates whether line protocol columns are valid or must be re-calculated from columns
lpColumnsValid bool
// extraColumns are added by table-wide annotations, such as #constant
extraColumns []*CsvTableColumn
// ignoreDataTypeInColumnName is true to skip parsing of data type as a part of a column name
ignoreDataTypeInColumnName bool
// timeZone of dateTime column(s), applied when parsing dateTime value without a time zone specified
timeZone *time.Location
// validators validate table structure right before processing data rows
validators []func(*CsvTable) error
/* cached columns are initialized before reading the data rows using the computeLineProtocolColumns fn */
// cachedMeasurement is a required column that reads the (line protocol) measurement
cachedMeasurement *CsvTableColumn
// cachedTime is an optional column that reads timestamp of lp row
cachedTime *CsvTableColumn
// cachedFieldName is an optional column that reads a field name to add to the protocol line
cachedFieldName *CsvTableColumn
// cachedFieldValue is an optional column that reads a field value to add to the protocol line
cachedFieldValue *CsvTableColumn
// cachedFields are columns that read field values, a field name is taken from a column label
cachedFields []*CsvTableColumn
// cachedTags are columns that read tag values, a tag name is taken from a column label
cachedTags []*CsvTableColumn
}
// IgnoreDataTypeInColumnName sets a flag that can ignore dataType parsing in column names.
// When true, column names can then contain '|'. By default, column name can also contain datatype
// and a default value when named `name|datatype` or `name|datatype|default`,
// for example `ready|boolean|true`
func (t *CsvTable) IgnoreDataTypeInColumnName(val bool) {
t.ignoreDataTypeInColumnName = val
}
// DataColumnsInfo returns a string representation of columns that are used to process CSV data
func (t *CsvTable) DataColumnsInfo() string {
if t == nil {
return "<nil>"
}
var builder = strings.Builder{}
t.computeLineProtocolColumns() // ensure that cached columns are initialized
builder.WriteString(fmt.Sprintf("CsvTable{ dataColumns: %d constantColumns: %d\n", len(t.columns), len(t.extraColumns)))
builder.WriteString(fmt.Sprintf(" measurement: %+v\n", t.cachedMeasurement))
for _, col := range t.cachedTags {
builder.WriteString(fmt.Sprintf(" tag: %+v\n", col))
}
for _, col := range t.cachedFields {
builder.WriteString(fmt.Sprintf(" field: %+v\n", col))
}
builder.WriteString(fmt.Sprintf(" time: %+v\n", t.cachedTime))
builder.WriteString("}")
return builder.String()
}
// NextTable resets the table to a state in which it expects annotations and header rows
func (t *CsvTable) NextTable() {
t.partBits = 0 // no column annotations parsed yet
t.readTableData = false
t.columns = []*CsvTableColumn{}
t.extraColumns = []*CsvTableColumn{}
}
// createColumns creates a slice of CsvTableColumn for the supplied rowSize
func createColumns(rowSize int) []*CsvTableColumn {
retVal := make([]*CsvTableColumn, rowSize)
for i := 0; i < rowSize; i++ {
retVal[i] = &CsvTableColumn{
Index: i,
}
}
return retVal
}
// AddRow updates the state of the CSV table with a new header, annotation or data row.
// Returns true if the row is a data row.
func (t *CsvTable) AddRow(row []string) bool {
// detect data row or table header row
if len(row[0]) == 0 || row[0][0] != '#' {
if !t.readTableData {
// expect a header row
t.lpColumnsValid = false // line protocol columns change
if t.partBits == 0 {
// create columns since no column annotations were processed
t.columns = createColumns(len(row))
}
// assign column labels for the header row
for i := 0; i < len(t.columns); i++ {
col := t.columns[i]
if len(col.Label) == 0 && col.Index < len(row) {
col.Label = row[col.Index]
// assign column data type if possible
if len(col.DataType) == 0 && !t.ignoreDataTypeInColumnName {
if idx := strings.IndexByte(col.Label, '|'); idx != -1 {
col.setupDataType(col.Label[idx+1:])
col.Label = col.Label[:idx]
}
}
}
}
// header row is read, now expect data rows
t.readTableData = true
return false
}
return true
}
// process all supported annotations
for i := 0; i < len(supportedAnnotations); i++ {
supportedAnnotation := supportedAnnotations[i]
if supportedAnnotation.matches(row[0]) {
if len(row[0]) > len(supportedAnnotation.prefix) && row[0][len(supportedAnnotation.prefix)] != ' ' {
continue // ignoring, not a supported annotation
}
t.lpColumnsValid = false // line protocol columns change
if supportedAnnotation.isTableAnnotation() {
// process table-level annotation
if err := supportedAnnotation.setupTable(t, row); err != nil {
log.Println("WARNING: ", err)
}
return false
}
// invariant: !supportedAnnotation.isTableAnnotation()
if t.readTableData {
// any column annotation stops reading of data rows
t.NextTable()
}
// create new columns upon new or repeated column annotation
if t.partBits == 0 || t.partBits&supportedAnnotation.flag == 1 {
t.partBits = supportedAnnotation.flag
t.columns = createColumns(len(row))
} else {
t.partBits = t.partBits | supportedAnnotation.flag
}
// setup columns according to column annotation
for j := 0; j < len(t.columns); j++ {
col := t.columns[j]
if col.Index >= len(row) {
continue // missing value
} else {
supportedAnnotation.setupColumn(col, row[col.Index])
}
}
return false
}
}
// warn about unsupported annotation unless a comment row
if !strings.HasPrefix(row[0], "# ") {
log.Println("WARNING: unsupported annotation: ", row[0])
}
return false
}
// computeLineProtocolColumns computes columns that are
// used to create line protocol rows when required to do so
//
// returns true if new columns were initialized or false if there
// was no change in line protocol columns
func (t *CsvTable) computeLineProtocolColumns() bool {
if !t.lpColumnsValid {
t.recomputeLineProtocolColumns()
return true
}
return false
}
// recomputeLineProtocolColumns always computes the columns that are
// used to create line protocol rows
func (t *CsvTable) recomputeLineProtocolColumns() {
// reset results
t.cachedMeasurement = nil
t.cachedTime = nil
t.cachedFieldName = nil
t.cachedFieldValue = nil
t.cachedTags = nil
t.cachedFields = nil
// collect unique tag names (#19453)
var tags = make(map[string]*CsvTableColumn)
// having a _field column indicates fields without a line type are ignored
defaultIsField := t.Column(labelFieldName) == nil
// go over columns + extra columns
columns := make([]*CsvTableColumn, len(t.columns)+len(t.extraColumns))
copy(columns, t.columns)
copy(columns[len(t.columns):], t.extraColumns)
for i := 0; i < len(columns); i++ {
col := columns[i]
switch {
case col.Label == labelMeasurement || col.LinePart == linePartMeasurement:
t.cachedMeasurement = col
case col.Label == labelTime || col.LinePart == linePartTime:
if t.cachedTime != nil && t.cachedTime.Label != labelStart && t.cachedTime.Label != labelStop {
log.Printf("WARNING: at most one dateTime column is expected, '%s' column is ignored\n", t.cachedTime.Label)
}
t.cachedTime = col
case len(strings.TrimSpace(col.Label)) == 0 || col.LinePart == linePartIgnored:
// ignored columns that are marked to be ignored or without a label
case col.Label == labelFieldName:
t.cachedFieldName = col
case col.Label == labelFieldValue:
t.cachedFieldValue = col
case col.LinePart == linePartTag:
if val, found := tags[col.Label]; found {
log.Printf("WARNING: ignoring duplicate tag '%s' at column index %d, using column at index %d\n", col.Label, val.Index, col.Index)
}
col.escapedLabel = escapeTag(col.Label)
tags[col.Label] = col
case col.LinePart == linePartField:
col.escapedLabel = escapeTag(col.Label)
t.cachedFields = append(t.cachedFields, col)
default:
if defaultIsField {
col.escapedLabel = escapeTag(col.Label)
t.cachedFields = append(t.cachedFields, col)
}
}
}
// line protocol requires sorted unique tags
if len(tags) > 0 {
t.cachedTags = make([]*CsvTableColumn, 0, len(tags))
for _, v := range tags {
t.cachedTags = append(t.cachedTags, v)
}
sort.Slice(t.cachedTags, func(i, j int) bool {
return t.cachedTags[i].Label < t.cachedTags[j].Label
})
}
// setup timezone for timestamp column
if t.cachedTime != nil && t.cachedTime.TimeZone == nil {
t.cachedTime.TimeZone = t.timeZone
}
t.lpColumnsValid = true // line protocol columns are now fresh
}
// CreateLine produces a protocol line out of the supplied row or returns error
func (t *CsvTable) CreateLine(row []string) (line string, err error) {
buffer := make([]byte, 100)[:0]
buffer, err = t.AppendLine(buffer, row, -1)
if err != nil {
return "", err
}
return *(*string)(unsafe.Pointer(&buffer)), nil
}
// AppendLine appends a protocol line to the supplied buffer using a CSV row and returns the appended buffer, or an error if any
func (t *CsvTable) AppendLine(buffer []byte, row []string, lineNumber int) ([]byte, error) {
if t.computeLineProtocolColumns() {
// validate column data types
if t.cachedFieldValue != nil && !IsTypeSupported(t.cachedFieldValue.DataType) {
return buffer, CsvColumnError{
t.cachedFieldValue.Label,
fmt.Errorf("data type '%s' is not supported", t.cachedFieldValue.DataType),
}
}
for _, c := range t.cachedFields {
if !IsTypeSupported(c.DataType) {
return buffer, CsvColumnError{
c.Label,
fmt.Errorf("data type '%s' is not supported", c.DataType),
}
}
}
for _, v := range t.validators {
if err := v(t); err != nil {
return buffer, err
}
}
}
if t.cachedMeasurement == nil {
return buffer, errors.New("no measurement column found")
}
measurement := t.cachedMeasurement.Value(row)
if measurement == "" {
return buffer, CsvColumnError{
t.cachedMeasurement.Label,
errors.New("no measurement supplied"),
}
}
buffer = append(buffer, escapeMeasurement(measurement)...)
for _, tag := range t.cachedTags {
value := tag.Value(row)
if tag.Index < len(row) && len(value) > 0 {
buffer = append(buffer, ',')
buffer = append(buffer, tag.LineLabel()...)
buffer = append(buffer, '=')
buffer = append(buffer, escapeTag(value)...)
}
}
buffer = append(buffer, ' ')
fieldAdded := false
if t.cachedFieldName != nil && t.cachedFieldValue != nil {
field := t.cachedFieldName.Value(row)
value := t.cachedFieldValue.Value(row)
if len(value) > 0 && len(field) > 0 {
buffer = append(buffer, escapeTag(field)...)
buffer = append(buffer, '=')
var err error
buffer, err = appendConverted(buffer, value, t.cachedFieldValue, lineNumber)
if err != nil {
return buffer, CsvColumnError{
t.cachedFieldName.Label,
err,
}
}
fieldAdded = true
}
}
for _, field := range t.cachedFields {
value := field.Value(row)
if len(value) > 0 {
if !fieldAdded {
fieldAdded = true
} else {
buffer = append(buffer, ',')
}
buffer = append(buffer, field.LineLabel()...)
buffer = append(buffer, '=')
var err error
buffer, err = appendConverted(buffer, value, field, lineNumber)
if err != nil {
return buffer, CsvColumnError{
field.Label,
err,
}
}
}
}
if !fieldAdded {
return buffer, errors.New("no field data found")
}
if t.cachedTime != nil && t.cachedTime.Index < len(row) {
timeVal := t.cachedTime.Value(row)
if len(timeVal) > 0 {
if len(t.cachedTime.DataType) == 0 {
// assume dateTime data type (number or RFC3339)
t.cachedTime.DataType = dateTimeDatatype
t.cachedTime.DataFormat = ""
}
buffer = append(buffer, ' ')
var err error
buffer, err = appendConverted(buffer, timeVal, t.cachedTime, lineNumber)
if err != nil {
return buffer, CsvColumnError{
t.cachedTime.Label,
err,
}
}
}
}
return buffer, nil
}
// Column returns the first column with the supplied label, or nil
func (t *CsvTable) Column(label string) *CsvTableColumn {
for i := 0; i < len(t.columns); i++ {
if t.columns[i].Label == label {
return t.columns[i]
}
}
return nil
}
// Columns returns available columns
func (t *CsvTable) Columns() []*CsvTableColumn {
return t.columns
}
// ColumnLabels returns the labels of available columns
func (t *CsvTable) ColumnLabels() []string {
labels := make([]string, len(t.columns))
for i, col := range t.columns {
labels[i] = col.Label
}
return labels
}
// Measurement returns measurement column or nil
func (t *CsvTable) Measurement() *CsvTableColumn {
t.computeLineProtocolColumns()
return t.cachedMeasurement
}
// Time returns time column or nil
func (t *CsvTable) Time() *CsvTableColumn {
t.computeLineProtocolColumns()
return t.cachedTime
}
// FieldName returns field name column or nil
func (t *CsvTable) FieldName() *CsvTableColumn {
t.computeLineProtocolColumns()
return t.cachedFieldName
}
// FieldValue returns field value column or nil
func (t *CsvTable) FieldValue() *CsvTableColumn {
t.computeLineProtocolColumns()
return t.cachedFieldValue
}
// Tags returns tags
func (t *CsvTable) Tags() []*CsvTableColumn {
t.computeLineProtocolColumns()
return t.cachedTags
}
// Fields returns fields
func (t *CsvTable) Fields() []*CsvTableColumn {
t.computeLineProtocolColumns()
return t.cachedFields
}
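A minimal usage sketch (not part of this file): annotation and header rows are fed to AddRow, which returns true only for data rows, and each data row is then turned into a protocol line with CreateLine. The sample CSV is made up for illustration; the import path is the one used by the tests in this commit.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"log"
	"strings"

	"github.com/influxdata/influx-cli/v2/pkg/csv2lp"
)

func main() {
	data := "#datatype measurement,tag,double,dateTime:number\n" +
		"m,host,usage,time\n" +
		"cpu,server01,2.7,1482669077000000000\n"
	reader := csv.NewReader(strings.NewReader(data))
	table := csv2lp.CsvTable{}
	for {
		row, err := reader.Read()
		if err != nil {
			break // io.EOF ends the loop
		}
		// annotation and header rows configure the table and return false
		if table.AddRow(row) {
			line, err := table.CreateLine(row)
			if err != nil {
				log.Fatal(err)
			}
			fmt.Println(line) // cpu,host=server01 usage=2.7 1482669077000000000
		}
	}
}
```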


@ -0,0 +1,566 @@
package csv2lp
import (
"encoding/csv"
"io"
"strconv"
"strings"
"testing"
"github.com/stretchr/testify/require"
)
func readCsv(t *testing.T, data string) [][]string {
reader := csv.NewReader(strings.NewReader(data))
var rows [][]string
for {
row, err := reader.Read()
reader.FieldsPerRecord = 0 // every row can have a different number of fields
if err == io.EOF {
break
}
if err != nil {
t.Log("row: ", row)
t.Log(err)
t.Fail()
}
rows = append(rows, row)
}
return rows
}
// Test_CsvTable_FluxQueryResult tests construction of table columns and data from a Flux Query CSV result
func Test_CsvTable_FluxQueryResult(t *testing.T) {
const csvQueryResult = `
#group,false,false,true,true,false,false,true,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
,,0,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:17:57Z,0,time_steal,cpu,cpu1,rsavage.prod
,,0,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:07Z,0,time_steal,cpu,cpu1,rsavage.prod
#group,false,false,true,true,false,false,true,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
#default,_result,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
,,1,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:01Z,2.7263631815907954,usage_user,cpu,cpu-total,tahoecity.prod
,,1,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:11Z,2.247752247752248,usage_user,cpu,cpu-total,tahoecity.prod
#unre`
var lineProtocolQueryResult = []string{
"cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669077000000000",
"cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669087000000000",
"cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.7263631815907954 1582669081000000000",
"cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.247752247752248 1582669091000000000",
}
table := CsvTable{}
rows := readCsv(t, csvQueryResult)
lineProtocolIndex := 0
for i, row := range rows {
rowProcessed := table.AddRow(row)
if i%6 < 4 {
require.Equal(t, rowProcessed, false, "row %d", i)
} else {
require.Equal(t, rowProcessed, true, "row %d", i)
line, _ := table.CreateLine(row)
require.Equal(t, lineProtocolQueryResult[lineProtocolIndex], line)
lineProtocolIndex++
if i%6 == 4 {
// verify table
require.GreaterOrEqual(t, len(table.columns), 10)
require.Equal(t, table.columns, table.Columns())
for j, col := range table.columns {
if j > 0 {
require.Equal(t, col.Index, j)
require.Equal(t, col.Label, rows[i-1][j])
if len(rows[i-2]) > j {
require.Equal(t, col.DefaultValue, rows[i-2][j])
} else {
// some trailing data is missing
require.Equal(t, col.DefaultValue, "")
}
types := strings.Split(rows[i-3][j], ":")
require.Equal(t, types[0], col.DataType, "row %d, col %d", i-3, j)
if len(types) > 1 {
require.Equal(t, types[1], col.DataFormat, "row %d, col %d", i-3, j)
}
}
}
// verify cached values
table.computeLineProtocolColumns()
require.Equal(t, table.Column("_measurement"), table.cachedMeasurement)
require.Nil(t, table.Column("_no"))
require.NotNil(t, table.cachedMeasurement)
require.NotNil(t, table.cachedFieldName)
require.NotNil(t, table.cachedFieldValue)
require.NotNil(t, table.cachedTime)
require.NotNil(t, table.cachedTags)
require.Equal(t, table.Measurement().Label, "_measurement")
require.Equal(t, table.FieldName().Label, "_field")
require.Equal(t, table.FieldValue().Label, "_value")
require.Equal(t, table.Time().Label, "_time")
require.Equal(t, len(table.Tags()), 2)
require.Equal(t, table.Tags()[0].Label, "cpu")
require.Equal(t, table.Tags()[1].Label, "host")
require.Equal(t, len(table.Fields()), 0)
require.Contains(t, table.ColumnLabels(), "_measurement")
}
}
}
}
// Test_IgnoreLeadingComment tests the ignoreLeadingComment function
func Test_IgnoreLeadingComment(t *testing.T) {
var tests = []struct {
value string
expect string
}{
{"", ""},
{"a", "a"},
{" #whatever", " #whatever"},
{"#whatever", ""},
{"#whatever ", ""},
{"#whatever a b ", "a b "},
{"#whatever a b ", "a b "},
}
for _, test := range tests {
t.Run(test.value, func(t *testing.T) {
require.Equal(t, test.expect, ignoreLeadingComment(test.value))
})
}
}
// Test_CsvTableProcessing tests data processing in CsvTable
func Test_CsvTableProcessing(t *testing.T) {
var tests = []struct {
name string
csv string
line string
}{
{
"simple1",
"_measurement,a,b\ncpu,1,1",
"cpu a=1,b=1",
},
{
"simple1b",
"_measurement,,a,b\ncpu,whatever,1,1",
"cpu a=1,b=1",
},
{
"simple2",
"_measurement\ncpu,1,1",
"", // no fields present
},
{
"simple3",
"_time\n1,1",
"", // no measurement present
},
{
"annotated1",
"#datatype measurement,,\nmeasurement,a,b\ncpu,1,2",
"cpu a=1,b=2",
},
{
"annotated2",
"#datatype measurement,tag,field\nmeasurement,a,b\ncpu,1,2",
"cpu,a=1 b=2",
},
{
"annotated3",
"#datatype measurement,tag,dateTime,field\nmeasurement,a,b,time\ncpu,1,2,3",
"cpu,a=1 time=3 2",
},
{
"annotated3_detectedTime1",
"#datatype measurement,tag,dateTime,field\nmeasurement,a,b,time\ncpu,1,2020-01-10T10:10:10Z,3",
"cpu,a=1 time=3 1578651010000000000",
},
{
"annotated3_detectedTime2",
"#datatype measurement,tag,dateTime,field\nmeasurement,a,b,time\ncpu,1,2020-01-10T10:10:10.0Z,3",
"cpu,a=1 time=3 1578651010000000000",
},
{
"annotated4",
"#datatype measurement,tag,ignore,field\nmeasurement,a,b,time\ncpu,1,2,3",
"cpu,a=1 time=3",
},
{
"annotated5",
"#datatype measurement,tag,ignore,field\nmeasurement,a,b,time\ncpu,1,2,3",
"cpu,a=1 time=3",
},
{
"annotated6",
"#datatype measurement,tag,ignore,field\n" +
"#datatypea tag,tag,\n" + // this must be ignored since it not a supported annotation
"measurement,a,b,time\ncpu,1,2,3",
"cpu,a=1 time=3",
},
{
"annotated7",
"#datatype measurement,dateTime,\nmeasurement,a,b\ncpu,2020-01-10T10:10:10.0Z,2",
"cpu b=2 1578651010000000000",
},
{
"annotated8",
"#datatype measurement,,,field\nmeasurement,_field,_value,other\ncpu,a,1,2",
"cpu a=1,other=2",
},
{
"annotated9_sortedTags",
"#datatype measurement,tag,tag,time,field\nmeasurement,b,a,c,time\ncpu,1,2,3,4",
"cpu,a=2,b=1 time=4 3",
},
{
"allFieldTypes",
"#datatype measurement,string,double,boolean,long,unsignedLong,duration,base64Binary,dateTime\n" +
"m,s,d,b,l,ul,dur,by,d1,d2,time\n" +
`cpu,"str",1.0,true,1,1,1ms,YWFh,1`,
"cpu s=\"str\",d=1,b=true,l=1i,ul=1u,dur=1000000i,by=YWFh 1",
},
{
"allFieldTypes_ignoreAdditionalDateTimes",
"#datatype ,string,double,boolean,long,unsignedLong,duration,base64Binary,dateTime:RFC3339,dateTime:RFC3339Nano,\n" +
"_measurement,s,d,b,l,ul,dur,by,d1,d2,_time\n" +
`cpu,"str",1.0,true,1,1,1ms,YWFh,2020-01-10T10:10:10Z,2020-01-10T10:10:10Z,1`,
"cpu s=\"str\",d=1,b=true,l=1i,ul=1u,dur=1000000i,by=YWFh 1",
},
{
"allExtraDataTypes",
"#datatype measurement,tag,field,ignored,dateTime\n" +
"m,t,f,i,dt\n" +
`cpu,myTag,0,myIgnored,1`,
"cpu,t=myTag f=0 1",
},
{
"allTypes_escaped",
"#datatype ,string,string,,,,\n" +
`_measurement,s1,s2,"a,","b ",c=` + "\n" +
`"cpu, ","""",\,a,b,c`,
`cpu\,\ s1="\"",s2="\\",a\,=a,b\ =b,c\==c`,
},
{
"default_values",
"#default cpu,yes,0,1\n#datatype ,tag,,\n_measurement,test,col1,_time\n,,,",
"cpu,test=yes col1=0 1",
},
{
"no duplicate tags", // duplicate tags are ignored, the last column wins, https://github.com/influxdata/influxdb/issues/19453
"#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string,string,string,string,string,string,string\n" +
"#group,true,true,false,false,false,false,true,true,true,true,true,true,true,true,true,true\n" +
"#default,_result,,,,,,,,,,,,,,,\n" +
",result,table,_start,_stop,_time,_value,_field,_measurement,env,host,hostname,nodename,org,result,table,url\n" +
",,0,2020-08-26T23:10:54.023607624Z,2020-08-26T23:15:54.023607624Z,2020-08-26T23:11:00Z,0,0.001,something,host,pod,node,host,,success,role,http://127.0.0.1:8099/metrics\n",
"something,env=host,host=pod,hostname=node,nodename=host,result=success,table=role,url=http://127.0.0.1:8099/metrics 0.001=0 1598483460000000000",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
rows := readCsv(t, test.csv)
table := CsvTable{}
var lines []string
for _, row := range rows {
rowProcessed := table.AddRow(row)
if rowProcessed {
line, err := table.CreateLine(row)
if err != nil && test.line != "" {
require.Nil(t, err.Error())
}
lines = append(lines, line)
}
}
require.Equal(t, []string{test.line}, lines)
})
}
}
// Test_ConstantAnnotations tests processing of constant annotations
func Test_ConstantAnnotations(t *testing.T) {
var tests = []struct {
name string
csv string
line string
}{
{
"measurement_1",
"#constant measurement,cpu\n" +
"a,b\n" +
"1,1",
"cpu a=1,b=1",
},
{
"measurement_2",
"#constant,measurement,,cpu\n" +
"#constant,tag,cpu,cpu1\n" +
"#constant,long,of,0\n" +
"#constant,dateTime,,2\n" +
"a,b\n" +
"1,1",
"cpu,cpu=cpu1 a=1,b=1,of=0i 2",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
rows := readCsv(t, test.csv)
table := CsvTable{}
var lines []string
for _, row := range rows {
rowProcessed := table.AddRow(row)
if rowProcessed {
line, err := table.CreateLine(row)
if err != nil && test.line != "" {
require.Nil(t, err.Error())
}
lines = append(lines, line)
}
}
require.Equal(t, []string{test.line}, lines)
})
}
}
// Test_ConcatAnnotations tests processing of concat annotations
func Test_ConcatAnnotations(t *testing.T) {
var tests = []struct {
name string
csv string
line string
}{
{
"measurement_1",
"#concat measurement,cpu\n" +
"a,b\n" +
"1,1",
"cpu a=1,b=1",
},
{
"measurement_2",
"#concat,measurement,${a}${b}\n" +
"#constant,tag,cpu,cpu1\n" +
"#constant,long,of,0\n" +
"#constant,dateTime,,2\n" +
"a,b\n" +
"1,1",
"11,cpu=cpu1 a=1,b=1,of=0i 2",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
rows := readCsv(t, test.csv)
table := CsvTable{}
var lines []string
for _, row := range rows {
rowProcessed := table.AddRow(row)
if rowProcessed {
line, err := table.CreateLine(row)
if err != nil && test.line != "" {
require.Nil(t, err.Error())
}
lines = append(lines, line)
}
}
require.Equal(t, []string{test.line}, lines)
})
}
}
// Test_DataTypeInColumnName tests specification of column data type in the header row
func Test_DataTypeInColumnName(t *testing.T) {
var tests = []struct {
csv string
line string
ignoreDataTypeInColumnName bool
error string
}{
{
csv: "m|measurement,b|boolean:x:,c|boolean:x:|x\n" +
"cpu,,",
line: `cpu c=true`,
},
{
csv: "m|measurement,a|boolean,b|boolean:0:1,c|boolean:x:,d|boolean:x:\n" +
"cpu,1,1,x,y",
line: `cpu a=true,b=false,c=true,d=false`,
},
{
csv: "#constant measurement,cpu\n" +
"a|long,b|string\n" +
"1,1",
line: `cpu a=1i,b="1"`,
},
{
csv: "#constant measurement,cpu\n" +
"a|long,b|string\n" +
"1,1",
line: `cpu a|long=1,b|string=1`,
ignoreDataTypeInColumnName: true,
},
{
csv: "#constant measurement,cpu\n" +
"#datatype long,string\n" +
"a|long,b|string\n" +
"1,1",
line: `cpu a|long=1i,b|string="1"`,
ignoreDataTypeInColumnName: true,
},
{
csv: "#constant measurement,cpu\n" +
"a|long:strict: ,b|unsignedLong:strict: \n" +
"1 2,1 2",
line: `cpu a=12i,b=12u`,
},
{
csv: "#constant measurement,cpu\n" +
"a|long:strict\n" +
"1.1,1",
error: "column 'a': '1.1' cannot fit into long data type",
},
{
csv: "#constant measurement,cpu\n" +
"a|unsignedLong:strict\n" +
"1.1,1",
error: "column 'a': '1.1' cannot fit into unsignedLong data type",
},
}
for i, test := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
rows := readCsv(t, test.csv)
table := CsvTable{}
table.IgnoreDataTypeInColumnName(test.ignoreDataTypeInColumnName)
var lines []string
for _, row := range rows {
rowProcessed := table.AddRow(row)
if rowProcessed {
line, err := table.CreateLine(row)
if err != nil {
if test.error == "" {
require.Nil(t, err.Error())
} else {
require.Equal(t, test.error, err.Error())
}
}
lines = append(lines, line)
}
}
require.Equal(t, []string{test.line}, lines)
})
}
}
// Test_CsvTable_dataErrors tests reporting of table data errors
func Test_CsvTable_dataErrors(t *testing.T) {
var tests = []struct {
name string
csv string
}{
{
"error_1_is_not_dateTime:RFC3339",
"#datatype measurement,,\n#datatype ,dateTime:RFC3339,\nmeasurement,a,b\ncpu,1,2",
},
{
"error_a_fieldValue_is_not_long",
"#datatype measurement,,\n#datatype ,long,\nmeasurement,_value,_field\ncpu,a,count",
},
{
"error_a_is_not_long",
"#datatype measurement,,\n#datatype ,long,\nmeasurement,a,b\ncpu,a,2",
},
{
"error_time_is_not_time",
"#datatype measurement,tag,time,field\nmeasurement,a,b,time\ncpu,1,2020-10,3",
},
{
"error_no_measurement",
"#datatype ,\ncol1,col2\n1,2",
},
{
"error_unsupportedFieldDataType",
"#datatype ,whatever\n_measurement,col2\na,2",
},
{
"error_unsupportedFieldValueDataType",
"#datatype ,,whatever\n_measurement,_field,_value\na,1,2",
},
{
"error_no_measurement_data",
"_measurement,col1\n,2",
},
{
"error_derived_column_missing reference",
"#concat string,d,${col1}${col2}\n_measurement,col1\nm,2",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
rows := readCsv(t, test.csv)
table := CsvTable{}
var errors []error
for _, row := range rows {
rowProcessed := table.AddRow(row)
if rowProcessed {
_, err := table.CreateLine(row)
if err != nil {
errors = append(errors, err)
}
}
}
require.Equal(t, 1, len(errors))
// fmt.Println(errors[0])
require.NotNil(t, errors[0].Error())
// LineLabel is the same as Label in all tested columns
for _, col := range table.Columns() {
require.Equal(t, col.Label, col.LineLabel())
}
})
}
}
// Test_CsvTable_DataColumnsInfo tests reporting of table columns
func Test_CsvTable_DataColumnsInfo(t *testing.T) {
data := "#constant,measurement,cpu\n" +
"#constant,tag,xpu,xpu1\n" +
"#constant,tag,cpu,cpu1\n" +
"#constant,long,of,100\n" +
"#constant,dateTime,2\n" +
"x,y\n"
table := CsvTable{}
for _, row := range readCsv(t, data) {
require.False(t, table.AddRow(row))
}
table.computeLineProtocolColumns()
// expected result is something like this:
// "CsvTable{ dataColumns: 2 constantColumns: 5\n" +
// " measurement: &{Label:#constant measurement DataType:measurement DataFormat: LinePart:2 DefaultValue:cpu Index:-1 TimeZone:UTC ParseF:<nil> escapedLabel:}\n" +
// " tag: {Label:cpu DataType:tag DataFormat: LinePart:3 DefaultValue:cpu1 Index:-1 TimeZone:UTC ParseF:<nil> escapedLabel:cpu}\n" +
// " tag: {Label:xpu DataType:tag DataFormat: LinePart:3 DefaultValue:xpu1 Index:-1 TimeZone:UTC ParseF:<nil> escapedLabel:xpu}\n" +
// " field: {Label:x DataType: DataFormat: LinePart:0 DefaultValue: Index:0 TimeZone:UTC ParseF:<nil> escapedLabel:x}\n" +
// " field: {Label:y DataType: DataFormat: LinePart:0 DefaultValue: Index:1 TimeZone:UTC ParseF:<nil> escapedLabel:y}\n" +
// " field: {Label:of DataType:long DataFormat: LinePart:0 DefaultValue:100 Index:-1 TimeZone:UTC ParseF:<nil> escapedLabel:of}\n" +
// " time: &{Label:#constant dateTime DataType:dateTime DataFormat: LinePart:5 DefaultValue:2 Index:-1 TimeZone:UTC ParseF:<nil> escapedLabel:}" +
// "\n}"
result := table.DataColumnsInfo()
require.Equal(t, 1, strings.Count(result, "measurement:"))
require.Equal(t, 2, strings.Count(result, "tag:"))
require.Equal(t, 3, strings.Count(result, "field:"))
require.Equal(t, 1, strings.Count(result, "time:"))
var table2 *CsvTable
require.Equal(t, "<nil>", table2.DataColumnsInfo())
}
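As a sketch of the same annotations driven through the package's public reader (CsvToLineProtocol, exercised in examples_test.go further below), with made-up input:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"strings"

	"github.com/influxdata/influx-cli/v2/pkg/csv2lp"
)

func main() {
	data := "#constant measurement,cpu\n" +
		"a,b\n" +
		"1,1\n"
	// the reader converts CSV to line protocol on the fly
	lp, err := ioutil.ReadAll(csv2lp.CsvToLineProtocol(strings.NewReader(data)))
	if err != nil {
		panic(err)
	}
	fmt.Print(string(lp)) // cpu a=1,b=1
}
```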


@ -0,0 +1,341 @@
package csv2lp
import (
"encoding/base64"
"errors"
"fmt"
"io"
"log"
"math"
"strconv"
"strings"
"time"
"golang.org/x/text/encoding/ianaindex"
)
// see https://docs.influxdata.com/influxdb/latest/reference/syntax/annotated-csv/#data-types
const (
stringDatatype = "string"
doubleDatatype = "double"
boolDatatype = "boolean"
longDatatype = "long"
uLongDatatype = "unsignedLong"
durationDatatype = "duration"
base64BinaryDataType = "base64Binary"
dateTimeDatatype = "dateTime"
)
// predefined dateTime formats
const (
RFC3339 = "RFC3339"
RFC3339Nano = "RFC3339Nano"
dataFormatNumber = "number" // the same as long, but serialized without the i suffix; used for timestamps
)
var supportedDataTypes map[string]struct{}
func init() {
supportedDataTypes = make(map[string]struct{}, 9)
supportedDataTypes[stringDatatype] = struct{}{}
supportedDataTypes[doubleDatatype] = struct{}{}
supportedDataTypes[boolDatatype] = struct{}{}
supportedDataTypes[longDatatype] = struct{}{}
supportedDataTypes[uLongDatatype] = struct{}{}
supportedDataTypes[durationDatatype] = struct{}{}
supportedDataTypes[base64BinaryDataType] = struct{}{}
supportedDataTypes[dateTimeDatatype] = struct{}{}
supportedDataTypes[""] = struct{}{}
}
// IsTypeSupported returns true if the data type is supported
func IsTypeSupported(dataType string) bool {
_, supported := supportedDataTypes[dataType]
return supported
}
var replaceMeasurement *strings.Replacer = strings.NewReplacer(",", "\\,", " ", "\\ ")
var replaceTag *strings.Replacer = strings.NewReplacer(",", "\\,", " ", "\\ ", "=", "\\=")
var replaceQuoted *strings.Replacer = strings.NewReplacer("\"", "\\\"", "\\", "\\\\")
func escapeMeasurement(val string) string {
for i := 0; i < len(val); i++ {
if val[i] == ',' || val[i] == ' ' {
return replaceMeasurement.Replace(val)
}
}
return val
}
func escapeTag(val string) string {
for i := 0; i < len(val); i++ {
if val[i] == ',' || val[i] == ' ' || val[i] == '=' {
return replaceTag.Replace(val)
}
}
return val
}
func escapeString(val string) string {
for i := 0; i < len(val); i++ {
if val[i] == '"' || val[i] == '\\' {
return replaceQuoted.Replace(val)
}
}
return val
}
// normalizeNumberString normalizes the supplied value according to the supplied format.
// This normalization is intended to convert number strings of different locales to a strconv-parsable value.
//
// The format's first character is the fraction delimiter. The remaining characters of the format
// are simply removed; they are typically used to visually separate groups in large numbers.
// The removeFraction parameter controls whether the fraction part is removed from
// the returned value (true) or kept in it (false).
// An empty format means ". \n\t\r_"
//
// For example, to get a strconv-parsable float from a Spanish value '3.494.826.157,123', use the format ",.".
func normalizeNumberString(value string, format string, removeFraction bool) (normalized string, truncated bool) {
if len(format) == 0 {
format = ". \n\t\r_"
}
if strings.ContainsAny(value, format) {
formatRunes := []rune(format)
fractionRune := formatRunes[0]
ignored := formatRunes[1:]
retVal := strings.Builder{}
retVal.Grow(len(value))
ForAllCharacters:
for _, c := range value {
// skip ignored characters
for i := 0; i < len(ignored); i++ {
if c == ignored[i] {
continue ForAllCharacters
}
}
if c == fractionRune {
if removeFraction {
return retVal.String(), true
}
retVal.WriteByte('.')
continue
}
retVal.WriteRune(c)
}
return retVal.String(), false
}
return value, false
}
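// Illustration (not part of the original sources; values assumed from the
// doc comment above and the tests in this commit):
//
//	normalizeNumberString("3.494.826.157,123", ",.", false) // -> "3494826157.123", false
//	normalizeNumberString("3.494.826.157,123", ",.", true)  // -> "3494826157", true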
func toTypedValue(val string, column *CsvTableColumn, lineNumber int) (interface{}, error) {
dataType := column.DataType
dataFormat := column.DataFormat
if column.ParseF != nil {
return column.ParseF(val)
}
switch dataType {
case stringDatatype:
return val, nil
case dateTimeDatatype:
switch dataFormat {
case "": // number or time.RFC3339
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return time.Parse(time.RFC3339, val)
}
return time.Unix(0, t).UTC(), nil
case RFC3339:
return time.Parse(time.RFC3339, val)
case RFC3339Nano:
return time.Parse(time.RFC3339Nano, val)
case dataFormatNumber:
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, err
}
return time.Unix(0, t).UTC(), nil
default:
if column.TimeZone != nil {
return time.ParseInLocation(dataFormat, val, column.TimeZone)
}
return time.Parse(dataFormat, val)
}
case durationDatatype:
return time.ParseDuration(val)
case doubleDatatype:
normalized, _ := normalizeNumberString(val, dataFormat, false)
return strconv.ParseFloat(normalized, 64)
case boolDatatype:
switch {
case len(val) == 0:
return nil, errors.New("Unsupported boolean value '" + val + "' , first character is expected to be 't','f','0','1','y','n'")
case val[0] == 't' || val[0] == 'T' || val[0] == 'y' || val[0] == 'Y' || val[0] == '1':
return true, nil
case val[0] == 'f' || val[0] == 'F' || val[0] == 'n' || val[0] == 'N' || val[0] == '0':
return false, nil
default:
return nil, errors.New("Unsupported boolean value '" + val + "' , first character is expected to be 't','f','0','1','y','n'")
}
case longDatatype:
normalized, truncated := normalizeNumberString(val, dataFormat, true)
if truncated {
err := CreateRowColumnError(lineNumber, column.Label,
fmt.Errorf("'%s' truncated to '%s' to fit into long data type", val, normalized))
log.Printf("WARNING: %v\n", err)
}
return strconv.ParseInt(normalized, 10, 64)
case uLongDatatype:
normalized, truncated := normalizeNumberString(val, dataFormat, true)
if truncated {
err := CreateRowColumnError(lineNumber, column.Label,
fmt.Errorf("'%s' truncated to '%s' to fit into unsignedLong data type", val, normalized))
log.Printf("WARNING: %v\n", err)
}
return strconv.ParseUint(normalized, 10, 64)
case base64BinaryDataType:
return base64.StdEncoding.DecodeString(val)
default:
return nil, fmt.Errorf("unsupported data type '%s'", dataType)
}
}
func appendProtocolValue(buffer []byte, value interface{}) ([]byte, error) {
switch v := value.(type) {
case uint64:
return append(strconv.AppendUint(buffer, v, 10), 'u'), nil
case int64:
return append(strconv.AppendInt(buffer, v, 10), 'i'), nil
case int:
return append(strconv.AppendInt(buffer, int64(v), 10), 'i'), nil
case float64:
if math.IsNaN(v) {
return buffer, errors.New("value is NaN")
}
if math.IsInf(v, 0) {
return buffer, errors.New("value is Infinite")
}
return strconv.AppendFloat(buffer, v, 'f', -1, 64), nil
case float32:
v32 := float64(v)
if math.IsNaN(v32) {
return buffer, errors.New("value is NaN")
}
if math.IsInf(v32, 0) {
return buffer, errors.New("value is Infinite")
}
return strconv.AppendFloat(buffer, v32, 'f', -1, 64), nil
case string:
buffer = append(buffer, '"')
buffer = append(buffer, escapeString(v)...)
buffer = append(buffer, '"')
return buffer, nil
case []byte:
buf := make([]byte, base64.StdEncoding.EncodedLen(len(v)))
base64.StdEncoding.Encode(buf, v)
return append(buffer, buf...), nil
case bool:
if v {
return append(buffer, "true"...), nil
}
return append(buffer, "false"...), nil
case time.Time:
return strconv.AppendInt(buffer, v.UnixNano(), 10), nil
case time.Duration:
return append(strconv.AppendInt(buffer, v.Nanoseconds(), 10), 'i'), nil
default:
return buffer, fmt.Errorf("unsupported value type: %T", v)
}
}
func appendConverted(buffer []byte, val string, column *CsvTableColumn, lineNumber int) ([]byte, error) {
if len(column.DataType) == 0 { // keep the value as it is
return append(buffer, val...), nil
}
typedVal, err := toTypedValue(val, column, lineNumber)
if err != nil {
return buffer, err
}
return appendProtocolValue(buffer, typedVal)
}
func decodeNop(reader io.Reader) io.Reader {
return reader
}
// CreateDecoder creates a function that wraps a reader to decode the supplied encoding to UTF-8, or returns an error for an unknown encoding
func CreateDecoder(encoding string) (func(io.Reader) io.Reader, error) {
if len(encoding) > 0 && encoding != "UTF-8" {
enc, err := ianaindex.IANA.Encoding(encoding)
if err != nil {
return nil, fmt.Errorf("%v, see https://www.iana.org/assignments/character-sets/character-sets.xhtml", err)
}
if enc == nil {
return nil, fmt.Errorf("unsupported encoding: %s", encoding)
}
return enc.NewDecoder().Reader, nil
}
return decodeNop, nil
}
// createBoolParseFn returns a function that converts a string value to boolean according to format "true,yes,1:false,no,0"
func createBoolParseFn(format string) func(string) (interface{}, error) {
var err error
truthy := []string{}
falsy := []string{}
if !strings.Contains(format, ":") {
err = fmt.Errorf("unsupported boolean format: %s should be in 'true,yes,1:false,no,0' format, but no ':' is present", format)
} else {
colon := strings.Index(format, ":")
t := format[:colon]
f := format[colon+1:]
if t != "" {
truthy = strings.Split(t, ",")
}
if f != "" {
falsy = strings.Split(f, ",")
}
}
return func(val string) (interface{}, error) {
if err != nil {
return nil, err
}
for _, s := range falsy {
if s == val {
return false, nil
}
}
for _, s := range truthy {
if s == val {
return true, nil
}
}
if len(falsy) == 0 {
return false, nil
}
if len(truthy) == 0 {
return true, nil
}
return nil, fmt.Errorf("unsupported boolean value: %s must one of %v or one of %v", val, truthy, falsy)
}
}
// createStrictLongParseFn returns a function that converts a string value to long and also fails when a fraction digit is detected
func createStrictLongParseFn(dataFormat string) func(string) (interface{}, error) {
return func(val string) (interface{}, error) {
normalized, truncated := normalizeNumberString(val, dataFormat, true)
if truncated {
return 0, fmt.Errorf("'%s' cannot fit into long data type", val)
}
return strconv.ParseInt(normalized, 10, 64)
}
}
// createStrictUnsignedLongParseFn returns a function that converts a string value to unsigned long and fails when a fraction digit is detected
func createStrictUnsignedLongParseFn(dataFormat string) func(string) (interface{}, error) {
return func(val string) (interface{}, error) {
normalized, truncated := normalizeNumberString(val, dataFormat, true)
if truncated {
return 0, fmt.Errorf("'%s' cannot fit into unsignedLong data type", val)
}
return strconv.ParseUint(normalized, 10, 64)
}
}
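A short sketch of CreateDecoder in isolation, mirroring the windows-1250 case in the tests below; the input bytes are illustrative:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"strings"

	"github.com/influxdata/influx-cli/v2/pkg/csv2lp"
)

func main() {
	decoder, err := csv2lp.CreateDecoder("windows-1250")
	if err != nil {
		panic(err)
	}
	// 0x8A encodes 'Š' in windows-1250; the wrapped reader yields UTF-8
	utf8Bytes, _ := ioutil.ReadAll(decoder(strings.NewReader("\x8a")))
	fmt.Println(string(utf8Bytes)) // Š
}
```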


@ -0,0 +1,350 @@
package csv2lp
import (
"bytes"
"fmt"
"io/ioutil"
"log"
"math"
"os"
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
)
// Test_escapeMeasurement tests the escapeMeasurement function
func Test_escapeMeasurement(t *testing.T) {
var tests = []struct {
value string
expect string
}{
{"a", "a"}, {"", ""},
{"a,", `a\,`},
{"a ", `a\ `},
{"a=", `a=`},
}
for i, test := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
require.Equal(t, test.expect, escapeMeasurement(test.value))
})
}
}
// Test_EscapeTag tests escapeTag function
func Test_EscapeTag(t *testing.T) {
var tests = []struct {
value string
expect string
}{
{"a", "a"}, {"", ""},
{"a,", `a\,`},
{"a ", `a\ `},
{"a=", `a\=`},
}
for i, test := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
require.Equal(t, test.expect, escapeTag(test.value))
})
}
}
// Test_EscapeString tests escapeString function
func Test_EscapeString(t *testing.T) {
var tests = []struct {
value string
expect string
}{
{"a", `a`}, {"", ``},
{`a"`, `a\"`},
{`a\`, `a\\`},
}
for i, test := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
require.Equal(t, test.expect, escapeString(test.value))
})
}
}
// Test_ToTypedValue tests toTypedValue function
func Test_ToTypedValue(t *testing.T) {
epochTime, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:00Z")
var tests = []struct {
dataType string
value string
expect interface{}
}{
{"string", "a", "a"},
{"double", "1.0", float64(1.0)},
{"boolean", "true", true},
{"boolean", "True", true},
{"boolean", "y", true},
{"boolean", "Yes", true},
{"boolean", "1", true},
{"boolean", "false", false},
{"boolean", "False", false},
{"boolean", "n", false},
{"boolean", "No", false},
{"boolean", "0", false},
{"boolean", "", nil},
{"boolean", "?", nil},
{"long", "1", int64(1)},
{"unsignedLong", "1", uint64(1)},
{"duration", "1ns", time.Duration(1)},
{"base64Binary", "YWFh", []byte("aaa")},
{"dateTime:RFC3339", "1970-01-01T00:00:00Z", epochTime},
{"dateTime:RFC3339Nano", "1970-01-01T00:00:00.0Z", epochTime},
{"dateTime:RFC3339", "1970-01-01T00:00:00.000000001Z", epochTime.Add(time.Duration(1))},
{"dateTime:RFC3339Nano", "1970-01-01T00:00:00.000000002Z", epochTime.Add(time.Duration(2))},
{"dateTime:number", "3", epochTime.Add(time.Duration(3))},
{"dateTime", "4", epochTime.Add(time.Duration(4))},
{"dateTime:2006-01-02", "1970-01-01", epochTime},
{"dateTime", "1970-01-01T00:00:00Z", epochTime},
{"dateTime", "1970-01-01T00:00:00.000000001Z", epochTime.Add(time.Duration(1))},
{"double:, .", "200 100.299,0", float64(200100299.0)},
{"long:, .", "200 100.299,0", int64(200100299)},
{"unsignedLong:, .", "200 100.299,0", uint64(200100299)},
{"u.type", "", nil},
}
for i, test := range tests {
t.Run(fmt.Sprint(i)+" "+test.value, func(t *testing.T) {
column := &CsvTableColumn{Label: "test"}
column.setupDataType(test.dataType)
val, err := toTypedValue(test.value, column, 1)
if err != nil && test.expect != nil {
require.Nil(t, err.Error())
}
require.Equal(t, test.expect, val)
})
}
}
// Test_ToTypedValue_dateTimeCustomTimeZone tests custom timezone when calling toTypedValue function
func Test_ToTypedValue_dateTimeCustomTimeZone(t *testing.T) {
epochTime, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:00Z")
tz, _ := parseTimeZone("-0100")
var tests = []struct {
dataType string
value string
expect interface{}
}{
{"dateTime:RFC3339", "1970-01-01T00:00:00Z", epochTime},
{"dateTime:RFC3339Nano", "1970-01-01T00:00:00.0Z", epochTime},
{"dateTime:number", "3", epochTime.Add(time.Duration(3))},
{"dateTime:2006-01-02", "1970-01-01", epochTime.Add(time.Hour)},
}
for i, test := range tests {
t.Run(fmt.Sprint(i)+" "+test.value, func(t *testing.T) {
column := &CsvTableColumn{}
column.TimeZone = tz
column.setupDataType(test.dataType)
val, err := toTypedValue(test.value, column, 1)
if err != nil && test.expect != nil {
require.Nil(t, err.Error())
}
if test.expect == nil {
require.Equal(t, test.expect, val)
} else {
expectTime := test.expect.(time.Time)
time := val.(time.Time)
require.True(t, expectTime.Equal(time))
}
})
}
}
// Test_AppendProtocolValue tests the appendProtocolValue function
func Test_AppendProtocolValue(t *testing.T) {
epochTime, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:00Z")
var tests = []struct {
value interface{}
expect string
}{
{uint64(1), "1u"},
{int64(1), "1i"},
{int(1), "1i"},
{float64(1.1), "1.1"},
{math.NaN(), ""},
{math.Inf(1), ""},
{float32(1), "1"},
{float32(math.NaN()), ""},
{float32(math.Inf(1)), ""},
{"a", `"a"`},
{[]byte("aaa"), "YWFh"},
{true, "true"},
{false, "false"},
{epochTime, "0"},
{time.Duration(100), "100i"},
{struct{}{}, ""},
}
for i, test := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
val, err := appendProtocolValue(nil, test.value)
if err != nil && test.expect != "" {
require.Nil(t, err.Error())
}
require.Equal(t, test.expect, string(val))
})
}
}
// Test_AppendConverted tests appendConverted function
func Test_AppendConverted(t *testing.T) {
var tests = []struct {
dataType string
value string
expect string
}{
{"", "1", "1"},
{"long", "a", ""},
{"dateTime", "a", ""},
{"dateTime:number", "a", ""},
{"string", "a", `"a"`},
}
for i, test := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
column := &CsvTableColumn{Label: "test"}
column.setupDataType(test.dataType)
val, err := appendConverted(nil, test.value, column, 1)
if err != nil && test.expect != "" {
require.Nil(t, err.Error())
}
require.Equal(t, test.expect, string(val))
})
}
}
// Test_IsTypeSupported tests IsTypeSupported function
func Test_IsTypeSupported(t *testing.T) {
require.True(t, IsTypeSupported(stringDatatype))
require.True(t, IsTypeSupported(doubleDatatype))
require.True(t, IsTypeSupported(boolDatatype))
require.True(t, IsTypeSupported(longDatatype))
require.True(t, IsTypeSupported(uLongDatatype))
require.True(t, IsTypeSupported(durationDatatype))
require.True(t, IsTypeSupported(base64BinaryDataType))
require.True(t, IsTypeSupported(dateTimeDatatype))
require.True(t, IsTypeSupported(""))
require.False(t, IsTypeSupported(" "))
// time format is not part of data type
require.False(t, IsTypeSupported(dateTimeDatatype+":"+RFC3339))
require.False(t, IsTypeSupported(dateTimeDatatype+":"+RFC3339Nano))
require.False(t, IsTypeSupported(dateTimeDatatype+":"+dataFormatNumber))
}
// Test_NormalizeNumberString tests normalizeNumberString function
func Test_NormalizeNumberString(t *testing.T) {
var tests = []struct {
value string
format string
removeFraction bool
expect string
truncated bool
}{
{"123", "", true, "123", false},
{"123", ".", true, "123", false},
{"123.456", ".", true, "123", true},
{"123.456", ".", false, "123.456", false},
{"1 2.3,456", ",. ", false, "123.456", false},
{" 1 2\t3.456 \r\n", "", false, "123.456", false},
}
for i, test := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
// customize logging to check warnings
var buf bytes.Buffer
log.SetOutput(&buf)
oldFlags := log.Flags()
log.SetFlags(0)
oldPrefix := log.Prefix()
prefix := "::PREFIX::"
log.SetPrefix(prefix)
defer func() {
log.SetOutput(os.Stderr)
log.SetFlags(oldFlags)
log.SetPrefix(oldPrefix)
}()
normalized, truncated := normalizeNumberString(test.value, test.format, test.removeFraction)
require.Equal(t, test.expect, normalized)
require.Equal(t, test.truncated, truncated)
})
}
}
// Test_CreateDecoder tests CreateDecoder function
func Test_CreateDecoder(t *testing.T) {
decoder, err := CreateDecoder("UTF-8")
toUtf8 := func(in []byte) string {
s, _ := ioutil.ReadAll(decoder(bytes.NewReader(in)))
return string(s)
}
require.NotNil(t, decoder)
require.Nil(t, err)
require.Equal(t, "\u2318", toUtf8([]byte{226, 140, 152}))
decoder, err = CreateDecoder("windows-1250")
require.NotNil(t, decoder)
require.Nil(t, err)
require.Equal(t, "\u0160", toUtf8([]byte{0x8A}))
decoder, err = CreateDecoder("whateveritis")
require.NotNil(t, err)
require.Nil(t, decoder)
// we can have valid IANA names that are not supported by golang/x/text
decoder, err = CreateDecoder("US-ASCII")
log.Printf("US-ASCII encoding support: %v,%v", decoder != nil, err)
}
// Test_CreateBoolParseFn tests createBoolParseFn function
func Test_CreateBoolParseFn(t *testing.T) {
type pairT struct {
value string
expect string
}
var tests = []struct {
format string
pair []pairT
}{
{"t,y,1:f,n,0", []pairT{
{"y", "true"},
{"0", "false"},
{"T", "unsupported"},
}},
{"true", []pairT{
{"true", "unsupported"},
{"false", "unsupported"},
}},
{"true:", []pairT{
{"true", "true"},
{"other", "false"},
}},
{":false", []pairT{
{"false", "false"},
{"other", "true"},
}},
}
for i, test := range tests {
fn := createBoolParseFn(test.format)
for j, pair := range test.pair {
t.Run(fmt.Sprint(i)+"_"+fmt.Sprint(j), func(t *testing.T) {
result, err := fn(pair.value)
switch pair.expect {
case "true":
require.Equal(t, true, result)
case "false":
require.Equal(t, false, result)
default:
require.NotNil(t, err)
require.True(t, strings.Contains(fmt.Sprintf("%v", err), pair.expect))
}
})
}
}
}

pkg/csv2lp/examples_test.go Normal file

@ -0,0 +1,195 @@
package csv2lp
import (
"fmt"
"io/ioutil"
"strings"
"testing"
"github.com/stretchr/testify/require"
)
type csvExample struct {
name string
csv string
lp string
}
var examples []csvExample = []csvExample{
{
"fluxQueryResult",
`
#group,false,false,true,true,false,false,true,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
,,0,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:17:57Z,0,time_steal,cpu,cpu1,rsavage.prod
,,0,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:07Z,0,time_steal,cpu,cpu1,rsavage.prod
#group,false,false,true,true,false,false,true,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
,,1,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:01Z,2.7263631815907954,usage_user,cpu,cpu-total,tahoecity.prod
,,1,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:11Z,2.247752247752248,usage_user,cpu,cpu-total,tahoecity.prod
`,
`
cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669077000000000
cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669087000000000
cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.7263631815907954 1582669081000000000
cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.247752247752248 1582669091000000000
`,
},
{
"annotatedSimple",
`
#datatype measurement,tag,tag,double,double,ignored,dateTime:number
m,cpu,host,time_steal,usage_user,nothing,time
cpu,cpu1,rsavage.prod,0,2.7,a,1482669077000000000
cpu,cpu1,rsavage.prod,0,2.2,b,1482669087000000000
`,
`
cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.7 1482669077000000000
cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.2 1482669087000000000
`,
},
{
"annotatedSimple_labels",
`
m|measurement,cpu|tag,host|tag,time_steal|double,usage_user|double,nothing|ignored,time|dateTime:number
cpu,cpu1,rsavage.prod,0,2.7,a,1482669077000000000
cpu,cpu1,rsavage.prod,0,2.2,b,1482669087000000000
`,
`
cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.7 1482669077000000000
cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.2 1482669087000000000
`,
},
{
"annotatedDatatype",
`
#datatype measurement,tag,string,double,boolean,long,unsignedLong,duration,dateTime
#default test,annotatedDatatypes,,,,,,
m,name,s,d,b,l,ul,dur,time
,,str1,1.0,true,1,1,1ms,1
,,str2,2.0,false,2,2,2us,2020-01-11T10:10:10Z
`,
`
test,name=annotatedDatatypes s="str1",d=1,b=true,l=1i,ul=1u,dur=1000000i 1
test,name=annotatedDatatypes s="str2",d=2,b=false,l=2i,ul=2u,dur=2000i 1578737410000000000
`,
},
{
"annotatedDatatype_labels",
`
m|measurement|test,name|tag|annotatedDatatypes,s|string,d|double,b|boolean,l|long,ul|unsignedLong,dur|duration,time|dateTime
,,str1,1.0,true,1,1,1ms,1
,,str2,2.0,false,2,2,2us,2020-01-11T10:10:10Z`,
`
test,name=annotatedDatatypes s="str1",d=1,b=true,l=1i,ul=1u,dur=1000000i 1
test,name=annotatedDatatypes s="str2",d=2,b=false,l=2i,ul=2u,dur=2000i 1578737410000000000
`,
},
{
"datetypeFormats",
`
#constant measurement,test
#constant tag,name,datetypeFormats
#timezone -0500
#datatype dateTime:2006-01-02|1970-01-02,"double:,. ","boolean:y,Y:n,N|y"
t,d,b
1970-01-01,"123.456,78",
,"123 456,78",Y
`,
`
test,name=datetypeFormats d=123456.78,b=true 18000000000000
test,name=datetypeFormats d=123456.78,b=true 104400000000000
`,
},
{
"datetypeFormats_labels",
`
#constant measurement,test
#constant tag,name,datetypeFormats
#timezone -0500
t|dateTime:2006-01-02|1970-01-02,"d|double:,. ","b|boolean:y,Y:n,N|y"
1970-01-01,"123.456,78",
,"123 456,78",Y
`,
`
test,name=datetypeFormats d=123456.78,b=true 18000000000000
test,name=datetypeFormats d=123456.78,b=true 104400000000000
`,
},
{
"datetypeFormats_labels_override",
`
#constant measurement,test2
t|dateTime:2006-01-02,_|ignored,s|string|unknown
1970-01-01,"123.456,78",
,"123 456,78",Y
`,
`
test2 s="unknown" 0
test2 s="Y"
`,
},
{
"datetypeFormats_labels_override",
`
m|measurement,usage_user|double
cpu,2.7
cpu,nil
cpu,
,2.9
`,
`
cpu usage_user=2.7
`,
},
{
"columnSeparator",
`sep=;
m|measurement;available|boolean:y,Y:|n;dt|dateTime:number
test;nil;1
test;N;2
test;";";3
test;;4
test;Y;5
`,
`
test available=false 1
test available=false 2
test available=false 3
test available=false 4
test available=true 5
`,
},
}
func (example *csvExample) normalize() rune {
for len(example.lp) > 0 && example.lp[0] == '\n' {
example.lp = example.lp[1:]
}
if strings.HasPrefix(example.csv, "sep=") {
return (rune)(example.csv[4])
}
return ','
}
// Test_Examples tests the examples from this package's README.md file
func Test_Examples(t *testing.T) {
for _, example := range examples {
t.Run(example.name, func(t *testing.T) {
comma := example.normalize()
transformer := CsvToLineProtocol(strings.NewReader(example.csv))
transformer.SkipRowOnError(true)
result, err := ioutil.ReadAll(transformer)
if err != nil {
require.Nil(t, fmt.Sprintf("%s", err))
}
require.Equal(t, comma, transformer.Comma())
require.Equal(t, example.lp, string(result))
})
}
}
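The SkipRowOnError option used above can also soften data errors in ad-hoc conversions; a hedged sketch with made-up input (a failing double conversion drops the row instead of aborting):

```go
package main

import (
	"fmt"
	"io/ioutil"
	"strings"

	"github.com/influxdata/influx-cli/v2/pkg/csv2lp"
)

func main() {
	data := "m|measurement,usage_user|double\n" +
		"cpu,2.7\n" +
		"cpu,not-a-number\n" // fails to convert and is skipped
	transformer := csv2lp.CsvToLineProtocol(strings.NewReader(data))
	transformer.SkipRowOnError(true) // log bad rows instead of failing
	lp, err := ioutil.ReadAll(transformer)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(lp)) // cpu usage_user=2.7
}
```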

pkg/csv2lp/line_reader.go Normal file

@ -0,0 +1,100 @@
package csv2lp
import (
"io"
)
const (
defaultBufSize = 4096
)
// LineReader wraps an io.Reader to count lines that go through the read
// function and returns at most one line during every invocation of
// read. It provides a workaround for golang's CSV reader, which
// does not expose the current line number at all
// (see https://github.com/golang/go/issues/26679)
//
// At most one line is returned by every read in order to ensure that
// golang's CSV reader buffers at most a single line into its nested
// bufio.Reader.
type LineReader struct {
// LineNumber of the next read operation, 0 is the first line by default.
// It can be set to 1 to start counting from 1.
LineNumber int
// LastLineNumber is the number of the last read row.
LastLineNumber int
// rd is the wrapped reader provided by the client
rd io.Reader
// buf contains last data read from rd
buf []byte
// readPos is a read position in the buffer
readPos int
// bufSize is the length of data read from rd into buf
bufSize int
// err contains the last error during read
err error
}
// NewLineReader returns a new LineReader.
func NewLineReader(rd io.Reader) *LineReader {
return NewLineReaderSize(rd, defaultBufSize)
}
// NewLineReaderSize returns a new LineReader whose buffer has at least
// the specified size.
func NewLineReaderSize(rd io.Reader, size int) *LineReader {
if size < 2 {
size = 2
}
return &LineReader{
rd: rd,
buf: make([]byte, size),
}
}
// Read reads data into p. The data written to p either contains no
// '\n' or ends with '\n'.
// It returns the number of bytes read into p.
func (lr *LineReader) Read(p []byte) (int, error) {
// handle pathological case of reading into empty array
if len(p) == 0 {
if lr.readPos < lr.bufSize {
return 0, nil
}
return 0, lr.readErr()
}
// read data into buf
if lr.readPos == lr.bufSize {
if lr.err != nil {
return 0, lr.readErr()
}
lr.readPos = 0
lr.bufSize, lr.err = lr.rd.Read(lr.buf)
if lr.bufSize == 0 {
return 0, lr.readErr()
}
}
// copy at most one line and don't overflow internal buffer or p
i := 0
lr.LastLineNumber = lr.LineNumber
for lr.readPos < lr.bufSize && i < len(p) {
b := lr.buf[lr.readPos]
lr.readPos++
p[i] = b
i++
// read at most one line
if b == '\n' {
lr.LineNumber++
break
}
}
return i, nil
}
// readErr returns the last error and resets err status
func (lr *LineReader) readErr() error {
err := lr.err
lr.err = nil
return err
}
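A sketch of the intended pairing with encoding/csv (mirroring TestLineReader_viaCsv below): the CSV reader pulls data through LineReader, and LastLineNumber then reports the source line of each parsed record.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"

	"github.com/influxdata/influx-cli/v2/pkg/csv2lp"
)

func main() {
	lineReader := csv2lp.NewLineReader(strings.NewReader("a\n\nb\n\nc"))
	lineReader.LineNumber = 1 // count source lines starting from 1
	csvReader := csv.NewReader(lineReader)
	for {
		row, err := csvReader.Read()
		if err != nil {
			break // io.EOF ends the loop
		}
		// empty lines are skipped by the CSV reader, line numbers are not
		fmt.Println(lineReader.LastLineNumber, row) // 1 [a], 3 [b], 5 [c]
	}
}
```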


@ -0,0 +1,148 @@
package csv2lp_test
import (
"encoding/csv"
"fmt"
"io"
"strings"
"testing"
"testing/iotest"
"github.com/influxdata/influx-cli/v2/pkg/csv2lp"
"github.com/stretchr/testify/require"
)
// TestLineReader tests correctness of line reporting and reader implementation of LineReader
func TestLineReader(t *testing.T) {
type TestInput = struct {
lines [4]string
withDataErrorReader bool
}
tests := []TestInput{
{
lines: [4]string{"a\n", "\n", "\n", "bcxy"},
withDataErrorReader: false,
}, {
lines: [4]string{"a\n", "\n", "\n", "bcxy"},
withDataErrorReader: true,
}, {
lines: [4]string{"a\n", "\n", "\n", "bcx\n"},
withDataErrorReader: false,
}, {
lines: [4]string{"a\n", "\n", "\n", "bcx\n"},
withDataErrorReader: true,
}}
for _, test := range tests {
lines := test.lines
input := strings.Join(lines[:], "")
withDataErrorReader := test.withDataErrorReader
t.Run(fmt.Sprintf("%s withDataErrorReader=%v", input, withDataErrorReader), func(t *testing.T) {
var reader io.Reader = strings.NewReader(input)
if withDataErrorReader {
// ensures that the reader also reports the final EOF error together with data
reader = iotest.DataErrReader(reader)
}
lineReader := csv2lp.NewLineReaderSize(reader, 2)
var buf []byte = make([]byte, 4)
var err error
var read int
// pathological case: reading into an empty buffer returns 0 without an error
read, err = lineReader.Read(buf[0:0])
require.Equal(t, 0, read)
require.Nil(t, err)
require.Equal(t, 0, lineReader.LastLineNumber)
// 1st line
read, err = lineReader.Read(buf)
require.Equal(t, []byte(lines[0]), buf[0:read])
require.Nil(t, err)
require.Equal(t, 0, lineReader.LastLineNumber)
// 2nd
read, err = lineReader.Read(buf)
require.Equal(t, []byte(lines[1]), buf[0:read])
require.Nil(t, err)
require.Equal(t, 1, lineReader.LastLineNumber)
// reading into an empty buffer does not change the reader state
read, err = lineReader.Read(buf[0:0])
require.Equal(t, 0, read)
require.Nil(t, err)
require.Equal(t, 1, lineReader.LastLineNumber)
// 3rd
read, err = lineReader.Read(buf)
require.Equal(t, []byte(lines[2]), buf[0:read])
require.Nil(t, err)
require.Equal(t, 2, lineReader.LastLineNumber)
// 4th line cannot be fully read, because buffer size is 2
read, err = lineReader.Read(buf)
require.Equal(t, []byte(lines[3][:2]), buf[0:read])
require.Nil(t, err)
require.Equal(t, 3, lineReader.LastLineNumber)
read, err = lineReader.Read(buf)
require.Equal(t, []byte(lines[3][2:]), buf[0:read])
require.Nil(t, err)
require.Equal(t, 3, lineReader.LastLineNumber)
// 5th line => error
_, err = lineReader.Read(buf)
require.NotNil(t, err)
})
}
}
// TestLineReader_Read_BufferOverflow ensures that reading into a slice
// smaller than the buffered data does not panic. Fixes https://github.com/influxdata/influxdb/issues/19586
func TestLineReader_Read_BufferOverflow(t *testing.T) {
sr := strings.NewReader("foo\nbar")
rd := csv2lp.NewLineReader(sr)
buf := make([]byte, 2)
n, err := rd.Read(buf)
require.Equal(t, n, 2)
require.NoError(t, err)
}
// TestLineReader_viaCsv tests correct line reporting when read through a CSV reader with various buffer sizes
// to emulate multiple required reads with a small test data set
func TestLineReader_viaCsv(t *testing.T) {
type RowWithLine = struct {
row []string
lineNumber int
}
input := "a\n\nb\n\nc"
expected := []RowWithLine{
{[]string{"a"}, 1},
{[]string{"b"}, 3},
{[]string{"c"}, 5},
}
bufferSizes := []int{-1, 0, 2, 50}
for _, bufferSize := range bufferSizes {
t.Run(fmt.Sprintf("buffer size: %d", bufferSize), func(t *testing.T) {
var lineReader *csv2lp.LineReader
if bufferSize < 0 {
lineReader = csv2lp.NewLineReader(strings.NewReader(input))
} else {
lineReader = csv2lp.NewLineReaderSize(strings.NewReader(input), bufferSize)
}
lineReader.LineNumber = 1 // start with 1
csvReader := csv.NewReader(lineReader)
results := make([]RowWithLine, 0, 3)
for {
row, _ := csvReader.Read()
lineNumber := lineReader.LastLineNumber
if row == nil {
break
}
results = append(results, RowWithLine{row, lineNumber})
}
require.Equal(t, expected, results)
})
}
}


@ -0,0 +1,33 @@
package csv2lp
import (
"io"
"log"
)
// multiCloser closes multiple io.Closer instances at once
type multiCloser struct {
closers []io.Closer
}
// Close implements io.Closer. It closes all nested closers,
// logs every error and returns the first error encountered.
func (mc *multiCloser) Close() error {
var err error
for i := 0; i < len(mc.closers); i++ {
if e := mc.closers[i].Close(); e != nil {
if err == nil {
err = e
}
log.Println(e)
}
}
return err
}
// MultiCloser creates an io.Closer that closes all supplied io.Closer instances, logging any errors
func MultiCloser(closers ...io.Closer) io.Closer {
c := make([]io.Closer, len(closers))
copy(c, closers)
return &multiCloser{c}
}
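A minimal sketch of the intended use, with hypothetical file names; any io.Closer instances can be combined this way:

```go
package main

import (
	"os"

	"github.com/influxdata/influx-cli/v2/pkg/csv2lp"
)

func main() {
	// hypothetical inputs, for illustration only
	f1, _ := os.Open("input1.csv")
	f2, _ := os.Open("input2.csv")
	// one deferred call closes both files, logging any errors
	defer csv2lp.MultiCloser(f1, f2).Close()
}
```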


@ -0,0 +1,62 @@
package csv2lp
import (
"bytes"
"errors"
"fmt"
"io"
"log"
"os"
"strings"
"testing"
"github.com/stretchr/testify/require"
)
type testCloser struct {
errorMsg string
}
func (c testCloser) Close() error {
if c.errorMsg == "" {
return nil
}
return errors.New(c.errorMsg)
}
// TestMultiCloser tests the MultiCloser function
func TestMultiCloser(t *testing.T) {
var buf bytes.Buffer
log.SetOutput(&buf)
oldFlags := log.Flags()
log.SetFlags(0)
oldPrefix := log.Prefix()
prefix := "::PREFIX::"
log.SetPrefix(prefix)
defer func() {
log.SetOutput(os.Stderr)
log.SetFlags(oldFlags)
log.SetPrefix(oldPrefix)
}()
var tests = []struct {
subject io.Closer
errors int
}{
{MultiCloser(), 0},
{MultiCloser(testCloser{}, testCloser{}), 0},
{MultiCloser(testCloser{"a"}, testCloser{}, testCloser{"c"}), 2},
}
for i, test := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
buf.Reset()
err := test.subject.Close()
messages := strings.Count(buf.String(), prefix)
require.Equal(t, test.errors, messages)
if test.errors > 0 {
require.NotNil(t, err)
}
})
}
}


@ -0,0 +1,65 @@
package csv2lp
import (
"io"
)
// skipFirstLines is an io.Reader that skips the first skipLines lines of the wrapped reader
type skipFirstLines struct {
// reader provides data
reader io.Reader
// skipLines contains the lines to skip
skipLines int
// line is a mutable variable that increases until skipLines is reached
line int
// quotedString indicates whether a quoted CSV string is being read,
// a new line inside a quoted string does not start a new CSV line
quotedString bool
}
// Read implements io.Reader
func (state *skipFirstLines) Read(p []byte) (n int, err error) {
skipHeaderLines:
for state.line < state.skipLines {
n, err := state.reader.Read(p)
if n == 0 {
return n, err
}
for i := 0; i < n; i++ {
switch p[i] {
case '"':
// a quoted string starts or stops
state.quotedString = !state.quotedString
case '\n':
if !state.quotedString {
state.line++
if state.line == state.skipLines {
// modify the buffer and return
if i == n-1 {
if err != nil {
return 0, err
}
// continue with the next chunk
break skipHeaderLines
} else {
// copy all bytes after the newline
for j := i + 1; j < n; j++ {
p[j-i-1] = p[j]
}
return n - i - 1, err
}
}
}
}
}
}
return state.reader.Read(p)
}
// SkipHeaderLinesReader wraps a reader to skip the first skipLines lines in CSV data input
func SkipHeaderLinesReader(skipLines int, reader io.Reader) io.Reader {
return &skipFirstLines{
skipLines: skipLines,
reader: reader,
}
}
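A sketch of skipping a non-CSV preamble before parsing; the two header lines are made up for illustration:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"strings"

	"github.com/influxdata/influx-cli/v2/pkg/csv2lp"
)

func main() {
	input := "report title\ngenerated 2021-04-26\nm,v\ncpu,1\n"
	// drop the two preamble lines, keep the CSV that follows
	reader := csv2lp.SkipHeaderLinesReader(2, strings.NewReader(input))
	rest, _ := ioutil.ReadAll(reader)
	fmt.Print(string(rest)) // m,v\ncpu,1
}
```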


@ -0,0 +1,93 @@
package csv2lp
import (
"io"
"strconv"
"strings"
"testing"
"github.com/stretchr/testify/require"
)
// readOnceWithEOF simulates a reader that returns all data together with EOF
type readOnceWithEOF struct {
reader io.Reader
}
func (r *readOnceWithEOF) Read(p []byte) (n int, err error) {
n, _ = r.reader.Read(p)
return n, io.EOF
}
// Test_SkipHeaderLines checks that the first lines are skipped
func Test_SkipHeaderLines(t *testing.T) {
var tests = []struct {
skipCount int
input string
result string
}{
{
10,
"1\n2\n3\n4\n5\n6\n7\n8\n9\n0\n",
"",
},
{
0,
"1\n2\n3\n4\n5\n6\n7\n8\n9\n0\n",
"1\n2\n3\n4\n5\n6\n7\n8\n9\n0\n",
},
{
1,
"1\n2\n3\n4\n5\n6\n7\n8\n9\n0\n",
"2\n3\n4\n5\n6\n7\n8\n9\n0\n",
},
{
5,
"1\n2\n3\n4\n5\n6\n7\n8\n9\n0\n",
"6\n7\n8\n9\n0\n",
},
{
20,
"1\n2\n3\n4\n5\n6\n7\n8\n9\n0\n",
"",
},
{
1,
"\"\n\"\"\n\"\n2",
"2",
},
}
for i, test := range tests {
input := test.input
bufferSizes := []int{1, 2, 7, 0, len(input), len(input) + 1}
for _, bufferSize := range bufferSizes {
t.Run(strconv.Itoa(i)+"_"+strconv.Itoa(bufferSize), func(t *testing.T) {
var reader io.Reader
if bufferSize == 0 {
// emulate a reader that returns EOF together with data
bufferSize = len(input)
reader = SkipHeaderLinesReader(test.skipCount, &readOnceWithEOF{strings.NewReader(input)})
} else {
reader = SkipHeaderLinesReader(test.skipCount, strings.NewReader(input))
}
buffer := make([]byte, bufferSize)
result := make([]byte, 0, 100)
for {
n, err := reader.Read(buffer)
if n > 0 {
result = append(result, buffer[:n]...)
}
if err != nil {
if err != io.EOF {
require.Nil(t, err.Error())
}
break
}
}
require.Equal(t, test.result, string(result))
})
}
}
}