// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
	"context"
	"fmt"
	"io"
	"math"
	"path/filepath"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	backup "github.com/pingcap/kvproto/pkg/brpb"
	"github.com/pingcap/tidb/br/pkg/lightning/config"
	"github.com/pingcap/tidb/br/pkg/lightning/log"
	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
	"github.com/pingcap/tidb/br/pkg/storage"
	"github.com/pingcap/tidb/expression"
	"github.com/pingcap/tidb/parser/ast"
	"github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tidb/parser/mysql"
	plannercore "github.com/pingcap/tidb/planner/core"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/sessiontxn"
	"github.com/pingcap/tidb/table"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/chunk"
	"github.com/pingcap/tidb/util/dbterror"
	"github.com/pingcap/tidb/util/logutil"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
)

const (
	// LoadDataFormatSQLDump represents that the data source file of LOAD DATA is
	// a mydumper-format DML file.
	LoadDataFormatSQLDump = "sql file"
	// LoadDataFormatParquet represents that the data source file of LOAD DATA is
	// a parquet file.
	LoadDataFormatParquet = "parquet"

	logicalImportMode   = "logical"  // tidb backend
	physicalImportMode  = "physical" // local backend
	unlimitedWriteSpeed = config.ByteSize(math.MaxInt64)
	minDiskQuota        = config.ByteSize(10 << 30) // 10GiB
	minBatchSize        = config.ByteSize(1 << 10)  // 1KiB
	minWriteSpeed       = config.ByteSize(1 << 10)  // 1KiB/s

	importModeOption    = "import_mode"
	diskQuotaOption     = "disk_quota"
	checksumOption      = "checksum_table"
	addIndexOption      = "add_index"
	analyzeOption       = "analyze_table"
	threadOption        = "thread"
	batchSizeOption     = "batch_size"
	maxWriteSpeedOption = "max_write_speed"
	splitFileOption     = "split_file"
	recordErrorsOption  = "record_errors"
	detachedOption      = "detached"
)

var (
	taskQueueSize = 16 // the maximum number of pending tasks to commit in queue
	// InTest is a flag that bypasses GCS authentication in unit tests.
	InTest bool

	// name -> whether the option has a value
	supportedOptions = map[string]bool{
		importModeOption:    true,
		diskQuotaOption:     true,
		checksumOption:      true,
		addIndexOption:      true,
		analyzeOption:       true,
		threadOption:        true,
		batchSizeOption:     true,
		maxWriteSpeedOption: true,
		splitFileOption:     true,
		recordErrorsOption:  true,
		detachedOption:      false,
	}

	// options only allowed when the import mode is physical
	optionsForPhysicalImport = map[string]struct{}{
		diskQuotaOption: {},
		checksumOption:  {},
		addIndexOption:  {},
		analyzeOption:   {},
	}
)

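// The options above arrive through the statement's option list (plan.Options)
// and are validated in initOptions. An illustrative example (statement text
// assumed, not taken from this file) of setting a few of them:
//
//	LOAD DATA INFILE 's3://bucket/path/data.csv' INTO TABLE db.tbl
//	    FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
//	    WITH thread=8, batch_size='64MiB', record_errors=100, detached;
//
// Here detached carries no value (supportedOptions maps it to false) while the
// others do; unknown or duplicated options are rejected by initOptions.
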
// LoadDataExec represents a load data executor.
type LoadDataExec struct {
	baseExecutor

	FileLocRef     ast.FileLocRefTp
	OnDuplicate    ast.OnDuplicateKeyHandlingType
	loadDataWorker *LoadDataWorker
}

// Next implements the Executor Next interface.
func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error {
	req.GrowAndReset(e.maxChunkSize)

	if e.loadDataWorker.Path == "" {
		return ErrLoadDataEmptyPath
	}

	// CSV-like
	if e.loadDataWorker.format == "" {
		if e.loadDataWorker.NullInfo != nil && e.loadDataWorker.NullInfo.OptEnclosed &&
			(e.loadDataWorker.FieldsInfo == nil || e.loadDataWorker.FieldsInfo.Enclosed == nil) {
			return ErrLoadDataWrongFormatConfig.GenWithStackByArgs("must specify FIELDS [OPTIONALLY] ENCLOSED BY when use NULL DEFINED BY OPTIONALLY ENCLOSED")
		}
		// TODO: support LINES TERMINATED BY "".
		if len(e.loadDataWorker.LinesInfo.Terminated) == 0 {
			return ErrLoadDataWrongFormatConfig.GenWithStackByArgs("LINES TERMINATED BY is empty")
		}
	}

	switch e.FileLocRef {
	case ast.FileLocServerOrRemote:
		u, err := storage.ParseRawURL(e.loadDataWorker.Path)
		if err != nil {
			return ErrLoadDataInvalidURI.GenWithStackByArgs(err.Error())
		}
		var filename string
		u.Path, filename = filepath.Split(u.Path)
		b, err := storage.ParseBackendFromURL(u, nil)
		if err != nil {
			return ErrLoadDataInvalidURI.GenWithStackByArgs(getMsgFromBRError(err))
		}
		if b.GetLocal() != nil {
			return ErrLoadDataFromServerDisk.GenWithStackByArgs(e.loadDataWorker.Path)
		}
		return e.loadFromRemote(ctx, b, filename)
	case ast.FileLocClient:
		// let the caller use handleQuerySpecial to read data in this connection
		sctx := e.loadDataWorker.ctx
		val := sctx.Value(LoadDataVarKey)
		if val != nil {
			sctx.SetValue(LoadDataVarKey, nil)
			return errors.New("previous load data option wasn't closed normally")
		}
		sctx.SetValue(LoadDataVarKey, e.loadDataWorker)
	}
	return nil
}

func (e *LoadDataExec) loadFromRemote(
	ctx context.Context,
	b *backup.StorageBackend,
	filename string,
) error {
	opt := &storage.ExternalStorageOptions{}
	if InTest {
		opt.NoCredentials = true
	}
	s, err := storage.New(ctx, b, opt)
	if err != nil {
		return ErrLoadDataCantAccess
	}
	fileReader, err := s.Open(ctx, filename)
	if err != nil {
		return ErrLoadDataCantRead.GenWithStackByArgs(getMsgFromBRError(err), "Please check the INFILE path is correct")
	}
	defer fileReader.Close()

	e.loadDataWorker.loadRemoteInfo = loadRemoteInfo{
		store: s,
		path:  filename,
	}
	return e.loadDataWorker.Load(ctx, fileReader)
}

// Close implements the Executor Close interface.
func (e *LoadDataExec) Close() error {
	if e.runtimeStats != nil && e.loadDataWorker != nil && e.loadDataWorker.stats != nil {
		defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.loadDataWorker.stats)
	}
	return nil
}

// Open implements the Executor Open interface.
func (e *LoadDataExec) Open(ctx context.Context) error {
	if e.loadDataWorker.insertColumns != nil {
		e.loadDataWorker.initEvalBuffer()
	}
	// Init for runtime stats.
	e.loadDataWorker.collectRuntimeStatsEnabled()
	return nil
}

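// The load job runs as a two-stage pipeline: processStream parses input rows
// into batches and sends them over commitTaskQueue, while commitWork receives
// the tasks and inserts/commits them sequentially. The commitTask type below
// carries one batch between those two stages.
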
// commitTask is used to pass batched rows from the data-preparing routine to
// the committing routine.
type commitTask struct {
	cnt  uint64
	rows [][]types.Datum
}

type loadRemoteInfo struct {
	store storage.ExternalStorage
	path  string
}

// LoadDataWorker does a LOAD DATA job.
type LoadDataWorker struct {
	*InsertValues

	Path string
	Ctx  sessionctx.Context
	// expose some fields for test
	FieldsInfo  *ast.FieldsClause
	LinesInfo   *ast.LinesClause
	NullInfo    *ast.NullDefinedBy
	IgnoreLines uint64

	format             string
	columnAssignments  []*ast.Assignment
	columnsAndUserVars []*ast.ColumnNameOrUserVar
	fieldMappings      []*FieldMapping
	onDuplicate        ast.OnDuplicateKeyHandlingType

	// import options
	importMode        string
	diskQuota         config.ByteSize
	checksum          config.PostOpLevel
	addIndex          bool
	analyze           config.PostOpLevel
	threadCnt         int64
	batchSize         config.ByteSize
	maxWriteSpeed     config.ByteSize // per second
	splitFile         bool
	maxRecordedErrors int64 // -1 means record all errors
	detached          bool

	table           table.Table
	row             []types.Datum
	rows            [][]types.Datum
	commitTaskQueue chan commitTask
	loadRemoteInfo  loadRemoteInfo
}

// NewLoadDataWorker creates a new LoadDataWorker that is ready to work.
func NewLoadDataWorker(
	sctx sessionctx.Context,
	plan *plannercore.LoadData,
	tbl table.Table,
) (*LoadDataWorker, error) {
	insertVal := &InsertValues{
		baseExecutor:   newBaseExecutor(sctx, nil, plan.ID()),
		Table:          tbl,
		Columns:        plan.Columns,
		GenExprs:       plan.GenCols.Exprs,
		isLoadData:     true,
		txnInUse:       sync.Mutex{},
		maxRowsInBatch: uint64(sctx.GetSessionVars().DMLBatchSize),
	}
	loadDataWorker := &LoadDataWorker{
		row:                make([]types.Datum, 0, len(insertVal.insertColumns)),
		commitTaskQueue:    make(chan commitTask, taskQueueSize),
		InsertValues:       insertVal,
		Path:               plan.Path,
		format:             plan.Format,
		table:              tbl,
		FieldsInfo:         plan.FieldsInfo,
		LinesInfo:          plan.LinesInfo,
		NullInfo:           plan.NullInfo,
		IgnoreLines:        plan.IgnoreLines,
		columnAssignments:  plan.ColumnAssignments,
		columnsAndUserVars: plan.ColumnsAndUserVars,
		onDuplicate:        plan.OnDuplicate,
		Ctx:                sctx,
	}
	if err := loadDataWorker.initOptions(plan.Options); err != nil {
		return nil, err
	}
	columnNames := loadDataWorker.initFieldMappings()
	if err := loadDataWorker.initLoadColumns(columnNames); err != nil {
		return nil, err
	}
	loadDataWorker.ResetBatch()
	return loadDataWorker, nil
}

func (e *LoadDataWorker) initDefaultOptions() {
	threadCnt := runtime.NumCPU()
	if e.format == LoadDataFormatParquet {
		threadCnt = int(math.Max(1, float64(threadCnt)*0.75))
	}

	e.importMode = logicalImportMode
	_ = e.diskQuota.UnmarshalText([]byte("50GiB")) // todo confirm with pm
	e.checksum = config.OpLevelRequired
	e.addIndex = true
	e.analyze = config.OpLevelOptional
	e.threadCnt = int64(threadCnt)
	_ = e.batchSize.UnmarshalText([]byte("100MiB")) // todo confirm with pm
	e.maxWriteSpeed = unlimitedWriteSpeed
	e.splitFile = false
	e.maxRecordedErrors = 100
	e.detached = false
}

func (e *LoadDataWorker) initOptions(options []*plannercore.LoadDataOpt) error {
	e.initDefaultOptions()

	specifiedOptions := map[string]*plannercore.LoadDataOpt{}
	for _, opt := range options {
		hasValue, ok := supportedOptions[opt.Name]
		if !ok {
			return ErrUnknownOption.FastGenByArgs(opt.Name)
		}
		if hasValue && opt.Value == nil || !hasValue && opt.Value != nil {
			return ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		if _, ok = specifiedOptions[opt.Name]; ok {
			return ErrDuplicateOption.FastGenByArgs(opt.Name)
		}
		specifiedOptions[opt.Name] = opt
	}

	var (
		v      string
		err    error
		isNull bool
	)
	if opt, ok := specifiedOptions[importModeOption]; ok {
		v, isNull, err = opt.Value.EvalString(e.Ctx, chunk.Row{})
		if err != nil || isNull {
			return ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		v = strings.ToLower(v)
		if v != logicalImportMode && v != physicalImportMode {
			return ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		e.importMode = v
	}

	if e.importMode == logicalImportMode {
		// some options are only allowed in physical mode
		for _, opt := range specifiedOptions {
			if _, ok := optionsForPhysicalImport[opt.Name]; ok {
				return ErrLoadDataUnsupportedOption.FastGenByArgs(opt.Name, e.importMode)
			}
		}
	}
	if opt, ok := specifiedOptions[diskQuotaOption]; ok {
		v, isNull, err = opt.Value.EvalString(e.Ctx, chunk.Row{})
		if err != nil || isNull {
			return ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		if err = e.diskQuota.UnmarshalText([]byte(v)); err != nil || e.diskQuota <= 0 {
			return ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
	}
	if opt, ok := specifiedOptions[checksumOption]; ok {
		v, isNull, err = opt.Value.EvalString(e.Ctx, chunk.Row{})
		if err != nil || isNull {
			return ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		if err = e.checksum.FromStringValue(v); err != nil {
			return ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
	}
	if opt, ok := specifiedOptions[addIndexOption]; ok {
		var vInt int64
		if !mysql.HasIsBooleanFlag(opt.Value.GetType().GetFlag()) {
			return ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		vInt, isNull, err = opt.Value.EvalInt(e.Ctx, chunk.Row{})
		if err != nil || isNull {
			return ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		e.addIndex = vInt == 1
	}
	if opt, ok := specifiedOptions[analyzeOption]; ok {
		v, isNull, err = opt.Value.EvalString(e.Ctx, chunk.Row{})
		if err != nil || isNull {
			return ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		if err = e.analyze.FromStringValue(v); err != nil {
			return ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
	}
	if opt, ok := specifiedOptions[threadOption]; ok {
		// boolean true will be taken as 1
		e.threadCnt, isNull, err = opt.Value.EvalInt(e.Ctx, chunk.Row{})
		if err != nil || isNull || e.threadCnt <= 0 {
			return ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
	}
	if opt, ok := specifiedOptions[batchSizeOption]; ok {
		v, isNull, err = opt.Value.EvalString(e.Ctx, chunk.Row{})
		if err != nil || isNull {
			return ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		if err = e.batchSize.UnmarshalText([]byte(v)); err != nil || e.batchSize <= 0 {
			return ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
	}
	if opt, ok := specifiedOptions[maxWriteSpeedOption]; ok {
		v, isNull, err = opt.Value.EvalString(e.Ctx, chunk.Row{})
		if err != nil || isNull {
			return ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		if err = e.maxWriteSpeed.UnmarshalText([]byte(v)); err != nil || e.maxWriteSpeed <= 0 {
			return ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
	}
	if opt, ok := specifiedOptions[splitFileOption]; ok {
		if !mysql.HasIsBooleanFlag(opt.Value.GetType().GetFlag()) {
			return ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		var vInt int64
		vInt, isNull, err = opt.Value.EvalInt(e.Ctx, chunk.Row{})
		if err != nil || isNull {
			return ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		e.splitFile = vInt == 1
	}
	if opt, ok := specifiedOptions[recordErrorsOption]; ok {
		e.maxRecordedErrors, isNull, err = opt.Value.EvalInt(e.Ctx, chunk.Row{})
		if err != nil || isNull || e.maxRecordedErrors < -1 {
			return ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		// todo: set a max value for this param?
	}
	if _, ok := specifiedOptions[detachedOption]; ok {
		e.detached = true
	}

	e.adjustOptions()
	return nil
}

func (e *LoadDataWorker) adjustOptions() {
	if e.diskQuota < minDiskQuota {
		e.diskQuota = minDiskQuota
	}
	// max value is cpu-count
	numCPU := int64(runtime.NumCPU())
	if e.threadCnt > numCPU {
		e.threadCnt = numCPU
	}
	if e.batchSize < minBatchSize {
		e.batchSize = minBatchSize
	}
	if e.maxWriteSpeed < minWriteSpeed {
		e.maxWriteSpeed = minWriteSpeed
	}
}

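// An illustrative example of the adjustment above (values assumed, not from
// this file): with thread=64 on a 16-core machine, threadCnt is clamped to 16;
// a batch_size below 1KiB is raised to minBatchSize, a max_write_speed below
// 1KiB/s is raised to minWriteSpeed, and a disk_quota below 10GiB is raised to
// minDiskQuota.
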
// FieldMapping indicates the relationship between an input field and a table
// column or a user variable.
type FieldMapping struct {
	Column  *table.Column
	UserVar *ast.VariableExpr
}

// initFieldMappings makes a field mapping slice to implicitly map input fields
// to table columns or user-defined variables. The slice's order is the same as
// the order of the input fields.
// It returns a slice of column names in the same order, without user-defined
// variable names.
func (e *LoadDataWorker) initFieldMappings() []string {
	columns := make([]string, 0, len(e.columnsAndUserVars)+len(e.columnAssignments))
	tableCols := e.table.Cols()

	if len(e.columnsAndUserVars) == 0 {
		for _, v := range tableCols {
			fieldMapping := &FieldMapping{
				Column: v,
			}
			e.fieldMappings = append(e.fieldMappings, fieldMapping)
			columns = append(columns, v.Name.O)
		}

		return columns
	}

	var column *table.Column

	for _, v := range e.columnsAndUserVars {
		if v.ColumnName != nil {
			column = table.FindCol(tableCols, v.ColumnName.Name.O)
			columns = append(columns, v.ColumnName.Name.O)
		} else {
			column = nil
		}

		fieldMapping := &FieldMapping{
			Column:  column,
			UserVar: v.UserVar,
		}
		e.fieldMappings = append(e.fieldMappings, fieldMapping)
	}

	return columns
}

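// An illustrative example (table and column names assumed): for a table
// t(a, b, c) and a statement column list (a, @skip, c), the field mappings
// become [{Column: a}, {UserVar: @skip}, {Column: c}] and the returned column
// names are ["a", "c"]. With no column list at all, every table column is
// mapped in order and all of their names are returned.
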
// initLoadColumns sets the columns which the input fields are loaded to.
func (e *LoadDataWorker) initLoadColumns(columnNames []string) error {
	var cols []*table.Column
	var missingColName string
	var err error
	tableCols := e.table.Cols()

	if len(columnNames) != len(tableCols) {
		for _, v := range e.columnAssignments {
			columnNames = append(columnNames, v.Column.Name.O)
		}
	}

	cols, missingColName = table.FindCols(tableCols, columnNames, e.table.Meta().PKIsHandle)
	if missingColName != "" {
		return dbterror.ErrBadField.GenWithStackByArgs(missingColName, "field list")
	}

	for _, col := range cols {
		if !col.IsGenerated() {
			e.insertColumns = append(e.insertColumns, col)
		}
		if col.Name.L == model.ExtraHandleName.L {
			if !e.ctx.GetSessionVars().AllowWriteRowID {
				return errors.Errorf("load data statement for _tidb_rowid are not supported")
			}
			e.hasExtraHandle = true
			break
		}
	}

	// e.insertColumns is appended according to the original table's column sequence.
	// We have to reorder it to follow the user-specified column order, which is given by columnNames.
	if err = e.reorderColumns(columnNames); err != nil {
		return err
	}

	e.rowLen = len(e.insertColumns)
	// Check that each column is specified only once.
	err = table.CheckOnce(cols)
	if err != nil {
		return err
	}

	return nil
}

// reorderColumns reorders e.insertColumns according to the order of columnNames.
// Note: there must be a one-to-one mapping between e.insertColumns and
// columnNames in terms of column name.
func (e *LoadDataWorker) reorderColumns(columnNames []string) error {
	cols := e.insertColumns

	if len(cols) != len(columnNames) {
		return ErrColumnsNotMatched
	}

	reorderedColumns := make([]*table.Column, len(cols))

	if columnNames == nil {
		return nil
	}

	mapping := make(map[string]int)
	for idx, colName := range columnNames {
		mapping[strings.ToLower(colName)] = idx
	}

	for _, col := range cols {
		idx := mapping[col.Name.L]
		reorderedColumns[idx] = col
	}

	e.insertColumns = reorderedColumns

	return nil
}

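// For example (an assumed column list, not from the source): if the statement
// specifies (c, a) for a table whose columns are (a, b, c), e.insertColumns is
// first collected in table order as [a, c] and is reordered here to [c, a] so
// that it matches the order of the incoming fields.
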
// Load reads from reader and does the load data job.
func (e *LoadDataWorker) Load(ctx context.Context, reader io.ReadSeekCloser) error {
	var (
		parser mydump.Parser
		err    error
	)

	switch strings.ToLower(e.format) {
	case "":
		// CSV-like
		parser, err = mydump.NewCSVParser(
			ctx,
			e.GenerateCSVConfig(),
			reader,
			int64(config.ReadBlockSize),
			nil,
			false,
			// TODO: support charset conversion
			nil)
	case LoadDataFormatSQLDump:
		parser = mydump.NewChunkParser(
			ctx,
			e.Ctx.GetSessionVars().SQLMode,
			reader,
			int64(config.ReadBlockSize),
			nil,
		)
	case LoadDataFormatParquet:
		if e.loadRemoteInfo.store == nil {
			return ErrLoadParquetFromLocal
		}
		parser, err = mydump.NewParquetParser(
			ctx,
			e.loadRemoteInfo.store,
			reader,
			e.loadRemoteInfo.path,
		)
	default:
		return ErrLoadDataUnsupportedFormat.GenWithStackByArgs(e.format)
	}
	if err != nil {
		return ErrLoadDataWrongFormatConfig.GenWithStack(err.Error())
	}
	parser.SetLogger(log.Logger{Logger: logutil.Logger(ctx)})

	err = sessiontxn.NewTxn(ctx, e.Ctx)
	if err != nil {
		return err
	}
	group, groupCtx := errgroup.WithContext(ctx)

	// processStream processes the input data and enqueues commit tasks
	group.Go(func() error {
		return e.processStream(groupCtx, parser)
	})
	group.Go(func() error {
		return e.commitWork(groupCtx)
	})

	err = group.Wait()
	e.SetMessage()
	return err
}

// processStream processes the input stream from the network.
func (e *LoadDataWorker) processStream(
	ctx context.Context,
	parser mydump.Parser,
) (err error) {
	defer func() {
		r := recover()
		if r != nil {
			logutil.Logger(ctx).Error("process routine panicked",
				zap.Reflect("r", r),
				zap.Stack("stack"))
			err = errors.Errorf("%v", r)
		}
	}()

	checkKilled := time.NewTicker(30 * time.Second)
	defer checkKilled.Stop()

	for {
		// prepare a batch and enqueue the task
		if err = e.ReadRows(ctx, parser); err != nil {
			return
		}
		if e.curBatchCnt == 0 {
			close(e.commitTaskQueue)
			return
		}

	TrySendTask:
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-checkKilled.C:
			if atomic.CompareAndSwapUint32(&e.Ctx.GetSessionVars().Killed, 1, 0) {
				logutil.Logger(ctx).Info("load data query interrupted quit data processing")
				close(e.commitTaskQueue)
				return ErrQueryInterrupted
			}
			goto TrySendTask
		case e.commitTaskQueue <- commitTask{e.curBatchCnt, e.rows}:
		}
		// reset the rows buffer; this reallocates the buffer and does NOT reuse it
		e.ResetBatch()
	}
}

// ResetBatch resets the inner batch.
func (e *LoadDataWorker) ResetBatch() {
	e.rows = make([][]types.Datum, 0, e.maxRowsInBatch)
	e.curBatchCnt = 0
}

// commitWork commits batches sequentially.
func (e *LoadDataWorker) commitWork(ctx context.Context) (err error) {
	defer func() {
		r := recover()
		if r != nil {
			logutil.Logger(ctx).Error("commitWork panicked",
				zap.Reflect("r", r),
				zap.Stack("stack"))
			err = errors.Errorf("%v", r)
		}
	}()

	var tasks uint64
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case task, ok := <-e.commitTaskQueue:
			if !ok {
				return nil
			}
			start := time.Now()
			if err = e.commitOneTask(ctx, task); err != nil {
				return err
			}
			tasks++
			logutil.Logger(ctx).Info("commit one task success",
				zap.Duration("commit time usage", time.Since(start)),
				zap.Uint64("keys processed", task.cnt),
				zap.Uint64("tasks processed", tasks),
				zap.Int("tasks in queue", len(e.commitTaskQueue)))
		}
	}
}

// commitOneTask inserts the data of one commitTask, then commits and refreshes
// the transaction.
func (e *LoadDataWorker) commitOneTask(ctx context.Context, task commitTask) error {
	var err error
	defer func() {
		if err != nil {
			e.Ctx.StmtRollback(ctx, false)
		}
	}()
	err = e.CheckAndInsertOneBatch(ctx, task.rows, task.cnt)
	if err != nil {
		logutil.Logger(ctx).Error("commit error CheckAndInsert", zap.Error(err))
		return err
	}
	failpoint.Inject("commitOneTaskErr", func() error {
		return errors.New("mock commit one task error")
	})
	e.Ctx.StmtCommit(ctx)
	// Make sure the process stream routine never uses an invalid txn.
	e.txnInUse.Lock()
	defer e.txnInUse.Unlock()
	// Make sure that there are no retries when committing.
	if err = e.Ctx.RefreshTxnCtx(ctx); err != nil {
		logutil.Logger(ctx).Error("commit error refresh", zap.Error(err))
		return err
	}
	return err
}

// CheckAndInsertOneBatch checks and inserts one batch of rows into the current
// transaction.
func (e *LoadDataWorker) CheckAndInsertOneBatch(ctx context.Context, rows [][]types.Datum, cnt uint64) error {
	if e.stats != nil && e.stats.BasicRuntimeStats != nil {
		// Since this method is not called by the executor's Next,
		// we need to record the basic executor runtime stats ourselves.
		start := time.Now()
		defer func() {
			e.stats.BasicRuntimeStats.Record(time.Since(start), 0)
		}()
	}
	var err error
	if cnt == 0 {
		return err
	}
	e.ctx.GetSessionVars().StmtCtx.AddRecordRows(cnt)

	replace := false
	if e.onDuplicate == ast.OnDuplicateKeyHandlingReplace {
		replace = true
	}

	err = e.batchCheckAndInsert(ctx, rows[0:cnt], e.addRecordLD, replace)
	if err != nil {
		return err
	}
	return err
}

func (e *LoadDataWorker) addRecordLD(ctx context.Context, row []types.Datum) error {
	if row == nil {
		return nil
	}
	err := e.addRecord(ctx, row)
	if err != nil {
		e.handleWarning(err)
		return err
	}
	return nil
}

// ReadRows reads rows from parser. When the parser's reader meets EOF, it
// returns nil. Other errors are returned directly. When the rows batch is full
// it also returns nil.
// The resulting rows are saved in e.rows and some members are updated; the
// caller can check whether curBatchCnt == 0 to know whether EOF was reached.
func (e *LoadDataWorker) ReadRows(ctx context.Context, parser mydump.Parser) error {
	ignoreOneLineFn := parser.ReadRow
	if csvParser, ok := parser.(*mydump.CSVParser); ok {
		ignoreOneLineFn = func() error {
			_, _, err := csvParser.ReadUntilTerminator()
			return err
		}
	}

	for e.IgnoreLines > 0 {
		err := ignoreOneLineFn()
		if err != nil {
			if errors.Cause(err) == io.EOF {
				return nil
			}
			return err
		}

		e.IgnoreLines--
	}
	for {
		if err := parser.ReadRow(); err != nil {
			if errors.Cause(err) == io.EOF {
				return nil
			}
			return ErrLoadDataCantRead.GenWithStackByArgs(
				err.Error(),
				"Only the following formats delimited text file (csv, tsv), parquet, sql are supported. Please provide the valid source file(s)",
			)
		}
		// rowCount will be used in fillRow(); the last insert ID will be assigned
		// according to rowCount = 1, so it should be increased first here.
		e.rowCount++
		r, err := e.parserData2TableData(ctx, parser.LastRow().Row)
		if err != nil {
			return err
		}
		e.rows = append(e.rows, r)
		e.curBatchCnt++
		if e.maxRowsInBatch != 0 && e.rowCount%e.maxRowsInBatch == 0 {
			logutil.Logger(ctx).Info("batch limit hit when inserting rows", zap.Int("maxBatchRows", e.maxChunkSize),
				zap.Uint64("totalRows", e.rowCount))
			return nil
		}
	}
}

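// A note on batching: maxRowsInBatch comes from the session's DMLBatchSize, so
// with, say, DMLBatchSize = 1000 (an assumed value), ReadRows returns after
// every 1000 parsed rows; processStream then hands that batch to commitWork as
// one commitTask and calls ResetBatch before reading the next batch.
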
// parserData2TableData encodes the data of the parser output.
func (e *LoadDataWorker) parserData2TableData(
	ctx context.Context,
	parserData []types.Datum,
) ([]types.Datum, error) {
	// Data interpretation is restrictive if the SQL mode is restrictive and neither
	// the IGNORE nor the LOCAL modifier is specified. Errors terminate the load
	// operation.
	// ref https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-column-assignments
	restrictive := e.Ctx.GetSessionVars().SQLMode.HasStrictMode() &&
		e.onDuplicate != ast.OnDuplicateKeyHandlingIgnore

	var errColNumMismatch error
	switch {
	case len(parserData) < len(e.fieldMappings):
		errColNumMismatch = ErrWarnTooFewRecords.GenWithStackByArgs(e.rowCount)
	case len(parserData) > len(e.fieldMappings):
		errColNumMismatch = ErrWarnTooManyRecords.GenWithStackByArgs(e.rowCount)
	}

	if errColNumMismatch != nil {
		if restrictive {
			return nil, errColNumMismatch
		}
		e.handleWarning(errColNumMismatch)
	}

	row := make([]types.Datum, 0, len(e.insertColumns))
	sessionVars := e.Ctx.GetSessionVars()
	setVar := func(name string, col *types.Datum) {
		// User variable names are not case-sensitive
		// https://dev.mysql.com/doc/refman/8.0/en/user-variables.html
		name = strings.ToLower(name)
		if col == nil || col.IsNull() {
			sessionVars.UnsetUserVar(name)
		} else {
			sessionVars.SetUserVarVal(name, *col)
		}
	}

	for i := 0; i < len(e.fieldMappings); i++ {
		if i >= len(parserData) {
			if e.fieldMappings[i].Column == nil {
				setVar(e.fieldMappings[i].UserVar.Name, nil)
				continue
			}

			// If a column is missing and its type is time with the NOT NULL flag, it should be set to the current time.
			if types.IsTypeTime(e.fieldMappings[i].Column.GetType()) && mysql.HasNotNullFlag(e.fieldMappings[i].Column.GetFlag()) {
				row = append(row, types.NewTimeDatum(types.CurrentTime(e.fieldMappings[i].Column.GetType())))
				continue
			}

			row = append(row, types.NewDatum(nil))
			continue
		}

		if e.fieldMappings[i].Column == nil {
			setVar(e.fieldMappings[i].UserVar.Name, &parserData[i])
			continue
		}

		row = append(row, parserData[i])
	}
	for i := 0; i < len(e.columnAssignments); i++ {
		// eval the expression of the `SET` clause
		d, err := expression.EvalAstExpr(e.Ctx, e.columnAssignments[i].Expr)
		if err != nil {
			if restrictive {
				return nil, err
			}
			e.handleWarning(err)
			return nil, nil
		}
		row = append(row, d)
	}

	// a new row buffer will be allocated in getRow
	newRow, err := e.getRow(ctx, row)
	if err != nil {
		if restrictive {
			return nil, err
		}
		e.handleWarning(err)
		return nil, nil
	}

	return newRow, nil
}

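// An illustrative walk-through (column and variable names assumed): with a
// column list (a, @v) and a SET clause b = UPPER(@v), the first parsed field is
// placed directly into the row for column a, the second is stored in the user
// variable @v, and the SET expression is then evaluated and appended for
// column b. In restrictive mode any evaluation error aborts the load; otherwise
// it only records a warning and the row is skipped.
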
// SetMessage sets the info message (ERR_LOAD_INFO) generated by the LOAD
// statement. It is public because of the special way the LOAD statement is
// handled.
func (e *LoadDataWorker) SetMessage() {
	stmtCtx := e.ctx.GetSessionVars().StmtCtx
	numRecords := stmtCtx.RecordRows()
	numDeletes := stmtCtx.DeletedRows()
	numSkipped := numRecords - stmtCtx.CopiedRows()
	numWarnings := stmtCtx.WarningCount()
	msg := fmt.Sprintf(mysql.MySQLErrName[mysql.ErrLoadInfo].Raw, numRecords, numDeletes, numSkipped, numWarnings)
	e.ctx.GetSessionVars().StmtCtx.SetMessage(msg)
}

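// The resulting message follows MySQL's LOAD DATA summary format, e.g.
// (illustrative numbers): "Records: 3  Deleted: 0  Skipped: 0  Warnings: 1".
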
// GenerateCSVConfig generates a CSV config for parser from LoadDataWorker.
func (e *LoadDataWorker) GenerateCSVConfig() *config.CSVConfig {
	var (
		nullDef          []string
		quotedNullIsText = true
	)

	if e.NullInfo != nil {
		nullDef = append(nullDef, e.NullInfo.NullDef)
		quotedNullIsText = !e.NullInfo.OptEnclosed
	} else if e.FieldsInfo.Enclosed != nil {
		nullDef = append(nullDef, "NULL")
	}
	if e.FieldsInfo.Escaped != nil {
		nullDef = append(nullDef, string([]byte{*e.FieldsInfo.Escaped, 'N'}))
	}

	enclosed := ""
	if e.FieldsInfo.Enclosed != nil {
		enclosed = string([]byte{*e.FieldsInfo.Enclosed})
	}
	escaped := ""
	if e.FieldsInfo.Escaped != nil {
		escaped = string([]byte{*e.FieldsInfo.Escaped})
	}

	return &config.CSVConfig{
		Separator: e.FieldsInfo.Terminated,
		// ignore optionally enclosed
		Delimiter:        enclosed,
		Terminator:       e.LinesInfo.Terminated,
		NotNull:          false,
		Null:             nullDef,
		Header:           false,
		TrimLastSep:      false,
		EscapedBy:        escaped,
		StartingBy:       e.LinesInfo.Starting,
		AllowEmptyLine:   true,
		QuotedNullIsText: quotedNullIsText,
		UnescapedQuote:   true,
	}
}

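// An illustrative mapping (clause values assumed): for
// FIELDS TERMINATED BY ',' ENCLOSED BY '"' ESCAPED BY '\' LINES TERMINATED BY '\n',
// the resulting CSVConfig has Separator ",", Delimiter "\"", EscapedBy "\",
// Terminator "\n", and Null contains "NULL" plus the escaped sequence "\N".
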
// GetRows is a getter for rows.
func (e *LoadDataWorker) GetRows() [][]types.Datum {
	return e.rows
}

// GetCurBatchCnt is a getter for curBatchCnt.
func (e *LoadDataWorker) GetCurBatchCnt() uint64 {
	return e.curBatchCnt
}

var _ io.ReadSeekCloser = (*SimpleSeekerOnReadCloser)(nil)

// SimpleSeekerOnReadCloser provides Seek(0, SeekCurrent) on a ReadCloser.
type SimpleSeekerOnReadCloser struct {
	r   io.ReadCloser
	pos int
}

// NewSimpleSeekerOnReadCloser creates a SimpleSeekerOnReadCloser.
func NewSimpleSeekerOnReadCloser(r io.ReadCloser) *SimpleSeekerOnReadCloser {
	return &SimpleSeekerOnReadCloser{r: r}
}

// Read implements io.Reader.
func (s *SimpleSeekerOnReadCloser) Read(p []byte) (n int, err error) {
	n, err = s.r.Read(p)
	s.pos += n
	return
}

// Seek implements io.Seeker.
func (s *SimpleSeekerOnReadCloser) Seek(offset int64, whence int) (int64, error) {
	// only support getting the reader's current offset
	if offset == 0 && whence == io.SeekCurrent {
		return int64(s.pos), nil
	}
	return 0, errors.Errorf("unsupported seek on SimpleSeekerOnReadCloser, offset: %d whence: %d", offset, whence)
}

// Close implements io.Closer.
func (s *SimpleSeekerOnReadCloser) Close() error {
	return s.r.Close()
}

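// A minimal usage sketch (the wrapped reader is assumed, e.g. an HTTP response
// body or any other non-seekable stream):
//
//	seeker := NewSimpleSeekerOnReadCloser(resp.Body)
//	n, _ := seeker.Read(buf)                 // reads pass through and advance pos
//	cur, _ := seeker.Seek(0, io.SeekCurrent) // cur == total bytes read so far
//	_ = seeker.Close()
//
// Any other Seek combination returns an error, which is enough for parsers
// that only query their current position.
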
// loadDataVarKeyType is a dummy type to avoid naming collision in context.
type loadDataVarKeyType int

// String defines a Stringer function for debugging and pretty printing.
func (k loadDataVarKeyType) String() string {
	return "load_data_var"
}

// LoadDataVarKey is a variable key for load data.
const LoadDataVarKey loadDataVarKeyType = 0