feat: add skipRowOnError handling for raw line protocol files (#419)
* feat: add skipRowOnError for raw line protocol files * use common code from influxdb instead of copying * add test * remove dead code comment
This commit is contained in:
parent
e5707cd63c
commit
da2899d71d
@ -278,6 +278,8 @@ func (r *MultiInputLineReader) Open(ctx context.Context) (io.Reader, io.Closer,
|
||||
csvReader.LineNumber = r.SkipHeader - len(r.Headers)
|
||||
csvReader.RowSkipped = rowSkippedListener
|
||||
reader = csvReader
|
||||
} else if r.SkipRowOnError {
|
||||
reader = csv2lp.LineProtocolFilter(reader)
|
||||
}
|
||||
|
||||
return reader, csv2lp.MultiCloser(closers...), nil
|
||||
|
5
go.mod
5
go.mod
@ -16,6 +16,7 @@ require (
|
||||
github.com/golang/mock v1.5.0
|
||||
github.com/google/go-jsonnet v0.17.0
|
||||
github.com/influxdata/go-prompt v0.2.7
|
||||
github.com/influxdata/influxdb/v2 v2.3.0
|
||||
github.com/mattn/go-isatty v0.0.14
|
||||
github.com/olekukonko/tablewriter v0.0.5
|
||||
github.com/stretchr/testify v1.7.0
|
||||
@ -24,7 +25,7 @@ require (
|
||||
golang.org/x/text v0.3.7
|
||||
golang.org/x/tools v0.1.11-0.20220316014157-77aa08bb151a
|
||||
google.golang.org/protobuf v1.27.1
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
honnef.co/go/tools v0.3.0
|
||||
)
|
||||
|
||||
@ -49,7 +50,7 @@ require (
|
||||
github.com/rivo/uniseg v0.2.0 // indirect
|
||||
github.com/russross/blackfriday/v2 v2.0.1 // indirect
|
||||
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
|
||||
github.com/stretchr/objx v0.1.0 // indirect
|
||||
github.com/stretchr/objx v0.1.1 // indirect
|
||||
golang.org/x/exp/typeparams v0.0.0-20220218215828-6cf2b201936e // indirect
|
||||
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
|
||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
|
||||
|
12
go.sum
12
go.sum
@ -40,21 +40,23 @@ github.com/gocarina/gocsv v0.0.0-20210408192840-02d7211d929d/go.mod h1:5YoVOkjYA
|
||||
github.com/golang/mock v1.5.0 h1:jlYHihg//f7RRwuPfptm04yp4s7O6Kw8EZiVYIGcH0g=
|
||||
github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8=
|
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
|
||||
github.com/google/go-jsonnet v0.17.0 h1:/9NIEfhK1NQRKl3sP2536b2+x5HnZMdql7x3yK/l8JY=
|
||||
github.com/google/go-jsonnet v0.17.0/go.mod h1:sOcuej3UW1vpPTZOr8L7RQimqai1a57bt5j22LzGZCw=
|
||||
github.com/hinshun/vt10x v0.0.0-20220119200601-820417d04eec h1:qv2VnGeEQHchGaZ/u7lxST/RaJw+cv273q79D81Xbog=
|
||||
github.com/hinshun/vt10x v0.0.0-20220119200601-820417d04eec/go.mod h1:Q48J4R4DvxnHolD5P8pOtXigYlRuPLGl6moFx3ulM68=
|
||||
github.com/influxdata/go-prompt v0.2.7 h1:LoJCo+imRHw4Md1Y6SWFtLRpuukAHXkQ6pWS+DVTiMM=
|
||||
github.com/influxdata/go-prompt v0.2.7/go.mod h1:sK0TeJSbAWCKVby14Tx85GJE4BZpIZpguBBmFqAqruE=
|
||||
github.com/influxdata/influxdb/v2 v2.3.0 h1:vA558griYta4uQx/GLe0AQ8RsxIhNmb8q8aXV/JrBFo=
|
||||
github.com/influxdata/influxdb/v2 v2.3.0/go.mod h1:rg13oLyRzxzV4Saz1aMYXx3gRCeBD+lSaPnZxMqR5Os=
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
|
||||
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
|
||||
github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY=
|
||||
github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
|
||||
@ -105,8 +107,9 @@ github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
|
||||
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
|
||||
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
|
||||
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
|
||||
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
|
||||
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
||||
@ -175,7 +178,8 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogR
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
honnef.co/go/tools v0.3.0 h1:2LdYUZ7CIxnYgskbUZfY7FPggmqnh6shBqfWa8Tn3XU=
|
||||
honnef.co/go/tools v0.3.0/go.mod h1:vlRD9XErLMGT+mDuofSr0mMMquscM/1nQqtRSsh6m70=
|
||||
|
44
pkg/csv2lp/lp_reader.go
Normal file
44
pkg/csv2lp/lp_reader.go
Normal file
@ -0,0 +1,44 @@
|
||||
package csv2lp
|
||||
|
||||
import (
|
||||
"io"
|
||||
"log"
|
||||
|
||||
"github.com/influxdata/influxdb/v2/models"
|
||||
)
|
||||
|
||||
// LineProtocolFilterReader wraps a line reader and parses points, skipping if invalid
|
||||
type LineProtocolFilterReader struct {
|
||||
// lineReader is used to report line number of the last read CSV line
|
||||
lineReader *LineReader
|
||||
// LineNumber represents line number of csv.Reader, 1 is the first
|
||||
LineNumber int
|
||||
}
|
||||
|
||||
// LineProtocolFilter creates a reader wrapper that parses points, skipping if invalid
|
||||
func LineProtocolFilter(reader io.Reader) *LineProtocolFilterReader {
|
||||
lineReader := NewLineReader(reader)
|
||||
lineReader.LineNumber = 1 // start counting from 1
|
||||
return &LineProtocolFilterReader{
|
||||
lineReader: lineReader,
|
||||
}
|
||||
}
|
||||
|
||||
func (state *LineProtocolFilterReader) Read(b []byte) (int, error) {
|
||||
for {
|
||||
bytesRead, err := state.lineReader.Read(b)
|
||||
if err != nil {
|
||||
return bytesRead, err
|
||||
}
|
||||
state.LineNumber = state.lineReader.LastLineNumber
|
||||
buf := b[0:bytesRead]
|
||||
pts, err := models.ParsePoints(buf) // any time precision because we won't actually use this point
|
||||
if err != nil {
|
||||
log.Printf("invalid point on line %d: %v\n", state.LineNumber, err)
|
||||
continue
|
||||
} else if len(pts) == 0 { // no points on this line
|
||||
continue
|
||||
}
|
||||
return bytesRead, nil
|
||||
}
|
||||
}
|
68
pkg/csv2lp/lp_reader_test.go
Normal file
68
pkg/csv2lp/lp_reader_test.go
Normal file
@ -0,0 +1,68 @@
|
||||
package csv2lp
|
||||
|
||||
import (
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestLineProtocolFilter(t *testing.T) {
|
||||
var tests = []struct {
|
||||
input string
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
strings.Join([]string{
|
||||
"weather,location=us-midwest temperature=42 1465839830100400200",
|
||||
"awefw.,weather,location=us-east temperature=36 1465839830100400203",
|
||||
"weather,location=us-blah temperature=32 1465839830100400204",
|
||||
"weather,location=us-central temperature=31 1465839830100400205",
|
||||
}, "\n"),
|
||||
strings.Join([]string{
|
||||
"weather,location=us-midwest temperature=42 1465839830100400200",
|
||||
"weather,location=us-blah temperature=32 1465839830100400204",
|
||||
"weather,location=us-central temperature=31 1465839830100400205",
|
||||
}, "\n"),
|
||||
},
|
||||
{
|
||||
strings.Join([]string{
|
||||
"weather,location=us-midwest temperature=42 1465839830100400200",
|
||||
"weather,location=us-east temperature=36 1465839830100400203",
|
||||
"weather,location=us-blah temperature=32=33 1465839830100400204",
|
||||
"weather,,location=us-blah temperature=32 1465839830100400204",
|
||||
"weather,location=us-central temperature=31 1465839830100400205",
|
||||
}, "\n"),
|
||||
strings.Join([]string{
|
||||
"weather,location=us-midwest temperature=42 1465839830100400200",
|
||||
"weather,location=us-east temperature=36 1465839830100400203",
|
||||
"weather,location=us-central temperature=31 1465839830100400205",
|
||||
}, "\n"),
|
||||
},
|
||||
{
|
||||
strings.Join([]string{
|
||||
"weather,location=us-midwest temperature=42 1465839830100400200",
|
||||
"awefw.,weather,location=us-east temperature=36 1465839830100400203",
|
||||
" weather,location=us-blah temperature=32 1465839830100400204",
|
||||
"weather,location=us-east temperature=36 1465839830100400203 13413413",
|
||||
"weather,location=us-central temperature=31 1465839830100400205",
|
||||
"# this is a comment",
|
||||
}, "\n"),
|
||||
strings.Join([]string{
|
||||
"weather,location=us-midwest temperature=42 1465839830100400200",
|
||||
" weather,location=us-blah temperature=32 1465839830100400204",
|
||||
"weather,location=us-central temperature=31 1465839830100400205",
|
||||
}, "\n"),
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
reader := LineProtocolFilter(strings.NewReader(tt.input))
|
||||
b, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
t.Errorf("failed reading: %v", err)
|
||||
continue
|
||||
}
|
||||
require.Equal(t, strings.TrimSpace(string(b)), strings.TrimSpace(tt.expected))
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user