tidb/executor/importer/import.go

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importer
import (
"fmt"
"math"
"runtime"
"strings"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/mysql"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/dbterror/exeerrors"
)
const (
// LoadDataFormatDelimitedData represents the data source file of LOAD DATA is delimited (CSV-like) data.
LoadDataFormatDelimitedData = "delimited data"
// LoadDataFormatSQLDump represents the data source file of LOAD DATA is a mydumper-format DML file.
LoadDataFormatSQLDump = "sql file"
// LoadDataFormatParquet represents the data source file of LOAD DATA is a parquet file.
LoadDataFormatParquet = "parquet"
// LogicalImportMode represents the SQL-like (logical) import mode.
LogicalImportMode = "logical" // tidb backend
physicalImportMode = "physical" // local backend
unlimitedWriteSpeed = config.ByteSize(math.MaxInt64)
minDiskQuota = config.ByteSize(10 << 30) // 10GiB
minWriteSpeed = config.ByteSize(1 << 10) // 1KiB/s
importModeOption = "import_mode"
diskQuotaOption = "disk_quota"
checksumOption = "checksum_table"
addIndexOption = "add_index"
analyzeOption = "analyze_table"
threadOption = "thread"
batchSizeOption = "batch_size"
maxWriteSpeedOption = "max_write_speed"
splitFileOption = "split_file"
recordErrorsOption = "record_errors"
)
var (
detachedOption = plannercore.DetachedOption
// maps the option name to whether the option requires a value
supportedOptions = map[string]bool{
importModeOption: true,
diskQuotaOption: true,
checksumOption: true,
addIndexOption: true,
analyzeOption: true,
threadOption: true,
batchSizeOption: true,
maxWriteSpeedOption: true,
splitFileOption: true,
recordErrorsOption: true,
detachedOption: false,
}
// options only allowed when import mode is physical
optionsForPhysicalImport = map[string]struct{}{
diskQuotaOption: {},
checksumOption: {},
addIndexOption: {},
analyzeOption: {},
}
// LoadDataReadBlockSize is exposed for test.
LoadDataReadBlockSize = int64(config.ReadBlockSize)
)
// FieldMapping indicates the relationship between an input field and a table column or user variable.
type FieldMapping struct {
Column *table.Column
UserVar *ast.VariableExpr
}
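// Illustrative sketch, not part of the original file: for table t(a, b) and a
// hypothetical statement "LOAD DATA ... INTO TABLE t (a, @v) SET b = @v",
// initFieldMappings below builds two mappings, {Column: a} for the first input field
// and {UserVar: @v} for the second, so the first field is written to column a while
// the second is only captured into the user variable @v.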
// LoadDataController is the controller for LOAD DATA.
// todo: need a better name
type LoadDataController struct {
Path string
Format string
ColumnsAndUserVars []*ast.ColumnNameOrUserVar
ColumnAssignments []*ast.Assignment
OnDuplicate ast.OnDuplicateKeyHandlingType
Table table.Table
SchemaName string
// fieldMappings describes how each input field (or input column) from the data file is mapped, either to a column or to a user variable.
// If there's NO column list clause in the LOAD DATA statement, it's the table's columns;
// otherwise it's the user-defined list.
fieldMappings []*FieldMapping
// see InsertValues.insertColumns
// todo: our behavior differs from MySQL. For example, for table t(a,b):
// - "...(a,a) set a=100" is allowed in MySQL, but not in TiDB
// - "...(a,b) set b=100" sets b=100 in MySQL, but in TiDB the SET is ignored.
insertColumns []*table.Column
// used for DELIMITED DATA format
FieldNullDef []string
NullValueOptEnclosed bool
plannercore.LineFieldsInfo
IgnoreLines uint64
// import options
importMode string
diskQuota config.ByteSize
checksum config.PostOpLevel
addIndex bool
analyze config.PostOpLevel
threadCnt int64
batchSize int64
maxWriteSpeed config.ByteSize // per second
splitFile bool
maxRecordedErrors int64 // -1 means record all errors
Detached bool
}
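// Note (added for clarity): the unexported option fields above are filled from the
// statement's options by initOptions below, using the option-name constants defined at
// the top of this file (e.g. thread -> threadCnt, max_write_speed -> maxWriteSpeed),
// and are then clamped by adjustOptions.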
// NewLoadDataController creates a new LoadDataController.
func NewLoadDataController(sctx sessionctx.Context, plan *plannercore.LoadData, tbl table.Table) (*LoadDataController, error) {
var format string
if plan.Format != nil {
format = strings.ToLower(*plan.Format)
} else {
// without FORMAT 'xxx' clause, default to DELIMITED DATA
format = LoadDataFormatDelimitedData
}
c := &LoadDataController{
Path: plan.Path,
Format: format,
ColumnsAndUserVars: plan.ColumnsAndUserVars,
ColumnAssignments: plan.ColumnAssignments,
OnDuplicate: plan.OnDuplicate,
SchemaName: plan.Table.Schema.O,
Table: tbl,
LineFieldsInfo: plannercore.NewLineFieldsInfo(plan.FieldsInfo, plan.LinesInfo),
}
if err := c.initFieldParams(plan); err != nil {
return nil, err
}
if err := c.initOptions(sctx, plan.Options); err != nil {
return nil, err
}
columnNames := c.initFieldMappings()
if err := c.initLoadColumns(columnNames); err != nil {
return nil, err
}
return c, nil
}
func (e *LoadDataController) initFieldParams(plan *plannercore.LoadData) error {
if e.Path == "" {
return exeerrors.ErrLoadDataEmptyPath
}
if e.Format != LoadDataFormatDelimitedData && e.Format != LoadDataFormatParquet && e.Format != LoadDataFormatSQLDump {
return exeerrors.ErrLoadDataUnsupportedFormat.GenWithStackByArgs(e.Format)
}
if e.Format != LoadDataFormatDelimitedData {
if plan.FieldsInfo != nil || plan.LinesInfo != nil || plan.IgnoreLines != nil {
return exeerrors.ErrLoadDataWrongFormatConfig.GenWithStackByArgs(fmt.Sprintf("cannot specify FIELDS ... or LINES ... or IGNORE N LINES for format '%s'", e.Format))
}
// no need to init those params for sql/parquet
return nil
}
if plan.IgnoreLines != nil {
e.IgnoreLines = *plan.IgnoreLines
}
var (
nullDef []string
nullValueOptEnclosed = false
)
// todo: move the null definition into plannercore.LineFieldsInfo
// in LOAD DATA there may be multiple null definitions, but in SELECT ... INTO OUTFILE there's only one
if plan.FieldsInfo != nil && plan.FieldsInfo.DefinedNullBy != nil {
nullDef = append(nullDef, *plan.FieldsInfo.DefinedNullBy)
nullValueOptEnclosed = plan.FieldsInfo.NullValueOptEnclosed
} else if len(e.FieldsEnclosedBy) != 0 {
nullDef = append(nullDef, "NULL")
}
if len(e.FieldsEscapedBy) != 0 {
nullDef = append(nullDef, string([]byte{e.FieldsEscapedBy[0], 'N'}))
}
e.FieldNullDef = nullDef
e.NullValueOptEnclosed = nullValueOptEnclosed
if nullValueOptEnclosed && len(e.FieldsEnclosedBy) == 0 {
return exeerrors.ErrLoadDataWrongFormatConfig.GenWithStackByArgs("must specify FIELDS [OPTIONALLY] ENCLOSED BY when use NULL DEFINED BY OPTIONALLY ENCLOSED")
}
// moved from the planner's buildLoadData
// see https://github.com/pingcap/tidb/issues/33298
if len(e.FieldsTerminatedBy) == 0 {
return exeerrors.ErrLoadDataWrongFormatConfig.GenWithStackByArgs("load data with empty field terminator")
}
// TODO: support LINES TERMINATED BY "".
if len(e.LinesTerminatedBy) == 0 {
return exeerrors.ErrLoadDataWrongFormatConfig.GenWithStackByArgs("LINES TERMINATED BY is empty")
}
if len(e.FieldsEnclosedBy) > 0 &&
(strings.HasPrefix(e.FieldsEnclosedBy, e.FieldsTerminatedBy) || strings.HasPrefix(e.FieldsTerminatedBy, e.FieldsEnclosedBy)) {
return exeerrors.ErrLoadDataWrongFormatConfig.GenWithStackByArgs("FIELDS ENCLOSED BY and TERMINATED BY must not be prefix of each other")
}
return nil
}
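// Example of the NULL handling above (illustrative): assuming FIELDS TERMINATED BY ','
// ENCLOSED BY '"' ESCAPED BY '\' and no NULL DEFINED BY clause, FieldNullDef becomes
// ["NULL", "\N"], i.e. an unenclosed NULL literal or an escaped N in the data file is
// read as SQL NULL, and NullValueOptEnclosed stays false.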
func (e *LoadDataController) initDefaultOptions() {
threadCnt := runtime.NumCPU()
if e.Format == LoadDataFormatParquet {
threadCnt = int(math.Max(1, float64(threadCnt)*0.75))
}
e.importMode = LogicalImportMode
_ = e.diskQuota.UnmarshalText([]byte("50GiB")) // todo confirm with pm
e.checksum = config.OpLevelRequired
e.addIndex = true
e.analyze = config.OpLevelOptional
e.threadCnt = int64(threadCnt)
e.batchSize = 1000
e.maxWriteSpeed = unlimitedWriteSpeed
e.splitFile = false
e.maxRecordedErrors = 100
e.Detached = false
}
func (e *LoadDataController) initOptions(seCtx sessionctx.Context, options []*plannercore.LoadDataOpt) error {
e.initDefaultOptions()
specifiedOptions := map[string]*plannercore.LoadDataOpt{}
for _, opt := range options {
hasValue, ok := supportedOptions[opt.Name]
if !ok {
return exeerrors.ErrUnknownOption.FastGenByArgs(opt.Name)
}
if (hasValue && opt.Value == nil) || (!hasValue && opt.Value != nil) {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
if _, ok = specifiedOptions[opt.Name]; ok {
return exeerrors.ErrDuplicateOption.FastGenByArgs(opt.Name)
}
specifiedOptions[opt.Name] = opt
}
var (
v string
err error
isNull bool
)
if opt, ok := specifiedOptions[importModeOption]; ok {
v, isNull, err = opt.Value.EvalString(seCtx, chunk.Row{})
if err != nil || isNull {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
v = strings.ToLower(v)
if v != LogicalImportMode && v != physicalImportMode {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
e.importMode = v
}
if e.importMode == LogicalImportMode {
// some options are only allowed in physical mode
for _, opt := range specifiedOptions {
if _, ok := optionsForPhysicalImport[opt.Name]; ok {
return exeerrors.ErrLoadDataUnsupportedOption.FastGenByArgs(opt.Name, e.importMode)
}
}
}
if opt, ok := specifiedOptions[diskQuotaOption]; ok {
v, isNull, err = opt.Value.EvalString(seCtx, chunk.Row{})
if err != nil || isNull {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
if err = e.diskQuota.UnmarshalText([]byte(v)); err != nil || e.diskQuota <= 0 {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
}
if opt, ok := specifiedOptions[checksumOption]; ok {
v, isNull, err = opt.Value.EvalString(seCtx, chunk.Row{})
if err != nil || isNull {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
if err = e.checksum.FromStringValue(v); err != nil {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
}
if opt, ok := specifiedOptions[addIndexOption]; ok {
var vInt int64
if !mysql.HasIsBooleanFlag(opt.Value.GetType().GetFlag()) {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
vInt, isNull, err = opt.Value.EvalInt(seCtx, chunk.Row{})
if err != nil || isNull {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
e.addIndex = vInt == 1
}
if opt, ok := specifiedOptions[analyzeOption]; ok {
v, isNull, err = opt.Value.EvalString(seCtx, chunk.Row{})
if err != nil || isNull {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
if err = e.analyze.FromStringValue(v); err != nil {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
}
if opt, ok := specifiedOptions[threadOption]; ok {
// boolean true will be taken as 1
e.threadCnt, isNull, err = opt.Value.EvalInt(seCtx, chunk.Row{})
if err != nil || isNull || e.threadCnt <= 0 {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
}
if opt, ok := specifiedOptions[batchSizeOption]; ok {
e.batchSize, isNull, err = opt.Value.EvalInt(seCtx, chunk.Row{})
if err != nil || isNull || e.batchSize < 0 {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
}
if opt, ok := specifiedOptions[maxWriteSpeedOption]; ok {
v, isNull, err = opt.Value.EvalString(seCtx, chunk.Row{})
if err != nil || isNull {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
if err = e.maxWriteSpeed.UnmarshalText([]byte(v)); err != nil || e.maxWriteSpeed <= 0 {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
}
if opt, ok := specifiedOptions[splitFileOption]; ok {
if !mysql.HasIsBooleanFlag(opt.Value.GetType().GetFlag()) {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
var vInt int64
vInt, isNull, err = opt.Value.EvalInt(seCtx, chunk.Row{})
if err != nil || isNull {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
e.splitFile = vInt == 1
}
if opt, ok := specifiedOptions[recordErrorsOption]; ok {
e.maxRecordedErrors, isNull, err = opt.Value.EvalInt(seCtx, chunk.Row{})
if err != nil || isNull || e.maxRecordedErrors < -1 {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
// todo: set a max value for this param?
}
if _, ok := specifiedOptions[detachedOption]; ok {
e.Detached = true
}
e.adjustOptions()
return nil
}
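// Illustrative usage, assuming the options come from a WITH clause of LOAD DATA
// (hypothetical statement, not taken from this file):
//
// LOAD DATA INFILE 's3://bucket/a.csv' INTO TABLE t WITH thread=8, batch_size=2000, detached
//
// thread and batch_size must carry a value while detached must not (see supportedOptions);
// unknown, duplicated, or wrongly-typed options are rejected with ErrUnknownOption,
// ErrDuplicateOption, or ErrInvalidOptionVal respectively.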
func (e *LoadDataController) adjustOptions() {
if e.diskQuota < minDiskQuota {
e.diskQuota = minDiskQuota
}
// max value is cpu-count
numCPU := int64(runtime.NumCPU())
if e.threadCnt > numCPU {
e.threadCnt = numCPU
}
if e.maxWriteSpeed < minWriteSpeed {
e.maxWriteSpeed = minWriteSpeed
}
}
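// For example (illustrative): on a 16-core machine a requested thread=64 is clamped to
// 16, a disk_quota below 10GiB is raised to minDiskQuota, and a max_write_speed below
// 1KiB/s is raised to minWriteSpeed.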
// initFieldMappings makes a field mapping slice to implicitly map input fields to table columns or user-defined variables.
// The slice's order is the same as the order of the input fields.
// It returns the column names in the same order, excluding user-defined variable names.
func (e *LoadDataController) initFieldMappings() []string {
columns := make([]string, 0, len(e.ColumnsAndUserVars)+len(e.ColumnAssignments))
tableCols := e.Table.VisibleCols()
if len(e.ColumnsAndUserVars) == 0 {
for _, v := range tableCols {
fieldMapping := &FieldMapping{
Column: v,
}
e.fieldMappings = append(e.fieldMappings, fieldMapping)
columns = append(columns, v.Name.O)
}
return columns
}
var column *table.Column
for _, v := range e.ColumnsAndUserVars {
if v.ColumnName != nil {
column = table.FindCol(tableCols, v.ColumnName.Name.O)
columns = append(columns, v.ColumnName.Name.O)
} else {
column = nil
}
fieldMapping := &FieldMapping{
Column: column,
UserVar: v.UserVar,
}
e.fieldMappings = append(e.fieldMappings, fieldMapping)
}
return columns
}
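// Illustrative example: for table t(a, b, c) and column list (c, @dummy, a), the
// mappings are {Column: c}, {UserVar: @dummy}, {Column: a} and the returned slice is
// ["c", "a"]; the middle input field has no target column, its value is only kept in
// the user variable.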
// initLoadColumns sets the columns that the input fields are loaded into.
func (e *LoadDataController) initLoadColumns(columnNames []string) error {
var cols []*table.Column
var missingColName string
var err error
tableCols := e.Table.VisibleCols()
if len(columnNames) != len(tableCols) {
for _, v := range e.ColumnAssignments {
columnNames = append(columnNames, v.Column.Name.O)
}
}
cols, missingColName = table.FindCols(tableCols, columnNames, e.Table.Meta().PKIsHandle)
if missingColName != "" {
return dbterror.ErrBadField.GenWithStackByArgs(missingColName, "field list")
}
for _, col := range cols {
if !col.IsGenerated() {
// todo: should report an error here, since reorderColumns reports an error if len(cols) != len(columnNames)
e.insertColumns = append(e.insertColumns, col)
}
}
// e.insertColumns is appended according to the original table's column sequence.
// We have to reorder it to follow the user-specified column order given in columnNames.
if err = e.reorderColumns(columnNames); err != nil {
return err
}
// Check that each column is specified only once.
err = table.CheckOnce(cols)
if err != nil {
return err
}
return nil
}
// reorderColumns reorders e.insertColumns according to the order of columnNames.
// Note: there must be a one-to-one mapping between e.insertColumns and columnNames in terms of column name.
func (e *LoadDataController) reorderColumns(columnNames []string) error {
cols := e.insertColumns
if len(cols) != len(columnNames) {
return exeerrors.ErrColumnsNotMatched
}
reorderedColumns := make([]*table.Column, len(cols))
if columnNames == nil {
return nil
}
mapping := make(map[string]int)
for idx, colName := range columnNames {
mapping[strings.ToLower(colName)] = idx
}
for _, col := range cols {
idx := mapping[col.Name.L]
reorderedColumns[idx] = col
}
e.insertColumns = reorderedColumns
return nil
}
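// Illustrative example: if columnNames is ["b", "a"] for table t(a, b), reorderColumns
// rebuilds e.insertColumns as [b, a] so that the i-th parsed field lines up with the
// i-th insert column; a length mismatch returns ErrColumnsNotMatched.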
// GetInsertColumns gets the column list that needs to be inserted into the target table.
// This list includes all columns, in the same order as in fieldMappings and ColumnAssignments.
func (e *LoadDataController) GetInsertColumns() []*table.Column {
return e.insertColumns
}
// GetFieldMapping gets the field mappings.
func (e *LoadDataController) GetFieldMapping() []*FieldMapping {
return e.fieldMappings
}
// GetFieldCount gets the field count.
func (e *LoadDataController) GetFieldCount() int {
return len(e.fieldMappings)
}
// GetBatchSize gets the batch size.
func (e *LoadDataController) GetBatchSize() int64 {
return e.batchSize
}
// GenerateCSVConfig generates a CSV config for the parser used by LoadDataWorker.
func (e *LoadDataController) GenerateCSVConfig() *config.CSVConfig {
return &config.CSVConfig{
Separator: e.FieldsTerminatedBy,
// ignore optionally enclosed
Delimiter: e.FieldsEnclosedBy,
Terminator: e.LinesTerminatedBy,
NotNull: false,
Null: e.FieldNullDef,
Header: false,
TrimLastSep: false,
EscapedBy: e.FieldsEscapedBy,
StartingBy: e.LinesStartingBy,
AllowEmptyLine: true,
QuotedNullIsText: !e.NullValueOptEnclosed,
UnescapedQuote: true,
}
}
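// Illustrative mapping (hypothetical clauses, not exhaustive): with FIELDS TERMINATED BY ','
// ENCLOSED BY '"' ESCAPED BY '\' and LINES TERMINATED BY '\n', the generated config is
// roughly config.CSVConfig{Separator: ",", Delimiter: "\"", Terminator: "\n", EscapedBy: "\\",
// Null: e.FieldNullDef, ...}, with QuotedNullIsText true unless
// NULL DEFINED BY ... OPTIONALLY ENCLOSED was specified.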