// Source: tidb/pkg/lightning/mydump/csv_parser.go (Go, 737 lines, 22 KiB)

// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mydump
import (
"bytes"
"context"
"io"
"regexp"
"slices"
"strings"
"github.com/pingcap/errors"
tidbconfig "github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/lightning/metric"
"github.com/pingcap/tidb/pkg/lightning/worker"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/logutil"
)
var (
// errUnterminatedQuotedField is reported by readQuotedField when the input
// ends before the closing delimiter of a quoted field is seen.
errUnterminatedQuotedField = errors.NewNoStackError("syntax error: unterminated quoted field")
// errDanglingBackslash is reported by tryReadEscaped when the input ends
// right after the escape character.
errDanglingBackslash = errors.NewNoStackError("syntax error: no character after backslash")
// errUnexpectedQuoteField is reported by readRecord when a quoted field is
// adjacent to other field content without a separator in between.
errUnexpectedQuoteField = errors.NewNoStackError(
"syntax error: cannot have consecutive fields without separator")
// LargestEntryLimit is the max size for reading file to buf
// It is assigned in init and compared against the current row length in
// readUntil when row-length checking is enabled.
LargestEntryLimit int
)
// init seeds LargestEntryLimit with tidbconfig.MaxTxnEntrySizeLimit.
func init() {
LargestEntryLimit = tidbconfig.MaxTxnEntrySizeLimit
}
// CSVParser is basically a copy of encoding/csv, but special-cased for MySQL-like input.
type CSVParser struct {
// blockParser is embedded; the methods in this file use its buf/pos state
// and its readBlock helper.
blockParser
// cfg is the CSV configuration the parser was created with.
cfg *config.CSVConfig
// comma is the field separator (charset-encoded when a convertor is given).
comma []byte
// quote is the field delimiter; empty means quoting is not configured.
quote []byte
// newLine is the line terminator; when empty, either '\r' or '\n' ends a line.
newLine []byte
// startingBy holds cfg.LinesStartingBy; readRecord drops everything on a
// line up to and including this prefix.
startingBy []byte
// escapedBy holds cfg.FieldsEscapedBy; empty disables escape handling.
escapedBy string
// unescapeRegexp matches the escape string followed by any one character.
// It is nil when escapedBy is empty.
unescapeRegexp *regexp.Regexp
// charsetConvertor decodes field contents in unescapeString.
charsetConvertor *CharsetConvertor
// These variables are used with IndexAnyByte to search a byte slice for the
// first index which some special character may appear.
// quoteByteSet is used inside quoted fields (so the first characters of
// the closing delimiter and backslash are special).
// unquoteByteSet is used outside quoted fields (so the first characters
// of the opening delimiter, separator, terminator and backslash are
// special).
// newLineByteSet is used in strict-format CSV dividing (so the first
// characters of the terminator are special).
quoteByteSet byteSet
unquoteByteSet byteSet
newLineByteSet byteSet
// recordBuffer holds the unescaped fields, one after another.
// The fields can be accessed by using the indexes in fieldIndexes.
// E.g., For the row `a,"b","c""d",e`, recordBuffer will contain `abc"de`
// and fieldIndexes will contain the indexes [1, 2, 5, 6].
recordBuffer []byte
// fieldIndexes is an index of fields inside recordBuffer.
// The i-th field ends at offset fieldIndexes[i] in recordBuffer.
fieldIndexes []int
// fieldIsQuoted[i] records whether the i-th field was read as a quoted field.
fieldIsQuoted []bool
// lastRecord is the field slice handed back to readRecord by ReadRow so
// that its backing array can be reused between rows.
lastRecord []field
// escFlavor selects how escape sequences are interpreted; NewCSVParser
// derives it from cfg.FieldsEscapedBy, cfg.NotNull and cfg.FieldNullDefinedBy.
escFlavor escapeFlavor
// if set to true, csv parser will treat the first non-empty line as header line
shouldParseHeader bool
// in LOAD DATA, empty line should be treated as a valid record
allowEmptyLine bool
// quotedNullIsText makes unescapeString skip the FieldNullDefinedBy
// comparison for fields that were quoted.
quotedNullIsText bool
// unescapedQuote makes a delimiter that is not in a field-opening or
// field-closing position be kept as literal text instead of being treated
// as a syntax error or as the end of the field.
unescapedQuote bool
}
// field is one raw CSV field as produced by readRecord, before charset
// decoding and unescaping.
type field struct {
// content is the field text sliced out of the record buffer.
content string
// quoted reports whether the field was enclosed by the delimiter.
quoted bool
}
// NewCSVParser creates a CSV parser. The ownership of the reader is transferred
// to the parser.
//
// The separator, delimiter and terminator from cfg are encoded with
// charsetConvertor when one is supplied, so that they can be matched against
// the raw bytes of the data file. shouldParseHeader makes the first ReadRow
// call consume a header line first.
//
// An error is returned when the field separator is empty, when the symbols
// cannot be encoded, when LinesStartingBy contains the line terminator (or,
// with an empty terminator, a '\r' or '\n'), or when the unescape pattern
// cannot be compiled.
func NewCSVParser(
	ctx context.Context,
	cfg *config.CSVConfig,
	reader ReadSeekCloser,
	blockBufSize int64,
	ioWorkers *worker.Pool,
	shouldParseHeader bool,
	charsetConvertor *CharsetConvertor,
) (*CSVParser, error) {
	var err error
	var fieldTerminator, delimiter, lineTerminator string
	// Do not do the conversion if the charsetConvertor is nil.
	if charsetConvertor == nil {
		fieldTerminator = cfg.FieldsTerminatedBy
		delimiter = cfg.FieldsEnclosedBy
		lineTerminator = cfg.LinesTerminatedBy
	} else {
		fieldTerminator, delimiter, lineTerminator, err = encodeSpecialSymbols(cfg, charsetConvertor)
		if err != nil {
			return nil, err
		}
	}

	// The parser indexes the first byte of the separator in several places;
	// report an empty one as an error instead of panicking.
	if len(fieldTerminator) == 0 {
		return nil, errors.Errorf("FIELDS TERMINATED BY cannot be empty")
	}

	var quoteStopSet, newLineStopSet []byte
	unquoteStopSet := []byte{fieldTerminator[0]}
	// Guard on the value that is actually indexed (the encoded delimiter).
	if len(delimiter) > 0 {
		quoteStopSet = []byte{delimiter[0]}
		unquoteStopSet = append(unquoteStopSet, delimiter[0])
	}
	if len(lineTerminator) > 0 {
		newLineStopSet = []byte{lineTerminator[0]}
	} else {
		// The character set encoding of '\r' and '\n' is the same in UTF-8 and GBK.
		newLineStopSet = []byte{'\r', '\n'}
	}
	unquoteStopSet = append(unquoteStopSet, newLineStopSet...)

	if len(cfg.LinesStartingBy) > 0 {
		// readRecord splits the input at the terminator before looking for
		// the prefix, so the prefix must not contain a terminator.
		// strings.Contains reports true for an empty substring, so the
		// "either '\r' or '\n'" mode needs its own check.
		var containsTerminator bool
		if len(lineTerminator) > 0 {
			containsTerminator = strings.Contains(cfg.LinesStartingBy, lineTerminator)
		} else {
			containsTerminator = strings.ContainsAny(cfg.LinesStartingBy, "\r\n")
		}
		if containsTerminator {
			return nil, errors.Errorf("STARTING BY '%s' cannot contain LINES TERMINATED BY '%s'", cfg.LinesStartingBy, lineTerminator)
		}
	}

	escFlavor := escapeFlavorNone
	var r *regexp.Regexp
	if len(cfg.FieldsEscapedBy) > 0 {
		escFlavor = escapeFlavorMySQL
		quoteStopSet = append(quoteStopSet, cfg.FieldsEscapedBy[0])
		unquoteStopSet = append(unquoteStopSet, cfg.FieldsEscapedBy[0])
		// we need special treatment of the NULL value \N, used by MySQL.
		if !cfg.NotNull && slices.Contains(cfg.FieldNullDefinedBy, cfg.FieldsEscapedBy+`N`) {
			escFlavor = escapeFlavorMySQLWithNull
		}
		r, err = regexp.Compile(`(?s)` + regexp.QuoteMeta(cfg.FieldsEscapedBy) + `.`)
		if err != nil {
			return nil, errors.Trace(err)
		}
	}

	// The second result (whether metrics were present) is intentionally
	// ignored; metrics is passed to makeBlockParser as returned.
	metrics, _ := metric.FromContext(ctx)
	return &CSVParser{
		blockParser:       makeBlockParser(reader, blockBufSize, ioWorkers, metrics, log.Wrap(logutil.Logger(ctx))),
		cfg:               cfg,
		charsetConvertor:  charsetConvertor,
		comma:             []byte(fieldTerminator),
		quote:             []byte(delimiter),
		newLine:           []byte(lineTerminator),
		startingBy:        []byte(cfg.LinesStartingBy),
		escapedBy:         cfg.FieldsEscapedBy,
		unescapeRegexp:    r,
		escFlavor:         escFlavor,
		quoteByteSet:      makeByteSet(quoteStopSet),
		unquoteByteSet:    makeByteSet(unquoteStopSet),
		newLineByteSet:    makeByteSet(newLineStopSet),
		shouldParseHeader: shouldParseHeader,
		allowEmptyLine:    cfg.AllowEmptyLine,
		quotedNullIsText:  cfg.QuotedNullIsText,
		unescapedQuote:    cfg.UnescapedQuote,
	}, nil
}
// encodeSpecialSymbols encodes the special symbols (e.g. separator, delimiter
// and terminator) from cfg into the charset handled by cc.
// Encoding stops at the first failure; the values produced so far are
// returned together with that error.
func encodeSpecialSymbols(cfg *config.CSVConfig, cc *CharsetConvertor) (separator, delimiter, terminator string, err error) {
	// FieldsTerminatedBy
	if separator, err = cc.Encode(cfg.FieldsTerminatedBy); err != nil {
		return separator, delimiter, terminator, err
	}
	// FieldsEnclosedBy
	if delimiter, err = cc.Encode(cfg.FieldsEnclosedBy); err != nil {
		return separator, delimiter, terminator, err
	}
	// LinesTerminatedBy
	terminator, err = cc.Encode(cfg.LinesTerminatedBy)
	return separator, delimiter, terminator, err
}
// unescapeString converts a raw field into its final string value and tells
// whether it represents NULL.
//
// The content is first decoded by the charset convertor. A field that is
// exactly the escape string followed by `N` is NULL in the MySQL-with-null
// flavor (the raw content is returned in that case). Otherwise escape
// sequences are resolved and the result is compared against the configured
// null definitions, unless the field was quoted and quoted nulls are text.
func (parser *CSVParser) unescapeString(input field) (unescaped string, isNull bool, err error) {
	// Convert the input from another charset to utf8mb4 before we return the string.
	unescaped, err = parser.charsetConvertor.Decode(input.content)
	if err != nil {
		return unescaped, isNull, err
	}

	mysqlNullFlavor := parser.escFlavor == escapeFlavorMySQLWithNull
	if mysqlNullFlavor && unescaped == parser.escapedBy+`N` {
		return input.content, true, nil
	}

	if len(parser.escapedBy) > 0 {
		unescaped = unescape(unescaped, "", parser.escFlavor, parser.escapedBy[0], parser.unescapeRegexp)
	}

	// A quoted field is plain text when a delimiter is configured and quoted
	// nulls are treated as text; no null check is needed then.
	if len(parser.quote) > 0 && parser.quotedNullIsText && input.quoted {
		return unescaped, false, nil
	}

	// "quote is not configured" or "quoted null is null" or "this field has no quote":
	// check the value against the null definitions.
	isNull = !parser.cfg.NotNull && slices.Contains(parser.cfg.FieldNullDefinedBy, unescaped)
	// avoid \\N becomes NULL
	if mysqlNullFlavor && unescaped == parser.escapedBy+`N` {
		isNull = false
	}
	return unescaped, isNull, nil
}
// csvToken is a type representing either a normal byte or some CSV-specific
// tokens such as the separator (comma), delimiter (quote) and terminator (new
// line).
// A plain byte is stored as its own value; the special tokens use values
// above 0xff so they cannot collide with a byte.
type csvToken int16
const (
// csvTokenAnyUnquoted is a placeholder to represent any unquoted character.
// readRecord stores it as the previous token after copying a run of plain
// unquoted content.
csvTokenAnyUnquoted csvToken = 0
// csvTokenEscaped is a mask indicating an escaped character.
// The actual token is represented like `csvTokenEscaped | 'n'`.
csvTokenEscaped csvToken = 0x100
// csvTokenComma is the CSV separator token.
csvTokenComma csvToken = 0x200
// csvTokenNewLine is the CSV terminator token.
csvTokenNewLine csvToken = 0x400
// csvTokenDelimiter is the CSV delimiter token.
csvTokenDelimiter csvToken = 0x800
)
// readByte consumes and returns the next byte of input, refilling the buffer
// with readBlock when it is empty. It returns io.EOF when no more data is
// available.
func (parser *CSVParser) readByte() (byte, error) {
	if len(parser.buf) == 0 {
		if err := parser.readBlock(); err != nil {
			return 0, err
		}
		// The refill produced nothing: the input is exhausted.
		if len(parser.buf) == 0 {
			return 0, io.EOF
		}
	}
	next := parser.buf[0]
	parser.buf = parser.buf[1:]
	parser.pos++
	return next, nil
}
// peekBytes returns up to cnt upcoming bytes without consuming them. One
// refill is attempted when fewer than cnt bytes are buffered; the result may
// still be shorter than cnt. It returns io.EOF when nothing is buffered after
// the refill.
func (parser *CSVParser) peekBytes(cnt int) ([]byte, error) {
	if cnt > len(parser.buf) {
		if err := parser.readBlock(); err != nil {
			return nil, err
		}
	}
	available := len(parser.buf)
	if available == 0 {
		return nil, io.EOF
	}
	if cnt > available {
		cnt = available
	}
	return parser.buf[:cnt], nil
}
// skipBytes consumes n already-buffered bytes and advances the position
// accordingly. The slice expression panics if n exceeds the buffered length,
// so callers must only skip bytes they have seen.
func (parser *CSVParser) skipBytes(n int) {
	remaining := parser.buf[n:]
	parser.buf = remaining
	parser.pos += int64(n)
}
// tryPeekExact looks at the upcoming bytes without consuming them.
//   - (true, false, nil): the bytes are exactly `content` (always the case
//     for empty content).
//   - (false, true, nil): the end of input was reached.
//   - (false, false, nil): the bytes differ from `content`.
//   - (false, false, err): peeking failed with a non-EOF error.
func (parser *CSVParser) tryPeekExact(content []byte) (matched bool, eof bool, err error) {
	if len(content) == 0 {
		return true, false, nil
	}
	peeked, peekErr := parser.peekBytes(len(content))
	switch {
	case peekErr == nil:
		return bytes.Equal(peeked, content), false, nil
	case errors.Cause(peekErr) == io.EOF:
		return false, true, nil
	default:
		return false, false, peekErr
	}
}
// tryReadExact checks whether the upcoming bytes are exactly `content`; if so
// it consumes them (advancing the cursor) and returns true. Reaching the end
// of input counts as "not matched" rather than as an error.
func (parser *CSVParser) tryReadExact(content []byte) (bool, error) {
	matched, _, err := parser.tryPeekExact(content)
	if !matched {
		return false, err
	}
	parser.skipBytes(len(content))
	return true, err
}
// tryReadNewLine reports whether the already-consumed byte b starts a line
// terminator. With no terminator configured, '\r' and '\n' each count as one.
// Otherwise b must be the terminator's first byte and the remaining bytes are
// consumed from the input when they match.
func (parser *CSVParser) tryReadNewLine(b byte) (bool, error) {
	switch {
	case len(parser.newLine) == 0:
		return b == '\r' || b == '\n', nil
	case parser.newLine[0] != b:
		return false, nil
	}
	return parser.tryReadExact(parser.newLine[1:])
}
// tryReadOpenDelimiter reports whether the already-consumed byte b starts the
// opening delimiter, consuming the rest of the delimiter when it matches.
// It always reports false when no delimiter is configured.
func (parser *CSVParser) tryReadOpenDelimiter(b byte) (bool, error) {
	if len(parser.quote) > 0 && parser.quote[0] == b {
		return parser.tryReadExact(parser.quote[1:])
	}
	return false, nil
}
// tryReadCloseDelimiter is currently equivalent to tryReadOpenDelimiter until
// we support asymmetric delimiters.
//
// It reports whether the already-consumed byte b starts the closing
// delimiter, consuming the rest of the delimiter when it matches. Like
// tryReadOpenDelimiter it reports false when no delimiter is configured,
// instead of indexing into an empty slice.
func (parser *CSVParser) tryReadCloseDelimiter(b byte) (bool, error) {
	if len(parser.quote) == 0 || parser.quote[0] != b {
		return false, nil
	}
	return parser.tryReadExact(parser.quote[1:])
}
// tryReadComma reports whether the already-consumed byte b starts the field
// separator, consuming the rest of the separator when it matches.
func (parser *CSVParser) tryReadComma(b byte) (bool, error) {
	if b == parser.comma[0] {
		return parser.tryReadExact(parser.comma[1:])
	}
	return false, nil
}
// tryReadEscaped handles an escape sequence introduced by the already-consumed
// byte bs. When escaping is enabled and bs is the escape character, the next
// byte is consumed and returned with ok=true. Running out of input right
// after the escape character yields errDanglingBackslash.
func (parser *CSVParser) tryReadEscaped(bs byte) (bool, byte, error) {
	escapingEnabled := len(parser.escapedBy) > 0 && parser.escFlavor != escapeFlavorNone
	if !escapingEnabled || bs != parser.escapedBy[0] {
		return false, 0, nil
	}
	next, err := parser.readByte()
	err = parser.replaceEOF(err, errDanglingBackslash)
	return true, next, err
}
// readQuotedToken classifies the already-consumed byte b while inside a quoted
// field: a closing delimiter, an escape sequence (csvTokenEscaped combined
// with the escaped byte), or the plain byte itself.
func (parser *CSVParser) readQuotedToken(b byte) (csvToken, error) {
	closed, err := parser.tryReadCloseDelimiter(b)
	if closed || err != nil {
		return csvTokenDelimiter, err
	}
	escaped, next, err := parser.tryReadEscaped(b)
	if escaped || err != nil {
		return csvTokenEscaped | csvToken(next), err
	}
	return csvToken(b), nil
}
// readUnquoteToken classifies the already-consumed byte b while outside quoted
// fields. The candidates are tried in a fixed order: line terminator,
// separator, opening delimiter, escape sequence; anything else is returned as
// the plain byte.
func (parser *CSVParser) readUnquoteToken(b byte) (csvToken, error) {
	isNewLine, err := parser.tryReadNewLine(b)
	if isNewLine || err != nil {
		return csvTokenNewLine, err
	}
	isComma, err := parser.tryReadComma(b)
	if isComma || err != nil {
		return csvTokenComma, err
	}
	isDelimiter, err := parser.tryReadOpenDelimiter(b)
	if isDelimiter || err != nil {
		return csvTokenDelimiter, err
	}
	escaped, next, err := parser.tryReadEscaped(b)
	if escaped || err != nil {
		return csvTokenEscaped | csvToken(next), err
	}
	return csvToken(b), nil
}
// appendCSVTokenToRecordBuffer appends the byte carried by token to the record
// buffer. For an escaped token the escape character is written first, so the
// escape sequence is kept intact in the buffer.
func (parser *CSVParser) appendCSVTokenToRecordBuffer(token csvToken) {
	out := parser.recordBuffer
	if token&csvTokenEscaped != 0 {
		out = append(out, parser.escapedBy[0])
	}
	parser.recordBuffer = append(out, byte(token))
}
// readUntil consumes input up to, but not including, the first byte that
// belongs to the `chars` set. It returns the consumed bytes and the stop byte,
// which is left at the front of the buffer.
//
// When the stop byte is not in the current buffer, blocks are read and
// accumulated until it is found. If the input ends first, the accumulated
// bytes are returned with an io.EOF-caused error. While row-length checking is
// enabled, a row growing past LargestEntryLimit aborts with an error.
func (parser *CSVParser) readUntil(chars *byteSet) ([]byte, byte, error) {
	// Fast path: the stop byte is already buffered, no copy is needed.
	if at := IndexAnyByte(parser.buf, chars); at >= 0 {
		head := parser.buf[:at]
		parser.buf = parser.buf[at:]
		parser.pos += int64(at)
		return head, parser.buf[0], nil
	}

	// not found in parser.buf, need allocate and loop.
	var acc []byte
	for {
		acc = append(acc, parser.buf...)
		if parser.checkRowLen {
			rowLen := parser.pos - parser.rowStartPos + int64(len(acc))
			if rowLen > int64(LargestEntryLimit) {
				return acc, 0, errors.New("size of row cannot exceed the max value of txn-entry-size-limit")
			}
		}

		parser.buf = nil
		err := parser.readBlock()
		if err == nil && len(parser.buf) == 0 {
			// Nothing more to read: report it as end of input.
			err = io.EOF
		}
		if err != nil {
			parser.pos += int64(len(acc))
			return acc, 0, errors.Trace(err)
		}

		at := IndexAnyByte(parser.buf, chars)
		if at < 0 {
			continue
		}
		acc = append(acc, parser.buf[:at]...)
		parser.buf = parser.buf[at:]
		parser.pos += int64(len(acc))
		return acc, parser.buf[0], nil
	}
}
// readRecord parses the next record (line) from the input and returns its
// fields. dst is reused as the result slice when its capacity is sufficient.
//
// The unescaped field bytes are collected in parser.recordBuffer while
// parser.fieldIndexes and parser.fieldIsQuoted record where each field ends
// and whether it was quoted; the fields are sliced out of one string at the
// end. Reaching the end of input on a non-empty line is treated like a
// trailing line terminator; on an empty line the error is returned as is.
func (parser *CSVParser) readRecord(dst []field) ([]field, error) {
parser.recordBuffer = parser.recordBuffer[:0]
parser.fieldIndexes = parser.fieldIndexes[:0]
parser.fieldIsQuoted = parser.fieldIsQuoted[:0]
// isEmptyLine stays true until any content or non-terminator token is seen.
isEmptyLine := true
// whitespaceLine stays true until a separator or a delimiter is seen.
whitespaceLine := true
foundStartingByThisLine := false
// Start as if a terminator had just been read, so that a delimiter at the
// very beginning of the record opens a quoted field.
prevToken := csvTokenNewLine
fieldIsQuoted := false
var firstToken csvToken
outside:
for {
// we should drop
// 1. the whole line if it does not contain startingBy
// 2. any character before startingBy
// since we have checked startingBy does not contain terminator, we can
// split at terminator to check the substring contains startingBy. Even
// if the terminator is inside a quoted field which means it's not the
// end of a line, the substring can still be dropped by rule 2.
if len(parser.startingBy) > 0 && !foundStartingByThisLine {
oldPos := parser.pos
content, _, err := parser.readUntilTerminator()
if err != nil {
if !(errors.Cause(err) == io.EOF) {
return nil, err
}
if len(content) == 0 {
return nil, err
}
// if we reached EOF, we should still check the content contains
// startingBy and try to put back and parse it.
}
idx := bytes.Index(content, parser.startingBy)
if idx == -1 {
continue
}
foundStartingByThisLine = true
// Put the part after startingBy back in front of the buffer and rewind
// the position to just past startingBy.
content = content[idx+len(parser.startingBy):]
parser.buf = append(content, parser.buf...)
parser.pos = oldPos + int64(idx+len(parser.startingBy))
}
// Copy the run of ordinary bytes up to the next special byte.
content, firstByte, err := parser.readUntil(&parser.unquoteByteSet)
if len(content) > 0 {
isEmptyLine = false
// Plain content directly after a closed quoted field is a syntax error.
if prevToken == csvTokenDelimiter {
parser.logSyntaxError()
return nil, errors.AddStack(errUnexpectedQuoteField)
}
parser.recordBuffer = append(parser.recordBuffer, content...)
prevToken = csvTokenAnyUnquoted
}
if err != nil {
if isEmptyLine || errors.Cause(err) != io.EOF {
return nil, err
}
// treat EOF as the same as trailing \n.
firstToken = csvTokenNewLine
} else {
// Consume the special byte and classify it.
parser.skipBytes(1)
firstToken, err = parser.readUnquoteToken(firstByte)
if err != nil {
return nil, err
}
}
switch firstToken {
case csvTokenComma:
// End of a field.
whitespaceLine = false
parser.fieldIndexes = append(parser.fieldIndexes, len(parser.recordBuffer))
parser.fieldIsQuoted = append(parser.fieldIsQuoted, fieldIsQuoted)
fieldIsQuoted = false
case csvTokenDelimiter:
// A delimiter opens a quoted field only right after a separator or a
// terminator; elsewhere it is literal text when unescapedQuote is set
// and a syntax error otherwise.
if prevToken != csvTokenComma && prevToken != csvTokenNewLine {
if parser.unescapedQuote {
whitespaceLine = false
parser.recordBuffer = append(parser.recordBuffer, parser.quote...)
continue
}
parser.logSyntaxError()
return nil, errors.AddStack(errUnexpectedQuoteField)
}
if err = parser.readQuotedField(); err != nil {
return nil, err
}
fieldIsQuoted = true
whitespaceLine = false
case csvTokenNewLine:
foundStartingByThisLine = false
// new line = end of record (ignore empty lines)
prevToken = firstToken
if !parser.allowEmptyLine {
if isEmptyLine {
continue
}
// skip lines only contain whitespaces
// (err is non-nil only when this terminator stands for the end of
// input, in which case the line is not skipped)
if err == nil && whitespaceLine && len(bytes.TrimSpace(parser.recordBuffer)) == 0 {
parser.recordBuffer = parser.recordBuffer[:0]
continue
}
}
parser.fieldIndexes = append(parser.fieldIndexes, len(parser.recordBuffer))
parser.fieldIsQuoted = append(parser.fieldIsQuoted, fieldIsQuoted)
// the loop is end, no need to reset fieldIsQuoted
break outside
default:
// A plain or escaped byte; it must not directly follow a closed quoted field.
if prevToken == csvTokenDelimiter {
parser.logSyntaxError()
return nil, errors.AddStack(errUnexpectedQuoteField)
}
parser.appendCSVTokenToRecordBuffer(firstToken)
}
prevToken = firstToken
isEmptyLine = false
}
// Create a single string and create slices out of it.
// This pins the memory of the fields together, but allocates once.
str := string(parser.recordBuffer) // Convert to string once to batch allocations
dst = dst[:0]
if cap(dst) < len(parser.fieldIndexes) {
dst = make([]field, len(parser.fieldIndexes))
}
dst = dst[:len(parser.fieldIndexes)]
var preIdx int
for i, idx := range parser.fieldIndexes {
dst[i].content = str[preIdx:idx]
dst[i].quoted = parser.fieldIsQuoted[i]
preIdx = idx
}
// dst now holds one entry per parsed field.
return dst, nil
}
// readQuotedField reads the remainder of a quoted field, whose opening
// delimiter has already been consumed, and appends its content to
// parser.recordBuffer. A doubled delimiter is stored as one literal delimiter.
// It returns nil once the closing delimiter has been consumed, and
// errUnterminatedQuotedField if the input ends first.
func (parser *CSVParser) readQuotedField() error {
for {
prevPos := parser.pos
content, terminator, err := parser.readUntil(&parser.quoteByteSet)
if err != nil {
if errors.Cause(err) == io.EOF {
// return the position of quote to the caller.
// because we return an error here, the parser won't
// use the `pos` again, so it's safe to modify it here.
parser.pos = prevPos - 1
// set buf to parser.buf in order to print err log
parser.buf = content
err = parser.replaceEOF(err, errUnterminatedQuotedField)
}
return err
}
parser.recordBuffer = append(parser.recordBuffer, content...)
// Consume the special byte (delimiter or escape start) and classify it.
parser.skipBytes(1)
token, err := parser.readQuotedToken(terminator)
if err != nil {
return err
}
switch token {
case csvTokenDelimiter:
// encountered '"' -> continue if we're seeing '""'.
doubledDelimiter, err := parser.tryReadExact(parser.quote)
if err != nil {
return err
}
if doubledDelimiter {
// consume the double quotation mark and continue
parser.recordBuffer = append(parser.recordBuffer, parser.quote...)
} else if parser.unescapedQuote {
// allow unescaped quote inside quoted field, so we only finish
// reading the field when we see a delimiter + comma/newline.
comma, _, err2 := parser.tryPeekExact(parser.comma)
if comma || err2 != nil {
return err2
}
// NOTE(review): tryPeekExact reports a match for empty content, so with
// an empty parser.newLine this peek always ends the field here — confirm
// unescapedQuote is never combined with an empty line terminator.
newline, eof, err2 := parser.tryPeekExact(parser.newLine)
if eof || newline {
return nil
}
if err2 != nil {
return err2
}
// Neither separator nor terminator follows: keep the delimiter as text.
parser.recordBuffer = append(parser.recordBuffer, parser.quote...)
} else {
// the field is completed, exit.
return nil
}
default:
// A plain or escaped byte inside the quotes.
parser.appendCSVTokenToRecordBuffer(token)
}
}
}
// replaceEOF substitutes `replaced` for err when err is caused by io.EOF; any
// other err (including nil) is returned untouched. A non-nil replacement is
// logged as a syntax error and gets a stack attached; a nil replacement simply
// swallows the EOF.
func (parser *CSVParser) replaceEOF(err error, replaced error) error {
	isEOF := err != nil && errors.Cause(err) == io.EOF
	if !isEOF {
		return err
	}
	if replaced == nil {
		return nil
	}
	parser.logSyntaxError()
	return errors.AddStack(replaced)
}
// ReadRow reads a row from the datafile into parser.lastRow.
//
// The row ID is incremented on every call. If a header is still pending it is
// consumed first via ReadColumns. Each field is decoded and unescaped, then
// stored either as NULL or as a utf8mb4_bin string datum; row.Length
// accumulates the raw length of the fields.
func (parser *CSVParser) ReadRow() error {
	row := &parser.lastRow
	row.Length = 0
	row.RowID++

	// skip the header first
	if parser.shouldParseHeader {
		if err := parser.ReadColumns(); err != nil {
			return errors.Trace(err)
		}
		parser.shouldParseHeader = false
	}

	parser.beginRowLenCheck()
	defer parser.endRowLenCheck()

	fields, err := parser.readRecord(parser.lastRecord)
	if err != nil {
		return errors.Trace(err)
	}
	// Keep the untrimmed slice so its backing array is reused next time.
	parser.lastRecord = fields

	// remove the last empty value
	if parser.cfg.TrimLastEmptyField {
		if n := len(fields); n > 0 && len(fields[n-1].content) == 0 {
			fields = fields[:n-1]
		}
	}

	row.Row = parser.acquireDatumSlice()
	if len(fields) <= cap(row.Row) {
		row.Row = row.Row[:len(fields)]
	} else {
		row.Row = make([]types.Datum, len(fields))
	}

	for i := range fields {
		row.Length += len(fields[i].content)
		value, isNull, err := parser.unescapeString(fields[i])
		if err != nil {
			return errors.Trace(err)
		}
		if isNull {
			row.Row[i].SetNull()
			continue
		}
		row.Row[i].SetString(value, "utf8mb4_bin")
	}
	return nil
}
// ReadColumns reads the columns of this CSV file.
// The header record is always consumed; its names are stored (lower-cased) in
// parser.columns only when cfg.HeaderSchemaMatch is set.
func (parser *CSVParser) ReadColumns() error {
	parser.beginRowLenCheck()
	defer parser.endRowLenCheck()

	header, err := parser.readRecord(nil)
	if err != nil {
		return errors.Trace(err)
	}
	if !parser.cfg.HeaderSchemaMatch {
		return nil
	}

	parser.columns = make([]string, 0, len(header))
	for i := range header {
		name, _, err := parser.unescapeString(header[i])
		if err != nil {
			return errors.Trace(err)
		}
		parser.columns = append(parser.columns, strings.ToLower(name))
	}
	return nil
}
// ReadUntilTerminator scans forward until a line terminator has been consumed
// and returns
//   - the content including the terminator, or whatever was read before an
//     error occurred
//   - the file offset just past the terminator, or the offset reached when
//     the error occurred
//   - the error, if any
//
// The terminator pattern may also occur inside a quoted field as ordinary
// content; this method does not distinguish that case, so the caller has to
// handle it. Row-length checking is active for the duration of the call.
func (parser *CSVParser) ReadUntilTerminator() ([]byte, int64, error) {
	parser.beginRowLenCheck()
	defer parser.endRowLenCheck()

	content, offset, err := parser.readUntilTerminator()
	return content, offset, err
}
// readUntilTerminator consumes input until a complete line terminator has been
// read. It returns the consumed bytes (terminator included), the position
// after them, and any error. On error the returned bytes are exactly what was
// consumed so far.
func (parser *CSVParser) readUntilTerminator() ([]byte, int64, error) {
	var ret []byte
	for {
		content, firstByte, err := parser.readUntil(&parser.newLineByteSet)
		ret = append(ret, content...)
		if err != nil {
			return ret, parser.pos, err
		}
		// firstByte is a candidate for the start of the terminator.
		parser.skipBytes(1)
		ret = append(ret, firstByte)

		matched, err := parser.tryReadNewLine(firstByte)
		if err != nil {
			// The tail of the terminator was not consumed, so it must not be
			// reported as part of the content.
			return ret, parser.pos, err
		}
		if matched {
			// tryReadNewLine consumed the rest of a multi-byte terminator;
			// mirror that in the returned content.
			if len(parser.newLine) > 1 {
				ret = append(ret, parser.newLine[1:]...)
			}
			return ret, parser.pos, nil
		}
		// Only the first byte matched: keep scanning.
	}
}