diff --git a/go.mod b/go.mod
index 68bacbf..8ae7280 100644
--- a/go.mod
+++ b/go.mod
@@ -9,6 +9,7 @@ require (
 	github.com/kr/pretty v0.1.0 // indirect
 	github.com/stretchr/testify v1.7.0
 	github.com/urfave/cli/v2 v2.3.0
+	golang.org/x/text v0.3.3
 	gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
 	honnef.co/go/tools v0.1.3
 )
diff --git a/pkg/csv2lp/README.md b/pkg/csv2lp/README.md
new file mode 100644
index 0000000..b522e33
--- /dev/null
+++ b/pkg/csv2lp/README.md
@@ -0,0 +1,189 @@
+# CSV to Line Protocol
+The csv2lp library converts CSV (comma-separated values) data to InfluxDB line protocol.
+
+ 1. it can process the CSV result of a (simple) flux query that exports data from a bucket
+ 2. it allows the processing of existing CSV files
+
+## Usage
+The entry point is the `CsvToLineProtocol` function, which accepts a (UTF-8) reader with CSV data and returns a reader with line protocol data.
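+
+A minimal usage sketch (the import path is an assumption here; use the path of the module that hosts this package):
+
+```go
+package main
+
+import (
+	"io"
+	"os"
+	"strings"
+
+	"github.com/influxdata/influxdb/v2/pkg/csv2lp"
+)
+
+func main() {
+	csv := "m|measurement,usage_user|double,time|dateTime:number\n" +
+		"cpu,2.7,1482669077000000000\n"
+	// CsvToLineProtocol wraps the CSV reader into an io.Reader that yields line protocol
+	lines := csv2lp.CsvToLineProtocol(strings.NewReader(csv))
+	// stream the generated protocol lines to stdout
+	if _, err := io.Copy(os.Stdout, lines); err != nil {
+		panic(err) // a conversion error reports the offending line and column
+	}
+}
+```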
+
+## Examples
+#### Example 1 - Flux Query Result
+csv:
+```bash
+#group,false,false,true,true,false,false,true,true,true,true
+#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
+#default,_result,,,,,,,,,
+,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
+,,0,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:17:57Z,0,time_steal,cpu,cpu1,rsavage.prod
+,,0,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:07Z,0,time_steal,cpu,cpu1,rsavage.prod
+
+#group,false,false,true,true,false,false,true,true,true,true
+#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
+#default,_result,,,,,,,,,
+,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
+,,1,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:01Z,2.7263631815907954,usage_user,cpu,cpu-total,tahoecity.prod
+,,1,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:11Z,2.247752247752248,usage_user,cpu,cpu-total,tahoecity.prod
+```
+
+line protocol data:
+```
+cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669077000000000
+cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669087000000000
+cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.7263631815907954 1582669081000000000
+cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.247752247752248 1582669091000000000
+```
+#### Example 2 - Simple CSV file
+
+csv:
+```bash
+#datatype measurement,tag,tag,double,double,ignored,dateTime:number
+m,cpu,host,time_steal,usage_user,nothing,time
+cpu,cpu1,rsavage.prod,0,2.7,a,1482669077000000000
+cpu,cpu1,rsavage.prod,0,2.2,b,1482669087000000000
+```
+
+line protocol data:
+```
+cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.7 1482669077000000000
+cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.2 1482669087000000000
+```
+
+The data type can also be supplied in the column name, so the CSV can be shortened to:
+
+```
+m|measurement,cpu|tag,host|tag,time_steal|double,usage_user|double,nothing|ignored,time|dateTime:number
+cpu,cpu1,rsavage.prod,0,2.7,a,1482669077000000000
+cpu,cpu1,rsavage.prod,0,2.2,b,1482669087000000000
+```
+#### Example 3 - Data Types with default values
+
+csv:
+```bash
+#datatype measurement,tag,string,double,boolean,long,unsignedLong,duration,dateTime
+#default test,annotatedDatatypes,,,,,,
+m,name,s,d,b,l,ul,dur,time
+,,str1,1.0,true,1,1,1ms,1
+,,str2,2.0,false,2,2,2us,2020-01-11T10:10:10Z
+```
+
+line protocol data:
+```
+test,name=annotatedDatatypes s="str1",d=1,b=true,l=1i,ul=1u,dur=1000000i 1
+test,name=annotatedDatatypes s="str2",d=2,b=false,l=2i,ul=2u,dur=2000i 1578737410000000000
+```
+
+A default value can be supplied in the column label after the data type, so the CSV could also be:
+
+```
+m|measurement|test,name|tag|annotatedDatatypes,s|string,d|double,b|boolean,l|long,ul|unsignedLong,dur|duration,time|dateTime
+,,str1,1.0,true,1,1,1ms,1
+,,str2,2.0,false,2,2,2us,2020-01-11T10:10:10Z
+```
+#### Example 4 - Advanced usage
+csv:
+```
+#constant measurement,test
+#constant tag,name,datetypeFormats
+#timezone -0500
+t|dateTime:2006-01-02|1970-01-02,"d|double:,. ","b|boolean:y,Y:n,N|y"
+1970-01-01,"123.456,78",
+,"123 456,78",Y
+```
+ - the measurement and an extra tag are defined using the `#constant` annotation
+ - the timezone for dateTime values is set to `-0500` (EST)
+ - the `t` column is of the `dateTime` data type with the format `2006-01-02`; its default value is _January 2nd 1970_
+ - the `d` column is of the `double` data type with `,` as a fraction delimiter and `. ` as ignored separators that are used to visually separate large numbers into groups
+ - the `b` column is of the `boolean` data type that considers `y` or `Y` truthy, `n` or `N` falsy, and empty column values truthy (because of the `|y` default value)
+
+
+line protocol data:
+```
+test,name=datetypeFormats d=123456.78,b=true 18000000000000
+test,name=datetypeFormats d=123456.78,b=true 104400000000000
+```
+
+#### Example 5 - Custom column separator
+```
+sep=;
+m|measurement;available|boolean:y,Y:|n;dt|dateTime:number
+test;nil;1
+test;N;2
+test;";";3
+test;;4
+test;Y;5
+```
+ - the first line can define a column separator character for the next lines, here: `;`
+ - the remaining lines then use this separator, so `available|boolean:y,Y:|n` does not have to be wrapped in double quotes even though it contains commas
+
+line protocol data:
+```
+test available=false 1
+test available=false 2
+test available=false 3
+test available=false 4
+test available=true 5
+```
+## CSV Data On Input
+This library supports all the concepts of [flux result annotated CSV](https://docs.influxdata.com/influxdb/latest/reference/syntax/annotated-csv/#tables) and provides a few extensions that allow processing of existing/custom CSV files. The conversion to line protocol is driven by the contents of annotation rows and the layout of the header row.
+
+#### New data types
+Existing [data types](https://docs.influxdata.com/influxdb/latest/reference/syntax/annotated-csv/#data-types) are supported.
The CSV input can also contain the following data types, which associate a column value with a part of a protocol line:
+ - the `measurement` data type identifies a column that carries the measurement name
+ - the `tag` data type identifies a column with a tag value; the column label (from the header row) is the tag name
+ - `time` is an alias for the existing `dateTime` type; there is at most one such column in a CSV row
+ - the `ignore` and `ignored` data types identify columns that are skipped when creating a protocol line
+ - the `field` data type copies the column data to a protocol line as-is
+
+#### New CSV annotations
+- the `#constant` annotation adds a constant column to the data, so you can set the measurement, time, a field or a tag of every row you import
+   - the format of a constant annotation row is `#constant,datatype,name,value`; it contains a supported datatype, a column name, and a constant value
+   - _column name_ can be omitted for _dateTime_ or _measurement_ columns, so the annotation can be simply `#constant,measurement,cpu`
+- the `#concat` annotation adds a new column that is concatenated from existing columns according to a template
+   - the format of a concat annotation row is `#concat,datatype,name,template`; it contains a supported datatype, a column name, and a template value
+   - the `template` is a string with `${columnName}` placeholders that are replaced by the values of existing columns
+   - for example: `#concat,string,fullName,${firstName} ${lastName}`
+   - _column name_ can be omitted for _dateTime_ or _measurement_ columns
+- the `#timezone` annotation specifies the time zone of the data using an offset (`+hhmm` or `-hhmm`), `Local` to use the local/computer time zone, or a time zone name that Go's [time.LoadLocation](https://golang.org/pkg/time/#LoadLocation) understands (such as `EST`). Examples: _#timezone,+0100_, _#timezone -0500_, _#timezone Local_
+
+A combined `#constant` and `#concat` example is shown below.
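+
+This sketch is illustrative; the measurement, column names and values are made up:
+```
+#constant measurement,sensors
+#concat string,link,http://example.com/${type}
+type|tag,value|double,time|dateTime:number
+air,23.5,1
+water,19.1,2
+```
+
+line protocol data:
+```
+sensors,type=air value=23.5,link="http://example.com/air" 1
+sensors,type=water value=19.1,link="http://example.com/water" 2
+```
+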
+#### Data type with data format
+All data types can include a format that is used to parse column data; it is specified as `datatype:format`. The following data types support a format:
+- `dateTime:format`
+   - the following formats are predefined:
+      - `dateTime:RFC3339` format is 2006-01-02T15:04:05Z07:00
+      - `dateTime:RFC3339Nano` format is 2006-01-02T15:04:05.999999999Z07:00
+      - `dateTime:number` represents UTC time since the epoch in nanoseconds
+   - a custom layout as described in the [time](https://golang.org/pkg/time) package; for example, `dateTime:2006-01-02` parses a 4-digit year, '-', a 2-digit month, '-', and a 2-digit day of the month
+   - if the time format includes a time zone, the parsed date time respects the time zone; otherwise the timezone depends on the presence of the new `#timezone` annotation; if there is no `#timezone` annotation, UTC is used
+- `double:format`
+   - the `format`'s first character separates the integer and fractional parts (usually `.` or `,`); the second and subsequent characters (such as `, _`) are removed from the column value, as they are typically used to visually separate large numbers into groups
+   - for example:
+      - a Spanish locale value `3.494.826.157,123` is of `double:,.` type; the same `double` value is _3494826157.123_
+      - `1_000_000` is of `double:._` type and parses as a million `double`
+   - note that you have to quote column delimiters whenever they appear in a CSV column value, for example:
+      - `#constant,"double:,.",myColumn,"1.234,011"`
+- `long:format` and `unsignedLong:format` support the same format as `double`, but everything after and including a fraction character is ignored
+   - the format can start with `strict` to fail when a fraction digit is present, for example:
+      - `1000.000` is `1000` when parsed as `long`, but fails when parsed as `long:strict`
+      - `1_000,000` is `1000` when parsed as `long:,_`, but fails when parsed as `long:strict,_`
+- `boolean:truthy:falsy`
+   - `truthy` and `falsy` are comma-separated lists of values; either list can be empty, in which case all other values are assumed truthy/falsy; for example `boolean:sí,yes,ja,oui,ano,да:no,nein,non,ne,нет`
+   - a `boolean` data type (without a format) parses column values that start with any of _tTyY1_ as `true` values and _fFnN0_ as `false` values, and fails on other values
+   - a column with an empty value is excluded from the protocol line unless a default value is supplied, either using the `#default` annotation or in a header line (see below)
+
+#### Header row with data types and default values
+The header row (i.e. the row that defines column names) can also define column data types when supplied as `name|datatype`; for example, `cpu|tag` defines a tag column named _cpu_. Moreover, it can also specify a default value when supplied as `name|datatype|default`; for example, `count|long|0` defines a field column named _count_ of the _long_ data type that does not skip the field when a column value is empty, but uses '0' as the column value instead.
+ - this approach helps to easily specify column names, types and defaults in a single row
+ - it is an alternative to using three rows: a `#datatype` annotation, a `#default` annotation, and a plain header row
+
+#### Custom CSV column separator
+A CSV file can start with a line `sep=;` to declare the character that separates columns on the following lines; by default, `,` is used as a column separator. This convention is frequently used (for example by Excel).
+
+#### Error handling
+The CSV conversion stops on the first error by default; the line and column are reported together with the error. The CsvToLineReader's SkipRowOnError function can change this to skip error rows and log errors instead.
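+
+A minimal sketch of error-tolerant conversion (assuming the imports from the Usage example plus `fmt`; the CSV content is made up):
+
+```go
+data := "m|measurement,a|long\ncpu,1\ncpu,not_a_number\n"
+reader := csv2lp.CsvToLineProtocol(strings.NewReader(data)).SkipRowOnError(true)
+// optionally collect skipped rows instead of logging them to stderr
+reader.RowSkipped = func(src *csv2lp.CsvToLineReader, err error, row []string) {
+	fmt.Printf("skipped row %v: %v\n", row, err)
+}
+io.Copy(os.Stdout, reader) // converts only the valid rows
+```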
+
+#### Support Existing CSV files
+The majority of existing CSV files can be imported by skipping their first X lines (so that a custom header line can then be provided) and prepending extra annotation/header lines that let this library know how to convert the CSV to line protocol. The following functions help to change the data on input:
+ - [csv2lp.SkipHeaderLinesReader](./skip_header_lines.go) returns a reader that skips the first X lines of the supplied reader
+ - [io.MultiReader](https://golang.org/pkg/io/#MultiReader) joins multiple readers; custom header line(s) and new lines can be prepended as [strings.NewReader](https://golang.org/pkg/strings/#NewReader)s
+ - [csv2lp.MultiCloser](./multi_closer.go) helps with closing multiple io.Closers (files) on input, since [it is not available OOTB](https://github.com/golang/go/issues/20136)
diff --git a/pkg/csv2lp/csv2lp.go b/pkg/csv2lp/csv2lp.go
new file mode 100644
index 0000000..f4edbec
--- /dev/null
+++ b/pkg/csv2lp/csv2lp.go
@@ -0,0 +1,163 @@
+// Package csv2lp transforms CSV data to InfluxDB line protocol
+package csv2lp
+
+import (
+	"encoding/csv"
+	"fmt"
+	"io"
+	"log"
+	"strings"
+)
+
+// CsvLineError is returned for CSV conversion errors
+type CsvLineError struct {
+	// 1 is the first line
+	Line int
+	Err  error
+}
+
+func (e CsvLineError) Error() string {
+	if e.Line > 0 {
+		return fmt.Sprintf("line %d: %v", e.Line, e.Err)
+	}
+	return fmt.Sprintf("%v", e.Err)
+}
+
+// CreateRowColumnError wraps an existing error to add line and column coordinates
+func CreateRowColumnError(line int, columnLabel string, err error) CsvLineError {
+	return CsvLineError{
+		Line: line,
+		Err: CsvColumnError{
+			Column: columnLabel,
+			Err:    err,
+		},
+	}
+}
+
+// CsvToLineReader represents the state of the transformation from CSV data to a line protocol reader
+type CsvToLineReader struct {
+	// csv reading
+	csv *csv.Reader
+	// lineReader is used to report the line number of the last read CSV line
+	lineReader *LineReader
+	// Table collects information about used columns
+	Table CsvTable
+	// LineNumber represents the line number of csv.Reader; 1 is the first
+	LineNumber int
+	// when true, log table data columns before reading data rows
+	logTableDataColumns bool
+	// state variable that indicates whether any data row was read
+	dataRowAdded bool
+	// log CSV data errors to stderr and continue with CSV processing
+	skipRowOnError bool
+	// RowSkipped is called when a row is skipped because of a data parsing error
+	RowSkipped func(source *CsvToLineReader, lineError error, row []string)
+
+	// reader results
+	buffer     []byte
+	lineBuffer []byte
+	index      int
+	finished   error
+}
+
+// LogTableColumns turns on/off logging of table data columns before reading data rows
+func (state *CsvToLineReader) LogTableColumns(val bool) *CsvToLineReader {
+	state.logTableDataColumns = val
+	return state
+}
+
+// SkipRowOnError controls whether to fail on every CSV conversion error (false) or to log the error and continue (true)
+func (state *CsvToLineReader) SkipRowOnError(val bool) *CsvToLineReader {
+	state.skipRowOnError = val
+	return state
+}
+
+// Comma returns the field delimiter used in the input CSV file
+func (state *CsvToLineReader) Comma() rune {
+	return state.csv.Comma
+}
+
+// Read implements io.Reader that returns protocol lines
+func (state *CsvToLineReader) Read(p []byte) (n int, err error) {
+	// state1: finished
+	if state.finished != nil {
+		return 0, state.finished
+	}
+	// state2: some data are in the buffer to copy
+	if len(state.buffer) > state.index {
+		// 
we have remaining bytes to copy + if len(state.buffer)-state.index > len(p) { + // copy a part of the buffer + copy(p, state.buffer[state.index:state.index+len(p)]) + state.index += len(p) + return len(p), nil + } + // copy the entire buffer + n = len(state.buffer) - state.index + copy(p[:n], state.buffer[state.index:]) + state.buffer = state.buffer[:0] + state.index = 0 + return n, nil + } + // state3: fill buffer with data to read from + for { + // Read each record from csv + row, err := state.csv.Read() + state.LineNumber = state.lineReader.LastLineNumber + if parseError, ok := err.(*csv.ParseError); ok && parseError.Err == csv.ErrFieldCount { + // every row can have different number of columns + err = nil + } + + if err != nil { + state.finished = err + return state.Read(p) + } + if state.LineNumber == 1 && len(row) == 1 && strings.HasPrefix(row[0], "sep=") && len(row[0]) > 4 { + // separator specified in the first line + state.csv.Comma = rune(row[0][4]) + continue + } + if state.Table.AddRow(row) { + var err error + state.lineBuffer = state.lineBuffer[:0] // reuse line buffer + state.lineBuffer, err = state.Table.AppendLine(state.lineBuffer, row, state.LineNumber) + if !state.dataRowAdded && state.logTableDataColumns { + log.Println(state.Table.DataColumnsInfo()) + } + state.dataRowAdded = true + if err != nil { + lineError := CsvLineError{state.LineNumber, err} + if state.RowSkipped != nil { + state.RowSkipped(state, lineError, row) + continue + } + if state.skipRowOnError { + log.Println(lineError) + continue + } + state.finished = lineError + return state.Read(p) + } + + state.buffer = append(state.buffer, state.lineBuffer...) + state.buffer = append(state.buffer, '\n') + break + } else { + state.dataRowAdded = false + } + } + return state.Read(p) +} + +// CsvToLineProtocol transforms csv data into line protocol data +func CsvToLineProtocol(reader io.Reader) *CsvToLineReader { + lineReader := NewLineReader(reader) + lineReader.LineNumber = 1 // start counting from 1 + csv := csv.NewReader(lineReader) + csv.ReuseRecord = true + return &CsvToLineReader{ + csv: csv, + lineReader: lineReader, + } +} diff --git a/pkg/csv2lp/csv2lp_test.go b/pkg/csv2lp/csv2lp_test.go new file mode 100644 index 0000000..5b72655 --- /dev/null +++ b/pkg/csv2lp/csv2lp_test.go @@ -0,0 +1,426 @@ +package csv2lp + +import ( + "bytes" + "errors" + "io" + "io/ioutil" + "log" + "os" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +// Test_CsvToLineProtocol_variousBufferSize tests conversion of annotated CSV data to line protocol data on various buffer sizes +func Test_CsvToLineProtocol_variousBufferSize(t *testing.T) { + var tests = []struct { + name string + csv string + lines string + err string + }{ + { + "simple1", + "_measurement,a,b\ncpu,1,1\ncpu,b2\n", + "cpu a=1,b=1\ncpu a=b2\n", + "", + }, + { + "simple1_withSep", + "sep=;\n_measurement;a;b\ncpu;1;1\ncpu;b2\n", + "cpu a=1,b=1\ncpu a=b2\n", + "", + }, + { + "simple2", + "_measurement,a,b\ncpu,1,1\ncpu,\n", + "", + "no field data", + }, + { + "simple3", + "_measurement,a,_time\ncpu,1,1\ncpu,2,invalidTime\n", + "", + "_time", // error in _time column + }, + { + "constant_annotations", + "#constant,measurement,,cpu\n" + + "#constant,tag,xpu,xpu1\n" + + "#constant,tag,cpu,cpu1\n" + + "#constant,long,of,100\n" + + "#constant,dateTime,,2\n" + + "x,y\n" + + "1,2\n" + + "3,4\n", + "cpu,cpu=cpu1,xpu=xpu1 x=1,y=2,of=100i 2\n" + + "cpu,cpu=cpu1,xpu=xpu1 x=3,y=4,of=100i 2\n", + "", // no error + }, + { + "timezone_annotation-0100", 
+ "#timezone,-0100\n" + + "#constant,measurement,cpu\n" + + "#constant,dateTime:2006-01-02,1970-01-01\n" + + "x,y\n" + + "1,2\n", + "cpu x=1,y=2 3600000000000\n", + "", // no error + }, + { + "timezone_annotation_EST", + "#timezone,EST\n" + + "#constant,measurement,cpu\n" + + "#constant,dateTime:2006-01-02,1970-01-01\n" + + "x,y\n" + + "1,2\n", + "cpu x=1,y=2 18000000000000\n", + "", // no error + }, + } + bufferSizes := []int{40, 7, 3, 1} + + for _, test := range tests { + for _, bufferSize := range bufferSizes { + t.Run(test.name+"_"+strconv.Itoa(bufferSize), func(t *testing.T) { + reader := CsvToLineProtocol(strings.NewReader(test.csv)) + buffer := make([]byte, bufferSize) + lines := make([]byte, 0, 100) + for { + n, err := reader.Read(buffer) + if err != nil { + if err == io.EOF { + break + } + if test.err != "" { + // fmt.Println(err) + if err := err.Error(); !strings.Contains(err, test.err) { + require.Equal(t, err, test.err) + } + return + } + require.Nil(t, err.Error()) + break + } + lines = append(lines, buffer[:n]...) + } + if test.err == "" { + require.Equal(t, test.lines, string(lines)) + } else { + require.Fail(t, "error message with '"+test.err+"' expected") + } + }) + } + } +} + +// Test_CsvToLineProtocol_samples tests conversion of annotated CSV data to line protocol data +func Test_CsvToLineProtocol_samples(t *testing.T) { + var tests = []struct { + name string + csv string + lines string + err string + }{ + { + "queryResult_19452", // https://github.com/influxdata/influxdb/issues/19452 + "#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n" + + "#group,false,false,true,true,false,false,true,true,true\n" + + "#default,_result,,,,,,,,\n" + + ",result,table,_start,_stop,_time,_value,_field,_measurement,host\n" + + ",,0,2020-08-26T22:59:23.598653Z,2020-08-26T23:00:23.598653Z,2020-08-26T22:59:30Z,15075651584,active,mem,ip-192-168-86-25.ec2.internal\n", + "mem,host=ip-192-168-86-25.ec2.internal active=15075651584i 1598482770000000000\n", + "", // no error + }, + { + "queryResult_19452_group_first", // issue 19452, but with group annotation first + "#group,false,false,true,true,false,false,true,true,true\n" + + "#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n" + + "#default,_result,,,,,,,,\n" + + ",result,table,_start,_stop,_time,_value,_field,_measurement,host\n" + + ",,0,2020-08-26T22:59:23.598653Z,2020-08-26T23:00:23.598653Z,2020-08-26T22:59:30Z,15075651584,active,mem,ip-192-168-86-25.ec2.internal\n", + "mem,host=ip-192-168-86-25.ec2.internal active=15075651584i 1598482770000000000\n", + "", // no error + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + reader := CsvToLineProtocol(strings.NewReader(test.csv)) + buffer := make([]byte, 100) + lines := make([]byte, 0, 100) + for { + n, err := reader.Read(buffer) + if err != nil { + if err == io.EOF { + break + } + if test.err != "" { + // fmt.Println(err) + if err := err.Error(); !strings.Contains(err, test.err) { + require.Equal(t, err, test.err) + } + return + } + require.Nil(t, err.Error()) + break + } + lines = append(lines, buffer[:n]...) 
+ } + if test.err == "" { + require.Equal(t, test.lines, string(lines)) + } else { + require.Fail(t, "error message with '"+test.err+"' expected") + } + }) + } +} + +// Test_CsvToLineProtocol_LogTableColumns checks correct logging of table columns +func Test_CsvToLineProtocol_LogTableColumns(t *testing.T) { + var buf bytes.Buffer + log.SetOutput(&buf) + oldFlags := log.Flags() + log.SetFlags(0) + oldPrefix := log.Prefix() + prefix := "::PREFIX::" + log.SetPrefix(prefix) + defer func() { + log.SetOutput(os.Stderr) + log.SetFlags(oldFlags) + log.SetPrefix(oldPrefix) + }() + + csv := "_measurement,a,b\ncpu,1,1\ncpu,b2\n" + + reader := CsvToLineProtocol(strings.NewReader(csv)).LogTableColumns(true) + require.False(t, reader.skipRowOnError) + require.True(t, reader.logTableDataColumns) + // read all the data + ioutil.ReadAll(reader) + + out := buf.String() + // fmt.Println(out) + messages := strings.Count(out, prefix) + require.Equal(t, messages, 1) +} + +// Test_CsvToLineProtocol_LogTimeZoneWarning checks correct logging of timezone warning +func Test_CsvToLineProtocol_LogTimeZoneWarning(t *testing.T) { + var buf bytes.Buffer + log.SetOutput(&buf) + oldFlags := log.Flags() + log.SetFlags(0) + oldPrefix := log.Prefix() + prefix := "::PREFIX::" + log.SetPrefix(prefix) + defer func() { + log.SetOutput(os.Stderr) + log.SetFlags(oldFlags) + log.SetPrefix(oldPrefix) + }() + + csv := "#timezone 1\n" + + "#constant,dateTime:2006-01-02,1970-01-01\n" + + "_measurement,a,b\ncpu,1,1" + + reader := CsvToLineProtocol(strings.NewReader(csv)) + bytes, _ := ioutil.ReadAll(reader) + + out := buf.String() + // fmt.Println(out) // "::PREFIX::WARNING: #timezone annotation: unknown time zone 1 + messages := strings.Count(out, prefix) + require.Equal(t, messages, 1) + require.Equal(t, string(bytes), "cpu a=1,b=1 0\n") +} + +// Test_CsvToLineProtocol_SkipRowOnError tests that error rows are skipped +func Test_CsvToLineProtocol_SkipRowOnError(t *testing.T) { + var buf bytes.Buffer + log.SetOutput(&buf) + oldFlags := log.Flags() + log.SetFlags(0) + oldPrefix := log.Prefix() + prefix := "::PREFIX::" + log.SetPrefix(prefix) + defer func() { + log.SetOutput(os.Stderr) + log.SetFlags(oldFlags) + log.SetPrefix(oldPrefix) + }() + + csv := "_measurement,a,_time\n,1,1\ncpu,2,2\ncpu,3,3a\n" + + reader := CsvToLineProtocol(strings.NewReader(csv)).SkipRowOnError(true) + require.Equal(t, reader.skipRowOnError, true) + require.Equal(t, reader.logTableDataColumns, false) + // read all the data + ioutil.ReadAll(reader) + + out := buf.String() + // fmt.Println(out) + messages := strings.Count(out, prefix) + require.Equal(t, messages, 2) +} + +// Test_CsvToLineProtocol_RowSkipped tests that error rows are reported to configured RowSkipped listener +func Test_CsvToLineProtocol_RowSkipped(t *testing.T) { + var buf bytes.Buffer + log.SetOutput(&buf) + oldFlags := log.Flags() + log.SetFlags(0) + defer func() { + log.SetOutput(os.Stderr) + log.SetFlags(oldFlags) + }() + + type ActualArguments = struct { + src *CsvToLineReader + err error + row []string + } + type ExpectedArguments = struct { + errorString string + row []string + } + + csv := "sep=;\n_measurement;a|long:strict\n;1\ncpu;2.1\ncpu;3a\n" + calledArgs := []ActualArguments{} + expectedArgs := []ExpectedArguments{ + { + "line 3: column '_measurement': no measurement supplied", + []string{"", "1"}, + }, + { + "line 4: column 'a': '2.1' cannot fit into long data type", + []string{"cpu", "2.1"}, + }, + { + "line 5: column 'a': strconv.ParseInt:", + []string{"cpu", "3a"}, + }, + } + + 
reader := CsvToLineProtocol(strings.NewReader(csv)).SkipRowOnError(true) + reader.RowSkipped = func(src *CsvToLineReader, err error, _row []string) { + // make a copy of _row + row := make([]string, len(_row)) + copy(row, _row) + // remember for comparison + calledArgs = append(calledArgs, ActualArguments{ + src, err, row, + }) + } + // read all the data + ioutil.ReadAll(reader) + + out := buf.String() + require.Empty(t, out, "No log messages expected because RowSkipped handler is set") + + require.Len(t, calledArgs, 3) + for i, expected := range expectedArgs { + require.Equal(t, reader, calledArgs[i].src) + require.Contains(t, calledArgs[i].err.Error(), expected.errorString) + require.Equal(t, expected.row, calledArgs[i].row) + } +} + +// Test_CsvLineError tests CsvLineError error format +func Test_CsvLineError(t *testing.T) { + var tests = []struct { + err CsvLineError + value string + }{ + { + CsvLineError{Line: 1, Err: errors.New("cause")}, + "line 1: cause", + }, + { + CsvLineError{Line: 2, Err: CsvColumnError{"a", errors.New("cause")}}, + "line 2: column 'a': cause", + }, + { + CsvLineError{Line: -1, Err: CsvColumnError{"a", errors.New("cause")}}, + "column 'a': cause", + }, + } + for _, test := range tests { + require.Equal(t, test.value, test.err.Error()) + } +} + +// Test_CsvToLineProtocol_lineNumbers tests that correct line numbers are reported +func Test_CsvToLineProtocol_lineNumbers(t *testing.T) { + var buf bytes.Buffer + log.SetOutput(&buf) + oldFlags := log.Flags() + log.SetFlags(0) + defer func() { + log.SetOutput(os.Stderr) + log.SetFlags(oldFlags) + }() + + type ActualArguments = struct { + src *CsvToLineReader + err error + row []string + } + type ExpectedArguments = struct { + errorString string + row []string + } + + // note: csv contains a field with newline and an extra empty lines + csv := `sep=; + +_measurement;a|long:strict +;1 +cpu;"2 +.1" + +cpu;3a +` + calledArgs := []ActualArguments{} + expectedArgs := []ExpectedArguments{ + { + "line 4: column '_measurement': no measurement supplied", + []string{"", "1"}, + }, + { + "line 6: column 'a': '2\n.1' cannot fit into long data type", + []string{"cpu", "2\n.1"}, + }, + { + "line 8: column 'a': strconv.ParseInt:", + []string{"cpu", "3a"}, + }, + } + + reader := CsvToLineProtocol(strings.NewReader(csv)).SkipRowOnError(true) + reader.RowSkipped = func(src *CsvToLineReader, err error, _row []string) { + // make a copy of _row + row := make([]string, len(_row)) + copy(row, _row) + // remember for comparison + calledArgs = append(calledArgs, ActualArguments{ + src, err, row, + }) + } + // read all the data + ioutil.ReadAll(reader) + + out := buf.String() + require.Empty(t, out, "No log messages expected because RowSkipped handler is set") + + require.Len(t, calledArgs, 3) + for i, expected := range expectedArgs { + require.Equal(t, reader, calledArgs[i].src) + require.Contains(t, calledArgs[i].err.Error(), expected.errorString) + require.Equal(t, expected.row, calledArgs[i].row) + } + // 8 lines were read + require.Equal(t, 8, reader.LineNumber) +} diff --git a/pkg/csv2lp/csv_annotations.go b/pkg/csv2lp/csv_annotations.go new file mode 100644 index 0000000..8d662fb --- /dev/null +++ b/pkg/csv2lp/csv_annotations.go @@ -0,0 +1,222 @@ +package csv2lp + +import ( + "fmt" + "log" + "regexp" + "strconv" + "strings" + "time" +) + +// annotationComment describes CSV annotation +type annotationComment struct { + // prefix in a CSV row that recognizes this annotation + prefix string + // flag is 0 to represent an annotation that is 
applied to the whole table,
+	// or a unique bit (>0) among the supported column annotation prefixes
+	flag uint8
+	// setupColumn sets up metadata that drives how column data
+	// are parsed; mandatory when flag > 0
+	setupColumn func(column *CsvTableColumn, columnValue string)
+	// setupTable sets up metadata that drives how the table data
+	// are parsed; mandatory when flag == 0
+	setupTable func(table *CsvTable, row []string) error
+}
+
+// isTableAnnotation returns true for a table-wide annotation, false for column-based annotations
+func (a annotationComment) isTableAnnotation() bool {
+	return a.setupTable != nil
+}
+
+// matches tests whether an annotationComment can process the CSV comment row
+func (a annotationComment) matches(comment string) bool {
+	return strings.HasPrefix(strings.ToLower(comment), a.prefix)
+}
+
+func createConstantOrConcatColumn(table *CsvTable, row []string, annotationName string) CsvTableColumn {
+	// creates a virtual column with a constant value for all data rows
+	// supported types of constant annotation rows are:
+	// 1. "#constant,datatype,label,defaultValue"
+	// 2. "#constant,measurement,value"
+	// 3. "#constant,dateTime,value"
+	// 4. "#constant datatype,label,defaultValue"
+	// 5. "#constant measurement,value"
+	// 6. "#constant dateTime,value"
+	// defaultValue is optional, additional columns are ignored
+	col := CsvTableColumn{}
+	col.Index = -1 // this is a virtual column that never extracts data from data rows
+	// setup the column data type
+	col.setupDataType(row[0])
+	var dataTypeIndex int
+	if len(col.DataType) == 0 && col.LinePart == 0 {
+		// type 1,2,3
+		dataTypeIndex = 1
+		if len(row) > 1 {
+			col.setupDataType(row[1])
+		}
+	} else {
+		// type 4,5,6
+		dataTypeIndex = 0
+	}
+	// setup the label if available
+	if len(row) > dataTypeIndex+1 {
+		col.Label = row[dataTypeIndex+1]
+	}
+	// setup the default value if available
+	if len(row) > dataTypeIndex+2 {
+		col.DefaultValue = row[dataTypeIndex+2]
+	}
+	// support type 2,3,5,6 syntax for measurement and timestamp
+	if col.LinePart == linePartMeasurement || col.LinePart == linePartTime {
+		if col.DefaultValue == "" && col.Label != "" {
+			// type 2,3,5,6
+			col.DefaultValue = col.Label
+			col.Label = annotationName + " " + col.DataType
+		} else if col.Label == "" {
+			// setup a label if no label is supplied for focused error messages
+			col.Label = annotationName + " " + col.DataType
+		}
+	}
+	// add a virtual column to the table
+	return col
+}
+
+// constantSetupTable sets up the supplied CSV table from the #constant annotation
+func constantSetupTable(table *CsvTable, row []string) error {
+	col := createConstantOrConcatColumn(table, row, "#constant")
+	// add a virtual column to the table
+	table.extraColumns = append(table.extraColumns, &col)
+	return nil
+}
+
+// computedReplacer is used to replace values in computed columns
+var computedReplacer *regexp.Regexp = regexp.MustCompile(`\$\{[^}]+\}`)
+
+// concatSetupTable sets up the supplied CSV table from the #concat annotation
+func concatSetupTable(table *CsvTable, row []string) error {
+	col := createConstantOrConcatColumn(table, row, "#concat")
+	template := col.DefaultValue
+	col.ComputeValue = func(row []string) string {
+		return computedReplacer.ReplaceAllStringFunc(template, func(text string) string {
+			columnLabel := text[2 : len(text)-1] // ${columnLabel}
+			if placeholderColumn := table.Column(columnLabel); placeholderColumn != nil {
+				return placeholderColumn.Value(row)
+			}
+			log.Printf("WARNING: column %s: column '%s' cannot be replaced, no such 
column available", col.Label, columnLabel) + return "" + }) + } + // add a virtual column to the table + table.extraColumns = append(table.extraColumns, &col) + // add validator to report error when no placeholder column is available + table.validators = append(table.validators, func(table *CsvTable) error { + placeholders := computedReplacer.FindAllString(template, len(template)) + for _, placeholder := range placeholders { + columnLabel := placeholder[2 : len(placeholder)-1] // ${columnLabel} + if placeholderColumn := table.Column(columnLabel); placeholderColumn == nil { + return CsvColumnError{ + Column: col.Label, + Err: fmt.Errorf("'%s' references an unknown column '%s', available columns are: %v", + template, columnLabel, strings.Join(table.ColumnLabels(), ",")), + } + } + } + return nil + }) + return nil +} + +// supportedAnnotations contains all supported CSV annotations comments +var supportedAnnotations = []annotationComment{ + { + prefix: "#group", + flag: 1, + setupColumn: func(column *CsvTableColumn, value string) { + // standard flux query result annotation + if strings.HasSuffix(value, "true") { + // setup column's line part unless it is already set (#19452) + if column.LinePart == 0 { + column.LinePart = linePartTag + } + } + }, + }, + { + prefix: "#datatype", + flag: 2, + setupColumn: func(column *CsvTableColumn, value string) { + // standard flux query result annotation + column.setupDataType(value) + }, + }, + { + prefix: "#default", + flag: 4, + setupColumn: func(column *CsvTableColumn, value string) { + // standard flux query result annotation + column.DefaultValue = ignoreLeadingComment(value) + }, + }, + { + prefix: "#constant", + setupTable: constantSetupTable, + }, + { + prefix: "#timezone", + setupTable: func(table *CsvTable, row []string) error { + // setup timezone for parsing timestamps, UTC by default + val := ignoreLeadingComment(row[0]) + if val == "" && len(row) > 1 { + val = row[1] // #timezone,Local + } + tz, err := parseTimeZone(val) + if err != nil { + return fmt.Errorf("#timezone annotation: %v", err) + } + table.timeZone = tz + return nil + }, + }, + { + prefix: "#concat", + setupTable: concatSetupTable, + }, +} + +// ignoreLeadingComment returns a value without '#anyComment ' prefix +func ignoreLeadingComment(value string) string { + if len(value) > 0 && value[0] == '#' { + pos := strings.Index(value, " ") + if pos > 0 { + return strings.TrimLeft(value[pos+1:], " ") + } + return "" + } + return value +} + +// parseTimeZone parses the supplied timezone from a string into a time.Location +// +// parseTimeZone("") // time.UTC +// parseTimeZone("local") // time.Local +// parseTimeZone("-0500") // time.FixedZone(-5*3600 + 0*60) +// parseTimeZone("+0200") // time.FixedZone(2*3600 + 0*60) +// parseTimeZone("EST") // time.LoadLocation("EST") +func parseTimeZone(val string) (*time.Location, error) { + switch { + case val == "": + return time.UTC, nil + case strings.ToLower(val) == "local": + return time.Local, nil + case val[0] == '-' || val[0] == '+': + if matched, _ := regexp.MatchString("[+-][0-9][0-9][0-9][0-9]", val); !matched { + return nil, fmt.Errorf("timezone '%s' is not +hhmm or -hhmm", val) + } + intVal, _ := strconv.Atoi(val) + offset := (intVal/100)*3600 + (intVal%100)*60 + return time.FixedZone(val, offset), nil + default: + return time.LoadLocation(val) + } +} diff --git a/pkg/csv2lp/csv_annotations_test.go b/pkg/csv2lp/csv_annotations_test.go new file mode 100644 index 0000000..3e6f0ff --- /dev/null +++ b/pkg/csv2lp/csv_annotations_test.go @@ 
-0,0 +1,305 @@ +package csv2lp + +import ( + "bytes" + "fmt" + "log" + "os" + "strconv" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func annotation(name string) annotationComment { + for _, a := range supportedAnnotations { + if a.prefix == name { + return a + } + } + panic("no annotation named " + name + " found!") +} + +// Test_GroupAnnotation tests #group annotation +func Test_GroupAnnotation(t *testing.T) { + subject := annotation("#group") + require.True(t, subject.matches("#Group")) + require.False(t, subject.isTableAnnotation()) + var tests = []struct { + value string + expect int + }{ + {"#group true", linePartTag}, + {"#group false", 0}, + {"false", 0}, + {"unknown", 0}, + } + for _, test := range tests { + t.Run(test.value, func(t *testing.T) { + col := &CsvTableColumn{} + subject.setupColumn(col, test.value) + require.Equal(t, test.expect, col.LinePart) + }) + } +} + +// Test_DefaultAnnotation tests #default annotation +func Test_DefaultAnnotation(t *testing.T) { + subject := annotation("#default") + require.True(t, subject.matches("#Default")) + require.False(t, subject.isTableAnnotation()) + var tests = []struct { + value string + expect string + }{ + {"#default 1", "1"}, + {"#default ", ""}, + {"whatever", "whatever"}, + } + for _, test := range tests { + t.Run(test.value, func(t *testing.T) { + col := &CsvTableColumn{} + subject.setupColumn(col, test.value) + require.Equal(t, test.expect, col.DefaultValue) + }) + } +} + +// Test_DatatypeAnnotation tests #datatype annotation +func Test_DatatypeAnnotation(t *testing.T) { + subject := annotation("#datatype") + require.True(t, subject.matches("#dataType")) + require.False(t, subject.isTableAnnotation()) + var tests = []struct { + value string + expectType string + expectFormat string + expectLinePart int + }{ + {"#datatype long", "long", "", 0}, + {"#datatype ", "", "", 0}, + {"#datatype measurement", "_", "", linePartMeasurement}, + {"#datatype tag", "_", "", linePartTag}, + {"#datatype field", "", "", linePartField}, + {"dateTime", "dateTime", "", linePartTime}, + {"dateTime:RFC3339", "dateTime", "RFC3339", linePartTime}, + {"#datatype dateTime:RFC3339", "dateTime", "RFC3339", linePartTime}, + {"whatever:format", "whatever", "format", 0}, + } + for _, test := range tests { + t.Run(test.value, func(t *testing.T) { + col := &CsvTableColumn{} + subject.setupColumn(col, test.value) + if test.expectType != "_" { + require.Equal(t, test.expectType, col.DataType) + } + require.Equal(t, test.expectFormat, col.DataFormat) + }) + } +} + +// Test_ConstantAnnotation tests #constant annotation +func Test_ConstantAnnotation(t *testing.T) { + subject := annotation("#constant") + require.True(t, subject.matches("#Constant")) + require.True(t, subject.isTableAnnotation()) + var tests = []struct { + value []string + expectLabel string + expectDefault string + expectLinePart int + }{ + {[]string{"#constant "}, "", "", 0}, // means literally nothing + {[]string{"#constant measurement", "a"}, "_", "a", linePartMeasurement}, + {[]string{"#constant measurement", "a", "b"}, "_", "b", linePartMeasurement}, + {[]string{"#constant measurement", "a", ""}, "_", "a", linePartMeasurement}, + {[]string{"#constant tag", "tgName", "tgValue"}, "tgName", "tgValue", linePartTag}, + {[]string{"#constant", "tag", "tgName", "tgValue"}, "tgName", "tgValue", linePartTag}, + {[]string{"#constant field", "fName", "fVal"}, "fName", "fVal", linePartField}, + {[]string{"#constant", "field", "fName", "fVal"}, "fName", "fVal", 
linePartField}, + {[]string{"dateTime", "1"}, "_", "1", linePartTime}, + {[]string{"dateTime", "1", "2"}, "_", "2", linePartTime}, + {[]string{"dateTime", "", "2"}, "_", "2", linePartTime}, + {[]string{"dateTime", "3", ""}, "_", "3", linePartTime}, + {[]string{"long", "fN", "fV"}, "fN", "fV", 0}, + } + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + table := &CsvTable{} + subject.setupTable(table, test.value) + require.Equal(t, 1, len(table.extraColumns)) + col := table.extraColumns[0] + require.Equal(t, test.expectLinePart, col.LinePart) + require.Greater(t, 0, col.Index) + if test.expectLabel != "_" { + require.Equal(t, test.expectLabel, col.Label) + } else { + require.NotEqual(t, "", col.Label) + } + require.Equal(t, test.expectDefault, col.DefaultValue) + }) + } +} + +// Test_ConcatAnnotation tests #concat annotation +func Test_ConcatAnnotation(t *testing.T) { + subject := annotation("#concat") + require.True(t, subject.matches("#Concat")) + require.True(t, subject.isTableAnnotation()) + var tests = []struct { + value []string + expectLabel string + expectValue string + expectLinePart int + }{ + // all possible specifications + {[]string{"#concat "}, "", "", 0}, // means literally nothing + {[]string{"#concat measurement", "a"}, "_", "a", linePartMeasurement}, + {[]string{"#concat measurement", "a", "b"}, "_", "b", linePartMeasurement}, + {[]string{"#concat measurement", "a", ""}, "_", "a", linePartMeasurement}, + {[]string{"#concat tag", "tgName", "tgValue"}, "tgName", "tgValue", linePartTag}, + {[]string{"#concat", "tag", "tgName", "tgValue"}, "tgName", "tgValue", linePartTag}, + {[]string{"#concat field", "fName", "fVal"}, "fName", "fVal", linePartField}, + {[]string{"#concat", "field", "fName", "fVal"}, "fName", "fVal", linePartField}, + {[]string{"dateTime", "1"}, "_", "1", linePartTime}, + {[]string{"dateTime", "1", "2"}, "_", "2", linePartTime}, + {[]string{"dateTime", "", "2"}, "_", "2", linePartTime}, + {[]string{"dateTime", "3", ""}, "_", "3", linePartTime}, + {[]string{"long", "fN", "fV"}, "fN", "fV", 0}, + // concat values + {[]string{"string", "fN", "$-${b}-${a}"}, "fN", "$-2-1", 0}, + } + exampleRow := []string{"1", "2"} + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + table := &CsvTable{columns: []*CsvTableColumn{ + {Label: "a", Index: 0}, + {Label: "b", Index: 1}, + }} + subject.setupTable(table, test.value) + // validator + require.Equal(t, 1, len(table.validators)) + require.Equal(t, table.validators[0](table), nil) + // columns + require.Equal(t, 1, len(table.extraColumns)) + col := table.extraColumns[0] + require.Equal(t, test.expectLinePart, col.LinePart) + require.Greater(t, 0, col.Index) + if test.expectLabel != "_" { + require.Equal(t, test.expectLabel, col.Label) + } else { + require.NotEqual(t, "", col.Label) + } + require.Equal(t, test.expectValue, col.Value(exampleRow)) + }) + } + t.Run("concat template references unknown column", func(t *testing.T) { + var buf bytes.Buffer + log.SetOutput(&buf) + oldFlags := log.Flags() + log.SetFlags(0) + oldPrefix := log.Prefix() + prefix := "::PREFIX::" + log.SetPrefix(prefix) + defer func() { + log.SetOutput(os.Stderr) + log.SetFlags(oldFlags) + log.SetPrefix(oldPrefix) + }() + + table := &CsvTable{columns: []*CsvTableColumn{ + {Label: "x", Index: 0}, + }} + subject.setupTable(table, []string{"string", "fN", "a${y}-${x}z"}) + require.Equal(t, 1, len(table.validators)) + require.NotNil(t, table.validators[0](table)) + require.Equal(t, + "column 'fN': 
'a${y}-${x}z' references an unknown column 'y', available columns are: x", + table.validators[0](table).Error()) + // columns + require.Equal(t, 1, len(table.extraColumns)) + col := table.extraColumns[0] + require.Greater(t, 0, col.Index) + require.Equal(t, "a-1z", col.Value(exampleRow)) + // a warning is printed to console + require.Equal(t, + "::PREFIX::WARNING: column fN: column 'y' cannot be replaced, no such column available", + strings.TrimSpace(buf.String())) + }) +} + +// Test_TimeZoneAnnotation tests #timezone annotation +func Test_TimeZoneAnnotation(t *testing.T) { + subject := annotation("#timezone") + require.True(t, subject.matches("#timeZone")) + require.True(t, subject.isTableAnnotation()) + var tests = []struct { + value string + err string + }{ + {"#timezone ", ""}, + {"#timezone EST", ""}, + {"#timezone,EST", ""}, + {"#timezone,+0100", ""}, + {"#timezone,whatever", "#timezone annotation"}, + } + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + table := &CsvTable{} + err := subject.setupTable(table, strings.Split(test.value, ",")) + if test.err == "" { + require.Nil(t, err) + require.NotNil(t, table.timeZone != nil) + } else { + require.NotNil(t, err) + require.True(t, strings.Contains(fmt.Sprintf("%v", err), test.err)) + } + }) + } +} + +// Test_ParseTimeZone tests parseTimeZone fn +func Test_ParseTimeZone(t *testing.T) { + now := time.Now() + _, localOffset := now.Zone() + var tests = []struct { + value string + offset int + }{ + {"local", localOffset}, + {"Local", localOffset}, + {"-0000", 0}, + {"+0000", 0}, + {"-0100", -3600}, + {"+0100", 3600}, + {"+0130", 3600 + 3600/2}, + {"", 0}, + {"-01", -1}, + {"0000", -1}, + {"UTC", 0}, + {"EST", -5 * 3600}, + } + + for i, test := range tests { + t.Run(fmt.Sprint(i), func(t *testing.T) { + tz, err := parseTimeZone(test.value) + require.NotEqual(t, tz, err) // both cannot be nil + if err != nil { + require.Nil(t, tz) + // fmt.Println(err) + if test.offset >= 0 { + require.Fail(t, "offset expected") + } + return + } + require.NotNil(t, tz) + testDate := fmt.Sprintf("%d-%02d-%02d", now.Year(), now.Month(), now.Day()) + result, err := time.ParseInLocation("2006-01-02", testDate, tz) + require.Nil(t, err) + _, offset := result.Zone() + require.Equal(t, test.offset, offset) + }) + } +} diff --git a/pkg/csv2lp/csv_table.go b/pkg/csv2lp/csv_table.go new file mode 100644 index 0000000..97213ba --- /dev/null +++ b/pkg/csv2lp/csv_table.go @@ -0,0 +1,586 @@ +package csv2lp + +import ( + "errors" + "fmt" + "log" + "sort" + "strings" + "time" + "unsafe" +) + +// column labels used in flux CSV result +const ( + labelFieldName = "_field" + labelFieldValue = "_value" + labelTime = "_time" + labelStart = "_start" + labelStop = "_stop" + labelMeasurement = "_measurement" +) + +// types of columns with respect to line protocol +const ( + linePartIgnored = iota + 1 // ignored in line protocol + linePartMeasurement + linePartTag + linePartField + linePartTime +) + +// CsvTableColumn represents processing metadata about a csv column +type CsvTableColumn struct { + // Label is a column label from the header row, such as "_start", "_stop", "_time" + Label string + // DataType such as "string", "long", "dateTime" ... + DataType string + // DataFormat is a format of DataType, such as "RFC3339", "2006-01-02" + DataFormat string + // LinePart is a line part of the column (0 means not determined yet), see linePart constants + LinePart int + // DefaultValue is used when column's value is an empty string. 
+	DefaultValue string
+	// Index of this column when reading rows, -1 indicates a virtual column with DefaultValue data
+	Index int
+	// TimeZone of dateTime column, applied when parsing dateTime DataType
+	TimeZone *time.Location
+	// ParseF is an optional function used to convert the column's string value to interface{}
+	ParseF func(value string) (interface{}, error)
+	// ComputeValue is an optional function used to compute the column value out of row data
+	ComputeValue func(row []string) string
+
+	// escapedLabel contains the escaped label that can be directly used in line protocol
+	escapedLabel string
+}
+
+// LineLabel returns the escaped name of the column so that it can be used as a tag name or field name in line protocol
+func (c *CsvTableColumn) LineLabel() string {
+	if len(c.escapedLabel) > 0 {
+		return c.escapedLabel
+	}
+	return c.Label
+}
+
+// Value returns the value of the column for the supplied row
+func (c *CsvTableColumn) Value(row []string) string {
+	if c.Index < 0 || c.Index >= len(row) {
+		if c.ComputeValue != nil {
+			return c.ComputeValue(row)
+		}
+		return c.DefaultValue
+	}
+	val := row[c.Index]
+	if len(val) > 0 {
+		return val
+	}
+	return c.DefaultValue
+}
+
+// setupDataType sets up the data type from the supplied value
+//
+// columnValue contains typeName and possibly additional column metadata;
+// it can be
+// 1. typeName
+// 2. typeName:format
+// 3. typeName|defaultValue
+// 4. typeName:format|defaultValue
+// 5. #anycomment (all options above)
+func (c *CsvTableColumn) setupDataType(columnValue string) {
+	// ignoreLeadingComment is required to support a datatype specified together with a CSV annotation
+	// (such as #constant)
+	columnValue = ignoreLeadingComment(columnValue)
+
+	// '|' adds a default value to the column
+	pipeIndex := strings.Index(columnValue, "|")
+	if pipeIndex > 1 {
+		if c.DefaultValue == "" {
+			c.DefaultValue = columnValue[pipeIndex+1:]
+			columnValue = columnValue[:pipeIndex]
+		}
+	}
+	// setup the column format
+	colonIndex := strings.Index(columnValue, ":")
+	if colonIndex > 1 {
+		c.DataFormat = columnValue[colonIndex+1:]
+		columnValue = columnValue[:colonIndex]
+	}
+
+	// setup the column linePart depending on the dataType
+	switch {
+	case columnValue == "tag":
+		c.LinePart = linePartTag
+	case strings.HasPrefix(columnValue, "ignore"):
+		// ignore or ignored
+		c.LinePart = linePartIgnored
+	case columnValue == "dateTime":
+		// dateTime field is used at most once in a protocol line
+		c.LinePart = linePartTime
+	case columnValue == "measurement":
+		c.LinePart = linePartMeasurement
+	case columnValue == "field":
+		c.LinePart = linePartField
+		columnValue = "" // this is a generic field without a data type specified
+	case columnValue == "time": // time is an alias for dateTime
+		c.LinePart = linePartTime
+		columnValue = dateTimeDatatype
+	default:
+		// nothing to do since we don't know the linePart yet
+		// the line part is decided in recomputeLineProtocolColumns
+	}
+	// setup the column data type
+	c.DataType = columnValue
+
+	// setup custom parsing
+	if c.DataType == boolDatatype && c.DataFormat != "" {
+		c.ParseF = createBoolParseFn(c.DataFormat)
+		return
+	}
+	if c.DataType == longDatatype && strings.HasPrefix(c.DataFormat, "strict") {
+		c.ParseF = createStrictLongParseFn(c.DataFormat[6:])
+		return
+	}
+	if c.DataType == uLongDatatype && strings.HasPrefix(c.DataFormat, "strict") {
+		c.ParseF = createStrictUnsignedLongParseFn(c.DataFormat[6:])
+		return
+	}
+}
+
+// CsvColumnError indicates a conversion error in a specific column
+type CsvColumnError struct {
+	Column 
string
+	Err    error
+}
+
+// Error interface implementation
+func (e CsvColumnError) Error() string {
+	return fmt.Sprintf("column '%s': %v", e.Column, e.Err)
+}
+
+// CsvTable contains metadata about columns and the state of the CSV processing
+type CsvTable struct {
+	// columns contains columns that extract values from data rows
+	columns []*CsvTableColumn
+	// partBits is a bitmap that is used to remember that a particular column annotation
+	// (#group, #datatype and #default) was already processed for the table;
+	// it is used to detect the start of a new table in CSV flux results, where a repeated annotation
+	// is detected and a new CsvTable can then be created
+	partBits uint8
+	// readTableData indicates that the table is ready to read table data, which happens
+	// after reading the annotation and header rows
+	readTableData bool
+	// lpColumnsValid indicates whether line protocol columns are valid or must be re-calculated from columns
+	lpColumnsValid bool
+	// extraColumns are added by table-wide annotations, such as #constant
+	extraColumns []*CsvTableColumn
+	// ignoreDataTypeInColumnName is true to skip parsing of a data type out of a column name
+	ignoreDataTypeInColumnName bool
+	// timeZone of dateTime column(s), applied when parsing a dateTime value without a time zone specified
+	timeZone *time.Location
+	// validators validate the table structure right before processing data rows
+	validators []func(*CsvTable) error
+
+	/* cached columns are initialized before reading the data rows using the computeLineProtocolColumns fn */
+	// cachedMeasurement is a required column that reads the (line protocol) measurement
+	cachedMeasurement *CsvTableColumn
+	// cachedTime is an optional column that reads the timestamp of the lp row
+	cachedTime *CsvTableColumn
+	// cachedFieldName is an optional column that reads a field name to add to the protocol line
+	cachedFieldName *CsvTableColumn
+	// cachedFieldValue is an optional column that reads a field value to add to the protocol line
+	cachedFieldValue *CsvTableColumn
+	// cachedFields are columns that read field values, a field name is taken from a column label
+	cachedFields []*CsvTableColumn
+	// cachedTags are columns that read tag values, a tag name is taken from a column label
+	cachedTags []*CsvTableColumn
+}
+
+// IgnoreDataTypeInColumnName sets a flag that skips dataType parsing in column names.
+// When set to true, column names may contain '|'. 
By default, a column name can also carry a datatype
+// and a default value when written as `name|datatype` or `name|datatype|default`,
+// for example `ready|boolean|true`
+func (t *CsvTable) IgnoreDataTypeInColumnName(val bool) {
+	t.ignoreDataTypeInColumnName = val
+}
+
+// DataColumnsInfo returns a string representation of columns that are used to process CSV data
+func (t *CsvTable) DataColumnsInfo() string {
+	if t == nil {
+		return ""
+	}
+	var builder = strings.Builder{}
+	t.computeLineProtocolColumns() // ensure that cached columns are initialized
+	builder.WriteString(fmt.Sprintf("CsvTable{ dataColumns: %d constantColumns: %d\n", len(t.columns), len(t.extraColumns)))
+	builder.WriteString(fmt.Sprintf(" measurement: %+v\n", t.cachedMeasurement))
+	for _, col := range t.cachedTags {
+		builder.WriteString(fmt.Sprintf(" tag: %+v\n", col))
+	}
+	for _, col := range t.cachedFields {
+		builder.WriteString(fmt.Sprintf(" field: %+v\n", col))
+	}
+	builder.WriteString(fmt.Sprintf(" time: %+v\n", t.cachedTime))
+	builder.WriteString("}")
+
+	return builder.String()
+}
+
+// NextTable resets the table to a state in which it expects annotations and header rows
+func (t *CsvTable) NextTable() {
+	t.partBits = 0 // no column annotations parsed yet
+	t.readTableData = false
+	t.columns = []*CsvTableColumn{}
+	t.extraColumns = []*CsvTableColumn{}
+}
+
+// createColumns creates a slice of CsvTableColumn for the supplied rowSize
+func createColumns(rowSize int) []*CsvTableColumn {
+	retVal := make([]*CsvTableColumn, rowSize)
+	for i := 0; i < rowSize; i++ {
+		retVal[i] = &CsvTableColumn{
+			Index: i,
+		}
+	}
+	return retVal
+}
+
+// AddRow updates the state of the CSV table with a new header, annotation or data row.
+// Returns true if the row is a data row.
+func (t *CsvTable) AddRow(row []string) bool {
+	// detect data row or table header row
+	if len(row[0]) == 0 || row[0][0] != '#' {
+		if !t.readTableData {
+			// expect a header row
+			t.lpColumnsValid = false // line protocol columns change
+			if t.partBits == 0 {
+				// create columns since no column annotations were processed
+				t.columns = createColumns(len(row))
+			}
+			// assign column labels for the header row
+			for i := 0; i < len(t.columns); i++ {
+				col := t.columns[i]
+				if len(col.Label) == 0 && col.Index < len(row) {
+					col.Label = row[col.Index]
+					// assign column data type if possible
+					if len(col.DataType) == 0 && !t.ignoreDataTypeInColumnName {
+						if idx := strings.IndexByte(col.Label, '|'); idx != -1 {
+							col.setupDataType(col.Label[idx+1:])
+							col.Label = col.Label[:idx]
+						}
+					}
+				}
+			}
+			// header row is read, now expect data rows
+			t.readTableData = true
+			return false
+		}
+		return true
+	}
+
+	// process all supported annotations
+	for i := 0; i < len(supportedAnnotations); i++ {
+		supportedAnnotation := supportedAnnotations[i]
+		if supportedAnnotation.matches(row[0]) {
+			if len(row[0]) > len(supportedAnnotation.prefix) && row[0][len(supportedAnnotation.prefix)] != ' ' {
+				continue // ignoring, not a supported annotation
+			}
+			t.lpColumnsValid = false // line protocol columns change
+			if supportedAnnotation.isTableAnnotation() {
+				// process table-level annotation
+				if err := supportedAnnotation.setupTable(t, row); err != nil {
+					log.Println("WARNING: ", err)
+				}
+				return false
+			}
+			// invariant: !supportedAnnotation.isTableAnnotation()
+			if t.readTableData {
+				// any column annotation stops reading of data rows
+				t.NextTable()
+			}
+			// create new columns upon new or repeated column annotation
+			if t.partBits == 0 || 
t.partBits&supportedAnnotation.flag == 1 { + t.partBits = supportedAnnotation.flag + t.columns = createColumns(len(row)) + } else { + t.partBits = t.partBits | supportedAnnotation.flag + } + // setup columns according to column annotation + for j := 0; j < len(t.columns); j++ { + col := t.columns[j] + if col.Index >= len(row) { + continue // missing value + } else { + supportedAnnotation.setupColumn(col, row[col.Index]) + } + } + return false + } + } + // warn about unsupported annotation unless a comment row + if !strings.HasPrefix(row[0], "# ") { + log.Println("WARNING: unsupported annotation: ", row[0]) + } + return false +} + +// computeLineProtocolColumns computes columns that are +// used to create line protocol rows when required to do so +// +// returns true if new columns were initialized or false if there +// was no change in line protocol columns +func (t *CsvTable) computeLineProtocolColumns() bool { + if !t.lpColumnsValid { + t.recomputeLineProtocolColumns() + return true + } + return false +} + +// recomputeLineProtocolColumns always computes the columns that are +// used to create line protocol rows +func (t *CsvTable) recomputeLineProtocolColumns() { + // reset results + t.cachedMeasurement = nil + t.cachedTime = nil + t.cachedFieldName = nil + t.cachedFieldValue = nil + t.cachedTags = nil + t.cachedFields = nil + // collect unique tag names (#19453) + var tags = make(map[string]*CsvTableColumn) + + // having a _field column indicates fields without a line type are ignored + defaultIsField := t.Column(labelFieldName) == nil + + // go over columns + extra columns + columns := make([]*CsvTableColumn, len(t.columns)+len(t.extraColumns)) + copy(columns, t.columns) + copy(columns[len(t.columns):], t.extraColumns) + for i := 0; i < len(columns); i++ { + col := columns[i] + switch { + case col.Label == labelMeasurement || col.LinePart == linePartMeasurement: + t.cachedMeasurement = col + case col.Label == labelTime || col.LinePart == linePartTime: + if t.cachedTime != nil && t.cachedTime.Label != labelStart && t.cachedTime.Label != labelStop { + log.Printf("WARNING: at most one dateTime column is expected, '%s' column is ignored\n", t.cachedTime.Label) + } + t.cachedTime = col + case len(strings.TrimSpace(col.Label)) == 0 || col.LinePart == linePartIgnored: + // ignored columns that are marked to be ignored or without a label + case col.Label == labelFieldName: + t.cachedFieldName = col + case col.Label == labelFieldValue: + t.cachedFieldValue = col + case col.LinePart == linePartTag: + if val, found := tags[col.Label]; found { + log.Printf("WARNING: ignoring duplicate tag '%s' at column index %d, using column at index %d\n", col.Label, val.Index, col.Index) + } + col.escapedLabel = escapeTag(col.Label) + tags[col.Label] = col + case col.LinePart == linePartField: + col.escapedLabel = escapeTag(col.Label) + t.cachedFields = append(t.cachedFields, col) + default: + if defaultIsField { + col.escapedLabel = escapeTag(col.Label) + t.cachedFields = append(t.cachedFields, col) + } + } + } + // line protocol requires sorted unique tags + if len(tags) > 0 { + t.cachedTags = make([]*CsvTableColumn, 0, len(tags)) + for _, v := range tags { + t.cachedTags = append(t.cachedTags, v) + } + sort.Slice(t.cachedTags, func(i, j int) bool { + return t.cachedTags[i].Label < t.cachedTags[j].Label + }) + } + // setup timezone for timestamp column + if t.cachedTime != nil && t.cachedTime.TimeZone == nil { + t.cachedTime.TimeZone = t.timeZone + } + + t.lpColumnsValid = true // line protocol columns are 
now fresh +} + +// CreateLine produces a protocol line out of the supplied row or returns error +func (t *CsvTable) CreateLine(row []string) (line string, err error) { + buffer := make([]byte, 100)[:0] + buffer, err = t.AppendLine(buffer, row, -1) + if err != nil { + return "", err + } + return *(*string)(unsafe.Pointer(&buffer)), nil +} + +// AppendLine appends a protocol line to the supplied buffer using a CSV row and returns appended buffer or an error if any +func (t *CsvTable) AppendLine(buffer []byte, row []string, lineNumber int) ([]byte, error) { + if t.computeLineProtocolColumns() { + // validate column data types + if t.cachedFieldValue != nil && !IsTypeSupported(t.cachedFieldValue.DataType) { + return buffer, CsvColumnError{ + t.cachedFieldValue.Label, + fmt.Errorf("data type '%s' is not supported", t.cachedFieldValue.DataType), + } + } + for _, c := range t.cachedFields { + if !IsTypeSupported(c.DataType) { + return buffer, CsvColumnError{ + c.Label, + fmt.Errorf("data type '%s' is not supported", c.DataType), + } + } + } + for _, v := range t.validators { + if err := v(t); err != nil { + return buffer, err + } + } + } + + if t.cachedMeasurement == nil { + return buffer, errors.New("no measurement column found") + } + measurement := t.cachedMeasurement.Value(row) + if measurement == "" { + return buffer, CsvColumnError{ + t.cachedMeasurement.Label, + errors.New("no measurement supplied"), + } + } + buffer = append(buffer, escapeMeasurement(measurement)...) + for _, tag := range t.cachedTags { + value := tag.Value(row) + if tag.Index < len(row) && len(value) > 0 { + buffer = append(buffer, ',') + buffer = append(buffer, tag.LineLabel()...) + buffer = append(buffer, '=') + buffer = append(buffer, escapeTag(value)...) + } + } + buffer = append(buffer, ' ') + fieldAdded := false + if t.cachedFieldName != nil && t.cachedFieldValue != nil { + field := t.cachedFieldName.Value(row) + value := t.cachedFieldValue.Value(row) + if len(value) > 0 && len(field) > 0 { + buffer = append(buffer, escapeTag(field)...) + buffer = append(buffer, '=') + var err error + buffer, err = appendConverted(buffer, value, t.cachedFieldValue, lineNumber) + if err != nil { + return buffer, CsvColumnError{ + t.cachedFieldName.Label, + err, + } + } + fieldAdded = true + } + } + for _, field := range t.cachedFields { + value := field.Value(row) + if len(value) > 0 { + if !fieldAdded { + fieldAdded = true + } else { + buffer = append(buffer, ',') + } + buffer = append(buffer, field.LineLabel()...) 
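+				// the escaped field key is already in the buffer; append '=' and the value converted according to the column data type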
+ buffer = append(buffer, '=') + var err error + buffer, err = appendConverted(buffer, value, field, lineNumber) + if err != nil { + return buffer, CsvColumnError{ + field.Label, + err, + } + } + } + } + if !fieldAdded { + return buffer, errors.New("no field data found") + } + + if t.cachedTime != nil && t.cachedTime.Index < len(row) { + timeVal := t.cachedTime.Value(row) + if len(timeVal) > 0 { + if len(t.cachedTime.DataType) == 0 { + // assume dateTime data type (number or RFC3339) + t.cachedTime.DataType = dateTimeDatatype + t.cachedTime.DataFormat = "" + } + buffer = append(buffer, ' ') + var err error + buffer, err = appendConverted(buffer, timeVal, t.cachedTime, lineNumber) + if err != nil { + return buffer, CsvColumnError{ + t.cachedTime.Label, + err, + } + } + } + } + return buffer, nil +} + +// Column returns the first column of the supplied label or nil +func (t *CsvTable) Column(label string) *CsvTableColumn { + for i := 0; i < len(t.columns); i++ { + if t.columns[i].Label == label { + return t.columns[i] + } + } + return nil +} + +// Columns returns available columns +func (t *CsvTable) Columns() []*CsvTableColumn { + return t.columns +} + +// ColumnLabels returns available columns labels +func (t *CsvTable) ColumnLabels() []string { + labels := make([]string, len(t.columns)) + for i, col := range t.columns { + labels[i] = col.Label + } + return labels +} + +// Measurement returns measurement column or nil +func (t *CsvTable) Measurement() *CsvTableColumn { + t.computeLineProtocolColumns() + return t.cachedMeasurement +} + +// Time returns time column or nil +func (t *CsvTable) Time() *CsvTableColumn { + t.computeLineProtocolColumns() + return t.cachedTime +} + +// FieldName returns field name column or nil +func (t *CsvTable) FieldName() *CsvTableColumn { + t.computeLineProtocolColumns() + return t.cachedFieldName +} + +// FieldValue returns field value column or nil +func (t *CsvTable) FieldValue() *CsvTableColumn { + t.computeLineProtocolColumns() + return t.cachedFieldValue +} + +// Tags returns tags +func (t *CsvTable) Tags() []*CsvTableColumn { + t.computeLineProtocolColumns() + return t.cachedTags +} + +// Fields returns fields +func (t *CsvTable) Fields() []*CsvTableColumn { + t.computeLineProtocolColumns() + return t.cachedFields +} diff --git a/pkg/csv2lp/csv_table_test.go b/pkg/csv2lp/csv_table_test.go new file mode 100644 index 0000000..45585fe --- /dev/null +++ b/pkg/csv2lp/csv_table_test.go @@ -0,0 +1,566 @@ +package csv2lp + +import ( + "encoding/csv" + "io" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func readCsv(t *testing.T, data string) [][]string { + reader := csv.NewReader(strings.NewReader(data)) + var rows [][]string + for { + row, err := reader.Read() + reader.FieldsPerRecord = 0 // every row can have different number of fields + if err == io.EOF { + break + } + if err != nil { + t.Log("row: ", row) + t.Log(err) + t.Fail() + } + rows = append(rows, row) + } + return rows +} + +// Test_CsvTable_FluxQueryResult tests construction of table columns and data from a Flux Query CSV result +func Test_CsvTable_FluxQueryResult(t *testing.T) { + const csvQueryResult = ` +#group,false,false,true,true,false,false,true,true,true,true +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string +#default,_result,,,,,,,,, +,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host 
+,,0,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:17:57Z,0,time_steal,cpu,cpu1,rsavage.prod
+,,0,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:07Z,0,time_steal,cpu,cpu1,rsavage.prod
+
+#group,false,false,true,true,false,false,true,true,true,true
+#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
+#default,_result,,,,,,,,
+,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
+,,1,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:01Z,2.7263631815907954,usage_user,cpu,cpu-total,tahoecity.prod
+,,1,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:11Z,2.247752247752248,usage_user,cpu,cpu-total,tahoecity.prod
+#unre`
+
+	var lineProtocolQueryResult = []string{
+		"cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669077000000000",
+		"cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669087000000000",
+		"cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.7263631815907954 1582669081000000000",
+		"cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.247752247752248 1582669091000000000",
+	}
+
+	table := CsvTable{}
+	rows := readCsv(t, csvQueryResult)
+	lineProtocolIndex := 0
+	for i, row := range rows {
+		rowProcessed := table.AddRow(row)
+		if i%6 < 4 {
+			require.Equal(t, rowProcessed, false, "row %d", i)
+		} else {
+			require.Equal(t, rowProcessed, true, "row %d", i)
+			line, _ := table.CreateLine(row)
+			require.Equal(t, lineProtocolQueryResult[lineProtocolIndex], line)
+			lineProtocolIndex++
+			if i%6 == 4 {
+				// verify table
+				require.GreaterOrEqual(t, len(table.columns), 10)
+				require.Equal(t, table.columns, table.Columns())
+				for j, col := range table.columns {
+					if j > 0 {
+						require.Equal(t, col.Index, j)
+						require.Equal(t, col.Label, rows[i-1][j])
+						if len(rows[i-2]) > j {
+							require.Equal(t, col.DefaultValue, rows[i-2][j])
+						} else {
+							// some trailing data are missing
+							require.Equal(t, col.DefaultValue, "")
+						}
+						types := strings.Split(rows[i-3][j], ":")
+						require.Equal(t, types[0], col.DataType, "row %d, col %d", i-3, j)
+						if len(types) > 1 {
+							require.Equal(t, types[1], col.DataFormat, "row %d, col %d", i-3, j)
+						}
+					}
+				}
+				// verify cached values
+				table.computeLineProtocolColumns()
+				require.Equal(t, table.Column("_measurement"), table.cachedMeasurement)
+				require.Nil(t, table.Column("_no"))
+				require.NotNil(t, table.cachedMeasurement)
+				require.NotNil(t, table.cachedFieldName)
+				require.NotNil(t, table.cachedFieldValue)
+				require.NotNil(t, table.cachedTime)
+				require.NotNil(t, table.cachedTags)
+				require.Equal(t, table.Measurement().Label, "_measurement")
+				require.Equal(t, table.FieldName().Label, "_field")
+				require.Equal(t, table.FieldValue().Label, "_value")
+				require.Equal(t, table.Time().Label, "_time")
+				require.Equal(t, len(table.Tags()), 2)
+				require.Equal(t, table.Tags()[0].Label, "cpu")
+				require.Equal(t, table.Tags()[1].Label, "host")
+				require.Equal(t, len(table.Fields()), 0)
+				require.Contains(t, table.ColumnLabels(), "_measurement")
+			}
+		}
+	}
+}
+
+// Test_IgnoreLeadingComment tests the ignoreLeadingComment function
+func Test_IgnoreLeadingComment(t *testing.T) {
+	var tests = []struct {
+		value  string
+		expect string
+	}{
+		{"", ""},
+		{"a", "a"},
+		{" #whatever", " #whatever"},
+		{"#whatever", ""},
+		{"#whatever ", ""},
+		{"#whatever a b ", "a b "},
+		{"#whatever  a b ", "a b "},
+	}
+	for _, test := range tests {
+		t.Run(test.value, func(t *testing.T) {
+			require.Equal(t, test.expect,
ignoreLeadingComment(test.value)) + }) + } + +} + +// Test_CsvTableProcessing tests data processing in CsvTable +func Test_CsvTableProcessing(t *testing.T) { + var tests = []struct { + name string + csv string + line string + }{ + { + "simple1", + "_measurement,a,b\ncpu,1,1", + "cpu a=1,b=1", + }, + { + "simple1b", + "_measurement,,a,b\ncpu,whatever,1,1", + "cpu a=1,b=1", + }, + { + "simple2", + "_measurement\ncpu,1,1", + "", // no fields present + }, + { + "simple3", + "_time\n1,1", + "", // no measurement present + }, + { + "annotated1", + "#datatype measurement,,\nmeasurement,a,b\ncpu,1,2", + "cpu a=1,b=2", + }, + { + "annotated2", + "#datatype measurement,tag,field\nmeasurement,a,b\ncpu,1,2", + "cpu,a=1 b=2", + }, + { + "annotated3", + "#datatype measurement,tag,dateTime,field\nmeasurement,a,b,time\ncpu,1,2,3", + "cpu,a=1 time=3 2", + }, + { + "annotated3_detectedTime1", + "#datatype measurement,tag,dateTime,field\nmeasurement,a,b,time\ncpu,1,2020-01-10T10:10:10Z,3", + "cpu,a=1 time=3 1578651010000000000", + }, + { + "annotated3_detectedTime2", + "#datatype measurement,tag,dateTime,field\nmeasurement,a,b,time\ncpu,1,2020-01-10T10:10:10.0Z,3", + "cpu,a=1 time=3 1578651010000000000", + }, + { + "annotated4", + "#datatype measurement,tag,ignore,field\nmeasurement,a,b,time\ncpu,1,2,3", + "cpu,a=1 time=3", + }, + { + "annotated5", + "#datatype measurement,tag,ignore,field\nmeasurement,a,b,time\ncpu,1,2,3", + "cpu,a=1 time=3", + }, + { + "annotated6", + "#datatype measurement,tag,ignore,field\n" + + "#datatypea tag,tag,\n" + // this must be ignored since it not a supported annotation + "measurement,a,b,time\ncpu,1,2,3", + "cpu,a=1 time=3", + }, + { + "annotated7", + "#datatype measurement,dateTime,\nmeasurement,a,b\ncpu,2020-01-10T10:10:10.0Z,2", + "cpu b=2 1578651010000000000", + }, + { + "annotated8", + "#datatype measurement,,,field\nmeasurement,_field,_value,other\ncpu,a,1,2", + "cpu a=1,other=2", + }, + { + "annotated9_sortedTags", + "#datatype measurement,tag,tag,time,field\nmeasurement,b,a,c,time\ncpu,1,2,3,4", + "cpu,a=2,b=1 time=4 3", + }, + { + "allFieldTypes", + "#datatype measurement,string,double,boolean,long,unsignedLong,duration,base64Binary,dateTime\n" + + "m,s,d,b,l,ul,dur,by,d1,d2,time\n" + + `cpu,"str",1.0,true,1,1,1ms,YWFh,1`, + "cpu s=\"str\",d=1,b=true,l=1i,ul=1u,dur=1000000i,by=YWFh 1", + }, + { + "allFieldTypes", + "#datatype measurement,string,double,boolean,long,unsignedLong,duration,base64Binary,dateTime\n" + + "m,s,d,b,l,ul,dur,by,d1,d2,time\n" + + `cpu,"str",1.0,true,1,1,1ms,YWFh,1`, + "cpu s=\"str\",d=1,b=true,l=1i,ul=1u,dur=1000000i,by=YWFh 1", + }, + { + "allFieldTypes_ignoreAdditionalDateTimes", + "#datatype ,string,double,boolean,long,unsignedLong,duration,base64Binary,dateTime:RFC3339,dateTime:RFC3339Nano,\n" + + "_measurement,s,d,b,l,ul,dur,by,d1,d2,_time\n" + + `cpu,"str",1.0,true,1,1,1ms,YWFh,2020-01-10T10:10:10Z,2020-01-10T10:10:10Z,1`, + "cpu s=\"str\",d=1,b=true,l=1i,ul=1u,dur=1000000i,by=YWFh 1", + }, + { + "allExtraDataTypes", + "#datatype measurement,tag,field,ignored,dateTime\n" + + "m,t,f,i,dt\n" + + `cpu,myTag,0,myIgnored,1`, + "cpu,t=myTag f=0 1", + }, + { + "allTypes_escaped", + "#datatype ,string,string,,,,\n" + + `_measurement,s1,s2,"a,","b ",c=` + "\n" + + `"cpu, ","""",\,a,b,c`, + `cpu\,\ s1="\"",s2="\\",a\,=a,b\ =b,c\==c`, + }, + { + "default_values", + "#default cpu,yes,0,1\n#datatype ,tag,,\n_measurement,test,col1,_time\n,,,", + "cpu,test=yes col1=0 1", + }, + { + "no duplicate tags", // duplicate tags are ignored, the last column wins, 
https://github.com/influxdata/influxdb/issues/19453 + "#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string,string,string,string,string,string,string\n" + + "#group,true,true,false,false,false,false,true,true,true,true,true,true,true,true,true,true\n" + + "#default,_result,,,,,,,,,,,,,,,\n" + + ",result,table,_start,_stop,_time,_value,_field,_measurement,env,host,hostname,nodename,org,result,table,url\n" + + ",,0,2020-08-26T23:10:54.023607624Z,2020-08-26T23:15:54.023607624Z,2020-08-26T23:11:00Z,0,0.001,something,host,pod,node,host,,success,role,http://127.0.0.1:8099/metrics\n", + "something,env=host,host=pod,hostname=node,nodename=host,result=success,table=role,url=http://127.0.0.1:8099/metrics 0.001=0 1598483460000000000", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + rows := readCsv(t, test.csv) + table := CsvTable{} + var lines []string + for _, row := range rows { + rowProcessed := table.AddRow(row) + if rowProcessed { + line, err := table.CreateLine(row) + if err != nil && test.line != "" { + require.Nil(t, err.Error()) + } + lines = append(lines, line) + } + } + require.Equal(t, []string{test.line}, lines) + }) + } +} + +// Test_ConstantAnnotations tests processing of constant annotations +func Test_ConstantAnnotations(t *testing.T) { + var tests = []struct { + name string + csv string + line string + }{ + { + "measurement_1", + "#constant measurement,cpu\n" + + "a,b\n" + + "1,1", + "cpu a=1,b=1", + }, + { + "measurement_2", + "#constant,measurement,,cpu\n" + + "#constant,tag,cpu,cpu1\n" + + "#constant,long,of,0\n" + + "#constant,dateTime,,2\n" + + "a,b\n" + + "1,1", + "cpu,cpu=cpu1 a=1,b=1,of=0i 2", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + rows := readCsv(t, test.csv) + table := CsvTable{} + var lines []string + for _, row := range rows { + rowProcessed := table.AddRow(row) + if rowProcessed { + line, err := table.CreateLine(row) + if err != nil && test.line != "" { + require.Nil(t, err.Error()) + } + lines = append(lines, line) + } + } + require.Equal(t, []string{test.line}, lines) + }) + } +} + +// Test_ConcatAnnotations tests processing of concat annotations +func Test_ConcatAnnotations(t *testing.T) { + var tests = []struct { + name string + csv string + line string + }{ + { + "measurement_1", + "#concat measurement,cpu\n" + + "a,b\n" + + "1,1", + "cpu a=1,b=1", + }, + { + "measurement_2", + "#concat,measurement,${a}${b}\n" + + "#constant,tag,cpu,cpu1\n" + + "#constant,long,of,0\n" + + "#constant,dateTime,,2\n" + + "a,b\n" + + "1,1", + "11,cpu=cpu1 a=1,b=1,of=0i 2", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + rows := readCsv(t, test.csv) + table := CsvTable{} + var lines []string + for _, row := range rows { + rowProcessed := table.AddRow(row) + if rowProcessed { + line, err := table.CreateLine(row) + if err != nil && test.line != "" { + require.Nil(t, err.Error()) + } + lines = append(lines, line) + } + } + require.Equal(t, []string{test.line}, lines) + }) + } +} + +// Test_DataTypeInColumnName tests specification of column data type in the header row +func Test_DataTypeInColumnName(t *testing.T) { + var tests = []struct { + csv string + line string + ignoreDataTypeInColumnName bool + error string + }{ + { + csv: "m|measurement,b|boolean:x:,c|boolean:x:|x\n" + + "cpu,,", + line: `cpu c=true`, + }, + { + csv: "m|measurement,a|boolean,b|boolean:0:1,c|boolean:x:,d|boolean:x:\n" + + "cpu,1,1,x,y", + line: 
`cpu a=true,b=false,c=true,d=false`, + }, + { + csv: "#constant measurement,cpu\n" + + "a|long,b|string\n" + + "1,1", + line: `cpu a=1i,b="1"`, + }, + { + csv: "#constant measurement,cpu\n" + + "a|long,b|string\n" + + "1,1", + line: `cpu a|long=1,b|string=1`, + ignoreDataTypeInColumnName: true, + }, + { + csv: "#constant measurement,cpu\n" + + "#datatype long,string\n" + + "a|long,b|string\n" + + "1,1", + line: `cpu a|long=1i,b|string="1"`, + ignoreDataTypeInColumnName: true, + }, + { + csv: "#constant measurement,cpu\n" + + "a|long:strict: ,b|unsignedLong:strict: \n" + + "1 2,1 2", + line: `cpu a=12i,b=12u`, + }, + { + csv: "#constant measurement,cpu\n" + + "a|long:strict\n" + + "1.1,1", + error: "column 'a': '1.1' cannot fit into long data type", + }, + { + csv: "#constant measurement,cpu\n" + + "a|unsignedLong:strict\n" + + "1.1,1", + error: "column 'a': '1.1' cannot fit into unsignedLong data type", + }, + } + + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + rows := readCsv(t, test.csv) + table := CsvTable{} + table.IgnoreDataTypeInColumnName(test.ignoreDataTypeInColumnName) + var lines []string + for _, row := range rows { + rowProcessed := table.AddRow(row) + if rowProcessed { + line, err := table.CreateLine(row) + if err != nil { + if test.error == "" { + require.Nil(t, err.Error()) + } else { + require.Equal(t, test.error, err.Error()) + } + } + lines = append(lines, line) + } + } + require.Equal(t, []string{test.line}, lines) + }) + } +} + +// Test_CsvTable_dataErrors tests reporting of table data errors +func Test_CsvTable_dataErrors(t *testing.T) { + var tests = []struct { + name string + csv string + }{ + { + "error_1_is_not_dateTime:RFC3339", + "#datatype measurement,,\n#datatype ,dateTime:RFC3339,\nmeasurement,a,b\ncpu,1,2", + }, + { + "error_a_fieldValue_is_not_long", + "#datatype measurement,,\n#datatype ,long,\nmeasurement,_value,_field\ncpu,a,count", + }, + { + "error_a_is_not_long", + "#datatype measurement,,\n#datatype ,long,\nmeasurement,a,b\ncpu,a,2", + }, + { + "error_time_is_not_time", + "#datatype measurement,tag,time,field\nmeasurement,a,b,time\ncpu,1,2020-10,3", + }, + { + "error_no_measurement", + "#datatype ,\ncol1,col2\n1,2", + }, + { + "error_unsupportedFieldDataType", + "#datatype ,whatever\n_measurement,col2\na,2", + }, + { + "error_unsupportedFieldValueDataType", + "#datatype ,,whatever\n_measurement,_field,_value\na,1,2", + }, + { + "error_no_measurement_data", + "_measurement,col1\n,2", + }, + { + "error_derived_column_missing reference", + "#concat string,d,${col1}${col2}\n_measurement,col1\nm,2", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + rows := readCsv(t, test.csv) + table := CsvTable{} + var errors []error + for _, row := range rows { + rowProcessed := table.AddRow(row) + if rowProcessed { + _, err := table.CreateLine(row) + if err != nil { + errors = append(errors, err) + } + } + } + require.Equal(t, 1, len(errors)) + // fmt.Println(errors[0]) + require.NotNil(t, errors[0].Error()) + // LineLabel is the same as Label in all tested columns + for _, col := range table.Columns() { + require.Equal(t, col.Label, col.LineLabel()) + } + }) + } +} + +// Test_CsvTable_DataColumnsInfo tests reporting of table columns +func Test_CsvTable_DataColumnsInfo(t *testing.T) { + data := "#constant,measurement,cpu\n" + + "#constant,tag,xpu,xpu1\n" + + "#constant,tag,cpu,cpu1\n" + + "#constant,long,of,100\n" + + "#constant,dateTime,2\n" + + "x,y\n" + table := CsvTable{} + for _, row := range 
readCsv(t, data) { + require.False(t, table.AddRow(row)) + } + table.computeLineProtocolColumns() + // expected result is something like this: + // "CsvTable{ dataColumns: 2 constantColumns: 5\n" + + // " measurement: &{Label:#constant measurement DataType:measurement DataFormat: LinePart:2 DefaultValue:cpu Index:-1 TimeZone:UTC ParseF: escapedLabel:}\n" + + // " tag: {Label:cpu DataType:tag DataFormat: LinePart:3 DefaultValue:cpu1 Index:-1 TimeZone:UTC ParseF: escapedLabel:cpu}\n" + + // " tag: {Label:xpu DataType:tag DataFormat: LinePart:3 DefaultValue:xpu1 Index:-1 TimeZone:UTC ParseF: escapedLabel:xpu}\n" + + // " field: {Label:x DataType: DataFormat: LinePart:0 DefaultValue: Index:0 TimeZone:UTC ParseF: escapedLabel:x}\n" + + // " field: {Label:y DataType: DataFormat: LinePart:0 DefaultValue: Index:1 TimeZone:UTC ParseF: escapedLabel:y}\n" + + // " field: {Label:of DataType:long DataFormat: LinePart:0 DefaultValue:100 Index:-1 TimeZone:UTC ParseF: escapedLabel:of}\n" + + // " time: &{Label:#constant dateTime DataType:dateTime DataFormat: LinePart:5 DefaultValue:2 Index:-1 TimeZone:UTC ParseF: escapedLabel:}" + + // "\n}" + result := table.DataColumnsInfo() + require.Equal(t, 1, strings.Count(result, "measurement:")) + require.Equal(t, 2, strings.Count(result, "tag:")) + require.Equal(t, 3, strings.Count(result, "field:")) + require.Equal(t, 1, strings.Count(result, "time:")) + + var table2 *CsvTable + require.Equal(t, "", table2.DataColumnsInfo()) +} diff --git a/pkg/csv2lp/data_conversion.go b/pkg/csv2lp/data_conversion.go new file mode 100644 index 0000000..b004157 --- /dev/null +++ b/pkg/csv2lp/data_conversion.go @@ -0,0 +1,341 @@ +package csv2lp + +import ( + "encoding/base64" + "errors" + "fmt" + "io" + "log" + "math" + "strconv" + "strings" + "time" + + "golang.org/x/text/encoding/ianaindex" +) + +// see https://docs.influxdata.com/influxdb/latest/reference/syntax/annotated-csv/#data-types +const ( + stringDatatype = "string" + doubleDatatype = "double" + boolDatatype = "boolean" + longDatatype = "long" + uLongDatatype = "unsignedLong" + durationDatatype = "duration" + base64BinaryDataType = "base64Binary" + dateTimeDatatype = "dateTime" +) + +// predefined dateTime formats +const ( + RFC3339 = "RFC3339" + RFC3339Nano = "RFC3339Nano" + dataFormatNumber = "number" //the same as long, but serialized without i suffix, used for timestamps +) + +var supportedDataTypes map[string]struct{} + +func init() { + supportedDataTypes = make(map[string]struct{}, 9) + supportedDataTypes[stringDatatype] = struct{}{} + supportedDataTypes[doubleDatatype] = struct{}{} + supportedDataTypes[boolDatatype] = struct{}{} + supportedDataTypes[longDatatype] = struct{}{} + supportedDataTypes[uLongDatatype] = struct{}{} + supportedDataTypes[durationDatatype] = struct{}{} + supportedDataTypes[base64BinaryDataType] = struct{}{} + supportedDataTypes[dateTimeDatatype] = struct{}{} + supportedDataTypes[""] = struct{}{} +} + +// IsTypeSupported returns true if the data type is supported +func IsTypeSupported(dataType string) bool { + _, supported := supportedDataTypes[dataType] + return supported +} + +var replaceMeasurement *strings.Replacer = strings.NewReplacer(",", "\\,", " ", "\\ ") +var replaceTag *strings.Replacer = strings.NewReplacer(",", "\\,", " ", "\\ ", "=", "\\=") +var replaceQuoted *strings.Replacer = strings.NewReplacer("\"", "\\\"", "\\", "\\\\") + +func escapeMeasurement(val string) string { + for i := 0; i < len(val); i++ { + if val[i] == ',' || val[i] == ' ' { + return 
replaceMeasurement.Replace(val) + } + } + return val +} +func escapeTag(val string) string { + for i := 0; i < len(val); i++ { + if val[i] == ',' || val[i] == ' ' || val[i] == '=' { + return replaceTag.Replace(val) + } + } + return val +} +func escapeString(val string) string { + for i := 0; i < len(val); i++ { + if val[i] == '"' || val[i] == '\\' { + return replaceQuoted.Replace(val) + } + } + return val +} + +// normalizeNumberString normalizes the supplied value according to the supplied format. +// This normalization is intended to convert number strings of different locales to a strconv-parsable value. +// +// The format's first character is a fraction delimiter character. Next characters in the format +// are simply removed, they are typically used to visually separate groups in large numbers. +// The removeFraction parameter controls whether the returned value can contain also the fraction part. +// An empty format means ". \n\t\r_" +// +// For example, to get a strconv-parsable float from a Spanish value '3.494.826.157,123', use format ",." . +func normalizeNumberString(value string, format string, removeFraction bool) (normalized string, truncated bool) { + if len(format) == 0 { + format = ". \n\t\r_" + } + if strings.ContainsAny(value, format) { + formatRunes := []rune(format) + fractionRune := formatRunes[0] + ignored := formatRunes[1:] + retVal := strings.Builder{} + retVal.Grow(len(value)) + ForAllCharacters: + for _, c := range value { + // skip ignored characters + for i := 0; i < len(ignored); i++ { + if c == ignored[i] { + continue ForAllCharacters + } + } + if c == fractionRune { + if removeFraction { + return retVal.String(), true + } + retVal.WriteByte('.') + continue + } + retVal.WriteRune(c) + } + + return retVal.String(), false + } + return value, false +} + +func toTypedValue(val string, column *CsvTableColumn, lineNumber int) (interface{}, error) { + dataType := column.DataType + dataFormat := column.DataFormat + if column.ParseF != nil { + return column.ParseF(val) + } + switch dataType { + case stringDatatype: + return val, nil + case dateTimeDatatype: + switch dataFormat { + case "": // number or time.RFC3339 + t, err := strconv.ParseInt(val, 10, 64) + if err != nil { + return time.Parse(time.RFC3339, val) + } + return time.Unix(0, t).UTC(), nil + case RFC3339: + return time.Parse(time.RFC3339, val) + case RFC3339Nano: + return time.Parse(time.RFC3339Nano, val) + case dataFormatNumber: + t, err := strconv.ParseInt(val, 10, 64) + if err != nil { + return nil, err + } + return time.Unix(0, t).UTC(), nil + default: + if column.TimeZone != nil { + return time.ParseInLocation(dataFormat, val, column.TimeZone) + } + return time.Parse(dataFormat, val) + } + case durationDatatype: + return time.ParseDuration(val) + case doubleDatatype: + normalized, _ := normalizeNumberString(val, dataFormat, false) + return strconv.ParseFloat(normalized, 64) + case boolDatatype: + switch { + case len(val) == 0: + return nil, errors.New("Unsupported boolean value '" + val + "' , first character is expected to be 't','f','0','1','y','n'") + case val[0] == 't' || val[0] == 'T' || val[0] == 'y' || val[0] == 'Y' || val[0] == '1': + return true, nil + case val[0] == 'f' || val[0] == 'F' || val[0] == 'n' || val[0] == 'N' || val[0] == '0': + return false, nil + default: + return nil, errors.New("Unsupported boolean value '" + val + "' , first character is expected to be 't','f','0','1','y','n'") + } + case longDatatype: + normalized, truncated := normalizeNumberString(val, dataFormat, true) + 
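+		// non-strict long conversion: a dropped fraction only logs a warning below; the integer part is still parsed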
if truncated { + error := CreateRowColumnError(lineNumber, column.Label, + fmt.Errorf("'%s' truncated to '%s' to fit into long data type", val, normalized)) + log.Printf("WARNING: %v\n", error) + } + return strconv.ParseInt(normalized, 10, 64) + case uLongDatatype: + normalized, truncated := normalizeNumberString(val, dataFormat, true) + if truncated { + error := CreateRowColumnError(lineNumber, column.Label, + fmt.Errorf("'%s' truncated to '%s' to fit into unsignedLong data type", val, normalized)) + log.Printf("WARNING: %v\n", error) + } + return strconv.ParseUint(normalized, 10, 64) + case base64BinaryDataType: + return base64.StdEncoding.DecodeString(val) + default: + return nil, fmt.Errorf("unsupported data type '%s'", dataType) + } +} + +func appendProtocolValue(buffer []byte, value interface{}) ([]byte, error) { + switch v := value.(type) { + case uint64: + return append(strconv.AppendUint(buffer, v, 10), 'u'), nil + case int64: + return append(strconv.AppendInt(buffer, v, 10), 'i'), nil + case int: + return append(strconv.AppendInt(buffer, int64(v), 10), 'i'), nil + case float64: + if math.IsNaN(v) { + return buffer, errors.New("value is NaN") + } + if math.IsInf(v, 0) { + return buffer, errors.New("value is Infinite") + } + return strconv.AppendFloat(buffer, v, 'f', -1, 64), nil + case float32: + v32 := float64(v) + if math.IsNaN(v32) { + return buffer, errors.New("value is NaN") + } + if math.IsInf(v32, 0) { + return buffer, errors.New("value is Infinite") + } + return strconv.AppendFloat(buffer, v32, 'f', -1, 64), nil + case string: + buffer = append(buffer, '"') + buffer = append(buffer, escapeString(v)...) + buffer = append(buffer, '"') + return buffer, nil + case []byte: + buf := make([]byte, base64.StdEncoding.EncodedLen(len(v))) + base64.StdEncoding.Encode(buf, v) + return append(buffer, buf...), nil + case bool: + if v { + return append(buffer, "true"...), nil + } + return append(buffer, "false"...), nil + case time.Time: + return strconv.AppendInt(buffer, v.UnixNano(), 10), nil + case time.Duration: + return append(strconv.AppendInt(buffer, v.Nanoseconds(), 10), 'i'), nil + default: + return buffer, fmt.Errorf("unsupported value type: %T", v) + } +} + +func appendConverted(buffer []byte, val string, column *CsvTableColumn, lineNumber int) ([]byte, error) { + if len(column.DataType) == 0 { // keep the value as it is + return append(buffer, val...), nil + } + typedVal, err := toTypedValue(val, column, lineNumber) + if err != nil { + return buffer, err + } + return appendProtocolValue(buffer, typedVal) +} + +func decodeNop(reader io.Reader) io.Reader { + return reader +} + +// CreateDecoder creates a decoding reader from the supplied encoding to UTF-8, or returns an error +func CreateDecoder(encoding string) (func(io.Reader) io.Reader, error) { + if len(encoding) > 0 && encoding != "UTF-8" { + enc, err := ianaindex.IANA.Encoding(encoding) + if err != nil { + return nil, fmt.Errorf("%v, see https://www.iana.org/assignments/character-sets/character-sets.xhtml", err) + } + if enc == nil { + return nil, fmt.Errorf("unsupported encoding: %s", encoding) + } + return enc.NewDecoder().Reader, nil + } + return decodeNop, nil +} + +// createBoolParseFn returns a function that converts a string value to boolean according to format "true,yes,1:false,no,0" +func createBoolParseFn(format string) func(string) (interface{}, error) { + var err error = nil + truthy := []string{} + falsy := []string{} + if !strings.Contains(format, ":") { + err = fmt.Errorf("unsupported boolean format: %s 
should be in 'true,yes,1:false,no,0' format, but no ':' is present", format)
+	} else {
+		colon := strings.Index(format, ":")
+		t := format[:colon]
+		f := format[colon+1:]
+		if t != "" {
+			truthy = strings.Split(t, ",")
+		}
+		if f != "" {
+			falsy = strings.Split(f, ",")
+		}
+	}
+	return func(val string) (interface{}, error) {
+		if err != nil {
+			return nil, err
+		}
+		for _, s := range falsy {
+			if s == val {
+				return false, nil
+			}
+		}
+		for _, s := range truthy {
+			if s == val {
+				return true, nil
+			}
+		}
+		if len(falsy) == 0 {
+			return false, nil
+		}
+		if len(truthy) == 0 {
+			return true, nil
+		}
+
+		return nil, fmt.Errorf("unsupported boolean value: %s must be one of %v or one of %v", val, truthy, falsy)
+	}
+}
+
+// createStrictLongParseFn returns a function that converts a string value to long and also fails when a fraction digit is detected
+func createStrictLongParseFn(dataFormat string) func(string) (interface{}, error) {
+	return func(val string) (interface{}, error) {
+		normalized, truncated := normalizeNumberString(val, dataFormat, true)
+		if truncated {
+			return 0, fmt.Errorf("'%s' cannot fit into long data type", val)
+		}
+		return strconv.ParseInt(normalized, 10, 64)
+	}
+}
+
+// createStrictUnsignedLongParseFn returns a function that converts a string value to unsigned long and fails when a fraction digit is detected
+func createStrictUnsignedLongParseFn(dataFormat string) func(string) (interface{}, error) {
+	return func(val string) (interface{}, error) {
+		normalized, truncated := normalizeNumberString(val, dataFormat, true)
+		if truncated {
+			return 0, fmt.Errorf("'%s' cannot fit into unsignedLong data type", val)
+		}
+		return strconv.ParseUint(normalized, 10, 64)
+	}
+}
diff --git a/pkg/csv2lp/data_conversion_test.go b/pkg/csv2lp/data_conversion_test.go
new file mode 100644
index 0000000..6bd0aa3
--- /dev/null
+++ b/pkg/csv2lp/data_conversion_test.go
@@ -0,0 +1,350 @@
+package csv2lp
+
+import (
+	"bytes"
+	"fmt"
+	"io/ioutil"
+	"log"
+	"math"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+// Test_escapeMeasurement tests the escapeMeasurement function
+func Test_escapeMeasurement(t *testing.T) {
+	var tests = []struct {
+		value  string
+		expect string
+	}{
+		{"a", "a"}, {"", ""},
+		{"a,", `a\,`},
+		{"a ", `a\ `},
+		{"a=", `a=`},
+	}
+
+	for i, test := range tests {
+		t.Run(fmt.Sprint(i), func(t *testing.T) {
+			require.Equal(t, test.expect, escapeMeasurement(test.value))
+		})
+	}
+}
+
+// Test_EscapeTag tests the escapeTag function
+func Test_EscapeTag(t *testing.T) {
+	var tests = []struct {
+		value  string
+		expect string
+	}{
+		{"a", "a"}, {"", ""},
+		{"a,", `a\,`},
+		{"a ", `a\ `},
+		{"a=", `a\=`},
+	}
+
+	for i, test := range tests {
+		t.Run(fmt.Sprint(i), func(t *testing.T) {
+			require.Equal(t, test.expect, escapeTag(test.value))
+		})
+	}
+}
+
+// Test_EscapeString tests the escapeString function
+func Test_EscapeString(t *testing.T) {
+	var tests = []struct {
+		value  string
+		expect string
+	}{
+		{"a", `a`}, {"", ``},
+		{`a"`, `a\"`},
+		{`a\`, `a\\`},
+	}
+
+	for i, test := range tests {
+		t.Run(fmt.Sprint(i), func(t *testing.T) {
+			require.Equal(t, test.expect, escapeString(test.value))
+		})
+	}
+}
+
+// Test_ToTypedValue tests the toTypedValue function
+func Test_ToTypedValue(t *testing.T) {
+	epochTime, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:00Z")
+	var tests = []struct {
+		dataType string
+		value    string
+		expect   interface{}
+	}{
+		{"string", "a", "a"},
+		{"double", "1.0", float64(1.0)},
+		{"boolean", "true", true},
+		{"boolean", "True", true},
+		{"boolean", "y", true},
+		{"boolean", "Yes", true},
+		{"boolean", "1", true},
+		{"boolean", "false", false},
+		{"boolean", "False", false},
+		{"boolean", "n", false},
+		{"boolean", "No", false},
+		{"boolean", "0", false},
+		{"boolean", "", nil},
+		{"boolean", "?", nil},
+		{"long", "1", int64(1)},
+		{"unsignedLong", "1", uint64(1)},
+		{"duration", "1ns", time.Duration(1)},
+		{"base64Binary", "YWFh", []byte("aaa")},
+		{"dateTime:RFC3339", "1970-01-01T00:00:00Z", epochTime},
+		{"dateTime:RFC3339Nano", "1970-01-01T00:00:00.0Z", epochTime},
+		{"dateTime:RFC3339", "1970-01-01T00:00:00.000000001Z", epochTime.Add(time.Duration(1))},
+		{"dateTime:RFC3339Nano", "1970-01-01T00:00:00.000000002Z", epochTime.Add(time.Duration(2))},
+		{"dateTime:number", "3", epochTime.Add(time.Duration(3))},
+		{"dateTime", "4", epochTime.Add(time.Duration(4))},
+		{"dateTime:2006-01-02", "1970-01-01", epochTime},
+		{"dateTime", "1970-01-01T00:00:00Z", epochTime},
+		{"dateTime", "1970-01-01T00:00:00.000000001Z", epochTime.Add(time.Duration(1))},
+		{"double:, .", "200 100.299,0", float64(200100299.0)},
+		{"long:, .", "200 100.299,0", int64(200100299)},
+		{"unsignedLong:, .", "200 100.299,0", uint64(200100299)},
+		{"u.type", "", nil},
+	}
+
+	for i, test := range tests {
+		t.Run(fmt.Sprint(i)+" "+test.value, func(t *testing.T) {
+			column := &CsvTableColumn{Label: "test"}
+			column.setupDataType(test.dataType)
+			val, err := toTypedValue(test.value, column, 1)
+			if err != nil && test.expect != nil {
+				require.Nil(t, err.Error())
+			}
+			require.Equal(t, test.expect, val)
+		})
+	}
+}
+
+// Test_ToTypedValue_dateTimeCustomTimeZone tests a custom timezone when calling the toTypedValue function
+func Test_ToTypedValue_dateTimeCustomTimeZone(t *testing.T) {
+	epochTime, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:00Z")
+	tz, _ := parseTimeZone("-0100")
+	var tests = []struct {
+		dataType string
+		value    string
+		expect   interface{}
+	}{
+		{"dateTime:RFC3339", "1970-01-01T00:00:00Z", epochTime},
+		{"dateTime:RFC3339Nano", "1970-01-01T00:00:00.0Z", epochTime},
+		{"dateTime:number", "3", epochTime.Add(time.Duration(3))},
+		{"dateTime:2006-01-02", "1970-01-01", epochTime.Add(time.Hour)},
+	}
+
+	for i, test := range tests {
+		t.Run(fmt.Sprint(i)+" "+test.value, func(t *testing.T) {
+			column := &CsvTableColumn{}
+			column.TimeZone = tz
+			column.setupDataType(test.dataType)
+			val, err := toTypedValue(test.value, column, 1)
+			if err != nil && test.expect != nil {
+				require.Nil(t, err.Error())
+			}
+			if test.expect == nil {
+				require.Equal(t, test.expect, val)
+			} else {
+				expectTime := test.expect.(time.Time)
+				actual := val.(time.Time)
+				require.True(t, expectTime.Equal(actual))
+			}
+		})
+	}
+}
+
+// Test_AppendProtocolValue tests the appendProtocolValue function
+func Test_AppendProtocolValue(t *testing.T) {
+	epochTime, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:00Z")
+	var tests = []struct {
+		value  interface{}
+		expect string
+	}{
+		{uint64(1), "1u"},
+		{int64(1), "1i"},
+		{int(1), "1i"},
+		{float64(1.1), "1.1"},
+		{math.NaN(), ""},
+		{math.Inf(1), ""},
+		{float32(1), "1"},
+		{float32(math.NaN()), ""},
+		{float32(math.Inf(1)), ""},
+		{"a", `"a"`},
+		{[]byte("aaa"), "YWFh"},
+		{true, "true"},
+		{false, "false"},
+		{epochTime, "0"},
+		{time.Duration(100), "100i"},
+		{struct{}{}, ""},
+	}
+
+	for i, test := range tests {
+		t.Run(fmt.Sprint(i), func(t *testing.T) {
+			val, err := appendProtocolValue(nil, test.value)
+			if err != nil && test.expect != "" {
+				require.Nil(t, err.Error())
+			}
+			require.Equal(t, test.expect, string(val))
}) + } +} + +// Test_AppendConverted tests appendConverted function +func Test_AppendConverted(t *testing.T) { + var tests = []struct { + dataType string + value string + expect string + }{ + {"", "1", "1"}, + {"long", "a", ""}, + {"dateTime", "a", ""}, + {"dateTime:number", "a", ""}, + {"string", "a", `"a"`}, + } + + for i, test := range tests { + t.Run(fmt.Sprint(i), func(t *testing.T) { + column := &CsvTableColumn{Label: "test"} + column.setupDataType(test.dataType) + val, err := appendConverted(nil, test.value, column, 1) + if err != nil && test.expect != "" { + require.Nil(t, err.Error()) + } + require.Equal(t, test.expect, string(val)) + }) + } +} + +// Test_IsTypeSupported tests IsTypeSupported function +func Test_IsTypeSupported(t *testing.T) { + require.True(t, IsTypeSupported(stringDatatype), true) + require.True(t, IsTypeSupported(doubleDatatype), true) + require.True(t, IsTypeSupported(boolDatatype), true) + require.True(t, IsTypeSupported(longDatatype), true) + require.True(t, IsTypeSupported(uLongDatatype), true) + require.True(t, IsTypeSupported(durationDatatype), true) + require.True(t, IsTypeSupported(base64BinaryDataType), true) + require.True(t, IsTypeSupported(dateTimeDatatype), true) + require.True(t, IsTypeSupported(""), true) + require.False(t, IsTypeSupported(" "), false) + // time format is not part of data type + require.False(t, IsTypeSupported(dateTimeDatatype+":"+RFC3339)) + require.False(t, IsTypeSupported(dateTimeDatatype+":"+RFC3339Nano)) + require.False(t, IsTypeSupported(dateTimeDatatype+":"+dataFormatNumber)) +} + +// Test_NormalizeNumberString tests normalizeNumberString function +func Test_NormalizeNumberString(t *testing.T) { + var tests = []struct { + value string + format string + removeFraction bool + expect string + truncated bool + }{ + {"123", "", true, "123", false}, + {"123", ".", true, "123", false}, + {"123.456", ".", true, "123", true}, + {"123.456", ".", false, "123.456", false}, + {"1 2.3,456", ",. 
", false, "123.456", false}, + {" 1 2\t3.456 \r\n", "", false, "123.456", false}, + } + + for i, test := range tests { + t.Run(fmt.Sprint(i), func(t *testing.T) { + // customize logging to check warnings + var buf bytes.Buffer + log.SetOutput(&buf) + oldFlags := log.Flags() + log.SetFlags(0) + oldPrefix := log.Prefix() + prefix := "::PREFIX::" + log.SetPrefix(prefix) + defer func() { + log.SetOutput(os.Stderr) + log.SetFlags(oldFlags) + log.SetPrefix(oldPrefix) + }() + normalized, truncated := normalizeNumberString(test.value, test.format, test.removeFraction) + require.Equal(t, test.expect, normalized) + require.Equal(t, test.truncated, truncated) + }) + } +} + +// Test_CreateDecoder tests CreateDecoder function +func Test_CreateDecoder(t *testing.T) { + decoder, err := CreateDecoder("UTF-8") + toUtf8 := func(in []byte) string { + s, _ := ioutil.ReadAll(decoder(bytes.NewReader(in))) + return string(s) + } + require.NotNil(t, decoder) + require.Nil(t, err) + require.Equal(t, "\u2318", toUtf8([]byte{226, 140, 152})) + decoder, err = CreateDecoder("windows-1250") + require.NotNil(t, decoder) + require.Nil(t, err) + require.Equal(t, "\u0160", toUtf8([]byte{0x8A})) + decoder, err = CreateDecoder("whateveritis") + require.NotNil(t, err) + require.Nil(t, decoder) + // we can have valid IANA names that are not supported by golang/x/text + decoder, err = CreateDecoder("US-ASCII") + log.Printf("US-ASCII encoding support: %v,%v", decoder != nil, err) +} + +// Test_CreateBoolParseFn tests createBoolParseFn function +func Test_CreateBoolParseFn(t *testing.T) { + type pairT struct { + value string + expect string + } + var tests = []struct { + format string + pair []pairT + }{ + {"t,y,1:f,n,0", []pairT{ + {"y", "true"}, + {"0", "false"}, + {"T", "unsupported"}, + }}, + {"true", []pairT{ + {"true", "unsupported"}, + {"false", "unsupported"}, + }}, + {"true:", []pairT{ + {"true", "true"}, + {"other", "false"}, + }}, + {":false", []pairT{ + {"false", "false"}, + {"other", "true"}, + }}, + } + + for i, test := range tests { + fn := createBoolParseFn(test.format) + for j, pair := range test.pair { + t.Run(fmt.Sprint(i)+"_"+fmt.Sprint(j), func(t *testing.T) { + result, err := fn(pair.value) + switch pair.expect { + case "true": + require.Equal(t, true, result) + case "false": + require.Equal(t, false, result) + default: + require.NotNil(t, err) + require.True(t, strings.Contains(fmt.Sprintf("%v", err), pair.expect)) + } + }) + } + } +} diff --git a/pkg/csv2lp/examples_test.go b/pkg/csv2lp/examples_test.go new file mode 100644 index 0000000..0bf6c66 --- /dev/null +++ b/pkg/csv2lp/examples_test.go @@ -0,0 +1,195 @@ +package csv2lp + +import ( + "fmt" + "io/ioutil" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +type csvExample struct { + name string + csv string + lp string +} + +var examples []csvExample = []csvExample{ + { + "fluxQueryResult", + ` +#group,false,false,true,true,false,false,true,true,true,true +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string +#default,_result,,,,,,,,, +,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host +,,0,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:17:57Z,0,time_steal,cpu,cpu1,rsavage.prod +,,0,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:07Z,0,time_steal,cpu,cpu1,rsavage.prod + +#group,false,false,true,true,false,false,true,true,true,true 
+#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string +#default,_result,,,,,,,,, +,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host +,,1,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:01Z,2.7263631815907954,usage_user,cpu,cpu-total,tahoecity.prod +,,1,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:11Z,2.247752247752248,usage_user,cpu,cpu-total,tahoecity.prod +`, + ` +cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669077000000000 +cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669087000000000 +cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.7263631815907954 1582669081000000000 +cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.247752247752248 1582669091000000000 +`, + }, + { + "annotatedSimple", + ` +#datatype measurement,tag,tag,double,double,ignored,dateTime:number +m,cpu,host,time_steal,usage_user,nothing,time +cpu,cpu1,rsavage.prod,0,2.7,a,1482669077000000000 +cpu,cpu1,rsavage.prod,0,2.2,b,1482669087000000000 +`, + ` +cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.7 1482669077000000000 +cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.2 1482669087000000000 +`, + }, + { + "annotatedSimple_labels", + ` +m|measurement,cpu|tag,host|tag,time_steal|double,usage_user|double,nothing|ignored,time|dateTime:number +cpu,cpu1,rsavage.prod,0,2.7,a,1482669077000000000 +cpu,cpu1,rsavage.prod,0,2.2,b,1482669087000000000 +`, + ` +cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.7 1482669077000000000 +cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.2 1482669087000000000 +`, + }, + { + "annotatedDatatype", + ` +#datatype measurement,tag,string,double,boolean,long,unsignedLong,duration,dateTime +#default test,annotatedDatatypes,,,,,, +m,name,s,d,b,l,ul,dur,time +,,str1,1.0,true,1,1,1ms,1 +,,str2,2.0,false,2,2,2us,2020-01-11T10:10:10Z +`, + ` +test,name=annotatedDatatypes s="str1",d=1,b=true,l=1i,ul=1u,dur=1000000i 1 +test,name=annotatedDatatypes s="str2",d=2,b=false,l=2i,ul=2u,dur=2000i 1578737410000000000 +`, + }, + { + "annotatedDatatype_labels", + ` +m|measurement|test,name|tag|annotatedDatatypes,s|string,d|double,b|boolean,l|long,ul|unsignedLong,dur|duration,time|dateTime +,,str1,1.0,true,1,1,1ms,1 +,,str2,2.0,false,2,2,2us,2020-01-11T10:10:10Z`, + ` +test,name=annotatedDatatypes s="str1",d=1,b=true,l=1i,ul=1u,dur=1000000i 1 +test,name=annotatedDatatypes s="str2",d=2,b=false,l=2i,ul=2u,dur=2000i 1578737410000000000 +`, + }, + { + "datetypeFormats", + ` +#constant measurement,test +#constant tag,name,datetypeFormats +#timezone -0500 +#datatype dateTime:2006-01-02|1970-01-02,"double:,. ","boolean:y,Y:n,N|y" +t,d,b +1970-01-01,"123.456,78", +,"123 456,78",Y +`, + ` +test,name=datetypeFormats d=123456.78,b=true 18000000000000 +test,name=datetypeFormats d=123456.78,b=true 104400000000000 +`, + }, + { + "datetypeFormats_labels", + ` +#constant measurement,test +#constant tag,name,datetypeFormats +#timezone -0500 +t|dateTime:2006-01-02|1970-01-02,"d|double:,. 
","b|boolean:y,Y:n,N|y" +1970-01-01,"123.456,78", +,"123 456,78",Y +`, + ` +test,name=datetypeFormats d=123456.78,b=true 18000000000000 +test,name=datetypeFormats d=123456.78,b=true 104400000000000 +`, + }, + { + "datetypeFormats_labels_override", + ` +#constant measurement,test2 +t|dateTime:2006-01-02,_|ignored,s|string|unknown +1970-01-01,"123.456,78", +,"123 456,78",Y +`, + ` +test2 s="unknown" 0 +test2 s="Y" +`, + }, + { + "datetypeFormats_labels_override", + ` +m|measurement,usage_user|double +cpu,2.7 +cpu,nil +cpu, +,2.9 +`, + ` +cpu usage_user=2.7 +`, + }, + { + "columnSeparator", + `sep=; +m|measurement;available|boolean:y,Y:|n;dt|dateTime:number +test;nil;1 +test;N;2 +test;";";3 +test;;4 +test;Y;5 +`, + ` +test available=false 1 +test available=false 2 +test available=false 3 +test available=false 4 +test available=true 5 +`, + }, +} + +func (example *csvExample) normalize() rune { + for len(example.lp) > 0 && example.lp[0] == '\n' { + example.lp = example.lp[1:] + } + if strings.HasPrefix(example.csv, "sep=") { + return (rune)(example.csv[4]) + } + return ',' +} + +// Test_Examples tests examples of README.md file herein +func Test_Examples(t *testing.T) { + for _, example := range examples { + t.Run(example.name, func(t *testing.T) { + comma := example.normalize() + transformer := CsvToLineProtocol(strings.NewReader(example.csv)) + transformer.SkipRowOnError(true) + result, err := ioutil.ReadAll(transformer) + if err != nil { + require.Nil(t, fmt.Sprintf("%s", err)) + } + require.Equal(t, comma, transformer.Comma()) + require.Equal(t, example.lp, string(result)) + }) + } +} diff --git a/pkg/csv2lp/line_reader.go b/pkg/csv2lp/line_reader.go new file mode 100644 index 0000000..71d0ce7 --- /dev/null +++ b/pkg/csv2lp/line_reader.go @@ -0,0 +1,100 @@ +package csv2lp + +import ( + "io" +) + +const ( + defaultBufSize = 4096 +) + +// LineReader wraps an io.Reader to count lines that go though read +// function and returns at most one line during every invocation of +// read. It provides a workaround to golang's CSV reader that +// does not expose current line number at all +// (see https://github.com/golang/go/issues/26679) +// +// At most one line is returned by every read in order to ensure that +// golang's CSV reader buffers at most one single line into its nested +// bufio.Reader. +type LineReader struct { + // LineNumber of the next read operation, 0 is the first line by default. + // It can be set to 1 start counting from 1. + LineNumber int + // LastLineNumber is the number of the last read row. + LastLineNumber int + + // rs is a wrapped reader + rd io.Reader // reader provided by the client + // buf contains last data read from rd + buf []byte + // readPos is a read position in the buffer + readPos int + // bufSize is the length of data read from rd into buf + bufSize int + // err contains the last error during read + err error +} + +// NewLineReader returns a new LineReader. +func NewLineReader(rd io.Reader) *LineReader { + return NewLineReaderSize(rd, defaultBufSize) +} + +// NewLineReaderSize returns a new Reader whose buffer has at least the specified +// size. +func NewLineReaderSize(rd io.Reader, size int) *LineReader { + if size < 2 { + size = 2 + } + return &LineReader{ + rd: rd, + buf: make([]byte, size), + } +} + +// Read reads data into p. It fills in data that either does +// not contain \n or ends with \n. +// It returns the number of bytes read into p. 
+func (lr *LineReader) Read(p []byte) (int, error) {
+	// handle the pathological case of reading into an empty array
+	if len(p) == 0 {
+		if lr.readPos < lr.bufSize {
+			return 0, nil
+		}
+		return 0, lr.readErr()
+	}
+	// read data into buf
+	if lr.readPos == lr.bufSize {
+		if lr.err != nil {
+			return 0, lr.readErr()
+		}
+		lr.readPos = 0
+		lr.bufSize, lr.err = lr.rd.Read(lr.buf)
+		if lr.bufSize == 0 {
+			return 0, lr.readErr()
+		}
+	}
+	// copy at most one line and don't overflow the internal buffer or p
+	i := 0
+	lr.LastLineNumber = lr.LineNumber
+	for lr.readPos < lr.bufSize && i < len(p) {
+		b := lr.buf[lr.readPos]
+		lr.readPos++
+		p[i] = b
+		i++
+		// read at most one line
+		if b == '\n' {
+			lr.LineNumber++
+			break
+		}
+	}
+	return i, nil
+}
+
+// readErr returns the last error and resets the err status
+func (lr *LineReader) readErr() error {
+	err := lr.err
+	lr.err = nil
+	return err
+}
diff --git a/pkg/csv2lp/line_reader_test.go b/pkg/csv2lp/line_reader_test.go
new file mode 100644
index 0000000..b79337f
--- /dev/null
+++ b/pkg/csv2lp/line_reader_test.go
@@ -0,0 +1,148 @@
+package csv2lp_test
+
+import (
+	"encoding/csv"
+	"fmt"
+	"io"
+	"strings"
+	"testing"
+	"testing/iotest"
+
+	"github.com/influxdata/influx-cli/v2/pkg/csv2lp"
+	"github.com/stretchr/testify/require"
+)
+
+// TestLineReader tests correctness of line reporting and the reader implementation of LineReader
+func TestLineReader(t *testing.T) {
+	type TestInput = struct {
+		lines               [4]string
+		withDataErrorReader bool
+	}
+	tests := []TestInput{
+		{
+			lines:               [4]string{"a\n", "\n", "\n", "bcxy"},
+			withDataErrorReader: false,
+		}, {
+			lines:               [4]string{"a\n", "\n", "\n", "bcxy"},
+			withDataErrorReader: true,
+		}, {
+			lines:               [4]string{"a\n", "\n", "\n", "bcx\n"},
+			withDataErrorReader: false,
+		}, {
+			lines:               [4]string{"a\n", "\n", "\n", "bcx\n"},
+			withDataErrorReader: true,
+		}}
+	for _, test := range tests {
+		lines := test.lines
+		input := strings.Join(lines[:], "")
+		withDataErrorReader := test.withDataErrorReader
+		t.Run(fmt.Sprintf("%s withDataErrorReader=%v", input, withDataErrorReader), func(t *testing.T) {
+			var reader io.Reader = strings.NewReader(input)
+			if withDataErrorReader {
+				// ensures that the reader reports the last EOF error also with data
+				reader = iotest.DataErrReader(reader)
+			}
+			lineReader := csv2lp.NewLineReaderSize(reader, 2)
+			var buf []byte = make([]byte, 4)
+			var err error
+			var read int
+
+			// pathological case: reading into an empty buffer returns 0 without an error
+			read, err = lineReader.Read(buf[0:0])
+			require.Equal(t, 0, read)
+			require.Nil(t, err)
+			require.Equal(t, 0, lineReader.LastLineNumber)
+
+			// 1st line
+			read, err = lineReader.Read(buf)
+			require.Equal(t, []byte(lines[0]), buf[0:read])
+			require.Nil(t, err)
+			require.Equal(t, 0, lineReader.LastLineNumber)
+
+			// 2nd
+			read, err = lineReader.Read(buf)
+			require.Equal(t, []byte(lines[1]), buf[0:read])
+			require.Nil(t, err)
+			require.Equal(t, 1, lineReader.LastLineNumber)
+
+			// reading into an empty buffer does not change the game
+			read, err = lineReader.Read(buf[0:0])
+			require.Equal(t, 0, read)
+			require.Nil(t, err)
+			require.Equal(t, 1, lineReader.LastLineNumber)
+
+			// 3rd
+			read, err = lineReader.Read(buf)
+			require.Equal(t, []byte(lines[2]), buf[0:read])
+			require.Nil(t, err)
+			require.Equal(t, 2, lineReader.LastLineNumber)
+
+			// 4th line cannot be fully read, because buffer size is 2
+			read, err = lineReader.Read(buf)
+			require.Equal(t, []byte(lines[3][:2]), buf[0:read])
+			require.Nil(t, err)
+			require.Equal(t, 3, lineReader.LastLineNumber)
+			read, err = lineReader.Read(buf)
+			require.Equal(t, []byte(lines[3][2:]), buf[0:read])
+			require.Nil(t, err)
+			require.Equal(t, 3, lineReader.LastLineNumber)
+
+			// 5th line => error
+			_, err = lineReader.Read(buf)
+			require.NotNil(t, err)
+		})
+	}
+}
+
+// TestLineReader_Read_BufferOverflow ensures that calling Read into
+// a small slice does not panic. Fixes https://github.com/influxdata/influxdb/issues/19586
+func TestLineReader_Read_BufferOverflow(t *testing.T) {
+	sr := strings.NewReader("foo\nbar")
+	rd := csv2lp.NewLineReader(sr)
+	buf := make([]byte, 2)
+
+	n, err := rd.Read(buf)
+	require.Equal(t, n, 2)
+	require.NoError(t, err)
+}
+
+// TestLineReader_viaCsv tests correct line reporting when read through a CSV reader with various buffer sizes
+// to emulate multiple required reads with a small test data set
+func TestLineReader_viaCsv(t *testing.T) {
+	type RowWithLine = struct {
+		row        []string
+		lineNumber int
+	}
+
+	input := "a\n\nb\n\nc"
+	expected := []RowWithLine{
+		{[]string{"a"}, 1},
+		{[]string{"b"}, 3},
+		{[]string{"c"}, 5},
+	}
+
+	bufferSizes := []int{-1, 0, 2, 50}
+	for _, bufferSize := range bufferSizes {
+		t.Run(fmt.Sprintf("buffer size: %d", bufferSize), func(t *testing.T) {
+			var lineReader *csv2lp.LineReader
+			if bufferSize < 0 {
+				lineReader = csv2lp.NewLineReader(strings.NewReader(input))
+			} else {
+				lineReader = csv2lp.NewLineReaderSize(strings.NewReader(input), bufferSize)
+			}
+			lineReader.LineNumber = 1 // start with 1
+			csvReader := csv.NewReader(lineReader)
+
+			results := make([]RowWithLine, 0, 3)
+			for {
+				row, _ := csvReader.Read()
+				lineNumber := lineReader.LastLineNumber
+				if row == nil {
+					break
+				}
+				results = append(results, RowWithLine{row, lineNumber})
+			}
+			require.Equal(t, expected, results)
+		})
+	}
+}
diff --git a/pkg/csv2lp/multi_closer.go b/pkg/csv2lp/multi_closer.go
new file mode 100644
index 0000000..0e8847a
--- /dev/null
+++ b/pkg/csv2lp/multi_closer.go
@@ -0,0 +1,33 @@
+package csv2lp
+
+import (
+	"io"
+	"log"
+)
+
+// multiCloser closes multiple nested io.Closer instances
+type multiCloser struct {
+	closers []io.Closer
+}
+
+// Close implements io.Closer; it closes all nested closers, logs a warning
+// for every error and returns the first error encountered
+func (mc *multiCloser) Close() error {
+	var err error
+	for i := 0; i < len(mc.closers); i++ {
+		e := mc.closers[i].Close()
+		if e != nil {
+			if err == nil {
+				err = e
+			}
+			log.Println(e)
+		}
+	}
+	return err
+}
+
+// MultiCloser creates an io.Closer that closes all supplied io.Closer instances and logs any errors
+func MultiCloser(closers ...io.Closer) io.Closer {
+	c := make([]io.Closer, len(closers))
+	copy(c, closers)
+	return &multiCloser{c}
+}
diff --git a/pkg/csv2lp/multi_closer_test.go b/pkg/csv2lp/multi_closer_test.go
new file mode 100644
index 0000000..42bb932
--- /dev/null
+++ b/pkg/csv2lp/multi_closer_test.go
@@ -0,0 +1,62 @@
+package csv2lp
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"io"
+	"log"
+	"os"
+	"strings"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+type testCloser struct {
+	errorMsg string
+}
+
+func (c testCloser) Close() error {
+	if c.errorMsg == "" {
+		return nil
+	}
+	return errors.New(c.errorMsg)
+}
+
+// TestMultiCloser tests the MultiCloser function
+func TestMultiCloser(t *testing.T) {
+	var buf bytes.Buffer
+	log.SetOutput(&buf)
+	oldFlags := log.Flags()
+	log.SetFlags(0)
+	oldPrefix := log.Prefix()
+	prefix := "::PREFIX::"
+	log.SetPrefix(prefix)
+	defer func() {
+		log.SetOutput(os.Stderr)
+		log.SetFlags(oldFlags)
+		log.SetPrefix(oldPrefix)
+	}()
+
+	var tests = []struct {
+		subject io.Closer
+		errors  int
+	}{
+		{MultiCloser(), 0},
+		{MultiCloser(testCloser{},
diff --git a/pkg/csv2lp/multi_closer_test.go b/pkg/csv2lp/multi_closer_test.go
new file mode 100644
index 0000000..42bb932
--- /dev/null
+++ b/pkg/csv2lp/multi_closer_test.go
@@ -0,0 +1,62 @@
+package csv2lp
+
+import (
+    "bytes"
+    "errors"
+    "fmt"
+    "io"
+    "log"
+    "os"
+    "strings"
+    "testing"
+
+    "github.com/stretchr/testify/require"
+)
+
+type testCloser struct {
+    errorMsg string
+}
+
+func (c testCloser) Close() error {
+    if c.errorMsg == "" {
+        return nil
+    }
+    return errors.New(c.errorMsg)
+}
+
+// TestMultiCloser checks that all nested closers are closed and every error is logged
+func TestMultiCloser(t *testing.T) {
+    var buf bytes.Buffer
+    log.SetOutput(&buf)
+    oldFlags := log.Flags()
+    log.SetFlags(0)
+    oldPrefix := log.Prefix()
+    prefix := "::PREFIX::"
+    log.SetPrefix(prefix)
+    defer func() {
+        log.SetOutput(os.Stderr)
+        log.SetFlags(oldFlags)
+        log.SetPrefix(oldPrefix)
+    }()
+
+    var tests = []struct {
+        subject io.Closer
+        errors  int
+    }{
+        {MultiCloser(), 0},
+        {MultiCloser(testCloser{}, testCloser{}), 0},
+        {MultiCloser(testCloser{"a"}, testCloser{}, testCloser{"c"}), 2},
+    }
+
+    for i, test := range tests {
+        t.Run(fmt.Sprint(i), func(t *testing.T) {
+            buf.Reset()
+            err := test.subject.Close()
+            messages := strings.Count(buf.String(), prefix)
+            require.Equal(t, test.errors, messages)
+            if test.errors > 0 {
+                require.NotNil(t, err)
+            }
+        })
+    }
+}
diff --git a/pkg/csv2lp/skip_header_lines.go b/pkg/csv2lp/skip_header_lines.go
new file mode 100644
index 0000000..a52815f
--- /dev/null
+++ b/pkg/csv2lp/skip_header_lines.go
@@ -0,0 +1,65 @@
+package csv2lp
+
+import (
+    "io"
+)
+
+// skipFirstLines is an io.Reader that skips the first lines of the wrapped reader
+type skipFirstLines struct {
+    // reader provides the data
+    reader io.Reader
+    // skipLines is the number of lines to skip
+    skipLines int
+    // line is a mutable variable that increases until skipLines is reached
+    line int
+    // quotedString indicates whether a quoted CSV string is being read;
+    // a newline inside a quoted string does not start a new CSV line
+    quotedString bool
+}
+
+// Read implements io.Reader
+func (state *skipFirstLines) Read(p []byte) (n int, err error) {
+skipHeaderLines:
+    for state.line < state.skipLines {
+        n, err := state.reader.Read(p)
+        if n == 0 {
+            return n, err
+        }
+        for i := 0; i < n; i++ {
+            switch p[i] {
+            case '"':
+                // a quoted string starts or stops
+                state.quotedString = !state.quotedString
+            case '\n':
+                if !state.quotedString {
+                    state.line++
+                    if state.line == state.skipLines {
+                        // the skipped lines end here
+                        if i == n-1 {
+                            if err != nil {
+                                return 0, err
+                            }
+                            // continue with the next chunk
+                            break skipHeaderLines
+                        } else {
+                            // move all bytes after the newline to the start of the buffer
+                            for j := i + 1; j < n; j++ {
+                                p[j-i-1] = p[j]
+                            }
+                            return n - i - 1, err
+                        }
+                    }
+                }
+            }
+        }
+    }
+    return state.reader.Read(p)
+}
+
+// SkipHeaderLinesReader wraps a reader to skip the first skipLines lines in the CSV data input
+func SkipHeaderLinesReader(skipLines int, reader io.Reader) io.Reader {
+    return &skipFirstLines{
+        skipLines: skipLines,
+        reader:    reader,
+    }
+}
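As a hypothetical sketch of `SkipHeaderLinesReader` (the input is invented for this example): a newline inside a quoted CSV string does not terminate a line, so the quoted header below counts as a single skipped line even though it spans two physical lines.

```go
package main

import (
	"fmt"
	"io"
	"strings"

	"github.com/influxdata/influx-cli/v2/pkg/csv2lp"
)

func main() {
	input := "junk\n\"quoted\nheader\"\nm,cpu\ncpu,cpu1\n"
	// skip two CSV lines: "junk" and the quoted two-physical-line header
	reader := csv2lp.SkipHeaderLinesReader(2, strings.NewReader(input))
	data, err := io.ReadAll(reader)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(data)) // prints the "m,cpu" and "cpu,cpu1" lines
}
```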
diff --git a/pkg/csv2lp/skip_header_lines_test.go b/pkg/csv2lp/skip_header_lines_test.go
new file mode 100644
index 0000000..d909a40
--- /dev/null
+++ b/pkg/csv2lp/skip_header_lines_test.go
@@ -0,0 +1,93 @@
+package csv2lp
+
+import (
+    "io"
+    "strconv"
+    "strings"
+    "testing"
+
+    "github.com/stretchr/testify/require"
+)
+
+// readOnceWithEOF simulates a reader that returns all of its data together with EOF
+type readOnceWithEOF struct {
+    reader io.Reader
+}
+
+func (r *readOnceWithEOF) Read(p []byte) (n int, err error) {
+    n, _ = r.reader.Read(p)
+    return n, io.EOF
+}
+
+// Test_SkipHeaderLines checks that the configured number of leading lines is skipped
+func Test_SkipHeaderLines(t *testing.T) {
+
+    var tests = []struct {
+        skipCount int
+        input     string
+        result    string
+    }{
+        {
+            10,
+            "1\n2\n3\n4\n5\n6\n7\n8\n9\n0\n",
+            "",
+        },
+        {
+            0,
+            "1\n2\n3\n4\n5\n6\n7\n8\n9\n0\n",
+            "1\n2\n3\n4\n5\n6\n7\n8\n9\n0\n",
+        },
+        {
+            1,
+            "1\n2\n3\n4\n5\n6\n7\n8\n9\n0\n",
+            "2\n3\n4\n5\n6\n7\n8\n9\n0\n",
+        },
+        {
+            5,
+            "1\n2\n3\n4\n5\n6\n7\n8\n9\n0\n",
+            "6\n7\n8\n9\n0\n",
+        },
+        {
+            20,
+            "1\n2\n3\n4\n5\n6\n7\n8\n9\n0\n",
+            "",
+        },
+        {
+            1,
+            "\"\n\"\"\n\"\n2",
+            "2",
+        },
+    }
+
+    for i, test := range tests {
+        input := test.input
+        bufferSizes := []int{1, 2, 7, 0, len(input), len(input) + 1}
+        for _, bufferSize := range bufferSizes {
+            t.Run(strconv.Itoa(i)+"_"+strconv.Itoa(bufferSize), func(t *testing.T) {
+                var reader io.Reader
+                if bufferSize == 0 {
+                    // emulate a reader that returns EOF together with the data
+                    bufferSize = len(input)
+                    reader = SkipHeaderLinesReader(test.skipCount, &readOnceWithEOF{strings.NewReader(input)})
+                } else {
+                    reader = SkipHeaderLinesReader(test.skipCount, strings.NewReader(input))
+                }
+                buffer := make([]byte, bufferSize)
+                result := make([]byte, 0, 100)
+                for {
+                    n, err := reader.Read(buffer)
+                    if n > 0 {
+                        result = append(result, buffer[:n]...)
+                    }
+                    if err != nil {
+                        if err != io.EOF {
+                            require.Fail(t, err.Error())
+                        }
+                        break
+                    }
+                }
+                require.Equal(t, test.result, string(result))
+            })
+        }
+    }
+}
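Finally, a sketch that composes the pieces the way TestLineReader_viaCsv does, with one extra skipped header line (the input and starting line number are hypothetical): the header is dropped, and each parsed CSV row is reported with the line it started on in the original input.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"strings"

	"github.com/influxdata/influx-cli/v2/pkg/csv2lp"
)

func main() {
	input := "# header to skip\na,b\n1,2\n"
	skipped := csv2lp.SkipHeaderLinesReader(1, strings.NewReader(input))
	lineReader := csv2lp.NewLineReader(skipped)
	lineReader.LineNumber = 2 // line 1 of the original input was skipped
	csvReader := csv.NewReader(lineReader)
	for {
		row, err := csvReader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
		fmt.Println(lineReader.LastLineNumber, row) // 2 [a b], then 3 [1 2]
	}
}
```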