// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mydump
import (
"bytes"
"context"
"fmt"
"io"
"regexp"
"strconv"
"strings"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/lightning/metric"
"github.com/pingcap/tidb/pkg/lightning/worker"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/pingcap/tidb/pkg/objstore/storeapi"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/zeropool"
"github.com/spkg/bom"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
type blockParser struct {
// states for the lexer
reader PooledReader
// buf stores data that has NOT been parsed yet; it shares the same underlying memory as appendBuf.
buf []byte
// blockBuf is used to read data from the reader; the data will then be moved to other buffers.
blockBuf []byte
isLastChunk bool
// The list of column names of the last INSERT statement.
columns []string
rowPool *zeropool.Pool[[]types.Datum]
lastRow Row
// pos is the reader position we have parsed up to; if the underlying reader
// is not a compressed file, it is also the file position we have parsed up to.
// This value may move backward when reading a quoted field fails, but it is
// only used for printing error messages and the parser must not be used
// afterwards, so that is fine. See readQuotedField.
pos int64
// cache
remainBuf *bytes.Buffer
// appendBuf holds the cached parsable data after the last readBlock. All data
// inside it was unparsed at the moment the last readBlock returned; for the
// currently unparsed data, use buf instead.
appendBuf *bytes.Buffer
// the Logger associated with this parser for reporting failure
Logger log.Logger
metrics *metric.Metrics
checkRowLen bool
rowStartPos int64
}
func makeBlockParser(
reader ReadSeekCloser,
blockBufSize int64,
ioWorkers *worker.Pool,
metrics *metric.Metrics,
logger log.Logger,
) blockParser {
pool := zeropool.New[[]types.Datum](func() []types.Datum {
return make([]types.Datum, 0, 16)
})
return blockParser{
reader: MakePooledReader(reader, ioWorkers),
blockBuf: make([]byte, blockBufSize*config.BufferSizeScale),
remainBuf: &bytes.Buffer{},
appendBuf: &bytes.Buffer{},
Logger: logger,
rowPool: &pool,
metrics: metrics,
}
}
// ChunkParser is a parser for data files that contain only INSERT
// statements.
type ChunkParser struct {
blockParser
escFlavor escapeFlavor
}
// Chunk represents a portion of the data file.
type Chunk struct {
Offset int64
// for parquet files, EndOffset is the total row count;
// see makeParquetFileRegion
EndOffset int64
RealOffset int64
// We estimate the row-id range of a chunk as its file size divided by some factor (which depends on the column count).
// After estimation, the ranges are rebased across all chunks of this table in this instance,
// and then rebased again across all instances of a parallel import.
// The allocatable row-id range is (PrevRowIDMax, RowIDMax].
// PrevRowIDMax is increased during local encoding.
PrevRowIDMax int64
RowIDMax int64
// only assigned when using strict-mode for CSV files and the file contains a header row
Columns []string
}
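// rowIDInChunk is a small illustrative sketch, not part of the Lightning API
// and not called anywhere in this package: it only spells out the half-open
// row-id range documented above, i.e. a row id allocated from this chunk must
// lie in (PrevRowIDMax, RowIDMax].
func rowIDInChunk(c Chunk, rowID int64) bool {
	// The lower bound is exclusive, so the first row encoded from this chunk
	// gets PrevRowIDMax+1; the upper bound is inclusive.
	return rowID > c.PrevRowIDMax && rowID <= c.RowIDMax
}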
// Row is the content of a row.
type Row struct {
// RowID is the row id of the row.
// As objects of this struct are reused, this RowID is increased when reading
// the next row.
RowID int64
Row []types.Datum
Length int
}
// MarshalLogArray implements the zapcore.ArrayMarshaler interface
func (row Row) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
for _, r := range row.Row {
encoder.AppendString(r.String())
}
return nil
}
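// logRowValues is a hedged usage sketch for MarshalLogArray and is not called
// by the parser: because Row implements zapcore.ArrayMarshaler, it can be
// passed to zap.Array directly. The helper name is ours, not part of the API.
func logRowValues(logger log.Logger, row Row) {
	logger.Debug("parsed row",
		zap.Int64("rowID", row.RowID),
		zap.Array("values", row),
	)
}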
type escapeFlavor uint8
const (
escapeFlavorNone escapeFlavor = iota
escapeFlavorMySQL
escapeFlavorMySQLWithNull
)
// Parser provides some methods to parse a source data file.
type Parser interface {
// Pos returns the position that the parser has already handled. It's mainly used for checkpointing.
// For normal files it's the file offset we have handled.
// For parquet files it's the row count we have handled.
// For compressed files it's the uncompressed file offset we have handled.
// TODO: replace pos with a new structure to specify position offset and rows offset
Pos() (pos int64, rowID int64)
SetPos(pos int64, rowID int64) error
// ScannedPos always returns the current file reader pointer's location
ScannedPos() (int64, error)
Close() error
ReadRow() error
LastRow() Row
RecycleRow(row Row)
// Columns returns the _lower-case_ column names corresponding to values in
// the LastRow.
Columns() []string
// SetColumns sets the restored column names on the parser
SetColumns([]string)
SetLogger(log.Logger)
SetRowID(rowID int64)
}
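// drainParser is a hedged sketch of how the Parser interface above is
// typically consumed: read rows until io.EOF, hand each row to a consumer,
// recycle the datum slice, and keep Pos() as a resumable checkpoint.
// encodeRow is a hypothetical callback, not part of this package.
func drainParser(parser Parser, encodeRow func(Row) error) (pos int64, rowID int64, err error) {
	for {
		err = parser.ReadRow()
		if errors.Cause(err) == io.EOF {
			pos, rowID = parser.Pos()
			return pos, rowID, nil
		}
		if err != nil {
			return 0, 0, errors.Trace(err)
		}
		row := parser.LastRow()
		encodeErr := encodeRow(row)
		// Return the datum slice to the pool once the row has been consumed.
		parser.RecycleRow(row)
		if encodeErr != nil {
			return 0, 0, errors.Trace(encodeErr)
		}
		// Pos() after a successful ReadRow is a position that can later be
		// restored with SetPos to resume from this row boundary.
		pos, rowID = parser.Pos()
	}
}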
// NewChunkParser creates a new parser which can read chunks out of a file.
func NewChunkParser(
ctx context.Context,
sqlMode mysql.SQLMode,
reader ReadSeekCloser,
blockBufSize int64,
ioWorkers *worker.Pool,
) *ChunkParser {
escFlavor := escapeFlavorMySQL
if sqlMode.HasNoBackslashEscapesMode() {
escFlavor = escapeFlavorNone
}
metrics, _ := metric.FromContext(ctx)
return &ChunkParser{
blockParser: makeBlockParser(reader, blockBufSize, ioWorkers, metrics, log.Wrap(logutil.Logger(ctx))),
escFlavor: escFlavor,
}
}
func (parser *blockParser) beginRowLenCheck() {
parser.checkRowLen = true
parser.rowStartPos = parser.pos
}
func (parser *blockParser) endRowLenCheck() {
parser.checkRowLen = false
}
// SetPos changes the reported position and row ID.
func (parser *blockParser) SetPos(pos int64, rowID int64) error {
p, err := parser.reader.Seek(pos, io.SeekStart)
if err != nil {
return errors.Trace(err)
}
if p != pos {
return errors.Errorf("set pos failed, required position: %d, got: %d", pos, p)
}
parser.pos = pos
parser.lastRow.RowID = rowID
return nil
}
// ScannedPos gets the read position of the current reader.
// It always returns the position in the underlying file, whether compressed or not.
func (parser *blockParser) ScannedPos() (int64, error) {
return parser.reader.Seek(0, io.SeekCurrent)
}
// Pos returns the current file offset.
// Attention: for compressed sql/csv files, pos is the position in the uncompressed file
func (parser *blockParser) Pos() (pos int64, lastRowID int64) {
return parser.pos, parser.lastRow.RowID
}
func (parser *blockParser) Close() error {
return parser.reader.Close()
}
func (parser *blockParser) Columns() []string {
return parser.columns
}
func (parser *blockParser) SetColumns(columns []string) {
parser.columns = columns
}
func (parser *blockParser) logSyntaxError() {
content := parser.buf
if len(content) > 256 {
content = content[:256]
}
parser.Logger.Error("syntax error",
zap.Int64("pos", parser.pos),
zap.ByteString("content", content),
)
}
func (parser *blockParser) SetLogger(logger log.Logger) {
parser.Logger = logger
}
// SetRowID changes the reported row ID when we first read compressed files.
func (parser *blockParser) SetRowID(rowID int64) {
parser.lastRow.RowID = rowID
}
type token byte
const (
tokNil token = iota
tokRowBegin
tokRowEnd
tokValues
tokNull
tokTrue
tokFalse
tokHexString
tokBinString
tokInteger
tokSingleQuoted
tokDoubleQuoted
tokBackQuoted
tokUnquoted
)
var tokenDescriptions = [...]string{
tokNil: "<Nil>",
tokRowBegin: "RowBegin",
tokRowEnd: "RowEnd",
tokValues: "Values",
tokNull: "Null",
tokTrue: "True",
tokFalse: "False",
tokHexString: "HexString",
tokBinString: "BinString",
tokInteger: "Integer",
tokSingleQuoted: "SingleQuoted",
tokDoubleQuoted: "DoubleQuoted",
tokBackQuoted: "BackQuoted",
tokUnquoted: "Unquoted",
}
// String implements the fmt.Stringer interface
//
// Mainly used for debugging a token.
func (tok token) String() string {
t := int(tok)
if t >= 0 && t < len(tokenDescriptions) {
if description := tokenDescriptions[t]; description != "" {
return description
}
}
return fmt.Sprintf("<Unknown(%d)>", t)
}
func (parser *blockParser) readBlock() error {
startTime := time.Now()
n, err := parser.reader.ReadFull(parser.blockBuf)
switch err {
case io.ErrUnexpectedEOF, io.EOF:
parser.isLastChunk = true
fallthrough
case nil:
// `parser.buf` references `appendBuf.Bytes`, so we must use remainBuf to
// hold the remaining data of `parser.buf` to prevent the slices from overlapping
parser.remainBuf.Reset()
parser.remainBuf.Write(parser.buf)
parser.appendBuf.Reset()
parser.appendBuf.Write(parser.remainBuf.Bytes())
blockData := parser.blockBuf[:n]
if parser.pos == 0 {
bomCleanedData := bom.Clean(blockData)
parser.pos += int64(n - len(bomCleanedData))
blockData = bomCleanedData
}
parser.appendBuf.Write(blockData)
parser.buf = parser.appendBuf.Bytes()
if parser.metrics != nil {
parser.metrics.ChunkParserReadBlockSecondsHistogram.Observe(time.Since(startTime).Seconds())
}
return nil
default:
return errors.Trace(err)
}
}
var chunkParserUnescapeRegexp = regexp.MustCompile(`(?s)\\.`)
func unescape(
input string,
delim string,
escFlavor escapeFlavor,
escChar byte,
unescapeRegexp *regexp.Regexp,
) string {
if len(delim) > 0 {
delim2 := delim + delim
if strings.Contains(input, delim2) {
input = strings.ReplaceAll(input, delim2, delim)
}
}
if escFlavor != escapeFlavorNone && strings.IndexByte(input, escChar) != -1 {
input = unescapeRegexp.ReplaceAllStringFunc(input, func(substr string) string {
switch substr[1] {
case '0':
return "\x00"
case 'b':
return "\b"
case 'n':
return "\n"
case 'r':
return "\r"
case 't':
return "\t"
case 'Z':
return "\x1a"
default:
return substr[1:]
}
})
}
return input
}
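// unescapeExamples is a hedged demonstration of the helper above and is not
// called by the parser: the doubled delimiter is collapsed first, then
// backslash escapes are resolved when the escape flavor allows it.
func unescapeExamples() {
	// A doubled quote inside a single-quoted literal collapses: "it''s" -> "it's".
	fmt.Println(unescape("it''s", "'", escapeFlavorMySQL, '\\', chunkParserUnescapeRegexp))
	// A MySQL backslash escape is decoded: the two characters `\` and `n` become a newline.
	fmt.Println(unescape(`a\nb`, "'", escapeFlavorMySQL, '\\', chunkParserUnescapeRegexp))
	// With escapeFlavorNone the backslash sequence is left untouched.
	fmt.Println(unescape(`a\nb`, "'", escapeFlavorNone, '\\', chunkParserUnescapeRegexp))
}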
func (parser *ChunkParser) unescapeString(input string) string {
if len(input) >= 2 {
switch input[0] {
case '\'', '"':
return unescape(input[1:len(input)-1], input[:1], parser.escFlavor, '\\', chunkParserUnescapeRegexp)
case '`':
return unescape(input[1:len(input)-1], "`", escapeFlavorNone, '\\', chunkParserUnescapeRegexp)
}
}
return input
}
// ReadRow reads a row from the datafile.
func (parser *ChunkParser) ReadRow() error {
// This parser will recognize contents like:
//
// `tableName` (...) VALUES (...) (...) (...)
//
// Keywords like INSERT, INTO and separators like ',' and ';' are treated
// like comments and ignored. Therefore, this parser will accept some
// nonsense input. The advantage is the parser becomes extremely simple,
// suitable for us where we just want to quickly and accurately split the
// file apart, not to validate the content.
type state byte
const (
// the state after "INSERT INTO" before the column names or "VALUES"
stateTableName state = iota
// the state while reading the column names
stateColumns
// the state after reading "VALUES"
stateValues
// the state while reading row values
stateRow
)
// Dry-run sample of the state machine, first row:
//
// Input Token State
// ~~~~~ ~~~~~ ~~~~~
//
// stateValues
// INSERT
// INTO
// `tableName` tokBackQuoted
// stateTableName (reset columns)
// ( tokRowBegin
// stateColumns
// `a` tokBackQuoted
// stateColumns (append column)
// ,
// `b` tokBackQuoted
// stateColumns (append column)
// ) tokRowEnd
// stateValues
// VALUES
// stateValues (no-op)
// ( tokRowBegin
// stateRow (reset row)
// 1 tokInteger
// stateRow (append value)
// ,
// 2 tokInteger
// stateRow (append value)
// ) tokRowEnd
// return
//
//
// Second row:
//
// Input Token State
// ~~~~~ ~~~~~ ~~~~~
//
// stateValues
// ,
// ( tokRowBegin
// stateRow (reset row)
// 3 tokInteger
// stateRow (append value)
// ) tokRowEnd
// return
//
// Third row:
//
// Input Token State
// ~~~~~ ~~~~~ ~~~~~
//
// ;
// INSERT
// INTO
// `database` tokBackQuoted
// stateTableName (reset columns)
// .
// `tableName` tokBackQuoted
// stateTableName (no-op)
// VALUES
// stateValues
// ( tokRowBegin
// stateRow (reset row)
// 4 tokInteger
// stateRow (append value)
// ) tokRowEnd
// return
row := &parser.lastRow
st := stateValues
row.Length = 0
for {
tok, content, err := parser.lex()
if err != nil {
if err == io.EOF && st != stateValues {
return errors.Errorf("syntax error: premature EOF at offset %d", parser.pos)
}
return errors.Trace(err)
}
row.Length += len(content)
switch st {
case stateTableName:
switch tok {
case tokRowBegin:
st = stateColumns
case tokValues:
st = stateValues
case tokUnquoted, tokDoubleQuoted, tokBackQuoted:
default:
return errors.Errorf(
"syntax error: unexpected %s (%s) at offset %d, expecting %s",
tok, content, parser.pos, "table name",
)
}
case stateColumns:
switch tok {
case tokRowEnd:
st = stateValues
case tokUnquoted, tokDoubleQuoted, tokBackQuoted:
columnName := strings.ToLower(parser.unescapeString(string(content)))
parser.columns = append(parser.columns, columnName)
default:
return errors.Errorf(
"syntax error: unexpected %s (%s) at offset %d, expecting %s",
tok, content, parser.pos, "column list",
)
}
case stateValues:
switch tok {
case tokRowBegin:
row.RowID++
row.Row = parser.acquireDatumSlice()
st = stateRow
case tokUnquoted, tokDoubleQuoted, tokBackQuoted:
parser.columns = nil
st = stateTableName
case tokValues:
default:
return errors.Errorf(
"syntax error: unexpected %s (%s) at offset %d, expecting %s",
tok, content, parser.pos, "start of row",
)
}
case stateRow:
var value types.Datum
switch tok {
case tokRowEnd:
return nil
case tokNull:
value.SetNull()
case tokTrue:
value.SetInt64(1)
case tokFalse:
value.SetInt64(0)
case tokInteger:
c := string(content)
if strings.HasPrefix(c, "-") {
i, err := strconv.ParseInt(c, 10, 64)
if err == nil {
value.SetInt64(i)
break
}
} else {
u, err := strconv.ParseUint(c, 10, 64)
if err == nil {
value.SetUint64(u)
break
}
}
// if the integer is too long, fall back to treating it as a
// string (all types that treat integers specially, like BIT,
// can't handle integers wider than 64 bits anyway)
fallthrough
case tokUnquoted, tokSingleQuoted, tokDoubleQuoted:
value.SetString(parser.unescapeString(string(content)), "utf8mb4_bin")
case tokHexString:
hexLit, err := types.ParseHexStr(string(content))
if err != nil {
return errors.Trace(err)
}
value.SetBinaryLiteral(hexLit)
case tokBinString:
binLit, err := types.ParseBitStr(string(content))
if err != nil {
return errors.Trace(err)
}
value.SetBinaryLiteral(binLit)
default:
return errors.Errorf(
"syntax error: unexpected %s (%s) at offset %d, expecting %s",
tok, content, parser.pos, "data literal",
)
}
row.Row = append(row.Row, value)
}
}
}
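// datumFromIntegerLiteral is a hedged, standalone restatement of the
// tokInteger handling in the stateRow branch above; it is not called by
// ReadRow. Signed literals go through ParseInt, unsigned ones through
// ParseUint, and anything wider than 64 bits falls back to a string datum.
func datumFromIntegerLiteral(c string) types.Datum {
	var value types.Datum
	if strings.HasPrefix(c, "-") {
		if i, err := strconv.ParseInt(c, 10, 64); err == nil {
			value.SetInt64(i)
			return value
		}
	} else {
		if u, err := strconv.ParseUint(c, 10, 64); err == nil {
			value.SetUint64(u)
			return value
		}
	}
	// Too wide for 64 bits: keep the textual form, mirroring the fallthrough
	// to the string case in ReadRow.
	value.SetString(c, "utf8mb4_bin")
	return value
}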
// LastRow returns the row parsed by the last call to ReadRow().
func (parser *blockParser) LastRow() Row {
return parser.lastRow
}
// RecycleRow places the row object back into the allocation pool.
func (parser *blockParser) RecycleRow(row Row) {
// We need further benchmarking to determine whether sending a pointer
// (instead of a slice) here can improve performance.
parser.rowPool.Put(row.Row[:0])
}
// acquireDatumSlice gets an empty []types.Datum from the row pool
func (parser *blockParser) acquireDatumSlice() []types.Datum {
return parser.rowPool.Get()
}
// ReadChunks parses the entire file and splits it into contiguous chunks of
// size >= minSize.
func ReadChunks(parser Parser, minSize int64) ([]Chunk, error) {
var chunks []Chunk
pos, lastRowID := parser.Pos()
cur := Chunk{
Offset: pos,
EndOffset: pos,
PrevRowIDMax: lastRowID,
RowIDMax: lastRowID,
}
for {
switch err := parser.ReadRow(); errors.Cause(err) {
case nil:
cur.EndOffset, cur.RowIDMax = parser.Pos()
if cur.EndOffset-cur.Offset >= minSize {
chunks = append(chunks, cur)
cur.Offset = cur.EndOffset
cur.PrevRowIDMax = cur.RowIDMax
}
case io.EOF:
if cur.Offset < cur.EndOffset {
chunks = append(chunks, cur)
}
return chunks, nil
default:
return nil, errors.Trace(err)
}
}
}
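// splitIntoRegions is a hedged usage sketch for ReadChunks; the function and
// its name are illustrative and not part of this package's API. It rewinds the
// parser, asks for chunks of at least minSize bytes, and checks the invariant
// that adjacent chunks are contiguous in both file offset and row id.
func splitIntoRegions(parser Parser, minSize int64) ([]Chunk, error) {
	// Rewind so the first chunk starts at offset 0 and row id 0.
	if err := parser.SetPos(0, 0); err != nil {
		return nil, errors.Trace(err)
	}
	chunks, err := ReadChunks(parser, minSize)
	if err != nil {
		return nil, errors.Trace(err)
	}
	// Chunk boundaries are taken from Pos() after a successful ReadRow, so
	// chunk i+1 starts exactly where chunk i ended.
	for i := 1; i < len(chunks); i++ {
		if chunks[i].Offset != chunks[i-1].EndOffset || chunks[i].PrevRowIDMax != chunks[i-1].RowIDMax {
			return nil, errors.Errorf("chunks are not contiguous at index %d", i)
		}
	}
	return chunks, nil
}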
// ReadUntil reads rows from the parser until its reported position reaches or
// exceeds pos, or the file ends.
func ReadUntil(parser Parser, pos int64) error {
var curOffset int64
for curOffset < pos {
switch err := parser.ReadRow(); errors.Cause(err) {
case nil:
curOffset, _ = parser.Pos()
case io.EOF:
return nil
default:
return errors.Trace(err)
}
}
return nil
}
// OpenReader opens a reader for the given file and storage.
func OpenReader(
ctx context.Context,
fileMeta *SourceFileMeta,
store storeapi.Storage,
decompressCfg compressedio.DecompressConfig,
) (reader storeapi.ReadSeekCloser, err error) {
switch {
case fileMeta.Type == SourceTypeParquet:
reader, err = OpenParquetReader(ctx, store, fileMeta.Path)
case fileMeta.Compression != CompressionNone:
compressType, err2 := ToStorageCompressType(fileMeta.Compression)
if err2 != nil {
return nil, err2
}
reader, err = objstore.WithCompression(store, compressType, decompressCfg).Open(ctx, fileMeta.Path, nil)
default:
reader, err = store.Open(ctx, fileMeta.Path, nil)
}
return
}
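// openAndClose is a hedged usage sketch for OpenReader and is not part of the
// package API: it opens the right reader for the file's type and compression
// and then releases it. Real callers hand the reader to a parser (for example
// NewChunkParser for SQL dumps) instead of closing it immediately.
func openAndClose(
	ctx context.Context,
	fileMeta *SourceFileMeta,
	store storeapi.Storage,
	decompressCfg compressedio.DecompressConfig,
) error {
	reader, err := OpenReader(ctx, fileMeta, store, decompressCfg)
	if err != nil {
		return errors.Trace(err)
	}
	// Parquet files get a parquet-aware reader, compressed CSV/SQL files are
	// wrapped with the matching decompressor, and everything else is opened
	// directly from the storage.
	return errors.Trace(reader.Close())
}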