484 lines
13 KiB
Go
484 lines
13 KiB
Go
package csv2lp
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"io"
|
|
"log"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// Test_CsvToLineProtocol_variousBufferSize tests conversion of annotated CSV data to line protocol data on various buffer sizes
|
|
func Test_CsvToLineProtocol_variousBufferSize(t *testing.T) {
|
|
var tests = []struct {
|
|
name string
|
|
csv string
|
|
lines string
|
|
err string
|
|
}{
|
|
{
|
|
"simple1",
|
|
"_measurement,a,b\ncpu,1,1\ncpu,b2\n",
|
|
"cpu a=1,b=1\ncpu a=b2\n",
|
|
"",
|
|
},
|
|
{
|
|
"simple1_withSep",
|
|
"sep=;\n_measurement;a;b\ncpu;1;1\ncpu;b2\n",
|
|
"cpu a=1,b=1\ncpu a=b2\n",
|
|
"",
|
|
},
|
|
{
|
|
"simple2",
|
|
"_measurement,a,b\ncpu,1,1\ncpu,\n",
|
|
"",
|
|
"no field data",
|
|
},
|
|
{
|
|
"simple3",
|
|
"_measurement,a,_time\ncpu,1,1\ncpu,2,invalidTime\n",
|
|
"",
|
|
"_time", // error in _time column
|
|
},
|
|
{
|
|
"constant_annotations",
|
|
"#constant,measurement,,cpu\n" +
|
|
"#constant,tag,xpu,xpu1\n" +
|
|
"#constant,tag,cpu,cpu1\n" +
|
|
"#constant,long,of,100\n" +
|
|
"#constant,dateTime,,2\n" +
|
|
"x,y\n" +
|
|
"1,2\n" +
|
|
"3,4\n",
|
|
"cpu,cpu=cpu1,xpu=xpu1 x=1,y=2,of=100i 2\n" +
|
|
"cpu,cpu=cpu1,xpu=xpu1 x=3,y=4,of=100i 2\n",
|
|
"", // no error
|
|
},
|
|
{
|
|
"timezone_annotation-0100",
|
|
"#timezone,-0100\n" +
|
|
"#constant,measurement,cpu\n" +
|
|
"#constant,dateTime:2006-01-02,1970-01-01\n" +
|
|
"x,y\n" +
|
|
"1,2\n",
|
|
"cpu x=1,y=2 3600000000000\n",
|
|
"", // no error
|
|
},
|
|
{
|
|
"timezone_annotation_EST",
|
|
"#timezone,EST\n" +
|
|
"#constant,measurement,cpu\n" +
|
|
"#constant,dateTime:2006-01-02,1970-01-01\n" +
|
|
"x,y\n" +
|
|
"1,2\n",
|
|
"cpu x=1,y=2 18000000000000\n",
|
|
"", // no error
|
|
},
|
|
}
|
|
bufferSizes := []int{40, 7, 3, 1}
|
|
|
|
for _, test := range tests {
|
|
for _, bufferSize := range bufferSizes {
|
|
t.Run(test.name+"_"+strconv.Itoa(bufferSize), func(t *testing.T) {
|
|
reader := CsvToLineProtocol(strings.NewReader(test.csv))
|
|
buffer := make([]byte, bufferSize)
|
|
lines := make([]byte, 0, 100)
|
|
for {
|
|
n, err := reader.Read(buffer)
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if test.err != "" {
|
|
// fmt.Println(err)
|
|
if err := err.Error(); !strings.Contains(err, test.err) {
|
|
require.Equal(t, err, test.err)
|
|
}
|
|
return
|
|
}
|
|
require.Nil(t, err.Error())
|
|
break
|
|
}
|
|
lines = append(lines, buffer[:n]...)
|
|
}
|
|
if test.err == "" {
|
|
require.Equal(t, test.lines, string(lines))
|
|
} else {
|
|
require.Fail(t, "error message with '"+test.err+"' expected")
|
|
}
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
// Test_CsvToLineProtocol_samples tests conversion of annotated CSV data to line protocol data
|
|
func Test_CsvToLineProtocol_samples(t *testing.T) {
|
|
var tests = []struct {
|
|
name string
|
|
csv string
|
|
lines string
|
|
err string
|
|
}{
|
|
{
|
|
"queryResult_19452", // https://github.com/influxdata/influxdb/issues/19452
|
|
"#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n" +
|
|
"#group,false,false,true,true,false,false,true,true,true\n" +
|
|
"#default,_result,,,,,,,,\n" +
|
|
",result,table,_start,_stop,_time,_value,_field,_measurement,host\n" +
|
|
",,0,2020-08-26T22:59:23.598653Z,2020-08-26T23:00:23.598653Z,2020-08-26T22:59:30Z,15075651584,active,mem,ip-192-168-86-25.ec2.internal\n",
|
|
"mem,host=ip-192-168-86-25.ec2.internal active=15075651584i 1598482770000000000\n",
|
|
"", // no error
|
|
},
|
|
{
|
|
"queryResult_19452_group_first", // issue 19452, but with group annotation first
|
|
"#group,false,false,true,true,false,false,true,true,true\n" +
|
|
"#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n" +
|
|
"#default,_result,,,,,,,,\n" +
|
|
",result,table,_start,_stop,_time,_value,_field,_measurement,host\n" +
|
|
",,0,2020-08-26T22:59:23.598653Z,2020-08-26T23:00:23.598653Z,2020-08-26T22:59:30Z,15075651584,active,mem,ip-192-168-86-25.ec2.internal\n",
|
|
"mem,host=ip-192-168-86-25.ec2.internal active=15075651584i 1598482770000000000\n",
|
|
"", // no error
|
|
},
|
|
}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
reader := CsvToLineProtocol(strings.NewReader(test.csv))
|
|
buffer := make([]byte, 100)
|
|
lines := make([]byte, 0, 100)
|
|
for {
|
|
n, err := reader.Read(buffer)
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if test.err != "" {
|
|
// fmt.Println(err)
|
|
if err := err.Error(); !strings.Contains(err, test.err) {
|
|
require.Equal(t, err, test.err)
|
|
}
|
|
return
|
|
}
|
|
require.Nil(t, err.Error())
|
|
break
|
|
}
|
|
lines = append(lines, buffer[:n]...)
|
|
}
|
|
if test.err == "" {
|
|
require.Equal(t, test.lines, string(lines))
|
|
} else {
|
|
require.Fail(t, "error message with '"+test.err+"' expected")
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// Test_CsvToLineProtocol_LogTableColumns checks correct logging of table columns
|
|
func Test_CsvToLineProtocol_LogTableColumns(t *testing.T) {
|
|
var buf bytes.Buffer
|
|
log.SetOutput(&buf)
|
|
oldFlags := log.Flags()
|
|
log.SetFlags(0)
|
|
oldPrefix := log.Prefix()
|
|
prefix := "::PREFIX::"
|
|
log.SetPrefix(prefix)
|
|
defer func() {
|
|
log.SetOutput(os.Stderr)
|
|
log.SetFlags(oldFlags)
|
|
log.SetPrefix(oldPrefix)
|
|
}()
|
|
|
|
csv := "_measurement,a,b\ncpu,1,1\ncpu,b2\n"
|
|
|
|
reader := CsvToLineProtocol(strings.NewReader(csv)).LogTableColumns(true)
|
|
require.False(t, reader.skipRowOnError)
|
|
require.True(t, reader.logTableDataColumns)
|
|
// read all the data
|
|
io.ReadAll(reader)
|
|
|
|
out := buf.String()
|
|
// fmt.Println(out)
|
|
messages := strings.Count(out, prefix)
|
|
require.Equal(t, messages, 1)
|
|
}
|
|
|
|
// Test_CsvToLineProtocol_LogTimeZoneWarning checks correct logging of timezone warning
|
|
func Test_CsvToLineProtocol_LogTimeZoneWarning(t *testing.T) {
|
|
var buf bytes.Buffer
|
|
log.SetOutput(&buf)
|
|
oldFlags := log.Flags()
|
|
log.SetFlags(0)
|
|
oldPrefix := log.Prefix()
|
|
prefix := "::PREFIX::"
|
|
log.SetPrefix(prefix)
|
|
defer func() {
|
|
log.SetOutput(os.Stderr)
|
|
log.SetFlags(oldFlags)
|
|
log.SetPrefix(oldPrefix)
|
|
}()
|
|
|
|
csv := "#timezone 1\n" +
|
|
"#constant,dateTime:2006-01-02,1970-01-01\n" +
|
|
"_measurement,a,b\ncpu,1,1"
|
|
|
|
reader := CsvToLineProtocol(strings.NewReader(csv))
|
|
bytes, _ := io.ReadAll(reader)
|
|
|
|
out := buf.String()
|
|
// fmt.Println(out) // "::PREFIX::WARNING: #timezone annotation: unknown time zone 1
|
|
messages := strings.Count(out, prefix)
|
|
require.Equal(t, messages, 1)
|
|
require.Equal(t, string(bytes), "cpu a=1,b=1 0\n")
|
|
}
|
|
|
|
// Test_CsvToLineProtocol_SkipRowOnError tests that error rows are skipped
|
|
func Test_CsvToLineProtocol_SkipRowOnError(t *testing.T) {
|
|
var buf bytes.Buffer
|
|
log.SetOutput(&buf)
|
|
oldFlags := log.Flags()
|
|
log.SetFlags(0)
|
|
oldPrefix := log.Prefix()
|
|
prefix := "::PREFIX::"
|
|
log.SetPrefix(prefix)
|
|
defer func() {
|
|
log.SetOutput(os.Stderr)
|
|
log.SetFlags(oldFlags)
|
|
log.SetPrefix(oldPrefix)
|
|
}()
|
|
|
|
csv := "_measurement,a,_time\n,1,1\ncpu,2,2\ncpu,3,3a\n"
|
|
|
|
reader := CsvToLineProtocol(strings.NewReader(csv)).SkipRowOnError(true)
|
|
require.Equal(t, reader.skipRowOnError, true)
|
|
require.Equal(t, reader.logTableDataColumns, false)
|
|
// read all the data
|
|
io.ReadAll(reader)
|
|
|
|
out := buf.String()
|
|
// fmt.Println(out)
|
|
messages := strings.Count(out, prefix)
|
|
require.Equal(t, messages, 2)
|
|
}
|
|
|
|
// Test_CsvToLineProtocol_RowSkipped tests that error rows are reported to configured RowSkipped listener
|
|
func Test_CsvToLineProtocol_RowSkipped(t *testing.T) {
|
|
var buf bytes.Buffer
|
|
log.SetOutput(&buf)
|
|
oldFlags := log.Flags()
|
|
log.SetFlags(0)
|
|
defer func() {
|
|
log.SetOutput(os.Stderr)
|
|
log.SetFlags(oldFlags)
|
|
}()
|
|
|
|
type ActualArguments = struct {
|
|
src *CsvToLineReader
|
|
err error
|
|
row []string
|
|
}
|
|
type ExpectedArguments = struct {
|
|
errorString string
|
|
row []string
|
|
}
|
|
|
|
csv := "sep=;\n_measurement;a|long:strict\n;1\ncpu;2.1\ncpu;3a\n"
|
|
calledArgs := []ActualArguments{}
|
|
expectedArgs := []ExpectedArguments{
|
|
{
|
|
"line 3: column '_measurement': no measurement supplied",
|
|
[]string{"", "1"},
|
|
},
|
|
{
|
|
"line 4: column 'a': '2.1' cannot fit into long data type",
|
|
[]string{"cpu", "2.1"},
|
|
},
|
|
{
|
|
"line 5: column 'a': strconv.ParseInt:",
|
|
[]string{"cpu", "3a"},
|
|
},
|
|
}
|
|
|
|
reader := CsvToLineProtocol(strings.NewReader(csv)).SkipRowOnError(true)
|
|
reader.RowSkipped = func(src *CsvToLineReader, err error, _row []string) {
|
|
// make a copy of _row
|
|
row := make([]string, len(_row))
|
|
copy(row, _row)
|
|
// remember for comparison
|
|
calledArgs = append(calledArgs, ActualArguments{
|
|
src, err, row,
|
|
})
|
|
}
|
|
// read all the data
|
|
io.ReadAll(reader)
|
|
|
|
out := buf.String()
|
|
require.Empty(t, out, "No log messages expected because RowSkipped handler is set")
|
|
|
|
require.Len(t, calledArgs, 3)
|
|
for i, expected := range expectedArgs {
|
|
require.Equal(t, reader, calledArgs[i].src)
|
|
require.Contains(t, calledArgs[i].err.Error(), expected.errorString)
|
|
require.Equal(t, expected.row, calledArgs[i].row)
|
|
}
|
|
}
|
|
|
|
// Test_CsvLineError tests CsvLineError error format
|
|
func Test_CsvLineError(t *testing.T) {
|
|
var tests = []struct {
|
|
err CsvLineError
|
|
value string
|
|
}{
|
|
{
|
|
CsvLineError{Line: 1, Err: errors.New("cause")},
|
|
"line 1: cause",
|
|
},
|
|
{
|
|
CsvLineError{Line: 2, Err: CsvColumnError{"a", errors.New("cause")}},
|
|
"line 2: column 'a': cause",
|
|
},
|
|
{
|
|
CsvLineError{Line: -1, Err: CsvColumnError{"a", errors.New("cause")}},
|
|
"column 'a': cause",
|
|
},
|
|
}
|
|
for _, test := range tests {
|
|
require.Equal(t, test.value, test.err.Error())
|
|
}
|
|
}
|
|
|
|
// Test_CsvToLineProtocol_lineNumbers tests that correct line numbers are reported
|
|
func Test_CsvToLineProtocol_lineNumbers(t *testing.T) {
|
|
var buf bytes.Buffer
|
|
log.SetOutput(&buf)
|
|
oldFlags := log.Flags()
|
|
log.SetFlags(0)
|
|
defer func() {
|
|
log.SetOutput(os.Stderr)
|
|
log.SetFlags(oldFlags)
|
|
}()
|
|
|
|
type ActualArguments = struct {
|
|
src *CsvToLineReader
|
|
err error
|
|
row []string
|
|
}
|
|
type ExpectedArguments = struct {
|
|
errorString string
|
|
row []string
|
|
}
|
|
|
|
// note: csv contains a field with newline and an extra empty lines
|
|
csv := `sep=;
|
|
|
|
_measurement;a|long:strict
|
|
;1
|
|
cpu;"2
|
|
.1"
|
|
|
|
cpu;3a
|
|
`
|
|
calledArgs := []ActualArguments{}
|
|
expectedArgs := []ExpectedArguments{
|
|
{
|
|
"line 4: column '_measurement': no measurement supplied",
|
|
[]string{"", "1"},
|
|
},
|
|
{
|
|
"line 6: column 'a': '2\n.1' cannot fit into long data type",
|
|
[]string{"cpu", "2\n.1"},
|
|
},
|
|
{
|
|
"line 8: column 'a': strconv.ParseInt:",
|
|
[]string{"cpu", "3a"},
|
|
},
|
|
}
|
|
|
|
reader := CsvToLineProtocol(strings.NewReader(csv)).SkipRowOnError(true)
|
|
reader.RowSkipped = func(src *CsvToLineReader, err error, _row []string) {
|
|
// make a copy of _row
|
|
row := make([]string, len(_row))
|
|
copy(row, _row)
|
|
// remember for comparison
|
|
calledArgs = append(calledArgs, ActualArguments{
|
|
src, err, row,
|
|
})
|
|
}
|
|
// read all the data
|
|
io.ReadAll(reader)
|
|
|
|
out := buf.String()
|
|
require.Empty(t, out, "No log messages expected because RowSkipped handler is set")
|
|
|
|
require.Len(t, calledArgs, 3)
|
|
for i, expected := range expectedArgs {
|
|
require.Equal(t, reader, calledArgs[i].src)
|
|
require.Contains(t, calledArgs[i].err.Error(), expected.errorString)
|
|
require.Equal(t, expected.row, calledArgs[i].row)
|
|
}
|
|
// 8 lines were read
|
|
require.Equal(t, 8, reader.LineNumber)
|
|
}
|
|
|
|
// Test_CsvToLineProtocol_LineEndingWarning checks correct logging of exotic line ending
|
|
func Test_CsvToLineProtocol_LineEndingWarning(t *testing.T) {
|
|
var buf bytes.Buffer
|
|
log.SetOutput(&buf)
|
|
oldFlags := log.Flags()
|
|
log.SetFlags(0)
|
|
oldPrefix := log.Prefix()
|
|
prefix := "::PREFIX::"
|
|
log.SetPrefix(prefix)
|
|
defer func() {
|
|
log.SetOutput(os.Stderr)
|
|
log.SetFlags(oldFlags)
|
|
log.SetPrefix(oldPrefix)
|
|
}()
|
|
|
|
csv := "#datatype dateTime:number,string,tag,double,measurement\r" +
|
|
"time,sensor_id,parent,average,m\r" +
|
|
"1549240000000000000,xx:xx:xx:xx:xx:xx,2nd Floor Open Plan DS,0,test"
|
|
|
|
reader := CsvToLineProtocol(strings.NewReader(csv))
|
|
bytes, _ := io.ReadAll(reader)
|
|
|
|
out := buf.String()
|
|
messages := strings.Count(out, prefix)
|
|
require.Equal(t, messages, 1)
|
|
require.Contains(t, out, "line 1")
|
|
require.Contains(t, out, "standalone CR character found. Only CRLF and LF line endings are supported.")
|
|
require.Empty(t, bytes)
|
|
}
|
|
|
|
// Test_CsvToLineProtocol_WindowsLineEndings checks CRLF line endings
|
|
func Test_CsvToLineProtocol_WindowsLineEndings(t *testing.T) {
|
|
var buf bytes.Buffer
|
|
log.SetOutput(&buf)
|
|
oldFlags := log.Flags()
|
|
log.SetFlags(0)
|
|
oldPrefix := log.Prefix()
|
|
prefix := "::PREFIX::"
|
|
log.SetPrefix(prefix)
|
|
defer func() {
|
|
log.SetOutput(os.Stderr)
|
|
log.SetFlags(oldFlags)
|
|
log.SetPrefix(oldPrefix)
|
|
}()
|
|
|
|
csv := "#datatype dateTime:number,string,tag,double,measurement\r\n" +
|
|
"time,sensor_id,parent,average,m\r\n" +
|
|
"1549240000000000000,a,b,0,test"
|
|
|
|
reader := CsvToLineProtocol(strings.NewReader(csv))
|
|
bytes, _ := io.ReadAll(reader)
|
|
|
|
out := buf.String()
|
|
messages := strings.Count(out, prefix)
|
|
require.Equal(t, messages, 0)
|
|
require.Equal(t, string(bytes), "test,parent=b sensor_id=\"a\",average=0 1549240000000000000\n")
|
|
}
|