From ab915220812e1d5961fbcd6c99fe34f5246eb6b0 Mon Sep 17 00:00:00 2001 From: Daniel Moran Date: Mon, 26 Apr 2021 10:47:02 -0400 Subject: [PATCH] feat: port line reader from influxdb (#45) --- internal/linereader/multi_reader.go | 243 +++++++++++ internal/linereader/multi_reader_test.go | 494 +++++++++++++++++++++++ 2 files changed, 737 insertions(+) create mode 100644 internal/linereader/multi_reader.go create mode 100644 internal/linereader/multi_reader_test.go diff --git a/internal/linereader/multi_reader.go b/internal/linereader/multi_reader.go new file mode 100644 index 0000000..035f1ef --- /dev/null +++ b/internal/linereader/multi_reader.go @@ -0,0 +1,243 @@ +package linereader + +import ( + "compress/gzip" + "context" + "encoding/csv" + "fmt" + "io" + "log" + "net/http" + "net/url" + "os" + "strings" + + "github.com/influxdata/influx-cli/v2/pkg/csv2lp" +) + +var _ HttpClient = (*http.Client)(nil) + +type HttpClient interface { + Do(*http.Request) (*http.Response, error) +} + +type InputFormat int + +const ( + InputFormatDerived InputFormat = iota + InputFormatCSV + InputFormatLP +) + +type InputCompression int + +const ( + InputCompressionDerived InputCompression = iota + InputCompressionGZIP + InputCompressionNone +) + +type MultiInputLineReader struct { + StdIn io.Reader + HttpClient HttpClient + ErrorOut io.Writer + + Args []string + Files []string + URLs []string + Format InputFormat + Compression InputCompression + Encoding string + + // CSV-specific options. 
+ Headers []string + SkipRowOnError bool + SkipHeader int + IgnoreDataTypeInColumnName bool + Debug bool +} + +func (r *MultiInputLineReader) Open(ctx context.Context) (io.Reader, io.Closer, error) { + if r.Debug { + log.Printf("WriteFlags%+v", r) + } + + args := r.Args + files := r.Files + if len(args) > 0 && len(args[0]) > 1 && args[0][0] == '@' { + // backward compatibility: @ in arg denotes a file + files = append(files, args[0][1:]) + args = args[:0] + } + + readers := make([]io.Reader, 0, 2*len(r.Headers)+2*len(files)+2*len(r.URLs)+1) + closers := make([]io.Closer, 0, len(files)+len(r.URLs)) + + // validate and setup decoding of files/stdin if encoding is supplied + decode, err := csv2lp.CreateDecoder(r.Encoding) + if err != nil { + return nil, csv2lp.MultiCloser(closers...), err + } + + // utility to manage common steps used to decode / decompress input sources, + // while tracking resources that must be cleaned-up after reading. + addReader := func(r io.Reader, name string, compressed bool) error { + if compressed { + rcz, err := gzip.NewReader(r) + if err != nil { + return fmt.Errorf("failed to decompress %s: %w", name, err) + } + closers = append(closers, rcz) + r = rcz + } + readers = append(readers, decode(r), strings.NewReader("\n")) + return nil + } + + // prepend header lines + if len(r.Headers) > 0 { + for _, header := range r.Headers { + readers = append(readers, strings.NewReader(header), strings.NewReader("\n")) + + } + if r.Format == InputFormatDerived { + r.Format = InputFormatCSV + } + } + + // add files + for _, file := range files { + f, err := os.Open(file) + if err != nil { + return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to open %q: %v", file, err) + } + closers = append(closers, f) + + fname := file + compressed := r.Compression == InputCompressionGZIP || (r.Compression == InputCompressionDerived && strings.HasSuffix(fname, ".gz")) + if compressed { + fname = strings.TrimSuffix(fname, ".gz") + } + if r.Format == 
InputFormatDerived && strings.HasSuffix(fname, ".csv") { + r.Format = InputFormatCSV + } + + if err = addReader(f, file, compressed); err != nil { + return nil, csv2lp.MultiCloser(closers...), err + } + } + + // allow URL data sources, a simple alternative to `curl -f -s http://... | influx batcher ...` + for _, addr := range r.URLs { + u, err := url.Parse(addr) + if err != nil { + return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to open %q: %v", addr, err) + } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, addr, nil) + if err != nil { + return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to open %q: %v", addr, err) + } + req.Header.Set("Accept-Encoding", "gzip") + resp, err := r.HttpClient.Do(req) + if err != nil { + return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to open %q: %v", addr, err) + } + if resp.Body != nil { + closers = append(closers, resp.Body) + } + if resp.StatusCode/100 != 2 { + return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to open %q: response status_code=%d", addr, resp.StatusCode) + } + + compressed := r.Compression == InputCompressionGZIP || + resp.Header.Get("Content-Encoding") == "gzip" || + (r.Compression == InputCompressionDerived && strings.HasSuffix(u.Path, ".gz")) + if compressed { + u.Path = strings.TrimSuffix(u.Path, ".gz") + } + if r.Format == InputFormatDerived && + (strings.HasSuffix(u.Path, ".csv") || strings.HasPrefix(resp.Header.Get("Content-Type"), "text/csv")) { + r.Format = InputFormatCSV + } + + if err = addReader(resp.Body, addr, compressed); err != nil { + return nil, csv2lp.MultiCloser(closers...), err + } + } + + // add stdin or a single argument + switch { + case len(args) == 0: + // use also stdIn if it is a terminal + if r.StdIn != nil && !isCharacterDevice(r.StdIn) { + if err = addReader(r.StdIn, "stdin", r.Compression == InputCompressionGZIP); err != nil { + return nil, csv2lp.MultiCloser(closers...), err + } + } + case args[0] == "-": + // "-" 
 also means stdin
+		if err = addReader(r.StdIn, "stdin", r.Compression == InputCompressionGZIP); err != nil {
+			return nil, csv2lp.MultiCloser(closers...), err
+		}
+	default:
+		if err = addReader(strings.NewReader(args[0]), "arg 0", r.Compression == InputCompressionGZIP); err != nil {
+			return nil, csv2lp.MultiCloser(closers...), err
+		}
+	}
+
+	// skipHeader lines when set
+	if r.SkipHeader > 0 {
+		// find the last non-string reader (stdin or file)
+		for i := len(readers) - 1; i >= 0; i-- {
+			_, stringReader := readers[i].(*strings.Reader)
+			if !stringReader { // ignore headers and new lines
+				readers[i] = csv2lp.SkipHeaderLinesReader(r.SkipHeader, readers[i])
+				break
+			}
+		}
+	}
+
+	// create writer for errors-file, if supplied
+	var errorsFile *csv.Writer
+	var rowSkippedListener func(*csv2lp.CsvToLineReader, error, []string)
+	if r.ErrorOut != nil {
+		errorsFile = csv.NewWriter(r.ErrorOut)
+		rowSkippedListener = func(source *csv2lp.CsvToLineReader, lineError error, row []string) {
+			log.Println(lineError)
+			errorsFile.Comma = source.Comma()
+			errorsFile.Write([]string{fmt.Sprintf("# error : %v", lineError)})
+			if err := errorsFile.Write(row); err != nil {
+				log.Printf("Unable to write to error-file: %v\n", err)
+			}
+			errorsFile.Flush() // flush is required
+		}
+	}
+
+	// concatenate readers
+	reader := io.MultiReader(readers...)
+ if r.Format == InputFormatCSV { + csvReader := csv2lp.CsvToLineProtocol(reader) + csvReader.LogTableColumns(r.Debug) + csvReader.SkipRowOnError(r.SkipRowOnError) + csvReader.Table.IgnoreDataTypeInColumnName(r.IgnoreDataTypeInColumnName) + // change LineNumber to report file/stdin line numbers properly + csvReader.LineNumber = r.SkipHeader - len(r.Headers) + csvReader.RowSkipped = rowSkippedListener + reader = csvReader + } + + return reader, csv2lp.MultiCloser(closers...), nil +} + +// isCharacterDevice returns true if the supplied reader is a character device (a terminal) +func isCharacterDevice(reader io.Reader) bool { + file, isFile := reader.(*os.File) + if !isFile { + return false + } + info, err := file.Stat() + if err != nil { + return false + } + return (info.Mode() & os.ModeCharDevice) == os.ModeCharDevice +} diff --git a/internal/linereader/multi_reader_test.go b/internal/linereader/multi_reader_test.go new file mode 100644 index 0000000..c2b6ec3 --- /dev/null +++ b/internal/linereader/multi_reader_test.go @@ -0,0 +1,494 @@ +package linereader_test + +import ( + "bufio" + "bytes" + "compress/gzip" + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "os" + "strings" + "testing" + + "github.com/influxdata/influx-cli/v2/internal/linereader" + "github.com/stretchr/testify/require" +) + +func readLines(reader io.Reader) []string { + scanner := bufio.NewScanner(reader) + retVal := make([]string, 0, 3) + for scanner.Scan() { + retVal = append(retVal, scanner.Text()) + } + return retVal +} + +func createTempFile(t *testing.T, suffix string, contents []byte, compress bool) string { + t.Helper() + + file, err := ioutil.TempFile("", "influx_writeTest*."+suffix) + require.NoError(t, err) + defer file.Close() + + var writer io.Writer = file + if compress { + gzipWriter := gzip.NewWriter(writer) + defer gzipWriter.Close() + writer = gzipWriter + } + + _, err = writer.Write(contents) + require.NoError(t, err) + + return file.Name() +} + +type 
mockClient struct { + t *testing.T + fail bool +} + +func (c *mockClient) Do(req *http.Request) (*http.Response, error) { + if c.fail { + resp := http.Response{StatusCode: 500} + return &resp, nil + } + query := req.URL.Query() + resp := http.Response{Header: map[string][]string{}} + if contentType := query.Get("Content-Type"); contentType != "" { + resp.Header.Set("Content-Type", contentType) + } + if encoding := query.Get("encoding"); encoding != "" { + resp.Header.Set("Content-Encoding", encoding) + } + compress := query.Get("compress") != "" + resp.StatusCode = http.StatusOK + if data := query.Get("data"); data != "" { + body := bytes.Buffer{} + var writer io.Writer = &body + if compress { + gzw := gzip.NewWriter(writer) + defer gzw.Close() + writer = gzw + } + _, err := writer.Write([]byte(data)) + require.NoError(c.t, err) + resp.Body = ioutil.NopCloser(&body) + } + + return &resp, nil +} + +func TestLineReader(t *testing.T) { + gzipStdin := func(uncompressed string) io.Reader { + contents := &bytes.Buffer{} + writer := gzip.NewWriter(contents) + _, err := writer.Write([]byte(uncompressed)) + require.NoError(t, err) + require.NoError(t, writer.Close()) + return contents + } + + lpContents := "f1 b=f2,c=f3,d=f4" + lpFile := createTempFile(t, "txt", []byte(lpContents), false) + gzipLpFile := createTempFile(t, "txt.gz", []byte(lpContents), true) + gzipLpFileNoExt := createTempFile(t, "lp", []byte(lpContents), true) + stdInLpContents := "stdin3 i=stdin1,j=stdin2,k=stdin4" + + csvContents := "_measurement,b,c,d\nf1,f2,f3,f4" + csvFile := createTempFile(t, "csv", []byte(csvContents), false) + gzipCsvFile := createTempFile(t, "csv.gz", []byte(csvContents), true) + gzipCsvFileNoExt := createTempFile(t, "csv", []byte(csvContents), true) + stdInCsvContents := "i,j,_measurement,k\nstdin1,stdin2,stdin3,stdin4" + + defer func() { + for _, f := range []string{lpFile, gzipLpFile, gzipLpFileNoExt, csvFile, gzipCsvFile, gzipCsvFileNoExt} { + _ = os.Remove(f) + } + }() + + var 
tests = []struct { + name string + // input + args []string + files []string + urls []string + format linereader.InputFormat + compression linereader.InputCompression + encoding string + headers []string + skipHeader int + ignoreDataTypeInColumnName bool + stdIn io.Reader + // output + firstLineCorrection int // 0 unless shifted by prepended headers or skipped rows + lines []string + }{ + { + name: "read data from LP file", + files: []string{lpFile}, + firstLineCorrection: 0, + lines: []string{ + lpContents, + }, + }, + { + name: "read data from LP file using non-UTF encoding", + files: []string{lpFile}, + encoding: "ISO_8859-1", + firstLineCorrection: 0, + lines: []string{ + lpContents, + }, + }, + { + name: "read compressed LP data from file", + files: []string{gzipLpFileNoExt}, + compression: linereader.InputCompressionGZIP, + firstLineCorrection: 0, + lines: []string{ + lpContents, + }, + }, + { + name: "read compressed data from LP file using non-UTF encoding", + files: []string{gzipLpFileNoExt}, + compression: linereader.InputCompressionGZIP, + encoding: "ISO_8859-1", + firstLineCorrection: 0, + lines: []string{ + lpContents, + }, + }, + { + name: "read compressed LP data from file ending in .gz", + files: []string{gzipLpFile}, + firstLineCorrection: 0, + lines: []string{ + lpContents, + }, + }, + { + name: "read compressed and uncompressed LP data from file in the same call", + files: []string{gzipLpFile, lpFile}, + firstLineCorrection: 0, + lines: []string{ + lpContents, + lpContents, + }, + }, + { + name: "read LP data from stdin", + stdIn: strings.NewReader(stdInLpContents), + lines: []string{ + stdInLpContents, + }, + }, + { + name: "read compressed LP data from stdin", + compression: linereader.InputCompressionGZIP, + stdIn: gzipStdin(stdInLpContents), + lines: []string{ + stdInLpContents, + }, + }, + { + name: "read LP data from stdin using '-' argument", + args: []string{"-"}, + stdIn: strings.NewReader(stdInLpContents), + lines: []string{ + 
stdInLpContents, + }, + }, + { + name: "read compressed LP data from stdin using '-' argument", + compression: linereader.InputCompressionGZIP, + args: []string{"-"}, + stdIn: gzipStdin(stdInLpContents), + lines: []string{ + stdInLpContents, + }, + }, + { + name: "read LP data from 1st argument", + args: []string{stdInLpContents}, + lines: []string{ + stdInLpContents, + }, + }, + { + name: "read LP data from URL", + urls: []string{fmt.Sprintf("/a?data=%s", url.QueryEscape(lpContents))}, + lines: []string{ + lpContents, + }, + }, + { + name: "read compressed LP data from URL", + urls: []string{fmt.Sprintf("/a?data=%s&compress=true", url.QueryEscape(lpContents))}, + compression: linereader.InputCompressionGZIP, + lines: []string{ + lpContents, + }, + }, + { + name: "read compressed LP data from URL ending in .gz", + urls: []string{fmt.Sprintf("/a.gz?data=%s&compress=true", url.QueryEscape(lpContents))}, + lines: []string{ + lpContents, + }, + }, + { + name: "read compressed LP data from URL with gzip encoding", + urls: []string{fmt.Sprintf("/a?data=%s&compress=true&encoding=gzip", url.QueryEscape(lpContents))}, + lines: []string{ + lpContents, + }, + }, + { + name: "read data from CSV file + transform to line protocol", + files: []string{csvFile}, + firstLineCorrection: 0, // no changes + lines: []string{ + lpContents, + }, + }, + { + name: "read compressed CSV data from file + transform to line protocol", + files: []string{gzipCsvFileNoExt}, + compression: linereader.InputCompressionGZIP, + firstLineCorrection: 0, + lines: []string{ + lpContents, + }, + }, + { + name: "read compressed CSV data from file ending in .csv.gz + transform to line protocol", + files: []string{gzipCsvFile}, + firstLineCorrection: 0, + lines: []string{ + lpContents, + }, + }, + { + name: "read CSV data from --header and --file + transform to line protocol", + headers: []string{"x,_measurement,y,z"}, + files: []string{csvFile}, + firstLineCorrection: -1, // shifted back by header line + 
lines: []string{ + "b x=_measurement,y=c,z=d", + "f2 x=f1,y=f3,z=f4", + }, + }, + { + name: "read CSV data from --header and @file argument with 1st row in file skipped + transform to line protocol", + headers: []string{"x,_measurement,y,z"}, + skipHeader: 1, + args: []string{"@" + csvFile}, + firstLineCorrection: 0, // shifted (-1) back by header line, forward (+1) by skipHeader + lines: []string{ + "f2 x=f1,y=f3,z=f4", + }, + }, + { + name: "read CSV data from stdin + transform to line protocol", + format: linereader.InputFormatCSV, + stdIn: strings.NewReader(stdInCsvContents), + lines: []string{ + stdInLpContents, + }, + }, + { + name: "read compressed CSV data from stdin + transform to line protocol", + format: linereader.InputFormatCSV, + compression: linereader.InputCompressionGZIP, + stdIn: gzipStdin(stdInCsvContents), + lines: []string{ + stdInLpContents, + }, + }, + { + name: "read CSV data from stdin using '-' argument + transform to line protocol", + format: linereader.InputFormatCSV, + args: []string{"-"}, + stdIn: strings.NewReader(stdInCsvContents), + lines: []string{ + stdInLpContents, + }, + }, + { + name: "read compressed CSV data from stdin using '-' argument + transform to line protocol", + format: linereader.InputFormatCSV, + compression: linereader.InputCompressionGZIP, + args: []string{"-"}, + stdIn: gzipStdin(stdInCsvContents), + lines: []string{ + stdInLpContents, + }, + }, + { + name: "read CSV data from 1st argument + transform to line protocol", + format: linereader.InputFormatCSV, + args: []string{stdInCsvContents}, + lines: []string{ + stdInLpContents, + }, + }, + { + name: "read data from .csv URL + transform to line protocol", + urls: []string{fmt.Sprintf("/a.csv?data=%s", url.QueryEscape(csvContents))}, + lines: []string{ + lpContents, + }, + }, + { + name: "read compressed CSV data from URL + transform to line protocol", + urls: []string{fmt.Sprintf("/a.csv?data=%s&compress=true", url.QueryEscape(csvContents))}, + compression: 
linereader.InputCompressionGZIP, + lines: []string{ + lpContents, + }, + }, + { + name: "read compressed CSV data from URL ending in .csv.gz + transform to line protocol", + urls: []string{fmt.Sprintf("/a.csv.gz?data=%s&compress=true", url.QueryEscape(csvContents))}, + lines: []string{ + lpContents, + }, + }, + { + name: "read compressed CSV data from URL with gzip encoding + transform to line protocol", + urls: []string{fmt.Sprintf("/a.csv?data=%s&compress=true&encoding=gzip", url.QueryEscape(csvContents))}, + lines: []string{ + lpContents, + }, + }, + { + name: "read data from .csv URL + change header line + transform to line protocol", + urls: []string{fmt.Sprintf("/a.csv?data=%s", url.QueryEscape(csvContents))}, + headers: []string{"k,j,_measurement,i"}, + skipHeader: 1, + lines: []string{ + "f3 k=f1,j=f2,i=f4", + }, + }, + { + name: "read data from URL with text/csv Content-Type + transform to line protocol", + urls: []string{fmt.Sprintf("/a?Content-Type=text/csv&data=%s", url.QueryEscape(csvContents))}, + lines: []string{ + lpContents, + }, + }, + { + name: "read compressed data from URL with text/csv Content-Type and gzip Content-Encoding + transform to line protocol", + urls: []string{fmt.Sprintf("/a?Content-Type=text/csv&data=%s&compress=true&encoding=gzip", url.QueryEscape(csvContents))}, + lines: []string{ + lpContents, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + r := &linereader.MultiInputLineReader{ + StdIn: test.stdIn, + HttpClient: &mockClient{t: t}, + Args: test.args, + Files: test.files, + URLs: test.urls, + Format: test.format, + Compression: test.compression, + Headers: test.headers, + SkipHeader: test.skipHeader, + IgnoreDataTypeInColumnName: test.ignoreDataTypeInColumnName, + } + reader, closer, err := r.Open(context.Background()) + require.NotNil(t, closer) + defer closer.Close() + require.NoError(t, err) + require.NotNil(t, reader) + lines := readLines(reader) + require.Equal(t, test.lines, lines) 
+ }) + } +} + +func TestLineReaderErrors(t *testing.T) { + csvFile1 := createTempFile(t, "csv", []byte("_measurement,b,c,d\nf1,f2,f3,f4"), false) + defer os.Remove(csvFile1) + + var tests = []struct { + name string + encoding string + files []string + urls []string + message string + }{ + { + name: "unsupported encoding", + encoding: "green", + message: "https://www.iana.org/assignments/character-sets/character-sets.xhtml", // hint to available values + }, + { + name: "file not found", + files: []string{csvFile1 + "x"}, + message: csvFile1, + }, + { + name: "unsupported URL", + urls: []string{"wit://whatever"}, + message: "wit://whatever", + }, + { + name: "invalid URL", + urls: []string{"http://test%zy"}, // 2 hex digits after % expected + message: "http://test%zy", + }, + { + name: "URL with 500 status code", + urls: []string{"/test"}, + message: "/test", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + r := linereader.MultiInputLineReader{ + HttpClient: &mockClient{t: t, fail: true}, + Files: test.files, + URLs: test.urls, + Encoding: test.encoding, + } + _, closer, err := r.Open(context.Background()) + require.NotNil(t, closer) + defer closer.Close() + require.Error(t, err) + require.Contains(t, err.Error(), test.message) + }) + } +} + +func TestLineReaderErrorOut(t *testing.T) { + stdInContents := "_measurement,a|long:strict\nm,1\nm,1.1" + errorOut := bytes.Buffer{} + + r := linereader.MultiInputLineReader{ + StdIn: strings.NewReader(stdInContents), + ErrorOut: &errorOut, + Format: linereader.InputFormatCSV, + } + reader, closer, err := r.Open(context.Background()) + require.NoError(t, err) + defer closer.Close() + + out := bytes.Buffer{} + _, err = io.Copy(&out, reader) + require.NoError(t, err) + + require.Equal(t, "m a=1i", strings.Trim(out.String(), "\n")) + errorLines := errorOut.String() + require.Equal(t, "# error : line 3: column 'a': '1.1' cannot fit into long data type\nm,1.1", strings.Trim(errorLines, "\n")) +}