// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mydump_test

import (
	"bytes"
	"context"
	"encoding/csv"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
	"testing"
	"unicode/utf8"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/pkg/lightning/config"
	"github.com/pingcap/tidb/pkg/lightning/log"
	"github.com/pingcap/tidb/pkg/lightning/mydump"
	"github.com/pingcap/tidb/pkg/lightning/worker"
	"github.com/pingcap/tidb/pkg/types"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.uber.org/zap"
)

var ioWorkersForCSV = worker.NewPool(context.Background(), 5, "test_csv")

func assertPosEqual(t *testing.T, parser mydump.Parser, expectPos, expectRowID int64) {
	pos, rowID := parser.Pos()
	require.Equal(t, expectPos, pos)
	require.Equal(t, expectRowID, rowID)
}

var nullDatum types.Datum

type testCase struct {
	input    string
	expected [][]types.Datum
}

func runTestCasesCSV(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int64, cases []testCase) {
	for _, tc := range cases {
		charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
		assert.NoError(t, err)
		parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(tc.input), blockBufSize, ioWorkersForCSV, false, charsetConvertor)
		assert.NoError(t, err)
		for i, row := range tc.expected {
			comment := fmt.Sprintf("input = %q, row = %d", tc.input, i+1)
			e := parser.ReadRow()
			assert.NoErrorf(t, e, "input = %q, row = %d, error = %s", tc.input, i+1, errors.ErrorStack(e))
			assert.Equal(t, int64(i)+1, parser.LastRow().RowID, comment)
			assert.Equal(t, row, parser.LastRow().Row, comment)
		}
		assert.ErrorIsf(t, errors.Cause(parser.ReadRow()), io.EOF, "input = %q", tc.input)
	}
}

func runFailingTestCasesCSV(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int64, cases []string) {
	for _, tc := range cases {
		charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
		assert.NoError(t, err)
		parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(tc), blockBufSize, ioWorkersForCSV, false, charsetConvertor)
		require.NoError(t, err)
		e := parser.ReadRow()
		assert.Regexpf(t, "syntax error.*", e.Error(), "input = %q / %s", tc, errors.ErrorStack(e))
	}
}

func tpchDatums() [][]types.Datum {
	datums := make([][]types.Datum, 0, 3)
	datums = append(datums, []types.Datum{
		types.NewStringDatum("1"),
		types.NewStringDatum("goldenrod lavender spring chocolate lace"),
		types.NewStringDatum("Manufacturer#1"),
		types.NewStringDatum("Brand#13"),
		types.NewStringDatum("PROMO BURNISHED COPPER"),
		types.NewStringDatum("7"),
		types.NewStringDatum("JUMBO PKG"),
		types.NewStringDatum("901.00"),
		types.NewStringDatum("ly. slyly ironi"),
	})
	datums = append(datums, []types.Datum{
		types.NewStringDatum("2"),
		types.NewStringDatum("blush thistle blue yellow saddle"),
		types.NewStringDatum("Manufacturer#1"),
		types.NewStringDatum("Brand#13"),
		types.NewStringDatum("LARGE BRUSHED BRASS"),
		types.NewStringDatum("1"),
		types.NewStringDatum("LG CASE"),
		types.NewStringDatum("902.00"),
		types.NewStringDatum("lar accounts amo"),
	})
	datums = append(datums, []types.Datum{
		types.NewStringDatum("3"),
		types.NewStringDatum("spring green yellow purple cornsilk"),
		types.NewStringDatum("Manufacturer#4"),
		types.NewStringDatum("Brand#42"),
		types.NewStringDatum("STANDARD POLISHED BRASS"),
		types.NewStringDatum("21"),
		types.NewStringDatum("WRAP CASE"),
		types.NewStringDatum("903.00"),
		types.NewStringDatum("egular deposits hag"),
	})

	return datums
}

// datumsToString renders the datum rows as text, enclosing each field with
// `quote` (doubling any embedded quote character) and terminating every row
// with "\r\n".
func datumsToString(datums [][]types.Datum, delimiter string, quote string, lastSep bool) string {
	var b strings.Builder
	doubleQuote := quote + quote
	for _, ds := range datums {
		for i, d := range ds {
			text := d.GetString()
			if len(quote) > 0 {
				b.WriteString(quote)
				b.WriteString(strings.ReplaceAll(text, quote, doubleQuote))
				b.WriteString(quote)
			} else {
				b.WriteString(text)
			}
			if lastSep || i < len(ds)-1 {
				b.WriteString(delimiter)
			}
		}
		b.WriteString("\r\n")
	}
	return b.String()
}

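// A minimal illustrative check of the helper above (added here as a sketch,
// not part of the original suite): an embedded quote is doubled and each row
// ends with "\r\n".
func TestDatumsToStringExample(t *testing.T) {
	rows := [][]types.Datum{{types.NewStringDatum(`a"b`), types.NewStringDatum("c")}}
	require.Equal(t, "\"a\"\"b\",\"c\"\r\n", datumsToString(rows, ",", `"`, false))
}
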
func TestTPCH(t *testing.T) {
	datums := tpchDatums()
	input := datumsToString(datums, "|", "", true)
	reader := mydump.NewStringReader(input)

	cfg := config.CSVConfig{
		FieldsTerminatedBy: "|",
		FieldsEnclosedBy:   "",
		TrimLastEmptyField: true,
	}

	parser, err := mydump.NewCSVParser(context.Background(), &cfg, reader, int64(config.ReadBlockSize), ioWorkersForCSV, false, nil)
	require.NoError(t, err)
	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID:  1,
		Row:    datums[0],
		Length: 116,
	}, parser.LastRow())
	assertPosEqual(t, parser, 126, 1)

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID:  2,
		Row:    datums[1],
		Length: 104,
	}, parser.LastRow())
	assertPosEqual(t, parser, 241, 2)

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID:  3,
		Row:    datums[2],
		Length: 117,
	}, parser.LastRow())
	assertPosEqual(t, parser, 369, 3)

	require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)
}

func TestTPCHMultiBytes(t *testing.T) {
	datums := tpchDatums()
	sepsAndQuotes := [][2]string{
		{",", ""},
		{"\000", ""},
		{",", ""},
		{"🤔", ""},
		{",", "。"},
		{"||", ""},
		{"|+|", ""},
		{"##", ""},
		{",", "'"},
		{",", `"`},
		{"🤔", `''`},
		{"🤔", `"'`},
		{"🤔", `"'`},
		{"🤔", "🌚"}, // these two emoji share the same prefix bytes
		{"##", "#-"},
		{"\\s", "\\q"},
		{",", "1"},
		{",", "ac"},
	}
	for _, SepAndQuote := range sepsAndQuotes {
		inputStr := datumsToString(datums, SepAndQuote[0], SepAndQuote[1], false)

		// Extract every index just past a '\r' (i.e. in the middle of each
		// "\r\n") from inputStr. These indicate where the parser stops after
		// reading one row, so their count should equal the number of datum rows.
		var allExpectedParserPos []int
		for {
			last := 0
			if len(allExpectedParserPos) > 0 {
				last = allExpectedParserPos[len(allExpectedParserPos)-1]
			}
			pos := strings.IndexByte(inputStr[last:], '\r')
			if pos < 0 {
				break
			}
			allExpectedParserPos = append(allExpectedParserPos, last+pos+1)
		}
		require.Len(t, allExpectedParserPos, len(datums))

		cfg := config.CSVConfig{
			FieldsTerminatedBy: SepAndQuote[0],
			FieldsEnclosedBy:   SepAndQuote[1],
			TrimLastEmptyField: false,
		}

		reader := mydump.NewStringReader(inputStr)
		parser, err := mydump.NewCSVParser(context.Background(), &cfg, reader, int64(config.ReadBlockSize), ioWorkersForCSV, false, nil)
		require.NoError(t, err)

		for i, expectedParserPos := range allExpectedParserPos {
			require.Nil(t, parser.ReadRow())
			require.Equal(t, int64(i+1), parser.LastRow().RowID)
			require.Equal(t, datums[i], parser.LastRow().Row)
			assertPosEqual(t, parser, int64(expectedParserPos), int64(i+1))
		}

		require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)
	}
}

func TestRFC4180(t *testing.T) {
	cfg := config.CSVConfig{
		FieldsTerminatedBy: ",",
		FieldsEnclosedBy:   `"`,
	}

	// example 1, trailing new lines

	parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader("aaa,bbb,ccc\nzzz,yyy,xxx\n"), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil)
	require.NoError(t, err)

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 1,
		Row: []types.Datum{
			types.NewStringDatum("aaa"),
			types.NewStringDatum("bbb"),
			types.NewStringDatum("ccc"),
		},
		Length: 9,
	}, parser.LastRow())
	assertPosEqual(t, parser, 12, 1)

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 2,
		Row: []types.Datum{
			types.NewStringDatum("zzz"),
			types.NewStringDatum("yyy"),
			types.NewStringDatum("xxx"),
		},
		Length: 9,
	}, parser.LastRow())
	assertPosEqual(t, parser, 24, 2)

	require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)

	// example 2, no trailing new lines

	parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader("aaa,bbb,ccc\nzzz,yyy,xxx"), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil)
	require.NoError(t, err)

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 1,
		Row: []types.Datum{
			types.NewStringDatum("aaa"),
			types.NewStringDatum("bbb"),
			types.NewStringDatum("ccc"),
		},
		Length: 9,
	}, parser.LastRow())
	assertPosEqual(t, parser, 12, 1)

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 2,
		Row: []types.Datum{
			types.NewStringDatum("zzz"),
			types.NewStringDatum("yyy"),
			types.NewStringDatum("xxx"),
		},
		Length: 9,
	}, parser.LastRow())
	assertPosEqual(t, parser, 23, 2)

	require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)

	// example 5, quoted fields

	parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`"aaa","bbb","ccc"`+"\nzzz,yyy,xxx"), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil)
	require.NoError(t, err)

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 1,
		Row: []types.Datum{
			types.NewStringDatum("aaa"),
			types.NewStringDatum("bbb"),
			types.NewStringDatum("ccc"),
		},
		Length: 9,
	}, parser.LastRow())
	assertPosEqual(t, parser, 18, 1)

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 2,
		Row: []types.Datum{
			types.NewStringDatum("zzz"),
			types.NewStringDatum("yyy"),
			types.NewStringDatum("xxx"),
		},
		Length: 9,
	}, parser.LastRow())
	assertPosEqual(t, parser, 29, 2)

	require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)

	// example 6, line breaks within fields

	parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`"aaa","b
bb","ccc"
zzz,yyy,xxx`), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil)
	require.NoError(t, err)

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 1,
		Row: []types.Datum{
			types.NewStringDatum("aaa"),
			types.NewStringDatum("b\nbb"),
			types.NewStringDatum("ccc"),
		},
		Length: 10,
	}, parser.LastRow())
	assertPosEqual(t, parser, 19, 1)

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 2,
		Row: []types.Datum{
			types.NewStringDatum("zzz"),
			types.NewStringDatum("yyy"),
			types.NewStringDatum("xxx"),
		},
		Length: 9,
	}, parser.LastRow())
	assertPosEqual(t, parser, 30, 2)

	require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)

	// example 7, quote escaping

	parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`"aaa","b""bb","ccc"`), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil)
	require.NoError(t, err)

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 1,
		Row: []types.Datum{
			types.NewStringDatum("aaa"),
			types.NewStringDatum("b\"bb"),
			types.NewStringDatum("ccc"),
		},
		Length: 10,
	}, parser.LastRow())
	assertPosEqual(t, parser, 19, 1)

	require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)
}

func TestMySQL(t *testing.T) {
	cfg := config.CSVConfig{
		FieldsTerminatedBy: ",",
		FieldsEnclosedBy:   `"`,
		LinesTerminatedBy:  "\n",
		FieldsEscapedBy:    `\`,
		NotNull:            false,
		FieldNullDefinedBy: []string{`\N`},
	}

	parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`"\"","\\","\?"
"\
",\N,\\N`), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil)
	require.NoError(t, err)

	require.NoError(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 1,
		Row: []types.Datum{
			types.NewStringDatum(`"`),
			types.NewStringDatum(`\`),
			types.NewStringDatum("?"),
		},
		Length: 6,
	}, parser.LastRow())
	assertPosEqual(t, parser, 15, 1)

	require.NoError(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 2,
		Row: []types.Datum{
			types.NewStringDatum("\n"),
			nullDatum,
			types.NewStringDatum(`\N`),
		},
		Length: 7,
	}, parser.LastRow())
	assertPosEqual(t, parser, 26, 2)

	require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)

	parser, err = mydump.NewCSVParser(
		context.Background(), &cfg,
		mydump.NewStringReader(`"\0\b\n\r\t\Z\\\ \c\'\""`),
		int64(config.ReadBlockSize), ioWorkersForCSV, false, nil)
	require.NoError(t, err)

	require.NoError(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 1,
		Row: []types.Datum{
			types.NewStringDatum(string([]byte{0, '\b', '\n', '\r', '\t', 26, '\\', ' ', ' ', 'c', '\'', '"'})),
		},
		Length: 23,
	}, parser.LastRow())

	cfg.UnescapedQuote = true
	parser, err = mydump.NewCSVParser(
		context.Background(), &cfg,
		mydump.NewStringReader(`3,"a string containing a " quote",102.20
`),
		int64(config.ReadBlockSize), ioWorkersForCSV, false, nil)
	require.NoError(t, err)

	require.NoError(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 1,
		Row: []types.Datum{
			types.NewStringDatum("3"),
			types.NewStringDatum(`a string containing a " quote`),
			types.NewStringDatum("102.20"),
		},
		Length: 36,
	}, parser.LastRow())

	parser, err = mydump.NewCSVParser(
		context.Background(), &cfg,
		mydump.NewStringReader(`3,"a string containing a " quote","102.20"`),
		int64(config.ReadBlockSize), ioWorkersForCSV, false, nil)
	require.NoError(t, err)

	require.NoError(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 1,
		Row: []types.Datum{
			types.NewStringDatum("3"),
			types.NewStringDatum(`a string containing a " quote`),
			types.NewStringDatum("102.20"),
		},
		Length: 36,
	}, parser.LastRow())

	parser, err = mydump.NewCSVParser(
		context.Background(), &cfg,
		mydump.NewStringReader(`"a"b",c"d"e`),
		int64(config.ReadBlockSize), ioWorkersForCSV, false, nil)
	require.NoError(t, err)

	require.NoError(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 1,
		Row: []types.Datum{
			types.NewStringDatum(`a"b`),
			types.NewStringDatum(`c"d"e`),
		},
		Length: 8,
	}, parser.LastRow())
}

func TestCustomEscapeChar(t *testing.T) {
	cfg := config.CSVConfig{
		FieldsTerminatedBy: ",",
		FieldsEnclosedBy:   `"`,
		FieldsEscapedBy:    `!`,
		NotNull:            false,
		FieldNullDefinedBy: []string{`!N`},
	}

	parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`"!"","!!","!\"
"!
",!N,!!N`), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil)
	require.NoError(t, err)

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 1,
		Row: []types.Datum{
			types.NewStringDatum(`"`),
			types.NewStringDatum(`!`),
			types.NewStringDatum(`\`),
		},
		Length: 6,
	}, parser.LastRow())
	assertPosEqual(t, parser, 15, 1)

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 2,
		Row: []types.Datum{
			types.NewStringDatum("\n"),
			nullDatum,
			types.NewStringDatum(`!N`),
		},
		Length: 7,
	}, parser.LastRow())
	assertPosEqual(t, parser, 26, 2)

	require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)

	cfg = config.CSVConfig{
		FieldsTerminatedBy: ",",
		FieldsEnclosedBy:   `"`,
		FieldsEscapedBy:    ``,
		NotNull:            false,
		FieldNullDefinedBy: []string{`NULL`},
	}

	parser, err = mydump.NewCSVParser(
		context.Background(), &cfg,
		mydump.NewStringReader(`"{""itemRangeType"":0,""itemContainType"":0,""shopRangeType"":1,""shopJson"":""[{\""id\"":\""A1234\"",\""shopName\"":\""AAAAAA\""}]""}"`),
		int64(config.ReadBlockSize), ioWorkersForCSV, false, nil)
	require.NoError(t, err)

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 1,
		Row: []types.Datum{
			types.NewStringDatum(`{"itemRangeType":0,"itemContainType":0,"shopRangeType":1,"shopJson":"[{\"id\":\"A1234\",\"shopName\":\"AAAAAA\"}]"}`),
		},
		Length: 115,
	}, parser.LastRow())
}

func TestSyntaxErrorCSV(t *testing.T) {
	cfg := config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: ",",
			FieldsEnclosedBy:   `"`,
			FieldsEscapedBy:    `\`,
		},
	}

	inputs := []string{
		`"???`,
		`\`,
		`"\`,
		`0"`,
		`0\`,
		"\"\v",
		`"""`,
		"\"\r",
		"\"\x01",
	}

	runFailingTestCasesCSV(t, &cfg, int64(config.ReadBlockSize), inputs)

	cfg.CSV.FieldsEscapedBy = ""
	runFailingTestCasesCSV(t, &cfg, int64(config.ReadBlockSize), []string{`"\`})
}

func TestTSV(t *testing.T) {
	cfg := config.CSVConfig{
		FieldsTerminatedBy: "\t",
		FieldsEnclosedBy:   "",
		BackslashEscape:    false,
		NotNull:            false,
		FieldNullDefinedBy: []string{""},
		Header:             true,
		HeaderSchemaMatch:  true,
	}

	parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`a	b	c	d	e	f
0				foo	0000-00-00
0				foo	0000-00-00
0	abc	def	ghi	bar	1999-12-31`), int64(config.ReadBlockSize), ioWorkersForCSV, true, nil)
	require.NoError(t, err)

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 1,
		Row: []types.Datum{
			types.NewStringDatum("0"),
			nullDatum,
			nullDatum,
			nullDatum,
			types.NewStringDatum("foo"),
			types.NewStringDatum("0000-00-00"),
		},
		Length: 14,
	}, parser.LastRow())
	assertPosEqual(t, parser, 32, 1)
	require.Equal(t, []string{"a", "b", "c", "d", "e", "f"}, parser.Columns())

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 2,
		Row: []types.Datum{
			types.NewStringDatum("0"),
			nullDatum,
			nullDatum,
			nullDatum,
			types.NewStringDatum("foo"),
			types.NewStringDatum("0000-00-00"),
		},
		Length: 14,
	}, parser.LastRow())
	assertPosEqual(t, parser, 52, 2)

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 3,
		Row: []types.Datum{
			types.NewStringDatum("0"),
			types.NewStringDatum("abc"),
			types.NewStringDatum("def"),
			types.NewStringDatum("ghi"),
			types.NewStringDatum("bar"),
			types.NewStringDatum("1999-12-31"),
		},
		Length: 23,
	}, parser.LastRow())
	assertPosEqual(t, parser, 80, 3)

	require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)
}

func TestCsvWithWhiteSpaceLine(t *testing.T) {
	cfg := config.CSVConfig{
		FieldsTerminatedBy: ",",
		FieldsEnclosedBy:   `"`,
		FieldNullDefinedBy: []string{""},
	}
	data := " \r\n\r\n0,,abc\r\n \r\n123,1999-12-31,test\r\n"
	parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(data), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil)
	require.NoError(t, err)
	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 1,
		Row: []types.Datum{
			types.NewStringDatum("0"),
			nullDatum,
			types.NewStringDatum("abc"),
		},
		Length: 4,
	}, parser.LastRow())

	assertPosEqual(t, parser, 12, 1)
	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 2,
		Row: []types.Datum{
			types.NewStringDatum("123"),
			types.NewStringDatum("1999-12-31"),
			types.NewStringDatum("test"),
		},
		Length: 17,
	}, parser.LastRow())
	require.Nil(t, parser.Close())

	cfg.Header = true
	cfg.HeaderSchemaMatch = true
	data = " \r\na,b,c\r\n0,,abc\r\n"
	parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(data), int64(config.ReadBlockSize), ioWorkersForCSV, true, nil)
	require.NoError(t, err)
	require.Nil(t, parser.ReadRow())
	require.Equal(t, []string{"a", "b", "c"}, parser.Columns())
	require.Equal(t, mydump.Row{
		RowID: 1,
		Row: []types.Datum{
			types.NewStringDatum("0"),
			nullDatum,
			types.NewStringDatum("abc"),
		},
		Length: 4,
	}, parser.LastRow())

	assertPosEqual(t, parser, 17, 1)
	require.Nil(t, parser.Close())
}

func TestEmpty(t *testing.T) {
	cfg := config.CSVConfig{
		FieldsTerminatedBy: ",",
		FieldsEnclosedBy:   `"`,
	}

	parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(""), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil)
	require.NoError(t, err)
	require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)

	// Try again with headers.

	cfg.Header = true
	cfg.HeaderSchemaMatch = true

	parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(""), int64(config.ReadBlockSize), ioWorkersForCSV, true, nil)
	require.NoError(t, err)
	require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)

	parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader("h\n"), int64(config.ReadBlockSize), ioWorkersForCSV, true, nil)
	require.NoError(t, err)
	require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)
}

func TestCRLF(t *testing.T) {
	cfg := config.CSVConfig{
		FieldsTerminatedBy: ",",
		FieldsEnclosedBy:   `"`,
	}
	parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader("a\rb\r\nc\n\n\n\nd"), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil)
	require.NoError(t, err)

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID:  1,
		Row:    []types.Datum{types.NewStringDatum("a")},
		Length: 1,
	}, parser.LastRow())

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID:  2,
		Row:    []types.Datum{types.NewStringDatum("b")},
		Length: 1,
	}, parser.LastRow())

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID:  3,
		Row:    []types.Datum{types.NewStringDatum("c")},
		Length: 1,
	}, parser.LastRow())

	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID:  4,
		Row:    []types.Datum{types.NewStringDatum("d")},
		Length: 1,
	}, parser.LastRow())

	require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)
}

func TestQuotedSeparator(t *testing.T) {
	cfg := config.CSVConfig{
		FieldsTerminatedBy: ",",
		FieldsEnclosedBy:   `"`,
	}

	parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`",",','`), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil)
	require.NoError(t, err)
	require.Nil(t, parser.ReadRow())
	require.Equal(t, mydump.Row{
		RowID: 1,
		Row: []types.Datum{
			types.NewStringDatum(","),
			types.NewStringDatum("'"),
			types.NewStringDatum("'"),
		},
		Length: 3,
	}, parser.LastRow())

	require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)
}

func TestConsecutiveFields(t *testing.T) {
	// Note: the behavior of reading `"xxx"yyy` here is undefined in RFC 4180.
	// Python's CSV module returns `xxxyyy`.
	// Rust's CSV package returns `xxxyyy`.
	// Go's CSV package returns a parse error.
	// NPM's CSV package returns a parse error.
	// MySQL's LOAD DATA statement returns `"xxx"yyy` as-is.

	cfg := config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: ",",
			FieldsEnclosedBy:   `"`,
		},
	}

	testCases := []string{
		`"x"?`,
		"\"\"\x01",
		"\"\"\v",
		`abc""`,
	}

	runFailingTestCasesCSV(t, &cfg, int64(config.ReadBlockSize), testCases)

	cfg.CSV.FieldsEnclosedBy = "|+|"
	runFailingTestCasesCSV(t, &cfg, int64(config.ReadBlockSize), []string{
		"abc|1|+||+|\r\n",
	})
}

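// A small cross-check of the note above, using the standard library's
// encoding/csv (already imported for the benchmarks below). This extra test is
// illustrative only and not part of the original suite: Go's CSV reader
// rejects a quoted field followed directly by more characters.
func TestConsecutiveFieldsGoCSV(t *testing.T) {
	r := csv.NewReader(strings.NewReader(`"xxx"yyy`))
	_, err := r.Read()
	require.Error(t, err)
	require.ErrorIs(t, err, csv.ErrQuote)
}
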
func TestTooLargeRow(t *testing.T) {
	cfg := config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: ",",
			FieldsEnclosedBy:   `"`,
		},
	}
	bak := mydump.LargestEntryLimit
	t.Cleanup(func() {
		mydump.LargestEntryLimit = bak
	})
	mydump.LargestEntryLimit = 1024
	t.Run("too long field", func(t *testing.T) {
		var dataBuf bytes.Buffer
		dataBuf.WriteString("a,b,c,d")
		for range mydump.LargestEntryLimit {
			dataBuf.WriteByte('d')
		}
		require.Greater(t, dataBuf.Len(), mydump.LargestEntryLimit)
		charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
		require.NoError(t, err)
		parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(dataBuf.String()), int64(config.ReadBlockSize), ioWorkersForCSV, false, charsetConvertor)
		require.NoError(t, err)
		e := parser.ReadRow()
		require.Error(t, e)
		require.Contains(t, e.Error(), "size of row cannot exceed the max value of txn-entry-size-limit")
	})

	t.Run("too long field with header", func(t *testing.T) {
		var dataBuf bytes.Buffer
		dataBuf.WriteString("c1,c2,c3,c4\na,b,c,d")
		for range mydump.LargestEntryLimit {
			dataBuf.WriteByte('d')
		}
		require.Greater(t, dataBuf.Len(), mydump.LargestEntryLimit)
		charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
		require.NoError(t, err)
		parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(dataBuf.String()), int64(config.ReadBlockSize), ioWorkersForCSV, true, charsetConvertor)
		require.NoError(t, err)
		e := parser.ReadRow()
		require.Error(t, e)
		require.Contains(t, e.Error(), "size of row cannot exceed the max value of txn-entry-size-limit")
	})

	t.Run("field is short, but whole row too long", func(t *testing.T) {
		var dataBuf bytes.Buffer
		for i := range 16 {
			if i > 0 {
				dataBuf.WriteByte(',')
			}
			for range mydump.LargestEntryLimit / 16 {
				dataBuf.WriteByte('d')
			}
		}
		for range mydump.LargestEntryLimit - dataBuf.Len() + 16 {
			dataBuf.WriteByte('d')
		}
		require.Greater(t, dataBuf.Len(), mydump.LargestEntryLimit)
		charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
		require.NoError(t, err)
		parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(dataBuf.String()), int64(config.ReadBlockSize), ioWorkersForCSV, false, charsetConvertor)
		require.NoError(t, err)
		e := parser.ReadRow()
		require.Error(t, e)
		require.Contains(t, e.Error(), "size of row cannot exceed the max value of txn-entry-size-limit")
	})
}

func TestSpecialChars(t *testing.T) {
	cfg := config.MydumperRuntime{
		CSV: config.CSVConfig{FieldsTerminatedBy: ",", FieldsEnclosedBy: `"`},
	}
	testCases := []testCase{
		{
			input:    "\x00",
			expected: [][]types.Datum{{types.NewStringDatum("\x00")}},
		},
		{
			input:    `0\`,
			expected: [][]types.Datum{{types.NewStringDatum(`0\`)}},
		},
		{
			input:    `\`,
			expected: [][]types.Datum{{types.NewStringDatum(`\`)}},
		},
		{
			input:    "0\v",
			expected: [][]types.Datum{{types.NewStringDatum("0\v")}},
		},
		{
			input:    "0\x00",
			expected: [][]types.Datum{{types.NewStringDatum("0\x00")}},
		},
		{
			input:    "\n\r",
			expected: [][]types.Datum{},
		},
		{
			input:    `"""",0`,
			expected: [][]types.Datum{{types.NewStringDatum(`"`), types.NewStringDatum(`0`)}},
		},
	}

	runTestCasesCSV(t, &cfg, int64(config.ReadBlockSize), testCases)
}

func TestContinuationCSV(t *testing.T) {
	cfg := config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: ",",
			FieldsEnclosedBy:   `"`,
			FieldsEscapedBy:    `\`,
			TrimLastEmptyField: true,
		},
	}

	testCases := []testCase{
		{
			input: `"abcdef",\njklm,nop` + "\r\n" + `"""""","\n",a,`,
			expected: [][]types.Datum{
				{
					types.NewStringDatum("abcdef"),
					types.NewStringDatum("\njklm"),
					types.NewStringDatum("nop"),
				},
				{
					types.NewStringDatum(`""`),
					types.NewStringDatum("\n"),
					types.NewStringDatum("a"),
				},
			},
		},
		{
			input:    `"VzMXdTXsLbiIqTYQlwPSudocNPKVsAqXgnuvupXEzlxkaFpBtHNDyoVEydoEgdnhsygaNHLpMTdEkpkrkNdzVjCbSoXvUqwoVaca"`,
			expected: [][]types.Datum{{types.NewStringDatum("VzMXdTXsLbiIqTYQlwPSudocNPKVsAqXgnuvupXEzlxkaFpBtHNDyoVEydoEgdnhsygaNHLpMTdEkpkrkNdzVjCbSoXvUqwoVaca")}},
		},
	}

	runTestCasesCSV(t, &cfg, 1, testCases)
}

func TestBackslashAsSep(t *testing.T) {
	cfg := config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: `\`,
			FieldsEnclosedBy:   `"`,
			FieldNullDefinedBy: []string{""},
		},
	}

	testCases := []testCase{
		{
			input:    `0\`,
			expected: [][]types.Datum{{types.NewStringDatum("0"), nullDatum}},
		},
		{
			input:    `\`,
			expected: [][]types.Datum{{nullDatum, nullDatum}},
		},
	}

	runTestCasesCSV(t, &cfg, 1, testCases)

	failingInputs := []string{
		`"\`,
	}
	runFailingTestCasesCSV(t, &cfg, 1, failingInputs)
}

func TestBackslashAsDelim(t *testing.T) {
	cfg := config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: ",",
			FieldsEnclosedBy:   `\`,
			FieldNullDefinedBy: []string{""},
		},
	}

	testCases := []testCase{
		{
			input:    `\\`,
			expected: [][]types.Datum{{nullDatum}},
		},
	}
	runTestCasesCSV(t, &cfg, 1, testCases)

	failingInputs := []string{
		`"\`,
	}
	runFailingTestCasesCSV(t, &cfg, 1, failingInputs)

	cfg = config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: ",",
			FieldsEnclosedBy:   `\`,
			FieldNullDefinedBy: []string{""},
			QuotedNullIsText:   true,
		},
	}

	testCases = []testCase{
		{
			input:    `\\`,
			expected: [][]types.Datum{{types.NewStringDatum("")}},
		},
	}
	runTestCasesCSV(t, &cfg, 1, testCases)
}

// errorReader implements the Reader interface; all of its methods always
// return an error.
type errorReader struct{}

func (*errorReader) Read(p []byte) (int, error) {
	return 0, errors.New("fake read error")
}

func (*errorReader) Seek(offset int64, whence int) (int64, error) {
	return 0, errors.New("fake seek error")
}

func (*errorReader) Close() error {
	return errors.New("fake close error")
}

func TestReadError(t *testing.T) {
	cfg := config.CSVConfig{
		FieldsTerminatedBy: ",",
		FieldsEnclosedBy:   `"`,
	}

	parser, err := mydump.NewCSVParser(context.Background(), &cfg, &errorReader{}, int64(config.ReadBlockSize), ioWorkersForCSV, false, nil)
	require.NoError(t, err)
	require.Regexp(t, "fake read error", parser.ReadRow().Error())
}

// TestSyntaxErrorLog checks that a syntax error won't dump huge strings into the log.
func TestSyntaxErrorLog(t *testing.T) {
	cfg := config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: "\t",
			FieldsEnclosedBy:   "'",
		},
	}

	tc := mydump.NewStringReader("x'" + strings.Repeat("y", 50000))
	parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, tc, 50000, ioWorkersForCSV, false, nil)
	require.NoError(t, err)
	logger, buffer := log.MakeTestLogger()
	parser.SetLogger(logger)
	require.Regexp(t, "syntax error.*", parser.ReadRow().Error())
	require.Nil(t, logger.Sync())

	// The logged content is truncated to 256 bytes instead of the full 50000.
	require.Equal(t,
		`{"$lvl":"ERROR","$msg":"syntax error","pos":2,"content":"`+strings.Repeat("y", 256)+`"}`,
		buffer.Stripped(),
	)
}

// TestTrimLastSep checks that setting `TrimLastEmptyField` to true trims only
// the last empty field.
func TestTrimLastSep(t *testing.T) {
	cfg := config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: ",",
			FieldsEnclosedBy:   `"`,
			TrimLastEmptyField: true,
		},
	}
	parser, err := mydump.NewCSVParser(
		context.Background(),
		&cfg.CSV,
		mydump.NewStringReader("123,456,789,\r\na,b,,\r\n,,,\r\n\"a\",\"\",\"\",\r\n"),
		int64(config.ReadBlockSize),
		ioWorkersForCSV,
		false,
		nil,
	)
	require.NoError(t, err)
	for range 4 {
		require.Nil(t, parser.ReadRow())
		require.Len(t, parser.LastRow().Row, 3)
	}
}

// TestTerminator checks for customized terminators.
func TestTerminator(t *testing.T) {
	cfg := config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: "|+|",
			LinesTerminatedBy:  "|+|\n",
		},
	}

	testCases := []testCase{
		{
			input: "5|+|abc\ndef\nghi|+|6|+|\n7|+|xy|+z|+|8|+|\n",
			expected: [][]types.Datum{
				{types.NewStringDatum("5"), types.NewStringDatum("abc\ndef\nghi"), types.NewStringDatum("6")},
				{types.NewStringDatum("7"), types.NewStringDatum("xy|+z"), types.NewStringDatum("8")},
			},
		},
	}

	runTestCasesCSV(t, &cfg, 1, testCases)

	cfg.CSV.FieldsEnclosedBy = "|+>"

	testCases = []testCase{
		{
			input: "xyz|+|+>|+|\n|+>|+|\n|+>|+|\r|+|\n",
			expected: [][]types.Datum{
				{types.NewStringDatum("xyz"), types.NewStringDatum("+>")},
				{types.NewStringDatum("|+|\n"), types.NewStringDatum("\r")},
			},
		},
	}
	runTestCasesCSV(t, &cfg, 1, testCases)
}

func TestReadUntilTerminator(t *testing.T) {
	cfg := config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: "#",
			LinesTerminatedBy:  "#\n",
		},
	}
	parser, err := mydump.NewCSVParser(
		context.Background(),
		&cfg.CSV,
		mydump.NewStringReader("xxx1#2#3#4#\n56#78"),
		int64(config.ReadBlockSize),
		ioWorkersForCSV,
		false,
		nil,
	)
	require.NoError(t, err)
	content, idx, err := parser.ReadUntilTerminator()
	require.NoError(t, err)
	require.Equal(t, "xxx1#2#3#4#\n", string(content))
	require.Equal(t, int64(12), idx)
	content, idx, err = parser.ReadUntilTerminator()
	require.ErrorIs(t, err, io.EOF)
	require.Equal(t, "56#78", string(content))
	require.Equal(t, int64(17), idx)
}

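// A minimal extra sketch of the same API (not part of the original suite): the
// returned chunk includes the "#\n" terminator and the returned offset points
// just past it, mirroring the behavior asserted above.
func TestReadUntilTerminatorSimple(t *testing.T) {
	cfg := config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: "#",
			LinesTerminatedBy:  "#\n",
		},
	}
	parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader("ab#\ncd"), int64(config.ReadBlockSize), ioWorkersForCSV, false, nil)
	require.NoError(t, err)
	content, idx, err := parser.ReadUntilTerminator()
	require.NoError(t, err)
	require.Equal(t, "ab#\n", string(content))
	require.Equal(t, int64(4), idx)
}
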
func TestNULL(t *testing.T) {
	// https://dev.mysql.com/doc/refman/8.0/en/load-data.html
	// - For the default FIELDS and LINES values, NULL is written as a field value of \N for output, and a field value of \N is read as NULL for input (assuming that the ESCAPED BY character is \).
	// - If FIELDS ENCLOSED BY is not empty, a field containing the literal word NULL as its value is read as a NULL value. This differs from the word NULL enclosed within FIELDS ENCLOSED BY characters, which is read as the string 'NULL'.
	// - If FIELDS ESCAPED BY is empty, NULL is written as the word NULL.
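	//
	// A worked example of the rules above (added illustration, not from the
	// MySQL manual): with ESCAPED BY '\' and ENCLOSED BY '"', the input line
	// `\N,NULL,"NULL"` loads as (NULL, NULL, 'NULL'); the bare \N and bare
	// NULL become SQL NULL, while the enclosed "NULL" stays a literal string.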

	cfg := config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: ",",
			FieldsEnclosedBy:   `"`,
			LinesTerminatedBy:  "\n",
			FieldNullDefinedBy: []string{`\N`, `NULL`},
			FieldsEscapedBy:    `\`,
			QuotedNullIsText:   true,
		},
	}
	testCases := []testCase{
		{
			input: `NULL,"NULL"
\N,"\N"
\\N,"\\N"`,
			expected: [][]types.Datum{
				{nullDatum, types.NewStringDatum("NULL")},
				{nullDatum, nullDatum},
				{types.NewStringDatum(`\N`), types.NewStringDatum(`\N`)},
			},
		},
	}
	runTestCasesCSV(t, &cfg, 1, testCases)

	cfg = config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: ",",
			FieldsEnclosedBy:   ``,
			LinesTerminatedBy:  "\n",
			FieldNullDefinedBy: []string{`\N`},
			FieldsEscapedBy:    `\`,
		},
	}
	testCases = []testCase{
		{
			input: `NULL,"NULL"
\N,"\N"
\\N,"\\N"`,
			expected: [][]types.Datum{
				{types.NewStringDatum("NULL"), types.NewStringDatum(`"NULL"`)},
				{nullDatum, types.NewStringDatum(`"N"`)},
				{types.NewStringDatum(`\N`), types.NewStringDatum(`"\N"`)},
			},
		},
	}
	runTestCasesCSV(t, &cfg, 1, testCases)

	cfg = config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: ",",
			FieldsEnclosedBy:   `"`,
			LinesTerminatedBy:  "\n",
			FieldNullDefinedBy: []string{`NULL`},
			FieldsEscapedBy:    ``,
			QuotedNullIsText:   true,
		},
	}
	testCases = []testCase{
		{
			input: `NULL,"NULL"
\N,"\N"
\\N,"\\N"`,
			expected: [][]types.Datum{
				{nullDatum, types.NewStringDatum(`NULL`)},
				{types.NewStringDatum(`\N`), types.NewStringDatum(`\N`)},
				{types.NewStringDatum(`\\N`), types.NewStringDatum(`\\N`)},
			},
		},
	}
	runTestCasesCSV(t, &cfg, 1, testCases)
}

func TestStartingBy(t *testing.T) {
	cfg := config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: ",",
			FieldsEnclosedBy:   `"`,
			LinesTerminatedBy:  "\n",
			LinesStartingBy:    "xxx",
		},
	}
	testCases := []testCase{
		{
			input: `xxx"abc",1
something xxx"def",2
"ghi",3`,
			expected: [][]types.Datum{
				{types.NewStringDatum("abc"), types.NewStringDatum("1")},
				{types.NewStringDatum("def"), types.NewStringDatum("2")},
			},
		},
	}
	runTestCasesCSV(t, &cfg, 1, testCases)

	testCases = []testCase{
		{
			input: `xxxabc,1
something xxxdef,2
ghi,3
"bad syntax"aaa`,
			expected: [][]types.Datum{
				{types.NewStringDatum("abc"), types.NewStringDatum("1")},
				{types.NewStringDatum("def"), types.NewStringDatum("2")},
			},
		},
	}
	runTestCasesCSV(t, &cfg, 1, testCases)

	// Test that special characters may appear before LinesStartingBy, and that
	// LinesStartingBy takes effect only once per line.

	testCases = []testCase{
		{
			input: `xxx"abc",1
something xxxdef,2
"ghi",3
"yyy"xxx"yyy",4
"yyy",5,xxxyyy,5
qwe,zzzxxxyyy,6
"yyyxxx"yyyxxx",7
yyy",5,xxxxxx,8
`,
			expected: [][]types.Datum{
				{types.NewStringDatum("abc"), types.NewStringDatum("1")},
				{types.NewStringDatum("def"), types.NewStringDatum("2")},
				{types.NewStringDatum("yyy"), types.NewStringDatum("4")},
				{types.NewStringDatum("yyy"), types.NewStringDatum("5")},
				{types.NewStringDatum("yyy"), types.NewStringDatum("6")},
				{types.NewStringDatum("yyyxxx"), types.NewStringDatum("7")},
				{types.NewStringDatum("xxx"), types.NewStringDatum("8")},
			},
		},
	}
	runTestCasesCSV(t, &cfg, 1, testCases)

	// Test LinesStartingBy that contains special characters.

	cfg = config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: ",",
			FieldsEnclosedBy:   `"`,
			LinesTerminatedBy:  "\n",
			LinesStartingBy:    "x,xx",
		},
	}
	testCases = []testCase{
		{
			input: `x,xx"abc",1
something x,xxdef,2
"ghi",3
"yyy"xxx"yyy",4
"yyy",5,xxxyyy,5
qwe,zzzxxxyyy,6
"yyyxxx"yyyxxx",7
yyy",5,xx,xxxx,8`,
			expected: [][]types.Datum{
				{types.NewStringDatum("abc"), types.NewStringDatum("1")},
				{types.NewStringDatum("def"), types.NewStringDatum("2")},
				{types.NewStringDatum("xx"), types.NewStringDatum("8")},
			},
		},
	}
	runTestCasesCSV(t, &cfg, 1, testCases)

	cfg = config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: ",",
			FieldsEnclosedBy:   `"`,
			LinesTerminatedBy:  "\n",
			LinesStartingBy:    `x"xx`,
		},
	}
	testCases = []testCase{
		{
			input: `x"xx"abc",1
something x"xxdef,2
"ghi",3
"yyy"xxx"yyy",4
"yyy",5,xxxyyy,5
qwe,zzzxxxyyy,6
"yyyxxx"yyyxxx",7
yyy",5,xx"xxxx,8
`,
			expected: [][]types.Datum{
				{types.NewStringDatum("abc"), types.NewStringDatum("1")},
				{types.NewStringDatum("def"), types.NewStringDatum("2")},
				{types.NewStringDatum("xx"), types.NewStringDatum("8")},
			},
		},
	}
	runTestCasesCSV(t, &cfg, 1, testCases)

	cfg = config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: ",",
			FieldsEnclosedBy:   `"`,
			LinesTerminatedBy:  "\n",
			LinesStartingBy:    "x\nxx",
		},
	}
	_, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, nil, 1, ioWorkersForCSV, false, nil)
	require.ErrorContains(t, err, "STARTING BY 'x\nxx' cannot contain LINES TERMINATED BY '\n'")
}

func TestCharsetConversion(t *testing.T) {
	cfg := config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: ",",
			LinesTerminatedBy:  "。\n",
		},
		DataCharacterSet:       "gb18030",
		DataInvalidCharReplace: string(utf8.RuneError),
	}
	charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
	require.NoError(t, err)
	originalInputPart1 := `不要温驯地走进那个良夜,老年应当在日暮时燃烧咆哮,怒斥,怒斥光明的消逝。
`
	originalInputPart2 := `虽然智慧的人临终时懂得黑暗有理,因为他们的话没有迸发出闪电,他们也并不温驯地走进那个良夜。
`
	// Insert an invalid char to test DataInvalidCharReplace.
	rawInput, err := charsetConvertor.Encode(originalInputPart1 + string([]byte{0x99}) + originalInputPart2)
	require.NoError(t, err)

	testCases := []testCase{
		{
			input: rawInput,
			expected: [][]types.Datum{
				{types.NewStringDatum("不要温驯地走进那个良夜"),
					types.NewStringDatum("老年应当在日暮时燃烧咆哮"),
					types.NewStringDatum("怒斥"),
					types.NewStringDatum("怒斥光明的消逝")},
				{types.NewStringDatum(cfg.DataInvalidCharReplace + "虽然智慧的人临终时懂得黑暗有理"),
					types.NewStringDatum("因为他们的话没有迸发出闪电"),
					types.NewStringDatum("他们也并不温驯地走进那个良夜")},
			},
		},
	}

	runTestCasesCSV(t, &cfg, 1, testCases)
}

// Run `go test github.com/pingcap/tidb/pkg/lightning/mydump -bench . -benchmem -test.v`
// to get the benchmark results.
// Please ensure your temporary storage has (b.N / 2) KiB of free space.

type benchCSVParserSuite struct {
	csvPath   string
	ioWorkers *worker.Pool
}

func newBenchCSVParserSuite(b *testing.B) *benchCSVParserSuite {
	var s benchCSVParserSuite
	s.ioWorkers = worker.NewPool(context.Background(), 5, "bench_csv")
	dir := b.TempDir()
	s.csvPath = filepath.Join(dir, "input.csv")
	file, err := os.Create(s.csvPath)
	require.NoError(b, err)
	defer func() {
		require.NoError(b, file.Close())
	}()
	for i := 0; i < b.N; i++ {
		_, err = file.WriteString("18,1,1,0.3650,GC,BARBARBAR,rw9AOV1AjoI1,50000.00,-10.00,10.00,1,1,djj3Q2XaIPoYVy1FuF,gc80Q2o82Au3C9xv,PYOolSxG3w,DI,265111111,7586538936787184,2020-02-26 20:06:00.193,OE,YCkSPBVqoJ2V5F8zWs87V5XzbaIY70aWCD4dgcB6bjUzCr5wOJCJ2TYH49J7yWyysbudJIxlTAEWSJahY7hswLtTsqyjEkrlsN8iDMAa9Poj29miJ08tnn2G8mL64IlyywvnRGbLbyGvWDdrOSF42RyUFTWVyqlDWc6Gr5wyMPYgvweKemzFDVD3kro5JsmBmJY08EK54nQoyfo2sScyb34zcM9GFo9ZQTwloINfPYQKXQm32m0XvU7jiNmYpFTFJQjdqA825SEvQqMMefG2WG4jVu9UPdhdUjRsFRd0Gw7YPKByOlcuY0eKxT7sAzMKXx2000RR6dqHNXe47oVYd\n")
		require.NoError(b, err)
	}
	return &s
}

func BenchmarkReadRowUsingMydumpCSVParser(b *testing.B) {
	s := newBenchCSVParserSuite(b)

	file, err := os.Open(s.csvPath)
	require.NoError(b, err)
	defer func() {
		require.NoError(b, file.Close())
	}()

	cfg := config.CSVConfig{FieldsTerminatedBy: ","}
	parser, err := mydump.NewCSVParser(context.Background(), &cfg, file, 65536, ioWorkersForCSV, false, nil)
	require.NoError(b, err)
	parser.SetLogger(log.Logger{Logger: zap.NewNop()})

	rowsCount := 0
	for {
		err := parser.ReadRow()
		if err == nil {
			// Return the row to the parser's pool so its allocations are reused.
			parser.RecycleRow(parser.LastRow())
			rowsCount++
			continue
		}
		if errors.Cause(err) == io.EOF {
			break
		}
		b.Fatal(err)
	}
	require.Equal(b, b.N, rowsCount)
}

func BenchmarkReadRowUsingEncodingCSV(b *testing.B) {
	s := newBenchCSVParserSuite(b)

	file, err := os.Open(s.csvPath)
	require.NoError(b, err)
	defer func() {
		require.Nil(b, file.Close())
	}()

	csvParser := csv.NewReader(file)

	rowsCount := 0
	var datums []types.Datum
	for {
		records, err := csvParser.Read()
		if err == nil {
			// for fair comparison, we need to include the cost of conversion to Datum.
			for _, record := range records {
				datums = append(datums, types.NewStringDatum(record))
			}
			datums = datums[:0]
			rowsCount++
			continue
		}
		if errors.Cause(err) == io.EOF {
			break
		}
		b.Fatal(err)
	}
	require.Equal(b, b.N, rowsCount)
}

func TestHeaderSchemaMatch(t *testing.T) {
	cfg := config.MydumperRuntime{
		CSV: config.CSVConfig{
			FieldsTerminatedBy: ",",
			FieldsEnclosedBy:   `"`,
		},
	}

	inputData := `id,val1,val2,val3
1,111,aaa,1.0
2,222,bbb,2.0
3,333,ccc,3.0
4,444,ddd,4.0`

	parsedDataPart := [][]types.Datum{
		{types.NewStringDatum("1"), types.NewStringDatum("111"), types.NewStringDatum("aaa"), types.NewStringDatum("1.0")},
		{types.NewStringDatum("2"), types.NewStringDatum("222"), types.NewStringDatum("bbb"), types.NewStringDatum("2.0")},
		{types.NewStringDatum("3"), types.NewStringDatum("333"), types.NewStringDatum("ccc"), types.NewStringDatum("3.0")},
		{types.NewStringDatum("4"), types.NewStringDatum("444"), types.NewStringDatum("ddd"), types.NewStringDatum("4.0")},
	}

	type testCase struct {
		Header            bool
		HeaderSchemaMatch bool
		ExpectedData      [][]types.Datum
		ExpectedColumns   []string
	}

	for _, tc := range []testCase{
		{
			Header:            true,
			HeaderSchemaMatch: true,
			ExpectedData:      parsedDataPart,
			ExpectedColumns:   []string{"id", "val1", "val2", "val3"},
		},
		{
			Header:            true,
			HeaderSchemaMatch: false,
			ExpectedData:      parsedDataPart,
			ExpectedColumns:   nil,
		},
		{
			Header:            false,
			HeaderSchemaMatch: true,
			ExpectedData: append([][]types.Datum{
				{types.NewStringDatum("id"), types.NewStringDatum("val1"), types.NewStringDatum("val2"), types.NewStringDatum("val3")},
			}, parsedDataPart...),
			ExpectedColumns: nil,
		},
	} {
		comment := fmt.Sprintf("header = %v, header-schema-match = %v", tc.Header, tc.HeaderSchemaMatch)
		cfg.CSV.Header = tc.Header
		cfg.CSV.HeaderSchemaMatch = tc.HeaderSchemaMatch
		charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
		assert.NoError(t, err)
		parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(inputData), int64(config.ReadBlockSize), ioWorkersForCSV, tc.Header, charsetConvertor)
		assert.NoError(t, err)
		for i, row := range tc.ExpectedData {
			comment := fmt.Sprintf("row = %d, header = %v, header-schema-match = %v", i+1, tc.Header, tc.HeaderSchemaMatch)
			e := parser.ReadRow()
			assert.NoErrorf(t, e, "row = %d, error = %s", i+1, errors.ErrorStack(e))
			assert.Equal(t, int64(i)+1, parser.LastRow().RowID, comment)
			assert.Equal(t, row, parser.LastRow().Row, comment)
		}
		assert.ErrorIsf(t, errors.Cause(parser.ReadRow()), io.EOF, comment)
		assert.Equal(t, tc.ExpectedColumns, parser.Columns(), comment)
	}
}