feat: port line reader from influxdb (#45)
This commit is contained in:
parent
74d622bece
commit
ab91522081
243
internal/linereader/multi_reader.go
Normal file
243
internal/linereader/multi_reader.go
Normal file
@ -0,0 +1,243 @@
|
||||
package linereader
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"encoding/csv"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdata/influx-cli/v2/pkg/csv2lp"
|
||||
)
|
||||
|
||||
var _ HttpClient = (*http.Client)(nil)
|
||||
|
||||
type HttpClient interface {
|
||||
Do(*http.Request) (*http.Response, error)
|
||||
}
|
||||
|
||||
type InputFormat int
|
||||
|
||||
const (
|
||||
InputFormatDerived InputFormat = iota
|
||||
InputFormatCSV
|
||||
InputFormatLP
|
||||
)
|
||||
|
||||
type InputCompression int
|
||||
|
||||
const (
|
||||
InputCompressionDerived InputCompression = iota
|
||||
InputCompressionGZIP
|
||||
InputCompressionNone
|
||||
)
|
||||
|
||||
type MultiInputLineReader struct {
|
||||
StdIn io.Reader
|
||||
HttpClient HttpClient
|
||||
ErrorOut io.Writer
|
||||
|
||||
Args []string
|
||||
Files []string
|
||||
URLs []string
|
||||
Format InputFormat
|
||||
Compression InputCompression
|
||||
Encoding string
|
||||
|
||||
// CSV-specific options.
|
||||
Headers []string
|
||||
SkipRowOnError bool
|
||||
SkipHeader int
|
||||
IgnoreDataTypeInColumnName bool
|
||||
Debug bool
|
||||
}
|
||||
|
||||
func (r *MultiInputLineReader) Open(ctx context.Context) (io.Reader, io.Closer, error) {
|
||||
if r.Debug {
|
||||
log.Printf("WriteFlags%+v", r)
|
||||
}
|
||||
|
||||
args := r.Args
|
||||
files := r.Files
|
||||
if len(args) > 0 && len(args[0]) > 1 && args[0][0] == '@' {
|
||||
// backward compatibility: @ in arg denotes a file
|
||||
files = append(files, args[0][1:])
|
||||
args = args[:0]
|
||||
}
|
||||
|
||||
readers := make([]io.Reader, 0, 2*len(r.Headers)+2*len(files)+2*len(r.URLs)+1)
|
||||
closers := make([]io.Closer, 0, len(files)+len(r.URLs))
|
||||
|
||||
// validate and setup decoding of files/stdin if encoding is supplied
|
||||
decode, err := csv2lp.CreateDecoder(r.Encoding)
|
||||
if err != nil {
|
||||
return nil, csv2lp.MultiCloser(closers...), err
|
||||
}
|
||||
|
||||
// utility to manage common steps used to decode / decompress input sources,
|
||||
// while tracking resources that must be cleaned-up after reading.
|
||||
addReader := func(r io.Reader, name string, compressed bool) error {
|
||||
if compressed {
|
||||
rcz, err := gzip.NewReader(r)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to decompress %s: %w", name, err)
|
||||
}
|
||||
closers = append(closers, rcz)
|
||||
r = rcz
|
||||
}
|
||||
readers = append(readers, decode(r), strings.NewReader("\n"))
|
||||
return nil
|
||||
}
|
||||
|
||||
// prepend header lines
|
||||
if len(r.Headers) > 0 {
|
||||
for _, header := range r.Headers {
|
||||
readers = append(readers, strings.NewReader(header), strings.NewReader("\n"))
|
||||
|
||||
}
|
||||
if r.Format == InputFormatDerived {
|
||||
r.Format = InputFormatCSV
|
||||
}
|
||||
}
|
||||
|
||||
// add files
|
||||
for _, file := range files {
|
||||
f, err := os.Open(file)
|
||||
if err != nil {
|
||||
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to open %q: %v", file, err)
|
||||
}
|
||||
closers = append(closers, f)
|
||||
|
||||
fname := file
|
||||
compressed := r.Compression == InputCompressionGZIP || (r.Compression == InputCompressionDerived && strings.HasSuffix(fname, ".gz"))
|
||||
if compressed {
|
||||
fname = strings.TrimSuffix(fname, ".gz")
|
||||
}
|
||||
if r.Format == InputFormatDerived && strings.HasSuffix(fname, ".csv") {
|
||||
r.Format = InputFormatCSV
|
||||
}
|
||||
|
||||
if err = addReader(f, file, compressed); err != nil {
|
||||
return nil, csv2lp.MultiCloser(closers...), err
|
||||
}
|
||||
}
|
||||
|
||||
// allow URL data sources, a simple alternative to `curl -f -s http://... | influx batcher ...`
|
||||
for _, addr := range r.URLs {
|
||||
u, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to open %q: %v", addr, err)
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, addr, nil)
|
||||
if err != nil {
|
||||
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to open %q: %v", addr, err)
|
||||
}
|
||||
req.Header.Set("Accept-Encoding", "gzip")
|
||||
resp, err := r.HttpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to open %q: %v", addr, err)
|
||||
}
|
||||
if resp.Body != nil {
|
||||
closers = append(closers, resp.Body)
|
||||
}
|
||||
if resp.StatusCode/100 != 2 {
|
||||
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to open %q: response status_code=%d", addr, resp.StatusCode)
|
||||
}
|
||||
|
||||
compressed := r.Compression == InputCompressionGZIP ||
|
||||
resp.Header.Get("Content-Encoding") == "gzip" ||
|
||||
(r.Compression == InputCompressionDerived && strings.HasSuffix(u.Path, ".gz"))
|
||||
if compressed {
|
||||
u.Path = strings.TrimSuffix(u.Path, ".gz")
|
||||
}
|
||||
if r.Format == InputFormatDerived &&
|
||||
(strings.HasSuffix(u.Path, ".csv") || strings.HasPrefix(resp.Header.Get("Content-Type"), "text/csv")) {
|
||||
r.Format = InputFormatCSV
|
||||
}
|
||||
|
||||
if err = addReader(resp.Body, addr, compressed); err != nil {
|
||||
return nil, csv2lp.MultiCloser(closers...), err
|
||||
}
|
||||
}
|
||||
|
||||
// add stdin or a single argument
|
||||
switch {
|
||||
case len(args) == 0:
|
||||
// use also stdIn if it is a terminal
|
||||
if r.StdIn != nil && !isCharacterDevice(r.StdIn) {
|
||||
if err = addReader(r.StdIn, "stdin", r.Compression == InputCompressionGZIP); err != nil {
|
||||
return nil, csv2lp.MultiCloser(closers...), err
|
||||
}
|
||||
}
|
||||
case args[0] == "-":
|
||||
// "-" also means stdin
|
||||
if err = addReader(r.StdIn, "stdin", r.Compression == InputCompressionGZIP); err != nil {
|
||||
return nil, csv2lp.MultiCloser(closers...), err
|
||||
}
|
||||
default:
|
||||
if err = addReader(strings.NewReader(args[0]), "arg 0", r.Compression == InputCompressionGZIP); err != nil {
|
||||
return nil, csv2lp.MultiCloser(closers...), err
|
||||
}
|
||||
}
|
||||
|
||||
// skipHeader lines when set
|
||||
if r.SkipHeader > 0 {
|
||||
// find the last non-string reader (stdin or file)
|
||||
for i := len(readers) - 1; i >= 0; i-- {
|
||||
_, stringReader := readers[i].(*strings.Reader)
|
||||
if !stringReader { // ignore headers and new lines
|
||||
readers[i] = csv2lp.SkipHeaderLinesReader(r.SkipHeader, readers[i])
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// create writer for errors-file, if supplied
|
||||
var errorsFile *csv.Writer
|
||||
var rowSkippedListener func(*csv2lp.CsvToLineReader, error, []string)
|
||||
if r.ErrorOut != nil {
|
||||
errorsFile = csv.NewWriter(r.ErrorOut)
|
||||
rowSkippedListener = func(source *csv2lp.CsvToLineReader, lineError error, row []string) {
|
||||
log.Println(lineError)
|
||||
errorsFile.Comma = source.Comma()
|
||||
errorsFile.Write([]string{fmt.Sprintf("# error : %v", lineError)})
|
||||
if err := errorsFile.Write(row); err != nil {
|
||||
log.Printf("Unable to batcher to error-file: %v\n", err)
|
||||
}
|
||||
errorsFile.Flush() // flush is required
|
||||
}
|
||||
}
|
||||
|
||||
// concatenate readers
|
||||
reader := io.MultiReader(readers...)
|
||||
if r.Format == InputFormatCSV {
|
||||
csvReader := csv2lp.CsvToLineProtocol(reader)
|
||||
csvReader.LogTableColumns(r.Debug)
|
||||
csvReader.SkipRowOnError(r.SkipRowOnError)
|
||||
csvReader.Table.IgnoreDataTypeInColumnName(r.IgnoreDataTypeInColumnName)
|
||||
// change LineNumber to report file/stdin line numbers properly
|
||||
csvReader.LineNumber = r.SkipHeader - len(r.Headers)
|
||||
csvReader.RowSkipped = rowSkippedListener
|
||||
reader = csvReader
|
||||
}
|
||||
|
||||
return reader, csv2lp.MultiCloser(closers...), nil
|
||||
}
|
||||
|
||||
// isCharacterDevice returns true if the supplied reader is a character device (a terminal)
|
||||
func isCharacterDevice(reader io.Reader) bool {
|
||||
file, isFile := reader.(*os.File)
|
||||
if !isFile {
|
||||
return false
|
||||
}
|
||||
info, err := file.Stat()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return (info.Mode() & os.ModeCharDevice) == os.ModeCharDevice
|
||||
}
|
494
internal/linereader/multi_reader_test.go
Normal file
494
internal/linereader/multi_reader_test.go
Normal file
@ -0,0 +1,494 @@
|
||||
package linereader_test
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influx-cli/v2/internal/linereader"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func readLines(reader io.Reader) []string {
|
||||
scanner := bufio.NewScanner(reader)
|
||||
retVal := make([]string, 0, 3)
|
||||
for scanner.Scan() {
|
||||
retVal = append(retVal, scanner.Text())
|
||||
}
|
||||
return retVal
|
||||
}
|
||||
|
||||
func createTempFile(t *testing.T, suffix string, contents []byte, compress bool) string {
|
||||
t.Helper()
|
||||
|
||||
file, err := ioutil.TempFile("", "influx_writeTest*."+suffix)
|
||||
require.NoError(t, err)
|
||||
defer file.Close()
|
||||
|
||||
var writer io.Writer = file
|
||||
if compress {
|
||||
gzipWriter := gzip.NewWriter(writer)
|
||||
defer gzipWriter.Close()
|
||||
writer = gzipWriter
|
||||
}
|
||||
|
||||
_, err = writer.Write(contents)
|
||||
require.NoError(t, err)
|
||||
|
||||
return file.Name()
|
||||
}
|
||||
|
||||
type mockClient struct {
|
||||
t *testing.T
|
||||
fail bool
|
||||
}
|
||||
|
||||
func (c *mockClient) Do(req *http.Request) (*http.Response, error) {
|
||||
if c.fail {
|
||||
resp := http.Response{StatusCode: 500}
|
||||
return &resp, nil
|
||||
}
|
||||
query := req.URL.Query()
|
||||
resp := http.Response{Header: map[string][]string{}}
|
||||
if contentType := query.Get("Content-Type"); contentType != "" {
|
||||
resp.Header.Set("Content-Type", contentType)
|
||||
}
|
||||
if encoding := query.Get("encoding"); encoding != "" {
|
||||
resp.Header.Set("Content-Encoding", encoding)
|
||||
}
|
||||
compress := query.Get("compress") != ""
|
||||
resp.StatusCode = http.StatusOK
|
||||
if data := query.Get("data"); data != "" {
|
||||
body := bytes.Buffer{}
|
||||
var writer io.Writer = &body
|
||||
if compress {
|
||||
gzw := gzip.NewWriter(writer)
|
||||
defer gzw.Close()
|
||||
writer = gzw
|
||||
}
|
||||
_, err := writer.Write([]byte(data))
|
||||
require.NoError(c.t, err)
|
||||
resp.Body = ioutil.NopCloser(&body)
|
||||
}
|
||||
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
func TestLineReader(t *testing.T) {
|
||||
gzipStdin := func(uncompressed string) io.Reader {
|
||||
contents := &bytes.Buffer{}
|
||||
writer := gzip.NewWriter(contents)
|
||||
_, err := writer.Write([]byte(uncompressed))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, writer.Close())
|
||||
return contents
|
||||
}
|
||||
|
||||
lpContents := "f1 b=f2,c=f3,d=f4"
|
||||
lpFile := createTempFile(t, "txt", []byte(lpContents), false)
|
||||
gzipLpFile := createTempFile(t, "txt.gz", []byte(lpContents), true)
|
||||
gzipLpFileNoExt := createTempFile(t, "lp", []byte(lpContents), true)
|
||||
stdInLpContents := "stdin3 i=stdin1,j=stdin2,k=stdin4"
|
||||
|
||||
csvContents := "_measurement,b,c,d\nf1,f2,f3,f4"
|
||||
csvFile := createTempFile(t, "csv", []byte(csvContents), false)
|
||||
gzipCsvFile := createTempFile(t, "csv.gz", []byte(csvContents), true)
|
||||
gzipCsvFileNoExt := createTempFile(t, "csv", []byte(csvContents), true)
|
||||
stdInCsvContents := "i,j,_measurement,k\nstdin1,stdin2,stdin3,stdin4"
|
||||
|
||||
defer func() {
|
||||
for _, f := range []string{lpFile, gzipLpFile, gzipLpFileNoExt, csvFile, gzipCsvFile, gzipCsvFileNoExt} {
|
||||
_ = os.Remove(f)
|
||||
}
|
||||
}()
|
||||
|
||||
var tests = []struct {
|
||||
name string
|
||||
// input
|
||||
args []string
|
||||
files []string
|
||||
urls []string
|
||||
format linereader.InputFormat
|
||||
compression linereader.InputCompression
|
||||
encoding string
|
||||
headers []string
|
||||
skipHeader int
|
||||
ignoreDataTypeInColumnName bool
|
||||
stdIn io.Reader
|
||||
// output
|
||||
firstLineCorrection int // 0 unless shifted by prepended headers or skipped rows
|
||||
lines []string
|
||||
}{
|
||||
{
|
||||
name: "read data from LP file",
|
||||
files: []string{lpFile},
|
||||
firstLineCorrection: 0,
|
||||
lines: []string{
|
||||
lpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read data from LP file using non-UTF encoding",
|
||||
files: []string{lpFile},
|
||||
encoding: "ISO_8859-1",
|
||||
firstLineCorrection: 0,
|
||||
lines: []string{
|
||||
lpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read compressed LP data from file",
|
||||
files: []string{gzipLpFileNoExt},
|
||||
compression: linereader.InputCompressionGZIP,
|
||||
firstLineCorrection: 0,
|
||||
lines: []string{
|
||||
lpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read compressed data from LP file using non-UTF encoding",
|
||||
files: []string{gzipLpFileNoExt},
|
||||
compression: linereader.InputCompressionGZIP,
|
||||
encoding: "ISO_8859-1",
|
||||
firstLineCorrection: 0,
|
||||
lines: []string{
|
||||
lpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read compressed LP data from file ending in .gz",
|
||||
files: []string{gzipLpFile},
|
||||
firstLineCorrection: 0,
|
||||
lines: []string{
|
||||
lpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read compressed and uncompressed LP data from file in the same call",
|
||||
files: []string{gzipLpFile, lpFile},
|
||||
firstLineCorrection: 0,
|
||||
lines: []string{
|
||||
lpContents,
|
||||
lpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read LP data from stdin",
|
||||
stdIn: strings.NewReader(stdInLpContents),
|
||||
lines: []string{
|
||||
stdInLpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read compressed LP data from stdin",
|
||||
compression: linereader.InputCompressionGZIP,
|
||||
stdIn: gzipStdin(stdInLpContents),
|
||||
lines: []string{
|
||||
stdInLpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read LP data from stdin using '-' argument",
|
||||
args: []string{"-"},
|
||||
stdIn: strings.NewReader(stdInLpContents),
|
||||
lines: []string{
|
||||
stdInLpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read compressed LP data from stdin using '-' argument",
|
||||
compression: linereader.InputCompressionGZIP,
|
||||
args: []string{"-"},
|
||||
stdIn: gzipStdin(stdInLpContents),
|
||||
lines: []string{
|
||||
stdInLpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read LP data from 1st argument",
|
||||
args: []string{stdInLpContents},
|
||||
lines: []string{
|
||||
stdInLpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read LP data from URL",
|
||||
urls: []string{fmt.Sprintf("/a?data=%s", url.QueryEscape(lpContents))},
|
||||
lines: []string{
|
||||
lpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read compressed LP data from URL",
|
||||
urls: []string{fmt.Sprintf("/a?data=%s&compress=true", url.QueryEscape(lpContents))},
|
||||
compression: linereader.InputCompressionGZIP,
|
||||
lines: []string{
|
||||
lpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read compressed LP data from URL ending in .gz",
|
||||
urls: []string{fmt.Sprintf("/a.gz?data=%s&compress=true", url.QueryEscape(lpContents))},
|
||||
lines: []string{
|
||||
lpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read compressed LP data from URL with gzip encoding",
|
||||
urls: []string{fmt.Sprintf("/a?data=%s&compress=true&encoding=gzip", url.QueryEscape(lpContents))},
|
||||
lines: []string{
|
||||
lpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read data from CSV file + transform to line protocol",
|
||||
files: []string{csvFile},
|
||||
firstLineCorrection: 0, // no changes
|
||||
lines: []string{
|
||||
lpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read compressed CSV data from file + transform to line protocol",
|
||||
files: []string{gzipCsvFileNoExt},
|
||||
compression: linereader.InputCompressionGZIP,
|
||||
firstLineCorrection: 0,
|
||||
lines: []string{
|
||||
lpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read compressed CSV data from file ending in .csv.gz + transform to line protocol",
|
||||
files: []string{gzipCsvFile},
|
||||
firstLineCorrection: 0,
|
||||
lines: []string{
|
||||
lpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read CSV data from --header and --file + transform to line protocol",
|
||||
headers: []string{"x,_measurement,y,z"},
|
||||
files: []string{csvFile},
|
||||
firstLineCorrection: -1, // shifted back by header line
|
||||
lines: []string{
|
||||
"b x=_measurement,y=c,z=d",
|
||||
"f2 x=f1,y=f3,z=f4",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read CSV data from --header and @file argument with 1st row in file skipped + transform to line protocol",
|
||||
headers: []string{"x,_measurement,y,z"},
|
||||
skipHeader: 1,
|
||||
args: []string{"@" + csvFile},
|
||||
firstLineCorrection: 0, // shifted (-1) back by header line, forward (+1) by skipHeader
|
||||
lines: []string{
|
||||
"f2 x=f1,y=f3,z=f4",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read CSV data from stdin + transform to line protocol",
|
||||
format: linereader.InputFormatCSV,
|
||||
stdIn: strings.NewReader(stdInCsvContents),
|
||||
lines: []string{
|
||||
stdInLpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read compressed CSV data from stdin + transform to line protocol",
|
||||
format: linereader.InputFormatCSV,
|
||||
compression: linereader.InputCompressionGZIP,
|
||||
stdIn: gzipStdin(stdInCsvContents),
|
||||
lines: []string{
|
||||
stdInLpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read CSV data from stdin using '-' argument + transform to line protocol",
|
||||
format: linereader.InputFormatCSV,
|
||||
args: []string{"-"},
|
||||
stdIn: strings.NewReader(stdInCsvContents),
|
||||
lines: []string{
|
||||
stdInLpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read compressed CSV data from stdin using '-' argument + transform to line protocol",
|
||||
format: linereader.InputFormatCSV,
|
||||
compression: linereader.InputCompressionGZIP,
|
||||
args: []string{"-"},
|
||||
stdIn: gzipStdin(stdInCsvContents),
|
||||
lines: []string{
|
||||
stdInLpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read CSV data from 1st argument + transform to line protocol",
|
||||
format: linereader.InputFormatCSV,
|
||||
args: []string{stdInCsvContents},
|
||||
lines: []string{
|
||||
stdInLpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read data from .csv URL + transform to line protocol",
|
||||
urls: []string{fmt.Sprintf("/a.csv?data=%s", url.QueryEscape(csvContents))},
|
||||
lines: []string{
|
||||
lpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read compressed CSV data from URL + transform to line protocol",
|
||||
urls: []string{fmt.Sprintf("/a.csv?data=%s&compress=true", url.QueryEscape(csvContents))},
|
||||
compression: linereader.InputCompressionGZIP,
|
||||
lines: []string{
|
||||
lpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read compressed CSV data from URL ending in .csv.gz + transform to line protocol",
|
||||
urls: []string{fmt.Sprintf("/a.csv.gz?data=%s&compress=true", url.QueryEscape(csvContents))},
|
||||
lines: []string{
|
||||
lpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read compressed CSV data from URL with gzip encoding + transform to line protocol",
|
||||
urls: []string{fmt.Sprintf("/a.csv?data=%s&compress=true&encoding=gzip", url.QueryEscape(csvContents))},
|
||||
lines: []string{
|
||||
lpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read data from .csv URL + change header line + transform to line protocol",
|
||||
urls: []string{fmt.Sprintf("/a.csv?data=%s", url.QueryEscape(csvContents))},
|
||||
headers: []string{"k,j,_measurement,i"},
|
||||
skipHeader: 1,
|
||||
lines: []string{
|
||||
"f3 k=f1,j=f2,i=f4",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read data from URL with text/csv Content-Type + transform to line protocol",
|
||||
urls: []string{fmt.Sprintf("/a?Content-Type=text/csv&data=%s", url.QueryEscape(csvContents))},
|
||||
lines: []string{
|
||||
lpContents,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read compressed data from URL with text/csv Content-Type and gzip Content-Encoding + transform to line protocol",
|
||||
urls: []string{fmt.Sprintf("/a?Content-Type=text/csv&data=%s&compress=true&encoding=gzip", url.QueryEscape(csvContents))},
|
||||
lines: []string{
|
||||
lpContents,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
r := &linereader.MultiInputLineReader{
|
||||
StdIn: test.stdIn,
|
||||
HttpClient: &mockClient{t: t},
|
||||
Args: test.args,
|
||||
Files: test.files,
|
||||
URLs: test.urls,
|
||||
Format: test.format,
|
||||
Compression: test.compression,
|
||||
Headers: test.headers,
|
||||
SkipHeader: test.skipHeader,
|
||||
IgnoreDataTypeInColumnName: test.ignoreDataTypeInColumnName,
|
||||
}
|
||||
reader, closer, err := r.Open(context.Background())
|
||||
require.NotNil(t, closer)
|
||||
defer closer.Close()
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, reader)
|
||||
lines := readLines(reader)
|
||||
require.Equal(t, test.lines, lines)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestLineReaderErrors(t *testing.T) {
|
||||
csvFile1 := createTempFile(t, "csv", []byte("_measurement,b,c,d\nf1,f2,f3,f4"), false)
|
||||
defer os.Remove(csvFile1)
|
||||
|
||||
var tests = []struct {
|
||||
name string
|
||||
encoding string
|
||||
files []string
|
||||
urls []string
|
||||
message string
|
||||
}{
|
||||
{
|
||||
name: "unsupported encoding",
|
||||
encoding: "green",
|
||||
message: "https://www.iana.org/assignments/character-sets/character-sets.xhtml", // hint to available values
|
||||
},
|
||||
{
|
||||
name: "file not found",
|
||||
files: []string{csvFile1 + "x"},
|
||||
message: csvFile1,
|
||||
},
|
||||
{
|
||||
name: "unsupported URL",
|
||||
urls: []string{"wit://whatever"},
|
||||
message: "wit://whatever",
|
||||
},
|
||||
{
|
||||
name: "invalid URL",
|
||||
urls: []string{"http://test%zy"}, // 2 hex digits after % expected
|
||||
message: "http://test%zy",
|
||||
},
|
||||
{
|
||||
name: "URL with 500 status code",
|
||||
urls: []string{"/test"},
|
||||
message: "/test",
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
r := linereader.MultiInputLineReader{
|
||||
HttpClient: &mockClient{t: t, fail: true},
|
||||
Files: test.files,
|
||||
URLs: test.urls,
|
||||
Encoding: test.encoding,
|
||||
}
|
||||
_, closer, err := r.Open(context.Background())
|
||||
require.NotNil(t, closer)
|
||||
defer closer.Close()
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), test.message)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestLineReaderErrorOut(t *testing.T) {
|
||||
stdInContents := "_measurement,a|long:strict\nm,1\nm,1.1"
|
||||
errorOut := bytes.Buffer{}
|
||||
|
||||
r := linereader.MultiInputLineReader{
|
||||
StdIn: strings.NewReader(stdInContents),
|
||||
ErrorOut: &errorOut,
|
||||
Format: linereader.InputFormatCSV,
|
||||
}
|
||||
reader, closer, err := r.Open(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer closer.Close()
|
||||
|
||||
out := bytes.Buffer{}
|
||||
_, err = io.Copy(&out, reader)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, "m a=1i", strings.Trim(out.String(), "\n"))
|
||||
errorLines := errorOut.String()
|
||||
require.Equal(t, "# error : line 3: column 'a': '1.1' cannot fit into long data type\nm,1.1", strings.Trim(errorLines, "\n"))
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user