// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mydump

import (
	"bytes"
	"context"
	"io"
	"regexp"
	"slices"
	"strings"

	"github.com/pingcap/errors"
	tidbconfig "github.com/pingcap/tidb/pkg/config"
	"github.com/pingcap/tidb/pkg/lightning/config"
	"github.com/pingcap/tidb/pkg/lightning/log"
	"github.com/pingcap/tidb/pkg/lightning/metric"
	"github.com/pingcap/tidb/pkg/lightning/worker"
	"github.com/pingcap/tidb/pkg/types"
	"github.com/pingcap/tidb/pkg/util/logutil"
)

var (
	errUnterminatedQuotedField = errors.NewNoStackError("syntax error: unterminated quoted field")
	errDanglingBackslash       = errors.NewNoStackError("syntax error: no character after backslash")
	errUnexpectedQuoteField    = errors.NewNoStackError(
		"syntax error: cannot have consecutive fields without separator")
	// LargestEntryLimit is the maximum number of bytes a single row may occupy
	// when it is read into the buffer.
	LargestEntryLimit int
)

func init() {
	LargestEntryLimit = tidbconfig.MaxTxnEntrySizeLimit
}

// CSVParser is basically a copy of encoding/csv, but special-cased for MySQL-like input.
type CSVParser struct {
	blockParser
	cfg *config.CSVConfig

	comma          []byte
	quote          []byte
	newLine        []byte
	startingBy     []byte
	escapedBy      string
	unescapeRegexp *regexp.Regexp

	charsetConvertor *CharsetConvertor
	// These variables are used with IndexAnyByte to search a byte slice for the
	// first index at which some special character may appear.
	// quoteByteSet is used inside quoted fields (so the first characters of
	// the closing delimiter and backslash are special).
	// unquoteByteSet is used outside quoted fields (so the first characters
	// of the opening delimiter, separator, terminator and backslash are
	// special).
	// newLineByteSet is used in strict-format CSV dividing (so the first
	// characters of the terminator are special).
	quoteByteSet   byteSet
	unquoteByteSet byteSet
	newLineByteSet byteSet

	// recordBuffer holds the unescaped fields, one after another.
	// The fields can be accessed by using the indexes in fieldIndexes.
	// E.g., for the row `a,"b","c""d",e`, recordBuffer will contain `abc"de`
	// and fieldIndexes will contain the indexes [1, 2, 5, 6].
	recordBuffer []byte

	// fieldIndexes is an index of fields inside recordBuffer.
	// The i-th field ends at offset fieldIndexes[i] in recordBuffer.
	fieldIndexes  []int
	fieldIsQuoted []bool

	lastRecord []field

	escFlavor escapeFlavor
	// if set to true, the parser will treat the first non-empty line as the header line
	shouldParseHeader bool
	// in LOAD DATA, an empty line should be treated as a valid record
	allowEmptyLine   bool
	quotedNullIsText bool
	unescapedQuote   bool
}

type field struct {
	content string
	quoted  bool
}

// NewCSVParser creates a CSV parser. The ownership of the reader is transferred
// to the parser.
func NewCSVParser(
	ctx context.Context,
	cfg *config.CSVConfig,
	reader ReadSeekCloser,
	blockBufSize int64,
	ioWorkers *worker.Pool,
	shouldParseHeader bool,
	charsetConvertor *CharsetConvertor,
) (*CSVParser, error) {
	var err error
	var fieldTerminator, delimiter, lineTerminator string
	// Do not do the conversion if the charsetConvertor is nil.
	if charsetConvertor == nil {
		fieldTerminator = cfg.FieldsTerminatedBy
		delimiter = cfg.FieldsEnclosedBy
		lineTerminator = cfg.LinesTerminatedBy
	} else {
		fieldTerminator, delimiter, lineTerminator, err = encodeSpecialSymbols(cfg, charsetConvertor)
		if err != nil {
			return nil, err
		}
	}

	var quoteStopSet, newLineStopSet []byte
	unquoteStopSet := []byte{fieldTerminator[0]}
	if len(cfg.FieldsEnclosedBy) > 0 {
		quoteStopSet = []byte{delimiter[0]}
		unquoteStopSet = append(unquoteStopSet, delimiter[0])
	}
	if len(lineTerminator) > 0 {
		newLineStopSet = []byte{lineTerminator[0]}
	} else {
		// The character set encoding of '\r' and '\n' is the same in UTF-8 and GBK.
		newLineStopSet = []byte{'\r', '\n'}
	}
	unquoteStopSet = append(unquoteStopSet, newLineStopSet...)

	if len(cfg.LinesStartingBy) > 0 {
		if strings.Contains(cfg.LinesStartingBy, lineTerminator) {
			return nil, errors.Errorf("STARTING BY '%s' cannot contain LINES TERMINATED BY '%s'", cfg.LinesStartingBy, lineTerminator)
		}
	}

	escFlavor := escapeFlavorNone
	var r *regexp.Regexp
	if len(cfg.FieldsEscapedBy) > 0 {
		escFlavor = escapeFlavorMySQL
		quoteStopSet = append(quoteStopSet, cfg.FieldsEscapedBy[0])
		unquoteStopSet = append(unquoteStopSet, cfg.FieldsEscapedBy[0])
		// we need special treatment of the NULL value \N, used by MySQL.
		if !cfg.NotNull && slices.Contains(cfg.FieldNullDefinedBy, cfg.FieldsEscapedBy+`N`) {
			escFlavor = escapeFlavorMySQLWithNull
		}
		r, err = regexp.Compile(`(?s)` + regexp.QuoteMeta(cfg.FieldsEscapedBy) + `.`)
		if err != nil {
			return nil, errors.Trace(err)
		}
	}
	metrics, _ := metric.FromContext(ctx)
	return &CSVParser{
		blockParser:       makeBlockParser(reader, blockBufSize, ioWorkers, metrics, log.Wrap(logutil.Logger(ctx))),
		cfg:               cfg,
		charsetConvertor:  charsetConvertor,
		comma:             []byte(fieldTerminator),
		quote:             []byte(delimiter),
		newLine:           []byte(lineTerminator),
		startingBy:        []byte(cfg.LinesStartingBy),
		escapedBy:         cfg.FieldsEscapedBy,
		unescapeRegexp:    r,
		escFlavor:         escFlavor,
		quoteByteSet:      makeByteSet(quoteStopSet),
		unquoteByteSet:    makeByteSet(unquoteStopSet),
		newLineByteSet:    makeByteSet(newLineStopSet),
		shouldParseHeader: shouldParseHeader,
		allowEmptyLine:    cfg.AllowEmptyLine,
		quotedNullIsText:  cfg.QuotedNullIsText,
		unescapedQuote:    cfg.UnescapedQuote,
	}, nil
}

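// A minimal usage sketch (assumptions: the concrete config values and block
// size are illustrative only, `reader` and `ioWorkers` are built by the
// caller, and LastRow comes from the embedded blockParser):
//
//	cfg := &config.CSVConfig{
//		FieldsTerminatedBy: ",",
//		FieldsEnclosedBy:   `"`,
//		LinesTerminatedBy:  "\n",
//	}
//	parser, err := NewCSVParser(ctx, cfg, reader, 64*1024, ioWorkers, false, nil)
//	if err != nil {
//		return err
//	}
//	for {
//		if err := parser.ReadRow(); err != nil {
//			if errors.Cause(err) == io.EOF {
//				break
//			}
//			return err
//		}
//		row := parser.LastRow() // process row.Row here
//		_ = row
//	}
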
// encodeSpecialSymbols encodes the special symbols, e.g. the separator,
// delimiter and terminator, into the given charset according to the charset
// convertor.
func encodeSpecialSymbols(cfg *config.CSVConfig, cc *CharsetConvertor) (separator, delimiter, terminator string, err error) {
	// FieldsTerminatedBy
	separator, err = cc.Encode(cfg.FieldsTerminatedBy)
	if err != nil {
		return
	}
	// FieldsEnclosedBy
	delimiter, err = cc.Encode(cfg.FieldsEnclosedBy)
	if err != nil {
		return
	}
	// LinesTerminatedBy
	terminator, err = cc.Encode(cfg.LinesTerminatedBy)
	if err != nil {
		return
	}
	return
}

func (parser *CSVParser) unescapeString(input field) (unescaped string, isNull bool, err error) {
	// Convert the input from another charset to utf8mb4 before we return the string.
	if unescaped, err = parser.charsetConvertor.Decode(input.content); err != nil {
		return
	}
	if parser.escFlavor == escapeFlavorMySQLWithNull && unescaped == parser.escapedBy+`N` {
		return input.content, true, nil
	}
	if len(parser.escapedBy) > 0 {
		unescaped = unescape(unescaped, "", parser.escFlavor, parser.escapedBy[0], parser.unescapeRegexp)
	}
	if !(len(parser.quote) > 0 && parser.quotedNullIsText && input.quoted) {
		// this branch represents "quote is not configured", "quoted null is null"
		// or "this field has no quote", so we check whether the field is null
		isNull = !parser.cfg.NotNull &&
			slices.Contains(parser.cfg.FieldNullDefinedBy, unescaped)
		// prevent an escaped `\N` (written as `\\N` in the input) from becoming NULL
		if parser.escFlavor == escapeFlavorMySQLWithNull && unescaped == parser.escapedBy+`N` {
			isNull = false
		}
	}
	return
}

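// For example, assuming FieldsEscapedBy = `\` and a FieldNullDefinedBy list
// containing `\N` (MySQL's defaults): an unquoted input `\N` is decoded as SQL
// NULL above, while `\\N` unescapes to the literal text `\N` and stays a
// string, which is what the second escFlavor check guards against.
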
// csvToken is a type representing either a normal byte or some CSV-specific
// tokens such as the separator (comma), delimiter (quote) and terminator (new
// line).
type csvToken int16

const (
	// csvTokenAnyUnquoted is a placeholder to represent any unquoted character.
	csvTokenAnyUnquoted csvToken = 0
	// csvTokenEscaped is a mask indicating an escaped character.
	// The actual token is represented like `csvTokenEscaped | 'n'`.
	csvTokenEscaped csvToken = 0x100
	// csvTokenComma is the CSV separator token.
	csvTokenComma csvToken = 0x200
	// csvTokenNewLine is the CSV terminator token.
	csvTokenNewLine csvToken = 0x400
	// csvTokenDelimiter is the CSV delimiter token.
	csvTokenDelimiter csvToken = 0x800
)

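// For example, the escaped pair `\n` in the input is encoded as the token
// csvTokenEscaped|'n' (0x100|0x6e = 0x16e), while a plain byte b maps to
// csvToken(b) in [0x00, 0xff]; this is why csvToken is wider than a byte.
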
func (parser *CSVParser) readByte() (byte, error) {
	if len(parser.buf) == 0 {
		if err := parser.readBlock(); err != nil {
			return 0, err
		}
	}
	if len(parser.buf) == 0 {
		return 0, io.EOF
	}
	b := parser.buf[0]
	parser.buf = parser.buf[1:]
	parser.pos++
	return b, nil
}

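// peekBytes returns up to cnt bytes at the head of the buffer without
// consuming them; near the end of the input it may return fewer than cnt bytes.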
func (parser *CSVParser) peekBytes(cnt int) ([]byte, error) {
	if len(parser.buf) < cnt {
		if err := parser.readBlock(); err != nil {
			return nil, err
		}
	}
	if len(parser.buf) == 0 {
		return nil, io.EOF
	}
	cnt = min(cnt, len(parser.buf))
	return parser.buf[:cnt], nil
}

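// skipBytes advances the cursor by n bytes; the caller must ensure at least n
// bytes are buffered, otherwise the re-slicing below panics.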
func (parser *CSVParser) skipBytes(n int) {
	parser.buf = parser.buf[n:]
	parser.pos += int64(n)
}

// tryPeekExact peeks at the bytes ahead, and if they match `content` exactly,
// returns (true, false, nil). On EOF it returns (false, true, nil). For other
// errors it returns (false, false, err).
func (parser *CSVParser) tryPeekExact(content []byte) (matched bool, eof bool, err error) {
	if len(content) == 0 {
		return true, false, nil
	}
	bs, err := parser.peekBytes(len(content))
	if err == nil {
		if bytes.Equal(bs, content) {
			return true, false, nil
		}
	} else if errors.Cause(err) == io.EOF {
		return false, true, nil
	}
	return false, false, err
}

// tryReadExact peeks at the bytes ahead, and if they match `content` exactly,
// consumes them (advancing the cursor) and returns `true`.
func (parser *CSVParser) tryReadExact(content []byte) (bool, error) {
	matched, _, err := parser.tryPeekExact(content)
	if matched {
		parser.skipBytes(len(content))
	}
	return matched, err
}

func (parser *CSVParser) tryReadNewLine(b byte) (bool, error) {
	if len(parser.newLine) == 0 {
		return b == '\r' || b == '\n', nil
	}
	if b != parser.newLine[0] {
		return false, nil
	}
	return parser.tryReadExact(parser.newLine[1:])
}

func (parser *CSVParser) tryReadOpenDelimiter(b byte) (bool, error) {
	if len(parser.quote) == 0 || parser.quote[0] != b {
		return false, nil
	}
	return parser.tryReadExact(parser.quote[1:])
}

// tryReadCloseDelimiter is currently equivalent to tryReadOpenDelimiter until
// we support asymmetric delimiters.
func (parser *CSVParser) tryReadCloseDelimiter(b byte) (bool, error) {
	if parser.quote[0] != b {
		return false, nil
	}
	return parser.tryReadExact(parser.quote[1:])
}

func (parser *CSVParser) tryReadComma(b byte) (bool, error) {
	if parser.comma[0] != b {
		return false, nil
	}
	return parser.tryReadExact(parser.comma[1:])
}

func (parser *CSVParser) tryReadEscaped(bs byte) (bool, byte, error) {
	if parser.escapedBy == "" {
		return false, 0, nil
	}
	if bs != parser.escapedBy[0] || parser.escFlavor == escapeFlavorNone {
		return false, 0, nil
	}
	b, err := parser.readByte()
	return true, b, parser.replaceEOF(err, errDanglingBackslash)
}

// readQuotedToken reads a token inside quoted fields.
func (parser *CSVParser) readQuotedToken(b byte) (csvToken, error) {
	if ok, err := parser.tryReadCloseDelimiter(b); ok || err != nil {
		return csvTokenDelimiter, err
	}
	if ok, eb, err := parser.tryReadEscaped(b); ok || err != nil {
		return csvTokenEscaped | csvToken(eb), err
	}
	return csvToken(b), nil
}

// readUnquoteToken reads a token outside quoted fields.
func (parser *CSVParser) readUnquoteToken(b byte) (csvToken, error) {
	if ok, err := parser.tryReadNewLine(b); ok || err != nil {
		return csvTokenNewLine, err
	}
	if ok, err := parser.tryReadComma(b); ok || err != nil {
		return csvTokenComma, err
	}
	if ok, err := parser.tryReadOpenDelimiter(b); ok || err != nil {
		return csvTokenDelimiter, err
	}
	if ok, eb, err := parser.tryReadEscaped(b); ok || err != nil {
		return csvTokenEscaped | csvToken(eb), err
	}
	return csvToken(b), nil
}

func (parser *CSVParser) appendCSVTokenToRecordBuffer(token csvToken) {
	if token&csvTokenEscaped != 0 {
		parser.recordBuffer = append(parser.recordBuffer, parser.escapedBy[0])
	}
	parser.recordBuffer = append(parser.recordBuffer, byte(token))
}

// readUntil reads the buffer until any character from the `chars` set is found.
// That character is excluded from the returned content and remains buffered.
func (parser *CSVParser) readUntil(chars *byteSet) ([]byte, byte, error) {
	index := IndexAnyByte(parser.buf, chars)
	if index >= 0 {
		ret := parser.buf[:index]
		parser.buf = parser.buf[index:]
		parser.pos += int64(index)
		return ret, parser.buf[0], nil
	}

	// not found in parser.buf, we need to allocate and loop.
	var buf []byte
	for {
		buf = append(buf, parser.buf...)
		if parser.checkRowLen && parser.pos-parser.rowStartPos+int64(len(buf)) > int64(LargestEntryLimit) {
			return buf, 0, errors.New("size of row cannot exceed the max value of txn-entry-size-limit")
		}
		parser.buf = nil
		if err := parser.readBlock(); err != nil || len(parser.buf) == 0 {
			if err == nil {
				err = io.EOF
			}
			parser.pos += int64(len(buf))
			return buf, 0, errors.Trace(err)
		}
		index := IndexAnyByte(parser.buf, chars)
		if index >= 0 {
			buf = append(buf, parser.buf[:index]...)
			parser.buf = parser.buf[index:]
			parser.pos += int64(len(buf))
			return buf, parser.buf[0], nil
		}
	}
}

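// An illustrative trace of readUntil (values assumed): if parser.buf holds
// `abc,def` and `chars` contains ',' and '\n', it returns (`abc`, ',', nil)
// and leaves `,def` buffered, so the found byte is still available to the
// tokenizer.

// readRecord parses one record into dst, reusing dst's backing array when it
// is large enough. The returned fields alias a single string built from
// recordBuffer, allocated once per record.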
func (parser *CSVParser) readRecord(dst []field) ([]field, error) {
	parser.recordBuffer = parser.recordBuffer[:0]
	parser.fieldIndexes = parser.fieldIndexes[:0]
	parser.fieldIsQuoted = parser.fieldIsQuoted[:0]

	isEmptyLine := true
	whitespaceLine := true
	foundStartingByThisLine := false
	prevToken := csvTokenNewLine
	fieldIsQuoted := false
	var firstToken csvToken

outside:
	for {
		// we should drop
		// 1. the whole line if it does not contain startingBy
		// 2. any characters before startingBy
		// since we have checked that startingBy does not contain the terminator,
		// we can split at the terminator to check whether the substring contains
		// startingBy. Even if the terminator is inside a quoted field, which
		// means it's not the end of a line, the substring can still be dropped
		// by rule 2.
		if len(parser.startingBy) > 0 && !foundStartingByThisLine {
			oldPos := parser.pos
			content, _, err := parser.readUntilTerminator()
			if err != nil {
				if errors.Cause(err) != io.EOF {
					return nil, err
				}
				if len(content) == 0 {
					return nil, err
				}
				// if we reached EOF, we should still check whether the content
				// contains startingBy, put the remainder back, and parse it.
			}
			idx := bytes.Index(content, parser.startingBy)
			if idx == -1 {
				continue
			}
			foundStartingByThisLine = true
			content = content[idx+len(parser.startingBy):]
			parser.buf = append(content, parser.buf...)
			parser.pos = oldPos + int64(idx+len(parser.startingBy))
		}
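
		// Worked example for the block above (input values assumed): with
		// LinesStartingBy = "xxx" and the line `skip me xxx1,"2",3`, everything
		// up to and including "xxx" is dropped, so the tokenizer below sees
		// `1,"2",3` followed by the terminator.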

		content, firstByte, err := parser.readUntil(&parser.unquoteByteSet)

		if len(content) > 0 {
			isEmptyLine = false
			if prevToken == csvTokenDelimiter {
				parser.logSyntaxError()
				return nil, errors.AddStack(errUnexpectedQuoteField)
			}
			parser.recordBuffer = append(parser.recordBuffer, content...)
			prevToken = csvTokenAnyUnquoted
		}

		if err != nil {
			if isEmptyLine || errors.Cause(err) != io.EOF {
				return nil, err
			}
			// treat EOF as the same as trailing \n.
			firstToken = csvTokenNewLine
		} else {
			parser.skipBytes(1)
			firstToken, err = parser.readUnquoteToken(firstByte)
			if err != nil {
				return nil, err
			}
		}

		switch firstToken {
		case csvTokenComma:
			whitespaceLine = false
			parser.fieldIndexes = append(parser.fieldIndexes, len(parser.recordBuffer))
			parser.fieldIsQuoted = append(parser.fieldIsQuoted, fieldIsQuoted)
			fieldIsQuoted = false
		case csvTokenDelimiter:
			if prevToken != csvTokenComma && prevToken != csvTokenNewLine {
				if parser.unescapedQuote {
					whitespaceLine = false
					parser.recordBuffer = append(parser.recordBuffer, parser.quote...)
					continue
				}
				parser.logSyntaxError()
				return nil, errors.AddStack(errUnexpectedQuoteField)
			}
			if err = parser.readQuotedField(); err != nil {
				return nil, err
			}
			fieldIsQuoted = true
			whitespaceLine = false
		case csvTokenNewLine:
			foundStartingByThisLine = false
			// new line = end of record (ignore empty lines)
			prevToken = firstToken
			if !parser.allowEmptyLine {
				if isEmptyLine {
					continue
				}
				// skip lines that only contain whitespace
				if err == nil && whitespaceLine && len(bytes.TrimSpace(parser.recordBuffer)) == 0 {
					parser.recordBuffer = parser.recordBuffer[:0]
					continue
				}
			}
			parser.fieldIndexes = append(parser.fieldIndexes, len(parser.recordBuffer))
			parser.fieldIsQuoted = append(parser.fieldIsQuoted, fieldIsQuoted)
			// the loop ends here, so there is no need to reset fieldIsQuoted
			break outside
		default:
			if prevToken == csvTokenDelimiter {
				parser.logSyntaxError()
				return nil, errors.AddStack(errUnexpectedQuoteField)
			}
			parser.appendCSVTokenToRecordBuffer(firstToken)
		}
		prevToken = firstToken
		isEmptyLine = false
	}
	// Create a single string and create slices out of it.
	// This pins the memory of the fields together, but allocates once.
	str := string(parser.recordBuffer) // Convert to string once to batch allocations
	dst = dst[:0]
	if cap(dst) < len(parser.fieldIndexes) {
		dst = make([]field, len(parser.fieldIndexes))
	}
	dst = dst[:len(parser.fieldIndexes)]
	var preIdx int
	for i, idx := range parser.fieldIndexes {
		dst[i].content = str[preIdx:idx]
		dst[i].quoted = parser.fieldIsQuoted[i]
		preIdx = idx
	}

	return dst, nil
}

func (parser *CSVParser) readQuotedField() error {
	for {
		prevPos := parser.pos
		content, terminator, err := parser.readUntil(&parser.quoteByteSet)
		if err != nil {
			if errors.Cause(err) == io.EOF {
				// return the position of the quote to the caller.
				// because we return an error here, the parser won't
				// use the `pos` again, so it's safe to modify it here.
				parser.pos = prevPos - 1
				// put the consumed content into parser.buf so the error log can show it
				parser.buf = content
				err = parser.replaceEOF(err, errUnterminatedQuotedField)
			}
			return err
		}
		parser.recordBuffer = append(parser.recordBuffer, content...)
		parser.skipBytes(1)

		token, err := parser.readQuotedToken(terminator)
		if err != nil {
			return err
		}

		switch token {
		case csvTokenDelimiter:
			// encountered '"' -> continue only if we're seeing '""'.
			doubledDelimiter, err := parser.tryReadExact(parser.quote)
			if err != nil {
				return err
			}
			if doubledDelimiter {
				// consume the doubled quotation mark and continue
				parser.recordBuffer = append(parser.recordBuffer, parser.quote...)
			} else if parser.unescapedQuote {
				// allow unescaped quotes inside a quoted field, so we only finish
				// reading the field when we see a delimiter + comma/newline.
				comma, _, err2 := parser.tryPeekExact(parser.comma)
				if comma || err2 != nil {
					return err2
				}
				newline, eof, err2 := parser.tryPeekExact(parser.newLine)
				if eof || newline {
					return nil
				}
				if err2 != nil {
					return err2
				}
				parser.recordBuffer = append(parser.recordBuffer, parser.quote...)
			} else {
				// the field is complete, exit.
				return nil
			}
		default:
			parser.appendCSVTokenToRecordBuffer(token)
		}
	}
}

func (parser *CSVParser) replaceEOF(err error, replaced error) error {
	if err == nil || errors.Cause(err) != io.EOF {
		return err
	}
	if replaced != nil {
		parser.logSyntaxError()
		replaced = errors.AddStack(replaced)
	}
	return replaced
}

// ReadRow reads a row from the datafile.
func (parser *CSVParser) ReadRow() error {
	row := &parser.lastRow
	row.Length = 0
	row.RowID++

	// skip the header first
	if parser.shouldParseHeader {
		err := parser.ReadColumns()
		if err != nil {
			return errors.Trace(err)
		}
		parser.shouldParseHeader = false
	}

	parser.beginRowLenCheck()
	defer parser.endRowLenCheck()

	fields, err := parser.readRecord(parser.lastRecord)
	if err != nil {
		return errors.Trace(err)
	}
	parser.lastRecord = fields
	// remove the last empty value
	if parser.cfg.TrimLastEmptyField {
		i := len(fields) - 1
		if i >= 0 && len(fields[i].content) == 0 {
			fields = fields[:i]
		}
	}

	row.Row = parser.acquireDatumSlice()
	if cap(row.Row) >= len(fields) {
		row.Row = row.Row[:len(fields)]
	} else {
		row.Row = make([]types.Datum, len(fields))
	}
	for i, f := range fields {
		row.Length += len(f.content)
		unescaped, isNull, err := parser.unescapeString(f)
		if err != nil {
			return errors.Trace(err)
		}
		if isNull {
			row.Row[i].SetNull()
		} else {
			row.Row[i].SetString(unescaped, "utf8mb4_bin")
		}
	}

	return nil
}

// ReadColumns reads the columns of this CSV file.
func (parser *CSVParser) ReadColumns() error {
	parser.beginRowLenCheck()
	defer parser.endRowLenCheck()
	columns, err := parser.readRecord(nil)
	if err != nil {
		return errors.Trace(err)
	}
	if !parser.cfg.HeaderSchemaMatch {
		return nil
	}
	parser.columns = make([]string, 0, len(columns))
	for _, colName := range columns {
		colNameStr, _, err := parser.unescapeString(colName)
		if err != nil {
			return errors.Trace(err)
		}
		parser.columns = append(parser.columns, strings.ToLower(colNameStr))
	}
	return nil
}

// ReadUntilTerminator seeks the file until the terminator token is found, and
// returns
//   - the content including the terminator, or the content read before the error
//   - the file offset beyond the terminator, or the offset at which the error occurred
//   - the error, if any
//
// Note that the terminator pattern may also appear inside a quoted field, where
// it is field content rather than a line break. The caller should handle this case.
func (parser *CSVParser) ReadUntilTerminator() ([]byte, int64, error) {
	parser.beginRowLenCheck()
	defer parser.endRowLenCheck()
	return parser.readUntilTerminator()
}

func (parser *CSVParser) readUntilTerminator() ([]byte, int64, error) {
	var ret []byte
	for {
		content, firstByte, err := parser.readUntil(&parser.newLineByteSet)
		ret = append(ret, content...)
		if err != nil {
			return ret, parser.pos, err
		}
		parser.skipBytes(1)
		ret = append(ret, firstByte)
		if ok, err := parser.tryReadNewLine(firstByte); ok || err != nil {
			if len(parser.newLine) >= 1 {
				ret = append(ret, parser.newLine[1:]...)
			}
			return ret, parser.pos, err
		}
	}
}